### Set up the environment

In [1]:
import boto3
from sagemaker import get_execution_role

In [2]:
!pip install tensorflow==2.5.0

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [3]:
!pip install tensorflow-recommenders==0.5.2



You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [4]:
# S3 coordinates of the training CSV, joined into a single s3:// URI.
bucket, prefix = "ling-cold-start-data", "2021-09-08"
data_key = "a3d86f3b-eb45-4641-b05d-30dff7423e6b.csv"
data_location = "s3://{}/{}/{}".format(bucket, prefix, data_key)

# Execution role as reported by the SageMaker SDK.
role = get_execution_role()

In [5]:
import os
import tempfile
from typing import Dict, Text
import pprint 

In [6]:
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_recommenders as tfrs

### Model definition

In [7]:
class UserModel(tf.keras.Model):
    """Viewer tower: maps viewer attributes to one concatenated feature vector.

    Gender, language, country and network are each looked up in their own
    vocabulary and embedded (4, 11, 11 and 5 columns respectively); the
    normalized age is appended as one extra column.
    """

    def __init__(self, unique_genders, unique_langs, unique_countries, unique_networks, viewer_age):
        """Build the per-feature sub-layers.

        Args:
            unique_genders: vocabulary for the ``viewer_gender`` lookup.
            unique_langs: vocabulary for the ``viewer_lang`` lookup.
            unique_countries: vocabulary for the ``viewer_country`` lookup.
            unique_networks: vocabulary for the ``viewer_network`` lookup.
            viewer_age: data the age normalization layer is adapted on.
        """
        super().__init__()

        preprocessing = tf.keras.layers.experimental.preprocessing

        def lookup_then_embed(vocabulary, width):
            # The embedding table has one row more than the vocabulary;
            # presumably the spare row serves the lookup's out-of-vocabulary index.
            return tf.keras.Sequential([
                preprocessing.StringLookup(vocabulary=vocabulary, mask_token=None),
                tf.keras.layers.Embedding(len(vocabulary) + 1, width),
            ])

        self.gender_embedding = lookup_then_embed(unique_genders, 4)
        self.lang_embedding = lookup_then_embed(unique_langs, 11)
        self.country_embedding = lookup_then_embed(unique_countries, 11)
        self.network_embedding = lookup_then_embed(unique_networks, 5)

        # Scalar normalization (axis=None) whose statistics are learned from viewer_age.
        self.normalized_age = preprocessing.Normalization(axis=None)
        self.normalized_age.adapt(viewer_age)

    def call(self, inputs):
        """Return the concatenation of all viewer feature columns along axis 1.

        Args:
            inputs: mapping with keys ``viewer_gender``, ``viewer_lang``,
                ``viewer_country``, ``viewer_network`` and ``viewer_age``.
        """
        feature_columns = [
            self.gender_embedding(inputs["viewer_gender"]),
            self.lang_embedding(inputs["viewer_lang"]),
            self.country_embedding(inputs["viewer_country"]),
            self.network_embedding(inputs["viewer_network"]),
            # Age is reshaped into a single column so it can be concatenated.
            tf.reshape(self.normalized_age(inputs["viewer_age"]), (-1, 1)),
        ]
        return tf.concat(feature_columns, axis=1)

In [8]:
class BroadcasterModel(tf.keras.Model):
    """Broadcaster tower: embeds a broadcaster id string into a ``dims``-wide vector."""

    def __init__(self, unique_movie_titles, dims):
        """Build the id lookup and embedding table.

        Args:
            unique_movie_titles: vocabulary of broadcaster ids.
            dims: width of the embedding vector.
        """
        super().__init__()

        preprocessing = tf.keras.layers.experimental.preprocessing
        id_lookup = preprocessing.StringLookup(
            vocabulary=unique_movie_titles, mask_token=None)
        # One row more than the vocabulary size, as in the lookup stacks of UserModel.
        id_table = tf.keras.layers.Embedding(len(unique_movie_titles) + 1, dims)
        self.broadcaster_embedding = tf.keras.Sequential([id_lookup, id_table])

    def call(self, broadcaster):
        """Return the embedding of ``broadcaster`` (concatenated along axis 1)."""
        embedded = self.broadcaster_embedding(broadcaster)
        return tf.concat([embedded], axis=1)

### Load data

