In [11]:
import zipfile
import os
import pandas as pd


# Path of the raw archive, resolved relative to the current working directory.
zip_file_path = os.path.join(
    os.getcwd(), "..", "data", "raw", "airports-database.zip"
)

# Read the CSV member straight out of the archive; both handles are closed
# when the `with` statement exits.
with zipfile.ZipFile(zip_file_path, "r") as zip_ref, \
        zip_ref.open("airports-database.csv") as csv_file:
    df = pd.read_csv(csv_file)

In [12]:
import os
from dotenv import load_dotenv


# Load the variables declared in the .env file, then read the two API keys.
# Either key is None when its variable is not set.
load_dotenv()
airportdb_key = os.environ.get("AIRPORT_DB")
weatherbit_key = os.environ.get("WEATHERBIT")

In [13]:
from pyspark.sql import SparkSession
import requests
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, DoubleType, DateType
from datetime import timedelta
import time


# Create (or reuse) the SparkSession for this notebook.
spark = (
    SparkSession.builder
    .appName("ETL")
    .getOrCreate()
)

# Select the LEGACY time parser policy for Spark SQL date/time parsing.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")



def get_airport_data(airport_code, api_key, timeout=30):
    """
    Fetch information about an airport from the AirportDB API.

    The lookup identifier is built by prefixing ``airport_code`` with ``K``,
    so ``'JFK'`` is requested as ``'KJFK'``.

    Parameters
    ----------
    airport_code : str
        Airport code without the leading ``K`` (e.g. 'JFK').
    api_key : str
        API token used for authentication.
    timeout : float or tuple, optional
        Seconds to wait for the server, passed to ``requests.get``.
        Defaults to 30 so a stalled connection cannot hang the notebook.

    Returns
    -------
    dict
        Airport data decoded from the JSON response.

    Raises
    ------
    requests.exceptions.HTTPError
        If the API answers with an error status (``response.ok`` is false).
        The message holds the status code and the token-free URL; the
        response is available as ``.response``.
    requests.exceptions.RequestException
        Any other failure raised by ``requests`` (connection error, timeout).

    Examples
    --------
    >>> airport_data = get_airport_data('JFK', 'your_api_token_here')
    >>> print(airport_data)
    {'ident': 'KJFK', 'type': 'large_airport', 'name': 'John F Kennedy International Airport', ...}
    """
    url = f"https://airportdb.io/api/v1/airport/K{airport_code}"
    # The token travels as a query parameter instead of being formatted into
    # `url`, so `url` itself is always safe to log.
    response = requests.get(url, params={"apiToken": api_key}, timeout=timeout)
    if not response.ok:
        # raise_for_status() embeds the full request URL (token included) in
        # the error text; build the error from the token-free URL instead.
        raise requests.exceptions.HTTPError(
            f"{response.status_code} {response.reason} for url: {url}",
            response=response,
        )
    return response.json()


def get_weather_history(lat, lon, start_date, end_date, api_key, timeout=30):
    """
    Fetch daily historical weather for a location from the Weatherbit API.

    Parameters
    ----------
    lat : float
        Latitude of the location (e.g. 40.7128).
    lon : float
        Longitude of the location (e.g. -74.0060).
    start_date : str
        Start date in 'YYYY-MM-DD' format.
    end_date : str
        End date in 'YYYY-MM-DD' format.
    api_key : str
        API key used for authentication.
    timeout : float or tuple, optional
        Seconds to wait for the server, passed to ``requests.get``.
        Defaults to 30 so a stalled connection cannot hang the caller.

    Returns
    -------
    dict
        Weather data decoded from the JSON response.

    Raises
    ------
    requests.exceptions.HTTPError
        If the API answers with an error status (``response.ok`` is false).
        The message holds the status code and the endpoint URL without its
        query string, so neither the API key nor the coordinates appear in
        it; the response is available as ``.response``.
    requests.exceptions.RequestException
        Any other failure raised by ``requests`` (connection error, timeout).

    Examples
    --------
    >>> weather_data = get_weather_history(40.7128, -74.0060, '2023-01-01', '2023-01-02', 'your_api_key_here')
    >>> print(weather_data)
    {'city_id': '5128581', 'city_name': 'New York City', 'country_code': 'US', 'data': [...], ...}
    """
    url = "https://api.weatherbit.io/v2.0/history/daily"
    params = {
        'lat': lat,
        'lon': lon,
        'start_date': start_date,
        'end_date': end_date,
        'key': api_key
    }
    headers = {
        'Accept': 'application/json'
    }
    response = requests.get(url, params=params, headers=headers, timeout=timeout)
    if not response.ok:
        # raise_for_status() embeds the full request URL (API key included) in
        # the error text; report only the status and the bare endpoint.
        raise requests.exceptions.HTTPError(
            f"{response.status_code} {response.reason} for url: {url}",
            response=response,
        )
    return response.json()


