# Import libraries

In [None]:
import os
import sys

from google.cloud import aiplatform
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

# Set up variables

In [None]:
# GCP project and Vertex AI region used by the SDK and by the pipeline run.
PROJECT_ID = "airesearch-1409"
REGION = "europe-west4"

# Prebuilt PyTorch GPU image; also the base image of the `train` component.
IMAGE_URI = "europe-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-12:latest"

# GCS prefix that receives the built wheel and doubles as the pipeline root.
BUCKET_NAME = "gs://attributes_models/base_model"
PIPELINE_ROOT = BUCKET_NAME

In [None]:
# The %%bash cell runs in a separate shell process, so the values it needs
# ($BUCKET_NAME and the current interpreter path) are passed via the environment.
os.environ.update(
    BUCKET_NAME=BUCKET_NAME,
    SYS_EXE_PY_CONDA=sys.executable,
)

# Build the package and upload it to GCS

In [None]:
%%bash
# -e: stop on the first failing command; -u: fail loudly if BUCKET_NAME or
# SYS_EXE_PY_CONDA was not exported by the previous cell.
set -eu
# Presumably the package's build config lives one directory above this
# notebook — confirm if the notebook moves.
cd ../
# Quoted so interpreter/bucket paths containing spaces are passed intact.
"$SYS_EXE_PY_CONDA" -m build
# NOTE(review): this uploads every wheel in dist/, including stale versions
# from earlier builds; the train component only uses the 0.0.1 wheel by name.
gsutil cp ./dist/*.whl "$BUCKET_NAME"

# Components

In [None]:
@component(packages_to_install=["pandas"])
def create_castors(data_root: str) -> str:
    """Create the castors CSV index of all JPEG images under ``data_root``.

    Every ``*.jpg`` below ``<data_root>/images`` (recursively) becomes one row
    with its path relative to ``<data_root>/images`` and the integer parsed
    from its file name. The CSV is written to
    ``<data_root>/annotations/castors.csv``; if that file already exists it is
    left untouched and reused.

    Args:
        data_root: Directory containing an ``images/`` tree; ``annotations/``
            is created under it if missing.

    Returns:
        Path of the castors CSV file.
    """
    # Imports stay inside the function: KFP lightweight components ship only
    # this function's source to the container.
    from glob import glob
    import os

    import pandas as pd

    path_ = os.path.join(data_root, "annotations", "castors.csv")
    if not os.path.isfile(path_):
        images_root = os.path.join(data_root, "images")
        # sorted() makes the CSV row order reproducible across runs.
        file_list = sorted(
            glob(os.path.join(images_root, "**", "*.jpg"), recursive=True)
        )
        # File names are expected to be "<castor>.jpg"; int() raises otherwise.
        castors = [int(os.path.splitext(os.path.basename(p))[0]) for p in file_list]
        # Paths relative to the images root, derived from data_root rather
        # than a hard-coded "/gcs/hm_images/images/" prefix length.
        path_list = [os.path.relpath(p, images_root) for p in file_list]

        out = pd.DataFrame(data={"path": path_list, "castor": castors})

        os.makedirs(os.path.dirname(path_), exist_ok=True)
        out.to_csv(path_, index=False)

    return path_

In [None]:
@component(packages_to_install=["pandas", "pyarrow", "scikit-learn"])
def create_datasets(data_root: str, castors_path: str, out_path: str) -> str:
    """Create the training and test CSV files for the attributes model.

    Reads the product/article datamart and the PIM attribute table from
    ``data_root``, expands every attribute column into indicator columns,
    joins them to the image index in ``castors_path`` on ``castor`` and splits
    the rows 90/10 into train/test, grouped by ``castor`` so that no castor
    appears in both splits.

    ``train.csv`` and ``test.csv`` are written to ``out_path``. If both files
    already exist the computation is skipped and the existing files are kept.

    Args:
        data_root: Directory containing ``dma/product_article_datamart.parquet``
            and ``dim/dim_pim.parquet``.
        castors_path: CSV with ``path`` and ``castor`` columns, as written by
            ``create_castors``.
        out_path: Directory that receives ``train.csv`` and ``test.csv``.

    Returns:
        ``out_path``, the directory holding the generated CSV files.

    Raises:
        ValueError: If no image in ``castors_path`` matches a castor that has
            attributes, leaving nothing to split.
    """
    import json
    import os

    import pandas as pd
    from sklearn.model_selection import GroupShuffleSplit

    id_cols = ["product_code", "article_code"]
    attr_cols = [
        "product_age_group",
        "product_waist_rise",
        "product_sleeve_length",
        "product_garment_length",
        "product_fit",
        "product_sleeve_style",
        "product_neck_line_style",
        "product_collar_style",
        "article_presentation_color_group",
    ]

    train_path = os.path.join(out_path, "train.csv")
    test_path = os.path.join(out_path, "test.csv")

    # Skip only when the exact files written below are already present.
    if not (os.path.isfile(train_path) and os.path.isfile(test_path)):
        padma = pd.read_parquet(
            os.path.join(data_root, "dma/product_article_datamart.parquet"),
            columns=[*id_cols, "castor"],
        )
        pim = pd.read_parquet(
            os.path.join(data_root, "dim/dim_pim.parquet"),
            columns=id_cols + attr_cols,
        )
        castors = pd.read_csv(castors_path)

        # Clean data tables
        padma = padma.drop_duplicates()
        padma["castor"] = padma["castor"].astype(int)
        pim = pim.dropna(axis=0, subset=["article_code"]).drop_duplicates()

        def _decode(value):
            # Strings containing "[" are parsed as JSON lists so multi-valued
            # attributes can be exploded; anything else (None, NaN, plain
            # strings) passes through unchanged.
            if isinstance(value, str) and "[" in value:
                return json.loads(value)
            return value

        # One indicator frame per attribute: explode multi-valued cells to one
        # row per value, then fold back onto the original row with max() so a
        # row is flagged for every value it holds. groupby(level=0) works
        # whatever the index is named (reset_index + "index" did not).
        # NOTE(review): a value shared by two attributes yields duplicate
        # column names; prefix=col would avoid that but changes the CSV schema.
        indicators = [
            pd.get_dummies(pim[col].map(_decode).explode()).groupby(level=0).max()
            for col in attr_cols
        ]
        features = pd.concat([pim[id_cols], *indicators], axis=1)

        # Attach each article's castor; articles without one are dropped.
        data = features.merge(padma, on=id_cols, how="left").dropna()
        data = data.drop(columns=id_cols)
        # The left merge makes castor float when NaNs appear; restore the
        # integer key so it matches the castors CSV.
        data["castor"] = data["castor"].astype(int)

        # One row per image whose castor has attributes.
        dataset = castors.merge(data, on="castor", how="inner")
        if dataset.empty:
            raise ValueError(
                f"No images in {castors_path} matched a castor with attributes; "
                "cannot build train/test splits."
            )

        # Group by castor so all images of one castor land in the same split.
        gss = GroupShuffleSplit(n_splits=1, train_size=0.9, random_state=42)
        train_idxs, test_idxs = next(
            gss.split(X=dataset["path"], groups=dataset["castor"])
        )
        dataset = dataset.drop(columns=["castor"])
        train_df = dataset.iloc[train_idxs]
        test_df = dataset.iloc[test_idxs]

        # Write output files
        os.makedirs(out_path, exist_ok=True)
        train_df.to_csv(train_path, index=False)
        test_df.to_csv(test_path, index=False)

    return out_path

In [None]:
@component(base_image=IMAGE_URI)
def train(project_id: str, region: str, bucket_name: str, data_dir: str) -> None:
    """Launch the attributes-model training job on Vertex AI.

    Runs module ``trainer.train`` from the wheel
    ``<bucket_name>/product_attributes-0.0.1-py3-none-any.whl`` as a
    ``CustomPythonPackageTrainingJob``. The job uses 2 replicas of
    ``n1-standard-4``, each with one NVIDIA T4, runs as a dedicated service
    account and logs to a fixed Vertex AI TensorBoard instance.

    Args:
        project_id: GCP project passed to ``aiplatform.init``.
        region: Vertex AI location for the job and the TensorBoard resource.
        bucket_name: ``gs://`` prefix holding the wheel. It is also the SDK
            staging bucket and the job's base output directory.
        data_dir: Output of the dataset step. It is not read here; consuming
            it only orders this step after dataset creation.
            NOTE(review): the trainer never receives it via ARGS — confirm
            the trainer finds its CSVs on its own.
    """
    from google.cloud import aiplatform

    # Constants live inside the function because KFP ships only this
    # function's source to the component container.
    # NOTE(review): project and TensorBoard IDs are hard-coded and presumably
    # belong to project_id's project — confirm before reusing elsewhere.
    SERVICE_ACCOUNT = "vertex-ai-training@airesearch-1409.iam.gserviceaccount.com"
    IMAGE_URI = "europe-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-12:latest"
    TB_RESOURCE_NAME = f"projects/184243724142/locations/{region}/tensorboards/4596222486894346240"

    DISPLAY_NAME = "attributes_model"
    MODULE_NAME = "trainer.train"
    GCS_OUTPUT_URI_PREFIX = bucket_name
    # Must match the wheel name produced by the build cell.
    PACKAGE_NAME = "product_attributes-0.0.1-py3-none-any.whl"

    REPLICA_COUNT = 2
    MACHINE_TYPE = "n1-standard-4"
    ACCELERATOR_COUNT = 1
    ACCELERATOR_TYPE = "NVIDIA_TESLA_T4"
    ARGS = ["--batch_size", "128", "--num_epochs", "2"]

    aiplatform.init(project=project_id, staging_bucket=bucket_name, location=region)
    tensorboard = aiplatform.Tensorboard(TB_RESOURCE_NAME)

    custom_training_job = aiplatform.CustomPythonPackageTrainingJob(
        display_name=DISPLAY_NAME,
        python_package_gcs_uri=f"{bucket_name}/{PACKAGE_NAME}",
        python_module_name=MODULE_NAME,
        container_uri=IMAGE_URI,
    )

    # run() only returns a Model when a serving container is configured, and
    # none is here, so its return value is not kept.
    custom_training_job.run(
        args=ARGS,
        base_output_dir=GCS_OUTPUT_URI_PREFIX,
        replica_count=REPLICA_COUNT,
        machine_type=MACHINE_TYPE,
        accelerator_count=ACCELERATOR_COUNT,
        accelerator_type=ACCELERATOR_TYPE,
        tensorboard=tensorboard.resource_name,
        service_account=SERVICE_ACCOUNT,
        enable_web_access=True,
    )

# Pipeline

In [None]:
@dsl.pipeline(name="attributes-model-pipeline", pipeline_root=PIPELINE_ROOT)
def pipeline(project_id: str, region: str, bucket_name: str):
    """Attribute prediction training pipeline.

    Steps: build the image index (``create_castors``), build the train/test
    CSVs (``create_datasets``), then launch training (``train``).

    Args:
        project_id: ID of the GCP project.
        region: GCP region for Vertex AI resources.
        bucket_name: ``gs://`` prefix holding the trainer wheel. It is also
            the training job's staging bucket and output directory.
    """
    # Components are called with keyword arguments: KFP SDK v2 rejects
    # positional arguments, and keywords work on v1.8 as well.
    # Paths under /gcs/ are read through the Cloud Storage FUSE mount.
    castor_task = create_castors(data_root="/gcs/hm_images")
    dataset_task = create_datasets(
        data_root="/gcs/hdl_tables",
        castors_path=castor_task.output,
        out_path="/gcs/hm_images/annotations",
    )
    # Passing dataset_task.output makes training wait for the dataset step.
    train(
        project_id=project_id,
        region=region,
        bucket_name=bucket_name,
        data_dir=dataset_task.output,
    )

In [None]:
# Serialise the pipeline definition to the JSON spec used as template_path
# when the PipelineJob is created.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="attributes_model_pipeline.json",
)

In [None]:
aiplatform.init(project=PROJECT_ID, location=REGION)

# Runtime values for the pipeline's declared parameters.
run_parameters = {
    "project_id": PROJECT_ID,
    "region": REGION,
    "bucket_name": BUCKET_NAME,
}

job = aiplatform.PipelineJob(
    display_name="attributes-model-pipeline",
    template_path="attributes_model_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=run_parameters,
)

# submit() creates the run without waiting for it to finish (unlike run());
# follow progress in the Vertex AI console.
job.submit()