In [None]:
import requests
from io import BytesIO, StringIO
from zipfile import ZipFile
from pandas import read_csv
from decimal import Decimal
from datetime import date, timedelta
import logging

from pyspark.sql.functions import (
    col,
    max,
    mean,
    floor,
    concat,
    lit,
    to_date,
    date_format,
    count,
    row_number,
    stddev,
    last_day,
    dayofmonth,
    udf,
)
from pyspark.sql.types import DecimalType
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

In [None]:
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

In [None]:
dbutils.widgets.text("airport_token", "")
dbutils.widgets.text("wheather_token", "")

AIRPORT_TOKEN: str = dbutils.widgets.get("airport_token")
WHEATER_TOKEN: str = dbutils.widgets.get("wheather_token")

In [None]:
AIRPORT_SESSION = requests.Session()
AIRPORT_SESSION.params = {"apiToken": AIRPORT_TOKEN}

WHEATER_SESSION = requests.Session()
WHEATER_SESSION.params = {"key": WHEATER_TOKEN}

## Download da base airports-database

In [None]:
response = requests.get(
    "https://github.com/PicPay/case-machine-learning-engineer-pleno/raw/main/notebook/airports-database.zip"
)
files = ZipFile(BytesIO(response.content))
csv = files.open("airports-database.csv").read().decode("utf-8")
df = spark.createDataFrame(read_csv(StringIO(csv)))
df.write.saveAsTable("airports_database", format="delta", mode="overwrite")
del response, files, csv

## Respondendo as questões

### Questão 1 - Qual é o número total de voos no conjunto de dados?

In [None]:
df.count()  # Podemos obter o valor a partir da contagem de linhas
df.select(
    max(col("id"))
).display()  # Após verificar rapidamente que a base é sequencial, também podemos obter o valor pegando o ultimo id criado

max(id)
336775


###Questão 2 - Quantos voos foram cancelados?
(Considerando que voos cancelados têm dep_time e arr_time nulos)


In [None]:
# Podemos utilizar tanto o filter quando o where já que são equivalentes.
# Ao todo temos 8255 voos cancelados. Com base na regra de negócio fornecida.
(df.filter(col("dep_time").isNull() & col("arr_time").isNull()).count())

### Questão 3 - Qual é o atraso médio na partida dos voos?
(dep_delay)

In [None]:
# Devemos nos atentar que devemos considerar apenas os voos atrasados e não os adiantados.
# Desta forma o tempo médio de atraso é de cerca de 39 minutos e 22 segundos.
(
    df.filter(col("dep_delay") > 0)
    .select(mean(col("dep_delay")).alias("mean_dep_delay"))
    .select(
        floor(col("mean_dep_delay")).alias("minutes"),
        floor((col("mean_dep_delay") - floor(col("mean_dep_delay"))) * 60).alias(
            "seconds"
        ),
    )
    .display()
)

minutes,seconds
39,22


### Questão 4 - Quais são os 5 aeroportos com maior número de pousos?

In [None]:
# Filtrando os pousos devidamente finalizados, realizando um groupBy olhando para os destinos, em seguida contando e ordenando temos
# que os destinos mais procurados são:
# 1- ATL com 16873 pousos
# 2- ORD com 16607 pousos
# 3- LAX com 16058 pousos
# 4- BOS com 15028 pousos
# 5- MCO com 13979 pousos
(
    df.filter(col("dep_time").isNotNull() & col("arr_time").isNotNull())
    .groupBy(col("dest"))
    .count()
    .orderBy(col("count").desc())
    .limit(5)
    .display()
)

dest,count
ATL,16873
ORD,16607
LAX,16058
BOS,15028
MCO,13979


### Questão 5 - Qual é a rota mais frequente (par origin-dest)?

