# MODULO 6

In [85]:
import seaborn as sns
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pipeline_penguins').getOrCreate()

In [86]:
import requests
from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType


url = 'https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/diamonds.csv'
csv_path = 'diamonds.csv'

with open(csv_path, 'wb') as file:
    file.write(requests.get(url).content)
    
schema = StructType([
    StructField('carat', FloatType(), True),
    StructField('cut', StringType(), True),
    StructField('color', StringType(), True),
    StructField('clarity', StringType(), True),
    StructField('depth', FloatType(), True),
    StructField('table', IntegerType(), True),
    StructField('price', IntegerType(), True),
    StructField('x', FloatType(), True),
    StructField('y', FloatType(), True),
    StructField('z', FloatType(), True)
])

df = spark.read.csv(csv_path, header=True, inferSchema=False, schema=schema)
df.show(5)
df.printSchema()

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|  Ideal|    E|    SI2| 61.5|   55|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8|   61|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9|   65|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4|   58|  334| 4.2|4.23|2.63|
| 0.31|   Good|    J|    SI2| 63.3|   58|  335|4.34|4.35|2.75|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows

root
 |-- carat: float (nullable = true)
 |-- cut: string (nullable = true)
 |-- color: string (nullable = true)
 |-- clarity: string (nullable = true)
 |-- depth: float (nullable = true)
 |-- table: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- x: float (nullable = true)
 |-- y: float (nullable = true)
 |-- z: float (nullable = true)



In [87]:
from pyspark.sql.functions import col, sum 

# Como vamos a predecir island borramos filas donde island sea nan:
df_regresion = df.dropna(subset=['price'])
df_clasificacion = df.dropna(subset=['cut'])

# contar nulos en todas las columnas: equivalente a pandas df.isna().sum()
df_regresion.select([sum(col(c).isNull().cast('int')).alias(c) for c in df_regresion.columns]).show()
df_clasificacion.select([sum(col(c).isNull().cast('int')).alias(c) for c in df_clasificacion.columns]).show()

+-----+---+-----+-------+-----+-----+-----+---+---+---+
|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+-----+---+-----+-------+-----+-----+-----+---+---+---+
|    0|  0|    0|      0|    0|  924|    0|  0|  0|  0|
+-----+---+-----+-------+-----+-----+-----+---+---+---+

+-----+---+-----+-------+-----+-----+-----+---+---+---+
|carat|cut|color|clarity|depth|table|price|  x|  y|  z|
+-----+---+-----+-------+-----+-----+-----+---+---+---+
|    0|  0|    0|      0|    0|  924|    0|  0|  0|  0|
+-----+---+-----+-------+-----+-----+-----+---+---+---+



## CLASIFICACION

In [88]:
from pyspark.sql.types import NumericType, StringType

# seleccionar los nombres de las columnas a las que aplicar Preprocesados
numerical_cols = [field.name for field in df_clasificacion.schema.fields if isinstance(field.dataType, NumericType)]
categorical_cols = [field.name for field in df_clasificacion.schema.fields if isinstance(field.dataType, StringType) and field.name != 'cut']
label_col = 'cut'

In [89]:
# Indexer para 'cut' la columna a predecir
from pyspark.ml.feature import StringIndexer, Imputer, OneHotEncoder, VectorAssembler

indexer_label = StringIndexer(
    inputCol=label_col,
    outputCol='label',
    handleInvalid='keep'
)
df_indexed = indexer_label.fit(df).transform(df)

num_classes = df_indexed.select("label").distinct().count()
print("Número de clases:", num_classes)

Número de clases: 5


In [90]:
# Indexers para las features de la entrada que no son la columna label a predecir
# crea un objeto StringIndexer por cada columna categórica a indexar
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['color_indexed', 'clarity_indexed']


In [91]:
# Imputer con la moda para las columnas categóricas indexadas
imputer_categorical = Imputer(
    inputCols=categorical_cols_indexed,
    outputCols=[c + '_imputed' for c in categorical_cols_indexed],
    strategy='mode'
)
categorical_cols_indexed_imputed = [c + '_imputed' for c in categorical_cols_indexed]
print(categorical_cols_indexed_imputed)

['color_indexed_imputed', 'clarity_indexed_imputed']


In [92]:
# one hot encoders para las categóricas indexadas imputadas
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed_imputed
]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed_imputed]
print(categorical_cols_onehot)

['color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [93]:
# Imputer con la mediana para la columnas numéricas
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='median'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'price_imputed', 'x_imputed', 'y_imputed', 'z_imputed']


In [94]:
from pyspark.ml.feature import StandardScaler

# Escalar numéricas con StandardScaler
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = StandardScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)

In [95]:
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [96]:
# Ensamblar todo: numéricas + categóricas y obtener features
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

In [97]:
# particionamiento de datos
df_clasificacion_train, df_clasificacion_test = df_clasificacion.randomSplit([0.8, 0.2], seed=42)

