**Initial Setup**

1. First, set up your Colab environment by running the cell below.

In [None]:
# Install the Python dependencies into the Colab runtime.
# NOTE(review): versions are unpinned; the recorded run below resolved pyspark 3.2.0.
!pip install pyspark
!pip install -U -q PyDrive
# Install a headless Java 8 JDK; JAVA_HOME below is pointed at it.
!apt install openjdk-8-jdk-headless -qq
import os
# NOTE(review): path assumed to be where the apt package above installs the JDK - confirm on your runtime.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 61.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=aed30fae8e94db687f1251c0d612dcd1f9a00a46b27c74b21219d217a9726b43
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-m

Now we authenticate a Google Drive client to download the file we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
# Sign in the current Colab user first (this is the interactive step mentioned above).
auth.authenticate_user()
gauth = GoogleAuth()
# Hand PyDrive the application-default credentials
# (presumably the ones established by the sign-in above - order matters).
gauth.credentials = GoogleCredentials.get_application_default()
# `drive` is the client the following cells use to list and download files.
drive = GoogleDrive(gauth)

Download both anime.csv and rating.csv and store them in your Google Drive. It is advisable to create a separate project folder where you can keep this dataset along with your code.

The script below lists the files in that folder and prints the ID of each one.

In [None]:
# Query Drive for every file whose parent is the project folder,
# then print each file's title together with its Drive id.
folder_query = "'1Oi8cMnAfJVZH9-FyXGxwOrGGCIkkB7uy' in parents"
file_list = drive.ListFile({'q': folder_query}).GetList()
for drive_file in file_list:
  print('title: {}, id: {}'.format(drive_file['title'], drive_file['id']))

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 33, in <module>
    from oauth2client.contrib.locked_file import LockedFile
ModuleNotFoundError: No module named 'oauth2client.contrib.locked_file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 37, in <module>
    from oauth2client.locked_file import LockedFile
ModuleNotFoundError: No module named 'oauth2client.locked_file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/googleapiclient/discovery_cache/__init__.py", line 44, in autodetect
    from . import file_cache
  File "/usr/local/lib/python3.7/dist-packages/googleapiclient/discovery_cache/file_cache.py", line 41, in <module>
    "file_cach

title: rating.csv, id: 1f76dQZxRB1fNaReBv_DnUDVkIXNm7mw9
title: anime.csv, id: 1TppJoj4QVJlc_HML20xmH847Brrw0Zfc


**Getting the data downloaded earlier**

In [None]:
# Drive ids of the two dataset files, keyed by the local filename to save them as.
# Change an id if it differs from the one printed by the listing cell.
DRIVE_FILE_IDS = {
  'anime.csv': '1TppJoj4QVJlc_HML20xmH847Brrw0Zfc',
  'rating.csv': '1f76dQZxRB1fNaReBv_DnUDVkIXNm7mw9',
}

# Download each file into the current working directory.
# `file_id` is used instead of `id` so the builtin id() is not shadowed.
for local_name, file_id in DRIVE_FILE_IDS.items():
  downloaded = drive.CreateFile({'id': file_id})
  downloaded.GetContentFile(local_name)

**Importing the required packages**

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Student Activity: Add your packages here.
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import array, col, lit, struct, avg
from pyspark.sql.types import IntegerType
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer,IndexToString
 


**This step initializes the Spark context.**

In [None]:
# Spark configuration: serve the Spark web UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Get the Spark context. getOrCreate() returns the already-running context if
# there is one, so re-running this cell no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# Session used by every DataFrame operation below; it attaches to `sc`.
spark = SparkSession.builder.getOrCreate()

You can easily check the current version and get the link of the web interface. In the Spark UI, you can monitor the progress of your job and debug the performance bottlenecks (if your Colab is running with a local runtime).

In [None]:
# Display the session summary (Spark version and the link to the web UI).
spark


**Identifying and describing the number of columns in the two dataset files.**

In [None]:
# Load both CSV files, taking column names from the first row.
# No schema is supplied, so every column is read as a string.
csv_reader = spark.read.option('header', True)
anime_data = csv_reader.csv('/content/anime.csv')
ratings_data = csv_reader.csv('/content/rating.csv')

In [None]:
# describe the columns in the dataset
def print_columns(data):
  """Print the column count, the column names and the schema of a DataFrame.

  Args:
    data: a DataFrame-like object exposing `columns` and `printSchema()`.

  Returns:
    None. Everything is written to standard output.
  """
  no_columns = len(data.columns)
  print ('The number of columns in the dataset is ', no_columns)
  print(data.columns)
  # printSchema() writes the schema itself and returns None, so it must not be
  # wrapped in print() - doing so emitted a stray "None" line after the schema.
  data.printSchema()


