# Preprocessing with NVTabular

In [None]:
# !pip install google-cloud-pipeline-components 
# !pip install google-cloud-bigquery-storage 
# !pip install kfp

In [1]:
import os
import json
from datetime import datetime
from time import time
import pandas as pd
# disable INFO and DEBUG logging everywhere
import logging
import time
from pprint import pprint

logging.disable(logging.WARNING)

# import nvtabular as nvt
# from nvtabular.ops import (
#     Categorify,
#     TagAsUserID,
#     TagAsItemID,
#     TagAsItemFeatures,
#     TagAsUserFeatures,
#     AddMetadata,
#     ListSlice
# )
# import nvtabular.ops as ops

# from merlin.schema.tags import Tags

# import merlin.models.tf as mm
# from merlin.io.dataset import Dataset
# import tensorflow as tf

from google.cloud import aiplatform as vertex_ai
from kfp.v2 import compiler

# for running this example on CPU, comment out the line below
# os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"

### Load config from setup notebook

Set the constants used in this notebook and load the config settings from the 00_environment_setup.ipynb notebook.

In [2]:
GCP_PROJECTS = !gcloud config get-value project
PROJECT_ID = GCP_PROJECTS[0]
# BUCKET_NAME = f"{PROJECT_ID}-merlintowers"
# config = !gsutil cat gs://{BUCKET_NAME}/config/notebook_env.py
# print(config.n)
# exec(config.n)

In [3]:
VERTEX_SA = '934903580331-compute@developer.gserviceaccount.com'

REGION = "us-central1"

# Define preprocess pipeline

In [4]:
# # Bucket definitions
# VERSION = 'v1-subset'
# APP = 'spotify'
# MODEL_DISPLAY_NAME = f'nvt-prep-last5-{VERSION}'
# WORKSPACE = f'gs://{BUCKET_destin}/{MODEL_DISPLAY_NAME}'

# # Docker definitions
# IMAGE_NAME = 'nvt-preprocessing'
# IMAGE_URI = f'gcr.io/{PROJECT_ID}/{IMAGE_NAME}'
# DOCKERNAME = f'nvtabular-160' # 150

# # Pipeline definitions
# PREPROCESS_PARQUET_PIPELINE_NAME = f'nvt-parquet-pipeline-{VERSION}'
# PREPROCESS_PARQUET_PIPELINE_ROOT = os.path.join(WORKSPACE, PREPROCESS_PARQUET_PIPELINE_NAME)

# print(f"VERSION: {VERSION}")
# print(f"APP: {APP}")
# print(f"MODEL_DISPLAY_NAME: {MODEL_DISPLAY_NAME}")
# print(f"WORKSPACE: {WORKSPACE}")
# print(f"IMAGE_NAME: {IMAGE_NAME}")
# print(f"IMAGE_URI: {IMAGE_URI}")
# print(f"DOCKERNAME: {DOCKERNAME}")
# print(f"PREPROCESS_PARQUET_PIPELINE_NAME: {PREPROCESS_PARQUET_PIPELINE_NAME}")
# print(f"PREPROCESS_PARQUET_PIPELINE_ROOT: {PREPROCESS_PARQUET_PIPELINE_ROOT}")

In [5]:
!pwd

/home/jupyter/merlin-on-vertex-ORIGINAL/merlin-on-vertex


In [6]:
#list the current work dir
# os.chdir('/home/jupyter/spotify-merlin')
os.chdir('/home/jupyter/merlin-on-vertex-ORIGINAL/merlin-on-vertex')
os.getcwd()

'/home/jupyter/merlin-on-vertex-ORIGINAL/merlin-on-vertex'

In [7]:
REPO_DOCKER_PATH_PREFIX = 'src'
PREPROC_SUB_DIR = 'preprocessor'
PIPELINE_SUB_DIR = 'process_pipes'

In [8]:
# Make the training subfolder
! rm -rf {REPO_DOCKER_PATH_PREFIX}/{PREPROC_SUB_DIR}
! mkdir {REPO_DOCKER_PATH_PREFIX}/{PREPROC_SUB_DIR}
! touch {REPO_DOCKER_PATH_PREFIX}/{PREPROC_SUB_DIR}/__init__.py

! rm -rf {REPO_DOCKER_PATH_PREFIX}/{PIPELINE_SUB_DIR}
! mkdir {REPO_DOCKER_PATH_PREFIX}/{PIPELINE_SUB_DIR}

## preprocessing task

In [9]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PREPROC_SUB_DIR}/preprocess_task.py

import argparse
import logging
# logging.disable(logging.WARNING)
import os
import sys
import time
import numpy as np
from typing import Dict, List, Union

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import fsspec

import nvtabular as nvt
from merlin.io.shuffle import Shuffle
from nvtabular.ops import (
    Categorify,
    TagAsUserID,
    TagAsItemID,
    TagAsItemFeatures,
    TagAsUserFeatures,
    AddMetadata,
    ListSlice
)
import nvtabular.ops as ops
from nvtabular.utils import device_mem_size

from merlin.schema.tags import Tags
# import merlin.models.tf as mm
from merlin.io.dataset import Dataset

import tensorflow as tf

# for running this example on CPU, comment out the line below
os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"

# =============================================
# featutres
# =============================================
item_id = ["track_uri_can"] >> Categorify(dtype="int32") >> TagAsItemID() 
playlist_id = ["pid"] >> Categorify(dtype="int32") >> TagAsUserID() 


item_features_cat = [
    'artist_name_can',
    'track_name_can',
    'artist_genres_can',
]

item_features_cont = [
    'duration_ms_can',
    'track_pop_can',
    'artist_pop_can',
    'artist_followers_can',
]

playlist_features_cat = [
    'description_pl',
    'name',
    'collaborative',
]

playlist_features_cont = [
    'duration_ms_seed_pl',
    'n_songs_pl',
    'num_artists_pl',
    'num_albums_pl',
]

seq_feats_cat = [
    'artist_name_pl',
    'track_uri_pl',
    'track_name_pl',
    'album_name_pl',
    'artist_genres_pl',
]

CAT = playlist_features_cat + item_features_cat
CONT = item_features_cont + playlist_features_cont

