In [26]:
import numpy as np
import pandas as pd
import torch
import connectorx as cx
from tqdm.auto import tqdm
from sklearn.metrics import log_loss, roc_auc_score
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from preprocessing.inputs import SparseFeat, DenseFeat, VarLenSparseFeat
from model.dssm import DSSM
from deepctr_torch.callbacks import EarlyStopping, ModelCheckpoint
import random


def data_process(data_path, samp_rows=100000):
    """Load the ratings CSV, binarise the ratings and split train/test.

    Args:
        data_path: path of a CSV file that has at least ``rating`` and
            ``timestamp`` columns.
        samp_rows: unused; kept only so existing calls keep working.

    Returns:
        tuple ``(train, test, data)``: an 80/20 ``train_test_split`` of the
        processed frame, followed by the full processed frame.
    """
    frame = pd.read_csv(data_path)
    # Ratings of 3 or more become positives (1); everything else is 0.
    frame['rating'] = frame['rating'].apply(lambda score: 1 if score >= 3 else 0)
    frame = frame.sort_values(by='timestamp', ascending=True)
    # NOTE(review): train_test_split shuffles by default, so the timestamp
    # sort above does not yield a temporal split; pass shuffle=False if a
    # time-ordered hold-out was intended.
    train, test = train_test_split(frame, test_size=0.2)
    return train, test, frame


def get_user_feature(data):
    """Attach each user's positive-item history and mean rating.

    Args:
        data: DataFrame with at least ``user_id``, ``movie_id`` and a
            binarised ``rating`` column (1 = positive).

    Returns:
        DataFrame with columns ``user_id``, ``user_mean_rating``,
        ``user_hist`` followed by the columns of ``data``. ``user_hist`` is
        the '|'-joined ids of the user's positively rated movies. The join is
        inner, so users with no positive rating are dropped.
    """
    # '|'-joined ids of every movie the user rated positively.
    positive_hist = (
        data.loc[data['rating'] == 1]
        .groupby('user_id')['movie_id']
        .apply(lambda ids: '|'.join(map(str, ids)))
        .rename('user_hist')
        .reset_index()
    )
    with_hist = positive_hist.merge(data, on='user_id')

    # Mean rating per user, computed over the rows that survived the join.
    user_mean = (
        with_hist.groupby('user_id')['rating']
        .mean()
        .rename('user_mean_rating')
        .reset_index()
    )
    return user_mean.merge(with_hist, on='user_id')


def get_item_feature(data):
    """Prepend each movie's mean rating to every row of ``data``.

    Args:
        data: DataFrame with at least ``movie_id`` and ``rating`` columns.

    Returns:
        DataFrame with ``movie_id``, ``item_mean_rating`` and then the
        remaining columns of ``data``.

    NOTE(review): the output column is ``item_mean_rating``, whereas the
    feature lists in this notebook use ``movie_mean_rating``.
    """
    item_mean = (
        data.groupby('movie_id')['rating']
        .mean()
        .rename('item_mean_rating')
        .reset_index()
    )
    return item_mean.merge(data, on='movie_id')


def get_var_feature(data, col):
    """Build a vocabulary for a '|'-separated multi-value column and encode it.

    Each distinct token receives a 1-based index in order of first
    appearance. Index 0 is left free because ``pad_sequences`` pads with 0.

    Args:
        data: DataFrame containing ``col``.
        col: name of a column whose values are '|'-separated strings.

    Returns:
        tuple ``(key2index, var_feature, max_len)``: the token -> index dict,
        the post-padded encoded sequences, and the longest sequence length.
    """
    key2index = {}
    encoded_rows = []
    for raw_value in data[col].values:
        tokens = raw_value.split('|')
        for token in tokens:
            # 0 is the padding value, so real tokens are numbered from 1.
            key2index.setdefault(token, len(key2index) + 1)
        encoded_rows.append([key2index[token] for token in tokens])

    row_lengths = np.array([len(row) for row in encoded_rows])
    max_len = max(row_lengths)
    var_feature = pad_sequences(encoded_rows, maxlen=max_len, padding='post')
    return key2index, var_feature, max_len


