# Training at scale with AI Platform Training Service
Learning Objectives:

1. Learn how to organize your training code into a Python package
2. Train your model using cloud infrastructure via Google Cloud AI Platform Training Service
3. (optional) Learn how to run your training package using Docker containers and push training Docker images on a Docker registry

specify your project name and bucket name in the cell below.

In [1]:
# !sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst

In [2]:
import os

from google.cloud import bigquery

In [3]:
import pandas as pd
import numpy as np
print(np.__version__)

1.19.2


In [4]:
%load_ext google.cloud.bigquery

In [5]:
BUCKET =  "buddie_rec_data"
PROJECT = "buddie-270710"
REGION = "europe-west4"

OUTDIR = "gs://{bucket}/buddieRec/data".format(bucket=BUCKET)

os.environ['BUCKET'] = BUCKET
os.environ['OUTDIR'] = OUTDIR
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION
os.environ['TFVERSION'] = "2.3"

In [6]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


## Create BigQuery tables
If you have not already created a BigQuery dataset for our data, run the following cell:

In [7]:
bq = bigquery.Client(project = PROJECT)
dataset = bigquery.Dataset(bq.dataset("buddieRec"))

try:
    bq.create_dataset(dataset)
    print("Dataset created")
except:
    print("Dataset already exists")

Dataset already exists


In [8]:
%%bigquery

CREATE OR REPLACE TABLE buddieRec.ga_training_data AS

SELECT 
    clientId, 
    timeOnPage,
    (
    SELECT STRING_AGG(part, '/' ORDER BY index) 
    FROM UNNEST(SPLIT(pagePath, '/')) part WITH OFFSET index 
    WHERE index BETWEEN 1 AND 3
  ) contentId,
    organization
FROM (select split(clientId,'.')[offset (0)] as clientId, 
        split(pagePath,'?')[offset(0)] as pagePath,
        timeOnPage,
        split(hostname,'.')[offset(0)] as organization
From `buddie-270710.ga_data.rec_data`)
WHERE ABS(MOD(FARM_FINGERPRINT(`clientId`),10)) < 8 AND RAND() < 1

In [9]:
%%bigquery

CREATE OR REPLACE TABLE buddieRec.ga_valid_data AS

SELECT 
    clientId, 
    timeOnPage,
    (
    SELECT STRING_AGG(part, '/' ORDER BY index) 
    FROM UNNEST(SPLIT(pagePath, '/')) part WITH OFFSET index 
    WHERE index BETWEEN 1 AND 3
  ) contentId,organization
FROM (select split(clientId,'.')[offset (0)] as clientId, 
        split(pagePath,'?')[offset(0)] as pagePath,
        timeOnPage,
        split(hostname,'.')[offset(0)] as organization
From `buddie-270710.ga_data.rec_data`)
WHERE ABS(MOD(FARM_FINGERPRINT(`clientId`),10)) = 8 AND RAND() < 1


In [10]:
%%bigquery

CREATE OR REPLACE TABLE buddieRec.uniqClientIds AS

SELECT 
    distinct(SPLIT(clientId, '.')[OFFSET(1)]) as clientId
FROM `buddie-270710.ga_data.rec_data`
GROUP BY clientId


In [11]:
%%bigquery 

CREATE OR REPLACE TABLE buddieRec.uniqOrganizations AS

SELECT 
    DISTINCT(SPLIT(hostname, '.')[OFFSET(0)])  organization 
FROM `buddie-270710.ga_data.rec_data`
GROUP BY SPLIT(hostname, '.')[OFFSET(0)] 

In [12]:
%%bigquery

CREATE OR REPLACE TABLE buddieRec.uniqContentIds AS

select 
    DISTINCT(SPLIT(pagePath, "?")[ORDINAL(1)]) as contentId
FROM `buddie-270710.ga_data.rec_data`
GROUP BY pagePath

## export the tables as CSV files

In [13]:
%%bash

echo "Deleting current contents of $OUTDIR"
gsutil -m -q rm -rf $OUTDIR

echo "Extracting training data to $OUTDIR"
bq --location=europe-west4 extract \
   --destination_format CSV  \
   --field_delimiter "," --noprint_header \
   buddieRec.ga_training_data \
   $OUTDIR/rec-train-*.csv

echo "Extracting validation data to $OUTDIR"
bq --location=europe-west4 extract \
   --destination_format CSV  \
   --field_delimiter "," --noprint_header \
   buddieRec.ga_valid_data \
   $OUTDIR/rec-valid-*.csv

echo "Extracting unique clientIds to $OUTDIR"
bq --location=europe-west4 extract \
   --destination_format CSV  \
   --field_delimiter "," --noprint_header \
   buddieRec.uniqClientIds \
   $OUTDIR/rec-clientIds.csv


echo "Extracting unique organizations to $OUTDIR"
bq --location=europe-west4 extract \
   --destination_format CSV  \
   --field_delimiter "," --noprint_header \
   buddieRec.uniqOrganizations \
   $OUTDIR/rec-orgIds.csv


