In [None]:
# ============================================================
# PIPELINE DE MOVILIDAD - GOOGLE COLAB
# ============================================================

# CELDA 1: INSTALACIÓN DE DEPENDENCIAS
# ------------------------------------------------------------
print("Instalar dependencias")
!pip install -q redis duckdb pandas pyarrow requests

print("Terminado Dependencias instaladas\n")


Instalar dependencias
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m354.2/354.2 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25hTerminado Dependencias instaladas



In [None]:
# ============================================================
# Descargar datos NYC TAXI
# ============================================================
import requests
import pandas as pd
import os

print("=" * 60)
print("Descarga de datos NYC TAXI Enero 2024")
print("=" * 60)

url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
parquet_file = "yellow_tripdata_2024-01.parquet"

if not os.path.exists(parquet_file):
    print("Descarga del archivo)")
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get('content-length', 0))

    with open(parquet_file, 'wb') as f:
        downloaded = 0
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
            downloaded += len(chunk)
            percent = (downloaded / total_size) * 100
            print(f"Progreso: {percent:.1f}%", end='\r')

    print("\nDescarga completada")
else:
    print("El Archivo ya existe")

# Verificar datos
df = pd.read_parquet(parquet_file)
print(f"\n Registros totales: {len(df):,}")
print(f"Rango de fechas: {df['tpep_pickup_datetime'].min()} a {df['tpep_pickup_datetime'].max()}")
print("\nPrimeros registros:")
print(df.head(3))

Descarga de datos NYC TAXI Enero 2024
Descarga del archivo)
Progreso: 100.0%
Descarga completada

 Registros totales: 2,964,624
Rango de fechas: 2002-12-31 22:59:39 a 2024-02-01 00:01:15

Primeros registros:
   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         2  2024-01-01 00:57:55   2024-01-01 01:17:43              1.0   
1         1  2024-01-01 00:03:00   2024-01-01 00:09:36              1.0   
2         1  2024-01-01 00:17:06   2024-01-01 00:35:01              1.0   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           1.72         1.0                  N           186            79   
1           1.80         1.0                  N           140           236   
2           4.70         1.0                  N           236            79   

   payment_type  fare_amount  extra  mta_tax  tip_amount  tolls_amount  \
0             2         17.7    1.0      0.5        0.00           0.0   
1             1         10

In [None]:
# ============================================================
# Preparacion de los datos y simulacion de la mensajeria
# ============================================================
import time
from datetime import datetime
import json

print("\n" + "=" * 60)
print("Preparacion del Pipeline")
print("=" * 60)

# Muestra entre 10000 viajes
df_sample = df.head(10000).copy()

# Creacion del ID de vehículo simulado
df_sample['vehicle_id'] = df_sample['VendorID'].astype(str) + "_" + df_sample['PULocationID'].astype(str)

# Seleccio de las columnas relevantes
df_sample = df_sample[[
    'vehicle_id',
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'trip_distance',
    'fare_amount',
    'passenger_count'
]].copy()

# Conversion del formato timestamp
df_sample['pickup_timestamp'] = df_sample['tpep_pickup_datetime'].astype(str)
df_sample['dropoff_timestamp'] = df_sample['tpep_dropoff_datetime'].astype(str)

print(f"Dataset preparado: {len(df_sample):,} viajes")
print(f"Vehiculos unicos: {df_sample['vehicle_id'].nunique()}")


Preparacion del Pipeline
Dataset preparado: 10,000 viajes
Vehiculos unicos: 193


In [None]:
# ============================================================
# Almacenes de datos
# ============================================================

# Almacen operacional ruta mas usada - Ultimo estado por vehiculo
operational_store = {}

# Almacén ANALÍTICO (Cold Path) - Todos los eventos históricos
analytical_store = []

print("\n" + "=" * 60)
print("Almacen inicializado")
print("=" * 60)
print("Operacional: Diccionario Python simula Redis")
print("Analítico: Lista Python que simula DuckDB")


Almacen inicializado
Operacional: Diccionario Python simula Redis
Analítico: Lista Python que simula DuckDB


In [None]:
# ============================================================
# Productor, consumo y procesamiento
# ============================================================

