# User-based recommendation on user clusters (based on likes)

## Import

In [3]:
import pandas as pd
import ast

In [4]:
data_profiles = pd.read_csv("https://anime-recommendation-engine.s3.eu-west-3.amazonaws.com/data/profiles_clean.csv")
data_reviews = pd.read_csv("https://anime-recommendation-engine.s3.eu-west-3.amazonaws.com/data/reviews_clean.csv")
data_animes = pd.read_csv("https://anime-recommendation-engine.s3.eu-west-3.amazonaws.com/data/animes_clean.csv", index_col="uid")

display(data_profiles.head(2))
display(data_reviews.head(2))
display(data_animes.head(2))

Unnamed: 0,profile,gender,birthday,favorites_anime,link,age
0,DesolatePsyche,Male,"Oct 2, 1994","['33352', '25013', '5530', '33674', '1482', '2...",https://myanimelist.net/profile/DesolatePsyche,26.0
1,baekbeans,Female,"Nov 10, 2000","['11061', '31964', '853', '20583', '918', '925...",https://myanimelist.net/profile/baekbeans,20.0


Unnamed: 0,uid,profile,anime_uid,text,score,scores,link
0,255938,DesolatePsyche,34096,\n \n \n \n ...,8,"{'Overall': '8', 'Story': '8', 'Animation': '8...",https://myanimelist.net/reviews.php?id=255938
1,259117,baekbeans,34599,\n \n \n \n ...,10,"{'Overall': '10', 'Story': '10', 'Animation': ...",https://myanimelist.net/reviews.php?id=259117


Unnamed: 0_level_0,title,synopsis,genre,aired,episodes,members,popularity,ranked,score,img_url,link
uid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
28891,Haikyuu!! Second Season,Following their participation at the Inter-Hig...,"['Comedy', 'Sports', 'Drama', 'School', 'Shoun...","Oct 4, 2015 to Mar 27, 2016",25,489888,141,25.0,8.82,https://cdn.myanimelist.net/images/anime/9/766...,https://myanimelist.net/anime/28891/Haikyuu_Se...
23273,Shigatsu wa Kimi no Uso,Music accompanies the path of the human metron...,"['Drama', 'Music', 'Romance', 'School', 'Shoun...","Oct 10, 2014 to Mar 20, 2015",22,995473,28,24.0,8.83,https://cdn.myanimelist.net/images/anime/3/671...,https://myanimelist.net/anime/23273/Shigatsu_w...


## Preprocessing

In [5]:
data_profiles["favorites_anime"] = data_profiles["favorites_anime"].apply(ast.literal_eval)

df_als_favorite = data_profiles[["profile", "favorites_anime"]].copy().explode("favorites_anime")
df_als_favorite = df_als_favorite.dropna(subset=["favorites_anime"])
df_als_favorite["favorites_anime"] = df_als_favorite["favorites_anime"].astype("int64")
df_als_favorite["is_favorite"] = 1

display(df_als_favorite.head(2))

Unnamed: 0,profile,favorites_anime,is_favorite
0,DesolatePsyche,33352,1
0,DesolatePsyche,25013,1


In [8]:
df_als_reviews_score = data_reviews[["profile", "anime_uid", "score"]].copy()

display(df_als_reviews_score.head(2))

Unnamed: 0,profile,anime_uid,score
0,DesolatePsyche,34096,8
1,baekbeans,34599,10


In [6]:
def get_score_by_uid(uid) :

    if uid not in data_animes.index :
        return ""

    return data_animes.at[uid, "score"]

In [9]:
df_als_favorite_score = data_profiles[["profile", "favorites_anime"]].copy().explode("favorites_anime")
df_als_favorite_score = df_als_favorite_score.dropna(subset=["favorites_anime"])
df_als_favorite_score["favorites_anime"] = df_als_favorite_score["favorites_anime"].astype("int64")

display(df_als_favorite_score.head(2))

Unnamed: 0,profile,favorites_anime
0,DesolatePsyche,33352
0,DesolatePsyche,25013


