# Import Libraries

In [1]:
import pandas as pd
import warnings
warnings.simplefilter("ignore")

# Load Dataset
In this example, we use the Titanic dataset from OpenML (data_id=40945).

In [2]:
from sklearn.datasets import fetch_openml

# OpenML dataset 40945 is Titanic; as_frame=True returns the features as a DataFrame
# and the target as a Series.
X, y = fetch_openml(
    data_id=40945,
    as_frame=True,
    return_X_y=True,
    parser="auto",
)

In [3]:
X.head()

Unnamed: 0,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,boat,body,home.dest
0,1,"Allen, Miss. Elisabeth Walton",female,29.0,0,0,24160,211.3375,B5,S,2.0,,"St Louis, MO"
1,1,"Allison, Master. Hudson Trevor",male,0.9167,1,2,113781,151.55,C22 C26,S,11.0,,"Montreal, PQ / Chesterville, ON"
2,1,"Allison, Miss. Helen Loraine",female,2.0,1,2,113781,151.55,C22 C26,S,,,"Montreal, PQ / Chesterville, ON"
3,1,"Allison, Mr. Hudson Joshua Creighton",male,30.0,1,2,113781,151.55,C22 C26,S,,135.0,"Montreal, PQ / Chesterville, ON"
4,1,"Allison, Mrs. Hudson J C (Bessie Waldo Daniels)",female,25.0,1,2,113781,151.55,C22 C26,S,,,"Montreal, PQ / Chesterville, ON"


In [4]:
X.isnull().sum()

pclass          0
name            0
sex             0
age           263
sibsp           0
parch           0
ticket          0
fare            1
cabin        1014
embarked        2
boat          823
body         1188
home.dest     564
dtype: int64

In [5]:
X.dtypes

pclass          int64
name           object
sex          category
age           float64
sibsp           int64
parch           int64
ticket         object
fare          float64
cabin          object
embarked     category
boat           object
body          float64
home.dest      object
dtype: object

In [6]:
# Target is mapped to strings for later description
# Target is mapped to strings for later description.
# rename_categories passes through categories missing from the mapping, so re-running
# this cell is harmless (y.map would turn every value into NaN on a second run).
y = y.cat.rename_categories({"0": "not survived", "1": "survived"})

In [7]:
y.value_counts()

survived
not survived    809
survived        500
Name: count, dtype: int64

In [8]:
y.isnull().sum()

0

In [9]:
y.dtypes

CategoricalDtype(categories=['not survived', 'survived'], ordered=False)

Split the dataset randomly into a training set (70%) and a test set (30%).

In [10]:
from sklearn.model_selection import train_test_split

# NOTE: this cell rebinds X and y to the training split, so running it a second time
# would split the already-split data again. Re-run the loading cells first.
X, X_test, y, y_test = train_test_split(X, y, test_size=0.3, shuffle=True, random_state=0)
X, X_test, y, y_test = (part.reset_index(drop=True) for part in (X, X_test, y, y_test))

In [11]:
X.head()

Unnamed: 0,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,boat,body,home.dest
0,2,"Mellinger, Miss. Madeleine Violet",female,13.0,0,1,250644,19.5,,S,14.0,,"England / Bennington, VT"
1,2,"Wells, Miss. Joan",female,4.0,1,1,29103,23.0,,S,14.0,,"Cornwall / Akron, OH"
2,2,"Duran y More, Miss. Florentina",female,30.0,1,0,SC/PARIS 2148,13.8583,,C,12.0,,"Barcelona, Spain / Havana, Cuba"
3,3,"Scanlan, Mr. James",male,,0,0,36209,7.725,,Q,,,
4,3,"Bradley, Miss. Bridget Delia",female,22.0,0,0,334914,7.725,,Q,13.0,,"Kingwilliamstown, Co Cork, Ireland Glens Falls..."


# Define Pipeline

## initialize the pipeline

