In [None]:
from recsys import hopsworks_integration

In [None]:
# Obtain the Hopsworks project handle and its feature store through the recsys helper.
# `fs` is used below to read feature groups; `project` is used later for the model registry.
project, fs = hopsworks_integration.feature_store.get_feature_store()

# Create Feature View

In [None]:
# from hopsworks import udf
import pandas as pd

In [None]:
# # create hopsworks udf to convert from integer to float
# @udf(return_type=[float, float], drop=["rating", "year_of_publication"])
# def int_to_float(rating: pd.Series, year_of_publication) -> pd.Series:
#     return rating.astype(float), year_of_publication.astype(float)

# # create hopsworks udf to convert from integer to string
# @udf(return_type=str, drop="user_id")
# def int_to_str(user_id: pd.Series) -> pd.Series:
#     return user_id.astype(str)
# int_to_str.alias("user_id")

In [None]:
# Handles to the three source feature groups (version 1 of each).
items_fg = fs.get_feature_group(name="items", version=1)
users_fg = fs.get_feature_group(name="users", version=1)
ratings_fg = fs.get_feature_group(name="ratings", version=1)

# Build the training query step by step: start from the ratings, then
# inner-join the user attributes and finally the item attributes.
selected_features = ratings_fg.select(["user_id", "isbn", "rating"])
selected_features = selected_features.join(
    users_fg.select(["age"]),  # not selected for now: "location"
    on="user_id",
    join_type="inner",
)
selected_features = selected_features.join(
    # not selected for now: "book_title", "book_author", "publisher"
    items_fg.select(["year_of_publication"]),
    on="isbn",
    join_type="inner",
)

# Fetch the "training_s" v2 feature view, creating it from the query if needed.
feature_view = fs.get_or_create_feature_view(
    name="training_s",
    version=2,
    query=selected_features,
    description="Two-Towers Training dataset",
    # Currently disabled options:
    # transformation_functions = [
    #     int_to_float("rating", "year_of_publication").alias("rating", "year_of_publication"),
    #     int_to_str("user_id").alias("user_id"),
    # ],
    # labels = ["rating"]
)


# Create Training Data

In [None]:
# Fetch the feature view by name and version; this rebinds `feature_view`
# from the creation cell above to the stored "training_s" v2 view.
feature_view = fs.get_feature_view(name="training_s", version=2)
# print(feature_view.read().show(5))

In [None]:
# Show the feature view's schema as the cell output.
feature_view.schema

In [None]:
# # debugging
# X_train_df, X_val_df, X_test_df, y_train_df, y_val_df, y_test_df = (
#             feature_view.train_validation_test_split(
#                 validation_size=settings.TT_VALIDATION_SPLIT,
#                 test_size=settings.TT_TEST_SPLIT,
#                 description="Training dataset splits",
#             )
#         )

# type(X_train_df)

# X_train_df["year_of_publication"].isna().sum()
# print(f"nan values in year_of_publication: {X_train_df['year_of_publication'].isna().sum()}")
# print(f"nan values in age: {X_train_df['age'].isna().sum()}")
# print(f"nan values in rating: {X_train_df['rating'].isna().sum()}")

# print(f"fraction of nan values in year_of_publication: {X_train_df['year_of_publication'].isna().sum() / X_train_df.shape[0] :.2f}")

In [None]:
import tensorflow as tf
import tensorflow_recommenders as tfrs

from recsys.config import settings

from pprint import pprint

# Pretty-print every configuration value as key/value pairs.
# NOTE(review): this writes all settings into the cell output — confirm that
# no secret is rendered unmasked before sharing or committing the notebook.
pprint(dict(settings))

In [None]:
from loguru import logger

