In [None]:
import pandas as pd
import numpy as np
from sklearn.feature_selection import VarianceThreshold
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from imblearn.over_sampling import SMOTE
from sklearn.metrics import balanced_accuracy_score
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import GridSearchCV
 
# Columns summed into `Total_majors` by FeatureEngineeringTransformer:
# each name is an oxide label followed by the '_normalized' suffix.
majors = [
    f'{oxide}_normalized'
    for oxide in ('SiO2', 'TiO2', 'Al2O3', 'FeOT', 'MnO', 'MgO', 'CaO', 'Na2O', 'K2O')
]

# Columns summed into `Total_traces` by FeatureEngineeringTransformer.
traces = (
    'Rb Sr Y Zr Nb Cs Ba La '
    'Ce Pr Nd Sm Eu Gd Tb Dy '
    'Ho Er Tm Yb Lu Hf Ta Pb '
    'Th U'
).split()

# Every numeric input column: the trace columns first, then the majors.
numeric_features = [*traces, *majors]

# No categorical input columns are declared.
categorical_features = []
 
class FeatureEngineeringTransformer(BaseEstimator, TransformerMixin):
    """Append row totals and pairwise element features to a DataFrame.

    ``transform`` returns the input columns followed by:

    * ``Total_traces`` and ``Total_majors``: row sums over the module-level
      ``traces`` and ``majors`` column lists;
    * for every unordered pair ``(a, b)`` taken in order from
      ``_PAIR_COLUMNS``: ``a_b_Ratio``, ``a_b_Product``, ``a_b_Difference``
      and ``a_b_Sum``.

    The transformer is stateless; ``fit`` learns nothing from the data.
    """

    # Columns combined pairwise; their order fixes the generated column names.
    _PAIR_COLUMNS = ('Dy', 'Gd', 'Sm', 'Pr', 'La', 'Cs', 'Zr', 'Sr')

    def fit(self, X, y=None):
        """Do nothing and return ``self`` (present for the estimator API)."""
        return self

    def transform(self, X):
        """Return a new DataFrame: ``X`` plus the engineered columns.

        Parameters
        ----------
        X : pandas.DataFrame
            Must contain every column named in ``traces``, ``majors`` and
            ``_PAIR_COLUMNS``.

        Returns
        -------
        pandas.DataFrame
            The columns of ``X`` followed by the engineered columns, each
            appearing exactly once. ``X`` itself is not modified.
        """
        features = [
            X[traces].sum(axis=1).rename('Total_traces'),
            X[majors].sum(axis=1).rename('Total_majors'),
        ]

        for i, col_i in enumerate(self._PAIR_COLUMNS):
            for col_j in self._PAIR_COLUMNS[i + 1:]:
                left, right = X[col_i], X[col_j]
                prefix = f'{col_i}_{col_j}'

                # A zero denominator gives +/-inf; store NaN instead so the
                # value counts as missing rather than as an infinite number.
                ratio = (left / right).replace([np.inf, -np.inf], np.nan)

                features.extend([
                    ratio.rename(f'{prefix}_Ratio'),
                    (left * right).rename(f'{prefix}_Product'),
                    (left - right).rename(f'{prefix}_Difference'),
                    (left + right).rename(f'{prefix}_Sum'),
                ])

        # One concat instead of per-column assignment: the caller's frame is
        # left untouched and no engineered column is added twice.
        return pd.concat([X, *features], axis=1)
 
class Classifier(BaseEstimator):
    """Random-forest classifier trained on SMOTE-resampled data.

    The wrapped ``model_pipeline`` applies, in order:

    1. a ``ColumnTransformer`` that median-imputes and standard-scales
       ``numeric_features``, imputes and one-hot encodes
       ``categorical_features``, and runs ``FeatureEngineeringTransformer``
       on ``numeric_features``;
    2. a median ``SimpleImputer`` over the combined output;
    3. a ``RandomForestClassifier`` (300 trees, ``max_depth=20``).

    ``get_params`` / ``set_params`` expose the parameters of that pipeline
    rather than constructor arguments (the constructor takes none).
    """

    def __init__(self):
        numeric_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())])

        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
            ('onehot', OneHotEncoder(handle_unknown='ignore'))])

        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features),
                ('feature_engineering', FeatureEngineeringTransformer(), numeric_features)])

        self.model_pipeline = Pipeline(steps=[
            ('preprocessor', preprocessor),
            ('imputer', SimpleImputer(strategy='median')),
            ('classifier', RandomForestClassifier(max_depth=20, min_samples_leaf=1, min_samples_split=2, n_estimators=300))
        ])

    @staticmethod
    def _sanitize(X):
        """Return a copy of DataFrame ``X`` with +/-inf replaced by NaN."""
        return X.replace([np.inf, -np.inf], np.nan)

    def preprocessing(self, X, y):
        """Oversample with SMOTE and return ``(X_resampled, y_resampled)``."""
        smote = SMOTE()
        X_resampled, y_resampled = smote.fit_resample(X, y)
        return X_resampled, y_resampled

    def fit(self, X, y):
        """Clean ``X``, rebalance the classes with SMOTE and fit the pipeline.

        Infinite values are treated as missing and missing values are filled
        with the per-column median before resampling, so SMOTE receives a
        frame with those gaps filled.

        Returns
        -------
        Classifier
            ``self``, so calls can be chained.
        """
        X = self._sanitize(X)
        X = X.fillna(X.median())
        X_resampled, y_resampled = self.preprocessing(X, y)
        self.model_pipeline.fit(X_resampled, y_resampled)
        return self

    def predict(self, X):
        """Return predicted class labels for ``X``.

        Infinite values are replaced by NaN first, mirroring ``fit``.
        """
        return self.model_pipeline.predict(self._sanitize(X))

    def predict_proba(self, X):
        """Return predicted class probabilities for ``X``.

        Infinite values are replaced by NaN first, mirroring ``fit``.
        """
        return self.model_pipeline.predict_proba(self._sanitize(X))

    def get_params(self, deep=True):
        """Return the parameters of the wrapped pipeline.

        ``deep`` is forwarded to ``Pipeline.get_params``; with the default
        ``deep=True`` nested keys such as ``classifier__n_estimators`` are
        included.
        """
        return self.model_pipeline.get_params(deep=deep)

    def set_params(self, **params):
        """Set parameters on the wrapped pipeline and return ``self``.

        Accepts the same keys that ``get_params`` reports.
        """
        self.model_pipeline.set_params(**params)
        return self

    def __sklearn_clone__(self):
        """Return an unfitted copy carrying the same pipeline parameters.

        ``get_params`` reports pipeline parameters that ``__init__`` does not
        accept, so the default clone-by-constructor strategy cannot rebuild
        this class. scikit-learn >= 1.3 calls this hook from
        ``sklearn.base.clone`` instead; older versions ignore it.
        """
        from sklearn.base import clone

        duplicate = type(self)()
        duplicate.model_pipeline = clone(self.model_pipeline)
        return duplicate