In [None]:
# !pip install -q vertexai==1.49.0 --upgrade
# !pip install -q google-cloud-pipeline-components==2.6.0 --upgrade
# !pip install -q kfp==2.4.0 --upgrade
# !pip install -q python-dotenv

### Imports


In [None]:
import kfp

from kfp.dsl import pipeline
from kfp.dsl import component
from kfp.dsl import OutputPath

from kfp.v2.dsl import (
    Artifact,
    Dataset,
    Input,
    Model,
    Output,
    Metrics,
    component,
    Markdown,
    HTML,
)

from kfp import compiler

from google.cloud import aiplatform

import json

from rich import print

In [None]:
from dotenv import dotenv_values

# Project settings come from a local .env file; every key listed below is
# required, and a missing one raises KeyError here.
config = dotenv_values(".env")
PROJECT_ID, PIPELINE_ROOT, LOCATION, SERVICE_ACCOUNT = (
    config[key]
    for key in ("PROJECT_ID", "PIPELINE_ROOT", "LOCATION", "SERVICE_ACCOUNT")
)

### Authentication


In [None]:
# Set the default project, region and staging bucket for the Vertex AI SDK
# calls made later in this notebook.
aiplatform.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=PIPELINE_ROOT,
)

## Components


### Download Data


In [None]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "google-cloud-aiplatform==1.49.0",
        "google-cloud-bigquery==3.15.0",
        "pyarrow==12.0.1",
        "db-dtypes==1.1.1",
    ],
    base_image="python:3.10.6",
)
def download_data(table_id: str, credentials: dict, dataset: Output[Dataset]):
    """
    Downloads every row of a BigQuery table and saves it as a CSV file.

    Args:
        table_id (str): The ID of the BigQuery table to download data from.
        credentials (dict): Service-account key info (the parsed contents of a
            JSON key file) used to authenticate against BigQuery.
        dataset (Output[Dataset]): The output dataset where the CSV file will be saved.

    Returns:
        None
    """
    from google.cloud import bigquery
    from google.oauth2 import service_account

    # Build the credentials in memory instead of writing the private key to a
    # file in the working directory and pointing GOOGLE_APPLICATION_CREDENTIALS
    # at it: nothing secret is left on disk and no process-wide state changes.
    sa_credentials = service_account.Credentials.from_service_account_info(
        credentials
    )

    client = bigquery.Client(
        # With explicit credentials the project is no longer inferred from a
        # key file, so take it from the key info.
        project=sa_credentials.project_id,
        credentials=sa_credentials,
        location="EU",
    )

    # table_id is interpolated verbatim (table identifiers cannot be bound as
    # query parameters), so only trusted values must be passed in.
    query = f"""
    SELECT * FROM `{table_id}`
    """
    df = client.query(query).to_dataframe()

    df.to_csv(dataset.path, index=False)

### Preprocess Data


In [None]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "pyarrow==12.0.1",
    ],
    base_image="python:3.10.6",
)
def preprocess_data(input_data: Input[Dataset], output_data: Output[Dataset]):
    """
    Builds the ranking target and lagged features from the raw CSV data.

    Steps:
      * parse ``timestamp`` as datetime;
      * ``rank``: position of each row within its timestamp by descending
        ``page_views`` (ties broken by row order);
      * ``next_timestamp_rank``: the ``rank`` of the same ``item_id`` in its
        next row in timestamp order;
      * ``<metric>_lag_<h>`` for 4, 8, 12 and 16 hours, matched on the exact
        ``(item_id, timestamp)`` pair;
      * every missing value is replaced by 0.

    Args:
        input_data (Input[Dataset]): CSV file with the raw data.
        output_data (Output[Dataset]): Destination of the Parquet output.
    """
    import pandas as pd

    key_columns = ["item_id", "timestamp"]
    metric_columns = ["page_views", "impressions", "clicks"]

    def build_lagged_metrics(indexed_frame, lag_hours):
        # Moving the timestamps forward by the lag makes a row observed at
        # time t line up with the row at t + lag when joined on the key.
        shifted = indexed_frame.reset_index().copy()
        shifted["timestamp"] = shifted["timestamp"] + pd.Timedelta(hours=lag_hours)
        shifted = shifted.set_index(key_columns)[metric_columns]
        return shifted.add_suffix(f"_lag_{lag_hours}")

    frame = pd.read_csv(input_data.path)
    frame["timestamp"] = pd.to_datetime(frame["timestamp"])

    frame["rank"] = frame.groupby("timestamp")["page_views"].rank(
        ascending=False, method="first"
    )

    frame = frame.sort_values("timestamp")
    frame["next_timestamp_rank"] = frame.groupby("item_id")["rank"].shift(-1)

    frame = frame.set_index(key_columns)
    for lag_hours in (4, 8, 12, 16):
        frame = frame.join(build_lagged_metrics(frame, lag_hours), how="left")

    frame = frame.fillna(0).reset_index()

    frame.to_parquet(output_data.path, index=False)

