In [0]:
import requests

from pyspark.sql.functions import (
    col,
    concat,
    date_add,
    date_format,
    desc,
    lit,
    lpad,
    to_timestamp,
    when,
)
from pyspark.sql.utils import AnalysisException

airports-database

In [0]:
UC_VOLUME_PATH = '/Volumes/airports_database/default/airports_database/'
CSV_FILE_NAME = 'airports-database.csv'
FINAL_UC_PATH = UC_VOLUME_PATH + CSV_FILE_NAME


df_aeroportos = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(FINAL_UC_PATH)
)

print("\nDataFrame criado com sucesso!")


DataFrame criado com sucesso!


In [0]:
def get_airport_codes(df):
    """
    Extrai e retorna uma lista de todos os códigos de aeroportos únicos presentes nas colunas 'origin' e 'dest' do DataFrame fornecido.

    Parâmetros:
        df (DataFrame): DataFrame Spark contendo as colunas 'origin' e 'dest' com os códigos dos aeroportos.

    Retorna:
        list: Lista de códigos de aeroportos únicos.
    """
    df_origin = (df.select("origin")
             .withColumnRenamed("origin", "airport_code")
             .distinct()
             )
    df_dest = (df.select("dest")
           .withColumnRenamed("origin", "airport_code")
           .distinct()
           )
    df_airport_code = (df_origin.union(df_dest)
                .dropDuplicates()
                )
    list_airport_code = [row.airport_code for row in df_airport_code.collect()]
    return list_airport_code

AirportDB API

In [0]:
with open("/Workspace/Users/ulisses.bomjardim@gmail.com/ulisses.bomjardim@gmail.com/PicPay/config/.venv", "r") as f:
    for line in f:
        if line.startswith("AIRPORTDB_CLIENT_SECRET="):
            airportdb_key = line.split("=", 1)[1].strip().replace('"', '')
            break
list_airport_code = get_airport_codes(df_aeroportos)

In [0]:
def fetch_airport_coordinates(list_airport_code, airportdb_key, cache_table="airportdb_cache"):
    """
    Busca e retorna as coordenadas (latitude e longitude) dos aeroportos cujos códigos estão presentes em list_airport_code,
    utilizando a API do AirportDB. Utiliza um cache em tabela Spark para evitar requisições desnecessárias à API.

    Parâmetros:
        list_airport_code (list): Lista de códigos de aeroportos a serem consultados.
        airportdb_key (str): Chave de acesso à API do AirportDB.
        cache_table (str, opcional): Nome da tabela Spark usada como cache dos resultados. Padrão: "airportdb_cache".

    Retorna:
        DataFrame: DataFrame Spark contendo as colunas 'airport_code', 'latitude_deg' e 'longitude_deg' com as coordenadas dos aeroportos.
    """
    special_codes = {
        "BQN": "VVPQ",
        "SJU": "TJSJ",
        "HNL": "PHNL",
        "STT": "TIST",
        "PSE": "TJPS",
        "ANC": "PANC"
    }

    try:
        df_cache = spark.table(cache_table)
        cached_codes = set(row.airport_code for row in df_cache.select("airport_code").collect())
    except AnalysisException:
        df_cache = None
        cached_codes = set()

    codes_to_fetch = [code for code in list_airport_code if code not in cached_codes]
    records = []

    for airport_code in codes_to_fetch:
        if airport_code in special_codes:
            query_code = special_codes[airport_code]
        else:
            query_code = f"K{airport_code}"
        url = f"https://airportdb.io/api/v1/airport/{query_code}?apiToken={airportdb_key}"
        response = requests.get(url)
        if response.status_code == 200:
            data = response.json()
            latitude = data.get("latitude_deg")
            longitude = data.get("longitude_deg")
            records.append((airport_code, latitude, longitude))

    df_new = spark.createDataFrame(records, ["airport_code", "latitude_deg", "longitude_deg"]) if records else None

    if df_cache is not None and df_new is not None:
        df_airportdb = df_cache.unionByName(df_new).dropDuplicates(["airport_code"])
    elif df_cache is not None:
        df_airportdb = df_cache
    elif df_new is not None:
        df_airportdb = df_new
    else:
        df_airportdb = spark.createDataFrame([], "airport_code string, latitude_deg double, longitude_deg double")

    df_airportdb.write.mode("overwrite").saveAsTable(cache_table)
    return df_airportdb

df_airportdb = fetch_airport_coordinates(list_airport_code, airportdb_key)
display(df_airportdb)