In [None]:
# Como na questão anterior, após filtrar os voos devidamente finalizados, realizamos a concatenação da tupla origin-dest para agruparmos e realizarmos a contagem.
# No final do processo obtemos que o par JFK-LAX é a rota mais frequente com 11182 voos.
(
    df.filter(col("dep_time").isNotNull() & col("arr_time").isNotNull())
    .groupBy(concat(col("origin"), lit("-"), col("dest")).alias("route"))
    .count()
    .orderBy(col("count").desc())
    .limit(1)
    .display()
)

route,count
JFK-LAX,11182


### Questão 6 - Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada?
(Exiba também o tempo)

In [None]:
# Partindo do principio que o tempo médio de atraso depende do voo partir, fiz o classico filtro de apenas voos que terminaram.
# Também realizei um filtro de apenas voos atrasados, ou seja com arr_delay maior do que 0.
# As top 5 empresas que mais atrasam voos são:
# 1 - SkyWest Airlines Inc.	60 minutos e 36 segundos
# 2 - Mesa Airlines Inc. 51 minutos e 4 segundos
# 3 - Endeavor Air Inc. 49 minutos e 16 segundos
# 4 - ExpressJet Airlines Inc. 48 minutos e16 segundos
# 5 - Frontier Airlines Inc. 47 minutos e 34 segundos
(
    df.filter(
        col("dep_time").isNotNull()
        & col("arr_time").isNotNull()
        & (col("arr_delay") > 0)
    )
    .groupBy("name")
    .mean()
    .select(col("avg(arr_delay)").alias("avg_arr_delay"), col("name"))
    .orderBy(col("avg_arr_delay").desc())
    .select(col("name"), col("avg_arr_delay"))
    .select(
        col("name"),
        floor(col("avg_arr_delay")).alias("avg_arr_delay_minutes"),
        floor((col("avg_arr_delay") - floor(col("avg_arr_delay"))) * 60).alias(
            "avg_arr_delay_seconds"
        ),
    )
    .limit(5)
    .display()
)

name,avg_arr_delay_minutes,avg_arr_delay_seconds
SkyWest Airlines Inc.,60,36
Mesa Airlines Inc.,51,4
Endeavor Air Inc.,49,16
ExpressJet Airlines Inc.,48,16
Frontier Airlines Inc.,47,34


###  Questão 7 - Qual é o dia da semana com maior número de voos?


In [None]:
# Partindo da filtragem de apenas voos que realmente aconteceram, logo em seguida calculamos o dia da semana... eu escolhi fazer através do campo date_format.
# Ai realizamos um agrupamento no dia da semana e em seguida um count. Finalmente, ordenamos e pegamos a primeira linha.
# Com isso concluimos que o dia da semana com maior números de viagens é Segunda Feira com 49398 voos.
(
    df.filter(col("dep_time").isNotNull() & col("arr_time").isNotNull())
    .select(
        date_format(to_date(col("time_hour"), "yyyy-MM-dd HH:mm:ss"), "EEEE").alias(
            "day_of_week"
        )
    )
    .groupBy("day_of_week")
    .agg(count("day_of_week").alias("trips_by_day_of_week"))
    .orderBy(col("trips_by_day_of_week").desc())
    .limit(1)
    .display()
)

day_of_week,trips_by_day_of_week
Monday,49398


### Questão 8 - Qual o percentual mensal dos voos tiveram atraso na partida superior a 30 minutos?

In [None]:
# Neste caso, entende-se que devemos considerar todos os voos que partiram, desta forma estou removendo o filtro de voos atrasados para o calculo do valor total
# Em seguida, para realizei um agrupamento a partir do mês.
# Para o calculo do valor de voos atrasados seguimos a logica anterior adicionando um filtro de atraso superior a 30 minutos.
# Em seguida, cruzamos os dois dataframes e realizamos o calculo da porcentagem mensal.
# Desta forma, concluimos que o mês com maior número de atrasos acima de 30 minutos é Julho com cerca de 23,191% de atrasos
monthly_flights = (
    df.filter(col("dep_time").isNotNull())
    .groupBy("month")
    .agg(count("month").alias("amount_monthly_flights"))
)
monthly_delayed_flights = (
    df.filter(
        col("dep_time").isNotNull()
        & col("arr_time").isNotNull()
        & (col("arr_delay") > 30)
    )
    .groupBy("month")
    .agg(count("month").alias("amount_monthly_delayed_flights"))
)
monthly_flights.join(monthly_delayed_flights, on="month", how="inner").select(
    col("month"),
    (col("amount_monthly_delayed_flights") / col("amount_monthly_flights") * 100).alias(
        "percentage"
    ),
).display()