# =============================================
# create cluster
# =============================================
def create_cluster(
    n_workers,
    device_limit_frac,
    device_pool_frac,
    memory_limit
):
    """Create a Dask cluster to apply the transformations steps to the Dataset."""
    device_size = device_mem_size()
    device_limit = int(device_limit_frac * device_size)
    device_pool_size = int(device_pool_frac * device_size)
    rmm_pool_size = (device_pool_size // 256) * 256

    cluster = LocalCUDACluster(
        n_workers=n_workers,
        device_memory_limit=device_limit,
        rmm_pool_size=rmm_pool_size,
        memory_limit=memory_limit
    )

    return Client(cluster)

# =============================================
#            Create & Save dataset
# =============================================

def create_parquet_nvt_dataset(
    # data_path,
    frac_size,
    data_prefix,
    bucket_name,
    file_pattern,
):
    """Create a nvt.Dataset definition for the parquet files."""
    
    # BUCKET = 'gs://spotify-builtin-2t'
    # DATA_PATH = f"{BUCKET}/{data_prefix}/0000000000**.snappy.parquet"
    DATA_PATH = f"gs://{bucket_name}/{data_prefix}/{file_pattern}" #0000000000**.snappy.parquet"
    logging.info(f"DATA_PATH: {DATA_PATH}")
    
    fs = fsspec.filesystem('gs')
    
    file_list = fs.glob(DATA_PATH)
        # os.path.join(data_path, '*.parquet')
    # )

    if not file_list:
        raise FileNotFoundError('Parquet file(s) not found')

    file_list = [os.path.join('gs://', i) for i in file_list]
    
    logging.info(f"Number of files: {len(file_list)}")

    # return nvt.Dataset(f"{bucket_name}/{data_prefix}/0000000000**.snappy.parquet", part_mem_fraction=frac_size)
    return nvt.Dataset(
        file_list,
        engine='parquet',
        part_mem_fraction=frac_size
  )

def save_dataset(
    dataset,
    output_path,
    output_files,
    # categorical_cols,
    # continuous_cols,
    shuffle=None,
):
    """Save dataset to parquet files to path."""
    categorical_cols=CAT
    continuous_cols=CONT

    dict_dtypes = {}
    for col in categorical_cols:
        dict_dtypes[col] = np.int32

    for col in continuous_cols:
        dict_dtypes[col] = np.float64

    dataset.to_parquet(
        output_path=output_path,
        shuffle=shuffle,
        output_files=output_files,
        dtypes=dict_dtypes,
        cats=categorical_cols,
        conts=continuous_cols,
    )

# =============================================
#            Workflow
# =============================================
def create_nvt_workflow():
    '''
    Create a nvt.Workflow definition with transformation all the steps
    '''
    item_id = ["track_uri_can"] >> Categorify(dtype="int32") >> TagAsItemID() 
    playlist_id = ["pid"] >> Categorify(dtype="int32") >> TagAsUserID() 


    item_features_cat = ['artist_name_can',
            'track_name_can',
            'artist_genres_can',
        ]

    item_features_cont = [
            'duration_ms_can',
            'track_pop_can',
            'artist_pop_can',
            'artist_followers_can',
        ]

    playlist_features_cat = [
            'description_pl',
            'name',
            'collaborative',
        ]

    playlist_features_cont = [
            'duration_ms_seed_pl',
            'n_songs_pl',
            'num_artists_pl',
            'num_albums_pl',
        ]

    seq_feats_cat = [
            'artist_name_pl',
            'track_uri_pl',
            'track_name_pl',
            'album_name_pl',
            'artist_genres_pl',
        ]

    CAT = playlist_features_cat + item_features_cat
    CONT = item_features_cont + playlist_features_cont

    item_feature_cat_node = item_features_cat >> nvt.ops.FillMissing()>> Categorify(dtype="int32") >> TagAsItemFeatures()

    item_feature_cont_node =  item_features_cont >> nvt.ops.FillMissing() >>  nvt.ops.Normalize() >> TagAsItemFeatures()

    playlist_feature_cat_node = playlist_features_cat >> nvt.ops.FillMissing() >> Categorify(dtype="int32") >> TagAsUserFeatures() 

    playlist_feature_cont_node = playlist_features_cont >> nvt.ops.FillMissing() >>  nvt.ops.Normalize() >> TagAsUserFeatures()

    playlist_feature_cat_seq_node = seq_feats_cat >> nvt.ops.FillMissing() >> Categorify(dtype="int32") >> TagAsUserFeatures()
    
    # define a workflow
    output = playlist_id + item_id \
    + item_feature_cat_node \
    + item_feature_cont_node \
    + playlist_feature_cat_node \
    + playlist_feature_cont_node \
    + playlist_feature_cat_seq_node 

    workflow = nvt.Workflow(output)
    
    return workflow

# =============================================
#            Create Parquet Dataset 
# =============================================

def create_parquet_dataset_definition(
    # data_paths,
    # recursive,
    # col_dtypes,
    frac_size,
    bucket_name,
    data_prefix,
    file_pattern,
    # sep='\t'
):
    from google.cloud import storage
    storage_client = storage.Client()
    
    DATASET_DEFINITION = f"gs://{bucket_name}/{data_prefix}/{file_pattern}"  # 0000000000**.snappy.parquet"
    
    logging.info(f'DATASET_DEFINITION: {DATASET_DEFINITION}')
    
    fs = fsspec.filesystem('gs')
    file_list = fs.glob(DATASET_DEFINITION)

    if not file_list:
        raise FileNotFoundError('Parquet file(s) not found')

    file_list = [os.path.join('gs://', i) for i in file_list]
    logging.info(f"Number of files: {len(file_list)}")
    
    return nvt.Dataset(f"{DATASET_DEFINITION}", engine='parquet', part_mem_fraction=frac_size)


def convert_definition_to_parquet(
    output_path,
    dataset,
    output_files,
    shuffle=None
):
    """Convert Parquet files to parquet and write to GCS."""
    if shuffle == 'None':
        shuffle = None
    else:
        try:
            shuffle = getattr(Shuffle, shuffle)
        except:
            print('Shuffle method not available. Using default.')
            shuffle = None

    dataset.to_parquet(
        output_path,
        shuffle=shuffle,
        output_files=output_files
    )
    
# =============================================
#            Create nv-tabular definition
# =============================================
def main_convert(args):
    
    logging.info('Beginning main-convert from preprocess_task.py...')
    logging.info(f'args.output_path: {args.output_path}')
    
    logging.info('Creating cluster')
    client = create_cluster(
        args.n_workers,
        args.device_limit_frac,
        args.device_pool_frac,
        args.memory_limit
    )
    
    logging.info('Creating parquet dataset definition')
    dataset = create_parquet_dataset_definition(
        # data_paths=args.parq_data_path,
        # recursive=False,
        bucket_name=args.bucket_name,     # 'spotify-builtin-2t', # TODO: parameterize
        data_prefix=args.data_prefix,     # 'train', # TODO: JT check
        frac_size=args.frac_size,
        file_pattern=file_pattern,
    )

    logging.info('Converting definition to Parquet')
    convert_definition_to_parquet(
        args.output_path,
        dataset,
        args.output_files
    )
    
# =============================================
#            Analyse Dataset 
# =============================================
def main_analyze(args):
    
    logging.info('Beginning main-analyze from preprocess_task.py...')
    logging.info(f'args.bucket_name: {args.bucket_name}')
    
    logging.info('Creating cluster')
    client = create_cluster(
        args.n_workers,
        args.device_limit_frac,
        args.device_pool_frac,
        args.memory_limit
    )
    
    logging.info('Creating Parquet dataset')
    dataset = create_parquet_nvt_dataset(
        # data_dir=args.parquet_data_path,
        frac_size=args.frac_size,
        data_prefix='train_data_parquet', # TODO: JT check
        bucket_name=args.bucket_name,
        file_pattern=file_pattern #"0000000000**.snappy.parquet",
    )
  
    logging.info('Creating Workflow')
    # Create Workflow
    nvt_workflow = create_nvt_workflow()
  
    logging.info('Analyzing dataset')
    nvt_workflow = nvt_workflow.fit(dataset)

    logging.info('Saving Workflow')
    nvt_workflow.save(args.output_path)
    
# =============================================
#            Transform Dataset 
# =============================================
def main_transform(args):
    
    logging.info('Beginning main-transform from preprocess_task.py...')
    logging.info(f'args.bucket_name: {args.bucket_name}')
    
    client = create_cluster(
        args.n_workers,
        args.device_limit_frac,
        args.device_pool_frac,
        args.memory_limit,
    )

    # nvt_workflow = create_nvt_workflow()
    nvt_workflow = nvt.Workflow.load(args.workflow_path, client)

    # dataset = create_parquet_nvt_dataset(
    #     args.parquet_data_path, 
    #     frac_size=args.frac_size)
    
    dataset = create_parquet_nvt_dataset(
        # data_dir=args.parquet_data_path,
        frac_size=args.frac_size,
        data_prefix='train_data_parquet', # TODO: JT check
        bucket_name=args.bucket_name,
        file_pattern=file_pattern #"0000000000**.snappy.parquet",
    )

    logging.info('Transforming Dataset')
    transformed_dataset = nvt_workflow.transform(dataset)

    logging.info('Saving transformed dataset')
    save_dataset(
        transformed_dataset,
        output_path=args.output_path,
        output_files=args.output_files,
        # categorical_cols=CAT,
        # continuous_cols=CONT,
        shuffle=nvt.io.Shuffle.PER_PARTITION,
    )
    
# =============================================
#            args
# =============================================
def parse_args():
    """Parses command line arguments."""

    parser = argparse.ArgumentParser()
  
    parser.add_argument(
        '--task',
        type=str,
        required=False
    )
    parser.add_argument(
        '--bucket_name',
        type=str,
        required=False
    )
    parser.add_argument(
        '--parquet_data_path',
        type=str,
        required=False
    )
    parser.add_argument(
        '--parq_data_path',
        required=False,
        nargs='+'
    )
    parser.add_argument(
        '--output_path',
        type=str,
        required=False
    )
    parser.add_argument(
        '--output_files',
        type=int,
        required=False
    )
    parser.add_argument(
        '--workflow_path',
        type=str,
        required=False
    )
    parser.add_argument(
        '--n_workers',
        type=int,
        required=False
    )
    parser.add_argument(
        '--frac_size',
        type=float,
        required=False,
        default=0.10
    )
    parser.add_argument(
        '--memory_limit',
        type=int,
        required=False,
        default=100_000_000_000
    )
    parser.add_argument(
        '--device_limit_frac',
        type=float,
        required=False,
        default=0.60
    )
    parser.add_argument(
        '--device_pool_frac',
        type=float,
        required=False,
        default=0.90
    )

    return parser.parse_args()

if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s - %(message)s',
        level=logging.INFO, 
        datefmt='%d-%m-%y %H:%M:%S',
        stream=sys.stdout
    )

    parsed_args = parse_args()

    start_time = time.time()
    logging.info('Timing task')

    if parsed_args.task == 'transform':
        main_transform(parsed_args)
    elif parsed_args.task == 'analyze':
        main_analyze(parsed_args)
    elif parsed_args.task == 'convert':
        main_convert(parsed_args)

    end_time = time.time()
    elapsed_time = end_time - start_time
    logging.info('Task completed. Elapsed time: %s', elapsed_time)

