In [1]:
!pip install python-dotenv
!pip install snowflake-connector-python

Collecting python-dotenv
  Downloading python_dotenv-1.2.1-py3-none-any.whl.metadata (25 kB)
Downloading python_dotenv-1.2.1-py3-none-any.whl (21 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.2.1
Collecting snowflake-connector-python
  Downloading snowflake_connector_python-4.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.0/77.0 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting filelock<4,>=3.5 (from snowflake-connector-python)
  Downloading filelock-3.20.0-py3-none-any.whl.metadata (2.1 kB)
Collecting tomlkit (from snowflake-connector-python)
  Downloading tomlkit-0.13.3-py3-none-any.whl.metadata (2.8 kB)
Collecting boto3>=1.24 (from snowflake-connector-python)
  Downloading bot

In [2]:
from dotenv import load_dotenv
import os
import pyspark
from pyspark.sql import SparkSession
import requests
import json
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import time

In [3]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.5.jar -P ./

--2025-11-03 01:31:06--  https://jdbc.postgresql.org/download/postgresql-42.2.5.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 825943 (807K) [application/java-archive]
Saving to: ‘./postgresql-42.2.5.jar.3’


2025-11-03 01:31:07 (1.08 MB/s) - ‘./postgresql-42.2.5.jar.3’ saved [825943/825943]



In [4]:
import os
import requests

jar_path = "/home/jovyan/work/postgresql-42.2.5.jar"
print(f"JAR file exists: {os.path.exists(jar_path)}")
print(f"JAR file size: {os.path.getsize(jar_path) if os.path.exists(jar_path) else 'N/A'} bytes")

from dotenv import load_dotenv
load_dotenv()

print(f"PORT_POSTGRES: {os.getenv('PORT_POSTGRES')}")
print(f"POSTGRES_DB: {os.getenv('POSTGRES_DB')}")
print(f"POSTGRES_USER: {os.getenv('POSTGRES_USER')}")
print(f"POSTGRES_PASSWORD set: {bool(os.getenv('POSTGRES_PASSWORD'))}")

JAR file exists: True
JAR file size: 825943 bytes
PORT_POSTGRES: 5432
POSTGRES_DB: ny_taxi
POSTGRES_USER: usuario_spark
POSTGRES_PASSWORD set: True


In [5]:
try:
    spark.stop()
except:
    pass

import pyspark 
from pyspark.sql import SparkSession 
from pyspark.sql import Row 

spark = SparkSession.builder.config("spark.jars", "/home/jovyan/work/postgresql-42.2.5.jar").master("local").appName("PySpark_Postgres_test").getOrCreate()

In [6]:
df = spark.read.format("jdbc") \
    .option("url", f"jdbc:postgresql://warehouses:5432/{os.getenv('POSTGRES_DB')}") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "raw.taxi_zone_lookup") \
    .option("user", os.getenv('POSTGRES_USER')) \
    .option("password", os.getenv('POSTGRES_PASSWORD')) \
    .load()

In [7]:
df.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [8]:
def ingestar_zones_a_raw():
    SOURCE_PATH = os.getenv("SOURCE_PATH")
    path_url = f"{SOURCE_PATH}/misc/taxi_zone_lookup.csv"
    local_path = f"/tmp/taxiZones.parquet"
    
    # Descargo el archivo en carpeta temporal para posteriormente leerlo
    try:
        r = requests.get(path_url, stream=True)
        if r.status_code == 200:
            with open(local_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=10000000):
                    f.write(chunk)
        else:
            print(f"Archivo no encontrado en {path_url} (status {r.status_code})")
            return None
    except Exception as e:
        print(f"Error descargando {path_url}: {e}")
        return None
    else:
        print(f"Archivo obtenido exitosamente de: {path_url}")
    
    # Leo el archivo en un df de Spark
    try:
        df = spark.read.csv(local_path, header="true")
    except Exception as e:
        print(f"No se pudo leer {local_path}: {e}")
        return None
    else:
        print(f"Archivo leido exitosamente por Spark: {local_path}")

    conteoFilas = df.count()
    print(f"Ingestando hacia Snowflake datos de Zonas de Taxis. Total de filas: {conteoFilas}")

    try:
        df.write.format("jdbc") \
            .option("url", f"jdbc:postgresql://warehouses:5432/{os.getenv('POSTGRES_DB')}") \
            .option("driver", "org.postgresql.Driver") \
            .option("dbtable", "raw.taxi_zone_lookup") \
            .option("user", os.getenv('POSTGRES_USER')) \
            .option("password", os.getenv('POSTGRES_PASSWORD')) \
            .mode("overwrite") \
            .save()
    except Exception as e2:
        print(f"Error con ingreso de datos: {e2}")
        return None
    else:
        print("Zonas de taxis exportadas correctamente a Raw de Snowflake")

    #Me aseguro de eliminar el archivo parquet temporal
    try:
        os.remove(local_path)
        print(f"Archivo parquet temporal removido: {local_path}")
    except OSError as e:
        print(f"No se pudo remover el archivo parquet temporal {local_path}: {e}")

    #Retorno datos para tabla de conteos de datos consumidos por run
    return {
        "count": conteoFilas,
    }

