# Entire Pipeline
data-preprocessing >> [train-test-split, train-eval-model] >> element-mlflow-model-registry, update-thresholds

In [1]:
#!/usr/bin/env python
# coding: utf-8
import argparse
import datetime
import numpy as np
import os
from pathlib import Path
import pandas as pd
import sys


# Imports for vertex pipeline
from google.cloud import aiplatform
import google_cloud_pipeline_components
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from kfp.v2 import compiler
from kfp.v2.dsl import (
    Artifact,
    component,
    pipeline,
    Input,
    Output,
    Model,
    Dataset,
    InputPath,
    OutputPath,
)
import kfp.components as comp
import kfp.dsl as dsl
import warnings
warnings.filterwarnings("ignore")



# Put the parent directory, its "utils" sub-directory, and the next two ancestor
# directories on the import path (presumably so the local `pipeline_utils`
# module imported right after this can be found -- confirm against the repo layout).
_parent_dir = Path(".").absolute().parent
sys.path.extend(
    [
        str(_parent_dir),
        str(_parent_dir) + "/utils",
        str(_parent_dir.parent),
        str(_parent_dir.parent.parent),
    ]
)
del _parent_dir
import pipeline_utils

In [2]:
# Resolve the run arguments.
# `pipeline_utils.get_args()` is tried first. When it fails (inside a notebook
# kernel the expected command-line flags are absent -- see the argparse usage
# error recorded in this cell's output), fall back to a fixed set of defaults.
try:
    args = pipeline_utils.get_args()
except (Exception, SystemExit):
    # argparse reports missing/invalid arguments by raising SystemExit, which is
    # not an `Exception` subclass, so it is listed explicitly. Unlike a bare
    # `except:`, this no longer swallows KeyboardInterrupt.
    parser = argparse.ArgumentParser()
    for _arg_name in (
        "MODE",
        "STAGE1_FLAG",
        "ENSEMBLE_FLAG",
        "RF_CLF_MODEL_PATH",
        "LOGISTIC_CLF_MODEL_PATH",
        "STAGE1_NN_MODEL_PATH",
        "GNB_MODEL_PATH",
        "STG1_FEATURE_SELECTOR_MODEL_PATH",
        "NOSALES_MODEL_PATH",
    ):
        parser.add_argument(f"--{_arg_name}", required=True, type=str)

    # Defaults used for interactive runs. Kept in a local list rather than being
    # attached to the `sys` module (the former `sys.args` is not a real attribute
    # and is easily mistaken for `sys.argv`).
    default_argv = [
        "--MODE", "test",
        "--STAGE1_FLAG", "train",
        "--ENSEMBLE_FLAG", "train",
        "--RF_CLF_MODEL_PATH", "",
        "--LOGISTIC_CLF_MODEL_PATH", "",
        "--STAGE1_NN_MODEL_PATH", "",
        "--GNB_MODEL_PATH", "",
        "--STG1_FEATURE_SELECTOR_MODEL_PATH", "",
        "--NOSALES_MODEL_PATH", "",
    ]
    args = parser.parse_args(default_argv)


usage: ipykernel_launcher.py [-h] --MODE MODE --STAGE1_FLAG STAGE1_FLAG
                             --ENSEMBLE_FLAG ENSEMBLE_FLAG --RF_CLF_MODEL_PATH
                             RF_CLF_MODEL_PATH --LOGISTIC_CLF_MODEL_PATH
                             LOGISTIC_CLF_MODEL_PATH --STAGE1_NN_MODEL_PATH
                             STAGE1_NN_MODEL_PATH --GNB_MODEL_PATH
                             GNB_MODEL_PATH --STG1_FEATURE_SELECTOR_MODEL_PATH
                             STG1_FEATURE_SELECTOR_MODEL_PATH
                             --NOSALES_MODEL_PATH NOSALES_MODEL_PATH
ipykernel_launcher.py: error: the following arguments are required: --MODE, --STAGE1_FLAG, --ENSEMBLE_FLAG, --RF_CLF_MODEL_PATH, --LOGISTIC_CLF_MODEL_PATH, --STAGE1_NN_MODEL_PATH, --GNB_MODEL_PATH, --STG1_FEATURE_SELECTOR_MODEL_PATH, --NOSALES_MODEL_PATH


In [3]:
# Time-stamps
# The clock is read once so that the query window and the run timestamp are all
# derived from the same instant (separate now() calls could straddle midnight).
_now = datetime.datetime.now()
from_date = (_now - datetime.timedelta(days=7 * 8)).strftime("%Y-%m-%d")  # 8 weeks back
to_date = (_now - datetime.timedelta(days=1)).strftime("%Y-%m-%d")        # yesterday
TIMESTAMP = _now.strftime("%Y%m%d%H%M%S")
################################
# Parameters from settings.yml
################################
PARAMS = pipeline_utils.yaml_import('settings.yml')

# Env flag selecting which environment section of the settings is used.
ENV = PARAMS['env_flag']
_ENV_CFG = PARAMS['envs'][ENV]  # every setting below is read from this section

# Service account
SERVICE_ACCOUNT = _ENV_CFG['SERVICE_ACCOUNT']

# GCP Project ID and Region
PROJECT_ID = _ENV_CFG['PROJECT_ID']
REGION = _ENV_CFG['REGION']

# Base Image and MLFlow Image
BASE_IMAGE = _ENV_CFG['BASE_IMAGE']
MLFLOW_IMAGE = _ENV_CFG['MLFLOW_IMAGE']

# Pipeline Run Flag
RUN_PIPELINE = _ENV_CFG['RUN_PIPELINE']

# Training Pipeline
NOSALE_PIPELINE_ROOT = _ENV_CFG['PIPELINE_ROOT']  # GCS bucket
NOSALE_PIPELINE_NAME = _ENV_CFG['PIPELINE_NAME']  # e.g. "oyi-nosales-model-pipeline-dev"
NOSALE_PIPELINE_JSON = f"{NOSALE_PIPELINE_NAME}.json"
LATEST_NOSALES_PATH = _ENV_CFG['LATEST_NOSALES_MODEL_PATH']
LATEST_NOSALES_PATH_JSON = f"{LATEST_NOSALES_PATH}.json"
TMP_NOSALE_JSON = os.path.join("/tmp", NOSALE_PIPELINE_JSON)

# Club Threshold Pipeline
CLUB_THRESH_PIPELINE_ROOT = _ENV_CFG['CLUB_THRESH_PIPELINE_ROOT']  # GCS bucket
CLUB_THRESH_PIPELINE_NAME = _ENV_CFG['CLUB_THRESH_PIPELINE_NAME']  # e.g. "oyi-ds-club-score-cutoff-pipeline-bucket-dev"
CLUB_THRESH_PIPELINE_JSON = f"{CLUB_THRESH_PIPELINE_NAME}.json"
LATEST_CLUB_THRESH_PATH = _ENV_CFG['CLUB_THRESH_PATH']
LATEST_CLUB_THRESH_PATH_JSON = f"{LATEST_CLUB_THRESH_PATH}.json"
TMP_CLUB_THRESH_JSON = os.path.join("/tmp", CLUB_THRESH_PIPELINE_JSON)

# Training data: the whole training table is selected.
TRAINING_TABLE_NAME = _ENV_CFG['TRAINING_TABLE_NAME']
TRAINING_DATA_BQ_QUERY = f'select * from {TRAINING_TABLE_NAME}'

