ver 6.0 
Supervised contrastive

# Enviroment setup

In [1]:
!pip install tensorflow-addons
# !pip install recommenders
!pip install "dask[dataframe]" --upgrade

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Libs

In [2]:
import pandas as pd
import numpy as np
import random
from matplotlib import pyplot as plt
import gc
import math
import datetime, time
from joblib import Parallel, delayed
from tqdm import tqdm

import tensorflow as tf
from tensorflow.keras import layers
import tensorflow.keras.backend as K
import tensorflow_addons as tfa
from tensorflow_addons.losses import TripletSemiHardLoss 
from sklearn.preprocessing import LabelEncoder

from sklearn.model_selection import train_test_split

from sklearn.decomposition import PCA
import seaborn as sns
from sklearn.manifold import TSNE

import dask.dataframe as dd

# from recommenders.datasets.python_splitters import python_random_split
# from recommenders.evaluation.python_evaluation import map_at_k, ndcg_at_k, precision_at_k, recall_at_k
# from recommenders.models.cornac.cornac_utils import predict_ranking

In [3]:
tf.__version__

'2.8.2'

In [4]:
itemCol = 'movieId'
userCol = 'userId'

In [5]:
# DGX setup
# fpath = "./ml-20m"

#colab setup
from google.colab import drive
drive.mount('/content/gdrive')
fpath = "/content/gdrive/MyDrive/RECOMMENDER_STUDIES/data/ml-20m"

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [6]:
# Loading movie
movies = pd.read_csv(fpath+'/movies.csv')
movies["year"]=movies["title"].apply(lambda x: x[-5:-1])
movies["genres"] = movies["genres"].apply(lambda x: ' ' if x == '(no genres listed)' else ' '.join(x.split('|')) )
movies["title"]= movies["title"].apply(lambda x: x[0:-7])
movies.head()

Unnamed: 0,movieId,title,genres,year
0,1,Toy Story,Adventure Animation Children Comedy Fantasy,1995
1,2,Jumanji,Adventure Children Fantasy,1995
2,3,Grumpier Old Men,Comedy Romance,1995
3,4,Waiting to Exhale,Comedy Drama Romance,1995
4,5,Father of the Bride Part II,Comedy,1995


In [7]:
# user-wise train-test split
def user_wise_split(userCol, test_size=0.25):
    df = pd.read_csv(fpath+'/ratings.csv')
    df["y"] = df["rating"]/2.5-1
    all_user = df[userCol].drop_duplicates()
    train_user, test_user = train_test_split(all_user, test_size=test_size)

    train_ratings = df[df[userCol].isin(train_user)]
    test_ratings = df[df[userCol].isin(test_user)]
    return train_ratings, test_ratings

train, test= user_wise_split(userCol,test_size= 0.35)

In [8]:
top_k_item = 20000
wu_size = 200
max_item = wu_size

top_items = train.groupby(itemCol).count().sort_values(by=userCol, ascending=False).head(top_k_item).index

# Class  model

In [9]:
class TimeTrachker():
    """Tracking runing time"""
    def start(self):
        self.start_time = time.time()

    def check(self, des = ''):
        try:
            if self.check_time is None:
                self.check_time = self.start_time
        except:
            self.check_time = self.start_time
        self.end_time = time.time()
        dur = self.end_time - self.check_time
        print(des + " duration: ", dur)

        self.check_time = time.time()
        return dur

    def stop(self, des = ''):
        self.end_time = time.time()
        dur = self.end_time - self.start_time
        print(des + " duration: ", dur)
        self.start_time = time.time()
        self.check_time = None
        return dur

timer = TimeTrachker()

In [10]:
def get_interaction_set(interaction, max_item = None, top_k_item = None):
    """
    Input:
        interaction: df[userCol, itemCol, y]: dữ liệu đầu vào
        max_item: int: item num limit
    Output:
        df, itemCol: list, y: list, itemCol_str: string, userCol as index
        list item sắp xếp theo giảm dần độ lớn rating
    """
    items = interaction.groupby(itemCol).count().sort_values(by=userCol, ascending=False)
    if top_k_item is not None:
        top_items = items.head(top_k_item).index
        interaction = interaction[interaction[itemCol].isin(top_items)]
    else:
        top_items = items.index

    # Sắp xếp item theo thứ tự giảm dần rating (về sau cắt padding sẽ ưu tiên giữ lại item có rating cao)
    rindex = interaction.groupby(userCol)["y"].transform(lambda grp: grp.sort_values(ascending=False).index)
    interaction = interaction.reindex(rindex)
    
    # Chuyển thành warm-up set theo từng user
    interaction = interaction.groupby("userId").agg({itemCol:list, "y":list})

    # Giới hạn độ dài warm_up size
    if max_item is not None:
        interaction[itemCol] = interaction[itemCol].apply(lambda x: x[0:max_item])
        interaction["y"] = interaction["y"].apply(lambda x: x[0:max_item])

    return interaction, top_items

