## Proyecto final

Índice
1. Caracterización de la población
2. Recolección de datos
3. Construcción de la muestra considerando el volumen de datos de la población
4. Estrategia(s) de muestreo a emplear
5. Preparación de conjuntos de entrenamiento y prueba 
6. Selección de métricas para medir calidad de resultados
7. Selección de algoritmos de aprendizaje
8. Técnicas para el ajuste de hiper – parámetros
9. Mostrar los resultados obtenidos (matrices de confusión, curvas ROC, etc.)

### Pre-configuración

* Carga de la sesión de PySpark.
* Carga del CSV.

In [32]:
# Importa las librerías necesarias
from pyspark.sql import SparkSession

# Crea una sesión de Spark
spark = SparkSession.builder.appName("YellowTripDataAnalysis").config("spark.driver.memory", "22g").getOrCreate()

# Leemos el archivo CSV de datos de viajes amarillos
df = spark.read.option("header", True).option("inferSchema", True).csv("../data/yellow_tripdata/yellow_tripdata_2015-01.csv")

25/06/21 12:27:07 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:27:09 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:27:09 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
                                                                                

In [33]:
# Muestra el esquema del DataFrame
df.describe().show()

25/06/21 12:27:16 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:27:17 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:27:35 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:27:37 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+------------------+-------------------+---------------------+------------------+
|summary|          VendorID|   passenger_count|     trip_distance|   pickup_longitude|   pickup_latitude|        RateCodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|      payment_type|       fare_amount|              extra|             mta_tax|        tip_amount|       tolls_amount|improvement_surcharge|      total_amount|
+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+------------------+-------------------+-------------------

### 1. Caracterízación de la poblacón

* Obtenemos el número original de filas y columnas (como informativo para comparar al final).
* Generamos un diccionario con los factores de caracterización (previo estudio de las características detallado en la actividad 3).
* Se eliminan filas con valores nulos.
* Se crea la columna trip_duration (muy útil en vez de hora de recojida y hora de descenso).
* Se extraen las horas de recojida y horas de descenso.
* Se crean las columnas de los datos binned.
* Se vuelven a eliminar valores nulos.
* Se convierten las columnas binnes a tipo int.
* Se eliminan los outliers de las columnas numéricas.
* Se calculan las iteraciones de los valores binned.
* Se extraen las claves y valores del diccionario a listas.
* Se recorre el DataFrame por cada combinación para garantizar que abarcamos el 100% de los datos, en la impresión podemos ver: "Proporción total: 1.0000".
* Establecemos los rangos de los binnes.
* Se crean listas para almacenar los rangos de cada columna.
* Se crean los rangos para cada columna.
* Se imprimen los rangos de cada columna para verificar los rangos binneds a buscar.

In [34]:
# Imprimimos las dimensiones del DataFrame
original_num_rows = df.count()
original_num_cols = len(df.columns)

print(f"Número de filas originales: {original_num_rows}")
print(f"Número de columnas originales: {original_num_cols}")

25/06/21 12:27:39 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:27:42 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
[Stage 881:(955 + 16) / 1040][Stage 884:>  (0 + 0) / 1][Stage 885:(0 + 0) / 1040]

Número de filas originales: 12748986
Número de columnas originales: 19


                                                                                

In [35]:
# Generamos un diccionario con los factores de caracterización
charact_factor = {
    "passenger_count": 1e10,
    "trip_distance": 2.5,
    "payment_type": 3,
    "total_amount": 3,
    "tpep_pickup_datetime": 1e10,
    "tpep_dropoff_datetime": 1e10,
}

# Obtenemos una muestra del 5% de los datos originales
df_original_sample = df.select(list(charact_factor.keys())).sample(False, 0.05, seed=42)

In [36]:
# Imprimimos las dimensiones del DataFrame de la muestra
original_05_num_rows = df_original_sample.count()
original_05_num_cols = len(df_original_sample.columns)

print(f"Número de filas originales: {original_num_rows}")
print(f"Número de columnas originales: {original_num_cols}")
print(f"Número de filas del 5% de los datos originales: {original_05_num_rows}")
print(f"Número de columnas del 5% de los datos originales: {original_05_num_cols}")

25/06/21 12:27:45 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:27:48 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
[Stage 888:(950 + 16) / 1040][Stage 891:>  (0 + 0) / 1][Stage 892:(0 + 0) / 1040]

Número de filas originales: 12748986
Número de columnas originales: 19
Número de filas del 5% de los datos originales: 637214
Número de columnas del 5% de los datos originales: 6


                                                                                

In [37]:
# Importa las librerías necesarias
from pyspark.sql.functions import col, hour, when, unix_timestamp
from pyspark.sql.types import IntegerType

# Copiamos el DataFrame original a otro DataFrame
df_sample = df_original_sample.select("*")

# Se eliminan las filas con valores nulos
df_sample = df_sample.dropna()

# Se crea la columna trip_duration
df_sample = df_sample.withColumn(
    "trip_duration",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60,
)

# Se agrega la columna trip_duration al diccionario charact_factor
charact_factor['trip_duration'] = 2

# Se extraen las horas de las columnas tpep_pickup_datetime y tpep_dropoff_datetime
df_sample = df_sample.withColumn("tpep_pickup_datetime", hour("tpep_pickup_datetime"))
df_sample = df_sample.withColumn("tpep_dropoff_datetime", hour("tpep_dropoff_datetime"))