Writing src/preprocessor/preprocess_task.py


## pipe components

In [10]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINE_SUB_DIR}/pipe_components.py
"""KFP components."""

from typing import Optional
from . import config

from kfp.v2 import dsl
from kfp.v2.dsl import Artifact
from kfp.v2.dsl import Dataset
from kfp.v2.dsl import Input
from kfp.v2.dsl import Model
from kfp.v2.dsl import Output

# =============================================
#            convert_to_parquet_op
# =============================================
@dsl.component(
    base_image=config.NVT_IMAGE_URI,
    install_kfp_package=False
)
def convert_parquet_op(
    output_dataset: Output[Dataset],
    bucket_name: str,
    data_prefix: str,
    file_pattern: str,
    output_path_defined_dir: str,
    # data_dir_pattern: str,
    # data_paths: list,
    split: str,
    num_output_files: int,
    n_workers: int,
    shuffle: Optional[str] = None,
    recursive: Optional[bool] = False,
    device_limit_frac: Optional[float] = 0.6,
    device_pool_frac: Optional[float] = 0.9,
    frac_size: Optional[float] = 0.10,
    memory_limit: Optional[int] = 100_000_000_000
):
    '''
    Component to create NVTabular definition.
    
    Args:
    output_dataset: Output[Dataset]
      Output metadata with references to the converted CSV files in GCS
      and the split name.The path to the files are in GCS fuse format:
      /gcs/<bucket name>/path/to/file
    bucket: gcs bucket holding train & valid data
    data_path_prefix: file path to GCS blobl object (e.g., gs://...data/path/prefix.../blob.xxx)
    data_paths: list
    split: str
      Split name of the dataset. Example: train or valid
    shuffle: str
      How to shuffle the converted CSV, default to None. Options:
        PER_PARTITION
        PER_WORKER
        FULL
    device_limit_frac: Optional[float] = 0.6
    device_pool_frac: Optional[float] = 0.9
    frac_size: Optional[float] = 0.10
    memory_limit: Optional[int] = 100_000_000_000
    '''
    
    # =========================================================
    #            import packages
    # =========================================================
    import os
    import logging
    from google.cloud import storage
    
    storage_client = storage.Client()

    from preprocess_task import (
        create_cluster,
        create_parquet_dataset_definition,
        convert_definition_to_parquet,
        # get_criteo_col_dtypes,
    )
    
    os.environ["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"

    logging.info('Base path in %s', output_dataset.path)
    # =========================================================
    #            Define data paths
    # =========================================================
    logging.info(f'bucket_name: {bucket_name}')
    logging.info(f'data_prefix: {data_prefix}')
    
    # Write metadata
    output_dataset.metadata['split'] = split

    logging.info('Creating cluster')
    create_cluster(
        n_workers=n_workers,
        device_limit_frac=device_limit_frac,
        device_pool_frac=device_pool_frac,
        memory_limit=memory_limit
    )
    
    # logging.info(f'Creating dataset definition from: {data_path_prefix}')
    dataset = create_parquet_dataset_definition(
        bucket_name=bucket_name,
        data_prefix=data_prefix,
        frac_size=frac_size,
        file_pattern=file_pattern,
    )
    
    logging.info(f'Converting Definition to Parquet; {output_dataset.uri}')
    logging.info(f'Parquet Definition Output Path: ; {output_path_defined_dir}/{split}')
    convert_definition_to_parquet(
        output_path=f'{output_path_defined_dir}/{split}', # output_dataset.uri,
        dataset=dataset,
        output_files=num_output_files,
        shuffle=shuffle
    )
    
# =========================================================
#            analyze_dataset_op
# =========================================================
@dsl.component(
    base_image=config.NVT_IMAGE_URI,
    install_kfp_package=False
)
def analyze_dataset_op(
    parquet_dataset: Input[Dataset],
    workflow: Output[Artifact],
    output_path_defined_dir: str,
    output_path_analyzed_dir: str,
    n_workers: int,
    device_limit_frac: Optional[float] = 0.6,
    device_pool_frac: Optional[float] = 0.9,
    frac_size: Optional[float] = 0.10,
    memory_limit: Optional[int] = 100_000_000_000
):
    '''
    Component to generate statistics from the dataset.
    
    Args:
    parquet_dataset: List of strings
      Input metadata with references to the train and valid converted
      datasets in GCS and the split name.
    workflow: Output[Artifact]
      Output metadata with the path to the fitted workflow artifacts
      (statistics).
    device_limit_frac: Optional[float] = 0.6
    device_pool_frac: Optional[float] = 0.9
    frac_size: Optional[float] = 0.10
    '''
    import logging
    import nvtabular as nvt
  
    from preprocess_task import (
        create_cluster,
        create_nvt_workflow,
    )

    logging.basicConfig(level=logging.INFO)

    create_cluster(
      n_workers=n_workers,
      device_limit_frac=device_limit_frac,
      device_pool_frac=device_pool_frac,
      memory_limit=memory_limit
    )
    
    # logging.info(f'Creating Parquet dataset:{parquet_dataset.uri}')
    logging.info(f'Creating Parquet dataset output_path_defined_dir: {output_path_defined_dir}/train')
    dataset = nvt.Dataset(
        path_or_source=f'{output_path_defined_dir}/train', # TODO: JT Check "train"    # parquet_dataset.uri,
        engine='parquet',
        part_mem_fraction=frac_size,
        suffix='.parquet'
    )

    logging.info('Creating Workflow')
    # Create Workflow
    nvt_workflow = create_nvt_workflow()

    logging.info('Analyzing dataset')
    nvt_workflow = nvt_workflow.fit(dataset)

    logging.info('Saving Workflow')
    nvt_workflow.save(f'{output_path_analyzed_dir}') # workflow.path)
    
# =========================================================
#            transform_dataset_op
# =========================================================
@dsl.component(
    base_image=config.NVT_IMAGE_URI,
    install_kfp_package=False
)
def transform_dataset_op(
    workflow: Input[Artifact],
    parquet_dataset: Input[Dataset],
    transformed_dataset: Output[Dataset],
    output_path_defined_dir: str,
    output_path_transformed_dir: str,
    output_path_analyzed_dir: str,
    version: str,
    bucket_data_src: str,
    bucket_data_output: str,
    app: str,
    split: str,
    num_output_files: int,
    n_workers: int,
    shuffle: str = None,
    device_limit_frac: float = 0.6,
    device_pool_frac: float = 0.9,
    frac_size: float = 0.10,
    memory_limit: int = 100_000_000_000
):
    """Component to transform a dataset according to the workflow definitions.
    Args:
        workflow: Input[Artifact]
        Input metadata with the path to the fitted_workflow
        parquet_dataset: Input[Dataset]
              Location of the converted dataset in GCS and split name
        transformed_dataset: Output[Dataset]
        Split name of the transformed dataset.
        shuffle: str
            How to shuffle the converted CSV, default to None. Options:
                PER_PARTITION
                PER_WORKER
                FULL
    device_limit_frac: float = 0.6
    device_pool_frac: float = 0.9
    frac_size: float = 0.10
    """
    
    import os
    import logging
    import nvtabular as nvt
    from merlin.schema import Tags

    from google.cloud import storage
    from google.cloud.storage.bucket import Bucket
    from google.cloud.storage.blob import Blob

    from preprocess_task import (
        create_cluster,
        save_dataset,
    )
    def _upload_blob_gcs(gcs_uri, source_file_name, destination_blob_name):
        """Uploads a file to GCS bucket"""
        client = storage.Client()
        blob = Blob.from_string(os.path.join(gcs_uri, destination_blob_name))
        blob.bucket._client = client
        blob.upload_from_filename(source_file_name)
    
    def _read_blob_gcs(bucket_name, source_blob_name, destination_filename):
        """Downloads a file from GCS to local directory"""
        client = storage.Client()
        bucket = client.get_bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        blob.download_to_filename(destination_filename)
        

    logging.basicConfig(level=logging.INFO)

    transformed_dataset.metadata['split'] = split
    
    logging.info('Creating cluster')
    create_cluster(
        n_workers=n_workers,
        device_limit_frac=device_limit_frac,
        device_pool_frac=device_pool_frac,
        memory_limit=memory_limit
    )

   # logging.info(f'Creating Parquet dataset:gs://{parquet_dataset.uri}')
    logging.info(f'Creating Parquet dataset:{output_path_defined_dir}/{split}')
    dataset = nvt.Dataset(
        path_or_source=f'{output_path_defined_dir}/{split}', #f'gs://{parquet_dataset.uri}',
        engine='parquet',
        part_mem_fraction=frac_size,
        suffix='.parquet'
    )
    
    logging.info('Loading Workflow')
    nvt_workflow = nvt.Workflow.load(f'{output_path_analyzed_dir}') # workflow.path)

    logging.info('Transforming Dataset')
    trans_dataset = nvt_workflow.transform(dataset)

    logging.info(f'transformed_dataset.uri: {transformed_dataset.uri}')
    logging.info(f'Saving transformed dataset: {output_path_transformed_dir}/{split}')
    save_dataset(
        dataset=trans_dataset,
        output_path=f'{output_path_transformed_dir}/{split}', # transformed_dataset.uri,
        output_files=num_output_files,
        shuffle=shuffle
    )
    logging.info(f'transformed_dataset saved!')
    logging.info(f'transformed_dataset.path: {transformed_dataset.path}')
    
    # =========================================================
    #        read and upload files
    # =========================================================
    '''
    nv-tabular creates a txt file with all `gs://` paths
    create a copy that replaces `gs://` with `/gcs/`
    '''
    logging.info('Generating file list for training...')
    
#     # get loca directory
#     # LOCAL_DIRECTORY = os.getcwd()
#     LOCAL_DIRECTORY = '/tmp/directory'
    
#     # _bucket_name='spotify-merlin-v1' # bucket_data_src
#     PREFIX = f'nvt-preprocessing-{app}-{version}/nvt-processed/{split}'
#     FILENAME = '_file_list.txt'
#     SOURCE_BLOB_NAME = f'{PREFIX}/{FILENAME}'
#     logging.info(f'SOURCE_BLOB_NAME: {SOURCE_BLOB_NAME}')
    
#     # LOCAL_DESTINATION_FILENAME = f'{LOCAL_DIRECTORY}/local_file_list.txt'
#     LOCAL_DESTINATION_FILENAME = 'local_file_list.txt'
#     logging.info(f'LOCAL_DESTINATION_FILENAME: {LOCAL_DESTINATION_FILENAME}')
    
#     _read_blob_gcs(
#         bucket_name=bucket_data_output,
#         source_blob_name=f'{SOURCE_BLOB_NAME}', 
#         destination_filename=LOCAL_DESTINATION_FILENAME
#     )
    
#     file_list = os.path.join(transformed_dataset.path, '_file_list.txt')
    
#     # write new '/gcs/' file
#     new_lines = []
#     with open(LOCAL_DESTINATION_FILENAME, 'r') as fp:
#         lines = fp.readlines()
#         new_lines.append(lines[0])
#         for line in lines[1:]:
#             new_lines.append(line.replace('gs://', '/gcs/'))

#     NEW_LOCAL_FILENAME = f'{LOCAL_DIRECTORY}/_gcs_file_list.txt'
#     logging.info(f'NEW_LOCAL_FILENAME: {NEW_LOCAL_FILENAME}')
    
#     with open(NEW_LOCAL_FILENAME, 'w') as fp:
#         fp.writelines(new_lines)
        
#     GCS_URI_DESTINATION = f'{output_path_transformed_dir}/{split}'
#     logging.info(f'GCS_URI_DESTINATION: {GCS_URI_DESTINATION}')
    
#     _upload_blob_gcs(
#         gcs_uri=GCS_URI_DESTINATION, 
#         source_file_name=NEW_LOCAL_FILENAME, 
#         destination_blob_name='_gcs_file_list.txt'
#     )
# logging.info(f'List of /gcs/ file paths uploaded to {GCS_URI_DESTINATION}/_gcs_file_list.txt')

#     file_list_name = '_file_list.txt'
#     file_list_uri = f'{output_path_transformed_dir}/{split}/{file_list_name}'
#     logging.info(f'file_list_uri : {file_list_uri}')

#     new_lines = []
#     with open(file_list_uri, 'r') as fp:
#         lines = fp.readlines()
#         new_lines.append(lines[0])
#         for line in lines[1:]:
#             new_lines.append(line.replace('gs://', '/gcs/'))

#     gcs_file_list_name = '_gcs_file_list.txt'
#     gcs_file_list_uri = f'{output_path_transformed_dir}/{split}/{gcs_file_list_name}'
#     logging.info(f'gcs_file_list_uri : {gcs_file_list_uri}')
    
#     with open(gcs_file_list_uri, 'w') as fp:
#         fp.writelines(new_lines)
    
#     logging.info(f'List of /gcs/ file paths uploaded to {gcs_file_list}')
    
    # =========================================================
    #        Saving cardinalities
    # =========================================================
    logging.info('Saving cardinalities')
    
    cols_schemas = nvt_workflow.output_schema.select_by_tag(Tags.CATEGORICAL)
    cols_names = cols_schemas.column_names

    cards = []
    for c in cols_names:
        col = cols_schemas.get(c)
        cards.append(col.properties['embedding_sizes']['cardinality'])

    transformed_dataset.metadata['cardinalities'] = cards
    # transformed_dataset.metadata['dataset_gcs_uri'] = gcs_file_list

Writing src/process_pipes/pipe_components.py


## preprocessing pipelines

In [11]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINE_SUB_DIR}/preproc_pipelines.py
"""Preprocessing pipelines."""

