In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.master("local[*]").appName('test').getOrCreate()

In [None]:
sc = spark.sparkContext

## Ejercicio 1

In [None]:
df_case = spark.read.option('header','true').option('inferSchema','true').csv("/content/Case.csv",)
df_patient = spark.read.option('header','true').option('inferSchema','true').csv("/content/PatientInfo.csv")


In [None]:
df_patient.select('patient_id').distinct().count()

5164

In [None]:
df_patient.select('patient_id').count()

5165

In [None]:
df_patient = df_patient.dropDuplicates(['patient_id'])

In [None]:
(df_case
 .where(~df_case.city.isin('-','from other city'))
  .groupBy('city')
  .count()
  .orderBy('count',ascending=False)
  .limit(3)
  .select('city')
  .show()
  )

+----------+
|      city|
+----------+
|    Seo-gu|
|Gangnam-gu|
|   Guro-gu|
+----------+



In [None]:
df_patient.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- country: string (nullable = true)
 |-- province: string (nullable = true)
 |-- city: string (nullable = true)
 |-- infection_case: string (nullable = true)
 |-- infected_by: string (nullable = true)
 |-- contact_number: string (nullable = true)
 |-- symptom_onset_date: string (nullable = true)
 |-- confirmed_date: string (nullable = true)
 |-- released_date: string (nullable = true)
 |-- deceased_date: string (nullable = true)
 |-- state: string (nullable = true)



In [None]:
from pyspark.sql.functions import col

In [None]:
reason_contagiados = df_patient.where(col('infected_by').isNotNull())
print("El numero de infectados que conocen el agente de contagio es {}".format(reason_contagiados.count()))

El numero de infectados que conocen el agente de contagio es 1346


In [None]:
df_patient.where(df_patient.infected_by.isNotNull()).count()

1346

In [None]:
female_filter = reason_contagiados.filter(reason_contagiados.sex == 'female')
female_filter.count()

643

In [None]:
(female_filter
.repartition(2)
.write
 .mode('overwrite')
 .partitionBy('province')
 .parquet('/content/female_contagiados.parquet')
)

In [None]:
df_patient.show()

+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|patient_id|   sex|age|country|province|        city|      infection_case|infected_by|contact_number|symptom_onset_date|confirmed_date|released_date|deceased_date|   state|
+----------+------+---+-------+--------+------------+--------------------+-----------+--------------+------------------+--------------+-------------+-------------+--------+
|1000000001|  male|50s|  Korea|   Seoul|  Gangseo-gu|     overseas inflow|       NULL|            75|        2020-01-22|    2020-01-23|   2020-02-05|         NULL|released|
|1000000002|  male|30s|  Korea|   Seoul| Jungnang-gu|     overseas inflow|       NULL|            31|              NULL|    2020-01-30|   2020-03-02|         NULL|released|
|1000000003|  male|50s|  Korea|   Seoul|   Jongno-gu|contact with patient| 2002000001|            17|              NULL|    2020-01-30|

## Ejercicios 2

In [None]:
from pyspark.sql.functions import approx_count_distinct, countDistinct, count

Conteo aproximado de cuantos valores diferentes hay

In [None]:
df_patient.select(
    countDistinct('province').alias('count_distinct'),
    count('province').alias('count_all'),
    approx_count_distinct('province',0.1).alias('approx_count_distinct')
).show()

+--------------+---------+---------------------+
|count_distinct|count_all|approx_count_distinct|
+--------------+---------+---------------------+
|            17|     5164|                   16|
+--------------+---------+---------------------+



In [None]:
df_patient.select(approx_count_distinct("city", rsd=0.01)).show()

+---------------------------+
|approx_count_distinct(city)|
+---------------------------+
|                        162|
+---------------------------+



In [None]:
df_vuelos = spark.read.option('header','true').option('inferSchema','true').parquet("/content/vuelos.parquet")

In [92]:
df_vuelos.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

Agregaciones por grupo

In [126]:
from pyspark.sql.functions import col, sum as _sum, when, sumDistinct, avg, min, max, desc

