# 02 - Experimentation - Local

This notebook covers the following steps:

1. Prepare the data using `NVTabular`.
2. Train and evaluate the `TensorFlow` model.
3. Export a `TensorFlow` model.

## Setup

In [None]:
# Environment variables are set in the first code cell, ahead of the cell that imports
# TensorFlow and NVTabular.
# Request the pure-Python protobuf implementation; the import cell prints the
# implementation type that is actually in use.
%env PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
# NOTE(review): presumably the fraction of GPU memory that the NVTabular TensorFlow
# data loader allows TensorFlow to allocate - confirm against the NVTabular docs.
%env TF_MEMORY_ALLOCATION=0.7

In [None]:
import os
import logging
from datetime import datetime

import nvtabular as nvt
import tensorflow as tf

from src.common import features, utils
from src.data_preprocessing import etl
from src.model_training import trainer

# Emit INFO-level messages from both the root Python logger and TensorFlow's logger.
logging.root.setLevel(logging.INFO)
tf.get_logger().setLevel(logging.INFO)

# Report which protobuf backend and which TensorFlow version this kernel picked up.
from google.protobuf.internal import api_implementation
print(f"protobuf implementation type: {api_implementation.Type()}")
print(f"TensorFlow: {tf.__version__}")

In [None]:
# Google Cloud project, region and the Cloud Storage bucket used by this notebook.
PROJECT = 'merlin-on-gcp'
REGION = 'us-central1'
BUCKET = 'merlin-on-gcp'

# Local scratch directory and the Cloud Storage prefix for this use case.
LOCAL_WORKSPACE = '_workspace'
WORKSPACE = f"gs://{BUCKET}/movielens25m"

# Raw MovieLens CSV inputs.
MOVIES_CSV_DATASET_LOCATION = f"{WORKSPACE}/dataset/movies.csv"
RATINGS_CSV_DATASET_LOCATION = f"{WORKSPACE}/dataset/ratings.csv"

# Root location for the artifacts of all experiment runs.
EXPERIMENT_ARTIFACTS_DIR = os.path.join(WORKSPACE, 'experiments')

# Display names derived from the project and the model.
MODEL_DISPLAY_NAME = 'movielens25m-recommender'
TENSORBOARD_DISPLAY_NAME = f'tb-{PROJECT}'
EXPERIMENT_NAME = f'{MODEL_DISPLAY_NAME}-experiment'

## Initialize Experiment

In [None]:
# Flip to True to delete the artifacts of all previous experiments before this run.
REMOVE_EXPERIMENT_ARTIFACTS = False

if tf.io.gfile.exists(EXPERIMENT_ARTIFACTS_DIR):
    if REMOVE_EXPERIMENT_ARTIFACTS:
        print("Removing previous experiment artifacts...")
        tf.io.gfile.rmtree(EXPERIMENT_ARTIFACTS_DIR)

# Checked again on purpose: the directory may have just been removed above.
if not tf.io.gfile.exists(EXPERIMENT_ARTIFACTS_DIR):
    print("Creating new experiment artifacts directory...")
    tf.io.gfile.mkdir(EXPERIMENT_ARTIFACTS_DIR)

# The local workspace is always recreated from scratch.
print("Preparing local workspace...")
if tf.io.gfile.exists(LOCAL_WORKSPACE):
    tf.io.gfile.rmtree(LOCAL_WORKSPACE)
tf.io.gfile.mkdir(LOCAL_WORKSPACE)

print("Workspace is ready.")

# Each run gets its own timestamped sub-directory under the experiment name.
run_timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
run_id = f"run-local-{run_timestamp}"
EXPERIMENT_RUN_DIR = os.path.join(
    EXPERIMENT_ARTIFACTS_DIR,
    EXPERIMENT_NAME,
    run_id,
)
print("Experiment run directory:", EXPERIMENT_RUN_DIR)

## 1. Preparing the data using NVTabular

In [None]:
# All ETL outputs of this run are kept under the run directory.
ETL_OUTPUT_DIR = os.path.join(
    EXPERIMENT_RUN_DIR,
    "etl_output",
)

In [None]:
# Run the ETL on the two raw CSV files; the result is unpacked into the train dataset,
# the test dataset and the transformation workflow.
(
    transformed_train_dataset,
    transformed_test_dataset,
    transform_workflow,
) = etl.run_etl(
    MOVIES_CSV_DATASET_LOCATION,
    RATINGS_CSV_DATASET_LOCATION,
)

In [None]:
# Destinations: the transformed data is written directly under ETL_OUTPUT_DIR, while the
# workflow is first saved into the local workspace and then uploaded.
transformed_train_dataset_dir = os.path.join(ETL_OUTPUT_DIR, "transformed_data/train")
transformed_test_dataset_dir = os.path.join(ETL_OUTPUT_DIR, "transformed_data/test")
local_transform_workflow_dir = os.path.join(LOCAL_WORKSPACE, 'transform_workflow')

# Training split: shuffled within each partition.
print(f"Writting transformed training data to {transformed_train_dataset_dir}")
transformed_train_dataset.to_parquet(
    output_path=transformed_train_dataset_dir,
    shuffle=nvt.io.Shuffle.PER_PARTITION,
    cats=features.CATEGORICAL_FEATURE_NAMES,
    labels=features.TARGET_FEATURE_NAME,
    dtypes=features.get_dtype_dict(),
)
print("Train data parquet files are written.")