# Se eliminan los valores negativos y ceros de las columnas trip_distance, 
# total_amount, passenger_count y trip_duration
for colname in ["trip_distance", "total_amount", "passenger_count", "trip_duration"]:
    df_sample = df_sample.filter(col(colname) > 0)

# Se crea la columna trip_distance_bind
df_sample = df_sample.withColumn(
    "trip_distance_bind",
    when(col("trip_distance") <= 3.0, 1)
    .when((col("trip_distance") > 3.0) & (col("trip_distance") <= 10.0), 2)
    .when(col("trip_distance") > 10.0, 3)
)

# Se crea la columna total_amount_bind
df_sample = df_sample.withColumn(
    "total_amount_bind",
    when(col("total_amount") <= 10.0, 1)
    .when((col("total_amount") > 10.0) & (col("total_amount") <= 20.0), 2)
    .when(col("total_amount") > 20.0, 3)
)

# Se crea la columna passenger_count_bind
df_sample = df_sample.withColumn(
    "passenger_count_bind",
    when(col("passenger_count") <= 2.0, 1)
    .when((col("passenger_count") > 2.0) & (col("passenger_count") <= 6.0), 2)
)

# Se crea la columna trip_duration_bind
df_sample = df_sample.withColumn(
    "trip_duration_bind",
    when(col("trip_duration") <= 10.0, 1)
    .when((col("trip_duration") > 10.0) & (col("trip_duration") <= 20.0), 2)
    .when(col("trip_duration") > 20.0, 3)
)

# Se crea la columna payment_type_bind
df_sample = df_sample.withColumn(
    "payment_type_bind",
    when(col("payment_type") == 1.0, 1)
    .when((col("payment_type") > 1.0) & (col("payment_type") <= 6.0), 2)
)

# Se vuelven a eliminar las filas con valores nulos
df_sample = df_sample.dropna()

# Se convierten las columnas binned a tipo Integer
for binned_col in [
    "trip_distance_bind",
    "trip_duration_bind",
    "total_amount_bind",
    "passenger_count_bind",
    "payment_type_bind",
]:
    df_sample = df_sample.withColumn(binned_col, col(binned_col).cast(IntegerType()))

# Se eliminan los outliers de las columnas numéricas
for col_name, factor in charact_factor.items():
    quantiles = df_sample.approxQuantile(col_name, [0.25, 0.75], 0.0)
    q1, q3 = quantiles
    iqr = q3 - q1
    lower = q1 - factor * iqr
    upper = q3 + factor * iqr
    df_sample = df_sample.filter((col(col_name) >= lower) & (col(col_name) <= upper))


25/06/21 12:27:50 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:27:56 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:27:58 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:28:04 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:28:07 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:28:17 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
25/06/21 12:28:21 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/06/21 12:32:27 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:32:35 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/06/21 12:32:38 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:32:38 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:32:50 WARN DAGScheduler: Broadcasting larg

In [38]:
# Imprimimos las dimensiones del DataFrame después de la limpieza
cleaned_05_num_rows = df_sample.count()
cleaned_05_num_cols = len(df_sample.columns)

print(f"Número de filas originales: {original_num_rows}")
print(f"Número de columnas originales: {original_num_cols}")
print(f"Número de filas del 5% de los datos originales: {original_05_num_rows}")
print(f"Número de columnas del 5% de los datos originales: {original_05_num_cols}")
print(f"Número de filas del 5%  después de la limpieza: {cleaned_05_num_rows}")
print(f"Número de columnas del 5% después de la limpieza: {cleaned_05_num_cols}")

25/06/21 12:33:26 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
25/06/21 12:33:30 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:33:49 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
[Stage 936:(910 + 16) / 1040][Stage 939:>  (0 + 0) / 1][Stage 940:(0 + 0) / 1040]

Número de filas originales: 12748986
Número de columnas originales: 19
Número de filas del 5% de los datos originales: 637214
Número de columnas del 5% de los datos originales: 6
Número de filas del 5%  después de la limpieza: 579787
Número de columnas del 5% después de la limpieza: 12


                                                                                0]

In [39]:
# Mostramos las primeras 5 filas del DataFrame final
df_sample.show(5)

25/06/21 12:33:51 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
[Stage 940:(1017 + 16) / 1040][Stage 942:>  (0 + 0) / 1][Stage 943:(0 + 0) / 1040]

+---------------+-------------+------------+------------+--------------------+---------------------+------------------+------------------+-----------------+--------------------+------------------+-----------------+
|passenger_count|trip_distance|payment_type|total_amount|tpep_pickup_datetime|tpep_dropoff_datetime|     trip_duration|trip_distance_bind|total_amount_bind|passenger_count_bind|trip_duration_bind|payment_type_bind|
+---------------+-------------+------------+------------+--------------------+---------------------+------------------+------------------+-----------------+--------------------+------------------+-----------------+
|              5|         2.83|           2|        14.3|                  19|                   19|15.333333333333334|                 1|                2|                   2|                 2|                2|
|              1|          3.0|           2|        12.8|                  14|                   14|13.116666666666667|                 1|  

                                                                                

In [40]:
# Importa las librerías necesarias
from itertools import product
from pyspark.sql.functions import col