from . import pipe_components
from . import config
from kfp.v2 import dsl
import os

GKE_ACCELERATOR_KEY = 'cloud.google.com/gke-accelerator'

# TODO: parametrize and fix config file 
# BUCKET_parquet = 'spotify-builtin-2t'
# BUCKET = 'spotify-merlin-v1'
# VERSION = 'v32-subset'
# APP = 'spotify'
# MODEL_DISPLAY_NAME = f'nvt-preprocessing-{APP}-{VERSION}'
# WORKSPACE = f'gs://{config.BUCKET}/{MODEL_DISPLAY_NAME}'
# PREPROCESS_PARQUET_PIPELINE_NAME = f'nvtabular-parquet-pipeline-{VERSION}'
# PREPROCESS_PARQUET_PIPELINE_ROOT = os.path.join(WORKSPACE, PREPROCESS_PARQUET_PIPELINE_NAME)

@dsl.pipeline(
    name=f'{config.PREPROCESS_PARQUET_PIPELINE_NAME}', 
    pipeline_root=f'{config.PREPROCESS_PARQUET_PIPELINE_ROOT}'
)
def preprocessing_parquet(
    bucket_data_src: str,
    bucket_data_output: str,
    # train_pattern: str,
    # valid_pattern: str,
    train_prefix: str,
    valid_prefix: str,
    file_pattern: str,
    num_output_files_train: int,
    num_output_files_valid: int,
    output_path_defined_dir: str,
    output_path_analyzed_dir: str,
    output_path_transformed_dir: str,
    shuffle: str,
    version: str,
    app: str,
):
    
    '''
    
    Pipeline to preprocess parquet files in GCS.
    
    '''
    
    # =========================================================
    # TODO: extract from BQ to parquet 
    # =========================================================
    
    
    # =========================================================
    #             Convert from parquet to def 
    # =========================================================
    # config.BUCKET_NAME = 'spotify-builtin-2t' # 'spotify-merlin-v1' # TODO: parameterize
    
    parquet_to_def_train = (
        pipe_components.convert_parquet_op(
            bucket_name=bucket_data_src,
            data_prefix=train_prefix,
            # data_dir_pattern=train_pattern,
            split='train',
            num_output_files=num_output_files_train,
            n_workers=int(config.GPU_LIMIT),
            shuffle=shuffle,
            output_path_defined_dir=output_path_defined_dir,
            file_pattern=file_pattern,
        )
    )
    parquet_to_def_train.set_display_name('Convert training split')
    parquet_to_def_train.set_cpu_limit(config.CPU_LIMIT)
    parquet_to_def_train.set_memory_limit(config.MEMORY_LIMIT)
    parquet_to_def_train.set_gpu_limit(config.GPU_LIMIT)
    parquet_to_def_train.add_node_selector_constraint(GKE_ACCELERATOR_KEY, config.GPU_TYPE)
    parquet_to_def_train.set_caching_options(enable_caching=True)
    
    # === Convert eval dataset from CSV to Parquet
    parquet_to_def_valid = (
        pipe_components.convert_parquet_op(
            bucket_name=bucket_data_src,
            data_prefix=valid_prefix,
            # data_dir_pattern=valid_pattern,
            split='valid',
            num_output_files=num_output_files_valid,
            n_workers=int(config.GPU_LIMIT),
            shuffle=shuffle,
            output_path_defined_dir=output_path_defined_dir,
            file_pattern=file_pattern,
        )
    )
    parquet_to_def_valid.set_display_name('Convert validation split')
    parquet_to_def_valid.set_cpu_limit(config.CPU_LIMIT)
    parquet_to_def_valid.set_memory_limit(config.MEMORY_LIMIT)
    parquet_to_def_valid.set_gpu_limit(config.GPU_LIMIT)
    parquet_to_def_valid.add_node_selector_constraint(GKE_ACCELERATOR_KEY, config.GPU_TYPE)
    parquet_to_def_valid.set_caching_options(enable_caching=True)
    
    # =========================================================
    # Analyse train dataset 
    # =========================================================
    
    # === Analyze train data split
    analyze_dataset = (
        pipe_components.analyze_dataset_op(
            # parquet_dataset=config.TRAIN_DIR_PARQUET,
            parquet_dataset=parquet_to_def_train.outputs['output_dataset'],
            n_workers=int(config.GPU_LIMIT),
            output_path_defined_dir=output_path_defined_dir,
            output_path_analyzed_dir=output_path_analyzed_dir
        )
    )
    analyze_dataset.set_display_name('Analyze Dataset')
    analyze_dataset.set_cpu_limit(config.CPU_LIMIT)
    analyze_dataset.set_memory_limit(config.MEMORY_LIMIT)
    analyze_dataset.set_gpu_limit(config.GPU_LIMIT)
    analyze_dataset.add_node_selector_constraint(GKE_ACCELERATOR_KEY, config.GPU_TYPE)
    analyze_dataset.set_caching_options(enable_caching=True)
    
    # =========================================================
    # Transform train split 
    # =========================================================

    # === Transform train data split
    transform_train = (
        pipe_components.transform_dataset_op(
            workflow=analyze_dataset.outputs['workflow'],
            split='train',
            # parquet_dataset=config.TRAIN_DIR_PARQUET,
            parquet_dataset=parquet_to_def_train.outputs['output_dataset'],
            output_path_defined_dir=output_path_defined_dir,
            output_path_transformed_dir=f'{output_path_transformed_dir}',
            output_path_analyzed_dir=output_path_analyzed_dir,
            num_output_files=num_output_files_train,
            n_workers=int(config.GPU_LIMIT),
            version=version,
            bucket_data_src=bucket_data_src,
            bucket_data_output=bucket_data_output,
            app=app,
        )
    )
    transform_train.set_display_name('Transform train split')
    transform_train.set_cpu_limit(config.CPU_LIMIT)
    transform_train.set_memory_limit(config.MEMORY_LIMIT)
    transform_train.set_gpu_limit(config.GPU_LIMIT)
    transform_train.add_node_selector_constraint(GKE_ACCELERATOR_KEY, config.GPU_TYPE)
    transform_train.set_caching_options(enable_caching=True)

    # =========================================================
    #     Transform valid split
    # =========================================================
    
    transform_valid = (
        pipe_components.transform_dataset_op(
            workflow=analyze_dataset.outputs['workflow'],
            split='valid',
            parquet_dataset=parquet_to_def_valid.outputs['output_dataset'],
            output_path_defined_dir=output_path_defined_dir,
            output_path_transformed_dir=f'{output_path_transformed_dir}',
            output_path_analyzed_dir=output_path_analyzed_dir,
            num_output_files=num_output_files_valid,
            n_workers=int(config.GPU_LIMIT),
            version=version,
            bucket_data_src=bucket_data_src,
            bucket_data_output=bucket_data_output,
            app=app,
        )
    )
    transform_valid.set_display_name('Transform valid split')
    transform_valid.set_cpu_limit(config.CPU_LIMIT)
    transform_valid.set_memory_limit(config.MEMORY_LIMIT)
    transform_valid.set_gpu_limit(config.GPU_LIMIT)
    transform_valid.add_node_selector_constraint(GKE_ACCELERATOR_KEY, config.GPU_TYPE)
    transform_valid.set_caching_options(enable_caching=True)

