In [None]:
# !pip install pyspark==3.3.1

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, concat, lit, date_format, when, count, col
from pyspark.sql import functions as F

In [None]:
# Inicializa a sessão Spark
spark = SparkSession.builder \
    .appName("Colab Spark") \
    .getOrCreate()

In [None]:
file_path = 'airports-database.csv'

df = spark.read.csv(file_path, header=True, inferSchema=True)

In [None]:
df.show(5)

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

# Perguntas

## 1. Qual é o número total de voos no conjunto de dados?

In [None]:
ids_duplicados = df.groupBy('id').count().filter('count > 1')
ids_duplicados.show()

+---+-----+
| id|count|
+---+-----+
+---+-----+



In [None]:
qtd_voos = df.select('id').count()
print(f'Reposta: {qtd_voos} é o número total de voos no dataset')

Reposta: 336776 é o número total de voos no dataset


## 2. Quantos voos foram cancelados?

In [None]:
qtd_cancelados = df.filter(
    df['dep_time'].isNull()
    & df['arr_time'].isNull()
).count()
print(f'Resposta: {qtd_cancelados} é a quantidade de voos cancelados')

Resposta: 8255 é a quantidade de voos cancelados


## 3. Qual é o atraso médio na partida dos voos (dep_delay)?

In [None]:
avg_atraso_partida = round(df.agg(avg('dep_delay')).collect()[0][0], 2)
print(f'Resposta: {avg_atraso_partida} é o atraso médio na partida dos voos')

Resposta: 12.64 é o atraso médio na partida dos voos


## 4. Quais são os 5 aeroportos com maior número de pousos?

In [None]:
df_aeros_dest = df.groupBy('dest').count()
df_aeros_dest.orderBy('count', ascending=False).show(5)

+----+-----+
|dest|count|
+----+-----+
| ORD|17283|
| ATL|17215|
| LAX|16174|
| BOS|15508|
| MCO|14082|
+----+-----+
only showing top 5 rows



## 5. Qual é a rota mais frequente (par origin-dest)?

In [None]:
df = df.withColumn(
    'rota', concat(df['origin'], lit('-'), df['dest'])
)

In [None]:
df_rotas_freq = df.groupBy('rota').count()
df_rotas_freq.orderBy('count', ascending=False).show(1)

+-------+-----+
|   rota|count|
+-------+-----+
|JFK-LAX|11262|
+-------+-----+
only showing top 1 row



## 6. Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada?

In [None]:
df_avg_arr_delay = df.groupBy('carrier').agg(
    avg('arr_delay').alias('avg_arr_delay')
)
df_avg_arr_delay.orderBy('avg_arr_delay', ascending=False).show(5)

+-------+------------------+
|carrier|     avg_arr_delay|
+-------+------------------+
|     F9|21.920704845814978|
|     FL|20.115905511811025|
|     EV| 15.79643108710965|
|     YV|15.556985294117647|
|     OO|11.931034482758621|
+-------+------------------+
only showing top 5 rows



## 7. Qual é o dia da semana com maior número de voos?

In [None]:
df = df.withColumn(
    'dia_da_semana', date_format('time_hour', 'EEEE')
)

df = df.withColumn(
    'dia_da_semana_ptbr',
    when(df['dia_da_semana'] == 'Monday', 'Segunda-feira')
    .when(df['dia_da_semana'] == 'Tuesday', 'Terça-feira')
    .when(df['dia_da_semana'] == 'Wednesday', 'Quarta-feira')
    .when(df['dia_da_semana'] == 'Thursday', 'Quinta-feira')
    .when(df['dia_da_semana'] == 'Friday', 'Sexta-feira')
    .when(df['dia_da_semana'] == 'Saturday', 'Sábado')
    .when(df['dia_da_semana'] == 'Sunday', 'Domingo')
)

In [None]:
df_dia_semana = df.groupBy('dia_da_semana_ptbr').count()
df_dia_semana.orderBy('count', ascending=False).show(1)

+------------------+-----+
|dia_da_semana_ptbr|count|
+------------------+-----+
|     Segunda-feira|50690|
+------------------+-----+
only showing top 1 row



## 8. Qual o percentual mensal dos voos tiveram atraso na partida superior a 30 minutos?

In [None]:
df = df.withColumn(
    'dep_delay_sup_30',
    when(df['dep_delay'] > 30, 1).otherwise(0)
)

df_pct_mensal = df.groupBy('month').agg(
    count('*').alias('total_registros'),
    F.sum('dep_delay_sup_30').alias('atrasos_maior_30')
)

df_pct_mensal = df_pct_mensal.withColumn(
    'percentual_atrasos_maior_30',
    (col('atrasos_maior_30') / col('total_registros'))*100
)