In [130]:
df_vuelos.groupBy('MONTH').agg(
    count('ARRIVAL_DELAY').alias('conteo_de_retrasos'),
    avg('DISTANCE').alias('prom_dist')
).orderBy(desc('conteo_de_retrasos')).show()

+-----+------------------+-----------------+
|MONTH|conteo_de_retrasos|        prom_dist|
+-----+------------------+-----------------+
|    7|            514384|841.4772794487611|
|    8|            503956|834.8244276603413|
|    6|            492847|835.6302716626612|
|    3|            492138|816.0553268611494|
|    5|            489641|823.3230588760807|
|   10|            482878|816.4436127652134|
|    4|            479251|817.0060476016745|
|   12|            469717|837.8018926194103|
|   11|            462367|820.2482434846529|
|    9|            462153|815.8487523282274|
|    1|            457013|803.2612794913696|
|    2|            407663| 800.785449834689|
+-----+------------------+-----------------+



In [145]:
df_vuelos.select(
    countDistinct(col("ARRIVAL_DELAY")).alias("valores_distintos"),
    sum(when(col("ARRIVAL_DELAY").isNull(), 1).otherwise(0)).alias("num_nulls")
).show()

+-----------------+---------+
|valores_distintos|num_nulls|
+-----------------+---------+
|             1240|   105071|
+-----------------+---------+



In [146]:
df_vuelos.groupBy('origin_airport','destination_airport').agg(
    approx_count_distinct('DISTANCE',0.2).alias('different distances'),
    min('DISTANCE').alias('min distance'),
    max('DISTANCE').alias('max distance'),
).show()

+--------------+-------------------+-------------------+------------+------------+
|origin_airport|destination_airport|different distances|min distance|max distance|
+--------------+-------------------+-------------------+------------+------------+
|           BQN|                MCO|                  1|        1129|        1129|
|           PHL|                MCO|                  1|         861|         861|
|           MCI|                IAH|                  1|         643|         643|
|           SPI|                ORD|                  1|         174|         174|
|           SNA|                PHX|                  1|         338|         338|
|           LBB|                DEN|                  1|         456|         456|
|           ORD|                PDX|                  1|        1739|        1739|
|           EWR|                STT|                  1|        1634|        1634|
|           ATL|                GSP|                  1|         153|         153|
|   

In [148]:
total_rows = df_vuelos.count()
distinct_rows = df_vuelos.distinct().count()
duplicados = total_rows - distinct_rows

In [116]:
df_vuelos.select(col('CANCELLED')).distinct().show()

+---------+
|CANCELLED|
+---------+
|        1|
|        0|
+---------+



## Joins

inner, left, right, semi_left, left_anti, cross_join

In [150]:
df_departamentos = spark.read.option('header','true').option('inferSchema','true').parquet('/content/departamentos.parquet')

In [151]:
df_empleados = spark.read.option('header','true').option('inferSchema','true').parquet('/content/empleados.parquet')

In [158]:
print(df_departamentos.printSchema())
print(df_empleados.printSchema())

root
 |-- id: long (nullable = true)
 |-- nombre_dpto: string (nullable = true)

None
root
 |-- nombre: string (nullable = true)
 |-- num_dpto: long (nullable = true)

None


Si no hay repeticion de nombre en columnas, todo ok, sino usar declaración explicita indicando el nombre del df o usar alias

In [160]:
## Inner join
df_empleados.join(df_departamentos, col('num_dpto')==col('id')).show()

+------+--------+---+-----------+
|nombre|num_dpto| id|nombre_dpto|
+------+--------+---+-----------+
|  Luis|      33| 33|    derecho|
| Katia|      33| 33|    derecho|
|  Raul|      34| 34| matemática|
| Laura|      34| 34| matemática|
|Sandro|      31| 31|     letras|
+------+--------+---+-----------+



In [165]:
df_empleados.join(df_departamentos, df_departamentos.id==col('num_dpto'), 'left').show()

+------+--------+----+-----------+
|nombre|num_dpto|  id|nombre_dpto|
+------+--------+----+-----------+
|  Luis|      33|  33|    derecho|
| Katia|      33|  33|    derecho|
|  Raul|      34|  34| matemática|
| Pedro|       0|NULL|       NULL|
| Laura|      34|  34| matemática|
|Sandro|      31|  31|     letras|
+------+--------+----+-----------+