In [None]:
class TwoTowerDataset:
    """Prepare the data consumed by the two-tower model.

    Wraps a feature view, loads (or creates) its train/validation/test split,
    converts the train and validation frames to batched ``tf.data`` datasets
    and keeps the frames and id vocabularies around in ``properties``.
    """

    def __init__(self, feature_view, batch_size: int) -> None:
        """Store the feature view and the batch size used for both datasets."""
        self._feature_view = feature_view
        self._batch_size = batch_size
        # Filled in by get_train_val_split(). It must be assigned here: a bare
        # annotation does not create the attribute, so reading `properties`
        # too early would raise AttributeError instead of the helpful assert.
        self._properties: dict | None = None

    @property
    def query_features(self) -> list[str]:
        """Column names fed to the query (user) tower."""
        return [
            "user_id",
            "age",
            # "location"
        ]

    @property
    def candidate_features(self) -> list[str]:
        """Column names fed to the candidate (item) tower."""
        return [
            "isbn",
            # "book_title",
            # "book_author",
            "year_of_publication",
            # "publisher",
        ]

    @property
    def properties(self) -> dict:
        """Frames and vocabularies produced by get_train_val_split().

        Raises:
            AssertionError: if get_train_val_split() has not been called yet.
        """
        assert self._properties is not None, "Call get_train_val_split() first."

        return self._properties

    def get_items_subset(self):
        """Return a dataset with one row per distinct ``isbn`` of the training frame."""
        # Non-inplace drop: the selection is a slice of the training frame, so
        # mutating it in place triggers pandas' SettingWithCopyWarning.
        item_df = self.properties["X_train_df"][self.candidate_features].drop_duplicates(
            subset="isbn"
        )

        return self.df_to_ds(item_df)

    def get_train_val_split(self):
        """Load the split, build the datasets and populate ``properties``.

        Tries to read training dataset version 1 from the feature view; on any
        error it logs the error and creates a new split using the validation
        and test sizes from ``settings``.

        Returns:
            Tuple ``(train_ds, val_ds)`` of batched, cached datasets; the
            training dataset is additionally shuffled.
        """
        logger.info("Retrieving and creating train, val test split...")

        try:
            X_train_df, X_val_df, X_test_df, y_train_df, y_val_df, y_test_df = (
                self._feature_view.get_train_validation_test_split(1)
            )
        except Exception as e:
            # Best effort: any failure to read the existing split falls back
            # to materialising a new one.
            logger.error(f"Error: {e}")
            logger.info("Creating new train, val test split...")
            X_train_df, X_val_df, X_test_df, y_train_df, y_val_df, y_test_df = (
                self._feature_view.train_validation_test_split(
                    validation_size=settings.TT_VALIDATION_SPLIT,
                    test_size=settings.TT_TEST_SPLIT,
                    description="Training dataset splits",
                )
            )

        # Report missing values in the numeric columns of the training frame.
        print(f"nan values in year_of_publication: {X_train_df['year_of_publication'].isna().sum()}")
        print(f"nan values in age: {X_train_df['age'].isna().sum()}")
        print(f"nan values in rating: {X_train_df['rating'].isna().sum()}")

        # NOTE(review): shuffle() runs after batch(), so it reorders whole
        # batches (buffer of batch_size * 10 batches) rather than single rows.
        train_ds = (
            self.df_to_ds(X_train_df)
            .batch(self._batch_size)
            .cache()
            .shuffle(self._batch_size * 10)
        )
        val_ds = self.df_to_ds(X_val_df).batch(self._batch_size).cache()

        self._properties = {
            "X_train_df": X_train_df,
            "X_val_df": X_val_df,
            "query_df": X_train_df[self.query_features],
            "item_df": X_train_df[self.candidate_features],
            "user_ids": X_train_df["user_id"].unique().tolist(),
            "item_ids": X_train_df["isbn"].unique().tolist(),
            # "publisher": X_train_df["publisher"].unique().tolist(),
            # "book_author": X_train_df["book_author"].unique().tolist(),
            # "location": X_train_df["location"].unique().tolist(),
            # "book_title": X_train_df["book_title"].unique().tolist(),
        }

        return train_ds, val_ds

    def df_to_ds(self, df):
        """Convert a DataFrame to a dataset of per-row dicts keyed by column name."""
        return tf.data.Dataset.from_tensor_slices({col: df[col].to_list() for col in df})