# MLflow experiment and model-registry names
MLFLOW_EXP_NAME = _ENV_CFG['MLFLOW_EXP_NAME']
MODEL_REGISTRY_NAME = _ENV_CFG['MODEL_REGISTRY_NAME']
 
# Matches on non-word, non-regular-punctuation characters.
# The pattern is a negated character class: it matches runs of characters that are
# NOT in the listed set.
# NOTE(review): inside the class, `.-=` is parsed as a character RANGE from '.' to '='
# (which covers '/', the digits, ':', ';', '<' and '='), not as the three literals
# '.', '-', '='. As written, a literal '-' is therefore not part of the listed set and
# WILL be matched, while '/' will not. If '-' was meant to be listed, it needs to
# be escaped (`\-`) or moved to the end of the class -- confirm intent before changing.
MATCHER = r"""[^a-zA-Z0-9'"!@#$%\^&*()\[\]{}:;<>?,.-=_+ ]+""" 

# Echo the resolved environment and pipeline settings for a quick sanity check.
print(
    f"ENV: {ENV}, \n"
    f"PROJECT_ID: {PROJECT_ID}, \n"
    f"BASE_IMAGE: {BASE_IMAGE}, \n"
    f"MLFLOW_IMAGE: {MLFLOW_IMAGE}, \n"
    f"\n"
    f"NOSALE_PIPELINE_NAME: {NOSALE_PIPELINE_NAME}, \n"
    f"LATEST_NOSALES_PATH: {LATEST_NOSALES_PATH}"
)
print(
    f"\n"
    f"CLUB_THRESH_PIPELINE_NAME: {CLUB_THRESH_PIPELINE_NAME}, \n"
    f"LATEST_CLUB_THRESH_PATH: {LATEST_CLUB_THRESH_PATH}"
)



ENV: dev, 
PROJECT_ID: wmt-mlp-p-oyi-ds-or-oyi-dsns, 
BASE_IMAGE: gcr.io/wmt-mlp-p-oyi-ds-or-oyi-dsns/oyi-vertex-pipeline-dev:latest, 
MLFLOW_IMAGE: gcr.io/wmt-mlp-p-oyi-ds-or-oyi-dsns/mlflow-image-dev:latest, 

NOSALE_PIPELINE_NAME: oyi-nosales-model-pipeline-dev, 
LATEST_NOSALES_PATH: gs://oyi-ds-vertex-pipeline-bucket-nonprod/latest_nosales_model_output_dev

CLUB_THRESH_PIPELINE_NAME: oyi-ds-club-score-cutoff-pipeline-bucket-dev, 
LATEST_CLUB_THRESH_PATH: gs://oyi-ds-club-score-cutoff-pipeline-bucket-nonprod/latest_club_thresh_chain_dev


## Training Pipeline


In [4]:
# The `@component` decorator (imported above from `kfp.v2.dsl`) turns a Python function into a component that can be used within a pipeline.
# Annotate the parameters and return values with types: the KFP compiler uses these annotations
# to type-check the data that is passed between components.
                
@component(base_image=BASE_IMAGE)
def data_preprocessing(
    training_data_bq_query_input: str,
    matcher: str,
    project_id: str,
    env: str,
    pipeline_root: str,
    training_data_output: Output[Dataset]):
    """Query the training data from BigQuery, clean it and write it out as CSV.

    Steps, in order:
      1. Run ``training_data_bq_query_input`` and keep rows where ``report_type`` is
         not 'C', ``display_ind`` is "Display" and ``oh_qty`` is non-negative.
      2. Remove everything matching the ``matcher`` regex from ``item_desc``.
      3. Keep only rows whose ``run_date`` is later than 182 days before the latest
         ``run_date``.
      4. Map two alternative labels onto "No Action Taken, already out for sale"
         (the replacement is applied to every column of the frame).
      5. Sort by run_date / club_nbr / item_nbr / event_ts and keep the first row
         per (old_nbr, club_nbr, run_date).
      6. Pass the result to ``utils.calculate_all_level_tpr`` (project helper, not
         shown here), fill missing values with 0 and write the CSV.

    Args:
        training_data_bq_query_input: SQL text executed against BigQuery.
        matcher: Regular expression; matches are stripped from ``item_desc``.
        project_id: GCP project used for the BigQuery client.
        env: Forwarded unchanged to ``utils.calculate_all_level_tpr``.
        pipeline_root: Forwarded unchanged to ``utils.calculate_all_level_tpr``.
        training_data_output: Output artifact; the CSV is written to its ``path``.

    Raises:
        ValueError: If no rows remain after the initial filter.
    """
    import pandas as pd
    from datetime import timedelta
    import utils
    from google.cloud import bigquery

    no_action_label = "No Action Taken, already out for sale"

    client = bigquery.Client(project=project_id)
    data = client.query(training_data_bq_query_input).to_dataframe()

    keep_rows = (
        (data.report_type != 'C') &
        (data.display_ind == "Display") &
        (data.oh_qty >= 0)
    )
    # Explicit copy: columns are assigned below, and doing that on a filtered
    # slice of `data` triggers pandas' SettingWithCopy hazard.
    nosales_data = data[keep_rows].copy()
    if nosales_data.empty:
        # Without this guard the max run_date is NaT and the strftime call below
        # fails with a much less helpful message.
        raise ValueError("No rows left after filtering the training data; cannot derive a cutoff date.")

    nosales_data["item_desc"] = nosales_data['item_desc'].str.replace(matcher, "", regex=True)
    nosales_data['run_date'] = pd.to_datetime(nosales_data['run_date'])

    # Keep roughly the latest six months (182 days) of data.
    max_date = nosales_data['run_date'].max()
    cutoff_date = (max_date - timedelta(days=182)).strftime('%Y-%m-%d')
    nosales_data = nosales_data[nosales_data.run_date > cutoff_date]

    # Normalise alternative labels onto the single "no action" label.
    nosales_data = nosales_data.replace("No Action Taken, already OFS", no_action_label)
    nosales_data = nosales_data.replace('Updated the NOSALES type with scrubber event', no_action_label)

    # After sorting, the first row per (old_nbr, club_nbr, run_date) is the one kept.
    nosales_data = nosales_data.sort_values(by=['run_date', 'club_nbr', 'item_nbr', 'event_ts'])
    nosales_data = nosales_data.drop_duplicates(['old_nbr', 'club_nbr', 'run_date'], keep='first')

    nosales_ext = utils.calculate_all_level_tpr(df=nosales_data, env=env, pipeline_root=pipeline_root, save=True)
    nosales_ext.fillna(0, inplace=True)
    nosales_ext.to_csv(training_data_output.path, index=False)

