# Le basi del Dataframe
In questo notebook impareremo ad utilizzare il modulo SparkSQL insieme al Dataframe per processare dati strutturati in parallelo. Il Dataframe è un'astrazione del RDD che permette di organizzare i dati in colonne ed eseguire operazioni su di esse utilizzando l'ottimizzazione del modulo SparkSQL.

## Inizializzazione di Spark
Vediamo un modo differente per inizializzare Spark: la SparkSession, che ci permette di creare con un unica funzione il contesto per Spark (SparkContext) ed il contesto per SQL (SQLContext).

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('basic').getOrCreate()

## Creare un Dataframe
Possiamo creare un nuovo Dataframe usando il metodo *.createDataFrame(data, names)* dell'oggetto SparkSession, questo metodo ha bisogno di due parametri:

* Una lista di tuple, in cui ogni tupla corrisponde ad una riga del Dataframe.
* Una lista con i nomi per le colonne

Creiamo un Dataframe contenente, nome, sesso, età, altezza e peso di 5 persone.

In [3]:
from pyspark.sql.types import *

data = [("Giuseppe", "M", 23, 174, 70.5),
        ("Antonio", "M", 25, 179, 68.),
        ("Lorenzo", "M", 33, 172, 88.5),
        ("Luisa", "F", 48, 155, 50.2),
        ("Margheria", "F", 35, 165, 54.3)]

df = spark.createDataFrame(data, ["name", "gender", "age", "height","weight"])

Per visualizzare il Dataframe posssiamo usare il metodo *.show()*

In [4]:
df.show()

+---------+------+---+------+------+
|     name|gender|age|height|weight|
+---------+------+---+------+------+
| Giuseppe|     M| 23|   174|  70.5|
|  Antonio|     M| 25|   179|  68.0|
|  Lorenzo|     M| 33|   172|  88.5|
|    Luisa|     F| 48|   155|  50.2|
|Margheria|     F| 35|   165|  54.3|
+---------+------+---+------+------+



*.show()* stamperà fino ad un massimo di 20 righe, se vogliamo stampare un numero esatto di righe basta passare il numero come parametro del metodo.

In [5]:
df.show(3)

+--------+------+---+------+------+
|    name|gender|age|height|weight|
+--------+------+---+------+------+
|Giuseppe|     M| 23|   174|  70.5|
| Antonio|     M| 25|   179|  68.0|
| Lorenzo|     M| 33|   172|  88.5|
+--------+------+---+------+------+
only showing top 3 rows



Per conoscere il numero esatto di righe presenti all'interno del Dataframe possiamo usare il metodo *.count()*

In [6]:
df.count()

5

Per stampare i nomi delle colonne possiamo usare l'attributo *.columns*, che ritornerà una lista con i nomi.

In [7]:
df.columns

['name', 'gender', 'age', 'height', 'weight']

Per stampare lo schema del Dataframe, cioè le informazioni legate ad ogni attributo (nome, tipo, se può essere null), possiamo usare il metodo *.printSchema()*.

In [8]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: long (nullable = true)
 |-- height: long (nullable = true)
 |-- weight: double (nullable = true)



Infinie possiamo visualizzare una serie di informazioni statistiche (count, valore medio, deviazione standard, valore minimo e massimo) usando il metodo *.describe()*, il quale ci ritornerà un Dataframe contentente queste informazioni, che possiamo quindi visualizzare usando il metodo *.show()*.

In [9]:
df.describe().show()

+-------+---------+------+-----------------+-----------------+-----------------+
|summary|     name|gender|              age|           height|           weight|
+-------+---------+------+-----------------+-----------------+-----------------+
|  count|        5|     5|                5|                5|                5|
|   mean|     null|  null|             32.8|            169.0|             66.3|
| stddev|     null|  null|9.909591313469994|9.300537618869138|15.13753612712452|
|    min|  Antonio|     F|               23|              155|             50.2|
|    max|Margheria|     M|               48|              179|             88.5|
+-------+---------+------+-----------------+-----------------+-----------------+



