#### Uso de Pipelines

In [68]:
import seaborn as sns
import pandas as pd
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one, otherwise start a new one
# registered under this notebook's application name.
spark = (
    SparkSession.builder
    .appName('pipeline_penguins')
    .getOrCreate()
)

In [69]:
import requests
from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType

url = 'https://raw.githubusercontent.com/mwaskom/seaborn-data/refs/heads/master/penguins.csv'
csv_path = 'penguins.csv'

# Fetch first and fail loudly on HTTP errors: without the status check an
# error page would be written to disk as if it were the CSV, and without a
# timeout a stalled connection would block the notebook indefinitely.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Only touch the local file once the download is known to have succeeded.
with open(csv_path, 'wb') as file:
    file.write(response.content)

# Explicit schema, so column types do not depend on Spark's inference.
schema = StructType([
    StructField('species', StringType(), True),
    StructField('island', StringType(), True),
    StructField('bill_length_mm', FloatType(), True),
    StructField('bill_depth_mm', FloatType(), True),
    StructField('flipper_length_mm', FloatType(), True),
    StructField('body_mass_g', IntegerType(), True),
    StructField('sex', StringType(), True),
])

# Load the downloaded CSV with the schema above and preview rows and types.
df = spark.read.csv(csv_path, header=True, inferSchema=False, schema=schema)
df.show(3)
df.printSchema()

+-------+---------+--------------+-------------+-----------------+-----------+------+
|species|   island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+--------------+-------------+-----------------+-----------+------+
| Adelie|Torgersen|          39.1|         18.7|            181.0|       3750|  MALE|
| Adelie|Torgersen|          39.5|         17.4|            186.0|       3800|FEMALE|
| Adelie|Torgersen|          40.3|         18.0|            195.0|       3250|FEMALE|
+-------+---------+--------------+-------------+-----------------+-----------+------+
only showing top 3 rows

root
 |-- species: string (nullable = true)
 |-- island: string (nullable = true)
 |-- bill_length_mm: float (nullable = true)
 |-- bill_depth_mm: float (nullable = true)
 |-- flipper_length_mm: float (nullable = true)
 |-- body_mass_g: integer (nullable = true)
 |-- sex: string (nullable = true)



In [70]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# 'island' is the column to predict: drop the rows where it is missing.
df = df.dropna(subset=['island'])

# Count nulls per column (equivalent to df.isna().sum() in pandas).
# Spark's sum is reached through the module alias F so that Python's
# built-in sum() is not shadowed for the rest of the notebook.
null_counts = [F.sum(col(c).isNull().cast('int')).alias(c) for c in df.columns]
df.select(null_counts).show()

+-------+------+--------------+-------------+-----------------+-----------+---+
|species|island|bill_length_mm|bill_depth_mm|flipper_length_mm|body_mass_g|sex|
+-------+------+--------------+-------------+-----------------+-----------+---+
|      0|     0|             2|            2|                2|          2| 11|
+-------+------+--------------+-------------+-----------------+-----------+---+



#### Preprocesados para construir después el pipeline

In [71]:
from pyspark.sql.types import NumericType, StringType

# 'island' is the column to predict; an earlier cell already dropped the rows
# where it is missing, so it is kept out of the feature lists below.
label_col = 'island'

# Split the remaining columns by the Spark data type declared in the schema.
schema_fields = df.schema.fields
numerical_cols = [f.name for f in schema_fields if isinstance(f.dataType, NumericType)]
categorical_cols = [
    f.name
    for f in schema_fields
    if isinstance(f.dataType, StringType) and f.name != label_col
]

print(numerical_cols)
print(categorical_cols)

['bill_length_mm', 'bill_depth_mm', 'flipper_length_mm', 'body_mass_g']
['species', 'sex']


In [72]:
# Indexer for the target column ('island'): writes its numeric index to 'label'

from pyspark.ml.feature import StringIndexer, Imputer, OneHotEncoder, VectorAssembler

indexer_label = StringIndexer(
    handleInvalid='keep',
    outputCol='label',
    inputCol=label_col,
)

In [73]:
# Indexers for the categorical input features (every string column except the
# label). Indexing turns each category into a numeric index (0, 1, 2, ...).
# One StringIndexer is created per categorical column, writing to
# '<column>_indexed'.

categorical_cols_indexed = [f'{c}_indexed' for c in categorical_cols]
indexers_features = [
    StringIndexer(inputCol=source_col, outputCol=indexed_col, handleInvalid='keep')
    for source_col, indexed_col in zip(categorical_cols, categorical_cols_indexed)
]
print(categorical_cols_indexed)

