# Desafio Técnico PicPay

# 1. Upload da base de dados
Essa primeira etapa consiste de fazer o upload da base de dados `airports-database.csv` para o databrics.  
  
Na página inicial do Databrics, podemos clicar em `Create Table`.  
![criando tabela](assets/1_create_table.png)  
  
E realizar o upload do arquivo no Databricks.  
![fazendo o upload no databrics](assets/2_upload.png)
  
O databrics permite que criemos uma tabela por meio da interface gráfica e por meio do notebook (opção escolhida), que será realizada logo abaixo.

## 1.1. Catálogo de Dados
Temos o nosso conjunto de dados no databrics em formato CSV, esse é o nosso dado raw (ou dado bruto), sem qualquer tipo de transformação.  
Essa tabela é composta pelas seguintes colunas:
| Coluna | Descrição |
|---|---|
| id | Um identificador único para cada registro de voo. |
| year | O ano em que o voo ocorreu (2013 neste conjunto de dados). |
| month | O mês em que o voo ocorreu (1 a 12). |
| day| O dia do mês em que o voo ocorreu (1 a 31). |
| dep_time | O horário local real de partida do voo, no formato 24 horas (hhmm). |
| sched_dep_time | O horário local programado de partida do voo, no formato 24 horas (hhmm). |
| dep_delay | A diferença entre os horários real e programado de partida do voo, em minutos. Um valor positivo indica uma partida atrasada, enquanto um valor negativo indica uma partida adiantada. |
| arr_time | O horário local real de chegada do voo, no formato 24 horas (hhmm). |
| sched_arr_time | O horário local programado de chegada do voo, no formato 24 horas (hhmm).|
| arr_delay | A diferença entre os horários real e programado de chegada do voo, em minutos. Um valor positivo indica uma chegada atrasada, enquanto um valor negativo indica uma chegada adiantada. |
| carrier | O código de duas letras da companhia aérea do voo. |
| flight | O número do voo. |
| tailnum | O identificador único da aeronave usada no voo. |
| origin | O código de três letras do aeroporto de origem do voo. |
| dest | O código de três letras do aeroporto de destino do voo. |
| air_time | A duração do voo, em minutos. |
| distance | A distância entre os aeroportos de origem e destino, em milhas. |
| hour | O componente da hora do horário programado de partida, no horário local. |
| minute | O componente dos minutos do horário programado de partida, no horário local. |
| time_hour | O horário programado de partida do voo, no formato local e de data-hora (yyyy-mm-dd hh)
| name | O nome da companhia aérea do voo. |

