**MAESTRÍA EN INTELIGENCIA ARTIFICIAL APLICADA**

**Curso: TC4034.10 - Análisis de grandes volúmenes de datos**


**Tecnológico de Monterrey**

**Actividad 4 | Métricas de calidad de resultados**

**Alumno**

**Isaid Posadas Oropeza A01795015**


# 1. Construcción de la muestra M

En esta sección se construye la muestra representativa **M** a partir del dataset original recolectado (E-Commerce Behavior Data), aplicando estrategias que garanticen una representación balanceada y sin sesgos de la población **P**.

## 1.1 Objetivo

Generar una muestra **M** que refleje de manera equitativa los principales comportamientos de los usuarios (`view`, `cart`, `purchase`) y que esté compuesta por subconjuntos **Mi**, definidos a partir de variables de caracterización seleccionadas previamente.

## 1.2 Variables de caracterización seleccionadas

De acuerdo con el análisis previo en la Actividad 3, las siguientes variables permiten segmentar el comportamiento de los usuarios:

- **event_day**: Día de la semana en que ocurrió el evento.
- **event_type**: Tipo de evento registrado (view, cart, purchase).
- **category_code**: Categoría del producto.
- **event_hour**: Hora del evento.
- **brand**: Marca del producto.

Para efectos de esta actividad, se utilizará como variable principal de particionamiento el **event_day**, generando subconjuntos **Mi**, donde cada **Mi** contiene exclusivamente eventos ocurridos en un mismo día de la semana.

## 1.3 Construcción de la muestra balanceada M

1. Se realiza un **muestreo estratificado balanceado** por `event_type`, asegurando que `view`, `cart` y `purchase` estén equitativamente representados.
2. Se eliminan registros con valores nulos.
3. Se eliminan valores atípicos en la variable `price` utilizando los percentiles 1% y 99%.
4. Se convierten variables de tipo fecha (`event_time`) en variables derivadas (`event_date`, `event_hour`, `event_day`).
5. Se particiona la muestra M en subconjuntos **Mi**, uno por cada valor único de `event_day` (Lunes, Martes, etc.).

## 1.4 Importancia de las particiones Mi

Este particionamiento permite analizar el comportamiento de los usuarios en función del día de la semana, lo que facilita identificar patrones específicos (por ejemplo, mayor número de compras los fines de semana).

Además, garantiza que los modelos construidos posteriormente no se vean sesgados por concentraciones excesivas de datos en ciertos días, y puedan generalizar mejor a lo largo de toda la semana.

Cada partición **Mi** es mutuamente excluyente y la unión de todas ellas forma el conjunto completo **M**, cumpliendo con lo solicitado en la instrucción.


## Librerías necesarias

In [1]:
# Instalar Java (necesario para Spark)
!apt-get install openjdk-11-jdk -y

# Descargar e instalar Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz
!mv spark-3.3.2-bin-hadoop3 /opt/spark

# Configurar las variables de entorno
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/opt/spark"


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jre
  x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java
  libatk-wrapper-java-jni libxt-dev libxtst6 libxxf86dga1 openjdk-11-jdk
  openjdk-11-jre x11-utils