In [5]:
@component(base_image=BASE_IMAGE)
def train_test_split(
    nosales_ext_input: Input[Dataset],
    nosales_train_ext_output: Output[Dataset],
    nosales_test_ext_output: Output[Dataset],
    nosales_train_usampled_output: Output[Dataset]
    
):
    """Split the prepared data by date and build a down-sampled training set.

    Rows with ``run_date`` earlier than (latest run_date - 50 days) form the training
    set; rows on or after that date form the test set. A third, under-sampled
    training set is produced in which, per ``club_nbr``, only 1/11 (rounded down)
    of the "No Action Taken, already out for sale" rows are kept, while all other
    rows are kept in full. The under-sampled set is shuffled.

    Args:
        nosales_ext_input: CSV artifact with at least ``run_date``, ``club_nbr``
            and ``event_note`` columns.
        nosales_train_ext_output: Receives the training split (CSV).
        nosales_test_ext_output: Receives the test split (CSV).
        nosales_train_usampled_output: Receives the under-sampled, shuffled
            training split (CSV).
    """
    import pandas as pd
    from datetime import timedelta

    no_action_label = "No Action Taken, already out for sale"
    seed = 2019
    # Keep one out of every `downsample_divisor` "no action" rows per club.
    downsample_divisor = 11

    nosales_ext = pd.read_csv(nosales_ext_input.path)
    nosales_ext['run_date'] = pd.to_datetime(nosales_ext['run_date'])

    split_date = (nosales_ext.run_date.max() - timedelta(days=50)).strftime('%Y-%m-%d')
    nosales_train_ext = nosales_ext[nosales_ext.run_date < split_date].copy()
    nosales_test_ext = nosales_ext[nosales_ext.run_date >= split_date].copy()

    n_train = nosales_train_ext.shape[0]
    n_test = nosales_test_ext.shape[0]
    print(f"split_date is {split_date}.")
    print("Train/Test ratio:", n_train*100/(n_train+n_test))

    is_no_action = nosales_train_ext.event_note == no_action_label
    grouped = nosales_train_ext[is_no_action].groupby('club_nbr')
    u1 = grouped.apply(
        lambda club_rows: club_rows.sample(n=int(club_rows.shape[0]/downsample_divisor), random_state=seed)
    ).reset_index(drop=True)

    u2 = nosales_train_ext[nosales_train_ext.event_note != no_action_label]

    nosales_train_usampled = pd.concat([u1, u2])
    # Shuffle with the same seed as the per-club sampling; previously the shuffle
    # was unseeded, so the written file differed from run to run.
    nosales_train_usampled = nosales_train_usampled.sample(frac=1, random_state=seed)
    print(nosales_train_usampled.shape)
    # The class balance was computed but never shown (notebook leftover); log it.
    print(nosales_train_usampled.event_note.value_counts())

    nosales_train_ext.to_csv(nosales_train_ext_output.path, index=False)
    nosales_test_ext.to_csv(nosales_test_ext_output.path, index=False)
    nosales_train_usampled.to_csv(nosales_train_usampled_output.path, index=False)
    

