In [1]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
from functools import reduce  # Manejo de listas
from pyspark.sql.functions import unix_timestamp  # Formatos de fecha
from pyspark.sql.functions import col #Manipulación de columnas
from pyspark.sql.functions import when, lit #Condiciones y remplazos en Data Frames
from pyspark.sql.functions import regexp_replace # Remplazar Expresiones regulares
from pyspark.sql.functions import upper # Upper
from pyspark.sql.functions import trim # Trim
from pyspark.sql.functions import unix_timestamp, from_unixtime

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 3, Finished, Available, Finished)

In [3]:
# File location and type
file_location = "Files/clothes_price_prediction_dat.csv"
file_type = "csv"

df_raw = spark.read.format("csv").options(header=True, delimiter=';', encoding='utf-8').load(file_location)



# Filtrar columnas que **NO** comienzan con '_c'
df_cleaned = df_raw.select([col(c) for c in df_raw.columns if not c.startswith("_c")])


# Obtener la primera fila como encabezado
first_row = df_cleaned.first()
columns = [str(cell).strip() for cell in first_row]

# Aplicar los nuevos nombres de columna
df_renamed = df_cleaned.rdd.zipWithIndex().filter(lambda row: row[1] > 0).keys().toDF(columns)


display(df_renamed)

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 529b294f-1461-4441-96d6-282ba1f7b1b8)

In [4]:
from pyspark.sql.functions import split, col

# Nombre original de la columna (con coma)
col_name = "Brand,Category,Color,Size,Material,Price"

# Separar los valores en una nueva columna
df_split = df_renamed.withColumn("split_col", split(col(f"`{col_name}`"), ","))

# Asignar los elementos a nuevas columnas
df_final = df_split.select(
    col("split_col")[0].alias("Brand"),
    col("split_col")[1].alias("Category"),
    col("split_col")[2].alias("Color"),
    col("split_col")[3].alias("Size"),
    col("split_col")[4].alias("Material"),
    col("split_col")[5].alias("Price")
)

df_final.show(truncate=False)

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 6, Finished, Available, Finished)

+------------+--------+------+----+---------+-----+
|Brand       |Category|Color |Size|Material |Price|
+------------+--------+------+----+---------+-----+
|New Balance |Dress   |White |XS  |Nylon    |182  |
|New Balance |Jeans   |Black |XS  |Silk     |57   |
|Under Armour|Dress   |Red   |M   |Wool     |127  |
|Nike        |Shoes   |Green |M   |Cotton   |77   |
|Adidas      |Sweater |White |M   |Nylon    |113  |
|Reebok      |Jacket  |Red   |XL  |Nylon    |19   |
|Puma        |Jacket  |Red   |XXL |Polyester|31   |
|Adidas      |Dress   |Red   |XS  |Denim    |46   |
|Reebok      |Dress   |Black |S   |Wool     |97   |
|Adidas      |Jeans   |Yellow|L   |Wool     |80   |
|Nike        |Jacket  |White |XL  |Silk     |98   |
|Puma        |Jacket  |White |XL  |Silk     |150  |
|Under Armour|Jacket  |White |S   |Nylon    |68   |
|Under Armour|T-shirt |Blue  |XL  |Polyester|49   |
|New Balance |Jacket  |Green |S   |Nylon    |97   |
|Under Armour|Jacket  |Green |XXL |Silk     |184  |
|New Balance

In [5]:
from pyspark.sql.types import IntegerType

# Convertir la columna Price a entero
df_final = df_final.withColumn("Price", col("Price").cast(IntegerType()))

df_final.printSchema()
df_final.show(truncate=False)

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 7, Finished, Available, Finished)

root
 |-- Brand: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Material: string (nullable = true)
 |-- Price: integer (nullable = true)

+------------+--------+------+----+---------+-----+
|Brand       |Category|Color |Size|Material |Price|
+------------+--------+------+----+---------+-----+
|New Balance |Dress   |White |XS  |Nylon    |182  |
|New Balance |Jeans   |Black |XS  |Silk     |57   |
|Under Armour|Dress   |Red   |M   |Wool     |127  |
|Nike        |Shoes   |Green |M   |Cotton   |77   |
|Adidas      |Sweater |White |M   |Nylon    |113  |
|Reebok      |Jacket  |Red   |XL  |Nylon    |19   |
|Puma        |Jacket  |Red   |XXL |Polyester|31   |
|Adidas      |Dress   |Red   |XS  |Denim    |46   |
|Reebok      |Dress   |Black |S   |Wool     |97   |
|Adidas      |Jeans   |Yellow|L   |Wool     |80   |
|Nike        |Jacket  |White |XL  |Silk     |98   |
|Puma        |Jacket  |White |XL  |Sil

