# Guia 2: PySpark
### Ejercicio 3:

Se tiene un RDD con información de vuelos programados con la forma:

`(número de vuelo, código de aerolínea, código de aeropuerto de salida, código de aeropuerto de llegada, fecha de salida AAAAMMDD, hora de salida HH:MM, fecha de llegada AAAAMMDD, hora de llegada HH:MM)`.

A su vez, se cuenta con el registro actualizado del estado de los vuelos que fueron ocurriendo, con la forma:

`(número de vuelo, aerolínea, fecha de salida AAAAMMDD, hora de salida HH:MM, fecha de llegada AAAAMMDD, hora de llegada HH:MM, estado)`.

En base al estado, podría contar con algún dato en blanco, por ejemplo si el vuelo fue cancelado no tendrá información de fechas y horas, si el vuelo se encuentra aún en curso, no contendrá información de la llegada.

1. Cuál es el aeropuerto con mayor tránsito.
2. Cuál es la aerolínea con mayor cantidad de vuelos.
3. Cuál es la aerolínea con mayor cantidad de cancelaciones.
4. Cuál es el vuelo (numero de vuelo + fecha) con mayor retraso en el horario de salida.
5. Cuál es el vuelo (numero de vuelo + fecha) con mayor retraso en el horario de llegada.
6. Cuál es la aerolínea más puntual.
7. Cuál es el aeropuerto que registra mayor desviación con respecto a los horarios coordinados.


### Data

In [1]:
from datetime import datetime as dt
import pyspark
sc = pyspark.SparkContext()

In [2]:
# (número de vuelo, aerolínea, código de aeropuerto de salida, código de aeropuerto de llegada,
# fecha de salida AAAAMMDD, hora de salida HH:MM, fecha de llegada AAAAMMDD, hora de llegada HH:MM)
data=[
    (100, 'American Airlines', 'EZE', 'FCO', '20191204', '16:00', '20191204', '18:00'),
    (101, 'American Airlines', 'JFK', 'FCO', '20191205', '17:00', '20191205', '18:00'),
    (102, 'American Airlines', 'BRA', 'FCO', '20191206', '18:00', '20191206', '21:00'),
    (103, 'American Airlines', 'FCO', 'EZE', '20191207', '19:00', '20191207', '22:00'),
    (104, 'Aerolineas Argentinas', 'BRA', 'EZE', '20191208', '11:00', '20191208', '15:00'),
    (105, 'Aerolineas Argentinas', 'FCO', 'BRA', '20191209', '12:00', '20191209', '17:00'),
    (106, 'Aerolineas Argentinas', 'FCO', 'JFK', '20191210', '13:00', '20191210', '18:00'),
    (107, 'LATAM', 'EZE', 'JFK', '20191211', '14:00', '20191211', '18:00'),
    (108, 'LATAM', 'JFK', 'EZE', '20191212', '15:00', '20191212', '18:00'),
    ]

# (número de vuelo, aerolínea, fecha de salida AAAAMMDD, hora de salida HH:MM,
# fecha de llegada AAAAMMDD, hora de llegada HH:MM, estado).
data2=[
    (100, 'American Airlines', '20191204', '16:15', '20191204', '18:18', 'Finalizado'),
    (101, 'American Airlines', '20191205', '17:13', '', '', 'En curso'),
    (102, 'American Airlines', '20191206', '18:34', '20191206', '21:21', 'Finalizado'),
    (103, 'American Airlines', '20191207', '19:02', '20191207', '22:10', 'Finalizado'),
    (104, 'Aerolineas Argentinas', '20191208', '11:01', '', '', 'En curso'),
    (105, 'Aerolineas Argentinas', '20191209', '12:09', '20191209', '16:53', 'Finalizado'),
    (106, 'Aerolineas Argentinas', '20191210', '13:57', '20191210', '18:58', 'Finalizado'),
    (107, 'LATAM', '', '', '', '', 'Cancelado'),
    (108, 'LATAM', '20191212', '15:22', '20191212', '18:15', 'Finalizado'),
    ]

rdd = sc.parallelize(data)
rdd2 = sc.parallelize(data2)

# Inciso A: Aeropuerto con mayor transito

In [3]:
vuelos_salida = rdd.map(lambda x: (x[2],1))
salidas_por_aeropuerto = vuelos_salida.reduceByKey(lambda x,y:x+y)
vuelos_llegada = rdd.map(lambda x: (x[3],1))
llegadas_por_aeropuerto = vuelos_llegada.reduceByKey(lambda x,y:x+y)

In [4]:
salidas_y_llegadas_por_aeropuerto = salidas_por_aeropuerto.join(llegadas_por_aeropuerto)

In [6]:
actividad_por_aeropuerto = salidas_y_llegadas_por_aeropuerto.map(lambda x:(x[0],x[1][0]+x[1][1]))

In [9]:
actividad_por_aeropuerto.takeOrdered(1,lambda x: -x[1])