df_pct_mensal.orderBy('month').show()

+-----+---------------+----------------+---------------------------+
|month|total_registros|atrasos_maior_30|percentual_atrasos_maior_30|
+-----+---------------+----------------+---------------------------+
|    1|          27004|            3350|         12.405569545252556|
|    2|          24951|            3182|         12.752995871908942|
|    3|          28834|            4309|         14.944163140736629|
|    4|          28330|            4531|         15.993646311330744|
|    5|          28796|            4416|         15.335463258785943|
|    6|          28243|            5717|          20.24218390397621|
|    7|          29425|            6173|          20.97875955819881|
|    8|          29327|            4238|         14.450847342039758|
|    9|          27574|            2419|           8.77275694494814|
|   10|          28889|            2697|          9.335733324102598|
|   11|          27268|            2388|          8.757517969781428|
|   12|          28135|           

## 9. Qual a origem mais comum para voos que pousaram em Seattle (SEA)?

In [None]:
df_dest_sea = df.filter(col('dest') == 'SEA')
df_agrup_origens = df_dest_sea.groupBy('origin').count()
df_agrup_origens.orderBy('count', ascending=False).show(1)

+------+-----+
|origin|count|
+------+-----+
|   JFK| 2092|
+------+-----+
only showing top 1 row



## 10. Qual é a média de atraso na partida dos voos (dep_delay) para cada dia da semana?

In [None]:
df_avg_wkd_delay = df.groupBy('dia_da_semana_ptbr').agg(
    avg('dep_delay').alias('avg_dep_delay')
)
df_avg_wkd_delay.orderBy('avg_dep_delay', ascending=False).show()

+------------------+------------------+
|dia_da_semana_ptbr|     avg_dep_delay|
+------------------+------------------+
|      Quinta-feira|16.148919990957108|
|     Segunda-feira|14.778936729330908|
|       Sexta-feira| 14.69605749486653|
|      Quarta-feira|11.803512219083876|
|           Domingo|11.589531801152422|
|       Terça-feira|10.631682565455652|
|            Sábado| 7.650502333676133|
+------------------+------------------+



## 11. Qual é a rota que teve o maior tempo de voo médio (air_time)?

In [None]:
df_avg_rota_airtime = df.groupBy('rota').agg(
    avg('air_time').alias('avg_air_time')
)
df_avg_rota_airtime.orderBy('avg_air_time', ascending=False).show(1)

+-------+-----------------+
|   rota|     avg_air_time|
+-------+-----------------+
|JFK-HNL|623.0877192982456|
+-------+-----------------+
only showing top 1 row



## 12. Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?

In [None]:
from pyspark.sql.window import Window

In [None]:
(
    df.groupBy('origin', 'dest')
    .agg(count('*').alias('qtd_voos'))
    .withColumn(
        'max_qtd_voos',
        F.max('qtd_voos').over(Window.partitionBy('origin'))
    )
    .filter(col('qtd_voos') == col('max_qtd_voos'))
    .select('origin', 'dest', 'qtd_voos')
).show()

+------+----+--------+
|origin|dest|qtd_voos|
+------+----+--------+
|   EWR| ORD|    6100|
|   JFK| LAX|   11262|
|   LGA| ATL|   10263|
+------+----+--------+



## 13. Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo (air_time) ?

In [None]:
(
    df.groupBy('rota').agg(
        F.stddev('air_time').alias('std_air_time')
    )
).orderBy('std_air_time', ascending=False).show(3)

+-------+------------------+
|   rota|      std_air_time|
+-------+------------------+
|LGA-MYR| 25.32455988429677|
|EWR-HNL| 21.26613546847427|
|JFK-HNL|20.688824842787056|
+-------+------------------+
only showing top 3 rows



## 14. Qual é a média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora?

In [None]:
(
    df.filter(
        col('dep_delay') > 60
    ).agg(
        avg('arr_delay').alias('avg_arr_delay')
    )
).show()

+------------------+
|     avg_arr_delay|
+------------------+
|119.04880549963919|
+------------------+



## 15. Qual é a média de voos diários para cada mês do ano?

In [None]:
(
    df.groupBy('month', 'day').agg(
        count('*').alias('qtd_voos_diarios')
    )
    .groupBy('month')
    .agg(avg('qtd_voos_diarios').alias('avg_voos_diarios'))
    .orderBy('month')
).show()

+-----+-----------------+
|month| avg_voos_diarios|
+-----+-----------------+
|    1|871.0967741935484|
|    2|891.1071428571429|
|    3|930.1290322580645|
|    4|944.3333333333334|
|    5|928.9032258064516|
|    6|941.4333333333333|
|    7|949.1935483870968|
|    8|946.0322580645161|
|    9|919.1333333333333|
|   10|931.9032258064516|
|   11|908.9333333333333|
|   12|907.5806451612904|
+-----+-----------------+