In [None]:
# Build the dataset helper on top of the feature view and materialise the
# batched training and validation datasets (this also fills dataset.properties).
dataset = TwoTowerDataset(
    feature_view=feature_view, batch_size=settings.TT_BATCH_SIZE
)
train_ds, val_ds = dataset.get_train_val_split()

In [None]:
# Log the row counts of the training and validation frames.
logger.info(f"Training samples: {len(dataset.properties['X_train_df']):,}")
logger.info(f"Validation samples: {len(dataset.properties['X_val_df']):,}")

# Log the vocabulary sizes (distinct user ids / isbns in the training frame).
logger.info(f"Number of users: {len(dataset.properties['user_ids']):,}")
logger.info(f"Number of items: {len(dataset.properties['item_ids']):,}")

# Create Model

In [None]:
from tensorflow.keras.layers import StringLookup, Embedding, Normalization, Dense, TextVectorization, GlobalAveragePooling1D
from tensorflow.keras import regularizers, Sequential

In [None]:
class QueryTowerFactory:
    """Creates QueryTower instances from the vocabulary held by a TwoTowerDataset."""

    def __init__(self, dataset: "TwoTowerDataset") -> None:
        self._dataset = dataset

    def build(self, embed_dim: int = settings.TT_EMBEDDING_DIM) -> "QueryTower":
        """Return a QueryTower over the dataset's user ids with the given embedding size."""
        known_user_ids = self._dataset.properties["user_ids"]
        return QueryTower(user_ids=known_user_ids, emb_dim=embed_dim)

class QueryTower(tf.keras.Model):
    """Query (user) tower: user-id embedding plus normalized age, fed through a small MLP."""

    def __init__(self, user_ids: list, emb_dim: int) -> None:
        super().__init__()

        def weight_penalty():
            # A fresh L2 regularizer per use, all with the configured decay.
            return regularizers.l2(settings.TT_WEIGHT_DECAY)

        # Map raw user ids to indices, then to trainable vectors.
        id_lookup = StringLookup(vocabulary=user_ids, mask_token=None)
        id_vectors = Embedding(
            len(user_ids) + 1,  # one extra row for ids outside the vocabulary
            emb_dim,
            embeddings_regularizer=weight_penalty(),
        )
        self.user_embedding = Sequential([id_lookup, id_vectors])

        # Scalar normalization of age; its statistics are set later via adapt().
        self.normalized_age = Normalization(axis=None)

        # Two dense layers: ReLU hidden layer followed by a linear projection.
        hidden_layer = Dense(
            emb_dim,
            activation="relu",
            kernel_regularizer=weight_penalty(),
            bias_regularizer=weight_penalty(),
        )
        output_layer = Dense(
            emb_dim,
            kernel_regularizer=weight_penalty(),
            bias_regularizer=weight_penalty(),
        )
        self.fnn = Sequential([hidden_layer, output_layer])

    def call(self, inputs):
        """Embed a batch dict with "user_id" and "age" entries into query vectors."""
        id_part = self.user_embedding(inputs["user_id"])
        # Reshape the normalized age to a single column so it can be concatenated.
        age_part = tf.reshape(self.normalized_age(inputs["age"]), (-1, 1))

        return self.fnn(tf.concat([id_part, age_part], axis=1))


In [None]:
# Build the query tower using the user-id vocabulary collected by `dataset`.
query_model_factory = QueryTowerFactory(dataset=dataset)
query_model = query_model_factory.build()

In [None]:
class ItemTowerFactory:
    """Creates ItemTower instances from the vocabulary held by a TwoTowerDataset."""

    def __init__(self, dataset: "TwoTowerDataset") -> None:
        self._dataset = dataset

    def build(self, embed_dim: int = settings.TT_EMBEDDING_DIM) -> "ItemTower":
        """Return an ItemTower over the dataset's item ids with the given embedding size."""
        known_item_ids = self._dataset.properties["item_ids"]
        return ItemTower(item_ids=known_item_ids, emb_dim=embed_dim)