In [12]:
from imker import Pipeline, Task, TaskConfig, BaseTask, BaseProcessor, BaseSplitter, BaseModel
pipe = Pipeline(repo_dir="../../../cache", exp_name="example", pipeline_name="titanic")

## define preprocessing tasks

In this section, we define the preprocessing tasks that the pipeline will use. The TimeSleep task simply waits 1 second before returning its input, which makes the effect of task caching visible.

In [13]:
from typing import Any, Union
import time

class DropCols(BaseTask):
    """Task that removes a fixed list of columns from a DataFrame."""

    def __init__(self, cols: list) -> None:
        super().__init__()
        # Column labels to remove in transform().
        self.cols = cols

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return a new DataFrame equal to X without the configured columns."""
        return X.drop(columns=self.cols)

class TimeSleep(BaseTask):
    """Identity task that pauses before returning its input unchanged.

    Used in this notebook only to make the effect of task caching visible.

    Parameters
    ----------
    seconds : float, default 1.0
        How long transform() sleeps before returning.
    """

    def __init__(self, seconds: float = 1.0) -> None:
        super().__init__()
        self.seconds = seconds

    def transform(self, X: Any) -> Any:
        """Sleep for ``self.seconds`` and return X as-is."""
        time.sleep(self.seconds)
        return X

class DTypeConverter(BaseTask):
    """Task that casts a DataFrame or Series to a single dtype.

    Parameters
    ----------
    dtype : str
        Any dtype string accepted by pandas ``astype`` (e.g. "int16").
    """

    def __init__(self, dtype: str):
        # Initialise the base task like the other BaseTask subclasses in this notebook.
        super().__init__()
        self.dtype = dtype

    def transform(self, X: Union[pd.DataFrame, pd.Series]) -> Union[pd.DataFrame, pd.Series]:
        """Return X cast to ``self.dtype`` (a new object; X is not modified)."""
        X = X.astype(self.dtype)
        return X
    

## define the PreProcessor using these tasks
A Task takes a single argument, a TaskConfig. Wrapping a processor in a Task lets it be used like a PyTorch layer: the first call runs both the fit and transform methods, and every later call runs only transform.

In [14]:
from sklearn.preprocessing import OrdinalEncoder, LabelEncoder
from imker.store.cacher import CSVCacher

class PreProcessor(BaseProcessor):
    """Preprocessing applied before the fold split (per-fold steps live in OOFPreProcessor).

    Drops free-text and mostly-missing columns, ordinal-encodes the categorical
    columns, label-encodes the target, and passes the features through the cached
    TimeSleep task.
    """

    def __init__(self):
        self.drop = Task(TaskConfig(task=DropCols,
                                    init_params={"cols": ["name", "cabin", "ticket", "body", "boat", "home.dest"]},
                                    ))
        self.cat_encoder = Task(TaskConfig(task=OrdinalEncoder,  # scikit-learn classes can be used as they are
                                           init_params={"handle_unknown": "use_encoded_value",
                                                        "unknown_value": -1,
                                                        "encoded_missing_value": -999},
                                           ))
        self.target_label_enc = Task(TaskConfig(task=LabelEncoder))
        # int16, not int8: the missing-value code -999 above is outside the int8 range
        # [-128, 127], so casting it to int8 would corrupt it (any warning is hidden by
        # warnings.simplefilter("ignore")), and "embarked" does contain missing values.
        self.dtype_converter = Task(TaskConfig(task=DTypeConverter,
                                               init_params={"dtype": "int16"}))
        self.sleep = Task(TaskConfig(task=TimeSleep,
                                     cache=True))  # cache=True stores this task's output

    def forward(self, X, y=None):
        """Run the preprocessing tasks in order.

        Parameters
        ----------
        X : pd.DataFrame
            Feature frame with the columns loaded from OpenML.
        y : optional
            Target labels; label-encoded when given.

        Returns
        -------
        tuple
            ``(X, y)`` after preprocessing; ``y`` stays None when not given.
        """
        X = self.drop(X)  # new frame, so the caller's X is not modified below
        cat_cols = ["sex", "embarked"]
        X[cat_cols] = self.cat_encoder(X[cat_cols])
        X[cat_cols] = self.dtype_converter(X[cat_cols])
        if y is not None:
            y = self.target_label_enc(y)  # the target can be transformed just like the features
        X = self.sleep(X)
        return X, y


## set preprocessing to pipeline and see the results

In [15]:
pipe.set_preprocessor(PreProcessor)

In [16]:
%%time
pipe.test_preprocessing(X, y)

CPU times: user 34 ms, sys: 0 ns, total: 34 ms
Wall time: 1.03 s


(     pclass  sex      age  sibsp  parch     fare  embarked
 0         2    0  13.0000      0      1  19.5000         2
 1         2    0   4.0000      1      1  23.0000         2
 2         2    0  30.0000      1      0  13.8583         0
 3         3    1      NaN      0      0   7.7250         1
 4         3    0  22.0000      0      0   7.7250         1
 ..      ...  ...      ...    ...    ...      ...       ...
 911       3    0   0.1667      1      2  20.5750         2
 912       3    1      NaN      0      0   8.0500         2
 913       3    0      NaN      0      0   7.7333         1
 914       2    0  20.0000      0      0  36.7500         2
 915       3    0  32.0000      1      1  15.5000         1
 
 [916 rows x 7 columns],
 array([1, 1, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1,
        0, 0, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0,
        1, 0, 1, 0, 0, 1, 0, 1,

Because the TimeSleep task's output is cached, the second run loads it from the cache instead of sleeping again, so it finishes in milliseconds.

In [17]:
%%time
pipe.test_preprocessing(X, y)

CPU times: user 9.84 ms, sys: 5.77 ms, total: 15.6 ms
Wall time: 13.3 ms


(     pclass  sex      age  sibsp  parch     fare  embarked
 0         2    0  13.0000      0      1  19.5000         2
 1         2    0   4.0000      1      1  23.0000         2
 2         2    0  30.0000      1      0  13.8583         0
 3         3    1      NaN      0      0   7.7250         1
 4         3    0  22.0000      0      0   7.7250         1
 ..      ...  ...      ...    ...    ...      ...       ...
 911       3    0   0.1667      1      2  20.5750         2
 912       3    1      NaN      0      0   8.0500         2
 913       3    0      NaN      0      0   7.7333         1
 914       2    0  20.0000      0      0  36.7500         2
 915       3    0  32.0000      1      1  15.5000         1
 
 [916 rows x 7 columns],
 array([1, 1, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1,
        0, 0, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0,
        1, 0, 1, 0, 0, 1, 0, 1,

imker provides a simple repository viewer for inspecting the cached outputs and configurations of each task.

In [18]:
from imker import RepositoryViewer
viewer = RepositoryViewer(repo_dir="../../../cache/")
viewer.search_repo()

Unnamed: 0,task_id,lastUpdatedDate,repo,method,processor,cachefile,config
0,5,2023-07-09T09:30:49.209410,cache,transform,TimeSleep,../../../cache/task/transform/TimeSleep/51397c...,../../../cache/task/transform/TimeSleep/51397c...
1,4,2023-07-09T09:30:48.201331,cache,fit,TimeSleep,../../../cache/task/fit/TimeSleep/e36debdeb673...,../../../cache/task/fit/TimeSleep/e36debdeb673...
2,3,2023-07-09T09:30:48.189330,cache,fit,OrdinalEncoder,../../../cache/task/fit/OrdinalEncoder/0f9a614...,../../../cache/task/fit/OrdinalEncoder/0f9a614...
3,2,2023-07-09T09:30:48.201331,cache,fit,LabelEncoder,../../../cache/task/fit/LabelEncoder/26a33793a...,../../../cache/task/fit/LabelEncoder/26a33793a...
4,1,2023-07-09T09:30:48.185329,cache,fit,DropCols,../../../cache/task/fit/DropCols/074bd8c804115...,../../../cache/task/fit/DropCols/074bd8c804115...
5,0,2023-07-09T09:30:48.197330,cache,fit,DTypeConverter,../../../cache/task/fit/DTypeConverter/ae1dc0e...,../../../cache/task/fit/DTypeConverter/ae1dc0e...


In [19]:
# load_cache accepts either an integer task_id (as listed by search_repo) or a path string.
viewer.load_cache(5)

Unnamed: 0,pclass,sex,age,sibsp,parch,fare,embarked
0,2,0,13.0000,0,1,19.5000,2
1,2,0,4.0000,1,1,23.0000,2
2,2,0,30.0000,1,0,13.8583,0
3,3,1,,0,0,7.7250,1
4,3,0,22.0000,0,0,7.7250,1
...,...,...,...,...,...,...,...
911,3,0,0.1667,1,2,20.5750,2
912,3,1,,0,0,8.0500,2
913,3,0,,0,0,7.7333,1
914,2,0,20.0000,0,0,36.7500,2


In [20]:
viewer.load_config(3)

{'init_params': {'encoded_missing_value': -999,
  'handle_unknown': 'use_encoded_value',
  'unknown_value': -1},
 'fit_params': {},
 'transform_params': {},
 'predict_params': {},
 'cache_processor': 'PickledBz2Cacher',
 'seed': 42}

## define data splitter for validation

In [21]:
from sklearn.model_selection import StratifiedKFold

class Splitter(BaseSplitter):
    """Cross-validation splitter wrapping a shuffled 5-fold StratifiedKFold (seed 0)."""

    def __init__(self):
        kfold_params = {"n_splits": 5, "random_state": 0, "shuffle": True}
        self.splitter = Task(TaskConfig(task=StratifiedKFold, init_params=kfold_params))

    def get_n_splits(self):
        """Return the number of folds reported by the wrapped splitter task."""
        n_folds = self.splitter.get_n_splits()
        return n_folds

    def split(self, X, y=None):
        """Return the fold split produced by calling the wrapped StratifiedKFold task."""
        folds = self.splitter(X, y)
        return folds

## set splitter to pipeline

In [22]:
pipe.set_splitter(Splitter)

## define oof preprocessing

In [23]:
class FillNa(BaseTask):
    """Task that replaces missing values with a constant or a per-column mapping."""

    def __init__(self, values: Union[dict, float]) -> None:
        super().__init__()
        # Passed straight to DataFrame.fillna as its ``value`` argument.
        self.values = values

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return a new DataFrame with NaNs in X replaced by ``self.values``."""
        filled = X.fillna(value=self.values)
        return filled

