In [1]:
# Celda 1: Importar
import polars as pl
import os
print(f"Polars V: {pl.__version__}")

# Definir rutas
BASE_DIR = os.path.abspath(os.path.join(os.getcwd(), '..'))
RAW_DATA_PATH = os.path.join(BASE_DIR, '01_data', 'rutas_expanded.csv')
print(f"Ruta al CSV: {RAW_DATA_PATH}")


Polars V: 1.35.2
Ruta al CSV: /home/ubuntu-t430/Desktop/gcp-logistics-pipeline/01_data/rutas_expanded.csv


In [2]:
# Celda 2 (MODIFICADA)
print("Cargando datos crudos...")

# ¡CAMBIO CLAVE! Usamos 'dtypes' para forzar 'date' a ser texto
df_raw = pl.read_csv(
    RAW_DATA_PATH,
    dtypes={"date": pl.String} 
)

print(f"Datos cargados: {df_raw.shape}")
print("--- Tipo de dato 'date' (debería ser String) ---")
# Verificamos que Polars haya obedecido
print(df_raw.schema["date"])
display(df_raw.head(3))


Cargando datos crudos...


  df_raw = pl.read_csv(


Datos cargados: (246836, 35)
--- Tipo de dato 'date' (debería ser String) ---
String


route_id,date,weekday,week_number,driver_id,origin_barrio,lat_origin,lon_origin,barrios_recorridos,tipo_vehiculo,vehicle_capacity_m3,volumen_cargado_m3,carga_pct,n_paradas,n_paquetes,cliente_top_count,km_total,desviacion_km,velocidad_media_kmh,duracion_min,tiempo_conduccion_min,tiempo_paradas_min,tiempo_espera_deposito_min,tiempo_carga_min,n_incidentes,motivo_incidente,paquetes_no_entregados,porcentaje_entregas_exitosas,prioridad,entregas_retrasadas,clima,turno,estado_ruta,notas_logisticas,costo_est_operativo
i64,str,i64,i64,i64,str,f64,f64,str,str,i64,f64,f64,i64,i64,i64,f64,f64,f64,i64,i64,i64,i64,i64,i64,str,i64,f64,str,i64,str,str,str,str,f64
2000,"""2025-01-27 00:00:00""",0,4,3,"""La Plata""",-34.922486,-57.95277,"""La Plata; Tolosa""","""Furgoneta""",8,7.84,98.0,71,81,6,24.6,0.07,17.57,224,84,130,10,16,1,"""Cliente ausente""",1,98.77,"""media""",1,"""Nublado""","""mañana""","""con_incidente""","""Devolver a depósito""",29.01
2001,"""2025-03-25 00:00:00""",1,12,1,"""Abasto""",-34.820862,-57.922538,"""Abasto; Villa Elisa; City Bell""","""Camioneta""",13,4.41,33.9,85,102,7,18.7,0.99,13.68,241,82,159,2,32,0,,0,100.0,"""alta""",0,"""Despejado""","""mañana""","""completa_ok""",,30.33
2002,"""2025-01-11 00:00:00""",5,1,3,"""Abasto""",-34.817327,-57.916802,"""Abasto; Lisandro Olmos""","""Camioneta""",13,6.04,46.5,88,101,8,29.4,0.64,19.6,254,90,164,9,34,0,,0,100.0,"""baja""",0,"""Despejado""","""mañana""","""completa_ok""",,33.1


In [3]:
# Celda 3 (MODIFICADA CON DEPURACIÓN)
print("Limpiando y tipificando...")

df_clean = df_raw.select(
    pl.col("route_id").cast(pl.Int64),
    
    # LÍNEA CORREGIDA:
    pl.col("date").str.to_datetime(format="%Y-%m-%d %H:%M:%S", strict=False).cast(pl.Date).alias("date"),
    
    pl.col("driver_id").cast(pl.Int64),
    pl.col("origin_barrio").cast(pl.String),
    pl.col("barrios_recorridos").cast(pl.String),
    pl.col("tipo_vehiculo").cast(pl.String),
    pl.col("volumen_cargado_m3").cast(pl.Float64),
    pl.col("carga_pct").cast(pl.Float64),
    pl.col("n_paradas").cast(pl.Int64),
    pl.col("n_paquetes").cast(pl.Int64),
    pl.col("km_total").cast(pl.Float64),
    pl.col("velocidad_media_kmh").cast(pl.Float64),
    pl.col("duracion_min").cast(pl.Int64),
    pl.col("tiempo_conduccion_min").cast(pl.Int64),
    pl.col("tiempo_paradas_min").cast(pl.Int64),
    pl.col("porcentaje_entregas_exitosas").cast(pl.Float64),
    pl.col("clima").cast(pl.String),
    pl.col("estado_ruta").cast(pl.String),
    pl.col("n_incidentes").cast(pl.Int64)
).filter(pl.col("route_id").is_not_null()) # Quitar filas malas

# --- CÓDIGO DE DEPURACIÓN INTEGRADO ---
print("¡Datos limpios listos!")
print("--- Conteo de nulos en la columna 'date' post-conversión ---")

# Contamos cuántos nulos hay AHORA en la columna 'date' limpia
null_count = df_clean.select(pl.col("date").is_null().sum()).item()
print(f"Total de fechas nulas: {null_count}")

if null_count == len(df_clean):
    print("\n¡ERROR DE DEPURACIÓN! TODAS las fechas siguen siendo nulas.")
    print("Mostrando 5 filas 'crudas' para ver el formato original:")
    display(df_raw.head(5).select("date"))
elif null_count > 0:
    print(f"\n¡ADVERTENCIA! Se encontraron {null_count} fechas nulas.")
    print("Mostrando 5 filas 'crudas' que fallaron la conversión:")
    display(
        df_raw.filter(
            pl.col("date").str.to_date(format="%d/%m/%Y", strict=False).is_null()
        ).head(5).select("date")
    )
else:
    print("\n¡ÉXITO! Todas las fechas se convirtieron correctamente.")
    
print("\n--- Esquema final de df_clean (la columna 'date' debe ser 'Date') ---")
print(df_clean.schema)

Limpiando y tipificando...
¡Datos limpios listos!
--- Conteo de nulos en la columna 'date' post-conversión ---
Total de fechas nulas: 0

¡ÉXITO! Todas las fechas se convirtieron correctamente.

--- Esquema final de df_clean (la columna 'date' debe ser 'Date') ---
Schema({'route_id': Int64, 'date': Date, 'driver_id': Int64, 'origin_barrio': String, 'barrios_recorridos': String, 'tipo_vehiculo': String, 'volumen_cargado_m3': Float64, 'carga_pct': Float64, 'n_paradas': Int64, 'n_paquetes': Int64, 'km_total': Float64, 'velocidad_media_kmh': Float64, 'duracion_min': Int64, 'tiempo_conduccion_min': Int64, 'tiempo_paradas_min': Int64, 'porcentaje_entregas_exitosas': Float64, 'clima': String, 'estado_ruta': String, 'n_incidentes': Int64})


In [4]:
# Celda 4: (L)oad - Configurar Cliente de BigQuery
from google.cloud import bigquery

# El ID de tu proyecto
PROJECT_ID = "logistics-prod-analysis"
# El ID del Dataset (Schema) que queremos crear
DATASET_ID = "logistica_prod"
# El ID de la tabla que queremos crear
TABLE_ID = "rutas_clean"

# La autenticación (gcloud auth login) que hiciste en la terminal 
# será usada automáticamente por este cliente. ¡Magia!
client = bigquery.Client(project=PROJECT_ID)

# Define la ruta completa de la tabla
table_ref = client.dataset(DATASET_ID).table(TABLE_ID)
print(f"Cliente de BigQuery listo. Apuntando a: {table_ref.path}")


Cliente de BigQuery listo. Apuntando a: /projects/logistics-prod-analysis/datasets/logistica_prod/tables/rutas_clean


In [5]:
# Celda 5: (L)oad - Crear el Dataset (si no existe)
# Esto es como 'CREATE SCHEMA' en SQL

dataset = bigquery.Dataset(client.dataset(DATASET_ID))
dataset.location = "US" # (Elige una ubicación de "always-free")

try:
    client.create_dataset(dataset, timeout=30)
    print(f"Dataset {DATASET_ID} creado.")
except Exception as e:
    if "Already Exists" in str(e):
        print(f"Dataset {DATASET_ID} ya existe. ¡Bien!")
    else:
        raise e


Dataset logistica_prod ya existe. ¡Bien!


In [6]:
# Celda 6: (L)oad - Cargar el DataFrame a BigQuery
# ¡Este es el paso clave!
# Convertimos de Polars -> Arrow -> BigQuery (súper rápido)
# Si Polars da error, usa .to_pandas()

print("Iniciando carga de DataFrame a BigQuery...")
# Polars se integra con Arrow, que BigQuery entiende
# .to_arrow() es más eficiente que .to_pandas()
job_config = bigquery.LoadJobConfig(
    # 'WRITE_TRUNCATE' borra la tabla y la recrea. Perfecto para desarrollo.
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# ¡La acción!
load_job = client.load_table_from_dataframe(
    df_clean.to_pandas(), # Usar .to_pandas() es más estable para la librería de BQ
    table_ref,
    job_config=job_config
)

print("Trabajo de carga enviado. Esperando a que termine...")
load_job.result() # Espera a que el trabajo termine

print(f"¡Éxito! {load_job.output_rows} filas cargadas en {PROJECT_ID}.{DATASET_ID}.{TABLE_ID}")


Iniciando carga de DataFrame a BigQuery...




Trabajo de carga enviado. Esperando a que termine...
¡Éxito! 246836 filas cargadas en logistics-prod-analysis.logistica_prod.rutas_clean
