# Avro2TF TensorFlow 2 Keras Tutorial (MovieLens) (TFRecord) - TensorFlow Training

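This tutorial assumes the tensor data has already been generated with **Avro2TF**. The training data is stored in ../data/train, and the validation and test data in ../validate and ../test. (The code below reads the shards from ../data/trainingData and ../data/validationData.)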

The input shapes used below are based on the metadata stored in data/train/metadata/tensor_metadata.json.

Avro2TF is designed to fill the data-processing gap before training by making your training data ready to be consumed by deep learning training frameworks. It reads raw user input data in any format supported by Spark and generates tensorized training data in Avro or TFRecord format.

Below is an interactive tutorial for using Avro2TF. In this tutorial, we will go through some exercises to get comfortable with Avro2TF and understand how it works.

In [None]:
import os
import tensorflow as tf
from tensorflow import feature_column
# NOTE(review): `os` and `feature_column` are not used anywhere in this notebook.
# Let tf.data pick parallelism / buffer sizes; passed as num_parallel_reads,
# num_parallel_calls and the prefetch buffer size in the input pipelines below.
AUTO = tf.data.experimental.AUTOTUNE # used in tf.data.Dataset API

#### Tensor Meta Data
The metadata is generated by Avro2TF to describe the dataset. For example, user_features is a 571-dimensional sparse vector. We use this information to define the shapes of our input tensors.

In [None]:
# Print the Avro2TF tensor metadata (IPython shell escape; runs only inside a notebook).
!cat ../data/metadata/tensor_metadata.json

In [None]:
# Sizes of the sparse feature vectors; these must agree with tensor_metadata.json.
_USER_FEATURE_DIM, _MOVIE_FEATURE_DIM = 571, 22
# User / movie counts (not referenced by the code in this notebook).
_NUM_USERS, _NUM_MOVIES = 943, 1682

#### Training data in TFRecord format generated by Avro2TF. 

(Jupyter notebook: "avro2tf_open_source_tutorial_text_tfrecord_data_processing")

In [None]:
# Glob patterns matching the TFRecord shards written by Avro2TF.
tf_record_train_path, tf_record_validate_path = (
    "../data/trainingData/part-r-*",
    "../data/validationData/part-r-*",
)

#### Hash an ID into multiple IDs within a range

This compresses the IDs into a smaller set of dimensions, which are then used for embedding learning.

In [None]:
def get_dynamic_hash_ids_from_id(input_feature, num_hash_functions, hash_bucket_size, use_dense=False):
    """Hash one ID with `num_hash_functions` keyed hashes and stack the results on axis -1.

    Non-string inputs are first converted with tf.as_string. Hash function k
    uses tf.strings.to_hash_bucket_strong with key [k + 7, k + 113].

    Args:
        input_feature: ID tensor; tf.string, or any dtype tf.as_string accepts.
        num_hash_functions: number of hash functions to apply.
        hash_bucket_size: number of buckets per hash function.
        use_dense: if True, each hash is one-hot encoded with depth
            hash_bucket_size. Otherwise hash k is shifted by k * hash_bucket_size,
            so the hashes occupy disjoint ranges in
            [0, num_hash_functions * hash_bucket_size).

    Returns:
        The per-function results stacked along the last axis.
    """
    as_text = input_feature if input_feature.dtype == tf.string else tf.as_string(input_feature)

    def hash_with(k):
        bucket = tf.strings.to_hash_bucket_strong(as_text,
                                                  hash_bucket_size,
                                                  [k + 7, k + 113],
                                                  name="hash_op_name")
        if use_dense:
            return tf.one_hot(bucket, hash_bucket_size)
        # Offset so each hash function owns its own slice of the ID space.
        return bucket + k * hash_bucket_size

    return tf.stack([hash_with(k) for k in range(num_hash_functions)], -1)
    

#### TF-Record parsing function

In [None]:
def _to_dense_feature_vector(sparse_indices, sparse_values, dim):
    """Scatter a parsed VarLen (indices, values) pair into a dense vector of length `dim`.

    This is the rank-1 case of the deprecated tf.compat.v1.sparse.merge: the
    values of `sparse_indices` become positions and the values of
    `sparse_values` become entries. The result is reordered into canonical
    order before densifying, as sparse.merge did, because tf.sparse.to_dense
    validates that indices are sorted.
    """
    merged = tf.SparseTensor(indices=tf.expand_dims(sparse_indices.values, axis=1),
                             values=sparse_values.values,
                             dense_shape=[dim])
    return tf.sparse.to_dense(tf.sparse.reorder(merged))


def read_tfrecord(example):
    """Parse one serialized tf.train.Example into `(inputs, label)`.

    Returns:
        `inputs` is an 8-tuple, in this order (the Keras models below declare
        their Inputs in the same order):
            0. user hashed IDs (recomputed from userId, 4 hashes x 1000 buckets)
            1. movie hashed IDs (recomputed from movieId, 4 hashes x 1000 buckets)
            2. dense user features, length _USER_FEATURE_DIM
            3. dense movie features, length _MOVIE_FEATURE_DIM
            4. raw userId
            5. raw movieId
            6. user pre-trained embedding (64 floats)
            7. movie pre-trained embedding (64 floats)
        `label` is the int64 "response" feature.
    """
    # Define context_features
    features = {
        'user_features_indices': tf.io.VarLenFeature(dtype=tf.int64),
        'user_features_values': tf.io.VarLenFeature(dtype=tf.float32),
        'movie_features_indices': tf.io.VarLenFeature(dtype=tf.int64),
        'movie_features_values': tf.io.VarLenFeature(dtype=tf.float32),
        # Parsed (and therefore required) but not returned: the hashed IDs
        # are recomputed below from the raw IDs.
        'userId_hashed': tf.io.FixedLenFeature([4], dtype=tf.int64),
        'movieId_hashed': tf.io.FixedLenFeature([4], dtype=tf.int64),
        'user_embb_features': tf.io.FixedLenFeature([64], dtype=tf.float32),
        'movie_embb_features': tf.io.FixedLenFeature([64], dtype=tf.float32),
        'userId': tf.io.FixedLenFeature([], dtype=tf.int64),
        'movieId': tf.io.FixedLenFeature([], dtype=tf.int64),
        'response': tf.io.FixedLenFeature([], dtype=tf.int64) # Need to use tf.FixedLenFeature() for our "response"
    }

    # decode the TFRecord
    tf_example = tf.io.parse_single_example(example, features)

    # Feature dimensions come from the Avro2TF tensor metadata.
    user_features = _to_dense_feature_vector(tf_example["user_features_indices"],
                                             tf_example["user_features_values"],
                                             _USER_FEATURE_DIM)
    movie_features = _to_dense_feature_vector(tf_example["movie_features_indices"],
                                              tf_example["movie_features_values"],
                                              _MOVIE_FEATURE_DIM)

    user_id_hashed = get_dynamic_hash_ids_from_id(tf_example["userId"], 4, 1000)
    movie_id_hashed = get_dynamic_hash_ids_from_id(tf_example["movieId"], 4, 1000)

    # "response" is already parsed as int64, so no cast is needed.
    label = tf_example["response"]

    return (
        (user_id_hashed, movie_id_hashed,
         user_features, movie_features,
         tf_example["userId"], tf_example["movieId"],
         tf_example["user_embb_features"], tf_example["movie_embb_features"]),
        label)

#### Test parser with dataset

In [None]:
# Read from TFRecords. For optimal performance, read from multiple TFRecord
# files at once and disable deterministic ordering so tf.data may apply
# order-altering optimizations. Point the glob at tf_record_validate_path
# to inspect the validation records instead.
filenames = tf.io.gfile.glob(tf_record_train_path)

option_no_order = tf.data.Options()
option_no_order.experimental_deterministic = False

dataset = (
    tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTO)
    .with_options(option_no_order)
    .map(read_tfrecord, num_parallel_calls=AUTO)
    .shuffle(300)
)

In [None]:
# Peek at two parsed (features, label) pairs to sanity-check the parser.
for tf_example, tf_labelTensor in dataset.take(2):
    print(f"{tf_example} | {tf_labelTensor}\n")

#### Setup training strategy based on hardware

In [None]:
# Detect hardware: prefer a TPU, then GPU(s), then fall back to the default strategy.
# `gpus` is bound up front so the selection below never depends on which
# branch of the try/except ran.
gpus = []
try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver() # TPU detection
except ValueError:
  tpu = None
  gpus = tf.config.experimental.list_logical_devices("GPU")

# Select appropriate distribution strategy for hardware
if tpu:
  tf.config.experimental_connect_to_cluster(tpu)
  tf.tpu.experimental.initialize_tpu_system(tpu)
  strategy = tf.distribute.experimental.TPUStrategy(tpu)
  print('Running on TPU ', tpu.master())
elif gpus:
  # MirroredStrategy's `devices` argument takes device name strings, so pass
  # the names rather than the LogicalDevice objects. Works for 1..N GPUs.
  strategy = tf.distribute.MirroredStrategy([gpu.name for gpu in gpus])
  print('Running on ', len(gpus), ' GPU(s) ')
else:
  strategy = tf.distribute.get_strategy() # default strategy that works on CPU and single GPU
  print('Running on CPU')

# How many accelerators do we have ?
# NOTE(review): the models in this notebook are built outside strategy.scope(),
# so this strategy is only reported, not used for training.
print("Number of accelerators: ", strategy.num_replicas_in_sync)

#### Define metric hooks

In [None]:
# Hook up to AUC
from sklearn.metrics import roc_auc_score

def auroc(y_true, y_pred):
    """Keras metric: ROC AUC of one batch, computed with scikit-learn.

    Runs eagerly through tf.py_function and returns a float64 scalar. Keras
    averages it over batches, which only approximates the epoch-level AUC;
    tf.keras.metrics.AUC computes a streaming value instead.

    ROC AUC is undefined for a batch that contains a single class (e.g. a
    small final validation batch). scikit-learn raises ValueError in that
    case, which would abort training, so 0.5 (chance level) is reported
    instead.
    """
    def _batch_auc(labels, scores):
        labels = labels.numpy().ravel()
        scores = scores.numpy().ravel()
        if len(set(labels.tolist())) < 2:
            return 0.5
        return roc_auc_score(labels, scores)

    return tf.py_function(_batch_auc, (y_true, y_pred), tf.double)

In [None]:
# Get some basic prediction stats
def mean_pred(y_true, y_pred):
    """Keras metric: average predicted value over the batch; labels are ignored."""
    del y_true  # only present to satisfy the Keras metric signature
    batch_mean = tf.keras.backend.mean(y_pred)
    return batch_mean

#### Model loader and driver

In [None]:
def load_dataset(filenames):
  """Build an unbatched dataset of parsed examples from TFRecord files.

  Shards are read in parallel, and deterministic ordering is switched off so
  tf.data may apply order-altering optimizations. Each record is parsed with
  read_tfrecord.
  """
  unordered = tf.data.Options()
  unordered.experimental_deterministic = False
  return (
      tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTO)
      .with_options(unordered)
      .map(read_tfrecord, num_parallel_calls=AUTO)
  )


def get_batched_dataset(filenames, batch_size, train=False, shuffle_buffer_size=None):
  """Return a cached, batched, prefetched dataset built from TFRecord files.

  Args:
    filenames: list of TFRecord file paths.
    batch_size: number of examples per batch.
    train: if True, the dataset repeats indefinitely, which is the Keras
      practice for training. Evaluation datasets are not repeated.
    shuffle_buffer_size: optional. When set and `train` is True, examples
      are shuffled with this buffer size. Shuffling happens after cache()
      and before repeat(), so every epoch sees a new order. Defaults to None
      (no shuffling), which assumes the records are already well shuffled
      on disk.

  Returns:
    A tf.data.Dataset of batched `(inputs, label)` pairs.
  """
  dataset = load_dataset(filenames)
  dataset = dataset.cache() # This dataset fits in RAM
  if train:
    if shuffle_buffer_size:
      dataset = dataset.shuffle(shuffle_buffer_size)
    # Best practices for Keras:
    # Training dataset: repeat then batch
    # Evaluation dataset: do not repeat
    dataset = dataset.repeat()
  dataset = dataset.batch(batch_size)
  # Prefetch the next batch while training (autotuned buffer size).
  # Source: Dataset performance guide: https://www.tensorflow.org/guide/performance/datasets
  dataset = dataset.prefetch(AUTO)
  return dataset



In [None]:
def run_model(model, tf_record_train_path, tf_record_validate_path,
              batch_size, epoch, num_record_epoch):
    """Print a model summary, then train `model` on the TFRecord shards matching the globs.

    Args:
        model: compiled Keras model whose inputs follow the order of the
            feature tuple produced by read_tfrecord.
        tf_record_train_path: glob pattern for the training shards.
        tf_record_validate_path: glob pattern for the validation shards.
        batch_size: examples per batch.
        epoch: number of epochs passed to model.fit.
        num_record_epoch: training records consumed per epoch. Divided by
            batch_size and rounded up to get steps_per_epoch.

    Returns:
        The History object returned by model.fit.

    Raises:
        FileNotFoundError: if either glob matches no files.
    """
    # summary() prints by itself and returns None; print(model.summary())
    # would emit an extra "None" line.
    model.summary()
    # tf.keras.utils.plot_model(model, 'my_embedding.png')

    training_filenames = tf.io.gfile.glob(tf_record_train_path)
    validation_filenames = tf.io.gfile.glob(tf_record_validate_path)  # choose different file
    if not training_filenames:
        raise FileNotFoundError(f"No training TFRecord files match {tf_record_train_path!r}")
    if not validation_filenames:
        raise FileNotFoundError(f"No validation TFRecord files match {tf_record_validate_path!r}")

    # instantiate the datasets
    training_dataset = get_batched_dataset(training_filenames, batch_size, train=True)
    validation_dataset = get_batched_dataset(validation_filenames, batch_size, train=False)

    # The training dataset repeats forever, so Keras needs an explicit step
    # count. Ceiling division keeps it an integer.
    steps_per_epoch = -(-num_record_epoch // batch_size)

    history = model.fit(training_dataset, steps_per_epoch=steps_per_epoch,
                        epochs=epoch,
                        validation_data=validation_dataset)
    return history
    

#### Model 1: Only user and movie features

In [None]:
def m1_feature_cross():
    """Build and compile a baseline model that uses only the dense user and movie features.

    One Input is declared per element of the feature tuple produced by
    read_tfrecord, in the same order, so the TFRecord dataset can be fed
    directly. Declaring fewer inputs than the dataset provides makes
    model.fit fail with an input-count mismatch. Only user_features and
    movie_features feed the prediction; the other inputs are accepted but
    unused.

    Returns:
        A compiled tf.keras Model with one sigmoid output named 'last'.
    """
    # Hashed IDs (unused by this model).
    user_hash = tf.keras.layers.Input(shape=(4,), name='user_hash')
    movie_hash = tf.keras.layers.Input(shape=(4,), name='movie_hash')

    # Raw IDs (unused by this model).
    user_id = tf.keras.layers.Input(shape=(), name='user_id')
    movie_id = tf.keras.layers.Input(shape=(), name='movie_id')

    # Pre-trained embeddings (unused by this model).
    user_p_emb = tf.keras.layers.Input(shape=(64,), name='user_embb_features')
    movie_p_emb = tf.keras.layers.Input(shape=(64,), name='movie_embb_features')

    # Dense user / movie features.
    user_features = tf.keras.layers.Input(shape=(_USER_FEATURE_DIM,), name='user_features')
    movie_features = tf.keras.layers.Input(shape=(_MOVIE_FEATURE_DIM,), name='movie_features')

    # user & movie feature cross
    umf = tf.keras.layers.concatenate([user_features, movie_features], axis=-1)
    umf2 = tf.keras.layers.Dense(32, activation='relu', name='umf2')(umf)

    # activation layer
    output = tf.keras.layers.Dense(1, activation='sigmoid', name='last')(umf2)

    model = tf.keras.models.Model(inputs=[user_hash,
                                          movie_hash,
                                          user_features,
                                          movie_features,
                                          user_id, movie_id,
                                          user_p_emb,
                                          movie_p_emb],
                                  outputs=output)
    model.compile(optimizer='adam', loss='binary_crossentropy',
                  metrics=['accuracy', mean_pred, auroc])
    return model
    

# Train model 1: batches of 100, 9000 records (90 steps) per epoch, 30 epochs.
run_model(model=m1_feature_cross(),
          tf_record_train_path=tf_record_train_path,
          tf_record_validate_path=tf_record_validate_path,
          batch_size=100,
          epoch=30,
          num_record_epoch=9000)
    
    

#### Model 2: Dynamic embedding and max pool model.

In [None]:
def m2_feature_emb_cross():
    """Build and compile a model that adds hashed-ID embeddings to the dense features.

    Each hashed ID (4 hash slots) is embedded, max-pooled across the slots
    and passed through a Dense adjustment layer. The user and movie vectors
    are crossed by element-wise product, and the result is concatenated with
    the dense user/movie features.

    One Input is declared per element of the feature tuple produced by
    read_tfrecord, in the same order, so the TFRecord dataset can be fed
    directly. Declaring fewer inputs than the dataset provides makes
    model.fit fail with an input-count mismatch. The raw-ID and
    pre-trained-embedding inputs are accepted but unused.

    Returns:
        A compiled tf.keras Model with one sigmoid output named 'last'.
    """
    # user embedding with 1D pool, followed by adjustment layer.
    # 10000 rows cover the 4 x 1000 offset hash range produced in read_tfrecord.
    user_hash = tf.keras.layers.Input(shape=(4,), name='user_hash')
    user_emb = tf.keras.layers.Embedding(10000, 64, input_length=4)(user_hash)
    user_emb_max = tf.keras.layers.GlobalMaxPool1D()(user_emb)
    user_emb_max2 = tf.keras.layers.Dense(64, activation='relu', name='user_emb_max2')(user_emb_max)

    # movie embedding with 1D pool, followed by adjustment layer
    movie_hash = tf.keras.layers.Input(shape=(4,), name='movie_hash')
    movie_emb = tf.keras.layers.Embedding(10000, 64, input_length=4)(movie_hash)
    movie_emb_max = tf.keras.layers.GlobalMaxPool1D()(movie_emb)
    movie_emb_max2 = tf.keras.layers.Dense(64, activation='relu', name='movie_emb_max2')(movie_emb_max)

    # Cross user & movie at ID level (element-wise product)
    um_emb_interact = tf.math.multiply(user_emb_max2, movie_emb_max2)

    # Raw IDs (unused by this model)
    user_id = tf.keras.layers.Input(shape=(), name='user_id')
    movie_id = tf.keras.layers.Input(shape=(), name='movie_id')

    # Pre-trained embeddings (unused by this model)
    user_p_emb = tf.keras.layers.Input(shape=(64,), name='user_embb_features')
    movie_p_emb = tf.keras.layers.Input(shape=(64,), name='movie_embb_features')

    # Dense user / movie features
    user_features = tf.keras.layers.Input(shape=(_USER_FEATURE_DIM,), name='user_features')
    movie_features = tf.keras.layers.Input(shape=(_MOVIE_FEATURE_DIM,), name='movie_features')

    # user & movie feature cross
    umf = tf.keras.layers.concatenate([user_features, movie_features, um_emb_interact], axis=-1)
    umf2 = tf.keras.layers.Dense(32, activation='relu', name='umf2')(umf)

    # activation layer
    output = tf.keras.layers.Dense(1, activation='sigmoid', name='last')(umf2)

    model = tf.keras.models.Model(inputs=[user_hash,
                                          movie_hash,
                                          user_features,
                                          movie_features,
                                          user_id, movie_id,
                                          user_p_emb,
                                          movie_p_emb],
                                  outputs=output)
    model.compile(optimizer='adam', loss='binary_crossentropy',
                  metrics=['accuracy', mean_pred, auroc])
    return model
    

# Train model 2: batches of 100, 9000 records (90 steps) per epoch, 30 epochs.
run_model(model=m2_feature_emb_cross(),
          tf_record_train_path=tf_record_train_path,
          tf_record_validate_path=tf_record_validate_path,
          batch_size=100,
          epoch=30,
          num_record_epoch=9000)
    
    

#### Model 3: Pre-trained embedding

In [None]:
def m3_feature_pretrain_emb_cross():
    """Build and compile the full model: hashed-ID embeddings, pre-trained
    embeddings and dense user/movie features.

    Inputs are declared in the order of the feature tuple emitted by
    read_tfrecord: user_hash, movie_hash, user_features, movie_features,
    user_id, movie_id, user_embb_features, movie_embb_features. The raw-ID
    inputs are declared but do not feed the output.

    Returns:
        A compiled tf.keras Model with one sigmoid output named 'last'.
    """
    layers = tf.keras.layers

    def hashed_id_tower(input_name, dense_name):
        # 4 hashed IDs -> 64-d embeddings -> max over the 4 slots
        # -> Dense(64, relu) adjustment layer.
        hashed = layers.Input(shape=(4,), name=input_name)
        embedded = layers.Embedding(10000, 64, input_length=4)(hashed)
        pooled = layers.GlobalMaxPool1D()(embedded)
        adjusted = layers.Dense(64, activation='relu', name=dense_name)(pooled)
        return hashed, adjusted

    user_hash, user_vec = hashed_id_tower('user_hash', 'user_emb_max2')
    movie_hash, movie_vec = hashed_id_tower('movie_hash', 'movie_emb_max2')

    # ID-level user x movie interaction (element-wise product).
    id_cross = tf.math.multiply(user_vec, movie_vec)

    # Pre-trained embeddings, each reduced to 16 dims, then concatenated.
    user_p_emb = layers.Input(shape=(64,), name='user_embb_features')
    movie_p_emb = layers.Input(shape=(64,), name='movie_embb_features')
    pretrained = layers.concatenate(
        [layers.Dense(16, activation='relu', name='uf_pe')(user_p_emb),
         layers.Dense(16, activation='relu', name='mf_pe')(movie_p_emb)],
        axis=-1)

    # Raw IDs (declared, not used by the output).
    user_id = layers.Input(shape=(), name='user_id')
    movie_id = layers.Input(shape=(), name='movie_id')

    # Dense user / movie features.
    user_features = layers.Input(shape=(_USER_FEATURE_DIM,), name='user_features')
    movie_features = layers.Input(shape=(_MOVIE_FEATURE_DIM,), name='movie_features')

    merged = layers.concatenate(
        [user_features, movie_features, id_cross, pretrained], axis=-1)
    hidden = layers.Dense(32, activation='relu', name='umf2')(merged)
    output = layers.Dense(1, activation='sigmoid', name='last')(hidden)

    model = tf.keras.models.Model(
        inputs=[user_hash, movie_hash, user_features, movie_features,
                user_id, movie_id, user_p_emb, movie_p_emb],
        outputs=output)
    model.compile(optimizer='adam', loss='binary_crossentropy',
                  metrics=['accuracy', mean_pred, auroc])
    return model
    

# Train model 3: batches of 100, 9000 records (90 steps) per epoch, 30 epochs.
run_model(model=m3_feature_pretrain_emb_cross(),
          tf_record_train_path=tf_record_train_path,
          tf_record_validate_path=tf_record_validate_path,
          batch_size=100,
          epoch=30,
          num_record_epoch=9000)
    
    

#### Prediction and scoring

In [None]:
# NOTE: m3_feature_pretrain_emb_cross() returns a freshly initialized model,
# so these predictions come from untrained weights. To score trained weights,
# keep the instance passed to run_model and predict with it instead.
model = m3_feature_pretrain_emb_cross()
# run_model's validation_dataset is local to that function, so build the
# validation pipeline here (batch size 100, no repeat).
validation_dataset = get_batched_dataset(tf.io.gfile.glob(tf_record_validate_path), 100)
model.predict(validation_dataset)

In [None]:
# Display the model's Input tensors (in the order passed to Model(inputs=...)).
model.inputs

In [None]:
# Sub-model exposing intermediate ID-embedding activations from all model inputs.
# The GlobalMaxPool1D layers in m3 are unnamed, so use the named Dense layers on
# top of them. No layer called "user_id2" exists in m3, so that output is dropped.
user_emb = tf.keras.Model(model.inputs,
                          (model.get_layer("user_emb_max2").output,
                           model.get_layer("movie_emb_max2").output))

In [None]:
# Sub-model mapping the two hashed-ID inputs to the Dense layers that sit on
# top of the max-pooled ID embeddings. The pooling layers are unnamed in m3,
# so they cannot be looked up by name.
user_emb = tf.keras.Model((model.get_layer("user_hash").input, model.get_layer("movie_hash").input),
                          (model.get_layer("user_emb_max2").output,
                           model.get_layer("movie_emb_max2").output))

In [None]:
# summary() prints by itself and returns None; wrapping it in print() would emit "None".
user_emb.summary()

In [None]:
# user_emb takes only the two hashed-ID inputs, but the batched dataset yields
# (8 feature tensors, label). Keep just the hashed IDs, packed as a one-element
# tuple so Keras treats the pair as the model inputs rather than as (x, y).
hash_only_dataset = get_batched_dataset(tf.io.gfile.glob(tf_record_validate_path), 100).map(
    lambda features, label: ((features[0], features[1]),))
output_data = user_emb.predict(hash_only_dataset)

In [None]:
import numpy as np
# One example per input. user_hash and movie_hash expect batches shaped
# (batch, 4), so each 4-ID vector is wrapped in an outer batch dimension.
# A bare 1-D array would be read as 4 examples of shape ().
input_data = [np.asarray([[1, 2, 3, 4]]), np.asarray([[5, 6, 7, 8]])]

In [None]:
# Run the hashed-ID sub-model on the hand-built example inputs.
user_emb.predict(x=input_data)

In [None]:
# Number of arrays returned by user_emb.predict; presumably one per sub-model output — confirm.
len(output_data)