## <span style="color:#ff5f27">🧬 Train Retrieval Model </span>


## <span style="color:#ff5f27">📝 Imports </span>

In [16]:
import tensorflow as tf
from tensorflow.keras.layers import StringLookup, Normalization
import tensorflow_recommenders as tfrs
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')

## <span style="color:#ff5f27">🔮 Connect to Hopsworks Feature Store </span>

In [17]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store()

Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/17565
Connected. Call `.close()` to terminate connection gracefully.


## <span style="color:#ff5f27">🔪 Feature Selection </span>


In [18]:
users_fg = fs.get_feature_group(
    name="users",
    version=1,
)

videos_fg = fs.get_feature_group(
    name="videos",
    version=1,
)

interactions_fg = fs.get_feature_group(
    name="interactions",
    version=1,
)

---

In [19]:
# generic = Generic(locale=Locale.EN)

# obs = pd.DataFrame(
#     [[generic.person.identifier(mask='####-##-####'), 'like', 'WA546S', '3EL60S', 206]],
#     columns=['interaction_id', 'interaction_type', 'user_id', 'video_id', 'watch_time']
# )
# interactions_fg.insert(obs)

Unnamed: 0,interaction_id,interaction_type,user_id,video_id,watch_time
0,7133-34-1966,like,WA546S,3EL60S,206


---

In [21]:
QUERY_FEATURES = ["user_id", "gender", "age", "country"]
CANDIDATE_FEATURES = ["video_id", "category", "views", "likes", "video_length"]

In [22]:
# Select features for training data
selected_features = interactions_fg.select(["interaction_id"])\
    .join(users_fg.select(QUERY_FEATURES), on="user_id")\
    .join(videos_fg.select(CANDIDATE_FEATURES), on="video_id")

# Uncomment this if you would like to view your selected features
# selected_features.show(5)

## <span style="color:#ff5f27">⚙️ Feature View Creation </span>


In [23]:
feature_view = fs.get_or_create_feature_view(
    name='retrieval',
    version=1,
    query=selected_features,
)

Feature view created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/17565/fs/17485/fv/retrieval/version/1


## <span style="color:#ff5f27">🏋️ Training Dataset </span>


In [24]:
train_df, val_df, test_df, _, _, _ = feature_view.train_validation_test_split(
    validation_size=0.1, 
    test_size=0.1,
    description='Retrieval dataset splits',
)
train_df.head(3)

/arrow/cpp/src/arrow/status.cc:137: DoAction result was not fully consumed: Cancelled: Flight cancelled call, with message: CANCELLED. Detail: Cancelled


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (78.11s) 


Unnamed: 0,interaction_id,user_id,gender,age,country,video_id,category,views,likes,video_length
0,8493-35-9872,MG666N,Female,47,Turks & Caicos Islands,0IE39E,News,63950,45996,38
1,1247-11-6324,IW619V,Other,44,Côte d’Ivoire,2PV16N,Technology,200516,144355,106
2,0336-88-6412,EO571D,Other,34,Bahamas,9WK73L,News,181915,59220,74


You will train your retrieval model with a subset of features.

For the query embedding you will use:
- `user_id`: ID of a user.
- `gender`: Gender of a user.
- `age`: age of a user.
- `country`: country if a user.

For the candidate embedding you will use:
- `video_id`: ID of a video.
- `category`: Video Category.
- `views`: Number of video views.
- `likes`: Number of video likes.
- `video_length`: Length of video.


In [25]:
def df_to_ds(df):
    return tf.data.Dataset.from_tensor_slices({col: df[col] for col in df})

BATCH_SIZE = 2048
train_ds = df_to_ds(train_df).batch(BATCH_SIZE).cache().shuffle(BATCH_SIZE*10)
val_ds = df_to_ds(val_df).batch(BATCH_SIZE).cache()

2024-10-01 16:52:59.116556: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:981] could not open file to read NUMA node: /sys/bus/pci/devices/0000:03:00.0/numa_node
Your kernel may have been built without NUMA support.
2024-10-01 16:52:59.127166: W tensorflow/core/common_runtime/gpu/gpu_device.cc:1960] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...


In [26]:
# Query Features 
user_id_list = train_df["user_id"].unique().tolist()
countries_list = train_df["country"].unique().tolist()
gender_list = train_df["gender"].unique().tolist()

# Item Features
video_id_list = train_df["video_id"].unique().tolist()
category_list = train_df["category"].unique().tolist()

