### PySpark RDD API

https://www.kaggle.com/divyansh22/flight-delay-prediction

(flights.parquet is not in the datasets folder)

* Exercises

In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd
from pyspark.sql import SQLContext, SparkSession

# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

In [0]:
sqlContext = SQLContext(sc)
# Cache the result; otherwise the repartition is recomputed every time rdd is used
rdd = sqlContext.read.parquet('flights.parquet').rdd.repartition(8).cache()

In [9]:
rdd.count()

598963

In [10]:
rdd.takeSample(False, 2)  # Takes the requested number of rows and returns them to the driver

[Row(DAY_OF_MONTH=7, DAY_OF_WEEK=2, OP_UNIQUE_CARRIER='DL', OP_CARRIER_AIRLINE_ID=19790, OP_CARRIER='DL', TAIL_NUM='N987DN', OP_CARRIER_FL_NUM=2336, ORIGIN_AIRPORT_ID=11057, ORIGIN_AIRPORT_SEQ_ID=1105703, ORIGIN='CLT', DEST_AIRPORT_ID=10397, DEST_AIRPORT_SEQ_ID=1039707, DEST='ATL', DEP_TIME=657.0, DEP_DEL15=0.0, DEP_TIME_BLK='0700-0759', ARR_TIME=813.0, ARR_DEL15=0.0, CANCELLED=0.0, DIVERTED=0.0, DISTANCE=226.0, TIMEDIFF=0),
 Row(DAY_OF_MONTH=3, DAY_OF_WEEK=5, OP_UNIQUE_CARRIER='OO', OP_CARRIER_AIRLINE_ID=20304, OP_CARRIER='OO', TAIL_NUM='N906SW', OP_CARRIER_FL_NUM=3873, ORIGIN_AIRPORT_ID=14869, ORIGIN_AIRPORT_SEQ_ID=1486903, ORIGIN='SLC', DEST_AIRPORT_ID=12156, DEST_AIRPORT_SEQ_ID=1215605, DEST='HLN', DEP_TIME=1107.0, DEP_DEL15=0.0, DEP_TIME_BLK='1100-1159', ARR_TIME=1245.0, ARR_DEL15=0.0, CANCELLED=0.0, DIVERTED=0.0, DISTANCE=402.0, TIMEDIFF=0)]

In [11]:
rdd.sample(False, 0.1)  # Samples a fraction of the RDD and returns a new RDD

PythonRDD[15] at RDD at PythonRDD.scala:53

In [0]:
# DISTANCE, DEP_TIME, ARR_TIME, DAY_OF_WEEK, ORIGIN, DEST, TAIL_NUM, DAY_OF_MONTH
# Example: rdd.map(lambda x: (x.ORIGIN, x.DEST))

**Exercise 1:** Count the number of flights per airline (use OP_UNIQUE_CARRIER). Find the ten (10) airlines with the most flights. Return a Python list with the codes of these 10 airlines.

In [13]:
carriers = rdd.map(lambda x: (x.OP_UNIQUE_CARRIER, 1))\
              .reduceByKey(lambda x,y: (x + y)).cache()
              
top10carriers = carriers.takeOrdered(10, lambda x: -x[1])
top10carriers

[('WN', 107708),
 ('DL', 79928),
 ('AA', 75472),
 ('OO', 69196),
 ('UA', 48019),
 ('YX', 28826),
 ('MQ', 25284),
 ('B6', 24621),
 ('OH', 23999),
 ('9E', 22955)]

In [0]:
top10carriers = [x[0] for x in top10carriers]

In [15]:
top10carriers

['WN', 'DL', 'AA', 'OO', 'UA', 'YX', 'MQ', 'B6', 'OH', '9E']
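
The `map` followed by `reduceByKey` used above is the classic distributed count. As a sanity check of the expected result shape, the same logic can be sketched in plain Python on a small in-memory list. The carrier codes in `sample_carriers` are made up for illustration and are not taken from the dataset.

```python
from collections import Counter

# Hypothetical sample of OP_UNIQUE_CARRIER values (not real data).
sample_carriers = ["WN", "DL", "WN", "AA", "DL", "WN", "OO"]

# Plays the role of map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
flights_per_carrier = Counter(sample_carriers)

# Plays the role of takeOrdered(2, lambda x: -x[1]), keeping only the codes
top2 = [code for code, _ in flights_per_carrier.most_common(2)]
print(top2)  # ['WN', 'DL']
```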

**Exercise 2:** Compute the average of ARR_DEL15 (that is, the fraction of flights that arrived 15 or more minutes late, ARR_DEL15 == 1) for the 10 airlines with the most flights (use the previous exercise). Of these, report the three best and the three worst.

In [0]:
delays = rdd.filter(lambda x: x.OP_UNIQUE_CARRIER in top10carriers)\
            .map(lambda x: (x.OP_UNIQUE_CARRIER, (float(x.ARR_DEL15), 1)))\
            .reduceByKey(lambda x,y: ( x[0] + y[0], x[1] + y[1]))\
            .map(lambda x: (x[0], x[1][0] / x[1][1])).collect()

In [17]:
sorted(delays, key = lambda x:x[1], reverse = True)[:3]

[('OH', 0.21525896912371348),
 ('MQ', 0.19692295522860306),
 ('OO', 0.15386727556506155)]

In [18]:
sorted(delays, key = lambda x:x[1], reverse = False)[:3]

[('WN', 0.09582389423255469),
 ('DL', 0.10583274947452707),
 ('9E', 0.10956218688738836)]
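
Carrying a `(sum, count)` pair through `reduceByKey` matters because averages cannot be merged directly, while sums and counts can be added in any order across partitions. A minimal plain-Python sketch of that reduction for a single carrier follows; the delay flags in `flags` are hypothetical values.

```python
from functools import reduce

# Hypothetical ARR_DEL15 flags for one carrier (1.0 = arrived 15+ minutes late).
flags = [0.0, 1.0, 0.0, 0.0, 1.0]

# Same shape as the map step: each flight becomes (delay_flag, 1).
pairs = [(flag, 1) for flag in flags]

# Same function as the reduceByKey step: add sums and counts component-wise.
total, count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), pairs)

# Dividing only at the end gives the fraction of delayed flights.
delayed_fraction = total / count
print(delayed_fraction)  # 0.4
```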

**Exercise 3:** Count the number of flights per route, using ORIGIN and DEST to define the route. Return an RDD with the structure (ROUTE, #Flights). Also report the 10 most frequent routes and their number of flights.

In [0]:
routes = rdd.map(lambda x: ((x.ORIGIN + x.DEST),1))\
            .reduceByKey(lambda x,y : x + y)\
            .cache()

In [21]:
routes.takeOrdered(10, lambda x:-x[1])

[('LAXSFO', 1191),
 ('SFOLAX', 1189),
 ('LASLAX', 1050),
 ('LAXLAS', 1033),
 ('JFKLAX', 1023),
 ('ORDLGA', 1022),
 ('LAXJFK', 1020),
 ('LGAORD', 1015),
 ('HNLOGG', 941),
 ('OGGHNL', 939)]
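
`takeOrdered(n, key)` returns the `n` smallest elements according to `key`, which is why the count is negated to obtain the largest values. In plain Python the same selection can be sketched with `heapq.nsmallest`; the input reuses four of the route counts printed above.

```python
import heapq

# A few (route, flights) pairs copied from the output above.
route_counts = [("LASLAX", 1050), ("LAXSFO", 1191), ("HNLOGG", 941), ("SFOLAX", 1189)]

# Smallest by negated count is the same as largest by count.
top2 = heapq.nsmallest(2, route_counts, key=lambda x: -x[1])
print(top2)  # [('LAXSFO', 1191), ('SFOLAX', 1189)]
```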

**Exercise 4:** Now consider the number of airlines that fly each route. We want to know which ten routes are flown by the most airlines and which ten airlines fly the most routes.
Return: a list of 10 tuples of the form (ROUTE, #CARRIERS) and a list of 10 tuples of the form (CARRIER, #ROUTES).

In [0]:
routes_by_lines = rdd.map(lambda x: ((x.ORIGIN + x.DEST, x.OP_UNIQUE_CARRIER), 1))\
                     .reduceByKey(lambda x,y : x + y).cache()

In [11]:
routes_by_lines.take(1)

[(('ORDABQ', 'MQ'), 37)]

In [14]:
routes_by_lines.map(lambda x: (x[0][0],1))\
              .reduceByKey(lambda x,y: x + y)\
              .takeOrdered(10, lambda x: -x[1])

[('ORDATL', 8),
 ('LGAORD', 8),
 ('ATLORD', 8),
 ('ORDLGA', 8),
 ('LASLAX', 8),
 ('ORDRDU', 8),
 ('RDUORD', 8),
 ('LAXLAS', 8),
 ('SEADEN', 7),
 ('MSPORD', 7)]

In [15]:
routes_by_lines.map(lambda x: (x[0][1],1))\
.reduceByKey(lambda x,y: x + y).takeOrdered(10, lambda x: -x[1])

[('OO', 1425),
 ('WN', 1413),
 ('DL', 874),
 ('YX', 808),
 ('AA', 777),
 ('UA', 693),
 ('G4', 685),
 ('F9', 502),
 ('MQ', 481),
 ('9E', 464)]
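
The approach above works in two steps: first collapse flights into distinct `(route, carrier)` pairs, then count pairs per route or per carrier. A plain-Python sketch of the same idea, on a made-up list of flights (the data in `flights` is hypothetical):

```python
from collections import Counter

# Hypothetical flights as (route, carrier); a repeated pair is a repeated flight.
flights = [
    ("ORDATL", "UA"), ("ORDATL", "UA"), ("ORDATL", "DL"),
    ("LAXSFO", "UA"), ("LAXSFO", "WN"), ("LAXSFO", "DL"),
]

# Step 1: distinct (route, carrier) pairs, like the keys of routes_by_lines.
pairs = set(flights)

# Step 2: count the distinct pairs per route and per carrier.
carriers_per_route = Counter(route for route, _ in pairs)
routes_per_carrier = Counter(carrier for _, carrier in pairs)

print(carriers_per_route.most_common())
print(routes_per_carrier.most_common())
```

Counting on the distinct pairs, rather than on the raw flights, is what keeps a carrier with many flights on one route from being counted more than once for that route.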

**Exercise 5:** For each route, compute the average flight time, calculated as ARR_TIME - DEP_TIME using the provided function. The flight time in minutes must then be corrected by adding TIMEDIFF, the time difference between the two cities (in hours). The calculation is therefore:

```
hhmmtimediff(x.DEP_TIME,x.ARR_TIME) + (x.TIMEDIFF * 60)
```

Extra credit: in addition to the average flight time, compute the standard deviation of the flight time for each route.
Return:
- A list of 10 tuples of the form (ROUTE, average_time)
- A list of 10 tuples of the form (ROUTE, time_std) (only for routes with more than 50 flights)

In [0]:
from numpy import sqrt
# Computes the difference, in minutes, between two times given in HHMM format
def hhmmtimediff(t1, t2):
  m2 = (t2 // 100) * 60 + (t2 % 100)
  m1 = (t1 // 100) * 60 + (t1 % 100)
  return m2 - m1 
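
A quick check of `hhmmtimediff` on two inputs: the first uses the `DEP_TIME` and `ARR_TIME` of the CLT to ATL row sampled earlier, the second is a made-up overnight flight. The overnight case comes out negative because the function has no way to know that the arrival falls on the next day. The `overnight_minutes` helper is a hypothetical addition, not part of the notebook, and it assumes no flight lasts 24 hours or more.

```python
# Same function as in the notebook, repeated so the example runs on its own.
def hhmmtimediff(t1, t2):
    m2 = (t2 // 100) * 60 + (t2 % 100)
    m1 = (t1 // 100) * 60 + (t1 % 100)
    return m2 - m1

# Hypothetical helper: wrap negative differences around midnight.
def overnight_minutes(t1, t2):
    return hhmmtimediff(t1, t2) % (24 * 60)

print(hhmmtimediff(657.0, 813.0))       # 76.0
print(hhmmtimediff(2330.0, 45.0))       # -1365.0
print(overnight_minutes(2330.0, 45.0))  # 75.0
```

The notebook takes the simpler route of discarding non-positive durations with `filter(lambda x: x[1] > 0)`.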

In [0]:
routes_duration = rdd.map(lambda x: ((x.ORIGIN + x.DEST), hhmmtimediff(x.DEP_TIME, x.ARR_TIME) + (x.TIMEDIFF * 60)))\
.filter(lambda x:x[0] != 'GUMHNL')\
.filter(lambda x:x[1]>0)\
.cache()

In [18]:
routes_duration.take(10)

[('ORDGRB', 47.0),
 ('ORDGRB', 84.0),
 ('ORDGRB', 64.0),
 ('ORDGRB', 75.0),
 ('ORDGRB', 69.0),
 ('ORDGRB', 50.0),
 ('ORDGRB', 56.0),
 ('ORDGRB', 73.0),
 ('ORDGRB', 45.0),
 ('ORDGRB', 57.0)]

In [22]:
routes_duration.map(lambda x: (x[0], (x[1], 1))).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))\
                .map(lambda x: (x[0], x[1][0] / x[1][1])).take(10)

[('DTWGRB', 80.77064220183486),
 ('IAHABQ', 141.89),
 ('SLCABQ', 94.39024390243902),
 ('OKCDCA', 155.61666666666667),
 ('SAVDCA', 94.53846153846153),
 ('ORFDCA', 56.88636363636363),
 ('RDUDCA', 68.32558139534883),
 ('CLTDCA', 79.61685823754789),
 ('JANDCA', 138.31034482758622),
 ('INDDCA', 97.00689655172414)]

For the standard deviation we use:
stdev = sqrt((sum_x2 / n) - (mean * mean))
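
This is the population standard deviation (it divides by `n`, not `n - 1`), written so that it only needs three running totals that merge by addition: `n`, the sum, and the sum of squares. A minimal plain-Python check against `statistics.pstdev`, using the ten ORDGRB durations printed earlier:

```python
from math import sqrt
from statistics import pstdev

# The ten ORDGRB durations shown in the routes_duration.take(10) output.
durations = [47.0, 84.0, 64.0, 75.0, 69.0, 50.0, 56.0, 73.0, 45.0, 57.0]

# The three totals that a reduceByKey can accumulate.
n = len(durations)
sum_x = sum(durations)
sum_x2 = sum(d * d for d in durations)

mean = sum_x / n
stdev = sqrt(sum_x2 / n - mean * mean)

# Both approaches agree up to floating-point rounding.
print(abs(stdev - pstdev(durations)) < 1e-9)  # True
```

One caveat, stated as a general numerical remark rather than something observed in this dataset: subtracting two large, nearly equal numbers can lose precision when the mean is much larger than the spread.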

In [27]:
routes_duration.map(lambda x: (x[0], (x[1], 1, x[1]**2))).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2]))\
                .map(lambda x: (x[0], sqrt(x[1][2]/x[1][1] - (x[1][0] / x[1][1])**2))).take(10)

[('DTWGRB', 11.122861016944148),
 ('IAHABQ', 16.166567353647043),
 ('SLCABQ', 10.700759145124943),
 ('OKCDCA', 12.690011382535715),
 ('SAVDCA', 6.766607883461319),
 ('ORFDCA', 14.06028563832917),
 ('RDUDCA', 12.566328079680467),
 ('CLTDCA', 10.773119276309428),
 ('JANDCA', 14.5153983595443),
 ('INDDCA', 11.405078452449999)]

In [0]:
# Same solution, but with the results converted to Rows
routes_stats = routes_duration.map(lambda x: (x[0],(x[1],x[1] ** 2,1)))\
.reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1], x[2] + y[2]))\
.map(lambda x: (x[0], x[1][0] / x[1][2], x[1][2], sqrt((x[1][1] / x[1][2])- ((x[1][0]/x[1][2])**2))))\
.map(lambda x: Row(ROUTE=x[0],AVERAGE_DURATION=x[1],NUM_FLIGHTS=x[2],DURATION_STD=x[3] )).cache()

In [29]:
routes_stats.takeOrdered(10, lambda x: -x.AVERAGE_DURATION)

[Row(AVERAGE_DURATION=677.0, DURATION_STD=25.78297703450658, NUM_FLIGHTS=21, ROUTE='BOSHNL'),
 Row(AVERAGE_DURATION=670.2, DURATION_STD=23.25240386958354, NUM_FLIGHTS=35, ROUTE='JFKHNL'),
 Row(AVERAGE_DURATION=664.0322580645161, DURATION_STD=21.654894210968795, NUM_FLIGHTS=31, ROUTE='EWRHNL'),
 Row(AVERAGE_DURATION=651.125, DURATION_STD=21.43850216316429, NUM_FLIGHTS=8, ROUTE='IADHNL'),
 Row(AVERAGE_DURATION=597.9666666666667, DURATION_STD=17.11428902668256, NUM_FLIGHTS=30, ROUTE='ATLHNL'),
 Row(AVERAGE_DURATION=589.0, DURATION_STD=7.035623639735144, NUM_FLIGHTS=4, ROUTE='DTWHNL'),
 Row(AVERAGE_DURATION=559.25, DURATION_STD=23.411519914475928, NUM_FLIGHTS=68, ROUTE='ORDHNL'),
 Row(AVERAGE_DURATION=559.0909090909091, DURATION_STD=19.05949796846426, NUM_FLIGHTS=22, ROUTE='ORDOGG'),
 Row(AVERAGE_DURATION=538.0909090909091, DURATION_STD=27.14759170786847, NUM_FLIGHTS=22, ROUTE='MSPHNL'),
 Row(AVERAGE_DURATION=522.2258064516129, DURATION_STD=12.696734854383065, NUM_FLIGHTS=31, ROUTE='IAHHNL')]

In [30]:
routes_stats.takeOrdered(10, lambda x: x.AVERAGE_DURATION)

[Row(AVERAGE_DURATION=25.466666666666665, DURATION_STD=5.891990797307446, NUM_FLIGHTS=30, ROUTE='PSGWRG'),
 Row(AVERAGE_DURATION=25.821428571428573, DURATION_STD=6.530442839001753, NUM_FLIGHTS=28, ROUTE='WRGPSG'),
 Row(AVERAGE_DURATION=33.32142857142857, DURATION_STD=4.375692365040032, NUM_FLIGHTS=28, ROUTE='SFOSTS'),
 Row(AVERAGE_DURATION=34.728813559322035, DURATION_STD=5.436320130240698, NUM_FLIGHTS=59, ROUTE='SJUSTT'),
 Row(AVERAGE_DURATION=35.9941348973607, DURATION_STD=4.460970618117005, NUM_FLIGHTS=682, ROUTE='LIHHNL'),
 Row(AVERAGE_DURATION=36.0, DURATION_STD=3.0, NUM_FLIGHTS=2, ROUTE='RICIAD'),
 Row(AVERAGE_DURATION=36.56190476190476, DURATION_STD=4.922105259074188, NUM_FLIGHTS=105, ROUTE='KOAOGG'),
 Row(AVERAGE_DURATION=36.953125, DURATION_STD=4.641624471494328, NUM_FLIGHTS=64, ROUTE='STTSJU'),
 Row(AVERAGE_DURATION=37.285714285714285, DURATION_STD=5.267536309842602, NUM_FLIGHTS=105, ROUTE='OGGKOA'),
 Row(AVERAGE_DURATION=37.96551724137931, DURATION_STD=7.53604102879183, NUM_

In [31]:
routes_stats.takeOrdered(10, lambda x: -x.DURATION_STD)

[Row(AVERAGE_DURATION=67.28395061728395, DURATION_STD=100.71723460935365, NUM_FLIGHTS=405, ROUTE='ATLBNA'),
 Row(AVERAGE_DURATION=52.194945848375454, DURATION_STD=86.06523998701414, NUM_FLIGHTS=277, ROUTE='ATLBHM'),
 Row(AVERAGE_DURATION=307.0, DURATION_STD=71.17935093831638, NUM_FLIGHTS=4, ROUTE='DFWSTT'),
 Row(AVERAGE_DURATION=252.66666666666666, DURATION_STD=40.17185305382092, NUM_FLIGHTS=9, ROUTE='HDNFLL'),
 Row(AVERAGE_DURATION=141.08333333333334, DURATION_STD=37.34402748618423, NUM_FLIGHTS=12, ROUTE='SGFSFB'),
 Row(AVERAGE_DURATION=221.25, DURATION_STD=36.77890020106637, NUM_FLIGHTS=4, ROUTE='HDNIAD'),
 Row(AVERAGE_DURATION=310.22222222222223, DURATION_STD=36.328066216931056, NUM_FLIGHTS=9, ROUTE='BOSHDN'),
 Row(AVERAGE_DURATION=236.6, DURATION_STD=33.75559212930507, NUM_FLIGHTS=5, ROUTE='HDNEWR'),
 Row(AVERAGE_DURATION=298.4, DURATION_STD=32.96422303043125, NUM_FLIGHTS=5, ROUTE='EWRHDN'),
 Row(AVERAGE_DURATION=168.0, DURATION_STD=32.526911934581186, NUM_FLIGHTS=10, ROUTE='DSMPIE')]

**Exercise 6:** For each airline, count the flights whose duration exceeded the average duration of the route (computed over all airlines) by 15 minutes or more. Report the 10 best and the 10 worst airlines according to this metric.

In [0]:
rdd_1 = routes_stats.map(lambda x: (x.ROUTE, x.AVERAGE_DURATION))
rdd_2 = rdd.map(lambda x: (x.ORIGIN + x.DEST,(x.OP_UNIQUE_CARRIER, hhmmtimediff(x.DEP_TIME, x.ARR_TIME) + (x.TIMEDIFF * 60))))

In [0]:
carrier_delays = rdd_1.join(rdd_2).cache()

In [37]:
carrier_delays.take(1)

[('SLCABQ', (94.39024390243902, ('OO', 85.0)))]

In [0]:
delays_by_carrier = carrier_delays.filter(lambda x: (x[1][1][1] - x[1][0]) > 15)\
                                  .map(lambda x: (x[1][1][0], 1))\
                                  .reduceByKey(lambda x, y: x + y).cache()
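
`join` pairs every value from the left RDD with every value from the right RDD that shares the same key, producing `(key, (left_value, right_value))`. That is why the flight duration sits at `x[1][1][1]` and the route average at `x[1][0]`. A plain-Python sketch of that shape and of the filter follows; it uses the SLCABQ average printed above, while the flights themselves are hypothetical. Like the cell above, it uses a strict `> 15` comparison.

```python
# Route -> average duration (the SLCABQ value is taken from the output above).
route_avg = {"SLCABQ": 94.39024390243902}

# Hypothetical flights: (route, (carrier, duration_in_minutes)).
flights = [
    ("SLCABQ", ("OO", 85.0)),
    ("SLCABQ", ("OO", 120.0)),
    ("SLCABQ", ("WN", 111.0)),
    ("XXXYYY", ("WN", 300.0)),  # no average for this route, so the join drops it
]

# Inner join on the route key: (route, (average, (carrier, duration))).
joined = [(r, (route_avg[r], cd)) for r, cd in flights if r in route_avg]

# Count flights per carrier that exceed the route average by more than 15 minutes.
late_by_carrier = {}
for _, (avg, (carrier, duration)) in joined:
    if duration - avg > 15:
        late_by_carrier[carrier] = late_by_carrier.get(carrier, 0) + 1

print(late_by_carrier)  # {'OO': 1, 'WN': 1}
```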

In [49]:
# 10 best
delays_by_carrier.takeOrdered(10, lambda x: x[1])

[('HA', 389),
 ('G4', 521),
 ('EV', 1447),
 ('NK', 1557),
 ('YV', 1565),
 ('F9', 1777),
 ('9E', 2288),
 ('B6', 2427),
 ('OH', 2696),
 ('MQ', 2962)]

In [50]:
# 10 worst
delays_by_carrier.takeOrdered(10, lambda x: -x[1])

[('OO', 8281),
 ('AA', 7949),
 ('DL', 6890),
 ('WN', 4896),
 ('UA', 4622),
 ('YX', 3908),
 ('AS', 3448),
 ('MQ', 2962),
 ('OH', 2696),
 ('B6', 2427)]

In [0]:
carrier_delays = carrier_delays.filter(lambda x: (x[1][1][1] - x[1][0]) > 15)\
.map(lambda x: (x[1][1][0],1))\
.reduceByKey(lambda x,y: x + y)\
.cache()

In [0]:
carrier_delays.takeOrdered(10, lambda x: -x[1])

[('OO', 8281),
 ('AA', 7949),
 ('DL', 6890),
 ('WN', 4896),
 ('UA', 4622),
 ('YX', 3908),
 ('AS', 3448),
 ('MQ', 2962),
 ('OH', 2696),
 ('B6', 2427)]

In [0]:
carrier_delays.takeOrdered(10, lambda x: x[1])

[('HA', 389),
 ('G4', 521),
 ('EV', 1447),
 ('NK', 1557),
 ('YV', 1565),
 ('F9', 1777),
 ('9E', 2288),
 ('B6', 2427),
 ('OH', 2696),
 ('MQ', 2962)]

#### Appendix: additional notes
* combineByKey

In [0]:
r1 = sc.parallelize([('A',1),('B',2),('C',3),('A',2),('C',28),('A',2)],4)

In [0]:
r1.reduceByKey(lambda x,y : x + y).collect()

[('A', 5), ('B', 2), ('C', 31)]

In [0]:
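# createCombiner: first value -> (sum, count)
# mergeValue: (sum, count) + raw value; mergeCombiners: (sum, count) + (sum, count)
r1.combineByKey(lambda v: (v, 1), lambda acc, v: (acc[0] + v, acc[1] + 1), lambda a, b: (a[0] + b[0], a[1] + b[1])).collect()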

[('A', (5, 3)), ('B', (2, 1)), ('C', (31, 2))]
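
`combineByKey` takes three functions: `createCombiner` turns the first value seen for a key within a partition into an accumulator, `mergeValue` folds a further raw value into that accumulator, and `mergeCombiners` merges accumulators built in different partitions. A rough plain-Python emulation is sketched below. The `combine_by_key` helper and the two-partition layout are hypothetical, chosen by hand so that all three functions actually get called; Spark decides the real layout.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Emulate combineByKey over a list of partitions (lists of (key, value))."""
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            if key in acc:
                # Key already seen in this partition: accumulator + raw value.
                acc[key] = merge_value(acc[key], value)
            else:
                # First value for this key in this partition.
                acc[key] = create_combiner(value)
        per_partition.append(acc)

    result = {}
    for acc in per_partition:
        for key, comb in acc.items():
            if key in result:
                # Same key in another partition: accumulator + accumulator.
                result[key] = merge_combiners(result[key], comb)
            else:
                result[key] = comb
    return result

# Same data as r1, laid out by hand in two partitions.
partitions = [
    [("A", 1), ("B", 2), ("A", 2)],
    [("C", 3), ("C", 28), ("A", 2)],
]

sum_count = combine_by_key(
    partitions,
    lambda v: (v, 1),
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
print(sorted(sum_count.items()))  # [('A', (5, 3)), ('B', (2, 1)), ('C', (31, 2))]
```

Because `mergeValue` receives a raw value as its second argument while `mergeCombiners` receives an accumulator, the two functions are not interchangeable even when they look similar.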

* SQL tables

In [0]:
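# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is its replacement
sqlContext.read.parquet('flights.parquet').createOrReplaceTempView('flights')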


In [0]:
sqlContext.sql("SELECT distinct(OP_UNIQUE_CARRIER) from flights WHERE ORIGIN = 'ORD'").show()

+-----------------+
|OP_UNIQUE_CARRIER|
+-----------------+
|               UA|
|               NK|
|               AA|
|               EV|
|               B6|
|               DL|
|               OO|
|               F9|
|               MQ|
|               YX|
|               AS|
|               9E|
+-----------------+

