In [233]:
import matplotlib.pyplot as plt
import pandas as pd

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath 

from google.cloud import aiplatform

# We'll use this namespace for metadata querying
from google.cloud import aiplatform_v1

In [234]:
# --- Pipeline-wide configuration -------------------------------------------
# NOTE: KFP lightweight components are serialized on their own and cannot see
# these notebook globals; each component below redeclares its own copy.

# GCP project and region hosting BigQuery and the Vertex AI jobs.
PROJECT_ID = "demand-forecasting-330305"
REGION = "us-central1"

# Column names of the raw sales data.
DATE_COL = "INVOICE_DATE"
PROD_COL = "ARTICLE"
CUST_COL = "PAYER"
TARGET_COL = "BILLED_QTY"

# Forecast length; presumably in months, since series are resampled monthly.
FORECAST_HORIZON = 18

# GCS bucket and the pipeline root (KFP artifact storage) inside it.
BUCKET_NAME = "gs://welspun_mlops_data"
PIPELINE_ROOT = BUCKET_NAME + "/welspun_project"

In [235]:
@component(
    base_image="python:3.9",
)
def query_generator() -> str:
    """Build the SQL that pulls raw e-commerce sales rows from BigQuery.

    The query selects invoice date, article, billed quantity and payer from the
    production sales view. It keeps only distribution channel 40 and positive
    billed quantities, restricts to a fixed allow-list of payers and articles,
    and orders the rows by invoice date.

    All constants live inside the function because KFP serializes the
    component body on its own.

    Returns:
        The SQL query text.
    """
    PROJECT_ID = "demand-forecasting-330305"
    DATE_COL = "INVOICE_DATE"
    PROD_COL = "ARTICLE"
    CUST_COL = "PAYER"
    TARGET_COL = "BILLED_QTY"
    DATASET_ID = "WLSPN_L1_prod"
    TABLE_ID = "TBL_WUSA_ZSD9_SALES_VIEW"
    # Pure string formatting: nothing here can raise. The former try/except
    # was dead code, and it would have returned None despite `-> str`.
    query = f"""
        SELECT {DATE_COL}, {PROD_COL}, {TARGET_COL},{CUST_COL}
                FROM `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}` 
                where distribution_channel=40 and billed_qty > 0 and PAYER IN ('502687','502822','501870','502488','502782','502841','502623','502567','503149','500570','503227','503315','503054','502854','500811','503326','502503') and ARTICLE IN ('EFRN-TW-BHW-03','EWAY-TW-BHW-04','EFRN-TW-BHW-06','EWAY-TW-BHW-01','EFRN-TW-BHW-01','EJAM-TW-BHW-04','EJAM-TW-BHW-03','EJAM-TW-BHW-01','EFRN-TW-BHW-05','EFRN-TW-BHW-04','EHUD-TW-BHW-01',
 'EWAY-TW-BHW-06','EHUD-TW-BHW-06','ECBA-TW-BTW3-05','EHUD-TW-BHW-02','EWAY-TW-BHW-05','EWAY-TW-BHW-02','EJAM-TW-BHW-06','ETUR-BTRG-RG12-01','EWHM-TW-8SET-09','ECBA-TW-BTW3-01','EWHM-TW-8SET-04','ECBA-TW-BTW3-07','EFRN-TW-BHW-15','EFRN-TW-BHW-02','MICP-BTRG-RG12-01','EAND-TW-BHW-02','EWAY-TW-BHW-09','EPLC-DUV-KING-01','EWHM-TW-8SET-06','ECBA-TW-BTW3-03','EAND-TW-BHW-01','EPLC-DUV-FLQN-01','EAND-TW-BHW-06','EWHM-TW-8SET-02','ETUR-BTRG-RG12-03','ECBA-TW-BTW3-04','ETUR-BTRG-RG04-01','MICP-BTRG-RG12-03',
 'EFRN-TW-BHW-07','ETUR-BTRG-RG04-03','EJAM-TW-BHW9-03','EHUD-TW-BHW-04','EWAY-TW-BHW-10','EWAY-TW-BHW-07','EMDS-TW-BHW-04','EWAY-TW-BHW-08','EHUD-TW-BHW-03','EWHM-TW-8SET-01','MICP-BTRG-RG22-03','EAND-TW-BHW-03','ETUR-BTRG-RG04-05','EFRN-TW-BHW-08','ECBA-TW-BTW3-06','EFRN-TW-BHW-11','MICP-BTRG-RG22-01','MICP-BTRG-AS-03','EWHM-TW-8SET-03','EHUD-TW-BHW-05','EFRN-TW-BHW-12','EJAM-TW-BHW9-02','EPLC-DUV-FLQN-03','MICP-BTRG-RG12-02','ETUR-BTRG-RG12-05',
 'EPLC-DUV-KING-02','EAND-TW-BHW-05','ETUR-BTRG-RG04-04','EJAM-TW-BHW-02','EFRN-TW-BHW-09','ETUR-BTRG-RG12-02','EFRN-TW-BHW-13','EFRN-TW-BS19-01','EFRN-TW-BHW-10','ETUR-BTRG-RG04-02','EAND-TW-BHW-04','EWHM-TW-8SET-07','EFRN-TW-BS19-06','ETUR-BTRG-RG12-04','EPLC-DUV-KING-03','EMDS-TW-BHW-06','ECCW-DUV-FLQN-02','EFRN-TW-BS19-03','EJAM-TW-BHW9-01','EMDS-TW-BHW-01','MICP-BTRG-RG22-02','ETUR-BTRG-RG12-06','MNST-TW-BHW-04','MICP-BTRG-AS-01','EWAY-TW-BHW-11','EJAM-TW-BHW-05','ETUR-BTRG-RG04-06','ECBB-TW-BHW-03','MNST-TW-BHW-01','EPLC-DUV-FLQN-02','EBBL-TW-BTW3-01','ECCW-DUV-FLQN-05','MICP-BTRG-AS-02','ECCW-DUV-FLQN-04','MNST-TW-BHW-05','ECBB-TW-BHW-01','EWHM-TW-8SET-08','EMDS-TW-BHW-02',
 'ECBB-TW-BHW-06','ELFL-QST-QUEN-01','EWHM-TW-8SET-12','ECBA-TW-BTW3-02','MNST-TW-BHW-02','EWHM-TW-8SET-05','EFRN-TW-BS19-02','EIDL-TW-BHW-06','EJAM-TW-5BHW-03','EBTL-TW-BTW3-02','EPLC-DUV-FLQN-04','ECBB-TW-BHW-04','EFRN-TW-BS19-05','EMDS-TW-BHW-03','EFRN-TW-5BHW-01','EMDS-TW-BHW-05','EIDL-TW-BHW-04','EAND-TW-BHW-09')
  ORDER BY {DATE_COL};
        """
    return query