## Modificare lo schema
Come vedi lo schema del Dataframe è stato estratto direttamente dai dati, ma se volessimo definirlo noi ? Ad esempio, mettiamo caso di voler cambiare il tipo per le seguenti colonne:
* **age**: da long a intero.
* **height**: da long a intero.
* **weight**: da double a float.

Possiamo farlo creando uno schema per poi passarlo al metodo *.createDataFrame(data, schema)*.
Vediamo come fare, per prima cosa creiamo i campi dello schema, utilizzando la classe *StructField*, a cui passeremo questi parametri:
* nome della colonna
* tipo della colonna (deve essere l'istanza di una classe reperibile dal modulo pyspark.sql.types), a [questo link](https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/types/DataTypes.html) puoi trovare la classe per ogni tipo di dato.
* un valore booleano che indica se tale valore può essere lasciato vuoto (null) oppure no.

Infine per costruire lo schema utilizziamo la classe *StructType*, passando la lista di campi che abbiamo definitp sopra al parametro fields.

In [10]:
from pyspark.sql.types import *

data_schema = [StructField('name', StringType(), True),
                StructField('gender', StringType(), True),
                StructField('age', IntegerType(), True),
                StructField('height', IntegerType(), True),
                StructField('weight', FloatType(), True)]
            
schema = StructType(fields=data_schema)

Adesso creiamo il Dataframe, passando i dati e lo schema all'interno del parametro schema.

In [11]:
df = spark.createDataFrame(data, schema=schema)
df.show()

+---------+------+---+------+------+
|     name|gender|age|height|weight|
+---------+------+---+------+------+
| Giuseppe|     M| 23|   174|  70.5|
|  Antonio|     M| 25|   179|  68.0|
|  Lorenzo|     M| 33|   172|  88.5|
|    Luisa|     F| 48|   155|  50.2|
|Margheria|     F| 35|   165|  54.3|
+---------+------+---+------+------+



Il risultato sembrerebbe essere lo stesso di prima, ma vediamo lo schema.

In [12]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: float (nullable = true)



Adesso i tipi sono quelli definiti da noi :).

## Righe e Colonne
Possiamo ottenere n righe del Dataframe utilizzando il metodo *.head(n)* il risultato sarà un lista di oggetti Row.

In [13]:
df.head(5)

[Row(name='Giuseppe', gender='M', age=23, height=174, weight=70.5),
 Row(name='Antonio', gender='M', age=25, height=179, weight=68.0),
 Row(name='Lorenzo', gender='M', age=33, height=172, weight=88.5),
 Row(name='Luisa', gender='F', age=48, height=155, weight=50.20000076293945),
 Row(name='Margheria', gender='F', age=35, height=165, weight=54.29999923706055)]

Per selezionare solo una colonna del Dataframe possiamo usare l'indice...

In [14]:
df[0]

Column<b'name'>

...oppure il nome, in ogni caso il risultato sarà un oggetto Column.

In [15]:
df["name"]

Column<b'name'>

Raramente ti capiterà di lavorare direttamente con righe e colonne, piuttosto per selezionare soltanto una colonna puoi usare il metodo *.select(name)*, che ti ritonernà un Dataframe contenente soltanto la colonna selezionata.

In [16]:
dfName = df.select("name")
dfName.show()

+---------+
|     name|
+---------+
| Giuseppe|
|  Antonio|
|  Lorenzo|
|    Luisa|
|Margheria|
+---------+



Possiamo usare lo stesso metodo per selezionare più colonne, passando una lista di nomi come parametro.

In [17]:
df.select(["name","age"]).show()

+---------+---+
|     name|age|
+---------+---+
| Giuseppe| 23|
|  Antonio| 25|
|  Lorenzo| 33|
|    Luisa| 48|
|Margheria| 35|
+---------+---+



## Creare e modificare colonne
Possiamo modificare una determinata colonna utilizzando il metodo *.withColumn(name, column)*. alla quale dovremo passare il nome della riga che dovremmo modificare e un oggetto colonna con il nuovo valore. Ad esempio modifichiamo la colonna height per convertire i centimentri in metr.

In [18]:
df = df.withColumn("height", df["height"]/100)
df.show()

