<a href="https://colab.research.google.com/github/pp-cat/ml_lab/blob/main/common_ml_pipeline_template.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np # linear algebra
import time
import pandas as pd
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split

from sklearn.base import BaseEstimator, TransformerMixin

from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, RobustScaler
from sklearn.preprocessing import KBinsDiscretizer, MultiLabelBinarizer, FunctionTransformer


'''
 base grid search code
'''
def ml_grid_search(x_train, x_test, y_train, y_test, feature_engineering_pipeline):
    '''
    Grid search an ExtraTreesClassifier and print a classification report
    for the best parameter set, together with simple timing information.

    Best here is defined as having the best cross-validated accuracy on the
    training set (3-fold cross-validation, GridSearchCV default scoring).

    x_train, y_train: features and response used to fit the feature
        engineering pipeline and to run the grid search
    x_test, y_test: held-out features and response, used only for the
        printed classification report
    feature_engineering_pipeline: object exposing fit_transform() and
        transform(); it is fitted on the training data only and then applied
        to the test data. Pass a falsy value (e.g. None) to use the features
        as they are.

    Returns the best estimator found by the grid search (already refitted on
    the whole training set by GridSearchCV).
    '''

    params = {  # some simple parameters to grid search
        'max_depth': [10, None],
        'n_estimators': [10, 50, 100, 500],
        'criterion': ['gini', 'entropy']
    }

    base_model = ExtraTreesClassifier()
    model_grid_search = GridSearchCV(base_model, param_grid=params, cv=3)

    # perf_counter() is monotonic, so the measured intervals cannot be skewed
    # by system clock adjustments the way time.time() differences can
    start_time = time.perf_counter()

    if feature_engineering_pipeline:  # fit FE pipeline to training data and use it to transform test data
        parsed_x_train = feature_engineering_pipeline.fit_transform(x_train, y_train)
        parsed_x_test = feature_engineering_pipeline.transform(x_test)
    else:
        parsed_x_train = x_train
        parsed_x_test = x_test

    parse_time = time.perf_counter()
    print(f"1) Parsing took {(parse_time - start_time):.2f} seconds")

    model_grid_search.fit(parsed_x_train, y_train)

    fit_time = time.perf_counter()
    # measured from the end of parsing so that the feature engineering time
    # reported in step 1 is not counted a second time as training time
    print(f"2) Training took {(fit_time - parse_time):.2f} seconds")

    best_model = model_grid_search.best_estimator_

    predictions = best_model.predict(parsed_x_test)
    print(classification_report(y_true=y_test, y_pred=predictions))

    end_time = time.perf_counter()
    print(f"3) Overall took {(end_time - start_time):.2f} seconds")

    return best_model

def make_train_test_dataset(df, response, *, test_size=.2, random_state=0):
    '''
    Split a dataframe into stratified train and test sets.

    df: pandas dataframe holding the features and the response column
    response: name of the response (target) column of df
    test_size: share of the rows put into the test set (default .2)
    random_state: seed passed to train_test_split so the split is
        reproducible (default 0)

    The split is stratified on the response, so train and test keep roughly
    the same class proportions.

    Returns x_train, x_test, y_train, y_test
    '''
    X, y = df.drop([response], axis=1), df[response]
    x_train, x_test, y_train, y_test = train_test_split(
        X, y, stratify=y, random_state=random_state, test_size=test_size)
    return x_train, x_test, y_train, y_test


def value_counts_of_column_list(df, column_list):
    '''
    Print the value counts of every column named in column_list.

    df: pandas dataframe to inspect
    column_list: names of the columns of df to report on

    For each column a banner with the column name is printed, followed by
    the value counts of that column (missing values are counted as well).
    Nothing is returned.
    Works well with the lists returned by "separate_column_by_type(df)".
    '''
    banner = '=============='
    for name in column_list:
        # banner line, column name, banner line - one per output line
        print(banner, name, banner, sep='\n')
        counts = df[name].value_counts(dropna=False)
        print(counts)

def separate_column_by_type(df):
    '''
    Split the column names of the dataframe by dtype.

    Returns two lists: the names of the numerical columns first, then the
    names of the categorical (object dtype) columns. Columns of any other
    dtype appear in neither list.
    '''
    numeric_dtypes = ['float16', 'float32', 'float64', 'int16', 'int32', 'int64']  # the numeric types in Pandas
    object_dtypes = ['O']  # the object type in Pandas

    def names_of(dtypes):
        # names of the columns whose dtype is one of the given dtypes
        return df.select_dtypes(include=dtypes).columns.tolist()

    return names_of(numeric_dtypes), names_of(object_dtypes)


'''
 pipeline construction
 change the pipeline to test the performance and evaluate the model !!
'''
# initial two required variables
# They start out empty and must be filled with the column names to use before
# the pipelines are fitted. The selector functions below look them up every
# time they are called, so assigning new lists later is picked up.
numerical_columns = []
categorical_columns = []


# Named module-level functions are used instead of lambdas because a
# FunctionTransformer wrapping a lambda cannot be pickled, which would prevent
# saving a fitted pipeline.
def _select_categorical_00_features(df):
    '''Return the columns of df currently listed in categorical_columns.'''
    return df[categorical_columns]


def _select_numerical_00_features(df):
    '''Return the columns of df currently listed in numerical_columns.'''
    return df[numerical_columns]


# categorical columns: select -> fill missing values with 'unknown' -> one-hot encode
categorical_00_pipeline = Pipeline(
    [
        ('select_categorical_00_features', FunctionTransformer(_select_categorical_00_features)),
        ('fill_na', SimpleImputer(strategy='constant', fill_value='unknown')),
        ('encoder', OneHotEncoder(sparse_output=False, handle_unknown='ignore'))
    ]
)

# numerical columns: select -> fill missing values with the median -> robust scaling
numerical_00_pipeline = Pipeline(
    [
        ('select_numerical_00_features', FunctionTransformer(_select_numerical_00_features)),
        ('impute', SimpleImputer(strategy='median')),
        ('scaler', RobustScaler())
    ]
)

# Union of the categorical and numerical feature sets
# please add additional pipeline if necessary !

simple_fe = FeatureUnion(
    [
        ('categorical_00', categorical_00_pipeline),
        ('numerical_00', numerical_00_pipeline)
    ]
)


'''
 Simple exploratory data analysis routine
'''
def simple_eda(df):
    '''
    Print a simple exploratory overview of a pandas dataframe.

    df: pandas dataframe to inspect

    Printed in order: shape, head records, info(), share of null values per
    column, describe() statistics, then the categorical and the numerical
    columns, each with the value counts of its columns.

    Nothing is returned. The column lists are computed for this report only;
    the module-level numerical_columns / categorical_columns variables are
    not modified by this function.
    '''
    separator = "-" * 80

    print(f"Shape-> {df.shape}")
    print("\n\n--- Have a look about head Record ---")
    print(df.head())
    print(separator)
    print("\n\n--- info of dataframe ---")
    # info() prints its report itself and returns None, so it must not be
    # wrapped in print() (that would add a stray "None" line)
    df.info()
    print(separator)
    print("\n\n--- null values mean distribution ---")
    print(df.isnull().mean())
    print(separator)
    print("\n\n--- overview statistic of dataframe ---")
    print(df.describe())
    print(separator)

    # local names differ from the module-level lists on purpose, to make it
    # clear that those are not touched here
    numeric_names, categorical_names = separate_column_by_type(df)
    print("\n\n--- categorical columns ---")
    print(df[categorical_names].columns)
    print(separator)
    print("\n\n--- value counts of categorical columns ---")
    # the helper prints its own output and returns None
    value_counts_of_column_list(df, categorical_names)
    print(separator)
    print("\n\n--- numerical columns ---")
    print(df[numeric_names].columns)
    print(separator)
    print("\n\n--- value counts of numerical columns ---")
    value_counts_of_column_list(df, numeric_names)

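#
# Load the CSV into a dataframe before running simple_eda() for the first time !
#
# The pipelines need the module-level categorical_columns and numerical_columns
# lists to be filled in before they can work !!
# e.g. numerical_columns, categorical_columns = separate_column_by_type(df)
#
# The selection step of each pipeline reads those two variables to pick its
# input columns; add additional pipelines as required !!
#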

