# PicPay Case
Júlia Almeida

----------

Todos os imports utilizados

In [0]:
from pyspark.sql.functions import avg, desc, concat_ws, to_date, dayofweek, col, round, when, count, sum, row_number, stddev
from pyspark.sql.window import Window
import requests
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.preprocessing import LabelEncoder
import joblib

In [0]:
# Carregamento do DF (salvei em um starter warehouse no databricks)
df = spark.sql("SELECT * FROM default.airports_database")

## Utilitários

In [0]:
weekdays_names = {
    1: "Domingo", 2: "Segunda", 3: "Terça",
    4: "Quarta", 5: "Quinta", 6: "Sexta", 7: "Sábado"
}

# Perguntas

1. Qual é o número total de voos no conjunto de dados?

In [0]:
total_flights = df.count()
print(f"Há um total de {total_flights} no conjunto de dados")

Há um total de 336776 no conjunto de dados


2. Quantos voos foram cancelados? (Considerando que voos cancelados têm
dep_time e arr_time nulos)

In [0]:
canceled_flights = df.filter((df.dep_time.isNull()) & (df.arr_time.isNull())).count()
print(f"Há um total de {canceled_flights} voos cancelados no conjunto de dados")

Há um total de 8255 voos cancelados no conjunto de dados


3. Qual é o atraso médio na partida dos voos (dep_delay)?

In [0]:
# Aqui, filtrei pelos voos que realmente tiveram ATRASO, não adiantamento
# No caso, os atrasos são quando o dep_delay são maiores do que zero (valores positivos)
flights_with_delay = df.filter(df.dep_delay > 0)
depart_avg = flights_with_delay.select(avg("dep_delay")).collect()[0][0]
print(f"A partida dos voos atrasa, em média, {depart_avg:.2f} minutos.")

A partida dos voos atrasa, em média, 39.37 minutos.


4. Quais são os 5 aeroportos com maior número de pousos?

In [0]:
# A coluna "dest" contém os aeroports onde os voos pousaram
dest_airports = df.groupBy("dest") \
  .count() \
  .orderBy(desc("count")) \
  .limit(5)

print("Os 5 Aeroportos com mais pousos são:\n")
for row in dest_airports.collect():
    print(f"{row['dest']} - Pousos: {row['count']}")

Os 5 Aeroportos com mais pousos são:

ORD - Pousos: 17283
ATL - Pousos: 17215
LAX - Pousos: 16174
BOS - Pousos: 15508
MCO - Pousos: 14082


5. Qual é a rota mais frequente (par origin-dest)?

In [0]:
# Preciso criar uma coluna que agrupa origem e destino para descobrir qual a rota mais frequente
rotas_frequentes = df.withColumn("rota", concat_ws(" -> ", "origin", "dest")) \
                     .groupBy("rota") \
                     .count() \
                     .orderBy(desc("count")) \
                     .limit(1)

rota = rotas_frequentes.collect()[0]
print(f"A rota mais frequente é {rota['rota']} com {rota['count']} voos.")

A rota mais frequente é JFK -> LAX com 11262 voos.


6. Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada?
(Exiba também o tempo)

In [0]:
big_delays_companies = df.groupBy("carrier") \
                         .agg(avg("arr_delay").alias("media_atraso_chegada")) \
                         .orderBy(desc("media_atraso_chegada")) \
                         .limit(5)

print("Top 5 companhias com maior tempo médio de atraso na chegada:\n")
for row in big_delays_companies.collect():
    print(f"Companhia: {row['carrier']} - Atraso médio na chegada: {row['media_atraso_chegada']:.2f} minutos")

Top 5 companhias com maior tempo médio de atraso na chegada:

Companhia: F9 - Atraso médio na chegada: 21.92 minutos
Companhia: FL - Atraso médio na chegada: 20.12 minutos
Companhia: EV - Atraso médio na chegada: 15.80 minutos
Companhia: YV - Atraso médio na chegada: 15.56 minutos
Companhia: OO - Atraso médio na chegada: 11.93 minutos


7. Qual é o dia da semana com maior número de voos?

In [0]:
df_date = df.withColumn("data_voo", to_date(
    concat_ws("-", col("year"), col("month"), col("day"))
))

df_weekday = df_date.withColumn("dia_semana", dayofweek("data_voo"))

flights_per_day = df_weekday.groupBy("dia_semana") \
    .count() \
    .orderBy(desc("count"))

most_flights = flights_per_day.first()
dia_top = weekdays_names.get(most_flights["dia_semana"], "Desconhecido")

