In [1]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Imports


Let's first get our imports out of the way.

In [2]:
# Install the notebook's third-party dependencies (-q keeps pip's output quiet).
!pip install -q tensorflow-recommenders
!pip install -q --upgrade tensorflow-datasets
# NOTE(review): presumably the backend for tfrs.layers.factorized_top_k.ScaNN, which is used to build the index later in this notebook.
!pip install -q scann

In [3]:
import os
import pprint
import tempfile

from typing import Dict, Text, Tuple

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

2023-10-18 01:41:26.218939: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [4]:
import tensorflow_recommenders as tfrs

In [5]:
!python --version

Python 3.10.12


# Load the dataset

In [6]:
# Interaction log from the MovieLens 100K ratings dataset ("train" split).
ratings = tfds.load(name="movielens/100k-ratings", split="train")
# Catalogue of all available movies and their features ("train" split).
movies = tfds.load(name="movielens/100k-movies", split="train")

2023-10-18 01:41:28.847159: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-10-18 01:41:28.858842: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
2023-10-18 01:41:28.861927: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysf

# Prepare Training Data

In [7]:
def _keep_rating_features(record):
    """Project a ratings record onto the three fields the model consumes."""
    return {
        field: record[field]
        for field in ("movie_title", "user_id", "user_rating")
    }


def _title_only(record):
    """Reduce a movie record to its title."""
    return record["movie_title"]


ratings = ratings.map(_keep_rating_features)
movies = movies.map(_title_only)

In [8]:
def _unique_across_batches(batched_dataset):
    """Materialise every batch and return the unique values as one NumPy array."""
    return np.unique(np.concatenate(list(batched_dataset)))


# Pull the data in 1,000-element batches rather than one element at a time.
movie_titles = movies.batch(1_000)
user_ids = ratings.batch(1_000).map(lambda batch: batch["user_id"])

# Vocabularies consumed by the StringLookup layers of the embedding models.
unique_movie_titles = _unique_across_batches(movie_titles)
unique_user_ids = _unique_across_batches(user_ids)

In [103]:
# Seed the Python, NumPy and TensorFlow RNGs in one call so the split and training are repeatable.
tf.keras.utils.set_random_seed(42)
# Ask TensorFlow to use deterministic op implementations.
tf.config.experimental.enable_op_determinism()

