# Preprocessing

> Convert BQ table to parquet files
> orchestrate job with `Vertex Pipelines`

Data originally converted to parquet using the job config below:

```
BUCKET = 'gs://spotify-builtin-2t'
PROJECT = 'hybrid-vertex'
DATASET_ID = 'spotify_train_3'
TABLE = 'train_flatten'
TABLE_SMALL = 'train_json_export_table_small'
LOCATION = 'us-central1'

from google.cloud import bigquery
client = bigquery.Client()

destination_uri = f"{BUCKET}/train_data_parquet/*.snappy.parquet"
dataset_ref = bigquery.DatasetReference(PROJECT, DATASET_ID)
table_ref = dataset_ref.table(TABLE)
job_config = bigquery.job.ExtractJobConfig()
job_config.destination_format = bigquery.DestinationFormat.PARQUET
extract_job = client.extract_table(
    table_ref,
    destination_uri,
    job_config=job_config,
    # Location must match that of the source table.
    location=LOCATION,
)  # API request
extract_job.result()  # Waits for job to complete.
```

## Setup

### pip

In [2]:
!pip install google-cloud-aiplatform 
!pip install google-cloud-pipeline-components 
!pip install google-cloud-bigquery-storage 
!pip install kfp

Collecting protobuf<4.0.0dev,>=3.19.0
  Downloading protobuf-3.20.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m17.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Installing collected packages: protobuf
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tfx-bsl 1.9.0 requires google-api-python-client<2,>=1.7.11, but you have google-api-python-client 2.52.0 which is incompatible.
tfx-bsl 1.9.0 requires pyarrow<6,>=1, but you have pyarrow 8.0.0 which is incompatible.
tensorflow 2.9.0rc2 requires tensorboard<2.10,>=2.9, but you have tensorboard 2.8.0 which is incompatible.
tensorflow-transform 1.9.0 requires pyarrow<6,>=1, but you have pyarrow 8.0.0 which is incompatible.
tensorflow-serving-api 2.9.0 requires tensorflow<3,>=2.9.0, but you have tensorflow 2.9.

### import packages

In [1]:
import os
import json
from datetime import datetime
from google.cloud import aiplatform as vertex_ai
from kfp.v2 import compiler

In [2]:
# TODO: Project definitions
PROJECT_ID = 'hybrid-vertex' # Change to your project ID.
REGION = 'us-central1' # Change to your region.

# TODO: Service Account address
VERTEX_SA = '934903580331-compute@developer.gserviceaccount.com' # Change to your service account with Vertex AI Admin permitions.

# TODO: define GCS Bucket
BUCKET_parquet = 'spotify-builtin-2t'
BUCKET = 'spotify-merlin-v1'

## Define preprocess pipeline

In [3]:
# Bucket definitions
VERSION = 'v09-subset'
APP = 'spotify'
MODEL_DISPLAY_NAME = f'nvt-preprocessing-{APP}-{VERSION}'
WORKSPACE = f'gs://{BUCKET}/{MODEL_DISPLAY_NAME}'

# Docker definitions
IMAGE_NAME = 'nvt-preprocessing'
IMAGE_URI = f'gcr.io/{PROJECT_ID}/{IMAGE_NAME}'
DOCKERNAME = 'nvtabular'

# Pipeline definitions
PREPROCESS_PARQUET_PIPELINE_NAME = 'nvt-parquet-pipeline'
PREPROCESS_PARQUET_PIPELINE_ROOT = os.path.join(WORKSPACE, PREPROCESS_PARQUET_PIPELINE_NAME)

# Instance configuration
GPU_LIMIT = '4'
GPU_TYPE = 'NVIDIA_TESLA_T4'
CPU_LIMIT = '64'
MEMORY_LIMIT = '416'
INSTANCE_TYPE = "n1-highmem-64"

In [4]:
# !gsutil cp 'spotify-merlin/src/cloudbuild.yaml' gs://spotify-merlin-v1/cloudbuild

In [5]:
os.environ['PROJECT_ID'] = PROJECT_ID
os.environ['REGION'] = REGION
os.environ['BUCKET'] = BUCKET
os.environ['WORKSPACE'] = WORKSPACE

os.environ['NVT_IMAGE_URI'] = IMAGE_URI
os.environ['PREPROCESS_PARQUET_PIPELINE_NAME'] = PREPROCESS_PARQUET_PIPELINE_NAME
os.environ['PREPROCESS_PARQUET_PIPELINE_ROOT'] = PREPROCESS_PARQUET_PIPELINE_ROOT
os.environ['DOCKERNAME'] = DOCKERNAME

