In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
plt.style.use('ggplot')
import seaborn as sns
pd.set_option('display.max_columns', None)
import os
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from src.data_funcs import *
from src.model_funcs import *
from pyspark.sql.functions import explode

In [2]:
anime_df = pd.read_csv('data/anime.csv')
rating_df = pd.read_csv('data/rating.csv')
anime_meta = pd.read_csv('data/AnimeList_meta.csv')
users_meta = pd.read_csv('data/UserList_Meta.csv')
rating_df = rating_df[rating_df['rating']!=-1]

In [3]:
anime_full = full_anime_df(rating_df, anime_df, anime_meta)
anime_map = anime_full[['anime_id','name','title_english', 'type']]

## Model Based Matrix Factorization Recommenders System

## Manual train_test_split on users

In [4]:
filt = rating_df.groupby('user_id').count()['rating']
user_ids = filt[filt>50].reset_index()['user_id'].values
over_df = rating_df[rating_df['user_id'].isin(user_ids)]
remaining_df = rating_df[~rating_df['user_id'].isin(user_ids)]
over_df.groupby('user_id').count()['rating'].sort_values()
y=over_df['user_id']
X=over_df.drop(columns=['user_id'])
anime_train, anime_test, user_train, user_test = train_test_split(X, y, test_size = 0.20, random_state = 0, stratify=y)
train_over_split = pd.concat([anime_train, user_train],axis=1)
train = pd.concat([train_over_split, remaining_df], axis=0)
test = pd.concat([anime_test, user_test],axis=1)

In [5]:
#old
# y=over_2_df['user_id']
# X=over_2_df.drop(columns=['user_id'])
# anime_train, anime_test, user_train, user_test = train_test_split(X, y, test_size = 0.25, random_state = 0, stratify=y)
# train_over2_split = pd.concat([anime_train, user_train],axis=1)
# train = pd.concat([train_over2_split, remaining_df], axis=0)
# test = pd.concat([anime_test, user_test],axis=1)

## Using Spark ALS

In [6]:
from pyspark.sql import SparkSession
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [7]:
# Setup a SparkSession
spark = SparkSession.builder.getOrCreate()

# Convert a Pandas DF to a Spark DF
#spark_df = spark.createDataFrame(pandas_df) 

# Convert a Spark DF to a Pandas DF
#pandas_df = spark_df.toPandas()

In [8]:
all_spark = spark.createDataFrame(rating_df)
train_spark = spark.createDataFrame(train)
test_spark = spark.createDataFrame(test)
#Note to self: Add some cross validation

## Train Test Split with Pyspark

In [9]:
# train, test = all_spark.randomSplit([0.8, 0.2], seed=0)
train_data, val_data = train_spark.randomSplit([0.8, 0.2], seed=0)

In [10]:
train_data

DataFrame[anime_id: bigint, rating: bigint, user_id: bigint]

In [11]:
als_model = ALS(
    itemCol='anime_id',
    userCol='user_id',
    ratingCol='rating',
    nonnegative=True,    
    maxIter=20,
    regParam=0.1,
    rank=10) 
als_model.setColdStartStrategy("drop")

recommender = als_model.fit(train_data)

os.system("say 'Complete'") 

32512

In [12]:
preds_train = recommender.transform(train_data)
preds_val = recommender.transform(val_data)

In [14]:
predstrain_df = preds_train.toPandas()
predsval_df = preds_val.toPandas()

rmse_train = np.sqrt(mean_squared_error(predstrain_df['rating'],predstrain_df['prediction']))
rmse_val = np.sqrt(mean_squared_error(predsval_df['rating'],predsval_df['prediction']))

print(rmse_train, rmse_val)

1.037179999135848 1.146461759462331


In [15]:
preds_test = recommender.transform(test_spark)
predstest_df = preds_test.toPandas()
rmse_test = np.sqrt(mean_squared_error(predstest_df['rating'],predstest_df['prediction']))
print(rmse_test)

1.1343560679695368