def process_trip_event(trip_data):
    """
    Procesa un evento de viaje y lo envía a ambos almacenes
    HOT PATH: Guarda solo el último estado del vehículo
    COLD PATH: Guarda todos los eventos históricos
    """
    vehicle_id = trip_data['vehicle_id']

    # HOT PATH: Actualizar último estado (simula Redis)
    operational_store[vehicle_id] = {
        'vehicle_id': vehicle_id,
        'last_pickup_time': trip_data['pickup_timestamp'],
        'last_dropoff_time': trip_data['dropoff_timestamp'],
        'last_pickup_location': int(trip_data['PULocationID']),
        'last_dropoff_location': int(trip_data['DOLocationID']),
        'last_trip_distance': float(trip_data['trip_distance']),
        'last_fare': float(trip_data['fare_amount']),
        'last_updated': datetime.now().isoformat()
    }

    # COLD PATH: Agregar a historial completo (simula DuckDB)
    analytical_store.append({
        'vehicle_id': vehicle_id,
        'pickup_time': trip_data['pickup_timestamp'],
        'dropoff_time': trip_data['dropoff_timestamp'],
        'pickup_location': int(trip_data['PULocationID']),
        'dropoff_location': int(trip_data['DOLocationID']),
        'trip_distance': float(trip_data['trip_distance']),
        'fare_amount': float(trip_data['fare_amount']),
        'passenger_count': int(trip_data['passenger_count'])
    })

print("\n" + "=" * 60)
print("Procesar los eventos de viajes")
print("=" * 60)
print("Simulacion de streaming de datos...\n")

# Procesar todos los viajes
start_time = time.time()
total_events = len(df_sample)

for idx, row in df_sample.iterrows():
    process_trip_event(row)

    # Mostrar el progreso cada 1000 eventos
    if (idx + 1) % 1000 == 0:
        elapsed = time.time() - start_time
        events_per_sec = (idx + 1) / elapsed
        print(f"Procesados: {idx + 1:,}/{total_events:,} eventos "
              f"({events_per_sec:.0f} eventos/seg)")

elapsed_total = time.time() - start_time
print(f"\n Procesamiento completado")
print(f"Tiempo total: {elapsed_total:.2f} segundos")
print(f"Eventos procesados: {total_events:,}")
print(f"Hot Store: {len(operational_store)} vehículos")
print(f"Cold Store: {len(analytical_store):,} registros históricos")


Procesar los eventos de viajes
Simulacion de streaming de datos...

Procesados: 1,000/10,000 eventos (6876 eventos/seg)
Procesados: 2,000/10,000 eventos (7569 eventos/seg)
Procesados: 3,000/10,000 eventos (7941 eventos/seg)
Procesados: 4,000/10,000 eventos (8114 eventos/seg)
Procesados: 5,000/10,000 eventos (8387 eventos/seg)
Procesados: 6,000/10,000 eventos (8493 eventos/seg)
Procesados: 7,000/10,000 eventos (8390 eventos/seg)
Procesados: 8,000/10,000 eventos (8474 eventos/seg)
Procesados: 9,000/10,000 eventos (8467 eventos/seg)
Procesados: 10,000/10,000 eventos (8372 eventos/seg)

 Procesamiento completado
Tiempo total: 1.19 segundos
Eventos procesados: 10,000
Hot Store: 193 vehículos
Cold Store: 10,000 registros históricos


In [None]:
# ============================================================
# Consultas Operacionales Hot Path
# ============================================================
print("\n" + "=" * 60)
print("Consultas Operacionales Hot Path")
print("=" * 60)

# Consulta 1: ultimo estado de un vehiculo espefico
vehicle_example = list(operational_store.keys())[0]
last_state = operational_store[vehicle_example]

print(f"\n1️ 1. ¿Dónde está el vehículo '{vehicle_example}' ahora mismo?")
print(f"   Última ubicación (dropoff): Zona {last_state['last_dropoff_location']}")
print(f"   Última actualización: {last_state['last_updated']}")
print(f"   Última distancia recorrida: {last_state['last_trip_distance']:.2f} millas")
print(f"   Última tarifa: ${last_state['last_fare']:.2f}")

# Consulta 2: Estados de multiples vehiculos
print(f"\n 2. Estado de los primeros 5 vehículos:")
for i, (vid, state) in enumerate(list(operational_store.items())[:5]):
    print(f"   • Vehículo {vid}: Zona {state['last_dropoff_location']} "
          f"(${state['last_fare']:.2f})")