+---------+------+---+------+------+
|     name|gender|age|height|weight|
+---------+------+---+------+------+
| Giuseppe|     M| 23|  1.74|  70.5|
|  Antonio|     M| 25|  1.79|  68.0|
|  Lorenzo|     M| 33|  1.72|  88.5|
|    Luisa|     F| 48|  1.55|  50.2|
|Margheria|     F| 35|  1.65|  54.3|
+---------+------+---+------+------+



Stampiamo lo schema.

In [19]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- height: double (nullable = true)
 |-- weight: float (nullable = true)



Come vedi lo schema è stato modificato automaticamente, dato che abbiamo convertito l'altezza da numeri interi a numeri con la virgola. Se la colonna che stiamo tentando di modificare non esiste, verrà creata. Ad esempio proviamo a creare una colonna contenente l'indice di massa corporea (bmi) di ogni persona, calcolandolo come:
<br><br>
$$ bmi = (\frac{weight}{height})^2 $$
<br>

In [20]:
bmi = df["weight"]/(df["height"]**2)
df = df.withColumn("bmi", bmi)
df.show()

+---------+------+---+------+------+------------------+
|     name|gender|age|height|weight|               bmi|
+---------+------+---+------+------+------------------+
| Giuseppe|     M| 23|  1.74|  70.5|23.285770907649624|
|  Antonio|     M| 25|  1.79|  68.0| 21.22280827689523|
|  Lorenzo|     M| 33|  1.72|  88.5| 29.91481882098432|
|    Luisa|     F| 48|  1.55|  50.2| 20.89490146220164|
|Margheria|     F| 35|  1.65|  54.3|19.944903301032344|
+---------+------+---+------+------+------------------+



I valori del bmi contengono molte cifre dopo la virgola, meglio approssimarle, possiamo farlo utilizzando la funzione *round(col, val)* di spark.

In [21]:
from pyspark.sql.functions import round

df = df.withColumn("bmi", round(df["bmi"], 2))
df.show()

+---------+------+---+------+------+-----+
|     name|gender|age|height|weight|  bmi|
+---------+------+---+------+------+-----+
| Giuseppe|     M| 23|  1.74|  70.5|23.29|
|  Antonio|     M| 25|  1.79|  68.0|21.22|
|  Lorenzo|     M| 33|  1.72|  88.5|29.91|
|    Luisa|     F| 48|  1.55|  50.2|20.89|
|Margheria|     F| 35|  1.65|  54.3|19.94|
+---------+------+---+------+------+-----+



Molto meglio ! Un'altra funzione utile è *when(condition, value)*, che ci permette di modificare le colonne in base a determinate condizioni. Ad esempio creiamo una nuova colonna chiamata *is_fat* che conterrà un valore true se la persona è in sovrappeso (bmi>25), False altrimenti. Possiamo gestire 'l'altrimenti' utilizzando la funzione *otherwise(value)*.

In [22]:
from pyspark.sql.functions import col, when

df = df.withColumn("is_fat", when(col("bmi")>25, True).otherwise(False))
df.show()

+---------+------+---+------+------+-----+------+
|     name|gender|age|height|weight|  bmi|is_fat|
+---------+------+---+------+------+-----+------+
| Giuseppe|     M| 23|  1.74|  70.5|23.29| false|
|  Antonio|     M| 25|  1.79|  68.0|21.22| false|
|  Lorenzo|     M| 33|  1.72|  88.5|29.91|  true|
|    Luisa|     F| 48|  1.55|  50.2|20.89| false|
|Margheria|     F| 35|  1.65|  54.3|19.94| false|
+---------+------+---+------+------+-----+------+



Ottimo ! Per finire rinominiamo la colonna *gender* in *sex*, possiamo farlo tramite il metodo *.withColumnRenamed(old_name, new_name)*.

In [23]:
df = df.withColumnRenamed("gender","sex")
df.show()

