# Apache Spark

<img src="Apache_Spark_logo.svg.png" height="300" width="300">

Apache Spark is a fast, unified analytics engine for big data and machine learning. It was originally developed at UC Berkeley in 2009. It is a multi-language engine for running data engineering, data science, and machine learning workloads on single-node machines or clusters.

# Tips

- Apache Spark is a distributed processing framework with APIs for Java, Scala, SQL, Python, and R.
- A framework is a set of tools and libraries.
- It is a distributed processing engine: a task is split into parallelizable parts that are distributed across nodes.
- Spark operations fall into two classes: transformations, which are lazy and only describe the computation (so they return almost instantly even on large volumes of data), and actions, which are slower because they trigger execution and materialize the results of the transformations.
- Its programming model builds on the ideas of `MapReduce`.
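
The split between transformations and actions described in the tips above can be illustrated without a cluster. The sketch below is a plain-Python analogy, not Spark code: generator expressions play the role of lazy transformations and `list()` plays the role of an action. The `calls` list and the `to_upper` helper are hypothetical names used only for this illustration.

```python
# Plain-Python analogy for lazy evaluation (this is not Spark itself).
calls = []  # records every time the "transformation" body actually runs


def to_upper(value):
    calls.append(value)
    return value.upper()


data = ["spark", "hadoop", "flink"]

# "Transformations": building the pipeline does no work yet.
mapped = (to_upper(x) for x in data)
filtered = (x for x in mapped if x.startswith("S"))
calls_before_action = len(calls)  # nothing has been computed so far

# "Action": consuming the pipeline forces the whole computation.
result = list(filtered)
calls_after_action = len(calls)  # every element has now been processed

print(calls_before_action, calls_after_action, result)
```

In Spark the same idea applies to calls such as `map` and `filter` (transformations) versus `collect` and `count` (actions).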

# The Map-Reduce Concept
<br>
<br>
<img src="Hadoop-Map-Reduce.png" height="350" width="350">
<br>
<br>

- **Map:** The Map function takes a (key, value) pair as input and returns a list of pairs. It is applied to every element of the input data.
- **Reduce:** The Reduce function is applied in parallel to each group produced by the Map function. It is called once per unique key in the Map output, together with the list of all values associated with that key, and merges them into a smaller set of values.

- **Note:** Map splits the data and attaches keys to it so that Reduce can apply an aggregation per key.

<br>
<br>
<img src="map-reduce.jpg" height="850" width="850">
<br>
<br>
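
The Map and Reduce steps described above can be sketched in plain Python. This is a minimal single-process illustration of the idea, not Hadoop or Spark code; the sample `records` and the helper names `map_fn` and `reduce_fn` are assumptions made for the example.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical sample input: one education level per record.
records = ["Bachelors", "Masters", "Bachelors", "HS-grad", "Masters", "Bachelors"]


def map_fn(record):
    """Map: emit a list of (key, value) pairs for one input record."""
    return [(record, 1)]


mapped = [pair for record in records for pair in map_fn(record)]

# Shuffle: group all values that share the same key.
groups = defaultdict(list)
for key, value in mapped:
    groups[key].append(value)


def reduce_fn(key, values):
    """Reduce: called once per unique key with the list of its values."""
    return key, reduce(lambda a, b: a + b, values)


counts = dict(reduce_fn(key, values) for key, values in groups.items())
print(counts)
```

In a real cluster the map calls and the per-key reduce calls run in parallel on different nodes; here they run sequentially to keep the sketch short.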

# Architecture

Recall that the Apache Spark architecture consists of the following components:

- Driver and SparkContext (Session)
- Cluster Manager
- Nodes
- Executor
- Task
- Cache

<img src="spark_operation.png" height="450" width="450">
<br>
<br>
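
The way these components cooperate can be pictured with a small analogy. The sketch below is plain Python, not Spark: the main program stands in for the driver, a thread pool stands in for the executors, and each function call stands in for a task that processes one partition. `split_into_partitions` and `task` are hypothetical names chosen for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Driver side: slice the data into roughly equal partitions."""
    return [data[i::num_partitions] for i in range(num_partitions)]


def task(partition):
    """Executor side: one task processes exactly one partition."""
    return sum(x * x for x in partition)


data = list(range(1, 11))
partitions = split_into_partitions(data, num_partitions=3)

# The pool stands in for the executors provided by the cluster manager.
with ThreadPoolExecutor(max_workers=3) as executors:
    partial_results = list(executors.map(task, partitions))

# The driver combines the per-partition results into the final answer.
total = sum(partial_results)
print(partitions, partial_results, total)
```

The analogy leaves out scheduling, fault tolerance, and caching, which a real cluster manager and executors also handle.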

# Data Structures

<br>
<br>
<img src="chart3.png" height="800" width="800">
<br>
<br>

- **RDD:** At the core of Apache Spark is the `RDD (Resilient Distributed Dataset)`, an immutable, distributed collection of objects. A PySpark DataFrame is built on top of RDDs and, unlike a pandas DataFrame, its data is partitioned across the nodes of the cluster, so queries over a given set of data can take advantage of parallel computation. The RDD is the low-level API.

- **DataFrame:** The first thing that stands out when creating a DataFrame is that it has named columns, which conceptually makes it similar to working with a pandas DataFrame. The difference is that Spark itself is implemented in Scala, where a DataFrame is a collection of `Row` objects: a generic row type whose fields are not statically typed. The DataFrame is the high-level API.

## RDDs

In [172]:
import pandas as pd

import findspark
findspark.init()

In [174]:
from pyspark.sql import SparkSession
from IPython.core.display import HTML

display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [175]:
# "spark.sql.shuffle.partitions" sets the number of partitions used by shuffles
spark = SparkSession.builder.appName("PysparkSession")\
                            .config("spark.sql.shuffle.partitions", 5)\
                            .getOrCreate()

In [176]:
spark

### Read Data

In [179]:
df_adult = (spark.read.format("csv").option("header", "true")
                                    .option("sep", ",")
                                    .load("./datasets/adult.csv"))
df_adult.show(5)
print(type(df_adult))

+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+-----------+-----------+------------+--------------+-----+
|age|       workclass|fnlwgt|education|education-num|    marital-status|       occupation| relationship| race|   sex|capitalgain|capitalloss|hoursperweek|native-country|class|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+-----------+-----------+------------+--------------+-----+
|  2|       State-gov| 77516|Bachelors|           13|     Never-married|     Adm-clerical|Not-in-family|White|  Male|          1|          0|           2| United-States|<=50K|
|  3|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|          0|          0|           0| United-States|<=50K|
|  2|         Private|215646|  HS-grad|            9|          Divorced|Handlers-cleaners|Not-in-family|White|  Male|   

In [181]:
df_adult_RDD = spark.sparkContext.textFile("./datasets/adult.csv")
type(df_adult_RDD)

pyspark.rdd.RDD

In [185]:
df_adult_RDD.collect()

['age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capitalgain,capitalloss,hoursperweek,native-country,class',
 '2,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,1,0,2,United-States,<=50K',
 '3,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,0,United-States,<=50K',
 '2,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,2,United-States,<=50K',
 '3,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,2,United-States,<=50K',
 '1,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,2,Cuba,<=50K',
 '2,Private,284582,Masters,14,Married-civ-spouse,Exec-managerial,Wife,White,Female,0,0,2,United-States,<=50K',
 '3,Private,160187,9th,5,Married-spouse-absent,Other-service,Not-in-family,Black,Female,0,0,0,Jamaica,<=50K',
 '3,Self-emp-not-inc,209642,HS-grad,9,Married-civ-spouse,Exec-manag

In [186]:
df_adult_RDD

./datasets/adult.csv MapPartitionsRDD[27] at textFile at NativeMethodAccessorImpl.java:0

### Map Function

In [187]:
map_function = lambda x: x.split(",")
df_adult_RDD.map(map_function).collect()

[['age',
  'workclass',
  'fnlwgt',
  'education',
  'education-num',
  'marital-status',
  'occupation',
  'relationship',
  'race',
  'sex',
  'capitalgain',
  'capitalloss',
  'hoursperweek',
  'native-country',
  'class'],
 ['2',
  'State-gov',
  '77516',
  'Bachelors',
  '13',
  'Never-married',
  'Adm-clerical',
  'Not-in-family',
  'White',
  'Male',
  '1',
  '0',
  '2',
  'United-States',
  '<=50K'],
 ['3',
  'Self-emp-not-inc',
  '83311',
  'Bachelors',
  '13',
  'Married-civ-spouse',
  'Exec-managerial',
  'Husband',
  'White',
  'Male',
  '0',
  '0',
  '0',
  'United-States',
  '<=50K'],
 ['2',
  'Private',
  '215646',
  'HS-grad',
  '9',
  'Divorced',
  'Handlers-cleaners',
  'Not-in-family',
  'White',
  'Male',
  '0',
  '0',
  '2',
  'United-States',
  '<=50K'],
 ['3',
  'Private',
  '234721',
  '11th',
  '7',
  'Married-civ-spouse',
  'Handlers-cleaners',
  'Husband',
  'Black',
  'Male',
  '0',
  '0',
  '2',
  'United-States',
  '<=50K'],
 ['1',
  'Private',
  '338409',

In [188]:
map_function = lambda x: x.split(",")[3]
df_adult_RDD.map(map_function).collect()

['education',
 'Bachelors',
 'Bachelors',
 'HS-grad',
 '11th',
 'Bachelors',
 'Masters',
 '9th',
 'HS-grad',
 'Masters',
 'Bachelors',
 'Some-college',
 'Bachelors',
 'Bachelors',
 'Assoc-acdm',
 'Assoc-voc',
 '7th-8th',
 'HS-grad',
 'HS-grad',
 '11th',
 'Masters',
 'Doctorate',
 'HS-grad',
 '9th',
 '11th',
 'HS-grad',
 'Bachelors',
 'HS-grad',
 'Some-college',
 'HS-grad',
 'HS-grad',
 'Assoc-acdm',
 'Some-college',
 'Bachelors',
 'Some-college',
 'Some-college',
 '11th',
 'Some-college',
 'HS-grad',
 'Some-college',
 'Assoc-acdm',
 '9th',
 'Bachelors',
 'Bachelors',
 'HS-grad',
 'HS-grad',
 'Bachelors',
 'HS-grad',
 'Masters',
 'Assoc-voc',
 'Assoc-voc',
 'Some-college',
 'HS-grad',
 'Prof-school',
 'Bachelors',
 'HS-grad',
 'Some-college',
 '5th-6th',
 'Assoc-voc',
 'HS-grad',
 'HS-grad',
 'Bachelors',
 '7th-8th',
 'HS-grad',
 'Doctorate',
 'Some-college',
 'HS-grad',
 'Some-college',
 'HS-grad',
 'Some-college',
 'Some-college',
 'Some-college',
 'Bachelors',
 'Bachelors',
 'Some-co

### Filter Function

In [190]:
map_function = lambda x: x.split(",")[3]
filter_function = lambda x: x in ("Masters", "Bachelors")  # keep only these two education levels

df_adult_RDD.map(map_function).filter(filter_function).collect()

['Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Masters',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Masters',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Bachelors',
 'Bachelors',
 'Masters',
 'Bach

### Reduce Function

In [196]:
map_function = lambda x: (x.split(",")[3], 1)
filter_function = lambda x: x[0] in ("Masters", "Bachelors")  # x is a (key, 1) pair
reduce_function = lambda val1, val2: val1 + val2
df_adult_RDD.map(map_function).filter(filter_function).reduceByKey(reduce_function).collect()

[('Bachelors', 8025), ('Masters', 2657)]

# PySpark Transformations

## Drop Columns

In [200]:
df_house = spark.read.csv("./datasets/Bengaluru_House_Data.csv", header = True)
df_house.show(5)

+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
|           area_type| availability|            location|     size|society|total_sqft|bath|balcony|price|
+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
|Super built-up  Area|       19-Dec|Electronic City P...|    2 BHK|Coomee |      1056|   2|      1|39.07|
|          Plot  Area|Ready To Move|    Chikka Tirupathi|4 Bedroom|Theanmp|      2600|   5|      3|  120|
|      Built-up  Area|Ready To Move|         Uttarahalli|    3 BHK|   null|      1440|   2|      3|   62|
|Super built-up  Area|Ready To Move|  Lingadheeranahalli|    3 BHK|Soiewre|      1521|   3|      1|   95|
|Super built-up  Area|Ready To Move|            Kothanur|    2 BHK|   null|      1200|   2|      1|   51|
+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
only showing top 5 rows



In [201]:
df_house_pd = df_house.toPandas()
df_house_pd.head()

| | area_type | availability | location | size | society | total_sqft | bath | balcony | price |
|---|---|---|---|---|---|---|---|---|---|
| 0 | Super built-up Area | 19-Dec | Electronic City Phase II | 2 BHK | Coomee | 1056 | 2 | 1 | 39.07 |
| 1 | Plot Area | Ready To Move | Chikka Tirupathi | 4 Bedroom | Theanmp | 2600 | 5 | 3 | 120.0 |
| 2 | Built-up Area | Ready To Move | Uttarahalli | 3 BHK | | 1440 | 2 | 3 | 62.0 |
| 3 | Super built-up Area | Ready To Move | Lingadheeranahalli | 3 BHK | Soiewre | 1521 | 3 | 1 | 95.0 |
| 4 | Super built-up Area | Ready To Move | Kothanur | 2 BHK | | 1200 | 2 | 1 | 51.0 |


In [204]:
df_house_pd.drop(["area_type"], axis = 1).head()

| | availability | location | size | society | total_sqft | bath | balcony | price |
|---|---|---|---|---|---|---|---|---|
| 0 | 19-Dec | Electronic City Phase II | 2 BHK | Coomee | 1056 | 2 | 1 | 39.07 |
| 1 | Ready To Move | Chikka Tirupathi | 4 Bedroom | Theanmp | 2600 | 5 | 3 | 120.0 |
| 2 | Ready To Move | Uttarahalli | 3 BHK | | 1440 | 2 | 3 | 62.0 |
| 3 | Ready To Move | Lingadheeranahalli | 3 BHK | Soiewre | 1521 | 3 | 1 | 95.0 |
| 4 | Ready To Move | Kothanur | 2 BHK | | 1200 | 2 | 1 | 51.0 |


In [207]:
df_house.drop("area_type").show(5)

+-------------+--------------------+---------+-------+----------+----+-------+-----+
| availability|            location|     size|society|total_sqft|bath|balcony|price|
+-------------+--------------------+---------+-------+----------+----+-------+-----+
|       19-Dec|Electronic City P...|    2 BHK|Coomee |      1056|   2|      1|39.07|
|Ready To Move|    Chikka Tirupathi|4 Bedroom|Theanmp|      2600|   5|      3|  120|
|Ready To Move|         Uttarahalli|    3 BHK|   null|      1440|   2|      3|   62|
|Ready To Move|  Lingadheeranahalli|    3 BHK|Soiewre|      1521|   3|      1|   95|
|Ready To Move|            Kothanur|    2 BHK|   null|      1200|   2|      1|   51|
+-------------+--------------------+---------+-------+----------+----+-------+-----+
only showing top 5 rows



## Distinct count

In [208]:
len(df_house_pd.society.unique())

2689

In [214]:
df_house.select("society").distinct().count()

2689

## First

In [218]:
df_house.select("society").first()

Row(society='Coomee ')

# Spark SQL

Spark SQL lets us manipulate DataFrames through SQL queries.

In [219]:
df_house.show(5)

+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
|           area_type| availability|            location|     size|society|total_sqft|bath|balcony|price|
+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
|Super built-up  Area|       19-Dec|Electronic City P...|    2 BHK|Coomee |      1056|   2|      1|39.07|
|          Plot  Area|Ready To Move|    Chikka Tirupathi|4 Bedroom|Theanmp|      2600|   5|      3|  120|
|      Built-up  Area|Ready To Move|         Uttarahalli|    3 BHK|   null|      1440|   2|      3|   62|
|Super built-up  Area|Ready To Move|  Lingadheeranahalli|    3 BHK|Soiewre|      1521|   3|      1|   95|
|Super built-up  Area|Ready To Move|            Kothanur|    2 BHK|   null|      1200|   2|      1|   51|
+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
only showing top 5 rows



In [220]:
df_area_type = df_house.select("area_type", "availability", "location", "bath", "balcony", "price")
df_area_type.show(3)

+--------------------+-------------+--------------------+----+-------+-----+
|           area_type| availability|            location|bath|balcony|price|
+--------------------+-------------+--------------------+----+-------+-----+
|Super built-up  Area|       19-Dec|Electronic City P...|   2|      1|39.07|
|          Plot  Area|Ready To Move|    Chikka Tirupathi|   5|      3|  120|
|      Built-up  Area|Ready To Move|         Uttarahalli|   2|      3|   62|
+--------------------+-------------+--------------------+----+-------+-----+
only showing top 3 rows



## Create Table

In [221]:
df_area_type.createOrReplaceTempView("df_area_type")

In [250]:
spark.sql("DESCRIBE df_area_type").show()

+------------+---------+-------+
|    col_name|data_type|comment|
+------------+---------+-------+
|   area_type|   string|   null|
|availability|   string|   null|
|    location|   string|   null|
|        bath|   string|   null|
|     balcony|   string|   null|
|       price|   string|   null|
+------------+---------+-------+



In [227]:
spark.sql("SELECT area_type, availability, location, bath, balcony, price FROM df_area_type").show(5)

+--------------------+-------------+--------------------+----+-------+-----+
|           area_type| availability|            location|bath|balcony|price|
+--------------------+-------------+--------------------+----+-------+-----+
|Super built-up  Area|       19-Dec|Electronic City P...|   2|      1|39.07|
|          Plot  Area|Ready To Move|    Chikka Tirupathi|   5|      3|  120|
|      Built-up  Area|Ready To Move|         Uttarahalli|   2|      3|   62|
|Super built-up  Area|Ready To Move|  Lingadheeranahalli|   3|      1|   95|
|Super built-up  Area|Ready To Move|            Kothanur|   2|      1|   51|
+--------------------+-------------+--------------------+----+-------+-----+
only showing top 5 rows



In [228]:
spark.sql("""

    SELECT area_type,
           availability, 
           location, 
           bath, 
           balcony, 
           price
    FROM df_area_type
    
""").show(5)

+--------------------+-------------+--------------------+----+-------+-----+
|           area_type| availability|            location|bath|balcony|price|
+--------------------+-------------+--------------------+----+-------+-----+
|Super built-up  Area|       19-Dec|Electronic City P...|   2|      1|39.07|
|          Plot  Area|Ready To Move|    Chikka Tirupathi|   5|      3|  120|
|      Built-up  Area|Ready To Move|         Uttarahalli|   2|      3|   62|
|Super built-up  Area|Ready To Move|  Lingadheeranahalli|   3|      1|   95|
|Super built-up  Area|Ready To Move|            Kothanur|   2|      1|   51|
+--------------------+-------------+--------------------+----+-------+-----+
only showing top 5 rows



In [264]:
df = spark.sql("""

    SELECT area_type,
           AVG(price) AS avg_price,
           AVG(bath) AS avg_bath
    FROM df_area_type
    GROUP BY area_type
    
""")

df.show()

+--------------------+------------------+------------------+
|           area_type|         avg_price|          avg_bath|
+--------------------+------------------+------------------+
|      Built-up  Area|104.28549834574028|2.6493775933609958|
|Super built-up  Area| 92.97175711035274|2.4303855394119664|
|          Plot  Area|208.49548641975312|3.8934793429566947|
|        Carpet  Area| 89.50235632183907|2.5057471264367814|
+--------------------+------------------+------------------+



In [232]:
spark.sql("""

    SELECT area_type,
           AVG(price) AS avg_price,
           AVG(bath) AS avg_bath
    FROM df_area_type
    GROUP BY area_type
    ORDER BY avg_price DESC
    
""").show(5)

+--------------------+------------------+------------------+
|           area_type|         avg_price|          avg_bath|
+--------------------+------------------+------------------+
|          Plot  Area|208.49548641975312|3.8934793429566947|
|      Built-up  Area|104.28549834574028|2.6493775933609958|
|Super built-up  Area| 92.97175711035274|2.4303855394119664|
|        Carpet  Area| 89.50235632183907|2.5057471264367814|
+--------------------+------------------+------------------+



In [243]:
from pyspark.sql.functions import col, udf
df_area_type = df_area_type.withColumn("price", col("price").cast("float"))
df_area_type = df_area_type.withColumn("bath", col("bath").cast("float"))
df_area_type.printSchema()

root
 |-- area_type: string (nullable = true)
 |-- availability: string (nullable = true)
 |-- location: string (nullable = true)
 |-- bath: float (nullable = true)
 |-- balcony: string (nullable = true)
 |-- price: float (nullable = true)



In [248]:
df_area_type.groupBy("area_type")\
            .mean("price", "bath")\
            .sort("avg(price)", ascending = False)\
            .show()

+--------------------+------------------+------------------+
|           area_type|        avg(price)|         avg(bath)|
+--------------------+------------------+------------------+
|          Plot  Area|208.49548641299023|3.8934793429566947|
|      Built-up  Area|104.28549835798265|2.6493775933609958|
|Super built-up  Area| 92.97175712943483|2.4303855394119664|
|        Carpet  Area| 89.50235632096214|2.5057471264367814|
+--------------------+------------------+------------------+



## Pyspark Expressions

`expr` is typically used to write SQL-style expressions such as `CASE WHEN`.

In [251]:
from pyspark.sql.functions import col, udf, expr

In [254]:
df_athletes = spark.read.format("csv").option("header", True)\
                                      .option("sep", ",")\
                                      .load("./datasets/athletes.csv")
df_athletes.show(5)

+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+
|       id|          name|nationality|   sex|     dob|height|weight|    sport|gold|silver|bronze|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+
|736041664|A Jesus Garcia|        ESP|  male|10/17/69|  1.72|    64|athletics|   0|     0|     0|
|532037425|    A Lam Shin|        KOR|female| 9/23/86|  1.68|    56|  fencing|   0|     0|     0|
|435962603|   Aaron Brown|        CAN|  male| 5/27/92|  1.98|    79|athletics|   0|     0|     1|
|521041435|    Aaron Cook|        MDA|  male|  1/2/91|  1.83|    80|taekwondo|   0|     0|     0|
| 33922579|    Aaron Gate|        NZL|  male|11/26/90|  1.81|    71|  cycling|   0|     0|     0|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+
only showing top 5 rows



In [263]:
df_athletes.withColumn("sex", expr("""

CASE
    WHEN sex == 'male' THEN 'M'
    WHEN sex == 'female' THEN 'F'
ELSE 'UNKNOWN'
    END
    
""")).show(5)

+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
|       id|          name|nationality|sex|     dob|height|weight|    sport|gold|silver|bronze|
+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
|736041664|A Jesus Garcia|        ESP|  M|10/17/69|  1.72|    64|athletics|   0|     0|     0|
|532037425|    A Lam Shin|        KOR|  F| 9/23/86|  1.68|    56|  fencing|   0|     0|     0|
|435962603|   Aaron Brown|        CAN|  M| 5/27/92|  1.98|    79|athletics|   0|     0|     1|
|521041435|    Aaron Cook|        MDA|  M|  1/2/91|  1.83|    80|taekwondo|   0|     0|     0|
| 33922579|    Aaron Gate|        NZL|  M|11/26/90|  1.81|    71|  cycling|   0|     0|     0|
+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
only showing top 5 rows



In [260]:
df_house.show(2)

+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
|           area_type| availability|            location|     size|society|total_sqft|bath|balcony|price|
+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
|Super built-up  Area|       19-Dec|Electronic City P...|    2 BHK|Coomee |      1056|   2|      1|39.07|
|          Plot  Area|Ready To Move|    Chikka Tirupathi|4 Bedroom|Theanmp|      2600|   5|      3|  120|
+--------------------+-------------+--------------------+---------+-------+----------+----+-------+-----+
only showing top 2 rows



In [262]:
df_house = df_house.withColumn("society", expr("""

    TRIM(society)

"""))

df_house.first().society

'Coomee'

In [288]:
df_house.select("balcony", expr("balcony + 10 as balcony_sum")).show(3)

+-------+-----------+
|balcony|balcony_sum|
+-------+-----------+
|      1|       11.0|
|      3|       13.0|
|      3|       13.0|
+-------+-----------+
only showing top 3 rows



In [287]:
df_house.select("balcony", expr("balcony + 10").alias("balcony_sum")).show(3)

+-------+-----------+
|balcony|balcony_sum|
+-------+-----------+
|      1|       11.0|
|      3|       13.0|
|      3|       13.0|
+-------+-----------+
only showing top 3 rows



# Pyspark When Otherwise

It works much like SQL's `CASE WHEN`: each condition is checked in order and, when one holds, its value is returned;
if none of the conditions hold, a different value is returned.
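
The evaluation order can be sketched in plain Python. This is only an emulation of the first-match-wins logic, not PySpark code; `when_otherwise`, `branches`, and `keep_original` are hypothetical names introduced for this illustration.

```python
def when_otherwise(value, branches, otherwise):
    """Return the result of the first branch whose condition holds."""
    for condition, result in branches:
        if condition(value):
            return result
    # No condition matched: fall back to the "otherwise" value.
    return otherwise(value)


# Conditions are checked top to bottom, like a chain of when(...) calls.
branches = [
    (lambda v: v == "male", "M"),
    (lambda v: v == "female", "F"),
    (lambda v: v is None, "UNKNOWN"),
]
keep_original = lambda v: v  # fallback: leave the value unchanged

labels = [
    when_otherwise(v, branches, keep_original)
    for v in ["male", "female", None, "other"]
]
print(labels)
```

In PySpark itself, a `when` chain without `.otherwise(...)` yields null for rows that match no condition.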

In [271]:
from pyspark.sql.functions import col, udf, expr, when

In [269]:
df_athletes.withColumn("sex", expr("""

CASE
    WHEN sex == 'male' THEN 'M'
    WHEN sex == 'female' THEN 'F'
ELSE 'UNKNOWN'
    END
    
""")).show(3)

+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
|       id|          name|nationality|sex|     dob|height|weight|    sport|gold|silver|bronze|
+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
|736041664|A Jesus Garcia|        ESP|  M|10/17/69|  1.72|    64|athletics|   0|     0|     0|
|532037425|    A Lam Shin|        KOR|  F| 9/23/86|  1.68|    56|  fencing|   0|     0|     0|
|435962603|   Aaron Brown|        CAN|  M| 5/27/92|  1.98|    79|athletics|   0|     0|     1|
+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
only showing top 3 rows



In [274]:
df_athletes.withColumn("sex", when(df_athletes.sex == "male", "M")
                             .when(df_athletes.sex == "female", "F")
                             .when(df_athletes.sex.isNull(), "UNKNOWN")
                             .otherwise(df_athletes.sex)).show(3)

+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
|       id|          name|nationality|sex|     dob|height|weight|    sport|gold|silver|bronze|
+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
|736041664|A Jesus Garcia|        ESP|  M|10/17/69|  1.72|    64|athletics|   0|     0|     0|
|532037425|    A Lam Shin|        KOR|  F| 9/23/86|  1.68|    56|  fencing|   0|     0|     0|
|435962603|   Aaron Brown|        CAN|  M| 5/27/92|  1.98|    79|athletics|   0|     0|     1|
+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
only showing top 3 rows



In [280]:
df_athletes.select(col("*"), when(df_athletes.sex == "male", "M")
                            .when(df_athletes.sex == "female", "F")
                            .when(df_athletes.sex.isNull(), "UNKNOWN")
                            .otherwise(df_athletes.sex).alias("new_sex")).show(3)

+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+-------+
|       id|          name|nationality|   sex|     dob|height|weight|    sport|gold|silver|bronze|new_sex|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+-------+
|736041664|A Jesus Garcia|        ESP|  male|10/17/69|  1.72|    64|athletics|   0|     0|     0|      M|
|532037425|    A Lam Shin|        KOR|female| 9/23/86|  1.68|    56|  fencing|   0|     0|     0|      F|
|435962603|   Aaron Brown|        CAN|  male| 5/27/92|  1.98|    79|athletics|   0|     0|     1|      M|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+-------+
only showing top 3 rows



In [267]:
df_fuel = spark.read.csv("./datasets/FuelConsumption.csv", header = True)
df_fuel.show(5)

+---------+-----+----------+------------+----------+---------+------------+--------+--------------------+-------------------+--------------------+------------------------+------------+
|MODELYEAR| MAKE|     MODEL|VEHICLECLASS|ENGINESIZE|CYLINDERS|TRANSMISSION|FUELTYPE|FUELCONSUMPTION_CITY|FUELCONSUMPTION_HWY|FUELCONSUMPTION_COMB|FUELCONSUMPTION_COMB_MPG|CO2EMISSIONS|
+---------+-----+----------+------------+----------+---------+------------+--------+--------------------+-------------------+--------------------+------------------------+------------+
|     2014|ACURA|       ILX|     COMPACT|         2|        4|         AS5|       Z|                 9.9|                6.7|                 8.5|                      33|         196|
|     2014|ACURA|       ILX|     COMPACT|       2.4|        4|          M6|       Z|                11.2|                7.7|                 9.6|                      29|         221|
|     2014|ACURA|ILX HYBRID|     COMPACT|       1.5|        4|         AV7|

In [284]:
df_fuel.withColumn("MODEL_new", when(df_fuel.MODEL == "ILX", "ilx")
                               .otherwise(df_fuel.MODEL)).show(10)

+---------+-----+----------+------------+----------+---------+------------+--------+--------------------+-------------------+--------------------+------------------------+------------+----------+
|MODELYEAR| MAKE|     MODEL|VEHICLECLASS|ENGINESIZE|CYLINDERS|TRANSMISSION|FUELTYPE|FUELCONSUMPTION_CITY|FUELCONSUMPTION_HWY|FUELCONSUMPTION_COMB|FUELCONSUMPTION_COMB_MPG|CO2EMISSIONS| MODEL_new|
+---------+-----+----------+------------+----------+---------+------------+--------+--------------------+-------------------+--------------------+------------------------+------------+----------+
|     2014|ACURA|       ILX|     COMPACT|         2|        4|         AS5|       Z|                 9.9|                6.7|                 8.5|                      33|         196|       ilx|
|     2014|ACURA|       ILX|     COMPACT|       2.4|        4|          M6|       Z|                11.2|                7.7|                 9.6|                      29|         221|       ilx|
|     2014|ACURA|ILX

# User Defined Functions

Also known as `UDFs`, user-defined functions let you write your own function and apply it to a specific column of a Spark DataFrame, similar to using `apply` in pandas. The steps are:

- Define the PySpark DataFrame
- Define the Python function
- Convert the Python function into a Spark UDF
- Apply the UDF to the column
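
The steps above can be sketched as follows. This is a minimal illustration under stated assumptions: `sex_to_letter` and `build_sex_udf` are hypothetical names, the null handling and whitespace trimming are additions for robustness, and the PySpark imports are placed inside the builder function so that the plain Python logic can be checked without a running Spark session.

```python
def sex_to_letter(item):
    """Plain Python function; None models a SQL NULL reaching the UDF."""
    if item is None:
        return "UNKNOWN"
    normalized = item.strip().lower()
    if normalized == "male":
        return "M"
    if normalized == "female":
        return "F"
    return "UNKNOWN"


def build_sex_udf():
    """Wrap the Python function as a Spark UDF with an explicit return type."""
    # Imported lazily so the function above stays testable without Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    return udf(sex_to_letter, StringType())


# Check the Python logic on a few sample values before handing it to Spark.
samples = [sex_to_letter(v) for v in ["male", " Female ", None, "n/a"]]
print(samples)
```

With a live session, the result of `build_sex_udf()` could then be applied to a column, for example `df_athletes.withColumn("sex", build_sex_udf()(col("sex")))`.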

In [289]:
df_athletes.withColumn("sex", expr("""

CASE
    WHEN sex == 'male' THEN 'M'
    WHEN sex == 'female' THEN 'F'
ELSE 'UNKNOWN'
    END
    
""")).show(3)

+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
|       id|          name|nationality|sex|     dob|height|weight|    sport|gold|silver|bronze|
+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
|736041664|A Jesus Garcia|        ESP|  M|10/17/69|  1.72|    64|athletics|   0|     0|     0|
|532037425|    A Lam Shin|        KOR|  F| 9/23/86|  1.68|    56|  fencing|   0|     0|     0|
|435962603|   Aaron Brown|        CAN|  M| 5/27/92|  1.98|    79|athletics|   0|     0|     1|
+---------+--------------+-----------+---+--------+------+------+---------+----+------+------+
only showing top 3 rows



In [295]:
df_athletes_pd = df_athletes.toPandas()
df_athletes_pd.head()

| | id | name | nationality | sex | dob | height | weight | sport | gold | silver | bronze |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 736041664 | A Jesus Garcia | ESP | male | 10/17/69 | 1.72 | 64 | athletics | 0 | 0 | 0 |
| 1 | 532037425 | A Lam Shin | KOR | female | 9/23/86 | 1.68 | 56 | fencing | 0 | 0 | 0 |
| 2 | 435962603 | Aaron Brown | CAN | male | 5/27/92 | 1.98 | 79 | athletics | 0 | 0 | 1 |
| 3 | 521041435 | Aaron Cook | MDA | male | 1/2/91 | 1.83 | 80 | taekwondo | 0 | 0 | 0 |
| 4 | 33922579 | Aaron Gate | NZL | male | 11/26/90 | 1.81 | 71 | cycling | 0 | 0 | 0 |


## Define the Python function

In [290]:
def convert_sex_letter(item):
    
    if item == "male":
        return "M"
    elif item == "female":
        return "F"
    else:
        return "UNKNOWN"

In [296]:
df_athletes_pd["sex"] = df_athletes_pd["sex"].apply(lambda x: convert_sex_letter(x))
df_athletes_pd.head()

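| | id | name | nationality | sex | dob | height | weight | sport | gold | silver | bronze |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 736041664 | A Jesus Garcia | ESP | M | 10/17/69 | 1.72 | 64 | athletics | 0 | 0 | 0 |
| 1 | 532037425 | A Lam Shin | KOR | F | 9/23/86 | 1.68 | 56 | fencing | 0 | 0 | 0 |
| 2 | 435962603 | Aaron Brown | CAN | M | 5/27/92 | 1.98 | 79 | athletics | 0 | 0 | 1 |
| 3 | 521041435 | Aaron Cook | MDA | M | 1/2/91 | 1.83 | 80 | taekwondo | 0 | 0 | 0 |
| 4 | 33922579 | Aaron Gate | NZL | M | 11/26/90 | 1.81 | 71 | cycling | 0 | 0 | 0 |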


## Define the Spark UDF

In [294]:
from pyspark.sql.functions import udf

In [297]:
# Wrap the Python function as a Spark UDF (the return type defaults to string)
convert_sex_letter_UDF = udf(convert_sex_letter)
df_athletes.select(col("sex"), convert_sex_letter_UDF(col("sex")).alias("new_sex")).show()

In [302]:
# Lowercase a string; pass nulls (None) through instead of raising an error
convert_lower = lambda x: x.lower() if x is not None else None
convert_lower_UDF = udf(convert_lower)

In [304]:
df_athletes.select(col("nationality"), convert_lower_UDF(col("nationality")).alias("new_nationality")).show(3)

+-----------+---------------+
|nationality|new_nationality|
+-----------+---------------+
|        ESP|            esp|
|        KOR|            kor|
|        CAN|            can|
+-----------+---------------+
only showing top 3 rows



In [307]:
df_athletes.withColumn("nationality", convert_lower_UDF(col("nationality"))).show(5)

+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+
|       id|          name|nationality|   sex|     dob|height|weight|    sport|gold|silver|bronze|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+
|736041664|A Jesus Garcia|        esp|  male|10/17/69|  1.72|    64|athletics|   0|     0|     0|
|532037425|    A Lam Shin|        kor|female| 9/23/86|  1.68|    56|  fencing|   0|     0|     0|
|435962603|   Aaron Brown|        can|  male| 5/27/92|  1.98|    79|athletics|   0|     0|     1|
|521041435|    Aaron Cook|        mda|  male|  1/2/91|  1.83|    80|taekwondo|   0|     0|     0|
| 33922579|    Aaron Gate|        nzl|  male|11/26/90|  1.81|    71|  cycling|   0|     0|     0|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+
only showing top 5 rows



In [309]:
df_athletes.describe().show()

+-------+--------------------+--------------+-----------+------+------+-------------------+------------------+---------+--------------------+-------------------+--------------------+
|summary|                  id|          name|nationality|   sex|   dob|             height|            weight|    sport|                gold|             silver|              bronze|
+-------+--------------------+--------------+-----------+------+------+-------------------+------------------+---------+--------------------+-------------------+--------------------+
|  count|               11538|         11538|      11538| 11538| 11537|              11208|             10879|    11538|               11538|              11538|               11538|
|   mean|4.9998850863052523E8|          null|       null|  null|  null| 1.7662821199143461| 72.06820479823513|     null|0.057722308892355696|0.05676893742416363|0.061015773964291906|
| stddev| 2.908647923664012E8|          null|       null|  null|  null|0.112718691176

# Built-in Functions

## Lit

In [311]:
from pyspark.sql.functions import udf, col, lit, when

In [313]:
df_athletes.show(3)

+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+
|       id|          name|nationality|   sex|     dob|height|weight|    sport|gold|silver|bronze|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+
|736041664|A Jesus Garcia|        ESP|  male|10/17/69|  1.72|    64|athletics|   0|     0|     0|
|532037425|    A Lam Shin|        KOR|female| 9/23/86|  1.68|    56|  fencing|   0|     0|     0|
|435962603|   Aaron Brown|        CAN|  male| 5/27/92|  1.98|    79|athletics|   0|     0|     1|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+
only showing top 3 rows



In [314]:
df_athletes.select(col("height"), col("weight"), lit("Olimpiadas").alias("Etiquetas")).show(3)

+------+------+----------+
|height|weight| Etiquetas|
+------+------+----------+
|  1.72|    64|Olimpiadas|
|  1.68|    56|Olimpiadas|
|  1.98|    79|Olimpiadas|
+------+------+----------+
only showing top 3 rows



In [318]:
df_athletes.withColumn("Winner", when(col("gold") == 1, lit("Winner"))
                                .otherwise(lit("loser"))).show(5)

+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+------+
|       id|          name|nationality|   sex|     dob|height|weight|    sport|gold|silver|bronze|Winner|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+------+
|736041664|A Jesus Garcia|        ESP|  male|10/17/69|  1.72|    64|athletics|   0|     0|     0| loser|
|532037425|    A Lam Shin|        KOR|female| 9/23/86|  1.68|    56|  fencing|   0|     0|     0| loser|
|435962603|   Aaron Brown|        CAN|  male| 5/27/92|  1.98|    79|athletics|   0|     0|     1| loser|
|521041435|    Aaron Cook|        MDA|  male|  1/2/91|  1.83|    80|taekwondo|   0|     0|     0| loser|
| 33922579|    Aaron Gate|        NZL|  male|11/26/90|  1.81|    71|  cycling|   0|     0|     0| loser|
+---------+--------------+-----------+------+--------+------+------+---------+----+------+------+------+
only showing top 5 rows

