In [None]:
%%writefile test_config.yaml
# Small sample config; the following cell loads it with yaml.safe_load and displays the resulting dict.
batch_size: 1024
# Mapping of column name -> declared column type.
dataset_features:
    label:
        type: "int"
    userId:
        type: "int"
    movieId:
        type: "float"
    timestamp:
        type: "int"

In [1]:
import yaml

# Load the sample YAML written by the previous cell. The context manager closes
# the file handle deterministically instead of leaving it open until the
# garbage collector happens to reclaim it.
with open("test_config.yaml", "r") as config_file:
    config = yaml.safe_load(config_file)
config

{'batch_size': 1024,
 'dataset_features': {'label': {'type': 'int'},
  'userId': {'type': 'int'},
  'movieId': {'type': 'float'},
  'timestamp': {'type': 'int'}}}

In [None]:
%%writefile config.yaml
# Training run configuration; parsed by the Config class in config.py.
epochs: 2
# Batch size across all replicas; the input pipeline asks the distribution
# context for the per-replica size derived from this value.
global_batch_size: 8192
shuffle_buffer: 10
# Row counts; the training cell divides each by global_batch_size to get the
# number of steps for training, validation and evaluation.
train_rows: 65536
trainval_rows: 8192
eval_rows: 65536
# Upper-cased and passed as compression_type to tf.data.TextLineDataset.
compression: "GZIP"
train_path: "gs://mgaiduk-us-central1/ratings/csv_gzip/part*"
validate_path: "gs://mgaiduk-us-central1/ratings_validate/csv_gzip/part*"
# Checkpoints, weights and the metrics JSON are written under this prefix.
save_model_path: "gs://mgaiduk-us-central1/models/model1"
cycle_length: 8
# CSV columns in file order; the type selects the default used to decode each column.
dataset_features:
    userId:
        type: "string"
    movieId:
        type: "string"
    label:
        type: "int"
    timestamp:
        type: "int"
# Name of the dataset feature used as the training label.
label: label
model:
    learning_rate: 0.01
    # One embedding table is created per entry below.
    features:
        userId:
            # hash: true requires vocab_size, which is used as the number of hash buckets.
            hash: true
            vocab_size: 25000000
            embedding_dim: 16
            # "user" or "movie": which side of the dot product this embedding joins.
            belongs_to: user
        movieId:
            hash: true
            vocab_size: 5000000
            embedding_dim: 16
            belongs_to: movie

In [2]:
%%writefile config.py
import yaml

class DatasetFeature:
    """One dataset column as declared under ``dataset_features`` in the config.

    Stores the column name and the type string found under the ``"type"`` key
    of the column's mapping.
    """

    def __init__(self, feature_name, dic):
        declared_type = dic["type"]
        # Keep creation order (name, then type): __repr__ prints __dict__,
        # whose ordering follows attribute creation.
        self.name, self.type = feature_name, declared_type

    def __repr__(self):
        return f"DatasetFeature: {vars(self)}"

class Feature:
    """Model-side description of one embedded feature (an entry of ``model.features``).

    Args:
        feature_name: Key of the feature in the config; stored as ``name``.
        dic: Mapping for the feature. ``embedding_dim`` and ``belongs_to`` are
            required. ``hash`` is optional and defaults to ``False``.
            ``vocab_size`` is required when ``hash`` is truthy and optional
            otherwise.

    Raises:
        ValueError: ``hash`` is truthy but ``vocab_size`` is missing.
        KeyError: ``embedding_dim`` or ``belongs_to`` is missing.
    """

    def __init__(self, feature_name, dic):
        self.hash = dic.get("hash", False)
        # Explicit check instead of ``assert`` so the validation still runs
        # when Python is started with -O.
        if self.hash and "vocab_size" not in dic:
            raise ValueError(
                f"Feature {feature_name!r}: 'hash: true' requires 'vocab_size'"
            )
        # Always define vocab_size: the model reads it for every feature when
        # sizing embedding tables, not only for hashed ones. It is None when
        # the config does not provide a value.
        self.vocab_size = dic.get("vocab_size")
        self.embedding_dim = dic["embedding_dim"]
        self.name = feature_name
        self.belongs_to = dic["belongs_to"]

    def __repr__(self):
        return "Feature: " + str(self.__dict__)
        
