In [1]:
import seaborn as sns
import pandas as pd
import requests

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType, NumericType
from pyspark.sql.functions import col, sum, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Imputer, StandardScaler, MinMaxScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor, DecisionTreeRegressor
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
spark = SparkSession.builder.appName('proyecto_m6').getOrCreate()

# Carga de datos y schema

In [3]:
url = 'https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/diamonds.csv'
csv_path = 'diamonds.csv'
with open(csv_path, 'wb') as file:
    file.write(requests.get(url).content)

In [4]:
schema = StructType([
    StructField('carat', FloatType(), True),
    StructField('cut', StringType(), True),
    StructField('color', StringType(), True),
    StructField('clarity', StringType(), True),
    StructField('depth', FloatType(), True),
    StructField('table', FloatType(), True),
    StructField('price', IntegerType(), True),
    StructField('x', FloatType(), True),
    StructField('y', FloatType(), True),
    StructField('z', FloatType(), True)
])

In [5]:
df = spark.read.csv(csv_path, header=True, inferSchema=False, schema=schema)
df.show(5)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows



In [6]:
# Compruebo nulos
df.select([sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()

+-----+---+-----+-------+-----+-----+-----+---+---+---+
|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+-----+---+-----+-------+-----+-----+-----+---+---+---+
|    0|  0|    0|      0|    0|    0|    0|  0|  0|  0|
+-----+---+-----+-------+-----+-----+-----+---+---+---+



In [7]:
df.describe().toPandas()

Unnamed: 0,summary,carat,cut,color,clarity,depth,table,price,x,y,z
0,count,53940.0,53940,53940,53940,53940.0,53940.0,53940.0,53940.0,53940.0,53940.0
1,mean,0.7979397459442544,,,,61.749404890324215,57.45718390839959,3932.799721913237,5.731157212872659,5.734525955793015,3.5387337920972493
2,stddev,0.474011242836904,,,,1.432621320665403,2.2344905638396657,3989.439738146397,1.1217607437465076,1.1421346736743894,0.705698843275196
3,min,0.2,Fair,D,I1,43.0,43.0,326.0,0.0,0.0,0.0
4,max,5.01,Very Good,J,VVS2,79.0,95.0,18823.0,10.74,58.9,31.8


En las columnas x, y, z, considero los valores 0 como nulos y los reemplazo por None para aplicar imputer a esas columnas.

In [8]:
df = df.withColumn('x', when(df['x'] == 0, None).otherwise(df['x'])) \
.withColumn('y', when(df['y'] == 0, None).otherwise(df['y'])) \
.withColumn('z', when(df['z'] == 0, None).otherwise(df['z']))

df.select([sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()

+-----+---+-----+-------+-----+-----+-----+---+---+---+
|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+-----+---+-----+-------+-----+-----+-----+---+---+---+
|    0|  0|    0|      0|    0|    0|    0|  8|  7| 20|
+-----+---+-----+-------+-----+-----+-----+---+---+---+



# Regresión

In [9]:
# DF para regresión del precio.
# Renombro la columna price por label.
df1 = df.withColumnRenamed('price', 'label')
df1.show(1)

+-----+-----+-----+-------+-----+-----+-----+----+----+----+
|carat|  cut|color|clarity|depth|table|label|   x|   y|   z|
+-----+-----+-----+-------+-----+-----+-----+----+----+----+
| 0.23|Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
+-----+-----+-----+-------+-----+-----+-----+----+----+----+
only showing top 1 row



* Preprocesados.

In [10]:
numerical_cols = [field.name for field in df1.schema.fields if isinstance(field.dataType, NumericType) and field.name != 'label']
categorical_cols = [field.name for field in df1.schema.fields if isinstance(field.dataType, StringType)]
print(numerical_cols)
print(categorical_cols)

['carat', 'depth', 'table', 'x', 'y', 'z']
['cut', 'color', 'clarity']


* Indexación de columnas categóricas con StringIndexer.

In [11]:
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['cut_indexed', 'color_indexed', 'clarity_indexed']


* Codificación de las columnas categóricas ya indexadas con OneHotEncoder.

In [12]:
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed
    ]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed]
print(categorical_cols_onehot)

['cut_indexed_onehot', 'color_indexed_onehot', 'clarity_indexed_onehot']


* Imputación de valores nulos.

In [13]:
# Imputo la media a los nulos de las columnas numéricas, a las categóricas no es necesario ya que no tienen NaN's.
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='mean'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'x_imputed', 'y_imputed', 'z_imputed']


* Normalización de datos con StandarScaler.

In [14]:
# Aplico el scaler a las columnas numéricas.
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = StandardScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)

In [15]:
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'cut_indexed_onehot', 'color_indexed_onehot', 'clarity_indexed_onehot']


* Ensamblar las columnas numéricas y categóricas para obtener features.

In [16]:
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

* Bucle para probar varios modelos de regresión con particionamiento.