# Column count, names and schema of the anime metadata file.
print_columns(anime_data)

The number of columns in the dataset is  7
['anime_id', 'name', 'genre', 'type', 'episodes', 'rating', 'members']
root
 |-- anime_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- members: string (nullable = true)

None


In [None]:
# Column count, names and schema of the user ratings file.
print_columns(ratings_data)

The number of columns in the dataset is  3
['user_id', 'anime_id', 'rating']
root
 |-- user_id: string (nullable = true)
 |-- anime_id: string (nullable = true)
 |-- rating: string (nullable = true)

None




**Combining the two datasets and identifying the key common column**

In [None]:
# Both inputs carry a 'rating' column. Drop the one from the anime table so the
# joined frame keeps only the rating that comes from the ratings file.
anime_data = anime_data.drop('rating')

# Inner-join the two tables on their shared key, anime_id.
data = anime_data.join(other=ratings_data, on=['anime_id'], how='inner')
data.printSchema()

root
 |-- anime_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- members: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: string (nullable = true)





 **Finding the top 10 anime based on rating. The analysis is presented in a tabular form**

**Finding the top 10 anime with the most episodes. The analysis is presented in a tabular form**

In [None]:
# Top 10 anime by mean user rating.
# 'rating' was read as a string, so it is cast to int before averaging per title.
mean_rating = avg(col('rating').cast('int')).alias('rating')
data_rating = data.groupBy('name').agg(mean_rating)
# Highest average first.
data_rating = data_rating.orderBy(col('rating').desc())
data_rating.show(10, truncate=200)


+----------------------------------------------------------------+-----------------+
|                                                            name|           rating|
+----------------------------------------------------------------+-----------------+
|                               Warui no wo Taose!! Salaryman Man|             10.0|
|                                                      Shiroi Zou|             10.0|
|                                              Choegang Top Plate|             10.0|
|                                      STAR BEAT!: Hoshi no Kodou|             10.0|
|                                                      Shiranpuri|              9.0|
|Yakushiji Ryouko no Kaiki Jikenbo: Hamachou, Voice &amp; Fiction|              9.0|
|                                               Tang Lang Bu Chan|              9.0|
|                                                      Doukyuusei|              9.0|
|                                                   Steins;Gate 0

In [None]:
 # top 10 anime with the most episodes
# 'episodes' was read as a string: cast it to int, then keep the largest value per title.
episodes_as_int = data.withColumn('total_episode', data['episodes'].cast('int'))
max_per_title = episodes_as_int.groupBy('name').agg({'total_episode': "max"})
data_episode = max_per_title.withColumnRenamed('max(total_episode)', "Total_episodes")
# Longest-running titles first.
data_episode = data_episode.orderBy(data_episode["Total_episodes"].desc())
data_episode.show(10, truncate=200)



+-------------------------------------+--------------+
|                                 name|Total_episodes|
+-------------------------------------+--------------+
|                           Oyako Club|          1818|
|                      Doraemon (1979)|          1787|
|               Kirin Monoshiri Yakata|          1565|
|   Manga Nippon Mukashibanashi (1976)|          1471|
|                     Hoka Hoka Kazoku|          1428|
|Monoshiri Daigaku: Ashita no Calendar|          1274|
|                Sekai Monoshiri Ryoko|          1006|
|                       Kotowaza House|           773|
|       Shima Shima Tora no Shimajirou|           726|
|                    Ninja Hattori-kun|           694|
+-------------------------------------+--------------+
only showing top 10 rows



In [None]:
# Top 10 genres by mean user rating.
# 'rating' was read as a string, so it is cast to int before averaging per genre.
data_genre = (
  data.withColumn('rating_', data['rating'].cast('int'))
  .groupBy('genre')
  .agg({'rating_': "avg"})
  .withColumnRenamed('avg(rating_)', "rating")
)
# Bind the sorted frame back to data_genre. It was previously stored under the
# typo name `data_data_genre`, so the table shown below was not sorted at all.
data_genre = data_genre.sort(data_genre["rating"].desc())
data_genre.show(10, truncate=200)




