# Preparation

In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs
from tensorboard.plugins import projector
import optuna

# for visualization
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from wordcloud import WordCloud

import os, datetime, json, re
from utils import Analysis
from typing import Any, Dict
from tqdm.notebook import tqdm

from imblearn.over_sampling import RandomOverSampler, SMOTEN
import warnings

# Suppress FutureWarnings
warnings.simplefilter(action='ignore', category=FutureWarning)

pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 50)

# Set Seaborn style
sns.set_style('darkgrid')
sns.set_context('notebook')
sns.set_palette('colorblind')

# Load the raw search-event sample
raw_data = pd.read_parquet("search_sample_data.parquet")

# Columns kept for modelling, in this order
columns = [
    'time', 'user_id', 'product_id', 'merchant_id', 'category_id', 
    'channel', 'query_text', 'device_type', 'sales_last_week',
    'sales_last_month', 'sales_last_year', 'previous_purchase',
    'free_shipping', 'is_sold_out', 'editor_pick', 'merchant_name',
    'product_name', 'price_in_cents', 'on_sale', 'category_name',
    'merchant_city', 'merchant_state', 'merchant_region', 'reviews',
    'add_to_cart', 'click', 'conversion'
]

# Normalize column names: lower-case, and visitor_token is the user identifier
data_df = raw_data.copy()
data_df.columns = data_df.columns.str.lower()
data_df = data_df.rename(columns={'visitor_token': 'user_id'})

# `reindex` would silently create all-NaN columns for names absent from the file;
# fail early with a clear message instead of much later in astype / vocab building.
missing_columns = [c for c in columns if c not in data_df.columns]
if missing_columns:
    raise KeyError(f"Columns missing from the raw data: {missing_columns}")
data_df = data_df.reindex(columns, axis=1)

# Flags and counts as plain ints (raises if any of them contains NaN)
cols_to_int = [
    'previous_purchase', 'free_shipping', 'is_sold_out', 'editor_pick',
    'on_sale', 'price_in_cents', 'reviews', 'add_to_cart', 'click', 'conversion'
]
data_df[cols_to_int] = data_df[cols_to_int].astype(int)

# Order by timestamp
data_df = data_df.sort_values(by=['time']).reset_index(drop=True)
# Create score feature
def create_clicks_dataset(df: pd.DataFrame, add_to_cart_score: float= 0.5, conversion_score: float = 1.0) -> pd.DataFrame:
    """Keep only clicked rows and attach a graded relevance ``score`` column.

    Scoring per clicked row: ``conversion_score`` if it converted, otherwise
    ``add_to_cart_score`` if it was added to cart, otherwise 0.0.

    Args:
        df: Events with ``click``, ``add_to_cart`` and ``conversion`` flags.
        add_to_cart_score: Score for an add-to-cart without conversion.
        conversion_score: Score for a conversion.

    Returns:
        A new frame (fresh 0..n-1 index) holding only the clicked rows plus ``score``.
    """
    clicked = df.loc[df['click'] == True].reset_index(drop=True)

    # Conversion outranks add-to-cart, so it is applied last on top of the cart score.
    cart_or_zero = np.where(clicked['add_to_cart'] == True, add_to_cart_score, 0.0)
    clicked['score'] = np.where(clicked['conversion'] == True, conversion_score, cart_or_zero)
    return clicked

# Filter only clicks
# Click rows only, each with a `score`: 1.0 for a conversion, 0.5 for add-to-cart, else 0.0.
clicks_df = create_clicks_dataset(data_df, 0.5, 1.0)

# One row per product_id. GroupBy.last() takes the last *non-null* value of each
# column independently (data_df is time-sorted above), so a row's columns may come
# from different events.
last_products = data_df.groupby("product_id").last().reset_index()
# Per (product_id, merchant_name): the most frequent product_name / category_name
# (DataFrame.mode returns sorted modes, so ties resolve to the first in sort order).
most_products = data_df.groupby(["product_id", "merchant_name"])[['product_name', 'category_name']].apply(lambda x: x.mode().iloc[0]).reset_index()
# Join on product_id; overlapping columns from last_products (merchant_name,
# product_name, category_name) get the '_x' suffix and are dropped, so those three
# come from most_products. Result: one row per (product_id, merchant_name).
products_df = last_products.merge(most_products, on='product_id', suffixes=('_x', ''))
products_df.drop(columns=products_df.filter(regex='_x$', axis=1).columns, inplace=True)

# adapt clicks dataset with the new values of product_name and category_name
def adapt_clicks(df: pd.DataFrame, products: "pd.DataFrame | None" = None) -> pd.DataFrame:
    """Overwrite ``product_name`` / ``category_name`` in ``df`` with the canonical product values.

    Rows are matched on (``product_id``, ``merchant_name``). A left merge keeps
    exactly one result row per row of ``df``, in ``df``'s order, so values are
    written back to the right rows. Rows without a matching product keep their
    original values.

    Args:
        df: Clicks frame with ``product_id``, ``merchant_name``, ``product_name`` and
            ``category_name`` columns. Modified in place.
        products: Lookup frame with unique (``product_id``, ``merchant_name``) pairs.
            Defaults to the notebook-level ``products_df``.

    Returns:
        ``df`` itself (the same object), for chaining.

    Raises:
        pandas.errors.MergeError: If ``products`` has duplicate (product_id, merchant_name) pairs,
            which would otherwise duplicate click rows.
    """
    if products is None:
        products = products_df

    keys = ['product_id', 'merchant_name']
    targets = ['product_name', 'category_name']
    merged = df[keys].merge(products[keys + targets], on=keys, how='left', validate='many_to_one')

    for col in targets:
        # Re-label the merged rows with df's index (row order is preserved by the left merge).
        canonical = merged[col].set_axis(df.index)
        df[col] = canonical.where(canonical.notna(), df[col])
    return df

