In [11]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
# The pipeline cells below use the TFX v1 public API (tfx.dsl.Pipeline,
# tfx.proto.TrainArgs, tfx.orchestration.LocalDagRunner, ...). Those names are
# exposed by the `tfx.v1` package, not by the bare top-level `tfx` package, so
# bind `tfx` to the v1 facade.
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import tensorflow_recommenders as tfrs
print('Tensorflow Recommenders version: {}'.format(tfrs.__version__))

TensorFlow version: 2.5.2
TFX version: 1.0.0
Tensorflow Recommenders version: v0.5.2


In [13]:
from typing import Dict, Text

## Set up variables

In [3]:
import os

PIPELINE_NAME = 'TFRS-ranking'

# Three locations share the layout <parent>/<PIPELINE_NAME>:
#   DATA_ROOT         - directory where the MovieLens 100K rating data lives
#   PIPELINE_ROOT     - output directory for artifacts generated by the pipeline
#   SERVING_MODEL_DIR - output directory the trained model is exported to
DATA_ROOT, PIPELINE_ROOT, SERVING_MODEL_DIR = (
    os.path.join(parent, PIPELINE_NAME)
    for parent in ('data', 'pipelines', 'serving_model')
)

# SQLite DB file used as the MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

from absl import logging
# Show INFO-level absl messages; the pipeline's progress lines (e.g. the
# "INFO:absl:Component Pusher is finished." message mentioned later) are
# logged at this level.
logging.set_verbosity(logging.INFO)  # Set default logging level.

### Prepare example data

In [4]:
!wget https://files.grouplens.org/datasets/movielens/ml-100k.zip
!mkdir -p {DATA_ROOT}
!unzip ml-100k.zip
!echo 'userId,movieId,rating,timestamp' > {DATA_ROOT}/ratings.csv
!sed 's/\t/,/g' ml-100k/u.data >> {DATA_ROOT}/ratings.csv

--2022-04-05 14:34:34--  https://files.grouplens.org/datasets/movielens/ml-100k.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4924029 (4.7M) [application/zip]
Saving to: ‘ml-100k.zip’


2022-04-05 14:34:35 (7.28 MB/s) - ‘ml-100k.zip’ saved [4924029/4924029]

Archive:  ml-100k.zip
   creating: ml-100k/
  inflating: ml-100k/allbut.pl       
  inflating: ml-100k/mku.sh          
  inflating: ml-100k/README          
  inflating: ml-100k/u.data          
  inflating: ml-100k/u.genre         
  inflating: ml-100k/u.info          
  inflating: ml-100k/u.item          
  inflating: ml-100k/u.occupation    
  inflating: ml-100k/u.user          
  inflating: ml-100k/u1.base         
  inflating: ml-100k/u1.test         
  inflating: ml-100k/u2.base         
  inflating: ml-100k/u2.test         
  inflating: ml-100k/u3.base  

In [5]:
# Preview the header and the first rows of the generated CSV.
!head {DATA_ROOT}/ratings.csv

userId,movieId,rating,timestamp
196,242,3,881250949
186,302,3,891717742
22,377,1,878887116
244,51,2,880606923
166,346,1,886397596
298,474,4,884182806
115,265,2,881171488
253,465,5,891628467
305,451,3,886324817


TFX pipelines are defined using Python APIs. We will define a pipeline which consists of the following three components.

- CsvExampleGen: Reads in data files and converts them to the TFX internal format for further processing. There are multiple ExampleGens for various formats. In this tutorial, we will use CsvExampleGen, which takes CSV file input.

- Trainer: Trains an ML model. The Trainer component requires model definition code from users. You can use TensorFlow APIs to specify how to train a model and save it in the SavedModel format.

- Pusher: Copies the trained model outside of the TFX pipeline. The Pusher component can be thought of as a deployment process for the trained ML model.

### Write model training code

In [6]:
# File name of the trainer module that is later handed to the pipeline as
# `module_file`.
# NOTE(review): `%%writefile` is a cell magic: it only takes effect as the very
# first line of a cell and writes the rest of that same cell to the file. As
# laid out here it follows an assignment and the trainer code sits in separate
# cells below, so that code would not end up in the module file — confirm the
# cell layout before running the pipeline.
_trainer_module_file = 'tfrs_ranking_trainer.py'
%%writefile {_trainer_module_file}

In [24]:
from typing import Dict, Text
from typing import List

import numpy as np
import tensorflow as tf

from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_recommenders as tfrs
from tensorflow_transform.tf_metadata import schema_utils
from tfx_bsl.public import tfxio

# Integer ID columns fed to the model, and the column used as the label.
_FEATURE_KEYS = ['userId', 'movieId']
_LABEL_KEY = 'rating'

# Parsing spec: every column (the features first, then the label) is a
# fixed-length int64 feature of shape [1].
_FEATURE_SPEC = {
    column: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
    for column in [*_FEATURE_KEYS, _LABEL_KEY]
}

