# Preparing Federated Dependency

In [4]:
# Installing TensorFlow Federated
!pip install --quiet tensorflow-federated
# !pip install --quiet tensorflow_recommenders
# !pip install --quiet nest-asyncio


In [None]:
# Import other stuff
import nest_asyncio
nest_asyncio.apply()

import collections
import functools
import io
import os
import requests
import zipfile
from typing import List, Optional, Tuple, Union, Sequence

import tensorflow_recommenders as tfrs
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_federated as tff
from tensorflow_recommenders import layers

np.random.seed(42)

ModuleNotFoundError: No module named 'tensorflow_federated'

# Dataset preparation

In [None]:
def download_movielens_data(dataset_path):
  """Downloads and copies MovieLens data to local /tmp directory."""
  if dataset_path.startswith('http'):
    r = requests.get(dataset_path)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(path='/tmp')
  else:
    tf.io.gfile.makedirs('/tmp/ml-1m/')
    for filename in ['ratings.dat', 'movies.dat', 'users.dat']:
      tf.io.gfile.copy(
          os.path.join(dataset_path, filename),
          os.path.join('/tmp/ml-1m/', filename),
          overwrite=True)

download_movielens_data('http://files.grouplens.org/datasets/movielens/ml-1m.zip')

In [None]:
# Datasets download and preparation
DATA_DIR = "/tmp"

# Loads up the dataset using pandas
ratings_df = pd.read_csv(os.path.join(DATA_DIR, "ml-1m", "ratings.dat"), 
                         sep='::', 
                         names=["UserID", "MovieID", "Rating", "Timestamp"],
                         engine="python",
                         encoding='latin-1')

movies_df = pd.read_csv(os.path.join(DATA_DIR, "ml-1m", "movies.dat"), 
                         sep='::', 
                         names=["MovieID", "Title", "Genres"],
                         engine="python",
                         encoding='latin-1')

# Mapping dictionary in a new index
movie_mapping = {old_movie: new_movie for new_movie, old_movie in enumerate(ratings_df.MovieID.astype("category").cat.categories)}
user_mapping = {old_user: new_user for new_user, old_user in enumerate(ratings_df.UserID.astype("category").cat.categories)}

# Map each DataFrame consistently using the now-fixed mapping.
ratings_df.MovieID = ratings_df.MovieID.map(movie_mapping)
ratings_df.UserID = ratings_df.UserID.map(user_mapping)
movies_df.MovieID = movies_df.MovieID.map(movie_mapping)

# movies = movies_df.MovieID.map(movie_mapping)

# Remove nulls result from movies_df
movies_df = movies_df[pd.notnull(movies_df.MovieID)]

In [None]:
# Sanity check of number of users and movies
unique_movie = np.unique(np.concatenate([list(ratings_df.MovieID)])).astype(str)
unique_ids = np.unique(np.concatenate([list(ratings_df.UserID)])).astype(str)
print('Num users:', unique_ids.shape[0])
print('Num movies:', unique_movie.shape[0])

Num users: 6040
Num movies: 3706


In [None]:
# Define Embedding Dimension (arbitrary)
embedding_dimension = 32

In [None]:
# Modelling for 'candidate'
candidate_model = tf.keras.Sequential([
    tf.keras.layers.StringLookup(vocabulary=unique_movie, mask_token=None),
    tf.keras.layers.Embedding(len(unique_movie) + 1, embedding_dimension),
])

