# MovieLens example
This example is based on the TensorFlow Recommenders (TFRS) basic movie retrieval tutorial (https://www.tensorflow.org/recommenders/examples/basic_retrieval). Here the movie tower is built with the `InputBuilder` from `tf_tabular`.


In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import sys
# Extend the module search path with the sibling source tree before the later
# imports run (presumably so the local `tf_tabular` package resolves - confirm
# against the repository layout).
sys.path.append('../src/')

In [3]:
import os
import pprint
import tempfile

from typing import Dict, Text

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs

from tf_tabular.builder import InputBuilder
from tf_tabular.utils import get_vocab

In [4]:
# One record per (user, movie) rating.
ratings = tfds.load(name="movielens/100k-ratings", split="train")
# One record per movie in the catalogue.
movies = tfds.load(name="movielens/100k-movies", split="train")

## View dataset examples

In [143]:
# Pretty-print a single raw rating record to show the available fields.
rating_sample = ratings.take(1).as_numpy_iterator()
for x in rating_sample:
    pprint.pprint(x)

{'bucketized_user_age': 45.0,
 'movie_genres': array([7]),
 'movie_id': b'357',
 'movie_title': b"One Flew Over the Cuckoo's Nest (1975)",
 'raw_user_age': 46.0,
 'timestamp': 879024327,
 'user_gender': True,
 'user_id': b'138',
 'user_occupation_label': 4,
 'user_occupation_text': b'doctor',
 'user_rating': 4.0,
 'user_zip_code': b'53211'}


2024-04-22 11:13:21.455280: W tensorflow/core/kernels/data/cache_dataset_ops.cc:858] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


In [144]:
# Pretty-print a single raw movie record to show the available fields.
movie_sample = movies.take(1).as_numpy_iterator()
for x in movie_sample:
    pprint.pprint(x)

{'movie_genres': array([4]),
 'movie_id': b'1681',
 'movie_title': b'You So Crazy (1994)'}


2024-04-22 11:13:21.561915: W tensorflow/core/kernels/data/cache_dataset_ops.cc:858] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


## Preprocessing

In [5]:
def _rating_features(record):
    """Project a rating record onto the title, genres and user id fields."""
    return {
        "movie_title": record["movie_title"],
        "movie_genres": record["movie_genres"],
        "user_id": record["user_id"],
    }


def _movie_features(record):
    """Project a movie record onto the title and genres fields."""
    return {
        "movie_title": record["movie_title"],
        "movie_genres": record["movie_genres"],
    }


ratings = ratings.map(_rating_features)
movies = movies.map(_movie_features)


In [6]:
# Distinct user ids, read from the ratings 10k rows at a time.
user_ids = ratings.map(lambda record: record["user_id"]).batch(10_000)
unique_user_ids = np.unique(np.concatenate([batch.numpy() for batch in user_ids]))

# Every rated title (one entry per rating row), plus the distinct titles and
# how many ratings each one received.
all_movies = ratings.map(lambda record: record["movie_title"]).batch(10_000)
all_titles = np.concatenate([batch.numpy() for batch in all_movies])
unique_movie_titles, movie_counts = np.unique(all_titles, return_counts=True)

# Distinct genre ids; each movie contributes its own list of genres.
genres = movies.map(lambda record: record["movie_genres"])
unique_movie_genres = np.unique(np.concatenate([ids.numpy() for ids in genres]))

display(unique_movie_titles[:10])



array([b"'Til There Was You (1997)", b'1-900 (1994)',
       b'101 Dalmatians (1996)', b'12 Angry Men (1957)', b'187 (1997)',
       b'2 Days in the Valley (1996)',
       b'20,000 Leagues Under the Sea (1954)',
       b'2001: A Space Odyssey (1968)',
       b'3 Ninjas: High Noon At Mega Mountain (1998)',
       b'39 Steps, The (1935)'], dtype=object)

In [7]:
# Share of all ratings that each distinct title accounts for.
total_count = movie_counts.sum()
normalized_counts = movie_counts / total_count
sampling_dict = dict(zip(unique_movie_titles, normalized_counts))
# One probability per rating row, in the same order as `all_titles`.
probs = np.array([sampling_dict[title] for title in all_titles], dtype=np.float32)

### Add sampling probability

In [8]:

# Bind the dataset to its own name instead of overwriting the `probs` array:
# rebinding `probs` made this cell fail on a second run, because a Dataset
# would then be handed back to `from_tensor_slices`.
probs_ds = tf.data.Dataset.from_tensor_slices(probs)
# Pair each rating with its probability and store it under "sampling_prob".
ratings = tf.data.Dataset.zip(ratings, probs_ds).map(
    lambda x, y: dict(x, **{"sampling_prob": y})
)

### Shuffle and split dataset

In [9]:
tf.random.set_seed(42)
# `reshuffle_each_iteration=False` keeps the shuffled order identical on every
# pass, so the take/skip split below selects the same rows each time.
shuffled = ratings.shuffle(buffer_size=100_000, seed=42, reshuffle_each_iteration=False)

# First 80k shuffled ratings for training, the following 20k for testing.
n_train, n_test = 80_000, 20_000
train = shuffled.take(n_train)
test = shuffled.skip(n_train).take(n_test)


## Build model

In [11]:

class MovielensModel(tfrs.Model):
    """Two-tower retrieval model pairing a user tower with a movie tower.

    `prepare_task` must be called before training or evaluation: it creates the
    `self.task` retrieval task that `compute_loss` delegates to.
    """

    def __init__(self, user_model, movie_model):
        """Store the two towers.

        Args:
            user_model: Keras model called on the `user_id` feature.
            movie_model: Keras model called on a dict holding `movie_title`
                and `movie_genres`.
        """
        super().__init__()
        self.movie_model: tf.keras.Model = movie_model
        self.user_model: tf.keras.Model = user_model

    def prepare_task(self, movies):
        """Build the retrieval task (loss and metrics) and store it as `self.task`.

        Args:
            movies: Dataset of candidate movies; each element must provide
                `movie_title` and whatever else `movie_model` consumes.
        """
        # (title, embedding) pairs for the top-K metric. The embedding `map`
        # sits after `cache()`, so only the raw movie batches are cached.
        id_candidates = (movies.ragged_batch(1024)
                         .prefetch(tf.data.AUTOTUNE)
                         .cache()
                         .map(lambda movie: (movie["movie_title"], self.movie_model(movie))))

        metrics = tfrs.metrics.FactorizedTopK(
            candidates=tfrs.layers.factorized_top_k.Streaming(k=100).index_from_dataset(id_candidates),
            ks=[1,5,100],
            name='factk'
        )
        loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True,
                                                       reduction=tf.keras.losses.Reduction.SUM_OVER_BATCH_SIZE)
        task = tfrs.tasks.Retrieval(
            metrics=metrics,
            batch_metrics=[tf.keras.metrics.AUC(from_logits=True), tf.keras.metrics.Recall(top_k=10)],
            # num_hard_negatives=2,
            remove_accidental_hits=True,
            loss=loss
        )
        self.task: tf.keras.layers.Layer = task

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
        """Embed one batch with both towers and return the retrieval loss.

        Args:
            features: Batch dict with `user_id`, `movie_title`, `movie_genres`
                and `sampling_prob` entries.
            training: Whether the call happens during training. It is forwarded
                to both towers so that training-dependent layers (e.g. dropout)
                behave correctly; previously the flag was silently ignored.

        Returns:
            The loss produced by the retrieval task.
        """
        # We pick out the user features and pass them into the user model.
        user_embeddings = self.user_model(features["user_id"], training=training)
        # And pick out the movie features and pass them into the movie model,
        # getting embeddings back.
        positive_movie_embeddings = self.movie_model(
            {
                "movie_title": features["movie_title"],
                "movie_genres": features["movie_genres"],
            },
            training=training,
        )

        # The task computes the loss and the metrics. Titles are passed as
        # candidate ids for `remove_accidental_hits`.
        return self.task(user_embeddings, positive_movie_embeddings,
                         candidate_ids=features["movie_title"],
                         candidate_sampling_probability=features["sampling_prob"],
                        )


In [12]:
from tensorflow.keras.layers import Dense
from tensorflow.keras import Model

# Vocabulary for each categorical movie feature.
vocabs = {
    "movie_title": unique_movie_titles,
    "movie_genres": unique_movie_genres,
}

# Both features use the same embedding dimension.
embedding_dims = dict.fromkeys(vocabs, 32)

