### Import Libraries

In [None]:
# import required libraries
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, ClassificationMetrics,
                        Metrics, component)
import os
import re
from pathlib import Path

from datetime import date
from datetime import timedelta
from dateutil.relativedelta import relativedelta

import google
from google.oauth2 import credentials
from google.oauth2 import service_account
from google.oauth2.service_account import Credentials
from google.cloud import storage
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components.v1.batch_predict_job import \
    ModelBatchPredictOp as batch_prediction_op

import datetime as dt

### YAML Parameters

In [None]:
#tag cell with parameters
# Empty placeholder values for the notebook parameters.
# The next cell reassigns PROJECT_ID, DATASET_ID, RESOURCE_BUCKET, FILE_BUCKET,
# MODEL_ID and MODEL_NAME; REGION is reassigned in the service/pipeline
# parameter cells further down.
PROJECT_ID =  ''
DATASET_ID = ''
RESOURCE_BUCKET = ''
FILE_BUCKET = ''
REGION = ''
MODEL_ID = ''
MODEL_NAME = ''


In [None]:
#tag cell with parameters
# Concrete values that overwrite the empty placeholders from the previous cell.
PROJECT_ID =  'divg-groovyhoon-pr-d2eab4'
DATASET_ID = 'nba_offer_targeting'
# table_id handed to the final bq_export_to_bq step of the pipeline
BASE_TABLE_ID = 'qua_base_hs'
# RESOURCE_BUCKET is where the component files are downloaded from;
# FILE_BUCKET is used to build PIPELINE_ROOT and every gs:// data path below.
RESOURCE_BUCKET = 'divg-groovyhoon-pr-d2eab4-default'
FILE_BUCKET = 'divg-groovyhoon-pr-d2eab4-default'
# NOTE(review): MODEL_ID stays empty and is not referenced in the visible cells.
MODEL_ID = ''
MODEL_NAME = 'nba_offer_targeting'

# IRPC base tables: passed as table_id to bq_import_tbl_to_df and reused as the
# stem of the matching CSV file names on GCS
IRPC_DIGITAL_1P_BASE = 'bq_irpc_digital_1p_base'
IRPC_DIGITAL_2P_BASE = 'bq_irpc_digital_2p_base'
IRPC_CASA_BASE = 'bq_irpc_casa_base'

# IRPC offer plan tables (1P plans are also fed to the casa offer attachment)
IRPC_OFFER_1P_PLANS = 'irpc_offer_1p_plans'
IRPC_OFFER_2P_PLANS = 'irpc_offer_2p_plans'

# IRPC offer price table, shared by the digital 1P, digital 2P and casa steps
IRPC_OFFER_PRICES = 'irpc_offer_prices'

# define some input BQ tables
# (fully qualified names, handed to the reg_offers_base_* components)
OFFER_PARAMETER = 'bi-stg-mobilityds-pr-db8ce2.nba_offer_targeting.bq_offer_targeting_params_upd' 
# NOTE(review): the active WHSIA_ELIGIBLE_BASE value is named "...4testing" and
# the other table is commented out - confirm which one is intended before a
# production run.
#WHSIA_ELIGIBLE_BASE = 'bi-srv-cpsbi-pr-a69cd8.hsce_hems_mdl_ds.bq_hh_mdl_whsiagtm_tbl'
WHSIA_ELIGIBLE_BASE = 'divg-team-v03-pr-de558a.nba_offer_targeting.bq_whsiagtm4testing'
SHS_PROFESSIONAL_INSTALL = 'divg-team-v03-pr-de558a.OT.SHS_FSA_List_native'
PROD_CD2REMOVE = 'divg-groovyhoon-pr-d2eab4.nba_offer_targeting.prod_cd_exclusions'

# define output BQ table names for eligible base
# (passed as qua_base to the existing / prospects / cat3 components; the
# project and dataset are spelled out here rather than derived from
# PROJECT_ID / DATASET_ID)
QUA_BASE_HS = 'divg-groovyhoon-pr-d2eab4.nba_offer_targeting.qua_base_hs'
QUA_BASE_MOB = 'divg-groovyhoon-pr-d2eab4.nba_offer_targeting.qua_base_mob'
QUA_BASE_CAT3 = 'divg-groovyhoon-pr-d2eab4.nba_offer_targeting.qua_base_cat3'

