In [9]:
import getpass
import os

import pyspark
import requests
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types

In [None]:
# The standalone-cluster master URL is machine-specific; allow overriding it via the
# SPARK_MASTER_URL environment variable so the notebook runs elsewhere, falling back
# to the original cluster address.
SPARK_MASTER = os.environ.get("SPARK_MASTER_URL", "spark://Vinicius.:7077")

spark = (
    SparkSession.builder
    .master(SPARK_MASTER)
    .appName("EtlScript")
    .getOrCreate()
)

In [4]:
# Target tables (schema-qualified, quoted identifiers) and PostgreSQL connection settings.
green_table = 'public."green_taxi"'
yellow_table = 'public."yellow_taxi"'
database = os.environ.get("PG_DATABASE", "testes_data_engineering")
user = os.environ.get("PG_USER", "root")
# Keep the password out of the notebook file: read it from the environment,
# or prompt for it interactively when PG_PASSWORD is not set.
password = os.environ.get("PG_PASSWORD") or getpass.getpass("PostgreSQL password: ")

In [5]:
def download_parquets(taxi_type: str, year: int, months=range(1, 13),
                      data_dir: str = "data/raw", timeout: float = 60.0,
                      chunk_size: int = 1 << 20):
    """
    Download NYC Taxi & Limousine Commission trip-record Parquet files.

    One file per month is fetched from the TLC CloudFront bucket and saved as
    ``{data_dir}/{taxi_type}/{year}/{taxi_type}_tripdata_{year}-{MM}.parquet``.
    A month that fails (HTTP error such as 403, timeout, network or disk error)
    is reported and skipped so the remaining months are still attempted.

    Parameters:
        taxi_type (str): taxi type as used in TLC file names (e.g. "yellow", "green", "fhv").
        year (int): year to download (e.g. 2021).
        months (iterable of int): months to fetch (1-12); defaults to the whole year.
        data_dir (str): root directory for raw files; defaults to "data/raw".
        timeout (float): per-request timeout in seconds, so a stalled connection
            cannot hang the notebook indefinitely.
        chunk_size (int): number of bytes per streamed chunk written to disk.

    Returns:
        list[str]: local paths of the files that were downloaded completely.
    """
    url_prefix = "https://d37ci6vzurychx.cloudfront.net/trip-data/"
    local_prefix = f"{data_dir}/{taxi_type}/{year}"
    os.makedirs(local_prefix, exist_ok=True)

    downloaded = []
    for month in months:
        filename = f"{taxi_type}_tripdata_{year}-{month:02d}.parquet"
        url = f"{url_prefix}{filename}"
        local_path = os.path.join(local_prefix, filename)
        # Stream into a temporary file and rename only after the transfer completes,
        # so an interrupted download never leaves a truncated .parquet behind.
        tmp_path = local_path + ".part"

        print(f"Downloading {url} to {local_path}")

        try:
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()  # raise on any non-2xx status
                with open(tmp_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        f.write(chunk)
            os.replace(tmp_path, local_path)
            downloaded.append(local_path)
        except (requests.exceptions.RequestException, OSError) as e:
            print(f"Erro ao baixar {url}: {e}")
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
    return downloaded

In [6]:
def ingest_on_postgres(df, table, user, pwd, db, host="localhost", port=5432, mode="overwrite"):
    """
    Write a Spark DataFrame to a PostgreSQL table through the JDBC data source.

    Errors are printed rather than raised, so a notebook run keeps going (best-effort);
    the return value lets callers detect whether the load succeeded.

    Parameters:
        df: Spark DataFrame to write.
        table (str): target table, optionally schema-qualified (e.g. 'public."green_taxi"').
        user (str): database user.
        pwd (str): database password.
        db (str): database name.
        host (str): PostgreSQL host; defaults to "localhost".
        port (int): PostgreSQL port; defaults to 5432.
        mode (str): Spark save mode; "overwrite" (default) replaces the table,
            "append" adds rows to it.

    Returns:
        int | None: number of rows written, or None if the load failed.
    """
    try:
        # count() launches a full Spark job, so run it once and reuse the result.
        row_count = df.count()
        print(f'importing rows 0 to {row_count}... for table {table}')
        (
            df.write.mode(mode)
            .format("jdbc")
            .option("url", f"jdbc:postgresql://{host}:{port}/{db}")
            .option("user", user)
            .option("password", pwd)
            .option("driver", "org.postgresql.Driver")
            .option("dbtable", table)
            .save()
        )
        print("Data imported successful")
        return row_count
    except Exception as e:
        print("Data load error: " + str(e))
        return None

In [7]:
download_parquets(taxi_type="green", year=2025)

Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-01.parquet to data/raw/green/2025/green_tripdata_2025-01.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-02.parquet to data/raw/green/2025/green_tripdata_2025-02.parquet
Erro ao baixar https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-02.parquet: 403 Client Error: Forbidden for url: https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-02.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-03.parquet to data/raw/green/2025/green_tripdata_2025-03.parquet
Erro ao baixar https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-03.parquet: 403 Client Error: Forbidden for url: https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-03.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-04.parquet to data/raw/green/2025/green_tripdata_2025-04.parque

In [8]:
# Read every downloaded green-taxi month (all years) in one pass; match only *.parquet
# so stray non-Parquet files in the year folders are not read.
# NOTE(review): this assumes all monthly files share one schema — confirm before
# mixing years, since TLC files are not guaranteed to be schema-identical.
df_green = spark.read.parquet("data/raw/green/*/*.parquet")

                                                                                

In [23]:
ingest_on_postgres(df_green, table=green_table, user=user, pwd=password, db=database)

importing rows 0 to 48326... for table public."green_taxi"


                                                                                

Data imported successful