def get_test_var_feature(data, col, key2index, max_len):
    """Encode a '|'-separated multi-value column with an already-fitted vocabulary.

    Args:
        data: DataFrame containing ``col``.
        col: name of a column whose values are '|'-separated strings.
        key2index: token -> index mapping, e.g. the first value returned by
            ``get_var_feature``. It is only read, never modified.
        max_len: length to pad/truncate each sequence to, e.g. the
            ``max_len`` returned by ``get_var_feature``.

    Returns:
        The ``pad_sequences`` result for the encoded rows, post-padded with 0
        to ``max_len``.
    """
    def encode(value):
        # Index 0 is reserved for padding; tokens absent from the fitted
        # vocabulary map to it too. They are deliberately NOT added to
        # key2index: growing the vocabulary at inference time would mutate the
        # caller's dict and yield indices beyond an embedding table sized from it.
        return [key2index.get(token, 0) for token in value.split('|')]

    test_hist = [encode(value) for value in data[col].values]
    return pad_sequences(test_hist, maxlen=max_len, padding='post')



def setup_seed(seed):
    """Seed every RNG used in this notebook so runs are reproducible.

    Seeds torch (CPU and all CUDA devices), NumPy's global RNG and Python's
    ``random`` module, and configures cuDNN for deterministic behaviour.

    Args:
        seed: integer seed applied to every generator.
    """
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN choose algorithms per run, which can differ
    # between runs; PyTorch's reproducibility notes say to disable it
    # alongside `deterministic`.
    torch.backends.cudnn.benchmark = False

import time
class Timer:
    """Stopwatch that reports elapsed wall-clock time in milliseconds.

    Call ``tic()`` to start and ``toc()`` to read the time elapsed since the
    latest ``tic()``. Timestamps come from ``time.perf_counter``.
    """

    def __init__(self):
        # perf_counter timestamps in seconds; 0 until tic()/toc() are called.
        self.start, self.end = 0, 0

    def tic(self):
        """Record the start timestamp."""
        self.start = time.perf_counter()

    def toc(self):
        """Record the end timestamp and return the elapsed time.

        Returns:
            float: milliseconds between the latest ``tic()`` and this call.
        """
        self.end = time.perf_counter()
        elapsed_seconds = self.end - self.start
        return elapsed_seconds * 1000
        
import psycopg2
import pandas as pd
from sqlalchemy import create_engine
  
import os

# Postgres URL for the database holding the `dssm_movielens` table. It is read
# from the environment so credentials need not be committed with the notebook;
# the fallback is the original local-development URL.
conn_string = os.environ.get(
    "DSSM_PG_URL", "postgresql://postgres:postgres@localhost:5432/postgres"
)

db = create_engine(conn_string)
# NOTE(review): `conn` is not used by the visible cells (connectorx reads via
# `conn_string`); kept in case other code relies on it.
conn = db.connect()

In [29]:
# %%
# Training hyper-parameters.
embedding_dim = 32
epoch = 15
batch_size = 2048
lr = 0.001
dropout = 0.3

# Seed every RNG before anything stochastic runs.
seed = 1023
setup_seed(seed)

# Pull the whole table once so the label encoders see every category.
query = "SELECT * FROM dssm_movielens"
data = cx.read_sql(conn_string, query)

In [30]:
# First rows of the raw table.
data.head(n=5)

Unnamed: 0,user_id,movie_id,rating,timestamp,gender,age,occupation,zipcode,title,genres
0,1,1105,5,978300760,F,1,10,48067,One Flew Over the Cuckoo's Nest (1975),Drama
1,1,640,3,978302109,F,1,10,48067,James and the Giant Peach (1996),Animation|Children's|Musical
2,1,854,3,978301968,F,1,10,48067,My Fair Lady (1964),Musical|Romance
3,1,3178,4,978300275,F,1,10,48067,Erin Brockovich (2000),Drama
4,1,2163,5,978824291,F,1,10,48067,"Bug's Life, A (1998)",Animation|Children's|Comedy