def enrich_airport_data(spark, df, api_token):
    """
    Add airport latitude and longitude columns to a PySpark DataFrame.

    The distinct values of ``airport_cod`` are collected to the driver and
    one ``get_airport_data`` request is made per code. Codes whose request
    fails are kept, with null coordinates.

    Parameters
    ----------
    spark : SparkSession
        Spark session used to build the coordinates DataFrame.
    df : pyspark.sql.DataFrame
        DataFrame with an 'airport_cod' column holding the airport codes.
    api_token : str
        API token used for authentication.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` left-joined with the columns 'latitude_deg' and
        'longitude_deg' (both doubles, null when unavailable).
    """
    code_rows = df.select("airport_cod").distinct().collect()
    unique_codes = [row["airport_cod"] for row in code_rows]

    def _as_float(value):
        # The coordinate columns are declared as doubles below; coerce so an
        # integer- or string-valued coordinate is accepted by Spark. Values
        # that cannot be read as a number are treated as missing.
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    coordinates = []
    for code in unique_codes:
        try:
            # `or {}` tolerates a lookup that returns no payload.
            airport_data = get_airport_data(code, api_token) or {}
            coordinates.append((
                code,
                _as_float(airport_data.get('latitude_deg')),
                _as_float(airport_data.get('longitude_deg')),
            ))
        except requests.exceptions.RequestException as e:
            # The exception text can contain the request URL, which may carry
            # the token; mask it before it reaches the notebook output.
            message = str(e)
            if api_token:
                message = message.replace(str(api_token), "***")
            print(f"Erro ao obter dados do aeroporto {code}: {message}")
            coordinates.append((code, None, None))

    # An explicit schema keeps the column types stable and also works when
    # the list is empty or every coordinate is None, where type inference
    # has nothing to infer from.
    latlon_df = spark.createDataFrame(
        coordinates,
        schema="airport_cod string, latitude_deg double, longitude_deg double",
    )
    return df.join(latlon_df, on="airport_cod", how="left")


def get_unique_airports_by_date(df, airport_col):
    """
    Build the distinct (airport, date) pairs found in a flights DataFrame.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights DataFrame; must contain ``airport_col`` and 'time_hour'.
    airport_col : str
        Name of the airport column to use ('origin' or 'dest').

    Returns
    -------
    pyspark.sql.DataFrame
        Two columns, 'airport_cod' and 'date', with duplicate pairs removed.
    """
    # Keep only the two inputs and derive the calendar date from 'time_hour'.
    with_date = df.select(airport_col, "time_hour").withColumn(
        "date", F.to_date("time_hour", "yyyy-MM-dd")
    )
    # Expose the airport under a side-independent name and de-duplicate.
    return with_date.select(F.col(airport_col).alias("airport_cod"), "date").distinct()


# Control flag used to stop issuing weather requests after an HTTP 429.
# It is a one-element list so the nested UDF function can flip it in place.
# NOTE(review): the flag is mutated inside a Spark UDF, which presumably runs
# in Python worker processes rather than in the notebook kernel, so the stop
# is best-effort per worker — confirm for the deployment in use.
stop_execution = [False]


