# 2. Recommendation Model: Efficient Serving using approximate nearest neighbour search (ANN) 

https://www.tensorflow.org/recommenders/examples/efficient_serving

In [1]:
%pip install -q tensorflow-recommenders
%pip install -q --upgrade tensorflow-datasets
%pip install -q scann
%pip install -q tf_keras # install legacy keras to overcome bug in https://github.com/tensorflow/recommenders/issues/712 

[0mNote: you may need to restart the kernel to use updated packages.
[0mNote: you may need to restart the kernel to use updated packages.
[0mNote: you may need to restart the kernel to use updated packages.
[0mNote: you may need to restart the kernel to use updated packages.


In [2]:
import os
os.environ['TF_USE_LEGACY_KERAS'] = '1' 

import mlflow
mlflow.autolog()

In [3]:
from typing import Dict, Text

import pprint
import tempfile

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs

2024-10-12 16:21:19.076381: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-10-12 16:21:19.087004: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-10-12 16:21:19.090206: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-10-12 16:21:19.098971: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024/10/12 16:21:19 INFO mlflow.tracking.fluent: Auto

# Load data

In [4]:
# Load the MovieLens 100K data.
ratings = tfds.load(
    "movielens/100k-ratings",
    split="train"
)

# Get the ratings data.
ratings = (ratings
           # Retain only the fields we need.
           .map(lambda x: {"user_id": x["user_id"], "movie_title": x["movie_title"]})
           # Cache for efficiency.
           .cache(tempfile.NamedTemporaryFile().name)
)

# Get the movies data.
movies = tfds.load("movielens/100k-movies", split="train")
movies = (movies
          # Retain only the fields we need.
          .map(lambda x: x["movie_title"])
          # Cache for efficiency.
          .cache(tempfile.NamedTemporaryFile().name))


I0000 00:00:1728721280.164800   10468 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1728721280.195405   10468 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1728721280.195612   10468 cuda_executor.cc:1015] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero. See more at https://github.com/torvalds/linux/blob/v6.0/Documentation/ABI/testing/sysfs-bus-pci#L344-L355
I0000 00:00:1728721280.196666   10468 cuda_executor.cc:1015] successful NUMA node read from SysFS ha

In [5]:
user_ids = ratings.map(lambda x: x["user_id"])

unique_movie_titles = np.unique(np.concatenate(list(movies.batch(1000))))
unique_user_ids = np.unique(np.concatenate(list(user_ids.batch(1000))))


2024-10-12 16:21:20.496457: W tensorflow/core/kernels/data/cache_dataset_ops.cc:332] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.
2024-10-12 16:21:20.497699: I tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
2024-10-12 16:21:21.529405: I tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
2024-10-12 16:21:21.529723: W tensorflow/core/kernels/data/cache_dataset_ops.cc:332] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded

# Train / Test Split

In [6]:
tf.random.set_seed(42)
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)


# Model definition

In [7]:
class MovielensModel(tfrs.Model):

  def __init__(self):
    super().__init__()

    embedding_dimension = 32

    # Set up a model for representing movies.
    self.movie_model = tf.keras.Sequential([
      tf.keras.layers.StringLookup(
        vocabulary=unique_movie_titles, mask_token=None),
      # We add an additional embedding to account for unknown tokens.
      tf.keras.layers.Embedding(len(unique_movie_titles) + 1, embedding_dimension)
    ])

    # Set up a model for representing users.
    self.user_model = tf.keras.Sequential([
      tf.keras.layers.StringLookup(
        vocabulary=unique_user_ids, mask_token=None),
        # We add an additional embedding to account for unknown tokens.
      tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
    ])

    # Set up a task to optimize the model and compute metrics.
    self.task = tfrs.tasks.Retrieval(
      metrics=tfrs.metrics.FactorizedTopK(
        candidates=(
            movies
            .batch(128)
            .cache()
            .map(lambda title: (title, self.movie_model(title)))
        )
      )
    )

  def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
    # We pick out the user features and pass them into the user model.
    user_embeddings = self.user_model(features["user_id"])
    # And pick out the movie features and pass them into the movie model,
    # getting embeddings back.
    positive_movie_embeddings = self.movie_model(features["movie_title"])

    # The task computes the loss and the metrics.

    return self.task(
        user_embeddings,
        positive_movie_embeddings,
        candidate_ids=features["movie_title"],
        compute_metrics=not training
    )


# Fitting & Evaluation

In [8]:
with mlflow.start_run():
    # Fitting
    model = MovielensModel()
    model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))
    model.fit(train.batch(8192), epochs=5, verbose=2)

    # Evaluation
    model.evaluate(test.batch(8192), return_dict=True)



Epoch 1/5


I0000 00:00:1728721283.763809   10631 service.cc:146] XLA service 0x74c2c46ef680 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1728721283.763829   10631 service.cc:154]   StreamExecutor device (0): NVIDIA GeForce RTX 3080 Ti, Compute Capability 8.6
2024-10-12 16:21:23.767657: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2024-10-12 16:21:23.775461: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:531] Loaded cuDNN version 8907
I0000 00:00:1728721283.802911   10631 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