# Iteraciones de los valores binned
iterations = {
    "total_amount_bind": [1, 2, 3],
    "trip_distance_bind": [1, 2, 3],
    "payment_type_bind": [1, 2],
    "passenger_count_bind": [1, 2]
}

# Número total de filas
total_rows = df_sample.count()

# Extracción de las claves y valores del diccionario
keys = list(iterations.keys())
values = list(iterations.values())

# Crear todas las combinaciones posibles de los valores
combinations = list(product(*values))

# Número de iteración
iter_num = 1

# Suma total de proporciones
total_proportion = 0

# Iterar sobre cada combinación
for comb in combinations:
    # Construir el filtro dinámicamente
    condition = (col(keys[0]) == comb[0])
    for i in range(1, len(keys)):
        condition &= (col(keys[i]) == comb[i])
    
    # Aplicar el filtro
    filtered_rows = df_sample.filter(condition)
    
    # Contar y calcular proporción
    count = filtered_rows.count()
    proportion = count / total_rows if total_rows != 0 else 0

    print(f"Combinación: {comb} / iteración: {iter_num}, Proporción: {proportion:.4f}")
    
    iter_num += 1
    total_proportion += proportion

# Mostrar proporción total
print(f"Proporción total: {total_proportion:.4f}")


25/06/21 12:33:53 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:33:53 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:33:58 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:33:58 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:34:02 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:34:04 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:34:10 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/06/21 12:34:13 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


Combinación: (1, 1, 1, 1) / iteración: 1, Proporción: 0.1930


25/06/21 12:34:19 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:34:24 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
[Stage 964:(953 + 16) / 1040][Stage 967:>  (0 + 0) / 1][Stage 968:(0 + 0) / 1040]

Combinación: (1, 1, 1, 2) / iteración: 2, Proporción: 0.0327


25/06/21 12:34:26 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:34:32 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


Combinación: (1, 1, 2, 1) / iteración: 3, Proporción: 0.1954


25/06/21 12:34:38 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


Combinación: (1, 1, 2, 2) / iteración: 4, Proporción: 0.0362


25/06/21 12:34:45 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB


Combinación: (1, 2, 1, 1) / iteración: 5, Proporción: 0.0000


                                                                                

Combinación: (1, 2, 1, 2) / iteración: 6, Proporción: 0.0000


25/06/21 12:34:54 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/06/21 12:35:06 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB


Combinación: (1, 2, 2, 1) / iteración: 7, Proporción: 0.0001


25/06/21 12:35:24 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB


Combinación: (1, 2, 2, 2) / iteración: 8, Proporción: 0.0000


                                                                                

Combinación: (1, 3, 1, 1) / iteración: 9, Proporción: 0.0000


25/06/21 12:35:45 WARN DAGScheduler: Broadcasting large task binary with size 3.4 MiB


Combinación: (1, 3, 1, 2) / iteración: 10, Proporción: 0.0000


                                                                                

Combinación: (1, 3, 2, 1) / iteración: 11, Proporción: 0.0000


                                                                                

Combinación: (1, 3, 2, 2) / iteración: 12, Proporción: 0.0000


25/06/21 12:36:09 WARN DAGScheduler: Broadcasting large task binary with size 4.4 MiB
                                                                                

Combinación: (2, 1, 1, 1) / iteración: 13, Proporción: 0.2218


                                                                                

Combinación: (2, 1, 1, 2) / iteración: 14, Proporción: 0.0384


25/06/21 12:36:36 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:36:40 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


Combinación: (2, 1, 2, 1) / iteración: 15, Proporción: 0.0778


                                                                                

Combinación: (2, 1, 2, 2) / iteración: 16, Proporción: 0.0154


                                                                                

Combinación: (2, 2, 1, 1) / iteración: 17, Proporción: 0.0463


                                                                                

Combinación: (2, 2, 1, 2) / iteración: 18, Proporción: 0.0084


                                                                                

Combinación: (2, 2, 2, 1) / iteración: 19, Proporción: 0.0366


                                                                                

Combinación: (2, 2, 2, 2) / iteración: 20, Proporción: 0.0074


                                                                                

Combinación: (2, 3, 1, 1) / iteración: 21, Proporción: 0.0000


                                                                                

Combinación: (2, 3, 1, 2) / iteración: 22, Proporción: 0.0000


                                                                                

Combinación: (2, 3, 2, 1) / iteración: 23, Proporción: 0.0000


                                                                                

Combinación: (2, 3, 2, 2) / iteración: 24, Proporción: 0.0000


                                                                                

Combinación: (3, 1, 1, 1) / iteración: 25, Proporción: 0.0038


                                                                                

Combinación: (3, 1, 1, 2) / iteración: 26, Proporción: 0.0006


                                                                                

Combinación: (3, 1, 2, 1) / iteración: 27, Proporción: 0.0001


                                                                                

Combinación: (3, 1, 2, 2) / iteración: 28, Proporción: 0.0000


                                                                                

Combinación: (3, 2, 1, 1) / iteración: 29, Proporción: 0.0586


                                                                                

Combinación: (3, 2, 1, 2) / iteración: 30, Proporción: 0.0102


                                                                                

Combinación: (3, 2, 2, 1) / iteración: 31, Proporción: 0.0141


                                                                                