In [9]:
#Cargo Zonas de Taxis en Snowflake
zonasTaxisIngesta=ingestar_zones_a_raw()
print(zonasTaxisIngesta)

Archivo obtenido exitosamente de: https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Archivo leido exitosamente por Spark: /tmp/taxiZones.parquet
Ingestando hacia Snowflake datos de Zonas de Taxis. Total de filas: 265
Zonas de taxis exportadas correctamente a Raw de Snowflake
Archivo parquet temporal removido: /tmp/taxiZones.parquet
{'count': 265}


In [10]:
import datetime

#Hago la presente funcion para generar un identificador unico asociado a cada carga de datos para el RUN_ID 
def generar_run_id():
    return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

In [13]:
import os 
import requests 
from pyspark.sql.functions import lit, current_timestamp 
from pyspark.sql.types import TimestampType

def ingestar_parquet_a_raw(service: str, year: int, month: int):
    SOURCE_PATH = os.getenv("SOURCE_PATH")
    path_url = f"{SOURCE_PATH}/trip-data/{service}_tripdata_{year}-{month:02d}.parquet"
    local_path = f"/tmp/{service}_tripdata_{year}-{month:02d}.parquet"
    print(path_url)
    
    # Descargo el archivo Parquet en carpeta temporal para posteriormente leerlo
    try:
        r = requests.get(path_url, stream=True)
        if r.status_code == 200:
            with open(local_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=10000000):
                    f.write(chunk)
        else:
            print(f"Archivo no encontrado en {path_url} (status {r.status_code})")
            return None
    except Exception as e:
        print(f"Error descargando {path_url}: {e}")
        return None
    else:
        print(f"Archivo obtenido exitosamente de: {path_url}")
    
    # Leo el archivo parquet en un df de Spark
    try:
        df = spark.read.parquet(local_path)
    except Exception as e:
        print(f"No se pudo leer {local_path}: {e}")
        return None
    else:
        print(f"Archivo leido exitosamente por Spark: {local_path}")

    run_id = generar_run_id()

    # Elimino columna conflictiva que solo esta presente en unos pocos parquets
    if 'cbd_congestion_fee' in df.columns:
        df = df.drop('cbd_congestion_fee')

    # Añado los metadatos indicados en instrucciones de PSET
    df_meta = df.withColumn("run_id", lit(run_id)) \
                .withColumn("service_type", lit(service)) \
                .withColumn("source_year", lit(year)) \
                .withColumn("source_month", lit(month)) \
                .withColumn("ingested_at_utc", current_timestamp()) \
                .withColumn("source_path", lit(path_url))

    # Convierto tipos de fecha a Timestamp porque me estaba marcando error al enviar datos al Snowflake sin esta transformacion
    for field in df_meta.schema.fields:
        if field.dataType.typeName() == "timestamp_ntz":
            df_meta = df_meta.withColumn(field.name, df_meta[field.name].cast(TimestampType()))

    primary_keys = ["tpep_pickup_datetime","tpep_dropoff_datetime","PULocationID","DoLocationID","VendorID"]

    if (service=="green"):
        primary_keys = ["lpep_pickup_datetime","lpep_dropoff_datetime","PULocationID","DoLocationID","VendorID"]

    df_meta = df_meta.dropDuplicates(primary_keys)

    conteoFilas = df_meta.count()
    print(f"Ingestando hacia Snowflake {service} {year}-{month}. Total de filas: {conteoFilas}")

    try:
        df_meta.write \
        .format("jdbc") \
        .option("url", f"jdbc:postgresql://warehouses:5432/{os.getenv('POSTGRES_DB')}") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", f"raw.{service}_taxi_trip") \
        .option("user", os.getenv('POSTGRES_USER')) \
        .option("password", os.getenv('POSTGRES_PASSWORD')) \
        .mode("append") \
        .save()
    except Exception as e2:
        print(f"Error con tabla temporal: {e2}")
        return None

    #Me aseguro de eliminar el archivo parquet temporal
    try:
        os.remove(local_path)
        print(f"Archivo parquet temporal removido: {local_path}")
    except OSError as e:
        print(f"No se pudo remover el archivo parquet temporal {local_path}: {e}")

    #Retorno datos para tabla de conteos de datos consumidos por run
    return {
        "year": year,
        "month": month,
        "count": conteoFilas,
        "run_id": run_id,
        "service_type": service
    }

