# Importing Required Libraries:

In [1]:
# !pip install google-cloud-aiplatform
# !pip install google-cloud-storage
# !pip install kfp google-cloud-pipeline-components

In [1]:
import warnings
# Suppress all Python warnings; this runs before the third-party imports further down in this cell.
warnings.filterwarnings("ignore")

import logging
import os
import time
from datetime import datetime

# Named logger for this notebook; root logging is configured at INFO level.
logger = logging.getLogger("logger")
logging.basicConfig(level=logging.INFO)

from typing import NamedTuple
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, ClassificationMetrics, Metrics, component, pipeline

from google.cloud import aiplatform
from google.cloud.aiplatform import Feature, Featurestore
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp

In [2]:
# Notebook-wide configuration constants.

# Run timestamp (YYYYmmddHHMMSS), captured once when this cell executes.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

# Google Cloud project and the region where the Vertex AI pipeline services run.
PROJECT_ID = "datacouch-vertexai"
REGION = "us-central1"

# Cloud Storage bucket (bare name and gs:// URI form) and the folder used inside it.
BUCKET_NAME = "datacouch-vertex-ai"
GCS_BUCKET_NAME = "gs://" + BUCKET_NAME
BUCKET_FOLDER = "forecasting-pipeline"

# The experiment name is simply the project id.
EXPERIMENT_NAME = PROJECT_ID

# Root location for pipeline artifacts: <bucket URI>/<bucket folder>/<pipeline folder>.
PIPELINE_FOLDER = "vertex_ai_demo"
PIPELINE_ROOT = "/".join([GCS_BUCKET_NAME, BUCKET_FOLDER, PIPELINE_FOLDER])

# Display name given to the pipeline job.
DISPLAY_NAME = "sku-forecast"

# Local path the compiled pipeline spec (JSON) is written to and later read from.
PIPELINE_JSON_PKG_PATH = "../pipelines/sku_forecast.json"

In [3]:
# Initialise the Vertex AI SDK with the project and region from the configuration cell.
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
)

# Data Load Component

In [4]:
# # TODO: confirm where these base images are pulled from, and replace them with Google Artifact Registry images.

# @component(base_image="python:3.9", 
#            packages_to_install=["fsspec", "pandas", "gcsfs"], 
#            output_component_file="../yaml_files/load_sku_data.yaml")
# def load_sku_data(url:str,
#                   dataset: Output[Dataset], 
#                   kpi_output: Output[Metrics]) -> NamedTuple("output", [("final_skus", list)]):

    
#     # Step 1. Importing Packages
#     from google.cloud import storage
#     import pandas as pd
    
#     # Step 2. Load Data
#     df = pd.read_csv(url, delimiter=",")[['material','calendar_year_month', 'qty_order_change']]
    
#     # Step 3. Data Pre-Processing
#     df['calendar_year_month'] = pd.to_datetime(df['calendar_year_month'], format='%Y%m')
    
#     # Step 3.1. Save Interim Data
#     # Step 4.1 Get List of SKU Ids
#     sku_list = list(set(df.material.values.tolist()))
    
#     # Step 4.2 Save Different csv files for each SKUs and 
#     # Create a Dictionary of SKUs and it's corresponding data
#     final_skus = []
#     for sku_id in sku_list[:2]:
#         path = dataset.path + f"_{sku_id}.csv"
#         sku_df = df[df.material == sku_id][['calendar_year_month', 'qty_order_change']].copy()
#         sku_df.to_csv(path , index=False, encoding='utf-8-sig')
#         kpi_output.log_metric(f"data_size_{sku_id}", int(sku_df.shape[0]))
#         final_skus.append({sku_id: path})
    
        
#     return  (final_skus,)

In [5]:
# TODO: confirm where these base images are pulled from, and replace them with Google Artifact Registry images.

@component(base_image="python:3.9", 
           packages_to_install=["fsspec", "pandas", "gcsfs", "pyarrow", "google-cloud-aiplatform", "google-cloud-bigquery-storage"], 
           output_component_file="../yaml_files/load_sku_data.yaml")
