In [2]:
# Switch to the training_pipeline directory: later cells use paths relative to it
# (./configs/..., ../feature_pipeline/..., and the local data folder from data_config),
# and presumably `from src import utils` resolves against this cwd as well.
# NOTE(review): this path is specific to one user's checkout — consider making it configurable.
%cd "~/okcredit_github/retraining_pipeline/training_pipeline"

/home/nikhilmishra_okcredit_in/okcredit_github/retraining_pipeline/training_pipeline


In [3]:
# NOTE(review): np, classifier, model, validation, deepcopy, Path and storage are not used in the
# cells visible here — confirm later cells need them before removing any.
import pandas as pd
import numpy as np
import sys
import os
from omegaconf import OmegaConf
from okcredit_ml.ml_pipeline import classifier, model, validation

from okcredit_ml.cloud_connectors import gcp
from copy import deepcopy
# Project-local helpers used below: GCS download/upload, GCS folder creation,
# config templating and temporary-table cleanup.
from src import utils

# Make the sibling feature_pipeline code importable. Presumably OKCFeaturePipeline lives under
# one of these directories, so keep these appends above that import.
sys.path.append('../feature_pipeline/')
sys.path.append('../feature_pipeline/svc')
from OKCFeaturePipeline import OKCFeaturePipeline

from pathlib import Path
from google.cloud import storage

# BigQuery client used later to load cohort tables and to drop and read the feature sink table.
# NOTE(review): project id is hard-coded and may differ from data_config.data_path.gcs.project — confirm.
gcp_bq = gcp.BQPy(project_id='okcredit-data-science')

In [4]:
# Template from which a per-file feature-pipeline config is rendered in the loop below,
# and the data config describing local/GCS paths and which files to featurize.
feature_pipe_config = OmegaConf.load("./configs/feature_pipeline_config_template.yaml")
data_config = OmegaConf.load("./configs/data_config.yaml")

In [5]:
# Pull every file under the GCS input folder into the local data folder, then create the
# GCS folder that will receive the generated feature CSVs.
gcs_settings = data_config.data_path.gcs

utils.download_files_from_gcs_folder(
    project=gcs_settings.project,
    bucket_name=gcs_settings.bucket_name,
    gcs_folder_path=gcs_settings.input_data,
    local_save_path=data_config.data_path.local,
)

utils.create_folder_in_gcs(
    project=gcs_settings.project,
    bucket_name=gcs_settings.bucket_name,
    folder_name=gcs_settings.features,
)

File test_bnpl.csv downloaded to ./data/test_bnpl.csv
File test_tl.csv downloaded to ./data/test_tl.csv
File train_bnpl.csv downloaded to ./data/train_bnpl.csv
File train_comb.csv downloaded to ./data/train_comb.csv
File train_tl.csv downloaded to ./data/train_tl.csv


In [11]:
# Build features for every cohort file listed in data_config.create_features_for_files.
# For each file:
#   1. load <data_path.local>/<name>.csv and parse `run_date`,
#   2. render a per-file feature-pipeline config and save it under ./configs/,
#   3. load the cohort into BigQuery (config.basetables.cohort),
#   4. drop the sink table, run OKCFeaturePipeline, and always clean up its temporary tables,
#   5. left-join the sink's features onto the cohort on (merchant_id, run_date) and write
#      <name>_fts.csv locally and to the GCS features folder.

# Value passed to the feature pipeline as its single `run_date` input.
# NOTE(review): hard-coded; confirm whether it should follow the run_date values in the cohort files.
FEATURE_PIPELINE_RUN_DATE = '2022-08-13'

for df_name in data_config.create_features_for_files:

    file_path = os.path.join(data_config.data_path.local, df_name) + '.csv'
    df = pd.read_csv(file_path)
    print(f'Loaded {file_path}')

    # Parse dates so the join key matches the parsed run_date in the feature table below.
    df['run_date'] = pd.to_datetime(df['run_date'])

    config = utils.create_config_from_template(feature_pipe_config, df_name)
    OmegaConf.save(config=config, f=f'./configs/feature_pipeline_config_{df_name}.yaml')

    # Presumably WRITE_TRUNCATE maps to BigQuery's write disposition, replacing the table's contents.
    gcp_bq.load_pandas_df(df, disposition='WRITE_TRUNCATE', destination=config.basetables.cohort)
    print(f'Written {df_name} to {config.basetables.cohort}')

    print(f'Starting feature pipeline for {df_name}')

    # Delete the old feature pipeline output table if it exists, since the feature pipeline
    # always appends to the table.
    gcp_bq.delete_table(config.sink)
    try:
        pipeline = OKCFeaturePipeline(config=config)
        pipeline.predict(X=[FEATURE_PIPELINE_RUN_DATE], names=['run_date'])
        print(f'Completed feature pipeline for {df_name}')
    finally:
        # Clean up even when the pipeline raises, so a failed run does not leave temporary
        # tables behind. If cleanup itself raises, the pipeline's exception is still shown
        # as the context of the new one.
        print('Cleaning up temporary tables')
        utils.cleanup_tables(project=data_config.data_path.gcs.project, config=config)

    fts_df = gcp_bq.bq_to_pandas(f'SELECT * FROM `{config.sink}`')
    fts_df['run_date'] = pd.to_datetime(fts_df['run_date'])
    # validate='many_to_one' raises pandas.errors.MergeError if the feature table repeats a
    # (merchant_id, run_date) key. Without it, the left join would silently duplicate cohort rows.
    train_df = pd.merge(
        df,
        fts_df,
        on=['merchant_id', 'run_date'],
        how='left',
        validate='many_to_one',
    )

    save_filename = f'{df_name}_fts.csv'

    local_file_path = os.path.join(data_config.data_path.local, save_filename)
    train_df.to_csv(local_file_path, index=False)

    utils.copy_from_local_to_gcs(
        project=data_config.data_path.gcs.project,
        bucket_name=data_config.data_path.gcs.bucket_name,
        gcs_file_path=os.path.join(data_config.data_path.gcs.features, save_filename),
        local_file_path=local_file_path
    )

Loaded ./data/train_tl.csv
Loaded ./data/train_bnpl.csv
Loaded ./data/test_bnpl.csv
Loaded ./data/test_tl.csv
