In [None]:
# Reload imported modules automatically when their source changes (useful while editing the package)
%load_ext autoreload
%autoreload 2

In [None]:
# default_exp tmlt

# Training Pipeline

> An API to create super-fast training pipelines for machine learning models on tabular (structured) data.

> It comes with multi-core (`n_jobs`) parallelism and cutting-edge hyperparameter tuning techniques (grid search and Optuna).

In [None]:
#hide
from nbdev.showdoc import *
from nbdev import *

In [None]:
# export
from tabular_ml_toolkit.dataframeloader import *
from tabular_ml_toolkit.preprocessor import *
from tabular_ml_toolkit.logger import *
from tabular_ml_toolkit.optuna_objective import *
from tabular_ml_toolkit.utility import *

In [None]:
# export
# hide
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler, MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.metrics import roc_auc_score, accuracy_score, log_loss, f1_score, precision_score, recall_score
from sklearn.model_selection import cross_val_score, GridSearchCV, StratifiedKFold
# for Optuna
import optuna
#for XGB
import xgboost

# for finding n_jobs in all sklearn estimators
from sklearn.utils import all_estimators
import inspect

# Just to compare fit times
import time

# for os specific settings
import os

In [None]:
# export

class TMLT:
    """
    Represent Tabular ML Toolkit class: loads tabular data, preprocesses it and trains a model
    inside a single scikit-learn pipeline.

    Attributes:\n
    spl: A scikit-learn `Pipeline` with a 'preprocessor' step and a 'model' step \n
    dfl: A DataFrameLoader instance \n
    pp: A PreProcessor instance \n
    model: The given Model \n
    problem_type: The problem type string given to `prepare_data_for_training`
    """

    def __init__(self):
        self.dfl = None
        self.pp = None
        self.model = None
        self.spl = None
        self.transformer_type = None
        self.problem_type = None
        # Estimator class names that accept `n_jobs` (looked up in prepare_data_for_training)
        self.has_n_jobs = check_has_n_jobs()
        # Core count assigned to `n_jobs`; presumably leaves one core free for the OS -- TODO confirm
        self.IDEAL_CPU_CORES = find_ideal_cpu_cores()

    def __str__(self):
        """Return a human-readable representation listing the main attributes."""
        attr_str = "spl, dfl, pp, model"
        return "Training Pipeline object with attributes: " + attr_str

    def __repr__(self):
        return self.__str__()

    ## All Core Methods ##

    def create_final_sklearn_pipeline(self, transformer_type, model):
        """
        Bundle preprocessing and modeling into one pipeline, store it in `self.spl` and return it.

        `transformer_type` becomes the 'preprocessor' step and `model` the 'model' step, so
        grid-search parameters are addressed as `preprocessor__...` and `model__...`.
        """
        self.spl = Pipeline(
            steps=[('preprocessor', transformer_type),
                   ('model', model)])
        return self.spl

    def prepare_data_for_training(self, train_file_path:str,
                                  idx_col:str, target:str,
                                  random_state:int,
                                  model:object,
                                  test_file_path:str=None,
                                  problem_type="regression",
                                  nrows=None):
        """
        Load the CSV data, build the preprocessor and wrap it together with `model` in `self.spl`.

        If the model's class supports `n_jobs`, it is set to `self.IDEAL_CPU_CORES`;
        otherwise a message is printed and training runs on a single thread.
        Returns `self` so calls can be chained.
        """
        self.problem_type = problem_type

        estimator_name = model.__class__.__name__
        if estimator_name in self.has_n_jobs:
            # Use the configured core count (intended to leave a processor free so the OS doesn't kill the job)
            model.n_jobs = self.IDEAL_CPU_CORES
        else:
            print(f"{estimator_name} doesn't support parallelism yet! Training will continue on a single thread.")
        self.model = model

        self.dfl = DataFrameLoader().from_csv(
            train_file_path=train_file_path,
            test_file_path=test_file_path,
            idx_col=idx_col,
            target=target,
            random_state=random_state,
            nrows=nrows)

        self.pp = PreProcessor().preprocess_all_cols(dataframeloader=self.dfl,
                                                     problem_type=self.problem_type)

        self.create_final_sklearn_pipeline(transformer_type=self.pp.transformer_type,
                                           model=self.model)
        return self

    def update_preprocessor(self,
                            num_cols__imputer=None,
                            num_cols__scaler=None,
                            cat_cols__imputer=None,
                            cat_cols__encoder=None):
        """
        Rebuild the preprocessor with the given transformers and recreate `self.spl`.

        Any argument left as None uses the default: SimpleImputer(strategy='median') and
        StandardScaler() for numeric columns, SimpleImputer(strategy='constant') and
        OneHotEncoder(handle_unknown='ignore') for categorical columns.
        """
        # Create fresh instances per call: estimator objects used as default arguments would be
        # created once at definition time and then shared (and refitted) by every pipeline.
        if num_cols__imputer is None:
            num_cols__imputer = SimpleImputer(strategy='median')
        if num_cols__scaler is None:
            num_cols__scaler = StandardScaler()
        if cat_cols__imputer is None:
            cat_cols__imputer = SimpleImputer(strategy='constant')
        if cat_cols__encoder is None:
            cat_cols__encoder = OneHotEncoder(handle_unknown='ignore')

        # Pass problem_type as prepare_data_for_training does, so the rebuilt preprocessor matches
        self.pp = PreProcessor().preprocess_all_cols(self.dfl,
                                                     problem_type=self.problem_type,
                                                     num_cols__imputer=num_cols__imputer,
                                                     num_cols__scaler=num_cols__scaler,
                                                     cat_cols__imputer=cat_cols__imputer,
                                                     cat_cols__encoder=cat_cols__encoder)
        self.spl = self.create_final_sklearn_pipeline(transformer_type=self.pp.transformer_type,
                                                      model=self.model)

    def update_model(self, model:object):
        """Replace the model and recreate `self.spl` with the current preprocessor."""
        self.model = model
        self.spl = self.create_final_sklearn_pipeline(transformer_type=self.pp.transformer_type,
                                                      model=self.model)

    def do_cross_validation(self, cv:int, scoring:str):
        """
        Run `cross_val_score` of `self.spl` on the full dataset (`dfl.X`, `dfl.y`).

        Scores of sklearn's "neg_*" scorers are flipped back to positive values.
        """
        scores = cross_val_score(
            estimator=self.spl,
            X=self.dfl.X,
            y=self.dfl.y,
            scoring=scoring,
            cv=cv)
        # sklearn reports error metrics as negative scores ("neg_..."); undo the sign for readability
        if isinstance(scoring, str) and scoring.startswith("neg_"):
            scores = -1 * scores
        return scores

    def do_grid_search(self, param_grid:object, cv:int,
                       scoring:str, n_jobs=None):
        """
        Fit a `GridSearchCV` over `self.spl` on the full dataset and return the fitted search.

        `n_jobs` defaults to `self.IDEAL_CPU_CORES`.
        """
        if n_jobs is None:
            n_jobs = self.IDEAL_CPU_CORES

        grid_search = GridSearchCV(estimator=self.spl,
                                   param_grid=param_grid,
                                   cv=cv,
                                   scoring=scoring,
                                   n_jobs=n_jobs)
        grid_search.fit(self.dfl.X, self.dfl.y)
        return grid_search

    def do_kfold_training(self, n_splits:int=5, test_preds_metric=None, random_state=42):
        """
        Train `self.spl` with k-fold cross-validation and optionally average test-set predictions.

        Args:
            n_splits: number of folds (default 5).
            test_preds_metric: a single sklearn metric function such as `mean_absolute_error` or
                `roc_auc_score`. Only its name is used: `log_loss` / `roc_auc_score` make the test
                predictions positive-class probabilities, anything else uses `predict`. When None
                (default), no test predictions are made.
            random_state: seed for the fold shuffling (default 42).

        Returns:
            (mean_metrics_results, test_preds): the fold-averaged validation metrics (via
            `kfold_dict_mean`) and the fold-averaged test predictions (None if there is no test set).
        """
        _, val_preds_metrics, _, _ = fetch_params_for_problem_type(self.problem_type)

        kfold = self._make_kfold(n_splits=n_splits, random_state=random_state)

        X, y = self.dfl.X, self.dfl.y
        X_test = self.dfl.X_test

        test_preds = None
        if X_test is not None:
            test_preds = np.zeros(X_test.shape[0])
        elif test_preds_metric is not None:
            logger.warning("Trying to do Test Predictions but No Test Dataset Provided!")

        # list of per-fold {metric_name: value} dicts
        kfold_metrics_results = []
        for fold, (train_idx, valid_idx) in enumerate(kfold.split(X, y), start=1):
            # Fold data is kept local so it is freed after each fold and the loader's own split is untouched
            X_train, X_valid = X.iloc[train_idx], X.iloc[valid_idx]
            y_train, y_valid = self._take_rows(y, train_idx), self._take_rows(y, valid_idx)

            #TODO use early_stopping_rounds for XGBoost based Sklearn Pipeline
            self.spl.fit(X_train, y_train)

            metric_result = self._score_fold(val_preds_metrics, X_valid, y_valid)
            for metric_name, metric_value in metric_result.items():
                logger.info(f"fold: {fold} {metric_name} : {metric_value}")
            kfold_metrics_results.append(metric_result)

            if X_test is not None and test_preds_metric is not None:
                if self._needs_proba(test_preds_metric):
                    logger.info("Predicting Test Preds Probablities!")
                    test_preds += self.spl.predict_proba(X_test)[:, 1] / kfold.n_splits
                else:
                    test_preds += self.spl.predict(X_test) / kfold.n_splits

        mean_metrics_results = kfold_dict_mean(kfold_metrics_results)
        logger.info(f" Mean Metrics Results from all Folds are: {mean_metrics_results}")

        return mean_metrics_results, test_preds

    def _make_kfold(self, n_splits, random_state):
        """Return a shuffled splitter: plain KFold for regression, StratifiedKFold otherwise."""
        # StratifiedKFold raises on continuous targets, so regression must use plain KFold
        if "regression" in str(self.problem_type):
            from sklearn.model_selection import KFold
            return KFold(n_splits=n_splits, random_state=random_state, shuffle=True)
        return StratifiedKFold(n_splits=n_splits, random_state=random_state, shuffle=True)

    @staticmethod
    def _needs_proba(metric):
        """True when `metric` (by its name) is scored on positive-class probabilities, not hard predictions."""
        name = str(metric.__name__)
        return ("log_loss" in name) or ("roc_auc_score" in name)

    @staticmethod
    def _take_rows(data, idx):
        """Select rows by position; pandas objects use `.iloc` so a non-default index isn't read as labels."""
        return data.iloc[idx] if hasattr(data, "iloc") else data[idx]

    def _score_fold(self, metrics, X_valid, y_valid):
        """Score the fitted pipeline on one validation fold and return {metric_name: value}."""
        # Predictions are computed lazily and at most once per fold, then shared by all metrics
        preds = None
        preds_probs = None
        metric_result = {}
        for metric in metrics:
            if self._needs_proba(metric):
                if preds_probs is None:
                    # NOTE(review): [:, 1] assumes a binary target -- confirm for multi-class problems
                    preds_probs = self.spl.predict_proba(X_valid)[:, 1]
                metric_result[str(metric.__name__)] = metric(y_valid, preds_probs)
            else:
                if preds is None:
                    preds = self.spl.predict(X_valid)
                metric_result[str(metric.__name__)] = metric(y_valid, preds)
        return metric_result

    def do_xgb_optuna_optimization(self, optuna_db_path:str, use_gpu=False, opt_trials=100,
                                   opt_timeout=360):
        """
        Run an Optuna hyperparameter search for an XGBoost model and return the study.

        optuna_db_path: directory in which the SQLite database "params.db" used by Optuna is stored
        (created if missing; an existing study named "tmlt_autoxgb" is resumed)
        use_gpu: False by default, set True when running on a GPU machine
        opt_trials: 100 by default, maximum number of trials
        opt_timeout: 360 by default, time budget in seconds
        """
        xgb_model, val_preds_metrics, eval_metric, direction = fetch_params_for_problem_type(self.problem_type)

        objective = Optuna_Objective(dfl=self.dfl, tmlt=self,
                                     val_preds_metrics=val_preds_metrics,
                                     xgb_model=xgb_model,
                                     xgb_eval_metric=eval_metric,
                                     use_gpu=use_gpu)

        # SQLite cannot create missing parent folders, so make sure the output directory exists
        if optuna_db_path:
            os.makedirs(optuna_db_path, exist_ok=True)
        db_path = os.path.join(optuna_db_path, "params.db")

        logger.info(f"Optimization Direction is: {direction}")
        study = optuna.create_study(
            direction=direction,
            study_name="tmlt_autoxgb",
            storage=f"sqlite:///{db_path}",
            load_if_exists=True,
        )
        study.optimize(objective, n_trials=opt_trials, timeout=opt_timeout)
        return study

    def get_preprocessor_best_params_from_grid_search(self, grid_search_object:object):
        """
        Extract preprocessor settings from `grid_search_object.best_params_`.

        A key such as "preprocessor__num_cols__imputer" becomes "num_cols__imputer", matching the
        keyword arguments of `update_preprocessor`. Only the first two levels after the step name are kept.
        """
        pp_best_params = {}
        for param_name, value in grid_search_object.best_params_.items():
            parts = param_name.split('__')
            if parts[0] == 'preprocessor' and len(parts) >= 3:
                pp_best_params[f"{parts[1]}__{parts[2]}"] = value
        return pp_best_params

    def get_model_best_params_from_grid_search(self, grid_search_object:object):
        """
        Extract model hyperparameters from `grid_search_object.best_params_`.

        A key such as "model__max_depth" becomes "max_depth"; keys of other steps are ignored.
        """
        model_best_params = {}
        for param_name, value in grid_search_object.best_params_.items():
            if param_name.startswith('model__'):
                model_best_params[param_name.split('__', 1)[1]] = value
        return model_best_params