[('FCO', 6)]

# Inciso B: Aerolinea con mayor cantidad de vuelos

In [10]:
vuelos_por_aerolinea = rdd.map(lambda x: (x[1],1)).reduceByKey(lambda x,y:x+y)

In [11]:
vuelos_por_aerolinea.takeOrdered(1,lambda x:-x[1])

[('American Airlines', 4)]

# Inciso C: Aerolinea con mayor cant de cancelaciones

In [12]:
cancelaciones_por_aerolinea = rdd2.filter(lambda x: x[6]=='Cancelado').map(lambda x: (x[1],1)).reduceByKey(lambda x,y:x+y)

In [15]:
cancelaciones_por_aerolinea.reduce(lambda x,y: x if x[1] > y[1] else y)

('LATAM', 1)

In [16]:
cancelaciones_por_aerolinea.collect()

[('LATAM', 1)]

# Inciso D: Vuelo con mayor retraso en el horario de salida

In [27]:
horarios_salida_programados = rdd.map(lambda x:(x[0],dt.strptime(f"{x[4]}-{x[5]}",'%Y%m%d-%H:%M')))

In [34]:
horarios_salida_reales = rdd2.filter(lambda x: x[6] in ["Finalizado","En curso"]).map(lambda x:(x[0],dt.strptime(f"{x[2]}-{x[3]}",'%Y%m%d-%H:%M')))

In [35]:
horarios_salida = horarios_salida_programados.join(horarios_salida_reales)

In [36]:
horarios_salida.take(2)

[(104,
  (datetime.datetime(2019, 12, 8, 11, 0),
   datetime.datetime(2019, 12, 8, 11, 1))),
 (105,
  (datetime.datetime(2019, 12, 9, 12, 0),
   datetime.datetime(2019, 12, 9, 12, 9)))]

In [37]:
retrasos = horarios_salida.map(lambda x: (x[0],x[1][1]-x[1][0]))

In [40]:
vuelo_mas_retrasado = retrasos.reduce(lambda x,y: x if x[1] > y[1] else y)[0]

In [41]:
horarios_salida_programados.filter(lambda x:x[0]==vuelo_mas_retrasado).collect()

[(106, datetime.datetime(2019, 12, 10, 13, 0))]

# Inciso E: Vuelo con mas retraso en la llegada

In [43]:
horarios_llegada_programados = rdd.map(lambda x:(x[0],dt.strptime(f"{x[6]}-{x[7]}",'%Y%m%d-%H:%M')))

In [44]:
horarios_llegada_reales = rdd2.filter(lambda x: x[6] in ["Finalizado"]).map(lambda x:(x[0],dt.strptime(f"{x[4]}-{x[5]}",'%Y%m%d-%H:%M')))

In [46]:
horarios_llegada = horarios_llegada_reales.join(horarios_llegada_programados)

In [48]:
retrasos_llegada = horarios_llegada.map(lambda x:(x[0],x[1][0]-x[1][1]))

In [49]:
vuelo_mas_retrasado_llegada = retrasos_llegada.reduce(lambda x,y: x if x[1] > y[1] else y)[0]

In [51]:
horarios_llegada_programados.filter(lambda x:x[0] == vuelo_mas_retrasado_llegada).collect()

[(106, datetime.datetime(2019, 12, 10, 18, 0))]

# Inciso F: Aerolinea mas puntual

In [58]:
aerolinea_por_vuelo = rdd.map(lambda x:(x[0],x[1]))

In [59]:
retrasos.take(1) #Retrasos de salida

[(104, datetime.timedelta(0, 60))]

In [63]:
retrasos_por_aerolinea = retrasos.join(aerolinea_por_vuelo).map(lambda x: (x[1][1],(x[1][0],1)))

In [69]:
retrasos_por_aerolinea.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).map(lambda x:(x[0],x[1][0]/x[1][1])).reduce(lambda x,y: x if x[1]< y[1] else y)

('American Airlines', datetime.timedelta(0, 960))

# Inciso G: Aerolinea que presenta mas desviacion respecto de horarios coordinados

In [70]:
retrasos.take(1)

[(104, datetime.timedelta(0, 60))]

In [71]:
retrasos_llegada.take(1)

[(105, datetime.timedelta(-1, 85980))]

In [76]:
retrasos_total = retrasos.leftOuterJoin(retrasos_llegada).map(lambda x:(x[0],(x[1][0],0 if not x[1][1] else x[1][1]))).map(lambda x:(x[0],x[1][0]+x[1][1]))

In [85]:
retrasos.join(aerolinea_por_vuelo).map(lambda x: (x[1][1],(x[1][0],1))).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).map(lambda x:(x[0],x[1][0]/x[1][1])).reduce(lambda x,y: x if x[1]>y[1] else y)

('Aerolineas Argentinas', datetime.timedelta(0, 1340))