Combinación: (3, 2, 2, 2) / iteración: 32, Proporción: 0.0029


                                                                                

Combinación: (3, 3, 1, 1) / iteración: 33, Proporción: 0.0000


                                                                                

Combinación: (3, 3, 1, 2) / iteración: 34, Proporción: 0.0000


                                                                                

Combinación: (3, 3, 2, 1) / iteración: 35, Proporción: 0.0000


[Stage 1087:>                                                     (0 + 16) / 16]

Combinación: (3, 3, 2, 2) / iteración: 36, Proporción: 0.0000
Proporción total: 1.0000


                                                                                

In [41]:
# Establece los rangos para las columnas
total_amount_bind = [0.0, 10.0, 20.0, 100.0]
trip_distance_bind = [0.0, 3.0, 10.0, 100.0]
payment_type_bind = [0.0, 1.0, 6.0]
passenger_count_bind = [0.0, 2.0, 6.0]

# Crea listas para almacenar los rangos de cada columna
total_amount_bind_ranges = []
trip_distance_bind_ranges = []
payment_type_bind_ranges = []
passenger_count_bind_ranges = []

# Crea los rangos para cada columna
for i in range(len(total_amount_bind) - 1):
    total_amount_bind_ranges.append((total_amount_bind[i], total_amount_bind[i + 1]))
    
for i in range(len(trip_distance_bind) - 1):
    trip_distance_bind_ranges.append((trip_distance_bind[i], trip_distance_bind[i + 1]))
    
for i in range(len(payment_type_bind) - 1):
    payment_type_bind_ranges.append((payment_type_bind[i], payment_type_bind[i + 1]))
    
for i in range(len(passenger_count_bind) - 1):
    passenger_count_bind_ranges.append((passenger_count_bind[i], passenger_count_bind[i + 1]))

# Imprime los rangos de cada columna
print("Total amount bind ranges: ", total_amount_bind_ranges)
print("Trip distance bind ranges: ", trip_distance_bind_ranges)
print("Payment type bind ranges: ", payment_type_bind_ranges)
print("Passenger count bind ranges: ", passenger_count_bind_ranges)

# Crea todas las combinaciones posibles de los rangos
combinations = list(product(total_amount_bind_ranges, trip_distance_bind_ranges, payment_type_bind_ranges, passenger_count_bind_ranges))

# Imprime las combinaciones
print("Combinations: ", combinations)