# Work with at most half of the ratings, capped at 100,000 examples.
dataset_size = min(len(ratings) // 2, 100_000)
# 80% of those examples go to training; the remainder is held out for testing.
train_size = dataset_size * 4 // 5
test_size = dataset_size - train_size

# reshuffle_each_iteration=False freezes the shuffled order, so take() and skip()
# below iterate the same sequence and the two splits do not overlap.
# NOTE(review): the shuffle buffer (dataset_size) is at most half of the dataset,
# so the shuffle is not uniform over all ratings — confirm this is intended.
shuffled = ratings.shuffle(dataset_size, reshuffle_each_iteration=False)

train = shuffled.take(train_size)
test = shuffled.skip(train_size).take(test_size)

### The full model 

In [104]:
# Current tf.distribute strategy (the default strategy when no other scope is active).
strategy = tf.distribute.get_strategy()
# Width of the user and movie embedding vectors.
embedding_dimension = 32

In [105]:
class MovielensModel(tfrs.Model):
    """Multi-task MovieLens recommender with shared user and movie embeddings.

    Two objectives are trained jointly on the same embeddings:

    * a ranking task that predicts the explicit rating with a small dense
      network on top of the concatenated user and movie embeddings, and
    * a retrieval task that scores user/movie embedding pairs.

    The total loss is the weighted sum of both task losses.
    """

    def __init__(self, rating_weight: float, retrieval_weight: float) -> None:
        """Build the embedding towers, the rating network and both tasks.

        Args:
            rating_weight: Multiplier applied to the ranking (rating) loss.
            retrieval_weight: Multiplier applied to the retrieval loss.
        """
        super().__init__()

        # User and movie models: raw string -> vocabulary index -> embedding.
        # The "+ 1" leaves one extra embedding row for the lookup layer's
        # out-of-vocabulary index.
        self.movie_model: tf.keras.layers.Layer = tf.keras.Sequential([
            tf.keras.layers.StringLookup(
                vocabulary=unique_movie_titles, mask_token=None),
            tf.keras.layers.Embedding(
                len(unique_movie_titles) + 1, embedding_dimension),
        ])
        self.user_model: tf.keras.layers.Layer = tf.keras.Sequential([
            tf.keras.layers.StringLookup(
                vocabulary=unique_user_ids, mask_token=None),
            tf.keras.layers.Embedding(
                len(unique_user_ids) + 1, embedding_dimension),
        ])

        # A small model to take in user and movie embeddings and predict ratings.
        # We can make this as complicated as we want as long as we output a scalar
        # as our prediction.
        self.rating_model = tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation="relu"),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(1),
        ])

        # The tasks.
        self.rating_task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
            loss=tf.keras.losses.MeanSquaredError(),
            metrics=[tf.keras.metrics.RootMeanSquaredError()],
        )
        # Retrieval metrics are computed against the embeddings of every movie
        # title in the `movies` dataset.
        self.retrieval_task: tf.keras.layers.Layer = tfrs.tasks.Retrieval(
            metrics=tfrs.metrics.FactorizedTopK(
                candidates=movies.batch(128).map(self.movie_model)
            )
        )

        self.rating_weight = rating_weight
        self.retrieval_weight = retrieval_weight

    def call(
        self, features: Dict[Text, tf.Tensor]
    ) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        """Embed a batch and predict its ratings.

        Args:
            features: Mapping that contains at least "user_id" and
                "movie_title".

        Returns:
            A tuple ``(user_embeddings, movie_embeddings, rating_predictions)``.
        """
        # We pick out the user features and pass them into the user model.
        user_embeddings = self.user_model(features["user_id"])
        # And pick out the movie features and pass them into the movie model.
        movie_embeddings = self.movie_model(features["movie_title"])

        return (
            user_embeddings,
            movie_embeddings,
            # We apply the multi-layered rating model to a concatenation of
            # user and movie embeddings.
            self.rating_model(
                tf.concat([user_embeddings, movie_embeddings], axis=1)
            ),
        )

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
        """Return the weighted sum of the ranking and retrieval losses.

        Args:
            features: Mapping with "user_id", "movie_title" and the
                "user_rating" labels. The mapping is left unmodified.
            training: Accepted for API compatibility; not used by this model.

        Returns:
            ``rating_weight * rating_loss + retrieval_weight * retrieval_loss``.

        Raises:
            KeyError: If "user_rating" is missing from ``features``.
        """
        # Separate the labels from the model inputs without mutating the
        # caller's dict, so the same features can be passed in again.
        ratings = features["user_rating"]
        model_inputs = {
            name: value
            for name, value in features.items()
            if name != "user_rating"
        }

        user_embeddings, movie_embeddings, rating_predictions = self(model_inputs)

        # We compute the loss for each task.
        rating_loss = self.rating_task(
            labels=ratings,
            predictions=rating_predictions,
        )
        retrieval_loss = self.retrieval_task(user_embeddings, movie_embeddings)

        return (self.rating_weight * rating_loss
                + self.retrieval_weight * retrieval_loss)

## Fitting and evaluating 

In [109]:
# Create and compile the model inside the distribution strategy's scope.
with strategy.scope():
    # Equal weights: the ranking and retrieval losses contribute equally to the total loss.
    model = MovielensModel(rating_weight=1.0, retrieval_weight=1.0)
    # Adagrad optimizer with a learning rate of 0.1.
    model.compile(optimizer=tf.keras.optimizers.Adagrad(0.1))

In [110]:
# Shuffle, batch (8192 examples per batch) and cache the training split.
# NOTE(review): cache() comes after shuffle(), so the batches produced on the first
# pass are presumably replayed in the same order on later epochs — confirm this is intended.
cached_train = train.shuffle(100_000).batch(8192).cache()
# The test split is only batched (4096 examples per batch) and cached.
cached_test = test.batch(4096).cache()