0 upgraded, 10 newly installed, 0 to remove and 35 not upgraded.
Need to get 6,920 kB of archives.
After this operation, 16.9 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-core all 2.37-2build1 [1,041 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-extra all 2.37-2build1 [2,041 kB]
Get:3 http://archive.ubuntu.com/ubuntu jam

In [2]:
!pip install pyspark
!pip install findspark


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("Test").getOrCreate()
spark.range(5).show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



### Carga y filtrado de eventos relevantes

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 32g pyspark-shell"

from google.colab import drive
drive.mount('/content/drive')

# Crear la sesión de Spark

spark = SparkSession.builder \
    .appName("Actividad 4 - Spark estable en Colab Pro+") \
    .config("spark.driver.memory", "12g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Ruta del archivo
dataset_path = "/content/drive/MyDrive/Big Data/2019-Oct.csv"


# Cargar el dataset
df = spark.read.csv(dataset_path, header=True, inferSchema=True)

# Filtrar solo los eventos 'view', 'cart' y 'purchase'
partitions = df.filter(
    (col("event_type") == "view") |
    (col("event_type") == "cart") |
    (col("event_type") == "purchase")
)


Mounted at /content/drive


## Muestreo estratificado balanceado

In [4]:
# Contar eventos por tipo
event_type_counts = partitions.groupBy("event_type").count()
event_type_counts.show()

# Obtener el tamaño mínimo entre las clases para balancear
min_sample_size = min(row["count"] for row in event_type_counts.collect())

# Construir muestra balanceada
balanced_df = (
    partitions.filter(col("event_type") == "view").limit(min_sample_size)
    .union(partitions.filter(col("event_type") == "cart").limit(min_sample_size))
    .union(partitions.filter(col("event_type") == "purchase").limit(min_sample_size))
)

# Verificar conteo
print("Total de eventos balanceados:", balanced_df.count())
balanced_df.groupBy("event_type").count().show()


+----------+--------+
|event_type|   count|
+----------+--------+
|  purchase|  742849|
|      cart|  926516|
|      view|40779399|
+----------+--------+

Total de eventos balanceados: 2228547
+----------+------+
|event_type| count|
+----------+------+
|      view|742849|
|      cart|742849|
|  purchase|742849|
+----------+------+



## Limpieza de datos
Para garantizar que la muestra M no contenga ruido ni valores faltantes que afecten los modelos, se aplicó una limpieza de datos mediante la eliminación de registros con nulos y outliers extremos en la variable price. Además, se extrajeron atributos temporales a partir de event_time para utilizarse como criterios de particionamiento.

## Eliminación de valores nulos

In [5]:
# Eliminar registros con valores nulos
balanced_df = balanced_df.na.drop()

# Verificar tamaño después de limpieza
print("Eventos después de eliminar nulos:", balanced_df.count())


Eventos después de eliminar nulos: 1660655


## Elimicación de outliers en la variable Price

In [6]:
from pyspark.sql.functions import col

# Obtener percentiles 1% y 99% para 'price'
quantiles = balanced_df.approxQuantile("price", [0.01, 0.99], 0.05)
min_price, max_price = quantiles

# Filtrar registros dentro del rango aceptable
balanced_df = balanced_df.filter((col("price") >= min_price) & (col("price") <= max_price))

# Verificar conteo
print("Eventos después de eliminar outliers en price:", balanced_df.count())


Eventos después de eliminar outliers en price: 1660655


## Converisón de variables de tiempo

In [7]:
from pyspark.sql.functions import to_timestamp, date_format

# Convertir 'event_time' a timestamp
balanced_df = balanced_df.withColumn("event_time_ts", to_timestamp("event_time", "yyyy-MM-dd HH:mm:ss"))

# Extraer componentes temporales
balanced_df = balanced_df.withColumn("event_date", date_format("event_time_ts", "yyyy-MM-dd"))
balanced_df = balanced_df.withColumn("event_hour", date_format("event_time_ts", "HH").cast("int"))
balanced_df = balanced_df.withColumn("event_day", date_format("event_time_ts", "E"))

# Mostrar ejemplo
balanced_df.select("event_time", "event_time_ts", "event_date", "event_hour", "event_day").show(5, truncate=False)


+-------------------+-------------------+----------+----------+---------+
|event_time         |event_time_ts      |event_date|event_hour|event_day|
+-------------------+-------------------+----------+----------+---------+
|2019-10-01 00:00:00|2019-10-01 00:00:00|2019-10-01|0         |Tue      |
|2019-10-01 00:00:01|2019-10-01 00:00:01|2019-10-01|0         |Tue      |
|2019-10-01 00:00:04|2019-10-01 00:00:04|2019-10-01|0         |Tue      |
|2019-10-01 00:00:05|2019-10-01 00:00:05|2019-10-01|0         |Tue      |
|2019-10-01 00:00:10|2019-10-01 00:00:10|2019-10-01|0         |Tue      |
+-------------------+-------------------+----------+----------+---------+
only showing top 5 rows



## Generación de particiones Mi por el día en que ha ocurrido el evento: event_day

Para evitar sesgos derivados de una distribución desigual de eventos según el día de la semana (event_day), se generaron particiones Mi balanceadas con un mismo número de registros por día. Posteriormente, se unieron todas las Mi para formar la muestra representativa M. Esto garantiza equidad en la representación de las distintas condiciones temporales en el modelo.



In [8]:
from pyspark.sql.functions import col
from functools import reduce
from pyspark.sql import DataFrame

# Verificar los días únicos en el dataset balanceado
dias_disponibles = [row["event_day"] for row in balanced_df.select("event_day").distinct().collect()]
print("Días encontrados:", dias_disponibles)

# Crear un diccionario para almacenar las particiones Mi
particiones_mi = {}

# Obtener el número mínimo de registros por día para balancear
conteo_por_dia = balanced_df.groupBy("event_day").count()
minimo_por_dia = conteo_por_dia.agg({"count": "min"}).collect()[0][0]
print(f"Número mínimo de instancias por día (para balancear): {minimo_por_dia}")

# Crear una partición Mi para cada día con igual número de registros
for dia in dias_disponibles:
    particiones_mi[dia] = balanced_df.filter(col("event_day") == dia).limit(minimo_por_dia)

# Unir todas las particiones para formar M final
muestra_m = reduce(DataFrame.unionAll, particiones_mi.values())

# Verificar que M es la unión de todos los Mi
print("Total de registros en la muestra M balanceada:", muestra_m.count())
muestra_m.groupBy("event_day").count().show()


Días encontrados: ['Tue', 'Wed', 'Sun', 'Sat', 'Thu', 'Mon', 'Fri']
Número mínimo de instancias por día (para balancear): 154482
Total de registros en la muestra M balanceada: 1081374
+---------+------+
|event_day| count|
+---------+------+
|      Tue|154482|
|      Wed|154482|
|      Sun|154482|
|      Sat|154482|
|      Thu|154482|
|      Mon|154482|
|      Fri|154482|
+---------+------+



# 2. Construcción Train – Test

## División y preprocesamiento por partición Mi (`event_day`)

Este bloque de código realiza la división de cada partición `Mi` (agrupadas por `event_day`) en conjuntos de entrenamiento (`Tri`) y prueba (`Tsi`), y luego aplica transformaciones por separado para cada uno.

### Objetivo

- Mantener una representación balanceada de los datos por día.
- Aplicar estandarización y codificación **sin filtrar información del conjunto de prueba**, es decir, evitando *data leakage*.

### Preprocesamiento aplicado

1. **Estandarización (Z-score)** del atributo numérico `price` usando `StandardScaler`:
   - Se utiliza `VectorAssembler` para ensamblar `price` en un vector.
   - `StandardScaler` se ajusta (`fit`) **solo con `Tri`** para calcular media y desviación estándar.
   - Luego se aplica (`transform`) tanto a `Tri` como a `Tsi`.

2. **Codificación de variables categóricas** usando `StringIndexer`:
   - También se ajusta (`fit`) **solo con `Tri`**.
   - Se aplica (`transform`) en ambos conjuntos (`Tri` y `Tsi`) para mantener coherencia.

In [None]:
from pyspark.sql import DataFrame
from functools import reduce
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer

# Inicializar listas para almacenar Tri y Tsi
train_scaled_list = []
test_scaled_list = []
train_unscaled_list = []
test_unscaled_list = []

# Parámetros de división
train_ratio = 0.8
test_ratio = 0.2
seed = 42

# Columnas a codificar
columns_to_index = [
    "event_type", "event_day", "event_hour", "event_date",
    "category_code", "brand", "product_id", "user_id"
]

# División y procesamiento por partición Mi (día)
for dia, mi_df in particiones_mi.items():
    print(f"\n Procesando día: {dia}")

    # === División de la partición Mi ===
    tri_raw, tsi_raw = mi_df.randomSplit([train_ratio, test_ratio], seed=seed)

    # === ESCALADO ===
    assembler = VectorAssembler(inputCols=["price"], outputCol="price_vector")
    tri_vec = assembler.transform(tri_raw)
    tsi_vec = assembler.transform(tsi_raw)

    scaler = StandardScaler(inputCol="price_vector", outputCol="price_scaled", withMean=True, withStd=True)
    scaler_model = scaler.fit(tri_vec)

    tri_scaled = scaler_model.transform(tri_vec)
    tsi_scaled = scaler_model.transform(tsi_vec)

    # === CODIFICACIÓN ===
    for col_name in columns_to_index:
        indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed")
        indexer_model = indexer.fit(tri_scaled)
        tri_scaled = indexer_model.transform(tri_scaled)
        tsi_scaled = indexer_model.transform(tsi_scaled)

    # === Versión sin escalar (solo codificación) ===
    tri_unscaled = tri_raw
    tsi_unscaled = tsi_raw

    for col_name in columns_to_index:
        indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed")
        indexer_model = indexer.fit(tri_unscaled)
        tri_unscaled = indexer_model.transform(tri_unscaled)
        tsi_unscaled = indexer_model.transform(tsi_unscaled)

    # === Acumular en listas globales ===
    train_scaled_list.append(tri_scaled)
    test_scaled_list.append(tsi_scaled)
    train_unscaled_list.append(tri_unscaled)
    test_unscaled_list.append(tsi_unscaled)

# === Unión global ===
train_scaled = reduce(DataFrame.unionAll, train_scaled_list)
test_scaled = reduce(DataFrame.unionAll, test_scaled_list)
train_unscaled = reduce(DataFrame.unionAll, train_unscaled_list)
test_unscaled = reduce(DataFrame.unionAll, test_unscaled_list)

# === Validaciones ===
print("Ejemplos (Train escalado):")
train_scaled.select("event_type", "price", "price_scaled").limit(5).show(truncate=False)

print(" Ejemplos (Test no escalado):")
test_unscaled.select("event_type", "price").limit(5).show(truncate=False)

print(" Columnas (escalado):", train_scaled.columns)
print("Columnas (no escalado):", train_unscaled.columns)



 Procesando día: Tue

 Procesando día: Wed

 Procesando día: Sun

 Procesando día: Sat

 Procesando día: Thu

 Procesando día: Mon

 Procesando día: Fri


## Validación

In [10]:
# === Conteo total por conjunto ===
print("=== Conteo total de registros ===")
print(f"Train (escalado): {train_scaled.count()} registros")
print(f"Test  (escalado): {test_scaled.count()} registros")
print(f"Train (no escalado): {train_unscaled.count()} registros")
print(f"Test  (no escalado): {test_unscaled.count()} registros")

# === Conteo por clase: event_type ===
print("\n=== Distribución de clases por conjunto ===")

print("\nTrain (escalado):")
train_scaled.groupBy("event_type").count().orderBy("event_type").show()

print("\nTest (escalado):")
test_scaled.groupBy("event_type").count().orderBy("event_type").show()

print("\nTrain (no escalado):")
train_unscaled.groupBy("event_type").count().orderBy("event_type").show()

print("\nTest (no escalado):")
test_unscaled.groupBy("event_type").count().orderBy("event_type").show()


=== Conteo total de registros ===
Train (escalado): 864031 registros
Test  (escalado): 217343 registros
Train (no escalado): 864031 registros
Test  (no escalado): 217343 registros

=== Distribución de clases por conjunto ===

Train (escalado):
+----------+------+
|event_type| count|
+----------+------+
|      cart|432998|
|  purchase|307600|
|      view|123433|
+----------+------+


Test (escalado):
+----------+------+
|event_type| count|
+----------+------+
|      cart|108740|
|  purchase| 77554|
|      view| 31049|
+----------+------+


Train (no escalado):
+----------+------+
|event_type| count|
+----------+------+
|      cart|432998|
|  purchase|307600|
|      view|123433|
+----------+------+


Test (no escalado):
+----------+------+
|event_type| count|
+----------+------+
|      cart|108740|
|  purchase| 77554|
|      view| 31049|
+----------+------+



## 3. Selección de métricas para medir calidad de resultados

Para evaluar adecuadamente los modelos de aprendizaje generados, es fundamental seleccionar métricas que se adapten a la naturaleza del problema, considerando también el gran volumen de datos procesados.

En este caso se han implementado dos enfoques de aprendizaje:
- **Supervisado**, mediante un modelo de clasificación multiclase (Random Forest).
- **No supervisado**, mediante agrupamiento (clustering) con KMeans.

A continuación, se detallan las métricas seleccionadas para cada enfoque.

---

### 3.1 Métricas para el modelo supervisado (Random Forest)

El objetivo del modelo supervisado es predecir el tipo de evento de usuario (`view`, `cart`, `purchase`), lo cual es un problema de **clasificación multiclase**. Por tanto, se seleccionan las siguientes métricas:

#### 1. **Accuracy (Precisión global)**

Proporción de predicciones correctas sobre el total de observaciones. Es útil como una visión general del rendimiento del modelo.

> **Fórmula:**  
> `Accuracy = (TP + TN) / (TP + FP + FN + TN)`

#### 2. **Precision (Precisión ponderada)**

Evalúa cuántas de las predicciones positivas fueron correctas. Se utiliza la versión ponderada que tiene en cuenta el soporte (frecuencia relativa) de cada clase.

> **Fórmula:**  
> `Precision = TP / (TP + FP)`

#### 3. **Recall (Sensibilidad ponderada)**

Mide cuántos verdaderos positivos fueron correctamente identificados. Se pondera para reflejar adecuadamente el impacto de cada clase.

> **Fórmula:**  
> `Recall = TP / (TP + FN)`

#### 4. **F1-Score (Promedio armónico ponderado)**

Combina precisión y recall. Es especialmente útil cuando hay desbalance o se necesita equilibrio entre ambas métricas.

> **Fórmula:**  
> `F1 = 2 * (Precision * Recall) / (Precision + Recall)`

#### Justificación:
- Todas las métricas son escalables en PySpark.
- La versión ponderada permite mantener validez incluso en presencia de desbalance leve.
- Estas métricas facilitan la comparación con otros modelos y algoritmos supervisados.

---

### 3.2 Métricas para el modelo no supervisado (KMeans)

El modelo de agrupamiento no cuenta con etiquetas reales, por lo que se utilizan métricas que evalúan la **calidad de la segmentación** de los datos.

#### 1. **Silhouette Score**

Mide la calidad de la separación entre clusters considerando cohesión interna y separación externa. Un valor cercano a 1 indica que los puntos están bien agrupados y separados de otros clusters.

> **Valor esperado:** entre -1 y 1  
> Un valor > 0.5 es generalmente aceptable.

#### 2. **SSE (Suma de errores cuadráticos)**

También conocida como **inercia**, mide la compactación de los clusters. Se utiliza para aplicar el método del codo ("elbow method") y seleccionar el número óptimo de clusters `k`.

> **Fórmula:**  
> `SSE = sum((p_i - c_j)^2)` para cada punto `p_i` y su centroide `c_j`

#### Justificación:
- Ambas métricas permiten medir la calidad sin necesidad de etiquetas.
- El Silhouette Score proporciona una medida relativa de desempeño entre múltiples ejecuciones con diferente `k`.
- El SSE es útil para detectar el punto de inflexión donde agregar más clusters no mejora significativamente el modelo.
- Son eficientes y escalables en PySpark.

---

### Conclusión general

Se utilizarán métricas distintas según el tipo de modelo implementado:

| Tipo de Modelo   | Métricas Seleccionadas                          |
|------------------|-------------------------------------------------|
| Supervisado      | Accuracy, Precision (weighted), Recall (weighted), F1-Score (weighted) |
| No supervisado   | Silhouette Score, SSE (inercia)                |

Estas métricas permiten evaluar correctamente el desempeño de los modelos en el contexto de Big Data, maximizando la utilidad de los resultados para identificar patrones significativos en los datos de comportamiento de usuarios.

Todas las métricas fueron seleccionadas no solo por su relevancia conceptual, sino también por su capacidad de ser computadas eficientemente en entornos de Big Data mediante PySpark, asegurando su viabilidad técnica durante la experimentación con millones de registros.
---

## Referencias
1. **Apache Spark.** (2025). *DecisionTreeClassifier — PySpark 3.5.5 documentation*. Recuperado de [Apache Spark Documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.DecisionTreeClassifier.html)
2. **Apache Spark.** (2025). *KMeans — PySpark 3.5.5 documentation*. Recuperado de [Apache Spark Documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.clustering.KMeans.html)
3. **Apache Spark.** (2025). *Clustering - Spark 3.5.5 Documentation*. Recuperado de [Apache Spark Documentation](https://spark.apache.org/docs/latest/ml-clustering.html)
4. **Apache Spark.** (2025). *Classification and regression - Spark 3.5.5 Documentation*. Recuperado de [Apache Spark Documentation](https://spark.apache.org/docs/latest/ml-classification-regression.html)
5. **GeeksforGeeks.** (2025). *K-Means Clustering using PySpark Python*. Recuperado de [GeeksforGeeks](https://www.geeksforgeeks.org/k-means-clustering-using-pyspark-python/)


## 4. Entrenamiento de Modelos de Aprendizaje

En esta sección se implementan dos modelos de aprendizaje: uno supervisado y uno no supervisado, con el objetivo de identificar patrones de comportamiento en grandes volúmenes de datos derivados del entorno de e-commerce.

---

### 4.1 Modelo Supervisado: Random Forest Classifier

Se seleccionó el modelo **Random Forest** como clasificador supervisado por su robustez ante datos ruidosos, su capacidad para manejar grandes volúmenes de datos y su buen desempeño incluso con pocas transformaciones previas. Se probó con diferentes combinaciones de hiperparámetros para mejorar el rendimiento.

#### Características del modelo:
- **Tipo**: Supervisado
- **Algoritmo**: Random Forest Classifier
- **Librería**: PySpark MLlib

#### Variables objetivo y predictoras:
- **Target (`label`)**: `event_type_indexed` (codificado como view, cart, purchase)
- **Features**: `price_scaled`, `event_day_indexed`, `event_hour_indexed`

#### Hiperparámetros utilizados:
| Hiperparámetro           | Valor                 | Descripción                                       |
|--------------------------|-----------------------|---------------------------------------------------|
| `numTrees`               | 100, 200              | Número de árboles en el bosque                   |
| `maxDepth`               | 10, 15, 20            | Profundidad máxima de cada árbol                 |
| `minInstancesPerNode`    | 5 (opcional)          | Mínimo de instancias por nodo para regularización|
| `featureSubsetStrategy`  | `"sqrt"` (opcional)   | Subconjunto aleatorio de características por árbol|
| `maxBins`                | 2000 (en algunos casos)| Número de divisiones para variables continuas    |
| `seed`                   | 42                    | Reproducibilidad del experimento                 |

#### Procesamiento:
- Los datos fueron balanceados por clase y divididos en subconjuntos \( M_i \) por día.
- Se aplicó codificación consistente a variables categóricas.
- Se ensamblaron las variables con `VectorAssembler` para formar la columna `features`.
- Se dividió el conjunto en entrenamiento y prueba (80/20).
- Se evaluaron distintas configuraciones para identificar la mejor combinación de hiperparámetros.

#### Prevención de sobreajuste:
- Se limitaron la profundidad de los árboles (`maxDepth`) y el número mínimo de instancias por nodo (`minInstancesPerNode`).
- Se utilizó `featureSubsetStrategy` para aumentar la diversidad de árboles.
- Las métricas se compararon entre entrenamiento y prueba para validar la generalización.

#### Métricas utilizadas:
- **Accuracy**
- **Precision**
- **Recall**
- **F1-score**

Estas métricas permitieron evaluar el desempeño global, el balance entre clases y la capacidad de identificar correctamente eventos importantes como "purchase".

---

### 4.2 Modelo No Supervisado: Bisecting KMeans

Como modelo no supervisado se seleccionó **BisectingKMeans**, una variante jerárquica de KMeans que divide los datos recursivamente para formar clústeres. Se eligió por su escalabilidad, buena separación y eficiencia sobre grandes volúmenes de datos.

#### Características del modelo:
- **Tipo**: No supervisado
- **Algoritmo**: Bisecting KMeans Clustering
- **Librería**: PySpark MLlib

#### Variables utilizadas:
- `price_scaled`
- `event_day_indexed`
- `event_hour_indexed`

#### Hiperparámetros evaluados:
| Hiperparámetro           | Valor probado       | Descripción                                      |
|--------------------------|---------------------|--------------------------------------------------|
| `k`                      | 2, 3, 4, 5           | Número de clústeres                              |
| `maxIter`                | 50                   | Máximo de iteraciones por partición              |
| `seed`                   | 42                   | Semilla para reproducibilidad                    |

#### Estrategia de entrenamiento:
- Se generaron vectores de características con `VectorAssembler`.
- Se entrenaron modelos con diferentes valores de `k` (número de clústeres).
- Se aplicó evaluación usando **Silhouette Score** y **SSE** (Suma de Errores Cuadráticos) para comparar agrupaciones.
- Se comparó el desempeño entre el conjunto de entrenamiento y prueba para verificar la estabilidad.

#### Prevención de sobreajuste:
- Se evitó usar valores de `k` demasiado grandes que generaran clústeres artificiales.
- Se comparó la estabilidad del Silhouette Score entre entrenamiento y prueba.

#### Métricas utilizadas:
- **Silhouette Score**
- **SSE (Sum of Squared Errors)**

Estas métricas permitieron validar la calidad de los agrupamientos generados, observando la cohesión y separación entre clústeres en el espacio vectorial.

---

En ambos enfoques se garantizó la reproducibilidad, el uso de datos balanceados por clase, y se documentó cada paso del entrenamiento para asegurar que los modelos fueran confiables, interpretables y aplicables a un contexto real de análisis de comportamiento en plataformas de comercio electrónico.


## Modelo supervisado RandomForest

### Reindexación de variables categóricas

En esta celda se realiza una reindexación completa de las variables categóricas para asegurar la consistencia de los valores indexados en todos los conjuntos de datos (`train_scaled`, `test_scaled`, `train_unscaled`, `test_unscaled`).

#### ¿Por qué es necesario este paso?

Durante el preprocesamiento inicial, los índices de las variables categóricas se generaron por separado en cada conjunto o partición. Esto puede ocasionar inconsistencias, por ejemplo:

- La categoría `purchase` podría recibir el índice `0` en `train_scaled`, pero `1` en `test_scaled`.
- Esto causa errores durante el entrenamiento de modelos que asumen consistencia en los valores de entrada, como los clasificadores basados en árboles.

#### ¿Qué se hace en esta celda?

1. Se eliminan las columnas `*_indexed` existentes, si las hubiera, para evitar conflictos.
2. Se entrena un nuevo `StringIndexer` sobre `train_scaled` para cada variable categórica.
3. Los modelos `StringIndexer` se aplican a todos los conjuntos (`train_scaled`, `test_scaled`, `train_unscaled`, `test_unscaled`) para mantener los mismos índices en todos.
4. Se establece el parámetro `handleInvalid="keep"` para evitar errores en caso de que aparezcan categorías no vistas durante el entrenamiento del indexador.

Este proceso asegura que los modelos de machine learning reciban entradas coherentes, lo cual es fundamental para obtener resultados confiables y evitar errores de ejecución.


In [18]:
from pyspark.ml.feature import StringIndexer

# Columnas categóricas a codificar
columns_to_index = [
    "event_type", "event_day", "event_hour", "event_date",
    "category_code", "brand", "product_id", "user_id"
]

# Eliminar columnas indexadas existentes si ya estaban
for col in columns_to_index:
    indexed_col = f"{col}_indexed"
    for df_name in ["train_scaled", "test_scaled", "train_unscaled", "test_unscaled"]:
        df = globals()[df_name]
        if indexed_col in df.columns:
            df = df.drop(indexed_col)
        globals()[df_name] = df  # Actualiza variable global

# Aplicar StringIndexer desde train_scaled y transformar en todos
for col in columns_to_index:
    indexed_col = f"{col}_indexed"
    indexer = StringIndexer(inputCol=col, outputCol=indexed_col, handleInvalid="keep")
    model = indexer.fit(train_scaled)

    train_scaled = model.transform(train_scaled)
    test_scaled = model.transform(test_scaled)
    train_unscaled = model.transform(train_unscaled)
    test_unscaled = model.transform(test_unscaled)


In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# === Ensamblador de características ===
assembler = VectorAssembler(
    inputCols=["price_scaled", "event_day_indexed", "event_hour_indexed"],
    outputCol="features"
)

# === Definición del modelo Random Forest con hiperparámetros ===
rf = RandomForestClassifier(
    labelCol="event_type_indexed",   # Columna objetivo
    featuresCol="features",          # Columna de características
    numTrees=100,                    # Hiperparámetro: número de árboles
    maxDepth=15,                     # Hiperparámetro: profundidad máxima
    seed=42                          # Hiperparámetro: reproducibilidad
)

# === Pipeline de ensamblado y modelo ===
pipeline = Pipeline(stages=[assembler, rf])

# === Entrenamiento del modelo ===
model = pipeline.fit(train_scaled)

# === Evaluación ===
evaluator = MulticlassClassificationEvaluator(
    labelCol="event_type_indexed",
    predictionCol="prediction"
)

# === Predicciones y métricas en conjunto de entrenamiento ===
train_preds = model.transform(train_scaled)
train_metrics = {
    "Accuracy": evaluator.setMetricName("accuracy").evaluate(train_preds),
    "Precision": evaluator.setMetricName("weightedPrecision").evaluate(train_preds),
    "Recall": evaluator.setMetricName("weightedRecall").evaluate(train_preds),
    "F1-score": evaluator.setMetricName("f1").evaluate(train_preds)
}

# === Predicciones y métricas en conjunto de prueba ===
test_preds = model.transform(test_scaled)
test_metrics = {
    "Accuracy": evaluator.setMetricName("accuracy").evaluate(test_preds),
    "Precision": evaluator.setMetricName("weightedPrecision").evaluate(test_preds),
    "Recall": evaluator.setMetricName("weightedRecall").evaluate(test_preds),
    "F1-score": evaluator.setMetricName("f1").evaluate(test_preds)
}

# === Mostrar comparación ===
print("\n--- Comparación de métricas ---")
for metric in train_metrics:
    print(f"{metric}:")
    print(f"  Train: {train_metrics[metric]:.4f}")
    print(f"  Test : {test_metrics[metric]:.4f}")



--- Comparación de métricas ---
Accuracy:
  Train: 0.6537
  Test : 0.6500
Precision:
  Train: 0.6408
  Test : 0.6338
Recall:
  Train: 0.6537
  Test : 0.6500
F1-score:
  Train: 0.5884
  Test : 0.5842


In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# === Ensamblador de características ===
assembler = VectorAssembler(
    inputCols=["price_scaled", "event_day_indexed", "event_hour_indexed"],
    outputCol="features"
)

# === Definición del modelo Random Forest con hiperparámetros ajustados ===
rf = RandomForestClassifier(
    labelCol="event_type_indexed",
    featuresCol="features",
    numTrees=200,                   # Aumentamos número de árboles
    maxDepth=20,                    # Profundidad moderadamente alta
    minInstancesPerNode=5,          # Evita árboles muy específicos
    featureSubsetStrategy="sqrt",   # Reduce correlación entre árboles
    seed=42
)

# === Pipeline de ensamblado y modelo ===
pipeline = Pipeline(stages=[assembler, rf])

# === Entrenamiento del modelo ===
model = pipeline.fit(train_scaled)

# === Evaluador ===
evaluator = MulticlassClassificationEvaluator(
    labelCol="event_type_indexed",
    predictionCol="prediction"
)

# === Evaluación en conjunto de entrenamiento ===
train_preds = model.transform(train_scaled)
train_metrics = {
    "Accuracy": evaluator.setMetricName("accuracy").evaluate(train_preds),
    "Precision": evaluator.setMetricName("weightedPrecision").evaluate(train_preds),
    "Recall": evaluator.setMetricName("weightedRecall").evaluate(train_preds),
    "F1-score": evaluator.setMetricName("f1").evaluate(train_preds)
}

# === Evaluación en conjunto de prueba ===
test_preds = model.transform(test_scaled)
test_metrics = {
    "Accuracy": evaluator.setMetricName("accuracy").evaluate(test_preds),
    "Precision": evaluator.setMetricName("weightedPrecision").evaluate(test_preds),
    "Recall": evaluator.setMetricName("weightedRecall").evaluate(test_preds),
    "F1-score": evaluator.setMetricName("f1").evaluate(test_preds)
}

# === Mostrar comparación ===
print("\n--- Comparación de métricas ---")
for metric in train_metrics:
    print(f"{metric}:")
    print(f"  Train: {train_metrics[metric]:.4f}")
    print(f"  Test : {test_metrics[metric]:.4f}")



--- Comparación de métricas ---
Accuracy:
  Train: 0.6547
  Test : 0.6501
Precision:
  Train: 0.6415
  Test : 0.6332
Recall:
  Train: 0.6547
  Test : 0.6501
F1-score:
  Train: 0.5956
  Test : 0.5903


In [25]:

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# === Ensamblador de características ===
assembler = VectorAssembler(
    inputCols=["price_scaled", "event_day_indexed", "event_hour_indexed"],
    outputCol="features"
)

# === Definición del modelo Random Forest con hiperparámetros ajustados ===
rf = RandomForestClassifier(
    labelCol="event_type_indexed",
    featuresCol="features",
    numTrees=100,
    maxDepth=10,
    maxBins=2000,
    impurity="gini",
    seed=42
)

# === Pipeline de ensamblado y modelo ===
pipeline = Pipeline(stages=[assembler, rf])

# === Entrenamiento del modelo ===
model = pipeline.fit(train_scaled)

# === Evaluador ===
evaluator = MulticlassClassificationEvaluator(
    labelCol="event_type_indexed",
    predictionCol="prediction"
)

# === Evaluación en conjunto de entrenamiento ===
train_preds = model.transform(train_scaled)
train_metrics = {
    "Accuracy": evaluator.setMetricName("accuracy").evaluate(train_preds),
    "Precision": evaluator.setMetricName("weightedPrecision").evaluate(train_preds),
    "Recall": evaluator.setMetricName("weightedRecall").evaluate(train_preds),
    "F1-score": evaluator.setMetricName("f1").evaluate(train_preds)
}

# === Evaluación en conjunto de prueba ===
test_preds = model.transform(test_scaled)
test_metrics = {
    "Accuracy": evaluator.setMetricName("accuracy").evaluate(test_preds),
    "Precision": evaluator.setMetricName("weightedPrecision").evaluate(test_preds),
    "Recall": evaluator.setMetricName("weightedRecall").evaluate(test_preds),
    "F1-score": evaluator.setMetricName("f1").evaluate(test_preds)
}

# === Mostrar comparación ===
print("\n--- Comparación de métricas ---")
for metric in train_metrics:
    print(f"{metric}:")
    print(f"  Train: {train_metrics[metric]:.4f}")
    print(f"  Test : {test_metrics[metric]:.4f}")



--- Comparación de métricas ---
Accuracy:
  Train: 0.6575
  Test : 0.6556
Precision:
  Train: 0.6579
  Test : 0.6547
Recall:
  Train: 0.6575
  Test : 0.6556
F1-score:
  Train: 0.5783
  Test : 0.5754


In [31]:

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# === Ensamblador de características ===
assembler = VectorAssembler(
    inputCols=["price_scaled", "event_day_indexed", "event_hour_indexed"],
    outputCol="features"
)

# === Definición del modelo Random Forest con hiperparámetros ajustados ===
rf = RandomForestClassifier(
    labelCol="event_type_indexed",
    featuresCol="features",
    numTrees=200,
    maxDepth=15,
    minInstancesPerNode=5,
    maxBins=2000,
    impurity="gini",
    featureSubsetStrategy="sqrt",
    seed=42
)

# === Pipeline de ensamblado y modelo ===
pipeline = Pipeline(stages=[assembler, rf])

# === Entrenamiento del modelo ===
model = pipeline.fit(train_scaled)

# === Evaluador ===
evaluator = MulticlassClassificationEvaluator(
    labelCol="event_type_indexed",
    predictionCol="prediction"
)

# === Evaluación en conjunto de entrenamiento ===
train_preds = model.transform(train_scaled)
train_metrics = {
    "Accuracy": evaluator.setMetricName("accuracy").evaluate(train_preds),
    "Precision": evaluator.setMetricName("weightedPrecision").evaluate(train_preds),
    "Recall": evaluator.setMetricName("weightedRecall").evaluate(train_preds),
    "F1-score": evaluator.setMetricName("f1").evaluate(train_preds)
}

# === Evaluación en conjunto de prueba ===
test_preds = model.transform(test_scaled)
test_metrics = {
    "Accuracy": evaluator.setMetricName("accuracy").evaluate(test_preds),
    "Precision": evaluator.setMetricName("weightedPrecision").evaluate(test_preds),
    "Recall": evaluator.setMetricName("weightedRecall").evaluate(test_preds),
    "F1-score": evaluator.setMetricName("f1").evaluate(test_preds)
}

# === Mostrar comparación ===
print("\n--- Comparación de métricas ---")
for metric in train_metrics:
    print(f"{metric}:")
    print(f"  Train: {train_metrics[metric]:.4f}")
    print(f"  Test : {test_metrics[metric]:.4f}")



--- Comparación de métricas ---
Accuracy:
  Train: 0.6802
  Test : 0.6689
Precision:
  Train: 0.6976
  Test : 0.6749
Recall:
  Train: 0.6802
  Test : 0.6689
F1-score:
  Train: 0.6215
  Test : 0.6076


## Modelo No Supervisado K-Means

In [30]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
import pandas as pd

# === VectorAssembler (si aún no se ha aplicado) ===
clustering_features = ["price_scaled", "event_day_indexed", "event_hour_indexed"]
assembler = VectorAssembler(inputCols=clustering_features, outputCol="features")

train_vector = assembler.transform(train_scaled).select("features")
test_vector = assembler.transform(test_scaled).select("features")

# === Evaluador de Silhouette Score ===
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette"
)

# === Probar múltiples valores de k ===
results = []

for k in range(2, 6):  # Puedes ampliar este rango
    print(f"\nEvaluando BisectingKMeans con k={k}")

    bkm = BisectingKMeans(
        k=k,
        seed=42,
        maxIter=50,
        featuresCol="features",
        predictionCol="prediction"
    )

    model = bkm.fit(train_vector)

    train_preds = model.transform(train_vector)
    test_preds = model.transform(test_vector)

    silhouette_train = evaluator.evaluate(train_preds)
    silhouette_test = evaluator.evaluate(test_preds)
    sse = model.computeCost(train_vector)

    results.append({
        "k": k,
        "Silhouette (Train)": round(silhouette_train, 4),
        "Silhouette (Test)": round(silhouette_test, 4),
        "SSE (Train)": round(sse, 2)
    })

# === Mostrar tabla ordenada por Silhouette Test Score ===
df_results = pd.DataFrame(results).sort_values(by="Silhouette (Test)", ascending=False)

from IPython.display import display
print("\n--- Comparación de modelos BisectingKMeans ---")
display(df_results)



Evaluando BisectingKMeans con k=2





Evaluando BisectingKMeans con k=3





Evaluando BisectingKMeans con k=4





Evaluando BisectingKMeans con k=5





--- Comparación de modelos BisectingKMeans ---


Unnamed: 0,k,Silhouette (Train),Silhouette (Test),SSE (Train)
0,2,0.6965,0.6972,11352677.25
2,4,0.4906,0.49,6210845.38
3,5,0.477,0.4773,5347390.95
1,3,0.4449,0.4445,9221124.2


# 5. Análisis de resultados



En esta sección se presenta un análisis detallado de los modelos de aprendizaje supervisado (Random Forest) y no supervisado (Bisecting KMeans), con base en las métricas obtenidas, los hiperparámetros utilizados y las decisiones de preprocesamiento, particularmente la partición de la muestra en subconjuntos \( M_i \).

---

### Modelos Supervisados – Random Forest

Se implementaron y compararon tres configuraciones del modelo `RandomForestClassifier`, variando los hiperparámetros `numTrees`, `maxDepth`, `minInstancesPerNode`, `maxBins` y `featureSubsetStrategy`. Los resultados fueron los siguientes:

| Modelo          | numTrees | maxDepth | minInstancesPerNode | maxBins | featureSubsetStrategy | Accuracy (Train) | Accuracy (Test) | Precision (Train) | Precision (Test) | Recall (Train) | Recall (Test) | F1-score (Train) | F1-score (Test) |
|-----------------|----------|----------|----------------------|---------|------------------------|------------------|-----------------|-------------------|------------------|----------------|---------------|------------------|-----------------|
| RF Base         | 100      | 15       | -                    | -       | -                      | 0.6537           | 0.6500          | 0.6408            | 0.6338           | 0.6537         | 0.6500        | 0.5884           | 0.5842          |
| RF Mid          | 100      | 10       | -                    | 2000    | -                      | 0.6575           | 0.6556          | 0.6579            | 0.6547           | 0.6575         | 0.6556        | 0.5783           | 0.5754          |
| RF Optimizado   | 200      | 15       | 5                    | 2000    | sqrt                   | 0.6802           | 0.6689          | 0.6976            | 0.6749           | 0.6802         | 0.6689        | 0.6215           | 0.6076          |

**Fortalezas**  
- El modelo Random Forest mostró buena generalización, con métricas muy similares entre entrenamiento y prueba.  
- La configuración optimizada logró mejorar significativamente el rendimiento general, en especial en F1-score.  
- El uso de `featureSubsetStrategy="sqrt"` y `minInstancesPerNode=5` redujo el sobreajuste y mejoró la estabilidad.

**Áreas de oportunidad**  
- Las métricas aún no superan el umbral del 70%, lo que indica que el modelo tiene espacio de mejora.  
- Se podrían explorar nuevas variables, técnicas de ingeniería de características o más combinaciones de hiperparámetros mediante validación cruzada.

---

### Modelos No Supervisados – Bisecting KMeans

Se evaluó el rendimiento del modelo `BisectingKMeans` con distintos valores de `k` (de 2 a 5), utilizando como criterios de evaluación el `Silhouette Score` y la `SSE (Suma de errores cuadráticos)`:

| k | Silhouette (Train) | Silhouette (Test) | SSE (Train)     |
|---|---------------------|--------------------|------------------|
| 2 | 0.6965              | 0.6972             | 11352677.25      |
| 4 | 0.4906              | 0.4900             | 6210845.38       |
| 5 | 0.4770              | 0.4773             | 5347390.95       |
| 3 | 0.4449              | 0.4445             | 9221124.20       |

**Fortalezas**  
- El valor óptimo de `k=2` obtuvo el mayor Silhouette Score (~0.6972), lo que indica que los datos están bien agrupados en dos clústeres principales.  
- Las diferencias entre las métricas de entrenamiento y prueba fueron mínimas, lo que indica buena generalización del modelo no supervisado.

**Áreas de oportunidad**  
- Aunque se detectó una buena segmentación interna, el modelo no permite interpretar directamente las etiquetas.  
- Se podrían analizar las características promedio de cada clúster para identificar perfiles de comportamiento más claramente.

---

### Reflexión sobre la división en subconjuntos \( M_i \)

Como parte fundamental del enfoque Big Data, la muestra representativa \( M \) fue dividida en subconjuntos \( M_i \) con base en la variable de caracterización `event_day`. Esta división permitió generar conjuntos de entrenamiento y prueba balanceados y representativos, evitando sesgos derivados de la distribución temporal de los eventos.

Aunque la división en \( M_i \) no tuvo un impacto directo en el incremento de métricas como el F1-score o la accuracy, fue esencial para asegurar:
- Que cada partición tuviera una representación equitativa de las clases (`view`, `cart`, `purchase`).
- Que los conjuntos derivados conservaran la estructura natural de los datos, favoreciendo la estabilidad del entrenamiento.
- Que los modelos no se vieran influenciados por posibles sesgos de días con pocos eventos o distribuciones atípicas.

Esta práctica también simula escenarios reales de análisis por bloques o regiones, donde los datos se deben procesar de forma distribuida o segmentada.

---

### Conclusión General

El modelo Random Forest permitió predecir de forma razonable los tipos de eventos (`view`, `cart`, `purchase`) y mejoró con ajustes de hiperparámetros. Por otro lado, Bisecting KMeans reveló una estructura clara en los datos al detectar dos agrupaciones naturales consistentes.

Ambos enfoques demostraron su utilidad: Random Forest para clasificación predictiva y Bisecting KMeans para análisis exploratorio. El proceso mostró la importancia de dividir correctamente la muestra, ajustar hiperparámetros cuidadosamente y evaluar tanto en conjuntos de entrenamiento como de prueba para validar la calidad de los resultados.