def load_sku_data(featurestore_id: str,
                  project:str,
                  region:str,
                  sku_list: list,
                  dataset: Output[Dataset], 
                  kpi_output: Output[Metrics]) -> NamedTuple("output", [("final_skus", list)]):
    """Batch-serve feature data for each SKU and write one training CSV per SKU.

    For every id in ``sku_list`` the component reads
    ``<read-instance root>/<sku_id>_input.csv``, parses its ``timestamp`` column,
    batch-serves it against the Feature Store, and writes the
    ``calendar_year_month`` and ``qty_order_change`` columns of the served frame
    to ``<dataset.path>_<sku_id>_output.csv``.

    Args:
        featurestore_id: Name/id of the Vertex AI Featurestore to read from.
        project: Google Cloud project that owns the featurestore.
        region: Location of the featurestore.
        sku_list: SKU ids to process; one CSV is produced per id.
        dataset: Output artifact; only its ``path`` is used, as a filename prefix.
        kpi_output: Metrics artifact receiving ``data_size_<sku_id>`` (row count
            of the served frame) for every SKU.

    Returns:
        A 1-tuple holding ``final_skus``: a list of single-entry dicts
        ``{sku_id: csv_path}``, one per processed SKU.
    """
    # Imports live inside the function because the component body is executed
    # on its own in the component container.
    import pandas as pd
    from google.cloud.aiplatform import Featurestore

    # Location of the per-SKU read-instance CSVs.
    # NOTE(review): bucket and folder are hard-coded here; consider passing them in.
    read_instances_root = "gs://datacouch-vertex-ai/forecasting-pipeline/read_instance"
    # Request every feature ('*') under the "id" key.
    serving_feature_ids = {"id": ["*"]}
    # Columns of the served frame that are kept in the per-SKU CSV.
    output_columns = ["calendar_year_month", "qty_order_change"]

    fs = Featurestore(
        featurestore_name=featurestore_id,
        project=project,
        location=region,
    )

    final_skus = []  # list of {sku_id: csv_path} dicts, one per SKU
    for sku_id in sku_list:
        read_instances_uri = f"{read_instances_root}/{sku_id}_input.csv"
        path = f"{dataset.path}_{sku_id}_output.csv"

        read_instances_df = pd.read_csv(read_instances_uri)
        # Use item assignment rather than attribute assignment for the column.
        read_instances_df["timestamp"] = pd.to_datetime(read_instances_df["timestamp"])

        output_df = fs.batch_serve_to_df(
            serving_feature_ids=serving_feature_ids,
            read_instances_df=read_instances_df,
        )
        output_df[output_columns].to_csv(path, index=False, encoding="utf-8-sig")

        kpi_output.log_metric(f"data_size_{sku_id}", int(output_df.shape[0]))
        final_skus.append({sku_id: path})

    return (final_skus,)

# Training Component

In [6]:
@component(
    packages_to_install = ["fsspec", "pandas", "scikit-learn", "gcsfs", "pystan", "prophet"], 
    base_image="python:3.9", 
    output_component_file="../yaml_files/train_model.yaml"
)
def train_model(
    sku_dict:dict,
    bucket_name: str,
    model: Output[Model], 
    kpi_output: Output[Metrics]) -> NamedTuple("output", [("filename", str)]):
    """Fit a Prophet model on one SKU's CSV and pickle it next to the model artifact.

    Args:
        sku_dict: Single-entry mapping ``{sku_id: csv_path}``; only the first key
            is used. The CSV must have exactly two columns, which are renamed
            positionally to ``ds`` (date) and ``y`` (target).
        bucket_name: Accepted for interface compatibility; not used by the body.
        model: Output artifact; the pickle is written to ``model.path + ".pkl"``.
        kpi_output: Metrics artifact receiving ``data_size_<sku_id>``, the number
            of training rows.

    Returns:
        A 1-tuple holding ``filename``: the path of the pickled model.
    """
    import pickle

    import pandas as pd
    from prophet import Prophet

    # Load the training data for the (single) SKU carried by sku_dict.
    sku_id = list(sku_dict.keys())[0]
    df = pd.read_csv(sku_dict[sku_id])

    # Prophet expects the columns to be named 'ds' and 'y'.
    df.columns = ['ds', 'y']
    df['ds'] = pd.to_datetime(df['ds'])

    # Record the training-set size; previously it was computed and then discarded,
    # leaving the declared metrics artifact empty.
    kpi_output.log_metric(f"data_size_{sku_id}", int(df.shape[0]))

    # Named 'forecaster' so the local does not shadow this function's own name.
    forecaster = Prophet()
    forecaster.fit(df)

    file_name = model.path + ".pkl"
    with open(file_name, 'wb') as file:
        pickle.dump(forecaster, file)

    return (file_name,)