#offer details table
# (table_id of the postprocess steps: digital 1P and 2P share the digital
# table, casa has its own)
NBA_DIGITAL_OFFER_DETAILS = 'nba_digital_offer_details'
NBA_CASA_OFFER_DETAILS = 'nba_casa_offer_details'

# final ranking table
# (table_id of the nba_ffh_offer_ranking step)
NBA_FFH_OFFER_RANKING = 'nba_ffh_offer_ranking'


### Service Parameters

In [None]:
# Service identifiers (underscore and hyphen spellings of the same name).
# NOTE(review): neither SERVICE_TYPE nor SERVICE_TYPE_NAME is referenced in the
# visible cells - confirm whether they are still needed.
SERVICE_TYPE = 'nba_offer_targeting'
SERVICE_TYPE_NAME = 'nba-offer-targeting'
# Replaces the empty REGION placeholder from the parameters cell; used as the
# pipeline's default region and as the PipelineJob location.
REGION = "northamerica-northeast1"

### Pipeline Parameters

In [None]:
# STACK_NAME is the top-level folder for every gs:// data path and for the
# component download prefix.
STACK_NAME = 'nba_offer_targeting'
# Sub-path (below STACK_NAME) under which the component files are stored in
# RESOURCE_BUCKET.
SERVING_PIPELINE_NAME_PATH = 'nba_offer_targeting_pipeline/serving_pipeline'
SERVING_PIPELINE_NAME = 'nba-offer-targeting-serving-pipeline' # Same name as pulumi.yaml
SERVING_PIPELINE_DESCRIPTION = 'nba-offer-targeting-serving-pipeline'
# Root location handed to PipelineJob as pipeline_root.
PIPELINE_ROOT = f"gs://{FILE_BUCKET}"
# Same value as the REGION assigned in the service parameters cell.
REGION = "northamerica-northeast1"

### Bucket Parameters

In [None]:
# GCS locations of the CSV files exchanged between pipeline steps.
# Every path lives under gs://{FILE_BUCKET}/{STACK_NAME}/.

# input files (BigQuery tables saved as CSV by bq_import_tbl_to_df)
DIGITAL_1P_BASE_SAVE_DATA_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_DIGITAL_1P_BASE}.csv'
OFFER_1P_PLANS_SAVE_DATA_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_OFFER_1P_PLANS}.csv'
OFFER_PRICES_SAVE_DATA_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_OFFER_PRICES}.csv'
DIGITAL_2P_BASE_SAVE_DATA_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_DIGITAL_2P_BASE}.csv'
OFFER_2P_PLANS_SAVE_DATA_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_OFFER_2P_PLANS}.csv'
CASA_BASE_SAVE_DATA_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_CASA_BASE}.csv'
# Backward-compatible alias: this path was originally the only lower-case
# constant in the cell and is still referenced under that name.
casa_base_save_data_path = CASA_BASE_SAVE_DATA_PATH

# base with offers files (written by the offer attachment steps)
DIGITAL_1P_BASE_WITH_OFFERS_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_DIGITAL_1P_BASE}_with_offers.csv'
DIGITAL_2P_BASE_WITH_OFFERS_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_DIGITAL_2P_BASE}_with_offers.csv'
CASA_BASE_WITH_OFFERS_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_CASA_BASE}_with_offers.csv'

# postprocess files (written by the postprocess steps)
DIGITAL_1P_BASE_POSTPROCESS_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_DIGITAL_1P_BASE}_postprocess.csv'
DIGITAL_2P_BASE_POSTPROCESS_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_DIGITAL_2P_BASE}_postprocess.csv'
CASA_BASE_POSTPROCESS_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/{IRPC_CASA_BASE}_postprocess.csv'

# irpc_offers_assigned - final file
IRPC_OFFERS_ASSIGNED_PATH = f'gs://{FILE_BUCKET}/{STACK_NAME}/irpc_offers_assigned_existing_customers.csv'

### Import Pipeline Components

In [None]:
# Download the pipeline component files from RESOURCE_BUCKET into a local
# "components/" folder so they can be imported as a Python package below.
prefix = f'{STACK_NAME}/{SERVING_PIPELINE_NAME_PATH}/components/'
dl_dir = 'components/'