month,percentage
1,14.12226711475286
10,9.48591770495236
11,9.728130201590533
12,21.047583917373665
2,14.229632756437317
3,15.296893432953205
4,18.79473646157183
5,14.840789147451565
6,22.765660571344643
7,23.1911532385466


### Questão 9 - Qual a origem mais comum para voos que pousaram em Seattle (SEA)?

In [None]:
# Como se trata de voos que pousaram, ou seja, foram finalizados, devemos primeiramente filtrar com base nos voos que
# devidamente aconteceram e nos que tem SEA como destino. Em seguida, devemos agrupar pela origem realizar um count ordenar e pegar o maior.
# Com isso concluimos que o aeroporto JFK é a origem mais comum para voos que pousaram em Seattle.
df.filter(
    col("dep_time").isNotNull() & col("arr_time").isNotNull() & (col("dest") == "SEA")
).groupBy(col("origin")).agg(count(col("origin")).alias("amount_flights")).orderBy(
    col("amount_flights").desc()
).limit(1).display()

origin,amount_flights
JFK,2079


### Questão 10 - Qual é a média de atraso na partida dos voos (dep_delay) para cada dia da semana?

In [None]:
# Primeiramente, filtramos apenas os voos que partiram, em seguida realizamos um filtro sobre os voos que atrasaram
# ou seja, que possuem arr_delay maior do que 0. Posteriormente, calculamos o dia da semana através da função to_date. Agrupamos pelo dia da semana
# e agregamos pela média de atraso. Finalmente realizamos os calculos para obter os minutos e segundos. E temos o seguinte resultado:
# Monday	43:31
# Tuesday	37:25
# Wednesday	39:31
# Thursday	43:08
# Friday	40:23
# Saturday	30:29
# Sunday	37:30

(
    df.filter(col("dep_time").isNotNull() & (col("dep_delay") > 0))
    .select(
        date_format(to_date(col("time_hour"), "yyyy-MM-dd HH:mm:ss"), "EEEE").alias(
            "day_of_week"
        ),
        col("dep_delay"),
    )
    .groupBy(col("day_of_week"))
    .agg(mean("dep_delay").alias("avg_dep_delay"))
    .select(
        col("day_of_week"),
        floor(col("avg_dep_delay")).alias("avg_dep_delay_minutes"),
        floor((col("avg_dep_delay") - floor(col("avg_dep_delay"))) * 60).alias(
            "avg_dep_delay_seconds"
        ),
    )
    .display()
)

day_of_week,avg_dep_delay_minutes,avg_dep_delay_seconds
Wednesday,39,31
Tuesday,37,25
Friday,40,23
Thursday,43,8
Saturday,30,29
Monday,43,31
Sunday,37,30


### Questão 11 - Qual é a rota que teve o maior tempo de voo médio (air_time)?

In [None]:
# Após fazer a limpeza padrão de considerar apenas os voos finalizados, agrupamos por origem e destino e agregamos pela média do tempo no ar.
# Em seguida ordenamos para pegar o maior e calculamos as horas, minutos e segundos.
# Desta forma, podemos concluir que a rota com duração média mais longa é a JFK-HNL com duração de 10 horas 23 minutos e 5 segundos.
df.filter(col("dep_time").isNotNull() & col("arr_time").isNotNull()).groupBy(
    col("origin"), col("dest")
).agg(mean("air_time").alias("avg_air_time")).orderBy(col("avg_air_time").desc()).limit(
    1
).select(
    col("origin"),
    col("dest"),
    floor(col("avg_air_time") / 60).alias("avg_air_time_hours"),
    floor(col("avg_air_time") % 60).alias("avg_air_time_minutes"),
    floor((col("avg_air_time") - floor(col("avg_air_time"))) * 60).alias(
        "avg_air_time_seconds"
    ),
).display()