# Consulta 3: Vehículos por zona
vehicles_by_zone = {}
for vid, state in operational_store.items():
    zone = state['last_dropoff_location']
    if zone not in vehicles_by_zone:
        vehicles_by_zone[zone] = []
    vehicles_by_zone[zone].append(vid)

print(f"\n 3. Top 5 zonas con más vehículos activos:")
sorted_zones = sorted(vehicles_by_zone.items(), key=lambda x: len(x[1]), reverse=True)[:5]
for zone, vehicles in sorted_zones:
    print(f"  Zona {zone}: {len(vehicles)} vehículos")


Consultas Operacionales Hot Path

1️ 1. ¿Dónde está el vehículo '2_186' ahora mismo?
   Última ubicación (dropoff): Zona 137
   Última actualización: 2025-11-25T22:53:16.243063
   Última distancia recorrida: 1.23 millas
   Última tarifa: $12.10

 2. Estado de los primeros 5 vehículos:
   • Vehículo 2_186: Zona 137 ($12.10)
   • Vehículo 1_140: Zona 141 ($7.90)
   • Vehículo 1_236: Zona 238 ($10.70)
   • Vehículo 1_79: Zona 107 ($10.00)
   • Vehículo 1_211: Zona 162 ($24.00)

 3. Top 5 zonas con más vehículos activos:
  Zona 107: 6 vehículos
  Zona 41: 6 vehículos
  Zona 79: 6 vehículos
  Zona 141: 5 vehículos
  Zona 236: 5 vehículos


In [None]:
# ============================================================
# CELDA 7: CONSULTAS ANALÍTICAS (COLD PATH)
# Consultas analiticas Cold Path
# ============================================================
print("\n" + "=" * 60)
print("CONSULTAS ANALÍTICAS Cold Path")
print("=" * 60)

# Convertir a DataFrame para análisis
df_analytics = pd.DataFrame(analytical_store)
df_analytics['pickup_time'] = pd.to_datetime(df_analytics['pickup_time'])
df_analytics['dropoff_time'] = pd.to_datetime(df_analytics['dropoff_time'])
df_analytics['trip_duration'] = (df_analytics['dropoff_time'] - df_analytics['pickup_time']).dt.total_seconds() / 60

# Consulta 1: Estadisticas generales
print(f"\n 1. Estadísticas generales del periodo:")
print(f"   Total de viajes: {len(df_analytics):,}")
print(f"   Distancia promedio: {df_analytics['trip_distance'].mean():.2f} millas")
print(f"   Tarifa promedio: ${df_analytics['fare_amount'].mean():.2f}")
print(f"   Duración promedio: {df_analytics['trip_duration'].mean():.1f} minutos")
print(f"   Pasajeros promedio: {df_analytics['passenger_count'].mean():.1f}")

# Consulta 2: Viajes por zona de origen (Top 10)
print(f"\n 2. Top 10 zonas de origen con más viajes:")
top_origins = df_analytics['pickup_location'].value_counts().head(10)
for zone, count in top_origins.items():
    percent = (count / len(df_analytics)) * 100
    print(f"   Zona {zone}: {count:,} viajes ({percent:.1f}%)")

# Consulta 3: Análisis por hora del día
df_analytics['hour'] = df_analytics['pickup_time'].dt.hour
hourly_stats = df_analytics.groupby('hour').agg({
    'vehicle_id': 'count',
    'fare_amount': 'mean',
    'trip_distance': 'mean'
}).round(2)
hourly_stats.columns = ['viajes', 'tarifa_promedio', 'distancia_promedio']

print(f"\n 3. Análisis por hora del día:")
print(hourly_stats.head(10))

# Consulta 4: Viajes de larga distancia
long_trips = df_analytics[df_analytics['trip_distance'] > 10].sort_values('trip_distance', ascending=False).head(5)
print(f"\n 4. Top 5 viajes más largos:")
for idx, trip in long_trips.iterrows():
    print(f"   {trip['trip_distance']:.2f} millas - "
          f"${trip['fare_amount']:.2f} - "
          f"{trip['trip_duration']:.1f} min")

# Consulta 5: Distribución de pasajeros
print(f"\n 5. Distribución de pasajeros:")
passenger_dist = df_analytics['passenger_count'].value_counts().sort_index()
for passengers, count in passenger_dist.items():
    percent = (count / len(df_analytics)) * 100
    print(f"   {int(passengers)} pasajero(s): {count:,} viajes ({percent:.1f}%)")


