In [1]:
from pyspark.ml.linalg import Vectors
import pyspark.sql.functions as f
from math import sqrt
from numpy.linalg import norm
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, monotonically_increasing_id 
from operator import add
from pyspark.sql import Row

In [2]:
def cosine(a,b):
    """
    Return the cosine similarity of two equal-length numeric vectors a and b.

    a, b: sequences of numbers (lists, numpy arrays, pyspark DenseVectors --
          anything that can be zipped and passed to numpy.linalg.norm).

    Returns dot(a, b) / (|a| * |b|). When either vector has zero norm (e.g. a
    movie with no genre flag set) the similarity is undefined; 0.0 is returned
    instead of letting 0/0 produce NaN, which would otherwise propagate into
    every weighted rating it is summed with.
    """
    dot = sum(x * y for x, y in zip(a, b))
    denom = norm(a) * norm(b)
    if denom == 0:
        return 0.0
    return dot / denom

In [3]:
# Load the movie catalogue (movieId, title, pipe-separated genres).
# No inferSchema is requested, so every column is read as a string.
movies = (
    spark.read
    .option("header", True)
    .option("delimiter", ",")
    .csv("/home/thanhhung/Downloads/movies.csv")
)
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [4]:
# Split the pipe-separated genre string and emit one row per (movie, genre).
# split() takes a Java regex in which a bare "|" means alternation, so it is escaped;
# a raw string avoids Python's invalid-escape warning for "\|".
movies = (
    movies
    .withColumn('genres_array', f.split(f.col('genres'), r'\|'))
    .withColumn('genres', f.explode('genres_array'))
    .select('movieId', 'genres', 'title')
)
movies.show(5)

+-------+---------+----------------+
|movieId|   genres|           title|
+-------+---------+----------------+
|      1|Adventure|Toy Story (1995)|
|      1|Animation|Toy Story (1995)|
|      1| Children|Toy Story (1995)|
|      1|   Comedy|Toy Story (1995)|
|      1|  Fantasy|Toy Story (1995)|
+-------+---------+----------------+
only showing top 5 rows



In [5]:
# One row per movie, one column per genre holding the count of that genre
# (null where the movie lacks it). The "(no genres listed)" placeholder is
# not a real genre, so its column is dropped.
movies_pivot = (
    movies
    .groupBy('movieId')
    .pivot('genres')
    .agg(f.count('genres'))
    .drop('(no genres listed)')
)
movies_pivot.show(5)

+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+----+-------+
|movieId|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller| War|Western|
+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+----+-------+
|   2294|  null|        1|        1|       1|     1| null|       null| null|      1|     null|  null|null|   null|   null|   null|  null|    null|null|   null|
|   2162|  null|        1|     null|       1|  null| null|       null| null|      1|     null|  null|null|   null|   null|   null|  null|    null|null|   null|
|   3210|  null|     null|     null|    null|     1| null|       null|    1|   null|     null|  null|null|   null|   null|      1|  null|    null|null|   null|
|   3959|     1|        1|     null|    

In [6]:
# Genres a movie lacks come out of the pivot as null; treat them as 0.
movies_pivot = movies_pivot.fillna(0)
movies_pivot.show(5)

+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|movieId|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+-------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|   2294|     0|        1|        1|       1|     1|    0|          0|    0|      1|        0|     0|   0|      0|      0|      0|     0|       0|  0|      0|
|   2162|     0|        1|        0|       1|     0|    0|          0|    0|      1|        0|     0|   0|      0|      0|      0|     0|       0|  0|      0|
|   3210|     0|        0|        0|       0|     1|    0|          0|    1|      0|        0|     0|   0|      0|      0|      1|     0|       0|  0|      0|
|   3959|     1|        1|        0|       0| 

In [7]:
# Pack every genre column (all columns after movieId) into a single array column.
movies_feature = movies_pivot.select(
    col("movieId"),
    f.array(*movies_pivot.columns[1:]).alias("feature"),
)
movies_feature.show(5)