Writing src/process_pipes/preproc_pipelines.py


### Set Pipeline Config

In [12]:
# =============================================
#           storage
# =============================================
BUCKET_source = 'spotify-beam-v3'
BUCKET_destin = 'jt-merlin-scaling'
TRAIN_SRC_DIR = 'train_data_parquet'
VALID_SRC_DIR = 'valid_data_parquet'

print(f"BUCKET_source: {BUCKET_source}")
print(f"BUCKET_destin: {BUCKET_destin}")
print(f"TRAIN_SRC_DIR: {TRAIN_SRC_DIR}")
print(f"VALID_SRC_DIR: {VALID_SRC_DIR}\n")

# =============================================
#           accelerators
# =============================================
# Instance configuration
# GPU_LIMIT = '4'
# GPU_TYPE = 'NVIDIA_TESLA_T4'
# CPU_LIMIT = '64'
# MEMORY_LIMIT = '624G'
# INSTANCE_TYPE = "n1-highmem-64"

# Instance configuration
GPU_LIMIT = '2'                   # 1
GPU_TYPE = 'NVIDIA_TESLA_A100'
CPU_LIMIT = '24'                  # '64' '96'
MEMORY_LIMIT = '170G'              #'624G' | 680
INSTANCE_TYPE = "a2-highgpu-2g"