# Replace each click's product_name / category_name with the canonical values from products_df.
clicks_df = adapt_clicks(clicks_df)

# Cast time to string
# `extract_features` later parses date/hour out of this string by character position.
# NOTE(review): create_sequence sorts on this *string*; that matches chronological
# order only if every value has the same fixed-width ISO layout — confirm for the source data.
clicks_df['time'] = clicks_df['time'].astype(str)
products_df['time'] = products_df['time'].astype(str)

# Function to create cumulative lists
def create_cumulative_list(items: pd.Series) -> list:
    """Return, for every position ``i``, a new list of the items strictly before it.

    Example: ``[a, b, c]`` -> ``[[], [a], [a, b]]``. Each returned list is an
    independent copy.
    """
    history = list(items)
    return [history[:position] for position in range(len(history))]

# Function to create sequences
def create_sequence(tbl: pd.DataFrame, feature: str, fix_len: int = 5) -> pd.DataFrame:
    """Add ``seq_<feature>``: the user's previous values of ``feature`` as a fixed-length list of strings.

    For each row, the values of ``feature`` from that user's earlier rows (ordered by
    ``time``) are collected. The list is right-padded with ``0`` and truncated to
    ``fix_len``, so it keeps the *earliest* ``fix_len`` values of the history. Every
    element is converted to ``str``.

    Args:
        tbl: Frame with ``user_id``, ``time`` and ``feature``. Modified in place.
        feature: Column to build the history from.
        fix_len: Length of every output list.

    Returns:
        ``tbl`` with the new ``seq_<feature>`` column.
    """
    seq_col = f"seq_{feature}"
    by_user_time = tbl.sort_values(by=['user_id', 'time'])
    # transform keeps the original index, so assignment realigns with tbl's row order.
    tbl[seq_col] = by_user_time.groupby('user_id')[feature].transform(create_cumulative_list)

    padding = [0] * fix_len
    tbl[seq_col] = tbl[seq_col].apply(lambda prev: [str(tok) for tok in (prev + padding)[:fix_len]])
    return tbl

# Create sequences of product_id and category_name
# Adds seq_product_id / seq_category_name: each click's earlier items for the same user.
clicks_df = create_sequence(clicks_df, 'product_id')
clicks_df = create_sequence(clicks_df, 'category_name')

# Raw columns grouped by type. These groups drive dataset construction (create_dataset),
# vocabularies ('categorical' + 'integer') and bucket boundaries ('continuous') below.
# Which features the model actually uses is chosen separately in params['FEATURES'].
query_features = {
    'categorical': ['user_id', 'channel', 'device_type'],
    'integer': [
    ],
    'text': ['query_text'],
    'continuous': [],
    # List-valued columns; converted to string tensors in create_dataset.
    'sequential': [
        'seq_product_id', 'seq_category_name'
    ]
}

product_features = {
    'categorical': [
        'product_id', 'category_name', 'merchant_name',
        'merchant_city', 'merchant_state', 'merchant_region'
    ],
    'integer': [
        'free_shipping', 'is_sold_out', 'editor_pick', 'on_sale' 
    ],
    'text': ['product_name'],
    'continuous': [
        'price_in_cents', 'reviews', 'sales_last_week', 'sales_last_month', 'sales_last_year'
    ],
    'sequential': []
}

# Flattened column lists (group order, then in-group order).
query_features_list = [i for l in query_features.values() for i in l]
product_features_list = [i for l in product_features.values() for i in l]
# Columns carried along in the clicks dataset but not part of either tower's raw feature set:
# 'time' (source of hour / day_of_week) and 'score' (rating label).
side_features = ['time', 'score']

# Set seed for reproducibility
tf.random.set_seed(42)
np.random.seed(42)

def create_dataset(clicks_df: pd.DataFrame, products_df: pd.DataFrame) -> tuple[tf.data.Dataset, tf.data.Dataset]:
    """Convert the clicks and products frames into ``tf.data`` datasets of per-row feature dicts.

    Args:
        clicks_df: Must contain every query feature, every product feature and the
            side features (``time``, ``score``).
        products_df: Must contain every product feature.

    Returns:
        ``(clicks, products)``: two datasets whose elements are dicts keyed by column name.
    """
    def _to_tensor_dict(df: pd.DataFrame, columns: list, sequential: list) -> dict:
        # Sequential columns hold Python lists of strings (padded to a fixed length by
        # create_sequence), so they are stacked into one string tensor; other columns
        # are handed to from_tensor_slices as pandas Series.
        return {
            col: tf.constant(df[col].to_list(), dtype=tf.string) if col in sequential else df[col]
            for col in columns
        }

    clicks_dict = _to_tensor_dict(
        clicks_df,
        query_features_list + product_features_list + side_features,
        query_features['sequential'],
    )
    products_dict = _to_tensor_dict(products_df, product_features_list, product_features['sequential'])

    return tf.data.Dataset.from_tensor_slices(clicks_dict), tf.data.Dataset.from_tensor_slices(products_dict)