In [9]:
def load_data_file_cold(file, stats):
    """Read the viewer/broadcaster CSV into a DataFrame and fill missing values.

    Args:
        file: path or URL of the CSV. Its first row is skipped and the column
            names listed below are applied instead.
        stats: currently unused (the only call on it is commented out); kept so
            existing callers keep working.

    Returns:
        pandas.DataFrame in which missing values of the columns listed in
        ``values`` are replaced by that column's default.
    """
    print('loading file:' + file)
    column_names = [
        "viewer", "broadcaster", "viewer_age", "viewer_gender",
        "viewer_longitude", "viewer_latitude", "viewer_lang", "viewer_country",
        "broadcaster_age", "broadcaster_gender", "broadcaster_longitude",
        "broadcaster_latitude", "broadcaster_lang", "broadcaster_country",
        "duration", "viewer_network", "broadcaster_network", "count",
    ]
    # Built-in str/int replace the np.unicode/np.int aliases, which NumPy
    # deprecated in 1.20 and removed in 1.24 (they were plain aliases of the
    # built-ins, so parsing is unchanged).
    # NOTE(review): 'broadcaster_gender' and 'duration' get no explicit dtype,
    # and 'broadcaster_gender' no fill value -- confirm this is intended.
    column_dtypes = {
        'viewer': str,
        'broadcaster': str,
        'viewer_age': np.single,
        'viewer_gender': str,
        'viewer_longitude': np.single,
        'viewer_latitude': np.single,
        'viewer_lang': str,
        'viewer_country': str,
        'broadcaster_age': np.single,
        'broadcaster_longitude': np.single,
        'broadcaster_latitude': np.single,
        'broadcaster_lang': str,
        'broadcaster_country': str,
        'viewer_network': str,
        'broadcaster_network': str,
        'count': int
    }
    training_df = pd.read_csv(
        file,
        skiprows=[0],
        names=column_names,
        dtype=column_dtypes)

    # Per-column replacement for missing values.
    values = {
        'viewer': 'unknown',
        'broadcaster': 'unknown',
        'viewer_age': 30,
        'viewer_gender': 'unknown',
        'viewer_longitude': 0,
        'viewer_latitude': 0,
        'viewer_lang': 'unknown',
        'viewer_country': 'unknown',
        'broadcaster_age': 30,
        'broadcaster_longitude': 0,
        'broadcaster_latitude': 0,
        'broadcaster_lang': 'unknown',
        'broadcaster_country': 'unknown',
        'duration': 0,
        'viewer_network': 'unknown',
        'broadcaster_network': 'unknown',
        'count': 0
    }
    # Assign the result instead of mutating in place.
    training_df = training_df.fillna(value=values)
    print(training_df.head(10))
    print(training_df.iloc[-10:])
    # stats.send_stats('data-size', len(training_df.index))
    return training_df


def load_training_data_cold(file, stats):
    """Load the CSV with ``load_data_file_cold`` and expose selected columns as a tf.data.Dataset.

    Args:
        file: CSV location, forwarded to ``load_data_file_cold``.
        stats: forwarded to ``load_data_file_cold`` unchanged.

    Returns:
        A dataset of per-row feature dicts, one entry per column listed below.
    """
    ratings_df = load_data_file_cold(file, stats)
    print('creating data set')

    # Column name -> tensor dtype each column is cast to.
    column_dtypes = {
        "viewer": tf.string,
        "viewer_gender": tf.string,
        "viewer_lang": tf.string,
        "viewer_country": tf.string,
        "viewer_age": tf.int16,
        "viewer_longitude": tf.float16,
        "viewer_latitude": tf.float16,
        "broadcaster": tf.string,
        "viewer_network": tf.string,
        "broadcaster_network": tf.string,
    }
    feature_tensors = {
        column: tf.cast(ratings_df[column].values, dtype)
        for column, dtype in column_dtypes.items()
    }

    return tf.data.Dataset.from_tensor_slices(feature_tensors)