In [236]:
@component(
    packages_to_install=["google-cloud-bigquery", "db-dtypes", "numpy", "pandas", "pyarrow", "tqdm"],
    base_image="python:3.9",
)
def read_data_from_bq(
    req_query: str,
    output_bq_df_file_path: Output[Dataset]
):
    """Query BigQuery and write one monthly demand series per payer/article pair.

    Steps:
      1. Run ``req_query`` and load the result into a DataFrame.
      2. For every (PAYER, ARTICLE) pair:
         - drop rows dated after the last day of the previous month, because
           the current month's demand is still being updated;
         - if the series ends earlier, add a zero-quantity row at that cutoff
           so resampling fills 0s up to last month;
         - resample to month-start ("MS") buckets, summing BILLED_QTY.
      3. Write the concatenated series as CSV to ``output_bq_df_file_path``.
         Columns are INVOICE_DATE, BILLED_QTY, PAYER, ARTICLE, CVC.
      4. Truncate ``mlops_ecommerse_data.train_data`` and ``test_data``. These
         are the same tables ``save_dataset_to_gcs`` loads into.

    Args:
        req_query: SQL returning INVOICE_DATE, ARTICLE, BILLED_QTY and PAYER.
        output_bq_df_file_path: KFP output artifact receiving the CSV.

    Raises:
        Any BigQuery/pandas error. It is logged and re-raised so the step
        fails visibly instead of "succeeding" without writing its artifact.
    """
    import logging
    from datetime import date, timedelta

    import pandas as pd
    from google.cloud import bigquery
    from tqdm import tqdm

    PROJECT_ID = "demand-forecasting-330305"
    DATE_COL = "INVOICE_DATE"
    PROD_COL = "ARTICLE"
    CUST_COL = "PAYER"
    TARGET_COL = "BILLED_QTY"
    ID_COL = "CVC"
    DATASET_ID = "mlops_ecommerse_data"

    try:
        bq_client = bigquery.Client(project=PROJECT_ID)
        df = bq_client.query(req_query).to_dataframe()
        # Convert once, up front. Recent pandas does not support comparing raw
        # DATE values (datetime.date / dbdate) against a Timestamp.
        df[DATE_COL] = pd.to_datetime(df[DATE_COL])

        # Last day of the previous month. This is loop-invariant, so it is
        # computed (and printed) once.
        end_date_last_day = pd.to_datetime(date.today().replace(day=1) - timedelta(days=1))
        print(end_date_last_day)

        # Collect the per-series frames and concatenate once at the end.
        # DataFrame.append was removed in pandas 2.0, and pandas is unpinned here.
        monthly_frames = []
        for customer in tqdm(df[CUST_COL].unique()):
            customer_df = df[df[CUST_COL] == customer]
            for article in customer_df[PROD_COL].unique():
                sub_df = customer_df[
                    (customer_df[PROD_COL] == article)
                    & (customer_df[DATE_COL] <= end_date_last_day)
                ]
                if sub_df.empty:
                    continue
                if sub_df[DATE_COL].max() < end_date_last_day:
                    # Dummy zero row at the cutoff, so resampling fills 0s for
                    # discontinued products up to last month.
                    padding = pd.DataFrame(
                        [{CUST_COL: customer, PROD_COL: article, DATE_COL: end_date_last_day, TARGET_COL: 0}]
                    )
                    sub_df = pd.concat([sub_df, padding], ignore_index=True)

                monthly = (
                    sub_df.set_index(DATE_COL)
                    .resample(rule="MS")
                    .agg({TARGET_COL: "sum"})
                    .reset_index()
                )
                monthly[CUST_COL] = customer
                monthly[PROD_COL] = article
                monthly[ID_COL] = f"{customer}_{article}"
                monthly_frames.append(monthly)

        monthly_aggrigated_ecommerce_data = (
            pd.concat(monthly_frames, ignore_index=True) if monthly_frames else pd.DataFrame()
        )
        monthly_aggrigated_ecommerce_data.to_csv(output_bq_df_file_path.path, index=False)

        # Clear the train/test tables before this run's data is loaded.
        for table in (f"{PROJECT_ID}.{DATASET_ID}.train_data", f"{PROJECT_ID}.{DATASET_ID}.test_data"):
            bq_client.query(f"TRUNCATE TABLE  {table};").result()
    except Exception:
        logging.exception("ERROR OCCURED IN COMPONENT : read_data_from_bq")
        raise


   
    
    