In [6]:
@component(base_image=BASE_IMAGE)
def train_eval_model(
    nosales_ext_input: Input[Dataset],
    nosales_train_ext_input: Input[Dataset],
    nosales_test_ext_input: Input[Dataset],
    nosales_train_usampled_input: Input[Dataset],
    mode: str,
    stage1_flag: str,
    ensemble_flag: str,
    rf_clf_model_path_input: str,
    logistic_clf_model_path_input: str,
    stage1_nn_model_path_input: str,
    gnb_model_path_input: str,
    stg1_feature_selector_model_path_input: str,
    nosales_model_path_input: str,
    latest_nosales_model_path_input: str,
    project_id: str,
    region: str,
    timestamp: str,
    rf_clf_model_output: Output[Model],
    logistic_clf_model_output: Output[Model],
    stage1_nn_model_output: Output[Model],
    gnb_model_output: Output[Model],
    stg1_feature_selector_model_output: Output[Model],
    nosales_model_output: Output[Model],
    nosales_test_ext_output: Output[Dataset]
):
    """Train or load the stage-1 classifiers and the stacked ensemble, then publish the ensemble.

    Behaviour by ``mode``:
      * ``'test'``: ``stage1_flag`` / ``ensemble_flag`` are honoured as given. The
        ensemble is trained on the training split and diagnostics are added to the
        test split via ``diagnosis_utils.model_diag``.
      * ``'prod'``: both flags are forced to ``'train'``, stage-1 models are not
        saved, and the ensemble is trained on the full data set.

    For each of the two flags, ``'train'`` means "fit here and pickle to the matching
    output artifact"; any other value means "unpickle from the matching ``*_path_input``
    and point the output artifact at that path". In both cases the ensemble is also
    pickled to a local file and uploaded to ``latest_nosales_model_path_input``.

    Args:
        nosales_ext_input: Full prepared data set (CSV).
        nosales_train_ext_input: Training split (CSV).
        nosales_test_ext_input: Test split (CSV).
        nosales_train_usampled_input: Under-sampled training split (CSV), used to fit
            the stage-1 feature selector and classifiers and to derive class weights.
        mode: ``'test'`` or ``'prod'``.
        stage1_flag: ``'train'`` to fit stage-1 models, anything else to load them.
        ensemble_flag: ``'train'`` to fit the ensemble, anything else to load it.
        rf_clf_model_path_input, logistic_clf_model_path_input,
        stage1_nn_model_path_input, gnb_model_path_input,
        stg1_feature_selector_model_path_input: Pickle paths read when stage-1
            models are loaded instead of trained.
        nosales_model_path_input: Pickle path read when the ensemble is loaded.
        latest_nosales_model_path_input: Blob URI the ensemble pickle is uploaded to.
        project_id, region, timestamp: Accepted but not used inside this component.
        rf_clf_model_output, logistic_clf_model_output, stage1_nn_model_output,
        gnb_model_output, stg1_feature_selector_model_output: Stage-1 model artifacts.
        nosales_model_output: Ensemble model artifact.
        nosales_test_ext_output: Receives the (possibly diagnostics-augmented) test
            split as CSV.

    Raises:
        ValueError: If ``mode`` is neither ``'test'`` nor ``'prod'``.
    """
    import os 
    import pandas as pd
    from sklearn.pipeline import Pipeline, make_pipeline
    import utils
    import diagnosis_utils
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.naive_bayes import GaussianNB
    from keras.wrappers.scikit_learn import KerasClassifier
    from sklearn.cluster import KMeans
    import pickle
    from google.cloud import storage, aiplatform

    # Fail fast: any other value used to surface much later as a NameError on
    # `train_x`, after stage-1 training had already been done.
    if mode not in ('test', 'prod'):
        raise ValueError(f"mode must be 'test' or 'prod', got {mode!r}")

    def _publish_latest_model(model):
        """Pickle `model` to a local file and upload that file to `latest_nosales_model_path_input`."""
        with open('latest_nosales_model_output', 'wb') as file:
            pickle.dump(model, file)
        blob = storage.blob.Blob.from_string(latest_nosales_model_path_input, client=storage.Client())
        blob.upload_from_filename('latest_nosales_model_output')

    nosales_ext = pd.read_csv(nosales_ext_input.path)
    nosales_train_ext = pd.read_csv(nosales_train_ext_input.path)
    nosales_test_ext = pd.read_csv(nosales_test_ext_input.path)
    nosales_train_usampled = pd.read_csv(nosales_train_usampled_input.path)

    # CSV round-trips lose the datetime dtype; restore it on every frame.
    nosales_ext['run_date'] = pd.to_datetime(nosales_ext['run_date'])
    nosales_train_ext['run_date'] = pd.to_datetime(nosales_train_ext['run_date'])
    nosales_test_ext['run_date'] = pd.to_datetime(nosales_test_ext['run_date'])
    nosales_train_usampled['run_date'] = pd.to_datetime(nosales_train_usampled['run_date'])

    tpr_features = [col for col in nosales_train_ext.columns if '_tpr' in col]  # len(tpr_features) : 45

    numerical_features = ['gap_days', 'exp_scn_in_nosale_period', 'unit_retail', 'oh_qty', 'avg_sales_interval']
    numerical_features.extend(tpr_features)
    categorical_features = ['club_nbr', 'state', 'cat']

    all_features = numerical_features + categorical_features
    target = ['event_note']

    top_features = ['oh_qty_log',  'club_nbr_cat_update_loc_tpr_log',  'club_nbr_cat_new_price_sign_tpr_log',  'club_nbr_update_loc_tpr_log',
    'club_nbr_new_price_sign_tpr_log',  'club_nbr_cat_add_to_picklist_tpr_log',  'item_nbr_update_ohq_tpr_log',
    'item_nbr_add_to_picklist_tpr_log',  'club_nbr_add_to_picklist_tpr_log',  'avg_sales_interval_log', 
    'club_nbr_cat_no_action_taken_tpr_log',  'club_nbr_no_action_taken_tpr_log',  'item_nbr_no_action_taken_tpr_log',
    'cat_add_to_picklist_tpr_log',  'unit_retail_log',  'exp_scn_in_nosale_period_log',  'club_nbr_cat_update_ohq_tpr_log', 
    'cat_update_ohq_tpr_log',  'club_nbr_update_ohq_tpr_log',  'state_cat_add_to_picklist_tpr_log',  'reg_cat_update_ohq_tpr_log',
    'state_cat_new_price_sign_tpr_log',  'mkt_cat_new_price_sign_tpr_log',  'mkt_cat_update_ohq_tpr_log', 
    'reg_cat_add_to_picklist_tpr_log',  'state_cat_update_ohq_tpr_log',  'cat_new_price_sign_tpr_log', 
    'mkt_cat_update_loc_tpr_log',  'mkt_update_loc_tpr_log',  'mkt_new_price_sign_tpr_log', 
    'mkt_cat_add_to_picklist_tpr_log',  'mkt_no_action_taken_tpr_log',  'reg_no_action_taken_tpr_log', 
    'cat_no_action_taken_tpr_log',  'mkt_cat_no_action_taken_tpr_log',  'state_cat_update_loc_tpr_log', 
    'gap_days_log',  'reg_new_price_sign_tpr_log',  'mkt_update_ohq_tpr_log',  'state_cat_no_action_taken_tpr_log']

    # Verbose model output only in test mode.
    verbose_flag = (mode == 'test')

    feature_flags = {'kmeans_clustering': False}

    # Weight of each class = (count of the most frequent class) / (count of that class).
    # value_counts() is sorted descending, so the first entry is the majority class;
    # `.iloc[0]` is used because the index holds labels, and positional lookup via
    # `[0]` on a label-indexed Series is deprecated in recent pandas.
    event_note_counts = nosales_train_usampled.event_note.value_counts()
    class_weights = dict(event_note_counts.iloc[0] / event_note_counts)

    # pipeline: location-feat
    location_features_tf = Pipeline([
        ('select_loc', utils.DataFrameSelector(['sales_floor_location']))
    ])

    # pipeline: time-feat
    time_features_tf = Pipeline([
        ('select_rundate', utils.DataFrameSelector(['run_date'])),
        ('time_featurize', utils.TimeExtractor())
    ])

    # pipeline: other-catg-feat
    add_cat_tf = Pipeline([
        ('select_other_cat', utils.DataFrameSelector(['club_nbr', 'cat', 'state']))
    ])

    # pipeline: K-means clustering
    kmeans_tf = make_pipeline(
        utils.DataFrameSelector(numerical_features),
        utils.MinMaxScalerTransformer(),
        utils.ModelTransformer(KMeans(2))
    )

    ######################################## Assembling 'Catg' n 'Numeric' Features  #####################################

    # list(catg pipelines)
    list_of_pipelines_for_catg_feat = [
        ('loc_features', location_features_tf),
        ('time_features', time_features_tf),
        ('other_cat_features', add_cat_tf)
    ]
    if feature_flags['kmeans_clustering']:
        list_of_pipelines_for_catg_feat.append(('clusters', kmeans_tf))

    # pipeline: encoding the catg features.
    cat_tf = Pipeline([
        ('combine_cats', utils.ColumnMerge(transformer_list=list_of_pipelines_for_catg_feat)),
        ('cat_featurize', utils.CategoryFeaturizer())
    ])

    # pipeline: numeric_features + log-transformation
    num_features_tf = Pipeline([
        ('select_num', utils.DataFrameSelector(numerical_features)),
        ('log', utils.LogFeaturizer()),
        ('select_top_features', utils.DataFrameSelector(top_features))
    ])

    # Stage 2 only uses the first `stage2_init_feature_num` entries of `top_features`.
    stage2_init_feature_num = 20
    num_features_tf2 = Pipeline([
        ('select_num', utils.DataFrameSelector(numerical_features)),
        ('log', utils.LogFeaturizer()),
        ('select_top_features', utils.DataFrameSelector(top_features[:stage2_init_feature_num]))
    ])

    # all_feat => catg_feat + numerical_feat
    add_all_tf = utils.ColumnMerge([
        ('num_features', num_features_tf),
        ('cat_features', cat_tf)
    ])

    ############################################################## Final pipelines ######################################################################

    # Lone classifier-pipelines and pre-processors

    #1
    rf_clf = RandomForestClassifier(n_jobs=-1, criterion='gini', n_estimators=50, max_depth=7, max_features='sqrt',
                                    class_weight=class_weights)

    #2
    logistic_clf = LogisticRegression(n_jobs=-1, multi_class='multinomial', solver='lbfgs', max_iter=1000, penalty='l2', class_weight=class_weights)

    #3
    gnb = utils.CustomizedGaussianNB()

    #4
    stage1_nn = utils.Stage1_NeuralNetwork(num_classes=5, batch_size=128, epochs=25, verbose=verbose_flag)

    stage1_classifiers = {'rf_clf': rf_clf, 'logistic_clf': logistic_clf, 'stage1_nn': stage1_nn, 'gnb': gnb}

    # Stage-2 input width: the selected numeric features plus 5 values per stage-1
    # classifier (5 matches num_classes passed to the stage-1 network above).
    stage2_nn_input_dimen = stage2_init_feature_num + len(stage1_classifiers)*5
    stage2_estimator = KerasClassifier(build_fn=utils.stage2_nn, input_dimen=stage2_nn_input_dimen, epochs=5, batch_size=128, verbose=verbose_flag)

    ##set flags when in mode: 'test'#####
    # True: if you want to save stage1 models during test. Will automatically set to False when in prod
    s1_save_flag = True

    # Stage 1 models
    ####################################
    # Force flag to be 'train' during prod
    if mode == 'prod':
        s1_save_flag = False
        stage1_flag = 'train'

    stg1_feature_selector = num_features_tf

    if stage1_flag == 'train':
        print("Training and saving models...")
        X_train = stg1_feature_selector.fit_transform(nosales_train_usampled, nosales_train_usampled.event_note)
        X_train = X_train.astype('float128')
        y_train = nosales_train_usampled.event_note
        if s1_save_flag:
            with open(stg1_feature_selector_model_output.path, 'wb') as file:
                pickle.dump(stg1_feature_selector, file)

        X_test = stg1_feature_selector.transform(nosales_test_ext)
        stage1_model_output_paths = {'rf_clf': rf_clf_model_output.path, 'logistic_clf': logistic_clf_model_output.path,
                                     'stage1_nn': stage1_nn_model_output.path, 'gnb': gnb_model_output.path}
        for clf_name, model in stage1_classifiers.items():
            print(clf_name)

            model.fit(X_train, y_train)

            print("\n")
            if s1_save_flag:
                with open(stage1_model_output_paths[clf_name], 'wb') as file:
                    pickle.dump(model, file)

    else:
        print("Loading models...")

        # NOTE(review): pickle.load executes arbitrary code from the file; these
        # paths must only ever point at artifacts produced by this pipeline.
        with open(rf_clf_model_path_input, "rb") as handler:
            rf_clf = pickle.load(handler)

        with open(logistic_clf_model_path_input, "rb") as handler:
            logistic_clf = pickle.load(handler)

        with open(stage1_nn_model_path_input, "rb") as handler:
            stage1_nn = pickle.load(handler)

        with open(gnb_model_path_input, "rb") as handler:
            gnb = pickle.load(handler)

        stage1_classifiers = {'rf_clf': rf_clf, 'logistic_clf': logistic_clf, 'stage1_nn': stage1_nn, 'gnb': gnb}

        with open(stg1_feature_selector_model_path_input, "rb") as handler:
            stg1_feature_selector = pickle.load(handler)
        X_test = stg1_feature_selector.transform(nosales_test_ext)
        for clf_name, model in stage1_classifiers.items():
            print(clf_name)
            nosales_test_ext = diagnosis_utils.model_diag(nosales_test_ext, model.predict_proba(X_test), model.classes_)
            print("\n")

        # Nothing was trained here: point the output artifacts at the loaded files.
        rf_clf_model_output.path = rf_clf_model_path_input
        logistic_clf_model_output.path = logistic_clf_model_path_input
        stage1_nn_model_output.path = stage1_nn_model_path_input
        gnb_model_output.path = gnb_model_path_input
        stg1_feature_selector_model_output.path = stg1_feature_selector_model_path_input

    # ensemble model
    ####################################################################
    if mode == 'test':
        train_x = nosales_train_ext
        train_y = nosales_train_ext.event_note
    else:
        # mode == 'prod' (validated at the top): always retrain, on the full data set.
        ensemble_flag = 'train'
        train_x = nosales_ext
        train_y = nosales_ext.event_note

    print(mode, ensemble_flag, train_x.shape[0])

    stg2_feture_selector = num_features_tf2

    if ensemble_flag == 'train':
        print("Training and saving ensemble...")
        stack_pipeline = Pipeline([
            ('ensemble_classifier', utils.EnsembleClassifier(stg1_feature_selector, list(stage1_classifiers.values()),
                                                             stg2_feture_selector, stage2_estimator))])
        stack_pipeline.fit(train_x, train_y)
        with open(nosales_model_output.path, 'wb') as file:
            pickle.dump(stack_pipeline, file)

        _publish_latest_model(stack_pipeline)
        print("Saved the final model")

        if mode == 'test':
            nosales_test_ext = diagnosis_utils.model_diag(nosales_test_ext, stack_pipeline.predict_proba(nosales_test_ext), stack_pipeline.classes_)

    else:
        print("Loading ensemble...")
        # NOTE(review): same pickle caveat as for the stage-1 models above.
        with open(nosales_model_path_input, "rb") as handler:
            stack_pipeline = pickle.load(handler)
        nosales_test_ext = diagnosis_utils.model_diag(nosales_test_ext, stack_pipeline.predict_proba(nosales_test_ext), stack_pipeline.classes_)

        nosales_model_output.path = nosales_model_path_input
        _publish_latest_model(stack_pipeline)

    nosales_test_ext.to_csv(nosales_test_ext_output.path, index=False)