In [10]:
def prepare_training_data_cold(train_ds):
    """Cache ``train_ds`` and project every element onto a fixed set of feature keys.

    The map runs with autotuned parallelism and ``deterministic=False``, so
    element order is not guaranteed.
    """
    print('prepare_training_data')

    kept_keys = (
        "broadcaster",
        "viewer",
        "viewer_gender",
        "viewer_lang",
        "viewer_country",
        "viewer_age",
        "viewer_longitude",
        "viewer_latitude",
        "viewer_network",
        "broadcaster_network",
    )

    def select_features(row):
        # Copy across only the keys listed above.
        return {key: row[key] for key in kept_keys}

    cached = train_ds.cache()
    training_ds = cached.map(
        select_features,
        num_parallel_calls=tf.data.AUTOTUNE,
        deterministic=False)

    print('done prepare_training_data')
    return training_ds

def get_broadcaster_data_set(train_ds):
    """Return a dataset holding the distinct ``broadcaster`` values found in ``train_ds``."""
    def pick_broadcaster(row):
        return row["broadcaster"]

    id_stream = train_ds.cache().map(
        pick_broadcaster,
        num_parallel_calls=tf.data.AUTOTUNE,
        deterministic=False)
    # Materialize every id in memory, then de-duplicate with NumPy.
    distinct_ids = np.unique(list(id_stream.as_numpy_iterator()))
    return tf.data.Dataset.from_tensor_slices(distinct_ids)

def get_list(training_data, key):
    """Batch ``training_data`` (up to 1,000,000 rows per batch) and keep only feature ``key``."""
    def select_key(batch):
        return batch[key]

    batched = training_data.batch(1_000_000)
    return batched.map(
        select_key,
        num_parallel_calls=tf.data.AUTOTUNE,
        deterministic=False)

def get_unique_list(data):
    """Concatenate every array yielded by ``data`` and return the sorted unique values."""
    chunks = [chunk for chunk in data]
    merged = np.concatenate(chunks)
    return np.unique(merged)

In [11]:
# Read the CSV at data_location into a tf.data.Dataset. The loader does not use
# `stats` (its only use is commented out), so an empty string is passed.
training_dataset = load_training_data_cold(file=data_location, stats="")

loading file:s3://ling-cold-start-data/2021-09-08/a3d86f3b-eb45-4641-b05d-30dff7423e6b.csv
             viewer       broadcaster  viewer_age viewer_gender  \
0   meetme:19714617  meetme:242525021        39.0        female   
1   skout:161675320   skout:167570679        46.0          male   
2     pof:333093026      pof:77411971        28.0          male   
3     pof:324991301     pof:207032171        39.0          male   
4   skout:177541297  meetme:316500815        41.0          male   
5  meetme:258247855  meetme:314497940        36.0          male   
6    meetme:8237459  meetme:213050479        52.0          male   
7   meetme:84182876   skout:175969618        32.0          male   
8   skout:176797432  meetme:309441196        51.0          male   
9  meetme:282314746  meetme:226200204        32.0          male   

   viewer_longitude  viewer_latitude viewer_lang viewer_country  \
0       -118.380096        34.093899          en             US   
1        120.963997        14.693000 

In [12]:
# Cached, key-projected view of the raw dataset; the feature lists and the
# training batches below are all derived from `train`.
train = prepare_training_data_cold(training_dataset)

prepare_training_data
done prepare_training_data


In [13]:
# Distinct broadcaster ids as a dataset; used later as the candidate set for
# the top-K metric and for the retrieval index.
broadcasters_data_set = get_broadcaster_data_set(training_dataset)

### Prepare features

In [14]:
# NOTE(review): get_list and get_unique_list are already defined with identical
# bodies in an earlier cell; this cell redefines (shadows) them.
def get_list(training_data, key):
    """Batch ``training_data`` by 1,000,000 rows and map each batch to its ``key`` entry."""
    return training_data.batch(1_000_000).map(lambda x: x[key], num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)


def get_unique_list(data):
    """Concatenate everything yielded by ``data`` and return the unique values."""
    return np.unique(np.concatenate(list(data)))

In [15]:
# Batched dataset of the viewer_gender column.
user_genders = get_list(train, 'viewer_gender')

In [16]:
# Batched dataset of the viewer_lang column.
user_langs = get_list(train, 'viewer_lang')

In [17]:
# Batched dataset of the viewer_country column.
user_countries = get_list(train, 'viewer_country')

In [18]:
# Batched dataset of the viewer_age column; passed to UserModel, which adapts
# its age normalization layer on it.
viewer_age = get_list(train, 'viewer_age')

In [19]:
# Batched dataset of the viewer_network column.
user_networks = get_list(train, 'viewer_network')

