## 4. Stochastic Gradient Descent (SGD)

Collaborative filtering is one of the most used types of recommendation systems. It is also known as the Nearest Neighborhood algorithm. Collaborative filtering produces recommendations based on the knowledge of users’ preference for items. Stochastic gradient descent (SGD) is widely used for collaborative filtering. It is an approach that randomly picks one data point from the whole dataset at each iteration and gradually travels down by updating the parameters until it reaches the lowest point of that function.

In [24]:
import spark
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType
import math

At first, I load the main dataset "user_artist_data.txt", and then replace the "goodid" with "badid" using the information in "artist_alias.txt". 

In [25]:
#read data
spark = SparkSession.builder.appName('recommend').master('local').getOrCreate()
user_artist_df = spark.read.text('./profiledata_06-May-2005/user_artist_data.txt')
artist_alias_df = spark.read.text('./profiledata_06-May-2005/artist_alias.txt')       
user_artist = user_artist_df.withColumn('fields', split(col('value'), ' ')).select(
            col('fields').getItem(0).alias('user_id').cast('int'),
            col('fields').getItem(1).alias('artist_id').cast('int'),
            col('fields').getItem(2).alias('play_count').cast('int')
        ).drop('fields')
artist_alias = artist_alias_df.withColumn('fields', split(col('value'), '\t')).select(
            col('fields').getItem(0).alias('artist_id').cast('int'),
            col('fields').getItem(1).alias('goodid').cast('int')
        ).drop('fields')

#replace bad id to good id        
df = user_artist.join(artist_alias, ['artist_id'], 'left')
train_df = df.selectExpr('user_id', 'ifnull(goodid, artist_id) as artist_id', 'play_count')   
#train_df = train_df.limit(5000) #first 5000 data (for training only)  

#aggregate the views on the same user on the same artist
train_df = train_df.groupBy(['user_id', 'artist_id']).agg({'play_count': 'sum'}).withColumnRenamed('sum(play_count)', 'play_count')

The play count of an artist is an implicit rating, which is the measurement that does not give direct feedback from users to show how much they like an item. People may have a different habit when listening to music, some people tend to repeat one song all the time, others may like to shuffle play even they like the artist. To avoid this, I add a new column "score", which scale the "play count" column to the range of 0-1 by dividing each play count by every user's maximum play number. 

In [26]:
#standardized play_count
@udf(FloatType())
def scaler_score(count, max_count):
    return count/max_count

max_train_df = train_df.groupBy('user_id').agg({'play_count': 'max'}).withColumnRenamed('max(play_count)', 'max_count')
recommend_train_df = train_df.join(max_train_df, ['user_id'], 'left').withColumn('score', scaler_score('play_count', 'max_count')) \
    .select('user_id', 'artist_id', 'play_count', 'score')

There is an n × m matrix of the score, n represents the number of users and m indicates the number of artists. In order to predict the rating $r_{ij}$ if target user i never listen to artist j before, I need to calculate the similarities between target user i and all other users, then make an artists recommendation for a target user by referring to the artist preference of top n most similar users.

In class "Recommend", "fit" function trains the model to get the similarity between users. It first gathers the artists and the corresponding scores that related to the same user into the format like ('user_id', \[(artist1,score1), (artist2,score2), ...\]) and implements the cartesian product. Then the function calculates users' similarity between every two users by applying cosine similarity to their scores of common artists. Cosine similarity is calculated as $cos(x_1, x_2) = \frac{x_1*x_2}{|x_1|*|x_2|}$. The greater the cosine value, the more the match between two vectors, 0 means there is no match.