In [111]:
# Train on the cached training batches for 10 epochs.
model.fit(cached_train, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7f6b604b69b0>

In [112]:
# Evaluate on the held-out split and report one headline metric per task.
metrics = model.evaluate(cached_test, return_dict=True)

top_100_accuracy = metrics["factorized_top_k/top_100_categorical_accuracy"]
ranking_rmse = metrics["root_mean_squared_error"]

print(f"Retrieval top-100 accuracy: {top_100_accuracy:.3f}.")
print(f"Ranking RMSE: {ranking_rmse:.3f}.")

Retrieval top-100 accuracy: 0.148.
Ranking RMSE: 1.136.


# Create ScaNN Index

In [113]:
# The index takes raw user ids: it embeds them with the user model before searching.
scann_index = tfrs.layers.factorized_top_k.ScaNN(model.user_model)

# Candidates are (title, embedding) pairs, batched identically so they stay aligned.
candidate_titles = movies.batch(100)
candidate_embeddings = candidate_titles.map(model.movie_model)

scann_index.index_from_dataset(
    tf.data.Dataset.zip((candidate_titles, candidate_embeddings))
)

2023-10-18 18:00:21.028236: I scann/partitioning/partitioner_factory_base.cc:59] Size of sampled dataset for training partition: 1682
2023-10-18 18:00:21.033815: I ./scann/partitioning/kmeans_tree_partitioner_utils.h:84] PartitionerFactory ran in 5.529921ms.


<tensorflow_recommenders.layers.factorized_top_k.ScaNN at 0x7f6b603fca30>

# Save all model artifacts

In [114]:
# Export the ScaNN index, which wraps the query (user) model.
scann_path = "deploy/models/scann_100k"

# Invoke both query entry points once, with tensor arguments, before saving.
# NOTE(review): presumably this traces the functions so they are exported with the
# SavedModel; "user_id" and "movie_title" appear to be placeholder inputs, not real ids.
scann_index.call(tf.constant(["user_id"]), tf.constant(1))
scann_index.query_with_exclusions(tf.constant(["user_id"]), tf.constant([["movie_title"]]), tf.constant(5))

# Save the index.
# The "Scann" op namespace is whitelisted so the namespaced ops used by the index can be saved.
tf.keras.models.save_model(
  scann_index,
  scann_path,
  options=tf.saved_model.SaveOptions(namespace_whitelist=["Scann"])
)









INFO:tensorflow:Assets written to: models/scann_100k/assets


INFO:tensorflow:Assets written to: models/scann_100k/assets










In [115]:
user_model_path = "deploy/models/user_model"

# Run the user model once on a placeholder id. The resulting embedding is reused
# later in the notebook as sample input for the rating model.
user_embedding = model.user_model(tf.constant(["user_id"]))

# Save the user model in TensorFlow SavedModel format.
model.user_model.save(user_model_path, save_format='tf')





INFO:tensorflow:Assets written to: models/user_model/assets


INFO:tensorflow:Assets written to: models/user_model/assets


In [116]:
movie_model_path = "deploy/models/movie_model"

# Run the movie model once on a placeholder title. The resulting embedding is reused
# later in the notebook as sample input for the rating model.
movie_embedding = model.movie_model(tf.constant(["movie_title"]))

# Save the movie model in TensorFlow SavedModel format.
model.movie_model.save(movie_model_path, save_format='tf')





INFO:tensorflow:Assets written to: models/movie_model/assets


INFO:tensorflow:Assets written to: models/movie_model/assets


In [117]:
rating_model_path = "deploy/models/rating_model"

# Call the rating model once on the concatenated sample embeddings before saving.
# NOTE(review): presumably needed so the Sequential model is built with a known input shape — confirm.
model.rating_model(tf.concat([user_embedding, movie_embedding], axis=1))

# Save the rating model in TensorFlow SavedModel format.
model.rating_model.save(rating_model_path, save_format='tf')





INFO:tensorflow:Assets written to: models/rating_model/assets


INFO:tensorflow:Assets written to: models/rating_model/assets


# Load models for testing prediction functions

In [118]:
# Reload every exported artifact from disk; the prediction helpers below use these
# loaded copies rather than the in-memory training objects.
scann_loaded = tf.keras.models.load_model(scann_path)
user_model_loaded = tf.keras.models.load_model(user_model_path)
movie_model_loaded = tf.keras.models.load_model(movie_model_path)
rating_model_loaded = tf.keras.models.load_model(rating_model_path)

















# Custom predict to return retrieval results

NOTE: returned scores are not normalized

In [120]:
# Number of recommendations returned when the caller does not specify k.
DEFAULT_K = 10