In [237]:
@component(
    packages_to_install=["numpy","pandas","google-cloud-bigquery==2.26.0","pyarrow"],
    base_image="python:3.9",
)
def train_valid_split(
    input_file_path_from_bq: InputPath("Dataset"), 
    train_df_path: Output[Dataset],
    validation_df_path: Output[Dataset]
):
    """Split each payer/article monthly series into train and evaluation rows.

    For every (PAYER, ARTICLE) combination present in the input CSV, the last
    ``EVALUATION_SIZE`` rows go to evaluation and the rest to training. Rows
    are assumed to be date-ordered within each series — TODO confirm against
    the producer of ``input_file_path_from_bq``.

    Args:
        input_file_path_from_bq: CSV with at least PAYER and ARTICLE columns.
        train_df_path: Output artifact receiving the training rows (CSV).
        validation_df_path: Output artifact receiving the evaluation rows (CSV).

    Raises:
        Any read/write error. It is logged and re-raised so downstream steps do
        not run against missing artifacts.
    """
    import logging

    import pandas as pd

    CUST_COL = "PAYER"
    PROD_COL = "ARTICLE"
    # Trailing rows held out per series for evaluation.
    EVALUATION_SIZE = 6

    def split_time_series_for_modeling(series_df, evaluation_size):
        """Return ``(head, tail)``, where ``tail`` is the last ``evaluation_size`` rows.

        NOTE(review): when ``len(series_df) < evaluation_size`` the train size
        is negative, so ``iloc`` counts from the end. For example, a 5-row
        series yields 4 train rows and 1 evaluation row, not 0 / 5. The
        behaviour is kept as-is; confirm whether short series should go
        entirely to evaluation instead.
        """
        train_size = len(series_df) - evaluation_size
        return series_df.iloc[:train_size], series_df.iloc[train_size:]

    try:
        df = pd.read_csv(input_file_path_from_bq)

        # Iterate payers × articles, with articles in global first-appearance
        # order as before, so output row order is unchanged. Empty
        # combinations are skipped.
        articles = df[PROD_COL].unique()
        train_parts, eval_parts = [], []
        for customer in df[CUST_COL].unique():
            customer_df = df[df[CUST_COL] == customer]
            for article in articles:
                single_article_df = customer_df[customer_df[PROD_COL] == article]
                if single_article_df.empty:
                    continue
                train_df, evaluation_df = split_time_series_for_modeling(
                    single_article_df, evaluation_size=EVALUATION_SIZE
                )
                train_parts.append(train_df)
                eval_parts.append(evaluation_df)

        # Concatenate once. DataFrame.append is gone in pandas >= 2.0, and
        # pandas is unpinned for this component.
        train_df1 = pd.concat(train_parts) if train_parts else pd.DataFrame()
        eval_df1 = pd.concat(eval_parts) if eval_parts else pd.DataFrame()
        train_df1.to_csv(train_df_path.path, index=False)
        eval_df1.to_csv(validation_df_path.path, index=False)
    except Exception:
        logging.exception("ERROR OCCURED IN COMPONENT : train_valid_split")
        raise


   
    
    
    # return train_df, valid_df

In [238]:
@component(
    packages_to_install=["numpy","pandas","google-cloud-storage","google-cloud-bigquery==2.26.0","pyarrow==4.0.1",],
    base_image="python:3.9",
)
def save_dataset_to_gcs(
    train_dataset: InputPath("Dataset"),
    validation_dataset: InputPath("Dataset")
) -> str:
    """Load the train/validation CSVs into BigQuery and copy them to GCS.

    Both frames are loaded into ``mlops_ecommerse_data.train_data`` and
    ``test_data``. They are also uploaded as ``train.csv`` / ``test.csv`` under
    ``gs://welspun_mlops_data/data/<timestamp>/``, where one shared timestamp
    keeps a run's two files in the same folder.

    Returns:
        ``"done"``. The original declared ``-> str`` but returned None.

    Raises:
        Any read/load/upload error. It is logged and re-raised.
    """
    import logging
    from datetime import datetime

    import pandas as pd
    from google.cloud import bigquery, storage

    PROJECT_ID = "demand-forecasting-330305"
    GCS_BUCKET = "welspun_mlops_data"
    bigquery_train_table_name = 'demand-forecasting-330305.mlops_ecommerse_data.train_data'
    bigquery_test_table_name = 'demand-forecasting-330305.mlops_ecommerse_data.test_data'

    def write_dataframe_to_bigquery_table(client, df, bigquery_full_table_name):
        """Load ``df`` into the table with default load settings and wait for completion."""
        client.load_table_from_dataframe(df, bigquery_full_table_name).result()

    def upload_to_gcs(bucket, file_to_upload, timestamp):
        """Upload a local file to ``data/<timestamp>/<file>`` and return that object path."""
        dataset_path = "data/{TIMESTAMP}/{file_name}".format(file_name=file_to_upload, TIMESTAMP=timestamp)
        bucket.blob(dataset_path).upload_from_filename(file_to_upload)
        return dataset_path

    try:
        train_df = pd.read_csv(train_dataset)
        valid_df = pd.read_csv(validation_dataset)

        train_df.to_csv("train.csv", index=False)
        valid_df.to_csv("test.csv", index=False)

        bq_client = bigquery.Client(project=PROJECT_ID)
        write_dataframe_to_bigquery_table(bq_client, train_df, bigquery_train_table_name)
        write_dataframe_to_bigquery_table(bq_client, valid_df, bigquery_test_table_name)

        # Compute the timestamp once. Per-upload timestamps could put train
        # and test of the same run into different folders.
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        bucket = storage.Client(project=PROJECT_ID).get_bucket(GCS_BUCKET)
        train_datasetpath = upload_to_gcs(bucket, "train.csv", timestamp)
        test_datasetpath = upload_to_gcs(bucket, "test.csv", timestamp)

        print("train_datasetpath", train_datasetpath)
        print("test_datasetpath", test_datasetpath)
    except Exception:
        logging.exception("ERROR OCCURED IN COMPONENT : save_dataset_to_gcs")
        raise
    return "done"