In [10]:
df_to_merge = df_als_favorite_score["favorites_anime"].reset_index().drop_duplicates(subset=["favorites_anime"])
df_to_merge["score"] = df_to_merge["favorites_anime"].apply(get_score_by_uid)
df_to_merge = df_to_merge.dropna(subset=["score"])
df_to_merge = df_to_merge.drop(columns="index")

display(df_to_merge.head(2))

Unnamed: 0,favorites_anime,score
0,33352,8.62
1,25013,8.13


In [11]:
df_als_favorite_score = df_als_favorite_score.merge(df_to_merge, on="favorites_anime")
df_als_favorite_score = df_als_favorite_score[df_als_favorite_score["score"]!=""]

display(df_als_favorite_score.head(2))

Unnamed: 0,profile,favorites_anime,score
0,DesolatePsyche,33352,8.62
1,DesolatePsyche,25013,8.13


In [12]:
df_als_favorite.to_csv("../data/als_is_favorite.csv", index=False)
df_als_reviews_score.to_csv("../data/als_reviews_score.csv", index=False)
df_als_favorite_score.to_csv("../data/als_favorite_score.csv", index=False)

## Spark

For now, this part have to be executed in a ad-hoc Jupyter environment with PySpark, following those steps (disclaimer : you need to install and configure PySpark first) :


```shell
pyspark --name anime-recommendation-engine --driver-java-options -Djava.security.manager=allow
```

```python
sc = SparkSession.builder.getOrCreate()
```