Total amount bind ranges:  [(0.0, 10.0), (10.0, 20.0), (20.0, 100.0)]
Trip distance bind ranges:  [(0.0, 3.0), (3.0, 10.0), (10.0, 100.0)]
Payment type bind ranges:  [(0.0, 1.0), (1.0, 6.0)]
Passenger count bind ranges:  [(0.0, 2.0), (2.0, 6.0)]
Combinations:  [((0.0, 10.0), (0.0, 3.0), (0.0, 1.0), (0.0, 2.0)), ((0.0, 10.0), (0.0, 3.0), (0.0, 1.0), (2.0, 6.0)), ((0.0, 10.0), (0.0, 3.0), (1.0, 6.0), (0.0, 2.0)), ((0.0, 10.0), (0.0, 3.0), (1.0, 6.0), (2.0, 6.0)), ((0.0, 10.0), (3.0, 10.0), (0.0, 1.0), (0.0, 2.0)), ((0.0, 10.0), (3.0, 10.0), (0.0, 1.0), (2.0, 6.0)), ((0.0, 10.0), (3.0, 10.0), (1.0, 6.0), (0.0, 2.0)), ((0.0, 10.0), (3.0, 10.0), (1.0, 6.0), (2.0, 6.0)), ((0.0, 10.0), (10.0, 100.0), (0.0, 1.0), (0.0, 2.0)), ((0.0, 10.0), (10.0, 100.0), (0.0, 1.0), (2.0, 6.0)), ((0.0, 10.0), (10.0, 100.0), (1.0, 6.0), (0.0, 2.0)), ((0.0, 10.0), (10.0, 100.0), (1.0, 6.0), (2.0, 6.0)), ((10.0, 20.0), (0.0, 3.0), (0.0, 1.0), (0.0, 2.0)), ((10.0, 20.0), (0.0, 3.0), (0.0, 1.0), (2.0, 6.0)), ((10.0

### 2. Recolección de datos

* Se recorre el DataFrame en base a todas las combinaciones de caracterización obtenidas.
* De cada combinación, si representa más del 5% de datos, los datos que representa la combinación son agregados al nuevo DataFrame.
* Nuevamente se crea la columna trip_duration y se extrae la hora de recogida y descenso.

In [42]:
# Conteo de filas del DataFrame original
total_rows = df.count()

# Variables para almacenar los resultados
subsets = []
iter_num = 1
total_proportion = 0
total_proportion_in_new_df = 0
df_accumulated = None

# Iterar sobre cada combinación
for comb in combinations:
    filtered_rows = df.filter(
        (col("total_amount") > comb[0][0]) & (col("total_amount") <= comb[0][1]) &
        (col("trip_distance") > comb[1][0]) & (col("trip_distance") <= comb[1][1]) &
        (col("payment_type") > comb[2][0]) & (col("payment_type") <= comb[2][1]) &
        (col("passenger_count") > comb[3][0]) & (col("passenger_count") <= comb[3][1])
    )
    subsets.append(filtered_rows)
    
    # Calcular la proporción de filas filtradas respecto al total
    prop = filtered_rows.count() / total_rows
    total_proportion += prop
    print(f"Porcentaje: {total_proportion:.2f} / combinación: {comb} / iteración: {iter_num} / proporción: {prop:.4f}")
    iter_num += 1

    # Si la proporción es mayor que 0.05, tomar el 40% de las filas
    if prop > 0.05:

        # Tomar el 40% de las filas filtradas
        filtered_rows_fraction = filtered_rows.sample(withReplacement=False, fraction=0.4, seed=42)

        if df_accumulated is None:
            df_accumulated = filtered_rows_fraction
        else:
            df_accumulated = df_accumulated.union(filtered_rows_fraction)

        # Guarda la proporción de filas filtradas respecto al total
        total_proportion_in_new_df += filtered_rows.count() / total_rows

        print("Combination accepted")
    
# Imprime resultados
print(f"Proporción total: {total_proportion:.4f}")
print(f"Proporción total en el nuevo DataFrame: {total_proportion_in_new_df:.4f}")

                                                                                

Porcentaje: 0.18 / combinación: ((0.0, 10.0), (0.0, 3.0), (0.0, 1.0), (0.0, 2.0)) / iteración: 1 / proporción: 0.1753


                                                                                

Combination accepted


                                                                                

Porcentaje: 0.21 / combinación: ((0.0, 10.0), (0.0, 3.0), (0.0, 1.0), (2.0, 6.0)) / iteración: 2 / proporción: 0.0298


                                                                                

Porcentaje: 0.38 / combinación: ((0.0, 10.0), (0.0, 3.0), (1.0, 6.0), (0.0, 2.0)) / iteración: 3 / proporción: 0.1786


                                                                                

Combination accepted


                                                                                

Porcentaje: 0.42 / combinación: ((0.0, 10.0), (0.0, 3.0), (1.0, 6.0), (2.0, 6.0)) / iteración: 4 / proporción: 0.0330


                                                                                

Porcentaje: 0.42 / combinación: ((0.0, 10.0), (3.0, 10.0), (0.0, 1.0), (0.0, 2.0)) / iteración: 5 / proporción: 0.0000


                                                                                

Porcentaje: 0.42 / combinación: ((0.0, 10.0), (3.0, 10.0), (0.0, 1.0), (2.0, 6.0)) / iteración: 6 / proporción: 0.0000


                                                                                

Porcentaje: 0.42 / combinación: ((0.0, 10.0), (3.0, 10.0), (1.0, 6.0), (0.0, 2.0)) / iteración: 7 / proporción: 0.0002


                                                                                

Porcentaje: 0.42 / combinación: ((0.0, 10.0), (3.0, 10.0), (1.0, 6.0), (2.0, 6.0)) / iteración: 8 / proporción: 0.0000


                                                                                

Porcentaje: 0.42 / combinación: ((0.0, 10.0), (10.0, 100.0), (0.0, 1.0), (0.0, 2.0)) / iteración: 9 / proporción: 0.0000


                                                                                

Porcentaje: 0.42 / combinación: ((0.0, 10.0), (10.0, 100.0), (0.0, 1.0), (2.0, 6.0)) / iteración: 10 / proporción: 0.0000


                                                                                

Porcentaje: 0.42 / combinación: ((0.0, 10.0), (10.0, 100.0), (1.0, 6.0), (0.0, 2.0)) / iteración: 11 / proporción: 0.0001


                                                                                

Porcentaje: 0.42 / combinación: ((0.0, 10.0), (10.0, 100.0), (1.0, 6.0), (2.0, 6.0)) / iteración: 12 / proporción: 0.0000


                                                                                

Porcentaje: 0.62 / combinación: ((10.0, 20.0), (0.0, 3.0), (0.0, 1.0), (0.0, 2.0)) / iteración: 13 / proporción: 0.2025


                                                                                

Combination accepted


                                                                                

Porcentaje: 0.65 / combinación: ((10.0, 20.0), (0.0, 3.0), (0.0, 1.0), (2.0, 6.0)) / iteración: 14 / proporción: 0.0346


                                                                                

Porcentaje: 0.72 / combinación: ((10.0, 20.0), (0.0, 3.0), (1.0, 6.0), (0.0, 2.0)) / iteración: 15 / proporción: 0.0707


                                                                                

Combination accepted


                                                                                

Porcentaje: 0.74 / combinación: ((10.0, 20.0), (0.0, 3.0), (1.0, 6.0), (2.0, 6.0)) / iteración: 16 / proporción: 0.0139


                                                                                

Porcentaje: 0.78 / combinación: ((10.0, 20.0), (3.0, 10.0), (0.0, 1.0), (0.0, 2.0)) / iteración: 17 / proporción: 0.0425


                                                                                

Porcentaje: 0.79 / combinación: ((10.0, 20.0), (3.0, 10.0), (0.0, 1.0), (2.0, 6.0)) / iteración: 18 / proporción: 0.0078


                                                                                

Porcentaje: 0.82 / combinación: ((10.0, 20.0), (3.0, 10.0), (1.0, 6.0), (0.0, 2.0)) / iteración: 19 / proporción: 0.0333


                                                                                

Porcentaje: 0.83 / combinación: ((10.0, 20.0), (3.0, 10.0), (1.0, 6.0), (2.0, 6.0)) / iteración: 20 / proporción: 0.0068


                                                                                

Porcentaje: 0.83 / combinación: ((10.0, 20.0), (10.0, 100.0), (0.0, 1.0), (0.0, 2.0)) / iteración: 21 / proporción: 0.0000


                                                                                

Porcentaje: 0.83 / combinación: ((10.0, 20.0), (10.0, 100.0), (0.0, 1.0), (2.0, 6.0)) / iteración: 22 / proporción: 0.0000


                                                                                

Porcentaje: 0.83 / combinación: ((10.0, 20.0), (10.0, 100.0), (1.0, 6.0), (0.0, 2.0)) / iteración: 23 / proporción: 0.0000


                                                                                

Porcentaje: 0.83 / combinación: ((10.0, 20.0), (10.0, 100.0), (1.0, 6.0), (2.0, 6.0)) / iteración: 24 / proporción: 0.0000


                                                                                

Porcentaje: 0.83 / combinación: ((20.0, 100.0), (0.0, 3.0), (0.0, 1.0), (0.0, 2.0)) / iteración: 25 / proporción: 0.0044


                                                                                

Porcentaje: 0.83 / combinación: ((20.0, 100.0), (0.0, 3.0), (0.0, 1.0), (2.0, 6.0)) / iteración: 26 / proporción: 0.0007


                                                                                

Porcentaje: 0.83 / combinación: ((20.0, 100.0), (0.0, 3.0), (1.0, 6.0), (0.0, 2.0)) / iteración: 27 / proporción: 0.0006


                                                                                

Porcentaje: 0.83 / combinación: ((20.0, 100.0), (0.0, 3.0), (1.0, 6.0), (2.0, 6.0)) / iteración: 28 / proporción: 0.0001


                                                                                

Porcentaje: 0.91 / combinación: ((20.0, 100.0), (3.0, 10.0), (0.0, 1.0), (0.0, 2.0)) / iteración: 29 / proporción: 0.0740


                                                                                

Combination accepted


                                                                                

Porcentaje: 0.92 / combinación: ((20.0, 100.0), (3.0, 10.0), (0.0, 1.0), (2.0, 6.0)) / iteración: 30 / proporción: 0.0129


                                                                                

Porcentaje: 0.94 / combinación: ((20.0, 100.0), (3.0, 10.0), (1.0, 6.0), (0.0, 2.0)) / iteración: 31 / proporción: 0.0209


                                                                                

Porcentaje: 0.95 / combinación: ((20.0, 100.0), (3.0, 10.0), (1.0, 6.0), (2.0, 6.0)) / iteración: 32 / proporción: 0.0042


                                                                                

Porcentaje: 0.97 / combinación: ((20.0, 100.0), (10.0, 100.0), (0.0, 1.0), (0.0, 2.0)) / iteración: 33 / proporción: 0.0259


                                                                                

Porcentaje: 0.98 / combinación: ((20.0, 100.0), (10.0, 100.0), (0.0, 1.0), (2.0, 6.0)) / iteración: 34 / proporción: 0.0046


                                                                                

Porcentaje: 0.99 / combinación: ((20.0, 100.0), (10.0, 100.0), (1.0, 6.0), (0.0, 2.0)) / iteración: 35 / proporción: 0.0121


[Stage 1213:>                                                     (0 + 16) / 16]

Porcentaje: 0.99 / combinación: ((20.0, 100.0), (10.0, 100.0), (1.0, 6.0), (2.0, 6.0)) / iteración: 36 / proporción: 0.0026
Proporción total: 0.9922
Proporción total en el nuevo DataFrame: 0.7011


                                                                                

In [43]:
# Se crea la columna trip_duration
df_accumulated = df_accumulated.withColumn(
    "trip_duration",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60,
)

# Se extraen las horas de las columnas tpep_pickup_datetime y tpep_dropoff_datetime
df_accumulated = df_accumulated.withColumn("tpep_pickup_datetime", hour("tpep_pickup_datetime"))
df_accumulated = df_accumulated.withColumn("tpep_dropoff_datetime", hour("tpep_dropoff_datetime"))

# Mostramos las primeras 5 filas del DataFrame acumulado
df_accumulated.show(5)

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RateCodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|    trip_duration|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----------------+
|       2|                  19|                   19|              1|         0.89

### 3. Construcción de la muestra considerando el volumen de datos de la población

* Se generan columnas de binning para cada columna de caracterización previamente seleccionada.
* Se crea un DataFrame con los stratum_counts.
* Se agrega al DataFrame de stratum_counts una columna con la proporción de cada estrato.
* Se define el tamaño de la muestra como el 10% del total de filas.
* Se calcula el tamaño de la muestra de cada estrato.
* Se hace el muestreo estratificado con sampleBy.

In [44]:
# Importa las librerías necesarias para el muestreo estratificado
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import concat_ws, round

# Binning para total_amount
df_binned = df_accumulated.withColumn("total_amount_bin",
    when((df_accumulated.total_amount >= 0) & (df_accumulated.total_amount < 10), "0-10")
    .when((df_accumulated.total_amount >= 10) & (df_accumulated.total_amount < 20), "10-20")
    .when((df_accumulated.total_amount >= 20) & (df_accumulated.total_amount < 100), "20-100")
)

# Binning para trip_distance
df_binned = df_binned.withColumn("trip_distance_bin",
    when((df_accumulated.trip_distance >= 0) & (df_accumulated.trip_distance < 3), "0-3")
    .when((df_accumulated.trip_distance >= 3) & (df_accumulated.trip_distance < 10), "3-10")
    .when((df_accumulated.trip_distance >= 10) & (df_accumulated.trip_distance < 100), "10-100")
)

# Binning para payment_type
df_binned = df_binned.withColumn("payment_type_bin",
    when((df_accumulated.payment_type >= 0) & (df_accumulated.payment_type < 1), "0")
    .when((df_accumulated.payment_type >= 1) & (df_accumulated.payment_type < 6), "1-5")
)

# Binning para passenger_count
df_binned = df_binned.withColumn("passenger_count_bin",
    when((df_accumulated.passenger_count >= 0) & (df_accumulated.passenger_count < 2), "0-1")
    .when((df_accumulated.passenger_count >= 2) & (df_accumulated.passenger_count < 6), "2-5")
)

# Concatenación de las columnas binned para crear la columna stratum
df_binned = df_binned.withColumn("stratum", concat_ws("_", 
    "total_amount_bin", "trip_distance_bin", 
    "payment_type_bin", "passenger_count_bin"
))

# Conteo total de filas en el DataFrame binned
total_count = df_binned.count()

# Creación del DataFrame stratum_counts
stratum_counts = df_binned.groupBy("stratum").count()

# Se agrega al DataFrame stratum_counts una columna con la proporción de cada estrato
stratum_props = stratum_counts.withColumn("proportion", stratum_counts["count"] / total_count)


# Se define el tamaño de la muestra como el 10% del total de filas
sample_size = int(total_count * 0.10)

# Cálculo del tamaño de la muestra para cada estrato
stratum_sizes = stratum_props.withColumn(
    "sample_size", round(stratum_props["proportion"] * sample_size).cast("int")
)

# Observaciones por estrato en el DataFrame original P
stratum_count_dict = dict(stratum_sizes.select("stratum", "count").rdd.map(lambda r: (r[0], r[1])).collect())

# Observaciones por estrato en el DataFrame muestra M
stratum_sample_dict = dict(stratum_sizes.select("stratum", "sample_size").rdd.map(lambda r: (r[0], r[1])).collect())

# Cálculo de las proporciones de cada estrato
fractions = {k: stratum_sample_dict[k] / stratum_count_dict[k] for k in stratum_sample_dict}

# Se hace el muestreo estratificado
sampled_df = df_binned.sampleBy("stratum", fractions=fractions, seed=42)

# Total de M
m_total = sampled_df.count()

# Total sumado por grupos Mi
mi_total = sampled_df.groupBy("stratum").count().agg(spark_sum("count")).collect()[0][0]

print(f"Total de M: {m_total}")
print(f"Suma de los Mi: {mi_total}")




Total de M: 359016
Suma de los Mi: 359016


                                                                                

### 4. Estrategia(s) de muestreo a emplear

Para la construcción de la muestra M se empleó una estrategia de **muestreo estratificado** en donde se utilizaron las siguientes variables de caracterización (que fueron obtenidas en el punto número 1):

1. total_amount (binned en rangos de 0-10, 10-20 y 20-100)
2. trip_distance (binned en rangos de 0-3, 3-10 y 10-100)
3. payment_type (binned en rangos de 0 y 1-5)
4. passenger_count (binned en rangos de 0-1 y 2-5)

Cada combinación única de estas variables determina un **estrato (stratum)** y posteriormente se calcularon las proporciones relativas de cada estrato en la población total, generando una muestra que representa el 10% de la población manteniendo las proporciones determinadas por los estratos.

Esta técnica permite preservar la distribución original de los datos y evita una inyección de sesgos, de esta manera se asegura que los patrones relevantes se mantienen tanto en la muestra como en los conjuntos de entrenamiento y prueba.

### 5. Preparación de conjuntos de entrenamiento y prueba 

* Evaluación: Calcula y aplica correctamente el porcentaje de división para Tri y Tsi, asegurando que no existan sesgos, documentando detalladamente.

Aquí se van a generar los DataFrames de entrenamiento y prueba, recorriendo los estratos para extraer la proporción de cada uno de ellos.

- Se define manualmente la proporción deseada.
- Se inicia una lista para almacenar los DataFrames.
- Se obtienen los estratos únicos.
- Se itera sobre cada estrato único.
- Se filtra cada estrato Mi.
- Se obtiene el conjunto de entrenamiento y prueba del estrato Mi.
- Se agrega el conjunto a su lista respectiva.
- Se unen los DataFrames enlistados en un solo DataFrame de entrenamiento y prueba.

In [45]:
# Se define manualmente la proporcion de train y test
train_prop = 0.8
test_prop = 0.2

# Iniciamos una lista para almacenar los DataFrames de entrenamiento y prueba
train_dfs = []
test_dfs = []

# Obtenemos los estratos únicos
unique_strata = sampled_df.select("stratum").distinct().rdd.map(lambda r: r[0]).collect()

# Iteramos sobre cada estrato
for strata in unique_strata:
    # Filtramos el estrato Mi
    mi = sampled_df.filter(col("stratum") == strata)

    # Obtenemos el test y el train de cada estrato
    mi_train, mi_test = mi.randomSplit([train_prop, test_prop], seed=42)

    # Agregramos las partes a las listas correspondientes
    train_dfs.append(mi_train)
    test_dfs.append(mi_test)

# Unimos todos los DataFrames de entrenamiento y prueba
train_df = train_dfs[0]
for df in train_dfs[1:]:
    train_df = train_df.union(df)

test_df = test_dfs[0]
for df in test_dfs[1:]:
    test_df = test_df.union(df)

                                                                                

### 6. Selección de métricas para medir calidad de resultados

* Evaluación: Selecciona y argumenta detalladamente las métricas más adecuadas, considerando grandes volúmenes de datos y documentando exhaustivamente.

A continuación, se seleccionará y defenderá la métrica para evaluar el modelo.

- El objetivo es predecir una variable continua (total_amount), esto convierte al problema en un enfoque de regresión. La métrica seleccionada deberá ser robusta frente a outliers, ser eficiente y ofrecer una gran interpretabilidad.

- Se elige la métrica: RMSE (Root Mean Squared Error).

- Ventajas:

    - Penalización de errores grandes. Esta métrica penaliza con mayor rudeza los grandes errores, algo deseado en modelos financieros.
    - Ampliamente utilizado. La utilización y comprensión de esta métrica esta más que probada que es útil en problemas de regresión.
    - Unidad de medida. La unidad de error es la misma que la variable, por lo que su interpretabilidad es bastante elevada.

### 7. Selección de algoritmos de aprendizaje

En esta sección se realiza el entrenamiento del modelo de aprendizaje y se recorre una cuadrícula de parámetros para encontrar el mejor ajuste.

- Se definen las variables independientes.
- Se definen la variable dependiente.
- Se ensamblan las características en un vector con VectorAssembler.
- Se selecciona el modelo RandomForestRegressor por los siguientes motivos:
    - El objetivo actual es predecir una variable numérica continua, esto se traduce en un problema de regresión supervisada, algo que RandomForest es perfectamente capáz de realizar.
    - RandomForest tiene una buena robustez ante valores atípicos y datos ruidosos y por la cantidad de datos que se manejan, puede ser normal que existan dichos valores.
    - Tiene una buena capacidad para modelar relaciones no lineales a diferencia de la regresión lineal.
    - No requiere normalización ni escalado, lo que ahorra tiempo de procesamiento en dataset grandes.
    - Tiene una amplia capacidad para trabajar con muchas variables (si se requiriera) y puede modelar relaciones complejas.
    - Tiene facilidad para ser ajustado con validación cruzada y grid search, ajustando el número de arboles y su profundiad.

In [46]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Se definen la variables independientes
feature_cols = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "payment_type",
    "trip_duration",
]

# Se define la variable dependiente (columna objetivo)
target_col = "total_amount"

# Se ensamblan las características en un vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Se define el modelo
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol=target_col,
    seed=42
)

### 8. Técnicas para el ajuste de hiper – parámetros

- Se genera un pipeline con el ensamblador y el modelo para automatizar el proceso.
- Se define la cuadrícula de parámetros para la búsqueda del mejor modelo.
- se define el evaluador con la métrica RMSE que hemos determinado es la mejor en el paso anterior.
- Se configura el CrossValidator.
- Se entrena el modelo con validación cruzada.
- Se obtiene el mejor modelo de la validación cruzada.


In [47]:
# Se genera un pipeline con el ensamblador y el modelo
pipeline = Pipeline(stages=[assembler, rf])

# Se define la cuadrícula de parámetros para la búsqueda
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Se define el evaluador para la métrica de regresión
evaluator = RegressionEvaluator(
    labelCol=target_col,
    predictionCol="prediction",
    metricName="rmse"
)

# Se configura el CrossValidator
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2,
    seed=42
)

