# PySpark

* [Creating a SparkSession](#chapter1)
* [Creating a DataFrame](#chapter2)
* [Basic operations](#chapter3)
    * [Select](#sec3_1)
    * [Filter](#sec3_2)
    * [Aggregations](#sec3_3)
    * [Sort](#sec3_4)
* [DateTime](#chapter4)
* [User-defined functions (UDF)](#chapter5)
* [Null and NaN values](#chapter6)

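```python
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Note: this wildcard import shadows Python builtins such as sum, min, max and round;
# the notebook relies on it (e.g. count and sum in the Aggregations section)
from pyspark.sql.functions import *
from pyspark.sql import functions as f
```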

## Creating a SparkSession <a class="anchor" id="chapter1"></a>

SparkSession is the entry point to PySpark. Internally it creates a SparkContext, the lower-level object that holds the connection to the cluster.

```python
# local[8]: run Spark locally with 8 threads; getOrCreate() reuses an existing session if there is one
spark = (SparkSession.builder
         .master("local[8]")
         .appName("PySpark_Tutorial")
         .getOrCreate())
```

## Creating a DataFrame <a class="anchor" id="chapter2"></a>

```python
sc = spark.sparkContext  # the underlying SparkContext (not needed below)

pokedex = [("Bulbasaur", "Grass", 1, 0.7),
           ("Ivysaur", "Grass", 2, 1.0),
           ("Venusaur", "Grass", 3, 2.0),
           ("Lugia", "Psychic", 4, 5.2),
           ("Charmeleon", "Fire", 5, 1.1),
           ("Charizard", "Fire", 6, 1.7),
           ("Wartortle", "Water", 8, 1.0),
           ("Blastoise", "Water", 9, 1.6)]

# Passing only column names: Spark infers the column types from the data
schema = ["Name", "PrimaryType", "Index", "Height"]
df = spark.createDataFrame(data=pokedex, schema=schema)
```

```python
df.show()
```

```
+----------+-----------+-----+------+
|      Name|PrimaryType|Index|Height|
+----------+-----------+-----+------+
| Bulbasaur|      Grass|    1|   0.7|
|   Ivysaur|      Grass|    2|   1.0|
|  Venusaur|      Grass|    3|   2.0|
|     Lugia|    Psychic|    4|   5.2|
|Charmeleon|       Fire|    5|   1.1|
| Charizard|       Fire|    6|   1.7|
| Wartortle|      Water|    8|   1.0|
| Blastoise|      Water|    9|   1.6|
+----------+-----------+-----+------+
```



```python
# First 4 rows, values not truncated, one block per record
df.show(n=4, truncate=False, vertical=True)
```

```
-RECORD 0----------------
 Name        | Bulbasaur 
 PrimaryType | Grass     
 Index       | 1         
 Height      | 0.7       
-RECORD 1----------------
 Name        | Ivysaur   
 PrimaryType | Grass     
 Index       | 2         
 Height      | 1.0       
-RECORD 2----------------
 Name        | Venusaur  
 PrimaryType | Grass     
 Index       | 3         
 Height      | 2.0       
-RECORD 3----------------
 Name        | Lugia     
 PrimaryType | Psychic   
 Index       | 4         
 Height      | 5.2       
only showing top 4 rows
```



## Basic operations <a class="anchor" id="chapter3"></a>

**count**: returns the number of rows

```python
df.count()
```

```
8
```

**columns**: returns the column names

```python
df.columns
```

```
['Name', 'PrimaryType', 'Index', 'Height']
```

**dtypes**: returns the data type of each column as (name, type) pairs

```python
df.dtypes
```

```
[('Name', 'string'),
 ('PrimaryType', 'string'),
 ('Index', 'bigint'),
 ('Height', 'double')]
```

**printSchema**: prints the DataFrame schema as a tree

```python
df.printSchema()
```

```
root
 |-- Name: string (nullable = true)
 |-- PrimaryType: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Height: double (nullable = true)
```



**withColumn**: takes two parameters (column name, `Column` expression) and returns a new DataFrame with that column added, or replaced if a column with that name already exists. The second argument must be a `Column`, so a constant is wrapped with `lit()`, which creates a literal (constant) column.

```python
df = df.withColumn("Game", lit("Pokemon"))
df.show()
```

```
+----------+-----------+-----+------+-------+
|      Name|PrimaryType|Index|Height|   Game|
+----------+-----------+-----+------+-------+
| Bulbasaur|      Grass|    1|   0.7|Pokemon|
|   Ivysaur|      Grass|    2|   1.0|Pokemon|
|  Venusaur|      Grass|    3|   2.0|Pokemon|
|     Lugia|    Psychic|    4|   5.2|Pokemon|
|Charmeleon|       Fire|    5|   1.1|Pokemon|
| Charizard|       Fire|    6|   1.7|Pokemon|
| Wartortle|      Water|    8|   1.0|Pokemon|
| Blastoise|      Water|    9|   1.6|Pokemon|
+----------+-----------+-----+------+-------+
```



Several columns can be added at once, in two different ways:

```python
# 1) Chaining withColumn calls
# df.withColumn("Game", lit("Pokemon")).withColumn("Generation", lit(1))
# 2) A single select that lists the existing columns plus the new literal ones
# df.select(col("Name"), col("PrimaryType"), col("Index"), col("Height"),
#           lit("Pokemon").alias("Game"), lit(1).alias("Generation"))
```

**withColumnRenamed**: takes two parameters (old name, new name) and renames a column.

```python
df = df.withColumnRenamed("Index", "id")
df.columns
```

```
['Name', 'PrimaryType', 'id', 'Height', 'Game']
```

**drop**: removes a column

```python
df = df.drop("Game")
df.show()
```

```
+----------+-----------+---+------+
|      Name|PrimaryType| id|Height|
+----------+-----------+---+------+
| Bulbasaur|      Grass|  1|   0.7|
|   Ivysaur|      Grass|  2|   1.0|
|  Venusaur|      Grass|  3|   2.0|
|     Lugia|    Psychic|  4|   5.2|
|Charmeleon|       Fire|  5|   1.1|
| Charizard|       Fire|  6|   1.7|
| Wartortle|      Water|  8|   1.0|
| Blastoise|      Water|  9|   1.6|
+----------+-----------+---+------+
```



### Select <a class="anchor" id="sec3_1"></a>

```python
df.select("id", "Name").show()
```

```
+---+----------+
| id|      Name|
+---+----------+
|  1| Bulbasaur|
|  2|   Ivysaur|
|  3|  Venusaur|
|  4|     Lugia|
|  5|Charmeleon|
|  6| Charizard|
|  8| Wartortle|
|  9| Blastoise|
+---+----------+
```



### Filter <a class="anchor" id="sec3_2"></a>
Filters rows based on a condition.

```python
df.filter(df["id"] == 1).show()

# All of the following produce the same result:
# df.filter(df.id == 1).show()
# df.filter(col("id") == 1).show()
# df.filter("id = 1").show()
```

```
+---------+-----------+---+------+
|     Name|PrimaryType| id|Height|
+---------+-----------+---+------+
|Bulbasaur|      Grass|  1|   0.7|
+---------+-----------+---+------+
```



Next we look at `between`, `when` and `rlike`.

```python
df.filter(df.id.between(2, 5)).show()
```

```
+----------+-----------+---+------+
|      Name|PrimaryType| id|Height|
+----------+-----------+---+------+
|   Ivysaur|      Grass|  2|   1.0|
|  Venusaur|      Grass|  3|   2.0|
|     Lugia|    Psychic|  4|   5.2|
|Charmeleon|       Fire|  5|   1.1|
+----------+-----------+---+------+
```

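`between(lower, upper)` is inclusive on both ends, so it behaves like `(df.id >= 2) & (df.id <= 5)`. The plain-Python sketch below (no Spark involved; the `between` helper and the `rows` list are illustrative copies of the example data) models that filter so the boundary behaviour is easy to check:

```python
# Plain-Python model of df.filter(df.id.between(2, 5))
rows = [("Bulbasaur", 1), ("Ivysaur", 2), ("Venusaur", 3), ("Lugia", 4),
        ("Charmeleon", 5), ("Charizard", 6), ("Wartortle", 8), ("Blastoise", 9)]

def between(value, lower, upper):
    """Inclusive range check, like Column.between(lower, upper)."""
    return lower <= value <= upper

selected = [name for name, idx in rows if between(idx, 2, 5)]
print(selected)  # ['Ivysaur', 'Venusaur', 'Lugia', 'Charmeleon']
```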


```python
# Without .alias(), the new column is named after the whole CASE WHEN expression
df.select("Name", "Height", f.when(df.Height >= 1.5, 1).otherwise(0)).show()
```

```
+----------+------+-------------------------------------------+
|      Name|Height|CASE WHEN (Height >= 1.5) THEN 1 ELSE 0 END|
+----------+------+-------------------------------------------+
| Bulbasaur|   0.7|                                          0|
|   Ivysaur|   1.0|                                          0|
|  Venusaur|   2.0|                                          1|
|     Lugia|   5.2|                                          1|
|Charmeleon|   1.1|                                          0|
| Charizard|   1.7|                                          1|
| Wartortle|   1.0|                                          0|
| Blastoise|   1.6|                                          1|
+----------+------+-------------------------------------------+
```

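`when(...).otherwise(...)` is Spark's CASE WHEN: branches are tried in order, the first true condition wins, and rows matching no branch get the `otherwise` value, or null if `otherwise()` is omitted. The sketch below models that evaluation in plain Python; `case_when` is a hypothetical helper (not part of PySpark), `None` stands for null, and the "Unknown" row with a missing height is invented for illustration:

```python
# Plain-Python model of f.when(cond, value)...otherwise(default); no Spark involved.
def case_when(value, branches, otherwise=None):
    """Return the result of the first branch whose condition holds, else `otherwise`."""
    for condition, result in branches:
        # A comparison with null is never true in Spark, so a null value skips the branch
        if value is not None and condition(value):
            return result
    return otherwise

heights = {"Bulbasaur": 0.7, "Venusaur": 2.0, "Charizard": 1.7, "Unknown": None}
is_tall = [(lambda h: h >= 1.5, 1)]

# Like f.when(df.Height >= 1.5, 1).otherwise(0)
tall_flags = {name: case_when(h, is_tall, otherwise=0) for name, h in heights.items()}
print(tall_flags)  # {'Bulbasaur': 0, 'Venusaur': 1, 'Charizard': 1, 'Unknown': 0}

# Like f.when(df.Height >= 1.5, 1) without otherwise(): unmatched rows become null
print(case_when(0.7, is_tall))  # None
```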


```python
df.select("Name", "PrimaryType", df.PrimaryType.rlike("^G").alias("Tipo empieza por G")).show()
```

```
+----------+-----------+------------------+
|      Name|PrimaryType|Tipo empieza por G|
+----------+-----------+------------------+
| Bulbasaur|      Grass|              true|
|   Ivysaur|      Grass|              true|
|  Venusaur|      Grass|              true|
|     Lugia|    Psychic|             false|
|Charmeleon|       Fire|             false|
| Charizard|       Fire|             false|
| Wartortle|      Water|             false|
| Blastoise|      Water|             false|
+----------+-----------+------------------+
```

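`rlike` takes a Java regular expression and reports whether it matches anywhere in the string (a search, not a full match), which is why the example needs the `^` anchor to mean "starts with"; for a plain prefix, `df.PrimaryType.startswith("G")` would also work. Python's `re.search` has the same search semantics for simple patterns like these (the two regex dialects differ in some details), so it serves as a stand-in here:

```python
import re

primary_types = ["Grass", "Psychic", "Fire", "Water"]

# rlike-style search: an unanchored pattern may match anywhere in the string
anchored = {t: re.search(r"^G", t) is not None for t in primary_types}
unanchored = {t: re.search(r"r", t) is not None for t in primary_types}

print(anchored)    # {'Grass': True, 'Psychic': False, 'Fire': False, 'Water': False}
print(unanchored)  # {'Grass': True, 'Psychic': False, 'Fire': True, 'Water': True}
```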


### Aggregations <a class="anchor" id="sec3_3"></a>

```python
# count and sum come from pyspark.sql.functions (imported with *)
df.groupBy("PrimaryType").agg(
    count("Name").alias("Total"),
    sum("Height").alias("Total_height")
).show()
```

```
+-----------+-----+------------+
|PrimaryType|Total|Total_height|
+-----------+-----+------------+
|      Water|    2|         2.6|
|    Psychic|    1|         5.2|
|       Fire|    2|         2.8|
|      Grass|    3|         3.7|
+-----------+-----+------------+
```

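Conceptually, `groupBy(...).agg(...)` splits the rows by key and applies every aggregate to each group; `count(col)` counts non-null values and `sum` skips nulls. A plain-Python sketch with `collections.defaultdict` reproduces the table above. Spark does not guarantee the order of the groups in its output, so the sketch sorts the keys before printing:

```python
from collections import defaultdict

pokedex = [("Bulbasaur", "Grass", 1, 0.7), ("Ivysaur", "Grass", 2, 1.0),
           ("Venusaur", "Grass", 3, 2.0), ("Lugia", "Psychic", 4, 5.2),
           ("Charmeleon", "Fire", 5, 1.1), ("Charizard", "Fire", 6, 1.7),
           ("Wartortle", "Water", 8, 1.0), ("Blastoise", "Water", 9, 1.6)]

# Model of groupBy("PrimaryType").agg(count("Name"), sum("Height"))
totals = defaultdict(int)          # count("Name"): non-null names per group
height_sums = defaultdict(float)   # sum("Height"): null heights are skipped
for name, primary_type, _index, height in pokedex:
    if name is not None:
        totals[primary_type] += 1
    if height is not None:
        height_sums[primary_type] += height

for primary_type in sorted(totals):
    print(primary_type, totals[primary_type], f"{height_sums[primary_type]:.1f}")
# Fire 2 2.8
# Grass 3 3.7
# Psychic 1 5.2
# Water 2 2.6
```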


### Sort <a class="anchor" id="sec3_4"></a>

```python
# Ascending by default
df.sort("Height").show()
```

```
+----------+-----------+---+------+
|      Name|PrimaryType| id|Height|
+----------+-----------+---+------+
| Bulbasaur|      Grass|  1|   0.7|
| Wartortle|      Water|  8|   1.0|
|   Ivysaur|      Grass|  2|   1.0|
|Charmeleon|       Fire|  5|   1.1|
| Blastoise|      Water|  9|   1.6|
| Charizard|       Fire|  6|   1.7|
|  Venusaur|      Grass|  3|   2.0|
|     Lugia|    Psychic|  4|   5.2|
+----------+-----------+---+------+
```

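Wartortle and Ivysaur have the same height (1.0), and as far as we know Spark makes no promise about the relative order of rows with equal sort keys, so it may differ between runs or partitionings. Adding a second key, e.g. `df.sort("Height", "Name")`, makes the order explicit, and `df.sort(col("Height").desc())` reverses it. The plain-Python sketch below models both orderings:

```python
rows = [("Bulbasaur", 0.7), ("Ivysaur", 1.0), ("Venusaur", 2.0), ("Lugia", 5.2),
        ("Charmeleon", 1.1), ("Charizard", 1.7), ("Wartortle", 1.0), ("Blastoise", 1.6)]

# Like df.sort("Height", "Name"): ties on Height are broken by Name
by_height_then_name = [name for name, _ in sorted(rows, key=lambda r: (r[1], r[0]))]
print(by_height_then_name)
# ['Bulbasaur', 'Ivysaur', 'Wartortle', 'Charmeleon', 'Blastoise', 'Charizard', 'Venusaur', 'Lugia']

# Like df.sort(col("Height").desc()): tallest first
tallest = [name for name, _ in sorted(rows, key=lambda r: r[1], reverse=True)][:3]
print(tallest)  # ['Lugia', 'Venusaur', 'Charizard']
```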


## DateTime <a class="anchor" id="chapter4"></a>

```python
emp = [(1, "AAA", "dept1", 1000, "2019-02-01 15:12:13"),
       (2, "BBB", "dept1", 1100, "2018-04-01 5:12:3"),
       (3, "CCC", "dept1", 3000, "2017-06-05 1:2:13"),
       (4, "DDD", "dept1", 1500, "2019-08-10 10:52:53"),
       (5, "EEE", "dept2", 8000, "2016-01-11 5:52:43"),
       (6, "FFF", "dept2", 7200, "2015-04-14 19:32:33"),
       (7, "GGG", "dept3", 7100, "2019-02-21 15:42:43"),
       (8, "HHH", "dept3", 3700, "2016-09-25 15:32:33"),
       (9, "III", "dept3", 4500, "2017-10-15 15:22:23"),
       (10, "JJJ", "dept5", 3400, "2018-12-17 15:14:17")]
empdf = spark.createDataFrame(emp, ["id", "name", "dept", "salary", "date"])
empdf.show()
```

```
+---+----+-----+------+-------------------+
| id|name| dept|salary|               date|
+---+----+-----+------+-------------------+
|  1| AAA|dept1|  1000|2019-02-01 15:12:13|
|  2| BBB|dept1|  1100|  2018-04-01 5:12:3|
|  3| CCC|dept1|  3000|  2017-06-05 1:2:13|
|  4| DDD|dept1|  1500|2019-08-10 10:52:53|
|  5| EEE|dept2|  8000| 2016-01-11 5:52:43|
|  6| FFF|dept2|  7200|2015-04-14 19:32:33|
|  7| GGG|dept3|  7100|2019-02-21 15:42:43|
|  8| HHH|dept3|  3700|2016-09-25 15:32:33|
|  9| III|dept3|  4500|2017-10-15 15:22:23|
| 10| JJJ|dept5|  3400|2018-12-17 15:14:17|
+---+----+-----+------+-------------------+
```



`add_months`: we add a column called <i>next_month</i>, obtained by adding one month to <i>date</i>. The result is a date, so the time of day is dropped.

```python
df = (empdf
      .select("date")
      .withColumn("next_month", add_months("date", 1)))
df.show()
```

```
+-------------------+----------+
|               date|next_month|
+-------------------+----------+
|2019-02-01 15:12:13|2019-03-01|
|  2018-04-01 5:12:3|2018-05-01|
|  2017-06-05 1:2:13|2017-07-05|
|2019-08-10 10:52:53|2019-09-10|
| 2016-01-11 5:52:43|2016-02-11|
|2015-04-14 19:32:33|2015-05-14|
|2019-02-21 15:42:43|2019-03-21|
|2016-09-25 15:32:33|2016-10-25|
|2017-10-15 15:22:23|2017-11-15|
|2018-12-17 15:14:17|2019-01-17|
+-------------------+----------+
```

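`add_months` shifts the calendar month and clamps the day when the target month is shorter (31 January plus one month gives the last day of February). The plain-Python sketch below assumes Spark 3.x behaviour; according to the Spark 3.0 migration notes, Spark 2.4 and earlier also moved an input that fell on the last day of a month to the last day of the result month. `py_add_months` is a hypothetical stand-in, named so it does not overwrite PySpark's `add_months` if pasted into the notebook:

```python
import calendar
from datetime import date

def py_add_months(d: date, months: int) -> date:
    """Shift d by whole months, clamping the day to the target month's length."""
    month_index = d.year * 12 + (d.month - 1) + months
    year, month0 = divmod(month_index, 12)
    last_day = calendar.monthrange(year, month0 + 1)[1]
    # Written without min(), which the notebook's wildcard import shadows
    day = d.day if d.day <= last_day else last_day
    return date(year, month0 + 1, day)

print(py_add_months(date(2018, 12, 17), 1))  # 2019-01-17 (year rollover, as in the table)
print(py_add_months(date(2019, 1, 31), 1))   # 2019-02-28 (day clamped)
print(py_add_months(date(2019, 3, 31), -1))  # 2019-02-28 (negative months go back)
```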


`date_add`: adds 5 days to the date read from the column; `date_sub` does the opposite.

```python
df = (empdf
      .select("date")
      .withColumn("next_date", date_add("date", 5)))
df.show(2)
```

```
+-------------------+----------+
|               date| next_date|
+-------------------+----------+
|2019-02-01 15:12:13|2019-02-06|
|  2018-04-01 5:12:3|2018-04-06|
+-------------------+----------+
only showing top 2 rows
```

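Like `add_months`, `date_add` and `date_sub` return a date, dropping the time of day, and count whole calendar days. A plain-Python equivalent uses `datetime.timedelta`; the parsing format below is an assumption that fits this data (Python's `strptime` accepts the unpadded `5:12:3`), and the `py_` names are hypothetical so they do not shadow PySpark's functions:

```python
from datetime import datetime, timedelta

def py_date_add(ts: str, days: int):
    """Parse a 'yyyy-MM-dd H:m:s' string, drop the time and shift by whole days."""
    d = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date()
    return d + timedelta(days=days)

def py_date_sub(ts: str, days: int):
    """Same as py_date_add with the sign flipped, like date_sub."""
    return py_date_add(ts, -days)

print(py_date_add("2019-02-01 15:12:13", 5))  # 2019-02-06
print(py_date_add("2018-04-01 5:12:3", 5))    # 2018-04-06
print(py_date_sub("2019-03-01 08:00:00", 1))  # 2019-02-28
```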


`current_date` and `current_timestamp`: the current date and timestamp. They are evaluated once per query, so every row gets the same value.

```python
df = (empdf
      .withColumn("current_date", current_date())
      .withColumn("current_timestamp", current_timestamp())
      .select("id", "current_date", "current_timestamp"))
df.show(2, truncate=False)
```

```
+---+------------+--------------------------+
|id |current_date|current_timestamp         |
+---+------------+--------------------------+
|1  |2021-04-05  |2021-04-05 08:04:13.670002|
|2  |2021-04-05  |2021-04-05 08:04:13.670002|
+---+------------+--------------------------+
only showing top 2 rows
```



`date_format`: converts a date or timestamp into a string with the given pattern

```python
df = (empdf
      .select("date")
      .withColumn("new_date", date_format("date", "dd/MM/yyyy")))
df.show(2)
```

```
+-------------------+----------+
|               date|  new_date|
+-------------------+----------+
|2019-02-01 15:12:13|01/02/2019|
|  2018-04-01 5:12:3|01/04/2018|
+-------------------+----------+
only showing top 2 rows
```

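Spark's datetime pattern letters are case-sensitive: `MM` is the month and `mm` the minutes, `HH` the 0-23 hour. To show the correspondence with Python, the sketch below translates a handful of those letters into `strftime` codes; `SPARK_TO_STRFTIME` and `to_strftime` are hypothetical helpers that cover only these six letters, not Spark's full pattern syntax:

```python
import re
from datetime import datetime

# A small subset of Spark pattern letters and their strftime equivalents
SPARK_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M", "ss": "%S"}

def to_strftime(pattern: str) -> str:
    """Replace each known Spark token with the matching strftime code."""
    tokens = sorted(SPARK_TO_STRFTIME, key=len, reverse=True)  # "yyyy" before 2-letter tokens
    return re.sub("|".join(tokens), lambda m: SPARK_TO_STRFTIME[m.group(0)], pattern)

ts = datetime(2019, 2, 1, 15, 12, 13)
print(to_strftime("dd/MM/yyyy"))                      # %d/%m/%Y
print(ts.strftime(to_strftime("dd/MM/yyyy")))         # 01/02/2019
print(ts.strftime(to_strftime("yyyy-MM-dd HH:mm")))   # 2019-02-01 15:12
```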


`date_trunc`: returns the timestamp truncated to a specific unit of time. Instead of `month` we can use `year`, `week`, `day`, `minute`, etc.

```python
df = (empdf
      .select("date")
      .withColumn("new_date", date_trunc("month", "date")))
df.show()
```

```
+-------------------+-------------------+
|               date|           new_date|
+-------------------+-------------------+
|2019-02-01 15:12:13|2019-02-01 00:00:00|
|  2018-04-01 5:12:3|2018-04-01 00:00:00|
|  2017-06-05 1:2:13|2017-06-01 00:00:00|
|2019-08-10 10:52:53|2019-08-01 00:00:00|
| 2016-01-11 5:52:43|2016-01-01 00:00:00|
|2015-04-14 19:32:33|2015-04-01 00:00:00|
|2019-02-21 15:42:43|2019-02-01 00:00:00|
|2016-09-25 15:32:33|2016-09-01 00:00:00|
|2017-10-15 15:22:23|2017-10-01 00:00:00|
|2018-12-17 15:14:17|2018-12-01 00:00:00|
+-------------------+-------------------+
```

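`date_trunc(unit, col)` keeps the fields down to `unit`, resets the smaller ones and returns a timestamp. Note the argument order (unit first), which is the reverse of `trunc(col, unit)`, a related function that returns a date. A plain-Python sketch for a few units; `py_date_trunc` is hypothetical and models only the units listed in `_RESETS`:

```python
from datetime import datetime

# Fields reset by each unit (month and day restart at 1, the rest at 0)
_RESETS = {
    "year": dict(month=1, day=1, hour=0, minute=0, second=0, microsecond=0),
    "month": dict(day=1, hour=0, minute=0, second=0, microsecond=0),
    "day": dict(hour=0, minute=0, second=0, microsecond=0),
    "hour": dict(minute=0, second=0, microsecond=0),
    "minute": dict(second=0, microsecond=0),
}

def py_date_trunc(unit: str, ts: datetime) -> datetime:
    """Truncate ts to the start of the given unit."""
    return ts.replace(**_RESETS[unit])

ts = datetime(2017, 6, 5, 1, 2, 13)
print(py_date_trunc("month", ts))  # 2017-06-01 00:00:00
print(py_date_trunc("year", ts))   # 2017-01-01 00:00:00
print(py_date_trunc("hour", ts))   # 2017-06-05 01:00:00
```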


`datediff(end, start)`: returns the number of days from `start` to `end`

```python
df = (empdf.select("date")
      .withColumn("current_date", current_date())
      .withColumn("date_diff", datediff("current_date", "date")))
df.show(4)
```

```
+-------------------+------------+---------+
|               date|current_date|date_diff|
+-------------------+------------+---------+
|2019-02-01 15:12:13|  2021-04-05|      794|
|  2018-04-01 5:12:3|  2021-04-05|     1100|
|  2017-06-05 1:2:13|  2021-04-05|     1400|
|2019-08-10 10:52:53|  2021-04-05|      604|
+-------------------+------------+---------+
only showing top 4 rows
```

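`datediff` counts whole calendar days, ignoring the time of day, and is negative when `end` is earlier than `start`. Because the example uses `current_date()`, its numbers change on every run. Fixing "today" to 2021-04-05, the date seen in the output, a plain-Python check (with a hypothetical `py_datediff` helper) reproduces the first three rows:

```python
from datetime import date, datetime

def py_datediff(end: date, start: date) -> int:
    """Days from start to end, like Spark's datediff(end, start)."""
    return (end - start).days

today = date(2021, 4, 5)  # the current_date() value in the output above
start = datetime.strptime("2019-02-01 15:12:13", "%Y-%m-%d %H:%M:%S").date()

print(py_datediff(today, start))             # 794
print(py_datediff(today, date(2018, 4, 1)))  # 1100
print(py_datediff(today, date(2017, 6, 5)))  # 1400
print(py_datediff(start, today))             # -794
```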


`dayofmonth`, `dayofweek`, `dayofyear`: day of the month, of the week and of the year

```python
df = (empdf
      .select("date")
      .withColumn("dayofmonth", dayofmonth("date"))
      .withColumn("dayofweek", dayofweek("date"))
      .withColumn("dayofyear", dayofyear("date")))
df.show(3)
```

```
+-------------------+----------+---------+---------+
|               date|dayofmonth|dayofweek|dayofyear|
+-------------------+----------+---------+---------+
|2019-02-01 15:12:13|         1|        6|       32|
|  2018-04-01 5:12:3|         1|        1|       91|
|  2017-06-05 1:2:13|         5|        2|      156|
+-------------------+----------+---------+---------+
only showing top 3 rows
```

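Spark's `dayofweek` numbers the days from 1 = Sunday to 7 = Saturday, which differs from Python's `isoweekday()` (1 = Monday to 7 = Sunday); `dayofyear` starts at 1 on 1 January. A one-line conversion makes the two agree and reproduces the rows above (the `py_` helpers are hypothetical):

```python
from datetime import date

def py_dayofweek(d: date) -> int:
    """Map ISO weekday (Mon=1..Sun=7) to Spark's numbering (Sun=1..Sat=7)."""
    return d.isoweekday() % 7 + 1

def py_dayofyear(d: date) -> int:
    """Day of the year, 1 for 1 January."""
    return d.timetuple().tm_yday

for d in (date(2019, 2, 1), date(2018, 4, 1), date(2017, 6, 5)):
    print(d, d.day, py_dayofweek(d), py_dayofyear(d))
# 2019-02-01 1 6 32
# 2018-04-01 1 1 91
# 2017-06-05 5 2 156
```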


`hour`: returns the hour of a timestamp. `minute`, `month`, `year`, `weekofyear`, etc. work the same way.

```python
df = (empdf
      .select("date")
      .withColumn("hour", hour("date")))
df.show(2)
```

```
+-------------------+----+
|               date|hour|
+-------------------+----+
|2019-02-01 15:12:13|  15|
|  2018-04-01 5:12:3|   5|
+-------------------+----+
only showing top 2 rows
```

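One detail worth knowing about the related extractors: `weekofyear` follows ISO 8601 (weeks start on Monday and week 1 is the first week with more than three days), so dates in early January can belong to the last week of the previous year. Python's `isocalendar()` applies the same rule, so it serves as a stand-in in this plain-Python sketch:

```python
from datetime import datetime

ts = datetime.strptime("2018-04-01 5:12:3", "%Y-%m-%d %H:%M:%S")
print(ts.hour, ts.minute, ts.second, ts.month, ts.year)  # 5 12 3 4 2018

# ISO 8601 week numbers, the rule weekofyear follows
print(datetime(2019, 2, 1).isocalendar()[1])  # 5
print(datetime(2021, 1, 1).isocalendar()[1])  # 53 (1 Jan 2021 falls in the last week of 2020)
```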


`to_date`: converts a string or timestamp column into a `date`

```python
df = (empdf
      .select("date")
      .withColumn("to_date", to_date("date")))
df.show(2)
df.printSchema()
```

```
+-------------------+----------+
|               date|   to_date|
+-------------------+----------+
|2019-02-01 15:12:13|2019-02-01|
|  2018-04-01 5:12:3|2018-04-01|
+-------------------+----------+
only showing top 2 rows

root
 |-- date: string (nullable = true)
 |-- to_date: date (nullable = true)
```



If the string uses a different format, we pass the pattern as the second argument, for example:

```python
df1 = spark.createDataFrame([("15/02/2019 10:30:00",)], ["date"])
df2 = (df1
       .withColumn("new_date", to_date("date", "dd/MM/yyyy HH:mm:ss")))
df2.show()
```

```
+-------------------+----------+
|               date|  new_date|
+-------------------+----------+
|15/02/2019 10:30:00|2019-02-15|
+-------------------+----------+
```

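A string that does not fit the pattern does not produce a date: depending on the Spark version and on the ANSI setting (`spark.sql.ansi.enabled`), the result is null or an error. The plain-Python analogue below uses `strptime` with the equivalent Python format written by hand, and maps a failed parse to `None` to mimic the null case; `py_to_date` is a hypothetical helper:

```python
from datetime import datetime

def py_to_date(value, fmt):
    """Parse value with a Python format; return None on failure, like a null result."""
    try:
        return datetime.strptime(value, fmt).date()
    except (TypeError, ValueError):
        return None

# Spark pattern 'dd/MM/yyyy HH:mm:ss' corresponds to '%d/%m/%Y %H:%M:%S'
print(py_to_date("15/02/2019 10:30:00", "%d/%m/%Y %H:%M:%S"))  # 2019-02-15
print(py_to_date("2019-02-15 10:30:00", "%d/%m/%Y %H:%M:%S"))  # None (wrong layout)
print(py_to_date(None, "%d/%m/%Y %H:%M:%S"))                   # None (null input)
```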


## User-defined functions (UDF) <a class="anchor" id="chapter5"></a>

We create a DataFrame to apply a UDF to and, later on, to look at null values.

```python
emp = [(1, "AAA", "dept1", 1000),
       (2, "BBB", "dept1", 1100),
       (3, "CCC", "dept1", 3000),
       (4, "DDD", "dept1", 1500),
       (5, "EEE", "dept2", 8000),
       (6, "FFF", "dept2", 7200),
       (7, "GGG", "dept3", 7100),
       (None, None, None, 7500),
       (9, "III", None, 4500),
       (10, None, "dept5", 2500)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"])

df.show()
```

```
+----+----+-----+------+
|  id|name| dept|salary|
+----+----+-----+------+
|   1| AAA|dept1|  1000|
|   2| BBB|dept1|  1100|
|   3| CCC|dept1|  3000|
|   4| DDD|dept1|  1500|
|   5| EEE|dept2|  8000|
|   6| FFF|dept2|  7200|
|   7| GGG|dept3|  7100|
|null|null| null|  7500|
|   9| III| null|  4500|
|  10|null|dept5|  2500|
+----+----+-----+------+
```



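We want to group salaries into three levels (plus a marker for invalid values), so we write a plain Python function:

```python
def detSalary_Level(sal):
    # Spark passes SQL nulls to Python UDFs as None; comparing None with a number
    # raises a TypeError in Python 3, so handle it first
    if sal is None:
        return None
    if sal > 5000:
        return 'high_salary'
    elif sal > 2000:
        return 'mid_salary'
    elif sal > 0:
        return 'low_salary'
    else:
        return 'invalid_salary'
```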

The function then has to be registered as a UDF, together with its return type:

```python
sal_level = udf(detSalary_Level, StringType())
```

We apply it to a given column:

```python
newdf = df.withColumn("salary_level", sal_level("salary"))
newdf.show()
```

```
+----+----+-----+------+------------+
|  id|name| dept|salary|salary_level|
+----+----+-----+------+------------+
|   1| AAA|dept1|  1000|  low_salary|
|   2| BBB|dept1|  1100|  low_salary|
|   3| CCC|dept1|  3000|  mid_salary|
|   4| DDD|dept1|  1500|  low_salary|
|   5| EEE|dept2|  8000| high_salary|
|   6| FFF|dept2|  7200| high_salary|
|   7| GGG|dept3|  7100| high_salary|
|null|null| null|  7500| high_salary|
|   9| III| null|  4500|  mid_salary|
|  10|null|dept5|  2500|  mid_salary|
+----+----+-----+------+------------+
```

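Since a UDF is just a Python function, its logic can be restructured and tested locally before registering it. The sketch below is an alternative formulation (the name `salary_level` and the `THRESHOLDS`/`LEVELS` lists are hypothetical): the boundaries live in one sorted list searched with `bisect`, and the checks confirm the comparisons are strict, so a salary of exactly 5000 is `mid_salary`. It could be registered the same way, e.g. `udf(salary_level, StringType())`. For logic this simple, the built-in `when`/`otherwise` expressions are usually preferable, since a Python UDF has to ship every row to a Python worker and back.

```python
from bisect import bisect_left

# Sorted lower bounds; the strict ">" comparisons of detSalary_Level become bisect_left
THRESHOLDS = [0, 2000, 5000]
LEVELS = ["invalid_salary", "low_salary", "mid_salary", "high_salary"]

def salary_level(sal):
    """Return the salary band; None (a Spark null) stays None."""
    if sal is None:
        return None
    # bisect_left returns how many thresholds are strictly below sal
    return LEVELS[bisect_left(THRESHOLDS, sal)]

for s in (8000, 5000, 2500, 2000, 1000, 0, None):
    print(s, salary_level(s))
```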


## Null and NaN values <a class="anchor" id="chapter6"></a>

**isNull()** and **isNotNull()**: select the rows where a column is null, or not null

```python
newdf = df.filter(df["dept"].isNull())
newdf.show()
```

```
+----+----+----+------+
|  id|name|dept|salary|
+----+----+----+------+
|null|null|null|  7500|
|   9| III|null|  4500|
+----+----+----+------+
```



```python
newdf = df.filter(df["dept"].isNotNull())
newdf.show()
```

```
+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+
```

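About the NaN part of the title: in Spark, a floating-point NaN is a value, not a null, so `isNull()` is false for it. `isnan(col)` from `pyspark.sql.functions` tests for NaN (and is false for null), so a filter such as `df.filter(col("x").isNull() | isnan(col("x")))` catches both, where `x` is a hypothetical float column. The distinction maps directly onto Python's `None` and `float("nan")`:

```python
import math

values = [1.0, None, float("nan"), 2.5]

is_null = [v is None for v in values]                        # like isNull()
is_nan = [v is not None and math.isnan(v) for v in values]   # like isnan(), false for null
missing = [a or b for a, b in zip(is_null, is_nan)]          # isNull() | isnan()

print(is_null)  # [False, True, False, False]
print(is_nan)   # [False, False, True, False]
print(missing)  # [False, True, True, False]
```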


**fillna()**: replaces the null values in the given columns

```python
newdf = df.fillna("no_data", ["dept"])
newdf.show()
```

```
+----+----+-------+------+
|  id|name|   dept|salary|
+----+----+-------+------+
|   1| AAA|  dept1|  1000|
|   2| BBB|  dept1|  1100|
|   3| CCC|  dept1|  3000|
|   4| DDD|  dept1|  1500|
|   5| EEE|  dept2|  8000|
|   6| FFF|  dept2|  7200|
|   7| GGG|  dept3|  7100|
|null|null|no_data|  7500|
|   9| III|no_data|  4500|
|  10|null|  dept5|  2500|
+----+----+-------+------+
```

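`fillna(value, subset)` only fills columns whose type matches `value`: per the PySpark documentation, columns in `subset` with a different type are silently ignored, so `df.fillna("no_data", ["dept", "id"])` would still leave the numeric `id` null. A dictionary such as `df.fillna({"dept": "no_data", "salary": 0})` sets a different value per column. The plain-Python sketch below models the type-matching rule; `py_fillna`, `column_types` and the three copied rows are illustrative, not Spark's implementation:

```python
# Declared column types of the example DataFrame
column_types = {"id": int, "name": str, "dept": str, "salary": int}

rows = [
    {"id": None, "name": None, "dept": None, "salary": 7500},
    {"id": 9, "name": "III", "dept": None, "salary": 4500},
    {"id": 10, "name": None, "dept": "dept5", "salary": 2500},
]

def py_fillna(rows, value, subset):
    """Return copies of rows with None replaced in `subset` columns of value's type."""
    filled = []
    for row in rows:
        new_row = dict(row)
        for column in subset:
            # Columns whose type does not match value are skipped, as Spark does
            if new_row[column] is None and isinstance(value, column_types[column]):
                new_row[column] = value
        filled.append(new_row)
    return filled

# Like df.fillna("no_data", ["dept", "id"]): "id" is numeric, so it stays null
for row in py_fillna(rows, "no_data", ["dept", "id"]):
    print(row)
```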


**dropna()**: removes the rows that contain null values

```python
newdf = df.dropna()  # same as df.dropna(how="any"), the default
newdf.show()
```

```
+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
+---+----+-----+------+
```



To consider only some columns, list them in <i>subset</i>:

```python
newdf = df.dropna(subset=["dept"])
newdf.show()
```

```
+---+----+-----+------+
| id|name| dept|salary|
+---+----+-----+------+
|  1| AAA|dept1|  1000|
|  2| BBB|dept1|  1100|
|  3| CCC|dept1|  3000|
|  4| DDD|dept1|  1500|
|  5| EEE|dept2|  8000|
|  6| FFF|dept2|  7200|
|  7| GGG|dept3|  7100|
| 10|null|dept5|  2500|
+---+----+-----+------+
```

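`dropna` also accepts `how="all"` (drop a row only when every considered column is null) and `thresh=n` (keep rows with at least `n` non-null values; when given, it takes precedence over `how`). The plain-Python sketch below models the three parameters on a few rows copied from the example; `py_dropna` is a hypothetical helper over dictionaries, not Spark's implementation:

```python
def py_dropna(rows, how="any", thresh=None, subset=None):
    """Filter rows like DataFrame.dropna, with None standing for null."""
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        non_null = len([c for c in columns if row[c] is not None])
        if thresh is not None:      # thresh takes precedence over how
            keep = non_null >= thresh
        elif how == "any":          # drop if any considered column is null
            keep = non_null == len(columns)
        elif how == "all":          # drop only if every considered column is null
            keep = non_null > 0
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept

rows = [
    {"id": 7, "name": "GGG", "dept": "dept3", "salary": 7100},
    {"id": None, "name": None, "dept": None, "salary": 7500},
    {"id": 9, "name": "III", "dept": None, "salary": 4500},
    {"id": 10, "name": None, "dept": "dept5", "salary": 2500},
]

print([r["salary"] for r in py_dropna(rows)])                                    # [7100]
print([r["salary"] for r in py_dropna(rows, subset=["dept"])])                   # [7100, 2500]
print([r["salary"] for r in py_dropna(rows, how="all", subset=["id", "name"])])  # [7100, 4500, 2500]
print([r["salary"] for r in py_dropna(rows, thresh=3)])                          # [7100, 4500, 2500]
```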


To study, partitioning: https://medium.com/parrot-prediction/partitioning-in-apache-spark-8134ad840b0#:~:text=Spark%20can%20run%201%20concurrent,2%E2%80%933x%20times%20more).

Continue the tutorial: https://github.com/NeerajBhadani/bigdata-ml/blob/master/apache-spark/getting-started-with-apache-spark-part-3.ipynb

References:
- https://towardsdatascience.com/beginners-guide-to-pyspark-bbe3b553b79f
- https://www.guru99.com/pyspark-tutorial.html
- https://amiradata.com/pyspark-lit-function-to-add-literal-constant-column/
- https://github.com/NeerajBhadani/bigdata-ml/blob/master/apache-spark/getting-started-with-apache-spark-part-2.ipynb