### Train-Test Split


In [None]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "pyarrow==12.0.1",
    ],
    base_image="python:3.10.6",
)
def train_test_split(
    input_data: Input[Dataset], train_data: Output[Dataset], test_data: Output[Dataset]
):
    """
    Splits the input dataset into train and test datasets based on timestamps.

    Rows are kept only when ``next_timestamp_rank`` is below 30 and non-zero.
    Both rank columns are then inverted (``31 - rank``) so that larger values
    mean a better position. The earliest 80% of the distinct timestamps form
    the train set and the remaining ones the test set.

    Args:
        input_data (Input[Dataset]): The input dataset to be split.
        train_data (Output[Dataset]): The output train dataset.
        test_data (Output[Dataset]): The output test dataset.
    """
    import pandas as pd

    target_col = "next_timestamp_rank"
    baseline_target_col = "rank"

    df = pd.read_parquet(input_data.path)

    # Keep targets with rank 1..29 (strictly below 30), for NDCG to work in
    # XGBRanker. A target of 0 is the fill value used upstream for missing
    # values, i.e. presumably rows without a following observation.
    keep_rows = (df[target_col] < 30) & (df[target_col] != 0)
    # Explicit copy: assigning columns into a boolean-filtered frame raises
    # pandas' SettingWithCopyWarning.
    df = df[keep_rows].copy()

    # invert the ranks
    df[target_col] = 31 - df[target_col]
    df[baseline_target_col] = 31 - df[baseline_target_col]

    timestamps = df["timestamp"].sort_values().unique()
    split_position = int(len(timestamps) * 0.8)
    train_timestamps = timestamps[:split_position]
    test_timestamps = timestamps[split_position:]

    train_df = df[df["timestamp"].isin(train_timestamps)]
    test_df = df[df["timestamp"].isin(test_timestamps)]

    train_df.to_parquet(train_data.path, index=False)
    test_df.to_parquet(test_data.path, index=False)

### Train Model


