In [1]:
# load data
import tensorflow as tf
import keras.backend as K
import numpy as np
from datetime import datetime
from tqdm import tqdm
import pandas as pd
import boto3
import s3fs
import logging
import sys
import pickle 
import io
# Make modules located one level above the current working directory importable.
sys.path.append("../")

# Module-level S3 client; ensure_dir() and save_matrix() below use it directly.
s3 = boto3.client('s3') 
# s3fs filesystem handle.
# NOTE(review): not referenced by any of the visible cells — confirm it is still needed.
s3FS = s3fs.S3FileSystem()



#### Keras Model

In [2]:
def ensure_dir(file_path, bucket_name):
    """Creates a directory marker in the s3 bucket unless the directory already has content

    The marker is an empty object whose key is ``file_path`` followed by a
    single '/'. Nothing is written when at least one object already exists
    under that exact directory prefix.

    Args:
        file_path - string with directory name (a trailing '/' is optional)
        bucket_name - string bucket
    """
    # Normalise to exactly one trailing slash so that
    #   * "models" does not match sibling prefixes such as "models_old/...", and
    #   * "models/" does not produce a "models//" marker key.
    directory = file_path.rstrip('/')
    if not directory:
        # An empty path refers to the bucket root, which needs no marker.
        return
    prefix = directory + '/'

    # One key is enough to know whether anything lives under the prefix.
    res = s3.list_objects_v2(
        Bucket=bucket_name,
        Prefix=prefix,
        MaxKeys=1
    )

    # 'Contents' is only present in the response when at least one key matched.
    if 'Contents' not in res:
        s3.put_object(Bucket=bucket_name, Key=prefix)

def save_matrix(np_array, ids, bucket_name, key):
    """Pickles a matrix as a pandas DataFrame and uploads it to the bucket

    Args:
        np_array - array with the values to store
        ids - labels used as the DataFrame index (members or items)
        bucket_name - string with bucket name
        key - directory name + /model/ + filename.pkl
    """
    # Label the rows with their ids so the pickle is self-describing.
    frame = pd.DataFrame(np_array, index=ids)

    # Serialise in memory; a fresh BytesIO built from bytes starts at offset 0.
    payload = io.BytesIO(pickle.dumps(frame))

    s3.upload_fileobj(payload, bucket_name, key)


def get_pop(train,batch_size):
    """Counts, for every column except the last, the rows with a value > 0

    Args:
        train - dataset of rows; it is consumed in chunks via ``train.batch``
        batch_size - number of rows per chunk (only affects chunking, the
                     counts are summed over all chunks)

    Returns:
        Running sum of the per-column counts (the plain int 0 when the
        dataset yields no batches).
    """
    totals = 0

    for chunk in tqdm(train.batch(batch_size)):
        # Drop the last column, then mark strictly positive entries with 1.0.
        is_positive = tf.math.greater(chunk[:, :-1], 0)
        totals = totals + tf.math.reduce_sum(tf.cast(is_positive, tf.float32), axis=0)

    return totals