In [20]:
# Vocabulary of distinct viewer_gender values.
unique_user_genders = get_unique_list(user_genders)

In [21]:
# Vocabulary of distinct viewer_lang values.
unique_user_langs = get_unique_list(user_langs)

In [22]:
# Vocabulary of distinct viewer_country values.
unique_user_countries = get_unique_list(user_countries)

In [23]:
# Vocabulary of distinct viewer_network values.
unique_user_networks = get_unique_list(user_networks)

In [24]:
# Viewer tower built from the four vocabularies plus the age data used to adapt normalization.
user_model = UserModel(unique_user_genders, unique_user_langs, unique_user_countries, unique_user_networks, viewer_age)  

In [25]:
# Batched dataset of the broadcaster id column.
broadcaster_ids = get_list(train, 'broadcaster')

In [26]:
# Vocabulary of distinct broadcaster ids for the broadcaster tower's lookup.
unique_broadcasters = get_unique_list(broadcaster_ids)

In [27]:
# Width of the broadcaster embedding; matches the 32-wide UserModel output
# (4 + 11 + 11 + 5 embedding columns plus 1 age column).
broadcaster_embedding_dimension = 32

In [28]:
# Broadcaster tower over the distinct broadcaster ids.
broadcaster_model = BroadcasterModel(unique_broadcasters, broadcaster_embedding_dimension)

In [29]:
# Top-K metric whose candidates are the distinct broadcaster ids, run through
# broadcaster_model in batches of 128.
metrics = tfrs.metrics.FactorizedTopK(candidates=broadcasters_data_set.batch(128).map(broadcaster_model))

In [30]:
# Retrieval task configured with the FactorizedTopK metrics; TwoTowers calls it
# with the viewer and broadcaster embeddings to obtain the loss.
task = tfrs.tasks.Retrieval(
    metrics=metrics
)

In [32]:
class TwoTowers(tf.keras.Model):
    """Two-tower retrieval model: a viewer (query) tower and a broadcaster (candidate) tower.

    Training and evaluation share one forward pass so both feed the viewer
    tower the same feature keys.
    """

    # Feature keys the viewer tower is called with.
    _VIEWER_FEATURE_KEYS = (
        "viewer_gender",
        "viewer_lang",
        "viewer_country",
        "viewer_network",
        "viewer_age",
    )

    def __init__(self, broadcaster_model, user_model, task):
        """Store the two towers and the task used to compute the loss.

        Args:
            broadcaster_model: model mapping broadcaster ids to embeddings.
            user_model: model mapping a dict of viewer features to embeddings.
            task: callable layer taking (viewer embeddings, broadcaster
                embeddings) and returning the loss.
        """
        super().__init__()
        self.broadcaster_model: tf.keras.Model = broadcaster_model
        self.embedding_model = user_model
        self.task: tf.keras.layers.Layer = task

    def _compute_losses(self, features):
        """Run both towers on ``features``; return (loss, regularization_loss, total_loss)."""
        user_embeddings = self.embedding_model(
            {key: features[key] for key in self._VIEWER_FEATURE_KEYS})
        positive_movie_embeddings = self.broadcaster_model(
            features["broadcaster"])
        loss = self.task(user_embeddings, positive_movie_embeddings)

        # Handle regularization losses as well.
        regularization_loss = sum(self.losses)

        return loss, regularization_loss, loss + regularization_loss

    def _collect_metrics(self, loss, regularization_loss, total_loss):
        """Return current metric results plus the three loss values, keyed by name."""
        metrics = {metric.name: metric.result() for metric in self.metrics}
        metrics["loss"] = loss
        metrics["regularization_loss"] = regularization_loss
        metrics["total_loss"] = total_loss
        return metrics

    def train_step(self, features: Dict[Text, tf.Tensor]) -> Dict[Text, tf.Tensor]:
        """Run one optimization step on a batch and return the metric/loss dict."""
        # Set up a gradient tape to record gradients.
        with tf.GradientTape() as tape:
            loss, regularization_loss, total_loss = self._compute_losses(features)

        gradients = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(
            zip(gradients, self.trainable_variables))

        return self._collect_metrics(loss, regularization_loss, total_loss)

    def test_step(self, features: Dict[Text, tf.Tensor]) -> Dict[Text, tf.Tensor]:
        """Evaluate one batch (no gradient update) and return the metric/loss dict.

        The viewer tower receives the same feature keys as in ``train_step``.
        Passing only ``"viewer"`` raised a KeyError, because the tower reads
        the gender/lang/country/network/age keys.
        """
        loss, regularization_loss, total_loss = self._compute_losses(features)
        return self._collect_metrics(loss, regularization_loss, total_loss)