def enrich_weather_data(spark, df, api_key):
    """
    Add a 'wind_spd' column with the wind speed at each airport's coordinates.

    One ``get_weather_history`` request is made per row, through a Spark UDF,
    for the window from the row's date to the following day. After an HTTP
    429 response no further requests are attempted and the remaining rows
    get a null 'wind_spd'.

    Parameters
    ----------
    spark : SparkSession
        Spark session (not used by the current implementation).
    df : pyspark.sql.DataFrame
        DataFrame with the columns 'latitude_deg', 'longitude_deg' and 'date'.
    api_key : str
        API key used for authentication.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` with the extra double column 'wind_spd'. The column is
        evaluated lazily: requests are sent only when an action runs, so
        cache or persist the result before reusing it in several places.
    """

    def fetch_weather(lat, lon, date):
        # Nothing to look up without coordinates and a date.
        if lat is None or lon is None or date is None:
            return None
        # Avoid further requests once a 429 has been seen.
        if stop_execution[0]:
            return None

        start_date = date.strftime("%Y-%m-%d")
        end_date = (date + timedelta(days=1)).strftime("%Y-%m-%d")

        try:
            weather_data = get_weather_history(lat, lon, start_date, end_date, api_key)
        except requests.exceptions.RequestException as e:
            # Check the response status itself: searching the message for
            # "429" can also match digits of the coordinates or dates.
            response = getattr(e, "response", None)
            if response is not None and response.status_code == 429:
                print("Erro 429 recebido. Parando novas requisições.")
                stop_execution[0] = True
            # Return None so the remaining rows can still be processed.
            return None

        try:
            # The UDF is declared as DoubleType, and Spark yields NULL for a
            # Python int, so an integer-valued wind speed must be a float.
            return float(weather_data["data"][0]["wind_spd"])
        except (KeyError, IndexError, TypeError, ValueError):
            # Missing, empty or non-numeric payload: no wind speed available.
            return None

    # The function calls an external API, so mark the UDF nondeterministic:
    # the optimizer must not duplicate or reorder its evaluation.
    fetch_weather_udf = F.udf(fetch_weather, DoubleType()).asNondeterministic()

    return df.withColumn(
        "wind_spd",
        fetch_weather_udf(F.col("latitude_deg"), F.col("longitude_deg"), F.col("date")),
    )


In [14]:
# Turn the loaded frame into a Spark DataFrame and parse 'time_hour' as a timestamp.
df = spark.createDataFrame(df).withColumn(
    "time_hour", F.to_timestamp("time_hour", "yyyy-MM-dd HH:mm:ss")
)

# Distinct airport codes seen as origin and as destination.
origin_df = df.select("origin").distinct()
dest_df = df.select("dest").distinct()

# union() matches columns by position, so the combined column is still
# called 'origin' until it is renamed.
airports_cod_df = (
    origin_df.union(dest_df)
    .distinct()
    .withColumnRenamed("origin", "airport_cod")
)

# Look up latitude/longitude for every airport code.
latlon_airport_data_df = enrich_airport_data(spark, airports_cod_df, airportdb_key)

latlon_airport_data_df.show()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


24/10/09 11:30:35 WARN TaskSetManager: Stage 75 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:31:00 WARN TaskSetManager: Stage 76 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Erro ao obter dados do aeroporto PSE: 404 Client Error: Not Found for url: https://airportdb.io/api/v1/airport/KPSE?apiToken=***REDACTED***
Erro ao obter dados do aeroporto HNL: 404 Client Error: Not Found for url: https://airportdb.io/api/v1/airport/KHNL?apiToken=***REDACTED***
Erro ao obter dados do aeroporto SJU: 404 Client Error: Not Found for url: https://airportdb.io/api/v1/airport/KSJU?apiToken=***REDACTED***
Erro ao obter dados do aeroporto BQN: 404 Client Error: Not Found for url: https://airportdb.io/api/v1/airport/KBQN?apiToken=***REDACTED***
Erro ao obter dados do aeroporto ANC: 404 Client Error: Not Found for url: https://airportdb.io/api/v1/airport/K

