# *Pipelines* - trabalhando com dados categóricos e numéricos

Um *pipeline* é uma forma de organizar em um único objeto uma sequência de operações a serem realizadas sobre nossos dados. A saída de uma operação torna-se a entrada da próxima e assim por diante, até que os dados completamente processados saiam no final. Eles deixam o código mais conciso, legível e, geralmente, mais rápido. Também fica fácil adicionar ou remover etapas conforme for necessário. 

`Pipeline` também é o nome do objeto da biblioteca `scikit-learn` usado para aplicar essas sequências de transformações a um conjunto de dados. 

Vamos mostrar como usar os *pipelines* e apresentar um exemplo ao final, usando o conjunto de dados Titanic.

In [1]:
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.datasets import make_classification
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectKBest, VarianceThreshold
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import cross_val_score, GridSearchCV, KFold, train_test_split 
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler, MinMaxScaler, Normalizer, MaxAbsScaler


## Criando um *Pipeline*

Um `Pipeline` é construído usando uma lista de tuplas (método, estimador), na qual "método" é uma *string* contendo o nome que você deseja dar à etapa e "estimador" é um objeto que implementa os métodos `fit` e `transform`.

Suponha que você queira fazer as seguintes manipulações nos seus dados:
- Preencher valores faltantes com a mediana dos valores da coluna
- Remover variáveis com variância menor que 0.1
- Padronizar os dados (subtrair a média e dividir pelo desvio padrão)

O *Pipeline* abaixo pode ser usado para realizar essas etapas:

In [2]:
pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),  # Preenche linhas sem valores com a mediana
    ("selector", VarianceThreshold(0.1)),  # Remove descritores variância < 0.1
    ("scaler", StandardScaler()),  # Padronização
])

pipeline

Alternativamente, podemos criar o mesmo *Pipeline* a partir de uma lista de tuplas (método, estimador):

In [3]:
etapas = []

etapas.append(("imputer", SimpleImputer(strategy="median")))
etapas.append(("selector", VarianceThreshold(0.1)))
etapas.append(("scaler", StandardScaler()))

pipeline = Pipeline(etapas)
pipeline

## *Pipelines* para diferentes tipos de variáveis

Diferentes tipos de variáveis requerem manipulações diferentes. Por exemplo, não faz sentido calcular médias para variáveis categóricas. Assim, é uma prática comum identificar suas variáveis de acordo com o tipo.

In [4]:
# Criar DataFrame
dados = {
    'Idade': [25, 30, 35, 40, 45, 50, 55, 60, 65, 70],
    'Altura': [1.70, 1.75, 1.80, np.NaN, 1.90, 1.80, 1.75, 1.68, 1.72, 1.78],
    'Gênero': ['M', 'F', 'M', 'I', 'M', 'M', np.NaN, 'I', 'F', 'M'],
    'Casado': ['Sim', 'Sim', 'Não', 'Sim', 'Não', 'Sim', 'Não', 'Sim', 'Não', 'Sim'],
    'Ensino_superior': ['Sim', 'Sim', np.NaN, 'Sim', 'Não', 'Sim', 'Sim', 'Sim', 'Não', 'Não']
}

X = pd.DataFrame(dados)
y = [50000, 60000, 30000, 50000, 40000, 100000, 11000, 20000, 13000, 14000]

Caso suas variáveis já estejam com os `dtypes` corretos, é fácil separar as numéricas das categóricas:

In [5]:
print(X.dtypes)

Idade                int64
Altura             float64
Gênero              object
Casado              object
Ensino_superior     object
dtype: object


In [6]:
numerical_columns = [col for col in X.columns if X[col].dtype in ['int64',
                                                                  'float64']]
categorical_columns = [col for col in X.columns if X[col].dtype=="object"]

In [7]:
# Também é possível listar manualmente as variáveis desejadas
numerical_features = ["Idade", "Altura"]
categorical_features = ["Gênero", "Casado", "Ensino_superior"]

In [8]:
# Exemplo de Pipeline para variáveis numéricas
numerical_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="median")),  # Preenche linhas sem valores com a mediana
    ("selector", VarianceThreshold()),  # Remove descritores com baixa variância
    ("scaler", StandardScaler()),  # Padronização
    ("feature_selection", SelectKBest(k=20)),  # Seleção de variáveis
    ("PCA", PCA(n_components=2))  # Projeção para redução de dimensionalidade
])

In [9]:
# Exemplo de Pipeline para variáveis categóricas
categorical_pipeline = Pipeline([
    ("encoder", LabelEncoder())  # Transforma variável categórica no formato texto em numérica
])

In [10]:
# Exemplo de Pipeline para variáveis categóricas
categorical_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent")),  # Preenche linhas sem valores com a entrada mais comum
    ("OHE", OneHotEncoder(drop="first"))  # transforma categorias em vetores numéricos
])

**Importante:** conheça bem todas as funções que pretende usar no seu *pipeline* para não ter surpresas. Por exemplo, o `SimpleImputer()` exige a identificação dos valores que representam valores faltantes (vamos usar `missing_values=np.NaN` no exemplo abaixo).