class ItemTower(tf.keras.Model):
    """Candidate (item) tower: isbn embedding plus normalized publication year, fed through a small MLP."""

    def __init__(
        self,
        item_ids: list,
        emb_dim: int,
    ):
        super().__init__()

        def weight_penalty():
            # A fresh L2 regularizer per use, all with the configured decay.
            return regularizers.l2(settings.TT_WEIGHT_DECAY)

        # Map raw item ids to indices, then to trainable vectors.
        id_lookup = StringLookup(vocabulary=item_ids, mask_token=None)
        id_vectors = Embedding(
            len(item_ids) + 1,  # one extra row for ids outside the vocabulary
            emb_dim,
            embeddings_regularizer=weight_penalty(),
        )
        self.item_embedding = Sequential([id_lookup, id_vectors])

        # Scalar normalization of the year; its statistics are set later via adapt().
        self.normalized_year = Normalization(axis=None)

        # Two dense layers: ReLU hidden layer followed by a linear projection.
        hidden_layer = Dense(
            emb_dim,
            activation="relu",
            kernel_regularizer=weight_penalty(),
            bias_regularizer=weight_penalty(),
        )
        output_layer = Dense(
            emb_dim,
            kernel_regularizer=weight_penalty(),
            bias_regularizer=weight_penalty(),
        )
        self.fnn = Sequential([hidden_layer, output_layer])

    def call(self, inputs):
        """Embed a batch dict with "isbn" and "year_of_publication" entries into item vectors."""
        id_part = self.item_embedding(inputs["isbn"])
        # Reshape the normalized year to a single column so it can be concatenated.
        year_part = tf.reshape(
            self.normalized_year(inputs["year_of_publication"]), (-1, 1)
        )

        return self.fnn(tf.concat([id_part, year_part], axis=1))



In [None]:
# Build the item tower using the isbn vocabulary collected by `dataset`.
item_model_factory = ItemTowerFactory(dataset=dataset)
item_model = item_model_factory.build()

In [None]:
class TwoTowerFactory:
    """Assembles a TwoTowerModel from two towers and the dataset's item subset."""

    def __init__(self, dataset: "TwoTowerDataset") -> None:
        self._dataset = dataset

    def build(
        self,
        query_model: QueryTower,
        item_model: ItemTower,
        batch_size: int = settings.TT_BATCH_SIZE,
    ) -> "TwoTowerModel":
        """Combine the towers into a TwoTowerModel.

        The de-duplicated item dataset from the wrapped TwoTowerDataset is
        passed along as the candidate set for the model's retrieval metrics.
        """
        return TwoTowerModel(
            query_model,
            item_model,
            item_ds=self._dataset.get_items_subset(),
            batch_size=batch_size,
        )

class TwoTowerModel(tf.keras.Model):
    """Two-tower retrieval model trained with a TFRS ``Retrieval`` task.

    Holds a query tower and an item tower and overrides ``train_step`` and
    ``test_step`` so that the retrieval loss and the layers' regularization
    losses are reported separately as well as summed.
    """

    def __init__(
        self,
        query_model: QueryTower,
        item_model: ItemTower,
        item_ds: tf.data.Dataset,
        batch_size: int,
    ) -> None:
        """Store both towers and set up the retrieval task.

        Args:
            query_model: Tower producing query embeddings from a batch.
            item_model: Tower producing item embeddings from a batch.
            item_ds: Unbatched dataset of candidate items; it is batched with
                ``batch_size`` and mapped through ``item_model`` to form the
                candidate set of the FactorizedTopK metric.
            batch_size: Batch size used for the candidate dataset.
        """
        super().__init__()
        self.query_model = query_model
        self.item_model = item_model
        self.task = tfrs.tasks.Retrieval(
            metrics=tfrs.metrics.FactorizedTopK(
                candidates=item_ds.batch(batch_size).map(self.item_model)
            )
        )

    def _two_tower_losses(self, batch):
        """Return ``(retrieval_loss, regularization_loss, total_loss)`` for one batch."""
        user_embeddings = self.query_model(batch)
        item_embeddings = self.item_model(batch)
        # Metrics are skipped here; only the loss is computed.
        loss = self.task(
            user_embeddings,
            item_embeddings,
            compute_metrics=False,
        )

        # Regularization penalties collected from the layers of both towers.
        regularization_loss = sum(self.losses)

        return loss, regularization_loss, loss + regularization_loss

    def train_step(self, batch) -> dict:
        """Run one optimisation step and return the three loss values by name."""
        # Record the forward pass so gradients of the total loss can be taken.
        with tf.GradientTape() as tape:
            loss, regularization_loss, total_loss = self._two_tower_losses(batch)

        gradients = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        return {
            "loss": loss,
            "regularization_loss": regularization_loss,
            "total_loss": total_loss,
        }

    def test_step(self, batch) -> dict:
        """Evaluate one batch and return current metric results plus the loss values."""
        loss, regularization_loss, total_loss = self._two_tower_losses(batch)

        metrics = {metric.name: metric.result() for metric in self.metrics}
        metrics["loss"] = loss
        metrics["regularization_loss"] = regularization_loss
        metrics["total_loss"] = total_loss

        return metrics