echo "Extracting contentIds to $OUTDIR"
bq --location=europe-west4 extract \
   --destination_format CSV  \
   --field_delimiter "," --noprint_header \
   buddieRec.uniqContentIds \
   $OUTDIR/rec-contentIds.csv

gsutil ls -l $OUTDIR

Deleting current contents of gs://buddie_rec_data/buddieRec/data
Extracting training data to gs://buddie_rec_data/buddieRec/data
Extracting validation data to gs://buddie_rec_data/buddieRec/data
Extracting unique clientIds to gs://buddie_rec_data/buddieRec/data
Extracting unique organizations to gs://buddie_rec_data/buddieRec/data
Extracting contentIds to gs://buddie_rec_data/buddieRec/data
      5236  2021-02-07T11:49:18Z  gs://buddie_rec_data/buddieRec/data/rec-clientIds.csv
      1993  2021-02-07T11:49:28Z  gs://buddie_rec_data/buddieRec/data/rec-contentIds.csv
        73  2021-02-07T11:49:24Z  gs://buddie_rec_data/buddieRec/data/rec-orgIds.csv
     44707  2021-02-07T11:49:12Z  gs://buddie_rec_data/buddieRec/data/rec-train-000000000000.csv
      5210  2021-02-07T11:49:14Z  gs://buddie_rec_data/buddieRec/data/rec-valid-000000000000.csv
TOTAL: 5 objects, 57219 bytes (55.88 KiB)


Waiting on bqjob_r53984448f9274e2_000001777c52fcda_1 ... (0s) Current status: RUNNING                                                                                     Waiting on bqjob_r53984448f9274e2_000001777c52fcda_1 ... (1s) Current status: RUNNING                                                                                     Waiting on bqjob_r53984448f9274e2_000001777c52fcda_1 ... (2s) Current status: RUNNING                                                                                     Waiting on bqjob_r53984448f9274e2_000001777c52fcda_1 ... (2s) Current status: DONE   
Waiting on bqjob_r5ea400382eed6b08_000001777c530e94_1 ... (0s) Current status: RUNNING                                                                                      Waiting on bqjob_r5ea400382eed6b08_000001777c530e94_1 ... (0s) Current status: DONE   
Waiting on bqjob_r13a761c9aaa5b08f_000001777c531773_1 ... (0s) Current status: RUNNING                                               

In [14]:
!gsutil cat gs://$BUCKET/buddieRec/data/rec-train-000000000000.csv | head -2

1395906622,1,content/mix/1508,educared-t
1395906622,1,content/mix/1508,educared-t
Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>
BrokenPipeError: [Errno 32] Broken pipe


## Make code compatible with AI Platform Training Service
In order to make our code compatible with AI Platform Training Service we need to make the following changes:

1. Upload data to Google Cloud Storage
2. Move code into a trainer Python package
3. Submit training job with gcloud to train on AI Platform

### Upload data to Google Cloud Storage (GCS)
Cloud services don't have access to our local files, so we need to upload them to a location the Cloud servers can read from. In this case we'll use GCS.

In [15]:
!gsutil ls gs://$BUCKET/buddieRec/data

gs://buddie_rec_data/buddieRec/data/rec-clientIds.csv
gs://buddie_rec_data/buddieRec/data/rec-contentIds.csv
gs://buddie_rec_data/buddieRec/data/rec-orgIds.csv
gs://buddie_rec_data/buddieRec/data/rec-train-000000000000.csv
gs://buddie_rec_data/buddieRec/data/rec-valid-000000000000.csv


### Move code into a python package
The first thing to do is to convert your training code snippets into a regular Python package that we will then pip install into the Docker container.

A Python package is simply a collection of one or more .py files along with an __init__.py file to identify the containing directory as a package. The __init__.py sometimes contains initialization code but for our purposes an empty file suffices.

#### Create the package directory
Our package directory contains 3 files:

In [16]:
ls ./buddieRec/trainer/