print(f"GPU_LIMIT: {GPU_LIMIT}")
print(f"GPU_TYPE: {GPU_TYPE}")
print(f"CPU_LIMIT: {CPU_LIMIT}")
print(f"MEMORY_LIMIT: {MEMORY_LIMIT}")
print(f"INSTANCE_TYPE: {INSTANCE_TYPE}\n")

# =============================================
#           pipelines
# =============================================
VERSION = 'latest-12'
APP = 'spotify'
MODEL_DISPLAY_NAME = f'nvt-last5-{VERSION}'
WORKSPACE = f'gs://{BUCKET_destin}/{MODEL_DISPLAY_NAME}'
# Pipeline definitions
PREPROCESS_PARQUET_PIPELINE_NAME = f'nvt-parquet-{VERSION}'
PREPROCESS_PARQUET_PIPELINE_ROOT = os.path.join(WORKSPACE, PREPROCESS_PARQUET_PIPELINE_NAME)

print(f"VERSION: {VERSION}")
print(f"APP: {APP}")
print(f"MODEL_DISPLAY_NAME: {MODEL_DISPLAY_NAME}")
print(f"WORKSPACE: {WORKSPACE}")
print(f"PREPROCESS_PARQUET_PIPELINE_NAME: {PREPROCESS_PARQUET_PIPELINE_NAME}")
print(f"PREPROCESS_PARQUET_PIPELINE_ROOT: {PREPROCESS_PARQUET_PIPELINE_ROOT}\n")

# =============================================
#           custom image
# =============================================
# Docker definitions
IMAGE_NAME = 'nvt-preprocessing'
IMAGE_URI = f'gcr.io/{PROJECT_ID}/{IMAGE_NAME}'
DOCKERNAME = f'nvt' # 150

print(f"IMAGE_NAME: {IMAGE_NAME}")
print(f"IMAGE_URI: {IMAGE_URI}")
print(f"DOCKERNAME: {DOCKERNAME}\n")

BUCKET_source: spotify-beam-v3
BUCKET_destin: jt-merlin-scaling
TRAIN_SRC_DIR: train_data_parquet
VALID_SRC_DIR: valid_data_parquet