In [None]:
# Combine the two towers into the trainable two-tower model.
model_factory = TwoTowerFactory(dataset=dataset)
model = model_factory.build(query_model=query_model, item_model=item_model)

# Train the model

In [None]:
class TwoTowerTrainer:
    """Initialises and fits a TwoTowerModel on datasets from a TwoTowerDataset."""

    def __init__(self,
                 dataset: TwoTowerDataset,
                 model: TwoTowerModel) -> None:
        self._dataset = dataset
        self._model = model

    def train(self, train_ds, val_ds):
        """Initialise the towers, compile the model and fit it.

        Args:
            train_ds: Batched training dataset.
            val_ds: Batched validation dataset.

        Returns:
            The history object returned by ``fit``.
        """
        self._initialize_models(train_ds)

        # AdamW with the weight decay and learning rate taken from settings.
        optimizer = tf.keras.optimizers.AdamW(
            weight_decay=settings.TT_WEIGHT_DECAY,
            learning_rate=settings.TT_LEARNING_RATE,
        )

        # No loss is passed to compile(): the model's own train_step computes it.
        self._model.compile(optimizer=optimizer)

        history = self._model.fit(
            train_ds,
            validation_data=val_ds,
            epochs=settings.TT_EPOCHS,
        )

        return history

    def _initialize_models(self, train_ds):
        """Adapt the normalization layers and call each tower once on a single row."""
        # Fit the normalization statistics on the training data.
        self._model.query_model.normalized_age.adapt(train_ds.map(lambda x: x["age"]))
        self._model.item_model.normalized_year.adapt(train_ds.map(lambda x: x["year_of_publication"]))

        # Call each tower once with a one-row batch. Only the first row is
        # needed, so convert just that row instead of the whole frame.
        query_df = self._dataset.properties["query_df"]
        query_ds = self._dataset.df_to_ds(query_df.head(1)).batch(1)
        self._model.query_model(next(iter(query_ds)))

        item_df = self._dataset.properties["item_df"]
        item_ds = self._dataset.df_to_ds(item_df.head(1)).batch(1)
        self._model.item_model(next(iter(item_ds)))

In [None]:
# Fit the two-tower model; `history` holds the per-epoch values plotted below.
trainer = TwoTowerTrainer(dataset=dataset, model=model)
history = trainer.train(train_ds, val_ds)

In [None]:
import matplotlib.pyplot as plt

# Two stacked panels: training loss on top, validation loss below.
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6))

# (axes, history key, legend label, panel title, line colour) for each panel.
loss_panels = (
    (ax1, "loss", "Training Loss", "Training Loss Over Time", "blue"),
    (ax2, "val_loss", "Validation Loss", "Validation Loss Over Time", "red"),
)
for panel, history_key, curve_label, panel_title, curve_color in loss_panels:
    panel.plot(history.history[history_key], label=curve_label, color=curve_color)
    panel.set_title(panel_title)
    panel.set_xlabel("Epoch")
    panel.set_ylabel("Loss")
    panel.legend()
    panel.grid(True)