In [239]:
@component(
    packages_to_install=["google-cloud-bigquery==2.26.0", "pandas==1.3.0", "gcsfs==2021.6.1", "numpy==1.19.5", "pyarrow==4.0.1", "statsmodels==0.12.2", "scikit-learn==0.24.2", "pmdarima==1.8.2", 'google-cloud-aiplatform==1.3.0',"google-cloud-storage"],
    base_image="python:3.9",
    output_component_file="custom_training_jobs_batch_submission.yaml",
)
def custom_training_jobs_batch_submission(
    input_file_path_from_bq: InputPath("Dataset"),
    train_dataset_path: InputPath("Dataset"),
    validation_dataset_path: InputPath("Dataset"),
    unique_id_path: Output[Dataset],
    RUN_ID:str,
) -> str:
    """Submit one Vertex AI custom job per (model image, batch of CVC series).

    The unique CVC ids from ``input_file_path_from_bq`` are chunked into
    batches of ``SKUS_PER_BATCH``. For every image in ``PACKAGE_PATH_LIST`` and
    every batch, a custom job is created with the CLI arguments below. The
    resulting job resource names are written once, after all submissions, as
    a one-column (``JOB_ID``) CSV to ``unique_id_path``.

    Args:
        input_file_path_from_bq: CSV containing a ``CVC`` column.
        train_dataset_path: Path forwarded to the jobs as ``--gcs_uri_training_data``.
        validation_dataset_path: Path forwarded as ``--gcs_uri_evaluation_data``.
        unique_id_path: Output artifact receiving the submitted job names.
        RUN_ID: Run identifier forwarded as ``--RUNID_COL`` and used in the job name.

    Returns:
        ``"done"``. The original declared ``-> str`` but never returned.

    Raises:
        Any submission error. It is logged and re-raised.
    """
    import datetime
    import logging
    import time

    import pandas as pd
    from google.cloud import aiplatform

    PROJECT_ID = "demand-forecasting-330305"
    REGION = "us-central1"
    ID_COL = "CVC"
    API_ENDPOINT = "us-central1-aiplatform.googleapis.com"
    # Number of CVC series handed to each training job.
    SKUS_PER_BATCH = 30
    # Pause between submissions (presumably to stay under API quotas — confirm).
    SUBMIT_PAUSE_SECONDS = 5

    PACKAGE_PATH_LIST = [
        "gcr.io/demand-forecasting-330305/ma_package_final@sha256:ba1afe27110666701af8efa092257fdecc37f967ae68cd4294b7c561435230c6",
        "gcr.io/demand-forecasting-330305/ar_package_final@sha256:de120e816d38010cc0623715e4009a16f30c151ae3be17229d79c06113844900",
        "gcr.io/demand-forecasting-330305/sarimax_preestimate@sha256:7ddccccf41125da4c4014fc8d820ae3464ace0bd55e566f35aa5f6fdd3657363",
        "gcr.io/demand-forecasting-330305/arma_preestimate@sha256:065ef13830d3fc2ffeb911a115dd1422929a306a0f3bf8e7ababc1ab9024638f",
        "gcr.io/demand-forecasting-330305/arima_package@sha256:b65fabf0d5e7b0ca13a34af375308547d52155472e0b4ec2bc4f59282e571297",
        "gcr.io/demand-forecasting-330305/triple_expo_package@sha256:739fbe5b5ae665b8ba9b3910e6b0f03494a200fab492c42ff44b7f04cf08f0ec",
        "gcr.io/demand-forecasting-330305/double_exp_smothing@sha256:18fa6dd43b7c6a36fd8a96f7c4a3e8221c1fafcaf9440429f2131f2c85ea5fe3",
    ]

    def generate_batch(skus_list, bins):
        """Chunk ``skus_list`` into consecutive tuples of at most ``bins`` items.

        Args:
            skus_list: Sequence of series ids to distribute over jobs.
            bins: Maximum number of ids per batch.

        Returns:
            A dict mapping ``"batch_<YYYY-mm-dd HH:MM:SS>_<n>"`` (n from 1) to a
            tuple of ids. The last batch holds any remainder.
        """
        skus_list_batches = {}
        for index, lower in enumerate(range(0, len(skus_list), bins)):
            batch_id = "batch_" + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S_') + str(index + 1)
            skus_list_batches[batch_id] = tuple(skus_list[lower:lower + bins])
        return skus_list_batches

    def submit_vertex_ai_job(client, cmd_args, job_name, package_path):
        """Create a single-replica n1-standard-16 custom job running ``package_path``."""
        custom_job = {
            "display_name": job_name,
            "job_spec": {
                "worker_pool_specs": [
                    {
                        "machine_spec": {
                            "machine_type": "n1-standard-16",
                        },
                        "replica_count": 1,
                        "container_spec": {
                            "image_uri": package_path,
                            "command": [],
                            "args": cmd_args,
                        },
                    }
                ]
            },
        }
        parent = f"projects/{PROJECT_ID}/locations/{REGION}"
        response = client.create_custom_job(parent=parent, custom_job=custom_job)
        print("name:", response.name)
        return response

    try:
        df = pd.read_csv(input_file_path_from_bq)
        total_sku = list(df[ID_COL].unique())
        skus_list_batches = generate_batch(total_sku, SKUS_PER_BATCH)

        # One API client for all submissions.
        client = aiplatform.gapic.JobServiceClient(client_options={"api_endpoint": API_ENDPOINT})
        job_name = "welspun_Ecommerse_job_" + RUN_ID

        job_ids = []
        for package_path in PACKAGE_PATH_LIST:
            for skus_list in skus_list_batches.values():
                cmd_args = [
                    "--PROJECT_ID", "demand-forecasting-330305",
                    "--gcs_uri_training_data", train_dataset_path,
                    "--gcs_uri_evaluation_data", validation_dataset_path,
                    "--destination_bigquery_table", "demand-forecasting-330305.mlops_ecommerse_data.CUSTOM_JOB_FINAL",
                    "--FORECAST_HORIZON", "18",
                    "--CATEGORY_COL", "PAYER",
                    "--DATE_COL", "INVOICE_DATE",
                    "--PRODUCT_COL", "ARTICLE",
                    "--TARGET_COL", "BILLED_QTY",
                    "--ID_COL", "CVC",
                    "--RUNID_COL", RUN_ID,
                    "--FORECAST_TYPE", "stastistical",
                    "--SKUS_LIST", str(skus_list),
                ]
                response = submit_vertex_ai_job(client, cmd_args, job_name, package_path)
                job_ids.append(response.name)
                logging.info(f"Submitted {skus_list}\n\n")
                time.sleep(SUBMIT_PAUSE_SECONDS)

        # Written once, after every submission. It used to be rewritten per
        # image and was never written when there were no images.
        pd.DataFrame({"JOB_ID": job_ids}).to_csv(unique_id_path.path, index=False)
    except Exception:
        logging.exception("ERROR OCCURED IN COMPONENT : custom training job")
        raise
    return "done"


    