In [24]:
from sklearn.preprocessing import StandardScaler

class OOFPreProcessor(BaseProcessor):
    """Preprocessing whose tasks are run on the out-of-fold subsets.

    Standardizes ``age`` and ``fare``, then fills remaining missing values with -999.
    """

    def __init__(self):
        self.std_scaler = Task(TaskConfig(task=StandardScaler))
        self.fillna = Task(TaskConfig(task=FillNa,
                                      init_params={"values": -999}))

    def forward(self, X, y=None):
        """Scale numeric columns and fill NaNs; ``y`` is returned unchanged.

        Parameters
        ----------
        X : pd.DataFrame
            Preprocessed features containing ``age`` and ``fare`` columns.
        y : optional
            Target, passed through as-is.
        """
        # Work on a copy: X may be a row subset of a larger frame, and assigning into it
        # in place would modify (or warn about, silently here) the caller's data.
        X = X.copy()
        X[["age", "fare"]] = self.std_scaler(X[["age", "fare"]])
        X = self.fillna(X)
        return X, y

## set oof preprocessing to pipeline

In [25]:
pipe.set_oof_preprocessor(OOFPreProcessor)

## confirm oof subsets

In [26]:
g = pipe.test_oof_preprocessing(X, y)
oof = next(g)
oof.keys()

