## Setup

In [1]:
!pip install pyspark==3.3.1

Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5 (from pyspark==3.3.1)
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m14.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845496 sha256=845d99864a57e178f67446d83d107eb64c4e8e0ecc263f069a43b1c2d9224c09
  Stored in directory: /root/.cache/pip/wheels/f8/b2/6a/477d856abe8174d27566a3c99972c3c8c03252407675cc7021
Successfully built pyspark
Installing collected packages: py4j, pyspark
  

In [2]:
import numpy as np
import pandas as pd
import requests
import pickle

from xgboost import XGBRegressor
from itertools import chain
from google.colab import userdata
from datetime import timedelta, datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, month, when, date_format, mean, avg, to_date, concat_ws, sum, round, create_map, lit, row_number, stddev, isnan, lpad
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType

from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

In [4]:
sc = SparkSession.builder.master('local[*]').getOrCreate()
df = sc.read.csv(
  '/content/sample_data/airports-database.csv',
  header=True
  )

In [5]:
df.count()

254276

In [6]:
df.show()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

### Questions using PySpark

In [7]:
months = {
    1: "Janeiro", 2: "Fevereiro", 3: "Março", 4: "Abril",
    5: "Maio", 6: "Junho", 7: "Julho", 8: "Agosto",
    9: "Setembro", 10: "Outubro", 11: "Novembro", 12: "Dezembro"
}

# 1. Qual é o número total de voos no conjunto de dados?
total = df.count()
print(f"Número total de voos:   {total}")

# 2. Quantos voos foram cancelados?
canceled = df.filter((col('dep_time').isNull()) & (col('arr_time').isNull())).count()
print(f"Voos cancelados:   {canceled}")

# 3. Qual é o atraso médio na partida dos voos (dep_delay)?
avg_delay = df.select(mean('dep_delay')).collect()[0][0]
print(f"Atraso médio na partida dos voos:   {avg_delay:.2f} minutos")

# 4. Quais são os 5 aeroportos com maior número de pousos?
top_airports = df.groupBy('dest').count().orderBy('count', ascending=False).limit(5)
print("Aeroportos com maior número de pousos:")
top_airports.show()

# 5. Qual é a rota mais frequente (par origin-dest)?
route = df.groupBy('origin', 'dest').count().orderBy(col('count').desc()).limit(1).collect()[0][0]
print(f"Rota mais frequente:   {route}")

# 6. Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada?
biggest_delay = df.groupBy("carrier").agg(avg("arr_delay").alias("avg_arr_delay")).orderBy("avg_arr_delay", ascending=False).limit(5)
print("Companhias aéreas com maior tempo médio de atraso na chegada:")
biggest_delay.show()

# 7. Qual é o dia da semana com maior número de voos?
df_date = df.withColumn("date", to_date(concat_ws("-", "year", "month", "day")))
df_weekday = df_date.withColumn("day_of_week", date_format("date", "EEEE"))
day_of_week = df_weekday.groupBy("day_of_week").count().orderBy("count", ascending=False).limit(1).collect()[0][0]
print(f"Dia da semana com maior número de voos:   {day_of_week}")

# 8. Qual o percentual mensal dos voos tiveram atraso na partida superior a 30 minutos?
df_delayed = df.withColumn("delayed", when(col("dep_delay") > 30, 1).otherwise(0))

delay_percent = df_delayed.groupBy("month").agg(
    (100 * (sum("delayed") / count("*"))).alias("percentual_atraso_maior_30min")
).orderBy("month")

month_expr = create_map([lit(x) for x in chain(*months.items())])

df_formated = delay_percent.withColumn(
    "percentual_atraso_maior_30min", round("percentual_atraso_maior_30min", 2)
).withColumn(
    "mes_nome", month_expr[col("month")]
).select("month", "mes_nome", "percentual_atraso_maior_30min").orderBy("month")

print("Percentual mensal dos voos que tiveram atraso na partida superior a 30 minutos:")
df_formated.show(12, truncate=False)

# 9. Qual a origem mais comum para voos que pousaram em Seattle (SEA)?
comum_origin = df.filter(col("dest") == "SEA").groupBy("origin").count().orderBy("count", ascending=False).limit(1).collect()[0][0]
print(f"Origem mais comum para voos que pousaram em Seattle (SEA):   {comum_origin}")

# 10. Qual é a média de atraso na partida dos voos (dep_delay) para cada dia da semana?
weekday_delay = df_weekday.groupBy("day_of_week").agg(avg("dep_delay").alias("avg_dep_delay")).orderBy("avg_dep_delay", ascending=False)
print("Média de atraso na partida dos voos (dep_delay) para cada dia da semana:")
weekday_delay.show()