for row in flights_per_day.collect():
    weekday_name = weekdays_names.get(row['dia_semana'], "Desconhecido")
    print(f"{weekday_name}: {row['count']} voos")

print(f"\nO dia com mais voos é: **{dia_top}**, com {most_flights['count']} voos.")

Segunda: 50690 voos
Terça: 50422 voos
Sexta: 50308 voos
Quinta: 50219 voos
Quarta: 50060 voos
Domingo: 46357 voos
Sábado: 38720 voos

O dia com mais voos é: **Segunda**, com 50690 voos.


8. Qual o percentual mensal dos voos tiveram atraso na partida superior a 30 minutos?

In [0]:
df_delay = df.withColumn("atraso_30+", when(col("dep_delay") > 30, 1).otherwise(0))

flights_per_month = df_delay.groupBy("month") \
    .agg(
        count("*").alias("total_voos"),
        sum("atraso_30+").alias("voos_atrasados")
    ) \
    .withColumn("percentual_atraso", round((col("voos_atrasados") / col("total_voos")) * 100, 2)) \
    .orderBy("month")

print("Percentual mensal de voos com atraso > 30 min\n")
for row in flights_per_month.collect():
    print(f"Mês {row['month']:>2}: {row['percentual_atraso']}% dos voos atrasaram mais de 30 minutos")


Percentual mensal de voos com atraso > 30 min

Mês  1: 12.41% dos voos atrasaram mais de 30 minutos
Mês  2: 12.75% dos voos atrasaram mais de 30 minutos
Mês  3: 14.94% dos voos atrasaram mais de 30 minutos
Mês  4: 15.99% dos voos atrasaram mais de 30 minutos
Mês  5: 15.34% dos voos atrasaram mais de 30 minutos
Mês  6: 20.24% dos voos atrasaram mais de 30 minutos
Mês  7: 20.98% dos voos atrasaram mais de 30 minutos
Mês  8: 14.45% dos voos atrasaram mais de 30 minutos
Mês  9: 8.77% dos voos atrasaram mais de 30 minutos
Mês 10: 9.34% dos voos atrasaram mais de 30 minutos
Mês 11: 8.76% dos voos atrasaram mais de 30 minutos
Mês 12: 17.31% dos voos atrasaram mais de 30 minutos


9. Qual a origem mais comum para voos que pousaram em Seattle (SEA)?

In [0]:
from pyspark.sql.functions import desc

to_sea = df.filter(col("dest") == "SEA")

origens_mais_comuns = to_sea.groupBy("origin") \
                                   .count() \
                                   .orderBy(desc("count")) \
                                   .limit(1)

row = origens_mais_comuns.collect()[0]
print(f"A origem mais comum para voos que pousaram em Seattle (SEA) foi {row['origin']}, com {row['count']} voos.")

A origem mais comum para voos que pousaram em Seattle (SEA) foi JFK, com 2092 voos.


10. Qual é a média de atraso na partida dos voos (dep_delay) para cada dia da
semana?

In [0]:
df_date = df.withColumn("data_voo", to_date(concat_ws("-", "year", "month", "day")))
df_date = df_date.withColumn("dia_semana", dayofweek("data_voo"))

media_atraso_por_dia = df_date.groupBy("dia_semana") \
                                 .agg(avg("dep_delay").alias("media_atraso")) \
                                 .orderBy("dia_semana")
print("Média de atraso na partida dos voos (dep_delay) para cada dia da semana\n")
for row in media_atraso_por_dia.collect():
    nome_dia = weekdays_names.get(row["dia_semana"], "Desconhecido")
    print(f"{nome_dia}: atraso médio de {row['media_atraso']:.2f} minutos")

Média de atraso na partida dos voos (dep_delay) para cada dia da semana

Domingo: atraso médio de 11.59 minutos
Segunda: atraso médio de 14.78 minutos
Terça: atraso médio de 10.63 minutos
Quarta: atraso médio de 11.80 minutos
Quinta: atraso médio de 16.15 minutos
Sexta: atraso médio de 14.70 minutos
Sábado: atraso médio de 7.65 minutos


11. Qual é a rota que teve o maior tempo de voo médio (air_time)?

In [0]:
df_rota = df.withColumn("rota", concat_ws(" -> ", "origin", "dest"))

rotas_maior_tempo = df_rota.groupBy("rota") \
                           .agg(avg("air_time").alias("media_tempo_voo")) \
                           .orderBy(desc("media_tempo_voo")) \
                           .limit(1)