class Model:
    """Model hyper-parameters parsed from the ``model`` section of the config.

    Holds the learning rate and one ``Feature`` per entry of
    ``dic["features"]``, in the order the mapping yields them.
    """

    def __init__(self, dic):
        self.learning_rate = dic["learning_rate"]
        self.features = [
            Feature(name, spec) for name, spec in dic["features"].items()
        ]

    def __repr__(self):
        return f"Model: {vars(self)}"

class Config:
    """Attribute-style view over the training YAML config.

    Every top-level key is required and copied onto the instance. The
    ``model`` section is wrapped in ``Model`` and each entry of
    ``dataset_features`` in ``DatasetFeature``, preserving file order.

    Args:
        path: Path of the YAML file to read.

    Raises:
        KeyError: A required key is missing from the file.
    """

    def __init__(self, path):
        # The context manager closes the file as soon as parsing is done; the
        # previous bare open() left the handle open until garbage collection.
        with open(path, 'r') as config_file:
            dic = yaml.safe_load(config_file)
        # Attribute creation order is kept stable because __repr__ prints
        # __dict__ in that order.
        self.epochs = dic["epochs"]
        self.compression = dic["compression"]
        self.global_batch_size = dic["global_batch_size"]
        self.label = dic["label"]
        self.shuffle_buffer = dic["shuffle_buffer"]
        self.train_path = dic["train_path"]
        self.validate_path = dic["validate_path"]
        self.save_model_path = dic["save_model_path"]
        self.train_rows = dic["train_rows"]
        self.trainval_rows = dic["trainval_rows"]
        self.eval_rows = dic["eval_rows"]
        self.model = Model(dic["model"])
        self.dataset_features = [
            DatasetFeature(feature_name, feature_dic)
            for feature_name, feature_dic in dic["dataset_features"].items()
        ]
        self.cycle_length = dic["cycle_length"]

    def __repr__(self):
        return "Config: " + str(self.__dict__)

Overwriting config.py


In [3]:
from config import Config
# Parse config.yaml through the Config class written to config.py above.
# NOTE: this rebinds `config`, which an earlier cell bound to a plain dict.
config = Config("config.yaml")
config

Config: {'epochs': 2, 'compression': 'GZIP', 'global_batch_size': 8192, 'label': 'label', 'shuffle_buffer': 10, 'train_path': 'gs://mgaiduk-us-central1/ratings/csv_gzip/part*', 'validate_path': 'gs://mgaiduk-us-central1/ratings_validate/csv_gzip/part*', 'save_model_path': 'gs://mgaiduk-us-central1/models/model1', 'train_rows': 65536, 'trainval_rows': 8192, 'eval_rows': 65536, 'model': Model: {'learning_rate': 0.01, 'features': [Feature: {'hash': True, 'vocab_size': 25000000, 'embedding_dim': 16, 'name': 'userId', 'belongs_to': 'user'}, Feature: {'hash': True, 'vocab_size': 5000000, 'embedding_dim': 16, 'name': 'movieId', 'belongs_to': 'movie'}]}, 'dataset_features': [DatasetFeature: {'name': 'userId', 'type': 'string'}, DatasetFeature: {'name': 'movieId', 'type': 'string'}, DatasetFeature: {'name': 'label', 'type': 'int'}, DatasetFeature: {'name': 'timestamp', 'type': 'int'}], 'cycle_length': 8}

In [None]:
%%writefile dataset.py
import tensorflow as tf

