For this project, we will create an anime recommendation system. A user will input their genre of choice, and the system will output the top 10 anime of that genre along with their descriptions.

The data used for this project consists of about 17,562 anime and the preferences of 325,772 different users.

Datasets Description:

The Anime_df has 16,214 rows and 5 columns:

* MAL_ID: the anime id (primary key)
* Name: the title of the anime
* Score: the average rating (this column is dropped later)
* Genres: the comma-separated genres of each anime entry
* Synopsis: a brief description of the anime (spelled 'sypnopsis' in the CSV header and renamed after loading)

The Ratings_df has 57,633,278 rows and 3 columns:

* user_id: id of the user who left a rating
* anime_id: anime id (foreign key referencing MAL_ID)
* rating: rating from 1 to 10

The Big Data problem in this project is Volume: Ratings_df contains more than 57 million rows. The data is processed in batch mode rather than as a stream.

# Setting Up Pyspark

In [None]:
# install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# point JAVA_HOME and SPARK_HOME to the folders installed above
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
!pip install -q findspark

In [None]:
! pip install pyspark

In [None]:
import findspark
findspark.init()
findspark.find()

In [None]:
import time
from functools import reduce

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, rank, col, avg, split
# Note: this import shadows Python's built-in round() for the rest of the notebook
from pyspark.sql.functions import round

In [None]:
spark = SparkSession.builder\
        .master('local[5]')\
        .appName('Anime recommendation system')\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Importing Data

In [None]:
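# mount Google Drive so that the /content/drive/MyDrive paths below are available
from google.colab import drive
drive.mount('/content/drive')

Anime_df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/anime_with_synopsis.csv', header=True, inferSchema=True)
Ratings_df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/rating_complete.csv', header=True, inferSchema=True)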

In [None]:
# the CSV header spells the synopsis column 'sypnopsis', so we rename it
Anime_df = Anime_df.withColumnRenamed("sypnopsis", "synopsis")

In [None]:
Anime_df.show(5)

In [None]:
Ratings_df.show(5)

# Data Processing

For the project, we use both Anime_df and Ratings_df. We drop the 'Score' column of Anime_df because our model computes each anime's score itself, by aggregating the user ratings in Ratings_df.

In [None]:
Anime_df = Anime_df.drop('Score')

Before exploding the genre column so that each row has just one genre, we inspect the schema of Anime_df:

In [None]:
Anime_df.printSchema()

Checking for Null values:

In [None]:
Anime_df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in Anime_df.columns]).show()

In [None]:
Ratings_df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in Ratings_df.columns]).show()

As we can see, Anime_df has 8 null values in the synopsis column. We do not drop these rows.

Printing the shapes of the above dataframes:

In [None]:
print(f'Anime_df has {Anime_df.count()} rows and {len(Anime_df.columns)} columns')
print(f'Ratings_df has {Ratings_df.count()} rows and {len(Ratings_df.columns)} columns')

Because the genre column of the Anime_df consists of multiple genres per anime, we will split them, resulting in each row containing only one genre per anime.
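The effect of split followed by explode can be shown in plain Python on two hypothetical rows. The ids, names and genres below are made up. Spark's split() takes a regular expression, but the separator ', ' contains no special characters, so Python's str.split behaves the same way here.

```python
# Plain-Python illustration of split() + explode() on hypothetical rows
# (made-up examples, not taken from the dataset).
rows = [
    (1, "Anime A", "Action, Comedy, Drama"),
    (2, "Anime B", "Romance"),
]

def split_and_explode(rows, sep=", "):
    """Return one (MAL_ID, Name, genre) tuple per genre, mimicking split() + explode()."""
    exploded = []
    for mal_id, name, genres in rows:
        for genre in genres.split(sep):             # split(): string -> list of genres
            exploded.append((mal_id, name, genre))  # explode(): one row per list element
    return exploded

exploded = split_and_explode(rows)
for row in exploded:
    print(row)
```

Anime A's single row becomes three rows, one per genre. This is why an anime can later appear in the top 10 list of more than one genre.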

In [None]:
# Converting each entry of the 'Genres' column to a list of strings, for them to be exploded
Anime_df = Anime_df.withColumn('Genres', split(Anime_df['Genres'], ', '))
Anime_df.show()

Next, we explode the 'Genres' column so that each genre gets its own row:

In [None]:
Anime_df.printSchema()

In [None]:
Anime_df = Anime_df.select( '*' , explode('Genres').alias('Genres2'))

In [None]:
Anime_df = Anime_df.drop('Genres')
Anime_df = Anime_df.withColumnRenamed('Genres2', 'Genres')

In [None]:
Anime_df.show()

In [None]:
Ratings = Ratings_df.select(['anime_id' , 'rating'])
Ratings.show(5)

### Visualization of the Genre column

In [None]:
# Group column by distinct values and count occurrences
df_count = Anime_df.groupBy('Genres').count()

# Convert DataFrame to Pandas
df_pd = df_count.toPandas()

# Create bar plot
df_pd.plot(kind='bar', x='Genres', y='count' , figsize=(10,6))
plt.show()

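As we can observe, the genre distribution is unbalanced. Comedy is the most frequent genre, while '7.14' and '6.45' appear as the least frequent ones. These two values are not genres, though. They look like scores that ended up in the Genres column, most likely because some rows of the CSV were not parsed correctly (for example, synopses containing quotes or commas). They should therefore be treated as data-quality issues rather than as genres.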
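A quick sanity check is to flag genre labels that look like numbers, such as '7.14' and '6.45'. The sketch below does this in plain Python on a hypothetical list of labels. In PySpark, a comparable filter could be written as Anime_df.filter(F.col('Genres').rlike(r'^\d+(\.\d+)?$')). If such rows turn up, re-reading the CSV with quote/escape options that match the file (for example escape='"') may help. We have not verified which options this particular file needs.

```python
import re

# Hypothetical genre labels, as they might come out of the groupBy('Genres') step
labels = ["Comedy", "Action", "7.14", "Slice of Life", "6.45"]

# A label made only of digits, optionally with a decimal part, is not a genre
NUMERIC = re.compile(r"^\d+(\.\d+)?$")

def suspicious_genres(labels):
    """Return the labels that look like numbers (likely mis-parsed scores), sorted."""
    return sorted(label for label in labels if NUMERIC.match(label))

print(suspicious_genres(labels))
```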

# First Model Design

For the first phase, we process the 'Ratings' DataFrame to compute the average rating of each anime. To achieve this, we first turn the DataFrame into an RDD. Then a map function creates key-value pairs, where the keys are the anime ids and the values are their corresponding ratings. Afterward, the key-value pairs are grouped by key, creating (key, list of values) pairs that are passed to a reduce step. Lastly, the reduce step takes each key's list of values and outputs the average rating for that key.
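The map, group-by-key and reduce steps described above can be simulated in plain Python on a handful of hypothetical (anime_id, rating) pairs. This is only a single-machine sketch of the data flow, not Spark code. In Spark, each step runs across partitions, and groupByKey shuffles every individual rating between executors. A common alternative is reduceByKey over (sum, count) pairs, which avoids materializing the full lists.

```python
from collections import defaultdict

# Hypothetical (anime_id, rating) records standing in for rows of the Ratings DataFrame
ratings = [(1, 8), (1, 10), (2, 6), (1, 9), (2, 7)]

# Map: emit (key, value) pairs, here (anime_id, rating)
mapped = [(anime_id, rating) for anime_id, rating in ratings]

# Group by key: collect every rating of the same anime into one list
grouped = defaultdict(list)
for anime_id, rating in mapped:
    grouped[anime_id].append(rating)

# Reduce: turn each list of ratings into its average
averages = {anime_id: sum(values) / len(values) for anime_id, values in grouped.items()}
print(averages)  # {1: 9.0, 2: 6.5}
```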

In [None]:
def average_ratings(df):

    start_time = time.time()

    # Turning the DataFrame into an RDD
    RDD = df.rdd

    # Map function:
    # applied in parallel on each partition; creates key-value pairs, key: anime_id, value: rating
    Map = RDD.map(lambda x: (x['anime_id'], x['rating']))

    # Group-By-Key function:
    # groups the key-value pairs by key, creating (anime_id, iterable of ratings) pairs
    GroupBy = Map.groupByKey()

    # Reduce function:
    # computes the average rating for each key
    Reduce = GroupBy.mapValues(lambda x: sum(x) / len(x))

    # RDD transformations are lazy: cache the result and run an action (count)
    # so that the timing below measures the actual Map-Reduce job
    Reduce = Reduce.cache()
    Reduce.count()

    end_time = time.time()
    print('Map-Reduce time: ', end_time - start_time)

    # Converting the RDD back to a DataFrame
    Avrg_df = Reduce.toDF(['anime_id', 'score'])

    end_time = time.time()
    print('Processing time: ', end_time - start_time)

    return Avrg_df

In [None]:
Average_Ratings = average_ratings(Ratings)
Average_Ratings.show()

## Second Approach to the First Model:

Another way to compute the average rating of each anime in the Ratings DataFrame is to group by the anime_id column with the DataFrame API and then aggregate each group with avg(). Conceptually, this mirrors the Map and GroupByKey steps above: each anime id is paired with its ratings, and a reduction computes their average. In practice, Spark does not build full lists of ratings here. It computes partial sums and counts on each partition before the shuffle and combines them afterwards.
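Spark evaluates avg() over a groupBy in two stages. First, each partition reduces its rows to (sum, count) pairs per key. Then only those pairs are shuffled and merged. The plain-Python sketch below illustrates the idea. The two hard-coded "partitions" and their ratings are hypothetical, and the code does not reproduce Spark's internals.

```python
# Hypothetical ratings split into two "partitions", mimicking how Spark distributes rows
partitions = [
    [(1, 8), (2, 6), (1, 10)],
    [(1, 9), (2, 7)],
]

def partial_aggregate(partition):
    """Per-partition step: compute (sum, count) for each anime_id."""
    acc = {}
    for anime_id, rating in partition:
        s, c = acc.get(anime_id, (0, 0))
        acc[anime_id] = (s + rating, c + 1)
    return acc

def merge(partials):
    """After the shuffle: add up the (sum, count) pairs of each anime_id."""
    total = {}
    for acc in partials:
        for anime_id, (s, c) in acc.items():
            ts, tc = total.get(anime_id, (0, 0))
            total[anime_id] = (ts + s, tc + c)
    return total

totals = merge(partial_aggregate(p) for p in partitions)
averages = {anime_id: s / c for anime_id, (s, c) in totals.items()}
print(averages)  # {1: 9.0, 2: 6.5}
```

Only one small (sum, count) pair per anime and partition has to cross the network. This is the main reason the DataFrame version usually scales better than collecting full lists with groupByKey.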

In [None]:
def average_ratings(df):

    # We also measure the processing time
    start_time = time.time()

    # Grouping by anime_id
    df_grouped = df.groupBy('anime_id')
    # Aggregating each group by computing the average rating of each anime
    df_avg = df_grouped.agg(avg('rating').alias('Score'))

    # DataFrame operations are lazy: cache the result and run an action (count)
    # so that the timing below measures the actual aggregation
    df_avg = df_avg.cache()
    df_avg.count()

    end_time = time.time()
    print('Processing time: ', end_time - start_time)

    return df_avg

In [None]:
Average_Ratings = average_ratings(Ratings)
Average_Ratings.show()

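The next step is to join the two tables, Average_Ratings and Anime_df, on the anime id (MAL_ID in Anime_df, anime_id in Average_Ratings). An inner join keeps only the anime that have at least one rating.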
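To make the effect of the inner join concrete, here is a plain-Python sketch on hypothetical rows. Because Anime_df was exploded, one anime can appear in several rows (one per genre), and each of those rows receives the same average score. Anime without any rating (anime 3 below) are dropped.

```python
# Hypothetical exploded anime rows (MAL_ID, Name, genre) and average scores keyed by anime_id
anime = [
    (1, "Anime A", "Action"),
    (1, "Anime A", "Comedy"),
    (2, "Anime B", "Drama"),
    (3, "Anime C", "Action"),  # has no ratings
]
avg_scores = {1: 9.0, 2: 6.5}

def inner_join(anime_rows, scores):
    """Keep only rows whose MAL_ID also appears in scores, like an 'inner' join."""
    return [(name, genre, scores[mal_id])
            for mal_id, name, genre in anime_rows
            if mal_id in scores]

joined = inner_join(anime, avg_scores)
for row in joined:
    print(row)
```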

In [None]:
Anime_DF = Anime_df.join(Average_Ratings, Anime_df.MAL_ID == Average_Ratings.anime_id, 'inner')
Anime_DF = Anime_DF.drop('MAL_ID')
Anime_DF.show()

# Second Model Design

In the second phase, we collect the top 10 anime of each genre based on their score and store them in a DataFrame. First, we turn Anime_DF into an RDD and apply a map function that emits key-value pairs: the key is the genre and the value is the (name, synopsis, score) tuple. Then we group by genre, creating (genre, list of values) pairs. Lastly, we sort each genre's list by score, keep the top 10 anime, and save them in a new DataFrame.
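Selecting the top k entries per key does not require a full sort. The plain-Python sketch below uses hypothetical data: it groups (genre, (name, synopsis, score)) pairs and keeps the k highest scores with heapq.nlargest. In PySpark, the same selection could be applied per key with mapValues before collecting, so that only k rows per genre reach the driver. Another option would be a window function, Window.partitionBy('Genres') with F.row_number() ordered by descending score. Both are alternatives that we have not benchmarked here.

```python
import heapq
from collections import defaultdict

# Hypothetical (genre, (name, synopsis, score)) pairs, as emitted by the map step
pairs = [
    ("Comedy", ("A", "syn A", 8.1)),
    ("Comedy", ("B", "syn B", 9.0)),
    ("Comedy", ("C", "syn C", 7.5)),
    ("Drama", ("D", "syn D", 8.8)),
]

def top_k_by_genre(pairs, k=10):
    """Group values by genre and keep the k entries with the highest score."""
    grouped = defaultdict(list)
    for genre, value in pairs:
        grouped[genre].append(value)
    # nlargest returns the k best entries in descending score order
    return {genre: heapq.nlargest(k, values, key=lambda v: v[2])
            for genre, values in grouped.items()}

top = top_k_by_genre(pairs, k=2)
print(top["Comedy"])  # [('B', 'syn B', 9.0), ('A', 'syn A', 8.1)]
```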

In [None]:
def top_10_by_genre(df):

    start_time = time.time()

    # Converting the DataFrame to an RDD of Row objects.
    # After the join, the column order is: Name, synopsis, Genres, anime_id, Score
    rdd = df.rdd

    # Map function:
    # creates key-value pairs, key: genre, value: (Name, Synopsis, Score)
    Map = rdd.map(lambda x: (x[2], (x[0], x[1], x[4])))

    # Group by key:
    # groups the pairs by genre, creating (genre, list of values) pairs
    List_of_values = Map.groupByKey().mapValues(list)

    # Sorting phase:
    # sorts each genre's list by score (descending) on the executors and keeps the top 10,
    # so only 10 entries per genre are collected to the driver
    Top10_per_genre = List_of_values.mapValues(
        lambda values: sorted(values, key=lambda v: v[2], reverse=True)[:10]
    )

    # Flattening to (Genre, Name, Synopsis, Score) tuples on the driver
    top_10_list = [
        (genre, name, synopsis, score)
        for genre, top10 in Top10_per_genre.collect()
        for name, synopsis, score in top10
    ]

    Top10_DF = spark.createDataFrame(top_10_list, ['Genre', 'Name', 'Synopsis', 'Score'])

    # Rounding the score to 2 decimals
    Top10_DF = Top10_DF.withColumn('Score', F.round(Top10_DF['Score'], 2))

    end_time = time.time()
    print('Processing time: ', end_time - start_time)

    return Top10_DF


In [None]:
Top10 = top_10_by_genre(Anime_DF)
Top10.show()

Now we create a function that asks the user for a genre and returns the top 10 anime of that genre.
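The genre filter compares strings exactly, so an input such as 'comedy' or ' Comedy ' would match nothing. A small helper like the hypothetical one below could normalize the input against the known genre labels. In the notebook, that list could be obtained with [r['Genre'] for r in df.select('Genre').distinct().collect()]. The genre list hard-coded here is only an example.

```python
# Hypothetical subset of genre labels available in the Top10 DataFrame
AVAILABLE_GENRES = ["Action", "Comedy", "Slice of Life", "Sci-Fi"]

def normalize_genre(user_input, available=AVAILABLE_GENRES):
    """Map free-form user input to a known genre label, ignoring case and outer spaces.

    Returns None when nothing matches, so the caller can ask again.
    """
    wanted = user_input.strip().lower()
    for genre in available:
        if genre.lower() == wanted:
            return genre
    return None

print(normalize_genre("  comedy "))  # Comedy
```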

In [None]:
def Anime_Recommendation_System(df):

  print('-----------------------------------------------------------------------------------------------------------------------------------------------------------\n')
  print('Welcome to the Anime Recommendation System!\n')
  print('You will be asked to input your genre of choice and the system will return the top 10 anime of that genre.')
  print("Let's get started!\n")
  print('-----------------------------------------------------------------------------------------------------------------------------------------------------------\n')

  # strip surrounding whitespace; genre names are case-sensitive (e.g. 'Comedy')
  Genres = input('What genre are you interested in? \n').strip()

  recommendations = df.filter(df.Genre == Genres)
  if recommendations.count() == 0:
    print(f"No anime found for the genre '{Genres}'.")
  else:
    # highest-scoring anime first; truncate=False shows the full synopsis
    recommendations.orderBy(F.desc('Score')).select('Name', 'Synopsis').show(10, truncate=False)

In [None]:
Anime_Recommendation_System(Top10)

# Bibliography

Setting up PySpark on Google Colab:

https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/

https://stackoverflow.com/questions/55240940/error-while-installing-spark-on-google-colab

Datasets:

https://www.kaggle.com/datasets/hernan4444/anime-recommendation-database-2020

Google Cloud Dataproc:

https://holowczak.com/getting-started-with-pyspark-on-google-cloud-platform-dataproc/9/
