# Avanzado Data Frame

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max, col
from pyspark.sql.functions import sum, sum_distinct, avg, count
from pyspark.sql.functions import desc
# directorio donde estan los datos
path = 'files/'

In [4]:
spark = SparkSession.builder.getOrCreate()
df_vuelos = spark.read.parquet(path+'vuelos.parquet')

                                                                                

In [16]:
df_vuelos.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: integer (nullable = true)
 |-- DEPARTURE_TIME: integer (nullable = true)
 |-- DEPARTURE_DELAY: integer (nullable = true)
 |-- TAXI_OUT: integer (nullable = true)
 |-- WHEELS_OFF: integer (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- ELAPSED_TIME: integer (nullable = true)
 |-- AIR_TIME: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- SCHEDULED_ARRIVAL: integer (nullable = true)
 |-- ARRIVAL_TIME: integer (nullable = true)
 |-- ARRIVAL_DELAY: integer (null

In [17]:
df_vuelos.show(5, truncate=False)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

## Funciones

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import approx_count_distinct

In [19]:
spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet(path+'dataframe.parquet')

df.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- color: string (nullable = true)
 |-- cantidad: long (nullable = true)



In [20]:
df.show()

+------+-----+--------+
|nombre|color|cantidad|
+------+-----+--------+
|  Jose| azul|    1900|
|  NULL| NULL|    1700|
|  NULL| rojo|    1300|
|  Juan| rojo|    1500|
+------+-----+--------+



### Count

In [25]:
# count
df.select(
    count('nombre').alias('conteo_nombre'),
    count('color').alias('conteo_color'),
    count('cantidad').alias('conteo_cantidad')
).show()

+-------------+------------+---------------+
|conteo_nombre|conteo_color|conteo_cantidad|
+-------------+------------+---------------+
|            2|           3|              4|
+-------------+------------+---------------+



In [26]:
df.select(
    count('*').alias('conteo_general')
).show()


+--------------+
|conteo_general|
+--------------+
|             4|
+--------------+



### countDistinct

In [28]:
# countDistinct
df.select(
    countDistinct('nombre').alias('nombre_dif'),
    countDistinct('color').alias('color_dif'),
    countDistinct('cantidad').alias('cantidad_dif'),
    countDistinct('*').alias('general_dif')
).show()


+----------+---------+------------+-----------+
|nombre_dif|color_dif|cantidad_dif|general_dif|
+----------+---------+------------+-----------+
|         2|        2|           4|          2|
+----------+---------+------------+-----------+



### approx_count_distinct

In [24]:
# approx_count_distinct
# esta funcion da una aproximacion del conteo
# para reducir el costo del conteo
df_vuelos.select(
    countDistinct('AIRLINE'),
    approx_count_distinct('AIRLINE')
).show()



+-----------------------+------------------------------+
|count(DISTINCT AIRLINE)|approx_count_distinct(AIRLINE)|
+-----------------------+------------------------------+
|                     14|                            13|
+-----------------------+------------------------------+



                                                                                

### Min & Max

In [32]:
df_vuelos.select(
    min('AIR_TIME').alias('menor_timepo'),
    max('AIR_TIME').alias('mayor_tiempo'),
    min('AIRLINE_DELAY'),
    max('AIRLINE_DELAY')
).show()

+------------+------------+------------------+------------------+
|menor_timepo|mayor_tiempo|min(AIRLINE_DELAY)|max(AIRLINE_DELAY)|
+------------+------------+------------------+------------------+
|           7|         690|                 0|              1971|
+------------+------------+------------------+------------------+



### Sum

In [5]:
df_vuelos.select(
    sum('DISTANCE').alias('sum_dis')
).show()

                                                                                

+----------+
|   sum_dis|
+----------+
|4785357409|
+----------+



### SumDistinct

In [8]:
df_vuelos.select(
    sum_distinct('DISTANCE').alias('sum_dis_dif')
).show()

+-----------+
|sum_dis_dif|
+-----------+
|    1442300|
+-----------+



### AVG

In [9]:
df_vuelos.select(
    avg('AIR_TIME').alias('promedio_aire'),
    (sum('AIR_TIME') / count('AIR_TIME')).alias('prom_manual')
).show()

+------------------+------------------+
|     promedio_aire|       prom_manual|
+------------------+------------------+
|113.51162809012519|113.51162809012519|
+------------------+------------------+



### GroupBy

In [20]:
df_vuelos.groupBy('ORIGIN_AIRPORT')\
    .count()\
    .orderBy(col('count')).limit(5).show()

+--------------+-----+
|ORIGIN_AIRPORT|count|
+--------------+-----+
|         11503|    4|
|         13502|    6|
|         14222|    9|
|         10165|    9|
|         13541|   11|
+--------------+-----+



                                                                                

In [19]:
df_vuelos.groupBy('ORIGIN_AIRPORT', 'DESTINATION_AIRPORT')\
    .count()\
    .orderBy(desc('count'))\
    .limit(5).show()



+--------------+-------------------+-----+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|count|
+--------------+-------------------+-----+
|           SFO|                LAX|13744|
|           LAX|                SFO|13457|
|           JFK|                LAX|12016|
|           LAX|                JFK|12015|
|           LAS|                LAX| 9715|
+--------------+-------------------+-----+



                                                                                

In [22]:
df_vuelos.groupBy('ORIGIN_AIRPORT').agg(
    count('AIR_TIME').alias('tiempo_aire'),
    min('AIR_TIME').alias('min'),
    max('AIR_TIME').alias('max')
).orderBy(desc('tiempo_aire')).limit(10).show()



+--------------+-----------+---+---+
|ORIGIN_AIRPORT|tiempo_aire|min|max|
+--------------+-----------+---+---+
|           ATL|     343506| 15|614|
|           ORD|     276554| 13|571|
|           DFW|     232647| 11|534|
|           DEN|     193402| 12|493|
|           LAX|     192003| 14|409|
|           PHX|     145552| 19|444|
|           SFO|     145491|  8|389|
|           IAH|     144019| 15|524|
|           LAS|     131937| 25|429|
|           MSP|     111055| 14|537|
+--------------+-----------+---+---+



                                                                                

In [24]:
df_vuelos.groupBy('MONTH').agg(
    count('ARRIVAL_DELAY').alias('conteo_de_retrasos'),
    avg('DISTANCE').alias('prom_dist')
).orderBy(desc('conteo_de_retrasos')).limit(10).show()

+-----+------------------+-----------------+
|MONTH|conteo_de_retrasos|        prom_dist|
+-----+------------------+-----------------+
|    7|            514384|841.4772794487611|
|    8|            503956|834.8244276603413|
|    6|            492847|835.6302716626612|
|    3|            492138|816.0553268611494|
|    5|            489641|823.3230588760807|
|   10|            482878|816.4436127652134|
|    4|            479251|817.0060476016745|
|   12|            469717|837.8018926194103|
|   11|            462367|820.2482434846529|
|    9|            462153|815.8487523282274|
+-----+------------------+-----------------+



### GroupBy con agregaciones(pivot)

In [30]:
df_estudiantes = spark.read.parquet(path+'estudiantes.parquet')
df_estudiantes.show()

+------+----+----+----------+
|nombre|sexo|peso|graduacion|
+------+----+----+----------+
|  Jose|   M|  80|      2000|
| Hilda|   F|  50|      2000|
|  Juan|   M|  75|      2000|
| Pedro|   M|  76|      2001|
|Katia+|   F|  65|      2001|
+------+----+----+----------+



In [26]:
df_estudiantes.groupBy('graduacion').pivot('sexo').\
    agg(avg('peso')).show()

+----------+----+----+
|graduacion|   F|   M|
+----------+----+----+
|      2001|65.0|76.0|
|      2000|50.0|77.5|
+----------+----+----+



In [27]:
# pivot
df_estudiantes.groupBy('graduacion').pivot('sexo').\
    agg(avg('peso'), min('peso'), max('peso')).show()

+----------+-----------+-----------+-----------+-----------+-----------+-----------+
|graduacion|F_avg(peso)|F_min(peso)|F_max(peso)|M_avg(peso)|M_min(peso)|M_max(peso)|
+----------+-----------+-----------+-----------+-----------+-----------+-----------+
|      2001|       65.0|         65|         65|       76.0|         76|         76|
|      2000|       50.0|         50|         50|       77.5|         75|         80|
+----------+-----------+-----------+-----------+-----------+-----------+-----------+



In [28]:
df_estudiantes.groupBy('graduacion').pivot('sexo', ['M']).\
    agg(avg('peso'), min('peso'), max('peso')).show()

+----------+-----------+-----------+-----------+
|graduacion|M_avg(peso)|M_min(peso)|M_max(peso)|
+----------+-----------+-----------+-----------+
|      2001|       76.0|         76|         76|
|      2000|       77.5|         75|         80|
+----------+-----------+-----------+-----------+



In [29]:
df_estudiantes.groupBy('graduacion').pivot('sexo', ['F']).\
    agg(avg('peso'), min('peso'), max('peso')).show()

+----------+-----------+-----------+-----------+
|graduacion|F_avg(peso)|F_min(peso)|F_max(peso)|
+----------+-----------+-----------+-----------+
|      2001|       65.0|         65|         65|
|      2000|       50.0|         50|         50|
+----------+-----------+-----------+-----------+



### Join

In [31]:
empleados = spark.read.parquet(path+'empleados')
departamentos = spark.read.parquet(path+'departamentos')

In [32]:
empleados.show()

+------+--------+
|nombre|num_dpto|
+------+--------+
|  Luis|      33|
| Katia|      33|
|  Raul|      34|
| Pedro|       0|
| Laura|      34|
|Sandro|      31|
+------+--------+



In [33]:
departamentos.show()

+---+-----------+
| id|nombre_dpto|
+---+-----------+
| 31|     letras|
| 33|    derecho|
| 34| matemática|
| 35|informática|
+---+-----------+



In [34]:
# Inner join
join_df = empleados.join(departamentos, col('num_dpto') == col('id'), 'inner')
join_df.show()

+------+--------+---+-----------+
|nombre|num_dpto| id|nombre_dpto|
+------+--------+---+-----------+
|  Luis|      33| 33|    derecho|
| Katia|      33| 33|    derecho|
|  Raul|      34| 34| matemática|
| Laura|      34| 34| matemática|
|Sandro|      31| 31|     letras|
+------+--------+---+-----------+



In [35]:
# Left Outer Join
empleados.join(departamentos,
               col('num_dpto') == col('id'), 'leftouter').show()

+------+--------+----+-----------+
|nombre|num_dpto|  id|nombre_dpto|
+------+--------+----+-----------+
|  Luis|      33|  33|    derecho|
| Katia|      33|  33|    derecho|
|  Raul|      34|  34| matemática|
| Pedro|       0|NULL|       NULL|
| Laura|      34|  34| matemática|
|Sandro|      31|  31|     letras|
+------+--------+----+-----------+



In [36]:
# Right Outer Join
empleados.join(departamentos,
               col('num_dpto') == col('id'), 'rightouter').show()

+------+--------+---+-----------+
|nombre|num_dpto| id|nombre_dpto|
+------+--------+---+-----------+
|Sandro|      31| 31|     letras|
| Katia|      33| 33|    derecho|
|  Luis|      33| 33|    derecho|
| Laura|      34| 34| matemática|
|  Raul|      34| 34| matemática|
|  NULL|    NULL| 35|informática|
+------+--------+---+-----------+



In [37]:
# Full Outer Join
empleados.join(departamentos, col('num_dpto') == col('id'), 'outer').show()

+------+--------+----+-----------+
|nombre|num_dpto|  id|nombre_dpto|
+------+--------+----+-----------+
| Pedro|       0|NULL|       NULL|
|Sandro|      31|  31|     letras|
|  Luis|      33|  33|    derecho|
| Katia|      33|  33|    derecho|
|  Raul|      34|  34| matemática|
| Laura|      34|  34| matemática|
|  NULL|    NULL|  35|informática|
+------+--------+----+-----------+



In [40]:
# Left Anti Join
empleados.join(departamentos, col('num_dpto') == col('id'), 'left_anti').show()

+------+--------+
|nombre|num_dpto|
+------+--------+
| Pedro|       0|
+------+--------+



In [41]:
departamentos.join(empleados, col('num_dpto') == col('id'), 'left_anti').show()

+---+-----------+
| id|nombre_dpto|
+---+-----------+
| 35|informática|
+---+-----------+



In [42]:
# Left Semi Join
# opuesto left_anti
empleados.join(departamentos, col('num_dpto') == col('id'), 'left_semi').show()

+------+--------+
|nombre|num_dpto|
+------+--------+
|  Luis|      33|
| Katia|      33|
|  Raul|      34|
| Laura|      34|
|Sandro|      31|
+------+--------+



In [43]:
# Cross Join
df = empleados.crossJoin(departamentos)
df.show()

+------+--------+---+-----------+
|nombre|num_dpto| id|nombre_dpto|
+------+--------+---+-----------+
|  Luis|      33| 31|     letras|
|  Luis|      33| 33|    derecho|
|  Luis|      33| 34| matemática|
|  Luis|      33| 35|informática|
| Katia|      33| 31|     letras|
| Katia|      33| 33|    derecho|
| Katia|      33| 34| matemática|
| Katia|      33| 35|informática|
|  Raul|      34| 31|     letras|
|  Raul|      34| 33|    derecho|
|  Raul|      34| 34| matemática|
|  Raul|      34| 35|informática|
| Pedro|       0| 31|     letras|
| Pedro|       0| 33|    derecho|
| Pedro|       0| 34| matemática|
| Pedro|       0| 35|informática|
| Laura|      34| 31|     letras|
| Laura|      34| 33|    derecho|
| Laura|      34| 34| matemática|
| Laura|      34| 35|informática|
+------+--------+---+-----------+
only showing top 20 rows



## Columnas con nombres duplicados

In [44]:
depa = departamentos.withColumn('num_dpto', col('id'))
depa.printSchema()

root
 |-- id: long (nullable = true)
 |-- nombre_dpto: string (nullable = true)
 |-- num_dpto: long (nullable = true)



In [45]:
depa.show()

+---+-----------+--------+
| id|nombre_dpto|num_dpto|
+---+-----------+--------+
| 31|     letras|      31|
| 33|    derecho|      33|
| 34| matemática|      34|
| 35|informática|      35|
+---+-----------+--------+



In [47]:
# Devuelve un error
try:
    empleados.join(depa, col('num_dpto') == col('num_dpto'))
except Exception as error:
    # handle the exception
    print("An exception occurred:", type(error).__name__ , error)

An exception occurred: AnalysisException [AMBIGUOUS_REFERENCE] Reference `num_dpto` is ambiguous, could be: [`num_dpto`, `num_dpto`].


24/01/05 17:34:00 WARN Column: Constructing trivially true equals predicate, ''num_dpto = 'num_dpto'. Perhaps you need to use aliases.


In [48]:
# Forma correcta
df_con_duplicados = empleados.join(
    depa, empleados['num_dpto'] == depa['num_dpto'])


In [49]:
df_con_duplicados.printSchema()

root
 |-- nombre: string (nullable = true)
 |-- num_dpto: long (nullable = true)
 |-- id: long (nullable = true)
 |-- nombre_dpto: string (nullable = true)
 |-- num_dpto: long (nullable = true)



In [None]:
spark.sparkContext.stop()