# Case - Machine Learning Engineer - PicPay

This material presents an analysis of the data provided for the technical case in the hiring process for a machine learning engineer position. It contains the processing needed to answer the given questions, as well as the analysis of metrics for building the model used in the API.

## Importing the Dataset

In [1]:
from pyspark.sql.functions import (
    isnan, col, desc, mean, count, row_number,
    min, max, concat_ws, to_date, date_format,
    broadcast
)
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
import zipfile
import os
import requests
import time
from dotenv import load_dotenv

load_dotenv()

True

In [2]:
spark=SparkSession.builder.config(
    "spark.driver.bindAddress", "localhost"
    ).config(
        "spark.network.timeout","10000000"
    ).config(
        "spark.executor.heartbeatInterval", "10000000"
    ).appName("MLE Case").getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/25 09:46:14 WARN Utils: Your hostname, Notebook, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/10/25 09:46:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/25 09:46:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
filepath = os.path.join(os.getcwd(), "airports-database.zip")

with zipfile.ZipFile(filepath, 'r') as zip_file:
    zip_file.extractall()

In [8]:
csv_filepath = os.path.join(os.getcwd(), "airports-database.csv")

df = spark.read.csv(csv_filepath, header=True, inferSchema=True)

df.show()

                                                                                

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

In [8]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: double (nullable = true)
 |-- sched_dep_time: integer (nullable = true)
 |-- dep_delay: double (nullable = true)
 |-- arr_time: double (nullable = true)
 |-- sched_arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: double (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- time_hour: timestamp (nullable = true)
 |-- name: string (nullable = true)



In [9]:
df.count()

                                                                                

336776

## Answers to the Questions

#### 1. What is the total number of flights in the dataset?

In [10]:
# distinct() already removes duplicates, so no extra dropDuplicates() is needed
df.select('flight').distinct().count()

3844

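Therefore, the dataset contains 3844 distinct flight numbers. Note that this counts unique values of the `flight` column; counting every row as one flight gives the 336776 records returned by `df.count()` above.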

#### 2. How many flights were cancelled?

In [11]:

df.filter(
    (isnan(col('dep_time')) | col('dep_time').isNull()) &
    (isnan(col('arr_time')) | col('arr_time').isNull())
).count()

8255

Taking a flight as cancelled when both `arr_time` and `dep_time` are missing, the number of cancelled flights is 8255.
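The cancellation rule used above can be sketched in plain Python, outside Spark. The helper names `is_missing` and `is_cancelled` and the sample rows are hypothetical; only the rule itself (both `dep_time` and `arr_time` missing, whether null or NaN) comes from the cell above.

```python
import math


def is_missing(value):
    """True when a value is null (None) or a floating-point NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def is_cancelled(row):
    """A flight counts as cancelled only when both times are missing."""
    return is_missing(row["dep_time"]) and is_missing(row["arr_time"])


# Hypothetical rows, for illustration only.
sample_rows = [
    {"dep_time": 517.0, "arr_time": 830.0},        # flew normally
    {"dep_time": None, "arr_time": None},          # both null
    {"dep_time": float("nan"), "arr_time": None},  # NaN plus null
    {"dep_time": 533.0, "arr_time": None},         # departed, arrival not recorded
]

cancelled_count = sum(is_cancelled(row) for row in sample_rows)
print(cancelled_count)
```

Under this rule, a flight that departed but has no recorded arrival time is not treated as cancelled.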

#### 3. Which are the 5 airports with the most landings?

In [12]:
res = df.groupBy('dest').count().orderBy(desc('count'))

print("Top 5 airports by number of landings:")
res.show(5)

Top 5 airports by number of landings:




+----+-----+
|dest|count|
+----+-----+
| ORD|17283|
| ATL|17215|
| LAX|16174|
| BOS|15508|
| MCO|14082|
+----+-----+
only showing top 5 rows


                                                                                

Thus, the airports with the most landings are: O'Hare International Airport, in Chicago, USA; Hartsfield-Jackson Atlanta International Airport, in Atlanta, USA; Los Angeles International Airport, in Los Angeles, USA; Logan International Airport, in Boston, USA; Orlando International Airport, in Orlando, USA.

#### 4. What is the most frequent route (origin-dest pair)?

In [13]:
df.groupBy(['origin', 'dest']).count().orderBy(desc('count')).show(1)

+------+----+-----+
|origin|dest|count|
+------+----+-----+
|   JFK| LAX|11262|
+------+----+-----+
only showing top 1 row


Therefore, the most frequent route departs from John F. Kennedy International Airport, in New York, USA, bound for Los Angeles International Airport, in Los Angeles, USA.

#### 5. Which are the 5 airlines with the highest mean arrival delay?

In [14]:
df.groupBy('carrier')\
    .agg(mean('arr_delay').alias('mean_delay'))\
    .orderBy(col('mean_delay').desc()).show(5)


+-------+------------------+
|carrier|        mean_delay|
+-------+------------------+
|     F9|21.920704845814978|
|     FL|20.115905511811025|
|     EV| 15.79643108710965|
|     YV|15.556985294117647|
|     OO|11.931034482758621|
+-------+------------------+
only showing top 5 rows


The results indicate that the 5 airlines with the highest mean arrival delay are:
1. Frontier Airlines (F9);
2. AirTran Airways (FL);
3. ExpressJet Airlines (EV);
4. Mesa Airlines (YV);
5. SkyWest Airlines (OO).

#### 6. Which day of the week has the most flights?

In [15]:
date_df = df.select('year', 'month', 'day').withColumn('date_as_str', concat_ws('-', 'year', 'month', 'day'))
date_df = date_df.withColumn('date', to_date('date_as_str', 'yyyy-M-d'))
date_df = date_df.withColumn('weekday', date_format('date', 'EEEE'))
date_df = date_df.groupBy('weekday').agg(count('*').alias('count_weekday')).orderBy(desc('count_weekday'))
date_df.show()


+---------+-------------+
|  weekday|count_weekday|
+---------+-------------+
|   Monday|        50690|
|  Tuesday|        50422|
|   Friday|        50308|
| Thursday|        50219|
|Wednesday|        50060|
|   Sunday|        46357|
| Saturday|        38720|
+---------+-------------+



                                                                                

Therefore, the day of the week with the most flights is Monday.
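The same derivation (build a date from `year`, `month`, `day`, take its weekday name, then count) can be sketched with the standard library alone. The function `weekday_name` and the sample dates are hypothetical; the names are hard-coded in English so the result does not depend on the machine's locale.

```python
from collections import Counter
from datetime import date

# date.weekday() returns 0 for Monday through 6 for Sunday.
WEEKDAY_NAMES = [
    "Monday", "Tuesday", "Wednesday", "Thursday",
    "Friday", "Saturday", "Sunday",
]


def weekday_name(year, month, day):
    """Return the English weekday name for a calendar date."""
    return WEEKDAY_NAMES[date(year, month, day).weekday()]


# Hypothetical (year, month, day) triples, one per flight.
sample_dates = [(2013, 1, 1), (2013, 1, 7), (2013, 1, 14), (2013, 1, 5)]

counts = Counter(weekday_name(*ymd) for ymd in sample_dates)
busiest_day, busiest_count = counts.most_common(1)[0]
print(busiest_day, busiest_count)
```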

#### 7. Which route had the highest mean flight time (air_time)?

In [16]:
df.groupBy(['origin', 'dest']).agg(mean('air_time').alias('mean_air_time')).orderBy(desc('mean_air_time')).show(1)



+------+----+-----------------+
|origin|dest|    mean_air_time|
+------+----+-----------------+
|   JFK| HNL|623.0877192982456|
+------+----+-----------------+
only showing top 1 row


                                                                                

The route with the highest mean flight time departs from John F. Kennedy International Airport, in New York, USA, bound for Honolulu International Airport, in Honolulu, USA.

#### 8. For each origin airport, what is the most common destination airport?

In [17]:
count_dest = df.groupBy('origin', 'dest').agg(count('*').alias('count_dest'))
window_func = Window.partitionBy('origin').orderBy(col('count_dest').desc())

wind_count_dest = count_dest.withColumn('pos', row_number().over(window_func))

most_common_dest = wind_count_dest.filter(col('pos') == 1).drop('pos')

most_common_dest.show()

+------+----+----------+
|origin|dest|count_dest|
+------+----+----------+
|   EWR| ORD|      6100|
|   JFK| LAX|     11262|
|   LGA| ATL|     10263|
+------+----+----------+



From this, the most common destination for each origin is:
* O'Hare International Airport, Chicago, USA, when departing from Newark Liberty Airport, Newark, USA;
* Los Angeles International Airport, Los Angeles, USA, when departing from John F. Kennedy International Airport, New York, USA;
* Hartsfield-Jackson Atlanta International Airport, Atlanta, USA, when departing from LaGuardia Airport, New York, USA.
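The window-function pattern used above (count per origin-destination pair, rank within each origin, keep rank 1) has a simple plain-Python analogue. This is only a sketch: `most_common_destination` and the sample flights are hypothetical, and ties are resolved by insertion order here, whereas `row_number()` picks an arbitrary row among ties.

```python
from collections import Counter, defaultdict


def most_common_destination(flights):
    """Map each origin to its (destination, count) pair with the highest count."""
    counts = defaultdict(Counter)
    for origin, dest in flights:
        counts[origin][dest] += 1
    # most_common(1) plays the role of "row_number() == 1" within each origin.
    return {origin: dests.most_common(1)[0] for origin, dests in counts.items()}


# Hypothetical (origin, dest) pairs, one per flight.
sample_flights = [
    ("EWR", "ORD"), ("EWR", "ORD"), ("EWR", "BOS"),
    ("JFK", "LAX"), ("JFK", "LAX"), ("JFK", "SFO"),
    ("LGA", "ATL"),
]

top_destinations = most_common_destination(sample_flights)
print(top_destinations)
```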

#### 9. Which are the 3 routes with the greatest variation in mean flight time (air_time)?

In [18]:
df.groupBy('origin', 'dest')\
    .agg(
        min('air_time').alias('min_time'),
        max('air_time').alias('max_time')
    ).withColumn('variation', col('max_time') - col('min_time'))\
    .orderBy(col('variation').desc()).show(3)

+------+----+--------+--------+---------+
|origin|dest|min_time|max_time|variation|
+------+----+--------+--------+---------+
|   JFK| SFO|   301.0|   490.0|    189.0|
|   JFK| LAX|   275.0|   440.0|    165.0|
|   JFK| EGE|   219.0|   382.0|    163.0|
+------+----+--------+--------+---------+
only showing top 3 rows


                                                                                

Measuring variation as the difference between the longest and the shortest `air_time` on each route, the routes with the greatest variation in flight time are:
* John F. Kennedy International Airport to San Francisco International Airport;
* John F. Kennedy International Airport to Los Angeles International Airport;
* John F. Kennedy International Airport to Eagle County Regional Airport.
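The answer above measures variation as the range (maximum minus minimum) of `air_time`. The range depends only on the two most extreme flights, so the standard deviation is a common alternative reading of "variation". The sketch below computes both for one route; `air_time_spread` and the sample values are hypothetical, and it is not a claim about which measure the question intended.

```python
import statistics


def air_time_spread(times):
    """Return the range and population standard deviation, skipping missing values."""
    observed = [t for t in times if t is not None]
    return {
        "range": max(observed) - min(observed),
        "stdev": statistics.pstdev(observed),
    }


# Hypothetical air times (minutes) for a single route; None marks a missing value.
spread = air_time_spread([301.0, 345.0, None, 490.0])
print(spread["range"])
```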

#### 10. Which are the 3 most common routes that had arrival delays of more than 30 minutes?

In [19]:
df.groupby('origin', 'dest').agg(count(col('arr_delay') > 30).alias('delay')).orderBy(desc('delay')).show(3)

+------+----+-----+
|origin|dest|delay|
+------+----+-----+
|   JFK| LAX|11159|
|   LGA| ATL|10041|
|   LGA| ORD| 8507|
+------+----+-----+
only showing top 3 rows


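Ranked by the `delay` column above, the routes are:
* John F. Kennedy International Airport to Los Angeles International Airport;
* LaGuardia Airport to Hartsfield-Jackson Atlanta International Airport;
* LaGuardia Airport to O'Hare International Airport.

Note that `count(col('arr_delay') > 30)` counts every row where the comparison is non-null, whether it evaluates to true or false. The `delay` column is therefore the number of flights with a recorded `arr_delay` on each route (11159 of the 11262 JFK-LAX flights), not the number that arrived more than 30 minutes late. Restricting the count to those flights requires filtering on `col('arr_delay') > 30` before grouping.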
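The query above wraps a comparison inside `count`. SQL-style `count(expr)` counts the rows where `expr` is not null, regardless of whether the comparison is true or false. The plain-Python sketch below mimics that behaviour next to a count of true values only; `sql_count`, `count_true` and the sample delays are hypothetical.

```python
def sql_count(values):
    """Mimic SQL COUNT(expr): count entries that are not null, true or false alike."""
    return sum(value is not None for value in values)


def count_true(values):
    """Count only the entries where the condition actually holds."""
    return sum(value is True for value in values)


# Hypothetical arrival delays in minutes; None marks a flight with no recorded delay.
arr_delays = [45.0, 5.0, None, 31.0, -3.0]

# Comparing a null with a number yields a null, as in SQL.
over_30 = [None if delay is None else delay > 30 for delay in arr_delays]

non_null = sql_count(over_30)
delayed = count_true(over_30)
print(non_null, delayed)
```

In PySpark the true-only count is usually obtained by applying `df.filter(col('arr_delay') > 30)` before `groupBy('origin', 'dest').count()`.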

## Answers to the Extra Questions

In [None]:
dest = df.select('dest').distinct().collect()

airport_list = []

for row in dest:
    airport_list.append(row.dest)

                                                                                

### External API Setup

Note: Since the Weatherbit API is not working as expected (it returns a maximum-requests-per-day error), another source will be used to obtain the wind speed data. Open-Meteo, which returns the maximum daily wind speed, was chosen to complete the activity, since the data serves illustrative purposes only.
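As a minimal sketch of how the Open-Meteo archive request used in this section can be assembled and its response flattened into rows, the helpers below use only the standard library. The names `build_wind_url` and `daily_rows`, the coordinates, and the sample payload are hypothetical; the endpoint and the `daily=wind_speed_10m_max` parameter are the ones that appear in the notebook's own requests. Pairing dates and values with `zip` avoids hard-coding the number of days in the period.

```python
from urllib.parse import urlencode

ARCHIVE_ENDPOINT = "https://archive-api.open-meteo.com/v1/era5"


def build_wind_url(lat, lon, start_date, end_date):
    """Assemble the archive URL for daily maximum wind speed at one location."""
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": start_date,
        "end_date": end_date,
        "daily": "wind_speed_10m_max",
    }
    return f"{ARCHIVE_ENDPOINT}?{urlencode(params)}"


def daily_rows(airport_code, payload):
    """Turn a decoded JSON response into [airport, date, wind_speed] rows."""
    daily = payload["daily"]
    return [
        [airport_code, day, speed]
        for day, speed in zip(daily["time"], daily["wind_speed_10m_max"])
    ]


# Hypothetical coordinates and a hand-written payload, for illustration only.
url = build_wind_url(40.64, -73.78, "2013-01-01", "2013-12-31")
fake_payload = {
    "daily": {
        "time": ["2013-01-01", "2013-01-02"],
        "wind_speed_10m_max": [19.4, 16.6],
    }
}
rows = daily_rows("JFK", fake_payload)
print(url)
```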

In [22]:
import requests
import os
import time
from dotenv import load_dotenv


load_dotenv()

airportdb_token = os.getenv('AIRPORTDB_TOKEN')

start_date = '2013-01-01'
end_date = '2013-12-31'

windspeed_list = []
airports_not_found = []


for airport_code in airport_list:
    url = f"https://airportdb.io/api/v1/airport/K{airport_code}?apiToken={airportdb_token}"
    response = requests.get(url)
    if response.status_code == 404:
        print(f"Código ICAO errado: {airport_code}")
        airports_not_found.append(airport_code)
        continue
    response = response.json()
    lat = response['latitude_deg']
    lon = response['longitude_deg']
    url = "https://archive-api.open-meteo.com/v1/era5?" + \
    f"latitude={lat}&longitude={lon}&start_date={start_date}"+ \
    f"&end_date={end_date}&daily=wind_speed_10m_max"

    try:
        response = requests.get(url, timeout=20)
    except Exception as e:
        print(f"Erro: {airport_code}")
        print(e)
        continue
    response = response.json()
    for index in range(0, 365):
        windspeed_list.append([
            airport_code,
            response['daily']['time'][index],
            response['daily']['wind_speed_10m_max'][index]
        ])
    print(f"Sucesso: {airport_code}")
    time.sleep(20)

Código ICAO errado: PSE
Sucesso: MSY
Sucesso: SNA
Sucesso: BUR
Sucesso: GRR
Sucesso: MYR
Sucesso: GSO
Sucesso: PVD
Sucesso: OAK
Erro: MSN
HTTPSConnectionPool(host='archive-api.open-meteo.com', port=443): Read timed out. (read timeout=20)
Sucesso: DCA
Sucesso: LEX
Sucesso: ORF
Sucesso: CRW
Sucesso: SAV
Sucesso: CMH
Sucesso: CAK
Sucesso: IAH
Código ICAO errado: HNL
Sucesso: CVG
Erro: SJC
HTTPSConnectionPool(host='archive-api.open-meteo.com', port=443): Read timed out. (read timeout=20)
Erro: BUF
HTTPSConnectionPool(host='archive-api.open-meteo.com', port=443): Read timed out. (read timeout=20)
Sucesso: AUS
Código ICAO errado: SJU
Sucesso: AVL
Sucesso: LGB
Sucesso: SRQ
Sucesso: EYW
Sucesso: SBN
Sucesso: JAC
Sucesso: CHS
Sucesso: RSW
Sucesso: TUL
Sucesso: BOS
Erro: LAS
HTTPSConnectionPool(host='archive-api.open-meteo.com', port=443): Read timed out. (read timeout=20)
Sucesso: XNA
Sucesso: DEN
Sucesso: ALB
Sucesso: IAD
Sucesso: PSP
Sucesso: SEA
Sucesso: MCI
Sucesso: CLT
Sucesso: BNA
Sucesso

In [23]:
df_windspeed_list_1 = spark.createDataFrame(windspeed_list, ['origin', 'date-as-str', 'wind_speed'])
df_windspeed_list_1.write.csv('./windspeed_list_1.csv', header=True, mode='overwrite')

                                                                                

In [None]:
df_windspeed_list_origin = spark.createDataFrame(windspeed_list, ['origin', 'date-as-str', 'wind_speed'])
df_windspeed_list_origin.write.csv('./windspeed_list.csv', header=True, mode='overwrite')

In [27]:
df_windspeed_list_origin.coalesce(1).write.mode('overwrite').option('header', 'true').csv('windspeed_list.csv')

There were problems during ingestion: some requests timed out, and some airports have an ICAO code that is not simply the IATA code prefixed with `K`. Both are handled next.
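Both problems can be handled with two small helpers, sketched here under stated assumptions. `iata_to_icao` falls back to the `K` prefix and uses an override table that mirrors the code pairs listed in the cell that follows. `with_retries` is a hypothetical generic retry wrapper, not something the notebook uses, and the flaky function only simulates a timeout.

```python
import time

# IATA codes whose ICAO code is not "K" + IATA (pairs taken from the notebook).
ICAO_OVERRIDES = {
    "PSE": "TJPS", "HNL": "PHNL", "SJU": "TJSJ",
    "BQN": "TJBQ", "STT": "TIST", "ANC": "PANC",
}


def iata_to_icao(iata_code):
    """Return the ICAO code, defaulting to the 'K' prefix when no override exists."""
    return ICAO_OVERRIDES.get(iata_code, f"K{iata_code}")


def with_retries(fetch, attempts=3, wait_seconds=0.0, sleep=time.sleep):
    """Call fetch() up to `attempts` times (at least 1), re-raising the last error."""
    last_error = None
    for attempt in range(attempts):
        try:
            return fetch()
        except Exception as error:  # broad on purpose: this is only a sketch
            last_error = error
            if attempt < attempts - 1:
                sleep(wait_seconds)
    raise last_error


# Simulated request that times out twice before succeeding.
attempts_seen = []


def flaky_fetch():
    attempts_seen.append(1)
    if len(attempts_seen) < 3:
        raise TimeoutError("simulated read timeout")
    return "ok"


result = with_retries(flaky_fetch, attempts=3, wait_seconds=0.0)
print(iata_to_icao("JFK"), iata_to_icao("HNL"), result)
```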

In [24]:
airportdb_token = os.getenv('AIRPORTDB_TOKEN')

start_date = '2013-01-01'
end_date = '2013-12-31'

wrong_icao_list = [
    ['PSE', 'TJPS'], ['HNL', 'PHNL'], ['SJU', 'TJSJ'], ['BQN', 'TJBQ'], ['STT', 'TIST'], ['ANC', 'PANC']
]

windspeed_list = []

for airport_code in wrong_icao_list:
    url = f"https://airportdb.io/api/v1/airport/{airport_code[1]}?apiToken={airportdb_token}"
    response = requests.get(url)
    if response.status_code == 404:
        print(f"Código ICAO errado: {airport_code}")
        airports_not_found.append(airport_code)
        continue
    response = response.json()
    lat = response['latitude_deg']
    lon = response['longitude_deg']
    url = "https://archive-api.open-meteo.com/v1/era5?" + \
    f"latitude={lat}&longitude={lon}&start_date={start_date}"+ \
    f"&end_date={end_date}&daily=wind_speed_10m_max"

    try:
        response = requests.get(url)
    except Exception as e:
        print(f"Erro: {airport_code[0]}")
        print(e)
        continue
    response = response.json()
    for index in range(0, 365):
        windspeed_list.append([
            airport_code[0],
            response['daily']['time'][index],
            response['daily']['wind_speed_10m_max'][index]
        ])
    print(f"Sucesso: {airport_code}")
    time.sleep(60)

Sucesso: ['PSE', 'TJPS']
Sucesso: ['HNL', 'PHNL']
Sucesso: ['SJU', 'TJSJ']
Sucesso: ['BQN', 'TJBQ']
Sucesso: ['STT', 'TIST']
Sucesso: ['ANC', 'PANC']


In [25]:
df_windspeed_list_2 = spark.createDataFrame(windspeed_list, ['origin', 'date-as-str', 'wind_speed'])
df_windspeed_list_2.write.csv('./windspeed_list_2.csv', header=True, mode='overwrite')

                                                                                

In [30]:
timedout_list = [
    'MSN', 'SJC', 'BUF', 'LAS', 'DFW', 'EGE', 'BHM', 'ILM', 'MDW', 'JAX', 'BZN', 'PHL'
]

windspeed_list = []

for airport_code in timedout_list:
    url = f"https://airportdb.io/api/v1/airport/K{airport_code}?apiToken={airportdb_token}"
    response = requests.get(url)
    if response.status_code == 404:
        print(f"Código ICAO errado: {airport_code}")
        airports_not_found.append(airport_code)
        continue
    response = response.json()
    lat = response['latitude_deg']
    lon = response['longitude_deg']
    url = "https://archive-api.open-meteo.com/v1/era5?" + \
    f"latitude={lat}&longitude={lon}&start_date={start_date}"+ \
    f"&end_date={end_date}&daily=wind_speed_10m_max"

    try:
        response = requests.get(url, timeout=20)
    except Exception as e:
        print(f"Erro: {airport_code}")
        print(e)
        continue
    response = response.json()
    for index in range(0, 365):
        windspeed_list.append([
            airport_code,
            response['daily']['time'][index],
            response['daily']['wind_speed_10m_max'][index]
        ])
    print(f"Sucesso: {airport_code}")
    time.sleep(60)

Sucesso: MSN
Sucesso: SJC
Sucesso: BUF
Sucesso: LAS
Sucesso: DFW
Sucesso: EGE
Sucesso: BHM
Sucesso: ILM
Sucesso: MDW
Sucesso: JAX
Sucesso: BZN
Sucesso: PHL


In [43]:
df_windspeed_list_3 = spark.createDataFrame(windspeed_list, ['origin', 'date-as-str', 'wind_speed'])
df_windspeed_list_3.write.csv('./windspeed_list_3.csv', header=True, mode='overwrite')

                                                                                

#### Querying the Origin Airports

In [26]:
origin_airports = [
    'EWR', 'LGA', 'JFK'
]
origin_windspeed = []
for airport_code in origin_airports:
    url = f"https://airportdb.io/api/v1/airport/K{airport_code}?apiToken={airportdb_token}"
    response = requests.get(url)
    if response.status_code == 404:
        print(f"Código ICAO errado: {airport_code}")
        airports_not_found.append(airport_code)
        continue
    response = response.json()
    lat = response['latitude_deg']
    lon = response['longitude_deg']
    url = "https://archive-api.open-meteo.com/v1/era5?" + \
    f"latitude={lat}&longitude={lon}&start_date={start_date}"+ \
    f"&end_date={end_date}&daily=wind_speed_10m_max"

    try:
        response = requests.get(url)
    except Exception as e:
        print(f"Erro: {airport_code}")
        print(e)
        continue
    response = response.json()
    for index in range(0, 365):
        origin_windspeed.append([
            airport_code,
            response['daily']['time'][index],
            response['daily']['wind_speed_10m_max'][index]
        ])
    print(f"Sucesso: {airport_code}")
    time.sleep(60)

Sucesso: EWR
Sucesso: LGA
Sucesso: JFK


In [32]:
df_windspeed_origin = spark.createDataFrame(origin_windspeed, ['origin', 'date-as-str', 'wind_speed'])

df_windspeed_origin.coalesce(1).write.csv('./windspeed_origin', header=True, mode='overwrite')


### Answer to the Question

In [15]:
df_windspeed_list_complete = spark.read.csv('windspeed-list-complete.csv', header=True, inferSchema=True)


In [16]:
df_windspeed_origin = spark.read.csv('windspeed-origin.csv', header=True, inferSchema=True)

In [17]:
csv_filepath = os.path.join(os.getcwd(), "airports-database.csv")

df2 = spark.read.csv(csv_filepath, header=True, inferSchema=True)

In [18]:
df2 = df2.withColumn('date_as_str', concat_ws('-', 'year', 'month', 'day'))
df2 = df2.withColumn('date', to_date('date_as_str', 'yyyy-M-d'))

df2.show()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+-----------+----------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|date_as_str|      date|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+-----------+----------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|   2013-1-1|2013-01-01|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   22

In [19]:
df_windspeed_origin = df_windspeed_origin.withColumnsRenamed({
    'date-as-str': 'date', 'wind_speed': 'wind_speed_origin'
}).withColumn('date', to_date('date', 'yyyy-M-d'))
df_windspeed_origin.show()

+------+----------+-----------------+
|origin|      date|wind_speed_origin|
+------+----------+-----------------+
|   EWR|2013-01-01|             19.4|
|   EWR|2013-01-02|             16.6|
|   EWR|2013-01-03|             12.7|
|   EWR|2013-01-04|             21.7|
|   EWR|2013-01-05|             17.7|
|   EWR|2013-01-06|             11.5|
|   EWR|2013-01-07|             14.3|
|   EWR|2013-01-08|             13.4|
|   EWR|2013-01-09|             15.0|
|   EWR|2013-01-10|             19.3|
|   EWR|2013-01-11|              9.5|
|   EWR|2013-01-12|              8.7|
|   EWR|2013-01-13|              7.9|
|   EWR|2013-01-14|             10.3|
|   EWR|2013-01-15|             13.2|
|   EWR|2013-01-16|             10.2|
|   EWR|2013-01-17|             14.3|
|   EWR|2013-01-18|             20.9|
|   EWR|2013-01-19|             22.5|
|   EWR|2013-01-20|             26.8|
+------+----------+-----------------+
only showing top 20 rows


In [20]:
extended_df = df2.join(
    broadcast(df_windspeed_origin),
    on=['origin', 'date'],
    how='left'
)

extended_df.show()

+------+----------+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+----+--------+--------+----+------+-------------------+--------------------+-----------+-----------------+
|origin|      date| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|dest|air_time|distance|hour|minute|          time_hour|                name|date_as_str|wind_speed_origin|
+------+----------+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+----+--------+--------+----+------+-------------------+--------------------+-----------+-----------------+
|   EWR|2013-01-01|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|   2013-1-1|             19.4|
|   LGA|2013-01-01|  1|2013|    1|  1|   533.0|         

In [21]:
df_windspeed_list_complete = df_windspeed_list_complete.withColumnsRenamed({
    'origin': 'dest', 'date-as-str': 'date', 'wind_speed': 'wind_speed_dest'
}).withColumn('date', to_date('date', 'yyyy-M-d'))

df_windspeed_list_complete.show()

+----+----------+---------------+
|dest|      date|wind_speed_dest|
+----+----------+---------------+
| MSY|2013-01-01|           24.5|
| MSY|2013-01-02|           24.1|
| MSY|2013-01-03|           21.8|
| MSY|2013-01-04|           20.3|
| MSY|2013-01-05|           17.1|
| MSY|2013-01-06|           23.0|
| MSY|2013-01-07|           19.8|
| MSY|2013-01-08|           20.7|
| MSY|2013-01-09|           24.5|
| MSY|2013-01-10|           29.4|
| MSY|2013-01-11|           23.7|
| MSY|2013-01-12|           22.5|
| MSY|2013-01-13|           26.5|
| MSY|2013-01-14|           25.5|
| MSY|2013-01-15|           25.9|
| MSY|2013-01-16|           22.7|
| MSY|2013-01-17|           27.1|
| MSY|2013-01-18|           20.2|
| MSY|2013-01-19|           13.5|
| MSY|2013-01-20|            9.4|
+----+----------+---------------+
only showing top 20 rows


In [22]:
extended_df = extended_df.join(
    broadcast(df_windspeed_list_complete),
    on=['dest', 'date'],
    how='left'
)

extended_df.show()

                                                                                

+----+----------+------+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+--------+--------+----+------+-------------------+--------------------+-----------+-----------------+---------------+
|dest|      date|origin| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|air_time|distance|hour|minute|          time_hour|                name|date_as_str|wind_speed_origin|wind_speed_dest|
+----+----------+------+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+--------+--------+----+------+-------------------+--------------------+-----------+-----------------+---------------+
| IAH|2013-01-01|   EWR|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|   2013-1-1|             19.4|         

In [23]:
extended_df.coalesce(1).write.csv('./airports-database-extended.csv', header=True, mode='overwrite')

                                                                                

In [24]:
extended_df.groupBy(['origin', 'dest', 'wind_speed_origin', 'wind_speed_dest']).agg(max('arr_delay').alias('max_arr_delay')).orderBy(desc('max_arr_delay')).show(5)


+------+----+-----------------+---------------+-------------+
|origin|dest|wind_speed_origin|wind_speed_dest|max_arr_delay|
+------+----+-----------------+---------------+-------------+
|   JFK| HNL|             17.7|           30.5|       1272.0|
|   JFK| CMH|             15.3|            9.2|       1127.0|
|   EWR| ORD|             19.3|           18.5|       1109.0|
|   JFK| SFO|             16.5|           18.1|       1007.0|
|   JFK| CVG|             14.5|           11.1|        989.0|
+------+----+-----------------+---------------+-------------+
only showing top 5 rows


                                                                                

Finally, we have the extended data for the 5 flights with the longest arrival delays, which are:
* John F. Kennedy International Airport to Honolulu International Airport;
* John F. Kennedy International Airport to Port Columbus International Airport;
* Newark Liberty Airport to O'Hare International Airport;
* John F. Kennedy International Airport to San Francisco International Airport;
* John F. Kennedy International Airport to Cincinnati/Northern Kentucky International Airport.