origin,dest,avg_air_time_hours,avg_air_time_minutes,avg_air_time_seconds
JFK,HNL,10,23,5


### Questão 12 - Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?

In [None]:
# Iniciamos removendo os voos cancelados, em seguida, agrupando por origem e destino e agregamos pela contagem de cada destino.
# posteriormente realizamos um janelamento através da função rank_over particionando pelo aeroporto de origem e ordenando pela maior quantidades de voos ao
# aeroporto de destino. Em seguida filtramos pegando o maior rank e finalizamos pegando só as colunas necessárias.
# Desta forma concluimos que para os aeroportos:
# EWR o destino mais comum é ORD com 5841 voos
# JFK o destimo mais comum é LAX com 11182 voos
# LGA o destino mais comum é ATL com 10063 voos
df.filter(col("dep_time").isNotNull() & col("arr_time").isNotNull()).groupBy(
    col("origin"), col("dest")
).agg(count("dest").alias("amount_flights")).withColumn(
    "rank",
    row_number().over(
        Window.partitionBy(col("origin")).orderBy(col("amount_flights").desc())
    ),
).filter(col("rank") == 1).select(
    col("origin"), col("dest"), col("amount_flights")
).display()

origin,dest,amount_flights
EWR,ORD,5841
JFK,LAX,11182
LGA,ATL,10063


### Questão 13 - Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo
(air_time) ?

In [None]:
# Considerando a variação no tempo médio de voo como o seu desvio padrão, começamos filtrando os voos que o tempo no ar não são nulos.
# Em seguida realizamos um agrupamento sobre a origem e o destino, e agregamos pelo desvio padrao do tempo de voo.
# Finalmente ordenamos, limitamos e transformamos os minutos em segundos.
# Após isso podemos concluir que:
# A primeira rota com maior variação é LGA-MYR com 25 minutos e 19 segundos
# A segunda rota com maior variação é EWR-HNL com 21 minutos e 15 segundos
# A terceira rota com maior variação é JFK-HNL com 20 minutos e 41 segundos
(
    df.filter(col("air_time").isNotNull())
    .groupBy(col("origin"), col("dest"))
    .agg(stddev(col("air_time")).alias("stddev_air_time"))
    .orderBy(col("stddev_air_time").desc())
    .limit(3)
    .select(
        col("origin"),
        col("dest"),
        floor(col("stddev_air_time")).alias("stddev_air_time_minutes"),
        floor((col("stddev_air_time") - floor(col("stddev_air_time"))) * 60).alias(
            "stddev_air_time_seconds"
        ),
    )
    .display()
)

origin,dest,stddev_air_time_minutes,stddev_air_time_seconds
LGA,MYR,25,19
EWR,HNL,21,15
JFK,HNL,20,41


### Questão 14 - Qual é a média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora?

In [None]:
# Iniciamos filtrando os voos que possuem mais de 60 minutos de atraso de saida e algum tempo de atraso.
# Em seguida calculamos a média do tempo de atraso na chegada e calculamos as horas, minutos e segundos.
# Assim concluimos que o tempo solicitado é de 1 hora 59 minutos e 3 segundos.
(
    df.filter((col("dep_delay") > 60) & (col("arr_delay") > 0))
    .select(mean(col("arr_delay")).alias("mean_arr_delay"))
    .select(
        floor(col("mean_arr_delay") / 60).alias("mean_arr_delay_hours"),
        floor(col("mean_arr_delay") % 60).alias("mean_arr_delay_minutes"),
        floor((col("mean_arr_delay") - floor(col("mean_arr_delay"))) * 60).alias(
            "mean_arr_delay_seconds"
        ),
    )
    .display()
)

