
# Recommending movies: retrieval with distribution strategy

In this tutorial, we're going to train the same kind of retrieval model as in the [basic retrieval](basic_retrieval) tutorial, but using a TensorFlow distribution strategy.

We're going to:

1. Get our data and split it into a training and test set.
2. Set up two virtual GPUs and TensorFlow MirroredStrategy.
3. Implement a retrieval model using MirroredStrategy.
4. Fit it with MirroredStrategy and evaluate it.



## Imports


Let's first get our imports out of the way.

In [1]:
!pip install -q tensorflow-recommenders
!pip install -q --upgrade tensorflow-datasets

In [2]:
import os
import pprint
import tempfile

from typing import Dict, Text

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

In [3]:
import tensorflow_recommenders as tfrs

## Preparing the dataset

We load the same MovieLens 100K data as in the [basic retrieval](basic_retrieval) tutorial, but keep a few extra user features and the rating timestamp.

In [23]:
# Ratings data.
ratings_raw = tfds.load("movielens/100k-ratings", split="train")
# Features of all the available movies.
movies_raw = tfds.load("movielens/100k-movies", split="train")

for x in ratings_raw.take(1).as_numpy_iterator():
  pprint.pprint(x)

for x in movies_raw.take(1).as_numpy_iterator():
  pprint.pprint(x)

{'bucketized_user_age': 45.0,
 'movie_genres': array([7]),
 'movie_id': b'357',
 'movie_title': b"One Flew Over the Cuckoo's Nest (1975)",
 'raw_user_age': 46.0,
 'timestamp': 879024327,
 'user_gender': True,
 'user_id': b'138',
 'user_occupation_label': 4,
 'user_occupation_text': b'doctor',
 'user_rating': 4.0,
 'user_zip_code': b'53211'}
{'movie_genres': array([4]),
 'movie_id': b'1681',
 'movie_title': b'You So Crazy (1994)'}


In [24]:
for x in ratings_raw.take(1).as_numpy_iterator():
  pprint.pprint(x["movie_genres"])

array([7])


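Given the overlapping nature of some of the features in the dataset (see `'bucketized_user_age'`/`'raw_user_age'`, `'user_occupation_label'`/`'user_occupation_text'`), we keep only one feature from each pair (`'bucketized_user_age'` and `'user_occupation_label'`) and cast the categorical features to strings so they can serve as vocabulary tokens: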

In [120]:
ratings = ratings_raw.map(lambda x: {
    "movie_title": x["movie_title"],
    "user_id": x["user_id"],
    "user_rating": x["user_rating"],
    "user_occupation_label": tf.strings.as_string(x["user_occupation_label"]),
    "bucketized_user_age": tf.strings.as_string(tf.dtypes.cast(x["bucketized_user_age"], tf.int32)),
    "user_gender": tf.strings.as_string(x["user_gender"]),
    "timestamp": x["timestamp"]
})
movies = movies_raw.map(lambda x: x["movie_title"])

movie_titles = movies.batch(100_000)
user_ids = ratings.batch(1_000_000).map(lambda x: x["user_id"])

In [97]:
user_ratings = ratings.batch(1_000_000).map(lambda x: x["user_rating"])
unique_ratings = np.unique(np.concatenate(list(user_ratings)))

In [99]:
user_occupations = ratings.batch(1_000_000).map(lambda x: x["user_occupation_label"])
unique_occupations = np.unique(np.concatenate(list(user_occupations)))

In [100]:
unique_occupations

array([b'0', b'1', b'10', b'11', b'12', b'13', b'14', b'15', b'17', b'18',
       b'2', b'21', b'4', b'5', b'6', b'8', b'9'], dtype=object)

In [101]:
user_ages = ratings.batch(1_000_000).map(lambda x: x["bucketized_user_age"])
unique_ages = np.unique(np.concatenate(list(user_ages)))

In [102]:
unique_ages

array([b'1', b'18', b'25', b'35', b'45', b'50', b'56'], dtype=object)

In [121]:
user_genders = ratings.batch(1_000_000).map(lambda x: x["user_gender"])
unique_genders = np.unique(np.concatenate(list(user_genders)))

In [104]:
unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

unique_movie_titles[:10]

array([b"'Til There Was You (1997)", b'1-900 (1994)',
       b'101 Dalmatians (1996)', b'12 Angry Men (1957)', b'187 (1997)',
       b'2 Days in the Valley (1996)',
       b'20,000 Leagues Under the Sea (1954)',
       b'2001: A Space Odyssey (1968)',
       b'3 Ninjas: High Noon At Mega Mountain (1998)',
       b'39 Steps, The (1935)'], dtype=object)

In [105]:
timestamps = np.concatenate(list(ratings.map(lambda x: x["timestamp"]).batch(100)))

max_timestamp = timestamps.max()
min_timestamp = timestamps.min()

timestamp_buckets = np.linspace(
    min_timestamp, max_timestamp, num=1000,
)
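The bucket boundaries computed above are later passed to a `Discretization` layer, followed by an `Embedding` with `len(timestamp_buckets) + 1` rows. The following is a minimal pure-Python sketch of why the extra row is needed, assuming the layer assigns a value to the bucket found by a right bisect over the boundaries. The helper names `linspace` and `bucketize` are made up for this illustration and are not part of TensorFlow.

```python
import bisect


def linspace(start, stop, num):
    """Return `num` evenly spaced values from start to stop, inclusive."""
    step = (stop - start) / (num - 1)
    return [start + i * step for i in range(num)]


def bucketize(value, boundaries):
    """Return the bucket index of `value`.

    Values below the first boundary land in bucket 0, and values at or
    above the last boundary land in bucket len(boundaries).
    """
    return bisect.bisect_right(boundaries, value)


# Five boundaries produce six buckets: one below, four between, one above.
boundaries = linspace(0.0, 100.0, num=5)
indices = [bucketize(v, boundaries) for v in (-1.0, 0.0, 30.0, 100.0, 250.0)]
num_buckets = len(boundaries) + 1

print(boundaries)
print(indices, num_buckets)
```

With 1000 boundaries, as in this notebook, the same reasoning gives 1001 possible bucket indices, which matches the size of the timestamp embedding table.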

## Set up two virtual GPUs

If you have not added GPU accelerators to your Colab, please disconnect the Colab runtime and do it now. We need the GPU to run the code below:

In [32]:
gpus = tf.config.list_physical_devices("GPU")
if gpus:
  # Create 2 virtual GPUs with 1GB memory each
  try:
    tf.config.set_logical_device_configuration(
        gpus[0],
        [tf.config.LogicalDeviceConfiguration(memory_limit=1024),
         tf.config.LogicalDeviceConfiguration(memory_limit=1024)])
    logical_gpus = tf.config.list_logical_devices("GPU")
    print(len(gpus), "Physical GPU,", len(logical_gpus), "Logical GPUs")
  except RuntimeError as e:
    # Virtual devices must be set before GPUs have been initialized
    print(e)

strategy = tf.distribute.MirroredStrategy()

Virtual devices cannot be modified after being initialized


## Implementing a model

We build the query and candidate models, metrics, and task much as in the [basic retrieval](basic_retrieval) tutorial. The user and movie towers here use additional features, and the models and task are created inside the distribution strategy scope:

In [33]:
embedding_dimension = 32

In [148]:
class UserModel(tf.keras.Model):

  def __init__(self):
    super().__init__()

    self.user_embedding = tf.keras.Sequential([
        tf.keras.layers.StringLookup(
          vocabulary=unique_user_ids, mask_token=None),
        # We add an additional embedding to account for unknown tokens.
        tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
    ])

    # self.user_occupation = tf.keras.Sequential([
    #     tf.keras.layers.StringLookup(
    #       vocabulary=unique_occupations, mask_token=None),
    #     # We add an additional embedding to account for unknown tokens.
    #     tf.keras.layers.Embedding(len(unique_occupations) + 1, embedding_dimension)
    # ])

    self.user_age = tf.keras.Sequential([
        tf.keras.layers.StringLookup(
          vocabulary=unique_ages, mask_token=None),
        # We add an additional embedding to account for unknown tokens.
        tf.keras.layers.Embedding(len(unique_ages) + 1, embedding_dimension)
    ])

    # self.user_gender = tf.keras.Sequential([
    #     tf.keras.layers.StringLookup(
    #       vocabulary=unique_genders, mask_token=None),
    #     # We add an additional embedding to account for unknown tokens.
    #     tf.keras.layers.Embedding(len(unique_genders) + 1, embedding_dimension)
    # ])

    self.timestamp_embedding = tf.keras.Sequential([
        tf.keras.layers.Discretization(timestamp_buckets.tolist()),
        tf.keras.layers.Embedding(len(timestamp_buckets) + 1, embedding_dimension),
    ])
    self.normalized_timestamp = tf.keras.layers.Normalization(
        axis=None
    )

    self.normalized_timestamp.adapt(timestamps)



  def call(self, inputs):

    # Take the input dictionary, pass it through each input layer,
    # and concatenate the result.
    return tf.concat([
        self.user_embedding(inputs["user_id"]),
        # self.user_occupation(inputs["user_occupation_label"]),
        self.user_age(inputs["bucketized_user_age"]),
        # self.user_gender(inputs["user_gender"]),
        self.timestamp_embedding(inputs["timestamp"]),
        tf.reshape(self.normalized_timestamp(inputs["timestamp"]), (-1, 1))
    ], axis=1)
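Each `StringLookup` above is paired with an `Embedding` that has one more row than the vocabulary. The following sketch illustrates the reason in plain Python, assuming a single out-of-vocabulary index at position 0 (which is what `mask_token=None` with the default number of OOV indices is expected to give). The helpers `build_lookup` and `lookup` are hypothetical names for this illustration.

```python
def build_lookup(vocabulary):
    """Map each known token to an index starting at 1; 0 is kept for unknowns."""
    return {token: index + 1 for index, token in enumerate(vocabulary)}


def lookup(table, token):
    """Return the index of `token`, or 0 if it is not in the vocabulary."""
    return table.get(token, 0)


# The age buckets printed earlier in this notebook.
vocab = [b"1", b"18", b"25", b"35", b"45", b"50", b"56"]
table = build_lookup(vocab)

known = lookup(table, b"25")
unknown = lookup(table, b"99")

# The embedding table needs one row per known token plus one for unknowns.
embedding_rows = len(vocab) + 1

print(known, unknown, embedding_rows)
```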

In [149]:
class MovieModel(tf.keras.Model):

  def __init__(self):
    super().__init__()

    max_tokens = 10_000

    self.title_embedding = tf.keras.Sequential([
      tf.keras.layers.StringLookup(
          vocabulary=unique_movie_titles, mask_token=None),
      tf.keras.layers.Embedding(len(unique_movie_titles) + 1, embedding_dimension)
    ])

    self.title_vectorizer = tf.keras.layers.TextVectorization(
        max_tokens=max_tokens)
    
    self.title_text_embedding = tf.keras.Sequential([
      self.title_vectorizer,
      tf.keras.layers.Embedding(max_tokens, embedding_dimension, mask_zero=True),
      tf.keras.layers.GlobalAveragePooling1D(),
    ])

    self.title_vectorizer.adapt(movies)

  def call(self, titles):
    # Concatenate the per-title embedding with the averaged word embeddings.
    return tf.concat([
        self.title_embedding(titles),
        self.title_text_embedding(titles),
    ], axis=1)
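The text branch of `MovieModel` embeds each title token with `mask_zero=True` and then averages over the sequence. A rough pure-Python sketch of what masked averaging means is shown below, assuming token id 0 is the padding id and that every title has at least one real token. The function `masked_average` and the toy embedding table are invented for this example.

```python
def masked_average(token_ids, embedding_table, padding_id=0):
    """Average the embeddings of non-padding tokens.

    Assumes at least one token differs from `padding_id`.
    """
    vectors = [embedding_table[t] for t in token_ids if t != padding_id]
    dim = len(vectors[0])
    return [sum(v[d] for v in vectors) / len(vectors) for d in range(dim)]


# Toy 2-dimensional embeddings; id 0 stands for padding.
embedding_table = {0: [0.0, 0.0], 2: [1.0, 3.0], 3: [3.0, 5.0]}
padded_title = [2, 3, 0, 0]

pooled = masked_average(padded_title, embedding_table)

# Without masking, the padding positions would pull the average toward zero.
naive = [
    sum(embedding_table[t][d] for t in padded_title) / len(padded_title)
    for d in range(2)
]

print(pooled, naive)
```

Masking keeps short titles from being diluted by padding, so titles of different lengths end up on a comparable scale.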


In [150]:
embedding_dimension = 32

with strategy.scope():
    query_model = tf.keras.Sequential([
      UserModel(),
      tf.keras.layers.Dense(64)
    ])
    candidate_model = tf.keras.Sequential([
      MovieModel(),
      tf.keras.layers.Dense(64)
    ])
    task = tfrs.tasks.Retrieval(
        metrics=tfrs.metrics.FactorizedTopK(
            candidates=movies.batch(128).map(candidate_model),
        ),
    )
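Both towers end in a `Dense(64)` layer. One way to see why is to add up the widths of the pieces each tower concatenates: they differ, and a dot-product score needs vectors of equal length. The sketch below is plain Python arithmetic based on the layers defined in this notebook; the dictionary keys and the `dot` helper are illustrative names only.

```python
embedding_dimension = 32

# Widths of the pieces concatenated by each tower in this notebook.
user_tower_parts = {
    "user_id": embedding_dimension,
    "bucketized_user_age": embedding_dimension,
    "timestamp_bucket": embedding_dimension,
    "normalized_timestamp": 1,
}
movie_tower_parts = {
    "title": embedding_dimension,
    "title_text": embedding_dimension,
}

user_width = sum(user_tower_parts.values())
movie_width = sum(movie_tower_parts.values())


def dot(u, v):
    """Dot product that refuses vectors of different lengths."""
    if len(u) != len(v):
        raise ValueError(f"length mismatch: {len(u)} vs {len(v)}")
    return sum(a * b for a, b in zip(u, v))


# After projecting both towers to 64 units, the score is well defined.
projected_units = 64
query = [0.5] * projected_units
candidate = [2.0] * projected_units
score = dot(query, candidate)

print(user_width, movie_width, score)
```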

We can now put it all together into a model. The structure follows the [basic retrieval](basic_retrieval) tutorial, except that `compute_loss` passes a dictionary of user features to the query model.

In [151]:
class MovielensModel(tfrs.models.Model):

  def __init__(self):
    super().__init__()
    self.query_model = query_model
    self.candidate_model = candidate_model
    self.task = task

  def compute_loss(self, features, training=False):
    query_embeddings = self.query_model({
        "user_id": features["user_id"],
        # "user_occupation_label": features["user_occupation_label"],
        "bucketized_user_age": features["bucketized_user_age"],
        # "user_gender": features["user_gender"],
        "timestamp": features["timestamp"],
    })
    movie_embeddings = self.candidate_model(features["movie_title"])

    return self.task(query_embeddings, movie_embeddings)
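The task reports top-k categorical accuracy metrics such as `factorized_top_k/top_100_categorical_accuracy`. As a simplified illustration of the idea, and not the TFRS implementation, the sketch below scores every candidate by dot product and checks whether the true candidate is among the k highest scores. All function names and the toy vectors are made up for this example.

```python
def dot(u, v):
    return sum(a * b for a, b in zip(u, v))


def top_k_hit(query, candidates, true_index, k):
    """Return True if the true candidate is among the k best-scoring ones."""
    scores = [dot(query, c) for c in candidates]
    ranked = sorted(range(len(candidates)), key=lambda i: scores[i], reverse=True)
    return true_index in ranked[:k]


def top_k_accuracy(queries, candidates, true_indices, k):
    """Fraction of queries whose true candidate appears in the top k."""
    hits = [top_k_hit(q, candidates, t, k) for q, t in zip(queries, true_indices)]
    return sum(hits) / len(hits)


candidates = [[1.0, 0.0], [0.0, 1.0], [-1.0, -1.0]]
queries = [[2.0, 1.0], [1.0, 3.0]]
true_indices = [0, 0]

top1 = top_k_accuracy(queries, candidates, true_indices, k=1)
top2 = top_k_accuracy(queries, candidates, true_indices, k=2)

print(top1, top2)
```

In this toy case the first query ranks its true candidate first, while the second ranks it second, so the top-1 accuracy is lower than the top-2 accuracy.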

## Fitting and evaluating

Now we instantiate and compile the model within the distribution strategy scope.

We compile the model with the Adagrad optimizer, as in the [basic retrieval](basic_retrieval) tutorial, here with a learning rate of 0.1 and a weight decay of 0.001.

In [152]:
with strategy.scope():
  model = MovielensModel()
  model.compile(
      optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1, weight_decay=0.001))

Then shuffle, batch, and cache the training and evaluation data.

In [153]:
tf.random.set_seed(42)
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)

cached_train = train.shuffle(100_000).batch(8192).cache()
cached_test = test.batch(4096).cache()
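The batch sizes above are global batch sizes. Under a mirrored strategy each global batch is divided among the replicas, so it can help to work out the resulting numbers. The sketch below is plain Python arithmetic, assuming two replicas in sync and an even split; `steps_per_epoch` and `per_replica_batch_size` are hypothetical helper names.

```python
import math


def steps_per_epoch(num_examples, global_batch_size):
    """Number of batches needed to cover the dataset once."""
    return math.ceil(num_examples / global_batch_size)


def per_replica_batch_size(global_batch_size, num_replicas):
    """Examples handled by each replica per step, assuming an even split."""
    if global_batch_size % num_replicas != 0:
        raise ValueError("global batch size must divide evenly across replicas")
    return global_batch_size // num_replicas


num_replicas = 2  # assumption: two virtual GPUs in sync

train_steps = steps_per_epoch(80_000, 8192)
test_steps = steps_per_epoch(20_000, 4096)
train_per_replica = per_replica_batch_size(8192, num_replicas)
test_per_replica = per_replica_batch_size(4096, num_replicas)

print(train_steps, test_steps, train_per_replica, test_per_replica)
```

The ten training steps per epoch agree with the `1/10` step counter shown in the training log.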

Then train the model:

In [None]:
model.fit(cached_train, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
 1/10 [==>...........................] - ETA: 9s - factorized_top_k/top_1_categorical_accuracy: 3.6621e-04 - factorized_top_k/top_5_categorical_accuracy: 0.0051 - factorized_top_k/top_10_categorical_accuracy: 0.0131 - factorized_top_k/top_50_categorical_accuracy: 0.0839 - factorized_top_k/top_100_categorical_accuracy: 0.1862 - loss: 70129.3984 - regularization_loss: 0.0000e+00 - total_loss: 70129.3984

You can see from the training log that TFRS is making use of both virtual GPUs.

Finally, we can evaluate our model on the test set:

In [147]:
test_accuracy = model.evaluate(
    cached_test, return_dict=True)["factorized_top_k/top_100_categorical_accuracy"]

print(f"Top-100 accuracy (test): {test_accuracy:.2f}.")

Top-100 accuracy (test): 0.24.

This concludes the retrieval with distribution strategy tutorial.