# Test split: written without shuffling. The message now names the test split
# (it previously said "training" here).
print(f"Writting transformed test data to {transformed_test_dataset_dir}")
transformed_test_dataset.to_parquet(
    output_path=transformed_test_dataset_dir,
    shuffle=False,
    cats=features.CATEGORICAL_FEATURE_NAMES,
    labels=features.TARGET_FEATURE_NAME,
    dtypes=features.get_dtype_dict(),
)
print("Test data parquet files are written.")

logging.info("Saving transformation workflow...")
transform_workflow.save(local_transform_workflow_dir)
logging.info("Transformation workflow is saved.")

print("Uploading trandorm workflow to Cloud Storage...")
utils.upload_directory(
    local_transform_workflow_dir,
    os.path.join(ETL_OUTPUT_DIR, 'transform_workflow')
)

# Best-effort cleanup of local leftovers. Each directory is handled independently so a
# failure on one does not skip the other, and failures are logged instead of being
# silently swallowed. Catching Exception (not a bare except) lets KeyboardInterrupt
# propagate.
# NOTE(review): "categories" is presumably a by-product of the NVTabular workflow in the
# current working directory - confirm.
for stale_dir in (local_transform_workflow_dir, "categories"):
    try:
        if tf.io.gfile.exists(stale_dir):
            tf.io.gfile.rmtree(stale_dir)
    except Exception as cleanup_error:
        logging.warning("Could not remove %s: %s", stale_dir, cleanup_error)
print("Transformation uploaded to Cloud Storage.")

In [None]:
import gc

# Drop the references to the ETL outputs, then force a garbage-collection pass
# before the training section starts.
del transformed_train_dataset
del transformed_test_dataset
del transform_workflow
gc.collect()

In [None]:
# List the contents of this run's ETL output location to check what was written.
!gsutil ls {ETL_OUTPUT_DIR}

## 2. Train a TensorFlow model

In [None]:
# Destination for the exported model, under this run's directory.
EXPORT_DIR = os.path.join(
    EXPERIMENT_RUN_DIR,
    "model",
)

In [None]:
# Local layout inside the workspace: data/train, data/test and exported_model.
LOCAL_DATA_DIR = os.path.join(LOCAL_WORKSPACE, 'data')
LOCAL_TRAIN_DATA_DIR = os.path.join(LOCAL_DATA_DIR, 'train')
LOCAL_TEST_DATA_DIR = os.path.join(LOCAL_DATA_DIR, 'test')
LOCAL_MODEL_DIR = os.path.join(LOCAL_WORKSPACE, 'exported_model')

# Create the parent data directory before its train/test children.
for local_dir in (
    LOCAL_DATA_DIR,
    LOCAL_TRAIN_DATA_DIR,
    LOCAL_TEST_DATA_DIR,
    LOCAL_MODEL_DIR,
):
    tf.io.gfile.mkdir(local_dir)

### Prepare experiment parameters

In [None]:
# Hyperparameters passed to the train and evaluate calls below.
hyperparams = dict(
    learning_rate=0.001,
    batch_size=32 * 1024,
    hidden_units=[128, 128],
    num_epochs=1,
)

### Download the data locally

In [None]:
# Copy the transformed parquet files of both splits into the local data directories,
# then fetch the saved transform workflow into the local workspace.
remote_transformed_data_dir = os.path.join(ETL_OUTPUT_DIR, 'transformed_data')
utils.copy_files(
    os.path.join(remote_transformed_data_dir, 'train', '*.parquet'),
    LOCAL_TRAIN_DATA_DIR,
)
utils.copy_files(
    os.path.join(remote_transformed_data_dir, 'test', '*.parquet'),
    LOCAL_TEST_DATA_DIR,
)
utils.download_directory(
    os.path.join(ETL_OUTPUT_DIR, 'transform_workflow'),
    LOCAL_WORKSPACE,
)
print("Transformed data and transform workflow are downloaded.")

### Train the model

In [None]:
# Load the NVTabular workflow from the transform_workflow directory in the local workspace.
nvt_workflow = nvt.Workflow.load(
    os.path.join(LOCAL_WORKSPACE, "transform_workflow")
)

In [None]:
# Train on every parquet file in the local training data directory.
local_train_file_pattern = os.path.join(LOCAL_TRAIN_DATA_DIR, '*.parquet')
recommendation_model = trainer.train(
    train_data_file_pattern=local_train_file_pattern,
    nvt_workflow=nvt_workflow,
    hyperparams=hyperparams,
)

### Evaluate the model

In [None]:
# Evaluate on every parquet file in the local test data directory; the result is
# unpacked into eval_loss and eval_mse.
local_eval_file_pattern = os.path.join(LOCAL_TEST_DATA_DIR, '*.parquet')
eval_loss, eval_mse = trainer.evaluate(
    recommendation_model,
    eval_data_file_pattern=local_eval_file_pattern,
    hyperparams=hyperparams,
)

### Export the model

In [None]:
# Export the trained model together with the NVTabular workflow into the local model directory.
trainer.export(
    export_dir=LOCAL_MODEL_DIR,
    model_name=MODEL_DISPLAY_NAME,
    recommendation_model=recommendation_model,
    nvt_workflow=nvt_workflow,
)

In [None]:
# Upload the locally exported model to the run's export location.
utils.upload_directory(
    LOCAL_MODEL_DIR,
    EXPORT_DIR,
)

In [None]:
# List the contents of the export location to check the uploaded model files.
!gsutil ls {EXPORT_DIR}