In [11]:
# Bulding model
class Efficient_Rec(tf.keras.Model):
    def __init__(self, encoder, wu_size= 200, use_tf_function=False):
        super().__init__()
        self.use_tf_function = use_tf_function
        self.encoder = encoder
        self.wu_size = wu_size

    # from v2.2: chỉ groupby, không padding
    @staticmethod
    def get_interaction_set(interaction, max_item = None, top_k_item = None):
        """
        Input:
            interaction: df[userCol, itemCol, y]: dữ liệu đầu vào
            max_item: int: item num limit
        Output:
            df, itemCol: list, y: list, itemCol_str: string, userCol as index
            list item sắp xếp theo giảm dần độ lớn rating
        """
        items = interaction.groupby(itemCol).count().sort_values(by=userCol, ascending=False)
        if top_k_item is not None:
            top_items = items.head(top_k_item).index
            interaction = interaction[interaction[itemCol].isin(top_items)]
        else:
            top_items = items.index

        # Sắp xếp item theo thứ tự giảm dần rating (về sau cắt padding sẽ ưu tiên giữ lại item có rating cao)
        rindex = interaction.groupby(userCol)["y"].transform(lambda grp: grp.sort_values(ascending=False).index)
        interaction = interaction.reindex(rindex)
        
        # Chuyển thành warm-up set theo từng user
        interaction = interaction.groupby("userId").agg({itemCol:list, "y":list})

        # Giới hạn độ dài warm_up size
        if max_item is not None:
            interaction[itemCol] = interaction[itemCol].apply(lambda x: x[0:max_item])
            interaction["y"] = interaction["y"].apply(lambda x: x[0:max_item])

        return interaction, top_items

    def _preprocess(self, inputs, padding_size = 100):
        """
        Padding về wu_size và mask_size, convert list of items => string of items
        batch_inputs: df: itemStr, y"""

        def padding_list(list_item, wu_size, value=0, is_padding=True):
            series_item1 = list_item[0:wu_size]
            if is_padding:
                series_item1 = series_item1+[value]*(wu_size-len(series_item1))
            return series_item1

        items_list, ratings_list = inputs

        items   = items_list.apply(lambda x: ' '.join(list([str(i) for i in x])))
        ratings =   np.stack( ratings_list.apply(lambda x: padding_list( x, padding_size  ) ) )

        return items, ratings


    @staticmethod
    def get_top_cluster(scores, interaction_list, cluster_num=5):
        interaction_list_ = interaction_list.copy()
        idx = np.argsort(-scores.transpose(),axis=0)[:cluster_num]
        interaction_list_["clusters"] = [list(i) for i in idx.transpose()]
        interaction_list_["scores"] = [ list(scores[i][ind]) for i, ind in enumerate(idx.transpose()) ]
        return interaction_list_

    def minibatch_clustering(self, interaction_list, batch_size= 512):
        chunks = [interaction_list[i:i+batch_size] for i in range(0,interaction_list.shape[0],batch_size)]
        preds = []
        for chunk in chunks:
            pred = self.encoder(self._preprocess( [chunk[itemCol], chunk["y"]], padding_size = wu_size )).numpy()
            preds.append( pred )

        return np.concatenate(preds)

    @staticmethod
    def chunk_explode(ratings, batch_size = 1024**2):
        chunks = [ratings[i:i+batch_size] for i in range(0,ratings.shape[0],batch_size)]
        explodes = []

        # Todo: convert for loop to parallel
        def chunk_process(chunk):
            explode = chunk.explode(["clusters", "scores"])
            explode["contribute_score"] = explode["scores"].astype("float64")*explode["y"]
            explode = explode.groupby(["clusters", "movieId"]).agg({
                "contribute_score": ["mean", "count"]
            }).reset_index()
            explode.columns = ["clusters", "movieId", "mean", "count"]

            return explode

        explodes = Parallel(n_jobs = -1, verbose = 1)(
                    delayed(chunk_process)(chunk) for chunk in tqdm(chunks))
        # combine results
        gr = pd.concat(explodes, axis = 0)
        gr["product"] = gr["mean"]*gr["count"]
        gr = gr.groupby(["clusters", "movieId"]).sum().reset_index()
        gr["contribute_score"] = gr["product"]/gr["count"]
        gr = gr[["clusters", "movieId", "contribute_score"]]
        return gr

    def get_shortlist(self, ratings, interaction_list= None, limit = 500, cluster_num = 5, batch_size=512):
        timer.start()
        if interaction_list is None:
            interaction_list_, _ = self.get_interaction_set(  ratings, max_item = max_item, top_k_item = top_k_item  )
        else:
            interaction_list_ = interaction_list.copy()

        scores = self.minibatch_clustering(interaction_list_, batch_size=batch_size)

        # Limit number of cluster for each user
        interaction_list_ = self.get_top_cluster(scores, interaction_list_, cluster_num= cluster_num)
        timer.check(des = "Get cluster")

        # Get shortlist for each cluster
        ratings_ = ratings.copy().set_index("userId")
        ratings_ = ratings_[ratings_["y"]>0]
        ratings_ = ratings_.join(interaction_list_, rsuffix="_l", how = "inner")
        timer.check(des = "Join interaction")
        ratings_ = self.chunk_explode(ratings_, batch_size= 1024*200)
        timer.check(des = "Chunk explode")

        ratings_["rank"] = ratings_.groupby("clusters")["contribute_score"].rank(method='first', ascending=False)
        ratings_ = ratings_[(ratings_["rank"] <= limit)&(ratings_["contribute_score"]>0)]
        timer.check(des = "Chunk explode")

        self.shortlist = ratings_

        timer.stop(des = "Total")

    @staticmethod
    def left_anti_user_item_join( left, right ):
        """Fast left anijoin 2 dataframe by one column"""
        wu_key = left[userCol].astype('str')+"&"+left[itemCol].astype('str')
        blacklist = right[userCol].astype('str')+"&"+right[itemCol].astype('str')

        key_diff = set(wu_key).difference(blacklist)
        where_diff = wu_key.isin(key_diff)
        output = left[where_diff]

        return output


    def batch_get_recommend(self, warm_up= None, historical_ratings = None, top_k = 10, is_remove_interacted = True, 
        batch_size=1024, reduce_method="random", using_rapids= False, cluster_num= 5):
        """
        reduce_method: str, 'random' or 'mean'
        """
        print("historical_ratings shape ", historical_ratings.shape)
        timer.start()
        if warm_up is None:
            warm_up, _ = self.get_interaction_set( 
                         historical_ratings
                        , max_item = max_item
                        , top_k_item = top_k_item )

        scores = self.minibatch_clustering(warm_up, batch_size=512)
        wu = self.get_top_cluster( scores, warm_up, cluster_num= cluster_num)

        wu = wu.explode(["clusters", "scores"]).reset_index()[[userCol, "clusters", "scores"]]
        timer.check(des = "Get cluster for user")
        print("wu shape ", wu.shape)
        
        user_num = wu[userCol].drop_duplicates().shape[0]
        print("user_num ", user_num)

        # cudf.from_pandas
        if using_rapids:
            wu = dd.from_pandas(wu)
            historical_ratings = dd.from_pandas(historical_ratings.copy())

        chunks = [wu[wu[userCol].isin(range(i,i+batch_size))] 
                  for i in range(0,user_num,batch_size)]

        his_chunks = [historical_ratings[historical_ratings[userCol].isin(range(i,i+batch_size))] 
                        for i in range(0,user_num,batch_size)]
       
        shortlist = self.shortlist

        def batch_join_process(chunk, his_chunk):
            timer2 = TimeTrachker()
            timer2.start()

            wu = chunk.merge(shortlist, on="clusters", how='inner')
            print('''wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape: ''', wu.shape)
            wu["matched_score"] = wu["scores"]*wu["contribute_score"]

            if reduce_method=="mean":
                wu = wu.groupby([userCol, itemCol]).agg({"matched_score":'mean'}).reset_index()
            
            if reduce_method=="random":
                # using drop_duplicates for boost speed, may reduce accuracy.
                wu = wu.drop_duplicates(subset=["userId", "movieId"])

            print('''after reduce wu shape: ''', wu.shape)
            timer2.check(des = "reduce")

            if is_remove_interacted:
                wu = self.left_anti_user_item_join( wu, historical_ratings )

            print('''after remove interacted wu shape: ''', wu.shape)
            timer2.check(des = "remove interacted item")

            wu["rank"] = wu.groupby(userCol)["matched_score"].rank(method='first', ascending=False)
            wu = wu[wu["rank"]<= top_k]

            timer2.stop(des = "all chunk processing time")

            return wu 
        
        timer.check(des = "Prepare to join")
        wus = Parallel(n_jobs = -1, backend= 'threading', verbose = 1)(
                    delayed(batch_join_process)(chunk, his_chunk) for chunk, his_chunk in tqdm(zip(chunks, his_chunks)))
        
        gc.collect()

        timer.check(des = "Join")

        output = pd.concat(wus, axis=0)

        timer.stop(des = "Total")
        return output

    def release_cache(self):
        self.interaction_list = None 
        gc.collect()