odict_keys(['X_train', 'y_train', 'X_valid', 'y_valid', 'idx_train', 'idx_valid'])

In [27]:
oof.X_train.head()

Unnamed: 0,pclass,sex,age,sibsp,parch,fare,embarked
1,2,0,-1.8299,1,1,-0.177361,2
2,2,0,0.001725,1,0,-0.37615,0
3,3,1,-999.0,0,0,-0.509521,1
4,3,0,-0.561852,0,0,-0.509521,1
5,3,1,-999.0,0,0,-0.506168,1


In [28]:
oof.X_valid.head()

Unnamed: 0,pclass,sex,age,sibsp,parch,fare,embarked
0,2,0,-1.195876,0,1,-0.25347,2
10,2,1,-1.970795,1,1,-0.177361,2
11,2,0,0.847091,1,0,-0.112125,2
16,2,1,-0.35051,0,0,-0.394814,2
18,1,0,0.001725,0,0,1.636741,0


## define model

In [29]:
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from lightgbm.callback import early_stopping
from lightgbm import LGBMClassifier

class Classifier(BaseModel):
    """Three classifiers (logistic regression, k-NN, LightGBM) whose predictions are returned together."""

    def __init__(self):
        self.lr = Task(TaskConfig(task=LogisticRegression))
        self.knn = Task(TaskConfig(task=KNeighborsClassifier))
        # LightGBM stops early if the eval_set score does not improve for 30 rounds.
        lgb_callbacks = [early_stopping(stopping_rounds=30)]
        self.lgb = Task(
            TaskConfig(
                task=LGBMClassifier,
                init_params={"n_estimators": 200, "n_jobs": 1},
                fit_params={"callbacks": lgb_callbacks},
            )
        )

    def forward(self, X, y=None, eval_set=None, proba=False):
        """Call each model task and collect the results keyed by model name.

        ``eval_set`` is forwarded only to the LightGBM task; ``proba`` is forwarded to all three.
        """
        predictions = {}
        predictions["lr"] = self.lr(X, y, proba=proba)
        predictions["knn"] = self.knn(X, y, proba=proba)
        predictions["lgb"] = self.lgb(X, y, eval_set=eval_set, proba=proba)
        return predictions