+---------+---+---+------+------+-----+------+
|     name|sex|age|height|weight|  bmi|is_fat|
+---------+---+---+------+------+-----+------+
| Giuseppe|  M| 23|  1.74|  70.5|23.29| false|
|  Antonio|  M| 25|  1.79|  68.0|21.22| false|
|  Lorenzo|  M| 33|  1.72|  88.5|29.91|  true|
|    Luisa|  F| 48|  1.55|  50.2|20.89| false|
|Margheria|  F| 35|  1.65|  54.3|19.94| false|
+---------+---+---+------+------+-----+------+



## Filtri
Possiamo filtrare le righe del Dataframe utilizzando il metodo *.filter(condition)*. Ad esempio filtriamo soltanto gli uomini, possiamo scrivere la condizione come una stringa...

In [24]:
df_male = df.filter("sex == 'M'")
df_male.show()

+--------+---+---+------+------+-----+------+
|    name|sex|age|height|weight|  bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Giuseppe|  M| 23|  1.74|  70.5|23.29| false|
| Antonio|  M| 25|  1.79|  68.0|21.22| false|
| Lorenzo|  M| 33|  1.72|  88.5|29.91|  true|
+--------+---+---+------+------+-----+------+



...oppure usando la colonna, il risultato sarà lo stesso.

In [25]:
df_male = df.filter(df["sex"] == 'M')
df_male.show()

+--------+---+---+------+------+-----+------+
|    name|sex|age|height|weight|  bmi|is_fat|
+--------+---+---+------+------+-----+------+
|Giuseppe|  M| 23|  1.74|  70.5|23.29| false|
| Antonio|  M| 25|  1.79|  68.0|21.22| false|
| Lorenzo|  M| 33|  1.72|  88.5|29.91|  true|
+--------+---+---+------+------+-----+------+



## Aggregazione
Possiamo aggregare un Dataframe in base al contenuto di una colonna usando il metodo *.groupBy(col)*. Ad esempio aggreghiamo il Dataframe in base al sesso.

In [26]:
df_group = df.groupBy('sex')
type(df_group)

pyspark.sql.group.GroupedData

Il risultato sarà un'oggetto GroupedData, sulla quale possiamo eseguire diverse operazioni aritmetiche e statistiche, come conteggio

In [27]:
df_group.count().show()

+---+-----+
|sex|count|
+---+-----+
|  F|    2|
|  M|    3|
+---+-----+



media

In [28]:
df_group.mean().show()

+---+--------+-----------+-----------------+-----------------+
|sex|avg(age)|avg(height)|      avg(weight)|         avg(bmi)|
+---+--------+-----------+-----------------+-----------------+
|  F|    41.5|        1.6|            52.25|           20.415|
|  M|    27.0|       1.75|75.66666666666667|24.80666666666667|
+---+--------+-----------+-----------------+-----------------+



somma

In [29]:
df_group.sum().show()

+---+--------+-----------+-----------+--------+
|sex|sum(age)|sum(height)|sum(weight)|sum(bmi)|
+---+--------+-----------+-----------+--------+
|  F|      83|        3.2|      104.5|   40.83|
|  M|      81|       5.25|      227.0|   74.42|
+---+--------+-----------+-----------+--------+



valore massimo

In [30]:
df_group.max().show()

+---+--------+-----------+-----------+--------+
|sex|max(age)|max(height)|max(weight)|max(bmi)|
+---+--------+-----------+-----------+--------+
|  F|      48|       1.65|       54.3|   20.89|
|  M|      33|       1.79|       88.5|   29.91|
+---+--------+-----------+-----------+--------+



e valore minimo

In [31]:
df_group.min().show()

+---+--------+-----------+-----------+--------+
|sex|min(age)|min(height)|min(weight)|min(bmi)|
+---+--------+-----------+-----------+--------+
|  F|      35|       1.55|       50.2|   19.94|
|  M|      23|       1.72|       68.0|   21.22|
+---+--------+-----------+-----------+--------+



Possiamo anche operare su singole colonne usando il metodo *.agg(op)* del Dataframe, che prende come parametro un dizionario contenente nome della colonna e operazione da eseguire.

In [32]:
df.agg({'weight':'sum'}).show()

+-----------+
|sum(weight)|
+-----------+
|      331.5|
+-----------+