CONSULTAS ANALÍTICAS Cold Path

 1. Estadísticas generales del periodo:
   Total de viajes: 10,000
   Distancia promedio: 2.85 millas
   Tarifa promedio: $17.92
   Duración promedio: 17.4 minutos
   Pasajeros promedio: 1.6

 2. Top 10 zonas de origen con más viajes:
   Zona 79: 559 viajes (5.6%)
   Zona 237: 428 viajes (4.3%)
   Zona 142: 417 viajes (4.2%)
   Zona 263: 376 viajes (3.8%)
   Zona 161: 370 viajes (3.7%)
   Zona 239: 366 viajes (3.7%)
   Zona 170: 355 viajes (3.5%)
   Zona 107: 345 viajes (3.5%)
   Zona 141: 320 viajes (3.2%)
   Zona 249: 314 viajes (3.1%)

 3. Análisis por hora del día:
      viajes  tarifa_promedio  distancia_promedio
hour                                             
0       5563            18.29                2.90
1       4423            17.45                2.77
2          4            20.68                3.71
23        10            14.41                2.60

 4. Top 5 viajes más largos:
   54.94 millas - $243.10 - 89.5 min
   41.20 millas - $-201.

In [None]:
# ============================================================
# Resumen Final y arquitectura
# ============================================================
print("\n" + "=" * 60)
print("RESUMEN DE ARQUITECTURA")
print("=" * 60)

print("""
 ARQUITECTURA IMPLEMENTADA:

1.CAPA DE INGESTA:
   • Dataset: NYC Taxi (Parquet) - 10,000 viajes
   • Procesamiento: Streaming simulado evento por evento

2.CAPA DE PROCESAMIENTO:
   • Pipeline dual: Hot Path + Cold Path simultáneos
   • Cada evento se procesa para ambos casos de uso

3️.ALMACÉN OPERACIONAL (Hot Path):
   • Tecnología simulada: Redis (dict Python)
   • Función: Último estado de cada vehículo
   • Casos de uso:
     - ¿Dónde está el vehículo X ahora?
     - Estado actual de la flota
     - Consultas de baja latencia

4️.ALMACÉN ANALÍTICO (Cold Path):
   • Tecnología simulada: DuckDB (DataFrame)
   • Función: Historial completo de eventos
   • Casos de uso:
     - Análisis de tendencias
     - Reportes históricos
     - Agregaciones complejas

 MÉTRICAS:
   • Vehículos rastreados: """ + f"{len(operational_store)}" + """
   • Eventos históricos: """ + f"{len(analytical_store):,}" + """
   • Velocidad de procesamiento: """ + f"{total_events/elapsed_total:.0f} eventos/seg" + """

 REQUISITOS CUMPLIDOS:
   Ingesta de datos (streaming simulado)
   Procesamiento dual path
   Almacén operacional (estado actual)
   Almacén analítico (histórico)
   Consultas de demostración
""")

print("=" * 60)
print("PIPELINE FUNCIONAL COMPLETO")
print("=" * 60)


RESUMEN DE ARQUITECTURA

 ARQUITECTURA IMPLEMENTADA:

1.CAPA DE INGESTA:
   • Dataset: NYC Taxi (Parquet) - 10,000 viajes
   • Procesamiento: Streaming simulado evento por evento

2.CAPA DE PROCESAMIENTO:
   • Pipeline dual: Hot Path + Cold Path simultáneos
   • Cada evento se procesa para ambos casos de uso

3️.ALMACÉN OPERACIONAL (Hot Path):
   • Tecnología simulada: Redis (dict Python)
   • Función: Último estado de cada vehículo
   • Casos de uso: 
     - ¿Dónde está el vehículo X ahora?
     - Estado actual de la flota
     - Consultas de baja latencia

4️.ALMACÉN ANALÍTICO (Cold Path):
   • Tecnología simulada: DuckDB (DataFrame)
   • Función: Historial completo de eventos
   • Casos de uso:
     - Análisis de tendencias
     - Reportes históricos
     - Agregaciones complejas

 MÉTRICAS:
   • Vehículos rastreados: 193
   • Eventos históricos: 10,000
   • Velocidad de procesamiento: 8370 eventos/seg

 REQUISITOS CUMPLIDOS:
   Ingesta de datos (streaming simulado)
   Procesamient