class DatasetReader:
    """Callable that builds the ``tf.data`` input pipeline for one input worker.

    Instances are meant to be passed as ``dataset_fn`` to
    ``strategy.distribute_datasets_from_function``.

    Args:
        config: Parsed configuration. Reads ``dataset_features``,
            ``model.features``, ``label``, ``global_batch_size``,
            ``compression``, ``shuffle_buffer``, ``epochs`` and
            ``cycle_length``.
        path: File pattern of the CSV shards to read.

    Raises:
        ValueError: A dataset feature declares a type other than ``"int"``,
            ``"float"`` or ``"string"``.
    """

    def __init__(self, config, path):
        self.config = config
        self.path = path
        # One default per CSV column, in column order; tf.io.decode_csv uses
        # the defaults to determine each column's dtype.
        defaults = []
        for feature in self.config.dataset_features:
            if feature.type == "int":
                defaults.append(tf.constant(0, dtype=tf.int64))
            elif feature.type == "float":
                defaults.append(tf.constant(0.0, dtype=tf.float32))
            elif feature.type == "string":
                defaults.append(tf.constant("", dtype=tf.string))
            else:
                # Raise explicitly: a bare ``assert False`` disappears under
                # -O, which would silently skip the column and misalign every
                # following default.
                raise ValueError(
                    f"Unsupported type {feature.type!r} for dataset feature "
                    f"{feature.name!r}; expected 'int', 'float' or 'string'"
                )
        self.defaults = defaults

    def __call__(self, ctx: tf.distribute.InputContext):
        """Build the dataset for the input pipeline described by ``ctx``.

        Args:
            ctx: Input context supplied by the distribution strategy. A falsy
                value means the global batch size is used as-is and no file
                sharding is applied.

        Returns:
            A ``tf.data.Dataset`` of ``(features, labels)`` pairs, where
            ``features`` maps model feature names to tensors and ``labels`` is
            ``{"label": <label column>}``.
        """
        batch_size = ctx.get_per_replica_batch_size(
            self.config.global_batch_size) if ctx else self.config.global_batch_size
        # Honour the configured cycle_length; it used to be parsed but ignored
        # in favour of a hard-coded 10. The fallback keeps config objects
        # without the attribute working as before.
        cycle_length = getattr(self.config, "cycle_length", 10)

        @tf.function
        def decode_fn(record_bytes):
            # Called after .batch(), so record_bytes is a batch of CSV lines.
            csv_row = tf.io.decode_csv(record_bytes, self.defaults)
            parsed_features = {}
            for i, feature in enumerate(self.config.dataset_features):
                parsed_features[feature.name] = csv_row[i]
            features = {}
            for feature in self.config.model.features:
                t = parsed_features[feature.name]
                if feature.hash:
                    # tf.strings.to_hash_bucket takes string input, so hashed
                    # features are assumed to be declared as type "string".
                    t = tf.strings.to_hash_bucket(t, feature.vocab_size)
                features[feature.name] = t
            labels = {
                "label": parsed_features[self.config.label]
            }
            return (features, labels)

        def make_dataset_fn(path):
            # Per-file pipeline: shuffle lines, batch, repeat this file's
            # batches config.epochs times, then decode whole batches.
            dataset = tf.data.TextLineDataset([path], compression_type=self.config.compression.upper())
            dataset = dataset\
                .shuffle(self.config.shuffle_buffer)\
                .batch(batch_size, drop_remainder=True)\
                .repeat(self.config.epochs).map(decode_fn)
            return dataset

        filenames = tf.data.Dataset.list_files(self.path, shuffle=True, seed=42)
        if ctx and ctx.num_input_pipelines > 1:
            # Give each input pipeline a disjoint subset of the files.
            filenames = filenames.shard(ctx.num_input_pipelines, ctx.input_pipeline_id)
        dataset = filenames.interleave(
            make_dataset_fn,
            num_parallel_calls=cycle_length,
            deterministic=False,
            cycle_length=cycle_length,
        )
        dataset = dataset.prefetch(100)
        return dataset