# Build tf.data datasets from the prepared frames
clicks, products = create_dataset(clicks_df, products_df)

# Batched, cached and prefetched copies, used by the preprocessing layers' adapt calls
clicks_batched = clicks.batch(512).cache().prefetch(tf.data.AUTOTUNE)
products_batched = products.batch(512).cache().prefetch(tf.data.AUTOTUNE)


def _collect_values(ds: tf.data.Dataset, feature: str) -> np.ndarray:
    """Return every value of ``feature`` in the unbatched dataset ``ds`` as one numpy array."""
    return np.concatenate(list(ds.batch(512).map(lambda x: x[feature])))


# Define vocabulary: sorted unique values of each categorical / integer feature
VOCABULARY = {}

for feature in (query_features['categorical'] + query_features['integer']):
    VOCABULARY[feature] = np.unique(_collect_values(clicks, feature))

for feature in (product_features['categorical'] + product_features['integer']):
    VOCABULARY[feature] = np.unique(_collect_values(products, feature))

# hour and day_of_week are derived at preprocessing time by `extract_features`, which
# yields hours 0-23 and days 0-6 (Monday = 0). The vocabularies must therefore start
# at 0; starting at 1 would send midnight and Monday to the OOV index.
VOCABULARY['hour'] = np.arange(0, 24, dtype=np.int32)
VOCABULARY['day_of_week'] = np.arange(0, 7, dtype=np.int32)

# Define buckets: 100 evenly spaced boundaries between each continuous feature's min and max
BUCKETS = {}

for feature in query_features['continuous']:
    values = _collect_values(clicks, feature)
    BUCKETS[feature] = np.linspace(values.min(), values.max(), num=100)

for feature in product_features['continuous']:
    values = _collect_values(products, feature)
    BUCKETS[feature] = np.linspace(values.min(), values.max(), num=100)

2024-07-30 09:43:57.813545: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-07-30 09:43:57.813603: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-30 09:43:57.904619: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-07-30 09:43:58.091661: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-07-30 09:44:10.286739: I external/local_xla/xla/

# Preprocessing Layer

In [63]:
@tf.function
def replace_empty_string(x):
    """Map every empty string in the string tensor ``x`` to the token "[NULL]"; other values pass through.

    Presumably this keeps an empty text from vectorizing into an all-padding sequence,
    which masked average pooling cannot average (confirm intent).
    """
    is_empty = tf.equal(x, "")
    placeholder = tf.constant("[NULL]")
    return tf.where(is_empty, placeholder, x)