row = rotas_maior_tempo.collect()[0]
print(f"A rota com maior tempo médio de voo é {row['rota']}, com média de {row['media_tempo_voo']:.2f} minutos.")

A rota com maior tempo médio de voo é JFK -> HNL, com média de 623.09 minutos.


12. Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?

In [0]:
routes_count = df.groupBy("origin", "dest") \
                 .count()

window_spec = Window.partitionBy("origin").orderBy(col("count").desc())
ranked_routes = routes_count.withColumn("rank", row_number().over(window_spec))
top_destinations_per_origin = ranked_routes.filter(col("rank") == 1)

for row in top_destinations_per_origin.collect():
    print(f"Origem: {row['origin']} → Destinos mais comuns: {row['dest']} ({row['count']} voos)")

Origem: EWR → Destinos mais comuns: ORD (6100 voos)
Origem: JFK → Destinos mais comuns: LAX (11262 voos)
Origem: LGA → Destinos mais comuns: ATL (10263 voos)


13. Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo
(air_time) ?

In [0]:
df_with_route = df.withColumn("route", concat_ws(" -> ", "origin", "dest"))

route_variation = df_with_route.groupBy("route") \
    .agg(stddev("air_time").alias("air_time_stddev")) \
    .orderBy(col("air_time_stddev").desc()) \
    .limit(3)

for row in route_variation.collect():
    print(f"Rota: {row['route']} → Variação no tempo médio de voo: {row['air_time_stddev']:.2f} minutos")

Rota: LGA -> MYR → Variação no tempo médio de voo: 25.32 minutos
Rota: EWR -> HNL → Variação no tempo médio de voo: 21.27 minutos
Rota: JFK -> HNL → Variação no tempo médio de voo: 20.69 minutos


14. Qual é a média de atraso na chegada para voos que tiveram atraso na partida
superior a 1 hora?

In [0]:
delayed_flights = df.filter(col("dep_delay") > 60)

average_arrival_delay = delayed_flights.select(avg("arr_delay")).collect()[0][0]

print(f"Para voos com atraso de mais de 1 hora, a média de atraso na chegada é {average_arrival_delay:.2f} minutos.")


Para voos com atraso de mais de 1 hora, a média de atraso na chegada é 119.05 minutos.


15. Qual é a média de voos diários para cada mês do ano?

In [0]:
df_with_date = df.withColumn("flight_date", to_date(concat_ws("-", "year", "month", "day")))
daily_flights = df_with_date.groupBy("month", "flight_date").agg(count("*").alias("flights_per_day"))

monthly_avg = daily_flights.groupBy("month") \
    .agg(avg("flights_per_day").alias("average_daily_flights")) \
    .orderBy("month")

print("Média de voos diários para cada mês do ano: \n")
for row in monthly_avg.collect():
    print(f"Mês {row['month']:>2}: média de {row['average_daily_flights']:.2f} voos por dia")


Média de voos diários para cada mês do ano: 

Mês  1: média de 871.10 voos por dia
Mês  2: média de 891.11 voos por dia
Mês  3: média de 930.13 voos por dia
Mês  4: média de 944.33 voos por dia
Mês  5: média de 928.90 voos por dia
Mês  6: média de 941.43 voos por dia
Mês  7: média de 949.19 voos por dia
Mês  8: média de 946.03 voos por dia
Mês  9: média de 919.13 voos por dia
Mês 10: média de 931.90 voos por dia
Mês 11: média de 908.93 voos por dia
Mês 12: média de 907.58 voos por dia


16. Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30
minutos?

In [0]:
delayed_arrivals = df.filter(col("arr_delay") > 30)

delayed_arrivals = delayed_arrivals.withColumn("route", concat_ws(" -> ", "origin", "dest"))

top_3_delayed_routes = delayed_arrivals.groupBy("route") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(3)

print("3 rotas mais comuns que tiveram atrasos na chegada (>30 minutos)\n")
for row in top_3_delayed_routes.collect():
    print(f"Rota: {row['route']} → {row['count']} voos com atraso superior a 30 minutos na chegada")


3 rotas mais comuns que tiveram atrasos na chegada (>30 minutos)

Rota: LGA -> ATL → 1563 voos com atraso superior a 30 minutos na chegada
Rota: JFK -> LAX → 1286 voos com atraso superior a 30 minutos na chegada
Rota: LGA -> ORD → 1188 voos com atraso superior a 30 minutos na chegada


17. Para cada origem, qual o principal destino?

In [0]:
# Esse aqui é igual à pergunta 12.
routes_count = df.groupBy("origin", "dest") \
                 .count()