In [240]:
@component(
    packages_to_install=["google-cloud-bigquery==2.26.0", "pandas==1.3.0", "gcsfs==2021.6.1", "numpy==1.19.5", "pyarrow==4.0.1", "statsmodels==0.12.2", "scikit-learn==0.24.2", "pmdarima==1.8.2", 'google-cloud-aiplatform==1.3.0',"google-cloud-storage"],
    base_image="python:3.9",
    # Own spec file: the previous name was shared with
    # custom_training_jobs_batch_submission, whose YAML this overwrote.
    output_component_file="custom_job_wait.yaml",
)
def custom_job_wait(unique_id_path: InputPath("Dataset"),
) -> str:
    """Block until every Vertex AI custom job listed in ``unique_id_path`` finishes.

    The jobs are polled one after another every ``POLL_INTERVAL_SECONDS``. A
    job stops being waited on once it reaches any terminal state: succeeded,
    failed, cancelled, or expired. Previously only SUCCEEDED/FAILED ended the
    wait, so a cancelled job hung the step forever. Failures are logged, not
    raised; the step is best-effort.

    Args:
        unique_id_path: CSV with a ``JOB_ID`` column of custom-job resource names.

    Returns:
        Always ``"done"``. Presumably used only to sequence the next step.
    """
    import logging
    import time

    POLL_INTERVAL_SECONDS = 60
    API_ENDPOINT = "us-central1-aiplatform.googleapis.com"

    try:
        import pandas as pd
        from google.cloud import aiplatform
        from google.cloud.aiplatform import gapic as aip

        # JOB_STATE_EXPIRED is looked up defensively, in case an older client
        # version does not define it.
        terminal_states = {
            getattr(aip.JobState, state_name)
            for state_name in (
                "JOB_STATE_SUCCEEDED",
                "JOB_STATE_FAILED",
                "JOB_STATE_CANCELLED",
                "JOB_STATE_EXPIRED",
            )
            if hasattr(aip.JobState, state_name)
        }
        # One client for all polls, instead of one per request.
        client = aiplatform.gapic.JobServiceClient(client_options={"api_endpoint": API_ENDPOINT})

        unique_job_id = pd.read_csv(unique_id_path)
        for job_id in unique_job_id["JOB_ID"].unique():
            print("name:", job_id)
            while True:
                response = client.get_custom_job(name=job_id)
                if response.state in terminal_states:
                    break
                print("Training job has not completed:", response.state)
                print("Training job has not completed:", response.name)
                time.sleep(POLL_INTERVAL_SECONDS)

            if response.state == aip.JobState.JOB_STATE_SUCCEEDED:
                print("Training job completed:", response.name)
                print("Training completed succesfully:", response.state)
            else:
                logging.warning(f"Training job {response.name} ended in state {response.state}")
    except Exception:
        # Best-effort, as before: log with traceback and let the pipeline continue.
        logging.exception("ERROR OCCURED IN COMPONENT : custom_job_wait")

    logging.info("waiting")
    return "done"
    
    