In [31]:
# Feature schema: sparse features are embedded, dense ones are fed as scalars.
sparse_features = ['user_id', 'movie_id', 'gender', 'age', 'occupation']
dense_features = ['user_mean_rating', 'movie_mean_rating']
target = ['rating']
device = 'cpu'
user_sparse_features, user_dense_features = ['user_id', 'gender', 'age', 'occupation'], ['user_mean_rating']
item_sparse_features, item_dense_features = ['movie_id', ], ['movie_mean_rating']

# One LabelEncoder per sparse feature, fitted on the full table and reused at
# inference time to map raw values to the same integer codes.
dict_encoder = dict()
for feat in sparse_features:
    lbe = LabelEncoder()
    lbe.fit(data[feat])
    dict_encoder[feat] = lbe

# NOTE(review): genres are encoded here and later passed to predict(), but no
# VarLenSparseFeat is declared for them below, so the model presumably ignores
# them — confirm whether that is intended.
genres_key2index, train_genres_list, genres_maxlen = get_var_feature(data, 'genres')

# Size each embedding table from its fitted encoder, whose transform() emits
# codes 0..len(classes_) - 1. data[feat].nunique() matches that for NaN-free
# columns but skips NaN, so it is not guaranteed to cover every code.
user_feature_columns = [SparseFeat(feat, len(dict_encoder[feat].classes_), embedding_dim=embedding_dim)
                        for feat in user_sparse_features] + [DenseFeat(feat, 1, ) for feat in
                                                             user_dense_features]
item_feature_columns = [SparseFeat(feat, len(dict_encoder[feat].classes_), embedding_dim=embedding_dim)
                        for feat in item_sparse_features] + [DenseFeat(feat, 1, ) for feat in
                                                             item_dense_features]
model = DSSM(user_feature_columns, item_feature_columns, task='binary', device=device)
model.compile("adam", "binary_crossentropy", metrics=['auc', 'accuracy', 'logloss'], lr=lr)

In [78]:
# Query templates; `{}` is filled with a parenthesised, comma-separated id list.
sqlUserData = "SELECT user_id, gender, age, rating, occupation FROM dssm_movielens WHERE user_id IN {}"
sqlMovieData = "SELECT movie_id, rating, genres FROM dssm_movielens WHERE movie_id IN {}"

randomSeed = 0
numSamples = 50
# Seed through the named constant (a literal 0 was passed before, so editing
# randomSeed had no effect).
np.random.seed(randomSeed)

# Sample ids uniformly over [min, max] of the loaded table (randint's upper
# bound is exclusive, hence +1). This replaces the hard-coded bounds 1..6040
# (users) and 1..3706 (movies) and draws the same values whenever the table
# spans exactly those ranges.
# NOTE(review): assumes ids are contiguous; an id absent from `data` would make
# the label-encoder transform fail during inference.
sampledUserId = np.random.randint(int(data['user_id'].min()), int(data['user_id'].max()) + 1, numSamples)
sampledMovieId = np.random.randint(int(data['movie_id'].min()), int(data['movie_id'].max()) + 1, numSamples)

In [87]:
timer_end_end = Timer()
timer_load_data = Timer()
timer_data_preprocess = Timer()

# Accumulated timings in milliseconds (Timer.toc() returns ms).
t_end_end = 0
t_load_data = 0
t_data_preprocess = 0
t_nn = 0

num_loop = 1

