In [1]:
# Importing Implementation Library
import os
import sys
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import sum,avg,max,min,mean,count,sqrt
from pyspark.sql.functions import col,when
from pyspark.sql import functions as F
import sys
from collections import defaultdict
from itertools import combinations
import numpy as np
import random
import csv
import pdb
from pyspark.sql.functions import col,isnan, when, count
from pyspark.ml.evaluation import RegressionEvaluator
from numerize import numerize


In [2]:
#Initialize a spark session.
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

In [3]:
# Initialise spark object
spark = init_spark()

In [4]:
#Reading ratings data into pyspark dataframe object
anime_ratings_df = spark.read.csv(r"gs://anime-recommmender/sourcefiles/rating_complete.csv",header=True)

#Subsampling ratings data for implementation
#anime_ratings_df_subsample = anime_ratings_df.sample(0.0001)

#Converting pyspark dataframe to an rdd object
#anime_ratings = anime_ratings_df_subsample.rdd
anime_ratings = anime_ratings_df.rdd

#Anime Ratings DataSchema
print("Schema of AnimeRating Data is shown below : ")
anime_ratings_df.show(10)

#Print Total Count of ratings data
TotalCount = anime_ratings.count()
print("Total record count in Ratings Data : ",numerize.numerize(TotalCount))

                                                                                

Schema of AnimeRating Data is shown below : 


                                                                                

+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      0|     430|     9|
|      0|    1004|     5|
|      0|    3010|     7|
|      0|     570|     7|
|      0|    2762|     9|
|      0|     431|     8|
|      0|     578|    10|
|      0|     433|     6|
|      0|    1571|    10|
|      0|     121|     9|
+-------+--------+------+
only showing top 10 rows





Total record count in Ratings Data :  57.63M


                                                                                

In [6]:
# Find Count of Null, None, NaN of All DataFrame Columns
from pyspark.sql.functions import col,isnan, when, count
anime_ratings_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in anime_ratings_df.columns]
   ).show(truncate=False)



+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|0      |0       |0     |
+-------+--------+------+



                                                                                

In [7]:
# Check for duplicate user_id-anime_id pairs 
anime_ratings_df.groupBy("user_id","anime_id").count().filter("count > 1").show(truncate=False)
#df1.drop('count').show()



+-------+--------+-----+
|user_id|anime_id|count|
+-------+--------+-----+
+-------+--------+-----+



                                                                                

In [8]:
# Count the total number of users and total number of animes in ratings dataset
total_users = anime_ratings_df.select(['user_id']).distinct().count()
total_animes = anime_ratings_df.select(['anime_id']).distinct().count()

print("Total No of Users  :", numerize.numerize(total_users))
print("Total No of Animes :", numerize.numerize(total_animes))



Total No of Users  : 310.06K
Total No of Animes : 16.87K


                                                                                

In [9]:
'''
Generate Spark matrix Item for Model input

     item_id_1 -> [(userid_1, rating_1),
                   (userid_2, rating_2),...] 
'''

def ItemUsersPairs(line):
    '''
    Generate Item,(User,Rating) pairs
    '''
    #line = line.split(",")
    return line[1],(line[0],float(line[2]))

def ItemUserInteractions(item,users_with_rating,n):
    '''
    For users with # interactions > n,to subsample replace their interaction history
    with a sample of n items_with_rating
    '''
    if len(users_with_rating) > n:
        #return item,users_with_rating[:n]
        return item,random.sample(users_with_rating,n)
    else:
        #return item, users_with_rating[:n]
        return item, users_with_rating

user_item_pairs = anime_ratings.map(ItemUsersPairs).groupByKey()
                               #.map(lambda p: ItemUserInteractions(p[0],list(p[1]),100))

def f(x): return x
anime_ratings_df_flatten = user_item_pairs.flatMapValues(f)

anime_rating = anime_ratings_df_flatten.map(lambda p:(p[0],p[1][0],p[1][1]))\
                               .map(lambda p: Row(userId=int(p[1]), itemId=int(p[0]),rating=float(p[2])))

print(" [Item => (User,Rating)..] RDD is shown below ")
user_item_pairs.take(2)

 [Item => (User,Rating)..] RDD is shown below 


                                                                                