# 11. Qual é a rota que teve o maior tempo de voo médio (air_time)?
biggest_time_route = df.withColumn("rota", concat_ws(" -> ", col("origin"), col("dest"))).groupBy("rota").agg(avg("air_time").alias("tempo_medio_voo")).orderBy(col("tempo_medio_voo").desc()).limit(1)
print("Rota que teve o maior tempo de voo médio:")
biggest_time_route.show()

# 12. Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?
destiny_by_airport = df.groupBy("origin", "dest") \
  .agg(count("*").alias("total_voos")) \
  .withColumn("rank", row_number().over(Window.partitionBy("origin").orderBy(col("total_voos").desc()))) \
  .filter(col("rank") == 1) \
  .select("origin", "dest", "total_voos")
print("Aeroporto de destino mais comum para cada aeroporto de origem")
destiny_by_airport.show(truncate=False)

# 13. Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo?
bigget_time_variation = df.withColumn("rota", concat_ws(" -> ", col("origin"), col("dest"))) \
  .groupBy("rota") \
  .agg(stddev("air_time").alias("variacao_air_time")) \
  .orderBy(col("variacao_air_time").desc()).limit(3)
print("3 rotas que tiveram a maior variação no tempo médio de voo")
bigget_time_variation.show()

# 14. Qual é a média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora?
delay_arrival = df.filter(col("dep_delay") > 60) \
  .agg(avg("arr_delay").alias("media_atraso_chegada"))
print("Média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora")
delay_arrival.show()

# 15. Qual é a média de voos diários para cada mês do ano?
month_flight = df.groupBy("month", "day") \
  .agg(count("*").alias("voos_dia")) \
  .groupBy("month") \
  .agg(avg("voos_dia").alias("media_diaria_voos")) \
  .orderBy("month")

month_expr = create_map([lit(x) for x in chain(*months.items())])

df_formated_flight = month_flight.withColumn(
    "media_diaria_voos", round("media_diaria_voos", 2)
).withColumn(
    "mes_nome", month_expr[col("month")]
).select("month", "mes_nome", "media_diaria_voos").orderBy("month")

print("Média de voos diários para cada mês do ano:")
df_formated_flight.show(12, truncate=False)

# 16. Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos?
comum_routes_with_delay = df.filter(col("arr_delay") > 30) \
  .withColumn("rota", concat_ws(" -> ", col("origin"), col("dest"))) \
  .groupBy("rota") \
  .agg(count("*").alias("qtd_voos_com_atraso")) \
  .orderBy(col("qtd_voos_com_atraso").desc()).limit(3)
print("3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos")
comum_routes_with_delay.show()

# 17. Para cada origem, qual o principal destino?
destiny_by_origin = df.groupBy("origin", "dest") \
  .agg(count("*").alias("qtd_voos")) \
  .withColumn("rank", row_number().over(Window.partitionBy("origin").orderBy(col("qtd_voos").desc()))) \
  .filter(col("rank") == 1) \
  .select("origin", "dest", "qtd_voos") \
  .orderBy("origin")
print("Principal destino para cada origem")
destiny_by_origin.show()


Número total de voos:   254276
Voos cancelados:   5427
Atraso médio na partida dos voos:   12.52 minutos
Aeroportos com maior número de pousos:
+----+-----+
|dest|count|
+----+-----+
| ORD|13363|
| ATL|12997|
| LAX|12560|
| BOS|11733|
| CLT|10773|
+----+-----+

Rota mais frequente:   JFK
Companhias aéreas com maior tempo médio de atraso na chegada:
+-------+------------------+
|carrier|     avg_arr_delay|
+-------+------------------+
|      7|             605.0|
|     FL|22.722875816993465|
|     F9| 20.84200385356455|
|     YV|15.218961625282168|
|     EV|14.220842651859849|
+-------+------------------+

Dia da semana com maior número de voos:   Thursday
Percentual mensal dos voos que tiveram atraso na partida superior a 30 minutos:
+-----+--------+-----------------------------+
|month|mes_nome|percentual_atraso_maior_30min|
+-----+--------+-----------------------------+
|null |null    |0.0                          |
|1    |Janeiro |12.41                        |
|10   |Outubro |9.34 

## ML Model

### Step 1: Null Value Handling

In [8]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]
   ).show()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|time_hour|name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
|  0|   1|    1|  2|    5428|             2|     5430|    5786|             3|     6363|      5|     6|   1613|     7|   8|    6366|       8|   9|    10|       11|  12|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+



In [9]:
df_cleaned = df.dropna(subset=["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time"])

df_cleaned = df_cleaned.fillna({"tailnum": "UNKNOWN"})

In [10]:
df_cleaned.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_cleaned.columns]
   ).show()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|time_hour|name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+