class SerLogisticMF():
    """Matrix-factorisation ranker trained with a popularity-weighted pairwise loss.

    The Keras model scores every item for a user as
    ``dot(user_embedding, item_embedding) + user_bias + item_bias``. Each
    input row holds ``num_items`` observed scores followed by the user's
    index in the last column.

    Args:
        save_path - S3 prefix under which ``save`` writes the weights
        bucket_name - S3 bucket used by ``save``
        F - embedding dimension
        num_users - number of rows in the user embedding tables
        num_items - number of rows in the item embedding tables
        popularity - per-item popularity; raised to ``alpha`` and used as a
                     weight inside the loss
        lr - learning rate of the Adam optimizer
        l2 - L2 regularisation factor for the user/item embeddings
        l2_bias - L2 regularisation factor for the user/item biases
        alpha - exponent applied to ``popularity``
    """

    def __init__(self, save_path, bucket_name, F, num_users, num_items, popularity, lr, l2=1e-4, l2_bias=.001, alpha = .5):
        self.F = F
        self.learn_rate = lr
        self.l2 = tf.keras.regularizers.l2(l2)
        self.l2_bias = tf.keras.regularizers.l2(l2_bias)
        self.alpha = alpha
        self.m, self.n = (num_users, num_items)
        # Computed once here; the loss reuses the resulting tensor as a weight.
        self.popularity_pow = tf.pow(popularity, tf.Variable(self.alpha))
        self.save_path = save_path
        self.bucket_name = bucket_name

        self.model = self.create_model()

    def difference(self,X):
        """Pairwise differences within each row.

        Returns a tensor ``D`` with ``D[b, i, j] = X[b, i] - X[b, j]``.
        """
        rows = tf.shape(X)[0]
        cols = tf.shape(X)[1]
        left = tf.expand_dims(X, -1)
        right = tf.reshape(X, (rows, 1, cols))
        return tf.math.subtract(left, right)

    def coherent(self,X):
        """Pairwise ordering indicator within each row.

        Returns a tensor ``C`` with ``C[b, i, j] = 1.0`` when
        ``X[b, i] > X[b, j]`` and ``0.0`` otherwise.
        """
        rows = tf.shape(X)[0]
        cols = tf.shape(X)[1]
        left = tf.expand_dims(X, -1)
        right = tf.reshape(X, (rows, 1, cols))
        return tf.where(tf.math.greater(left, right), 1.0, 0.0)

    def auc_metric(self, batch, y_pred):
        """
        Calculate AUC per row

        For every row this is the share of item pairs ordered by the observed
        scores (all columns of ``batch`` except the last) that ``y_pred``
        orders the same way.

        Args:
          batch: input rows; the last column is excluded from the scores
          y_pred: predicted scores, one per item

        Returns:
          One value per row; 0 for rows without any ordered pair.
        """
        coherent_pairs = self.coherent(batch[:,:-1])
        agreeing_pairs = self.coherent(y_pred) * coherent_pairs

        numerator = tf.math.reduce_sum(agreeing_pairs, axis=[1, 2])
        denominator = tf.math.reduce_sum(coherent_pairs, axis=[1, 2])

        # A row whose scores are all equal has no ordered pair; a plain
        # division would yield NaN and poison the aggregated metric.
        return tf.math.divide_no_nan(numerator, denominator)

    def sauc_loss(self,batch,y_pred):
        """
        Calculate loss

        Args:
          batch: input rows; the last column is excluded from the scores
          y_pred: predicted scores, one per item

        Returns:
          Scalar: mean over the rows of the weighted pairwise loss.
        """
        # 1. Pairwise logistic loss on the pairs ordered by the observed scores.
        #    popularity_pow broadcasts against the last axis of the pair tensor
        #    (presumably one weight per item — verify against the caller).
        coherent_pairs = self.coherent(batch[:,:-1])
        pair_loss = (
            -tf.math.log_sigmoid(self.difference(y_pred))
            * coherent_pairs
            * self.popularity_pow
        )

        # 2. Normalise per row by the number of ordered pairs. Rows without
        #    any ordered pair contribute 0 instead of NaN.
        numerator = tf.math.reduce_sum(pair_loss, axis=[1, 2])
        denominator = tf.math.reduce_sum(coherent_pairs, axis=[1, 2])
        row_loss = tf.math.divide_no_nan(numerator, denominator)

        # 3. Calculate loss for batch
        return tf.reduce_mean(row_loss)

    def create_model(self):
        """Build and compile the Keras model.

        The loss and the ``auc_metric`` metric are attached to the model
        itself (``add_loss`` / ``add_metric``), so ``compile`` only receives
        the optimizer.
        """
        inputs = tf.keras.layers.Input(shape=(self.n+1,), name="inputs")

        # The last input column is used as the index into the user tables.
        user_embeddings = tf.keras.layers.Embedding(
          input_dim=self.m, output_dim=self.F, name="user_embedding",
          embeddings_regularizer=self.l2,
          embeddings_initializer=tf.keras.initializers.RandomNormal(stddev=1/np.sqrt(self.F)))(inputs[:,-1])

        # Every row gets the indices 0..n-1, so all items are looked up for each user.
        items_range = tf.range(self.n, delta=1, dtype=tf.float32)
        items_inputs = K.ones_like(inputs[:,:-1],dtype=tf.float32)*items_range

        item_embeddings = tf.keras.layers.Embedding(
          input_dim=self.n, output_dim=self.F, name="item_embedding",
          embeddings_regularizer=self.l2,
          embeddings_initializer=tf.keras.initializers.RandomNormal(stddev=1/np.sqrt(self.F)))(items_inputs)

        # Swap the item and factor axes so batch_dot contracts over the factors.
        item_embeddings = tf.keras.layers.Permute((2, 1))(item_embeddings)

        dots = tf.keras.layers.Lambda(lambda x: K.batch_dot(x[0],x[1]))([user_embeddings, item_embeddings])

        user_biases = tf.keras.layers.Embedding(
            input_dim=self.m, output_dim=1, name="user_bias",
            embeddings_regularizer=self.l2_bias)(inputs[:,-1])

        item_biases = tf.keras.layers.Embedding(
            input_dim=self.n, output_dim=1, name="item_bias",
            embeddings_regularizer=self.l2_bias)(items_inputs)

        item_biases = K.squeeze(item_biases,2)

        dots = tf.keras.layers.Add()([dots, user_biases, item_biases])

        model = tf.keras.Model(
          name="matrix_factorizer",
          inputs=[inputs], outputs=dots)

        model.add_loss(self.sauc_loss(inputs,dots))
        model.add_metric(self.auc_metric(inputs,dots), name='auc_metric',aggregation='mean')

        # `learning_rate` is the supported argument name; `lr` is a deprecated
        # alias that newer releases warn about or no longer honour.
        model.compile(
          optimizer=tf.keras.optimizers.Adam(learning_rate=self.learn_rate)
        )

        return model

    def fit(self, train, train_size, validation, val_size, epochs = 100, batch_size = 128, patience=5,path_name_log="../results/SerLMF/model_history_log.csv"):
        """Train the model with early stopping on the validation AUC.

        Args:
            train - unbatched dataset with the training rows
            train_size - number of rows in ``train``
            validation - unbatched dataset with the validation rows
            val_size - number of rows in ``validation``
            epochs - maximum number of epochs
            batch_size - rows per batch for both datasets
            patience - epochs without ``val_auc_metric`` improvement before stopping
            path_name_log - CSV file the per-epoch metrics are appended to

        Returns:
            The ``History`` object returned by ``Model.fit``.
        """
        # steps_per_epoch is passed explicitly below, so the dataset has to
        # repeat: with a finite dataset whose batch count differs from
        # steps_per_epoch Keras keeps consuming the same iterator across epochs
        # and stops with "input ran out of data" during the second epoch.
        # Shuffling before repeat() reshuffles on every pass over the data.
        train = train.shuffle(buffer_size=8*batch_size).batch(batch_size).repeat()
        # Guard against 0 steps when a split is smaller than one batch.
        steps_per_epoch = max(1, train_size // batch_size)

        validation = validation.batch(batch_size)
        validation_steps = max(1, val_size // batch_size)

        callback = tf.keras.callbacks.EarlyStopping(monitor='val_auc_metric', patience=patience, mode='max')
        csv_logger = tf.keras.callbacks.CSVLogger(path_name_log, append=True)

        # workers / use_multiprocessing / max_queue_size are documented as
        # applying to generator or Sequence inputs only, so they are not
        # passed for these tf.data inputs.
        history = self.model.fit(
            train,
            epochs = epochs,
            validation_data = validation,
            callbacks = [callback, csv_logger],
            validation_steps = validation_steps,
            steps_per_epoch = steps_per_epoch
        )

        return history

    def save(self, users_id,items_id):
        """Save matrix tensors to S3

        Writes four pickles below ``<save_path>/model/``: ``P.pkl`` (user
        embeddings), ``Q.pkl`` (item embeddings), ``bias_u.pkl`` and
        ``bias_i.pkl``.

        Args:
            users_id - labels for the rows of the user matrices
            items_id - labels for the rows of the item matrices
        """
        # ensure save path exists
        ensure_dir(self.save_path, self.bucket_name)

        def layer_matrix(layer_name):
            # Look the layer up by name instead of relying on the position of
            # its weights inside model.get_weights(), which depends on how
            # Keras happens to order the layers of the graph.
            return self.model.get_layer(layer_name).get_weights()[0]

        # (file name, layer holding the matrix, row labels)
        targets = [
            ('P.pkl', 'user_embedding', users_id),
            ('Q.pkl', 'item_embedding', items_id),
            ('bias_u.pkl', 'user_bias', users_id),
            ('bias_i.pkl', 'item_bias', items_id),
        ]

        for file_name, layer_name, idx in targets:
            key = self.save_path + '/model/' + file_name
            save_matrix(layer_matrix(layer_name), idx, self.bucket_name, key)


#### Train

In [6]:
##### CREATE DUMMY DATA
# Seed TensorFlow's global generator so the synthetic data (and any randomly
# initialised weights created afterwards) are the same on every
# Restart & Run All.
SEED = 42
tf.random.set_seed(SEED)

train_size = 10000
val_size = 1000
item_size = 1600

# Row layout: item_size scores drawn from N(0, 1), then the row's id as the last column.
train = tf.random.normal([train_size,item_size], 0, 1, tf.float32)
ids = tf.reshape(tf.cast(tf.range(0, train_size, 1),tf.float32),(-1,1))
train = tf.concat([train,ids] , axis=1)
train = tf.data.Dataset.from_tensor_slices(train)

# Same layout for validation; ids restart at 0.
validation = tf.random.normal([val_size,item_size], 0, 1, tf.float32)
ids = tf.reshape(tf.cast(tf.range(0, val_size, 1),tf.float32),(-1,1))
# Both parts are already float32, so no extra cast is needed.
validation = tf.concat([validation,ids] , axis=1)
validation = tf.data.Dataset.from_tensor_slices(validation)

train_user_ids = np.arange(0,train_size)
train_item_ids = np.arange(0,item_size)

In [7]:
# popularity: per-item counts accumulated over chunks of 128 rows
popularity = get_pop(train, batch_size=128)

100%|██████████| 79/79 [00:00<00:00, 149.36it/s]


In [8]:
# Build the model; save_path and bucket_name are left empty here.
SerLMF = SerLogisticMF(
    save_path='',
    bucket_name="",
    F=100,
    num_users=train_size,
    num_items=item_size,
    popularity=popularity,
    lr=.01,
    l2=1e-4,
    l2_bias=1,
    alpha=.5,
)

In [None]:
# Train for at most 5 epochs with batches of 512 rows and a patience of 2.
history = SerLMF.fit(
    train=train,
    train_size=train_size,
    validation=validation,
    val_size=val_size,
    epochs=5,
    batch_size=512,
    patience=2,
)

Epoch 1/5


  "Converting sparse IndexedSlices to a dense Tensor of unknown shape. "