In [17]:
df_train1, df_test1 = df1.randomSplit([0.8, 0.2], seed=42)
regressors = [LinearRegression(), DecisionTreeRegressor(), RandomForestRegressor(), GBTRegressor()]

for regressor in regressors:
    pipeline_reg = Pipeline(stages=[
        *indexers_features,
        *encoders_onehot,
        imputer_numerical,
        assembler_numerical,
        scaler,
        assembler_all,
        regressor
    ])
    
    pipeline_regresion = pipeline_reg.fit(df_train1)
    df1_pred = pipeline_regresion.transform(df_test1)
    
    evaluator = RegressionEvaluator(metricName="r2")
    r2 = evaluator.evaluate(df1_pred)
    
    print(f"R2 con {regressor.__class__.__name__}: {r2}")


R2 con LinearRegression: 0.9225571268177755
R2 con DecisionTreeRegressor: 0.9049996572272508
R2 con RandomForestRegressor: 0.9114267936861328
R2 con GBTRegressor: 0.9639392749178024


# ParamGridBuilder y CrossValidator

In [18]:
best_regressor= GBTRegressor()

pipeline_cross1 = Pipeline(stages=[
    *indexers_features,
    *encoders_onehot,
    imputer_numerical,
    assembler_numerical,
    scaler,
    assembler_all,
    best_regressor
])

paramGrid1 = (
    ParamGridBuilder()
    .addGrid(best_regressor.maxIter, [10, 20]) # Solo pruebo con 2 parámetros para reducir tiempo de ejecución.
    .addGrid(best_regressor.maxDepth, [5, 10])
    .build()
)

In [19]:
# Tarda 2 minutos
crossval1 = CrossValidator(
    estimator=pipeline_cross1,
    estimatorParamMaps= paramGrid1,
    evaluator= RegressionEvaluator(metricName='r2'),
    parallelism=4,
    seed=42  
)
cross1_model= crossval1.fit(df_train1)
best_model = cross1_model.bestModel
best_gbt = best_model.stages[-1]
print(best_gbt)
print(best_gbt.getNumTrees)
print(best_gbt.getOrDefault('maxDepth'))

GBTRegressionModel: uid=GBTRegressor_d884df60507b, numTrees=20, numFeatures=26
20
10


* Modelo elegido para regresión.

In [20]:
regressor_final= GBTRegressor(maxIter=20, maxDepth=10, seed=42)

pipeline_GBT = Pipeline(stages = [
    *indexers_features,
    *encoders_onehot,
    imputer_numerical,
    assembler_numerical,
    scaler,
    assembler_all,
    regressor_final
])

pipeline_reg_final = pipeline_GBT.fit(df_train1)
pred_GBT = pipeline_reg_final.transform(df_test1)

evaluator_r2 = RegressionEvaluator(metricName='r2')
evaluator_mae = RegressionEvaluator(metricName='mae')
evaluator_mse = RegressionEvaluator(metricName='mse')
evaluator_rmse = RegressionEvaluator(metricName='rmse')

print('r2', evaluator_r2.evaluate(pred_GBT))
print('mae', evaluator_mae.evaluate(pred_GBT))
print('mse', evaluator_mse.evaluate(pred_GBT))
print('rmse', evaluator_rmse.evaluate(pred_GBT))

r2 0.9741729088799557
mae 335.22433062772063
mse 420214.8768759756
rmse 648.2398297512855


# Clasificación

In [21]:
# DF para clasificación del corte
df2 = df.select("*")
df2.show(1)

+-----+-----+-----+-------+-----+-----+-----+----+----+----+
|carat|  cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-----+-----+-------+-----+-----+-----+----+----+----+
| 0.23|Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
+-----+-----+-----+-------+-----+-----+-----+----+----+----+
only showing top 1 row



* Preprocesados.

In [22]:
numerical_cols = [field.name for field in df2.schema.fields if isinstance(field.dataType, NumericType)]
categorical_cols = [field.name for field in df2.schema.fields if isinstance(field.dataType, StringType) and field.name != 'cut']
label_col = 'cut'
print(numerical_cols)
print(categorical_cols)

['carat', 'depth', 'table', 'price', 'x', 'y', 'z']
['color', 'clarity']


* Indexación de la columna a predecir y de las columnas categóricas con StringIndexer.

In [23]:
indexer_label = StringIndexer(
    inputCol=label_col,
    outputCol='label',
    handleInvalid='keep'
)

In [24]:
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['color_indexed', 'clarity_indexed']


* Codificación de las columnas categóricas ya indexadas con OneHotEncoder.

In [25]:
# He comprobado que las columnas categóricas no contienen valores nulos, no es necesario hacer imputer.
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed
    ]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed]
print(categorical_cols_onehot)

['color_indexed_onehot', 'clarity_indexed_onehot']


* Imputación de valores nulos de las columnas numéricas.

In [26]:
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='mean'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'price_imputed', 'x_imputed', 'y_imputed', 'z_imputed']


