In [1]:
import seaborn as sns
import pandas as pd
import urllib
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType, NumericType
from pyspark.ml.feature import Imputer,  StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.sql.functions import col, sum

from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.classification import MultilayerPerceptronClassifier


from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator


from pyspark.ml import Pipeline

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pprint import pprint

In [2]:
spark = SparkSession.builder.appName("regresion_diamonds").getOrCreate()

In [3]:
schema = StructType([
    StructField("carat", DoubleType(), True),
    StructField("cut", StringType(), True),
    StructField("color", StringType(), True),
    StructField("clarity", StringType(), True),
    StructField("depth", DoubleType(), True),
    StructField("table", DoubleType(), True),
    StructField("price", IntegerType(), True),
    StructField("x", DoubleType(), True),
    StructField("y", DoubleType(), True),
    StructField("z", DoubleType(), True)
])

url = "https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/diamonds.csv"
local_path = "/tmp/diamonds.csv"

urllib.request.urlretrieve(url, local_path)

df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load(local_path)


df.show(5)


+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows



In [4]:
df.printSchema()

root
 |-- carat: double (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- table: double (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



## prediccion price

In [5]:

label_col = 'price'
numerical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
categorical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]


In [6]:
# variables numericas imputadas con media
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='median'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'price_imputed', 'x_imputed', 'y_imputed', 'z_imputed']


In [7]:

assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = MinMaxScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)


In [8]:
# variables categoricas
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['cut_indexed', 'color_indexed', 'clarity_indexed']


In [9]:
imputer_categorical = Imputer(
    inputCols=categorical_cols_indexed,
    outputCols=[c + '_imputed' for c in categorical_cols_indexed],
    strategy='mode'
)
categorical_cols_indexed_imputed = [c + '_imputed' for c in categorical_cols_indexed]
print(categorical_cols_indexed_imputed)

['cut_indexed_imputed', 'color_indexed_imputed', 'clarity_indexed_imputed']


In [10]:
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed_imputed
]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed_imputed]
print(categorical_cols_onehot)

['cut_indexed_imputed_onehot', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [11]:
#todo df
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'cut_indexed_imputed_onehot', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [12]:

assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

In [13]:

#modelo
regresor_decisiontree = DecisionTreeRegressor(seed=42, labelCol=label_col)

In [14]:
# particionamiento de datos
df_train, df_test = df.randomSplit([0.8, 0.2], seed=42)

In [15]:


pipeline = Pipeline(stages = [
    
    *indexers_features,
    imputer_categorical,
    *encoders_onehot,
    imputer_numerical,
    assembler_numerical,
    scaler,
    assembler_all,
    regresor_decisiontree
])

In [16]:
#entrenamos y predecimos
pipeline_model = pipeline.fit(df_train)
df_pred_price = pipeline_model.transform(df_test)

In [17]:
#evaluamos

evaluator_r2 = RegressionEvaluator(metricName='r2', labelCol=label_col)
evaluator_mae = RegressionEvaluator(metricName='mae', labelCol=label_col)
evaluator_mse = RegressionEvaluator(metricName='mse', labelCol=label_col)
evaluator_rmse = RegressionEvaluator(metricName='rmse', labelCol=label_col)


r2_ = evaluator_r2.evaluate(df_pred_price)
mae_ = evaluator_mae.evaluate(df_pred_price)
mse_= evaluator_mse.evaluate(df_pred_price)
rmse_= evaluator_rmse.evaluate(df_pred_price)

metrics = {
    "modelo": ["DecisionTreeRegressor"], 
    "r2": [r2_],
    "mae": [mae_],
    "mse": [mse_],
    "rmse": [rmse_]
}

# Convertir el diccionario a un DataFrame de pandas
df_price = pd.DataFrame(metrics)
df_price




Unnamed: 0,modelo,r2,mae,mse,rmse
0,DecisionTreeRegressor,0.994727,168.17409,85786.051188,292.892559


## Clasificacion de cut con  MultiLayerPerceptronClassifier

In [18]:
df.show(5)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows



In [19]:

df = df.dropna(subset=['cut'])

# contar nulos
df.select([sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()

+-----+---+-----+-------+-----+-----+-----+---+---+---+
|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+-----+---+-----+-------+-----+-----+-----+---+---+---+
|    0|  0|    0|      0|    0|    0|    0|  0|  0|  0|
+-----+---+-----+-------+-----+-----+-----+---+---+---+



In [20]:
# Preprocesados
label_col = 'cut'
numerical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
categorical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType) and field.name != label_col]


In [21]:


indexer_label = StringIndexer(
    inputCol=label_col,
    outputCol='label',
    handleInvalid='keep'
)

In [22]:
#categoricas indexadas
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['color_indexed', 'clarity_indexed']


In [23]:
#categoricad imputadas con moda
imputer_categorical = Imputer(
    inputCols=categorical_cols_indexed,
    outputCols=[c + '_imputed' for c in categorical_cols_indexed],
    strategy='mode'
)
categorical_cols_indexed_imputed = [c + '_imputed' for c in categorical_cols_indexed]
print(categorical_cols_indexed_imputed)

['color_indexed_imputed', 'clarity_indexed_imputed']


In [24]:
# one hot encoders para las categóricas indexadas imputadas
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed_imputed
]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed_imputed]
print(categorical_cols_onehot)