In [241]:
@component(
    packages_to_install=["google-cloud-bigquery==2.26.0", "pandas==1.3.0", "gcsfs==2021.6.1", "numpy==1.19.5", "pyarrow==4.0.1", "statsmodels==0.12.2", "scikit-learn==0.24.2", "pmdarima==1.8.2", 'google-cloud-aiplatform==1.3.0'],
    base_image="python:3.9",
    # Own spec file: the previous name is also used by the next component
    # (model_chapionship), which overwrote this component's YAML.
    output_component_file="evaluation.yaml",
)
def evaluation(status:str,RUN_ID: str,evaluation_df_path: Output[Dataset],
):
    """Score every (CVC, MODEL) forecast of the current run against actuals.

    Steps:
      1. Read all rows of ``mlops_ecommerse_data.CUSTOM_JOB_FINAL``.
      2. Snapshot them to GCS as ``modeling_df.csv`` and fill NaNs with 0.
      3. Keep the rows whose ``RUN_ID`` matches, and for each (CVC, MODEL)
         group compute RMSE, MAE, MAPE, MSE, R² and the mean of
         ``welspun_accuracy_metric``.
      4. Load the result into ``EVALUATION_FINAL`` and write it as CSV to
         ``evaluation_df_path``.

    A group whose metrics raise is logged and skipped, so the other groups
    are still scored.

    Args:
        status: Not read. Presumably only orders this step after the job-wait
            step — confirm in the pipeline definition.
        RUN_ID: Run identifier compared against the table's ``RUN_ID`` column.
        evaluation_df_path: Output artifact receiving the metrics CSV.

    Raises:
        Any query/load/write error. It is logged and re-raised so the
        downstream step does not read a missing artifact.
    """
    import logging
    from datetime import datetime

    import numpy as np
    import pandas as pd
    from google.cloud import bigquery, storage
    from sklearn.metrics import (
        mean_absolute_error,
        mean_absolute_percentage_error,
        mean_squared_error,
        r2_score,
    )

    PROJECT_ID = "demand-forecasting-330305"
    DATE_COL = "INVOICE_DATE"
    TARGET_COL = "BILLED_QTY"
    ID_COL = "CVC"
    predicted_col = "BILLED_QTY_PREDICTED"
    MODEL_NAME = "MODEL"
    bigquery_evaluation_table_name = 'demand-forecasting-330305.mlops_ecommerse_data.EVALUATION_FINAL'
    EVALUATION_COLUMNS = ['RMSE', 'MAE', 'MAPE', 'MSE', 'R2_SCORE', 'MODEL', 'RUN_ID', 'CVC', 'ACCURACY']

    def upload_to_gcs(file_to_upload):
        """Upload a local file to ``gs://welspun_mlops_data/data/<timestamp>/<file>``."""
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        dataset_path = "data/{TIMESTAMP}/{file_name}".format(file_name=file_to_upload, TIMESTAMP=timestamp)
        bucket = storage.Client(project=PROJECT_ID).get_bucket("welspun_mlops_data")
        bucket.blob(dataset_path).upload_from_filename(file_to_upload)

    def write_dataframe_to_bigquery_table(df, bigquery_full_table_name):
        """Load ``df`` into the table with default load settings and wait for completion."""
        print("WRITE Started!!!")
        client = bigquery.Client(project=PROJECT_ID)
        client.load_table_from_dataframe(df, bigquery_full_table_name).result()
        print("WRITE SUCESSFUL!!!")

    def welspun_accuracy_metric(actual, predicted):
        """Return ``(1 - |predicted - actual| / max(predicted, actual)) * 100``.

        Returns 100 when the two are equal. On a zero denominator it returns 0
        if ``predicted < 0``. Otherwise (``predicted == 0 > actual``) it
        returns None, which pandas' ``mean`` skips.
        """
        if actual == predicted:
            return 100
        try:
            error = abs(predicted - actual) / max(predicted, actual)
            return (1 - error) * 100
        except ZeroDivisionError:
            print("zero division error occured")
            if predicted < 0:
                return 0
            return None

    try:
        final_query = f"""
            SELECT * from `{PROJECT_ID}.mlops_ecommerse_data.CUSTOM_JOB_FINAL` 
            ORDER BY {DATE_COL};
            """
        bq_client = bigquery.Client(project=PROJECT_ID)
        modeling_df = bq_client.query(final_query).to_dataframe()

        # Snapshot of the raw (pre-fillna) query result for traceability.
        modeling_df.to_csv("modeling_df.csv", index=False)
        upload_to_gcs("modeling_df.csv")
        modeling_df = modeling_df.fillna(0)

        # Filter to the current run once, then group, instead of rescanning
        # the whole table for every (CVC, MODEL) pair.
        run_df = modeling_df[modeling_df["RUN_ID"] == RUN_ID]
        records = []
        for (cvc_id, model), results in run_df.groupby([ID_COL, MODEL_NAME], sort=False):
            try:
                actual = results[TARGET_COL].to_list()
                predicted = results[predicted_col].to_list()
                accuracy = results.apply(
                    lambda row: welspun_accuracy_metric(row[TARGET_COL], row[predicted_col]), axis=1
                ).mean()
                records.append({
                    "CVC": cvc_id,
                    "ACCURACY": accuracy,
                    "RMSE": np.sqrt(mean_squared_error(actual, predicted)),
                    "MAE": mean_absolute_error(actual, predicted),
                    "MAPE": mean_absolute_percentage_error(actual, predicted),
                    "MSE": mean_squared_error(actual, predicted),
                    "R2_SCORE": r2_score(actual, predicted),
                    "MODEL": model,
                    "RUN_ID": RUN_ID,
                })
            except Exception:
                logging.exception(
                    f"ERROR OCCURED IN COMPONENT : evaluation :::: CVC={cvc_id} MODEL={model}"
                )

        # Built once from records, rather than by row-by-row DataFrame.append.
        evaluation_df = pd.DataFrame(records, columns=EVALUATION_COLUMNS)
        write_dataframe_to_bigquery_table(evaluation_df, bigquery_evaluation_table_name)
        evaluation_df.to_csv(evaluation_df_path.path, index=False)
    except Exception:
        logging.exception("ERROR OCCURED IN COMPONENT : evaluation")
        raise
    
    
    
    
    