storage_client = storage.Client()
bucket = storage_client.bucket(RESOURCE_BUCKET)
blobs = bucket.list_blobs(prefix=prefix)  # every object whose name starts with "prefix"
for blob in blobs:
    print(blob.name)
    # Names ending in "/" are folder placeholders, not files.
    if blob.name.endswith("/"):
        continue
    # list_blobs(prefix=...) only yields names that start with the prefix, so
    # slicing it off gives the path relative to the components folder. Splitting
    # on the prefix instead would pick the wrong tail whenever the prefix text
    # occurs again later in the object name.
    relative_name = blob.name[len(prefix):]
    file_path = Path(dl_dir) / relative_name
    # Recreate the remote folder structure locally before writing the file.
    file_path.parent.mkdir(parents=True, exist_ok=True)
    blob.download_to_filename(str(file_path))

# import main pipeline components
from components.reg_offers_base_existing import reg_offers_base_existing
from components.reg_offers_base_prospects import reg_offers_base_prospects
from components.reg_offers_base_cat3 import reg_offers_base_cat3

from components.bq_create_dataset import bq_create_dataset
from components.bq_import_tbl_to_df import bq_import_tbl_to_df
from components.offer_attachment_digital import offer_attachment_digital
from components.offer_attachment_casa import offer_attachment_casa
from components.postprocess import postprocess
from components.bq_export_to_bq import bq_export_to_bq

from components.nba_ffh_offer_ranking import nba_ffh_offer_ranking


### Pipeline

In [None]:
# library imports
from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs
def _gcloud_access_token():
    """Return the access token printed by ``gcloud auth print-access-token``.

    Plain-Python replacement for the notebook-only ``!gcloud ...`` shell magic,
    so the pipeline definition also works outside IPython.

    Raises:
        FileNotFoundError: if the ``gcloud`` executable is not on PATH.
        subprocess.CalledProcessError: if gcloud exits with a non-zero status.
        RuntimeError: if gcloud succeeds but prints nothing on stdout.
    """
    import subprocess

    completed = subprocess.run(
        ['gcloud', 'auth', 'print-access-token'],
        check=True,
        capture_output=True,
        text=True,
    )
    # Only stdout is inspected, so warnings gcloud writes to stderr can never
    # be mistaken for the token.
    output_lines = completed.stdout.strip().splitlines()
    if not output_lines:
        raise RuntimeError('gcloud auth print-access-token returned no token')
    return output_lines[0].strip()


def _set_resource_limits(task, memory, cpu):
    """Apply a memory limit and a CPU limit to one pipeline task."""
    task.set_memory_limit(memory)
    task.set_cpu_limit(cpu)


# Serving pipeline for NBA offer targeting.
#
# Steps, in dependency order:
#   1. existing customers: regular offer base -> dataset creation -> three
#      parallel IRPC branches (digital 1P, digital 2P, casa), each made of
#      "BQ table to GCS" -> offer attachment -> postprocess -> export to BQ
#   2. prospects regular offer base (independent of step 1)
#   3. category 3 regular offer base (independent of step 1)
#   4. final NBA ranking, once 1-3 have finished
#
# NOTE: project_id, region, resource_bucket and file_bucket are declared as
# pipeline parameters, but the body reads the module-level constants
# (PROJECT_ID, FILE_BUCKET, ...) instead, so overriding them at submission time
# has no effect on the component arguments.
@dsl.pipeline(
    name=SERVING_PIPELINE_NAME,
    description=SERVING_PIPELINE_DESCRIPTION
    )