## Inspecting the latent features

In [16]:
anime_features = recommender.itemFactors
user_features = recommender.userFactors
#10
anime_features.take(1)

[Row(id=20, features=[0.492931604385376, 0.5206156969070435, 0.646899938583374, 1.0530647039413452, 1.3970526456832886, 0.6397818326950073, 0.7391793727874756, 0.07245408743619919, 1.0065990686416626, 0.028668029233813286])]

## Tuning the model with crossvalidation & hyperparameter tuning

In [None]:
# #Tuning the model with crossvalidation
# paramGrid = ParamGridBuilder() \
#     .addGrid(als_model.rank, [10]) \
#     .addGrid(als_model.maxIter, [20]) \
#     .addGrid(als_model.regParam, [0.1]) \
#     .build()

# evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
#                                 predictionCol='prediction')

# crossval = CrossValidator(estimator=als_model,
#                           estimatorParamMaps=paramGrid,
#                           evaluator=evaluator,
#                           numFolds=3)

# cvModel = crossval.fit(train_data)

# preds_cv = cvModel.transform(test_data)
# preds_cvdf = preds_cv.toPandas()
# rmse_cv = np.sqrt(mean_squared_error(preds_cvdf['rating'],preds_cvdf['prediction']))
# pct_mean_cv = round(rmse_cv/(rating_df['rating'].mean())*100,0)
# print(f'rmse is {pct_mean_cv}% of mean')
# os.system("say 'Complete'") 

## Getting Recommendations For Certain Anime based on what other users that rated the anime highly also liked

In [17]:
#movie recomendations
movieRecs = recommender.recommendForAllItems(10)
movieRecs.filter(movieRecs.anime_id==120).show()
#This returns the recommender user to watch the anime based on anime_id