* Escalado de datos con MinMaxScaler.

In [27]:
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = MinMaxScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'color_indexed_onehot', 'clarity_indexed_onehot']


* Ensamblar las columnas numéricas y categóricas para obtener features.

In [28]:
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

In [29]:
df_train2, df_test2 = df2.randomSplit([0.8, 0.2], seed=42)
classifiers = [LogisticRegression(), DecisionTreeClassifier(), RandomForestClassifier()]

for classifier in classifiers:
    pipeline_clas = Pipeline(stages=[
        indexer_label,
        *indexers_features,
        *encoders_onehot,
        imputer_numerical,
        assembler_numerical,
        scaler,
        assembler_all,
        classifier])
        
    model = pipeline_clas.fit(df_train2)
    df2_pred = model.transform(df_test2)
    
    evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    accuracy = evaluator.evaluate(df2_pred)
    
    print(f"Accuracy con {classifier.__class__.__name__}: {accuracy}")

Accuracy con LogisticRegression: 0.6481532651745417
Accuracy con DecisionTreeClassifier: 0.7046145344017685
Accuracy con RandomForestClassifier: 0.6927327991157778


# ParamGridBuilder y CrossValidator

In [30]:
best_classifier = DecisionTreeClassifier()

pipeline_cross2 = Pipeline(stages=[
    indexer_label,
    *indexers_features,
    *encoders_onehot,
    imputer_numerical,
    assembler_numerical,
    scaler,
    assembler_all,
    best_classifier])

paramGrid2 = (
    ParamGridBuilder()
    .addGrid(best_classifier.maxDepth,[10, 20])
    .build()
)


In [31]:
crossval2 = CrossValidator(estimator=pipeline_cross2,
                          estimatorParamMaps=paramGrid2,
                          evaluator=MulticlassClassificationEvaluator(metricName='accuracy'),
                          parallelism= 4,
                          seed= 4)

cv_model = crossval2.fit(df_train2)
best_model_class = cv_model.bestModel
best_tree = best_model_class.stages[-1]
print(best_tree)
print(best_tree.getMaxDepth())

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_c89ec50a4457, depth=10, numNodes=865, numClasses=6, numFeatures=22
10


* Modelo elegido para clasificación.

In [32]:
classifier_final = DecisionTreeClassifier(maxDepth=10)

pipeline_tree = Pipeline(stages=[
    indexer_label,
    *indexers_features,
    *encoders_onehot,
    imputer_numerical,
    assembler_numerical,
    scaler,
    assembler_all,
    classifier_final])

pipeline_cl_final = pipeline_tree.fit(df_train2)
pred_tree = pipeline_cl_final.transform(df_test2)

evaluator_accuracy = MulticlassClassificationEvaluator(metricName='accuracy')
evaluator_f1 = MulticlassClassificationEvaluator(metricName='f1')
evaluator_precision = MulticlassClassificationEvaluator(metricName='weightedPrecision')
evaluator_recall = MulticlassClassificationEvaluator(metricName='weightedRecall')

print('accuracy', evaluator_accuracy.evaluate(pred_tree))
print('f1', evaluator_f1.evaluate(pred_tree))
print('precision', evaluator_precision.evaluate(pred_tree))
print('recall', evaluator_recall.evaluate(pred_tree))

accuracy 0.721101593442019
f1 0.7005178358023688
precision 0.7095851859626563
recall 0.721101593442019


* MultiLayerPerceptronClassifier

In [33]:
df2_pred.select("features").first()[0].size

22

In [34]:
classifier_MPC = MultilayerPerceptronClassifier(layers=[22,10,5], maxIter=1000, seed=42)

pipeline_MPC = Pipeline(stages=[
    indexer_label,
    *indexers_features,
    *encoders_onehot,
    imputer_numerical,
    assembler_numerical,
    scaler,
    assembler_all,
    classifier_MPC
])

pipeline_MPC_final= pipeline_MPC.fit(df_train2)
pred_MPC = pipeline_MPC_final.transform(df_test2)

print('accuracy', evaluator_accuracy.evaluate(pred_MPC))
print('f1', evaluator_f1.evaluate(pred_MPC))
print('precision', evaluator_precision.evaluate(pred_MPC))
print('recall', evaluator_recall.evaluate(pred_MPC))

accuracy 0.7445887445887446
f1 0.7389614986861559
precision 0.7380184703437936
recall 0.7445887445887446


Conclusiones
* El modelo de regresión obtiene un 0.97 de precisión, explica bien la variación de precios de los diamantes.
* En la clasificación no he conseguido un modelo con buena precisión, obtiene un 0.71 de exactitud al clasificar los 5 tipos de cortes de diamantes. Si se ajusta el desbalanceo entre las clases podría mejorar.
* La red neuronal supera ligeramente a los otros modelos de clasificación, ajustando los hiperparámetros mejoraría pero no lo suficiente como para que compense la exigencia de recursos.