__init__.py  [34m__pycache__[m[m/ model.py     task.py      util.py


#### Paste existing code into model.py
A Python package requires our code to be in a .py file, as opposed to notebook cells. So, we simply copy and paste our existing code for the previous notebook into a single file.

In the cell below, we write the contents of the cell into model.py packaging the model we developed in the previous labs so that we can deploy it to AI Platform Training Service.

In [17]:
%%bash
touch ./buddieRec/trainer/__init__.py

In [18]:
%%writefile ./buddieRec/trainer/model.py

"""Defines a Keras model and input function for training."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import shutil
import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs
from typing import Dict, Text

"""## UserModel & ArticleModel"""

class UserModel(tf.keras.Model):

    def __init__(self, uniq_clients, uniq_org):
        super().__init__()

        self.user_embedding = tf.keras.Sequential([
            tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=uniq_clients, mask_token=None),
            tf.keras.layers.Embedding(len(uniq_clients) + 1, 32),
        ])

        self.org_embedding = tf.keras.Sequential([
            tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=uniq_org, mask_token=None),
            tf.keras.layers.Embedding(len(uniq_org) + 1, 32),
        ])

    def call(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:
        return tf.concat([
            self.user_embedding(features["clientId"]),
            self.org_embedding(features["organization"])
        ], axis=1)


class ArticleModel(tf.keras.Model):

    def __init__(self, uniq_content):
        super().__init__()

        max_tokens = 10_000

        self.title_embedding = tf.keras.Sequential([
            tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=uniq_content, mask_token=None),
            tf.keras.layers.Embedding(len(uniq_content) + 1, 32)
        ])

        articles = tf.data.Dataset.from_tensor_slices(uniq_content)
        self.title_vectorizer = tf.keras.layers.experimental.preprocessing.TextVectorization(max_tokens=max_tokens)

        self.title_text_embedding = tf.keras.Sequential([
            self.title_vectorizer,
            tf.keras.layers.Embedding(max_tokens, 32, mask_zero=True),
            tf.keras.layers.GlobalAveragePooling1D(),
        ])

        self.title_vectorizer.adapt(articles)

    def call(self, titles):
        return tf.concat([
            self.title_embedding(titles),
            self.title_text_embedding(titles)
        ], axis=1)


class CandidateModel(tf.keras.Model):
    """Model for encoding articles."""

    def __init__(self, layer_sizes, uniq_content):
        super().__init__()

        self.embedding_model = ArticleModel(uniq_content)
        self.dense_layers = tf.keras.Sequential()

        for layer_size in layer_sizes[:-1]:
            self.dense_layers.add(tf.keras.layers.Dense(layer_size, activation="relu"))
        for layer_size in layer_sizes[-1:]:
            self.dense_layers.add(tf.keras.layers.Dense(layer_size))

    def call(self, inputs):
        feature_embedding = self.embedding_model(inputs)
        return self.dense_layers(feature_embedding)


class QueryModel(tf.keras.Model):
    """Model for encoding user queries."""

    def __init__(self, layer_sizes, uniq_client, uniq_org):
        super().__init__()

        self.embedding_model = UserModel(uniq_client, uniq_org)
        self.dense_layers = tf.keras.Sequential()

        for layer_size in layer_sizes[:-1]:
            self.dense_layers.add(tf.keras.layers.Dense(layer_size, activation="relu"))
        for layer_size in layer_sizes[-1:]:
            self.dense_layers.add(tf.keras.layers.Dense(layer_size))

    def call(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:
        feature_embedding = self.embedding_model(features)
        return self.dense_layers(feature_embedding)


class ArticlelensModel(tfrs.models.Model):

    def __init__(self,
                 rating_weight: float,
                 retrieval_weight: float,
                 layer_sizes,
                 uniq_content,
                 uniq_clients,
                 uniq_org) -> None:
        super().__init__()

        self.query_model = QueryModel(layer_sizes, uniq_clients, uniq_org)  # user model
        self.candidate_model = CandidateModel(layer_sizes, uniq_content)  # article model

        self.rating_model = tf.keras.Sequential([
            #             tf.keras.layers.Dense(64, activation="relu"),
            tf.keras.layers.Dense(1),
        ])

        articles = tf.data.Dataset.from_tensor_slices(uniq_content)
        self.rating_task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
            loss=tf.keras.losses.MeanSquaredError(),
            metrics=[tf.keras.metrics.RootMeanSquaredError()],
        )
        self.retrieval_task: tf.keras.layers.Layer = tfrs.tasks.Retrieval(
            metrics=tfrs.metrics.FactorizedTopK(candidates=articles.batch(256).map(self.candidate_model))
        )

        self.rating_weight = rating_weight
        self.retrieval_weight = retrieval_weight

    def call(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:
        user_embeddings = self.query_model({
            "clientId": features["clientId"],
            "organization": features["organization"],
        })
        article_embeddings = self.candidate_model(features["contentId"])

        return (
            user_embeddings,
            article_embeddings,
            self.rating_model(tf.concat([user_embeddings, article_embeddings], axis=1)),
        )

    def compute_loss(self, data, training=False) -> tf.Tensor:
        # data structure is that (features, label)
        user_embeddings, article_embeddings, rating_predictions = self(data[0])

        rating_loss = self.rating_task(
            labels=data[1],
            predictions=rating_predictions,
        )
        retrieval_loss = self.retrieval_task(user_embeddings, article_embeddings)

        return (self.rating_weight * rating_loss + self.retrieval_weight * retrieval_loss)


def build_rec_model(rating_weight, retrieval_weight, layer_size, uniq_content, uniq_clients, uniq_org,learning_rate):
    model = ArticlelensModel(rating_weight=rating_weight,
                             retrieval_weight=retrieval_weight,
                             layer_sizes=layer_size,
                             uniq_content=uniq_content,
                             uniq_clients=uniq_clients,
                             uniq_org=uniq_org)
    model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=learning_rate))
    return model