[('31105',
  [('50826', 6.0),
   ('202476', 6.0),
   ('346506', 6.0),
   ('288527', 7.0),
   ('343712', 9.0),
   ('200959', 8.0),
   ('339662', 7.0),
   ('180661', 7.0),
   ('285103', 8.0),
   ('122020', 6.0),
   ('283532', 8.0),
   ('326379', 6.0),
   ('17667', 8.0),
   ('3858', 8.0),
   ('112196', 5.0),
   ('28023', 7.0),
   ('336459', 5.0),
   ('100179', 6.0),
   ('147552', 4.0),
   ('255733', 7.0),
   ('162615', 10.0),
   ('240415', 6.0),
   ('267712', 7.0),
   ('145710', 6.0),
   ('54340', 9.0),
   ('292087', 6.0),
   ('168185', 10.0),
   ('253744', 1.0),
   ('31262', 8.0),
   ('252275', 5.0),
   ('151974', 1.0),
   ('53481', 7.0),
   ('181050', 8.0),
   ('268647', 6.0),
   ('299777', 7.0),
   ('206112', 6.0),
   ('26035', 9.0),
   ('144519', 3.0),
   ('221120', 7.0),
   ('146070', 7.0),
   ('32319', 9.0),
   ('97686', 7.0),
   ('287309', 7.0),
   ('349054', 10.0),
   ('304225', 7.0),
   ('227835', 8.0),
   ('332910', 6.0),
   ('113311', 6.0),
   ('181380', 7.0),
   ('90528', 7.0)

In [10]:
# Model Train -Test Split
(training,test) = anime_rating.randomSplit([0.8,0.2],2000)
anime_ratings  =  training.map(lambda p: Row(userId=int(p[0]), itemId=int(p[1]),rating=float(p[2])))
anime_ratings_2  =  training.map(lambda p: Row(userId2=int(p[0]), itemId2=int(p[1]),rating2=float(p[2])))
anime_ratingsdf = spark.createDataFrame(anime_ratings)
anime_ratingsdf2 = spark.createDataFrame(anime_ratings_2)

#Generating Item1,Item2 => UserRating1,UserRating2 combinations on train data
anime_df = anime_ratingsdf.join(anime_ratingsdf2, ( \
                                                           (anime_ratingsdf.itemId != anime_ratingsdf2.itemId2) & \
                                                           (anime_ratingsdf.userId == anime_ratingsdf2.userId2)) \
                                        ,'left') \
                                  .select("itemId","itemId2","rating","rating2")
anime_df1 = anime_df.na.fill(0)
anime_user_ratingrdd = anime_df1.rdd

#Total Count of Item-Item Pair and their rating data by users in Training Data
ItemPairCount  = anime_user_ratingrdd.count()
print("Total Item,item Pair record count in Training Data : ", numerize.numerize(ItemPairCount))

#Item-Item Pair and their rating data by users in Training Data
print("Item1-Item2=>Rating1,Rating2 Dataframe input to the model is shown below : ")
anime_df1.show(truncate=False)

                                                                                

Total Item,item Pair record count in Training Data :  321.47M
Item1-Item2=>Rating1,Rating2 Dataframe input to the model is shown below : 


                                                                                

+------+-------+------+-------+
|itemId|itemId2|rating|rating2|
+------+-------+------+-------+
|918   |6884   |9.0   |7.0    |
|918   |35069  |9.0   |6.0    |
|918   |40784  |9.0   |7.0    |
|918   |16241  |9.0   |7.0    |
|918   |23369  |9.0   |7.0    |
|28297 |1858   |10.0  |6.0    |
|28297 |3271   |7.0   |6.0    |
|614   |611    |5.0   |3.0    |
|614   |3371   |5.0   |10.0   |
|614   |4872   |5.0   |7.0    |
|614   |3417   |5.0   |6.0    |
|614   |6883   |5.0   |7.0    |
|614   |5177   |5.0   |7.0    |
|614   |3328   |5.0   |5.0    |
|614   |4672   |5.0   |8.0    |
|614   |6582   |5.0   |7.0    |
|614   |3614   |5.0   |6.0    |
|614   |6946   |5.0   |8.0    |
|614   |2261   |5.0   |7.0    |
|614   |2775   |5.0   |5.0    |
+------+-------+------+-------+
only showing top 20 rows



In [None]:
# Generating Cosine Distance for item-item pair for all user ratings
pairwise_items = anime_user_ratingrdd.map(lambda p: ((p[0],p[1]),(p[2],p[3])))\
                                     .map(lambda p:(p[0],p[1],p[1][0]*p[1][0],p[1][1]*p[1][1],p[1][0]*p[1][1]))\
                                     .map(lambda p: Row(item_pair=p[0], rating_pair=p[1],cosim_x = p[2],cosim_y = p[3],cosim_xy = p[4] ))
pairwise_item_df = spark.createDataFrame(pairwise_items)
pairwise = pairwise_item_df.groupBy("item_pair").agg(sum("cosim_x").alias("Cosim_sumx"),\
                                                sum("cosim_y").alias("Cosim_sumy"),\
                                                sum("cosim_xy").alias("Cosim_sumxy"),\
                                               )
pairwsie_sqrt = pairwise.withColumn("Cosim_sumx_sqrt",sqrt("Cosim_sumx")).withColumn("Cosim_sumy_sqrt",sqrt("Cosim_sumy"))
pairwise_cosine = pairwsie_sqrt.withColumn("Cosine_Similarity", (pairwsie_sqrt.Cosim_sumxy /((pairwsie_sqrt.Cosim_sumx_sqrt * pairwsie_sqrt.Cosim_sumy_sqrt))))
pairwise_cosine = pairwise_cosine.na.fill(0)
pairwise_cosinerdd = pairwise_cosine.rdd

#Cosine Distance calculated for our rating data for each anime-anime combination
print("Cosine Distance calculated for our rating data for each anime-anime combination : ")
pairwise_cosine.show(truncate=False)




Cosine Distance calculated for our rating data for each anime-anime combination : 


[Stage 63:>                                                         (0 + 1) / 1]

+-----------+----------+----------+-----------+-----------------+-----------------+------------------+
|item_pair  |Cosim_sumx|Cosim_sumy|Cosim_sumxy|Cosim_sumx_sqrt  |Cosim_sumy_sqrt  |Cosine_Similarity |
+-----------+----------+----------+-----------+-----------------+-----------------+------------------+
|{5, 2428}  |49.0      |36.0      |42.0       |7.0              |6.0              |0.9882352941176471|
|{5, 9204}  |49.0      |49.0      |49.0       |7.0              |7.0              |0.98989898989899  |
|{5, 17479} |49.0      |9.0       |21.0       |7.0              |3.0              |0.9767441860465116|
|{5, 29585} |49.0      |36.0      |42.0       |7.0              |6.0              |0.9882352941176471|
|{7, 571}   |100.0     |34.0      |58.0       |10.0             |5.830951894845301|0.9862348993338552|
|{7, 1644}  |36.0      |25.0      |30.0       |6.0              |5.0              |0.9836065573770492|
|{7, 6184}  |64.0      |49.0      |56.0       |8.0              |7.0     

                                                                                

In [None]:
def keyOnFirstItem(item_pair,item_sim_data):
    '''
    For each item-item pair, make the first item's id the key
    '''
    (item1_id,item2_id) = item_pair
    return item1_id,(item2_id,item_sim_data)

def nearestNeighbors(item_id,items_and_sims,n):
    '''
    Sort the predictions list by similarity and select the top-N neighbors
    '''
    items_and_sims.sort(key=lambda x: x[1],reverse=True)
    return item_id, items_and_sims[:n]

In [None]:
# Generate Top K Neighbours based on Cosine Similarity Distance
pair_wise_nn = pairwise_cosinerdd.map(lambda p:keyOnFirstItem(p[0],p[6]))\
                  .groupByKey()\
                  .map( lambda p : (p[0], list(p[1])))\
                  .map( lambda p: nearestNeighbors(p[0],p[1],5))\
                  .map(lambda p: Row(item=p[0], item_rating_list=p[1]))

def f(x): return x
cosine_pairs = pair_wise_nn.flatMapValues(f)
cosine_pairsnn = cosine_pairs.map(lambda p:(p[0],p[1][0],p[1][1]))\
                             .map(lambda p: Row(item=p[0], item_nn=p[1], item_cosine = p[2]))
cosine_pairsnn_df = spark.createDataFrame(cosine_pairsnn)

topknn = anime_ratingsdf.join(cosine_pairsnn_df, cosine_pairsnn_df.item == anime_ratingsdf.itemId, 'left')
topknn_cosim = topknn.withColumn("totalratings",topknn.rating * topknn.item_cosine )\
                     .withColumn("CosimTotal",topknn.item_cosine + topknn.item_cosine )
topknn_cosine = topknn_cosim.groupBy("itemId","item_nn").agg(sum("totalratings").alias("total_ratings"),\
                                                sum("CosimTotal").alias("CosineTotal"),\
                                               )
topknn_cosim = topknn_cosine.withColumn("CosineDistance",(topknn_cosine.total_ratings / topknn_cosine.CosineTotal) )\
                .select("itemId","item_nn","CosineDistance")
topknn_cosimrdd = topknn_cosim.rdd
print("Generate Top 5 Neighbours based on Cosine Similarity Distance for AnimeId - 28035 : ")
topknn_cosim.where("itemId = 28035").show(truncate=False)



Generate Top 5 Neighbours based on Cosine Similarity Distance for AnimeId - 28035 : 


[Stage 92:>                                                         (0 + 1) / 1]

+------+-------+-----------------+
|itemId|item_nn|CosineDistance   |
+------+-------+-----------------+
|28035 |23969  |4.765625000000003|
|28035 |19905  |4.765625000000003|
|28035 |7793   |4.765625000000002|
|28035 |18731  |4.765625000000002|
|28035 |23185  |4.765625000000002|
+------+-------+-----------------+



                                                                                

In [None]:
#Evaluation
Testratings  =  test.map(lambda p: Row(TestItemID=int(p[1]),testrating=float(p[2])))\
                    .map(lambda p: Row(TestItemID=str(p[0]),testrating=(p[1])))
Predratings  =  topknn_cosimrdd.map(lambda p: Row(PredItemID=str(p[1]),Predictedrating=(p[2])))
Testratingsdf = spark.createDataFrame(Testratings)
Predratingsdf = spark.createDataFrame(Predratings)
Pred =  Predratingsdf.join(Testratingsdf,Testratingsdf.TestItemID ==  Predratingsdf.PredItemID,"inner")
Pred = Pred.na.fill(0)
preds = Pred.select("Predictedrating","testrating")

evaluator = RegressionEvaluator(metricName="rmse", labelCol="testrating",
                                predictionCol="Predictedrating")
rmse = evaluator.evaluate(preds)
print("RMSE of KNN Implementation using Cosine Similarity: ",rmse)



RMSE of KNN Implementation using Cosine Similarity:  2.091485612051355


                                                                                

In [None]:
##Pearson Coefficient Implementation for Anime Recommender System

#(training,test) = anime_rating.randomSplit([0.8,0.2],2000)
pe_cf_anime_ratings  =  training.map(lambda p: Row(userId=str(p[0]), itemId=int(p[1]),rating=float(p[2])))
pe_cf_anime_ratings_2  =  training.map(lambda p: Row(userId2=str(p[0]), itemId2=int(p[1]),rating2=float(p[2])))
pe_cf_anime_ratingsdf = spark.createDataFrame(pe_cf_anime_ratings)
pe_cf_anime_ratingsdf2 = spark.createDataFrame(pe_cf_anime_ratings_2)

## Subtracting Mean User Anime Ratings from the ratings data 
User_mean = pe_cf_anime_ratingsdf.groupBy("userId").agg({'rating' : 'mean'}).withColumnRenamed("avg(rating)", "user_mean")\
                                                                            .withColumnRenamed("userId", "meanuserId")
pe_cf_anime_ratingsdf_Umean = pe_cf_anime_ratingsdf.join(User_mean, ( \
                                                           (pe_cf_anime_ratingsdf.userId == User_mean.meanuserId)) \
                                        ,'left').select(pe_cf_anime_ratingsdf.userId,pe_cf_anime_ratingsdf.itemId,pe_cf_anime_ratingsdf.rating,User_mean.user_mean)

pe_cf_anime_ratingsdf2_Umean = pe_cf_anime_ratingsdf2.join(User_mean, ( \
                                                           (pe_cf_anime_ratingsdf2.userId2 == User_mean.meanuserId)) \
                                        ,'left').select(pe_cf_anime_ratingsdf2.userId2,pe_cf_anime_ratingsdf2.itemId2,pe_cf_anime_ratingsdf2.rating2,User_mean.user_mean)                                                    
pe_cf_MeanDeviation= pe_cf_anime_ratingsdf_Umean.withColumn("UserRatingDeviation",pe_cf_anime_ratingsdf_Umean.rating - pe_cf_anime_ratingsdf_Umean.user_mean)

pe_cf_MeanDeviation2= pe_cf_anime_ratingsdf2_Umean.withColumn("UserRatingDeviation2",pe_cf_anime_ratingsdf2_Umean.rating2 - pe_cf_anime_ratingsdf2_Umean.user_mean)


#Generating Item1,Item2 => UserRating1,UserRating2 combinations on train data
pe_cf_anime_df = pe_cf_MeanDeviation.join(pe_cf_MeanDeviation2, ( \
                                                           (pe_cf_MeanDeviation.itemId != pe_cf_MeanDeviation2.itemId2) & \
                                                           (pe_cf_MeanDeviation.userId == pe_cf_MeanDeviation2.userId2)) \
                                        ,'left') \
                                  .select("itemId","itemId2","UserRatingDeviation","UserRatingDeviation")
pe_cf_anime_df1 = pe_cf_anime_df.na.fill(0)
pe_cf_anime_user_ratingrdd = pe_cf_anime_df1.rdd

# Generating Cosine Distance for item-item pair for all user ratings
pe_cf_pairwise_items = pe_cf_anime_user_ratingrdd.map(lambda p: ((p[0],p[1]),(p[2],p[3])))\
                                     .map(lambda p:(p[0],p[1],p[1][0]*p[1][0],p[1][1]*p[1][1],p[1][0]*p[1][1]))\
                                     .map(lambda p: Row(item_pair=p[0], rating_pair=p[1],cosim_x = p[2],cosim_y = p[3],cosim_xy = p[4] ))
pe_cf_pairwise_item_df = spark.createDataFrame(pe_cf_pairwise_items)
pe_cf_pairwise = pe_cf_pairwise_item_df.groupBy("item_pair").agg(sum("cosim_x").alias("Cosim_sumx"),\
                                                sum("cosim_y").alias("Cosim_sumy"),\
                                                sum("cosim_xy").alias("Cosim_sumxy"),\
                                               )
pe_cf_pairwsie_sqrt = pe_cf_pairwise.withColumn("Cosim_sumx_sqrt",sqrt("Cosim_sumx")).withColumn("Cosim_sumy_sqrt",sqrt("Cosim_sumy"))
pe_cf_pairwise_cosine = pe_cf_pairwsie_sqrt.withColumn("Cosine_Similarity", (pe_cf_pairwsie_sqrt.Cosim_sumxy /((pe_cf_pairwsie_sqrt.Cosim_sumx_sqrt * pe_cf_pairwsie_sqrt.Cosim_sumy_sqrt)+0.5))+0)
pe_cf_pairwise_cosine = pe_cf_pairwise_cosine.na.fill(0)
pe_cf_pairwise_cosinerdd = pe_cf_pairwise_cosine.rdd


# Generate Top K Neighbours based on Cosine Similarity Distance
pe_cf_pair_wise_nn = pe_cf_pairwise_cosinerdd.map(lambda p:keyOnFirstItem(p[0],p[6]))\
                  .groupByKey()\
                  .map( lambda p : (p[0], list(p[1])))\
                  .map( lambda p: nearestNeighbors(p[0],p[1],5))\
                  .map(lambda p: Row(item=p[0], item_rating_list=p[1]))

def f(x): return x
pe_cf_cosine_pairs = pe_cf_pair_wise_nn.flatMapValues(f)
pe_cf_cosine_pairsnn = pe_cf_cosine_pairs.map(lambda p:(p[0],p[1][0],p[1][1]))\
                             .map(lambda p: Row(item=p[0], item_nn=p[1], item_cosine = p[2]))
pe_cf_cosine_pairsnn_df = spark.createDataFrame(pe_cf_cosine_pairsnn)

pe_cf_topknn = pe_cf_anime_ratingsdf.join(pe_cf_cosine_pairsnn_df, pe_cf_cosine_pairsnn_df.item == pe_cf_anime_ratingsdf.itemId, 'left')
pe_cf_topknn_cosim = pe_cf_topknn.withColumn("totalratings",pe_cf_topknn.rating * pe_cf_topknn.item_cosine*2 )\
                     .withColumn("CosimTotal",pe_cf_topknn.item_cosine + pe_cf_topknn.item_cosine )
pe_cf_topknn_cosine = pe_cf_topknn_cosim.groupBy("itemId","item_nn").agg(sum("totalratings").alias("total_ratings"),\
                                                sum("CosimTotal").alias("CosineTotal"),\
                                               )
pe_cf_topknn_cosim = pe_cf_topknn_cosine.withColumn("PearsonDistance",(pe_cf_topknn_cosine.total_ratings / pe_cf_topknn_cosine.CosineTotal) )\
                .select("itemId","item_nn","PearsonDistance")
pe_cf_topknn_cosimrdd = pe_cf_topknn_cosim.rdd





In [126]:
#Pearson Coefficient Implementation Evaluation
Testratings  =  test.map(lambda p: Row(TestItemID=int(p[1]),testrating=float(p[2])))\
                    .map(lambda p: Row(TestItemID=str(p[0]),testrating=(p[1])))
Testratingsdf = spark.createDataFrame(Testratings)
pe_cf_Predratings  =  pe_cf_topknn_cosimrdd.map(lambda p: Row(PredItemID=str(p[1]),Predictedrating=(p[2])))
pe_cf_Predratingsdf = spark.createDataFrame(pe_cf_Predratings)

pe_cf_Pred =  pe_cf_Predratingsdf.join(Testratingsdf,Testratingsdf.TestItemID ==  pe_cf_Predratingsdf.PredItemID,"inner")
pe_cf_Pred = pe_cf_Pred.na.fill(0)
pe_cf_preds = pe_cf_Pred.select("Predictedrating","testrating")

pe_cf_evaluator = RegressionEvaluator(metricName="rmse", labelCol="testrating",
                                predictionCol="Predictedrating")
pe_cf_rmse = pe_cf_evaluator.evaluate(pe_cf_preds)
print("RMSE of KNN Implementation using Pearson Coefficient Distance ",pe_cf_rmse)



RMSE of KNN Implementation using Pearson Coefficient Distance  2.270102854815402


                                                                                

In [25]:
##  Converting recommendations into readable format by adding anime name and genres

# Importing anime.csv which contains anime_id, Name and Genres
anime = spark.read.csv("gs://anime-recommmender/sourcefiles/anime.csv",header=True)
anime = anime.withColumn('MAL_ID', col('MAL_ID').cast('integer'))
anime = anime.withColumnRenamed('MAL_ID', 'anime_id')
anime = anime.select(col("anime_id"),col("Name"), col("Score"),col("Genres"))

In [122]:
##Recommend similar anime for input anime based on cosine similarity
def AnimeRecommender(itemId):
    '''
    #Recommend Top K nearest anime for a input itemId
    '''
    overall_avg_anime_mean = anime_ratingsdf.filter(anime_ratingsdf['itemId'] == itemId).agg({'rating' : 'mean'}).collect()[0][0]
    print("Anime Movie Details shown below")
    anime.filter(anime['anime_id']==itemId).show(truncate=False)
    #print("\n")
    print("Overall Avg Rating by all users for anime",itemId,"is",overall_avg_anime_mean)
    #print("\n")
    print("Top N Recommended anime similar to input anime -",itemId, "is shown below" )
    recommender = topknn_cosim.filter(topknn_cosim['itemId'] == itemId )
    anime_recommender = recommender.join(anime,recommender.item_nn == anime.anime_id,'left')\
                                   .select("anime_id","Name","Genres")
    anime_recommender =anime_recommender.na.drop(subset=["anime_id"])
    return anime_recommender

topNrecommender = AnimeRecommender(11757) # AnimeIds - 5114, 1535, 22319, 11757, 16498, 249
topNrecommender.show(truncate=False)

                                                                                

Anime Movie Details shown below


                                                                                

+--------+----------------+-----+-----------------------------------------+
|anime_id|Name            |Score|Genres                                   |
+--------+----------------+-----+-----------------------------------------+
|11757   |Sword Art Online|7.25 |Action, Game, Adventure, Romance, Fantasy|
+--------+----------------+-----+-----------------------------------------+

Overall Avg Rating by all users for anime 11757 is 7.068181818181818
Top N Recommended anime similar to input anime - 11757 is shown below




+--------+--------------------------------------------------------+-------------------------------------------------------+
|anime_id|Name                                                    |Genres                                                 |
+--------+--------------------------------------------------------+-------------------------------------------------------+
|38422   |High Score Girl: Extra Stage                            |Comedy, Game, Romance, School, Seinen                  |
|2002    |Heroic Age                                              |Action, Mecha, Military, Sci-Fi, Space                 |
|3323    |Kite Liberator                                          |Action, Sci-Fi, Drama                                  |
|36702   |Shingeki no Kyojin Season 2 Movie: Kakusei no Houkou    |Action, Drama, Fantasy, Shounen, Super Power           |
|38040   |Kono Subarashii Sekai ni Shukufuku wo!: Kurenai Densetsu|Adventure, Comedy, Fantasy, Magic, Parody, Supernatural|
+-------

                                                                                

In [123]:
##Recommend similar anime for input anime based on Pearson Coefficient Distance
def AnimeRecommenderPC(itemId):
    '''
    #Recommend Top K nearest anime for a input itemId
    '''
    overall_avg_anime_mean = anime_ratingsdf.filter(anime_ratingsdf['itemId'] == itemId).agg({'rating' : 'mean'}).collect()[0][0]
    print("Anime Movie Details shown below")
    anime.filter(anime['anime_id']==itemId).show(truncate=False)
    #print("\n")
    print("Overall Avg Rating by all users for anime",itemId,"is",overall_avg_anime_mean)
    #print("\n")
    print("Top N Recommended anime similar to input anime using Pearson Coefficient Distance to input anime -",itemId, "is shown below" )
    recommender = pe_cf_topknn_cosim.filter(pe_cf_topknn_cosim['itemId'] == itemId )
    anime_recommender = recommender.join(anime,recommender.item_nn == anime.anime_id,'left')\
                                   .select("anime_id","Name","Genres")
    anime_recommender =anime_recommender.na.drop(subset=["anime_id"])
    return anime_recommender

topNrecommenderPC = AnimeRecommenderPC(11757)
topNrecommenderPC.show(truncate=False)

                                                                                

Anime Movie Details shown below


                                                                                

+--------+----------------+-----+-----------------------------------------+
|anime_id|Name            |Score|Genres                                   |
+--------+----------------+-----+-----------------------------------------+
|11757   |Sword Art Online|7.25 |Action, Game, Adventure, Romance, Fantasy|
+--------+----------------+-----+-----------------------------------------+

Overall Avg Rating by all users for anime 11757 is 7.397260273972603
Top N Recommended anime similar to input anime using Pearson Coefficient Distance to input anime - 11757 is shown below




+--------+-------------------------+----------------------+
|anime_id|Name                     |Genres                |
+--------+-------------------------+----------------------+
|14045   |Mangirl!                 |Comedy, Slice of Life |
|28637   |Dreams                   |Dementia              |
|18149   |Ishida to Asakura Special|Comedy, School, Seinen|
|842     |Ushio to Tora            |Action, Comedy, Horror|
+--------+-------------------------+----------------------+



                                                                                

In [120]:
##Recommend similar anime for input anime based on cosine similarity
def UserAnimeRecommender(userId):
    '''
    #Recommend Top K nearest anime for a input userID
    '''
    overall_avg_user_mean = anime_ratingsdf.filter(anime_ratingsdf['userId'] == userId).agg({'rating' : 'mean'}).collect()[0][0]
    overall_user_rated_anime_count = anime_ratingsdf.filter(anime_ratingsdf['userId'] == userId).select("itemId").distinct().count()
    print("User -",userId,"rated a total of",overall_user_rated_anime_count,"animes")
    #print("\n")
    print("Overall Avg Rating by user -",userId,"is",overall_avg_user_mean)
    #print("\n")
    User_rated_anime = anime_ratingsdf.filter(anime_ratingsdf['userId'] == userId).select("userId","itemId")\
                                  .withColumnRenamed("itemId", "UserItemId")
    user_nn_anime = User_rated_anime.join(topknn_cosim,User_rated_anime.UserItemId == topknn_cosim.itemId)
    #print("\n")
    print("User -",userId, "rated sample animes details below " )
    user_nn_anime.join(anime,user_nn_anime.UserItemId == anime.anime_id,'left')\
                                 .select("anime_id","Name","Genres").distinct().limit(5).show(truncate=False)
    
    user_nn = User_rated_anime.join(topknn_cosim,User_rated_anime.UserItemId == topknn_cosim.itemId).select("userId","item_nn","CosineDistance")
    user_nn =user_nn.na.drop(subset=["CosineDistance"])
    
    user_nnrdd = user_nn.rdd
    user_nn = user_nnrdd.map(lambda p: ( p[0], (p[1],p[2])))\
                  .groupByKey().map( lambda p : (p[0], list(p[1])))\
                  .map( lambda p: nearestNeighbors(p[0],p[1],5))\
                  .map(lambda p: Row(userID=p[0], item_rating_list=p[1]))
    def f(x): return x
    user_nn_flatten = user_nn.flatMapValues(f)    
    user_knn = user_nn_flatten.map(lambda p:(p[0],p[1][0],p[1][1]))\
                             .map(lambda p: Row(userID=p[0], UseritemID=p[1], item_cosine = p[2]))
    user_knndf = spark.createDataFrame(user_knn)
    print("Top N Recommended anime similar to input user -",userId, "is shown below" )
    anime_recommender = user_knndf.join(anime,user_knndf.UseritemID == anime.anime_id,'left')\
                                  .select("anime_id","Name","Genres")
    anime_recommender =anime_recommender.na.drop(subset=["anime_id"])
    return anime_recommender

topNrecommenderPC = UserAnimeRecommender(68042) # UserId - 162615,68042, 283786, 190748, 328108
topNrecommenderPC.show(truncate=False)

                                                                                

User - 68042 rated a total of 4587 animes
Overall Avg Rating by user - 68042 is 5.892343984559297
User - 68042 rated sample animes details below 


                                                                                

+--------+-------------------------------------------------------+----------------------------------------------+
|anime_id|Name                                                   |Genres                                        |
+--------+-------------------------------------------------------+----------------------------------------------+
|11593   |Ganbare! Bokura no Hit and Run                         |School, Sports                                |
|33957   |Danball Senki Wars: All Star Battle                    |Action, Kids, Mecha, School                   |
|19921   |Ogami Matsugorou                                       |Action, Martial Arts, Romance, School, Shounen|
|38199   |Bermuda Triangle: Colorful Pastrale                    |Music, Fantasy                                |
|17219   |Sore Ike! Anpanman: Roll to Laura Ukigumojou no Himitsu|Kids, Fantasy, Comedy                         |
+--------+-------------------------------------------------------+----------------------

                                                                                

Top N Recommended anime similar to input user - 68042 is shown below


                                                                                

+--------+--------------------------+------------------------------+
|anime_id|Name                      |Genres                        |
+--------+--------------------------+------------------------------+
|11245   |Manga Nihonshi            |Historical                    |
|22975   |Kaibutsu-kun: Demon no Ken|Comedy, Horror, Kids, Shounen |
|5477    |Gozonji! Gekkou Kamen-kun |Parody, Comedy, Sci-Fi        |
|35401   |Hyaku-nengo no Aru Hi     |Military, Sci-Fi, Supernatural|
|15141   |Yukiwatari                |Fantasy                       |
+--------+--------------------------+------------------------------+



In [121]:
##Recommend similar anime for input anime based on cosine similarity
def UserAnimeRecommenderPC(userId):
    '''
    #Recommend Top K nearest anime for a input userID
    '''
    overall_avg_user_mean = anime_ratingsdf.filter(anime_ratingsdf['userId'] == userId).agg({'rating' : 'mean'}).collect()[0][0]
    overall_user_rated_anime_count = anime_ratingsdf.filter(anime_ratingsdf['userId'] == userId).select("itemId").distinct().count()
    print("User -",userId,"rated a total of",overall_user_rated_anime_count,"animes")
    #print("\n")
    print("Overall Avg Rating by user -",userId,"is",overall_avg_user_mean)
    #print("\n")
    User_rated_anime = anime_ratingsdf.filter(anime_ratingsdf['userId'] == userId).select("userId","itemId")\
                                  .withColumnRenamed("itemId", "UserItemId")
    user_nn_anime = User_rated_anime.join(pe_cf_topknn_cosim,User_rated_anime.UserItemId == pe_cf_topknn_cosim.itemId)
    #print("\n")
    print("User -",userId, "rated sample animes details below " )
    user_nn_anime.join(anime,user_nn_anime.UserItemId == anime.anime_id,'left')\
                                 .select("anime_id","Name","Genres").distinct().limit(5).show(truncate=False)
    user_nn = User_rated_anime.join(pe_cf_topknn_cosim,User_rated_anime.UserItemId == pe_cf_topknn_cosim.itemId)\
                              .select("userId","item_nn","PearsonDistance")
    user_nn =user_nn.na.drop(subset=["PearsonDistance"])                          
    user_nnrdd = user_nn.rdd
    user_nn = user_nnrdd.map(lambda p: ( p[0], (p[1],p[2])))\
                  .groupByKey().map( lambda p : (p[0], list(p[1])))\
                  .map( lambda p: nearestNeighbors(p[0],p[1],5))\
                  .map(lambda p: Row(userID=p[0], item_rating_list=p[1]))
    def f(x): return x
    user_nn_flatten = user_nn.flatMapValues(f)    
    user_knn = user_nn_flatten.map(lambda p:(p[0],p[1][0],p[1][1]))\
                             .map(lambda p: Row(userID=p[0], UseritemID=p[1], item_cosine = p[2]))
    user_knndf = spark.createDataFrame(user_knn)
    print("Top N Recommended anime similar to input user -",userId, "is shown below" )
    anime_recommender = user_knndf.join(anime,user_knndf.UseritemID == anime.anime_id,'left')\
                                     .select("anime_id","Name","Genres")
    anime_recommender =anime_recommender.na.drop(subset=["anime_id"])
    
    return anime_recommender

topNUserrecommender = UserAnimeRecommenderPC(68042)
topNUserrecommender.show(truncate=False)

                                                                                

User - 68042 rated a total of 4589 animes
Overall Avg Rating by user - 68042 is 5.888245388245388
User - 68042 rated sample animes details below 


                                                                                

+--------+----------------------------------------+-----------------------------------------+
|anime_id|Name                                    |Genres                                   |
+--------+----------------------------------------+-----------------------------------------+
|2640    |Doraemon: It's Summer!                  |Kids, Adventure, Fantasy, Comedy, Shounen|
|9106    |Nogsaegjeoncha Hamos                    |Action, Adventure, Fantasy, Shounen      |
|5232    |Ninja Bugei-chou                        |Action                                   |
|39581   |TV Yarou Nanaana: Wakuwaku Doukutsu Land|Adventure, Comedy                        |
|39328   |Fushigiboshi no☆Futagohime: Marugoto DVD|Kids, Magic                              |
+--------+----------------------------------------+-----------------------------------------+



                                                                                

Top N Recommended anime similar to input user - 68042 is shown below
+--------+-------------------------+-----------------------+
|anime_id|Name                     |Genres                 |
+--------+-------------------------+-----------------------+
|10435   |Boyfriend                |Sports, Romance, Shoujo|
|7709    |Peng You Town            |Kids, Slice of Life    |
|6674    |Reporter Blues           |Mystery, Comedy        |
|5143    |Tokusou Kihei Dorvack    |Action, Mecha, Sci-Fi  |
|5088    |Fabre-sensei wa Meitantei|Mystery                |
+--------+-------------------------+-----------------------+