24/10/09 11:32:26 WARN TaskSetManager: Stage 84 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:32:36 WARN TaskSetManager: Stage 85 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-----------+------------------+------------------+
|airport_cod|      latitude_deg|     longitude_deg|
+-----------+------------------+------------------+
|        LGA|         40.777199|        -73.872597|
|        EWR|         40.692501|        -74.168701|
|        JFK|         40.639801|          -73.7789|
|        PSE|              NULL|              NULL|
|        MSY| 29.99340057373047|-90.25800323486328|
|        SNA|         33.675701|       -117.867996|
|        BUR|         34.197703|       -118.356378|
|        GRR|       42.88079834|      -85.52279663|
|        MYR|     33.6796989441|    -78.9282989502|
|        GSO|36.097801208496094|-79.93730163574219|
|        PVD|         41.732601|        -71.420403|
|        OAK|         37.721298|       -122.221001|
|        MSN|           43.1399|        -89.337502|
|        DCA|           38.8521|        -77.037697|
|        LEX|  38.0364990234375|-84.60590362548828|
|        ORF| 36.89459991455078|-76.20120239257812|
|        CRW

In [15]:
# Distinct (airport, date) pairs, taken once from the origin side and once
# from the destination side of the flights.
origin_df, dest_df = (
    get_unique_airports_by_date(df, side) for side in ("origin", "dest")
)

# Merge both sides and drop pairs that appear in both.
unique_airports_by_date_df = origin_df.union(dest_df).distinct()

unique_airports_by_date_df.show()

24/10/09 11:33:16 WARN TaskSetManager: Stage 96 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:33:32 WARN TaskSetManager: Stage 97 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-----------+----------+
|airport_cod|      date|
+-----------+----------+
|        JFK|2013-12-06|
|        EWR|2013-12-25|
|        EWR|2013-02-10|
|        LGA|2013-02-17|
|        LGA|2013-10-02|
|        EWR|2013-04-11|
|        LGA|2013-04-16|
|        EWR|2013-05-25|
|        EWR|2013-06-02|
|        LGA|2013-08-17|
|        EWR|2013-09-11|
|        JFK|2013-09-29|
|        JFK|2013-01-06|
|        EWR|2013-01-14|
|        LGA|2013-10-28|
|        LGA|2013-06-16|
|        LGA|2013-06-21|
|        LGA|2013-07-25|
|        LGA|2013-07-31|
|        JFK|2013-08-03|
+-----------+----------+
only showing top 20 rows



In [16]:
# Attach each airport's latitude/longitude to its (airport, date) pairs.
# The join is an inner join on 'airport_cod'.
airport_data_df = unique_airports_by_date_df.join(
    latlon_airport_data_df, "airport_cod", "inner"
)

airport_data_df.show()

24/10/09 11:34:05 WARN TaskSetManager: Stage 105 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:34:19 WARN TaskSetManager: Stage 106 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:34:32 WARN TaskSetManager: Stage 107 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:34:44 WARN TaskSetManager: Stage 108 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.

+-----------+----------+------------+-------------+
|airport_cod|      date|latitude_deg|longitude_deg|
+-----------+----------+------------+-------------+
|        LGA|2013-09-04|   40.777199|   -73.872597|
|        LGA|2013-04-10|   40.777199|   -73.872597|
|        LGA|2013-11-08|   40.777199|   -73.872597|
|        LGA|2013-09-15|   40.777199|   -73.872597|
|        LGA|2013-06-03|   40.777199|   -73.872597|
|        LGA|2013-05-28|   40.777199|   -73.872597|
|        LGA|2013-03-04|   40.777199|   -73.872597|
|        LGA|2013-09-08|   40.777199|   -73.872597|
|        LGA|2013-07-26|   40.777199|   -73.872597|
|        LGA|2013-09-17|   40.777199|   -73.872597|
|        LGA|2013-09-11|   40.777199|   -73.872597|
|        LGA|2013-05-17|   40.777199|   -73.872597|
|        LGA|2013-04-03|   40.777199|   -73.872597|
|        LGA|2013-02-23|   40.777199|   -73.872597|
|        LGA|2013-02-13|   40.777199|   -73.872597|
|        LGA|2013-01-11|   40.777199|   -73.872597|
|        LGA

                                                                                