In [98]:
df_clasificacion_train.show(5)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|  0.2|  Ideal|    D|    VS2| 61.5|   57|  367|3.81|3.77|2.33|
|  0.2|  Ideal|    E|    VS2| 59.7|   55|  367|3.86|3.84| 2.3|
|  0.2|Premium|    D|    VS2| 61.7|   60|  367|3.77|3.72|2.31|
|  0.2|Premium|    D|    VS2| 62.3|   60|  367|3.73|3.68|2.31|
|  0.2|Premium|    E|    SI2| 60.2|   62|  345|3.79|3.75|2.27|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
only showing top 5 rows



In [99]:
all_columns # Asegúrate de que estas sean las columnas usadas en 'features'

['numeric_features_scaled',
 'color_indexed_imputed_onehot',
 'clarity_indexed_imputed_onehot']

Uso LogisticRegression porque me es imposible hacer funcionar dentro del pipeline el MultilayerPerceptronClassifier

In [None]:
from pyspark.ml.classification import LogisticRegression

classifier = LogisticRegression()

In [101]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [
    # 1. Indexer para columna categórica 'cut' StringIndexer porque es la columna a predecir
    indexer_label,
    # 2. Indexers para columnas categóricas: 'color', 'clarity' 
    *indexers_features, # ponemos * porque es una lista de objetos
    # 3. Imputer para categóricas
    imputer_categorical,
    # 4. One Hot Encoders para categóricas
    *encoders_onehot, # ponemos * porque es una lista de objetos
    # 5. Imputer para numéricas
    imputer_numerical,
    # 6. Ensamblar numéricas + escalado
    assembler_numerical,
    scaler,
    # 7. Ensamblar numéricas escaladas + categóricas en una sola columna 'features'
    assembler_all,
    # 8. modelo de clasificación
    classifier
])

In [102]:
pipeline_model = pipeline.fit(df_clasificacion_train)
df_pred = pipeline_model.transform(df_clasificacion_test)


In [103]:
df_pred.show(5)

+-----+-------+-----+-------+-----+-----+-----+----+----+----+-----+-------------+---------------+---------------------+-----------------------+----------------------------+------------------------------+-------------+-------------+-------------+-------------+---------+---------+---------+--------------------+-----------------------+--------------------+--------------------+--------------------+----------+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|label|color_indexed|clarity_indexed|color_indexed_imputed|clarity_indexed_imputed|color_indexed_imputed_onehot|clarity_indexed_imputed_onehot|carat_imputed|depth_imputed|table_imputed|price_imputed|x_imputed|y_imputed|z_imputed|    numeric_features|numeric_features_scaled|            features|       rawPrediction|         probability|prediction|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+-----+-------------+---------------+---------------------+-----------------------+----------------------------+-------

In [104]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator_accuracy = MulticlassClassificationEvaluator(metricName='accuracy')
evaluator_f1 = MulticlassClassificationEvaluator(metricName='f1')
evaluator_precision = MulticlassClassificationEvaluator(metricName='weightedPrecision')
evaluator_recall = MulticlassClassificationEvaluator(metricName='weightedRecall')

In [105]:
print('accuracy', evaluator_accuracy.evaluate(df_pred))
print('f1', evaluator_f1.evaluate(df_pred))
print('precision', evaluator_precision.evaluate(df_pred))
print('recall', evaluator_recall.evaluate(df_pred))

accuracy 0.6474164133738601
f1 0.6213113536919702
precision 0.635484395370381
recall 0.6474164133738601


## REGRESION

In [106]:
from pyspark.sql.types import NumericType, StringType

# seleccionar los nombres de las columnas a las que aplicar Preprocesados
numerical_cols = [field.name for field in df_regresion.schema.fields if isinstance(field.dataType, NumericType) and field.name != 'price']
categorical_cols = [field.name for field in df_regresion.schema.fields if isinstance(field.dataType, StringType)]
label_col = 'price'

In [107]:
from pyspark.ml.feature import StringIndexer, Imputer, OneHotEncoder, VectorAssembler

df_regresion = df_regresion.withColumnRenamed('price', 'label')

In [108]:
# Indexers para las features de la entrada que no son la columna label a predecir
# crea un objeto StringIndexer por cada columna categórica a indexar
indexers_features = [
    StringIndexer(inputCol=c, outputCol=c + '_indexed', handleInvalid='keep') for c in categorical_cols
]
categorical_cols_indexed = [c + '_indexed' for c in categorical_cols]
print(categorical_cols_indexed)

['cut_indexed', 'color_indexed', 'clarity_indexed']


In [109]:
# Imputer con la moda para las columnas categóricas indexadas
imputer_categorical = Imputer(
    inputCols=categorical_cols_indexed,
    outputCols=[c + '_imputed' for c in categorical_cols_indexed],
    strategy='mode'
)
categorical_cols_indexed_imputed = [c + '_imputed' for c in categorical_cols_indexed]
print(categorical_cols_indexed_imputed)

['cut_indexed_imputed', 'color_indexed_imputed', 'clarity_indexed_imputed']