GPU_LIMIT: 2
GPU_TYPE: NVIDIA_TESLA_A100
CPU_LIMIT: 24
MEMORY_LIMIT: 170G
INSTANCE_TYPE: a2-highgpu-2g

VERSION: latest-12
APP: spotify
MODEL_DISPLAY_NAME: nvt-last5-latest-12
WORKSPACE: gs://jt-merlin-scaling/nvt-last5-latest-12
PREPROCESS_PARQUET_PIPELINE_NAME: nvt-parquet-latest-12
PREPROCESS_PARQUET_PIPELINE_ROOT: gs://jt-merlin-scaling/nvt-last5-latest-12/nvt-parquet-latest-12

IMAGE_NAME: nvt-preprocessing
IMAGE_URI: gcr.io/hybrid-vertex/nvt-preprocessing
DOCKERNAME: nvt



In [13]:
# os.chdir('/home/jupyter/spotify-merlin')
# os.getcwd()

In [14]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/{PIPELINE_SUB_DIR}/config.py

import os

# =============================================
#           Cloud Storage Directorires
# =============================================
BUCKET_source = 'spotify-beam-v3'
BUCKET_destin = 'jt-merlin-scaling'
TRAIN_SRC_DIR = 'train_data_parquet'
VALID_SRC_DIR = 'valid_data_parquet'

# =============================================
#           Setup
# =============================================
VERSION = 'latest-12'
APP = 'spotify'
# MODEL_DISPLAY_NAME = f'nvt-prep-last5-{VERSION}'
# WORKSPACE = f'gs://{BUCKET_destin}/{MODEL_DISPLAY_NAME}'
PROJECT_ID = "hybrid-vertex"
REGION = "us-central1"
VERTEX_SA = f"vertex-sa@{PROJECT_ID}.iam.gserviceaccount.com"

# =============================================
#           Artifacts
# =============================================
# MODEL_DISPLAY_NAME = f"nvt-last5-{VERSION}"
# WORKSPACE = f"gs://jt-merlin-scaling/nvt-last5-{VERSION}"
MODEL_DISPLAY_NAME = f'nvt-last5-{VERSION}'
WORKSPACE = f'gs://{BUCKET_destin}/{MODEL_DISPLAY_NAME}'
NVT_IMAGE_URI = "gcr.io/hybrid-vertex/nvt-preprocessing"

# =============================================
#           Pipeline Configs
# =============================================
# PREPROCESS_PARQUET_PIPELINE_NAME = f"nvt-parquet-{VERSION}"
# PREPROCESS_PARQUET_PIPELINE_ROOT = f"gs://jt-merlin-scaling/{MODEL_DISPLAY_NAME}/{PREPROCESS_PARQUET_PIPELINE_NAME}"
PREPROCESS_PARQUET_PIPELINE_NAME = f'nvt-parquet-{VERSION}'
PREPROCESS_PARQUET_PIPELINE_ROOT = os.path.join(WORKSPACE, PREPROCESS_PARQUET_PIPELINE_NAME)

# INSTANCE_TYPE = os.getenv("INSTANCE_TYPE", "n1-highmem-64")
# CPU_LIMIT = os.getenv("CPU_LIMIT", "64")
# MEMORY_LIMIT = os.getenv("MEMORY_LIMIT", "624G")
# GPU_LIMIT = os.getenv("GPU_LIMIT", "4")
# GPU_TYPE = os.getenv("GPU_TYPE", "NVIDIA_TESLA_T4")

INSTANCE_TYPE = os.getenv("INSTANCE_TYPE", "a2-highgpu-2g")
CPU_LIMIT = os.getenv("CPU_LIMIT", "24")
MEMORY_LIMIT = os.getenv("MEMORY_LIMIT", "170G")
GPU_LIMIT = os.getenv("GPU_LIMIT", "2")
GPU_TYPE = os.getenv("GPU_TYPE", "NVIDIA_TESLA_A100")

Writing src/process_pipes/config.py


### check config file

In [15]:
from src.process_pipes import config
import importlib
importlib.reload(config)

for key, value in config.__dict__.items():
    if key.isupper(): print(f'{key}: {value}')

TRAIN_SRC_DIR: train_data_parquet
VALID_SRC_DIR: valid_data_parquet
VERSION: latest-12
APP: spotify
PROJECT_ID: hybrid-vertex
REGION: us-central1
VERTEX_SA: vertex-sa@hybrid-vertex.iam.gserviceaccount.com
MODEL_DISPLAY_NAME: nvt-last5-latest-12
WORKSPACE: gs://jt-merlin-scaling/nvt-last5-latest-12
NVT_IMAGE_URI: gcr.io/hybrid-vertex/nvt-preprocessing
PREPROCESS_PARQUET_PIPELINE_NAME: nvt-parquet-latest-12
PREPROCESS_PARQUET_PIPELINE_ROOT: gs://jt-merlin-scaling/nvt-last5-latest-12/nvt-parquet-latest-12
INSTANCE_TYPE: a2-highgpu-2g
CPU_LIMIT: 24
MEMORY_LIMIT: 170G
GPU_LIMIT: 2
GPU_TYPE: NVIDIA_TESLA_A100


## Build Custom Image

In [16]:
%%writefile {REPO_DOCKER_PATH_PREFIX}/Dockerfile.{DOCKERNAME}

FROM nvcr.io/nvidia/merlin/merlin-tensorflow:22.12

WORKDIR /src

RUN pip install -U pip
# RUN pip install git+https://github.com/NVIDIA-Merlin/models.git
RUN pip install google-cloud-bigquery gcsfs
RUN pip install google-cloud-aiplatform[cloud_profiler] kfp nvtabular
RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg  add - && apt-get update -y && apt-get install google-cloud-sdk -y