['color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [25]:
#variables numericas imputadas con media
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='median'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'price_imputed', 'x_imputed', 'y_imputed', 'z_imputed']


In [26]:
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = MinMaxScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)

In [27]:
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [28]:
#unimos tods las features sin cut
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

In [29]:
#pìpeline
pipeline_cut = Pipeline(stages = [

    indexer_label,
    *indexers_features,
    imputer_categorical,
    *encoders_onehot,
    imputer_numerical,
    assembler_numerical,
    scaler,
    assembler_all
])

In [30]:
#split
df_train, df_test = df.randomSplit([0.8, 0.2], seed=42)

In [31]:
_with_features = pipeline_cut.fit(df_train)
df_train_transformed = _with_features.transform(df_train)

In [32]:
#parametrol del mlp
num_features = df_train_transformed.first()['features'].size
num_classes = df_train_transformed.select("label").distinct().count()

layers = [num_features, 5, 4, num_classes] #dos capas oculñtas de 5 y 4 neuronas

print(f"neuronas entrada: {num_features}, neuronas salida: {num_classes}, neuronas capas ocultas: {layers[1:-1]}")

neuronas entrada: 20, neuronas salida: 5, neuronas capas ocultas: [5, 4]


In [33]:
#modelo mlp


mlp = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=100,
    layers=layers,
    blockSize=128,
    seed=1234
)

In [34]:
#pipeline completo
pipeline_cut_fin = Pipeline(stages=[pipeline_cut] + [mlp])

In [35]:
#entrenar y predecir
pipeline_model_fin_cut = pipeline_cut_fin.fit(df_train)

df_pred = pipeline_model_fin_cut.transform(df_test)

In [36]:
#evaluamos
evaluator_accuracy = MulticlassClassificationEvaluator(metricName='accuracy')
evaluator_f1 = MulticlassClassificationEvaluator(metricName='f1')
evaluator_precision = MulticlassClassificationEvaluator(metricName='weightedPrecision')
evaluator_recall = MulticlassClassificationEvaluator(metricName='weightedRecall')


print('accuracy', evaluator_accuracy.evaluate(df_pred))
print('f1', evaluator_f1.evaluate(df_pred))
print('precision', evaluator_precision.evaluate(df_pred))
print('recall', evaluator_recall.evaluate(df_pred))

accuracy 0.5538362346872985
f1 0.4676510998719835
precision 0.4762104055664424
recall 0.5538362346872986


## GridSearch con CV sobre clasificación de cut

In [37]:


# hiperaámetros a probar
paramGrid = (ParamGridBuilder()
             .addGrid(mlp.maxIter, [100, 200])  
             .addGrid(mlp.layers, [
                 [num_features, 5, 4, num_classes], 
                 [num_features, 8, 6, num_classes]   
             ])
             .build())

evaluator = MulticlassClassificationEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="accuracy"
)

crossval = CrossValidator(
    estimator=pipeline_cut_fin,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=6,
    seed=42
)



In [38]:
#entrenamiento y predicción
cvModel = crossval.fit(df_train)

df_pred = cvModel.transform(df_test)
print("Accuracy:", evaluator.evaluate(df_pred))


Accuracy: 0.688588007736944


In [39]:
#mejor modelo

best_model = cvModel.bestModel.stages[-1]
param_map = best_model.extractParamMap()

param_dict = {str(param): value for param, value in param_map.items()}

param_dict

{'MultilayerPerceptronClassifier_8cdcbc1003c3__blockSize': 128,
 'MultilayerPerceptronClassifier_8cdcbc1003c3__featuresCol': 'features',
 'MultilayerPerceptronClassifier_8cdcbc1003c3__labelCol': 'label',
 'MultilayerPerceptronClassifier_8cdcbc1003c3__maxIter': 200,
 'MultilayerPerceptronClassifier_8cdcbc1003c3__predictionCol': 'prediction',
 'MultilayerPerceptronClassifier_8cdcbc1003c3__probabilityCol': 'probability',
 'MultilayerPerceptronClassifier_8cdcbc1003c3__rawPredictionCol': 'rawPrediction',
 'MultilayerPerceptronClassifier_8cdcbc1003c3__seed': 1234,
 'MultilayerPerceptronClassifier_8cdcbc1003c3__solver': 'l-bfgs',
 'MultilayerPerceptronClassifier_8cdcbc1003c3__stepSize': 0.03,
 'MultilayerPerceptronClassifier_8cdcbc1003c3__tol': 1e-06,
 'MultilayerPerceptronClassifier_8cdcbc1003c3__layers': [20, 8, 6, 5]}