def create_dataset(config, strategy, path):
    """Return a distributed dataset over the files matching ``path``.

    A ``DatasetReader`` is built for ``config`` and ``path`` and handed to
    ``strategy.distribute_datasets_from_function`` with
    ``experimental_fetch_to_device=False``.
    """
    reader = DatasetReader(config=config, path=path)
    input_options = tf.distribute.InputOptions(experimental_fetch_to_device=False)
    return strategy.distribute_datasets_from_function(
        dataset_fn=reader,
        options=input_options,
    )

In [4]:
import tensorflow as tf
from dataset import create_dataset
# Use whichever distribution strategy is currently active; no strategy scope
# has been entered before this call in the notebook.
strategy = tf.distribute.get_strategy()
dataset = create_dataset(config, strategy, config.train_path)
# Pull a single batch for inspection: the loop exits after the first element.
for elem in dataset:
    break
elem

2023-03-08 16:54:27.140807: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI AVX512_BF16 AVX_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-08 16:54:27.506699: I tensorflow/core/util/port.cc:104] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2023-03-08 16:54:27.517628: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2023-03-08 16:54:27.517640: I tensorflow/compiler/xla/stream_executor/cuda/cudar

({'userId': <tf.Tensor: shape=(8192,), dtype=int64, numpy=array([ 4907422, 12027942,  9464171, ..., 14247112, 16568631, 23811830])>,
  'movieId': <tf.Tensor: shape=(8192,), dtype=int64, numpy=array([3110186,  788626, 3951215, ..., 4347734, 3513937, 4559261])>},
 {'label': <tf.Tensor: shape=(8192,), dtype=int64, numpy=array([2, 2, 2, ..., 2, 2, 2])>})

In [None]:
%%writefile model.py
import math
import tensorflow as tf
import tensorflow_recommenders as tfrs

class BaseModel(tfrs.models.Model):
    """Shared ranking task and loss computation for the notebook's models.

    Subclasses implement ``call`` and return a dict with a ``"label"`` entry
    holding the predictions.
    """

    def __init__(self):
        super().__init__()
        # The loss is created with Reduction.NONE; compute_loss averages
        # whatever the task returns with tf.reduce_mean.
        unreduced_bce = tf.keras.losses.BinaryCrossentropy(
            reduction=tf.keras.losses.Reduction.NONE
        )
        ranking_metrics = [
            tf.keras.metrics.BinaryCrossentropy(name="label-crossentropy"),
            tf.keras.metrics.AUC(name="auc"),
            tf.keras.metrics.AUC(curve="PR", name="pr-auc"),
            tf.keras.metrics.BinaryAccuracy(name="accuracy"),
        ]
        self.task = tfrs.tasks.Ranking(
            loss=unreduced_bce,
            metrics=ranking_metrics,
            prediction_metrics=[tf.keras.metrics.Mean("prediction_mean")],
            label_metrics=[tf.keras.metrics.Mean("label_mean")],
        )

    def call(self, inputs):
        """Forward pass; must be provided by subclasses."""
        raise NotImplementedError

    def compute_loss(self, inputs, training=False):
        """Run the model on a ``(features, labels)`` pair and return the mean task loss."""
        features, labels = inputs
        outputs = self(features, training=training)
        task_loss = self.task(labels=labels["label"], predictions=outputs["label"])
        return tf.reduce_mean(task_loss)