+--------+--------------------+
|anime_id|     recommendations|
+--------+--------------------+
|     120|[[16390, 10.48166...|
+--------+--------------------+



In [24]:
rec = movieRecs.filter(movieRecs.anime_id==120)
recs = rec.withColumn('recommendations', explode(rec.recommendations))
recs.show()

+--------+------------------+
|anime_id|   recommendations|
+--------+------------------+
|     120|[16390, 10.481668]|
|     120|[66213, 10.460606]|
|     120|[29702, 10.435471]|
|     120| [5443, 10.416733]|
|     120| [5659, 10.359993]|
|     120|[59579, 10.358955]|
|     120|[54624, 10.353795]|
|     120|[59519, 10.343967]|
|     120|[54114, 10.334009]|
|     120| [8224, 10.307118]|
+--------+------------------+



In [26]:
recs.printSchema()

root
 |-- anime_id: integer (nullable = false)
 |-- recommendations: struct (nullable = true)
 |    |-- user_id: integer (nullable = true)
 |    |-- rating: float (nullable = true)



In [56]:
sim_users = recs.select("anime_id", 'recommendations.*').select("user_id").rdd.flatMap(lambda x: x).collect()
sim_users

[16390, 66213, 29702, 5443, 5659, 59579, 54624, 59519, 54114, 8224]

In [None]:
# rec_df = recs.toPandas()
# rec_df

In [36]:
userRecs = recommender.recommendForAllUsers(10)

userRecs.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|    148|[[32400, 9.912419...|
|    463|[[32400, 11.00275...|
|    471|[[33360, 9.268867...|
|    496|[[32400, 11.11616...|
|    833|[[32400, 11.15705...|
|   1088|[[22607, 12.51817...|
|   1238|[[22607, 11.79739...|
|   1342|[[4208, 12.100459...|
|   1580|[[32400, 11.01759...|
|   1591|[[6262, 11.151911...|
|   1645|[[22607, 12.63872...|
|   1829|[[32400, 10.42426...|
|   1959|[[32400, 11.01497...|
|   2122|[[22607, 10.69851...|
|   2366|[[32400, 9.596404...|
|   2659|[[32400, 10.59612...|
|   2866|[[32400, 10.95549...|
|   3175|[[22607, 12.28804...|
|   3749|[[10742, 10.50278...|
|   3794|[[32400, 10.59204...|
+-------+--------------------+
only showing top 20 rows



In [58]:
user_rec = userRecs.where(userRecs.user_id.isin(sim_users))
user_rec.show()


+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|  54624|[[22607, 15.26658...|
|  16390|[[22607, 15.99666...|
|   5659|[[22607, 13.64116...|
|  66213|[[3644, 14.618015...|
|   5443|[[22607, 14.53432...|
|  59579|[[22607, 14.70403...|
|  59519|[[22607, 15.09886...|
|  54114|[[32894, 15.39254...|
|   8224|[[22607, 15.12295...|
|  29702|[[22607, 15.83381...|
+-------+--------------------+



In [59]:
userrecs = user_rec.withColumn('recommendations', explode(user_rec.recommendations))
user_picks = userrecs.select("user_id", 'recommendations.*')
user_picks.show()

+-------+--------+---------+
|user_id|anime_id|   rating|
+-------+--------+---------+
|  54624|   22607|15.266581|
|  54624|   32894|12.753238|
|  54624|    2552|12.662847|
|  54624|    3644|12.639414|
|  54624|   20969|12.561953|
|  54624|    4640|12.509378|
|  54624|    5096| 11.65477|
|  54624|   32422|11.542713|
|  54624|   15159| 11.52776|
|  54624|   33911|11.513495|
|  16390|   22607|15.996662|
|  16390|   29978| 12.42245|
|  16390|   17985|12.407199|
|  16390|   33360|12.386678|
|  16390|    3644|12.364303|
|  16390|    1466| 12.20294|
|  16390|    3205|12.189185|
|  16390|   32894|12.172327|
|  16390|    4208|12.092691|
|  16390|   21349| 11.85147|
+-------+--------+---------+
only showing top 20 rows



In [60]:
user_picks.count()

100

In [61]:
userpicks_df = user_picks.toPandas()

In [78]:
avg_rating = userpicks_df.groupby('anime_id').mean()['rating']
count_rating = userpicks_df.groupby('anime_id').count()['rating']
user_recs_joined = pd.DataFrame([avg_rating,count_rating],columns=avg_rating.index, index=['avg_rating','count_rating']).T
user_recs_joined['weighted_avg'] = weighted_rating(user_recs_joined,'count_rating', 'avg_rating')
top_anime_recs = user_recs_joined.sort_values('weighted_avg')[:10].index

In [79]:
anime_map[anime_map['anime_id'].isin(top_anime_recs)]

Unnamed: 0,anime_id,name,title_english,type
1064,33605,Ling Qi,Spiritpact,ONA
1443,5096,Doraemon Movie 28: Nobita To Midori No Kyojin Den,Doraemon The Movie: Nobita And The Green Giant...,Movie
3348,8677,Sangokushi (1985),,Special
3434,12163,Ginga Tetsudou 999: Hoshizora Wa Time Machine,,Movie
3941,6971,Gegege No Kitarou (1971),,TV
4637,33360,Code Geass: Boukoku No Akito Final - Itoshiki ...,,Special
5620,7729,Hokkyoku No Muushika Miishika,Adventures Of The Polar Cubs,Movie
7812,32400,Kochinpa!,,TV
8365,16041,Chogattai Majutsu Robot Ginguiser Specials,,Special
8485,32422,Doukyuusei,,Music


In [90]:
#Generate recommendations
def other_user_recs(anime_id, anime_map):
#         movieRecs = self.recommender.recommendForAllItems(10)
#         userRecs = self.recommender.recommendForAllUsers(10)
        rec = movieRecs.filter(movieRecs.anime_id==anime_id)
        recs = rec.withColumn('recommendations', explode(rec.recommendations))
        sim_users = recs.select("anime_id", 'recommendations.*').select("user_id").rdd.flatMap(lambda x: x).collect()
        user_rec = userRecs.where(userRecs.user_id.isin(sim_users))
        userrecs = user_rec.withColumn('recommendations', explode(user_rec.recommendations))
        user_picks = userrecs.select("user_id", 'recommendations.*')
        userpicks_df = user_picks.toPandas()
        avg_rating = userpicks_df.groupby('anime_id').mean()['rating']
        count_rating = userpicks_df.groupby('anime_id').count()['rating']
        user_recs_joined = pd.DataFrame([avg_rating,count_rating],columns=avg_rating.index, index=['avg_rating','count_rating']).T
        user_recs_joined['weighted_avg'] = weighted_rating(user_recs_joined,'count_rating', 'avg_rating')
        top_anime_recs = user_recs_joined.sort_values('weighted_avg')[:10].index
        return anime_map[anime_map['anime_id'].isin(top_anime_recs)]
    
def user_rec(user_id, anime_map):
    your_rec = userRecs.where(userRecs.user_id==user_id)
    yourrecs = your_rec.withColumn('recommendations', explode(your_rec.recommendations))
    your_picks = yourrecs.select("user_id", 'recommendations.*').select("anime_id").rdd.flatMap(lambda x: x).collect()
    return anime_map[anime_map['anime_id'].isin(your_picks)]

In [84]:
other_user_recs(7054, anime_map)

Unnamed: 0,anime_id,name,title_english,type
11,28851,Koe No Katachi,A Silent Voice,Movie
1012,10471,Ie Naki Ko Remi Specials,,Special
3348,8677,Sangokushi (1985),,Special
4685,4640,Maroko,,Movie
4712,9558,Cheung Gong Chat Hou,Cj7: The Cartoon,Movie
5620,7729,Hokkyoku No Muushika Miishika,Adventures Of The Polar Cubs,Movie
8168,6630,Asari-Chan: Ai No Marchen Shoujo,Super Gal Asari: The Dreaming Girl In Fairy World,Movie
8365,16041,Chogattai Majutsu Robot Ginguiser Specials,,Special
8923,9950,Hulu Xiongdi,,TV
9537,5994,Midoriyama Koukou Koushien-Hen,,OVA


In [88]:
#Specific recommendations for one user based on user_id:
user_id = 5
your_rec = userRecs.where(userRecs.user_id==user_id)
yourrecs = your_rec.withColumn('recommendations', explode(your_rec.recommendations))
your_picks = yourrecs.select("user_id", 'recommendations.*').select("anime_id").rdd.flatMap(lambda x: x).collect()
your_picks

[22059, 2881, 30921, 32400, 3327, 820, 29722, 17957, 18029, 7785]

In [89]:
anime_map[anime_map['anime_id'].isin(your_picks)]

Unnamed: 0,anime_id,name,title_english,type
7,820,Ginga Eiyuu Densetsu,Legend Of The Galactic Heroes,OVA
50,7785,Yojouhan Shinwa Taikei,The Tatami Galaxy,TV
2787,3327,Giant Gorg,Giant Gorg,TV
5021,2881,Chinmoku No Kantai,Silent Service,OVA
5514,29722,"Eikoku Ikka, Nihon Wo Taberu",Sushi And Beyond,TV
7621,22059,Kakumeiteki Broadway Shugisha Doumei,,Music
7812,32400,Kochinpa!,,TV
8821,17957,Hello Kitty No Papa Nante Daikirai,,OVA
8830,18029,Hello Kitty No Suteki Na Kyoudai,Hello Kitty In The Wonderful Sisters,OVA
9058,30921,Kacchikenee!,,Movie


In [91]:
user_rec(5, anime_map)

Unnamed: 0,anime_id,name,title_english,type
7,820,Ginga Eiyuu Densetsu,Legend Of The Galactic Heroes,OVA
50,7785,Yojouhan Shinwa Taikei,The Tatami Galaxy,TV
2787,3327,Giant Gorg,Giant Gorg,TV
5021,2881,Chinmoku No Kantai,Silent Service,OVA
5514,29722,"Eikoku Ikka, Nihon Wo Taberu",Sushi And Beyond,TV
7621,22059,Kakumeiteki Broadway Shugisha Doumei,,Music
7812,32400,Kochinpa!,,TV
8821,17957,Hello Kitty No Papa Nante Daikirai,,OVA
8830,18029,Hello Kitty No Suteki Na Kyoudai,Hello Kitty In The Wonderful Sisters,OVA
9058,30921,Kacchikenee!,,Movie


In [110]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import mlflow.spark as ml
# Save and load model
# recommender.save("model/myCollaborativeFilter3")

ModuleNotFoundError: No module named 'mlflow'

In [105]:
sameModel = MatrixFactorizationModel.load(spark,path="model/myCollaborativeFilter2")

Py4JJavaError: An error occurred while calling z:org.apache.spark.mllib.recommendation.MatrixFactorizationModel.load.
: org.json4s.package$MappingException: Did not find value which can be converted into java.lang.String
	at org.json4s.reflect.package$.fail(package.scala:95)
	at org.json4s.Extraction$$anonfun$org$json4s$Extraction$$convert$2.apply(Extraction.scala:704)
	at org.json4s.Extraction$$anonfun$org$json4s$Extraction$$convert$2.apply(Extraction.scala:704)
	at scala.Option.getOrElse(Option.scala:121)
	at org.json4s.Extraction$.org$json4s$Extraction$$convert(Extraction.scala:704)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:394)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:392)
	at org.json4s.Extraction$.customOrElse(Extraction.scala:606)
	at org.json4s.Extraction$.extract(Extraction.scala:392)
	at org.json4s.Extraction$.extract(Extraction.scala:39)
	at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
	at org.apache.spark.mllib.util.Loader$.loadMetadata(modelSaveLoad.scala:131)
	at org.apache.spark.mllib.recommendation.MatrixFactorizationModel$.load(MatrixFactorizationModel.scala:348)
	at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.load(MatrixFactorizationModel.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


## Surprise Package

In [None]:
from surprise import SVDpp, NormalPredictor, Dataset, Reader, accuracy
from surprise import KNNBaseline, NMF, SVD
from surprise.model_selection import KFold, cross_validate, GridSearchCV, train_test_split

In [None]:
reader = Reader(rating_scale=(1, 10))
all_data = Dataset.load_from_df(rating_df, reader)
train_surprise = Dataset.load_from_df(train, reader)
test_surprise = Dataset.load_from_df(test, reader)

In [None]:
kf = KFold(n_splits=3)

algo = SVD()

for trainset, testset in kf.split(train_surprise):

    # train and test algorithm.
    algo.fit(trainset)
    preds_surprise = algo.test(testset)

    # Compute and print Root Mean Squared Error
    accuracy.rmse(preds_surprise, verbose=True)

In [None]:
os.system("say 'Model Complete'") 

In [None]:
from collections import defaultdict

def get_top_n(predictions, n=10):
    '''Return the top-N recommendation for each user from a set of predictions.

    Args:
        predictions(list of Prediction objects): The list of predictions, as
            returned by the test method of an algorithm.
        n(int): The number of recommendation to output for each user. Default
            is 10.

    Returns:
    A dict where keys are user (raw) ids and values are lists of tuples:
        [(raw item id, rating estimation), ...] of size n.
    '''

    # First map the predictions to each user.
    top_n = defaultdict(list)
    for uid, iid, true_r, est, _ in predictions:
        top_n[uid].append((iid, est))

    # Then sort the predictions for each user and retrieve the k highest ones.
    for uid, user_ratings in top_n.items():
        user_ratings.sort(key=lambda x: x[1], reverse=True)
        top_n[uid] = user_ratings[:n]

    return top_n

top_n = get_top_n(predictions, n=10)

# Print the recommended items for each user
for uid, user_ratings in top_n.items():
    print(uid, [iid for (iid, _) in user_ratings])