# Estructura de una Pipeline de Machine Learning en Python y en Pyspark: el paradigma fit-transform

Vamos a ver un ejemplo de como se implementaría un método de detección de variables constantes siguiendo los modelos de las librerías Scikit-Learn y MLlib.

Para ello trabajaremos con un sencillo Dataframe con tan solo cinco observaciones y tres variables.

In [1]:
from pyspark.sql import SparkSession
from pyspark import keyword_only
from pyspark.ml.param.shared import *
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.pipeline import Estimator, Model, Pipeline
from sklearn.pipeline import Pipeline as pipeline_sklearn

In [2]:
spark = SparkSession.builder.appName("Detección Constantes").master("local").getOrCreate()

22/06/23 00:13:56 WARN Utils: Your hostname, lu-Aspire-A515-55 resolves to a loopback address: 127.0.1.1; using 10.9.103.195 instead (on interface wlp0s20f3)
22/06/23 00:13:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/23 00:13:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/23 00:13:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/06/23 00:13:57 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/06/23 00:13:57 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [3]:
df = spark.read.csv('../../Datasets/Variables_constantes.csv',header=True, inferSchema=True)

In [4]:
df.show()

+----------+----------+----------+
|Variable_1|Variable_2|Variable_3|
+----------+----------+----------+
|      Gato|    Conejo|     Perro|
|     Perro|     Perro|     Perro|
|     Perro|    Conejo|     Perro|
|      Gato|    Conejo|     Perro|
|    Conejo|     Perro|     Perro|
+----------+----------+----------+



1) Scikit-Learn:

La forma de proceder en Scikit-Learn es definiendo una clase que aplica el procedimiento de Machine Learning en cuestión y que cuenta con el método fit que ajusta el modelo y el método transform que lo aplica.

In [5]:
class DeteccionConstantes:
    def __init__(self):
        pass

    def deteccion_constantes(self, df):
        columns = df.columns
        columnas_constantes =[]
        for i in columns:
            valores_distintos = df.dropna(subset=[i]).select(i).distinct().count()
            if valores_distintos <=1:
                columnas_constantes.append(i)
        return columnas_constantes
    
    def fit(self, df):
        self.columnas_constantes = self.deteccion_constantes(df)
        return self

    def transform(self, df):
        if len(self.columnas_constantes)!=0:
            nuevo_df = df.drop(*self.columnas_constantes)
        return nuevo_df

In [6]:
deteccion_constantes = DeteccionConstantes()
modelo = deteccion_constantes.fit(df)
resultado = modelo.transform(df)
resultado.show()

+----------+----------+
|Variable_1|Variable_2|
+----------+----------+
|      Gato|    Conejo|
|     Perro|     Perro|
|     Perro|    Conejo|
|      Gato|    Conejo|
|    Conejo|     Perro|
+----------+----------+



2) MLlib:

La forma de proceder en MLlib es definiendo primero los parámetros en clases que los inicializan y después definiendo una clase para el estimador (método fit) y otra para el transformer (método transform).

In [7]:
# Clase que inicializa el único parámetro
class HasColumnasConstantes(Params):
    def __init__(self):
        super(HasColumnasConstantes, self).__init__()
        self.columnasConstantes = Param(self, "columnasConstantes", "columnasConstantes", 
                                        typeConverter=TypeConverters.identity)
        
    def setColumnasConstantes(self, value):
        return self.set(self.columnasConstantes, value)

    def getColumnasConstantes(self):
        return self.getOrDefault(self.columnasConstantes)

# Clase que define el estimador
class DeteccionConstantesEstimator(Estimator, DefaultParamsReadable, DefaultParamsWritable):
    @keyword_only
    def __init__(self):
        super(DeteccionConstantesEstimator, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
    
    @keyword_only
    def setParams(self):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    @staticmethod
    def eliminacion_constantes(df):
        columnas = df.columns
        columnas_constantes = []
        s = len(df.columns)
        for i, columna in zip(range(s), columnas):
            valores_distintos = df.dropna(subset=[columna]).select(columna).distinct().count()
            if valores_distintos<=1:
                columnas_constantes.append(columna)
        return columnas_constantes
    
    def _fit(self, df):
        columnas_constantes = self. eliminacion_constantes(df)
        return DeteccionConstantesModel(columnasConstantes=columnas_constantes)

# Clase que define el transformer
class DeteccionConstantesModel(Model, HasColumnasConstantes,  DefaultParamsReadable, DefaultParamsWritable):
    @keyword_only
    def __init__(self, columnasConstantes=[]):
        super(DeteccionConstantesModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, columnasConstantes=[]):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df):
        columnas_constantes = self.getColumnasConstantes()
        try:
            df_final = df.drop(*columnas_constantes)
        except:
            df_final = list([])
        return df_final

En este caso nuestro algoritmo de preprocesamiento no necesita de ningún parámetro de entrada para poder funcionar, pero si lo necesitara iría definido en la función "init" de la clase del Estimator y tendríamos por lo tanto que elegirlo al llamar a dicha clase.

In [8]:
deteccion_constantes = DeteccionConstantesEstimator()
#Vemos como las Pipelines funcionan igual que en Scikit-Learn, en este caso tendríamos una con un solo paso.
modelo = Pipeline(stages=[deteccion_constantes]).fit(df)
resultado = modelo.transform(df)
resultado.show()

+----------+----------+
|Variable_1|Variable_2|
+----------+----------+
|      Gato|    Conejo|
|     Perro|     Perro|
|     Perro|    Conejo|
|      Gato|    Conejo|
|    Conejo|     Perro|
+----------+----------+