In [None]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "xgboost==2.0.1",
        "scikit-learn==1.3.0",
        "pyarrow==12.0.1",
    ],
    base_image="python:3.10.6",
)
def train_model(
    train_data: Input[Dataset],
    baseline_train_metric: Output[Metrics],
    train_metrics: Output[Metrics],
    model: Output[Model],
):
    """
    Trains a ranking model using XGBoost on the given training data.

    Each distinct ``timestamp`` is passed to XGBoost as one query group, so the
    ``rank:ndcg`` objective compares items within the same timestamp instead of
    treating the whole training set as a single ranking list.

    Args:
        train_data (Input[Dataset]): The input training dataset.
        baseline_train_metric (Output[Metrics]): The output metrics for the baseline
            (the current ``rank`` used directly as the score).
        train_metrics (Output[Metrics]): The output metrics for the trained model.
        model (Output[Model]): The output trained model.
    """
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import ndcg_score

    feature_cols = [
        "page_views",
        "impressions",
        "clicks",
        "rank",
        "page_views_lag_4",
        "impressions_lag_4",
        "clicks_lag_4",
        "page_views_lag_8",
        "impressions_lag_8",
        "clicks_lag_8",
        "page_views_lag_12",
        "impressions_lag_12",
        "clicks_lag_12",
        "page_views_lag_16",
        "impressions_lag_16",
        "clicks_lag_16",
    ]
    target_col = "next_timestamp_rank"
    baseline_target_col = "rank"

    df = pd.read_parquet(train_data.path)

    # XGBoost expects the rows of one query group to be contiguous. The stable
    # sort keeps the existing row order inside each timestamp.
    df = df.sort_values("timestamp", kind="stable").reset_index(drop=True)
    # Group sizes in the same (sorted) order as the rows; dropna=False keeps
    # the sizes summing to len(df) even if a timestamp is missing.
    group_sizes = (
        df.groupby("timestamp", sort=True, dropna=False).size().to_numpy()
    )

    dtrain = xgb.DMatrix(df[feature_cols], label=df[target_col])
    dtrain.set_group(group_sizes)

    # Set parameters
    params = {
        "objective": "rank:ndcg",  # Ranking objective
        "eval_metric": "ndcg@10",  # Normalized Discounted Cumulative Gain at 10
        "max_depth": 5,
        "eta": 0.01,  # Learning rate
        "seed": 42,  # Random seed for reproducibility
    }

    # Train the model
    num_rounds = 1000  # Number of boosting rounds
    ranker = xgb.train(
        params,
        dtrain,
        num_rounds,
        evals=[(dtrain, "train")],
        verbose_eval=10,
    )

    y_train_pred = ranker.predict(dtrain)

    # NOTE: the logged NDCG values are computed over the whole training set as
    # one ranked list (a single row is passed to ndcg_score), unlike the
    # per-timestamp "ndcg@10" that XGBoost reports during training.
    baseline_train_metric.log_metric(
        "ndcg", ndcg_score([df[target_col]], [df[baseline_target_col]], k=10)
    )
    train_metrics.log_metric("ndcg", ndcg_score([df[target_col]], [y_train_pred], k=10))

    ranker.save_model(model.path)

### Test Model


In [None]:
@component(
    packages_to_install=[
        "pandas==2.2.2",
        "pyarrow==12.0.1",
        "xgboost==2.0.1",
        "scikit-learn==1.3.0",
        "google-cloud-aiplatform==1.49.0",
        "tabulate==0.9.0",  # for Markdown; pinned like the other packages
    ],
    base_image="python:3.10.6",
)
def test_model(
    test_data: Input[Dataset],
    model: Input[Model],
    baseline_test_metric: Output[Metrics],
    test_metrics: Output[Metrics],
    sample_output: Output[Markdown],
):
    """
    Test the trained model using the provided test data.

    Args:
        test_data (Input[Dataset]): Input dataset containing the test data.
        model (Input[Model]): Input model to be tested.
        baseline_test_metric (Output[Metrics]): Output metrics for the baseline
            (the current ``rank`` used directly as the score).
        test_metrics (Output[Metrics]): Output metrics for the test.
        sample_output (Output[Markdown]): Output markdown file containing the
            predicted ranking for the most recent timestamp in the test data,
            where rank 1 is the item with the highest predicted score.

    Returns:
        None
    """
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import ndcg_score

    df = pd.read_parquet(test_data.path)

    feature_cols = [
        "page_views",
        "impressions",
        "clicks",
        "rank",
        "page_views_lag_4",
        "impressions_lag_4",
        "clicks_lag_4",
        "page_views_lag_8",
        "impressions_lag_8",
        "clicks_lag_8",
        "page_views_lag_12",
        "impressions_lag_12",
        "clicks_lag_12",
        "page_views_lag_16",
        "impressions_lag_16",
        "clicks_lag_16",
    ]
    target_col = "next_timestamp_rank"
    baseline_target_col = "rank"

    dtest = xgb.DMatrix(df[feature_cols], label=df[target_col])

    ranker = xgb.Booster()
    ranker.load_model(model.path)

    y_test_pred = ranker.predict(dtest)

    baseline_test_metric.log_metric(
        "ndcg", ndcg_score([df[target_col]], [df[baseline_target_col]], k=10)
    )
    test_metrics.log_metric("ndcg", ndcg_score([df[target_col]], [y_test_pred], k=10))

    # Select the latest timestamp by value rather than by row position, so the
    # sample does not depend on the order of the rows in the file.
    latest_timestamp = df["timestamp"].max()
    df_sample = df[df["timestamp"] == latest_timestamp].set_index("item_id")
    X_sample = df_sample[feature_cols]
    sample_scores = pd.Series(
        ranker.predict(xgb.DMatrix(X_sample)), index=X_sample.index
    )

    # The target is an inverted rank (larger = better position), so a higher
    # predicted score means a more relevant item: rank in descending score
    # order so that rank 1 is the top prediction.
    y_sample_pred_rank = (
        sample_scores.rank(ascending=False, method="first")
        .rename("rank")
        .reset_index()
        .sort_values("rank")
    )
    y_sample_pred_rank["timestamp"] = latest_timestamp

    with open(sample_output.path, "w") as f:
        f.write(y_sample_pred_rank.to_markdown())