airport_code,latitude_deg,longitude_deg
MDW,41.785999,-87.752403
PDX,45.58869934,-122.5979996
CLT,35.2140007019043,-80.94309997558594
TVC,44.74140167236328,-85.58219909667969
DCA,38.8521,-77.037697
SFO,37.61899948120117,-122.375
BDL,41.9388999939,-72.68319702149999
HOU,29.64539909,-95.27890015
GRR,42.88079834,-85.52279663
HDN,40.48120117,-107.2180023


In [0]:
UC_VOLUME_PATH = '/Volumes/airports_database/default/airports_database/'
CSV_FILE_NAME = 'airportdb.csv'
FINAL_UC_PATH = UC_VOLUME_PATH + CSV_FILE_NAME

df_airportdb.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .option("sep", ";") \
    .mode("overwrite") \
    .save(FINAL_UC_PATH)

print(f"\n {CSV_FILE_NAME} criado com sucesso!")



 airportdb.csv criado com sucesso!


In [0]:
def adicionar_coordenadas_aeroportos(df_aeroportos, df_airportdb):
    """
    Realiza o join do DataFrame de voos (df_aeroportos) com o DataFrame de coordenadas dos aeroportos (df_airportdb),
    adicionando as colunas de latitude e longitude tanto para o aeroporto de origem quanto para o de destino.

    Parâmetros:
        df_aeroportos (DataFrame): DataFrame Spark contendo informações dos voos, incluindo as colunas 'origin' e 'dest'.
        df_airportdb (DataFrame): DataFrame Spark contendo as colunas 'airport_code', 'latitude_deg' e 'longitude_deg' com as coordenadas dos aeroportos.

    Retorna:
        DataFrame: DataFrame Spark resultante do join, contendo as colunas originais de df_aeroportos acrescidas de:
            - origin_latitude, origin_longitude: coordenadas do aeroporto de origem
            - dest_latitude, dest_longitude: coordenadas do aeroporto de destino
    """
    return (
        df_aeroportos
        .join(
            df_airportdb.withColumnRenamed("airport_code", "origin_code")
                        .withColumnRenamed("latitude_deg", "origin_latitude")
                        .withColumnRenamed("longitude_deg", "origin_longitude"),
            df_aeroportos.origin == col("origin_code"),
            "left"
        )
        .join(
            df_airportdb.withColumnRenamed("airport_code", "dest_code")
                        .withColumnRenamed("latitude_deg", "dest_latitude")
                        .withColumnRenamed("longitude_deg", "dest_longitude"),
            df_aeroportos.dest == col("dest_code"),
            "left"
        )
        .drop("origin_code", "dest_code")
    )

df_aeroportos_coordinates = adicionar_coordenadas_aeroportos(df_aeroportos, df_airportdb)

In [0]:
# Cria DataFrame com informações do aeroporto de origem
df_origin = df_aeroportos_coordinates.select(
    col("id"),
    col("origin").alias("location"),
    col("time_hour"),
    col("origin_latitude").alias("latitude"),
    col("origin_longitude").alias("longitude"),
    col("arr_delay")
)

# Cria DataFrame com informações do aeroporto de destino, ajustando o campo de horário
df_dest = df_aeroportos_coordinates.withColumn(
    "time_hour",
    to_timestamp(
        concat(
            col("year"), lit("-"),
            lpad(col("month"), 2, "0"), lit("-"),
            lpad(col("day"), 2, "0"), lit(" "),
            lpad(col("sched_arr_time").cast("string"), 4, "0")
        ),
        "yyyy-MM-dd HHmm"
    )
).select(
    col("id"),
    col("dest").alias("location"),
    col("time_hour"),
    col("dest_latitude").alias("latitude"),
    col("dest_longitude").alias("longitude"),
    col("arr_delay")
)

# Une os DataFrames de origem e destino
df_prep_wind = df_origin.union(df_dest)

# Adiciona coluna de data inicial no formato yyyy-MM-dd
df_prep_wind = df_prep_wind.withColumn("start_date", date_format(col("time_hour"), "yyyy-MM-dd"))
# Adiciona coluna de data final (um dia após a data inicial) no formato yyyy-MM-dd
df_prep_wind = df_prep_wind.withColumn("end_date", date_format(date_add(col("time_hour"), 1), "yyyy-MM-dd"))
# Adiciona coluna de hora no formato HH:00:00
df_prep_wind = df_prep_wind.withColumn("hour", date_format(col("time_hour"), "HH:00:00"))

