In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler, Imputer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Start (or reuse) the Spark session used by every cell below.
spark = SparkSession.builder.appName("Diamonds ML Pipeline").getOrCreate()

# Part 1: define the schema and load the data from the URL.
# Every field is declared nullable; `price` is the only integer column.
schema = StructType([
    StructField("carat", DoubleType(), True),
    StructField("cut", StringType(), True),
    StructField("color", StringType(), True),
    StructField("clarity", StringType(), True),
    StructField("depth", DoubleType(), True),
    StructField("table", DoubleType(), True),
    StructField("price", IntegerType(), True),
    StructField("x", DoubleType(), True),
    StructField("y", DoubleType(), True),
    StructField("z", DoubleType(), True)
])

data_url = "https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/diamonds.csv"

# Spark's file readers resolve paths through Hadoop file systems and generally
# cannot read a plain HTTPS URL, so the CSV is downloaded on the driver with
# pandas and then handed to Spark together with the explicit schema.
# Selecting the columns by schema name fixes their order and fails loudly
# (KeyError) if the remote file is missing one of them.
# NOTE(review): a missing value in a string column would arrive as a float NaN
# and be rejected by schema verification — confirm the file has none.
diamonds_pdf = pd.read_csv(data_url)[schema.fieldNames()]
df_spark = spark.createDataFrame(diamonds_pdf, schema=schema)


ModuleNotFoundError: No module named 'distutils'

In [None]:

# ----- Part 2: REGRESSION PIPELINE -----
# Predict `price` from the numeric measurements and the categorical grades.
num_features = ["carat", "depth", "table", "x", "y", "z"]
cat_features = ["cut", "color", "clarity"]

# Hold out 20% of the rows so the reported RMSE measures generalisation rather
# than how well the forest memorised its own training data. The seed keeps the
# split stable between runs.
train_reg, test_reg = df_spark.randomSplit([0.8, 0.2], seed=42)

# Preprocessing: impute numeric columns, index and one-hot encode the
# categorical ones, assemble everything into one vector and rescale it.
imputer = Imputer(
    inputCols=num_features,
    outputCols=[f"{name}_imputed" for name in num_features],
)
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_indexed")
    for name in cat_features
]
encoders = [
    OneHotEncoder(inputCol=f"{name}_indexed", outputCol=f"{name}_encoded")
    for name in cat_features
]
assembler = VectorAssembler(
    inputCols=[f"{name}_imputed" for name in num_features]
    + [f"{name}_encoded" for name in cat_features],
    outputCol="features",
)
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
# Explicit seed: without one, the forest's bootstrap sampling may differ
# between Python sessions.
regressor = RandomForestRegressor(featuresCol="scaled_features", labelCol="price", seed=42)

# Define the regression pipeline; it is fitted on the training split only.
pipeline_reg = Pipeline(stages=[imputer] + indexers + encoders + [assembler, scaler, regressor])
model_reg = pipeline_reg.fit(train_reg)
predictions_reg = model_reg.transform(test_reg)

# Regression evaluation on the held-out rows.
evaluator_reg = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator_reg.evaluate(predictions_reg)
print(f"RMSE del modelo de regresión: {rmse}")


In [None]:

# ----- Part 3: CLASSIFICATION PIPELINE -----
# The classifier predicts `cut` (via `cut_indexed`), so `cut` itself must not
# appear among the features: feeding `cut_encoded` to the network would leak
# the label and make the reported accuracy meaningless.
label_col = "cut"
class_cat_features = [name for name in cat_features if name != label_col]

# One-hot encoders for the categorical predictors only (label excluded).
encoders_class = [
    OneHotEncoder(inputCol=f"{name}_indexed", outputCol=f"{name}_encoded")
    for name in class_cat_features
]
assembler_class = VectorAssembler(
    inputCols=[f"{name}_imputed" for name in num_features]
    + [f"{name}_encoded" for name in class_cat_features],
    outputCol="features",
)
# All indexers are kept because the `cut` indexer produces the label column.
prep_stages = [imputer] + indexers + encoders_class + [assembler_class]