Overwriting ./buddieRec/trainer/model.py


In [19]:
%%writefile ./buddieRec/trainer/task.py

"""Trains a Keras model to predict income bracket from other Census data."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os

from trainer import model
from trainer import util

import tensorflow as tf
import pandas as pd
import tensorflow_recommenders as tfrs
import datetime
import json
import numpy as np

def get_args():
    """Argument parser.
    Returns:
      Dictionary of arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--job-dir",
        help="this model ignores this field, but it is required by gcloud",
        default="junk"
    )
    parser.add_argument(
        '--clientID_path',
        type=str,
        required=True,
        help='unique client ids'
    )
    parser.add_argument(
        '--contentID_path',
        type=str,
        required=True,
        help='unique content ids'
    )
    parser.add_argument(
        '--orgID_path',
        type=str,
        required=True,
        help='unqiue organization ids'
    )
    parser.add_argument(
        "--output_dir",
        help="GCS location to write checkpoints and export models",
        required=True
    )
    parser.add_argument(
        "--train_data_path",
        help="GCS location pattern of train files containing eval URLs",
        required=True
    )
    parser.add_argument(
        "--eval_data_path",
        help="GCS location pattern of eval files",
        required=True
    )
    parser.add_argument(
        "--nnsize",
        help = "Hidden layer sizes (provide space-separated sizes)",
        nargs = "+",
        type = int,
        default=[32, 8]
    )
    parser.add_argument(
        '--num_eval',
        type=int,
        default=20,
        help='number of times to go through the data, default=20')
    parser.add_argument(
        '--num_eval_samples',
        type=int,
        default=10000,
        help='number of times to go through the data, default=20')
    parser.add_argument(
        '--num_train_samples',
        type=int,
        default=10000*5,
        help='number of times to go through the data, default=20')
    parser.add_argument(
        '--train_batch_size',
        default=128,
        type=int,
        help='number of records to read during each training step, default=128')
    parser.add_argument(
        '--learning_rate',
        default=.01,
        type=float,
        help='learning rate for gradient descent, default=.01')
    parser.add_argument(
        '--verbosity',
        choices=['DEBUG', 'ERROR', 'FATAL', 'INFO', 'WARN'],
        default='INFO')
    args, _ = parser.parse_known_args()
    return args

def train_and_evaluate(args):
    """Trains and evaluates the Keras model.
    Uses the Keras model defined in model.py and trains on data loaded and
    preprocessed in util.py. Saves the trained model in TensorFlow SavedModel
    format to the path defined in part by the --job-dir argument.
    Args:
      args: dictionary of arguments - see get_args() for details
    """

    UNIQ_CLIENTS = [str(x) for x in pd.read_csv(args.clientID_path, header=None)[0].values]
    UNIQ_CONTENT = pd.read_csv(args.contentID_path, header=None)[0].values
    articles = tf.data.Dataset.from_tensor_slices(UNIQ_CONTENT)
    UNIQ_ORG = pd.read_csv(args.orgID_path, header=None)[0].values

    # note how to use strategy to do distributed training
    # strategy = tf.distribute.MirroredStrategy()
    # with strategy.scope():
    keras_model = model.build_rec_model(rating_weight=1,
                            retrieval_weight=1,
#                             layer_size=[64, 32],
                            layer_size=args.nnsize,
                            uniq_content=UNIQ_CONTENT,
                            uniq_clients=UNIQ_CLIENTS,
                            uniq_org=UNIQ_ORG,
                            learning_rate = args.learning_rate)

    TRAIN_BATCH_SIZE = args.train_batch_size
    NUM_TRAIN_EXAMPLES = args.num_train_samples  # training dataset repeats, so it will wrap around
    NUM_EVALS = args.num_eval  # how many times to evaluate
    NUM_EVAL_EXAMPLES = args.num_eval_samples  # enough to get a reasonable sample, but not so much that it slows down

    trainds = util.load_dataset(args.train_data_path,
                                TRAIN_BATCH_SIZE,
                                tf.estimator.ModeKeys.TRAIN)
    evalds = util.load_dataset(args.eval_data_path, 1000,
                               tf.estimator.ModeKeys.EVAL).take(NUM_EVAL_EXAMPLES // 1000)
    
    
    timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    savedmodel_dir = os.path.join(args.output_dir, 'export/savedmodel')
    model_export_path = os.path.join(savedmodel_dir, timestamp)
    checkpoint_path = os.path.join(args.output_dir, 'checkpoints')
    tensorboard_path = os.path.join(args.output_dir, 'tensorboard')
    
    if tf.io.gfile.exists(args.output_dir):
        tf.io.gfile.rmtree(args.output_dir)
    
    checkpoint_cb = tf.keras.callbacks.ModelCheckpoint(
        checkpoint_path,
        save_weights_only=True,
        verbose=1
    )
    
    # Setup Learning Rate decay.
    lr_decay_cb = tf.keras.callbacks.LearningRateScheduler(
        lambda epoch: args.learning_rate + 0.02 * (0.5 ** (1 + epoch)),
        verbose=True)
    # Setup TensorBoard callback.
    tensorboard_cb = tf.keras.callbacks.TensorBoard(tensorboard_path)

    early_stopping_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=40)
    history = keras_model.fit(trainds, validation_data=evalds, validation_steps=1,
                        epochs=args.num_eval,
                        steps_per_epoch=NUM_TRAIN_EXAMPLES // (TRAIN_BATCH_SIZE * NUM_EVALS),
                        callbacks=[lr_decay_cb,
                                   checkpoint_cb,
                                   early_stopping_callback,
                                   tensorboard_cb])

    metrics = keras_model.evaluate(evalds, return_dict=True)
    print(f"Retrieval top-10 accuracy: {metrics['factorized_top_k/top_10_categorical_accuracy']:.3f}.")
    print(f"Ranking RMSE: {metrics['root_mean_squared_error']:.3f}.")

    index = tfrs.layers.factorized_top_k.BruteForce(keras_model.query_model)
    index.index(articles.batch(100).map(keras_model.candidate_model), articles)
    
    
    # Get some recommendations.
    socres, titles = index({"clientId": np.array(["160145"]),
                    "organization": np.array(["leeromgeving"])})


    print(f"Top 3 recommendations for user 1395906622.160145 under leeromgeving environment: {titles[0, :3]}")

    index.save(model_export_path)
    # tf.keras.experimental.export_saved_model(model, export_path)
    print('Model exported to: {}'.format(model_export_path))
    
    
     # TODO 1
    hp_metric = history.history['val_rmse'][args.num_eval-1]
    
    # TODO 1
    hpt = hypertune.HyperTune()
    hpt.report_hyperparameter_tuning_metric(
        hyperparameter_metric_tag='rmse',
        metric_value=hp_metric,
        global_step=args.num_eval
    )
    
    return history
    