Possiamo utilizzare questo stesso metodo per eseguire operazioni differenti su colonne differenti.

In [33]:
df_group.agg({'weight':'sum', 'height':'max', 'sex':'count'}).show()

+---+----------+-----------+-----------+
|sex|count(sex)|sum(weight)|max(height)|
+---+----------+-----------+-----------+
|  F|         2|      104.5|       1.65|
|  M|         3|      227.0|       1.79|
+---+----------+-----------+-----------+



Come vedi il nome delle nuove colonne viene assegnato automaticamente, in base alla funzione ed alla colonna che abbiamo utilizzato, possiamo modificare tali nomi usando il metodo *.withColumnRenamed(old_name, new_name)*.

In [34]:
df_group.agg({'weight':'sum', 'height':'max', 'sex':'count'}).withColumnRenamed("count(sex)","count_sex").withColumnRenamed("sum(weight)","sum_weight").withColumnRenamed("max(height)","max_height").show()

+---+---------+----------+----------+
|sex|count_sex|sum_weight|max_height|
+---+---------+----------+----------+
|  F|        2|     104.5|      1.65|
|  M|        3|     227.0|      1.79|
+---+---------+----------+----------+



Piuttosto che un dizionario, possiamo anche utilizzare delle funzioni.

In [35]:
from pyspark.sql.functions import sum, max, count

df_group.agg(sum("weight"), max('height'), count('sex')).show()

+---+-----------+-----------+----------+
|sex|sum(weight)|max(height)|count(sex)|
+---+-----------+-----------+----------+
|  F|      104.5|       1.65|         2|
|  M|      227.0|       1.79|         3|
+---+-----------+-----------+----------+



In questo caso per settare arbitrariamente i nomi delle colonne possiamo creare un alias

In [36]:
from pyspark.sql.functions import sum, max, count

df_group.agg(sum("weight").alias("sum_weight"), max('height').alias("max_height"), count('sex').alias("count_sex")).show()

+---+----------+----------+---------+
|sex|sum_weight|max_height|count_sex|
+---+----------+----------+---------+
|  F|     104.5|      1.65|        2|
|  M|     227.0|      1.79|        3|
+---+----------+----------+---------+



## Ordinamento

Per ordinare un Dataframe possiamo utilizzare il metodo *.orderBy(col)*, ad esempio ordiniamo in base al peso.

In [37]:
df.orderBy("weight").show()

+---------+---+---+------+------+-----+------+
|     name|sex|age|height|weight|  bmi|is_fat|
+---------+---+---+------+------+-----+------+
|    Luisa|  F| 48|  1.55|  50.2|20.89| false|
|Margheria|  F| 35|  1.65|  54.3|19.94| false|
|  Antonio|  M| 25|  1.79|  68.0|21.22| false|
| Giuseppe|  M| 23|  1.74|  70.5|23.29| false|
|  Lorenzo|  M| 33|  1.72|  88.5|29.91|  true|
+---------+---+---+------+------+-----+------+



Di default l'ordinamento viene eseguito in maniera ascendente (dal valore minore al valore maggiore), per eseguirlo in maniera discendente ci basta impostare il parametro *ascending* a False.

In [38]:
df.orderBy("weight", ascending=False).show()

+---------+---+---+------+------+-----+------+
|     name|sex|age|height|weight|  bmi|is_fat|
+---------+---+---+------+------+-----+------+
|  Lorenzo|  M| 33|  1.72|  88.5|29.91|  true|
| Giuseppe|  M| 23|  1.74|  70.5|23.29| false|
|  Antonio|  M| 25|  1.79|  68.0|21.22| false|
|Margheria|  F| 35|  1.65|  54.3|19.94| false|
|    Luisa|  F| 48|  1.55|  50.2|20.89| false|
+---------+---+---+------+------+-----+------+



## Link utili e approfondimenti
* [Documentazione del modulo SQL e del DataFrame di pySpark](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html)
* [Differenza tra SparkContext e SparkSession (attezione: esempi in Scala)](https://medium.com/knoldus/spark-why-should-we-use-sparksession-fdebe864d895)