In [7]:
@component(base_image=BASE_IMAGE)
def update_thresholds(
    nosales_test_ext_input: Input[Dataset],
    club_thresh_ext_input: Input[Dataset],
    club_thresh_path_input: str,
    nosales_model_input: Input[Model],
    club_threshold_output: Output[Dataset]
):
    """Recompute the per-club no-sales thresholds and merge them into the club threshold file.

    The pickled model is used to score the test data; ``utils.gen_thresholds``
    (project helper, not shown here) turns those scores into a mapping that is
    loaded as (club_nbr, nosales_club_thresh) rows. The existing threshold table has
    its old ``nosales_club_thresh`` column dropped and the new values left-joined
    on ``club_nbr``. The result is written to
    ``<club_thresh_path_input>/club_thresh_chain.csv`` and the output artifact's
    path is redirected to that same file.

    Args:
        nosales_test_ext_input: Test data (CSV) to score.
        club_thresh_ext_input: Existing club thresholds (CSV); must contain a
            ``nosales_club_thresh`` column.
        club_thresh_path_input: Directory/prefix the merged CSV is written under.
        nosales_model_input: Pickled model exposing ``predict_proba`` and ``classes_``.
        club_threshold_output: Output artifact; its ``path`` is overwritten.
    """
    import utils
    import pandas as pd
    import pickle
    import os
    from google.cloud import storage
    from tempfile import TemporaryFile

    test_frame = pd.read_csv(nosales_test_ext_input.path)
    test_frame['run_date'] = pd.to_datetime(test_frame['run_date'])

    with open(nosales_model_input.path, "rb") as model_file:
        ensemble = pickle.load(model_file)

    # Score the test data, then derive one threshold per club from the scores.
    probabilities = ensemble.predict_proba(X=test_frame)
    class_labels = ensemble.classes_
    thresholds_by_club = utils.gen_thresholds(df=test_frame, predictions=probabilities, classes=class_labels)
    new_nosales_thresholds = pd.DataFrame(
        thresholds_by_club.items(),
        columns=['club_nbr', 'nosales_club_thresh'],
    )

    destination_csv = os.path.join(club_thresh_path_input, "club_thresh_chain.csv")

    # Replace the previous no-sales column with the freshly computed one.
    existing_thresholds = pd.read_csv(club_thresh_ext_input.path)
    existing_thresholds = existing_thresholds.drop(columns='nosales_club_thresh')
    merged_thresholds = existing_thresholds.merge(new_nosales_thresholds, how='left', on='club_nbr')

    # The artifact is pointed at the destination file instead of its default path.
    club_threshold_output.path = destination_csv
    merged_thresholds.to_csv(destination_csv, index=False)

### Club Score Cutoff