# 2. Carga
Aqui faremos o carregamento do arquivo para uma tabela delta na camada bronze.  
A arquitetura usada aqui é a [arquitetura medallion](https://www.databricks.com/glossary/medallion-architecture).  
![medallion architecture](assets/3_medallion_architecture.png)

In [None]:
# File location and type
file_location = "/FileStore/tables/airports_database.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df.limit(5).display()

id,year,month,day,dep_time,sched_dep_time,dep_delay,arr_time,sched_arr_time,arr_delay,carrier,flight,tailnum,origin,dest,air_time,distance,hour,minute,time_hour,name
0,2013,1,1,517.0,515,2.0,830.0,819,11.0,UA,1545,N14228,EWR,IAH,227.0,1400,5,15,2013-01-01 05:00:00,United Air Lines Inc.
1,2013,1,1,533.0,529,4.0,850.0,830,20.0,UA,1714,N24211,LGA,IAH,227.0,1416,5,29,2013-01-01 05:00:00,United Air Lines Inc.
2,2013,1,1,542.0,540,2.0,923.0,850,33.0,AA,1141,N619AA,JFK,MIA,160.0,1089,5,40,2013-01-01 05:00:00,American Airlines Inc.
3,2013,1,1,544.0,545,-1.0,1004.0,1022,-18.0,B6,725,N804JB,JFK,BQN,183.0,1576,5,45,2013-01-01 05:00:00,JetBlue Airways
4,2013,1,1,554.0,600,-6.0,812.0,837,-25.0,DL,461,N668DN,LGA,ATL,116.0,762,6,0,2013-01-01 06:00:00,Delta Air Lines Inc.


In [None]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- sched_dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- sched_arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- time_hour: string (nullable = true)
 |-- name: string (nullable = true)



Aqui estarei salvando como uma tabela delta na camada bronze.

In [None]:
df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("bronze_airports_database")

Verificando se a nossa tabela foi corretamente criada por meio de uma consulta SQL que seleciona todos os dados da tabela e retorna as 5 primeiras linhas

In [None]:
%sql
SELECT * FROM `bronze_airports_database` LIMIT 5;

id,year,month,day,dep_time,sched_dep_time,dep_delay,arr_time,sched_arr_time,arr_delay,carrier,flight,tailnum,origin,dest,air_time,distance,hour,minute,time_hour,name
0,2013,1,1,517.0,515,2.0,830.0,819,11.0,UA,1545,N14228,EWR,IAH,227.0,1400,5,15,2013-01-01 05:00:00,United Air Lines Inc.
1,2013,1,1,533.0,529,4.0,850.0,830,20.0,UA,1714,N24211,LGA,IAH,227.0,1416,5,29,2013-01-01 05:00:00,United Air Lines Inc.
2,2013,1,1,542.0,540,2.0,923.0,850,33.0,AA,1141,N619AA,JFK,MIA,160.0,1089,5,40,2013-01-01 05:00:00,American Airlines Inc.
3,2013,1,1,544.0,545,-1.0,1004.0,1022,-18.0,B6,725,N804JB,JFK,BQN,183.0,1576,5,45,2013-01-01 05:00:00,JetBlue Airways
4,2013,1,1,554.0,600,-6.0,812.0,837,-25.0,DL,461,N668DN,LGA,ATL,116.0,762,6,0,2013-01-01 06:00:00,Delta Air Lines Inc.


Consultando a quantidade de registros na nossa tabela bronze

In [None]:
%sql
SELECT COUNT(*) FROM `bronze_airports_database`;

count(1)
336776


# 3. Perguntas

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, mean, when, round, row_number, stddev
from pyspark.sql.window import Window
from datetime import datetime

**1.** Qual é o número total de voos no conjunto de dados?  
**Resposta:** O conjunto de dados possui o registro de 336.776 voos.

In [None]:
df.count()

Out[10]: 336776

**2.** Quantos voos foram cancelados? (Considerando que voos cancelados têm `dep_time` e `arr_time` nulos)  
**Resposta:** 8.255 voos foram cancelados.

In [None]:
df.filter(col("dep_time").isNull() & col("arr_time").isNull()).count()

Out[11]: 8255

**3.** Qual é o atraso médio na partida dos voos (`dep_delay`)?  
**Resposta:** O atraso médio é de 12.57 minutos.

Primeiramente, irei criar um dataframe apenas com voos que não foram cancelados. Esse dataframe será usado para responder essa e as próximas perguntas.

In [None]:
df_voos = df.filter(col("dep_time").isNotNull() & col("arr_time").isNotNull())
df_voos.select(mean(col("dep_delay"))).display()

avg(dep_delay)
12.575974736559749


**4.** Quais são os 5 aeroportos com maior número de pousos?

In [None]:
df_voos.groupBy('dest').count().sort('count', ascending=False).limit(5).display()

dest,count
ATL,16873
ORD,16607
LAX,16058
BOS,15028
MCO,13979


**5.** Qual é a rota mais frequente (par origin-dest)?  
**Resposta:** A rota mais frequente é a de JFK para LAX com 11.182 voos.

In [None]:
df_voos.groupBy(['origin', 'dest']).count().sort('count', ascending=False).limit(1).display()

origin,dest,count
JFK,LAX,11182


**6.** Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada? (Exiba também o tempo)

In [None]:
df_voos.groupBy('name').agg(mean('arr_delay')).sort('avg(arr_delay)', ascending=False).limit(5).display()

name,avg(arr_delay)
Frontier Airlines Inc.,21.920704845814974
AirTran Airways Corporation,20.115905511811025
ExpressJet Airlines Inc.,15.79643108710965
Mesa Airlines Inc.,15.556985294117649
SkyWest Airlines Inc.,11.93103448275862


**7.** Qual é o dia da semana com maior número de voos?  
**Resposta:** O dia da semana com mais voos é Segunda.

In [None]:
def get_weekday(date):
    """
    Retorna o dia da semana, onde Segunda é 0 e Domingo é 6
    """
    date = datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
    return date.weekday()

udf_function = udf(lambda date: get_weekday(date))

df_voos = df_voos.withColumn('weekday', udf_function(df["time_hour"]))

# agrupando por dia da semana, contando a quantidade e retornando o resultado com o maior valor
df_voos.groupBy('weekday').count().sort('count', ascending=False).limit(1).display()

weekday,count
0,49398


**8.** Qual o percentual mensal dos voos tiveram atraso na partida superior a 30 minutos?

In [None]:
monthly_counts = df_voos.groupBy("month") \
    .agg(
        count("*").alias("total_count"),
        count(when(col("dep_delay") > 30, True)).alias("greater_than_30_count")
    )

monthly_percentage = monthly_counts.withColumn(
    "percentage", round((col("greater_than_30_count") / col("total_count")) * 100, 2)
)

monthly_percentage.sort("percentage", ascending=False).display()

month,total_count,greater_than_30_count,percentage
7,28382,6117,21.55
6,27171,5684,20.92
12,27076,4852,17.92
4,27620,4507,16.32
5,28195,4399,15.6
3,27943,4295,15.37
8,28821,4233,14.69
2,23660,3172,13.41
1,26468,3345,12.64
10,28642,2693,9.4


**9.** Qual a origem mais comum para voos que pousaram em Seattle (SEA)?  
**Resposta:** A origem mais comum é de JFK.

In [None]:
seattle_dest = df_voos.filter(df_voos.dest == "SEA")
seattle_dest.groupBy("origin").count().sort("count", ascending=False).limit(1).display()

origin,count
JFK,2079


**10.** Qual é a média de atraso na partida dos voos (`dep_delay`) para cada dia da semana?

In [None]:
df_voos.groupBy('weekday').agg(mean('dep_delay')).sort('avg(dep_delay)', ascending=False).display()

weekday,avg(dep_delay)
3,16.064836929665624
0,14.734220008907243
4,14.660643065663418
2,11.699197998071911
6,11.495974376412258
1,10.591532413709038
5,7.620118218281613


**11.** Qual é a rota que teve o maior tempo de voo médio (`air_time`)?  
**Resposta:** A rota com o maior tempo é a rota JFK -> HNL com tempo médio de 623 minutos.

In [None]:
df_voos.groupBy(["origin", "dest"]).agg(mean('air_time')).sort('avg(air_time)', ascending=False).limit(1).display()

origin,dest,avg(air_time)
JFK,HNL,623.0877192982456


**12.** Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?

In [None]:
destination_counts = df_voos.groupBy("origin", "dest").agg(
    count("*").alias("count")
)

window_spec = Window.partitionBy("origin").orderBy(col("count").desc())
destination_counts.withColumn(
    "rank", row_number().over(window_spec)
).filter(col("rank") == 1).drop("rank").display()

origin,dest,count
EWR,ORD,5841
JFK,LAX,11182
LGA,ATL,10063


**13.** Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo (`air_time`)?

In [None]:
df_voos.groupBy(["origin", "dest"]).agg(stddev("air_time")).sort('stddev_samp(air_time)', ascending=False).limit(3).display()

origin,dest,stddev_samp(air_time)
LGA,MYR,25.32455988429677
EWR,HNL,21.266135468474197
JFK,HNL,20.688824842787028


**14.** Qual é a média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora?  
**Resposta:** A média de atraso é de 119 minutos (1 hora e 59 minutos).

In [None]:
df_voos.where(col("dep_delay") > 60).select(mean(col("arr_delay"))).display()

avg(arr_delay)
119.0488054996392


**15.** Qual é a média de voos diários para cada mês do ano?

In [None]:
flights_count = df_voos.groupBy(["day", "month"]).count()
flights_count.groupBy("month").agg(round(mean("count"), 2)).sort("month").display()

month,"round(avg(count), 2)"
1,853.81
10,923.94
11,900.5
12,873.42
2,845.0
3,901.39
4,920.67
5,909.52
6,905.7
7,915.55


**16.** Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos?

In [None]:
df_voos.where(col("arr_delay") > 30) \
    .groupBy(["origin", "dest"]) \
    .agg(F.count('arr_delay')) \
    .sort('count(arr_delay)', ascending=False).limit(3).display()

origin,dest,count(arr_delay)
LGA,ATL,1563
JFK,LAX,1286
LGA,ORD,1188


**17.** Para cada origem, qual o principal destino?  
**OBS:** Igual a pergunta 12.

In [None]:
destination_counts = df_voos.groupBy("origin", "dest").agg(
    count("*").alias("count")
)

window_spec = Window.partitionBy("origin").orderBy(col("count").desc())
destination_counts.withColumn(
    "rank", row_number().over(window_spec)
).filter(col("rank") == 1).drop("rank").display()

origin,dest,count
EWR,ORD,5841
JFK,LAX,11182
LGA,ATL,10063


# 4. Enriquecimento da base de dados
Duas APIs externas serão usadas para enriquecer a nossa base de dados
* [Weatherbit API](https://www.weatherbit.io/): Fornece dados históricos sobre as condições meteorológicas.
* [AirportDB API](https://airportdb.io/): Fornece informações detalhadas sobre aeroportos, incluindo
coordenadas geográficas.

In [None]:
import requests
from datetime import datetime, timedelta
from pyspark.sql.types import FloatType

Primeiramente, iremos coletar os dados dos aeroportos, para isso irei retornar todos os valores únicos das colunas `origin` e `dest` como uma lista.

In [None]:
airports = df_voos.select("origin").distinct().union(df_voos.select("dest").distinct()).distinct()
airports_list = airports.select('origin').rdd.flatMap(lambda x: x).collect()
airports_list[:5]

Out[28]: ['LGA', 'EWR', 'JFK', 'PSE', 'MSY']

A Weatherbit API precisa da latitude e longitude de cada aeroporto, então primeiramente irei obter essas informações com o AirportDB.

In [None]:
airportdb_key = "YOUR_API_KEY"

def get_lat_long(airport: str, airportdb_key: str):
    response = requests.get(url=f'https://airportdb.io/api/v1/airport/K{airport}?apiToken={airportdb_key}').json()

    lat = response.get("latitude_deg")
    lon = response.get("longitude_deg")
    return lat, lon

In [None]:
airports_lat_long = list()

for airport in airports_list:
    lat, lon = get_lat_long(airport=airport, airportdb_key=airportdb_key)
    airports_lat_long.append({
        "airport": airport,
        "lat": lat,
        "lon": lon
    })

airports_df = spark.createDataFrame(airports_lat_long)
airports_df.limit(5).display()

airport,lat,lon
LGA,40.777199,-73.872597
EWR,40.692501,-74.168701
JFK,40.639801,-73.7789
PSE,,
MSY,29.99340057373047,-90.25800323486328


Agora farei um join, adicionando a latitude a longitude do aeroporto de origem e de destino

In [None]:
df_coordinates = df_voos.join(airports_df, df_voos.origin == airports_df.airport, "inner")
df_coordinates = df_coordinates.withColumnRenamed("lat", "origin_lat") \
    .withColumnRenamed("lon", "origin_lon").drop("airport")

df_coordinates = df_coordinates.join(airports_df, df_coordinates.dest == airports_df.airport, "inner")
df_coordinates = df_coordinates.withColumnRenamed("lat", "dest_lat") \
    .withColumnRenamed("lon", "dest_lon").drop("airport")

df_coordinates.limit(5).display()

id,year,month,day,dep_time,sched_dep_time,dep_delay,arr_time,sched_arr_time,arr_delay,carrier,flight,tailnum,origin,dest,air_time,distance,hour,minute,time_hour,name,weekday,origin_lat,origin_lon,dest_lat,dest_lon
0,2013,1,1,517.0,515,2.0,830.0,819,11.0,UA,1545,N14228,EWR,IAH,227.0,1400,5,15,2013-01-01 05:00:00,United Air Lines Inc.,1,40.692501,-74.168701,29.984399795532227,-95.34140014648438
1,2013,1,1,533.0,529,4.0,850.0,830,20.0,UA,1714,N24211,LGA,IAH,227.0,1416,5,29,2013-01-01 05:00:00,United Air Lines Inc.,1,40.777199,-73.872597,29.984399795532227,-95.34140014648438
2,2013,1,1,542.0,540,2.0,923.0,850,33.0,AA,1141,N619AA,JFK,MIA,160.0,1089,5,40,2013-01-01 05:00:00,American Airlines Inc.,1,40.639801,-73.7789,25.79319953918457,-80.29060363769531
3,2013,1,1,544.0,545,-1.0,1004.0,1022,-18.0,B6,725,N804JB,JFK,BQN,183.0,1576,5,45,2013-01-01 05:00:00,JetBlue Airways,1,40.639801,-73.7789,,
4,2013,1,1,554.0,600,-6.0,812.0,837,-25.0,DL,461,N668DN,LGA,ATL,116.0,762,6,0,2013-01-01 06:00:00,Delta Air Lines Inc.,1,40.777199,-73.872597,33.6367,-84.428101


Agora, nesse etapa irei criar uma nova coluna, adicionando a velocidade do vento em metros por segundo usando o Weatherbit API.

In [None]:
def get_date(date):
    start_date = datetime.strptime(date, "%Y-%m-%d %H:%M:%S")
    end_date = start_date + timedelta(days=1)

    start_date = start_date.strftime("%Y-%m-%d")
    end_date = end_date.strftime("%Y-%m-%d")
    return start_date, end_date

def get_wind_speed(lat, lon, start_date):
    weatherbit_key = "YOUR_API_KEY"

    try:
        url = 'https://api.weatherbit.io/v2.0/history/daily'
        start_date, end_date = get_date(start_date)
        params = {
            'lat': lat,
            'lon': lon,
            'start_date': start_date,
            'end_date': end_date,
            'key': weatherbit_key
        }

        headers = {
            'Accept': 'application/json'
        }

        response = requests.get(url, params=params, headers=headers).json()

        return response.get("data")[0].get("wind_spd")
    except:
        return None

In [None]:
wind_udf = udf(get_wind_speed, FloatType())
df_wind = df_coordinates.select(["arr_delay", "origin_lat", "origin_lon", "dest_lat", "dest_lon", "time_hour"])
df_wind = df_wind.withColumn("origin_wind_speed", wind_udf("origin_lat", "origin_lon", "time_hour"))
df_wind = df_wind.withColumn("dest_wind_speed", wind_udf("dest_lat", "dest_lon", "time_hour"))
df_wind.limit(5).display()

arr_delay,origin_lat,origin_lon,dest_lat,dest_lon,time_hour,origin_wind_speed,dest_wind_speed
7.0,40.777199,-73.872597,36.1245002746582,-86.6781997680664,2013-09-30 21:00:00,,
8.0,40.777199,-73.872597,44.80739974975586,-68.8281021118164,2013-09-30 22:00:00,,
3.0,40.777199,-73.872597,26.072599,-80.152702,2013-09-30 21:00:00,,
-2.0,40.777199,-73.872597,37.50519943237305,-77.3197021484375,2013-09-30 20:00:00,,
-24.0,40.777199,-73.872597,38.13859939575195,-78.4529037475586,2013-09-30 21:00:00,,


Informações enriquecidas para os 5 voos com maior atraso na chegada

In [None]:
# OBS: Não executou
df_wind.sort("arr_delay", ascending=False).limit(5).display()

In [None]:
# OBS: Não executou
df_wind.write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("silver_airports_database")

OBS: Em diversas tentativas a célula acima ficava carregando e não terminava de executar, acredito que possa ser uma limitação da versão community do databrics.  
![salvando tabela silver](assets/4_silver.png)  
  
Na seção abaixo, deixei os scripts de treinamento e avaliação do modelo prontos, embora não tenha conseguido executar após diversas tentativas.
  
Para conseguir usar um modelo na API, fiz um treinamento de um modelo com dados fictícios.

# 5. Modelo de Machine Learning

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error

In [None]:
pandas_df = df_wind.toPandas()
pandas_df = pandas_df.dropna(subset=["origin_wind_speed", "dest_wind_speed"])
pandas_df.head()

In [None]:
train_df, test_df = train_test_split(pandas_df, test_size=0.3)

In [None]:
X = train_df[["origin_wind_speed", "dest_wind_speed"]]
y = train_df["arr_delay"]

linear_model = LinearRegression()
linear_model.fit(X, y)

In [None]:
y_pred = linear_model.predict(test_df[["origin_wind_speed", "dest_wind_speed"]])
r2_score(test_df["arr_delay"], y_pred)

In [None]:
mean_squared_error(test_df["arr_delay"], y_pred)