In [None]:
# Render the nbdev documentation for the TMLT class from its docstring
show_doc(TMLT)

<h2 id="TMLT" class="doc_header"><code>class</code> <code>TMLT</code><a href="" class="source_link" style="float:right">[source]</a></h2>

> <code>TMLT</code>()

Represent Tabular ML Toolkit class

Attributes:

spl: A Scikit MLPipeline instance 

dfl: A DataFrameLoader instance 

pp: A PreProcessor instance 

model: The given Model

In [None]:
# Render the nbdev documentation (signature and docstring) for prepare_data_for_training
show_doc(TMLT.prepare_data_for_training)

<h4 id="TMLT.prepare_data_for_training" class="doc_header"><code>TMLT.prepare_data_for_training</code><a href="__main__.py#L43" class="source_link" style="float:right">[source]</a></h4>

> <code>TMLT.prepare_data_for_training</code>(**`train_file_path`**:`str`, **`idx_col`**:`str`, **`target`**:`str`, **`random_state`**:`int`, **`model`**:`object`, **`test_file_path`**:`str`=*`None`*, **`problem_type`**=*`'regression'`*, **`nrows`**=*`None`*)



In [None]:
# Render the nbdev documentation (signature and docstring) for do_xgb_optuna_optimization
show_doc(TMLT.do_xgb_optuna_optimization)