# Xây dựng encoder model
Encoder =  interaction embedding + user feature embedding<br> 
interaction embedding = sum( interaction embedding các item i)<br> 
interaction embedding item i = rating x (embedding id sản phẩm + embedding item feature)<br>



In [12]:
%%time
# Vectorize (encode + padding) item list
max_vocab_size = len(top_items) # nếu số item có <= top_k_item => lấy số lượng item max
items_str = ' '.join([str(i) for i in top_items])
itemStr = itemCol+"_str"

vectorizer = layers.TextVectorization( max_tokens= top_k_item, split='whitespace', output_sequence_length= wu_size, name = 'vectorizer')
vectorizer.adapt( [items_str] ) 

CPU times: user 866 ms, sys: 570 ms, total: 1.44 s
Wall time: 5.22 s


In [13]:
class Broadcasting_Multiply(tf.keras.layers.Layer):
    """Nhân 2 layers khác shape với nhau, trong đó:
    inputs=[layer1, layer2]
    layer1.shape = (None, n_item, n_feature)
    layer2.shape = (None, n_item)
    (Chú ý đúng thứ tự)
    """

    def call(self, inputs):
        x, y = inputs
        deno = tf.expand_dims(tf.cast(tf.math.count_nonzero(y, axis=1), tf.float32), -1)
        #we add the extra dimension:
        y = K.expand_dims(y, axis=-1)
        #we replicate the elements
        y = K.repeat_elements(y, rep=x.shape[2], axis=-1)

        return x * y, deno

In [14]:
# Xây dựng mạng
embedding_size = 173
reps_size = 132
preference_vector_size = 43

@tf.function
def avg_layer(z):
    t = K.sum(z[0], axis=1)/z[1]
    t = tf.clip_by_value( t, -1, 1 )
    t = tf.where(tf.math.is_nan(t), tf.zeros_like(t), t)
    return t