In [8]:
@component(base_image=BASE_IMAGE)
def get_logger(
        from_date: str,
        to_date: str,
        project_id: str,
        df_subset_output: Output[Dataset]
    ):
    """Fetch 'root_cause' logger events for a date window and drop duplicate / spurious ones.

    Processing, in order:
      1. Query ``oyi.rm_report_logger`` for events between ``from_date`` and
         ``to_date`` whose user contains a '.' and whose text is 'root_cause'.
      2. Convert ``event_ts`` to US/Central and keep only the latest event per
         (ds_uuid, club_nbr, item_nbr).
      3. Per (club_nbr, event_user), flag an event as spurious when it follows
         that user's previous event by at most ``action_thresh`` seconds, and
         drop the flagged events.

    Args:
        from_date: Lower bound (inclusive) for ``event_ts``, interpolated into the SQL.
        to_date: Upper bound (inclusive) for ``event_ts``, interpolated into the SQL.
        project_id: GCP project used for the BigQuery client.
        df_subset_output: Output artifact; the cleaned events are written to its
            ``path`` as CSV.
    """
    from google.cloud import bigquery
    import pandas as pd

    # Events closer than this many seconds to the same user's previous event are dropped.
    action_thresh = 5

    client = bigquery.Client(project=project_id)
    sql = """(select * from oyi.rm_report_logger where event_ts>= '{from_date}' and event_ts<= '{to_date}' AND
TRIM(LOWER(event_user)) LIKE '%.%' AND TRIM(LOWER(event_txt)) = 'root_cause')""".format(from_date=from_date,to_date=to_date)
    df = client.query(sql).to_dataframe()

    df['event_ts'] = pd.to_datetime(df.event_ts)
    df['central_ts'] = df.event_ts.dt.tz_convert('US/Central')

    # Raw string: '\w' and '\.' are regex escapes, not Python string escapes.
    is_named_user = df.event_user.str.match(r'\w+\.\w+')
    df_subset = df[(df.event_txt == 'root_cause') & is_named_user].copy()
    df_subset['central_dt'] = df_subset.central_ts.dt.date

    # Sort newest first so that de-duplication keeps the latest event per key.
    df_subset = df_subset.sort_values(['central_dt', 'club_nbr', 'item_nbr', 'central_ts'], ascending=False)
    df_subset = df_subset[~df_subset.duplicated(['ds_uuid', 'club_nbr', 'item_nbr'], keep='first')]

    # Time elapsed since the same user's previous event in the same club.
    df_subset = df_subset.sort_values(['club_nbr', 'event_user', 'central_ts'], ascending=True)
    gp = df_subset.groupby(['club_nbr', 'event_user'])
    df_subset['ts_shifted'] = gp.central_ts.transform(lambda x: x.shift(1))
    df_subset['ts_shifted'] = df_subset.ts_shifted.dt.tz_convert('US/Central')
    df_subset['ts_diff'] = df_subset.central_ts - df_subset.ts_shifted

    # total_seconds() is required here: `.dt.seconds` is only the seconds
    # *component* (0-86399) and ignores whole days, so a gap of e.g. one day and
    # three seconds was wrongly treated as a 3-second gap.
    df_subset['spurious'] = ~(df_subset.ts_diff.isna()) & (df_subset.ts_diff.dt.total_seconds() <= action_thresh)
    df_subset = df_subset[~df_subset.spurious].copy()

    df_subset.to_csv(df_subset_output.path, index=False)

In [9]:
@component(base_image=BASE_IMAGE)
def get_inv(
        from_date: str,
        to_date: str,
        project_id: str,
        invdash_output: Output[Dataset]
    ):
    
    from google.cloud import bigquery
    import pandas as pd
    
    from_rundate= (pd.to_datetime(from_date)- pd.Timedelta('1 days')).date().strftime('%Y-%m-%d')
    to_rundate= (pd.to_datetime(to_date)- pd.Timedelta('1 days')).date().strftime('%Y-%m-%d')
    
    
    client = bigquery.Client(project=project_id)
    sql = f"""select club_nbr,item_nbr,old_nbr,run_date,raw_score,special_item, report_type, uuid from oyi_prod.inventory_dashboard_history 
        where run_date>= '{from_rundate}' and run_date <= '{to_rundate}'
        and display_ind='Display'
        """.format(from_rundate=from_rundate,to_rundate=to_rundate)
    invdash = client.query(sql).to_dataframe()
    print(invdash.columns)
    invdash.run_date = pd.to_datetime(invdash.run_date)
    invdash['actual_date']= invdash.run_date+ pd.Timedelta('1 day')
    invdash.actual_date= invdash.actual_date.dt.date
    invdash= invdash[~invdash.duplicated(['actual_date','club_nbr','old_nbr'],keep= 'first')].copy()
    invdash.actual_date = invdash.actual_date.astype(str)
    #Removing special items which are always added to the list
    invdash = invdash[~(invdash.special_item==1)]
    
    invdash.to_csv(invdash_output.path, index=False)


In [10]:

@component(base_image=BASE_IMAGE)
def dataprep(
        logger_input: Input[Dataset],
        inv_input: Input[Dataset],
        match_nosales_output: Output[Dataset],
        match_cancelled_output: Output[Dataset]
    ):
    """Join logger events to inventory rows and split the result by report type.

    The two CSV inputs are inner-joined one-to-one on
    (ds_uuid, club_nbr, item_nbr) = (uuid, club_nbr, old_nbr). A boolean ``action``
    column is added that is False for the two "No Action Taken" event notes and
    True otherwise. Rows with ``report_type == 'C'`` go to the cancelled output,
    all remaining rows to the no-sales output.

    Args:
        logger_input: Logger events (CSV).
        inv_input: Inventory dashboard rows (CSV).
        match_nosales_output: Receives the rows whose report_type is not 'C'.
        match_cancelled_output: Receives the rows whose report_type is 'C'.
    """
    import pandas as pd

    no_action_notes = ['No Action Taken, already out for sale','No Action Taken, already OFS']

    logger_events = pd.read_csv(logger_input.path)
    inventory_rows = pd.read_csv(inv_input.path)

    # validate='one_to_one' makes the merge raise if either side has duplicate keys.
    joined = logger_events.merge(
        inventory_rows,
        how='inner',
        left_on=['ds_uuid','club_nbr','item_nbr'],
        right_on=['uuid','club_nbr','old_nbr'],
        indicator=True,
        validate='one_to_one',
    )
    joined['run_date'] = joined['run_date'].astype(str)
    joined['action'] = ~joined.event_note.isin(no_action_notes)

    is_cancelled = joined.report_type == 'C'
    nosales_rows = joined[~is_cancelled]
    cancelled_rows = joined[is_cancelled]

    nosales_rows.to_csv(match_nosales_output.path, index=False)
    cancelled_rows.to_csv(match_cancelled_output.path, index=False)

In [11]:

@component(base_image=BASE_IMAGE)
def get_raw_score_thresholds(
        train_input: Input[Dataset],
        club_thresh_output: Output[Dataset]
    ):
    """Pick, for every club, the raw-score cutoff that maximises F1 against ``action``.

    For each club, every distinct ``raw_score`` (rounded to 4 decimals) is tried
    as a cutoff: rows with ``raw_score >= cutoff`` are predicted positive and
    compared against ``action == True``. The cutoff with the highest F1 is kept
    (the lowest one on ties), together with its precision and recall rounded to
    4 decimals.

    Args:
        train_input: CSV with ``club_nbr``, ``raw_score`` and ``action`` columns.
        club_thresh_output: Receives a CSV with the columns
            club_nbr, club_thresh, precision, recall.
    """
    import numpy as np
    import pandas as pd
    from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score

    train = pd.read_csv(train_input.path)

    best_per_club = []
    for club in train.club_nbr.unique():
        club_rows = train[train.club_nbr == club]
        # Sorted, de-duplicated candidate cutoffs.
        candidates = np.unique(np.round(club_rows.raw_score.unique(), 4))
        actual = list(club_rows.action == True)

        f1_scores, precisions, recalls = [], [], []
        for cutoff in candidates:
            predicted = list(club_rows.raw_score >= cutoff)
            f1_scores.append(f1_score(actual, predicted))
            precisions.append(precision_score(actual, predicted))
            recalls.append(recall_score(actual, predicted))

        # argmax returns the first maximum, i.e. the lowest cutoff among ties.
        best = np.argmax(f1_scores)
        best_per_club.append((
            club,
            candidates[best],
            np.round(precisions[best], 4),
            np.round(recalls[best], 4),
        ))

    df_thresholds = pd.DataFrame(best_per_club, columns=['club_nbr', 'club_thresh', 'precision', 'recall'])
    df_thresholds.to_csv(club_thresh_output.path, index=False)