In [27]:
class Recommend:
    '''
    Recommendation algorithm
    '''
    def __init__(self, uc='user_id', ic='item_id', sc='score', min_common_item=1):
        '''
        @param uc: column name of user id，default is "user_id"
        @param ic: column name of artist id，default is "item_id"
        @param sc: column name of score，default is "score"
        @param min_common_item: the minimum number of common artists between two users，defualt value is 1
        '''
        self.uc = uc
        self.ic = ic
        self.sc = sc
        self.min_common_item = min_common_item #only calculate similarity when there are common artists between two user 
        self.__sim_df = '' #similarity matrix
    
    def fit(self, train_df):
        '''
        Train the model to get similarity between users
        '''
        rating_by_user_rdd = train_df.rdd.map(lambda x: (x[self.uc], (x[self.ic], x[self.sc]))).groupByKey().mapValues(list)
        rating_cross = rating_by_user_rdd.cartesian(rating_by_user_rdd)#cartesian product
        user_sim_rdd = rating_cross.map(self.__user_sim).filter(lambda x: x is not None).groupByKey().mapValues(list) #calculate consine similarity between two user 
        schema = StructType([StructField("uid", IntegerType(), True), StructField("user_id", IntegerType(), True), \
                     StructField("sim", FloatType(), True)])
        sim_df = user_sim_rdd.flatMapValues(lambda x: x).map(lambda x:(x[0], x[1][0], float(x[1][1]))) \
            .toDF(schema=schema)
        self.__sim_df = sim_df

    def __user_sim(self, xi):
        '''
        Calculate users' similarity
        '''
        l1 = xi[0][1] #artist and score by user 1
        l2 = xi[1][1] #artist and score by user 2
        common_item = set([kv[0] for kv in l1]).intersection(set([kv[0] for kv in l2])) #same artists between two users
        if len(common_item) >= self.min_common_item: #if have commmon artist
            vector_1 = [kv[1] for kv in l1 if kv[0] in common_item] #scores of common artist
            vector_2 = [kv[1] for kv in l2 if kv[0] in common_item]
            cos = np.around(self.__cos_sim(vector_1, vector_2), 5) #cosine similarity
            return (xi[0][0],  (xi[1][0], cos))

    def __cos_sim(self, vector_a, vector_b):
        '''
        Calculate the cosine similarity
        @param vector_a: user 1 vector (including all artists and corresponding scores related to a user)
        @param vector_b: user 2 vector 
        '''
        vector_a = np.mat(vector_a) 
        vector_b = np.mat(vector_b)
        num = float(vector_a * vector_b.T)
        denom = np.linalg.norm(vector_a) * np.linalg.norm(vector_b)
        cos = num / denom
        return cos
    
    def get_sim_n(self, user_id, n): 
        '''
        Get the most similiar n user to user_id 
        '''
        sdg_sim = self.__sim_df.where('uid="'+user_id+'"').orderBy(desc('sim')).limit(n)
        return sdg_sim
    
    def get_sim_df(self):
        return self.__sim_df

In [28]:
recommend_model = Recommend(uc='user_id', ic='artist_id', sc='score')
recommend_model.fit(recommend_train_df)#get similarity between user