+-------+--------------------+
|movieId|             feature|
+-------+--------------------+
|   2294|[0, 1, 1, 1, 1, 0...|
|   2162|[0, 1, 0, 1, 0, 0...|
|   3210|[0, 0, 0, 0, 1, 0...|
|   3959|[1, 1, 0, 0, 0, 0...|
|    467|[0, 0, 0, 0, 1, 0...|
+-------+--------------------+
only showing top 5 rows



In [8]:
# Convert each movie's genre-count array into an ML DenseVector.
movies_feature = (
    movies_feature.rdd
    .map(lambda r: Row(movieId=r.movieId, feature=Vectors.dense(r.feature)))
    .toDF()
)
movies_feature.printSchema()

[Stage 34:>                                                         (0 + 1) / 1]

root
 |-- movieId: string (nullable = true)
 |-- feature: vector (nullable = true)



                                                                                

In [9]:
movies_feature.show(n=5)

+-------+--------------------+
|movieId|             feature|
+-------+--------------------+
|   2294|[0.0,1.0,1.0,1.0,...|
|   2162|[0.0,1.0,0.0,1.0,...|
|   3210|[0.0,0.0,0.0,0.0,...|
|   3959|[1.0,1.0,0.0,0.0,...|
|    467|[0.0,0.0,0.0,0.0,...|
+-------+--------------------+
only showing top 5 rows



In [10]:
# Ratings table reduced to the (user, movie, rating) triple.
# No inferSchema is requested, so all three columns are read as strings.
ratings = (
    spark.read
    .options(delimiter=',', header=True)
    .csv("/home/thanhhung/Downloads/ratings.csv")
    .select('userId', 'movieId', 'rating')
)
ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [11]:
# Attach each rated movie's genre vector to its rating row.
df = (
    ratings
    .join(movies_feature, on='movieId', how='inner')
    .select('userId', 'movieId', 'feature', 'rating')
)
df.show(5)

+------+-------+--------------------+------+
|userId|movieId|             feature|rating|
+------+-------+--------------------+------+
|   608|   2294|[0.0,1.0,1.0,1.0,...|   4.0|
|   600|   2294|[0.0,1.0,1.0,1.0,...|   2.5|
|   596|   2294|[0.0,1.0,1.0,1.0,...|   3.0|
|   580|   2294|[0.0,1.0,1.0,1.0,...|   4.0|
|   561|   2294|[0.0,1.0,1.0,1.0,...|   2.0|
+------+-------+--------------------+------+
only showing top 5 rows



In [12]:
def get_user_vectors(df, userId):
    """Return the (feature, rating) rows of every movie rated by ``userId``."""
    rated_by_user = df.userId == userId
    return df.where(rated_by_user).select("feature", "rating")

In [13]:
def get_not_rating_vectors(df, userId):
    """
    Return the distinct movies in ``df`` that ``userId`` has NOT rated.

    Columns of the result:
        candidate_feature : the movie's feature vector
        candidate_id      : the movie's movieId

    Only movies that appear somewhere in ``df`` can be returned.
    """
    # Movies this user already rated. Filtering rows by "userId != userId" is
    # not enough: a movie rated by this user AND by someone else would survive
    # that filter. An anti-join removes every row of such a movie.
    rated_by_user = df.filter(df.userId == userId).select("movieId")
    return (
        df.join(rated_by_user, on="movieId", how="left_anti")
        .select(col("feature").alias("candidate_feature"), col("movieId").alias("candidate_id"))
        .dropDuplicates()
    )

In [14]:
# User 1's rated movies, with the rating string cast to float for arithmetic.
vec = get_user_vectors(df, 1).withColumn("rating", col("rating").cast("float"))
vec.show(5)

+--------------------+------+
|             feature|rating|
+--------------------+------+
|[0.0,0.0,0.0,0.0,...|   3.0|
|[0.0,0.0,0.0,0.0,...|   4.0|
|[1.0,0.0,0.0,0.0,...|   5.0|
|[0.0,0.0,0.0,0.0,...|   4.0|
|[0.0,0.0,1.0,0.0,...|   5.0|
+--------------------+------+
only showing top 5 rows



In [15]:
# Collect 20 candidate movies to the driver and rebuild a DataFrame from that
# local list, so later actions all work on the same fixed 20 rows.
not_vec = spark.createDataFrame(get_not_rating_vectors(df, 1).head(20))

                                                                                

In [16]:
class KNN_recommender:
    """
    k-nearest-neighbour rating predictor for one user, based on item features.

    A candidate item's rating is predicted from the k items the user already
    rated whose feature vectors are most cosine-similar to it:
        predicted = sum(cosine_score * rating) / (number of neighbours used)
    """

    def __init__(self, user_id, watched_df, not_watched_df, k):
        """
            user_id : identifier of the user (stored; not used in the computation)

            watched_df : dataframe of items that the user already watched and rated
                column "feature": vector of float elements
                column "rating" : float

            not_watched_df: dataframe of items that the user is yet to watch
                column "candidate_feature": vector of float elements
                column "candidate_id": any

            k : number of most similar rated items used for each prediction
        """
        self.userId = user_id
        self.watched_df = watched_df
        self.not_watched_df = not_watched_df
        self.k = k

    def _predict_ratings(self, rated_df, candidates_df):
        """
        Predict a rating for every candidate from its k most similar rated items.

        rated_df      : columns "feature" (vector) and "rating" (numeric)
        candidates_df : columns "candidate_feature" (vector) and "candidate_id"

        Returns an unordered DataFrame with columns "candidate_id" and
        "predicted_rating". The prediction divides by the number of neighbours
        actually used, which is k unless rated_df has fewer than k rows.
        """
        k = self.k
        # Python UDF so the notebook-level `cosine` helper runs on the vector objects.
        similarity = f.udf(lambda u, v: float(cosine(u, v)), "double")

        # Narrow each side to the columns we need: the two inputs may share column
        # names (e.g. "rating"), which would make the cross-joined columns ambiguous.
        pairs = rated_df.select("feature", "rating").crossJoin(
            candidates_df.select("candidate_feature", "candidate_id")
        )
        scored = pairs.select(
            similarity(col("feature"), col("candidate_feature")).alias("cosine_score"),
            col("rating"),
            col("candidate_id"),
        )

        # Most similar first; ties on similarity are broken by the higher rating.
        by_similarity = Window.partitionBy("candidate_id").orderBy(
            col("cosine_score").desc(), col("rating").desc()
        )
        nearest = (
            scored
            .withColumn("row", row_number().over(by_similarity))
            .filter(col("row") <= k)
        )

        return nearest.groupBy("candidate_id").agg(
            (f.sum(col("cosine_score") * col("rating")) / f.count(f.lit(1)))
            .alias("predicted_rating")
        )

    def get_ranked_list(self):
        """
        Predict ratings for every item of not_watched_df from watched_df.

        Returns a DataFrame (candidate_id, predicted_rating) sorted by
        predicted_rating, highest first.
        """
        return (
            self._predict_ratings(self.watched_df, self.not_watched_df)
            .orderBy(col("predicted_rating").desc())
        )

    def RMSE(self, seed=None):
        """
        Root-mean-square error of the predictor on the user's own ratings.

        A random quarter (rounded down) of watched_df is held out as the test
        set. The remaining rows serve as the rated neighbours, and each held-out
        item's predicted rating is compared with its true rating.

        seed : optional int seed for the random split (None lets Spark pick one).

        Raises ValueError if watched_df has fewer than 4 rows, because the
        test set would then be empty.
        """
        n_rated = self.watched_df.count()
        test_size = n_rated // 4
        if test_size == 0:
            raise ValueError(
                "RMSE needs at least 4 rated items to hold out a test set; got %d" % n_rated
            )

        # Number the rows in a random order and split on that position. Unlike
        # subtract() (a set difference), this keeps duplicate (feature, rating) rows
        # in the training set. localCheckpoint() materialises the numbering once,
        # so the train and test branches below see the same random order.
        # The window has no partition key, so Spark gathers these rows into one
        # partition -- assumes watched_df is a single user's (small) table.
        shuffled = (
            self.watched_df.select("feature", "rating")
            .withColumn("_rand", f.rand(seed))
            .withColumn("_pos", row_number().over(Window.orderBy("_rand")))
            .localCheckpoint()
        )

        train_df = shuffled.filter(col("_pos") > test_size).select("feature", "rating")
        test_df = shuffled.filter(col("_pos") <= test_size)

        candidates = test_df.select(
            col("feature").alias("candidate_feature"), col("_pos").alias("candidate_id")
        )
        actual = test_df.select(
            col("_pos").alias("candidate_id"), col("rating").alias("actual_rating")
        )

        predicted = self._predict_ratings(train_df, candidates)
        mse = (
            predicted.join(actual, "candidate_id")
            .agg(f.avg(f.pow(col("predicted_rating") - col("actual_rating"), 2)))
            .first()[0]
        )
        return mse ** 0.5
        

In [17]:
# Recommender for user 1 using the 5 most similar rated movies per candidate.
a = KNN_recommender(user_id=1, watched_df=vec, not_watched_df=not_vec, k=5)

In [18]:
# Hold-out RMSE of the recommender: roughly a quarter of the ratings in `vec`
# (its watched_df), chosen at random, are predicted from the remaining ones.
a.RMSE()

1.2053961814715568

In [19]:
# Predicted ratings for the candidate movies in `not_vec`, highest first.
a.get_ranked_list().show(n=20)



+------------+------------------+
|candidate_id|  predicted_rating|
+------------+------------------+
|        5872| 5.000000000000001|
|       78160|               5.0|
|        2759|               5.0|
|       81537|               5.0|
|        2596|4.3999999999999995|
|        2835| 4.330127018922194|
|      103228| 4.330127018922194|
|        3577| 4.176800626856454|
|       72694| 4.121320343559643|
|      161032|4.0824829046386295|
|        4580|4.0494897427831775|
|        1307|3.8531972647421804|
|        1965|3.7760883751542678|
|        6616| 3.646494472526361|
|       79536| 3.535533905932737|
|        4833|3.3491886513788316|
|      180497|3.1565965239697253|
|        7320|3.1112698372208087|
|        5502| 3.053197264742181|
|         128|               0.0|
+------------+------------------+



                                                                                