10/10 - 1s - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 53547.8438 - regularization_loss: 0.0000e+00 - total_loss: 53547.8438 - 1s/epoch - 102ms/step
Epoch 2/5




10/10 - 0s - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 52130.3320 - regularization_loss: 0.0000e+00 - total_loss: 52130.3320 - 365ms/epoch - 36ms/step
Epoch 3/5




10/10 - 0s - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 51503.8203 - regularization_loss: 0.0000e+00 - total_loss: 51503.8203 - 359ms/epoch - 36ms/step
Epoch 4/5




10/10 - 0s - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 51076.5977 - regularization_loss: 0.0000e+00 - total_loss: 51076.5977 - 362ms/epoch - 36ms/step
Epoch 5/5




10/10 - 0s - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 50746.5938 - regularization_loss: 0.0000e+00 - total_loss: 50746.5938 - 364ms/epoch - 36ms/step


2024-10-12 16:21:25.900161: I tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence




# Brute force prediction

In [9]:
brute_force = tfrs.layers.factorized_top_k.BruteForce(model.user_model)
brute_force.index_from_dataset(
    movies.batch(128).map(lambda title: (title, model.movie_model(title)))
)

<tensorflow_recommenders.layers.factorized_top_k.BruteForce at 0x74c7b81aa1a0>

In [10]:
# Get predictions for user 42.
_, titles = brute_force(np.array(["42"]), k=3)

print(f"Top recommendations: {titles[0]}")

Top recommendations: [b'Rudy (1993)' b'When a Man Loves a Woman (1994)'
 b'Affair to Remember, An (1957)']


In [11]:
%timeit _, titles = brute_force(np.array(["42"]), k=3)

923 μs ± 13 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


# Simulate large data set

In [12]:
# Construct a dataset of movies that's 1,000 times larger. We 
# do this by adding several million dummy movie titles to the dataset.
lots_of_movies = tf.data.Dataset.concatenate(
    movies.batch(4096),
    movies.batch(4096).repeat(1_000).map(lambda x: tf.zeros_like(x))
)

# We also add lots of dummy embeddings by randomly perturbing
# the estimated embeddings for real movies.
lots_of_movies_embeddings = tf.data.Dataset.concatenate(
    movies.batch(4096).map(model.movie_model),
    movies.batch(4096).repeat(1_000)
      .map(lambda x: model.movie_model(x))
      .map(lambda x: x * tf.random.uniform(tf.shape(x)))
)


# Brute force index

In [13]:
brute_force_lots = tfrs.layers.factorized_top_k.BruteForce()
brute_force_lots.index_from_dataset(
    tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
)

<tensorflow_recommenders.layers.factorized_top_k.BruteForce at 0x74c7b81d66b0>

# Brute force recommendation

In [14]:
_, titles = brute_force_lots(model.user_model(np.array(["42"])), k=3)

print(f"Top recommendations: {titles[0]}")

%timeit _, titles = brute_force_lots(model.user_model(np.array(["42"])), k=3)

Top recommendations: [b'Rudy (1993)' b'When a Man Loves a Woman (1994)'
 b'Affair to Remember, An (1957)']
1.64 ms ± 3.24 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


# Approximate recommendation using ScaNN

In [15]:
scann = tfrs.layers.factorized_top_k.ScaNN(
    num_reordering_candidates=500,
    num_leaves_to_search=30
)
scann.index_from_dataset(
    tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
)

2024-10-12 16:21:59.695398: I scann/partitioning/partitioner_factory_base.cc:59] Size of sampled dataset for training partition: 100347
2024-10-12 16:21:59.747569: I ./scann/partitioning/kmeans_tree_partitioner_utils.h:89] PartitionerFactory ran in 52.139029ms.


<tensorflow_recommenders.layers.factorized_top_k.ScaNN at 0x74c7b819aad0>

In [16]:
_, titles = scann(model.user_model(np.array(["42"])), k=3)

print(f"Top recommendations: {titles[0]}")

%timeit _, titles = scann(model.user_model(np.array(["42"])), k=3)

Top recommendations: [b'Rudy (1993)' b'When a Man Loves a Woman (1994)'
 b'Affair to Remember, An (1957)']
11.3 ms ± 78.4 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)


# Performance evaluation

In [17]:
# Override the existing streaming candidate source.
model.task.factorized_metrics = tfrs.metrics.FactorizedTopK(
    candidates=tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
)
# Need to recompile the model for the changes to take effect.
model.compile()

%time baseline_result = model.evaluate(test.batch(8192), return_dict=True, verbose=False)

CPU times: user 15min 48s, sys: 1min 32s, total: 17min 20s
Wall time: 2min 30s


In [18]:
model.task.factorized_metrics = tfrs.metrics.FactorizedTopK(
    candidates=scann
)
model.compile()

# We can use a much bigger batch size here because ScaNN evaluation
# is more memory efficient.
%time scann_result = model.evaluate(test.batch(8192), return_dict=True, verbose=False)

CPU times: user 7.44 s, sys: 412 ms, total: 7.85 s
Wall time: 1.53 s