# Keep titles and axis labels of the two panels from overlapping.
plt.tight_layout()
# plt.show() # Uncomment to show the plot

# Save to model registry

In [None]:
from typing import Literal
import hopsworks
import os
from hsml.transformer import Transformer

In [None]:
# Handle to the project's model registry, used below to register both towers.
mr = project.get_model_registry()

In [None]:
class HopsworksQueryModel:
    """Saves, registers and prepares deployment of the query tower in Hopsworks."""

    deployment_name = "query"

    def __init__(self, model: QueryTower) -> None:
        self.model = model

    def save_to_local(self, output_path: str = "query_model") -> str:
        """Export the query tower as a SavedModel and return the output path.

        The serving signature takes a ``user_id`` string vector and an ``age``
        float32 vector and is traced from QueryModelModule.compute_embedding.
        """
        # One variable-length 1-D spec per input feature.
        input_signature = {
            feature: tf.TensorSpec(shape=(None,), dtype=dtype, name=feature)
            for feature, dtype in (("user_id", tf.string), ("age", tf.float32))
        }

        embedding_module = QueryModelModule(model=self.model)
        serving_fn = embedding_module.compute_embedding.get_concrete_function(
            input_signature
        )

        tf.saved_model.save(self.model, output_path, signatures=serving_fn)

        return output_path

    def register(self, mr, feature_view, query_df) -> None:
        """Save the model locally and upload it to the registry as "query_model".

        A single randomly sampled row of ``query_df`` is attached as the input
        example, e.g. ``[{'user_id': '141902', 'age': 51.0}]``.
        """
        saved_path = self.save_to_local()

        sample_input = query_df.sample().to_dict("records")

        registry_entry = mr.tensorflow.create_model(
            name="query_model",
            description="Model that generates query embeddings from user features",
            input_example=sample_input,
            feature_view=feature_view,
        )

        registry_entry.save(saved_path)

    @classmethod
    def deploy(cls, ranking_model_type: Literal["ranking", "llmranking"] = "ranking"):
        """Log in, store the ranking-model-type secret and look up the registered model.

        NOTE(review): the actual deployment steps are commented out below, so
        this method currently returns None.
        """
        project = hopsworks.login()
        cls._prepare_secrets(ranking_model_type)

        mr = project.get_model_registry()
        dataset_api = project.get_dataset_api()

        # Version 1 of the registered query model.
        query_model = mr.get_model(name="query_model", version=1)

        # --- Query transformer deployment (currently disabled) ---
        # # Copy transformer file into Hopsworks File System
        # uploaded_file_path = dataset_api.upload(
        #     str(settings.RECSYS_DIR / "inference" / "query_transformer.py"),
        #     "Models",
        #     overwrite=True,
        # )

        # # Construct the path to the uploaded script
        # transformer_script_path = os.path.join(
        #     "/Projects",
        #     project.name,
        #     uploaded_file_path,
        # )

        # query_model_transformer = Transformer(
        #     script_file=transformer_script_path,
        #     resources={"num_instances": 0},
        # )

        # # Deploy the query model
        # query_model_deployment = query_model.deploy(
        #     name=cls.deployment_name,
        #     description="Deployment that generates query embeddings from customer features using the query model",
        #     resources={"num_instances": 0},
        #     transformer=query_model_transformer,
        # )

        # return query_model_deployment
        # --- end of disabled query transformer deployment ---

    @classmethod
    def _prepare_secrets(cls, ranking_model_type: Literal["ranking", "llmranking"]):
        """Replace the RANKING_MODEL_TYPE secret with the given value."""
        project = hopsworks.login(
            hostname_verification=False,
            api_key_value=settings.HOPSWORKS_API_KEY.get_secret_value(),
        )
        secrets_api = hopsworks.get_secrets_api()

        # Remove a previously stored value before creating the new one.
        stored_secrets = secrets_api.get_secrets()
        if any(secret.name == "RANKING_MODEL_TYPE" for secret in stored_secrets):
            secrets_api._delete(name="RANKING_MODEL_TYPE")

        secrets_api.create_secret(
            "RANKING_MODEL_TYPE",
            ranking_model_type,
            project=project.name,
        )