In [33]:
# Combine both towers and the retrieval task into the trainable model.
model = TwoTowers(broadcaster_model, user_model, task)

In [34]:
# Training hyperparameters.
learning_rate = 0.05  # passed to the Adagrad optimizer
batch_size = 16384    # rows per training batch
epochs = 2            # NOTE(review): not referenced by the visible model.fit call -- confirm
top_k = 1999          # number of results the retrieval index returns per query

In [35]:
# Compile with Adagrad at the configured learning rate.
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=learning_rate))

In [36]:
# Batch the prepared rows and cache the batched dataset.
train_ds = train.batch(batch_size).cache()
# train_ds = train_ds.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
# NOTE(review): trains for a literal 1 epoch; the `epochs` hyperparameter set
# in the cell above (2) is not used here -- confirm which value is intended.
model.fit(train_ds, epochs=1)

Instructions for updating:
The `validate_indices` argument has no effect. Indices are always validated on CPU and never validated on GPU.


<tensorflow.python.keras.callbacks.History at 0x7fb5c6d69668>

In [39]:
# Display the S3 URI of the training CSV.
data_location

's3://ling-cold-start-data/2021-09-08/a3d86f3b-eb45-4641-b05d-30dff7423e6b.csv'

In [44]:
from datetime import date

In [45]:
# S3 destination for the saved index: same bucket and prefix as the data, with today's date as the final path component.
model_location = "s3://{}/{}/{}".format(bucket, prefix, date.today())

In [46]:
# Display the S3 URI the index will be saved to.
model_location

's3://ling-cold-start-data/2021-09-08/2021-09-16'

In [47]:
print("create index")
# Brute-force retrieval layer: queries go through the viewer tower and the
# top_k closest broadcasters are returned.
index = tfrs.layers.factorized_top_k.BruteForce(
    query_model=user_model,
    k=top_k,
)

# Candidates are the broadcaster embeddings; identifiers are the raw ids.
index.index(
    broadcasters_data_set.batch(10000).map(
        model.broadcaster_model),
    broadcasters_data_set)

# Smoke-test viewers, one query each (replaces three copy-pasted query blocks):
# (label, gender, lang, country, age, longitude, latitude, network)
sample_viewers = [
    ("lam", "male", "en", "US", 38, -74.89611, 40.36393, "meetme"),
    ("cal", "male", "en", "US", 28, -118.41625, 34.10313, "pof"),
    ("32", "female", "en", "US", 32, -74.89611, 40.36393, "skout"),
]

for label, gender, lang, country, age, longitude, latitude, network in sample_viewers:
    _, titles = index(
        {
            "viewer_gender": tf.constant([gender]),
            "viewer_lang": tf.constant([lang]),
            "viewer_country": tf.constant([country]),
            "viewer_age": tf.constant([age]),
            # Longitude/latitude are supplied but UserModel.call does not read them.
            "viewer_longitude": tf.constant([longitude]),
            "viewer_latitude": tf.constant([latitude]),
            "viewer_network": tf.constant([network]),
        }
    )

    print(f"Recommendations for user {label}: {titles}")

# Persist the index (viewer tower + candidates) to the S3 model location.
index.save(model_location)


create index
Recommendations for user lam: [[b'meetme:216401566' b'meetme:260974973' b'meetme:213547896' ...
  b'skout:183636079' b'meetme:308353225' b'meetme:318247417']]
Recommendations for user cal: [[b'pof:172999642' b'pof:333413485' b'pof:317513630' ... b'pof:329246893'
  b'pof:313957900' b'pof:332191762']]
Recommendations for user 32: [[b'skout:142826829' b'skout:181048236' b'skout:137840011' ...
  b'meetme:150794322' b'meetme:280917015' b'meetme:266748404']]




INFO:tensorflow:Assets written to: s3://ling-cold-start-data/2021-09-08/2021-09-16/assets


INFO:tensorflow:Assets written to: s3://ling-cold-start-data/2021-09-08/2021-09-16/assets