os.environ['GPU_LIMIT'] = GPU_LIMIT
os.environ['GPU_TYPE'] = GPU_TYPE
os.environ['CPU_LIMIT'] = CPU_LIMIT
os.environ['MEMORY_LIMIT'] = MEMORY_LIMIT
os.environ['INSTANCE_TYPE'] = INSTANCE_TYPE

In [6]:
GPU_TYPE

'NVIDIA_TESLA_T4'

In [7]:
# Initialize Vertex AI API
vertex_ai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=os.path.join(WORKSPACE, 'stg') 
)

In [8]:
os.chdir('/home/jupyter/spotify-merlin')
os.getcwd()

'/home/jupyter/spotify-merlin'

In [9]:
FILE_LOCATION = './src'
! gcloud builds submit --config src/cloudbuild.yaml --substitutions _DOCKERNAME=$DOCKERNAME,_IMAGE_URI=$IMAGE_URI,_FILE_LOCATION=$FILE_LOCATION --timeout=2h --machine-type=e2-highcpu-8

Creating temporary tarball archive of 30 file(s) totalling 362.7 KiB before compression.
Uploading tarball of [.] to [gs://hybrid-vertex_cloudbuild/source/1658343241.913822-e1827ea02e0d4ae8a12c1e9c4bf3bb55.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/hybrid-vertex/locations/global/builds/fb14ce67-d503-4880-a601-b4d21f182269].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/fb14ce67-d503-4880-a601-b4d21f182269?project=934903580331].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "fb14ce67-d503-4880-a601-b4d21f182269"

FETCHSOURCE
Fetching storage object: gs://hybrid-vertex_cloudbuild/source/1658343241.913822-e1827ea02e0d4ae8a12c1e9c4bf3bb55.tgz#1658343242206232
Copying gs://hybrid-vertex_cloudbuild/source/1658343241.913822-e1827ea02e0d4ae8a12c1e9c4bf3bb55.tgz#1658343242206232...
/ [1 files][ 55.3 KiB/ 55.3 KiB]                                                
Operation completed over 1 objects/55.3

# Parquet Preprocessing Pipeline

### Inputs

In [10]:
# Subset
# TRAIN_DIR_PARQUET = f"{BUCKET_parquet}/train_data_parquet/0000000000**.snappy.parquet"
# VALID_DIR_PARQUET = f"{BUCKET_parquet}/validation_data_parquet/00000000000*.snappy.parquet"

# full dataset
# TRAIN_DIR_PARQUET = f"{BUCKET_parquet}/train_data_parquet/*.snappy.parquet"
# VALID_DIR_PARQUET = f"{BUCKET_parquet}/validation_data_parquet/*.snappy.parquet"

# MAX_PADDING = 375

In [10]:
from google.cloud import storage

storage_client = storage.Client()

delimiter = '/'

train_prefix = 'train_data_parquet/'
train_files = []

train_blobs = storage_client.list_blobs(BUCKET_parquet, prefix=train_prefix, delimiter=delimiter)
for blob in train_blobs:
    train_files.append(f'gs://{BUCKET_parquet}/{blob.name}')
    
valid_prefix = 'validation_data_parquet/'
valid_files = []

valid_blobs = storage_client.list_blobs(BUCKET_parquet, prefix=valid_prefix, delimiter=delimiter)
for blob in valid_blobs:
    valid_files.append(f'gs://{BUCKET_parquet}/{blob.name}')
    
len(valid_files)

152

In [11]:
# train_files[:30]

### Outputs

In [12]:
WORKSPACE

'gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset'

In [13]:
OUTPUT_DEFINED_DIR = os.path.join(WORKSPACE, "nvt-defined")

OUTPUT_WORKFLOW_DIR = os.path.join(WORKSPACE, "nvt-analyzed")

OUTPUT_TRANSFORMED_DIR = os.path.join(WORKSPACE, "nvt-processed")


# OUTPUT_TRAIN_DIR = os.path.join(OUTPUT_PATH_DIR, 'train/')
# OUTPUT_VALID_DIR = os.path.join(OUTPUT_PATH_DIR, 'valid/')
# OUTPUT_WORKFLOW_DIR = os.path.join(OUTPUT_PATH_DIR, 'workflow/')


print(f"OUTPUT_DEFINED_DIR: {OUTPUT_DEFINED_DIR}\nOUTPUT_WORKFLOW_DIR: {OUTPUT_WORKFLOW_DIR}\nOUTPUT_TRANSFORMED_DIR: {OUTPUT_TRANSFORMED_DIR}")

OUTPUT_DEFINED_DIR: gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-defined
OUTPUT_WORKFLOW_DIR: gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-analyzed
OUTPUT_TRANSFORMED_DIR: gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-processed


## TODO: Create pipeline parameters

In [14]:
# Training files
TRAIN_PATHS = train_files[:99] # Parquet files from BQ extract
# Validation files
VALID_PATHS = valid_files[:9] # Parquet files from BQ extract

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

num_output_files_train = 50 # Number of output Parquet files
num_output_files_valid = 10 # Number of output Parquet files

parq_parameter_values = {
    'train_paths': json.dumps(TRAIN_PATHS),
    'valid_paths': json.dumps(VALID_PATHS),
    'num_output_files_train': num_output_files_train,
    'num_output_files_valid': num_output_files_valid,
    'output_path_defined_dir': f'{OUTPUT_DEFINED_DIR}',
    'output_path_analyzed_dir': f'{OUTPUT_WORKFLOW_DIR}',
    'output_path_transformed_dir': f'{OUTPUT_TRANSFORMED_DIR}',
    'shuffle': json.dumps(None) # select PER_PARTITION, PER_WORKER, FULL, or None.
}

In [15]:
from pprint import pprint
# pprint(parq_parameter_values)

print(f"num_output_files_train: {parq_parameter_values['num_output_files_train']}")
print(f"num_output_files_valid: {parq_parameter_values['num_output_files_valid']}")
print(f"output_path_defined_dir: {parq_parameter_values['output_path_defined_dir']}")
print(f"output_path_analyzed_dir: {parq_parameter_values['output_path_analyzed_dir']}")
print(f"output_path_transformed_dir: {parq_parameter_values['output_path_transformed_dir']}")
print(f"shuffle: {parq_parameter_values['shuffle']}")
print(f"train_paths[0]: {parq_parameter_values['train_paths'][:74]} ...]") # json array
print(f"valid_paths[1]: {parq_parameter_values['valid_paths'][:79]} ...]") # json array

num_output_files_train: 50
num_output_files_valid: 10
output_path_defined_dir: gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-defined
output_path_analyzed_dir: gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-analyzed
output_path_transformed_dir: gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-processed
shuffle: null
train_paths[0]: ["gs://spotify-builtin-2t/train_data_parquet/000000000000.snappy.parquet", ...]
valid_paths[1]: ["gs://spotify-builtin-2t/validation_data_parquet/000000000000.snappy.parquet", ...]


## Compile KFP pipeline

In [16]:
#list the current work dir
os.getcwd()

'/home/jupyter/spotify-merlin'

In [17]:
os.chdir('/home/jupyter/spotify-merlin/src')

from pipelines.preprocessing_pipelines import preprocessing_parquet

_compiled_pipeline_path = f'{PREPROCESS_PARQUET_PIPELINE_NAME}.json'

compiler.Compiler().compile(
       pipeline_func=preprocessing_parquet,
       package_path=_compiled_pipeline_path
)



## Submit pipeline to Vertex AI

In [18]:
job_name = f'{PREPROCESS_PARQUET_PIPELINE_NAME}_{TIMESTAMP}' #{TIMESTAMP}'

pipeline_job = vertex_ai.PipelineJob(
    display_name=job_name,
    template_path=_compiled_pipeline_path,
    # enable_caching=True,
    parameter_values=parq_parameter_values,
)

pipeline_job.submit(service_account=VERTEX_SA)

Creating PipelineJob
PipelineJob created. Resource name: projects/934903580331/locations/us-central1/pipelineJobs/nvt-parquet-pipeline-20220720190013
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/934903580331/locations/us-central1/pipelineJobs/nvt-parquet-pipeline-20220720190013')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/nvt-parquet-pipeline-20220720190013?project=934903580331


### Resource: [Caching builds or dockerfiles](https://luis-sena.medium.com/creating-the-perfect-python-dockerfile-51bdec41f1c8)

In [19]:
# logging.info(f'output_path_transformed_dir/split: gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-processed/valid')
file_list = os.path.join(f'gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-processed/valid', f'_file_list.txt')
print(f"file_list: {file_list}")

new_lines = []
with open(file_list, 'r') as fp:
    lines = fp.readlines()
    new_lines.append(lines[0])
    for line in lines[1:]:
        new_lines.append(line.replace('gs://', '/gcs/'))

file_list: gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-processed/valid/_file_list.txt


FileNotFoundError: [Errno 2] No such file or directory: 'gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-processed/valid/_file_list.txt'

In [None]:
gcs_file_list = os.path.join(f'gs://spotify-merlin-v1/nvt-preprocessing-spotify-v09-subset/nvt-processed/valid', f'_gcs_file_list.txt')

with open(gcs_file_list, 'w') as fp:
    fp.writelines(new_lines)