def pipeline(
        project_id: str = PROJECT_ID,
        region: str = REGION,
        resource_bucket: str = RESOURCE_BUCKET,
        file_bucket: str = FILE_BUCKET
    ):

    #### this code block is only for a personal workbench

    # The token is read once, while the pipeline function is being compiled,
    # and handed to the components as a plain string argument.
    # NOTE(review): it therefore presumably ends up in the compiled
    # pipeline.json and is only valid for a limited time - confirm this is
    # acceptable outside a personal workbench.
    token_str = _gcloud_access_token()

    #### the end

    ################################################
    ############## Existing Customers ##############
    ################################################

    # ----- Create Regular Offer Base Table ------
    reg_offers_base_existing_op = reg_offers_base_existing(
        project_id=PROJECT_ID,
        offer_parameter=OFFER_PARAMETER,
        whsia_eligible_base=WHSIA_ELIGIBLE_BASE,
        shs_professional_install=SHS_PROFESSIONAL_INSTALL,
        prod_cd2remove=PROD_CD2REMOVE,
        qua_base=QUA_BASE_HS,
        token=token_str,
    )
    _set_resource_limits(reg_offers_base_existing_op, '16G', '8')

    # ----- Create IRPC Digital 1P, Digital 2P, and Casa Base Tables --------
    bq_create_dataset_op = bq_create_dataset(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        token=token_str,
    )
    _set_resource_limits(bq_create_dataset_op, '16G', '8')

    #############################################
    ############## IRPC Digital 1P ##############
    #############################################

    # ----- save irpc digital 1p base to gcs --------
    irpc_digital_1p_base_to_gcs_op = bq_import_tbl_to_df(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=IRPC_DIGITAL_1P_BASE,
        save_data_path=DIGITAL_1P_BASE_SAVE_DATA_PATH,
        token=token_str,
    )
    _set_resource_limits(irpc_digital_1p_base_to_gcs_op, '16G', '8')

    # ----- save offer 1p plans to gcs --------
    irpc_offer_1p_plans_to_gcs_op = bq_import_tbl_to_df(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=IRPC_OFFER_1P_PLANS,
        save_data_path=OFFER_1P_PLANS_SAVE_DATA_PATH,
        token=token_str,
    )
    _set_resource_limits(irpc_offer_1p_plans_to_gcs_op, '16G', '8')

    # ----- save offer prices to gcs --------
    irpc_offer_prices_to_gcs_op = bq_import_tbl_to_df(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=IRPC_OFFER_PRICES,
        save_data_path=OFFER_PRICES_SAVE_DATA_PATH,
        token=token_str,
    )
    _set_resource_limits(irpc_offer_prices_to_gcs_op, '16G', '8')

    # ----- offer attachment - digital 1p --------
    offer_attachment_digital_1p_op = offer_attachment_digital(
        irpc_base_csv=DIGITAL_1P_BASE_SAVE_DATA_PATH,
        irpc_offer_plans_csv=OFFER_1P_PLANS_SAVE_DATA_PATH,
        irpc_offer_prices_csv=OFFER_PRICES_SAVE_DATA_PATH,
        channel='digital',
        save_data_path=DIGITAL_1P_BASE_WITH_OFFERS_PATH,
    )
    _set_resource_limits(offer_attachment_digital_1p_op, '32G', '16')

    # ----- postprocess - digital 1p --------
    postprocess_digital_1p_op = postprocess(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=NBA_DIGITAL_OFFER_DETAILS,
        read_data_path=DIGITAL_1P_BASE_WITH_OFFERS_PATH,
        save_data_path=DIGITAL_1P_BASE_POSTPROCESS_PATH,
        base_type='digital_1p',
        token=token_str,
    )
    _set_resource_limits(postprocess_digital_1p_op, '32G', '16')

    #############################################
    ############## IRPC Digital 2P ##############
    #############################################

    # ----- save irpc digital 2p base to gcs --------
    irpc_digital_2p_base_to_gcs_op = bq_import_tbl_to_df(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=IRPC_DIGITAL_2P_BASE,
        save_data_path=DIGITAL_2P_BASE_SAVE_DATA_PATH,
        token=token_str,
    )
    _set_resource_limits(irpc_digital_2p_base_to_gcs_op, '16G', '8')

    # ----- save offer 2p plans to gcs --------
    irpc_offer_2p_plans_to_gcs_op = bq_import_tbl_to_df(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=IRPC_OFFER_2P_PLANS,
        save_data_path=OFFER_2P_PLANS_SAVE_DATA_PATH,
        token=token_str,
    )
    _set_resource_limits(irpc_offer_2p_plans_to_gcs_op, '16G', '8')

    # ----- offer attachment - digital 2p --------
    offer_attachment_digital_2p_op = offer_attachment_digital(
        irpc_base_csv=DIGITAL_2P_BASE_SAVE_DATA_PATH,
        irpc_offer_plans_csv=OFFER_2P_PLANS_SAVE_DATA_PATH,
        irpc_offer_prices_csv=OFFER_PRICES_SAVE_DATA_PATH,
        channel='digital',
        save_data_path=DIGITAL_2P_BASE_WITH_OFFERS_PATH,
    )
    _set_resource_limits(offer_attachment_digital_2p_op, '32G', '16')

    # ----- postprocess - digital 2p --------
    postprocess_digital_2p_op = postprocess(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=NBA_DIGITAL_OFFER_DETAILS,
        read_data_path=DIGITAL_2P_BASE_WITH_OFFERS_PATH,
        save_data_path=DIGITAL_2P_BASE_POSTPROCESS_PATH,
        base_type='digital_2p',
        token=token_str,
    )
    _set_resource_limits(postprocess_digital_2p_op, '32G', '16')

    #############################################
    ################# IRPC Casa #################
    #############################################

    # ----- save irpc casa base to gcs --------
    irpc_casa_base_to_gcs_op = bq_import_tbl_to_df(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=IRPC_CASA_BASE,
        save_data_path=casa_base_save_data_path,
        token=token_str,
    )
    _set_resource_limits(irpc_casa_base_to_gcs_op, '16G', '8')

    # ----- offer attachment - casa --------
    # The casa branch reuses the 1P offer plans file.
    offer_attachment_casa_op = offer_attachment_casa(
        irpc_base_csv=casa_base_save_data_path,
        irpc_offer_plans_csv=OFFER_1P_PLANS_SAVE_DATA_PATH,
        irpc_offer_prices_csv=OFFER_PRICES_SAVE_DATA_PATH,
        channel='casa',
        save_data_path=CASA_BASE_WITH_OFFERS_PATH,
    )
    _set_resource_limits(offer_attachment_casa_op, '32G', '16')

    # ----- postprocess - casa --------
    postprocess_casa_op = postprocess(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=NBA_CASA_OFFER_DETAILS,
        read_data_path=CASA_BASE_WITH_OFFERS_PATH,
        save_data_path=CASA_BASE_POSTPROCESS_PATH,
        base_type='casa',
        token=token_str,
    )
    _set_resource_limits(postprocess_casa_op, '32G', '16')

    # ----- export final qua_base_hs_irpc table to bq --------
    bq_export_to_bq_op = bq_export_to_bq(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=BASE_TABLE_ID,
        temp_table='qua_base_hs_irpc',
        digital_1p_data_path=DIGITAL_1P_BASE_POSTPROCESS_PATH,
        digital_2p_data_path=DIGITAL_2P_BASE_POSTPROCESS_PATH,
        casa_data_path=CASA_BASE_POSTPROCESS_PATH,
        save_data_path=IRPC_OFFERS_ASSIGNED_PATH,
        token=token_str,
    )
    _set_resource_limits(bq_export_to_bq_op, '32G', '16')

    # ----- pipeline sequence -----
    # The steps exchange data through fixed GCS paths rather than task outputs,
    # so every ordering constraint has to be declared explicitly with .after().

    bq_create_dataset_op.after(reg_offers_base_existing_op)

    # digital 1P branch (also produces the shared plans/prices files)
    irpc_digital_1p_base_to_gcs_op.after(bq_create_dataset_op)
    irpc_offer_1p_plans_to_gcs_op.after(irpc_digital_1p_base_to_gcs_op)
    irpc_offer_prices_to_gcs_op.after(irpc_offer_1p_plans_to_gcs_op)
    offer_attachment_digital_1p_op.after(irpc_offer_prices_to_gcs_op)
    postprocess_digital_1p_op.after(offer_attachment_digital_1p_op)

    # digital 2P branch (needs the prices file from the 1P branch)
    irpc_digital_2p_base_to_gcs_op.after(bq_create_dataset_op)
    irpc_offer_2p_plans_to_gcs_op.after(irpc_digital_2p_base_to_gcs_op)
    offer_attachment_digital_2p_op.after(irpc_offer_prices_to_gcs_op)
    offer_attachment_digital_2p_op.after(irpc_offer_2p_plans_to_gcs_op)
    postprocess_digital_2p_op.after(offer_attachment_digital_2p_op)

    # casa branch (needs the 1P plans and the prices files from the 1P branch)
    irpc_casa_base_to_gcs_op.after(bq_create_dataset_op)
    offer_attachment_casa_op.after(irpc_offer_1p_plans_to_gcs_op)
    offer_attachment_casa_op.after(irpc_offer_prices_to_gcs_op)
    offer_attachment_casa_op.after(irpc_casa_base_to_gcs_op)
    postprocess_casa_op.after(offer_attachment_casa_op)

    # the export waits for all three postprocess outputs
    bq_export_to_bq_op.after(postprocess_digital_1p_op)
    bq_export_to_bq_op.after(postprocess_digital_2p_op)
    bq_export_to_bq_op.after(postprocess_casa_op)

    ################################################
    ################### Prospects ##################
    ################################################

    # ----- Create Regular Offer Base Table ------
    reg_offers_base_prospects_op = reg_offers_base_prospects(
        project_id=PROJECT_ID,
        offer_parameter=OFFER_PARAMETER,
        whsia_eligible_base=WHSIA_ELIGIBLE_BASE,
        shs_professional_install=SHS_PROFESSIONAL_INSTALL,
        prod_cd2remove=PROD_CD2REMOVE,
        qua_base=QUA_BASE_MOB,
        token=token_str,
    )
    _set_resource_limits(reg_offers_base_prospects_op, '32G', '16')

    ################################################
    ################### Category 3 #################
    ################################################

    # ----- Create Regular Offer Base Table ------
    reg_offers_base_cat3_op = reg_offers_base_cat3(
        project_id=PROJECT_ID,
        offer_parameter=OFFER_PARAMETER,
        whsia_eligible_base=WHSIA_ELIGIBLE_BASE,
        qua_base=QUA_BASE_CAT3,
        token=token_str,
    )
    _set_resource_limits(reg_offers_base_cat3_op, '32G', '16')

    ################################################
    ############### FINAL NBA RANKING ##############
    ################################################

    # ----- final NBA ranking --------
    nba_ffh_offer_ranking_op = nba_ffh_offer_ranking(
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=NBA_FFH_OFFER_RANKING,
        file_bucket=FILE_BUCKET,
        stack_name=STACK_NAME,
        token=token_str,
    )
    _set_resource_limits(nba_ffh_offer_ranking_op, '16G', '8')

    # the ranking runs last: after the existing-customer export and both
    # independent base tables
    nba_ffh_offer_ranking_op.after(bq_export_to_bq_op)
    nba_ffh_offer_ranking_op.after(reg_offers_base_prospects_op)
    nba_ffh_offer_ranking_op.after(reg_offers_base_cat3_op)
    