COPY preprocessor/* ./

ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/hugectr/lib:/usr/local/cuda/compat/lib:/usr/local/nvidia/lib:/usr/local/nvidia/lib64:/usr/local/cuda/lib64:/usr/local/cuda/extras/CUPTI/lib64:/usr/local/lib:/repos/dist/lib:/opt/tritonserver/lib

Writing src/Dockerfile.nvt


In [17]:
# Initialize Vertex AI API
vertex_ai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=os.path.join(WORKSPACE, 'stg') 
)

MACHINE_TYPE ='e2-highcpu-32'

In [18]:
# os.chdir('/home/jupyter/spotify-merlin')
# os.getcwd()

In [19]:
FILE_LOCATION = './src'
! gcloud builds submit --config src/cloudbuild.yaml --substitutions _DOCKERNAME=$DOCKERNAME,_IMAGE_URI=$IMAGE_URI,_FILE_LOCATION=$FILE_LOCATION --timeout=2h --machine-type=$MACHINE_TYPE

Creating temporary tarball archive of 63 file(s) totalling 1.7 MiB before compression.
Some files were not included in the source upload.

Check the gcloud log [/home/jupyter/.config/gcloud/logs/2023.02.23/18.58.09.022182.log] to see which files and the contents of the
default gcloudignore file used (see `$ gcloud topic gcloudignore` to learn
more).

Uploading tarball of [.] to [gs://hybrid-vertex_cloudbuild/source/1677178689.122931-d5f718771d004ab49024350136d0b8dd.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/hybrid-vertex/locations/global/builds/c658b13d-6b5b-4d7c-8e8c-967c099f607b].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds/c658b13d-6b5b-4d7c-8e8c-967c099f607b?project=934903580331 ].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "c658b13d-6b5b-4d7c-8e8c-967c099f607b"

FETCHSOURCE
Fetching storage object: gs://hybrid-vertex_cloudbuild/source/1677178689.122931-d5f718771d004ab49024350136d0

# Vertex Pipeline

### labels

In [20]:
LABELS = {
    'version': f'{VERSION}',
    'gpu_type': f'{GPU_TYPE.lower()}',
    'gpu_limit': f'{GPU_LIMIT}',
    'memory_limit': f'{MEMORY_LIMIT.lower()}',
    'instance_type': f'{INSTANCE_TYPE}',
}
pprint(LABELS)

{'gpu_limit': '2',
 'gpu_type': 'nvidia_tesla_a100',
 'instance_type': 'a2-highgpu-2g',
 'memory_limit': '170g',
 'version': 'latest-12'}


## define pipe params

In [21]:
# from google.cloud import storage

# storage_client = storage.Client()

TRAIN_PREFIX = 'train_data_parquet'
VALID_PREFIX = 'valid_data_parquet'

delimiter = '/'
# FILE_PATTERN = "*.parquet"                    # full
FILE_PATTERN = '*.snappy.parquet'    # subset

# trying to achieve avg file size of ~100 mb
num_output_files_train = 100 #0 # Number of output Parquet files
num_output_files_valid = 10 #2 # Number of output Parquet files

# Define output directories
OUTPUT_DEFINED_DIR = os.path.join(WORKSPACE, "nvt-defined")
OUTPUT_WORKFLOW_DIR = os.path.join(WORKSPACE, "nvt-analyzed")
OUTPUT_TRANSFORMED_DIR = os.path.join(WORKSPACE, "nvt-processed")


parq_parameter_values = {
    'bucket_data_src': BUCKET_source,
    'bucket_data_output': BUCKET_destin,
    'train_prefix': f'{TRAIN_PREFIX}',
    'valid_prefix': f'{VALID_PREFIX}',
    'file_pattern': f'{FILE_PATTERN}',
    'num_output_files_train': num_output_files_train,
    'num_output_files_valid': num_output_files_valid,
    'output_path_defined_dir': f'{OUTPUT_DEFINED_DIR}',
    'output_path_analyzed_dir': f'{OUTPUT_WORKFLOW_DIR}',
    'output_path_transformed_dir': f'{OUTPUT_TRANSFORMED_DIR}',
    'version':f'{VERSION}',
    'shuffle': json.dumps(None), # select PER_PARTITION, PER_WORKER, FULL, or None.
    'app':f'{APP}',
}

pprint(parq_parameter_values)

{'app': 'spotify',
 'bucket_data_output': 'jt-merlin-scaling',
 'bucket_data_src': 'spotify-beam-v3',
 'file_pattern': '*.snappy.parquet',
 'num_output_files_train': 100,
 'num_output_files_valid': 10,
 'output_path_analyzed_dir': 'gs://jt-merlin-scaling/nvt-last5-latest-12/nvt-analyzed',
 'output_path_defined_dir': 'gs://jt-merlin-scaling/nvt-last5-latest-12/nvt-defined',
 'output_path_transformed_dir': 'gs://jt-merlin-scaling/nvt-last5-latest-12/nvt-processed',
 'shuffle': 'null',
 'train_prefix': 'train_data_parquet',
 'valid_prefix': 'valid_data_parquet',
 'version': 'latest-12'}


## compile pipeline

In [22]:
# #list the current work dir
# os.chdir('/home/jupyter/spotify-merlin/src')
# os.getcwd()

In [23]:
!ls

01-data-preprocess-pipeline.ipynb  custom_container_pipeline_spec.json
02-merlin-vertex-training.ipynb    imgs
03-query-model-inference.ipynb	   nvt-parquet-full-1a100.json
04-train-deploy-pipeline.ipynb	   nvt-parquet-full-2a100.json
05-recs-for-your-spotify.ipynb	   nvt-parquet-full-4t4.json
README.md			   src
archive


In [24]:
from src.process_pipes.preproc_pipelines import preprocessing_parquet

_compiled_pipeline_path = f'{PREPROCESS_PARQUET_PIPELINE_NAME}.json'

compiler.Compiler().compile(
       pipeline_func=preprocessing_parquet,
       package_path=_compiled_pipeline_path
)



## submit pipeline to Vertex

In [25]:
PREPROCESS_PARQUET_PIPELINE_NAME

'nvt-parquet-latest-12'

In [26]:
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

job_name = f'{PREPROCESS_PARQUET_PIPELINE_NAME}_{TIMESTAMP}' #{TIMESTAMP}'

pipeline_job = vertex_ai.PipelineJob(
    display_name=job_name,
    template_path=_compiled_pipeline_path,
    enable_caching=False,
    parameter_values=parq_parameter_values,
    labels=LABELS,
)

pipeline_job.submit(service_account=VERTEX_SA)

# Notes

### Define the NVTabular preprocessing graph

```
item_id = ["track_uri_can"] >> Categorify(dtype="int32") >> TagAsItemID() 
playlist_id = ["pid"] >> Categorify(dtype="int32") >> TagAsUserID() 


item_features_cat = ['artist_name_can',
        'track_name_can',
        'artist_genres_can',
    ]

item_features_cont = [
        'duration_ms_can',
        'track_pop_can',
        'artist_pop_can',
        'artist_followers_can',
    ]

playlist_features_cat = [
        'description_pl',
        'name',
        'collaborative',
    ]

playlist_features_cont = [
        'duration_ms_seed_pl',
        'n_songs_pl',
        'num_artists_pl',
        'num_albums_pl',
    ]

seq_feats_cat = [
        'artist_name_pl',
        'track_uri_pl',
        'track_name_pl',
        'album_name_pl',
        'artist_genres_pl',
    ]

CAT = playlist_features_cat + item_features_cat
CONT = item_features_cont + playlist_features_cont

item_feature_cat_node = item_features_cat >> nvt.ops.FillMissing()>> Categorify(dtype="int32") >> TagAsItemFeatures()

item_feature_cont_node =  item_features_cont >> nvt.ops.FillMissing() >>  nvt.ops.Normalize() >> TagAsItemFeatures()

playlist_feature_cat_node = playlist_features_cat >> nvt.ops.FillMissing() >> Categorify(dtype="int32") >> TagAsUserFeatures() 

playlist_feature_cont_node = playlist_features_cont >> nvt.ops.FillMissing() >>  nvt.ops.Normalize() >> TagAsUserFeatures()

playlist_feature_cat_seq_node = seq_feats_cat >> nvt.ops.FillMissing() >> Categorify(dtype="int32") >> TagAsUserFeatures() 

```