+--------------------------------------------------------------+------------------+
|                                                         genre|            rating|
+--------------------------------------------------------------+------------------+
|Action, Comedy, Demons, Fantasy, Martial Arts, School, Shounen| 6.954316972205795|
|                       Comedy, Ecchi, Romance, School, Shounen| 5.443392919733614|
|                               Action, Drama, Seinen, Thriller| 7.054484492875105|
|                                       Fantasy, Magic, Shounen| 5.120701754385965|
|                       Comedy, Fantasy, Harem, Romance, Shoujo| 5.376404494382022|
|               Action, Adventure, Drama, Martial Arts, Shounen| 5.417241379310345|
|    Action, Adventure, Mecha, Military, Romance, Sci-Fi, Space| 6.021739130434782|
|                              Action, Adventure, Comedy, Mecha| 3.909090909090909|
|                                         Horror, Mecha, Sci-Fi|3.5757575757


## Designing a collaborative filter-based recommendation system. 

In [None]:
# Reduce the joined data to one (anime, user) -> rating row for the recommender.
# user_id and rating were read as strings, so both are cast to int; repeated
# ratings of the same anime by the same user are averaged.
data = (
  data
  .select(
    col('name'),
    col('user_id').cast('int').alias('user_id'),
    col('rating').cast('int').alias('rating_'),
  )
  .groupBy('name', 'user_id')
  .agg(avg('rating_').alias('rating'))
)

# ALS needs a numeric item column: map each anime name to an index in 'name_idx'.
# `indexer` ends up holding the fitted model, whose labels are used later to map back.
indexer = StringIndexer(inputCol="name", outputCol="name_idx").fit(data)
data_transformed = indexer.transform(data)


In [None]:
# Confirm that the indexed item column 'name_idx' was added.
data_transformed.columns


['name', 'user_id', 'rating', 'name_idx']

In [None]:

# Fixed seed so the split, the fitted model and therefore the recommendations
# are the same on every Restart & Run All.
SEED = 42

# Split the data 75% / 25% into training and test sets.
(training_data, test_data) = data_transformed.randomSplit([0.75, 0.25], seed=SEED)

# Train the collaborative-filtering model.
# NOTE(review): implicitPrefs=True makes ALS treat 'rating' as implicit-feedback
# strength rather than an explicit score - confirm that this is intended.
als_model = ALS(implicitPrefs=True, ratingCol='rating', userCol="user_id",
                itemCol="name_idx", maxIter=10, seed=SEED)
model = als_model.fit(training_data)


**Give an example of the best three anime recommendations for a minimum of 10 users**

In [None]:
# Lookup table from the index assigned by the fitted StringIndexer back to the
# anime name: a label's position in `labels` is its index.
labels = indexer.labels
idx_to_name = dict(enumerate(labels))


In [None]:
# Users to produce example recommendations for.
ids = [1000, 200, 100, 450, 444, 700, 123, 456, 2345, 76, 233, 457]

# Compute the top-3 recommendations once and keep only the requested users.
# The previous loop re-ran recommendForAllUsers(3) for every single id.
top3_rows = (
  model.recommendForAllUsers(3)
  .filter(col('user_id').isin(ids))
  .select('user_id', 'recommendations')
  .collect()
)

# user_id -> list of recommended anime names (item indices mapped back to names).
names_by_user = {
  row['user_id']: [idx_to_name[item.name_idx] for item in row['recommendations']]
  for row in top3_rows
}

# One (user_id, names) pair per requested id, in the order of `ids`.
# A user the model has no factors for gets an empty list instead of raising
# IndexError, so `collect` always lines up with `ids`.
collect = [(user_id, names_by_user.get(user_id, [])) for user_id in ids]



In [None]:
# Present the recommendations as a two-column pandas table.
columns = ['id', 'recommended anime']
# Second element of each (user_id, names) pair gathered in `collect`.
anime = [names for _, names in collect]
collect_dict = pd.DataFrame({columns[0]: ids, columns[1]: anime})
collect_dict

Unnamed: 0,id,recommended anime
0,1000,"[Death Note, Ouran Koukou Host Club, Sen to Ch..."
1,200,"[Cowboy Bebop, Ghost in the Shell, Akira]"
2,100,"[Death Note, Shingeki no Kyojin, Sword Art Onl..."
3,450,"[Suzumiya Haruhi no Yuuutsu, Lucky☆Star, Elfen..."
4,444,"[Sword Art Online, No Game No Life, Shingeki n..."
5,700,"[Death Note, Elfen Lied, Naruto]"
6,123,"[Free!, No.6, Free!: Eternal Summer]"
7,456,"[Shingeki no Kyojin, Kuroshitsuji, Tokyo Ghoul]"
8,2345,"[Sword Art Online, Angel Beats!, Sakurasou no ..."
9,76,"[Tonari no Totoro, Howl no Ugoku Shiro, Majo n..."