# Adiciona coluna de atraso formatado em horas e minutos
df_prep_wind = df_prep_wind.withColumn(
    "hour_delay",
    when(
        (col("arr_delay").cast("int").isNotNull()) & (col("arr_delay").cast("int") > 0),
        concat(
            lpad((col("arr_delay").cast("int") / 100).cast("int").cast("string"), 2, "0"),
            lit(":"),
            lpad((col("arr_delay").cast("int") % 100).cast("string"), 2, "0")
        )
    ).otherwise(None)
)

# Ordena pelo maior atraso e seleciona os 10 maiores
df_prep_wind = df_prep_wind.orderBy(desc("arr_delay")).limit(10)

display(df_prep_wind)

id,location,time_hour,latitude,longitude,arr_delay,start_date,end_date,hour,hour_delay
7072,HNL,2013-01-09T15:30:00.000Z,21.32062,-157.924228,1272.0,2013-01-09,2013-01-10,15:00:00,12:72
7072,JFK,2013-01-09T09:00:00.000Z,40.639801,-73.7789,1272.0,2013-01-09,2013-01-10,09:00:00,12:72
235778,JFK,2013-06-15T19:00:00.000Z,40.639801,-73.7789,1127.0,2013-06-15,2013-06-16,19:00:00,11:27
235778,CMH,2013-06-15T21:20:00.000Z,39.998001,-82.891899,1127.0,2013-06-15,2013-06-16,21:00:00,11:27
8239,EWR,2013-01-10T16:00:00.000Z,40.692501,-74.168701,1109.0,2013-01-10,2013-01-11,16:00:00,11:09
8239,ORD,2013-01-10T18:10:00.000Z,41.9786,-87.9048,1109.0,2013-01-10,2013-01-11,18:00:00,11:09
327043,JFK,2013-09-20T18:00:00.000Z,40.639801,-73.7789,1007.0,2013-09-20,2013-09-21,18:00:00,10:07
327043,SFO,2013-09-20T22:10:00.000Z,37.61899948120117,-122.375,1007.0,2013-09-20,2013-09-21,22:00:00,10:07
270376,JFK,2013-07-22T16:00:00.000Z,40.639801,-73.7789,989.0,2013-07-22,2013-07-23,16:00:00,09:89
270376,CVG,2013-07-22T18:15:00.000Z,39.048801,-84.667801,989.0,2013-07-22,2013-07-23,18:00:00,09:89


Weatherbit API

In [0]:
with open("/Workspace/Users/ulisses.bomjardim@gmail.com/ulisses.bomjardim@gmail.com/PicPay/config/.venv", "r") as f:
    for line in f:
        if line.startswith("WEATHERBIT_CLIENT_SECRET="):
            weatherbit_key = line.split("=", 1)[1].strip().replace('"', '')
            break

In [0]:
def fetch_wind_speed(df_prep_wind, weatherbit_key):
    """
    Busca e retorna a velocidade do vento (wind_spd) para cada registro do DataFrame fornecido,
    utilizando a API do Weatherbit. Para cada linha do DataFrame, realiza uma requisição à API
    com as coordenadas, data de início e fim, e armazena a velocidade do vento retornada.

    Parâmetros:
        df_prep_wind (DataFrame): DataFrame Spark contendo as colunas 'id', 'location', 'latitude', 'longitude', 'start_date', 'end_date', 'hour', 'time_hour'.
        weatherbit_key (str): Chave de acesso à API do Weatherbit.

    Retorna:
        DataFrame: DataFrame Spark contendo as colunas 'id', 'location', 'time_hour', 'latitude', 'longitude', 'wind_spd' com a velocidade do vento para cada registro.
    """
    url = 'https://api.weatherbit.io/v2.0/history/daily'
    records = []
    rows = df_prep_wind.select("id", "location", "latitude", "longitude", "start_date", "end_date", "hour", "time_hour").collect()
    for row in rows:
        params = {
            'lat': row['latitude'],
            'lon': row['longitude'],
            'start_date': row['start_date'],
            'end_date': row['end_date'],
            'key': weatherbit_key,
        }
        headers = {'Accept': 'application/json'}
        response = requests.get(url, params=params, headers=headers)
        wind_spd = None
        if response.status_code == 200:
            data = response.json()
            if "data" in data and len(data["data"]) > 0:
                wind_spd = data["data"][0].get("wind_spd")
        records.append((row['id'], row['location'], row['time_hour'], row['latitude'], row['longitude'], wind_spd))
    df_aeroportos_wind = spark.createDataFrame(records, ["id", "location", "time_hour", "latitude", "longitude", "wind_spd"])
    return df_aeroportos_wind

df_aeroportos_wind = fetch_wind_speed(df_prep_wind, weatherbit_key)
display(df_aeroportos_wind)