# Hold out 20% of the rows for an honest accuracy estimate.
train_cls, test_cls = df_spark.randomSplit([0.8, 0.2], seed=42)

# The network's input layer must equal the length of the assembled vector and
# its output layer the number of classes. One-hot encoding expands every
# categorical column into several slots, so both sizes are read from a fitted
# preprocessing pipeline instead of being guessed from the column counts.
prep_model = Pipeline(stages=prep_stages).fit(train_cls)
feature_vector_size = len(prep_model.transform(train_cls).select("features").first()["features"])
# `indexers` is built in the same order as `cat_features`, and the imputer is
# stage 0, so the fitted `cut` indexer sits at 1 + its position in the list.
num_classes = len(prep_model.stages[1 + cat_features.index(label_col)].labels)

# Input -> two hidden layers of 5 units -> one output unit per class.
layers = [feature_vector_size, 5, 5, num_classes]

# Explicit seed so weight initialisation is repeatable between sessions.
mlp = MultilayerPerceptronClassifier(featuresCol="features", labelCol="cut_indexed", layers=layers, seed=42)

# Define the classification pipeline; it is fitted on the training split only.
pipeline_mlp = Pipeline(stages=prep_stages + [mlp])
model_mlp = pipeline_mlp.fit(train_cls)
predictions_mlp = model_mlp.transform(test_cls)

# Classification evaluation on the held-out rows.
evaluator_class = MulticlassClassificationEvaluator(labelCol="cut_indexed", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_class.evaluate(predictions_mlp)
print(f"Precisión del modelo de clasificación MLP: {accuracy}")


In [None]:

# ----- Part 4: GRID SEARCH AND CROSS-VALIDATION -----
# Both searches run 3-fold cross-validation over the full pipelines, so the
# preprocessing stages are refitted inside every fold.

# Fixed seed so the fold assignment is the same on every run; without it the
# selected "best" model can change between sessions.
cv_seed = 42

# Grid search for the regression model: 3 x 3 = 9 candidate forests.
paramGrid_reg = (
    ParamGridBuilder()
    .addGrid(regressor.numTrees, [10, 20, 50])
    .addGrid(regressor.maxDepth, [5, 10, 15])
    .build()
)

crossval_reg = CrossValidator(
    estimator=pipeline_reg,
    estimatorParamMaps=paramGrid_reg,
    evaluator=evaluator_reg,
    numFolds=3,
    seed=cv_seed,
)

cv_model_reg = crossval_reg.fit(df_spark)
best_model_reg = cv_model_reg.bestModel
print("GridSearch completado para el modelo de regresión. Mejor modelo obtenido.")
# RMSE is an error metric, so the best candidate has the lowest average score.
print(f"Mejor RMSE medio en validación cruzada: {min(cv_model_reg.avgMetrics)}")

# Grid search for the classification model: 3 x 2 = 6 candidate networks.
# Every candidate keeps the input size and the output size of the base
# network (`layers`); only the hidden layers vary.
output_size = layers[-1]
paramGrid_mlp = (
    ParamGridBuilder()
    .addGrid(mlp.maxIter, [50, 100, 200])
    .addGrid(mlp.layers, [
        [feature_vector_size, 5, 5, output_size],
        [feature_vector_size, 10, 10, output_size],
    ])
    .build()
)

crossval_mlp = CrossValidator(
    estimator=pipeline_mlp,
    estimatorParamMaps=paramGrid_mlp,
    evaluator=evaluator_class,
    numFolds=3,
    seed=cv_seed,
)

cv_model_mlp = crossval_mlp.fit(df_spark)
best_model_mlp = cv_model_mlp.bestModel
print("GridSearch y CrossValidation completados para el modelo de clasificación. Mejor modelo obtenido.")
# Accuracy is a score, so the best candidate has the highest average value.
print(f"Mejor precisión media en validación cruzada: {max(cv_model_mlp.avgMetrics)}")