In [12]:
@component(base_image=BASE_IMAGE)
def combine_results(
        nosales_club_thresholds_input: Input[Dataset],
        cancelled_club_thresholds_input: Input[Dataset],
        club_thresh_chain_path_input: str,
        regularized_club_thresh_chain_output: Output[Dataset],
        unregularized_club_thresh_chain_output: Output[Dataset]
    ):
    """Join the nosales and cancelled per-club thresholds into two CSV outputs.

    Both inputs are CSVs with the columns ``club_nbr``, ``club_thresh``,
    ``precision`` and ``recall``; the non-key columns are prefixed with
    ``nosales_`` / ``cancelled_`` and the frames are left-joined on
    ``club_nbr`` with nosales on the left, so clubs that appear only in the
    cancelled input are not carried over.

    Args:
        nosales_club_thresholds_input: Per-club thresholds for nosales.
        cancelled_club_thresholds_input: Per-club thresholds for cancelled.
        club_thresh_chain_path_input: Not read by this component; kept so
            existing callers that pass it continue to work.
        regularized_club_thresh_chain_output: CSV with all joined columns,
            where each ``*_club_thresh`` is averaged with the mean of that
            column across clubs (rounded to 4 decimals), plus an ``update_ts``
            column holding the time this component ran.
        unregularized_club_thresh_chain_output: CSV with ``club_nbr`` and the
            two ``*_club_thresh`` columns exactly as read from the inputs.
    """
    import pandas as pd

    nosales_club_thresholds = pd.read_csv(nosales_club_thresholds_input.path).rename(
        columns={'club_thresh': 'nosales_club_thresh',
                 'precision': 'nosales_precision',
                 'recall': 'nosales_recall'})
    cancelled_club_thresholds = pd.read_csv(cancelled_club_thresholds_input.path).rename(
        columns={'club_thresh': 'cancelled_club_thresh',
                 'precision': 'cancelled_precision',
                 'recall': 'cancelled_recall'})

    # merge the DFs
    df_thresholds = nosales_club_thresholds.merge(cancelled_club_thresholds, how='left', on='club_nbr')

    # Regularize the chosen values by averaging the results with the group mean.
    for thresh_col in ('nosales_club_thresh', 'cancelled_club_thresh'):
        df_thresholds[thresh_col] = (
            (df_thresholds[thresh_col] + df_thresholds[thresh_col].mean()) / 2
        ).round(4)

    # `pd.datetime` was removed in pandas 2.0; `pd.Timestamp.now()` is available
    # on every pandas version and is written to CSV in the same format.
    df_thresholds['update_ts'] = pd.Timestamp.now()
    df_thresholds.to_csv(regularized_club_thresh_chain_output.path, index=False)

    # The merge above returned a new frame, so the source frames still hold the
    # unregularized thresholds.
    df_thresholds_unregularized = nosales_club_thresholds[['club_nbr', 'nosales_club_thresh']].merge(
        cancelled_club_thresholds[['club_nbr', 'cancelled_club_thresh']],
        how='left', on='club_nbr')

    # This file used to be written to f'{LATEST_CLUB_THRESH_PATH}/club_thresh_chain.csv';
    # it is now written to the component's output artifact instead.
    df_thresholds_unregularized.to_csv(unregularized_club_thresh_chain_output.path, index=False)
    


In [13]:
# # Club-score cutoff
# logger = get_logger(from_date=from_date,
#                        to_date=to_date,
#                        project_id = PROJECT_ID)

# invdash = get_inv(from_date=from_date,
#                   to_date=to_date,
#                   project_id = PROJECT_ID)

# match_data = dataprep(logger_input=logger.outputs['df_subset_output'],
#                    inv_input=invdash.outputs['invdash_output'])

# nosales_result = get_raw_score_thresholds(train_input=match_data.outputs['match_nosales_output'])
# cancelled_result = get_raw_score_thresholds(train_input=match_data.outputs['match_cancelled_output'])

# club_thresh_chain = combine_results(nosales_club_thresholds_input=nosales_result.outputs['club_thresh_output'],
#                                         cancelled_club_thresholds_input=cancelled_result.outputs['club_thresh_output'],
#                                         club_thresh_chain_path_input=LATEST_CLUB_THRESH_PATH)

In [14]:
# print(logger.outputs["df_subset_output"])

In [None]:
# @component(base_image=BASE_IMAGE)
# def out(
#         logger_input: Input[Dataset],
#         inv_input: Input[Dataset],
#         out0: Output[Dataset],
#         out1: Output[Dataset],
#         out2: Output[Dataset]
#     ):
    
#     import pandas as pd
#     out0 = logger_input.path
#     # print(f"out0: {out0}")
#     out1 = pd.read_csv(logger_input.path)
#     # print(f"out1: {out1}")
#     out1.head()
#     out2 = pd.read_csv(inv_input.path)
    
    

In [None]:
# out(logger_input=logger.outputs['df_subset_output'],inv_input=invdash.outputs['invdash_output'])

In [None]:
# logger.outputs["df_subset_output"]

In [None]:
# out(logger_input=logger.outputs['df_subset_output'],inv_input=invdash.outputs['invdash_output']).outputs["out0"]

In [None]:
# a = pd.read_csv(logger.outputs['df_subset_output'])
# a
# # club_thresh_chain.outputs["unregularized_club_thresh_chain_output"]

In [None]:
# @component(packages_to_install=['pandas==1.3.5'])
# def create_dataset(iris_dataset: Output[Dataset]):
#     import pandas as pd

#     csv_url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
#     col_names = [
#         'Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Labels'
#     ]
#     df = pd.read_csv(csv_url, names=col_names)

#     with open(iris_dataset.path, 'w') as f:
#         df.to_csv(f)

In [None]:
# create_dataset().outputs["iris_dataset"]

In [None]:
# logger = get_logger(from_date=from_date,
#                            to_date=to_date,
#                            project_id = PROJECT_ID)
# # import pprint
# # pp = pprint.PrettyPrinter(indent=4)

# # pp.pprint(logger.outputs['df_subset_output'])
# logger