In [25]:
class RankingModel(tf.keras.Model):
    """Predicts a rating from a (user ID, movie ID) pair.

    Each integer ID is converted to a string, looked up in a fixed vocabulary
    and embedded; the two embeddings are concatenated and passed through a
    small dense network that outputs a single value.
    """

    def __init__(self):
        super().__init__()
        embedding_dimension = 32

        # MovieLens 100K numbers users 1..943 and movies 1..1682. The
        # vocabularies therefore start at 1; starting at 0 would add an ID that
        # never occurs and push the highest user/movie ID into the shared
        # out-of-vocabulary bucket.
        num_users = 943
        num_movies = 1682
        unique_user_ids = np.arange(1, num_users + 1).astype(str)
        unique_movie_ids = np.arange(1, num_movies + 1).astype(str)

        # Compute embeddings for users.
        self.user_embeddings = tf.keras.Sequential([
            tf.keras.layers.Input(shape=(1,), name='userId', dtype=tf.int64),
            # StringLookup works on strings, so stringify the int64 IDs first.
            tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
            tf.keras.layers.StringLookup(
                vocabulary=unique_user_ids, mask_token=None),
            # +1 leaves room for the out-of-vocabulary index.
            tf.keras.layers.Embedding(
                len(unique_user_ids) + 1, embedding_dimension)
        ])

        # Compute embeddings for movies (same structure as for users).
        self.movie_embeddings = tf.keras.Sequential([
            tf.keras.layers.Input(shape=(1,), name='movieId', dtype=tf.int64),
            tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
            tf.keras.layers.StringLookup(
                vocabulary=unique_movie_ids, mask_token=None),
            tf.keras.layers.Embedding(
                len(unique_movie_ids) + 1, embedding_dimension)
        ])

        # Compute predictions.
        self.ratings = tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(1)
        ])

    def call(self, inputs):
        """Scores user/movie pairs.

        Args:
          inputs: A `(user_id, movie_id)` pair of int64 tensors; the Input
            layers above declare a per-example shape of `(1,)`.

        Returns:
          The output of the dense stack applied to the concatenated user and
          movie embeddings (one value per input pair).
        """
        user_id, movie_id = inputs

        user_embedding = self.user_embeddings(user_id)
        movie_embedding = self.movie_embeddings(movie_id)

        # With (batch, 1) IDs the embeddings are rank 3, so axis 2 is the
        # embedding axis: the two vectors are joined side by side.
        return self.ratings(tf.concat([user_embedding, movie_embedding], axis=2))

In [26]:
class MovielensModel(tfrs.models.Model):
	"""TFRS model pairing a RankingModel with a ranking task (MSE loss, RMSE metric)."""

	def __init__(self):
		super().__init__()
		self.ranking_model: tf.keras.Model = RankingModel()

		mse_loss = tf.keras.losses.MeanSquaredError()
		rmse_metric = tf.keras.metrics.RootMeanSquaredError()
		self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
			loss = mse_loss,
			metrics = [rmse_metric]
		)

	def call(self, features: Dict[str, tf.Tensor]) -> tf.Tensor:
		"""Returns the ranking model's output for the 'userId'/'movieId' entries."""
		user_ids = features['userId']
		movie_ids = features['movieId']
		return self.ranking_model((user_ids, movie_ids))

	def compute_loss(self,
	                 features: Dict[Text, tf.Tensor],
	                 training = False) -> tf.Tensor:
		"""Computes the task loss for one batch.

		Args:
		  features: Indexed positionally despite the annotation: element 0 is
		    passed to the model as its input and element 1 is used as the labels.
		  training: Not used by this implementation.

		Returns:
		  The value returned by the ranking task for the labels and predictions.
		"""
		model_inputs, labels = features[0], features[1]
		rating_predictions = self(model_inputs)

		# The task computes the loss and the metrics.
		return self.task(labels = labels, predictions = rating_predictions)

In [29]:
def _input_fn(file_pattern,
              data_accessor,
              schema: schema_pb2.Schema,
              batch_size: int = 256) -> tf.data.Dataset:
	"""Builds a batched, indefinitely repeating dataset from the given files.

	Args:
	  file_pattern: Forwarded to `data_accessor.tf_dataset_factory`.
	  data_accessor: Object providing `tf_dataset_factory`.
	  schema: Schema handed to the dataset factory.
	  batch_size: Batch size requested from the dataset factory.

	Returns:
	  The dataset produced by the factory (with `_LABEL_KEY` as the label key)
	  after `.repeat()` has been applied.
	"""
	dataset_options = tfxio.TensorFlowDatasetOptions(
		batch_size = batch_size, label_key = _LABEL_KEY
	)
	dataset = data_accessor.tf_dataset_factory(
		file_pattern, dataset_options, schema = schema
	)
	return dataset.repeat()


def _build_keras_model() -> tf.keras.Model:
	"""Returns a newly constructed MovielensModel."""
	movielens_model = MovielensModel()
	return movielens_model