def interaction_embedding():

    input_wi = layers.Input(shape=(1,), name='input_wi')
    wi = vectorizer(input_wi)
    wi = layers.Embedding(input_dim= max_vocab_size, output_dim= embedding_size, mask_zero= True, name='ei')(wi)
    # wi = layers.Dense(embedding_size, activation='sigmoid', use_bias = False, name='di')(wi)
    wi = layers.Dense(embedding_size, activation='relu', use_bias = False, name='di1')(wi)
    wi = layers.Dense(embedding_size, activation='relu', use_bias = False, name='di2')(wi)
    # wi = layers.Dense(embedding_size, activation='sigmoid', use_bias = False, name='di3')(wi)

    wr = layers.Input(shape=(wu_size,), name='warm_up_ratings')

    ireps = Broadcasting_Multiply(name='mul')([wi, wr])
    uprofile = layers.Lambda(lambda z: avg_layer(z) )(ireps)

    uprofile = layers.Dense( reps_size, activation='relu', name='du1')(uprofile)
    uprofile = layers.Dense( reps_size, activation='relu', name='du2')(uprofile)
    # uprofile = layers.Dense( reps_size, activation='relu', name='du3')(uprofile)
    # uprofile = layers.BatchNormalization(name='norm')(uprofile)
    uprofile = layers.LayerNormalization(name='norm')(uprofile)
    # uprofile = layers.Dense( reps_size, activation='relu', name='du4')(uprofile)
    uprofile = layers.Dense(preference_vector_size, activation='sigmoid', name='clustering')(uprofile)
    
    
    model = tf.keras.Model(inputs= [input_wi, wr], outputs=[uprofile])
    return model

In [15]:
# # Example of layer interaction embedding step by step
# input_wi = ["15 25 65 20 84",  # 5 items
#             "51 54 45 21 24 83 81 76 74 75 72 48 29 38",# 14 items
#             " ", # 0 item
#             ] 

# tvectorizer = layers.TextVectorization( max_tokens= 17, split='whitespace', output_sequence_length= 10)
# tvectorizer.adapt( input_wi ) 

# wi = tvectorizer(input_wi)
# print("afer TextVectorization layer \n",wi)
# wi = layers.Embedding(input_dim= 17, output_dim= 4, mask_zero= True, name='ei')(wi)
# print("afer Embedding layer \n",wi)
# wi = layers.Dense(3, activation='sigmoid',  use_bias = False, name='di')(wi)

# wr = np.array([[0.5, 0.1, -0.5, 1, 0.25, 0, 0, 0, 0, 0], [0.25, 0.15, 0.5, 1, 0.25, 0.5, 0.1, -0.9, 0.4, -0.3], [0,0,0,0,0,0,0,0,0,0]])

# ireps = Broadcasting_Multiply(name='mul')([wi, wr])
# print("afer Multiply layer \n",ireps)
# uprofile = layers.Lambda(lambda z: avg_layer(z) )(ireps)
# print("afer Average layer \n",uprofile)

# uprofile = layers.Dense( reps_size, activation='relu', name='du2')(uprofile)
# uprofile = layers.LayerNormalization(name='norm')(uprofile)
# uprofile = layers.Dense(5, activation='sigmoid', name='clustering')(uprofile)
# print("afer Sigmoid layer \n",uprofile)

In [16]:
# Kiểm tra tham số
# interaction_embedding().summary()

In [17]:
# tf.keras.utils.plot_model( interaction_embedding() ,show_shapes=True, show_dtype=True, show_layer_names=True )

# Evaluate model results

In [18]:
def model_evaluate(model, movies, df):
    dfu, ttop_items = get_interaction_set( df
                    , max_item = max_item
                    , top_k_item = top_k_item )
    group_scores = model.encoder(model._preprocess( [dfu[itemCol], dfu["y"]], padding_size = wu_size )).numpy()

    print("SAMPLE INTERACTION EMBEDDING")
    print( np.max(group_scores), np.mean(group_scores), np.min(group_scores) )
    print( group_scores[0:3] )

    print("FEATURE PLOT")
    feature_plot( movies, df, group_scores)
    
    print("CLUSTER CHECKING")
    check_cluster(group_scores)
    
    print("SPECTROGRAM PLOT")
    plot_spectrogram(group_scores)

def get_label(movies, df, is_encode = False):
    movies["genres_list"] = movies["genres"].apply(lambda x: x.split(' '))
    movie_genres = movies.explode("genres_list")
    gr = df.merge(movie_genres, on="movieId").groupby(["userId", "genres_list"])["movieId"].count().reset_index()
    gr["rank"] = gr.groupby("userId")["movieId"].rank(method='first', ascending=False)

    labels = gr[gr["rank"] ==1].set_index("userId")
    # labels["pred_max_ind"] = np.argmax(group_scores, axis=1)

    if is_encode:
        label_enc = LabelEncoder()
        labels["label"] = label_enc.fit_transform(labels["genres_list"])

    return labels

def feature_plot( movies, df, group_scores ):
    tlabels = get_label(movies, df)

    tsne = PCA(n_components=2, random_state=123)
    # tsne = TSNE(n_components=2, random_state=123)
    z = tsne.fit_transform(group_scores) 

    df = pd.DataFrame()
    df["y"] = tlabels["genres_list"]
    df["comp-1"] = z[:,0]
    df["comp-2"] = z[:,1]

    plt.rcParams["figure.figsize"] = (8,8)
    sns.scatterplot(x="comp-1", y="comp-2", hue=df.y.tolist(),
                    palette="Paired" ,#sns.color_palette("hls", 3),
                    data=df)#.set(title="Iris data T-SNE projection") 
    plt.show()

def check_cluster(group_scores):
    # Kiểm tra số user trong mỗi cụm có bị vón cục
    ugs= np.argmax(group_scores, axis=1)
    for i in range(50):
        print(i,': ', np.sum(ugs==i) )

def plot_spectrogram(group_scores):
    # Sort theo user_group + draw sigmoid/softmax layer
    plt.rcParams["figure.figsize"] = (10,10)
    k =100
    a = group_scores
    ind = np.argmax(group_scores, axis=1)
    plt.imshow( a[np.argsort(ind)][0:k] )
    plt.show()