timer_end_end.tic()
for _ in tqdm(range(num_loop)):
    # --- data loading ---
    timer_load_data.tic()
    # Render the ids as an explicit SQL list "(1, 2, ...)". Formatting
    # tuple(ndarray) depends on numpy scalar repr, which is "np.int64(1)" under
    # NumPy >= 2 (invalid SQL), and a single id would render as "(1,)".
    # int() also guarantees only integers are spliced into the query text.
    user_id_sql = "({})".format(", ".join(str(int(i)) for i in sampledUserId))
    movie_id_sql = "({})".format(", ".join(str(int(i)) for i in sampledMovieId))
    userData = cx.read_sql(conn_string, sqlUserData.format(user_id_sql))
    movieData = cx.read_sql(conn_string, sqlMovieData.format(movie_id_sql))

    t_load_data += timer_load_data.toc()

    # --- preprocessing ---
    timer_data_preprocess.tic()
    # User side: binarise ratings (>= 3 -> 1), average per user, then
    # left-merge onto the sampled ids so rows follow the sampled order.
    userIDDf = pd.DataFrame({'user_id': sampledUserId})
    userData['rating'] = userData['rating'].apply(lambda x: 1 if x >= 3 else 0)
    userData = userData.groupby(['user_id', 'gender', 'age', 'occupation']).agg('mean').reset_index()
    userData.rename(columns={'rating': 'user_mean_rating'}, inplace=True)
    fetchedUserData = userData.copy()
    userData = pd.merge(userIDDf, userData, on='user_id', how='left')

    # Movie side: same treatment. The ids are `sampledMovieId`; the former
    # `sampledItemId` was never defined and raised NameError.
    movieIDDf = pd.DataFrame({'movie_id': sampledMovieId})
    movieData['rating'] = movieData['rating'].apply(lambda x: 1 if x >= 3 else 0)
    movieData = movieData.groupby(['movie_id', 'genres']).agg('mean').reset_index()
    movieData.rename(columns={'rating': 'movie_mean_rating'}, inplace=True)
    fetchedMovieData = movieData.copy()
    movieData = pd.merge(movieIDDf, movieData, on='movie_id', how='left')

    # Both merges return a fresh default index, so the column-wise concat
    # pairs rows positionally.
    mergedData = pd.concat([userData, movieData], axis=1)
    preprocessedData = mergedData.copy()  # snapshot before label encoding
    for feat in sparse_features:
        lbe = dict_encoder[feat]
        mergedData[feat] = lbe.transform(mergedData[feat])

    test_genres_list = get_test_var_feature(mergedData, 'genres', genres_key2index, genres_maxlen)
    test_model_input = {name: mergedData[name] for name in sparse_features + dense_features}
    test_model_input["genres"] = test_genres_list

    t_data_preprocess += timer_data_preprocess.toc()

    # --- model inference ---
    # NOTE(review): clear_time()/t are timing hooks of the project's DSSM
    # class; presumably t holds per-stage inference times — confirm units.
    model.clear_time()
    y_preds = model.predict(test_model_input)

    t_nn += np.sum(model.t)

t_end_end += timer_end_end.toc()

  0%|          | 0/1 [00:00<?, ?it/s]

## Data Preview

In [84]:
# user/movie id pairs sampled for the inference query
pd.DataFrame({"user_id": sampledUserId, "movie_id": sampledMovieId}).iloc[:5]

Unnamed: 0,user_id,movie_id
0,2733,3623
1,2608,3561
2,1654,757
3,3265,274
4,4932,2384


In [83]:
# per-user aggregates as fetched from the database (before re-ordering)
fetchedUserData.iloc[:5]

Unnamed: 0,user_id,gender,age,occupation,user_mean_rating
0,100,M,35,17,0.763158
1,152,M,18,4,0.916667
2,424,M,25,17,0.902121
3,538,M,56,16,1.0
4,545,M,35,17,0.862069


In [85]:
# per-movie aggregates as fetched from the database (before re-ordering)
fetchedMovieData.iloc[:5]

Unnamed: 0,movie_id,genres,movie_mean_rating
0,85,Adventure|Drama,0.868644
1,92,Drama,0.895604
2,258,Drama,0.8
3,274,Drama|Thriller,0.882075
4,325,Drama,0.903509


In [89]:
# merged inference rows after label-encoding the sparse columns
mergedData.iloc[:5]

Unnamed: 0,user_id,gender,age,occupation,user_mean_rating,movie_id,genres,movie_mean_rating
0,2732,1,4,17,0.989899,3622,Comedy,0.864162
1,2607,0,2,1,0.925439,3560,Drama,0.888889
2,1653,0,4,0,0.957447,756,Drama|Romance,0.843091
3,3264,1,2,15,0.608268,273,Drama|Thriller,0.882075
4,4931,1,2,7,0.885542,2383,Crime,0.921162