# TFX Trainer will call this function.
def run_fn(fn_args):
	"""Trains the MovieLens ranking model and saves it to the serving directory.

	Args:
	  fn_args: Holds args used to train the model as name/value pairs. This
	    function reads train_files, eval_files, data_accessor, train_steps,
	    eval_steps and serving_model_dir from it.
	"""
	train_batch_size = 8192
	eval_batch_size = 4096
	num_epochs = 3

	schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

	# _input_fn returns repeating datasets, so the step counts passed to fit()
	# decide how many batches are consumed.
	train_dataset = _input_fn(
		fn_args.train_files,
		fn_args.data_accessor,
		schema,
		batch_size = train_batch_size
	)
	eval_dataset = _input_fn(
		fn_args.eval_files,
		fn_args.data_accessor,
		schema,
		batch_size = eval_batch_size
	)

	model = _build_keras_model()

	optimizer = tf.keras.optimizers.Adagrad(learning_rate = 0.1)
	model.compile(optimizer = optimizer)

	model.fit(
		train_dataset,
		steps_per_epoch = fn_args.train_steps,
		epochs = num_epochs,
		validation_data = eval_dataset,
		validation_steps = fn_args.eval_steps
	)

	# Write the trained model where the pipeline expects to find it.
	model.save(fn_args.serving_model_dir)

### Write a pipeline definition

In [31]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str):
	"""Creates a three component pipeline with TFX.

	Args:
	  pipeline_name: Name given to the pipeline.
	  pipeline_root: Root directory passed to the pipeline.
	  data_root: Input base directory for CsvExampleGen.
	  module_file: Path of the module file given to the Trainer.
	  serving_model_dir: Base directory the Pusher pushes the model to.
	  metadata_path: Path used for the SQLite metadata connection config.

	Returns:
	  A `tfx.dsl.Pipeline` made of CsvExampleGen, Trainer and Pusher.
	"""
	# Brings data into the pipeline.
	example_gen = tfx.components.CsvExampleGen(input_base = data_root)

	# Uses user-provided Python function that trains a model.
	train_args = tfx.proto.TrainArgs(num_steps = 12)
	eval_args = tfx.proto.EvalArgs(num_steps = 24)
	trainer = tfx.components.Trainer(
		module_file = module_file,
		examples = example_gen.outputs['examples'],
		train_args = train_args,
		eval_args = eval_args
	)

	# Pushes the model to a filesystem destination.
	filesystem_target = tfx.proto.PushDestination.Filesystem(
		base_directory = serving_model_dir
	)
	pusher = tfx.components.Pusher(
		model = trainer.outputs['model'],
		push_destination = tfx.proto.PushDestination(
			filesystem = filesystem_target
		)
	)

	metadata_config = (
		tfx.orchestration.metadata.sqlite_metadata_connection_config(
			metadata_path
		)
	)

	# These three components make up the pipeline.
	return tfx.dsl.Pipeline(
		pipeline_name = pipeline_name,
		pipeline_root = pipeline_root,
		metadata_connection_config = metadata_config,
		components = [example_gen, trainer, pusher]
	)

### Run the pipeline

TFX supports multiple orchestrators to run pipelines. In this tutorial we will use LocalDagRunner, which is included in the TFX Python package and runs pipelines in a local environment.

Now we create a LocalDagRunner and pass a Pipeline object created from the function we already defined.

The pipeline runs directly and you can see logs for the progress of the pipeline including ML model training.

In [None]:
# Create the local orchestrator, build the pipeline from the notebook-level
# settings, and run it.
local_runner = tfx.orchestration.LocalDagRunner()
tfrs_pipeline = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    module_file=_trainer_module_file,
    serving_model_dir=SERVING_MODEL_DIR,
    metadata_path=METADATA_PATH,
)
local_runner.run(tfrs_pipeline)

You should see "INFO:absl:Component Pusher is finished." at the end of the logs if the pipeline finished successfully, because the Pusher component is the last component of the pipeline.

The pusher component pushes the trained model to the SERVING_MODEL_DIR which is the serving_model/TFRS-ranking directory if you did not change the variables in the previous steps. You can see the result from the file browser in the left-side panel in Colab, or using the following command:

In [None]:
# List files in created model directory (recursively, including sub-directories).
!ls -R {SERVING_MODEL_DIR}

In [None]:
import glob

# Candidate exports are the sub-directories of SERVING_MODEL_DIR; the most
# recently modified one is treated as the latest model.
model_dirs = glob.glob(os.path.join(SERVING_MODEL_DIR, '*/'))
if not model_dirs:
    # Without this check max() fails with an opaque "empty sequence" error
    # when the pipeline has not pushed a model yet.
    raise FileNotFoundError(
        'No exported model found under {!r}; run the pipeline cell first and '
        'check that the Pusher component finished.'.format(SERVING_MODEL_DIR))
latest_model_dir = max(model_dirs, key=os.path.getmtime)

# Load the latest model for testing
loaded = tf.saved_model.load(latest_model_dir)
# Score one (user, movie) pair; each input is a 1x1 nested list.
print(loaded({'userId': [[42]], 'movieId': [[15]]}).numpy())