# Model Registry

In [7]:
@component(
    packages_to_install=["google-cloud-aiplatform", "kfp"],
    base_image="python:3.9",
    output_component_file="../deploy_model.yaml"
)
def deploy_model(
    sku_dict:dict,
    model: Input[Model],
    project: str,
    region: str,
    serving_container_image_uri : str, 
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
)-> NamedTuple(
  'Outputs',
  [
    ('endpoint_resource_name', str)
  ]):
    """Ensure an endpoint exists for the SKU and upload the model as a new version.

    The first upload for a display name is registered with alias ``v1``. Later
    uploads are attached to the existing model as ``v<N+1>``, where ``N`` is the
    ``version_id`` of the first model returned by the list call. Deployment to
    the endpoint is currently disabled (see the commented block at the end).

    Args:
        sku_dict: Single-entry mapping whose first key is the SKU id; the id is
            used to build the display, model and endpoint names.
        model: Trained model artifact; the parent directory of ``model.uri`` is
            uploaded as the artifact directory.
        project: Google Cloud project for all Vertex AI calls.
        region: Vertex AI location for all Vertex AI calls.
        serving_container_image_uri: Serving container image for the upload.
        vertex_endpoint: Declared output artifact; not populated by the body.
        vertex_model: Declared output artifact; not populated while deployment
            is disabled.

    Returns:
        A 1-tuple holding the literal string ``"success"``.
        NOTE(review): the output is declared as ``endpoint_resource_name`` but
        never carries one; confirm whether ``endpoint.resource_name`` was meant.
    """
    from google.cloud import aiplatform
    aiplatform.init(project=project, location=region)

    sku_id = list(sku_dict.keys())[0]
    display_name = f"{sku_id}_forecasting"
    model_name = f"{sku_id}_demand_forecasting"
    endpoint_name = f"{sku_id}_demand_forecasting"

    def create_endpoint():
        """Return the newest endpoint named ``endpoint_name``, creating one if none exists."""
        endpoints = aiplatform.Endpoint.list(
            filter=f'display_name="{endpoint_name}"',
            order_by='create_time desc',
            project=project,
            location=region,
        )
        if endpoints:
            return endpoints[0]  # most recently created
        return aiplatform.Endpoint.create(
            display_name=endpoint_name, project=project, location=region
        )

    # Kept for its side effect (the endpoint is created if missing) and for the
    # disabled deployment step below.
    endpoint = create_endpoint()

    # Quote the display name, matching the endpoint filter above.
    existing_models = aiplatform.Model.list(filter=f'display_name="{display_name}"')

    if existing_models:
        # Register the upload as the next version of the existing model.
        parent_model = existing_models[0].resource_name
        # int() instead of eval(): version_id is a numeric string, and eval()
        # would execute arbitrary text.
        version_number = int(existing_models[0].version_id) + 1
    else:
        parent_model = None
        version_number = 1

    # Parent directory of the artifact URI (equivalent to stripping the trailing
    # "/model" segment); train_model in this notebook writes the pickle there.
    artifact_dir = model.uri.rsplit("/", 1)[0]

    # NOTE(review): the description says "Linear Regression Model", yet train_model
    # in this notebook fits Prophet. The serving routes also embed the model
    # version number. Both are kept as-is; please confirm they are intended.
    model_upload = aiplatform.Model.upload(
        display_name=display_name,
        parent_model=parent_model,
        artifact_uri=artifact_dir,
        version_description="Linear Regression Model",
        version_aliases=[f"v{version_number}"],
        labels={"release": "dev"},
        serving_container_image_uri=serving_container_image_uri,
        serving_container_health_route=f"/v{version_number}/models/{model_name}",
        serving_container_predict_route=f"/v{version_number}/models/{model_name}:predict",
        serving_container_environment_variables={
            "MODEL_NAME": model_name,
        },
    )

    # Deployment is currently disabled. The intended step was:
    #     model_deploy = model_upload.deploy(
    #         machine_type="n1-standard-2",
    #         endpoint=endpoint,
    #         traffic_split={"0": 100},
    #         deployed_model_display_name=display_name,
    #     )
    #     # Save data to the output params
    #     vertex_model.uri = model_deploy.resource_name
    return ("success",)