## set the model on the pipeline

In [30]:
pipe.set_model(Classifier)

## Train pipeline

In [31]:
pipe.train(X, y)

Training until validation scores don't improve for 30 rounds
Early stopping, best iteration is:
[32]	valid_0's binary_logloss: 0.393501
Training until validation scores don't improve for 30 rounds
Early stopping, best iteration is:
[29]	valid_0's binary_logloss: 0.393851
Training until validation scores don't improve for 30 rounds
Early stopping, best iteration is:
[22]	valid_0's binary_logloss: 0.4582
Training until validation scores don't improve for 30 rounds
Early stopping, best iteration is:
[17]	valid_0's binary_logloss: 0.493462
Training until validation scores don't improve for 30 rounds
Early stopping, best iteration is:
[17]	valid_0's binary_logloss: 0.496432


<imker.pipeline.pipeline.Pipeline at 0x7ff403f9f640>

## validate with 2 metrics

In [32]:
from sklearn.metrics import accuracy_score, f1_score
pipe.set_metrics([accuracy_score, f1_score])

In [33]:
val_preds = pipe.validate(X, y)

In [34]:
pipe.get_scores()

Unnamed: 0,Unnamed: 1,fold0,fold1,fold2,fold3,fold4
lr,accuracy_score,0.798913,0.797814,0.819672,0.775956,0.743169
lr,f1_score,0.725926,0.733813,0.744186,0.691729,0.666667
knn,accuracy_score,0.798913,0.781421,0.781421,0.743169,0.710383
knn,f1_score,0.725926,0.705882,0.701493,0.635659,0.629371
lgb,accuracy_score,0.847826,0.830601,0.803279,0.765027,0.786885
lgb,f1_score,0.774194,0.75969,0.709677,0.632479,0.692913