In [19]:
def model_plot(model, movies, df):
    dfu, ttop_items = get_interaction_set( df
                    , max_item = max_item
                    , top_k_item = top_k_item )
    group_scores = model.encoder(model._preprocess( [dfu[itemCol], dfu["y"]], padding_size = wu_size )).numpy()
    print("FEATURE PLOT")
    feature_plot( movies, df, group_scores)
    

# Warm start user

In [20]:
%%time
u_train_from = 0
u_train_to = u_train_from + 130000
u_test = u_train_to + 5000

def get_labeled_data(df):
    interact_df, _ = get_interaction_set( 
                     df
                    , max_item = max_item
                    , top_k_item = top_k_item )
    labels = get_label( movies, df, is_encode = True)
    interact_df["label"] = labels["label"]
    return interact_df[["movieId","y","label"]]
try:
    train_warm
except:
    # if exists, do not rerun
    train_warm =  get_labeled_data( train[(train[userCol]>u_train_from)&(train[userCol]<u_train_to)] )
    train_warm["rating_num"] = train_warm.apply(lambda x: len(x["y"]), axis=1)
    # pretrain with warm start user
    train_warm = train_warm[train_warm["rating_num"]>=100]

print( train_warm.shape )

(32154, 4)
CPU times: user 36.9 s, sys: 1.76 s, total: 38.7 s
Wall time: 42.8 s


In [21]:
# x = train[["userId", "rating"]].groupby("userId").count()
# x["rating_num_clip"] = x["rating"].clip(0, 700).apply(lambda x: int(x/20)*20)
# x.groupby("rating_num_clip").count().plot()

In [22]:
# x.groupby("rating_num_clip").count()

In [23]:
gc.collect()

150

# supervised constrastive 

In [24]:
class SupervisedContrastiveLoss(tf.keras.losses.Loss):
    def __init__(self, temperature=1, name=None):
        super(SupervisedContrastiveLoss, self).__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        # Normalize feature vectors
        feature_vectors_normalized = tf.math.l2_normalize(feature_vectors, axis=1)
        # Compute logits
        logits = tf.divide(
            tf.matmul(
                feature_vectors_normalized, tf.transpose(feature_vectors_normalized)
            ),
            self.temperature,
        )
        return tfa.losses.npairs_loss(tf.squeeze(labels), logits)

In [25]:
# Thực hiện training
def _supervised_constrastive_train_step(self, inputs):
    items_pd, ratings_pd, labels = inputs["movieId"], inputs["y"], inputs["label"]
    items, ratings = self._preprocess((items_pd, ratings_pd), wu_size)

    with tf.GradientTape() as tape:
        # Interaction embedding
        vec = self.encoder([items, ratings])

        average_loss = self.loss(labels, vec)

    # Apply an optimization step
    variables = self.trainable_variables 
    gradients = tape.gradient(average_loss, variables)
    
    # Gradient clipping
    # gradients = [None if gradient is None else tf.clip_by_value(gradient, -0.1, 0.1)
    #              for gradient in gradients]
    self.optimizer.apply_gradients(zip(gradients, variables))

    # Return a dict mapping metric names to current value
    return {'batch_loss': average_loss}

Efficient_Rec._supervised_constrastive_train_step = _supervised_constrastive_train_step

In [26]:
# Thực hiện minibatch training
def _spv_constrastive_train_minibatch_step(self, inputs, batch_size):
    df = inputs.copy()
    chunks = [df[i:i+batch_size] for i in range(0,df.shape[0],batch_size)]
    losses = []
    for chunk in chunks:
        loss = self._supervised_constrastive_train_step(chunk)
        losses.append(loss["batch_loss"].numpy())
        print(loss)
        gc.collect()
    return np.mean(losses)

Efficient_Rec._spv_constrastive_train_minibatch_step = _spv_constrastive_train_minibatch_step

In [27]:
# Compile model
model = Efficient_Rec( encoder = interaction_embedding(), 
                      wu_size = wu_size,
                      use_tf_function=False)
model.compile(
    optimizer=tf.optimizers.Adam(learning_rate = 0.001),
    loss= TripletSemiHardLoss() # SupervisedContrastiveLoss(temperature = 0.05),
)

In [None]:
# Load trained model
latest = tf.train.latest_checkpoint("/content/gdrive/MyDrive/RECOMMENDER_STUDIES/data/")
model.encoder.load_weights(latest)
test_set = train[(train[userCol]>15000)&(train[userCol]<20000)]
model_plot(model, movies, test_set )

In [29]:
%%time
# Train new model
# epochs= 6
# test_set = train[(train[userCol]>15000)&(train[userCol]<20000)]
# model_plot(model, movies, test_set )
# for n in range(epochs):
#   print(n, "/", epochs, ": ", model._spv_constrastive_train_minibatch_step(train_warm.sample(frac=1.), batch_size=512))
#   model_plot(model, movies, test_set )
#   gc.collect()

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.48 µs


In [30]:
model.encoder.save_weights("/content/gdrive/MyDrive/RECOMMENDER_STUDIES/data/encoder_v6",save_format='tf')

In [31]:
# model_evaluate(model, movies, 
#                warm_up_mask[(warm_up_mask[userCol]>u_train_to)&(warm_up_mask[userCol]<=u_test)])

# Pick item pipeline

In [32]:
test_warm_up, test_mask = train_test_split(test, test_size= 0.5)

In [33]:
test_items = test.groupby(itemCol).count().sort_values(by=userCol, ascending=False).head(1000).index

In [34]:
%%time 
# Item catalog of shortlist based in mesuable item in test (for evaluation process)
model.get_shortlist( 
    ratings = train[ train[userCol].isin(train_warm.index) & train[itemCol].isin( test_items )],
    interaction_list = train_warm,
    limit = 50, 
    cluster_num = 5
)