In [None]:
class FactorizedTopK(tf.keras.layers.Layer):
  """Computes metrics for across top K candidates surfaced by a retrieval model.
  The default metric is top K categorical accuracy: how often the true candidate
   is in the top K candidates for a given query.
  """

  def __init__(
      self,
      candidates: Union[layers.factorized_top_k.TopK, tf.data.Dataset],
      ks: Sequence[int] = (1, 5, 10, 50, 100),
      name: str = "factorized_top_k",
  ) -> None:
    """Initializes the metric.
    Args:
      candidates: A layer for retrieving top candidates in response
        to a query, or a dataset of candidate embeddings from which
        candidates should be retrieved.
      ks: A sequence of values of `k` at which to perform retrieval evaluation.
      name: Optional name.
    """

    super().__init__(name=name)

    if isinstance(candidates, tf.data.Dataset):
      candidates = (
          layers.factorized_top_k.Streaming(k=max(ks))
          .index_from_dataset(candidates)
      )

    self._ks = ks
    self._candidates = candidates
    self._top_k_metrics = [
        tf.keras.metrics.Mean(
            name=f"{self.name}/top_{x}_categorical_accuracy"
        ) for x in ks
    ]

  def update_state(
      self,
      query_embeddings: tf.Tensor,
      true_candidate_embeddings: tf.Tensor,
      true_candidate_ids: Optional[tf.Tensor] = None
  ) -> tf.Operation:
    """Updates the metrics.
    Args:
      query_embeddings: [num_queries, embedding_dim] tensor of query embeddings.
      true_candidate_embeddings: [num_queries, embedding_dim] tensor of
        embeddings for candidates that were selected for the query.
      true_candidate_ids: Ids of the true candidates. If supplied, evaluation
        will be id-based: the supplied ids will be matched against the ids of
        the top candidates returned from the retrieval index, which should have
        been constructed with the appropriate identifiers.
        If not supplied, evaluation will be score-based: the score of the true
        candidate will be computed and compared with the scores returned from
        the index for the top candidates.
        Score-based evaluation is useful for when the true candidate is not
        in the retrieval index. Id-based evaluation is useful for when scores
        returned from the index are not directly comparable to scores computed
        by multiplying the candidate and embedding vector. For example, scores
        returned by ScaNN are quantized, and cannot be compared to
        full-precision scores.
    Returns:
      Update op. Only used in graph mode.
    """

    if true_candidate_ids is None and not self._candidates.is_exact():
      raise ValueError(
          f"The candidate generation layer ({self._candidates}) does not return "
          "exact results. To perform evaluation using that layer, you must "
          "supply `true_candidate_ids`, which will be checked against "
          "the candidate ids returned from the candidate generation layer."
      )

    positive_scores = tf.reduce_sum(
        query_embeddings * true_candidate_embeddings, axis=1, keepdims=True)

    top_k_predictions, retrieved_ids = self._candidates(
        query_embeddings, k=max(self._ks))

    update_ops = []

    if true_candidate_ids is not None:
      # We're using ID-based evaluation.
      if len(true_candidate_ids.shape) == 1:
        true_candidate_ids = tf.expand_dims(true_candidate_ids, 1)

      # Deal with ScaNN using `NaN`-padding by converting its
      # `NaN` scores into minimum scores.
      nan_padding = tf.math.is_nan(top_k_predictions)
      top_k_predictions = tf.where(
          nan_padding,
          tf.ones_like(top_k_predictions) * tf.float32.min,
          top_k_predictions
      )

      # Check sortedness.
      is_sorted = (
          top_k_predictions[:, :-1] - top_k_predictions[:, 1:]
      )
      tf.debugging.assert_non_negative(
          is_sorted, message="Top-K predictions must be sorted."
      )

      # Check whether the true candidates were retrieved, accounting
      # for padding.
      ids_match = tf.cast(
          tf.math.logical_and(
              tf.math.equal(true_candidate_ids, retrieved_ids),
              tf.math.logical_not(nan_padding)
          ),
          tf.float32
      )

      for k, metric in zip(self._ks, self._top_k_metrics):
        # By slicing until :k we assume scores are sorted.
        # Clip to only count multiple matches once.
        match_found = tf.clip_by_value(
            tf.reduce_sum(ids_match[:, :k], axis=1, keepdims=True),
            0.0, 1.0
        )
        update_ops.append(metric.update_state(match_found))
    else:
      # Score-based evaluation.
      y_pred = tf.concat([positive_scores, top_k_predictions], axis=1)

      for k, metric in zip(self._ks, self._top_k_metrics):
        targets = tf.zeros(tf.shape(positive_scores)[0], dtype=tf.int32)
        top_k_accuracy = tf.math.in_top_k(
            targets=targets,
            predictions=y_pred,
            k=k
        )
        update_ops.append(metric.update_state(top_k_accuracy))

    return tf.group(update_ops)

  def reset_states(self) -> None:
    """Resets the metrics."""

    for metric in self.metrics:
      metric.reset_states()

  def result(self) -> List[tf.Tensor]:
    """Returns a list of metric results."""

    return [metric.result() for metric in self.metrics]

# Preprocessing Datasets