class QueryModelModule(tf.Module):
    """Wraps a query tower to expose a traceable serving function."""

    def __init__(self,
                 model: QueryTower) -> None:
        # Run tf.Module's own initialiser so the module's name and name scope
        # are set up before attributes are attached.
        super().__init__()
        self.model = model

    @tf.function()
    def compute_embedding(self, instances):
        """Compute query embeddings for a dict of input tensors.

        Args:
            instances: Mapping with at least a "user_id" entry; the whole
                mapping is passed to the wrapped model.

        Returns:
            Dict with the input user ids under "customer_id" and the model
            output under "query_emb".
        """
        query_embedding = self.model(instances)

        # NOTE(review): the output key is "customer_id" although it carries
        # the "user_id" input — confirm which key downstream consumers expect.
        return {
            "customer_id": instances["user_id"],
            "query_emb": query_embedding,
        }
    


In [None]:
# Wrap the trained query tower and register it in the model registry.
# NOTE(review): this rebinds `query_model`, which earlier held the QueryTower
# passed to TwoTowerFactory.build(); re-running that earlier cell after this
# one would receive the wrapper instead — consider a distinct name.
query_model = HopsworksQueryModel(
    model=model.query_model
)
query_model.register(
    mr=mr,
    feature_view=feature_view,
    query_df=dataset.properties["query_df"],
)

In [None]:
# Preview one randomly sampled query row in the records format that
# HopsworksQueryModel.register() attaches as the input example.
dataset.properties["query_df"].sample().to_dict("records")

In [None]:
class HopsworksCandidateModel:
    """Saves, registers and downloads the candidate (item) tower in Hopsworks."""

    def __init__(self,
                 model: ItemTower):
        self.model = model

    def save_to_local(self, output_path: str = "candidate_model") -> str:
        """Export the item tower as a SavedModel and return the output path."""
        tf.saved_model.save(
            self.model,
            output_path,
        )

        return output_path

    def register(self,
                 mr,
                 feature_view,
                 item_df):
        """Save the model locally and upload it to the registry as "candidate_model".

        A single randomly sampled row of ``item_df`` is attached as the input
        example.
        """
        local_model_path = self.save_to_local()

        candidate_example = item_df.sample().to_dict("records")

        mr_candidate_model = mr.tensorflow.create_model(
            name="candidate_model",
            description="Model that generates candidate embeddings from item features",
            input_example=candidate_example,
            feature_view=feature_view,
        )

        mr_candidate_model.save(local_model_path)

    @classmethod
    def download(cls,
                 mr) -> tuple[object, list[str]]:
        """Download and load the highest registered version of "candidate_model".

        Returns:
            Tuple of the object returned by ``tf.saved_model.load`` and the list
            of input feature names read from its "serving_default" signature.

        Raises:
            RuntimeError: if the registry has no "candidate_model".
        """
        models = mr.get_models(name="candidate_model")
        # Also covers a registry client that returns None instead of an empty list.
        if not models:
            raise RuntimeError(
                "No 'candidate_model' found in Hopsworks model registry."
            )
        latest_model = max(models, key=lambda m: m.version)

        logger.info(f"Downloading 'candidate_model' version {latest_model.version}")
        model_path = latest_model.download()

        candidate_model = tf.saved_model.load(model_path)

        # The last element of the structured input signature holds the keyword
        # inputs; its keys are the feature names the signature expects.
        serving_fn = candidate_model.signatures["serving_default"]
        candidate_features = list(serving_fn.structured_input_signature[-1].keys())

        return candidate_model, candidate_features

In [None]:
# Wrap the trained item tower and register it in the model registry.
# NOTE(review): this rebinds `item_model`, which earlier held the ItemTower
# passed to TwoTowerFactory.build(); re-running that earlier cell after this
# one would receive the wrapper instead — consider a distinct name.
item_model = HopsworksCandidateModel(
    model=model.item_model
)
item_model.register(
    mr=mr,
    feature_view=feature_view,
    item_df=dataset.properties["item_df"],
)