class Model(BaseModel):
    """Dot-product ranking model over user and movie embeddings.

    Every feature in ``config.model.features`` gets its own table in a single
    ``tfrs`` ``TPUEmbedding`` layer. In ``call`` the embeddings are grouped by
    ``belongs_to`` into a user side and a movie side, concatenated per side,
    and scored as ``sigmoid(dot(user[:-1], movie[:-1]) + user[-1] + movie[-1])``:
    the last unit of each concatenated vector is treated as a bias.

    Args:
        config: Parsed configuration; ``config.model.learning_rate`` and
            ``config.model.features`` are read.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config
        # Two optimizers: the legacy Adam is handed to the TPUEmbedding layer,
        # the other one is exposed as ``optimizer`` and passed to compile() by
        # the notebook.
        self.embedding_optimizer = tf.keras.optimizers.legacy.Adam(learning_rate = config.model.learning_rate)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate = config.model.learning_rate)
        self.hashing_layers = {}
        embedding_layer_feature_config = {}
        for feature in self.config.model.features:
            if feature.hash:
                self.hashing_layers[feature.name] = tf.keras.layers.experimental.preprocessing.Hashing(num_bins=feature.vocab_size)
            initializer = tf.initializers.TruncatedNormal(
                mean=0.0, stddev=1 / math.sqrt(feature.embedding_dim)
            )
            embedding_layer_feature_config[feature.name] = tf.tpu.experimental.embedding.FeatureConfig(
                table=tf.tpu.experimental.embedding.TableConfig(
                    vocabulary_size=feature.vocab_size,
                    initializer=initializer,
                    dim=feature.embedding_dim,
                )
            )
        self.embedding_layer = tfrs.layers.embedding.TPUEmbedding(
            feature_config=embedding_layer_feature_config,
            optimizer=self.embedding_optimizer)
        self.final_activation = tf.keras.layers.Activation('sigmoid')

    def call(self, inputs):
        """Score a batch of examples.

        Args:
            inputs: Mapping from feature name to a tensor of ids for every
                feature in ``config.model.features``.

        Returns:
            ``{"label": prediction}``, where ``prediction`` is the sigmoid of
            the dot-product score.

        Raises:
            ValueError: A feature's ``belongs_to`` is neither ``"user"`` nor
                ``"movie"``, or one of the two sides has no features.
        """
        features = {}
        for feature in self.config.model.features:
            t = inputs[feature.name]
            if feature.hash:
                # NOTE(review): dataset.py's DatasetReader already buckets
                # hashed features, so ids coming from that pipeline are hashed
                # a second time here. Confirm this is intended.
                t = self.hashing_layers[feature.name](t)
            features[feature.name] = t
        embeddings = self.embedding_layer(features)
        user_embs = []
        movie_embs = []
        for feature in self.config.model.features:
            embedding = embeddings[feature.name]
            if feature.belongs_to == "user":
                user_embs.append(embedding)
            elif feature.belongs_to == "movie":
                movie_embs.append(embedding)
            else:
                # Raise explicitly: ``assert False`` is stripped under -O,
                # which would silently leave the feature out of the score.
                raise ValueError(
                    f"Feature {feature.name!r} has unsupported belongs_to="
                    f"{feature.belongs_to!r}; expected 'user' or 'movie'"
                )
        if not user_embs or not movie_embs:
            raise ValueError(
                "config.model.features must contain at least one 'user' and "
                "one 'movie' feature"
            )
        user_final = tf.concat(user_embs, axis = 1)
        movie_final = tf.concat(movie_embs, axis = 1)
        # The last unit of each concatenated embedding is considered to be a
        # bias. Slices are bound to names first: the earlier one-line form
        # (slicing inside the batch_dot expression) produced
        # "WARNING:tensorflow:AutoGraph could not transform ..." messages. A
        # tf.slice variant was also tried to get rid of them; it did not seem
        # to improve speed.
        user_final_emb = user_final[:,:-1]
        user_final_bias = user_final[:,-1:]
        movie_final_emb = movie_final[:,:-1]
        movie_final_bias = movie_final[:,-1:]
        # NOTE(review): the dot product presumably requires both sides to have
        # the same concatenated width. Confirm when adding features.
        out = tf.keras.backend.batch_dot(user_final_emb, movie_final_emb) + user_final_bias + movie_final_bias
        prediction = self.final_activation(out)
        return {
            "label": prediction
        }

In [5]:
from model import Model
with strategy.scope():
    model = Model(config)
    model.compile(model.optimizer, steps_per_execution=1)
model(elem[0]) # see some model predictions

{'label': <tf.Tensor: shape=(8192, 1), dtype=float32, numpy=
 array([[0.6423675 ],
        [0.52241415],
        [0.50483507],
        ...,
        [0.55719084],
        [0.6364841 ],
        [0.5118672 ]], dtype=float32)>}

In [None]:
import json
import os
import subprocess
import sys
def save_string_gcs(string_object, gcs_dir, filename):
    """Upload ``string_object`` to ``{gcs_dir}/{filename}`` with ``gsutil``.

    The payload is staged in a local file named ``filename`` in the current
    directory, copied with ``gsutil -m cp``, and the local file is removed
    afterwards whether or not the copy succeeded.

    Args:
        string_object: Text to store. A ``str`` is written verbatim; any other
            value is serialized with ``json.dumps`` first.
        gcs_dir: Destination directory, e.g. ``gs://bucket/path``.
        filename: Name of both the local staging file and the remote object.

    Raises:
        subprocess.CalledProcessError: ``gsutil`` exited with a non-zero status.
        FileNotFoundError: ``gsutil`` is not installed or not on ``PATH``.
    """
    # Callers already pass serialized JSON text; encoding a str a second time
    # stored a quoted, escaped JSON string instead of the JSON document.
    if isinstance(string_object, str):
        payload = string_object
    else:
        payload = json.dumps(string_object)
    with open(filename, "w") as f:
        f.write(payload)
    try:
        # The argument list avoids shell interpolation of the paths, and
        # check=True surfaces a failed upload instead of silently dropping it.
        subprocess.run(
            ["gsutil", "-m", "cp", filename, f"{gcs_dir}/{filename}"],
            check=True,
        )
    finally:
        os.remove(filename)

# Build, train, checkpoint and evaluate the model, then upload a metrics
# summary next to the saved model.
with strategy.scope():
    model = Model(config)
    model.compile(model.optimizer, steps_per_execution=1)
    train_dataset = create_dataset(config, strategy, config.train_path)
    trainval_dataset = create_dataset(config, strategy, config.validate_path)
    eval_dataset = create_dataset(config, strategy, config.validate_path)
    # Step counts are the configured row counts in units of global batches.
    train_steps_per_epoch = config.train_rows // config.global_batch_size
    trainval_steps_per_epoch = config.trainval_rows // config.global_batch_size
    eval_steps_per_epoch = config.eval_rows // config.global_batch_size
    # Same value under its original name, kept in case later cells read it.
    eval_steps = eval_steps_per_epoch
    checkpoints_cb = tf.keras.callbacks.ModelCheckpoint(
        config.save_model_path + '/checkpoints/',
        save_freq=train_steps_per_epoch // 3,
    )
    callbacks = [checkpoints_cb]
    # NOTE(review): the training log in this notebook reports label_mean 2.0,
    # i.e. labels outside {0, 1}. Confirm whether labels should be binarized
    # for the binary cross-entropy loss and AUC metrics.
    history = model.fit(
        train_dataset,
        epochs=config.epochs,
        # Pass the list itself; it used to be wrapped in a second list.
        callbacks=callbacks,
        steps_per_epoch=train_steps_per_epoch,
        validation_data=trainval_dataset,
        validation_steps=trainval_steps_per_epoch,
    )
    model.save_weights(config.save_model_path + '/weights/')
    eval_scores = model.evaluate(eval_dataset, return_dict=True, steps=eval_steps_per_epoch)
    metrics = {
        "eval": eval_scores,
        "history": history.history,
        "args": sys.argv,
        "config": repr(config),
    }
    # Hand over the dict and let save_string_gcs serialize it once; passing
    # pre-serialized text led to the metrics being JSON-encoded twice.
    save_string_gcs(metrics, config.save_model_path, "metrics_pretrain.json")

Epoch 1/2
1/8 [==>...........................] - ETA: 21s - label-crossentropy: 0.7036 - auc: 0.0000e+00 - pr-auc: 1.0000 - accuracy: 0.0000e+00 - prediction_mean: 0.5010 - label_mean: 2.0000 - loss: 0.7036 - regularization_loss: 0.0000e+00 - total_loss: 0.7036