print(f"⛳️ Number of users: {len(user_id_list)}")
print(f"⛳️ Number of items: {len(video_id_list)}")

⛳️ Number of users: 25986
⛳️ Number of items: 25979


## <span style="color:#ff5f27">🏰 Two Tower Model </span>


In [27]:
EMB_DIM = 16

In [28]:
class QueryTower(tf.keras.Model):

    def __init__(self):
        super().__init__()

        self.emb_dim = EMB_DIM
        self.user_embedding = tf.keras.Sequential([
            StringLookup(
                vocabulary=user_id_list,
                mask_token=None
            ),
            tf.keras.layers.Embedding(
                # You add an additional embedding to account for unknown tokens.
                len(user_id_list) + 1,
                self.emb_dim
            )
        ])

        self.normalized_age = Normalization(axis=None)
        
        # Converts strings into integer indices (scikit-learn LabelEncoder analog)
        self.gender_tokenizer = StringLookup(
            vocabulary=gender_list,
            mask_token=None,
        )
        
        self.country_tokenizer = StringLookup(
            vocabulary=countries_list, 
            mask_token=None,
        )

        self.fnn = tf.keras.Sequential([
            tf.keras.layers.Dense(self.emb_dim, activation="relu"),
            tf.keras.layers.Dense(self.emb_dim)
        ])

    def call(self, inputs):
        gender_embedding = tf.one_hot(
            self.gender_tokenizer(inputs["gender"]),
            len(gender_list),
        )
        
        country_embedding = tf.one_hot(
            self.country_tokenizer(inputs["country"]),
            len(countries_list),
        )
        
        concatenated_inputs = tf.concat([
            self.user_embedding(inputs["user_id"]),
            tf.reshape(self.normalized_age(inputs["age"]), (-1,1)),
            gender_embedding,
            country_embedding,
        ], axis=1)

        outputs = self.fnn(concatenated_inputs)

        return outputs


query_model = QueryTower()

query_model.normalized_age.adapt(train_ds.map(lambda x : x["age"]))

# Initialize model with inputs.
query_df = train_df[QUERY_FEATURES]
query_ds = df_to_ds(query_df).batch(1)
query_model(next(iter(query_ds)))

<tf.Tensor: shape=(1, 16), dtype=float32, numpy=
array([[ 0.10894044,  0.20039581,  0.37230363,  0.13125409,  0.1859128 ,
        -0.1191321 , -0.18680353, -0.11699877, -0.03922038, -0.12373803,
        -0.13764858,  0.01167573,  0.28114206,  0.08604892,  0.02320188,
         0.25075862]], dtype=float32)>

In [29]:
class ItemTower(tf.keras.Model):

    def __init__(self):
        super().__init__()

        self.emb_dim = EMB_DIM
        self.video_embedding = tf.keras.Sequential([
            StringLookup(
                vocabulary=video_id_list,
                mask_token=None
            ),
            tf.keras.layers.Embedding(
                # You add an additional embedding to account for unknown tokens.
                len(video_id_list) + 1,
                self.emb_dim,
            )
        ])
        
        # Converts strings into integer indices (scikit-learn LabelEncoder analog)
        self.category_tokenizer = StringLookup(
            vocabulary=category_list, 
            mask_token=None,
        )
        
        self.normalized_views = Normalization(axis=None)
        self.normalized_likes = Normalization(axis=None)
        self.normalized_video_length = Normalization(axis=None)

        self.fnn = tf.keras.Sequential([
            tf.keras.layers.Dense(self.emb_dim, activation="relu"),
            tf.keras.layers.Dense(self.emb_dim)
        ])

    def call(self, inputs):
        category_embedding = tf.one_hot(
            self.category_tokenizer(inputs["category"]),
            len(category_list),
        )

        concatenated_inputs = tf.concat([
            self.video_embedding(inputs["video_id"]),
            category_embedding,
            tf.reshape(self.normalized_views(inputs["views"]), (-1,1)),
            tf.reshape(self.normalized_views(inputs["likes"]), (-1,1)),
            tf.reshape(self.normalized_views(inputs["video_length"]), (-1,1)),
        ], axis=1)

        outputs = self.fnn(concatenated_inputs)

        return outputs

    
item_model = ItemTower()

item_df = train_df[CANDIDATE_FEATURES]
item_df.drop_duplicates(subset="video_id", inplace=True)
item_ds = df_to_ds(item_df)