## ColumnTransformer

O `ColumnTransformer` nos permite juntar *Pipelines* que operam em diferentes variáveis dos nossos conjuntos de dados:

In [11]:
from sklearn.compose import ColumnTransformer

numerical_features = ["Idade", "Altura"]
categorical_features = ["Gênero", "Casado", "Ensino_superior"]

numerical_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="median", missing_values=np.NaN)),
    ("selector", VarianceThreshold()),
    ("scaler", StandardScaler()),
])

categorical_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent", missing_values=np.NaN)),
    ("OHE", OneHotEncoder(drop="first"))
])

# Combinando os pipelines
preprocessor = ColumnTransformer([
    ("numerical", numerical_pipeline, numerical_features),
    ("categorical", categorical_pipeline, categorical_features)
])

In [12]:
# Vamos aplicar o pipeline ao conjunto de dados
pd.DataFrame(preprocessor.fit_transform(X))

Unnamed: 0,0,1,2,3,4,5
0,-1.566699,-1.064742,0.0,1.0,1.0,1.0
1,-1.218544,-0.219709,0.0,0.0,1.0,1.0
2,-0.870388,0.625325,0.0,1.0,0.0,1.0
3,-0.522233,-0.219709,1.0,0.0,1.0,1.0
4,-0.174078,2.315392,0.0,1.0,0.0,0.0
5,0.174078,0.625325,0.0,1.0,1.0,1.0
6,0.522233,-0.219709,0.0,1.0,0.0,1.0
7,0.870388,-1.402756,1.0,0.0,1.0,1.0
8,1.218544,-0.726729,0.0,0.0,0.0,0.0
9,1.566699,0.287311,0.0,1.0,1.0,0.0


Veja que a idade e a altura foram padronizadas (duas primeiras colunas), a variável Gênero foi convertida a duas colunas numéricas pelo OHE, e as outras colunas foram convertidas a valores binários. Nenhum valor está faltando.

## Mostrando *Pipelines*

Para mostrar um *Pipeline* em um Jupyter Notebook no formato de texto, basta usar a função `print()`.

In [13]:
print(preprocessor)

ColumnTransformer(transformers=[('numerical',
                                 Pipeline(steps=[('imputer',
                                                  SimpleImputer(strategy='median')),
                                                 ('selector',
                                                  VarianceThreshold()),
                                                 ('scaler', StandardScaler())]),
                                 ['Idade', 'Altura']),
                                ('categorical',
                                 Pipeline(steps=[('imputer',
                                                  SimpleImputer(strategy='most_frequent')),
                                                 ('OHE',
                                                  OneHotEncoder(drop='first'))]),
                                 ['Gênero', 'Casado', 'Ensino_superior'])])


Se apenas chamar o *pipeline* na célula, ele pode ser apresentado como texto ou diagrama. No meu Notebook, ele já fica no formato diagrama.

In [14]:
preprocessor