if __name__ == '__main__':
    args = get_args()
    tf.compat.v1.logging.set_verbosity(args.verbosity)
    
    hparams = args.__dict__
    hparams["output_dir"] = os.path.join(
        hparams["output_dir"],
        json.loads(
            os.environ.get("TF_CONFIG", "{}")
        ).get("task", {}).get("trial", "")
    )
    print("output_dir", hparams["output_dir"])
    
    train_and_evaluate(hparams)


Overwriting ./buddieRec/trainer/task.py


In [20]:
%%writefile ./buddieRec/trainer/util.py

"""Utilities to download and preprocess the Census data."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
from six.moves import urllib
import tempfile

import numpy as np
import pandas as pd
import tensorflow as tf


# Determine CSV, label, and key columns
CSV_COLUMNS = 'clientId,timeOnPage,contentId,organization'.split(',')
LABEL_COLUMN = 'timeOnPage'

DEFAULTS = [['0.0'], [0.0], ['null'], ['null']]

def features_and_labels(row_data):
    label = row_data.pop(LABEL_COLUMN)

    features = row_data  # only remains clientId, contentId, organization in features

    return features, label  # features, label



def load_dataset(pattern, batch_size=1, mode='eval'):
    """Loads dataset using the tf.data API from CSV files.

    Args:
        pattern: str, file pattern to glob into list of files.
        batch_size: int, the number of examples per batch.
        mode: 'train' | 'eval' to determine if training or evaluating.
    Returns:
        `Dataset` object.
    """
    print("mode = {}".format(mode))
    # Make a CSV dataset
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        header=False)

    # Map dataset to features and label
    dataset = dataset.map(map_func=features_and_labels)  # features, label

    # Shuffle and repeat for training
    if mode == 'train':
        dataset = dataset.shuffle(buffer_size=10).repeat()

    # Take advantage of multi-threading; 1=AUTOTUNE
    dataset = dataset.prefetch(buffer_size=1)

    return dataset


Overwriting ./buddieRec/trainer/util.py


In [40]:
%%writefile ./buddieRec/setup.py

from setuptools import find_packages
from setuptools import setup

from distutils.core import setup, Command

REQUIRED_PACKAGES = ['tensorflow_recommenders', 'gcsfs','tensorflow==2.3.0']

setup(
    name='trainer',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    description='My training application package.',
)

Overwriting ./buddieRec/setup.py


## Run trainer module package locally
Now we can test our training code locally as follows using the local test data. We'll run a very small training job over a single file with a small batch size and one eval step.

In [41]:
%%bash

EVAL_DATA_PATH=./buddieRec/tests/data/rec-valid*
TRAIN_DATA_PATH=./buddieRec/tests/data/rec-train*
clientID_PATH=./buddieRec/tests/data/rec-clientIds.csv
contentID_PATH=./buddieRec/tests/data/rec-contentIds.csv
orgID_PATH=./buddieRec/tests/data/rec-orgIds.csv
OUTPUT_DIR=./buddieRec-model

test ${OUTPUT_DIR} && rm -rf ${OUTPUT_DIR}
export PYTHONPATH=${PYTHONPATH}:${PWD}/buddieRec
    
echo $OUTPUT_DIR

python3 -m trainer.task \
--eval_data_path $EVAL_DATA_PATH \
--output_dir $OUTPUT_DIR \
--train_data_path $TRAIN_DATA_PATH \
--clientID_path $clientID_PATH \
--contentID_path $contentID_PATH \
--orgID_path $orgID_PATH

### Run your training package on Cloud AI Platform
Once the code works in standalone mode locally, you can run it on Cloud AI Platform. To submit to the Cloud we use gcloud ai-platform jobs submit training [jobname] and simply specify some additional parameters for AI Platform Training Service:

- jobid: A unique identifier for the Cloud job. We usually append system time to ensure uniqueness
- region: Cloud region to train in. See here for supported AI Platform Training Service regions
The arguments before -- \ are for AI Platform Training Service. The arguments after -- \ are sent to our task.py.

Because this is on the entire dataset, it will take a while. You can monitor the job from the GCP console in the Cloud AI Platform section.

In [42]:
%%bash

# Output directory and jobID
OUTDIR=gs://${BUCKET}/buddieRec/trained_model_$(date -u +%y%m%d_%H%M%S)
JOBID=buddieRec_$(date -u +%y%m%d_%H%M%S)
echo ${OUTDIR} ${REGION} ${JOBID}
gsutil -m rm -rf ${OUTDIR}

# Model and training hyperparameters
TFVERSION=2.3

# GCS paths
GCS_PROJECT_PATH=gs://$BUCKET/buddieRec
DATA_PATH=$GCS_PROJECT_PATH/data
TRAIN_DATA_PATH=$DATA_PATH/rec-train*
EVAL_DATA_PATH=$DATA_PATH/rec-valid*
clientID_PATH=$DATA_PATH/rec-clientIds.csv
contentID_PATH=$DATA_PATH/rec-contentIds.csv
orgID_PATH=$DATA_PATH/rec-orgIds.csv

#TODO 2
gcloud ai-platform jobs submit training $JOBID \
    --module-name=trainer.task \
    --package-path=buddieRec/trainer \
    --staging-bucket=gs://${BUCKET} \
    --python-version=3.7 \
    --runtime-version=${TFVERSION} \
    --region=${REGION} \
    -- \
    --eval_data_path $EVAL_DATA_PATH \
    --output_dir $OUTDIR \
    --train_data_path $TRAIN_DATA_PATH \
    --clientID_path $clientID_PATH \
    --contentID_path $contentID_PATH \
    --orgID_path $orgID_PATH

gs://buddie_rec_data/buddieRec/trained_model_210207_125028 europe-west4 buddieRec_210207_125028
jobId: buddieRec_210207_125028
state: QUEUED


If you experience problems with multiprocessing on MacOS, they might be related to https://bugs.python.org/issue33725. You can disable multiprocessing by editing your .boto config or by adding the following flag to your command: `-o "GSUtil:parallel_process_count=1"`. Note that multithreading is still available even if you disable multiprocessing.

If you experience problems with multiprocessing on MacOS, they might be related to https://bugs.python.org/issue33725. You can disable multiprocessing by editing your .boto config or by adding the following flag to your command: `-o "GSUtil:parallel_process_count=1"`. Note that multithreading is still available even if you disable multiprocessing.

CommandException: 1 files/objects could not be removed.
Job [buddieRec_210207_125028] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe buddieRec_210207_125028

or continue streaming the logs with the comman

## (Optional) Run your training package using Docker container
AI Platform Training also supports training in custom containers, allowing users to bring their own Docker containers with any pre-installed ML framework or algorithm to run on AI Platform Training.

In this last section, we'll see how to submit a Cloud training job using a customized Docker image.

Containerizing our ./buddieRec/trainer package involves 3 steps:

- Writing a Dockerfile in ./buddieRec
- Building the Docker image
- Pushing it to the Google Cloud container registry in our GCP project

The Dockerfile specifies

1. How the container needs to be provisioned so that all the dependencies in our code are satisfied
2. Where to copy our trainer Package in the container and how to install it (pip install /trainer)
3. What command to run when the container is ran (the ENTRYPOINT line)

In [64]:
%%writefile ./buddieRec/Dockerfile
FROM gcr.io/deeplearning-platform-release/tf2-cpu
# TODO 3

COPY . /code

WORKDIR /code

RUN pip3 install cloudml-hypertune
RUN pip3 install tensorflow_recommenders
RUN pip3 install gcsfs
RUN pip3 install tensorflow==2.3.0

ENTRYPOINT ["python3", "-m", "trainer.task"]

Overwriting ./buddieRec/Dockerfile


In [65]:
!gcloud auth configure-docker


{
  "credHelpers": {
    "europe-west4-docker.pkg.dev": "gcloud",
    "gcr.io": "gcloud",
    "us.gcr.io": "gcloud",
    "eu.gcr.io": "gcloud",
    "asia.gcr.io": "gcloud",
    "staging-k8s.gcr.io": "gcloud",
    "marketplace.gcr.io": "gcloud"
  }
}
Adding credentials for all GCR repositories.
gcloud credential helpers already registered correctly.


In [66]:
%%bash 
PROJECT_DIR=$(cd ./buddieRec && pwd)
PROJECT_ID=$(gcloud config list project --format "value(core.project)")
IMAGE_NAME=buddierec_training_container
DOCKERFILE=$PROJECT_DIR/Dockerfile
IMAGE_URI=gcr.io/$PROJECT_ID/$IMAGE_NAME

docker build $PROJECT_DIR -f $DOCKERFILE -t $IMAGE_URI

docker push $IMAGE_URI

Sending build context to Docker daemon  100.9kB
Step 1/8 : FROM gcr.io/deeplearning-platform-release/tf2-cpu
 ---> d52504fdb37e
Step 2/8 : COPY . /code
 ---> 24342be542c3
Step 3/8 : WORKDIR /code
 ---> Running in 0b62cf655aab
Removing intermediate container 0b62cf655aab
 ---> a0077405d914
Step 4/8 : RUN pip3 install cloudml-hypertune
 ---> Running in 27297a858477
Collecting cloudml-hypertune
  Downloading cloudml-hypertune-0.1.0.dev6.tar.gz (3.2 kB)
Building wheels for collected packages: cloudml-hypertune
  Building wheel for cloudml-hypertune (setup.py): started
  Building wheel for cloudml-hypertune (setup.py): finished with status 'done'
  Created wheel for cloudml-hypertune: filename=cloudml_hypertune-0.1.0.dev6-py2.py3-none-any.whl size=3988 sha256=f321f6ba7eb786fe069bc54297fe9e0280b9cd5717c4d6578e9d6dfb1c25422d
  Stored in directory: /root/.cache/pip/wheels/a7/ff/87/e7bed0c2741fe219b3d6da67c2431d7f7fedb183032e00f81e
Successfully built cloudml-hypertune
Installing collected pac

## Hypertunning
### Create config.yaml file
Specify the hyperparameter tuning configuration for your training job Create a HyperparameterSpec object to hold the hyperparameter tuning configuration for your training job, and add the HyperparameterSpec as the hyperparameters object in your TrainingInput object.

In your HyperparameterSpec, set the hyperparameterMetricTag to a value representing your chosen metric. If you don't specify a hyperparameterMetricTag, AI Platform Training looks for a metric with the name training/hptuning/metric. The following example shows how to create a configuration for a metric named metric1:

In [57]:
%%writefile hptuning_config.yaml
trainingInput:
  scaleTier: BASIC
  hyperparameters:
    goal: MINIMIZE
    maxTrials: 10
    maxParallelTrials: 2
    hyperparameterMetricTag: rmse # TODO
    enableTrialEarlyStopping: True
    params:
    - parameterName: learning_rate
      # TODO
      type: DOUBLE
      minValue: 0.0001
      maxValue: 0.1
      scaleType: UNIT_LOG_SCALE
    - parameterName: train_batch_size
      # TODO
      type: DISCRETE
      discreteValues:
      - 15
      - 30
      - 50

Overwriting hptuning_config.yaml


Report your hyperparameter metric to AI Platform Training
The way to report your hyperparameter metric to the AI Platform Training service depends on whether you are using TensorFlow for training or not. It also depends on whether you are using a runtime version or a custom container for training.

We recommend that your training code reports your hyperparameter metric to AI Platform Training frequently in order to take advantage of early stopping.

TensorFlow with a runtime version If you use an AI Platform Training runtime version and train with TensorFlow, then you can report your hyperparameter metric to AI Platform Training by writing the metric to a TensorFlow summary. Use one of the following functions.

You may need to install cloudml-hypertune on your machine to run this code locally

In [58]:
!pip install cloudml-hypertune



In [54]:
%%bash

EVAL_DATA_PATH=./buddieRec/tests/data/rec-valid*
TRAIN_DATA_PATH=./buddieRec/tests/data/rec-train*
clientID_PATH=./buddieRec/tests/data/rec-clientIds.csv
contentID_PATH=./buddieRec/tests/data/rec-contentIds.csv
orgID_PATH=./buddieRec/tests/data/rec-orgIds.csv
OUTPUT_DIR=./buddieRec-model

rm -rf ${OUTDIR}
export PYTHONPATH=${PYTHONPATH}:${PWD}/buddieRec
    
python3 -m trainer.task \
--eval_data_path $EVAL_DATA_PATH \
--output_dir $OUTPUT_DIR \
--train_data_path $TRAIN_DATA_PATH \
--clientID_path $clientID_PATH \
--contentID_path $contentID_PATH \
--orgID_path $orgID_PATH

mode = train
mode = eval

Epoch 00001: LearningRateScheduler reducing learning rate to 0.02.
Epoch 1/20
Epoch 00001: saving model to ./buddieRec-model/checkpoints

Epoch 00002: LearningRateScheduler reducing learning rate to 0.015.
Epoch 2/20
Epoch 00002: saving model to ./buddieRec-model/checkpoints

Epoch 00003: LearningRateScheduler reducing learning rate to 0.0125.
Epoch 3/20
Epoch 00003: saving model to ./buddieRec-model/checkpoints

Epoch 00004: LearningRateScheduler reducing learning rate to 0.01125.
Epoch 4/20
Epoch 00004: saving model to ./buddieRec-model/checkpoints

Epoch 00005: LearningRateScheduler reducing learning rate to 0.010625.
Epoch 5/20
Epoch 00005: saving model to ./buddieRec-model/checkpoints

Epoch 00006: LearningRateScheduler reducing learning rate to 0.0103125.
Epoch 6/20
Epoch 00006: saving model to ./buddieRec-model/checkpoints

Epoch 00007: LearningRateScheduler reducing learning rate to 0.01015625.
Epoch 7/20
Epoch 00007: saving model to ./buddieRec-model/

2021-02-07 14:30:51.576001: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN)to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-02-07 14:30:51.598990: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7f7f180503c0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2021-02-07 14:30:51.599005: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
2021-02-07 14:30:52.163266: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session started.
2021-02-07 14:30:53.999646: I tensorflow/core/profiler/lib/profiler_session.cc:164] Profiler session started.
Instructions for updating:
use `tf.profiler.experimental.stop` instead.
2021-02-07 14:30:54.072863: I tensorflow/core

In [55]:
ls buddieRec-model/tensorboard

[34mtrain[m[m/      [34mvalidation[m[m/


In [67]:
%%bash

PROJECT_ID=$(gcloud config list project --format "value(core.project)")
BUCKET="buddie_rec_data"
REGION="europe-west4"
TFVERSION="2.3"

# Output directory and jobID
OUTDIR=gs://${BUCKET}/buddieRec/trained_model_$(date -u +%y%m%d_%H%M%S)
JOBID=buddieRec_$(date -u +%y%m%d_%H%M%S)
echo ${OUTDIR} ${REGION} ${JOBID}
gsutil -m rm -rf ${OUTDIR}

# Model and training hyperparameters
BATCH_SIZE=15
NUM_EXAMPLES_TO_TRAIN_ON=500
NUM_EVAL_SAMPLES = 100
NUM_EVALS=10
LR=0.1
NNSIZE="32 8"

# GCS paths
GCS_PROJECT_PATH=gs://$BUCKET/buddieRec
DATA_PATH=$GCS_PROJECT_PATH/data
TRAIN_DATA_PATH=$DATA_PATH/rec-train*
EVAL_DATA_PATH=$DATA_PATH/rec-valid*
clientID_PATH=$DATA_PATH/rec-clientIds.csv
contentID_PATH=$DATA_PATH/rec-contentIds.csv
orgID_PATH=$DATA_PATH/rec-orgIds.csv

# TODO
gcloud ai-platform jobs submit training $JOBID \
    --module-name=trainer.task \
    --package-path=buddieRec/trainer \
    --staging-bucket=gs://${BUCKET} \
    --config=hptuning_config.yaml \
    --python-version=3.7 \
    --runtime-version=${TFVERSION} \
    --region=${REGION} \
    -- \
    --eval_data_path $EVAL_DATA_PATH \
    --output_dir $OUTDIR \
    --train_data_path $TRAIN_DATA_PATH \
    --clientID_path $clientID_PATH \
    --contentID_path $contentID_PATH \
    --orgID_path $orgID_PATH \
    --train_batch_size $BATCH_SIZE \
    --learning_rate $LR \

gs://buddie_rec_data/buddieRec/trained_model_210207_184643 europe-west4 buddieRec_210207_184643
jobId: buddieRec_210207_184643
state: QUEUED


If you experience problems with multiprocessing on MacOS, they might be related to https://bugs.python.org/issue33725. You can disable multiprocessing by editing your .boto config or by adding the following flag to your command: `-o "GSUtil:parallel_process_count=1"`. Note that multithreading is still available even if you disable multiprocessing.

If you experience problems with multiprocessing on MacOS, they might be related to https://bugs.python.org/issue33725. You can disable multiprocessing by editing your .boto config or by adding the following flag to your command: `-o "GSUtil:parallel_process_count=1"`. Note that multithreading is still available even if you disable multiprocessing.

CommandException: 1 files/objects could not be removed.
bash: line 16: NUM_EVAL_SAMPLES: command not found
Job [buddieRec_210207_184643] submitted successfully.
Your job is still active. You may view the status of your job with the command

  $ gcloud ai-platform jobs describe buddieRec_210207_184