window_spec = Window.partitionBy("origin").orderBy(col("count").desc())
ranked_routes = routes_count.withColumn("rank", row_number().over(window_spec))
top_destinations_per_origin = ranked_routes.filter(col("rank") == 1)

for row in top_destinations_per_origin.collect():
    print(f"Origem: {row['origin']} → Destinos mais comuns: {row['dest']} ({row['count']} voos)")


Origem: EWR → Destinos mais comuns: ORD (6100 voos)
Origem: JFK → Destinos mais comuns: LAX (11262 voos)
Origem: LGA → Destinos mais comuns: ATL (10263 voos)


-------------

# Enriquecimento dos Dados

In [0]:
AIRPORT_DB_TOKEN = "1bfe4ab495c115498b46201d53fbefefdb130595c8d0514a71cf25ea83f2173be94e42ce0600d942e481ae84aaa6735a"
WEATHERBIT_TOKEN = "2872953c63434f6fbf9058c6ce3624ca"

## Buscando dados pelo Airportdb

In [0]:
def get_coordinates(airport_code: str, token: str):
    url = f"https://airportdb.io/api/v1/airport/K{airport_code}?apiToken={token}"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return data.get("latitude_deg"), data.get("longitude_deg")
    else:
        return None, None


## Buscando dados de Vento pelo Weatherbit

In [0]:
def get_wind_speed(lat: float, lon: float, date: str, key: str):
    import datetime
    next_day = (datetime.datetime.strptime(date, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime("%Y-%m-%d")

    url = "https://api.weatherbit.io/v2.0/history/daily"
    params = {
        "lat": lat,
        "lon": lon,
        "start_date": date,
        "end_date": next_day,
        "key": key
    }

    response = requests.get(url, params=params)
    if response.status_code == 200:
        data = response.json()
        try:
            return data["data"][0]["wind_spd"]
        except:
            return None
    else:
        return None

## Enriquecendo os dados da base utilizando as funções acima

In [0]:
# Essa etapa eu fiz localmente pois o Databricks bloqueou a minha requisição para as APIs acima

**Pergunta final**: Enriqueça a base de dados de voos com as condições meteorológicas
(velocidade do vento) para os aeroportos de origem e destino. Mostre as informações
enriquecidas para os 5 voos com maior atraso na chegada.

In [0]:
# Carregamento do DF enriquecido (salvei em um starter warehouse no databricks)
df_enriched = spark.sql("SELECT * FROM default.`enriched-airports`")

In [0]:
for row in df_enriched.collect():
    print(f"""
Voo {row['origin']} → {row['dest']} no dia {row['date']}:
    • Velocidade do vento na origem: {row['wind_origin']} m/s
    • Velocidade do vento no destino: {row['wind_dest']} m/s
""")


Voo JFK → HNL no dia 2013-01-09:
    • Velocidade do vento na origem: 3.5 m/s
    • Velocidade do vento no destino: None m/s


Voo JFK → CMH no dia 2013-06-15:
    • Velocidade do vento na origem: 4.1 m/s
    • Velocidade do vento no destino: 1.9 m/s


Voo EWR → ORD no dia 2013-01-10:
    • Velocidade do vento na origem: 4.1 m/s
    • Velocidade do vento no destino: 4.1 m/s


Voo JFK → SFO no dia 2013-09-20:
    • Velocidade do vento na origem: 3.7 m/s
    • Velocidade do vento no destino: 4.3 m/s


Voo JFK → CVG no dia 2013-07-22:
    • Velocidade do vento na origem: 3.2 m/s
    • Velocidade do vento no destino: 2.3 m/s



# Modelo ML - Regressão Logística

In [0]:
df_pd = df.toPandas()
df_pd = df_pd.dropna(subset=['dep_delay', 'distance', 'air_time', 'origin', 'dest'])
df_pd['target'] = (df_pd['arr_delay'] > 15).astype(int)

for col in ['origin', 'dest']:
    le = LabelEncoder()
    df_pd[col] = le.fit_transform(df_pd[col])

X = df_pd[['dep_delay', 'distance', 'air_time', 'origin', 'dest']]
y = df_pd['target']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = LogisticRegression()
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))

joblib.dump(model, 'model.pkl')


STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


              precision    recall  f1-score   support

           0       0.92      0.97      0.94     49838
           1       0.89      0.72      0.80     15632

    accuracy                           0.91     65470
   macro avg       0.91      0.85      0.87     65470
weighted avg       0.91      0.91      0.91     65470



['model.pkl']