<a href="https://colab.research.google.com/github/nelsonbeas33/Datos-Masivos/blob/main/tarea4/tarea4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Este es una prueba de concepto de lo aprendido en clase sobre el uso de spark para el proyecto

In [None]:
#docker

# Usar una imagen base de Bitnami Spark
FROM bitnami/spark:latest

# Cambiar a usuario root para instalar paquetes
USER root

# Actualizar el sistema e instalar Python y pip
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*

# Instalar PyTorch y sus dependencias
RUN pip3 install torch torchvision torchaudio --extra-index-url https://download.pytorch.org/whl/cpu

#falta instalar panda
pip install pandas

# Establecer el directorio de trabajo
WORKDIR /app

# Comando por defecto para iniciar el contenedor como maestro de Spark
CMD ["/opt/bitnami/spark/bin/spark-class", "org.apache.spark.deploy.master.Master"]

usaremos spark para realizar el preprocesamiento de los datos del proyecto, se pretende generar una agrupación por orden de compra

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import time
from torch.utils.data import Dataset, DataLoader
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
from pyspark.sql.types import IntegerType
import random
from pyspark.sql import functions as F
import os
import json
from torch.utils.data import random_split


# Iniciar sesión de Spark
spark = SparkSession.builder \
    .appName("Transformer Model with Spark") \
    .getOrCreate()

# Cargar archivos CSV
order_products_df = spark.read.option("header", "true").csv("/opt/order_products__prior.csv")
products_df = spark.read.option("header", "true").csv("/opt/products.csv")

# Renombrar las columnas 'product_id' para evitar ambigüedad después del join
order_products_df = order_products_df.withColumnRenamed("product_id", "order_product_id").limit(100)
products_df = products_df.withColumnRenamed("product_id", "product_id")

# Asegurar que las columnas 'order_product_id' y 'product_id' sean enteros
order_products_df = order_products_df.withColumn("order_product_id", order_products_df["order_product_id"].cast(IntegerType()))
products_df = products_df.withColumn("product_id", products_df["product_id"].cast(IntegerType()))

# Mostrar las primeras filas de ambos DataFrames para ver su estructura
order_products_df.show(5)
products_df.show(5)

# Realizar un join entre los productos y las órdenes
joined_df = order_products_df.join(products_df, order_products_df.order_product_id == products_df.product_id)
# Mostrar el resultado del join para verificar
joined_df.show(5)

# Agrupar por 'order_id' y crear las listas de productos, pasillos y departamentos
order_grouped_df = joined_df.groupBy("order_id").agg(
    F.collect_list("order_product_id").alias("product_ids"),
    F.collect_list("aisle_id").alias("aisle_ids"),
    F.collect_list("department_id").alias("department_ids")
)

order_grouped_df = order_grouped_df \
    .withColumn("aisle_ids", F.expr("transform(aisle_ids, x -> cast(x as int))")) \
    .withColumn("department_ids", F.expr("transform(department_ids, x -> cast(x as int))"))


order_grouped_df.show(30)

# Calcular el máximo valor de los IDs en cada columna y convertir el resultado a entero
max_product_id = order_grouped_df.selectExpr("explode(product_ids) as product_id") \
    .agg(F.max("product_id").alias("max_product_id")) \
    .collect()[0]["max_product_id"]

# Calcular el máximo de cada columna después de la conversión
max_aisle_id = order_grouped_df.selectExpr("explode(aisle_ids) as aisle_id") \
    .agg(F.max("aisle_id").alias("max_aisle_id")) \
    .collect()[0]["max_aisle_id"]

max_department_id = order_grouped_df.selectExpr("explode(department_ids) as department_id") \
    .agg(F.max("department_id").alias("max_department_id")) \
    .collect()[0]["max_department_id"]


print(f"Máximo índice de max_product_id: {max_product_id}")
print(f"Máximo índice de aisle_ids: {max_aisle_id}")
print(f"Máximo índice de max_department_id: {max_department_id}")

order_grouped_df.printSchema()



# Asegurarse de que los valores sean enteros y calcular el tamaño del vocabulario
product_vocab_size = int(max_product_id) + 1  # +1 para padding
aisle_vocab_size = int(max_aisle_id) + 1  # +1 para padding
department_vocab_size = int(max_department_id) + 1  # +1 para padding

#parametros del modelo
embed_dim = 64

print(f"Vocabulario de Productos: {product_vocab_size}")
print(f"Vocabulario de Pasillos: {aisle_vocab_size}")
print(f"Vocabulario de Departamentos: {department_vocab_size}")