<h4 id="TMLT.do_xgb_optuna_optimization" class="doc_header"><code>TMLT.do_xgb_optuna_optimization</code><a href="__main__.py#L226" class="source_link" style="float:right">[source]</a></h4>

> <code>TMLT.do_xgb_optuna_optimization</code>(**`optuna_db_path`**:`str`, **`use_gpu`**=*`False`*, **`opt_trials`**=*`100`*, **`opt_timeout`**=*`360`*)

This methods returns and do optuna bases study optimization for hyperparmaeter search
optuna_db_path is output directory you want to use for storing sql db used for optuna
use_gpu=False by default, make it True if running on gpu machine
opt_trials=100 by default, change it based upon need
opt_timeout=360 by default, timeout value in seconds

In [None]:
# hide
# Export the `# export` cells of every project notebook into the tabular_ml_toolkit library modules

from nbdev.export import notebook2script; notebook2script()

Converted 00_dataframeloader.ipynb.
Converted 01_preprocessor.ipynb.
Converted 02_tmlt.ipynb.
Converted 04_optuna_objective.ipynb.
Converted 13_Kaggle_TPS_Tutorial.ipynb.
Converted BAK_index.ipynb.
Converted index.ipynb.
Converted logger.ipynb.
Converted utility.ipynb.