mean_arr_delay_hours,mean_arr_delay_minutes,mean_arr_delay_seconds
1,59,3


### Questão 15 - Qual é a média de voos diários para cada mês do ano?

In [None]:
# Iniciamos removendo os voos cancelados, agrupamos pelo ano e mês e agregamos contando os voos para obter a quantidade de voos mensais.
# Em seguida obtemos a quantidade de dias em cada mês e utilizamos isso para calcular a média de voos diarios mensal.
# Finalmente chegamos a seguinte média mensal:
# 2013/1	853.8064516129032
# 2013/2	845
# 2013/3	901.3870967741935
# 2013/4	920.6666666666666
# 2013/5	909.516129032258
# 2013/6	905.7
# 2013/7	915.5483870967741
# 2013/8	929.7096774193549
# 2013/9	902.3333333333334
# 2013/10	923.9354838709677
# 2013/11	900.5
# 2013/12	873.4193548387096

(
    df.filter(col("dep_time").isNotNull() & col("arr_time").isNotNull())
    .groupBy(col("year"), col("month"))
    .agg(count(col("id")).alias("amount_flights"))
    .select(
        col("year"),
        col("month"),
        col("amount_flights"),
        dayofmonth(
            last_day(concat(col("year"), lit("-"), col("month"), lit("-"), lit(1)))
        ).alias("amount_days_month"),
    )
    .select(
        col("year"),
        col("month"),
        (col("amount_flights") / col("amount_days_month")).alias(
            "mean_daily_flights_by_month"
        ),
    )
    .orderBy(col("year"), col("month"))
    .display()
)

year,month,mean_daily_flights_by_month
2013,1,853.8064516129032
2013,2,845.0
2013,3,901.3870967741937
2013,4,920.6666666666666
2013,5,909.516129032258
2013,6,905.7
2013,7,915.548387096774
2013,8,929.7096774193548
2013,9,902.3333333333334
2013,10,923.9354838709676


### Questão 16 - Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos?

In [None]:
# Iniciamos filtrando apenas os voos com atraso de chegado acima de 30 minutos, em seguida, agrupamos por rota
# Contamos a quantidade de voos, ordenamos e pegamos os 3 maiores. Assim temos que as rotas desejadas são:
# LGA-ATL com 1563 voos com mais de 30 minutos de atraso
# JFK-LAX com 1286 voos com mais de 30 minutos de atraso
# LGA-ORD com 1188 voos com mais de 30 minutos de atraso
(
    df.filter((col("arr_delay") > 30))
    .groupBy(col("origin"), col("dest"))
    .agg(count("id").alias("amount_delayed_flights"))
    .orderBy(col("amount_delayed_flights").desc())
    .limit(3)
    .display()
)

origin,dest,amount_delayed_flights
LGA,ATL,1563
JFK,LAX,1286
LGA,ORD,1188


### Questão 17 - Para cada origem, qual o principal destino?

In [None]:
# Não consegui encontrar diferença nessa questão para a 12. Desta forma considero a resposta das duas a mesma.

### Questão Final - Enriqueça a base de dados de voos com as condições meteorológicas (velocidade do vento) para os aeroportos de origem e destino. Mostre as informações enriquecidas para os 5 voos com maior atraso na chegada.

In [None]:
# Iniciamos definindo a função que irá nos retornar a latitute e longitude do aeroporto.
def get_airport_position(airport_code: str) -> dict[str, Decimal]:
    # Necessário para arrumar o caso do aeroporto HNL (Honolulu Havaí) no qual o código ICAO
    # não começa com a letra K. Necessário implementar um dicionario de conversão IATA para ICAO.
    airport_code = f"P{airport_code}" if airport_code == "HNL" else f"K{airport_code}"
    url = f"https://airportdb.io/api/v1/airport/{airport_code}"
    res = airport_api_session.get(url)
    data = res.json()
    return {"lat": Decimal(data["latitude_deg"]), "lon": Decimal(data["longitude_deg"])}