Caso no seu apareça como texto, ainda é possível mostrá-lo no formato diagrama, usando `set_config`. Veja mais [aqui](https://scikit-learn.org/dev/auto_examples/miscellaneous/plot_pipeline_display.html#sphx-glr-auto-examples-miscellaneous-plot-pipeline-display-py)


In [15]:
from sklearn import set_config

set_config(display="diagram")

preprocessor

## Adicionando um modelo ao *Pipeline*

Além de processar dados, é possível também incluir um estimador (por exemplo, um modelo de classificação/regressão) como etapa **final** do `Pipeline`. As etapas intermediárias devem ser necessariamente transformações dos dados. Em termos de métodos do `scikit-learn`, dizemos que as etapas intermediárias devem implementar os métodos `fit()` e `transform()`, enquanto o estimador final só precisa implementar o `fit()`.

Veja abaixo um exemplo em que fazemos uma seleção de variáveis com variância maior que 0.2, seguida de padronização e criação de um modelo de classificação usando k-NN.

In [16]:
# Generate synthetic classification data
X, y = make_classification(n_samples=1000, n_features=10, n_classes=2, random_state=42)

# Dividir os dados em conjuntos de treinamento e teste
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [17]:
from sklearn.neighbors import KNeighborsClassifier

model = Pipeline([
    ("selector", VarianceThreshold()),
    ('classifier', KNeighborsClassifier())
])

In [18]:
# Para treinar o modelo:
model.fit(X_train, y_train)

In [19]:
# E usamos o pipeline para fazer previsões para os dados de teste
y_pred = model.predict(X_test)
print("Predições:", y_pred)

# Calcular erro quadrado médio para a predição
acc = accuracy_score(y_test, y_pred)
print("Acurácia no conjunto de teste: ", acc)

Predições: [0 1 0 1 0 1 0 0 0 0 0 1 1 1 0 0 1 1 1 1 1 0 1 0 0 1 0 0 1 0 1 1 0 0 0 0 1
 1 0 1 0 1 0 0 0 0 1 1 1 1 0 1 1 0 1 0 1 0 1 0 1 0 0 1 0 1 0 1 0 1 1 1 1 0
 1 0 0 1 0 1 0 0 1 0 1 0 0 0 0 1 1 1 1 1 1 1 0 0 1 0 1 0 1 0 0 1 0 1 0 1 1
 1 1 1 1 0 0 1 0 0 1 1 0 1 1 1 1 1 0 1 1 0 1 1 0 0 0 0 0 1 1 0 0 0 1 1 1 0
 0 0 1 0 0 1 0 0 1 0 0 1 0 1 0 0 0 0 0 0 0 0 1 0 0 0 1 1 0 0 1 1 1 1 1 1 1
 0 0 0 0 1 1 0 0 0 1 0 1 1 1 1]
Acurácia no conjunto de teste:  0.8


## Uso de *pipelines* com validação cruzada

Sabemos que não é recomendável aplicar transformações (por exemplo, padronizar os dados) a um conjunto de dados completo, antes de separar em conjunto de treinamento e teste, devido ao problema denominado *data leakage*. Usando `Pipeline` podemos reunir várias etapas de manipulação dos dados em um único transformador, permitindo, por exemplo, um tratamento individual para cada *fold* em uma validação cruzada, evitando assim o *data leakage*. 

In [20]:
from sklearn.model_selection import cross_val_score, KFold

# Criar pipeline
model = Pipeline([
    ("selector", VarianceThreshold()),
    ('classifier', KNeighborsClassifier())
])

# Avaliar modelo usando validação cruzada
# As etapas serão executadas individualmente em cada fold
kfold = KFold(n_splits=3)
results = cross_val_score(model, X, y, cv=kfold, scoring="accuracy")

print(results)

[0.82934132 0.84384384 0.81681682]


## Usando um *Pipeline* para ajustar hiperparâmetros

Podemos incluir diferentes métodos e/ou hiperparâmetros de métodos a um dicionário, usá-lo em um *grid search* e adicionar os valores que fornecerem os melhores resultados a um *Pipeline*. Note que no caso de hiperparâmetros, os nomes dos métodos devem corresponder aos nomes definidos pelo `scikit-learn`.

Abaixo, vamos criar um *pipeline* com as etapas:
- Padronização
- Remoção de variáveis com baixa variância
- Modelo classificador k-NN

E vamos tentar encontrar a melhor combinação para:
- Método de padronização (etapa 1)
- *Threshold* para a variância (etapa 2)
- Hiperparâmetros n_neighbors, p e leaf_size do KNeighborsClassifier (etapa 3)

Veja que os parâmetros dos estimadores no *pipeline* podem ser acessados usando a sintaxe `estimator__parameter`

[Fonte](https://machinelearningmastery.com/modeling-pipeline-optimization-with-scikit-learn/)

In [21]:
from sklearn.model_selection import GridSearchCV

pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('selector', VarianceThreshold()),
    ('classifier', KNeighborsClassifier())
])

# Dicionário com o grid de hiperparâmetros que vamos explorar
parameters = {
    'scaler': [StandardScaler(), MinMaxScaler(), Normalizer(), MaxAbsScaler()],
    'selector__threshold': [0, 0.001, 0.01],
    'classifier__n_neighbors': [1, 5, 10],
    'classifier__p': [1, 2],
    'classifier__leaf_size': [1, 5]
}

grid_search = GridSearchCV(pipe, param_grid=parameters, cv=2).fit(X_train, y_train)
 
print('Score conjunto de treinamento: ' + str(grid_search.score(X_train, y_train)))
print('Score conjunto de teste: ' + str(grid_search.score(X_test, y_test)))

Score conjunto de treinamento: 0.88625
Score conjunto de teste: 0.845


In [22]:
# Acessar os melhores hiperparâmetros
best_params = grid_search.best_params_
print(best_params)

{'classifier__leaf_size': 1, 'classifier__n_neighbors': 10, 'classifier__p': 1, 'scaler': Normalizer(), 'selector__threshold': 0}


In [23]:
# Salvar os melhores hiperparâmetros na variável best_pipe
best_pipe = grid_search.best_estimator_
print(best_pipe)

Pipeline(steps=[('scaler', Normalizer()),
                ('selector', VarianceThreshold(threshold=0)),
                ('classifier',
                 KNeighborsClassifier(leaf_size=1, n_neighbors=10, p=1))])


In [24]:
# Acessando o valor de p do k-NN pipeline final
best_pipe.get_params()['classifier__p']

1

Etapas não finais podem ser ignoradas usando "passthrough". 

No exemplo abaixo, mostramos como explorar o efeito redução de dimensionalidade no preprocessamento dos dados, testando as seguintes situações: nenhuma redução de dimensionalidade, PCA com 5 PCs e PCA com 10 PCs. Também testamos dois classificadores distintos.

In [25]:
pipe = Pipeline([
    ('reduce_dim', PCA()),
    ('classifier', KNeighborsClassifier())
])