In [None]:
# Helper function to create dataset based on user list
def create_tf_datasets(ratings_df: pd.DataFrame,
                       batch_size: int = 1,
                       max_examples_per_user: Optional[int] = None,
                       max_clients: Optional[int] = None) -> List[tf.data.Dataset]:
  num_users = len(set(ratings_df.UserID))
  # Optionally limit to `max_clients` to speed up data loading.
  if max_clients is not None:
    num_users = min(num_users, max_clients)

  def rating_batch_map_fn(rating_batch):
    # Each example looks like: {x: movie_id, y: rating}.
    # We won't need the UserID since each client will only look at their own
    # data.
    return collections.OrderedDict([
        ("x", tf.cast(rating_batch[:, 1:2], tf.int64)),
        ("y", tf.cast(rating_batch[:, 2:3], tf.float32))
    ])

  tf_datasets = []
  for user in range(num_users):
    # Get subset of ratings_df belonging to a particular user.
    user_ratings_df = ratings_df[ratings_df.UserID == user]

    tf_dataset = tf.data.Dataset.from_tensor_slices(user_ratings_df)

    # Define preprocessing operations.
    tf_dataset = tf_dataset.take(max_examples_per_user).shuffle(
        buffer_size=max_examples_per_user, seed=42).batch(batch_size).map(
        rating_batch_map_fn,
        num_parallel_calls=tf.data.experimental.AUTOTUNE)
    tf_datasets.append(tf_dataset)

  return tf_datasets

def split_tf_datasets(
    tf_datasets: List[tf.data.Dataset],
    train_fraction: float = 0.8,
    val_fraction: float = 0.1,
) -> Tuple[List[tf.data.Dataset], List[tf.data.Dataset], List[tf.data.Dataset]]:
  np.random.seed(42)
  np.random.shuffle(tf_datasets)

  train_idx = int(len(tf_datasets) * train_fraction)
  val_idx = int(len(tf_datasets) * (train_fraction + val_fraction))

  # Note that the val and test data contains completely different users, not
  # just unseen ratings from train users.
  return (tf_datasets[:train_idx], tf_datasets[train_idx:val_idx],
          tf_datasets[val_idx:])

In [None]:
# We limit the number of clients to speed up dataset creation. Feel free to pass
# max_clients=None to load all clients' data.
tf_datasets = create_tf_datasets(
    ratings_df=ratings_df,
    batch_size=5,
    max_examples_per_user=300,
    max_clients=2000)

# Split the ratings into training/val/test by client.
tf_train_datasets, tf_val_datasets, tf_test_datasets = split_tf_datasets(
    tf_datasets,
    train_fraction=0.8,
    val_fraction=0.1)

# Defining Federated Model

In [None]:
# Define a single Keras layer representing an embedding for a single user
class UserEmbedding(tf.keras.layers.Layer):
  def __init__(self, num_latent_factors, **kwargs):
    super().__init__(**kwargs)
    self.num_latent_factors = num_latent_factors

  def build(self, input_shape):
    self.embedding = self.add_weight(
        shape=(1, self.num_latent_factors),
        initializer='uniform',
        dtype=tf.float32,
        name='UserEmbeddingKernel')
    super().build(input_shape)

  def call(self, inputs):
    return self.embedding

  def compute_output_shape(self):
    return (1, self.num_latent_factors)

In [None]:
def get_matrix_factorization_model(
    num_items: int,
    num_latent_factors: int) -> tff.learning.reconstruction.Model:
  """Defines a Keras matrix factorization model."""
  # Layers with variables will be partitioned into global and local layers.
  # We'll pass this to `tff.learning.reconstruction.from_keras_model`.
  global_layers = []
  local_layers = []

  # Extract the item embedding.
  # This will be the global layers and the layers stored on the centralized setting
  item_input = tf.keras.layers.Input(shape=[1], name='Item')
  item_embedding_layer = tf.keras.layers.Embedding(
      num_items,
      num_latent_factors,
      name='ItemEmbedding')
  global_layers.append(item_embedding_layer)
  flat_item_vec = tf.keras.layers.Flatten(name='FlattenItems')(
      item_embedding_layer(item_input))

  # Extract the user embedding.
  # This will be the local layers and the layers stored on each clients setting
  user_embedding_layer = UserEmbedding(
      num_latent_factors,
      name='UserEmbedding')
  local_layers.append(user_embedding_layer)

  # The item_input never gets used by the user embedding layer,
  # but this allows the model to directly use the user embedding.
  flat_user_vec = user_embedding_layer(item_input)

  # Compute the dot product between the user embedding, and the item one.
  pred = tf.keras.layers.Dot(
      1, normalize=False, name='Dot')([flat_user_vec, flat_item_vec])

  input_spec = collections.OrderedDict(
      x=tf.TensorSpec(shape=[None, 1], dtype=tf.int64),
      y=tf.TensorSpec(shape=[None, 1], dtype=tf.float32))
  
  model = tf.keras.Model(inputs=item_input, outputs=pred)

  return tff.learning.reconstruction.from_keras_model(
      keras_model=model,
      global_layers=global_layers,
      local_layers=local_layers,
      input_spec=input_spec)