# Em seguida partimos para a definição da função que retorna a velocidade média no dia desejado do aeroporto desejado
def airport_wind_spd(airport_code: str, flight_date: date) -> Decimal | None:
    try:
        airport_pos = get_airport_position(airport_code=airport_code)
        res = WHEATER_SESSION.get(
            "https://api.weatherbit.io/v2.0/history/daily",
            params={
                "lat": airport_pos["lat"],
                "lon": airport_pos["lon"],
                "start_date": flight_date.isoformat(),
                "end_date": (flight_date + timedelta(days=1)).isoformat(),
            },
        )
        return Decimal(res.json()["data"][0]["wind_spd"])
    except Exception as err:
        logging.error(err)
        return None


# Transformamos a função anterior em uma UDF (User Defined Function)
airport_wind_spd_udf = udf(airport_wind_spd, DecimalType(12, 1))

# Aplicamos a nossa UDF em cima dos aeroportos solicitados, no caso 5
df.orderBy(col("arr_delay").desc()).limit(5).select(
    col("*"),
    airport_wind_spd_udf(
        col("origin"), to_date(col("time_hour"), "yyyy-MM-dd HH:mm:ss")
    ).alias("origin_wind_spd"),
    airport_wind_spd_udf(
        col("dest"), to_date(col("time_hour"), "yyyy-MM-dd HH:mm:ss")
    ).alias("dest_wind_spd"),
).display()

## Modelo de ML

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import joblib

### Limpeza do dataset

In [None]:
# Nosso modelo consistirá em receber os horarios de saida e chegada do voo e retornar o tempo no ar.
# ao filtrar os voos terminados, temos a garantia de que o air_time não é nulo.
mldf = df.filter(
    col("dep_time").isNotNull()
    & col("arr_time").isNotNull()
    & col("air_time").isNotNull()
).select(col("dep_time"), col("arr_time"), col("air_time"))

### Definição de features e target, e divisão de treino e teste

In [None]:
# Definimos as features e o target, e dividimos em teste e treino
assembler = VectorAssembler(inputCols=["dep_time", "arr_time"], outputCol="features")
df_train, df_test = (
    assembler.transform(mldf)
    .select(col("features"), col("air_time"))
    .randomSplit([0.9, 0.1])
)

### Treino e avaliação

In [None]:
# Realizamos o treinamento e as predições
reg = LinearRegression(featuresCol="features", labelCol="air_time")
reg = reg.fit(df_train)
pred = reg.transform(df_test)

Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

### Calculo de métricas

In [None]:
# Tiramos as
# Avaliar o modelo usando métricas como RMSE e R2, que fazem sentido visto que é uma regressão linear
evaluator = RegressionEvaluator(
    labelCol="air_time", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(pred)
print(f"Root Mean Squared Error (RMSE): {rmse}")
evaluator_r2 = RegressionEvaluator(
    labelCol="air_time", predictionCol="prediction", metricName="r2"
)
r2 = evaluator_r2.evaluate(pred)
print(f"R2: {r2}")

Root Mean Squared Error (RMSE): 93.62597387680323
R2: 0.007876950067743804


### Exportar modelo

In [None]:
# Como fizemos uma regressão linear, podemos simplificar exportando apenas os coeficientes e o interceptador
coefficients = reg.coefficients.toArray()
intercept = reg.intercept
model_params = {"coefficients": coefficients, "intercept": intercept}

In [None]:
# Em seguida colocamos o modelo em um lugar temporario e movemos para o FileStore
joblib.dump(model_params, "/tmp/model_params.pkl")
dbutils.fs.mv("file:/tmp/model_params.pkl", "/FileStore/models/model_params.pkl")

# Agora o modelo está disponivel para download através do link
# https://community.cloud.databricks.com/files/models/model_params.pkl

True