In [17]:
# Add the weather data (wind speed) to every (airport, date) row.
final_airport_data_df = enrich_weather_data(spark, airport_data_df, weatherbit_key)

# Keep only the rows for which a wind speed was obtained.
# The 'wind_spd' column is produced lazily by a UDF that calls the weather
# API, and this frame is consumed by both the origin and the destination
# join below. Caching it lets Spark reuse the first evaluation instead of
# being free to send the same requests again for each reference.
non_null_wind_spd_df = final_airport_data_df.filter(F.col("wind_spd").isNotNull()).cache()

In [18]:
# Add a 'date' column built from the 'year', 'month' and 'day' columns.
# make_date assembles the date from the numeric parts directly, so it does
# not depend on parsing an unpadded string such as '2013-1-1' (which only
# works under the LEGACY time parser policy). The casts make the integer
# inputs explicit.
df = df.withColumn(
    "date",
    F.make_date(
        F.col("year").cast("int"),
        F.col("month").cast("int"),
        F.col("day").cast("int"),
    ),
)

In [19]:
def _add_wind_speed(flights_df, wind_df, airport_col, output_col):
    """
    Left-join the wind speed of one side's airport onto the flights.

    Parameters
    ----------
    flights_df : pyspark.sql.DataFrame
        Flights with a 'date' column and the column named by ``airport_col``.
    wind_df : pyspark.sql.DataFrame
        Lookup with the columns 'airport_cod', 'date' and 'wind_spd'.
    airport_col : str
        Flights column holding the airport code to match ('origin' or 'dest').
    output_col : str
        Name of the wind speed column appended to the result.

    Returns
    -------
    pyspark.sql.DataFrame
        ``flights_df`` plus ``output_col`` as its last column (null when the
        lookup has no row for that airport and date).
    """
    # Rename the lookup keys so every column name in the join is unique and
    # the condition needs no table aliases.
    lookup = wind_df.select(
        F.col("airport_cod").alias("_wind_airport_cod"),
        F.col("date").alias("_wind_date"),
        F.col("wind_spd").alias(output_col),
    )
    join_condition = (
        (F.col("date") == F.col("_wind_date"))
        & (F.col(airport_col) == F.col("_wind_airport_cod"))
    )
    return (
        flights_df.join(lookup, join_condition, "left")
        .drop("_wind_airport_cod", "_wind_date")
    )


# Wind speed at the origin airport on the flight date.
df_enriched = _add_wind_speed(df, non_null_wind_spd_df, "origin", "wind_spd_origin")

# Wind speed at the destination airport on the flight date.
df_enriched = _add_wind_speed(df_enriched, non_null_wind_spd_df, "dest", "wind_spd_dest")

In [20]:
# Write the enriched flights as Parquet, replacing any previous output in the
# target directory. No 'header' option is set: it is a CSV option, and
# Parquet files carry their own schema.
df_enriched.write.mode("overwrite").parquet("../data/processed/")

24/10/09 11:35:00 WARN TaskSetManager: Stage 126 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:35:07 WARN TaskSetManager: Stage 127 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:35:14 WARN TaskSetManager: Stage 128 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:35:17 WARN TaskSetManager: Stage 129 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
24/10/09 11:35:22 WARN TaskSetManager: Stage 130 contains a task of very large size (3929 KiB). The maximum recommended task size is 1000 KiB.
Erro 429 recebido. Parando novas requisições.                       (0 + 1) / 1]
24/10/09 11:35:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                 