In [35]:
pd.DataFrame(val_preds)

Unnamed: 0,lr,knn,lgb
0,1,1,1
1,1,1,1
2,1,1,1
3,0,0,0
4,1,1,1
...,...,...,...
911,1,0,0
912,0,0,0
913,1,1,1
914,1,1,1


To get predicted probabilities for the validation subsets instead of labels, set the `proba` argument to True.

In [36]:
val_preds = pipe.validate(X, y, proba=True, calc_metrics=False)

In [37]:
val_preds.lr

array([[0.21607822, 0.78392178],
       [0.26313298, 0.73686702],
       [0.17453992, 0.82546008],
       ...,
       [0.41097422, 0.58902578],
       [0.221676  , 0.778324  ],
       [0.35114269, 0.64885731]])

## run inference on the test data

In [38]:
test_preds = pipe.inference(X_test)

In [39]:
test_preds.keys()

odict_keys(['lr', 'knn', 'lgb'])

In [40]:
test_preds.lgb[:30]

array([0. , 1. , 0. , 0. , 0. , 1. , 0. , 0. , 0. , 0. , 0. , 0.2, 0.6,
       0.8, 1. , 0. , 1. , 0. , 1. , 0. , 0. , 0. , 0. , 1. , 0.4, 1. ,
       0. , 0. , 0. , 1. ])

Probabilities are just as easy to get: pass `proba=True` to `inference`.

In [41]:
test_preds = pipe.inference(X_test, proba=True)

In [42]:
test_preds.lr[:30]

array([[0.85458997, 0.14541003],
       [0.2392615 , 0.7607385 ],
       [0.81433214, 0.18566786],
       [0.90030581, 0.09969419],
       [0.76920669, 0.23079331],
       [0.41265946, 0.58734054],
       [0.65905569, 0.34094431],
       [0.65815252, 0.34184748],
       [0.77254042, 0.22745958],
       [0.90090945, 0.09909055],
       [0.82132764, 0.17867236],
       [0.86090982, 0.13909018],
       [0.83079222, 0.16920778],
       [0.32824226, 0.67175774],
       [0.20922674, 0.79077326],
       [0.77301994, 0.22698006],
       [0.28880779, 0.71119221],
       [0.62451712, 0.37548288],
       [0.13292815, 0.86707185],
       [0.8545747 , 0.1454253 ],
       [0.85468096, 0.14531904],
       [0.95159217, 0.04840783],
       [0.85460243, 0.14539757],
       [0.20033224, 0.79966776],
       [0.88039689, 0.11960311],
       [0.2216245 , 0.7783755 ],
       [0.7730252 , 0.2269748 ],
       [0.8369871 , 0.1630129 ],
       [0.89482262, 0.10517738],
       [0.32867321, 0.67132679]])

# Reproduce pipeline

In [43]:
pipe2 = Pipeline.load(repo_dir="../../../cache/", exp_name="example", pipeline_name="titanic", 
                      preprocessor=PreProcessor, 
                      splitter=Splitter,
                      oof_preprocessor=OOFPreProcessor, 
                      model=Classifier
                      )
pipe2

<imker.pipeline.pipeline.Pipeline at 0x7ff3fa0443d0>

In [44]:
import numpy as np

# The reloaded pipeline must reproduce the original test predictions exactly.
# np.array_equal returns False (rather than raising) on a shape mismatch and also
# works for 2-D outputs such as probabilities; bool() keeps the displayed value `True`.
bool(np.array_equal(pipe2.inference(X_test).lr, pipe.inference(X_test).lr))

True