In [242]:
@component(
    packages_to_install=["google-cloud-bigquery==2.26.0", "pandas==1.3.0", "gcsfs==2021.6.1", "numpy==1.19.5", "pyarrow==4.0.1", "statsmodels==0.12.2", "scikit-learn==0.24.2", "pmdarima==1.8.2", 'google-cloud-aiplatform==1.3.0'],
    base_image="python:3.9",
    # Each component needs its own spec file; the previous value reused the
    # custom-training-submission component's file name and overwrote it.
    output_component_file="model_chapionship.yaml",
)
def model_chapionship(evaluation_df_path: InputPath("Dataset"),champ_df_path:Output[Dataset],RUN_ID: str,
):
    """Select the champion model per CVC for one pipeline run and publish the results.

    Joins CUSTOM_JOB_FINAL with EVALUATION_FINAL in BigQuery for this RUN_ID,
    adds an integer month-end date key, keeps for every CVC the row(s) of the
    model(s) with the highest ACCURACY, and writes the merged, champion,
    rolling-forecast and forecast-error tables back to BigQuery.

    Args:
        evaluation_df_path: Output of the evaluation step. It is not read here;
            taking it as an input makes this step run after evaluation.
        champ_df_path: Receives the champion rows as CSV (no index column).
        RUN_ID: Identifier of the pipeline run; every BigQuery read is
            filtered to rows carrying this RUN_ID.

    Errors are logged (with traceback) rather than raised, so the step itself
    does not fail.
    """
    # Imported before the try so the except handler can always use it.
    import logging

    try:
        import pandas as pd
        from google.cloud import bigquery
        from pandas.tseries.offsets import MonthEnd

        PROJECT_ID = "demand-forecasting-330305"
        ID_COL = "CVC"
        ACCURACY_COL = "ACCURACY"
        bigquery_forecast_error_table_name = 'demand-forecasting-330305.mlops_ecommerse_data.FORECAST_ERROR'
        bigquery_rolling_error_table_name = 'demand-forecasting-330305.mlops_ecommerse_data.ROLLING_FORECAST'
        bigquery_MERGED_Final_table_name = 'demand-forecasting-330305.mlops_ecommerse_data.champ_best_model'
        bigquery_MERGED_TABLE_1_table_name = 'demand-forecasting-330305.mlops_ecommerse_data.champ_merged_final'

        def write_dataframe_to_bigquery_table(df, bigquery_full_table_name):
            """Load ``df`` into ``bigquery_full_table_name`` and wait for the load job to finish."""
            print("WRITE Started!!!", bigquery_full_table_name)
            client = bigquery.Client(project=PROJECT_ID)
            client.load_table_from_dataframe(df, bigquery_full_table_name).result()
            print("WRITE SUCESSFUL!!!")

        def execute_query(query_string, run_id):
            """Run ``query_string`` with ``@run_id`` bound as a STRING parameter; return a DataFrame."""
            print(query_string)
            client = bigquery.Client(project=PROJECT_ID)
            # Bind RUN_ID as a query parameter instead of splicing it into the SQL,
            # so quotes or other special characters in it cannot break the query.
            job_config = bigquery.QueryJobConfig(
                query_parameters=[bigquery.ScalarQueryParameter("run_id", "STRING", run_id)]
            )
            return (
                client.query(query_string, job_config=job_config)
                .result()
                .to_dataframe(create_bqstorage_client=True)
            )

        def select_champions(merged, run_id):
            """Return, per ID_COL value, the ``run_id`` rows whose ACCURACY equals that ID's maximum.

            Ties keep every tied row. IDs whose ACCURACY is entirely NaN (e.g. no
            evaluation match in the LEFT JOIN) contribute nothing, since NaN != NaN.
            Groups keep first-appearance order, and rows keep their order within a group.
            """
            current_run = merged[merged["RUN_ID"] == run_id]
            champions = [
                group[group[ACCURACY_COL] == group[ACCURACY_COL].max()]
                for _, group in current_run.groupby(ID_COL, sort=False)
            ]
            if not champions:
                return current_run.iloc[0:0].reset_index(drop=True)
            # A single concat instead of DataFrame.append in a loop: linear time,
            # no dtype upcasting from an empty seed frame, and works on pandas >= 2.
            return pd.concat(champions, ignore_index=True)

        print(RUN_ID)
        merge_query_string = """
            SELECT DISTINCT * FROM `demand-forecasting-330305.mlops_ecommerse_data.CUSTOM_JOB_FINAL` AS model_data
            LEFT JOIN  (SELECT  * FROM  `demand-forecasting-330305.mlops_ecommerse_data.EVALUATION_FINAL`) AS transaction_data 
            on (model_data.CVC = transaction_data.CVC AND model_data.MODEL = transaction_data.MODEL AND model_data.RUN_ID = transaction_data.RUN_ID)
            where model_data.RUN_ID = @run_id
            """
        print("merge statement ran")
        merged_data = execute_query(merge_query_string, RUN_ID)

        # INVOICE_DATE is parsed as YYYYMMDD and rolled forward to its month end,
        # then stored as an integer YYYYMMDD key.
        month_end = pd.to_datetime(merged_data['INVOICE_DATE'], format="%Y%m%d") + MonthEnd(0)
        merged_data['DT_KEY_FOR_END_DATE'] = month_end.dt.strftime('%Y%m%d').astype(int)

        print("merge finished")
        write_dataframe_to_bigquery_table(merged_data, bigquery_MERGED_TABLE_1_table_name)

        final_merged_df = select_champions(merged_data, RUN_ID)
        final_merged_df.to_csv(champ_df_path.path, index=False)
        write_dataframe_to_bigquery_table(final_merged_df, bigquery_MERGED_Final_table_name)

        # NOTE(review): both queries below read champ_merged_final (every model of the run),
        # not champ_best_model (champions only). ROLLING_FORECAST selects no MODEL column, so
        # rows coming from different models are indistinguishable there -- confirm the source table.
        rolling_forecast_string = """
            SELECT CVC,BILLED_QTY,BILLED_QTY_PREDICTED,BILLED_QTY_Forecasted,DT_KEY_IN,INVOICE_DATE,DT_KEY_FOR_END_DATE,DT_KEY_FOR_START_DATE,RUN_ID FROM  `demand-forecasting-330305.mlops_ecommerse_data.champ_merged_final` 
            where RUN_ID = @run_id
            """
        forecast_error_string = """
            SELECT CVC,BILLED_QTY,BILLED_QTY_PREDICTED,BILLED_QTY_Forecasted,DT_KEY_IN,INVOICE_DATE,DT_KEY_FOR_END_DATE,DT_KEY_FOR_START_DATE,MODEL,RUN_ID,ACCURACY,RMSE,MAE,MAPE,MSE,R2_SCORE,FORECAST_TYPE FROM  `demand-forecasting-330305.mlops_ecommerse_data.champ_merged_final`  
             where RUN_ID = @run_id
            """

        forecast_data = execute_query(forecast_error_string, RUN_ID)
        rolling_data = execute_query(rolling_forecast_string, RUN_ID)

        write_dataframe_to_bigquery_table(rolling_data, bigquery_rolling_error_table_name)
        write_dataframe_to_bigquery_table(forecast_data, bigquery_forecast_error_table_name)

    except Exception as e:
        # Logged, not raised: the pipeline step still reports success.
        logging.exception(f"ERROR OCCURED IN COMPONENT : model_chapionship :::: error message {e}")
        
     
    
    