# Inferencing

# Pipeline

In [10]:
@dsl.pipeline(
    # Pipeline name; used to determine the pipeline Context.
    name="sku-forecast-pipeline-job",
    # Default artifact root; can be overridden when the job is submitted.
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    url: str = "gs://datacouch-vertex-ai/forecasting-pipeline/raw_data/sku_day_level.csv",
    project: str = PROJECT_ID,
    region: str = REGION,
    display_name: str = DISPLAY_NAME,
    api_endpoint: str = f"{REGION}-aiplatform.googleapis.com",
    # Alternative image: "asia-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-6:latest"
    serving_container_image_uri: str = "asia-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
    ):
    # Flow: load per-SKU data from the Feature Store, then train and register one
    # model per SKU in parallel, all wrapped in an exit handler that sends a
    # notification e-mail.
    # NOTE(review): `url`, `display_name` and `api_endpoint` are not referenced
    # below, and the SKU list and recipient address are hard-coded.

    notify_train_task = VertexNotificationEmailOp(
        recipients=["shadab.cs0058@gmail.com"],
    )

    with dsl.ExitHandler(notify_train_task, name="pipeline-status"):
        # Data load, retried up to twice with exponential backoff (10s start, 60s cap).
        load_task = load_sku_data(
            featurestore_id="sku_forecasting",
            project=project,
            region=region,
            sku_list=['8ghjhj', 'hjkjhiu'],
        )
        load_task.set_retry(
            num_retries=2,
            backoff_duration='10s',
            backoff_factor=2,
            backoff_max_duration ='60s',
        )

        # Fan out over the {sku_id: csv_path} entries, two at a time.
        with dsl.ParallelFor(load_task.outputs["final_skus"], parallelism=2) as sku:
            train_task = train_model(sku_dict=sku, bucket_name=BUCKET_NAME)
            train_task.after(load_task)

            register_task = deploy_model(
                sku_dict=sku,
                model=train_task.outputs["model"],
                project=project,
                region=region,
                serving_container_image_uri=serving_container_image_uri,
            )
            register_task.after(train_task)

In [11]:
# Compile the pipeline function into the JSON spec at PIPELINE_JSON_PKG_PATH.
compiler.Compiler().compile(
    package_path=PIPELINE_JSON_PKG_PATH,
    pipeline_func=pipeline,
)

In [12]:
# Build the pipeline job from the compiled spec; it is submitted in the next cell.
start_pipeline = aiplatform.PipelineJob(
    project=PROJECT_ID,
    location=REGION,
    display_name=DISPLAY_NAME,
    template_path=PIPELINE_JSON_PKG_PATH,
    pipeline_root=PIPELINE_ROOT,
    enable_caching=True,
)

In [13]:
# Submit the pipeline job built above; the captured log output lists the job's resource name and console URL.
start_pipeline.submit()

INFO:google.cloud.aiplatform.utils.gcs_utils:Creating GCS bucket for Vertex Pipelines: "datacouch-vertex-ai"
INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/808205789435/locations/us-central1/pipelineJobs/sku-forecast-pipeline-job-20250202144600
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/808205789435/locations/us-central1/pipelineJobs/sku-forecast-pipeline-job-20250202144600')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/sku-forecast-pipeline-job-20250202144600?project=808205789435