Left anti join

In [162]:
df_empleados.join(df_departamentos, col('num_dpto')==col('id'), 'left_anti').show()

+------+--------+
|nombre|num_dpto|
+------+--------+
| Pedro|       0|
+------+--------+



In [171]:
df_empleados.join(df_departamentos, df_empleados['num_dpto']==df_departamentos['id'], 'left_anti').show()

+------+--------+
|nombre|num_dpto|
+------+--------+
| Pedro|       0|
+------+--------+



Con alias

In [166]:
emp = df_empleados.alias("emp")
dep = df_departamentos.alias("dep")

In [167]:
df_empleados.join(df_departamentos, col('num_dpto')==col('id'), 'left_semi').show()

+------+--------+
|nombre|num_dpto|
+------+--------+
|  Luis|      33|
| Katia|      33|
|  Raul|      34|
| Laura|      34|
|Sandro|      31|
+------+--------+



Cross join

In [168]:
df_empleados.crossJoin(df_departamentos).show()

+------+--------+---+-----------+
|nombre|num_dpto| id|nombre_dpto|
+------+--------+---+-----------+
|  Luis|      33| 31|     letras|
|  Luis|      33| 33|    derecho|
|  Luis|      33| 34| matemática|
|  Luis|      33| 35|informática|
| Katia|      33| 31|     letras|
| Katia|      33| 33|    derecho|
| Katia|      33| 34| matemática|
| Katia|      33| 35|informática|
|  Raul|      34| 31|     letras|
|  Raul|      34| 33|    derecho|
|  Raul|      34| 34| matemática|
|  Raul|      34| 35|informática|
| Pedro|       0| 31|     letras|
| Pedro|       0| 33|    derecho|
| Pedro|       0| 34| matemática|
| Pedro|       0| 35|informática|
| Laura|      34| 31|     letras|
| Laura|      34| 33|    derecho|
| Laura|      34| 34| matemática|
| Laura|      34| 35|informática|
+------+--------+---+-----------+
only showing top 20 rows



Shufle Hasj Koin y BroadCast hash join

In [173]:
from pyspark.sql.functions import broadcast

In [175]:
df_empleados.join(broadcast(df_departamentos),col('num_dpto')==col('id')).show()

+------+--------+---+-----------+
|nombre|num_dpto| id|nombre_dpto|
+------+--------+---+-----------+
|  Luis|      33| 33|    derecho|
| Katia|      33| 33|    derecho|
|  Raul|      34| 34| matemática|
| Laura|      34| 34| matemática|
|Sandro|      31| 31|     letras|
+------+--------+---+-----------+



In [177]:
df_empleados.join(broadcast(df_departamentos),col('num_dpto')==col('id')).explain(True)

== Parsed Logical Plan ==
Join Inner, (num_dpto#54261L = id#54256L)
:- Relation [nombre#54260,num_dpto#54261L] parquet
+- ResolvedHint (strategy=broadcast)
   +- Relation [id#54256L,nombre_dpto#54257] parquet

== Analyzed Logical Plan ==
nombre: string, num_dpto: bigint, id: bigint, nombre_dpto: string
Join Inner, (num_dpto#54261L = id#54256L)
:- Relation [nombre#54260,num_dpto#54261L] parquet
+- ResolvedHint (strategy=broadcast)
   +- Relation [id#54256L,nombre_dpto#54257] parquet

== Optimized Logical Plan ==
Join Inner, (num_dpto#54261L = id#54256L), rightHint=(strategy=broadcast)
:- Filter isnotnull(num_dpto#54261L)
:  +- Relation [nombre#54260,num_dpto#54261L] parquet
+- Filter isnotnull(id#54256L)
   +- Relation [id#54256L,nombre_dpto#54257] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [num_dpto#54261L], [id#54256L], Inner, BuildRight, false
   :- Filter isnotnull(num_dpto#54261L)
   :  +- FileScan parquet [nombre#54260,num_dpto#54261L] Ba