@tf.function
def extract_features(x):
    """Add calendar features derived from ``x['time']`` to a feature dict.

    ``x['time']`` is a string tensor (any shape) whose characters are laid out as
    ``YYYY-MM-DD HH...``. Fields are read by position: year [0:4], month [5:7],
    day [8:10], hour [11:13].

    Returns:
        A new dict with all entries of ``x`` plus:
          * ``hour``: int32, hour of day (0-23 for well-formed input).
          * ``day_of_week``: int32, Monday = 0 ... Sunday = 6.
    """
    times = x['time']

    years = tf.strings.to_number(tf.strings.substr(times, 0, 4), tf.int32)
    months = tf.strings.to_number(tf.strings.substr(times, 5, 2), tf.int32)
    days = tf.strings.to_number(tf.strings.substr(times, 8, 2), tf.int32)
    hours = tf.strings.to_number(tf.strings.substr(times, 11, 2), tf.int32)

    # Zeller's congruence, computed elementwise so it works for batched and unbatched
    # inputs alike (no per-element map). January/February count as months 13/14 of
    # the previous year.
    is_jan_or_feb = months < 3
    month_adj = tf.where(is_jan_or_feb, months + 12, months)
    year_adj = tf.where(is_jan_or_feb, years - 1, years)
    year_of_century = year_adj % 100
    century = year_adj // 100
    h = (days + (13 * (month_adj + 1)) // 5 + year_of_century + year_of_century // 4
         + century // 4 + 5 * century)
    # h % 7 gives 0 = Saturday ... 6 = Friday; shifting by 5 makes Monday = 0.
    day_of_week = (h % 7 + 5) % 7

    # Add the parsed components back to the dictionary
    return {**x, 'hour': hours, 'day_of_week': day_of_week}


class Preprocessing(tf.keras.layers.Layer):
    """Turn a dict of raw feature tensors into model-ready tensors.

    Each entry of ``features`` is either a bare column name (ignored here; it simply
    stays in the dict) or ``"<prep>-<column>"``. ``<prep>`` selects the layer:

    * ``cat``  -> StringLookup over ``VOCABULARY[column]``
    * ``int``  -> IntegerLookup over ``VOCABULARY[column]``
    * ``text`` -> empty-string replacement + TextVectorization adapted on ``ds``
    * ``disc`` -> Discretization with ``BUCKETS[column]`` as bin boundaries
    * ``norm`` -> Normalization adapted on ``ds``
    * ``seq``  -> StringLookup over the vocabulary of ``column`` without its ``seq_`` prefix

    ``call`` returns the input dict extended with one entry per ``"<prep>-<column>"`` key.
    """

    def __init__(self, name: str, features: list[str], ds: tf.data.Dataset):
        """
        Preprocessing layer.

        Args:
            name: Layer name.
            features: Feature specs, e.g. ``['time', 'cat-user_id', 'text-query_text']``.
            ds: Dataset of feature dicts used to adapt the ``text`` and ``norm`` layers.

        Raises:
            ValueError: If a feature spec uses an unknown ``<prep>`` prefix.
        """
        super().__init__(name=name)

        self.features = features
        self.prep_layers = {}
        self._adaptables: dict = {}

        # hour / day_of_week do not exist in the raw data; they are derived from 'time'.
        self.extract_time_features = None
        if "int-hour" in self.features or "int-day_of_week" in self.features:
            self.extract_time_features = tf.keras.layers.Lambda(extract_features, name= "extract_time_features")

        for feature in self.features:
            if '-' not in feature:
                continue

            # Split on the first '-' only, so column names may themselves contain '-'.
            prep, feat = feature.split("-", 1)
            if prep == "cat":
                self.prep_layers[feature] = tf.keras.layers.StringLookup(vocabulary=VOCABULARY[feat], mask_token=None, name=feature)

            elif prep == "int":
                self.prep_layers[feature] = tf.keras.layers.IntegerLookup(vocabulary=VOCABULARY[feat], mask_token=None, name=feature)

            elif prep == "text":
                # max_tokens must match the text embedding input size in Embeddings.
                text_layer = tf.keras.layers.TextVectorization(
                        max_tokens = 10_000,
                        output_mode="int",
                        output_sequence_length=20,
                        name=feature
                    )
                text_layer.adapt(ds.map(lambda x: x[feat]))
                self.prep_layers[feature] = tf.keras.Sequential([
                    tf.keras.layers.Lambda(replace_empty_string, name= f"text_null_{feature}"),
                    text_layer
                ], name=feature)

            elif prep == "disc":
                self.prep_layers[feature] = tf.keras.layers.Discretization(BUCKETS[feat].tolist(), name=feature)

            elif prep == "norm":
                norm_layer = tf.keras.layers.Normalization(axis = None, name=feature)
                norm_layer.adapt(ds.map(lambda x: x[feat]))
                self.prep_layers[feature] = norm_layer

            elif prep == "seq":
                # Sequences share the vocabulary of the underlying column (seq_product_id -> product_id).
                vocab = VOCABULARY[feat.replace("seq_", "")]
                self.prep_layers[feature] = tf.keras.layers.StringLookup(vocabulary=vocab, mask_token=None, name=feature)

            else:
                raise ValueError(f"Preprocessing type '{prep}' of feature '{feature}' is not supported.")

    def call(self, input: dict[str, tf.Tensor]) -> dict[str, tf.Tensor]:
        """Return ``input`` (plus time features if configured) extended with the preprocessed entries."""
        if self.extract_time_features is not None:
            input = self.extract_time_features(input)
        output = {feature: layer(input[feature.split("-", 1)[1]]) for feature, layer in self.prep_layers.items()}
        return {**input, **output}

# Embeddings

In [3]:
class Embeddings(tf.keras.Model):
    """Embed one tower's preprocessed features and concatenate them into a single vector.

    The tower is taken from ``name``: ``'QueryEmbeddings'`` reads
    ``params['FEATURES']['QUERY']`` and ``'ProductEmbeddings'`` reads
    ``params['FEATURES']['PRODUCT']``. The dict passed to ``call`` is keyed by the same
    ``"<prep>-<column>"`` strings (as produced by ``Preprocessing``). It may also carry
    the other tower's entries; only this tower's features are used.
    """

    def __init__(self, name: str, params: dict[str, Any]) -> None:
        super().__init__(name= name)

        l_name = name.replace("Embeddings", "").lower()
        # Embedding size per feature: int((log2(input_dim) + 1) * emb_weight)
        self.emb_weight = params['MODEL_PARAMS'].get('emb_weight', 1)
        self.features = params['FEATURES'][l_name.upper()]

        self._embeddings = {}
        norm_features = []
        # Width of the tensor returned by `call`.
        self.output_dim = 0

        for feature in self.features:
            if '-' not in feature:
                # Pass-through names such as 'time' or 'score' are not embedded.
                continue

            prep, feat = feature.split("-", 1)
            if prep in ["cat", "int"]:
                input_dim = len(VOCABULARY[feat])
                emb_dim, embedding = self.get_embedding(feature, input_dim)
                self._embeddings[feature] = embedding

            elif prep == "text":
                # Must match max_tokens of the TextVectorization layer in Preprocessing.
                input_dim = 10_000
                emb_dim, embedding = self.get_embedding(feature, input_dim)
                self._embeddings[feature] = tf.keras.Sequential([
                    embedding,
                    tf.keras.layers.GlobalAveragePooling1D(name= f"text_avg_{feature}"),
                ])

            elif prep == "disc":
                input_dim = len(BUCKETS[feat])
                emb_dim, embedding = self.get_embedding(feature, input_dim)
                self._embeddings[feature] = embedding

            elif prep == "seq":
                input_dim = len(VOCABULARY[feat.replace("seq_", "")])
                emb_dim, embedding = self.get_embedding(feature, input_dim)
                self._embeddings[feature] = tf.keras.Sequential([
                    embedding,
                    tf.keras.layers.GRU(emb_dim, name=f"seq_gru_{feature}"),
                ])

            elif prep == "norm":
                # Normalized scalars are concatenated as-is: one column each.
                norm_features.append(feature)
                self.output_dim += 1

        self._norm_features = norm_features

    def get_embedding(self, feature: str, input_dim: int) -> tuple[int, tf.keras.layers.Embedding]:
        """Create the Embedding layer for ``feature`` and add its width to ``output_dim``.

        The table has ``input_dim + 1`` rows. Index 0 is masked only for ``text`` features.

        Returns:
            ``(emb_dim, embedding_layer)``.
        """
        emb_dim = int((np.log2(input_dim) + 1) * self.emb_weight)
        self.output_dim += emb_dim
        return emb_dim, tf.keras.layers.Embedding(input_dim + 1, emb_dim, name=f"emb_{feature}", mask_zero=feature.split("-", 1)[0] == "text")

    def call(self, input: dict[str, tf.Tensor]) -> tf.Tensor:
        """Concatenate this tower's embeddings and normalized features along axis 1."""
        embedded = [self._embeddings[feature](input[feature]) for feature in self._embeddings.keys()]
        # Only this tower's normalized features: the input dict can also contain the
        # other tower's (e.g. product price in the clicks dict), which must not leak in.
        normalized = [tf.reshape(input[feature], (-1, 1)) for feature in self._norm_features]
        return tf.concat(embedded + normalized, axis=1)

# Deep Layers

In [4]:
class DeepLayers(tf.keras.Model):
    """Dense stack (optionally preceded by a DCN Cross layer) used for a tower or the rating head.

    The layer sizes come from ``model_params['<prefix>_layers']``. The prefix is
    ``name`` with "Tower"/"Model" removed and lower-cased ('QueryTower' -> 'query',
    'RatingModel' -> 'rating'). Hidden layers use ReLU followed by Dropout; the
    last layer is linear.
    """

    def __init__(self, name: str, model_params: dict[str, Any], emb_dim: int) -> None:
        """
        Args:
            name: Model name; also selects the layer-size key (see class docstring).
            model_params: Needs '<prefix>_layers', 'cross_layer' and, when there are
                hidden layers, 'dropout'.
            emb_dim: Input width; only used to size the Cross layer's projection (emb_dim // 4).

        Raises:
            ValueError: If the layer-size list is empty.
        """
        super().__init__(name= name)

        l_name = re.sub(r"Tower|Model", "", name).lower()
        deep_layers = model_params[f"{l_name}_layers"]
        if not deep_layers:
            raise ValueError(f"'{l_name}_layers' must contain at least one layer size.")
        self.deep_layers = deep_layers
        self.model = tf.keras.Sequential(name=f"{l_name}_deep_layers")

        if model_params["cross_layer"]:
            self.model.add(tfrs.layers.dcn.Cross(projection_dim=emb_dim // 4, kernel_initializer="glorot_uniform", name=f"cross_{l_name}"))

        # Hidden layers: ReLU, each followed by dropout for regularization.
        for i, layer_size in enumerate(deep_layers[:-1]):
            self.model.add(tf.keras.layers.Dense(layer_size, activation="relu", name=f"{l_name}_layer{i:02d}"))
            self.model.add(tf.keras.layers.Dropout(model_params["dropout"], name=f"{l_name}_dropout{i:02d}"))

        # No activation for the last layer.
        self.model.add(tf.keras.layers.Dense(deep_layers[-1], name=f"{l_name}_output_layer"))

    def call(self, input: tf.Tensor) -> tf.Tensor:
        return self.model(input)

# Model

In [5]:
class RecommenderEngineModel(tfrs.models.Model):
    """Multi-task recommender: query/product towers for retrieval plus a rating head.

    ``call`` returns ``(query_vector, product_vector, predicted_rating)``.
    ``compute_loss`` returns the sum of the Ranking loss (MSE against ``score``) and
    the Retrieval loss.
    """

    def __init__(self, params: dict[str, Any], preprocessing: bool = False, candidates: tf.data.Dataset = None) -> None:
        """
        Args:
            params: Hyperparameters; reads ``params['FEATURES']`` and ``params['MODEL_PARAMS']``.
            preprocessing: If True, raw feature dicts are passed through Preprocessing layers
                inside the model. Those layers are adapted on the notebook-level
                ``clicks_batched`` / ``products_batched``. If False, inputs must already
                be preprocessed.
            candidates: Dataset of product feature dicts scored by the FactorizedTopK metric
                (both callers here pass it batched with size 512).

        Raises:
            ValueError: If ``candidates`` is None.
            AssertionError: If the tower output sizes differ or the rating head does not end in 1 unit.
        """
        super().__init__()
        if candidates is None:
            raise ValueError("`candidates` is required to build the FactorizedTopK retrieval metric.")

        # If preprocessing is required, for serving purposes
        self.preprocessing = preprocessing
        if self.preprocessing:
            # Preprocessing adapts its text/norm layers in its constructor from the dataset
            # given as third argument; it has no separate `adapt` method.
            self.query_prep = Preprocessing("QueryPreprocessing", params['FEATURES']['QUERY'], clicks_batched)
            self.prods_prep = Preprocessing("ProductsPreprocessing", params['FEATURES']['PRODUCT'], products_batched)

        # Assertions
        assert params['MODEL_PARAMS']['query_layers'][-1] == params['MODEL_PARAMS']['product_layers'][-1], "Query and Product output layers must have the same dimension"
        assert params['MODEL_PARAMS']['rating_layers'][-1] == 1, "Rating output layer must have 1 unit"

        # QUERY
        self.query_embedding: Embeddings = Embeddings('QueryEmbeddings', params)
        self.query_model: DeepLayers = DeepLayers('QueryTower', params['MODEL_PARAMS'], self.query_embedding.output_dim)

        # PRODUCT
        self.product_embedding: Embeddings = Embeddings('ProductEmbeddings', params)
        self.product_model: DeepLayers = DeepLayers('ProductTower', params['MODEL_PARAMS'], self.product_embedding.output_dim)

        # RATING: consumes the concatenation of both towers' embeddings
        self.rating_model: DeepLayers = DeepLayers(
            'RatingModel',
            params['MODEL_PARAMS'],
            self.query_embedding.output_dim + self.product_embedding.output_dim
        )

        # TASKS
        # Retrieval Task: candidates are encoded with the same layers as the product tower.
        candidates_model = tf.keras.Sequential(name="product_candidates")
        if self.preprocessing:
            candidates_model.add(self.prods_prep)
        candidates_model.add(self.product_embedding)
        candidates_model.add(self.product_model)

        self.candidates_model = candidates_model
        self.retrieval_task: tf.keras.layers.Layer = tfrs.tasks.Retrieval(
            metrics=tfrs.metrics.FactorizedTopK(
                candidates.map(candidates_model)
            )
        )

        # Ranking Task
        self.rating_task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
            loss=tf.keras.losses.MeanSquaredError(),
            metrics=[tf.keras.metrics.RootMeanSquaredError()],
        )

    def call(self, input: dict[str, tf.Tensor]) -> tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        """Return ``(query_vector, product_vector, predicted_rating)`` for a batch of feature dicts."""
        if self.preprocessing:
            query_prep_data = self.query_prep(input)
            prods_prep_data = self.prods_prep(input)
            input = {**query_prep_data, **prods_prep_data}

        query_embedding = self.query_embedding(input)
        product_embedding = self.product_embedding(input)
        return (
            self.query_model(query_embedding),
            self.product_model(product_embedding),
            self.rating_model(tf.concat([query_embedding, product_embedding], axis=1))
        )

    def compute_loss(self, input: dict[str, tf.Tensor], training=False) -> tf.Tensor:
        """Sum of the rating (MSE on ``input['score']``) and retrieval losses.

        Metrics are only computed when ``training`` is False.
        """
        ratings = input["score"]
        # Separate the label without mutating the caller's dict.
        features = {key: value for key, value in input.items() if key != "score"}

        # Forward `training` so Dropout in the deep layers is active during fit;
        # without it the sub-layers run with Keras' default (inference) learning phase.
        query, product, rating = self(features, training=training)

        # We compute the loss for each task.
        rating_loss = self.rating_task(
            labels=ratings,
            predictions=rating,
            compute_metrics=not training
        )
        retrieval_loss = self.retrieval_task(query, product, compute_metrics=not training)

        return rating_loss + retrieval_loss

# Train Pipeline

In [6]:
# Custom Early Stopping
class CustomEarlyStopping(tf.keras.callbacks.Callback):
    """Stop training when neither metric has improved by more than its delta for `patience` epochs.

    An epoch counts as an improvement when the retrieval metric rises by more than
    ``delta_retrieval`` OR the rating metric falls by more than ``delta_rating``,
    relative to the best value recorded so far. Training stops after ``patience``
    consecutive epochs without any improvement. Epochs whose logs lack either metric
    (e.g. no validation data) are skipped, so in that case training is never stopped.

    Arguments:
        patience: Number of non-improving epochs tolerated before stopping.
        start_from_epoch: Epochs with a (0-based) index below this are ignored.
        delta_retrieval: Minimum increase of ``retrieval_metric`` that counts as an improvement.
        delta_rating: Minimum decrease of ``rating_metric`` that counts as an improvement.
        retrieval_metric: Log key of the retrieval metric (higher is better).
        rating_metric: Log key of the rating metric (lower is better).
    """

    def __init__(
            self, patience=0, start_from_epoch=0,
            delta_retrieval=0.01, delta_rating=0.01,
            retrieval_metric: str = "val_factorized_top_k/top_100_categorical_accuracy",
            rating_metric: str = "val_root_mean_squared_error"
        ):
        super().__init__()
        self.patience = patience
        self.start_from_epoch = start_from_epoch
        self.delta_retrieval = delta_retrieval
        self.delta_rating = delta_rating
        self.retrieval_metric = retrieval_metric
        self.rating_metric = rating_metric

    def on_train_begin(self, logs=None):
        self.wait = 0
        # Best values so far. `np.inf` (not the `np.Inf` alias, removed in NumPy 2.0).
        self.retrieval_metric_value = -np.inf
        self.rating_metric_value = np.inf

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        current_retrieval = logs.get(self.retrieval_metric)
        current_rating = logs.get(self.rating_metric)

        # Ignore warm-up epochs and epochs where either metric was not logged.
        if epoch < self.start_from_epoch or current_retrieval is None or current_rating is None:
            return

        improved = False
        # Retrieval accuracy: higher is better.
        if current_retrieval - self.retrieval_metric_value > self.delta_retrieval:
            self.retrieval_metric_value = current_retrieval
            improved = True
        # Rating RMSE: lower is better.
        if self.rating_metric_value - current_rating > self.delta_rating:
            self.rating_metric_value = current_rating
            improved = True

        if improved:
            self.wait = 0
        else:
            self.wait += 1
            if self.wait >= self.patience:
                self.model.stop_training = True


def train_model(params: dict, verbose: int= 1):
    """Build, compile and fit a RecommenderEngineModel with in-model preprocessing.

    Trains on the notebook-level ``clicks`` dataset (shuffled train/validation split)
    and uses the notebook-level ``products`` dataset as retrieval candidates.

    Args:
        params: Run configuration. Keys read here:
            ``LOGDIR`` (default "logs"), ``EARLY_STOPPING`` (CustomEarlyStopping kwargs),
            ``LEARNING_RATE`` (ExponentialDecay kwargs),
            ``MODEL_PARAMS['optimizer']`` ('Adagrad' or 'Adam', default 'Adagrad'),
            ``BATCH_SIZE`` (default 1024), ``MAX_EPOCHS`` (default 10),
            ``TRAIN_SIZE`` (train fraction, default 0.8), ``SEED`` (split seed, default 42).
        verbose: Keras ``fit`` verbosity.

    Returns:
        The fitted model.

    Raises:
        ValueError: If ``MODEL_PARAMS['optimizer']`` names an unsupported optimizer.
    """
    # Validate the optimizer before the (expensive) model construction and adapt calls.
    optimizers = {"Adagrad": tf.keras.optimizers.Adagrad, "Adam": tf.keras.optimizers.Adam}
    optimizer_name = params["MODEL_PARAMS"].get("optimizer", "Adagrad")
    if optimizer_name not in optimizers:
        raise ValueError(f"Unsupported optimizer '{optimizer_name}'; expected one of {sorted(optimizers)}.")

    logdir = params.get("LOGDIR", "logs")

    # Record the run configuration as a TensorBoard text summary.
    model_metadata = json.dumps(params, indent=4)
    file_writer = tf.summary.create_file_writer(logdir)
    with file_writer.as_default():
        tf.summary.text(f"Parameters for {logdir}:", f"```\n{model_metadata}\n```", step=0)
    file_writer.close()  # flush and release the event file now

    # Setup Early Stopping and TensorBoard callbacks
    callbacks = [
        tf.keras.callbacks.TensorBoard(logdir),
        CustomEarlyStopping(**params['EARLY_STOPPING']),
    ]

    # Learning Rate Scheduler
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(**params['LEARNING_RATE'])

    model = RecommenderEngineModel(params, candidates=products.batch(512).cache().prefetch(tf.data.AUTOTUNE), preprocessing=True)
    model.compile(optimizer=optimizers[optimizer_name](learning_rate=lr_schedule))

    # Split train/test set
    n = clicks_df.shape[0]
    train_samples = int(params.get("TRAIN_SIZE", 0.8) * n)

    # Shuffle once with an explicit seed: reshuffle_each_iteration=False keeps the order
    # fixed across iterations, so take/skip give disjoint train/test sets that are
    # also identical across calls.
    shuffle_ds = clicks.shuffle(n, seed=params.get("SEED", 42), reshuffle_each_iteration=False)
    train = shuffle_ds.take(train_samples)
    test = shuffle_ds.skip(train_samples)

    # Batch, cache and prefetch; validation batch size is fixed at 512.
    cached_train = train.batch(params.get("BATCH_SIZE", 1024)).cache().prefetch(tf.data.AUTOTUNE)
    cached_test = test.batch(512).cache().prefetch(tf.data.AUTOTUNE)

    # Fit the model
    model.fit(
        cached_train,
        epochs=params.get("MAX_EPOCHS", 10),
        validation_data=cached_test,
        callbacks=callbacks,
        verbose=verbose
    )

    return model

# Training

In [7]:
# Select features with transformation prefix separated by '-' ("<prep>-<column>").
# cat -> StringLookup, int -> IntegerLookup, text -> TextVectorization, seq -> StringLookup
# over a sequence column, norm -> Normalization, disc -> Discretization.
# Bare names (e.g. 'time', 'score') are not transformed; they only travel with the inputs.
# List order matters: it fixes the order in which embeddings are concatenated.
params = {
    "LOGDIR": "logs/model_1",
    "FEATURES": {
        'QUERY': [
            'time',
            'int-hour',
            'int-day_of_week',            
            'cat-user_id',
            'cat-channel',
            'cat-device_type',
            'text-query_text',
            'seq-seq_product_id',
            'seq-seq_category_name',
            'score',
            ],
        'PRODUCT': [
            'cat-product_id',
            'cat-category_name',
            'cat-merchant_name',
            'cat-merchant_city',
            'cat-merchant_state',
            'cat-merchant_region',
            'int-free_shipping',
            'int-is_sold_out',
            'int-editor_pick',
            'int-on_sale',
            'text-product_name',
            'disc-price_in_cents',
            'norm-price_in_cents',
            'disc-reviews',
            'norm-reviews',
        ]
    },
    "BATCH_SIZE": 64,
    "EARLY_STOPPING": {
        "patience": 3,
        "start_from_epoch": 10,
        "delta_retrieval": 0.001, 
        "delta_rating": 0.001, 
        "retrieval_metric": "val_factorized_top_k/top_100_categorical_accuracy",
        "rating_metric": "val_root_mean_squared_error"
    },
    "MAX_EPOCHS": 300,
    "LEARNING_RATE": {
        "initial_learning_rate": 0.1, # lr = 0.1 * 0.77 ** (step // 200); reaches ~0.001 after ~18 decays (~3,600 steps)
        "decay_steps": 200, # optimizer steps between decays (not tied to epochs; the run logged below shows 267 steps/epoch at BATCH_SIZE 64)
        "decay_rate": 0.77,
        "staircase": True
    },
    "MODEL_PARAMS": {
        "emb_weight": 8, # embedding weight shared among all features where emb_size = int((np.log2(input_dim) + 1) * emb_weight)
        "query_layers": [32], # last layer has to be equal between query and product
        "product_layers": [32], 
        "rating_layers": [256, 128, 1], # last layer needs to have 1 unit for regression task
        "dropout": 0.1,
        "cross_layer": False,
        "optimizer": "Adagrad", # 'Adagrad' or 'Adam'
    }
}

In [9]:
# NOTE(review): mutates the shared `params` dict, and the later manual-training cell writes
# to the same LOGDIR, so both runs end up mixed in one TensorBoard directory.
# NOTE(review): this cell ran as In [9], before the current Preprocessing definition (In [63]);
# confirm it still works after Restart Kernel -> Run All.
params['LOGDIR'] = "logs/new_model"

mod = train_model(params, verbose=1)

Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
Epoch 8/300
Epoch 9/300
Epoch 10/300
Epoch 11/300
Epoch 12/300
Epoch 13/300
Epoch 14/300


In [85]:
# Alternative pipeline: run preprocessing once inside tf.data (outside the model) and build
# the model with preprocessing=False, so it receives already-preprocessed feature dicts.
# NOTE(review): largely duplicates train_model; consider a `preprocessing` flag there instead.
query_prep = Preprocessing(name="QueryPreprocessing", features=params['FEATURES']['QUERY'], ds=clicks_batched)
product_prep = Preprocessing(name="ProductsPreprocessing", features=params['FEATURES']['PRODUCT'], ds=products_batched)
# Click elements carry both the query and the product preprocessed entries.
prep_clicks = clicks_batched.map(query_prep).map(product_prep).unbatch()
prep_products = products_batched.map(product_prep).unbatch()

# NOTE(review): same LOGDIR as the earlier train_model run.
params['LOGDIR'] = "logs/new_model"
file_writer = tf.summary.create_file_writer(params['LOGDIR'])

# Write the model parameters metadata to the log directory
model_metadata = json.dumps(params, indent=4)

# NOTE(review): the summary tag contains a typo ("f or"), and file_writer is never closed.
with file_writer.as_default():
    tf.summary.text(f"Parameters f or {params['LOGDIR']}:", f"```\n{model_metadata}\n```", step=0)

# Setup Early Stopping and TensorBoard callbacks
callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=params['LOGDIR']),#, profile_batch= (20,25)),
    CustomEarlyStopping(**params['EARLY_STOPPING']),
]
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(**params['LEARNING_RATE'])   