In [6]:
df_final_sin_duplicados = df_final.dropDuplicates()

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 8, Finished, Available, Finished)

In [8]:
df_final_sin_duplicados.repartition(1).write.format('parquet').mode('overwrite').save('Files/clothes/dfclothes')

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 10, Finished, Available, Finished)

In [9]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# 1. Crear sesión Spark (ya viene por defecto en Databricks, se incluye por claridad)
#spark = SparkSession.builder.getOrCreate()

# 2. Leer archivo Parquet
df = spark.read.parquet("Files/clothes/dfclothes")

# 3. Mostrar datos
display(df)
df.printSchema()

# 4. Convertir columna Price a entero si es necesario
from pyspark.sql.functions import col
df = df.withColumn("Price", col("Price").cast("int"))

# 5. Eliminar nulos (opcional según tus datos)
df = df.na.drop(subset=["Brand", "Category", "Color", "Size", "Material", "Price"])

# 6. Definir columnas categóricas
categorical_cols = ["Brand", "Category", "Color", "Size", "Material"]

# 7. Indexar y codificar variables categóricas
indexers = [StringIndexer(inputCol=c, outputCol=c + "_Index", handleInvalid="keep") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c + "_Index", outputCol=c + "_Vec") for c in categorical_cols]

# 8. Ensamblar características
feature_cols = [c + "_Vec" for c in categorical_cols]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# 9. Modelo de regresión
lr = LinearRegression(featuresCol="features", labelCol="Price")

# 10. Crear pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])

# 11. Dividir datos
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# 12. Entrenar modelo
model = pipeline.fit(train_data)

# 13. Predecir sobre el conjunto de prueba
predictions = model.transform(test_data)

# 14. Evaluar modelo
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction")

rmse = evaluator.setMetricName("rmse").evaluate(predictions)
r2 = evaluator.setMetricName("r2").evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {rmse:.2f}")
print(f"R-squared (R2): {r2:.2f}")

# 15. Mostrar algunas predicciones
display(predictions.select("Brand", "Category", "Color", "Size", "Material", "Price", "prediction"))

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d2b0b648-f32f-4852-9001-9b70c821ee56)

root
 |-- Brand: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Material: string (nullable = true)
 |-- Price: integer (nullable = true)



Root Mean Squared Error (RMSE): 54.56
R-squared (R2): -0.01


SynapseWidget(Synapse.DataFrame, 7d4e74b3-5b98-40a1-87a2-c300371ed022)

In [10]:
# Guardar modelo en DBFS
model.write().overwrite().save("Files/clothes/modeldfclothes")

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 12, Finished, Available, Finished)

In [11]:
from pyspark.ml import PipelineModel

# Cargar el modelo desde DBFS
model_loaded = PipelineModel.load("Files/clothes/modeldfclothes")

# Realizar predicciones y guardar el resultado en un DataFrame
predictions_df = model_loaded.transform(test_data).select(
    "Brand", "Category", "Color", "Size", "Material", "Price", "prediction"
)

# Mostrar resultado
display(predictions_df)

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ed02624f-fe41-4a9f-be18-c9a9b8648402)

In [12]:
predictions_df.repartition(1).write.format('parquet').mode('overwrite').save('Files/clothes/dfmodelclothes')

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 14, Finished, Available, Finished)

In [13]:
df = spark.read.parquet("Files/clothes/dfmodelclothes/part-00000-c35c2ede-ee09-422e-9c47-eea3d540307a-c000.snappy.parquet")
# df now is a Spark DataFrame containing parquet data from "Files/clothes/dfmodelclothes/part-00000-c35c2ede-ee09-422e-9c47-eea3d540307a-c000.snappy.parquet".
display(df)

StatementMeta(, 7a3a2f9c-802a-45e7-9add-5a3909b12f5f, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 054a3930-a7b3-4542-a1ef-5c13f1475865)