item_model(next(iter(item_ds.batch(1))))

<tf.Tensor: shape=(1, 16), dtype=float32, numpy=
array([[ -8337.219 ,    271.7439,   6766.8276,  -5961.2393,   4387.9194,
         -2594.7898,   -959.5883,  11187.032 ,  -5418.3667,   3178.5222,
           530.5153,  16850.271 ,  -1287.1066,  -2689.3015, -12817.958 ,
          7989.92  ]], dtype=float32)>

In [30]:
class TwoTowerModel(tf.keras.Model):
    def __init__(self, query_model, item_model):
        super().__init__()
        self.query_model = query_model
        self.item_model = item_model
        self.task = tfrs.tasks.Retrieval(
            metrics=tfrs.metrics.FactorizedTopK(
                candidates=item_ds.batch(BATCH_SIZE).map(self.item_model)
            )
        )

    def train_step(self, batch) -> tf.Tensor:
        # Set up a gradient tape to record gradients.
        with tf.GradientTape() as tape:

            # Loss computation.
            user_embeddings = self.query_model(batch)
            item_embeddings = self.item_model(batch)
            loss = self.task(
                user_embeddings, 
                item_embeddings,
                compute_metrics=False,
            )

            # Handle regularization losses as well.
            regularization_loss = sum(self.losses)

            total_loss = loss + regularization_loss

        gradients = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        metrics = {
            "loss": loss,
            "regularization_loss": regularization_loss,
            "total_loss": total_loss
        }

        return metrics

    def test_step(self, batch) -> tf.Tensor:
        # Loss computation.
        user_embeddings = self.query_model(batch)
        item_embeddings = self.item_model(batch)

        loss = self.task(
            user_embeddings, 
            item_embeddings,
            compute_metrics=False,
        )

        # Handle regularization losses as well.
        regularization_loss = sum(self.losses)

        total_loss = loss + regularization_loss

        metrics = {metric.name: metric.result() for metric in self.metrics}
        metrics["loss"] = loss
        metrics["regularization_loss"] = regularization_loss
        metrics["total_loss"] = total_loss

        return metrics

### <span style="color:#ff5f27">🏃🏻‍♂️ Model Training </span>

You'll train our model using the AdamW optimizer, which applies weight regularization during training.

In [31]:
# Create a TwoTowerModel with the specified query_model and item_model
model = TwoTowerModel(query_model, item_model)

# Define an optimizer using AdamW with a learning rate of 0.01
optimizer = tf.keras.optimizers.AdamW(
    weight_decay=0.001, 
    learning_rate=0.01,
)

# Compile the model using the specified optimizer
model.compile(optimizer=optimizer)