# Se entrena el modelo con validación cruzada
cv_model = cv.fit(train_df)

# Se obtiene el mejor modelo de la validación cruzada
best_model = cv_model.bestModel

# Evaluamos el mejor modelo en el conjunto de prueba
predictions = best_model.transform(test_df)

25/06/21 12:39:53 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
25/06/21 12:39:53 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
25/06/21 12:44:00 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:44:02 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:44:02 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:44:04 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:44:04 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:44:06 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:44:08 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:44:10 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:44:13 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/06/21 12:44:15 WARN DAGScheduler: Broadcasting larg

### 9. Mostrar los resultados obtenidos (matrices de confusión, curvas ROC, etc.)


- Se evalua el mejor modelo con el conjunto de prueba.
- Se imprimen los resultados de RMSE.

In [48]:
# Evaluamos el modelo
rmse = evaluator.evaluate(predictions)

# Imprimimos el RMSE del mejor modelo en el conjunto de prueba
print(f"Mejor modelo RMSE en test: {rmse:.4f}")

25/06/21 13:30:58 WARN DAGScheduler: Broadcasting large task binary with size 2041.7 KiB

Mejor modelo RMSE en test: 1.6823


25/06/21 13:35:02 WARN DAGScheduler: Broadcasting large task binary with size 2042.8 KiB
                                                                                