new_model = RecommenderEngineModel(params, candidates=prep_products.batch(512).cache().prefetch(tf.data.AUTOTUNE))

# NOTE(review): Adagrad is hard-coded here; params['MODEL_PARAMS']['optimizer'] is ignored.
optimizer = tf.keras.optimizers.Adagrad(learning_rate=lr_schedule)
new_model.compile(optimizer=optimizer)

# Split train/test set
n = clicks_df.shape[0]
train_size = 0.8
train_samples = int(train_size * n)

# Split train/test
# Shuffled once (order fixed across iterations), so take/skip give disjoint sets.
# The shuffle op seed is derived from the global seed only.
shuffle_ds = prep_clicks.shuffle(n, reshuffle_each_iteration=False)
train = shuffle_ds.take(train_samples)
test = shuffle_ds.skip(train_samples)
# Repeat, batch, cache and prefetch
# repeat() makes both datasets infinite, so fit needs explicit steps_per_epoch / validation_steps.
cached_train = train.cache().repeat().batch(params.get("BATCH_SIZE", 1024)).prefetch(tf.data.AUTOTUNE)
cached_test = test.cache().repeat().batch(512).prefetch(tf.data.AUTOTUNE) # In inference mode I will fix the batch size to 512

In [88]:
# Fit the model
# Calculate steps per epoch
# Floor division: an "epoch" covers train_samples // BATCH_SIZE full batches, and validation
# evaluates (n - train_samples) // 512 batches, skipping the last partial batch.
steps_per_epoch = train_samples // params.get("BATCH_SIZE", 1024)
validation_steps = (n - train_samples) // 512

new_model.fit(
    cached_train,
    epochs=params.get("MAX_EPOCHS", 10),
    validation_data=cached_test,
    callbacks=callbacks,
    verbose=1,
    steps_per_epoch=steps_per_epoch,
    validation_steps=validation_steps
)

Epoch 1/300
Epoch 2/300
Epoch 3/300
Epoch 4/300
Epoch 5/300
Epoch 6/300
Epoch 7/300
 31/267 [==>...........................] - ETA: 7s - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - root_mean_squared_error: 0.0000e+00 - loss: 8.3394 - regularization_loss: 0.0000e+00 - total_loss: 8.3394

KeyboardInterrupt: 