In [15]:
# `@dsl.pipeline` decorator transforms a Python function into a pipeline that can be executed by KFP back-end. 
@dsl.pipeline(pipeline_root=NOSALE_PIPELINE_ROOT, name=NOSALE_PIPELINE_NAME)
def pipeline():
    # Steps, in the order they are declared below:
    #   1. Club-score cutoff: get_logger + get_inv -> dataprep
    #      -> get_raw_score_thresholds (nosales, cancelled) -> combine_results
    #   2. Training: data_preprocessing -> train_test_split -> train_eval_model
    #   3. update_thresholds, fed by train_eval_model and the unregularized
    #      club thresholds from combine_results
    #   4. MLflow model-registry custom job, ordered after train_eval_model

    # ---- Club-score cutoff ----
    logger_task = get_logger(
        from_date=from_date,
        to_date=to_date,
        project_id=PROJECT_ID,
    )
    invdash_task = get_inv(
        from_date=from_date,
        to_date=to_date,
        project_id=PROJECT_ID,
    )
    match_task = dataprep(
        logger_input=logger_task.outputs['df_subset_output'],
        inv_input=invdash_task.outputs['invdash_output'],
    )

    nosales_thresh_task = get_raw_score_thresholds(
        train_input=match_task.outputs['match_nosales_output'],
    )
    cancelled_thresh_task = get_raw_score_thresholds(
        train_input=match_task.outputs['match_cancelled_output'],
    )

    thresh_chain_task = combine_results(
        nosales_club_thresholds_input=nosales_thresh_task.outputs['club_thresh_output'],
        cancelled_club_thresholds_input=cancelled_thresh_task.outputs['club_thresh_output'],
        club_thresh_chain_path_input=LATEST_CLUB_THRESH_PATH,
    )

    # ---- Training ----
    preprocess_task = data_preprocessing(
        training_data_bq_query_input=TRAINING_DATA_BQ_QUERY,
        matcher=MATCHER,
        project_id=PROJECT_ID,
        env=ENV,
        pipeline_root=NOSALE_PIPELINE_ROOT,
    )
    training_data = preprocess_task.outputs['training_data_output']

    split_task = train_test_split(nosales_ext_input=training_data)

    train_eval_task = train_eval_model(
        nosales_ext_input=training_data,
        nosales_train_ext_input=split_task.outputs['nosales_train_ext_output'],
        nosales_test_ext_input=split_task.outputs['nosales_test_ext_output'],
        nosales_train_usampled_input=split_task.outputs['nosales_train_usampled_output'],
        mode=args.MODE,
        stage1_flag=args.STAGE1_FLAG,
        ensemble_flag=args.ENSEMBLE_FLAG,
        rf_clf_model_path_input=args.RF_CLF_MODEL_PATH,
        logistic_clf_model_path_input=args.LOGISTIC_CLF_MODEL_PATH,
        stage1_nn_model_path_input=args.STAGE1_NN_MODEL_PATH,
        gnb_model_path_input=args.GNB_MODEL_PATH,
        stg1_feature_selector_model_path_input=args.STG1_FEATURE_SELECTOR_MODEL_PATH,
        nosales_model_path_input=args.NOSALES_MODEL_PATH,
        latest_nosales_model_path_input=LATEST_NOSALES_PATH,
        project_id=PROJECT_ID,
        region=REGION,
        timestamp=TIMESTAMP,
    )

    # ---- Threshold update ----
    thresholds_task = update_thresholds(
        nosales_test_ext_input=train_eval_task.outputs['nosales_test_ext_output'],
        club_thresh_ext_input=thresh_chain_task.outputs["unregularized_club_thresh_chain_output"],
        club_thresh_path_input=LATEST_CLUB_THRESH_PATH,
        nosales_model_input=train_eval_task.outputs['nosales_model_output'],
    )

    # ---- Model registry ----
    # The container spec names:
    #   1. The URI of the custom image to run this CustomTrainingJobOp against
    #      - this image is built from ../../custom_image_builds/model_registry_image_build.ipynb
    #   2. The command to run against that image
    #   3. The arguments to supply to that custom image
    registry_container_spec = {
        "image_uri": MLFLOW_IMAGE,
        "command": ["python3", "nosales_model_registry.py"],
        "args": [
            "--GCS_MODEL_PATH", LATEST_NOSALES_PATH,
            "--MLFLOW_EXP_NAME", MLFLOW_EXP_NAME,
            "--MODEL_REGISTRY_NAME", MODEL_REGISTRY_NAME,
        ],
    }
    registry_worker_pool = {
        "replica_count": 1,
        "machine_spec": {
            "machine_type": "n1-standard-4",
            "accelerator_count": 0,
        },
        "container_spec": registry_container_spec,
    }

    registry_task = CustomTrainingJobOp(
        project=PROJECT_ID,
        location=REGION,
        service_account=SERVICE_ACCOUNT,
        network="projects/12856960411/global/networks/vpcnet-shared-prod-01",
        reserved_ip_ranges=["vpcnet-shared-prod-01-datafusion-01"],
        display_name="mlflow-model-registry",
        worker_pool_specs=[registry_worker_pool],
    ).set_display_name("element-mlflow-model-registry")
    # The registry job shares no artifact with train_eval_model, so the
    # ordering is declared explicitly.
    registry_task.after(train_eval_task)
    
    
#     element_model_registry = CustomTrainingJobOp(
#         project=PROJECT_ID,
#         location=REGION,
#         service_account=SERVICE_ACCOUNT,
#         network="projects/12856960411/global/networks/vpcnet-shared-prod-01",
#         reserved_ip_ranges=["vpcnet-shared-prod-01-datafusion-01"],

#         display_name="mlflow-model-registry",

#         worker_pool_specs=[{
#             "replica_count": 1,
#             "machine_spec": {
#                 "machine_type": "n1-standard-4",
#                 "accelerator_count": 0,
#             },
#             # The below dictionary specifies:
#             #   1. The URI of the custom image to run this CustomTrainingJobOp against
#             #      - this image is built from ../../custom_image_builds/model_registry_image_build.ipynb
#             #   2. The command to run against that image
#             #   3. The arguments to supply to that custom image 
#             "container_spec": {
#                 "image_uri": MLFLOW_IMAGE,
#                 "command": [
#                     "python3", "cancelled_model_registry.py"
#                 ],
#                 "args": [
#                     "--GCS_MODEL_PATH", CANCELLED_MODEL_PATH,
#                     "--MLFLOW_EXP_NAME", MLFLOW_EXP_NAME,
#                     "--MODEL_REGISTRY_NAME", MODEL_REGISTRY_NAME,
#                 ],
#             },
#         }],

#     ).set_display_name("element-mlflow-model-registry")
    
    

In [16]:
# Compile the `pipeline` function defined above into a pipeline spec file
# written to TMP_NOSALE_JSON.
compiler.Compiler().compile(
    pipeline_func=pipeline, 
    package_path=TMP_NOSALE_JSON,
)

In [17]:
# Build the Vertex AI PipelineJob from the compiled spec. This only constructs
# the job object; it is submitted in a later cell. The pipeline takes no
# runtime parameters, and caching is disabled for this run.
pipeline_job = aiplatform.PipelineJob(
    display_name=f"{NOSALE_PIPELINE_NAME}-{TIMESTAMP}",
    template_path=TMP_NOSALE_JSON,
    pipeline_root=NOSALE_PIPELINE_ROOT,
    parameter_values={},
    enable_caching=False,
)


In [18]:
# Store a copy of the compiled spec (TMP_NOSALE_JSON) at
# LATEST_NOSALES_PATH_JSON; per the cell output below, this uploads the file
# contents to a gs:// location.
pipeline_utils.store_pipeline(
    storage_path=LATEST_NOSALES_PATH_JSON, 
    filename=TMP_NOSALE_JSON
)

/tmp/oyi-nosales-model-pipeline-dev.json
contents /tmp/oyi-nosales-model-pipeline-dev.json uploaded to gs://oyi-ds-vertex-pipeline-bucket-nonprod/latest_nosales_model_output_dev.json.


In [19]:
# Submit the job to Vertex AI Pipelines, running it as SERVICE_ACCOUNT.
pipeline_job.submit(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/335163835346/locations/us-central1/pipelineJobs/oyi-nosales-model-pipeline-dev-20221013052512
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/335163835346/locations/us-central1/pipelineJobs/oyi-nosales-model-pipeline-dev-20221013052512')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/oyi-nosales-model-pipeline-dev-20221013052512?project=335163835346