[Medium article simple ALS](https://medium.com/@patelneha1495/recommendation-system-in-python-using-als-algorithm-and-apache-spark-27aca08eaab3)

[Medium article advanced ALS](https://medium.com/@brunoborges_38708/recommender-system-using-als-in-pyspark-10329e1d1ee1)


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [5]:
sc = SparkSession.builder.getOrCreate()

In [7]:
df_als_is_favorite = pd.read_csv("../data/als_is_favorite.csv")
df_als_reviews_score = pd.read_csv("../data/als_reviews_score.csv")
df_als_favorite_score = pd.read_csv("../data/als_favorite_score.csv")

display(df_als_is_favorite.head(2)) # (216695, 3)
display(df_als_reviews_score.head(2)) # (130519, 3)
display(df_als_favorite_score.head(2)) # (216188, 3)

Unnamed: 0,profile,favorites_anime,is_favorite
0,DesolatePsyche,33352,1
1,DesolatePsyche,25013,1


Unnamed: 0,profile,anime_uid,score
0,DesolatePsyche,34096,8
1,baekbeans,34599,10


Unnamed: 0,profile,favorites_anime,score
0,DesolatePsyche,33352,8.62
1,DesolatePsyche,25013,8.13


In [9]:
def pyspark_df_with_train_test_split(df, test_size, user_col, item_col, rating_col) :

    df_items_counted = df.groupby(user_col)[rating_col].count().reset_index(name="num_items")
    df_merged = df.merge(df_items_counted, on=user_col)

    user_window = Window.partitionBy(user_col).orderBy(col(item_col).desc())

    df_spark = sc.createDataFrame(df_merged)
    df_spark = df_spark.withColumn("num_items_to_mask", (col("num_items") * test_size).cast("int"))
    df_spark = df_spark.withColumn("item_rank", rank().over(user_window))

    indexer = StringIndexer(inputCol=user_col, outputCol=user_col+"_index")

    df_spark = indexer.fit(df_spark).transform(df_spark)

    training = df_spark.filter(col("item_rank") > col("num_items_to_mask"))
    test = df_spark.filter(col("item_rank") <= col("num_items_to_mask"))

    return df_spark, training, test 

In [11]:
def als_tuning_and_predict(train, test, user_col, item_col, rating_col) :

    als = ALS(userCol=user_col+"_index", itemCol=item_col, ratingCol=rating_col,
            coldStartStrategy="drop", nonnegative=True)

    param_grid = ParamGridBuilder()\
                .addGrid(als.rank, [1, 3, 5])\
                .addGrid(als.maxIter, [5, 10, 15])\
                .addGrid(als.regParam, [.05, .1, .15])\
                .build()
    
    evaluator = RegressionEvaluator(metricName="rmse", labelCol=rating_col, predictionCol="prediction")

    cv = CrossValidator(
            estimator=als,
            estimatorParamMaps=param_grid,
            evaluator=evaluator,
            numFolds=3)
    
    model = cv.fit(train)

    print("Rank: ", model.bestModel.rank)
    print("MaxIter: ", model.bestModel._java_obj.parent().getMaxIter())
    print("RegParam: ", model.bestModel._java_obj.parent().getRegParam())

    predictions = model.bestModel.transform(test)
    rmse = evaluator.evaluate(predictions)
    
    print("RMSE="+str(rmse))

    return model.bestModel

In [None]:
def create_predictions_pandas_dataframe(model, df_spark, user_col, item_col, rating_col) :
    
    recs = model.recommendForAllUsers(5).toPandas()

    nrecs = recs.recommendations.apply(pd.Series)\
                .merge(recs, right_index = True, left_index = True)\
                .drop(["recommendations"], axis = 1)\
                .melt(id_vars = [user_col+"_index"], value_name = "recommendation")\
                .drop("variable", axis = 1)\
                .dropna()
    nrecs = nrecs.sort_values(user_col+"_index")
    nrecs = pd.concat([nrecs["recommendation"].apply(pd.Series), nrecs[user_col+"_index"]], axis = 1)
    nrecs.columns = [item_col, rating_col, user_col]

    md = df_spark.select(df_spark[user_col],df_spark[user_col+"_index"],df_spark[item_col])
    md = md.toPandas()

    dict1 = dict(zip(md[user_col+"_index"],md[user_col]))

    nrecs[user_col] = nrecs[user_col].map(dict1)
    nrecs = nrecs.sort_values(user_col)
    nrecs.reset_index(drop=True, inplace=True)

    # new = nrecs[[user_col,item_col,rating_col]]
    # new["recommendations"] = list(zip(new[item_col], new[rating_col]))

    # df_recommendations = new[[user_col,"recommendations"]]
    df_recommendations = nrecs[[user_col,item_col]]
    df_recommendations["item_col"] = df_recommendations["item_col"].astype("int64")

    #df_recommendations = df_recommendations["recommendations"].groupby([df_recommendations[user_col]]).apply(list).reset_index()
    df_recommendations = df_recommendations[item_col].groupby([df_recommendations[user_col]]).apply(list).reset_index()

    return df_recommendations

### Is favorite

In [13]:
df_sp_is_favorite, train, test = pyspark_df_with_train_test_split(df_als_is_favorite, 0.2, "profile", "favorites_anime", "is_favorite")

                                                                                

In [14]:
model = als_tuning_and_predict(train, test, "profile", "favorites_anime", "is_favorite")

25/05/30 12:29:52 WARN DAGScheduler: Broadcasting large task binary with size 1250.4 KiB
25/05/30 12:29:54 WARN DAGScheduler: Broadcasting large task binary with size 1276.2 KiB
25/05/30 12:29:54 WARN DAGScheduler: Broadcasting large task binary with size 1277.7 KiB
25/05/30 12:29:55 WARN DAGScheduler: Broadcasting large task binary with size 1279.2 KiB
25/05/30 12:29:57 WARN DAGScheduler: Broadcasting large task binary with size 1280.5 KiB
25/05/30 12:29:57 WARN DAGScheduler: Broadcasting large task binary with size 1279.4 KiB
25/05/30 12:29:58 WARN DAGScheduler: Broadcasting large task binary with size 1280.7 KiB
25/05/30 12:29:59 WARN DAGScheduler: Broadcasting large task binary with size 1281.5 KiB
25/05/30 12:29:59 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/05/30 12:29:59 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/05/30 12:29:59 WARN DAGScheduler: Broadcasting large task binary wit

Rank:  1
MaxIter:  5
RegParam:  0.05


25/05/30 12:39:53 WARN DAGScheduler: Broadcasting large task binary with size 1294.5 KiB
25/05/30 12:39:53 WARN DAGScheduler: Broadcasting large task binary with size 1293.1 KiB
25/05/30 12:39:53 WARN DAGScheduler: Broadcasting large task binary with size 1245.5 KiB


RMSE=0.048186799301785245


25/05/30 12:39:54 WARN DAGScheduler: Broadcasting large task binary with size 1342.1 KiB


In [21]:
df_is_favorite_based_reco = create_predictions_pandas_dataframe(model, df_sp_is_favorite, "profile", "favorites_anime", "is_favorite")

display(df_is_favorite_based_reco.head())

25/05/30 12:44:32 WARN DAGScheduler: Broadcasting large task binary with size 1341.7 KiB
25/05/30 12:44:34 WARN DAGScheduler: Broadcasting large task binary with size 1333.2 KiB
25/05/30 12:44:41 WARN DAGScheduler: Broadcasting large task binary with size 1223.6 KiB


Unnamed: 0,profile,favorites_anime
0,-----noname-----,"[1890.0, 1906.0, 2536.0, 1861.0, 2498.0]"
1,---SnowFlake---,"[1890.0, 1906.0, 2536.0, 2498.0, 1861.0]"
2,--Mizu--,"[1906.0, 1890.0, 2498.0, 1861.0, 2536.0]"
3,--Sunclaudius,"[1890.0, 1906.0, 2536.0, 2498.0, 1861.0]"
4,--animeislife--,"[2498.0, 1890.0, 1906.0, 1861.0, 2536.0]"


In [35]:
df_is_favorite_based_reco.to_csv("../data/als_is_favorite_based_reco.csv", index=False)

### Reviews score

In [23]:
df_sp_reviews_score, train, test = pyspark_df_with_train_test_split(df_als_reviews_score, 0.2, "profile", "anime_uid", "score")

In [25]:
model = als_tuning_and_predict(train, test, "profile", "anime_uid", "score")

25/05/30 12:45:15 WARN DAGScheduler: Broadcasting large task binary with size 2005.9 KiB
25/05/30 12:45:16 WARN DAGScheduler: Broadcasting large task binary with size 2031.7 KiB
25/05/30 12:45:16 WARN DAGScheduler: Broadcasting large task binary with size 2033.1 KiB
25/05/30 12:45:18 WARN DAGScheduler: Broadcasting large task binary with size 2034.7 KiB
25/05/30 12:45:19 WARN DAGScheduler: Broadcasting large task binary with size 2036.0 KiB
25/05/30 12:45:19 WARN DAGScheduler: Broadcasting large task binary with size 2034.9 KiB
25/05/30 12:45:20 WARN DAGScheduler: Broadcasting large task binary with size 2036.2 KiB
25/05/30 12:45:20 WARN DAGScheduler: Broadcasting large task binary with size 2037.0 KiB
25/05/30 12:45:20 WARN DAGScheduler: Broadcasting large task binary with size 2040.1 KiB
25/05/30 12:45:20 WARN DAGScheduler: Broadcasting large task binary with size 2041.5 KiB
25/05/30 12:45:20 WARN DAGScheduler: Broadcasting large task binary with size 2042.9 KiB
25/05/30 12:45:20 WAR

Rank:  1
MaxIter:  15
RegParam:  0.15


25/05/30 13:12:43 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
25/05/30 13:12:43 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
25/05/30 13:12:43 WARN DAGScheduler: Broadcasting large task binary with size 2001.1 KiB


RMSE=1.901189841457557


25/05/30 13:12:44 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


In [26]:
df_score_reviews_based_reco = create_predictions_pandas_dataframe(model, df_sp_reviews_score, "profile", "anime_uid", "score")

display(df_score_reviews_based_reco.head())

25/05/30 13:12:44 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
25/05/30 13:12:48 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
25/05/30 13:12:57 WARN DAGScheduler: Broadcasting large task binary with size 1978.5 KiB


Unnamed: 0,profile,anime_uid
0,-----noname-----,"[40040.0, 29807.0, 19219.0, 1550.0, 32230.0]"
1,---SnowFlake---,"[19219.0, 29807.0, 40040.0, 1550.0, 32230.0]"
2,---was-----,"[19219.0, 40040.0, 29807.0, 32230.0, 1550.0]"
3,--EYEPATCH--,"[19219.0, 29807.0, 40040.0, 32230.0, 1550.0]"
4,--Mizu--,"[32230.0, 19219.0, 1550.0, 29807.0, 40040.0]"


In [37]:
df_score_reviews_based_reco.to_csv("../data/als_reviews_score_based_reco.csv", index=False)

### Is favorite score

In [29]:
df_sp_favorite_score, train, test = pyspark_df_with_train_test_split(df_als_favorite_score, 0.2, "profile", "favorites_anime", "score")

In [31]:
model = als_tuning_and_predict(train, test, "profile", "favorites_anime", "score")

25/05/30 13:13:55 WARN DAGScheduler: Broadcasting large task binary with size 1250.8 KiB
25/05/30 13:13:55 WARN DAGScheduler: Broadcasting large task binary with size 1275.4 KiB
25/05/30 13:13:55 WARN DAGScheduler: Broadcasting large task binary with size 1276.9 KiB
25/05/30 13:13:57 WARN DAGScheduler: Broadcasting large task binary with size 1278.4 KiB
25/05/30 13:13:57 WARN DAGScheduler: Broadcasting large task binary with size 1279.7 KiB
25/05/30 13:13:57 WARN DAGScheduler: Broadcasting large task binary with size 1278.6 KiB
25/05/30 13:13:58 WARN DAGScheduler: Broadcasting large task binary with size 1279.9 KiB
25/05/30 13:13:58 WARN DAGScheduler: Broadcasting large task binary with size 1280.7 KiB
25/05/30 13:13:58 WARN DAGScheduler: Broadcasting large task binary with size 1283.8 KiB
25/05/30 13:13:58 WARN DAGScheduler: Broadcasting large task binary with size 1285.2 KiB
25/05/30 13:13:58 WARN DAGScheduler: Broadcasting large task binary with size 1286.6 KiB
25/05/30 13:13:58 WAR

Rank:  1
MaxIter:  15
RegParam:  0.15


25/05/30 13:23:35 WARN DAGScheduler: Broadcasting large task binary with size 1321.4 KiB
25/05/30 13:23:35 WARN DAGScheduler: Broadcasting large task binary with size 1320.1 KiB
25/05/30 13:23:35 WARN DAGScheduler: Broadcasting large task binary with size 1245.9 KiB


RMSE=0.2875007824864007


25/05/30 13:23:36 WARN DAGScheduler: Broadcasting large task binary with size 1369.9 KiB


In [33]:
df_is_favorite_score_based_reco = create_predictions_pandas_dataframe(model, df_sp_favorite_score, "profile", "favorites_anime", "score")

display(df_is_favorite_score_based_reco.head())

25/05/30 13:24:20 WARN DAGScheduler: Broadcasting large task binary with size 1368.6 KiB
25/05/30 13:24:22 WARN DAGScheduler: Broadcasting large task binary with size 1360.2 KiB
25/05/30 13:24:29 WARN DAGScheduler: Broadcasting large task binary with size 1223.3 KiB


Unnamed: 0,profile,favorites_anime
0,-----noname-----,"[38917.0, 37403.0, 12067.0, 1066.0, 36688.0]"
1,---SnowFlake---,"[38917.0, 37403.0, 36688.0, 12067.0, 1066.0]"
2,--Mizu--,"[12067.0, 37403.0, 36688.0, 1066.0, 38917.0]"
3,--Sunclaudius,"[1066.0, 37403.0, 12067.0, 36688.0, 38917.0]"
4,--animeislife--,"[38917.0, 37403.0, 36688.0, 12067.0, 1066.0]"


In [39]:
df_is_favorite_score_based_reco.to_csv("../data/als_favorite_score_based_reco.csv", index=False)