param_grid = dict(reduce_dim=["passthrough", PCA(5), PCA(10)],
                  classifier=[KNeighborsClassifier(), LogisticRegression()])

grid_search = GridSearchCV(pipe, param_grid=param_grid, cv=2).fit(X_train, y_train)

In [26]:
best_params = grid_search.best_params_
print(best_params)

{'classifier': LogisticRegression(), 'reduce_dim': PCA(n_components=5)}


## Criando um transformador personalizado

Em alguns casos, podemos querer usar uma etapa em um *Pipeline* para a qual não existe função implementada. Podemos criar nossas próprias classes, lembrando que ela necessariamente deverá implementar os métodos `fit()` e `transform()`.

Veja abaixo uma função para remover variáveis correlacionadas entre si, que usei no Notebook [Seleção de variáveis e Redução de dimensionalidade](https://github.com/rflameiro/Python_e_Quiminformatica/blob/main/Machine_Learning/Sele%C3%A7%C3%A3o%20de%20vari%C3%A1veis%20e%20Redu%C3%A7%C3%A3o%20de%20dimensionalidade.ipynb).

In [27]:
from sklearn.base import BaseEstimator, TransformerMixin

In [28]:
# Função para remover variáveis correlacionadas entre si
# Adaptado de https://github.com/fraunhoferportugal/tsfel/issues/91
class CorrelationThreshold(BaseEstimator, TransformerMixin):
    
    def __init__(self, threshold = 0.9):
        self.threshold = threshold
        self.to_drop = None
        self.to_keep = None

    def fit(self, X, y=None): 
        corr_matrix = X.corr(method='pearson').abs()
        upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(np.bool))
        self.to_drop = [column for column in upper.columns if any(upper[column] > self.threshold)]
        # self.to_keep = list(set(X.columns) - set(self.to_drop))
        # The method above does not preserve the order of the columns
        self.to_keep = [value for value in X.columns if value not in self.to_drop]
        return self
        
    def transform(self, X, y = None):
        X_selected = X[self.to_keep]
        return X_selected
    
    def get_support(self):
        return self.to_keep

## make_pipeline

A função utilitária `make_pipeline()` facilita a construção de *pipelines*; ela toma como entrada um número qualquer de estimadores e retorna um *pipeline*, preenchendo os nomes automaticamente:

In [29]:
from sklearn.pipeline import make_pipeline
from sklearn.naive_bayes import MultinomialNB
from sklearn.preprocessing import Binarizer

pipe = make_pipeline(Binarizer(), MultinomialNB())

In [30]:
print(pipe)

Pipeline(steps=[('binarizer', Binarizer()), ('multinomialnb', MultinomialNB())])


## FeatureUnion

Parecido com o *Pipeline* mas, em vez de realizar as operações sequencialmente, as realiza em paralelo e concatena os resultados. Por exemplo, o objeto abaixo realizará duas reduções de dimensionalidade, uma com PCA e outra com SVD, e todas as colunas resultantes (uma da PCA e duas da SVD) serão incluídas no *DataFrame* resultante.

In [31]:
from sklearn.pipeline import FeatureUnion
from sklearn.decomposition import PCA, TruncatedSVD

union = FeatureUnion([("pca", PCA(n_components=1)),
                      ("svd", TruncatedSVD(n_components=2))])

X = [[0., 1., 3], [2., 2., 5], [1., 2., 3], [2., 4., 3]]

union.fit_transform(X)

array([[ 1.80155683,  2.89772015, -1.07557265],
       [-0.44937328,  5.63365372, -0.97470578],
       [ 0.41367381,  3.73706775, -0.04782552],
       [-1.76585736,  5.10116037,  1.72246899]])

# Exemplo: Titanic