### Run the Pipeline Job

In [None]:
# from kfp.v2 import compiler
# from google.cloud.aiplatform import pipeline_jobs
# import json

# compiler.Compiler().compile(
#    pipeline_func=pipeline, package_path="pipeline.json"
# )

# job = pipeline_jobs.PipelineJob(
#                                    display_name=SERVING_PIPELINE_NAME,
#                                    template_path="pipeline.json",
#                                    location=REGION,
#                                    enable_caching=False,
#                                    pipeline_root = PIPELINE_ROOT
#                                 )
# job.run(service_account = f"bilayer-sa@{PROJECT_ID}.iam.gserviceaccount.com")


In [None]:
import google.oauth2.credentials
import json
import subprocess

# Build user credentials from the gcloud access token of the active account.
# subprocess replaces the notebook-only "!gcloud ..." magic: it also runs as
# plain Python, fails loudly when gcloud exits with an error (check=True), and
# keeps stderr warnings out of the captured token.
gcloud_result = subprocess.run(
    ['gcloud', 'auth', 'print-access-token'],
    check=True,
    capture_output=True,
    text=True,
)
token = gcloud_result.stdout.strip().splitlines()
if not token:
    raise RuntimeError('gcloud auth print-access-token returned no token')
CREDENTIALS = google.oauth2.credentials.Credentials(token[0].strip())

# Compile the pipeline function into a job spec file in the working directory.
compiler.Compiler().compile(
   pipeline_func=pipeline, package_path="pipeline.json"
)

# Submit the compiled spec as a pipeline job under the caller's credentials.
job = pipeline_jobs.PipelineJob(
   display_name=SERVING_PIPELINE_NAME,
   template_path="pipeline.json",
   credentials = CREDENTIALS,
   pipeline_root = PIPELINE_ROOT,
   location=REGION,
   enable_caching=False # I encourage you to enable caching when testing as it will reduce resource use
)

job.run()