In [32]:
model.fit(
    train_ds, 
    validation_data=val_ds, 
    epochs=5,
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.src.callbacks.History at 0x7f6cdde07be0>

## <span style="color:#ff5f27">🗄️ Upload Model to Model Registry </span>

One of the features in Hopsworks is the model registry. This is where you can store different versions of models and compare their performance. Models from the registry can then be served as API endpoints.

Let's connect to the model registry using the [HSML library](https://docs.hopsworks.ai/machine-learning-api/latest) from Hopsworks.

In [33]:
mr = project.get_model_registry()

Connected. Call `.close()` to terminate connection gracefully.


In [34]:
class QueryModelModule(tf.Module):
    def __init__(self, query_model):
        self.query_model = query_model

    @tf.function()
    def compute_emb(self, instances):
        query_emb = self.query_model(instances)
        return {
            "user_id": instances["user_id"],
            "gender": instances["gender"],
            "age": instances["age"],
            "country": instances["country"],
            "query_emb": query_emb,
        }

# wrap query_model:   query_model -> query_model_module
query_model = QueryModelModule(model.query_model)

In [35]:
# Define the input specifications for the instances
instances_spec = {
    'user_id': tf.TensorSpec(shape=(None,), dtype=tf.string, name='user_id'),   # Specification for user IDs
    'gender': tf.TensorSpec(shape=(None,), dtype=tf.string, name='gender'),     # Specification for gender
    'country': tf.TensorSpec(shape=(None,), dtype=tf.string, name='country'),    # Specification for country
    'age': tf.TensorSpec(shape=(None,), dtype=tf.int64, name='age'),              # Specification for age
}

# Get the concrete function for the query_model's compute_emb function using the specified input signatures
signatures = query_model.compute_emb.get_concrete_function(instances_spec)

# Save the query_model along with the concrete function signatures
tf.saved_model.save(
    query_model,           # The model to save
    "query_model",         # Path to save the model
    signatures=signatures, # Concrete function signatures to include
)

2024-10-01 17:22:37,692 INFO: Function `compute_emb` contains input name(s) table_handle, 16213, resource with unsupported characters which will be renamed to query_tower_sequential_string_lookup_none_lookup_lookuptablefindv2_table_handle, query_tower_sequential_embedding_embedding_lookup_16213, query_tower_sequential_1_dense_1_biasadd_readvariableop_resource in the SavedModel.
INFO:tensorflow:Assets written to: query_model/assets
2024-10-01 17:22:42,016 INFO: Assets written to: query_model/assets


In [36]:
tf.saved_model.save(
    model.item_model,    # The model to save
    "candidate_model",   # Path to save the model
)

INFO:tensorflow:Assets written to: candidate_model/assets
2024-10-01 17:22:45,419 INFO: Assets written to: candidate_model/assets


In [37]:
from hsml.schema import Schema
from hsml.model_schema import ModelSchema

# Infer input schema from data.
query_model_input_schema = Schema(query_df)

# Manually specify output schema.
query_model_output_schema = Schema([{
    "name": "query_embedding",
    "type": "float32",
    "shape": [EMB_DIM],
}])

query_model_schema = ModelSchema(
    input_schema=query_model_input_schema,
    output_schema=query_model_output_schema,
)

query_model_schema.to_dict()

{'input_schema': {'columnar_schema': [{'name': 'user_id', 'type': 'object'},
   {'name': 'gender', 'type': 'object'},
   {'name': 'age', 'type': 'int64'},
   {'name': 'country', 'type': 'object'}]},
 'output_schema': {'tensor_schema': [{'name': 'query_embedding',
    'shape': '[16]',
    'type': 'float32'}]}}

In [38]:
# Sample a query example from the query DataFrame
query_example = query_df.sample().to_dict("records")

# Create a tensorflow model for the query_model in the Model Registry 
mr_query_model = mr.tensorflow.create_model(
    name="query_model",                                           # Name of the model
    description="Model that generates query embeddings from user features",  # Description of the model
    input_example=query_example,                                  # Example input for the model
    model_schema=query_model_schema,                              # Schema of the model
)

# Save the query_model to the Model Registry
mr_query_model.save("query_model")                                # Path to save the model

  0%|          | 0/6 [00:00<?, ?it/s]

Uploading: 0.000%|          | 0/562094 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/55 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/576 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/1688053 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/78 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/484 elapsed<00:00 remaining<?

Model created, explore it at https://c.app.hopsworks.ai:443/p/17565/models/query_model/1


Model(name: 'query_model', version: 1)

In [39]:
# Define the input schema for the candidate_model based on item_df
candidate_model_input_schema = Schema(item_df)

# Define the output schema for the candidate_model, specifying the shape and type of the output
candidate_model_output_schema = Schema([{
    "name": "candidate_embedding",   # Name of the output feature
    "type": "float32",               # Data type of the output feature
    "shape": [EMB_DIM],              # Shape of the output feature
}])

# Combine the input and output schemas to create the overall model schema for the candidate_model
candidate_model_schema = ModelSchema(
    input_schema=candidate_model_input_schema,    # Input schema for the model
    output_schema=candidate_model_output_schema,  # Output schema for the model
)

# Sample a candidate example from the item DataFrame
candidate_example = item_df.sample().to_dict("records")

# Create a tensorflow model for the candidate_model in the Model Registry
mr_candidate_model = mr.tensorflow.create_model(
    name="candidate_model",                                        # Name of the model
    description="Model that generates candidate embeddings from video features",  # Description of the model
    input_example=candidate_example,                              # Example input for the model
    model_schema=candidate_model_schema,                          # Schema of the model
)

# Save the candidate_model to the Model Registry
mr_candidate_model.save("candidate_model")                        # Path to save the model

  0%|          | 0/6 [00:00<?, ?it/s]

Uploading: 0.000%|          | 0/527309 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/57 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/562 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/1671587 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/102 elapsed<00:00 remaining<?

Uploading: 0.000%|          | 0/563 elapsed<00:00 remaining<?

Model created, explore it at https://c.app.hopsworks.ai:443/p/17565/models/candidate_model/1


Model(name: 'candidate_model', version: 1)

---