### Upload Model


In [None]:
@component(
    # Same SDK version as the other components in this notebook.
    packages_to_install=["google-cloud-aiplatform==1.49.0"],
    base_image="python:3.10.6",
)
def upload_model(
    model: Input[Model],
    project: str,
    region: str,
):
    """
    Uploads a model to Vertex AI (Google Cloud AI Platform).

    The directory that contains the model artifact is registered as the
    model's ``artifact_uri``, together with a custom serving container image
    taken from the given project's ``custom-repo`` repository.

    Args:
        model (Input[Model]): The model to upload.
        project (str): The Google Cloud project ID.
        region (str): The region where the model will be deployed.

    Returns:
        None
    """
    import logging
    import posixpath

    from google.cloud import aiplatform

    # Configure logging before anything is logged.
    logging.basicConfig(level=logging.DEBUG)
    logging.debug(model)

    print(model)
    print(model.uri)

    aiplatform.init(project=project, location=region)

    # model.uri points at the model file itself, while the upload takes the
    # containing directory. URIs always use "/", hence posixpath.
    artifact_dir = posixpath.dirname(model.uri)

    aiplatform.Model.upload(
        display_name="trending-content-ranker",
        artifact_uri=artifact_dir,
        serving_container_image_uri=f"europe-docker.pkg.dev/{project}/custom-repo/xgboost-image:tag1",
    )

## Pipeline


### Trending Pipeline Steps:

1. **Download Data:**
   - Fetch data from BigQuery table "aicamp_2024.trending_training_view".
   - Convert data to CSV format.
   - Store data in a Dataset artifact.
2. **Preprocess Data:**
   - Read data from Dataset artifact.
   - Convert timestamp column to datetime format.
   - Rank the items within each timestamp by page views, derive each item's rank at its next timestamp (the training target), and add 4-, 8-, 12- and 16-hour lagged page views, impressions, and clicks.
   - Fill missing values with 0.
   - Store preprocessed data in a Dataset artifact.
3. **Train-Test Split:**
   - Read data from Dataset artifact.
   - Filter data to only include items with next timestamp rank less than 30 and not equal to 0.
   - Split data into training and testing sets based on timestamps.
   - Store training and testing data in separate Dataset artifacts.
4. **Train Model:**
   - Read training data from Dataset artifact.
   - Train an XGBoost model using NDCG as the objective function.
   - Log training metrics (NDCG score).
   - Save the trained model in a Model artifact.