def retrieval_predict(user_id, exclusions = None, k = DEFAULT_K):
    """Return the top-k retrieved movie titles and their scores for one user.

    Args:
        user_id: Id of the user to query, as a string.
        exclusions: Optional sequence of movie titles that must not appear in
            the results. ``None`` or an empty sequence disables filtering.
        k: Number of results to request from the index.

    Returns:
        A dict with two parallel lists: "movie_titles" (decoded UTF-8 strings)
        and "movie_scores" (raw, un-normalized retrieval scores).
    """
    # A None default avoids sharing one mutable list between calls and lets
    # callers pass None explicitly.
    if exclusions is None:
        exclusions = []

    user_ids = tf.constant([user_id])
    k = tf.constant(k)

    if len(exclusions) > 0:
        # One row of exclusions for the single queried user.
        exclusions = tf.constant([exclusions])
        k_scores, k_predictions = scann_loaded.query_with_exclusions(user_ids, exclusions, k)
    else:
        k_scores, k_predictions = scann_loaded.call(user_ids, k)

    # Only one user was queried, so the results live in row 0.
    top_titles = k_predictions.numpy().tolist()[0]
    top_scores = k_scores.numpy().tolist()[0]

    return {
        # Titles come back as bytes; decode them so the result is JSON-serializable.
        "movie_titles": [title.decode('utf-8') for title in top_titles],
        "movie_scores": top_scores
    }

In [None]:
import time

def check_prediction_time(n):
    """Time ``n`` sequential calls to ``retrieval_predict`` with a fixed request.

    Args:
        n: Number of identical instances to send through ``retrieval_predict``.

    Returns:
        Elapsed time in seconds (float) for processing all ``n`` instances.
    """
    request = {
        "instances": [
            {
                "user_id": "41",
                "exclusions": ["Fargo (1996)"],
            }
        ]*n,
        "parameters": {
            "k": 10
        }
    }

    instances = request["instances"]
    k = request["parameters"].get("k", 10)

    # perf_counter() is a monotonic, high-resolution clock meant for measuring
    # intervals; time.time() follows the wall clock and can jump if it is adjusted.
    start_time = time.perf_counter()
    for instance in instances:
        # Results are discarded: only the latency is of interest here.
        retrieval_predict(**instance, k = k)
    end_time = time.perf_counter()

    return end_time - start_time

check_prediction_time(n = 1000)

1.887946367263794

In [None]:
import json

# Sample request: one user with an exclusion list, one without.
request = {
    "instances": [
        {"user_id": "41", "exclusions": ["Fargo (1996)"]},
        {"user_id": "42", "exclusions": []},
    ],
    "parameters": {"k": 10},
}

instances, parameters = request["instances"], request["parameters"]
k = parameters.get("k", 10)

# Each instance dict supplies the user_id / exclusions keyword arguments.
predictions = [
    retrieval_predict(k = k, **instance)
    for instance in instances
]

print(json.dumps(predictions, indent=2))

