In [1]:
!pip install python-dotenv
!pip install snowflake-connector-python

Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Downloading python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.1.1
Collecting snowflake-connector-python
  Downloading snowflake_connector_python-4.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.0/77.0 kB[0m [31m879.7 kB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hCollecting asn1crypto<2.0.0,>0.24.0 (from snowflake-connector-python)
  Downloading asn1crypto-1.5.1-py2.py3-none-any.whl.metadata (13 kB)
Collecting filelock<4,>=3.5 (from snowflake-connector-python)
  Downloading filelock-3.20.0-py3-none-any.whl.metadata (2.1 kB)
Collecting tomlkit (from snowflake-connector-python)
  Downloading tomlkit-0.13.3-py3-none-any.whl.metadata (2.8 kB)
Collecting boto3>=1.24 (from snowflake-connector-python)
  Downloading bot

In [2]:
from dotenv import load_dotenv
import os
import pyspark
from pyspark.sql import SparkSession
import requests
import json
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import time

In [3]:
# Cargo mis variables de entorno
load_dotenv()
 
credencialesSnowflakeRaw = {
    "sfURL" : os.getenv("SNOWFLAKE_URL"),
    "sfUser" :  os.getenv("SNOWFLAKE_USER"),
    "sfPassword" : os.getenv("SNOWFLAKE_PASSWORD"),
    "sfDatabase" : os.getenv("SNOWFLAKE_DATABASE"),
    "sfSchema" : os.getenv("SNOWFLAKE_SCHEMA_RAW"),
    "sfWarehouse" : os.getenv("SNOWFLAKE_WAREHOUSE"),
    "sfRole" : os.getenv("SNOWFLAKE_ROLE"),
}

# Genero el dict de datos para conectarme con Snowflake a schema Analytics
credencialesSnowflakeAnalytics = credencialesSnowflakeRaw.copy()
credencialesSnowflakeAnalytics["sfSchema"]=os.getenv("SNOWFLAKE_SCHEMA_ANALYTICS")

In [4]:
# Creo SparkSession para conexión con Snowflake
spark = (SparkSession.builder.appName("IngestaNewYorkTaxis").config("spark.jars.packages", "net.snowflake:snowflake-jdbc:3.13.30,net.snowflake:spark-snowflake_2.12:2.9.0-spark_3.1").getOrCreate())

print(spark)
print("Spark Version : " + spark.version)

# Ejecuto una query de prueba para validar comunicacion con Snowflake
query = "SELECT current_version()"

df = spark.read.format("snowflake").options(**credencialesSnowflakeRaw).option("query", query).load()

df.show()

<pyspark.sql.session.SparkSession object at 0x7d9634dd7950>
Spark Version : 3.5.0
+-------------------+
|"CURRENT_VERSION()"|
+-------------------+
|             9.32.1|
+-------------------+



In [11]:
def generar_tabla_obt_taxis(month,year, byMonthOrNot=True):

    print("Iniciando proceso de carga de datos desde RAW para generar OBT en Schema Analytics")

    try:
        df_raw = spark.read.format("snowflake") \
                .options(**credencialesSnowflakeRaw) \
                .option("dbtable", "NY_TAXI_TRIPS_UNIFIED") \
                .load()
        
        if byMonthOrNot:
            df_raw = df_raw.filter((F.col("SOURCE_YEAR") == int(year)) & (F.col("SOURCE_MONTH") == int(month)))
        else:
            df_raw = df_raw
        
        df_zones = spark.read.format("snowflake") \
            .options(**credencialesSnowflakeRaw) \
            .option("dbtable", "NY_TAXI_RAW_TAXI_ZONES") \
            .load()

        #Agrego la data de las zones para pickup y para dropoff con joins
        pickup_str="PU_"
        dropoff_str="DO_"

        df_zones_pu = df_zones.alias("pickup_zones")
        df_zones_do = df_zones.alias("dropoff_zones")

        # Hago join para pickup locations
        df_raw_mas_pickup_zones = df_raw.join(df_zones_pu, df_raw.PULOCATIONID == F.col("pickup_zones.LOCATIONID"), "left").select(df_raw["*"],F.col("pickup_zones.BOROUGH").alias(pickup_str + "BOROUGH"),F.col("pickup_zones.ZONE").alias(pickup_str + "ZONE"),F.col("pickup_zones.SERVICE_ZONE").alias(pickup_str + "SERVICE_ZONE"))

        df_raw_mas_zones = df_raw_mas_pickup_zones.join(df_zones_do, df_raw_mas_pickup_zones.DOLOCATIONID == F.col("dropoff_zones.LOCATIONID"), "left").select(df_raw_mas_pickup_zones["*"],F.col("dropoff_zones.BOROUGH").alias(dropoff_str + "BOROUGH"),F.col("dropoff_zones.ZONE").alias(dropoff_str + "ZONE"),F.col("dropoff_zones.SERVICE_ZONE").alias(dropoff_str + "SERVICE_ZONE"))
        
        #Estandarizo nombres de columnas
        df_nombres_estandarizados=df_raw_mas_zones.withColumnRenamed("PULOCATIONID", pickup_str+"LOCATION_ID").withColumnRenamed("DOLOCATIONID", dropoff_str+"LOCATION_ID").withColumnRenamed("MTA_TAX", "METROPOLITAN_TAX").withColumnRenamed("TPEP_DROPOFF_DATETIME", "DROPOFF_DATETIME").withColumnRenamed("TPEP_PICKUP_DATETIME", "PICKUP_DATETIME").withColumnRenamed("SERVICE_TYPE", "SOURCE_SERVICE")
        #El resto de las columnas desde mi punto de vista tienen nombres entendibles

        #Derivadas documentadas: trip_duration_min, avg_speed_mph, tip_pct con reglas de cálculo y manejo de nulos/ceros.
        df_mas_trip_duration_min=df_nombres_estandarizados.withColumn("TRIP_DURATION_MIN", F.when(F.col("DROPOFF_DATETIME").isNotNull() & F.col("PICKUP_DATETIME").isNotNull(),(F.col("DROPOFF_DATETIME").cast("long") - F.col("PICKUP_DATETIME").cast("long")) / 60).otherwise(None))
        df_mas_avg_speed_mph=df_mas_trip_duration_min.withColumn("AVG_SPEED_MPH", F.when((F.col("TRIP_DURATION_MIN").isNotNull()) & (F.col("TRIP_DURATION_MIN") > 0) & (F.col("TRIP_DISTANCE").isNotNull()) &(F.col("TRIP_DISTANCE") >= 0),F.col("TRIP_DISTANCE") / (F.col("TRIP_DURATION_MIN") / 60)).otherwise(None))
        df_mas_tip_pct=df_mas_avg_speed_mph.withColumn("TIP_PCT", F.when((F.col("TOTAL_AMOUNT").isNotNull()) & (F.col("TOTAL_AMOUNT") > 0) & (F.col("TIP_AMOUNT").isNotNull()) & (F.col("TIP_AMOUNT") >= 0),(F.col("TIP_AMOUNT") / F.col("TOTAL_AMOUNT")) * 100).otherwise(None))
        df_mas_pickup_dates=df_mas_tip_pct.withColumn("PICKUP_DATE",F.to_date(F.col("PICKUP_DATETIME"))).withColumn("PICKUP_HOUR",F.hour(F.col("PICKUP_DATETIME")))
        df_mas_dropoff_dates=df_mas_pickup_dates.withColumn("DROPOFF_DATE",F.to_date(F.col("DROPOFF_DATETIME"))).withColumn("DROPOFF_HOUR",F.hour(F.col("DROPOFF_DATETIME")))
        df_mas_tiempos_adicionales=df_mas_dropoff_dates.withColumn("DAY_OF_WEEK", F.dayofweek(F.col("PICKUP_DATETIME"))).withColumn("MONTH", F.month(F.col("PICKUP_DATETIME"))).withColumn("YEAR", F.year(F.col("PICKUP_DATETIME")))

        #Regularizacion final de nombres de IDs con formato PALABRA_ID acorde a definicion de columnas de instrucciones de PSET
        df_obt=df_mas_tiempos_adicionales.withColumnRenamed("RATECODEID", "RATE_CODE_ID").withColumnRenamed("VENDORID", "VENDOR_ID")

        return df_obt
        
    except Exception as e:
        print(f"No se pudo generar la tabla OBT de Taxis: {e}")
        raise e

In [12]:
import snowflake.connector

def procesarTablaOBTIdempotencia(df, tabla: str, tabla_temp_name: str):

    #Esta sera mi query merge la cual verifica similitud con datos de clave natural (vendorID + timestamps + PU/DO) en caso de similitud hace update y sino hace insert de datos. Asi aseguro idempotencia y evito duplicados
    cols = df.columns

    # Armo dinámicamente la parte del UPDATE
    update_set = ",\n".join([f"target.{c} = source.{c}" for c in cols if c not in ["VENDOR_ID", "PICKUP_DATETIME", "DROPOFF_DATETIME", "PU_LOCATION_ID", "DO_LOCATION_ID"]])

    # Armo las listas de columnas e insert
    insert_cols = ", ".join(cols)
    insert_vals = ", ".join([f"source.{c}" for c in cols])
    
    
    merge_query = f"""
    MERGE INTO {credencialesSnowflakeAnalytics['sfDatabase']}.{credencialesSnowflakeAnalytics['sfSchema']}.{tabla} AS target
    USING {credencialesSnowflakeAnalytics['sfDatabase']}.{credencialesSnowflakeAnalytics['sfSchema']}.{tabla_temp_name} AS source
    ON target.VENDOR_ID = source.VENDOR_ID 
       AND target.PICKUP_DATETIME = source.PICKUP_DATETIME 
       AND target.DROPOFF_DATETIME = source.DROPOFF_DATETIME
       AND target.PU_LOCATION_ID = source.PU_LOCATION_ID
       AND target.DO_LOCATION_ID = source.DO_LOCATION_ID
    WHEN MATCHED THEN
        UPDATE SET
            {update_set}
    WHEN NOT MATCHED THEN
        INSERT (
            {insert_cols}
        ) VALUES (
            {insert_vals}
        )
    """

    #query para que posterior al merge se haga drop de tabla temporal
    drop_query= f"""DROP TABLE IF EXISTS {credencialesSnowflakeAnalytics['sfDatabase']}.{credencialesSnowflakeAnalytics['sfSchema']}.{tabla_temp_name}"""

    try:
        conn = snowflake.connector.connect(
        user=os.getenv("SNOWFLAKE_USER"),
        password=os.getenv("SNOWFLAKE_PASSWORD"),
        account=os.getenv("SNOWFLAKE_ACCOUNT"),
        warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
        database=os.getenv("SNOWFLAKE_DATABASE"),
        schema=os.getenv("SNOWFLAKE_SCHEMA_ANALYTICS")
        )
    
        cur = conn.cursor()

        #Hago la conexion con snowflake y ejecuto la query de merge y luego la de drop
        cur.execute(merge_query)
        cur.execute(drop_query)
    
        conn.commit()
        cur.close()
        conn.close()
        
    except Exception as e3:
        print(f"Fallo la migración de datos de la tabla temporal a la tabla OBT de Taxis: {e3}")
        raise e3
    else:
        print(f"Se migro exitosamente los datos de la tabla temporal a la principal OBT de Taxis") 
    

In [13]:
def guardar_con_reintentos(df, tabla, month, year, reintentos=3):
    
    for intento in range(reintentos):
        try:
            print(f"Intento {intento + 1} de {reintentos} para guardar en {tabla}")
            df_sin_dupes = df.dropDuplicates(["VENDOR_ID", "PICKUP_DATETIME", "DROPOFF_DATETIME", "PU_LOCATION_ID", "DO_LOCATION_ID"])
            
            tabla_temp_name=f"{tabla}_{month}_{year}_temp"
            df_sin_dupes.write.format("snowflake") \
                .options(**credencialesSnowflakeAnalytics) \
                .option("dbtable", tabla_temp_name) \
                .option("parallelism", 16) \
                .mode("overwrite") \
                .save()
            
            print(f"Tabla temporal creada exitosamente")
            procesarTablaOBTIdempotencia(df_sin_dupes, tabla, tabla_temp_name)
            return True
            
        except Exception as e:
            print(f"Error en intento {intento + 1}: {str(e)[:200]}...")
            if intento < reintentos - 1:
                wait_time = 30 
                print(f"Esperando {wait_time} segundos antes del reintento")
                time.sleep(wait_time)
            else:
                print(f"Todos los intentos fallaron para {tabla}")
                raise e

def guardar_por_lotes(df, tabla, month, year, batch_size=500000):
    
    total_count = df.count()
    num_partitions = max(1, total_count // batch_size)
    
    print(f"Dividiendo {total_count} filas en {num_partitions} particiones")
    
    df_reparticionado = df.repartition(num_partitions)
    
    return guardar_con_reintentos(df_reparticionado, tabla, month, year)

In [14]:
#Defino funciones tipicas de checkpoint para en caso de fallo no ingestar datos desde cero

def save_checkpoint(year, month, CHECKPOINT_FILE_COMBINADO):
    with open(CHECKPOINT_FILE_COMBINADO, "w") as f:
        json.dump({"year": year, "month": month}, f)

def load_checkpoint(CHECKPOINT_FILE_COMBINADO):
    if os.path.exists(CHECKPOINT_FILE_COMBINADO):
        with open(CHECKPOINT_FILE_COMBINADO, "r") as f:
            return json.load(f)
    return {"year": 0, "month": 0}

In [15]:
def cargar_tabla_obt_snowflake():

    print("Iniciando proceso de generación de tabla OBT")
    
    try:
        tabla_destino="NY_TAXI_OBT"
        CHECKPOINT_FILE_OBT="checkpointCargaOBT.json"

        lista_years = sorted([int(item) for item in (os.getenv("YEARS").split(','))])
        lista_months = sorted([int(item) for item in (os.getenv("MONTHS").split(','))])
            
        checkpoint=load_checkpoint(CHECKPOINT_FILE_OBT)
        print(f"checkpoint: {checkpoint}")

        if ( checkpoint != {"year": 0, "month": 0} and (int(checkpoint["month"]) in lista_months) and (int(checkpoint["year"]) in lista_years)):    
            if ( int(checkpoint["month"]) == lista_months[-1] and int(checkpoint["year"]) != lista_years[-1] ):
                lista_years= lista_years[lista_years.index(checkpoint["year"])+1:]
            elif ( int(checkpoint["month"]) != lista_months[-1] and int(checkpoint["year"]) != lista_years[-1] ): 
                lista_years= lista_years[lista_years.index(checkpoint["year"]):]
                lista_months= lista_months[lista_months.index(checkpoint["month"])+1:]
        else:
            #Si no hay ningun checkpoint logico genero estructura de tabla para carga desde cero de datos
            df_obt = generar_tabla_obt_taxis(0,0,False)
            df_obt.limit(0).write \
                .format("snowflake") \
                .options(**credencialesSnowflakeAnalytics) \
                .option("dbtable", tabla_destino) \
                .mode("overwrite") \
                .save()

        for year in lista_years:
            for month in lista_months:
                df_lote = generar_tabla_obt_taxis(month,year)
                print(f"Tabla OBT generada exitosamente para {month}-{year}.Se procedera a cargarlos en la base de Snowflake")
                guardar_por_lotes(df_lote, tabla_destino, month, year)
                print(f"Guardados correctamente datos OBT de taxis de year {year} + month {month}")
                save_checkpoint(year, month, CHECKPOINT_FILE_OBT)
            lista_months = sorted([int(item) for item in (os.getenv("MONTHS").split(','))])
        return True 
                
    except Exception as e2:
        print(f"Error generando o subiendo tabla OBT a Snowflake: {e2}")
        return False

In [10]:
resultado_OBT=cargar_tabla_obt_snowflake()

Iniciando proceso de generación de tabla OBT
Iniciando proceso de carga de datos desde RAW para generar OBT en Schema Analytics
Tabla OBT generada exitosamente. Total de filas: 852432668
checkpoint: {'year': 2019, 'month': 4}
Se han filtrado datos de year 2019 + month 5. Se procedera a cargarlos en la base de Snowflake
Dividiendo 8143897 filas en 16 particiones
Intento 1 de 3 para guardar en NY_TAXI_OBT
Tabla temporal creada exitosamente exitosamente
Se migro exitosamente los datos de la tabla temporal a la principal OBT de Taxis
Guardados correctamente datos OBT de taxis de year 2019 + month 5
Se han filtrado datos de year 2019 + month 6. Se procedera a cargarlos en la base de Snowflake
Dividiendo 7477798 filas en 14 particiones
Intento 1 de 3 para guardar en NY_TAXI_OBT
Tabla temporal creada exitosamente exitosamente
Se migro exitosamente los datos de la tabla temporal a la principal OBT de Taxis
Guardados correctamente datos OBT de taxis de year 2019 + month 6
Se han filtrado datos 

In [17]:
#Reprocesar un mes
try:
    df_obt_antes = spark.read.format("snowflake") \
                    .options(**credencialesSnowflakeAnalytics) \
                    .option("dbtable", "NY_TAXI_OBT") \
                    .load()
    print(f"Antes de reprocesar un mes total de filas: {df_obt_antes.count()}")
    tabla_destino="NY_TAXI_OBT"
    df_mes=generar_tabla_obt_taxis(5,2020)
    guardar_por_lotes(df_mes, tabla_destino, 5, 2020)
    print(f"Guardados correctamente datos OBT de taxis")

    df_obt_despues = spark.read.format("snowflake") \
                    .options(**credencialesSnowflakeAnalytics) \
                    .option("dbtable", "NY_TAXI_OBT") \
                    .load()
    print(f"Despues de reprocesar un mes total de filas: {df_obt_despues.count()}")
except Exception as e:
    print(f"Error reprocesando datos de un mes: {e}")

Antes de reprocesar un mes total de filas: 852432668
Iniciando proceso de carga de datos desde RAW para generar OBT en Schema Analytics
Dividiendo 405776 filas en 1 particiones
Intento 1 de 3 para guardar en NY_TAXI_OBT
Tabla temporal creada exitosamente
Se migro exitosamente los datos de la tabla temporal a la principal OBT de Taxis
Guardados correctamente datos OBT de taxis
Despues de reprocesar un mes total de filas: 852432668


In [18]:
print("Numero de filas igual antes y despues de reproceso. Se comprueba idempotencia en reproceso exitoso")

Numero de filas igual antes y despues de reproceso. Se comprueba idempotencia en reproceso exitoso