In [19]:
print(f"Brute force top-100 accuracy: {baseline_result['factorized_top_k/top_100_categorical_accuracy']:.2f}")
print(f"ScaNN top-100 accuracy:       {scann_result['factorized_top_k/top_100_categorical_accuracy']:.2f}")

Brute force top-100 accuracy: 0.12
ScaNN top-100 accuracy:       0.10


# Deploying approximate model

In [20]:
lots_of_movies_embeddings

<_ConcatenateDataset element_spec=TensorSpec(shape=(None, 32), dtype=tf.float32, name=None)>

In [21]:
# We re-index the ScaNN layer to include the user embeddings in the same model.
# This way we can give the saved model raw features and get valid predictions
# back.
scann = tfrs.layers.factorized_top_k.ScaNN(model.user_model, num_reordering_candidates=1000)
scann.index_from_dataset(
    tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
)

# Need to call it to set the shapes.
_ = scann(np.array(["42"]))

with tempfile.TemporaryDirectory() as tmp:
  path = os.path.join(tmp, "model")
  tf.saved_model.save(
      scann,
      path,
      options=tf.saved_model.SaveOptions(namespace_whitelist=["Scann"])
  )

  loaded = tf.saved_model.load(path)

2024-10-12 16:24:46.714249: I tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence
2024-10-12 16:24:47.047606: I scann/partitioning/partitioner_factory_base.cc:59] Size of sampled dataset for training partition: 100347
2024-10-12 16:24:47.093281: I ./scann/partitioning/kmeans_tree_partitioner_utils.h:89] PartitionerFactory ran in 45.643806ms.










INFO:tensorflow:Assets written to: /tmp/tmpc9qqiz9n/model/assets


INFO:tensorflow:Assets written to: /tmp/tmpc9qqiz9n/model/assets


In [22]:
_, titles = loaded(tf.constant(["42"]))

print(f"Top recommendations: {titles[0][:3]}")

Top recommendations: [b'Rudy (1993)' b'When a Man Loves a Woman (1994)'
 b'Affair to Remember, An (1957)']


# Tuning Scann

In [23]:
# Process queries in groups of 1000; processing them all at once with brute force
# may lead to out-of-memory errors, because processing a batch of q queries against
# a size-n dataset takes O(nq) space with brute force.
titles_ground_truth = tf.concat([
  brute_force_lots(queries, k=10)[1] for queries in
  test.batch(1000).map(lambda x: model.user_model(x["user_id"]))
], axis=0)


In [24]:
# Get all user_id's as a 1d tensor of strings
test_flat = np.concatenate(list(test.map(lambda x: x["user_id"]).batch(1000).as_numpy_iterator()), axis=0)

# ScaNN is much more memory efficient and has no problem processing the whole
# batch of 20000 queries at once.
_, titles = scann(test_flat, k=10)


In [25]:
def compute_recall(ground_truth, approx_results):
  return np.mean([
      len(np.intersect1d(truth, approx)) / len(truth)
      for truth, approx in zip(ground_truth, approx_results)
  ])


In [26]:
print(f"Recall: {compute_recall(titles_ground_truth, titles):.3f}")
%timeit -n 1000 scann(np.array(["42"]), k=10)

Recall: 0.886
11.3 ms ± 22.7 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


# Different Scann hyperparams

In [27]:
scann2 = tfrs.layers.factorized_top_k.ScaNN(
    model.user_model, 
    num_leaves=1000,
    num_leaves_to_search=100,
    num_reordering_candidates=1000)
scann2.index_from_dataset(
    tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
)

_, titles2 = scann2(test_flat, k=10)

print(f"Recall: {compute_recall(titles_ground_truth, titles2):.3f}")
%timeit -n 1000 scann2(np.array(["42"]), k=10)

2024-10-12 16:26:22.075154: I scann/partitioning/partitioner_factory_base.cc:59] Size of sampled dataset for training partition: 100347
2024-10-12 16:26:22.105538: W scann/utils/gmm_utils.cc:920] Could not normalize centroid due to zero norm or empty or zero-weight partition.
2024-10-12 16:26:22.244429: I ./scann/partitioning/kmeans_tree_partitioner_utils.h:89] PartitionerFactory ran in 169.235865ms.


Recall: 0.949
11.3 ms ± 34.7 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [None]:
scann3 = tfrs.layers.factorized_top_k.ScaNN(
    model.user_model,
    num_leaves=1000,
    num_leaves_to_search=70,
    num_reordering_candidates=400)
scann3.index_from_dataset(
    tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
)

_, titles3 = scann3(test_flat, k=10)
print(f"Recall: {compute_recall(titles_ground_truth, titles3):.3f}")
%timeit -n 1000 scann3(np.array(["42"]), k=10)

2024-10-12 16:27:54.733708: I scann/partitioning/partitioner_factory_base.cc:59] Size of sampled dataset for training partition: 100347
2024-10-12 16:27:54.907381: I ./scann/partitioning/kmeans_tree_partitioner_utils.h:89] PartitionerFactory ran in 173.629372ms.


Recall: 0.937