['species_indexed', 'sex_indexed']


In [74]:
# Mode imputer for the indexed categorical columns; the filled values are
# written to new '<column>_imputed' columns.
# NOTE(review): the feature indexers use handleInvalid='keep', which per the
# StringIndexer docs places invalid (unseen or NULL) values in an extra bucket.
# If so, no missing values reach this stage and it is effectively a no-op --
# confirm whether mode imputation or a separate "missing" category is intended.

categorical_cols_indexed_imputed = [f'{c}_imputed' for c in categorical_cols_indexed]
imputer_categorical = Imputer(
    strategy='mode',
    inputCols=categorical_cols_indexed,
    outputCols=categorical_cols_indexed_imputed,
)
print(categorical_cols_indexed_imputed)

['species_indexed_imputed', 'sex_indexed_imputed']


In [75]:
# One-hot encoders for the indexed + imputed categorical columns: one encoder
# per column, each writing to '<column>_onehot'.
categorical_cols_onehot = [f'{c}_onehot' for c in categorical_cols_indexed_imputed]
encoders_onehot = [
    OneHotEncoder(inputCol=imputed_col, outputCol=onehot_col)
    for imputed_col, onehot_col in zip(categorical_cols_indexed_imputed, categorical_cols_onehot)
]
print(categorical_cols_onehot)


['species_indexed_imputed_onehot', 'sex_indexed_imputed_onehot']


In [76]:
# Median imputer for the numeric columns; the filled values are written to new
# '<column>_imputed' columns so the original columns stay untouched.
numerical_cols_imputed = [f'{c}_imputed' for c in numerical_cols]
imputer_numerical = Imputer(
    strategy='median',
    inputCols=numerical_cols,
    outputCols=numerical_cols_imputed,
)
print(numerical_cols_imputed)

['bill_length_mm_imputed', 'bill_depth_mm_imputed', 'flipper_length_mm_imputed', 'body_mass_g_imputed']


In [77]:
from pyspark.ml.feature import MinMaxScaler

# (Optional) scale the numeric features with MinMaxScaler.
# The scaler works on a single vector column, so the imputed numeric columns
# are first assembled into 'numeric_features'.
numeric_vector_col = 'numeric_features'
assembler_numerical = VectorAssembler(
    outputCol=numeric_vector_col,
    inputCols=numerical_cols_imputed,
)
scaler = MinMaxScaler(
    outputCol='numeric_features_scaled',
    inputCol=numeric_vector_col,
)

In [78]:
# Inputs of the final assembler: the scaled numeric vector followed by every
# one-hot encoded categorical column.
all_columns = ['numeric_features_scaled', *categorical_cols_onehot]
print(all_columns)

['numeric_features_scaled', 'species_indexed_imputed_onehot', 'sex_indexed_imputed_onehot']


In [79]:
# Assemble everything (numeric + categorical) into the single 'features'
# vector column.
assembler_all = VectorAssembler(outputCol='features', inputCols=all_columns)

#### Pipeline

In [80]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest classifier with a fixed seed. The column names spelled out
# here are the estimator's documented defaults; they match the 'features' and
# 'label' columns produced by the preprocessing stages.
classifier = RandomForestClassifier(
    featuresCol='features',
    labelCol='label',
    seed=42,
)

In [81]:
# Train/test partition: 80/20 weights with a fixed seed.
df_train, df_test = df.randomSplit(weights=[0.8, 0.2], seed=42)

In [82]:
from pyspark.ml import Pipeline

# Stage order matters: each stage consumes columns produced by earlier ones.
pipeline_stages = (
    # 1. Indexer for the target column 'island' (produces 'label')
    [indexer_label]
    # 2. Indexers for the categorical features: 'species', 'sex'
    + indexers_features
    # 3. Imputer for the indexed categorical columns
    + [imputer_categorical]
    # 4. One-hot encoders for the categorical columns
    + encoders_onehot
    # 5. Imputer for the numeric columns
    + [imputer_numerical]
    # 6. Assemble the numeric columns into a vector and scale it
    + [assembler_numerical, scaler]
    # 7. Assemble scaled numeric + categorical columns into 'features'
    + [assembler_all]
    # 8. Classification model
    + [classifier]
)

pipeline = Pipeline(stages=pipeline_stages)

In [83]:
# Fit every stage on the training split only, then apply the fitted pipeline
# to the held-out test split.
pipeline_model = pipeline.fit(dataset=df_train)
df_pred = pipeline_model.transform(dataset=df_test)