In [16]:
import json
import os
import requests

#Defino funciones tipicas de checkpoint para en caso de fallo no ingestar datos desde cero
def save_checkpoint(year, month):
    with open(CHECKPOINT_FILE, "w") as f:
        json.dump({"year": year, "month": month}, f)

def load_checkpoint(CHECKPOINT_FILE):
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, "r") as f:
            return json.load(f)
    return {"year": 0, "month": 0}

#Genero arreglo que contendra datos de ingesta para tabla de conteos
resultadosGeneralesIngesta=[]

try:
    #Leo los datos de tipos de taxis, years y months desde variables de entorno
    tipos_taxis=os.getenv("SERVICES").split(',')
    for tipo_taxi in tipos_taxis:
        
        lista_years = sorted([int(item) for item in (os.getenv("YEARS").split(','))])
        lista_months = sorted([int(item) for item in (os.getenv("MONTHS").split(','))])
        #Cargo el checkpoint y en caso de que tenga registros recorto los arreglos de months y years para recorrer desde ultima ingesta exitosa
        CHECKPOINT_FILE = f"checkpointTaxis{tipo_taxi.capitalize()}.json"
        print(CHECKPOINT_FILE)
        checkpoint=load_checkpoint(CHECKPOINT_FILE)
        print(f"checkpoint: {checkpoint}")
        
        if ( checkpoint != {"year": 0, "month": 0} and (int(checkpoint["month"]) in lista_months) and (int(checkpoint["year"]) in lista_years)):    
            if ( int(checkpoint["month"]) == lista_months[-1] and int(checkpoint["year"]) != lista_years[-1] ):
                lista_years= lista_years[lista_years.index(checkpoint["year"])+1:]
            elif ( int(checkpoint["month"]) == lista_months[-1] and int(checkpoint["year"]) == lista_years[-1] ): 
                continue
            else:
                lista_years= lista_years[lista_years.index(checkpoint["year"]):]
                lista_months= lista_months[lista_months.index(checkpoint["month"])+1:]
        
        for year_taxi in lista_years:
            for month_taxi in lista_months:  
                #Llamo a la funcion de ingesta de datos iterativamente para cada mes, year y tipo de taxi
                print(f"Iniciando ingesta de datos de taxis {tipo_taxi}: {month_taxi}-{year_taxi}")
                resultadosParciales=ingestar_parquet_a_raw(tipo_taxi, year_taxi, month_taxi)
                #Guardo los resultados y genero el checkpint
                if (resultadosParciales != None):
                    resultadosGeneralesIngesta.append(resultadosParciales)
                    save_checkpoint(year_taxi,month_taxi)
            lista_months = sorted([int(item) for item in (os.getenv("MONTHS").split(','))])
                
except Exception as e5:
    #Como en todas las funciones vistas hago manejo de errores
    print(f"Fallo el proceso de ingesta masiva de datos de taxis NY: {e5}")
else:
    print("El proceso de ingesta masiva de taxis NY fue exitoso")

checkpointTaxisYellow.json
checkpoint: {'year': 2025, 'month': 9}
Iniciando ingesta de datos de taxis yellow: 10-2025
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-10.parquet
Archivo no encontrado en https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-10.parquet (status 403)
Iniciando ingesta de datos de taxis yellow: 11-2025
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet
Archivo no encontrado en https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet (status 403)
Iniciando ingesta de datos de taxis yellow: 12-2025
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-12.parquet
Archivo no encontrado en https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-12.parquet (status 403)
checkpointTaxisGreen.json
checkpoint: {'year': 0, 'month': 0}
Iniciando ingesta de datos de taxis green: 1-2020
https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2020-01.p