def build_model():
    """Build the movie tower.

    Registers `movie_title` and `movie_genres` as categorical inputs on an
    `InputBuilder` (with `movie_genres` flagged as multi-hot), using the
    notebook-level `vocabs` and `embedding_dims`, then applies a 32-unit
    `Dense` layer with no activation to the builder's output.

    Returns:
        A Keras `Model` mapping the builder's input layers to the Dense output.
    """
    builder = InputBuilder()
    builder.add_inputs_list(
        categoricals=["movie_title", "movie_genres"],
        multi_hots=["movie_genres"],
        vocabs=vocabs,
        embedding_dims=embedding_dims,
    )
    model_inputs, combined_features = builder.build_input_layers()
    projection = Dense(32, activation=None)
    return Model(inputs=model_inputs, outputs=projection(combined_features))


movie_model = build_model()

In [13]:
# User tower: raw string id -> integer index -> 32-dimensional embedding.
user_id_lookup = tf.keras.layers.StringLookup(vocabulary=unique_user_ids, mask_token=None)
# One row more than the vocabulary size, to account for unknown tokens.
user_id_embedding = tf.keras.layers.Embedding(len(unique_user_ids) + 1, 32)
user_model = tf.keras.Sequential([user_id_lookup, user_id_embedding])


In [14]:
# Combine both towers, build the retrieval task over the movie catalogue,
# then compile with Adam.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.003)
model = MovielensModel(user_model=user_model, movie_model=movie_model)
model.prepare_task(movies)
model.compile(optimizer=optimizer)


Batch and cache the training and test data (the training set is also shuffled).

In [15]:
# Cache first, then shuffle. A cache replays exactly the same elements on every
# pass, so shuffling upstream of it froze the first epoch's batches for the
# whole fit; shuffling downstream gives each epoch a fresh batch composition.
cached_train = (
    train.cache()
    .shuffle(100_000)
    .ragged_batch(8192)
    .prefetch(tf.data.AUTOTUNE)
)
# The test set is not shuffled, so its batches can be cached as they are.
cached_test = test.ragged_batch(4096).cache()


In [16]:
# Print one training batch to inspect the feature layout.
first_train_batch = cached_train.take(1)
for b in first_train_batch:
    print(b)

{'movie_title': <tf.Tensor: shape=(8192,), dtype=string, numpy=
array([b'Godfather, The (1972)', b'Escape to Witch Mountain (1975)',
       b'Fargo (1996)', ..., b'Picture Perfect (1997)', b'Fear (1996)',
       b'Sleepers (1996)'], dtype=object)>, 'movie_genres': <tf.RaggedTensor [[0, 5, 7], [1, 3, 8], [5, 7, 16], ..., [4, 14], [16], [5, 7]]>, 'user_id': <tf.Tensor: shape=(8192,), dtype=string, numpy=array([b'424', b'429', b'53', ..., b'351', b'551', b'872'], dtype=object)>, 'sampling_prob': <tf.Tensor: shape=(8192,), dtype=float32, numpy=
array([0.00413, 0.0003 , 0.00508, ..., 0.00081, 0.00044, 0.00169],
      dtype=float32)>}


2024-04-22 11:36:11.238956: W tensorflow/core/kernels/data/cache_dataset_ops.cc:858] The calling iterator did not fully read the dataset being cached. In order to avoid unexpected truncation of the dataset, the partially cached contents of the dataset  will be discarded. This can happen if you have an input pipeline similar to `dataset.cache().take(k).repeat()`. You should use `dataset.take(k).cache().repeat()` instead.


In [17]:
# Train for ten passes over the training batches.
model.fit(x=cached_train, epochs=10)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x285a80640>

### Evaluation

In [19]:
# Evaluate on the held-out batches; results come back as a name -> value dict.
model.evaluate(x=cached_test, return_dict=True)



{'auc': 0.5015000104904175,
 'recall': 0.007199999876320362,
 'factk/top_1_categorical_accuracy': 0.006649999879300594,
 'factk/top_5_categorical_accuracy': 0.03020000085234642,
 'factk/top_100_categorical_accuracy': 0.35690000653266907,
 'loss': 8.013958930969238,
 'regularization_loss': 0,
 'total_loss': 8.013958930969238}