## PROYECTO CURSO PYSPARK

Autor: Martin Fierro

## Descripción General del Proyecto



### 1. Selección de la Fuente de Datos

- **Nombre del Dataset**: NYC Taxi Trips (Yellow, Green o FHV Trips).  
- **Proveedores**:  
  - [NYC Taxi & Limousine Commission (TLC)](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page), que publica mensualmente todos los viajes realizados.  
  - Copias alternativas en [Kaggle](https://www.kaggle.com/datasets), que a veces tienen los archivos consolidados en diferentes formatos.  
- **Volumen**: Se puede alcanzar fácilmente cientos de millones de registros si se descargan varios años de datos. Esto justifica el uso de técnicas de optimización y escalabilidad.

### 2. Objetivos de Negocio y Analíticos

1. **Análisis de Demanda de Taxis**:  
   - Identificar los horarios de mayor afluencia en distintas zonas de la ciudad.  
   - Calcular el volumen de viajes por día/semana/mes.  

2. **Análisis de Duración y Distancia de Viajes**:  
   - Estimar el tiempo promedio de recorrido entre distintas zonas, o el tiempo total de cada jornada.  
   - Detectar picos de duración (por congestión, eventos masivos, etc.).

3. **Costos y Propinas**:  
   - Relacionar montos de propinas con la hora del día, el día de la semana o la zona de recogida.  
   - Evaluar patrones de pagos en efectivo vs. tarjeta.

4. **Optimización de Consultas y Procesamiento**:  
   - Aplicar las técnicas de particionamiento (por fecha, zona o ambas).  
   - Usar Z-Ordering para columnas clave, como fecha/hora o zonas geográficas.  
   - Aplicar broadcast joins y manejo de skew con tablas auxiliares (por ejemplo, dimensiones de zonas o festivos).  
   - Caching y persistencia para exploraciones repetitivas.

## Librerias

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
import time

## Utils

In [3]:
def medir_tiempo_funcion(funcion):
  star_time = time.time()
  funcion()
  end_time = time.time()
  execution_time = end_time - star_time
  return execution_time

In [4]:
def medir_tiempo_repeticiones(funcion, repeticiones):
  star_time = time.time()
  for _ in range(repeticiones):
    funcion()
  end_time = time.time()
  execution_time = end_time - star_time
  return execution_time

## 1. Adquisición y Preparación de Datos

1. **Descarga de Datos**  
   - Obtener los archivos CSV o Parquet de uno o varios años de registros de taxi (p. ej. Yellow Taxi Trip Records).  
   - Verificar la disponibilidad de los campos (por ejemplo: pickup_datetime, dropoff_datetime, pickup_location, dropoff_location, passenger_count, trip_distance, total_amount, etc.).

2. **Limpieza y Estandarización**  
   - Eliminar filas con datos claramente inconsistentes (distancias negativas, tiempos de viaje nulos, etc.).  
   - Convertir tipos de columnas (fecha/hora, numéricos, etc.) y unificar formatos.  
   - Agregar columnas derivadas si se requiere (por ejemplo, “hora del día” de la recogida, “día de la semana”, “zona geográfica” unificada a partir de lat/long o “LocationID” si se usa la tabla de zonas).

3. **Enriquecimiento con Tablas Auxiliares**  
   - Incorporar la tabla “Taxi Zone Lookup” que relaciona cada LocationID con un barrio o zona (por ejemplo, Bronx, Brooklyn, Manhattan, etc.).  
   - (Opcional) Añadir información de clima o de eventos de la ciudad para un análisis predictivo más sofisticado.

In [5]:
# Crear una SparkSession
spark = SparkSession.builder \
    .appName("Leer archivo Parquet") \
    .getOrCreate()

In [6]:
LRF_codigos = spark.read.csv("taxi_zone_lookup.csv", header=True, inferSchema=True)

In [7]:
LRF_codigos = LRF_codigos.select("LocationID","Borough")

In [8]:
# Leer el archivo Parquet
enero = spark.read.parquet("enero.parquet")

# Opcional: Mostrar el esquema del DataFrame
enero.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [9]:
numero_filas = enero.count()
numero_filas

2964624

In [10]:
nulos_por_columna = enero.select(
    [
        sum(col(c).isNull().cast("int")).alias(c) for c in enero.columns
    ]
)

nulos_por_columna.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|         140162|            0|    140162|            140162|           0|           0|           0|          0|    0|      0|         

In [11]:
clean_df = enero.filter(col("trip_distance") > 0)
clean_df = enero.filter(col("passenger_count") > 0)
clean_df = enero.filter(col("total_amount") > 0)

In [12]:
clean_df.count()

2928704

In [22]:
clean_df= clean_df.withColumn("dia_pickup", dayofmonth(col("tpep_pickup_datetime"))) \
    .withColumn("mes_pickup", month(col("tpep_pickup_datetime"))) \
    .withColumn("hora_pickup", hour(col("tpep_pickup_datetime")))

In [24]:
clean_df = clean_df.withColumn("dia_semana_pickup", date_format("tpep_pickup_datetime", "EEEE"))

In [25]:
clean_df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+----------+-----------+----------+---------+-----------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|dia_pickup|mes_pickup|hora_pickup|LocationID|  Borough|dia_semana_pickup|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+----------+-----------+--------

In [15]:
clean_df = clean_df.join(LRF_codigos, clean_df["PULocationID"] == LRF_codigos["LocationID"], how="left")

## 2. Particionamiento y Clustering

1. **Diseño del Esquema de Particionamiento**  
   - Decidir si se particionará por **año/mes** de la fecha de recogida (pickup_datetime) o por zona geográfica, o incluso una combinación de ambas (aunque se debe evitar crear un exceso de particiones).  
   - Justificar la decisión en función de las consultas más frecuentes (por ejemplo, filtros por fecha o análisis por zonas).

2. **Implementación y Medición de Rendimiento**  
   - Crear un directorio (o tabla en Delta, si se usa un lago de datos) particionado por la columna elegida.  
   - Medir el tamaño total de los archivos y el número de archivos resultantes.  
   - Ejecutar algunas consultas comparativas antes y después del particionado, anotando el tiempo requerido.

3. **Liquid Clustering / Auto Optimize** (según la plataforma)  
   - Describir cómo se implementarían técnicas de reescritura dinámica para mantener los datos optimizados a medida que se realizan inserciones o actualizaciones (en caso de que se use un formato como Delta Lake).  
   - Evaluar la variación en el tamaño promedio de los archivos y la latencia de las consultas tras esta optimización.

In [16]:
clean_df.groupBy("mes_pickup").count().show()

+----------+-------+
|mes_pickup|  count|
+----------+-------+
|        12|     11|
|         1|2928690|
|         2|      3|
+----------+-------+



In [17]:
clean_df.groupBy("PULocationID").count().show()

+------------+-----+
|PULocationID|count|
+------------+-----+
|         148|27566|
|         243|  480|
|          31|    7|
|         137|32575|
|         251|    4|
|          85|  197|
|          65| 1371|
|         255| 1098|
|          53|   47|
|         133|  128|
|          78|  193|
|         108|  118|
|         155|  220|
|         211|19983|
|          34|   61|
|         193| 1653|
|         126|  105|
|         101|   55|
|          81|  127|
|          28|  409|
+------------+-----+
only showing top 20 rows



In [18]:
clean_df.groupBy("Borough").count().show()

+-------------+-------+
|      Borough|  count|
+-------------+-------+
|       Queens| 266645|
|          EWR|    283|
|      Unknown|  10224|
|     Brooklyn|  24969|
|Staten Island|     69|
|          N/A|   1575|
|    Manhattan|2618093|
|        Bronx|   6846|
+-------------+-------+



In [19]:
clean_df.groupBy("hora_pickup").count().show()

+-----------+------+
|hora_pickup| count|
+-----------+------+
|         12|162600|
|         22|141354|
|          1| 52780|
|         13|167898|
|          6| 40916|
|         16|187894|
|          3| 24275|
|         20|157992|
|          5| 18379|
|         19|181832|
|         15|187076|
|          9|127765|
|         17|203919|
|          4| 16330|
|          8|116212|
|         23|107641|
|          7| 82990|
|         10|137293|
|         21|158887|
|         11|148908|
+-----------+------+
only showing top 20 rows



## 3. Z-Order y Optimización de Lecturas

1. **Columnas Candidatas para Z-Order**  
   - `pickup_datetime` o la componente de fecha/hora que sea más usada.  
   - `pickup_location` o `dropoff_location`, si se filtra con frecuencia por zona.  

2. **Aplicación del Z-Ordering**  
   - Indicar cómo se ejecutaría la ordenación Z-Order en la tabla particionada (por ejemplo, usando comandos `OPTIMIZE` si se está en Databricks/Delta Lake).  
   - Comprobar el impacto en el número de archivos, la fragmentación, etc.

3. **Pruebas de Consultas**  
   - Realizar queries que filtren por fechas específicas o por zonas concretas.  
   - Comparar los tiempos de respuesta con y sin Z-Order.  
   - Documentar los resultados y analizar si la cardinalidad de las columnas era la adecuada para percibir una mejora clara.

---

### 4. Joins, Skew y Broadcast

1. **Manejo de Tablas Dimensionales**  
   - Integrar la tabla de zonas (dimensión pequeña) con la tabla de viajes (tabla grande).  
   - Ilustrar cómo se puede aplicar un **broadcast join** cuando la tabla de zonas es de menor tamaño.

2. **Detección y Manejo de Skew**  
   - Explorar si ciertos LocationID, fechas o rangos horarios concentran un volumen desproporcionado de viajes.  
   - Aplicar técnicas de salting o hints en caso de alta disparidad para evitar cuellos de botella en los shuffles.

3. **Medición de Mejoras**  
   - Comparar el plan de ejecución y los tiempos con broadcast join vs. un join normal.  
   - Analizar el DAG resultante (con `explain()` o Spark UI) y señalar la diferencia en la cantidad de stages o shuffle.

---

### 5. Caching y Persistencia

1. **Identificación de Consultas Frecuentes**  
   - Definir consultas que se repitan a lo largo del proyecto, por ejemplo, agregaciones por zona y hora punta, o correlación de distancias y propinas.  
   - Decidir qué DataFrame(s) vale la pena cachear (dependerá de la exploración analítica que se realice).

2. **Aplicación de Cache**  
   - Justificar el nivel de persistencia.  
   - Medir la mejora en tiempo de ejecución de las consultas sucesivas tras el cache.

3. **Liberación de Caché**  
   - Proponer condiciones o estrategias para liberar el cache cuando ya no se requiera, optimizando el uso de memoria.

---



## 6. Análisis de Alto Nivel y Visualizaciones

1. **Consultas de Negocio**  
   - Calcular indicadores como:  
     - Volumen de viajes promedio por día de la semana.  
     - Total de viajes y recaudación por zona de recogida.  
     - Evolución de la propina media por mes.  
   - Identificar patrones relevantes, picos de demanda o anomalías en la duración de los recorridos.

2. **Visualización**  
   - Generar gráficas (de barras, líneas o mapas interactivos) que ayuden a entender mejor la distribución geográfica y temporal de los viajes.  
   - Mostrar cómo las optimizaciones (particionamiento, Z-Order, etc.) permiten que el análisis sea más ágil incluso con un dataset de gran tamaño.

---



In [20]:
clean_df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+----------+-----------+----------+---------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|dia_pickup|mes_pickup|hora_pickup|LocationID|  Borough|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+----------+----------+-----------+----------+---------+
|       2| 2024-01-01 

In [37]:
viajes_promedio = clean_df.groupBy("dia_semana_pickup").agg(count('*').alias("viajes_promedio"))

In [36]:
total_viajes = clean_df.groupBy("Borough").agg(count('*').alias("total viajes"),count('total_amount').alias("total recaudacion"))
total_viajes.show()

+-------------+------------+-----------------+
|      Borough|total viajes|total recaudacion|
+-------------+------------+-----------------+
|       Queens|      266645|           266645|
|          EWR|         283|              283|
|      Unknown|       10224|            10224|
|     Brooklyn|       24969|            24969|
|Staten Island|          69|               69|
|          N/A|        1575|             1575|
|    Manhattan|     2618093|          2618093|
|        Bronx|        6846|             6846|
+-------------+------------+-----------------+



In [38]:
propina = clean_df.groupBy("mes_pickup").agg(avg('tip_amount').alias("propina media"))
propina.show()

+----------+------------------+
|mes_pickup|     propina media|
+----------+------------------+
|        12| 2.388181818181818|
|         1|3.3759141595735245|
|         2|2.0933333333333333|
+----------+------------------+



In [42]:
horas_pico = clean_df.groupBy("hora_pickup").agg(count('*').alias("total_viajes"))
horas_pico = horas_pico.orderBy("total_viajes", ascending=False)
horas_pico.show()

+-----------+------------+
|hora_pickup|total_viajes|
+-----------+------------+
|         18|      210308|
|         17|      203919|
|         16|      187894|
|         15|      187076|
|         19|      181832|
|         14|      180848|
|         13|      167898|
|         12|      162600|
|         21|      158887|
|         20|      157992|
|         11|      148908|
|         22|      141354|
|         10|      137293|
|          9|      127765|
|          8|      116212|
|         23|      107641|
|          7|       82990|
|          0|       77778|
|          1|       52780|
|          6|       40916|
+-----------+------------+
only showing top 20 rows



In [41]:
horas_pico_distancia = clean_df.groupBy("hora_pickup").agg(avg('trip_distance').alias("distancia media"))
horas_pico_distancia.show()

+-----------+------------------+
|hora_pickup|   distancia media|
+-----------+------------------+
|         12| 3.297698093480924|
|         22|  3.58507145181598|
|          1| 3.130948275862061|
|         13| 3.117276560769043|
|          6|12.982980252224083|
|         16|3.3483180410231133|
|          3|3.3264238928939114|
|         20|3.3116525520279745|
|          5| 8.736985146090621|
|         19|3.1091326609177736|
|         15|3.8971129915114684|
|          9|3.0005532814151024|
|         17|3.0107887445504926|
|          4| 4.565380281690132|
|          8| 5.465322341926821|
|         23|3.9124187809478097|
|          7| 6.026193758284102|
|         10| 3.279535154741996|
|         21|3.3958133768023853|
|         11|3.4306486555457236|
+-----------+------------------+
only showing top 20 rows



## 7. Medición de Tiempos y Plan de Ejecución (DAG)

1. **Comparativa de Escenarios**  
   - **Sin particionar** vs. **Particionado** vs. **Particionado + Z-Order**.  
   - **Join normal** vs. **Broadcast join**.  
   - **Sin cache** vs. **Con cache**.  
   - Resumir los resultados en tablas o gráficas, indicando la reducción de latencia o la diferencia en número de stages/shuffles.

2. **Spark UI (opcional) y `explain()`**  
   - Revisar el DAG para cada escenario y anotar los cambios en la estrategia de join, la cantidad de datos barajados (shuffle) o las tareas generadas.  
   - Describir las observaciones más destacadas (por ejemplo, reducción de etapas en broadcast join, mitigación de skew, etc.).

---

### 8. Conclusiones

1. **Conclusiones Principales**  
   - Indicar qué técnicas ofrecieron mayor beneficio en términos de latencia y escalabilidad.  
   - Destacar si hubo alguna sorpresa, como poca ganancia en Z-Order por baja cardinalidad, o una gran mejora en broadcast join.