[
  {
    "movie_titles": [
      "Winnie the Pooh and the Blustery Day (1968)",
      "Alien (1979)",
      "Deconstructing Harry (1997)",
      "Patton (1970)",
      "To Wong Foo, Thanks for Everything! Julie Newmar (1995)",
      "Of Love and Shadows (1994)",
      "Miracle on 34th Street (1994)",
      "Mask, The (1994)",
      "Bed of Roses (1996)",
      "Fly Away Home (1996)"
    ],
    "movie_scores": [
      9.653619766235352,
      7.183511734008789,
      7.077798843383789,
      6.192534446716309,
      6.092672348022461,
      5.85479736328125,
      5.565959930419922,
      5.52648401260376,
      5.500728607177734,
      5.386192321777344
    ]
  },
  {
    "movie_titles": [
      "Two if by Sea (1996)",
      "Mask, The (1994)",
      "Malice (1993)",
      "Bridges of Madison County, The (1995)",
      "Smoke (1995)",
      "Firm, The (1993)",
      "Star Wars (1977)",
      "Beauty and the Beast (1991)",
      "Cats Don't Dance (1997)",
      "Clear and Present Dange

# Custom predict to return retrieval results + ratings

NOTE: This does not guarantee that the returned movies are the ones with the highest predicted ratings. The top-k retrieval is based only on the user and movie embedding models; ratings are predicted afterwards, for the k retrieved movies only.

The predicted ratings are fitted to the user ratings (1-5) but are not normalized. They can be normalized by setting a min/max.

In [122]:
def retrieval_and_rating_predict(user_id, k, exclusions = None):
    """Retrieve the top-k movies for one user and predict a rating for each.

    Retrieval is done by the loaded ScaNN index; ratings are then predicted by
    the loaded rating model for the retrieved movies only.

    Args:
        user_id: Id of the user to query, as a string.
        k: Number of movies to request from the index.
        exclusions: Optional sequence of movie titles that must not appear in
            the results. ``None`` or an empty sequence disables filtering.

    Returns:
        A dict with two parallel lists: "movies" (decoded UTF-8 titles) and
        "ratings" (raw predicted ratings, one per returned movie).
    """
    # A None default avoids sharing one mutable list between calls and lets
    # callers pass None explicitly.
    if exclusions is None:
        exclusions = []

    user_ids = tf.constant([user_id])
    # Pass k as a tensor, matching how the index was called before it was exported.
    k_tensor = tf.constant(k)

    if len(exclusions) > 0:
        # One row of exclusions for the single queried user.
        exclusions = tf.constant([exclusions])
        _, k_predictions = scann_loaded.query_with_exclusions(user_ids, exclusions, k_tensor)
    else:
        _, k_predictions = scann_loaded.call(user_ids, k_tensor)

    # Only one user was queried, so the titles live in row 0; decode bytes -> str.
    movie_titles = [title.decode('utf-8') for title in k_predictions.numpy().tolist()[0]]

    user_embeddings = user_model_loaded(user_ids)
    movie_embeddings = movie_model_loaded(k_predictions)

    # Read the embedding width from the loaded model's output instead of a
    # training-time global, and let reshape infer the number of rows so fewer
    # than k returned candidates are handled as well.
    embedding_size = movie_embeddings.shape[-1]
    flattened_movie_embeddings = tf.reshape(movie_embeddings, (-1, embedding_size))
    num_results = flattened_movie_embeddings.shape[0]

    # Pair the single user embedding with every retrieved movie embedding.
    repeated_user_embeddings = tf.repeat(user_embeddings, repeats=num_results, axis=0)

    flattened_movie_ratings = rating_model_loaded(
        tf.concat([repeated_user_embeddings, flattened_movie_embeddings], axis=1)
    )

    predictions = {
        "movies": movie_titles,
        "ratings": tf.reshape(flattened_movie_ratings, (-1,)).numpy().tolist()
    }

    return predictions

In [123]:
import json

# Sample request: one user with an exclusion list, one without.
request = {
    "instances": [
        {"user_id": "41", "exclusions": ["Fargo (1996)"]},
        {"user_id": "42", "exclusions": []},
    ],
    "parameters": {"k": 10},
}

instances, parameters = request["instances"], request["parameters"]
k = parameters.get("k", 10)

# Each instance dict supplies the user_id / exclusions keyword arguments.
predictions = [
    retrieval_and_rating_predict(k = k, **instance)
    for instance in instances
]

print(json.dumps(predictions, indent=2))

[
  {
    "movies": [
      "Groundhog Day (1993)",
      "Local Hero (1983)",
      "Apollo 13 (1995)",
      "Empire Strikes Back, The (1980)",
      "E.T. the Extra-Terrestrial (1982)",
      "Silence of the Lambs, The (1991)",
      "Blues Brothers, The (1980)",
      "It's a Wonderful Life (1946)",
      "Penny Serenade (1941)",
      "American in Paris, An (1951)"
    ],
    "ratings": [
      4.262782096862793,
      4.885346412658691,
      4.478445053100586,
      4.4249725341796875,
      4.450826644897461,
      4.708822250366211,
      4.400209426879883,
      4.602962493896484,
      4.568150520324707,
      4.511090278625488
    ]
  },
  {
    "movies": [
      "Rent-a-Kid (1995)",
      "Paper, The (1994)",
      "Sneakers (1992)",
      "Black Sheep (1996)",
      "Pinocchio (1940)",
      "She's the One (1996)",
      "Santa Clause, The (1994)",
      "North (1994)",
      "Lion King, The (1994)",
      "Substitute, The (1996)"
    ],
    "ratings": [
      3.930472135