id,location,time_hour,latitude,longitude,wind_spd
7072,HNL,2013-01-09T15:30:00.000Z,21.32062,-157.924228,5.5
7072,JFK,2013-01-09T09:00:00.000Z,40.639801,-73.7789,3.5
235778,JFK,2013-06-15T19:00:00.000Z,40.639801,-73.7789,4.1
235778,CMH,2013-06-15T21:20:00.000Z,39.998001,-82.891899,1.9
8239,EWR,2013-01-10T16:00:00.000Z,40.692501,-74.168701,4.1
8239,ORD,2013-01-10T18:10:00.000Z,41.9786,-87.9048,4.1
327043,JFK,2013-09-20T18:00:00.000Z,40.639801,-73.7789,3.7
327043,SFO,2013-09-20T22:10:00.000Z,37.61899948120117,-122.375,4.3
270376,JFK,2013-07-22T16:00:00.000Z,40.639801,-73.7789,3.2
270376,CVG,2013-07-22T18:15:00.000Z,39.048801,-84.667801,2.3


In [0]:
UC_VOLUME_PATH = '/Volumes/airports_database/default/airports_database/'
CSV_FILE_NAME = 'aeroportos_wind.csv'
FINAL_UC_PATH = UC_VOLUME_PATH + CSV_FILE_NAME

df_aeroportos_wind.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .option("sep", ";") \
    .mode("overwrite") \
    .save(FINAL_UC_PATH)

print(f"\n {CSV_FILE_NAME} criado com sucesso!")


 aeroportos_wind.csv criado com sucesso!


**Pergunta final:** Enriqueça a base de dados de voos com as condições meteorológicas 
(velocidade do vento) para os aeroportos de origem e destino. Mostre as informações 
enriquecidas **apenas** para os 5 voos com maior atraso na chegada. 

In [0]:
df_aeroportos_enriched = (
    df_aeroportos_coordinates.select(
        "id",
        "carrier",
        "flight",
        "origin",
        "dest",
        "arr_delay",
        "sched_arr_time",
        "origin_latitude",
        "origin_longitude",
        "dest_latitude",
        "dest_longitude",
    )
    .join(
        df_aeroportos_wind.withColumnRenamed("wind_spd", "wind_spd_origin")
                          .withColumnRenamed("id", "id_origin")
                          .withColumnRenamed("location", "location_origin"),
        (col("id") == col("id_origin")) & (col("origin") == col("location_origin")),
        "left"
    ).drop("id_origin", "location_origin")
    .join(
        df_aeroportos_wind.withColumnRenamed("wind_spd", "wind_spd_dest")
                          .withColumnRenamed("id", "id_dest")
                          .withColumnRenamed("location", "location_dest"),
        (col("id") == col("id_dest")) & (col("dest") == col("location_dest")),
        "left"
    ).drop("id_dest", "location_dest")
    .select(
        "id",
        "carrier",
        "flight",
        "origin",
        "dest",
        "arr_delay",
        "sched_arr_time",
        "origin_latitude",
        "origin_longitude",
        "dest_latitude",
        "dest_longitude",
        "wind_spd_origin",
        "wind_spd_dest"
    )
    .orderBy(desc("arr_delay"))
    .limit(5)
)

display(df_aeroportos_enriched)

id,carrier,flight,origin,dest,arr_delay,sched_arr_time,origin_latitude,origin_longitude,dest_latitude,dest_longitude,wind_spd_origin,wind_spd_dest
7072,HA,51,JFK,HNL,1272.0,1530,40.639801,-73.7789,21.32062,-157.924228,3.5,5.5
235778,MQ,3535,JFK,CMH,1127.0,2120,40.639801,-73.7789,39.998001,-82.891899,4.1,1.9
8239,MQ,3695,EWR,ORD,1109.0,1810,40.692501,-74.168701,41.9786,-87.9048,4.1,4.1
327043,AA,177,JFK,SFO,1007.0,2210,40.639801,-73.7789,37.61899948120117,-122.375,3.7,4.3
270376,MQ,3075,JFK,CVG,989.0,1815,40.639801,-73.7789,39.048801,-84.667801,3.2,2.3


In [0]:
UC_VOLUME_PATH = '/Volumes/airports_database/default/airports_database/'
CSV_FILE_NAME = 'aeroportos_enriched.csv'
FINAL_UC_PATH = UC_VOLUME_PATH + CSV_FILE_NAME

df_aeroportos_enriched.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .option("sep", ";") \
    .mode("overwrite") \
    .save(FINAL_UC_PATH)

print(f"\n {CSV_FILE_NAME} criado com sucesso!")


 aeroportos_enriched.csv criado com sucesso!