Next, I implement the SGD algorithm in function "fit_sgd" below. First, I initialized all the biases value of users and artists to zeros. Then I apply the gradient descent method to each partition of data to update the biases value. Bias value, in this case, indicates the parameter we want to optimize using the SGD algorithm. Biases value are updated using $\theta  \leftarrow \theta - \alpha \frac{\partial f_k}{\partial \theta}$. At last, refer to the Parallelized Stochastic Gradient Descent algorithm [http://papers.nips.cc/paper/4006-parallelized-stochastic-gradient-descent.pdf], I calculate an averaged bias value after applying SGD within every partition. 

In [29]:
def init_bias(df, uc='user_id', ic='artist_id'):
    '''
    initialise the bias of users and artists to 0 and store them in dictionary
    '''
    bu_df = df.select(uc).dropDuplicates().withColumn('bu', lit(0)) #add a constant conlumn
    bi_df = df.select(ic).dropDuplicates().withColumn('bi', lit(0))
    bu = bu_df.toPandas().set_index(uc).T.to_dict('int') #{userid :inial bias}
    bi = bi_df.toPandas().set_index(ic).T.to_dict('int') #{artistid :inial bias}
    return bu_df, bi_df, bu, bi
    
def get_gmean(df, sc='play_count'):
    '''
    Calculate the mean
    '''
    global_mean = df.select(mean(sc)).collect()[0][0]
    return global_mean

In [30]:
def fit_sgd(train_df, num_partition, num_iter=500, spark=spark, bu={}, bi={}, uc='user_id', 
        ic='artist_id', sc='play_count', alpha=0.23, rate_alpha=0.99, reg=0.3):
    '''
    SGD implementation
    @param train_df: train dataset
    @param num_partition: number of partition
    @param num_iter: number of iteration, default valuue is 500
    @param spark: SparkSession
    @param bu: users bias，default is empty dictionary
    @param bi: artist bias，default is empty dictionary
    @param uc: column name of user id，default is "user_id"
    @param ic: column name of user id，default is "item_id"
    @param sc: column name of user id，default is "score"   
    @param alpha: learning rate，default value is 0.23
    @param rate_alpha: changing rate of learning rate，default value is 0.99
    @param reg: L2 regularization factor，default value is 0.3
    '''
    prec = 'predict'
    bu_df, bi_df, bu, bi = init_bias(train_df) #inialised bias of userd and artists
    global_mean = get_gmean(train_df) #mean of playcount
    train_rdd = train_df.rdd
    train_rdd = train_rdd.repartition(num_partition) #data repartition
    #print('partition num: ', num_partition)
    
    #broadcast
    bu_c = spark.sparkContext.broadcast(bu)
    bi_c = spark.sparkContext.broadcast(bi)   
    global_mean_c = spark.sparkContext.broadcast(global_mean)
    uc_c = spark.sparkContext.broadcast(uc)
    ic_c = spark.sparkContext.broadcast(ic)
    sc_c = spark.sparkContext.broadcast(sc)
    alpha_c = spark.sparkContext.broadcast(alpha)
    reg_c = spark.sparkContext.broadcast(reg)
     
    def __calculate(iterator):
        '''
        calcualte gradient decent(update bias value) 
        '''
        bu_v = bu_c.value
        bi_v = bi_c.value
        global_mean_v = global_mean_c.value
        uc_v = uc_c.value
        ic_v = ic_c.value
        sc_v = sc_c.value
        alpha_v = alpha_c.value
        reg_v = reg_c.value
        
        for row in iterator:
            uid = row[uc_v] #userid
            iid = row[ic_v] #artistid
            real_rating = row[sc_v] #playcount
            error = real_rating - (global_mean_v + bu_v['bu'][uid] + bi_v['bi'][iid])
            #update bias
            bu_v['bu'][uid] += alpha_v * (error - reg_v * bu_v['bu'][uid])
            bi_v['bi'][iid] += alpha_v * (error - reg_v * bi_v['bi'][iid])
        return (bu_v, bi_v) #return a updated bias dictionary for user and artist
        
    for i in range(num_iter):
        tmp_result = train_rdd.mapPartitions(__calculate) #apply calculate function to all train_rdd partitions
        results = tmp_result.collect()
        bu_result = {}
        bi_result = {}
        for result in results:
            has_bu = 'bu' in result.keys()
            has_bi = 'bi' in result.keys()
            if has_bu: #put user bias in dictionary
                if len(bu_result)==0:
                    bu_result = result['bu']
                else:
                    for k, v in result['bu'].items():
                        bu_result[k] += v
            if has_bi: #put artist bias in dictionary
                if len(bi_result)==0:
                    bi_result = result['bi']
                else:
                    for k, v in result['bi'].items():
                        bi_result[k] += v
                        
        # take bias value from each partition，and calculate mean(PSDG)     
        for k, v in bu_result.items():
            bu_result[k] = v / num_partition
        for k, v in bi_result.items():
            bi_result[k] = v / num_partition
        bu['bu'] = bu_result
        bi['bi'] = bi_result    
        alpha *= rate_alpha #update learning rate
    return bi_df, global_mean, bu, bi

In below, the function "predict" is used to predict the play count, and the performance of models can be evaluated using "evaluate_rmse".

In [31]:
def predict(predict_df, global_mean, bu, bi, uc='user_id', ic='artist_id', prec='predict'):
    '''
    Predict the numebr of play count by adding user bias and item bias to mean play count
    '''
    global_mean_c = spark.sparkContext.broadcast(global_mean)
    bu_c = spark.sparkContext.broadcast(bu)
    bi_c = spark.sparkContext.broadcast(bi)
        
    @udf(FloatType())
    def predict_count(uid, iid):
        global_mean_v = global_mean_c.value
        bu_v = bu_c.value
        bi_v = bi_c.value
        return global_mean_v + bu_v['bu'][uid] + bi_v['bi'][iid]
        
    result_df = predict_df.withColumn(prec, predict_count(predict_df[uc], predict_df[ic]))
    return result_df   


def evaluate_rmse(df, sc='play_count', prec ='predict'):
    '''
    Evaluate the model perfomance using rmse
    '''
    @udf(FloatType())
    def rmse(score, predict):
        return math.pow(score-predict, 2) #(real_playcount-predicted_playcount)^2
    rmse_df = df.withColumn('RMSE2', rmse(df[sc], df[prec])) #rmse   
    RMSE2_sum, count = rmse_df.rdd.map(lambda x: (1, (x['RMSE2'], 1))) \
        .reduceByKey(lambda x1, x2: (x1[0]+x2[0], x1[1]+x2[1])) \
        .collect()[0][1] #sum the RMSE2_sum
    return math.sqrt(RMSE2_sum/count)

A user ID and the number of artists to recommend are inputs of the "recommend" function below. It starts with getting the top n users that have similar music tastes with the input user. Then fits their user vector (including artists they listen to, corresponding play count, and score) into the SGD method to obtain optimize biases for users and artists. A large number of data in one partition would affect the model performance. Therefore, 4000 data are computed in a partition, and the number of iteration is 500.

In [33]:
def recommend(uid, n):
    '''
    @param uid: the user id to recommend 
    @param n: numeber of artist to recommend
    '''
    sdg_sim = recommend_model.get_sim_n(uid, n) #get the most similiary n users
    sdg_df = sdg_sim.join(recommend_train_df, ['user_id'], 'left').select('user_id', 'artist_id', 'play_count', 'score')
    count = sdg_df.count()
    bi_df, global_mean, bu, bi = fit_sgd(sdg_df, num_partition=count // 4000 + 1, num_iter=500) #train sgd model
    
    #predict and get rmse score of model
    test = sdg_df.limit(1000)
    pre_df = predict(test, global_mean, bu, bi)
    rmse = evaluate_rmse(pre_df)
    
    #predict top n artist for user
    result_pre_df = bi_df.withColumn('user_id', lit(uid)).select('user_id', 'artist_id')
    result_pre_df = result_pre_df.select(result_pre_df['user_id'].cast('int'), 'artist_id')
    sgd_result = predict(result_pre_df, global_mean, bu, bi).orderBy(desc('predict')).limit(n) 
            
    return sgd_result, rmse  

#recommand 5 artist to user "1000002"
result, rmse_score = recommend('1000002', 5)

In [44]:
#read artist_data.txt dataset 
artist_schema = StructType([
    StructField("artistId", LongType(), True),    
    StructField("artistName", StringType(), True)])
artist_data_df = spark.read.csv('./profiledata_06-May-2005/artist_data.txt', sep = "\t", header=False, schema = artist_schema)       

A music recommender built by SGD is now down. Next, I try to recommend 5 artists to user '1000002'.

In [43]:
n=5
recommend_list = [row['artist_id'] for row in result.select('artist_id').collect()]
#replace the artist id to the corresponding artist name.
for i in range(n):
    artist = artist_data_df.rdd.filter(lambda x:x[0] == recommend_list[i]).collect()[0][1] 
    print(artist)

Dream Theater
Miles Davis
Less Than Jake
Free
Badly Drawn Boy


Finally, the performance of the SGD model is evaluated by the same criteria as for ALS. 

In [106]:
rmse_score

1.9360720178682889

## 5. Conclusion
In conclusion, collaborative filtering implements ALS and SGD are used in this project. The performance of models is evaluated by RMSE. The RMSE value of these two models is similar. However, the implementation time of the SGD model is much longer than ALS. Therefore, to save the cost of the implementation, ALS does a better job. Overall, I think that the results of this project can be improved and this topic can be developed more in future research.

## Reference
Rajendra Sahu, Manoj Kumar Dash, Anil Kumar. Applying Predictive Analytics Within the Service Sector. P189. 2017.

Sarwar, Badrul, et al. "Item-based collaborative filtering recommendation algorithms." Proceedings of the 10th international conference on World Wide Web. 2001.

Zinkevich, Martin, et al. "Parallelized stochastic gradient descent." Advances in neural information processing systems. 2010.

Zhuang, Yong, et al. "A fast parallel SGD for matrix factorization in shared memory systems." Proceedings of the 7th ACM conference on Recommender systems. 2013.

Manuel Pozo, Raja Chiky. An implementation of a Distributed Stochastic Gradient Descent for Recommender Systems based on Map-Reduce, Oct 2015.