## 16. Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos?

In [None]:
(
    df.filter(col('arr_delay') > 30)
    .groupBy('rota')
    .agg(count('*').alias('qtd_voos'))
    .orderBy('qtd_voos', ascending=False)
).show(3)

+-------+--------+
|   rota|qtd_voos|
+-------+--------+
|LGA-ATL|    1563|
|JFK-LAX|    1286|
|LGA-ORD|    1188|
+-------+--------+
only showing top 3 rows



## 17. Para cada origem, qual o principal destino?

In [None]:
(
    df.groupBy('origin', 'dest')
    .agg(count('*').alias('qtd_voos'))
    .withColumn(
        'max_qtd_voos',
        F.max('qtd_voos').over(Window.partitionBy('origin'))
    )
    .filter(col('qtd_voos') == col('max_qtd_voos'))
    .select('origin', 'dest')
).show()

+------+----+
|origin|dest|
+------+----+
|   EWR| ORD|
|   JFK| LAX|
|   LGA| ATL|
+------+----+



# Enriquecimento dos dados

In [None]:
import requests
from datetime import timedelta

In [None]:
airportdb_key = '9bdb5e6f5f603599c7083d38203f81ebf2f0178b59e611b85f55959fe54b90f8e543328b913212073e00b1498e0d8941'
weatherbit_key = '746c0c74a5984b0390f851bac42819c3'

In [None]:
voos_atrasados = df.orderBy(col("arr_delay").desc()).limit(5).collect()

In [None]:
dict_voos = {}

for voo in voos_atrasados:
  dict_voos[voo['id']] = {
      'aero_origem': voo['origin'],
      'aero_dest': voo['dest'],
      'datetime': voo['time_hour'],
  }

In [None]:
resultados_enriquecidos = {}

for voo, infos in dict_voos.items():
  lista_aeros = [('origem', infos['aero_origem']), ('destino', infos['aero_dest'])]
  date = infos['datetime']

  for tipo, airport_code in lista_aeros:
    # Obtém dados de latitude e longitude da AirportDB API
    url = f"https://airportdb.io/api/v1/airport/K{airport_code}?apiToken={airportdb_key}"
    response = requests.get(url)
    data = response.json()

    try:
      latitude = data['latitude_deg']
      longitude = data['longitude_deg']
    except:
      latitude = 'Not found'
      longitude = 'Not found'

    # Obtém dados de velocidade do vento da Weatherbit API
    start_date = date.strftime('%Y-%m-%d')
    end_date = (date + timedelta(days=1)).strftime('%Y-%m-%d')

    url = 'https://api.weatherbit.io/v2.0/history/daily'
    params = {
      'lat': latitude,
      'lon': longitude,
      'start_date': start_date,
      'end_date': end_date,
      'key': weatherbit_key,
    }
    headers = {
      'Accept': 'application/json',
    }
    response = requests.get(url, params=params, headers=headers)
    data = response.json()

    try:
      velocidade_vento = data['data'][0]['wind_spd']
    except:
      velocidade_vento = 'null'

    # Alimenta o dicionário com dados enriquecidos
    resultados_enriquecidos[str(voo)+'-'+tipo] = {
        'aeroporto': airport_code,
        'data': start_date,
        'latitude': latitude,
        'longitude': longitude,
        'velocidade_vento': velocidade_vento
    }

In [None]:
resultados_enriquecidos

