# Trendspotting POC

The goal of this notebook is to:
* Load signals data into a managed vertex dataset for time series forecasting
* Create a forecast prediction model for each term, geo and category combination
* Project the forecasts on a holdout set of data to assess performance and trends
* Clean up results of test predictions
* Cluster test predictions
* Create dashboard for backtesting

End users would take this parameterized pipeline to produce futurama backtests using clustering and forecasting

[Source Control Link](https://source.cloud.google.com/cpg-cdp/trendspotting/+/master:pipeline_train.ipynb)

When run, the pipeline will look something like this:

![pipeline example](img/pipeline_example.png)

[Link to pipeline](https://pantheon.corp.google.com/vertex-ai/locations/us-central1/pipelines/runs/report-pipe-trendspotting-pipeline-20220309212043?authuser=0&project=cpg-cdp)
## Install packages, create bucket (only run once)

In [1]:
# ! pip3 install -U google-cloud-storage $USER_FLAG
# ! pip3 install kfp google-cloud-pipeline-components
# # !git clone https://github.com/kubeflow/pipelines.git
# # !pip install pipelines/components/google-cloud/.
# !pip install google-cloud-aiplatform
# from google_cloud_pipeline_components.v1.bigquery import BigqueryCreateModelJobOp

### Import libs and types for KFP pipeline

In [2]:
# !pip install protobuf==3.2.0
# ! pip install google-cloud-pipeline-components==2.9.0

In [2]:
from datetime import datetime
import json
import os
from IPython.display import clear_output
from google_cloud_pipeline_components.v1.bigquery import BigqueryCreateModelJobOp

from google.cloud import aiplatform

from google_cloud_pipeline_components.v1 import dataset as dataset_pipeline_components
from google_cloud_pipeline_components.v1.automl import training_job as training_pipeline_components

import kfp
import kfp.dsl

In [3]:
# Project-wide settings shared by the cells below.
PROJECT_ID = "cpg-cdp"  # GCP project used for BigQuery tables and Vertex AI
LOCATION = "us-central1"  # Vertex AI location used for datasets, training and the job
BUCKET = "gs://trendspotting-pipeline"  # GCS bucket, later used as the pipeline root
SA = "vertex-pipelines@cpg-cdp.iam.gserviceaccount.com"  # pipeline service account
VERTEX_PROJECT = PROJECT_ID  # alias of PROJECT_ID

One time - create the bucket

In [4]:
# ! gsutil mb -l $LOCATION $BUCKET

In [5]:
# PIPELINES_FILEPATH stays pointed at the GCS bucket because it is passed as
# `pipeline_root` when the pipeline is defined and when the job is created.
PIPELINES_FILEPATH = BUCKET

# The PIPELINES registry is persisted with local-file APIs (`os.path.isfile`,
# `open`), which do not understand "gs://" URIs: with the bucket URI the
# registry was never loaded and `save_pipelines` could not write it. It
# therefore gets its own path on the local filesystem.
PIPELINES_REGISTRY_PATH = "pipelines.json"

# Load a previously saved registry if there is one, otherwise start empty.
if os.path.isfile(PIPELINES_REGISTRY_PATH):
    with open(PIPELINES_REGISTRY_PATH, encoding="utf-8") as f:
        PIPELINES = json.load(f)
else:
    PIPELINES = {}


def save_pipelines():
    """Write the PIPELINES registry to PIPELINES_REGISTRY_PATH as JSON."""
    with open(PIPELINES_REGISTRY_PATH, "w", encoding="utf-8") as f:
        json.dump(PIPELINES, f)

### Pipeline meta-parameter settings

In [6]:
# Service account for the pipeline. This has to be a plain string; the
# previous `(SA,)` spelling built a one-element tuple instead.
SERVICE_ACCOUNT = SA  # <--- TODO: Change This if needed

# Clustering / reporting sizes (passed as `n_clusters` and `top_n_results`).
N_CLUSTERS = 20
TOP_N_RESULTS = 50

# TODO: Forecasting Configuration:
HISTORY_WINDOW_n = 52  #  {type: 'integer'} # context_window
FORECAST_HORIZON = 52  #  {type: 'integer'}
BUDGET_MILLI_NODE_HOURS = 20000  # term-level forecast training budget
BUDGET_MILLI_NODE_HOURS_CLUSTER = 1000  # cluster-level forecast training budget
BUDGET_HOURS_CLASSIFICATION = 1  # classification model training budget
CATEGORY_ID = 10889  # passed to the pipeline as `subcat_id`

# BQ table (in the `trends_data` dataset) holding the source data
SRC_TABLE_ID = "cuisines_10889_thailand_2764"
# SRC_TABLE_ID = 'cuisines_10889_malaysia_2458'
# SRC_TABLE_ID = 'skincare_10047_unitedstates_2840'

SRC_TABLE = f"{PROJECT_ID}.trends_data.{SRC_TABLE_ID}"
# Model names are derived from PROJECT_ID so they follow a project change.
# NOTE(review): the "_trendspotting_" / "_kmeans_" suffixes look swapped
# relative to the variable names - confirm which one is intended.
K_MEANS_MODEL_NAME = f"{PROJECT_ID}.trendspotting.{SRC_TABLE_ID}_trendspotting_{N_CLUSTERS}"
MODEL_NAME = f"{PROJECT_ID}.trendspotting.{SRC_TABLE_ID}_kmeans_{N_CLUSTERS}"

# Suffix appended to the pipeline name and to the generated table names.
VERSION = "v7_1"

# Date boundaries passed to the pipeline (ISO `YYYY-MM-DD` strings).
TRAIN_ST = "2021-08-29"
TRAIN_END = "2022-02-13"
VALID_ST = "2022-02-13"
VALID_END = "2022-02-27"
PREDICT_ON_DT = "2022-03-06"
SIX_MONTH_DT = "2022-10-02"

# Passed to the pipeline as `label_list`.
categories = ["NA"]

### Component Code is organized in the `src/components` module

### Pipeline 

Uses custom components, also uses reusable vertex components for creating the training dataset and training the forecast models

Notice the output for testing in BQ is set by `target_table`, assigned to `export_evaluated_data_items_bigquery_destination_uri`

In [9]:
from src.components import components

In [15]:
PIPELINE_TAG = f"{SRC_TABLE_ID}-trendspotting-pipeline-{VERSION}"  # <--- TODO; optionally name pipeline

# Names of the 20 embedding feature columns used by the term-level and the
# cluster-level forecasting jobs (emb1..emb20, comments_embed_p1..p20).
_TERM_EMBEDDING_COLUMNS = [f"emb{i}" for i in range(1, 21)]
_CLUSTER_EMBEDDING_COLUMNS = [f"comments_embed_p{i}" for i in range(1, 21)]


@kfp.dsl.pipeline(
    name=PIPELINE_TAG.replace("_", "-"),
    pipeline_root=PIPELINES_FILEPATH,
)
def pipeline(
    vertex_project: str,
    location: str,
    version: str,
    ds_display_name_terms: str,
    ds_display_name_cluster: str,
    label_table: str,
    scored_classification_table: str,
    classification_train_table: str,
    classification_model_name: str,
    classification_model_budget: int,
    auto_cluster_train_table: str,
    auto_min_cluster: int,
    auto_max_cluster: int,
    auto_cluster_target_table: str,
    label_list: list,
    train_st: str,
    train_end: str,
    valid_st: str,
    valid_end: str,
    predict_on_dt: str,
    fix_embed_target: str,
    n_clusters: int,
    top_n_results: int,
    six_month_dt: str,
    source_table: str,
    target_term_forecast_table: str,
    target_cluster_forecast_table: str,
    budget_milli_node_hours: int,
    budget_milli_node_hours_cluster: int,
    context_window: int,
    forecast_horizon: int,
    top_movers_target_table: str,
    cluster_table_agg: str,
    cluster_table: str,
    subcat_id: int,
    model_name: str,
    cluster_table_agg_basic: str,
    target_cluster_forecast_table_basic: str,
    target_cluster_forecast_table_basic_partitioned: str,
    sustained_riser_table: str,
):
    """Trendspotting pipeline: term-level forecasts plus a cluster-level report.

    The first part embeds and splits the source data, trains a term-level
    AutoML forecasting model and derives the sustained-riser and top-mover
    tables from its exported evaluation items. The second part branches on
    whether `label_table` exists: if it does, a classification model is
    trained, each category is auto-clustered and the category clusters are
    forecast; otherwise a basic BigQuery k-means model is trained and the
    resulting basic clusters are forecast.

    Args:
        vertex_project: Project used for the Vertex datasets and training
            jobs, the BigQuery k-means job, the table-existence check and the
            classification/clustering components; also the prefix of the
            embedding table name.
        location: Location for the Vertex dataset and AutoML training steps.
        version: Version tag. Not referenced in the body; the embedding table
            name uses the notebook-level VERSION constant.
        ds_display_name_terms: Display name of the term-level dataset.
        ds_display_name_cluster: Display name of the cluster-level dataset.
        label_table: Table whose existence selects the branch; also passed to
            the classification training component.
        scored_classification_table: Target table of the classification step.
        classification_train_table: Train table of the classification step.
        classification_model_name: Name given to the classification model.
        classification_model_budget: Classification budget, passed as
            `classification_budget_hours`.
        auto_cluster_train_table: Train table of the auto-cluster step.
        auto_min_cluster: Passed to auto-cluster as `cluster_min`.
        auto_max_cluster: Passed to auto-cluster as `cluster_max`.
        auto_cluster_target_table: Target table of the auto-cluster step.
        label_list: Passed to auto-cluster as `labels`.
        train_st: Training start date, forwarded to the data-prep, k-means
            SQL and aggregation steps.
        train_end: Training end date, forwarded to the same steps.
        valid_st: Validation start date, forwarded to data prep/aggregation.
        valid_end: Validation end date, forwarded to data prep/aggregation.
        predict_on_dt: Date forwarded to the sustained-riser and top-mover
            steps.
        fix_embed_target: Target table of the "prep data for training" step.
        n_clusters: Number of clusters for the basic k-means branch.
        top_n_results: Forwarded to the sustained-riser and top-mover steps.
        six_month_dt: Date forwarded to the top-mover step.
        source_table: Source data table.
        target_term_forecast_table: BigQuery destination of the term-level
            evaluated data items; source of the two report tables.
        target_cluster_forecast_table: BigQuery destination of the category
            cluster forecast's evaluated data items.
        budget_milli_node_hours: Training budget of the term-level forecast.
        budget_milli_node_hours_cluster: Training budget of either
            cluster-level forecast.
        context_window: Context window of all forecasting jobs.
        forecast_horizon: Forecast horizon of all forecasting jobs.
        top_movers_target_table: Target table of the top-mover step.
        cluster_table_agg: Target table of the category cluster aggregation.
        cluster_table: Source of the category cluster aggregation and target
            of the basic NLP featurize-and-cluster step.
        subcat_id: Category id forwarded to the data-prep, k-means SQL and
            NLP clustering steps.
        model_name: Model name forwarded to the k-means SQL, the category
            aggregation and the NLP clustering steps.
        cluster_table_agg_basic: Target table of the basic aggregation.
        target_cluster_forecast_table_basic: BigQuery destination of the
            basic cluster forecast's evaluated data items.
        target_cluster_forecast_table_basic_partitioned: Target of the final
            partitioned forecast table in the basic branch.
        sustained_riser_table: Target table of the sustained-riser report.
    """

    # NOTE: SRC_TABLE_ID and VERSION are notebook-level constants, so this
    # table name is fixed when the pipeline is compiled.
    embed_terms = (
        components.create_prediction_dataset_term_level(
            target_table=f"{vertex_project}.trends_pipeline.{SRC_TABLE_ID}_ETL_futurama_weekly_embed_{VERSION}",
            source_table_uri=source_table,
            train_st=train_st,
            train_end=train_end,
            valid_st=valid_st,
            valid_end=valid_end,
            subcat_id=subcat_id,
        )
        .set_display_name("Add embeddings and split data")
        .set_caching_options(True)
    )

    fix_embed = (
        components.prep_forecast_term_level(
            source_table=embed_terms.outputs["training_data_table_uri"],
            target_table=fix_embed_target,
        )
        .set_display_name("Prep Data For Training")
        .set_caching_options(True)
    )

    time_series_dataset_create_op = (
        dataset_pipeline_components.TimeSeriesDatasetCreateOp(
            display_name=ds_display_name_terms,
            bq_source=fix_embed.outputs["term_train_table"],
            project=vertex_project,
            location=location,
        )
        .set_display_name("Create term-level Vertex dataset")
        .set_caching_options(True)
    )

    term_forecasting_op = (
        training_pipeline_components.AutoMLForecastingTrainingJobRunOp(
            display_name="train-point-forecast-futurama",
            model_display_name="point-forecast-futurama",
            dataset=time_series_dataset_create_op.outputs["dataset"],
            context_window=context_window,
            forecast_horizon=forecast_horizon,
            budget_milli_node_hours=budget_milli_node_hours,
            project=vertex_project,
            location=location,
            export_evaluated_data_items=True,
            export_evaluated_data_items_override_destination=True,
            target_column="score",
            time_column="date",
            time_series_identifier_column="series_id",
            time_series_attribute_columns=[
                "sentences",
                "geo_id",
                *_TERM_EMBEDDING_COLUMNS,
            ],
            unavailable_at_forecast_columns=["score"],
            available_at_forecast_columns=["date"],
            data_granularity_unit="week",
            data_granularity_count=1,
            predefined_split_column_name="split_col",
            optimization_objective="minimize-rmse",
            column_specs={
                "date": "timestamp",
                "geo_id": "categorical",
                "score": "numeric",
                "sentences": "categorical",
                **{column: "numeric" for column in _TERM_EMBEDDING_COLUMNS},
            },
            export_evaluated_data_items_bigquery_destination_uri=target_term_forecast_table,  # must be format:``bq://<project_id>:<dataset_id>:<table>``
        )
        .set_display_name("Forecast term-level")
        .set_caching_options(True)
    )

    # The report reads the table exported by the forecasting job, so it is
    # ordered explicitly after that job.
    sustained_risers_data_op = (
        components.sustained_riser_report(
            source_table=target_term_forecast_table,
            target_table=sustained_riser_table,
            top_n=top_n_results,
            predicted_on_dt=predict_on_dt,
        )
        .after(term_forecasting_op)
        .set_display_name("Create Sustained Riser Table")
        .set_caching_options(True)
    )

    top_movers_data_op = (
        components.create_top_mover_table(
            source_table=target_term_forecast_table,
            target_table=top_movers_target_table,
            predict_on_dt=predict_on_dt,
            six_month_dt=six_month_dt,
            trained_model=term_forecasting_op.outputs["model"],
            top_n_results=top_n_results,
        )
        .set_display_name("Generate the top mover table")
        .set_caching_options(True)
    )

    top_mover_post_process = (
        components.alter_topmover_schema(source_table=top_movers_target_table)
        .after(top_movers_data_op)
        .set_caching_options(True)
        .set_display_name("Adding descriptions to the output table")
    )

    # HIGH LEVEL REPORT PIPELINE STARTS HERE

    #######################################

    # NOTE(review): called positionally, so this is presumably a plain helper
    # that builds the k-means training SQL rather than a pipeline component -
    # confirm in src/components.
    model_train_sql = components.get_model_train_sql(
        model_name, n_clusters, source_table, train_st, subcat_id
    )

    # Check whether the label table exists; the result selects the branch.
    sct_exists_task = components.if_tbl_exists(
        table_ref=label_table, project_id=vertex_project
    )
    with kfp.dsl.Condition(sct_exists_task.output == "True"):
        ### if labeled data exists, we will create a model and auto cluster each category

        train_model_op = (
            components.train_classification_model(
                target_table=scored_classification_table,
                source_table=fix_embed.outputs["term_train_table"],
                label_table=label_table,
                train_table=classification_train_table,
                classification_model_name=classification_model_name,
                project_id=vertex_project,
                classification_budget_hours=classification_model_budget,
            )
            .set_display_name("Train classification model on examples")
            .set_caching_options(True)
        )

        auto_cluster_op = (
            components.auto_cluster(
                cluster_min=auto_min_cluster,
                cluster_max=auto_max_cluster,
                labels=label_list,
                cluster_train_table=auto_cluster_train_table,
                classified_terms_table=train_model_op.output,
                target_table=auto_cluster_target_table,
                project_id=vertex_project,
            )
            .set_display_name("Auto cluster each category")
            .set_caching_options(True)
        )

        aggregate_cluster_op = (
            components.aggregate_clusters(
                source_table=cluster_table,
                category_table=auto_cluster_op.output,
                target_table=cluster_table_agg,
                train_st=train_st,
                train_end=train_end,
                valid_st=valid_st,
                valid_end=valid_end,
                model_name=model_name,
            )
            .set_display_name("Aggregate category clusters")
            .set_caching_options(True)
        )

        time_series_dataset_create_op_high_level = (
            dataset_pipeline_components.TimeSeriesDatasetCreateOp(
                display_name=ds_display_name_cluster,
                bq_source=aggregate_cluster_op.outputs["term_cluster_agg_table"],
                project=vertex_project,
                location=location,
            )
            .set_display_name("Create category cluster Vertex dataset")
            .set_caching_options(True)
        )

        # Named separately from `term_forecasting_op` so the term-level
        # handle is not re-bound by the cluster-level job.
        cluster_forecasting_op = (
            training_pipeline_components.AutoMLForecastingTrainingJobRunOp(
                display_name="train-cluster-forecast-futurama",
                model_display_name="cluster-forecast-futurama",
                dataset=time_series_dataset_create_op_high_level.outputs["dataset"],
                context_window=context_window,
                forecast_horizon=forecast_horizon,
                budget_milli_node_hours=budget_milli_node_hours_cluster,
                project=vertex_project,
                location=location,
                export_evaluated_data_items=True,
                export_evaluated_data_items_override_destination=True,
                target_column="score",
                time_column="date",
                time_series_identifier_column="series_id",
                time_series_attribute_columns=[
                    "topic_id",
                    "category",
                    *_CLUSTER_EMBEDDING_COLUMNS,
                ],
                unavailable_at_forecast_columns=["score"],
                available_at_forecast_columns=["date"],
                data_granularity_unit="week",
                data_granularity_count=1,
                predefined_split_column_name="split_col",
                optimization_objective="minimize-rmse",
                column_transformations=components.COLUMN_TRANSFORMS_CLUSTER,
                export_evaluated_data_items_bigquery_destination_uri=target_cluster_forecast_table,  # must be format:``bq://<project_id>:<dataset_id>:<table>``
            )
            .set_display_name("Forecast category clusters")
            .set_caching_options(True)
        )

    with kfp.dsl.Condition(sct_exists_task.output == "False"):
        ### no labeled data: fall back to basic (unsupervised) k-means clusters

        # Uses the `vertex_project` pipeline parameter, like every other step,
        # instead of the notebook-level PROJECT_ID constant.
        train_k_means_op = (
            BigqueryCreateModelJobOp(
                project=vertex_project, location="US", query=model_train_sql
            )
            .set_display_name("Train k-means basic BQ Model")
            .set_caching_options(True)
        )

        create_cluster_terms_op = (
            components.nlp_featurize_and_cluster(
                source_table=source_table,
                target_table=cluster_table,
                train_st=train_st,
                train_end=train_end,
                subcat_id=subcat_id,
                model_name=model_name,
                n_clusters=n_clusters,
            )
            .after(train_k_means_op)
            .set_display_name("Add NLP embeddings and cluster")
            .set_caching_options(True)
        )

        cluster_term_table_basic_post_processing = (
            components.alter_basic_cluster_term_table(source_table=cluster_table)
            .after(create_cluster_terms_op)
            .set_display_name("Altering table descriptions")
            .set_caching_options(True)
        )

        aggregate_cluster_op = (
            components.aggregate_clusters_basic(
                source_table=create_cluster_terms_op.outputs["term_cluster_table"],
                target_table=cluster_table_agg_basic,
                train_st=train_st,
                train_end=train_end,
                valid_st=valid_st,
                valid_end=valid_end,
            )
            .set_display_name("Basic clustering (unsupervised)")
            .set_caching_options(True)
        )

        time_series_dataset_create_op_high_level = (
            dataset_pipeline_components.TimeSeriesDatasetCreateOp(
                display_name=ds_display_name_cluster,
                bq_source=aggregate_cluster_op.outputs["term_cluster_agg_table"],
                project=vertex_project,
                location=location,
            )
            .set_display_name("Create basic cluster Vertex dataset")
            .set_caching_options(True)
        )

        cluster_forecasting_op = (
            training_pipeline_components.AutoMLForecastingTrainingJobRunOp(
                display_name="train-cluster-forecast-futurama",
                model_display_name="cluster-forecast-futurama",
                dataset=time_series_dataset_create_op_high_level.outputs["dataset"],
                context_window=context_window,
                forecast_horizon=forecast_horizon,
                budget_milli_node_hours=budget_milli_node_hours_cluster,
                project=vertex_project,
                location=location,
                export_evaluated_data_items=True,
                export_evaluated_data_items_override_destination=True,
                target_column="score",
                time_column="date",
                time_series_identifier_column="topic_id",
                time_series_attribute_columns=list(_CLUSTER_EMBEDDING_COLUMNS),
                unavailable_at_forecast_columns=["score"],
                available_at_forecast_columns=["date"],
                data_granularity_unit="week",
                data_granularity_count=1,
                predefined_split_column_name="split_col",
                optimization_objective="minimize-rmse",
                column_transformations=components.COLUMN_TRANSFORMS_CLUSTER,
                export_evaluated_data_items_bigquery_destination_uri=target_cluster_forecast_table_basic,  # must be format:``bq://<project_id>:<dataset_id>:<table>``
            )
            .set_display_name("Forecast basic clusters")
            .set_caching_options(True)
        )

        # The partitioned table is built from the forecast job's exported
        # table, so it is ordered explicitly after that job.
        cluster_forecast_fix_table_op = (
            components.create_partitioned_forecast_table(
                source_table=target_cluster_forecast_table_basic,
                target_table=target_cluster_forecast_table_basic_partitioned,
            )
            .after(cluster_forecasting_op)
            .set_display_name("Creating final partitioned table")
            .set_caching_options(True)
        )

        cluster_forecast_table_post_process = (
            components.alter_basic_cluster_forecast_table(
                source_table=target_cluster_forecast_table_basic_partitioned
            )
            .after(cluster_forecast_fix_table_op)
            .set_display_name("Adding table descriptions")
            .set_caching_options(True)
        )

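The cluster-level part of the pipeline branches on whether the label table exists:

* `with kfp.dsl.Condition(sct_exists_task.output == "True"):` - labeled data exists: train the classification model, auto cluster each category and forecast the category clusters
* `with kfp.dsl.Condition(sct_exists_task.output == "False"):` - no labeled data: train the basic k-means model and forecast the basic clusters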


## TODO - to get explanations
Drop predictions on train, use `google_cloud_pipeline_components.aiplatform.ModelBatchPredictOp`

In [16]:
# Compile the `pipeline` function into the job template that is submitted
# further down (`template_path="trendspotting.json"`).
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(pipeline_func=pipeline, package_path="trendspotting.json")

### Set parameters for pipeline here

In [19]:
! echo Y | gcloud config set project $PROJECT_ID

Updated property [core/project].


### Run the pipeline
Follow the link to see the execution

In [20]:
# Runtime values for the parameters declared by `pipeline`.
# Only declared parameters may appear here: the former "drop_embed_target",
# "k_means_name" and "override" entries are not in the pipeline signature and
# the Vertex SDK rejects parameter names that the compiled template does not
# define, so they have been removed.
PIPELINE_PARAMETERS = {
    "subcat_id": CATEGORY_ID,
    "vertex_project": PROJECT_ID,
    "location": LOCATION,
    "version": VERSION,
    "label_table": f"{PROJECT_ID}.trends_pipeline.labels_jw_pl_{VERSION}",
    "scored_classification_table": f"{PROJECT_ID}.trends_pipeline.{SRC_TABLE_ID}_ETL_classified_terms_bqml_aml_pl_{VERSION}",
    "fix_embed_target": f"{PROJECT_ID}.trends_pipeline.{SRC_TABLE_ID}_ETL_futurama_weekly_embed_aml_pl_{VERSION}",
    "classification_train_table": f"{PROJECT_ID}.trends_pipeline.{SRC_TABLE_ID}_ETL_labeled_distinct_training_jw_pl_{VERSION}",
    "classification_model_name": f"trends_pipeline.{SRC_TABLE_ID}_bqml_distinct_pl_{VERSION}",
    "classification_model_budget": BUDGET_HOURS_CLASSIFICATION,
    "auto_min_cluster": 2,
    "auto_max_cluster": 9,
    "auto_cluster_train_table": f"{PROJECT_ID}.trends_pipeline.{SRC_TABLE_ID}_cat_clus_train_{VERSION}",
    "auto_cluster_target_table": f"{PROJECT_ID}.trends_results.{SRC_TABLE_ID}_categoryclusters_{VERSION}",
    "label_list": categories,
    "train_st": TRAIN_ST,
    "train_end": TRAIN_END,
    "valid_st": VALID_ST,
    "valid_end": VALID_END,
    "predict_on_dt": PREDICT_ON_DT,
    "six_month_dt": SIX_MONTH_DT,
    "context_window": HISTORY_WINDOW_n,
    "forecast_horizon": FORECAST_HORIZON,
    "budget_milli_node_hours": BUDGET_MILLI_NODE_HOURS,
    "ds_display_name_terms": f"{SRC_TABLE_ID}-futurama-term-forecasts-{VERSION}",
    "ds_display_name_cluster": f"{SRC_TABLE_ID}-futurama-clusters-{VERSION}",
    "n_clusters": N_CLUSTERS,
    "top_n_results": TOP_N_RESULTS,
    "cluster_table_agg": f"{PROJECT_ID}.trends_pipeline.{SRC_TABLE_ID}_ETL_futurama_weekly_embed_cluster_agg_{N_CLUSTERS}_{VERSION}",
    "target_cluster_forecast_table_basic": f"{PROJECT_ID}.trends_pipeline.{SRC_TABLE_ID}_predictions_cluster_basic_{N_CLUSTERS}_{VERSION}",
    "target_cluster_forecast_table_basic_partitioned": f"{PROJECT_ID}.trends_results.{SRC_TABLE_ID}_predictions_cluster_basic_{N_CLUSTERS}_{VERSION}",
    "cluster_table": f"{PROJECT_ID}.trends_results.{SRC_TABLE_ID}_categoryclusters_basic_{N_CLUSTERS}_{VERSION}",
    "cluster_table_agg_basic": f"{PROJECT_ID}.trends_pipeline.{SRC_TABLE_ID}_ETL_futurama_weekly_embed_cluster_agg_basic_{N_CLUSTERS}_{VERSION}",
    "source_table": SRC_TABLE,
    "target_term_forecast_table": f"{PROJECT_ID}.trends_pipeline.{SRC_TABLE_ID}_ETL_predict_{VERSION}",
    "target_cluster_forecast_table": f"{PROJECT_ID}.trends_results.{SRC_TABLE_ID}_predictions_{VERSION}",
    "top_movers_target_table": f"{PROJECT_ID}.trends_results.{SRC_TABLE_ID}_topmovers_{VERSION}",
    "budget_milli_node_hours_cluster": BUDGET_MILLI_NODE_HOURS_CLUSTER,
    "model_name": MODEL_NAME,
    "sustained_riser_table": f"{PROJECT_ID}.trends_results.{SRC_TABLE_ID}_sustained_risers_{VERSION}",
}

# Create the Vertex pipeline job from the compiled template.
# NOTE(review): `enable_caching=False` is a job-level setting while every step
# in `pipeline` calls `.set_caching_options(True)` - confirm which one is
# intended to win.
job = aiplatform.PipelineJob(
    display_name=f'trendspotting_{PIPELINE_PARAMETERS["subcat_id"]}_{VERSION}',
    template_path="trendspotting.json",
    pipeline_root=PIPELINES_FILEPATH,
    parameter_values=PIPELINE_PARAMETERS,
    project=PROJECT_ID,
    location=LOCATION,
    enable_caching=False,
)

# Submit the job; the console link for the run is printed in the cell output.
job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/939655404703/locations/us-central1/pipelineJobs/cuisines-10889-thailand-2764-trendspotting-pipeline-v7-1-20240221132548
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/939655404703/locations/us-central1/pipelineJobs/cuisines-10889-thailand-2764-trendspotting-pipeline-v7-1-20240221132548')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/cuisines-10889-thailand-2764-trendspotting-pipeline-v7-1-20240221132548?project=939655404703