In [110]:
# one hot encoders para las categóricas indexadas imputadas
encoders_onehot = [
    OneHotEncoder(inputCol=c, outputCol=c + '_onehot') 
    for c in categorical_cols_indexed_imputed
]
categorical_cols_onehot = [c + '_onehot' for c in categorical_cols_indexed_imputed]
print(categorical_cols_onehot)

['cut_indexed_imputed_onehot', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [111]:
# Imputer con la mediana para la columnas numéricas
imputer_numerical = Imputer(
    inputCols=numerical_cols,
    outputCols=[c + '_imputed' for c in numerical_cols],
    strategy='median'
)
numerical_cols_imputed = [c + '_imputed' for c in numerical_cols]
print(numerical_cols_imputed)

['carat_imputed', 'depth_imputed', 'table_imputed', 'x_imputed', 'y_imputed', 'z_imputed']


In [112]:
from pyspark.ml.feature import StandardScaler

# Escalar numéricas con StandardScaler
assembler_numerical = VectorAssembler(
    inputCols=numerical_cols_imputed,
    outputCol='numeric_features'
)
scaler = StandardScaler(
    inputCol='numeric_features',
    outputCol='numeric_features_scaled'
)

In [113]:
all_columns = ['numeric_features_scaled'] + categorical_cols_onehot
print(all_columns)

['numeric_features_scaled', 'cut_indexed_imputed_onehot', 'color_indexed_imputed_onehot', 'clarity_indexed_imputed_onehot']


In [114]:
# Ensamblar todo: numéricas + categóricas y obtener features
assembler_all = VectorAssembler(
    inputCols=all_columns,
    outputCol='features'
)

In [115]:
# particionamiento de datos
df_regresion_train, df_regresion_test = df_regresion.randomSplit([0.8, 0.2], seed=42)

In [116]:
from pyspark.ml.regression import RandomForestRegressor

regressor = RandomForestRegressor(seed=42)

In [117]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [
    *indexers_features, # ponemos * porque es una lista de objetos
    imputer_categorical,
    *encoders_onehot, # ponemos * porque es una lista de objetos
    imputer_numerical,
    assembler_numerical,
    scaler,
    assembler_all,
    regressor
])

In [118]:
pipeline_model = pipeline.fit(df_regresion_train)
df_pred = pipeline_model.transform(df_regresion_test)


In [119]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator_r2 = RegressionEvaluator(metricName='r2')
evaluator_mae = RegressionEvaluator(metricName='mae')
evaluator_mse = RegressionEvaluator(metricName='mse')
evaluator_rmse = RegressionEvaluator(metricName='rmse') 

In [120]:
print('r2', evaluator_r2.evaluate(df_pred))
print('mae', evaluator_mae.evaluate(df_pred))
print('mse', evaluator_mse.evaluate(df_pred))
print('rmse', evaluator_rmse.evaluate(df_pred))

r2 0.9068588220189352
mae 683.036519430222
mse 1515470.0782001573
rmse 1231.0443039144275


## Gridsearch y validación cruzada

Equivalente a GridSearchCV de Scikit Learn, busca los mejores parámetros de un modelo.

In [121]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (
    ParamGridBuilder()
    .addGrid(regressor.numTrees, [5, 10, 15, 20, 25, 30]) # por defecto es 20
    .addGrid(regressor.maxDepth, [3, 5, 10, 15]) # por defecto es 5 rango de [0, 30]
    .build()
)

In [123]:
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid, # Parámetros para grid search hyper parameter tuning
    evaluator=evaluator_r2,
    numFolds=3, # por defecto ya 3 folds
    parallelism=4,
    seed=42
)
cv_model = crossval.fit(df_regresion_train)
df_pred = cv_model.transform(df_regresion_test)

In [124]:
print('r2', evaluator_r2.evaluate(df_pred))
print('mae', evaluator_mae.evaluate(df_pred))
print('mse', evaluator_mse.evaluate(df_pred))
print('rmse', evaluator_rmse.evaluate(df_pred))

r2 0.966849930845084
mae 374.78833004987985
mse 539374.0876323732
rmse 734.4209199310524


In [126]:
best_model = cv_model.bestModel
best_rf = best_model.stages[-1] # accede a la última fase del pipeline que es el modelo classifier
print(best_rf.extractParamMap())
print(best_rf.getNumTrees)
print(best_rf.getOrDefault('maxDepth'))
print(best_rf.featureImportances)

{Param(parent='RandomForestRegressor_db8a367123f9', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestRegressor_db8a367123f9', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestRegressor_db8a367123f9', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestRegressor_db8a367123f9', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported o

## Exportar modelo

In [127]:
# pyspark crea un directorio donde guarda archivos y metadatos para reconstruir el modelo
pipeline_model.write().overwrite().save('pipeline_spark')

In [128]:
from pyspark.ml import PipelineModel

# cargar el modelo
loaded_pipeline = PipelineModel.load('pipeline_spark')