{'7072-origem': {'aeroporto': 'JFK',
  'data': '2013-01-09',
  'latitude': 40.639801,
  'longitude': -73.7789,
  'velocidade_vento': 3.5},
 '7072-destino': {'aeroporto': 'HNL',
  'data': '2013-01-09',
  'latitude': 'Not found',
  'longitude': 'Not found',
  'velocidade_vento': 'null'},
 '235778-origem': {'aeroporto': 'JFK',
  'data': '2013-06-15',
  'latitude': 40.639801,
  'longitude': -73.7789,
  'velocidade_vento': 4.1},
 '235778-destino': {'aeroporto': 'CMH',
  'data': '2013-06-15',
  'latitude': 39.998001,
  'longitude': -82.891899,
  'velocidade_vento': 1.9},
 '8239-origem': {'aeroporto': 'EWR',
  'data': '2013-01-10',
  'latitude': 40.692501,
  'longitude': -74.168701,
  'velocidade_vento': 4.1},
 '8239-destino': {'aeroporto': 'ORD',
  'data': '2013-01-10',
  'latitude': 41.9786,
  'longitude': -87.9048,
  'velocidade_vento': 4.1},
 '327043-origem': {'aeroporto': 'JFK',
  'data': '2013-09-20',
  'latitude': 40.639801,
  'longitude': -73.7789,
  'velocidade_vento': 3.7},
 '327043

# Modelagem

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import joblib

pd.set_option('display.max_columns', None)

In [2]:
pd_df = pd.read_csv('airports-database.csv')

In [3]:
pd_df.head(2)

Unnamed: 0,id,year,month,day,dep_time,sched_dep_time,dep_delay,arr_time,sched_arr_time,arr_delay,carrier,flight,tailnum,origin,dest,air_time,distance,hour,minute,time_hour,name
0,0,2013,1,1,517.0,515,2.0,830.0,819,11.0,UA,1545,N14228,EWR,IAH,227.0,1400,5,15,2013-01-01 05:00:00,United Air Lines Inc.
1,1,2013,1,1,533.0,529,4.0,850.0,830,20.0,UA,1714,N24211,LGA,IAH,227.0,1416,5,29,2013-01-01 05:00:00,United Air Lines Inc.


In [4]:
pd_df.dtypes

id                  int64
year                int64
month               int64
day                 int64
dep_time          float64
sched_dep_time      int64
dep_delay         float64
arr_time          float64
sched_arr_time      int64
arr_delay         float64
carrier            object
flight              int64
tailnum            object
origin             object
dest               object
air_time          float64
distance            int64
hour                int64
minute              int64
time_hour          object
name               object
dtype: object

## Seleção de variáveis

In [5]:
# Variáveis que provavelmente tem o maior impacto no atraso na chegada:
sel_cols = [
    'sched_dep_time',
    'dep_delay',
    'sched_arr_time',
    'air_time',
    'distance',
    'hour'
]

## Separação X & y / Tratativa de nulos

In [6]:
X = pd_df[sel_cols]
y = pd_df['arr_delay']

In [7]:
X = X.fillna(X.median())
y = y.fillna(0)

## Split treino e teste

In [8]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

## Treinamento

In [9]:
model = RandomForestRegressor(n_estimators=10, random_state=42)
model.fit(X_train, y_train)

## Avaliação do modelo

In [10]:
y_pred = model.predict(X_test)

In [11]:
# Métricas
mae = mean_absolute_error(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
rmse = mse**0.5
r2 = r2_score(y_test, y_pred)

print("Métricas do modelo:")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"Mean Squared Error (MSE): {mse}")
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"R2 Score: {r2}")

Métricas do modelo:
Mean Absolute Error (MAE): 10.977757241475953
Mean Squared Error (MSE): 259.7905467723191
Root Mean Squared Error (RMSE): 16.118019319144615
R2 Score: 0.8638915012732711


### MAE: Mean Absolute Error
- Definição: O MAE mede a média das diferenças absolutas entre os valores reais e os valores previstos. Ele indica o erro médio sem considerar a direção do erro (positivo ou negativo).
- Interpretação: O MAE é simples de entender e oferece uma média dos erros absolutos. Quanto menor o MAE, melhor o modelo, já que ele indica uma previsão mais próxima dos valores reais.

### MSE: Mean Squared Error
- Definição: O MSE mede a média dos quadrados das diferenças entre os valores reais e os valores previstos. Ele penaliza erros maiores de forma mais severa, pois os erros são elevados ao quadrado.
- Interpretação: O MSE penaliza mais os erros grandes (como desvios maiores entre o valor real e o previsto), o que pode ser útil se for importante evitar grandes erros. No entanto, ele é sensível a outliers devido ao termo ao quadrado.

### RMSE: Root Mean Squared Error
- Definição: O RMSE é simplesmente a raiz quadrada do MSE. Ele também penaliza mais os erros grandes, mas o resultado é trazido para a mesma unidade da variável de interesse.
- Interpretação: O RMSE é mais interpretável do que o MSE, pois suas unidades são as mesmas dos dados originais. Ele é útil quando você deseja medir o erro em unidades reais. Como o MSE, o RMSE também é sensível a outliers.

### R² (R-Squared ou Coeficiente de Determinação)
- Definição: O R² mede a proporção da variabilidade dos dados que é explicada pelo modelo. Ele compara o desempenho do modelo com a média dos valores reais e indica o quão bem o modelo se ajusta aos dados. O R² pode variar de 0 a 1.
- Interpretação: Um R² próximo de 1 indica que o modelo explica uma grande parte da variação nos dados.

## Salvar modelo treinado

In [12]:
joblib.dump(model, 'modelo_voos_atraso.pkl')

['modelo_voos_atraso.pkl']