Get cluster duration:  3.9045166969299316
Join interaction duration:  0.8713810443878174


  0%|          | 0/25 [00:00<?, ?it/s][Parallel(n_jobs=-1)]: Using backend LokyBackend with 2 concurrent workers.
100%|██████████| 25/25 [00:18<00:00,  1.37it/s]
[Parallel(n_jobs=-1)]: Done  25 out of  25 | elapsed:   20.3s finished


Chunk explode duration:  20.605602979660034
Chunk explode duration:  0.01332545280456543
Total duration:  25.3980815410614
CPU times: user 8.9 s, sys: 605 ms, total: 9.5 s
Wall time: 25.8 s


In [35]:
gc.collect()

1232

In [36]:
model.shortlist

Unnamed: 0,clusters,movieId,contribute_score,rank
27,0,50,0.716508,28.0
100,0,296,0.732032,17.0
103,0,306,0.743898,9.0
104,0,307,0.717722,27.0
109,0,318,0.798635,1.0
...,...,...,...,...
39240,42,7153,0.642912,48.0
39268,42,27773,0.644806,45.0
39272,42,30810,0.678977,17.0
39304,42,48394,0.654859,31.0


In [37]:
%%time 
top_k = 50

y_pred = model.batch_get_recommend(
        historical_ratings= test_warm_up, 
        top_k = top_k, is_remove_interacted = True, 
        batch_size= 1024*10,
        reduce_method="mean", 
        cluster_num= 5, 
        using_rapids= False
    )

historical_ratings shape  (3488421, 5)
Get cluster for user duration:  18.35366415977478
wu shape  (242365, 3)
user_num  48473
Prepare to join duration:  1.1198077201843262


0it [00:00, ?it/s]

wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (868437, 6)


[Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 2 concurrent workers.


wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (910200, 6)
after reduce wu shape:  (281542, 3)
reduce duration:  98.5161292552948
after reduce wu shape:  (292546, 3)
reduce duration:  106.51985120773315
after remove interacted wu shape:  (260659, 3)
remove interacted item duration:  8.481445074081421


5it [01:51, 22.25s/it]

all chunk processing time duration:  111.22775745391846





after remove interacted wu shape:  (271286, 3)
remove interacted item duration:  5.949856519699097
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (926200, 6)
all chunk processing time duration:  114.15825366973877
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (884500, 6)
after reduce wu shape:  (284527, 3)
reduce duration:  85.45140862464905
after reduce wu shape:  (299370, 3)
reduce duration:  88.77213168144226
after remove interacted wu shape:  (264283, 3)
remove interacted item duration:  11.44455885887146
after remove interacted wu shape:  (277367, 3)
remove interacted item duration:  11.188748598098755
all chunk processing time duration:  97.0539026260376
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (904487, 6)
all chunk processing time duration:  101.63033103942871
after reduce wu shape:  (294062, 3)
reduce duration:  41.64566159248352
after remove interacted wu shape:  (273044, 3)
remove interacted item duratio

[Parallel(n_jobs=-1)]: Done   5 out of   5 | elapsed:  4.3min finished


Join duration:  259.1402883529663
Total duration:  278.6308798789978
CPU times: user 4min 41s, sys: 18.2 s, total: 4min 59s
Wall time: 4min 38s


In [38]:
gc.collect()

50

In [39]:
y_pred

Unnamed: 0,userId,movieId,matched_score,rank
1,10,50,0.748056,41.0
2,10,194,0.796426,18.0
3,10,232,0.962821,7.0
4,10,260,0.732365,50.0
5,10,280,0.881090,8.0
...,...,...,...,...
294057,51198,6016,0.724854,18.0
294058,51198,7153,0.727283,15.0
294059,51198,44555,0.719642,28.0
294060,51198,58559,0.721043,24.0


In [40]:
y_pred.shape[0]/y_pred[userCol].drop_duplicates().shape[0]

49.38918558077437

In [41]:
y_true = test
y_true["is_fav"] = y_true["rating"].apply(lambda x: 1 if x>2.5 else 0)

In [42]:
y_true

Unnamed: 0,userId,movieId,rating,timestamp,y,is_fav
922,10,1,4.0,943497887,0.6,1
923,10,11,4.0,943497660,0.6,1
924,10,25,4.0,943497660,0.6,1
925,10,260,4.0,943497376,0.6,1
926,10,356,3.0,943497122,0.2,1
...,...,...,...,...,...,...
19999885,138492,8376,4.5,1115351058,0.8,1
19999886,138492,8623,3.5,1115351368,0.4,1
19999887,138492,8784,5.0,1115296793,1.0,1
19999888,138492,8807,4.0,1115351064,0.6,1


In [43]:
def evaluate_model(y_true, y_pred):
    """
    y_true: dataframe: user_id, item_id, is_fav (1 true, 0 false)
    y_pred: dataframe: user_id, item_id
    return:
    precision@k, recall@k
    """
    y_pred2 = y_pred.copy()
    y_pred2["is_rec"] = 1
    y_true2 = y_true.copy()

    total1 = y_true2.merge(y_pred2, on=[userCol, itemCol], how = 'left')
    total1["rec_fav"] = total1["is_rec"].fillna(0) * total1["is_fav"]

    # Precision
    p = total1["rec_fav"].sum() / total1["is_rec"].sum()

    # Recall
    r = total1["rec_fav"].sum() / total1["is_fav"].sum()

    print("Rec matched: ", total1["is_rec"].sum())
    print("All labels: ", total1["is_fav"].sum())

    return p, r

In [44]:
precision, recall = evaluate_model(y_true, y_pred)

Rec matched:  85905.0
All labels:  5760712


In [45]:
print("Precision :", precision)

Precision : 0.950130958617077


In [46]:
recall

0.014168561108418543

## Evaluate based on rated item

In [47]:
%%time
def evaluate_model(model, y_true, warm_up= None, historical_ratings = None, top_k = 10, is_remove_interacted = True, 
    batch_size=1024*10, reduce_method="random", cluster_num= 5):
    """
    reduce_method: str, 'random' or 'mean'
    """
    print("historical_ratings shape ", historical_ratings.shape)
    timer.start()
    if warm_up is None:
        warm_up, _ = model.get_interaction_set( 
                     historical_ratings
                    , max_item = max_item
                    , top_k_item = top_k_item )

    scores = model.minibatch_clustering(warm_up, batch_size=512)
    wu = model.get_top_cluster( scores, warm_up, cluster_num= cluster_num)

    wu = wu.explode(["clusters", "scores"]).reset_index()[[userCol, "clusters", "scores"]]
    timer.check(des = "Get cluster for user")
    print("wu shape ", wu.shape)
    
    user_num = wu[userCol].drop_duplicates().shape[0]
    print("user_num ", user_num)

    chunks = [wu[wu[userCol].isin(range(i,i+batch_size))] 
              for i in range(0,user_num,batch_size)]

    his_chunks = [historical_ratings[historical_ratings[userCol].isin(range(i,i+batch_size))] 
                    for i in range(0,user_num,batch_size)]
   
    shortlist = model.shortlist

    def batch_join_process(chunk, his_chunk):
        timer2 = TimeTrachker()
        timer2.start()

        wu = chunk.merge(shortlist, on="clusters", how='inner')
        print('''wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape: ''', wu.shape)
        wu["matched_score"] = wu["scores"]*wu["contribute_score"]

        if reduce_method=="mean":
            wu = wu.groupby([userCol, itemCol]).agg({"matched_score":'mean'}).reset_index()
        
        if reduce_method=="random":
            # using drop_duplicates for boost speed, may reduce accuracy.
            wu = wu.drop_duplicates(subset=["userId", "movieId"])

        print('''after reduce wu shape: ''', wu.shape)
        timer2.check(des = "reduce")

        if is_remove_interacted:
            wu = model.left_anti_user_item_join( wu, historical_ratings )

        # remove unrated item
        wu = wu.merge(y_true, on= [userCol, itemCol], how='outer')
        wu = wu[wu["y"].notnull()]

        print('''after remove interacted wu shape: ''', wu.shape)
        timer2.check(des = "remove interacted item")

        wu["rank"] = wu.groupby(userCol)["matched_score"].rank(method='first', ascending=False)
        wu = wu[wu["rank"]<= top_k]

        timer2.stop(des = "all chunk processing time")

        return wu 
    
    timer.check(des = "Prepare to join")
    wus = Parallel(n_jobs = -1, backend= 'threading', verbose = 1)(
                delayed(batch_join_process)(chunk, his_chunk) for chunk, his_chunk in tqdm(zip(chunks, his_chunks)))
    
    gc.collect()

    timer.check(des = "Join")

    output = pd.concat(wus, axis=0)

    timer.stop(des = "Total")
    return output


eval_tbl = evaluate_model(
        model, y_true,
        historical_ratings= test_warm_up, 
        top_k = top_k, is_remove_interacted = True, 
        batch_size= 1024*5,
        reduce_method="random", 
        cluster_num= 5
    )

historical_ratings shape  (3488421, 5)
Get cluster for user duration:  17.52860140800476
wu shape  (242365, 3)
user_num  48473
Prepare to join duration:  0.8781232833862305


0it [00:00, ?it/s][Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 2 concurrent workers.


wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (432737, 6)
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (435700, 6)
after reduce wu shape:  (138643, 7)
reduce duration:  2.0556087493896484
after reduce wu shape:  (142899, 7)
reduce duration:  4.525472402572632
after remove interacted wu shape:  (6976842, 11)
remove interacted item duration:  16.745432138442993
after remove interacted wu shape:  (6976842, 11)
remove interacted item duration:  14.517069339752197


4it [00:32,  8.09s/it]

all chunk processing time duration:  26.066219806671143
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (460700, 6)
all chunk processing time duration:  33.323891401290894
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (449500, 6)
after reduce wu shape:  (147055, 7)
reduce duration:  1.8388009071350098
after reduce wu shape:  (145491, 7)
reduce duration:  5.290325403213501
after remove interacted wu shape:  (6976842, 11)
remove interacted item duration:  17.11936044692993
after remove interacted wu shape:  (6976842, 11)
remove interacted item duration:  12.95199465751648
all chunk processing time duration:  

6it [01:04, 11.49s/it]

26.110783100128174
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (465950, 6)
all chunk processing time duration:  32.4640371799469
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (460250, 6)
after reduce wu shape:  (151046, 7)
reduce duration:  1.8618054389953613
after reduce wu shape:  (148324, 7)
reduce duration:  5.343752861022949
after remove interacted wu shape: after remove interacted wu shape:  (6976842, 11)
remove interacted item duration:  17.009716749191284
 (6976842, 11)
remove interacted item duration:  12.567822217941284


8it [01:37, 13.21s/it]

all chunk processing time duration:  32.21333694458008
all chunk processing time duration:  31.31591296195984
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (447500, 6)
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (437000, 6)
after reduce wu shape: after reduce wu shape:  (139780, 7)
reduce duration:  1.7101294994354248
 (144747, 7)
reduce duration:  1.9431235790252686
after remove interacted wu shape: after remove interacted wu shape:  (6976842, 11)
remove interacted item duration:  16.952407836914062
 (6976842, 11)
remove interacted item duration:  17.124051094055176


10it [02:09, 12.94s/it]

all chunk processing time duration:  32.260825395584106
all chunk processing time duration:  32.39958357810974
wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (450237, 6)





wu = chunk.merge(shortlist, on="clusters", how='inner') : wu shape:  (454250, 6)
after reduce wu shape:  (146199, 7)
reduce duration:  1.8055663108825684
after reduce wu shape:  (147863, 7)
reduce duration:  1.9275975227355957
after remove interacted wu shape:  (6976842, 11)
remove interacted item duration:  17.03400754928589
after remove interacted wu shape:  (6976842, 11)
remove interacted item duration:  17.2228524684906
all chunk processing time duration:  25.956830501556396
all chunk processing time duration:  32.590904235839844


[Parallel(n_jobs=-1)]: Done  10 out of  10 | elapsed:  2.7min finished


Join duration:  162.59311389923096
Total duration:  181.03590393066406


In [68]:
eval_tbl[eval_tbl["y"] <0]

Unnamed: 0,userId,clusters,scores,movieId,contribute_score,rank,matched_score,rating,timestamp,y,is_fav
664,138,31,0.997932,1196,0.741992,2.0,0.740457,1.0,1.046427e+09,-0.6,0.0
713,157,31,0.971372,1208,0.701784,4.0,0.681694,2.0,9.678670e+08,-0.2,0.0
885,208,31,0.997879,1234,0.710701,16.0,0.709193,2.0,9.417517e+08,-0.2,0.0
968,254,31,0.998296,5952,0.742070,4.0,0.740806,2.0,1.205358e+09,-0.2,0.0
971,254,31,0.998296,7153,0.756653,2.0,0.755364,2.0,1.205359e+09,-0.2,0.0
...,...,...,...,...,...,...,...,...,...,...,...
134563,48669,14,0.999471,3608,0.771999,5.0,0.771590,0.5,1.310186e+09,-0.8,0.0
135714,49684,35,0.996241,4878,0.752356,3.0,0.749527,2.0,1.175497e+09,-0.2,0.0
135952,50208,35,0.999524,2329,0.755301,1.0,0.754941,1.5,1.132839e+09,-0.4,0.0
137244,48669,15,0.996956,293,0.994608,1.0,0.991580,2.0,1.310187e+09,-0.2,0.0


In [66]:
eval_tbl["matched_score"] = eval_tbl["matched_score"].astype('float')

In [67]:
eval_tbl[["matched_score", "y"]].corr()

Unnamed: 0,matched_score,y
matched_score,1.0,0.013638
y,0.013638,1.0


# END HERE

In [49]:
1/0

ZeroDivisionError: ignored

In [None]:
# Warm start evaluation

In [None]:
def evaluate_rs(y_true, y_pred, is_return_df = False):
    """
    y_true: dataframe: user_id, item_id, y (rating normailised), only favorite item
    y_pred: dataframe: user_id, item_id (just top k item)
    return:
    precision@k, recall@k
    """
    total1 = y_true.merge(y_pred, on=[userCol, itemCol], how = 'outer', suffixes=('_t', '_p'))
    total1['is_pt'] = total1.apply(lambda x: 0 if (np.isnan(x["y"]) or np.isnan(x["rank"]) ) else 1,axis=1)
    total = total1.groupby(userCol).agg({
        "y":'count',
        "rank":'count',
        "is_pt": 'sum'
        })
    total.columns = ["true_num", "predict_num", "pred_true_num"]
    total["macro_p"] = total["pred_true_num"]/total["predict_num"]
    total["macro_r"] = total["pred_true_num"]/total["true_num"]

    total = total[total["predict_num"]>0]

    macro_p = total[total["predict_num"]>0]["macro_p"].mean()
    macro_r = total[total["true_num"]>0]["macro_r"].mean()

    micro_p = total["pred_true_num"].sum()/total["predict_num"].sum()
    micro_r = total["pred_true_num"].sum()/total["true_num"].sum()

    print("macro_p: ", macro_p, "; macro_r :", macro_r)
    print("micro_p: ", micro_p, "; micro_r :", micro_r)

    if is_return_df:
        return (macro_p, macro_r, micro_p, micro_r), total

    return macro_p, macro_r, micro_p, micro_r

In [None]:
%%time
eval_map = map_at_k(y_true, y_pred, col_user = userCol, col_item = itemCol ,col_prediction='rating', col_rating="rating", k=top_k)
eval_ndcg = ndcg_at_k(y_true, y_pred, col_user = userCol, col_item = itemCol ,col_prediction='rating', col_rating="rating", k=top_k)
eval_precision = precision_at_k(y_true, y_pred, col_user = userCol, col_item = itemCol ,col_prediction='rating', col_rating="rating", k=top_k)
eval_recall = recall_at_k(y_true, y_pred, col_user = userCol, col_item = itemCol, col_prediction='rating', col_rating="rating", k=top_k)

print('K = %f' % top_k)
print(
    "MAP:\t%f" % eval_map,
      "NDCG:\t%f" % eval_ndcg,
      "Precision@K:\t%f" % eval_precision,
      "Recall@K:\t%f" % eval_recall, sep='\n')

In [None]:
%%time
eval_metrics, eval_df = evaluate_rs(y_true, y_pred, is_return_df= True)