|  0|   0|    0|  0|       0|             0|        0|       0|             0|        0|      0|     0|      0|     0|   0|       0|       0|   1|     1|        2|   3|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+---------+----+



### Step 2: Separation of Variables, Features (X) and Target (y)

In [11]:
target_col = "arr_delay"

cols_to_exclude = ["id", "arr_time", "arr_delay", "tailnum", "sched_arr_time", "name"]

feature_cols = [col for col in df_cleaned.columns if col not in cols_to_exclude]

df_pd = df_cleaned.select(feature_cols + [target_col]).toPandas()

X = df_pd[feature_cols]
y = df_pd[target_col]

### Step 3: Split the data into training and testing sets

In [12]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3
)

### Step 4: Validate the model on the test set

In [14]:
categorical_features = X_train.select_dtypes(include=['object']).columns
numeric_features = X_train.select_dtypes(include=[np.number]).columns

categorical_transformer = OneHotEncoder(handle_unknown='ignore')

numeric_transformer = Pipeline(steps=[
    ('scaler', StandardScaler())
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

xgb_model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', XGBRegressor(n_estimators=200,
                                learning_rate=0.2,
                                max_depth=10,
                                random_state=42,
                                subsample=0.8,
                                colsample_bytree= 1,
                                verbosity=1))
])

xgb_model.fit(X_train, y_train)

### Step 5: Metrics

In [15]:
y_pred = xgb_model.predict(X_test)

mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print("Avaliação do modelo XGBoost:")
print(f"MSE  (Erro Quadrático Médio):        {mse:.2f}")
print(f"MAE  (Erro Absoluto Médio):           {mae:.2f}")
print(f"R²   (Coeficiente de Determinação):   {r2:.2f}")

Avaliação do modelo XGBoost:
MSE  (Erro Quadrático Médio):        562.99
MAE  (Erro Absoluto Médio):           14.81
R²   (Coeficiente de Determinação):   0.72


### Step 6: Save the model

In [16]:
model_pkl_file = "airport_delay.pkl"

with open(model_pkl_file, 'wb') as file:
    pickle.dump(xgb_model, file)

## Enrich database

### API Request

In [17]:
def get_airport_coordinates(airport_code, api_key):
    try:
        url = f"https://airportdb.io/api/v1/airport/K{airport_code}?apiToken={api_key}"
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()
        return float(data['latitude_deg']), float(data['longitude_deg'])
    except Exception as e:
        print(f"Erro ao obter coordenadas para {airport_code}: {e}")
        return None, None

In [18]:
def get_wind_speed(lat, lon, date, api_key):
    try:
        start_date = date.strftime("%Y-%m-%d")
        end_date = (date + timedelta(days=1)).strftime("%Y-%m-%d")
        url = 'https://api.weatherbit.io/v2.0/history/daily'
        params = {
            'lat': lat,
            'lon': lon,
            'start_date': start_date,
            'end_date': end_date,
            'key': api_key,
        }
        headers = {'Accept': 'application/json'}
        response = requests.get(url, params=params, headers=headers)
        data = response.json()
        return data['data'][0]['wind_spd']
    except:
        return None


### Final question

In [19]:
df_pd = df_cleaned.toPandas()

df_pd['date'] = pd.to_datetime(df_pd['year'].astype(str) + '-' +
                               df_pd['month'].astype(str).str.zfill(2) + '-' +
                               df_pd['day'].astype(str).str.zfill(2))

top_delays = df_pd.sort_values(by='arr_delay', ascending=False).head(5)

airportdb_key = userdata.get('airport_key')
weatherbit_key = userdata.get('weatherkey')

enriched_data = []
for _, row in top_delays.iterrows():
    origin = row['origin']
    dest = row['dest']
    date = row['date']

    lat_o, lon_o = get_airport_coordinates(origin, airportdb_key)
    lat_d, lon_d = get_airport_coordinates(dest, airportdb_key)

    wind_o = get_wind_speed(lat_o, lon_o, date, weatherbit_key)
    wind_d = get_wind_speed(lat_d, lon_d, date, weatherbit_key)

    row_data = row.to_dict()
    row_data.update({
        'wind_origin_mps': wind_o,
        'wind_dest_mps': wind_d
    })
    enriched_data.append(row_data)

enriched_df = pd.DataFrame(enriched_data)
print(enriched_df[['origin', 'dest', 'arr_delay', 'wind_origin_mps', 'wind_dest_mps']])


  origin dest arr_delay  wind_origin_mps  wind_dest_mps
0    JFK  RSW      99.0              5.3            3.8
1    EWR  BNA      99.0              2.8            2.6
2    LGA  MSY      99.0              4.2            3.2
3    EWR  FLL      99.0              1.6            4.2
4    EWR  AUS      99.0              3.6            1.7