In [243]:
from datetime import datetime

# Wall-clock stamp in YYYYMMDDHHMMSS form, used to build run identifiers.
TIMESTAMP = format(datetime.now(), "%Y%m%d%H%M%S")

In [244]:
RUN_ID = f"run_id-{TIMESTAMP}"  # run identifier derived from the notebook's TIMESTAMP

In [245]:
@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="pipeline-trial",
)
def my_pipeline(RUN_ID:str="run_id-{0}".format(TIMESTAMP)):
    """Wire the demand-forecasting steps into one pipeline.

    Order: build the BigQuery query -> read the data -> split into train/validation
    -> save the splits to GCS -> submit custom training jobs -> wait for them
    -> evaluate -> pick the champion model per CVC.

    Args:
        RUN_ID: Run identifier handed to the training, evaluation and championship
            steps. The default expression is evaluated once, when this cell defines
            the function, so pass a fresh value when submitting a new run.
    """
    query = query_generator()
    bq_read = read_data_from_bq(query.output)
    bq_df_path = bq_read.outputs["output_bq_df_file_path"]

    split = train_valid_split(bq_df_path)
    train_path = split.outputs["train_df_path"]
    valid_path = split.outputs["validation_df_path"]

    save_dataset_to_gcs(train_path, valid_path)

    submission = custom_training_jobs_batch_submission(bq_df_path, train_path, valid_path, RUN_ID)
    waited = custom_job_wait(submission.outputs["unique_id_path"])
    evaluated = evaluation(waited.output, RUN_ID)
    model_chapionship(evaluated.outputs["evaluation_df_path"], RUN_ID)

In [246]:
# Serialize my_pipeline to the JSON spec that PipelineJob submits below.
compiler.Compiler().compile(
    package_path="welspun-ecommerse-pipeline.json",
    pipeline_func=my_pipeline,
)

In [247]:
# Vertex AI job IDs allow only lowercase letters, digits and dashes, hence "run-id-".
RUN_ID="run-id-{0}".format(TIMESTAMP)
job = aiplatform.PipelineJob(
    display_name="welspun-ecommerse-pipeline",
    template_path="welspun-ecommerse-pipeline.json",
    job_id="welspun-ecommerse-pipeline-{0}".format(RUN_ID),
    # Pass RUN_ID explicitly. Otherwise the run uses the default that was evaluated
    # when my_pipeline was defined and baked into the compiled template. That default
    # neither matches this job id nor changes when the job is re-submitted.
    parameter_values={"RUN_ID": RUN_ID},
    enable_caching=False,
)

In [None]:
# Submit the pipeline job; sync=True keeps this cell waiting on the run instead of returning immediately.
job.run(sync=True)

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/610719353811/locations/us-central1/pipelineJobs/welspun-ecommerse-pipeline-run-id-20220214083831
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/610719353811/locations/us-central1/pipelineJobs/welspun-ecommerse-pipeline-run-id-20220214083831')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/welspun-ecommerse-pipeline-run-id-20220214083831?project=610719353811
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/610719353811/locations/us-central1/pipelineJobs/welspun-ecommerse-pipeline-run-id-20220214083831 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:Pi