Trago aqui o exemplo [deste Notebook do Kaggle](https://www.kaggle.com/code/erkanhatipoglu/introduction-to-sklearn-pipelines-with-titanic), em que o autor define várias funções para lidar com os diferentes tipos de dados do *dataset* Titanic.

In [32]:
from category_encoders import BinaryEncoder
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
from sklearn.impute import SimpleImputer
from sklearn.metrics import make_scorer, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import RandomForestClassifier

In [33]:
# Importar dataset Titanic - contém variáveis de diversos tipos
url = "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv"
df = pd.read_csv(url, index_col='PassengerId')

In [34]:
# Funções úteis

def get_titles(df):
    '''Obter as entradas do campo Name (títulos como Mr, Mrs...)'''
    titles=set()
    for name in df:
        if name.find('.'):
            title = name.split('.')[0].split()[-1]
            titles.add(title)
    return titles


def get_family_name(df):
    '''Obter os nomes de família do campo Name'''
    family_names=set()
    for name in df:
        if name.find(','):
            family_name = name.split(',')[0].split()[-1]
            family_names.add(family_name)
    return family_names

def get_cabin_chars(df):
    '''Obter os caracteres do campo Cabin'''
    cabin_chars = set()
    for word in df:
        cabin_chars.add(str(word)[0]) 
    return cabin_chars


In [35]:
y = df["Survived"]
df.drop(["Survived", "Ticket"], axis=1, inplace=True)
X = df.copy()

# Split para validação
X_train, X_valid, y_train, y_valid = train_test_split(X,y, random_state=2)

In [36]:
# Identificar colunas com valores faltantes
missing_values = [col for col in X_train.columns if X_train[col].isnull().sum()]
print("Colunas com valores faltantes: {}".format(missing_values))

# Identificar colunas numéricas e categóricas pelo dtype
numerical_columns = [col for col in X.columns if X[col].dtype in ['int64',
                                                                  'float64']]
print("Colunas numéricas: {}".format(numerical_columns))

categorical_columns = [col for col in X.columns if X[col].dtype=="object"]
print("Colunas categóricas: {}".format(categorical_columns))

# Extrair títulos da coluna Name
titles = list(get_titles(X["Name"]))
print("\nTitles: {}".format(titles))

# Extrair caracteres da coluna Cabin
chars = list(get_cabin_chars(X["Cabin"]))
print("\nChars in Cabin column: {}".format(chars))

# Extrair sobrenomes da coluna Name
surname = list(get_family_name(X["Name"]))
print("\nSurnames of the passengers: {}".format(surname))

Colunas com valores faltantes: ['Age', 'Cabin', 'Embarked']
Colunas numéricas: ['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']
Colunas categóricas: ['Name', 'Sex', 'Cabin', 'Embarked']

Titles: ['Miss', 'Mlle', 'Don', 'Dr', 'Rev', 'Capt', 'Mme', 'Mr', 'Master', 'Sir', 'Mrs', 'Col', 'Lady', 'Countess', 'Ms', 'Jonkheer', 'Major']

Chars in Cabin column: ['G', 'F', 'E', 'C', 'A', 'D', 'B', 'n', 'T']

Surnames of the passengers: ['Brewe', 'Thayer', 'Humblen', 'Lindell', 'Reuchlin', 'Emanuel', 'Tornquist', 'Reynaldo', 'Johanson', 'Laitinen', 'Nankoff', 'Harper', 'Meanwell', 'Impe', 'Lemberopolous', 'Artagaveytia', 'Cairns', 'Silven', 'Dennis', 'Pekoniemi', 'Backstrom', 'Strom', 'Lobb', 'Bradley', 'Gustafsson', 'Hocking', 'Mullens', 'Sjoblom', 'Abbott', 'Wiseman', 'Davis', 'Danoff', 'Cunningham', 'Arnold-Franchi', 'Greenberg', 'Banfield', 'Goldenberg', 'Weir', 'Moore', 'Quick', 'Salonen', 'Newsom', 'Hays', 'Andrew', 'Zimmerman', 'Lindqvist', 'Ponesell', 'Andersson', 'Wiklund', 'Heikkinen', 'Betr

In [37]:
# Custom transformer classes
class NameColumnTransformer(BaseEstimator, TransformerMixin):
    """
    a general class for transforming Name, SibSp and Parch columns of Titanic dataset
    for using in the machine learning pipeline
    """
    def __init__(self):
        """
        constructor
        """
        # Will be used for fitting data
        self.titles_set = set()
        self.surname_set = set()
        # Titles captured from train data
        self.normal_titles_list = ["Mr", "Mrs", "Mme", "Miss", "Mlle", "Ms", "Master", "Dona"]
        self.titles_dict = {"Mr": ['Mr', 'Major', 'Jonkheer', 'Capt', 'Col', 'Don', 'Sir',
                                   'Rev'],
                            "Mrs": ['Mrs', 'Mme', 'Lady','Countess', 'Dona'],
                            "Miss": ['Miss', 'Mlle', 'Ms'],
                            "Master": ['Master'],
                            "Dr": ['Dr']}

    def fit(self, X, y=None, **kwargs):
        """
        an abstract method that is used to fit the step and to learn by examples
        :param X: features - Dataframe
        :param y: target vector - Series
        :param kwargs: free parameters - dictionary
        :return: self: the class object - an instance of the transformer - Transformer
        """
        '''Fits the titles, family and rank from Names Column'''
        
        # Make a copy to avoid changing original data
        X_temp = X.copy()
        # Create Titles column
        if "Title" in X_temp.columns:
            X_temp.drop("Title", axis=1, inplace=True)
        else:
            pd.DataFrame.insert(X_temp, len(X_temp.columns),"Title","",False)  
            
        # Get the index values
        index_values=X_temp.index.values.astype(int)
        
        # Set state (Add to: {titles_set, surname_set} attributes) of the object
        for i in index_values:
            
            # Get the name for the ith index
            name = X_temp.loc[i,'Name']
            # Get the number of followers for the ith index
            number_of_followers = X_temp.loc [i, 'SibSp'] + X_temp.loc [i, 'Parch']
            
            # Split the title from name
            if name.find('.'):
                title = name.split('.')[0].split()[-1]
                if title in self.titles_dict.keys():
                    X_temp.loc[i, 'Title'] = title
                else:
                    X_temp.loc[i, 'Title'] = np.NaN
                # Add title to titles_set to use in transform method
                self.titles_set.add(title)
            
            # Split the surname from name
            if name.find(','):
                surname = name.split(',')[0].split()[-1]
                # Add surname to surname_set to use in transform method
                if number_of_followers > 0:
                    self.surname_set.add(surname)
                    X_temp.loc[i,"Family"]=surname
                
        # Title Encoding
        
        # Drop missing Title rows (Hi rank columns that are mapped to titles_dict keys)
        # so that no 'Title_' columns will appear in transform 
        X_temp.dropna(axis = "index", subset=['Title'], inplace=True)
        
        # Apply one-hot encoding to the Title column.
        self.OH_encoder = OneHotEncoder(handle_unknown = 'ignore', sparse_output = False)
        # Get column names to use in transform.
        self.OH_encoder = self.OH_encoder.fit(X_temp[['Title']])
        self.title_columns = self.OH_encoder.get_feature_names_out(['Title'])

        # Family Encoding
        
        # Drop missing Family rows
        # so that no 'Family_' columns will appear in transform 
        X_temp.dropna(axis = "index", subset=['Family'], inplace=True)
        
        # Apply binary encoding to the Family column.
        self.binary_encoder = BinaryEncoder(cols =['Family'])
        self.binary_encoder = self.binary_encoder.fit(X_temp[['Family']])
        
        return self

    def transform(self, X, y=None, **kwargs):
        """
        an abstract method that is used to transform according to what happend in the fit method
        :param X: features - Dataframe
        :param y: target vector - Series
        :param kwargs: free parameters - dictionary
        :return: X: the transformed data - Dataframe
        """
        '''Transforms the titles and family from Names Column'''
        
        # Make a copy to avoid changing original data
        X_temp = X.copy()
        
        # Create Titles column    
        pd.DataFrame.insert(X_temp, len(X_temp.columns),"Title","",False)    
        # Create Family column
        pd.DataFrame.insert(X_temp, len(X_temp.columns),"Family","",False)          
        # Create Rank column
        pd.DataFrame.insert(X_temp, len(X_temp.columns),"Rank","",False)
        # Create Followers column
        pd.DataFrame.insert(X_temp, len(X_temp.columns),"Followers","",False)
        
        # Get the index values
        index_values=X_temp.index.values.astype(int)
        
        for i in index_values:
            # Get the name for the ith index
            name = X_temp.loc[i,'Name']
            # Get the number of followers for the ith index
            number_of_followers = X_temp.loc [i, 'SibSp'] + X_temp.loc [i, 'Parch']
            X_temp.loc[i, 'Followers'] = number_of_followers
            
            # Split the title from name
            if name.find('.'):
                title = name.split('.')[0].split()[-1]
                if title in self.titles_set:
                    for key in self.titles_dict:
                        # Insert title
                        if title in self.titles_dict[key]:
                            X_temp.loc[i, 'Title'] = key 
                        
                        # Insert rank
                        if title in self.normal_titles_list:
                            X_temp.loc[i, 'Rank'] = "Normal"
                        else:
                            X_temp.loc[i, 'Rank'] = "High"
                else:
                    X_temp.loc[i, 'Title'] = "Other"
                    X_temp.loc[i, 'Rank'] = "Normal"
                    
                    
            # Split the surname from name
            if name.find(','):
                surname = name.split(',')[0].split()[-1]
                if surname in self.surname_set and number_of_followers > 0:
                    X_temp.loc[i, 'Family'] = surname                 
                else:
                    X_temp.loc[i, 'Family'] = "NA"                    
        
        # Encoding Title
        encoded = self.OH_encoder.transform(X_temp[['Title']])
        # convert arrays to a dataframe 
        encoded = pd.DataFrame(encoded) 
        # One-hot encoding removed index; put it back
        encoded.index = X_temp.index
        # Insert column names
        encoded.columns = self.title_columns
        encoded = encoded.astype('int64')
        # concating dataframes  
        X_temp = pd.concat([X_temp, encoded], axis = 1)
        
        # Encoding Family
        bin_encoded = self.binary_encoder.transform(X_temp[['Family']])
        # convert arrays to a dataframe 
        bin_encoded = pd.DataFrame(bin_encoded) 
        # One-hot encoding removed index; put it back
        bin_encoded.index = X_temp.index
        bin_encoded = bin_encoded.astype('int64')
        # concating dataframes  
        X_temp = pd.concat([X_temp, bin_encoded], axis = 1)
        # We do not need Family any more
        X_temp.drop("Family", axis = 1, inplace=True) 
        
        # Encoding Rank
        X_temp['Rank'] = X_temp['Rank'].apply(lambda x: 1 if x =='Normal' else (0 if x =='High' else None))
        # We do not need Name any more
        X_temp.drop("Name", axis = 1, inplace=True)

        return X_temp

    def fit_transform(self, X, y=None, **kwargs):
        """
        perform fit and transform over the data
        :param X: features - Dataframe
        :param y: target vector - Series
        :param kwargs: free parameters - dictionary
        :return: X: the transformed data - Dataframe
        """
        self = self.fit(X, y)
        return self.transform(X, y)

    
# Custom transformer classes
class AgeColumnTransformer(BaseEstimator, TransformerMixin):
    """
    a general class for transforming age column of Titanic dataset for using in the machine learning pipeline
    """
    def __init__(self):
        """
        constructor
        """
        # Will be used for fitting data
        self.titles_set = set()
        self.titles_dict = {}


    def fit(self, X, y=None, **kwargs):
        """
        an abstract method that is used to fit the step and to learn by examples
        :param X: features - Dataframe
        :param y: target vector - Series
        :param kwargs: free parameters - dictionary
        :return: self: the class object - an instance of the transformer - Transformer
        """
        '''Fits the titles, family and rank from Names Column'''
 
        # Make a copy to avoid changing original data
        X_temp = X.copy()
    
        # Get the index values
        index_values = X_temp.index.values.astype(int)
        
        # Get all the titles from dataset
        for i in index_values:
            title = X_temp.loc [i, 'Title']
            self.titles_set.add(title)
        
        # Calculate mean for all titles
        for title in self.titles_set:
            mean = self.calculate_mean_age(title, X_temp)
            self.titles_dict[title] = mean
            #print("Avarage age for title '{}' is {}".format(title, mean))
       
        return self

    def transform(self, X, y=None, **kwargs):
        """
        an abstract method that is used to transform according to what happend in the fit method
        :param X: features - Dataframe
        :param y: target vector - Series
        :param kwargs: free parameters - dictionary
        :return: X: the transformed data - Dataframe
        """
        '''Transforms the titles and family from Names Column'''
            
        # Make a copy to avoid changing original data
        X_temp = X.copy()        
        
        # Get the index values
        index_values = X_temp.index.values.astype(int)
        
        # If a passangers age is Nan replace it with the avarage value
        # of that title class. e.g. if that passanger is master use the
        # mean value calculated for the masters.
        for i in index_values:
            age = X_temp.at[i, 'Age'].astype(float)
            if np.isnan(age):
                title = X_temp.loc [i, 'Title']
                X_temp.loc[i,'Age'] =  round(self.titles_dict.get(title),2)

        # We do not need Title any more
        X_temp.drop("Title", axis = 1, inplace=True)
        
        return X_temp

    def fit_transform(self, X, y=None, **kwargs):
        """
        perform fit and transform over the data
        :param X: features - Dataframe
        :param y: target vector - Series
        :param kwargs: free parameters - dictionary
        :return: X: the transformed data - Dataframe
        """
        self = self.fit(X, y)
        
        return self.transform(X, y)
                   
    def calculate_mean_age(self, title, X):
        
        # Make a copy to avoid changing original data
        X_temp = X.copy()  
        
        # title_X = X_temp[[title in x for x in X_temp['Title']]][X_temp["Age"].notnull()]
        title_X = X_temp[[title in x for x in X_temp['Title']] & X_temp["Age"].notnull()]
        return title_X["Age"].mean()


# Custom transformer classes
class CabinColumnTransformer(BaseEstimator, TransformerMixin):
    """
    a general class for transforming cabin column of Titanic dataset for using in the machine 
    learning pipeline
    """
    def __init__(self):
        """
        constructor
        """
        # Will be used for fitting data
        self.cabin_set = set()

    def fit(self, X, y=None, **kwargs):
        """
        an abstract method that is used to fit the step and to learn by examples
        :param X: features - Dataframe
        :param y: target vector - Series
        :param kwargs: free parameters - dictionary
        :return: self: the class object - an instance of the transformer - Transformer
        """
        '''Fits the titles, family and rank from Names Column'''
         
        # Make a copy to avoid changing original data
        X_temp = X.copy()
        
        # Imputation on X_temp_imputed
        imputer = SimpleImputer(strategy='constant', fill_value="NaN")
        X_temp_imputed = pd.DataFrame(imputer.fit_transform(X_temp[['Cabin']]))
        # Imputation removed column names; put them back
        X_temp_imputed.columns = X_temp[['Cabin']].columns
        X_temp_imputed.index = X_temp[['Cabin']].index
    
        # Get the index values
        index_values = X_temp.index.values.astype(int)
        
        # For each cabin
        for i in index_values:
            cabin = X_temp_imputed.loc[i, 'Cabin']
            X_temp_imputed.loc[i, 'Cabin'] = cabin[0]
            self.cabin_set.add(cabin[0])
        
        # Cabin Encoding
        
        # Apply one-hot encoding to the Cabin column.
        self.OH_encoder = OneHotEncoder(handle_unknown = 'ignore', sparse_output = False)
        # Get column names to use in transform.
        self.OH_encoder = self.OH_encoder.fit(X_temp_imputed[['Cabin']])
        self.cabin_columns = self.OH_encoder.get_feature_names_out(['Cabin'])
      
        return self

    def transform(self, X, y=None, **kwargs):
        """
        an abstract method that is used to transform according to what happend in the fit method
        :param X: features - Dataframe
        :param y: target vector - Series
        :param kwargs: free parameters - dictionary
        :return: X: the transformed data - Dataframe
        """
        '''Transforms the titles and family from Names Column'''

        # Make a copy to avoid changing original data
        X_temp = X.copy()        
        
        # Get the index values
        index_values = X_temp.index.values.astype(int)
        
        # Imputation on X_imputed
        imputer = SimpleImputer(strategy='constant', fill_value="NaN")
        X_imputed = pd.DataFrame(imputer.fit_transform(X_temp[['Cabin']]))
        # Imputation removed column names; put them back
        X_imputed.columns = X_temp[['Cabin']].columns
        X_imputed.index = X_temp[['Cabin']].index
        
        for i in index_values:
            cabin = X_imputed.loc[i, 'Cabin']
            if cabin[0] in self.cabin_set:
                X_imputed.loc [i, 'Cabin'] = cabin[0]
            else:
                X_imputed.loc [i, 'Cabin'] = "N"
        X_temp.drop("Cabin",axis = 1, inplace = True)

        # concating dataframes  
        X_temp = pd.concat([X_temp, X_imputed], axis = 1)
               
        # Encoding Cabin
        encoded = self.OH_encoder.transform(X_imputed[['Cabin']])
        # convert arrays to a dataframe 
        encoded = pd.DataFrame(encoded) 
        # One-hot encoding removed index; put it back
        encoded.index = X_imputed.index
        # Insert column names
        encoded.columns = self.cabin_columns
        encoded = encoded.astype('int64')
        # concating dataframes  
        X_temp = pd.concat([X_temp, encoded], axis = 1)

        X_temp.drop("Cabin",axis = 1, inplace = True)
        
        return X_temp

    def fit_transform(self, X, y=None, **kwargs):
        """
        perform fit and transform over the data
        :param X: features - Dataframe
        :param y: target vector - Series
        :param kwargs: free parameters - dictionary
        :return: X: the transformed data - Dataframe
        """
        self = self.fit(X, y)
        return self.transform(X, y)

In [38]:
# Definir os transformadores customizados
name_transformer = NameColumnTransformer()
age_transformer = AgeColumnTransformer()
cabin_transformer = CabinColumnTransformer()

In [39]:
# Definir as colunas que serão processadas pelo ColumnTransformer, para as quais
# não serão usadas os transformadores personalizados
numerical_cols = ['Pclass', 'Fare']
categorical_cols = ['Sex', 'Embarked']

In [40]:
# Pipeline para os dados numéricos
numerical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler())])