model_fn = functools.partial(
    get_matrix_factorization_model,
    num_items=len(set(ratings_df.MovieID)),
    num_latent_factors=embedding_dimension)

In [None]:
K_Factor = (5, 10, 20, 40, 60, 80, 100) # Change this according to desired values

fact_topK = FactorizedTopK(
  candidates=movies_df.MovieID.map(candidate_model),
  ks = K_Factor
)

loss_fn = lambda: tfrs.tasks.Retrieval(metrics=fact_topK)
metrics_fn = lambda: [fact_topK]

UnimplementedError: ignored

# Training Federated Reconstruction

In [None]:
# Federated Training Process
training_process = tff.learning.reconstruction.build_training_process(
    model_fn=model_fn,
    loss_fn=loss_fn,
    metrics_fn=metrics_fn,
    server_optimizer_fn=lambda: tf.keras.optimizers.Adagrad(learning_rate=1),
    client_optimizer_fn=lambda: tf.keras.optimizers.Adagrad(learning_rate=0.5),
    reconstruction_optimizer_fn=lambda: tf.keras.optimizers.Adagrad(learning_rate=0.1))

# Federated Evaluation Process
evaluation_computation = tff.learning.reconstruction.build_federated_evaluation(
    model_fn,
    loss_fn=loss_fn,
    metrics_fn=metrics_fn,
    reconstruction_optimizer_fn=lambda: tf.keras.optimizers.Adagrad(learning_rate=0.1))

In [None]:
# Initialize Model State
# This model has yet to be trained so its states are randomly initialized
state = training_process.initialize()
print(state.model)
print('Item variables shape:', state.model.trainable[0].shape)

In [None]:
# Testing evaluation pipelines on evaluating initial state model
eval_metrics = evaluation_computation(state.model, tf_val_datasets)
print('Initial Eval:', eval_metrics['eval'])

In [None]:
# Training loops
NUM_EPOCHS = 50
NUM_CLIENT_PER_EPOCH = 100

train_losses = []
train_accs = []

# This may take a couple minutes to run.
print(f"Training with {NUM_CLIENT_PER_EPOCH} clients per sample of Epoch. Total number of epochs: {NUM_EPOCHS}")
for i in range(NUM_EPOCHS):
  federated_train_data = np.random.choice(tf_train_datasets, size=NUM_CLIENT_PER_EPOCH, replace=False).tolist()
  state, metrics = training_process.next(state, federated_train_data)
  print(f'Epoch {i+1}/{NUM_EPOCHS}:', metrics['train'])
  train_losses.append(metrics['train']['loss'])
  train_accs.append(metrics['train']['rating_accuracy'])


eval_metrics = evaluation_computation(state.model, tf_val_datasets)
print('Final Eval:', eval_metrics['eval'])

# Model Evaluation

In [None]:
# Evaluating model
fig, ax = plt.subplots(1, 2)
fig.suptitle('Federated model evaluation')
fig.set_size_inches(15, 6)

ax[0].plot(range(NUM_EPOCHS), train_losses)
ax[0].set_ylabel('Train Loss')
ax[0].set_xlabel('Epochs')
ax[0].set_title('Train Loss')

ax[1].plot(range(NUM_EPOCHS), train_accs)
ax[1].set_ylabel('Train Accuracy')
ax[1].set_xlabel('Epochs')
ax[1].set_title('Train Accuracy')

plt.show()