5. **Test Model:**
   - Read testing data from Dataset artifact.
   - Load trained model from Model artifact.
   - Calculate and log testing metrics (NDCG score).
   - Generate sample predictions and store them in Markdown format.
6. **Upload Model:**
   - Load trained model from Model artifact.
   - Upload the model to Vertex AI with the specified display name, artifact URI, and serving container image URI.


### Define Pipeline


In [None]:
@pipeline(
    name="treding_training_pipeline",
    # Join with exactly one "/" so the root is valid whether or not
    # PIPELINE_ROOT ends with a slash.
    pipeline_root=PIPELINE_ROOT.rstrip("/") + "/treding_training_pipeline",
)
def trending_pipeline():
    """
    This function defines a pipeline for training a trending model.

    The pipeline consists of the following steps:
    1. Downloading data from a specified table using service account credentials.
    2. Preprocessing the downloaded data.
    3. Splitting the preprocessed data into train and test sets.
    4. Training a model using the train set.
    5. Testing the trained model using the test set.
    6. Uploading the trained model to a specified project and region
       (only after the test step has finished).

    Each step in the pipeline has specified CPU and memory limits.

    Returns:
        None
    """
    # NOTE(review): the key file is read when the pipeline is compiled and is
    # passed to the component as a constant parameter, so the private key
    # presumably ends up in the compiled pipeline definition - confirm, and
    # consider relying on the service account the job runs as instead.
    with open("service_account.json", "r") as f:
        raw_credential = json.load(f)

    download_data_job = (
        download_data(
            table_id="aicamp_2024.trending_training_view", credentials=raw_credential
        )
        .set_cpu_limit("2")
        .set_memory_limit("8G")
    )

    preprocess_data_job = (
        preprocess_data(input_data=download_data_job.outputs["dataset"])
        .set_cpu_limit("4")
        .set_memory_limit("16G")
    )

    train_test_split_job = (
        train_test_split(input_data=preprocess_data_job.outputs["output_data"])
        .set_cpu_limit("4")
        .set_memory_limit("16G")
    )

    train_model_job = (
        train_model(train_data=train_test_split_job.outputs["train_data"])
        .set_cpu_limit("12")
        .set_memory_limit("32G")
    )

    test_model_job = (
        test_model(
            test_data=train_test_split_job.outputs["test_data"],
            model=train_model_job.outputs["model"],
        )
        .set_cpu_limit("4")
        .set_memory_limit("16G")
    )

    # The upload only consumes the training output, so without an explicit
    # ordering it would run in parallel with the test step.
    (
        upload_model(
            model=train_model_job.outputs["model"],
            project=PROJECT_ID,
            region=LOCATION,
        )
        .after(test_model_job)
        .set_cpu_limit("1")
        .set_memory_limit("2G")
    )

### Compile Pipeline


In [None]:
import os

# Make sure the output directory exists so that compiling also works on a
# fresh checkout where "pipelines/" has not been created yet.
os.makedirs("pipelines", exist_ok=True)

compiler.Compiler().compile(
    pipeline_func=trending_pipeline,
    package_path="pipelines/treding_training_pipeline.json",
)

`compiler.Compiler()`: Creates an instance of the `Compiler` class.

`.compile()`: A method of the `Compiler` class that compiles a pipeline function into a pipeline definition file.

`pipeline_func=trending_pipeline`: The pipeline function to compile. `trending_pipeline` is the function that defines the pipeline.

`package_path="pipelines/treding_training_pipeline.json"`: The path where the output JSON file is written. The compiled pipeline is saved in this file.


### Run Pipeline


In [None]:
# Create a Vertex AI pipeline job from the compiled definition and run it;
# caching is enabled for the job.
# NOTE(review): SERVICE_ACCOUNT is loaded from .env but is not passed to the
# job here - confirm which identity the pipeline is meant to run as.
job = aiplatform.PipelineJob(
    template_path="pipelines/treding_training_pipeline.json",
    display_name="treding_training_pipeline",
    enable_caching=True,
)
job.run()