# Pipeline para os dados categóricos
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown = 'ignore', sparse_output=False))
])

# Juntar os pipelines para dados numéricos e categóricos usando ColumnTransformer
# remainder='passthrough' indica que as colunas não processadas são mantidas
column_transformer = ColumnTransformer(
    transformers=[
        ('numerical', numerical_transformer, numerical_cols),
        ('categorical', categorical_transformer, categorical_cols)
    ], remainder='passthrough')

In [41]:
# Redução de dimensionalidade 

# PCA
pca = PCA(n_components=2)

# Além disso, vamos manter algumas das colunas originais usando SelectKBest
selection = SelectKBest(k=12)

# Aplicar ambas as transformações em paralelo usando FeatureUnion
feature_union = FeatureUnion([('pca', pca), ('select', selection)])

In [42]:
# Definir pipeline de preprocessamento
preprocessor = Pipeline(steps=[('name', name_transformer),
                              ('age', age_transformer),
                              ('cabin', cabin_transformer),
                              ('column', column_transformer),
                              ('union', feature_union)])

# Preprocessar conjunto de validação
X_valid_transf = preprocessor.fit(X_train, y_train).transform(X_valid)

In [43]:
# Display the number of remaining columns after transformation 
print("Restam", X_valid_transf.shape[1], "variáveis")

Restam 14 variáveis


In [44]:
# Definir modelo
my_model = RandomForestClassifier(
    n_estimators=500,
    random_state=42)

In [45]:
# Criar e executar o Pipeline
# Juntamos as etapas de preprocessamento com o modelo
my_pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                              ('model', my_model)
                             ])

# O conjunto de treinamento é preprocessado e é feito o fit do modelo
my_pipeline.fit(X_train, y_train)

# As predições são obtidas para o conjunto de validação
preds = my_pipeline.predict(X_valid)

# Finalmente, determinamos a acurácia do modelo
score = accuracy_score(y_valid, preds)
score

0.7892376681614349

## Referências

https://inria.github.io/scikit-learn-mooc/python_scripts/03_categorical_pipeline_column_transformer.html

https://www.kaggle.com/code/erkanhatipoglu/introduction-to-sklearn-pipelines-with-titanic

https://medium.com/@ayush-thakur02/wait-what-are-pipelines-in-python-628f4b5021fd

https://machinelearningmastery.com/modeling-pipeline-optimization-with-scikit-learn/

https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html