# Top 5 Genres by User

### Environment

The directory structure for this notebook is as follows:

- root
    - group
        - Top 5 Genres by User.ipynb
        - user_genres.csv (written at the end of the notebook)
    - ml-100k
        - u.item
        - u.data

### Introduction

In this Jupyter notebook, we use `pyspark` to determine the top 5 genres for each user based on that user's average rating of the movies in each genre. When ranking, we only consider genres in which a user has at least 10 ratings.

To begin, we import a few classes and functions from `pyspark`. Please make sure `pyspark` is installed in the virtual environment used to run this notebook. If you receive an error message saying there is no module named `pyspark`, see this [Stack Overflow](https://stackoverflow.com/questions/34302314/no-module-name-pyspark-error/34347373) post for troubleshooting help. If you are using Anaconda, install `pyspark` with `conda install pyspark`.

```python
# load pyspark dependencies
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import expr
```

### The Setup

Before we can do anything with Spark, we must create a Spark session. We also grab the Spark context from the session, because we use it to read the raw MovieLens text files.

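```python
# create a Spark session; the warehouse directory below is a local Windows
# path, so adjust or drop that setting on other platforms
spark = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", "file:///C:/temp")
    .appName("UsersRankedGenres")
    .getOrCreate()
)

# get the Spark context, used below to read the raw text files
sc = spark.sparkContext
```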

Next, we define a mapping function that parses each pipe-delimited line of `u.item` into a `Row`. Then, we read the file, apply the mapper, and convert the result to a DataFrame. Finally, we register the DataFrame as a temporary view for use with SQL queries.

```python
# define a function to parse one pipe-delimited line of u.item into a Row
def item_mapper(line):
    fields = line.split("|")
    return Row(
        MovieId=int(fields[0]),
        MovieTitle=fields[1],
        ReleaseDate=fields[2],
        VideoReleaseDate=fields[3],
        IMDBURL=fields[4],
        # fields 5-23 are 0/1 genre flags; store them as integers
        Unknown=int(fields[5]),
        Action=int(fields[6]),
        Adventure=int(fields[7]),
        Animation=int(fields[8]),
        Childrens=int(fields[9]),
        Comedy=int(fields[10]),
        Crime=int(fields[11]),
        Documentary=int(fields[12]),
        Drama=int(fields[13]),
        Fantasy=int(fields[14]),
        FilmNoir=int(fields[15]),
        Horror=int(fields[16]),
        Musical=int(fields[17]),
        Mystery=int(fields[18]),
        Romance=int(fields[19]),
        SciFi=int(fields[20]),
        Thriller=int(fields[21]),
        War=int(fields[22]),
        Western=int(fields[23]),
    )

# read the items text file and parse each line
items_rdd = sc.textFile("../ml-100k/u.item").map(item_mapper)

# create the items DataFrame and register it as a temporary view
items = spark.createDataFrame(items_rdd).cache()
items.createOrReplaceTempView("items")

items.show(5)
```

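To make the `u.item` layout concrete, here is a minimal plain-Python sketch (no Spark required) of what the mapper does to one line. `GENRES` and `parse_item` are hypothetical names introduced for illustration, and the sample line is simply a line in the `u.item` format: five descriptive fields followed by 19 genre flags.

```python
# hypothetical helper, not part of the notebook: the genre flag columns of
# u.item in file order (fields 5 through 23)
GENRES = [
    "Unknown", "Action", "Adventure", "Animation", "Childrens", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "FilmNoir", "Horror",
    "Musical", "Mystery", "Romance", "SciFi", "Thriller", "War", "Western",
]


def parse_item(line):
    """Split one pipe-delimited u.item line into a dict of typed fields."""
    fields = line.split("|")
    item = {
        "MovieId": int(fields[0]),
        "MovieTitle": fields[1],
        "ReleaseDate": fields[2],
        "VideoReleaseDate": fields[3],
        "IMDBURL": fields[4],
    }
    # the 19 trailing fields are 0/1 genre flags; convert them to ints
    for genre, flag in zip(GENRES, fields[5:24]):
        item[genre] = int(flag)
    return item


sample = ("1|Toy Story (1995)|01-Jan-1995||"
          "http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|"
          "0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0")
item = parse_item(sample)
print(item["MovieTitle"], [g for g in GENRES if item[g] == 1])
# Toy Story (1995) ['Animation', 'Childrens', 'Comedy']
```

A movie can carry several flags at once, which is why the wide layout needs unpivoting before genres can be grouped.
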
We currently have a "wide" DataFrame with each genre represented by its own column. To build a list of each user's top 5 genres for the application, we want the DataFrame to be "long", with a single column that holds the genre. To get there, we "unpivot" the individual genre columns using a `stack` expression.

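The 19 name/column pairs in the `stack` expression are easy to mistype. As a sketch, the same string could be generated from a list of genre names. `GENRES` and `build_stack_expr` are hypothetical names introduced here, and the approach assumes every column name is a plain identifier with no spaces or quotes (true for these genres).

```python
# hypothetical helper, not part of the notebook
GENRES = [
    "Unknown", "Action", "Adventure", "Animation", "Childrens", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "FilmNoir", "Horror",
    "Musical", "Mystery", "Romance", "SciFi", "Thriller", "War", "Western",
]


def build_stack_expr(columns, name_col="Genre", value_col="Value"):
    """Build a Spark SQL stack() expression that emits one
    (name_col, value_col) row per column: stack(n, 'c1', c1, 'c2', c2, ...)."""
    pairs = ", ".join(f"'{c}', {c}" for c in columns)
    return f"stack({len(columns)}, {pairs}) AS ({name_col}, {value_col})"


unpivot = build_stack_expr(GENRES)
print(unpivot)
```

The returned string could be passed to `expr()` in place of the hand-written `unpivot`; the count argument then always matches the number of pairs.
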
```python
# create a stack expression to unpivot the items DataFrame:
# stack(n, name1, col1, name2, col2, ...) emits one (Genre, Value) row per pair
unpivot = (
    "stack(19, 'Unknown', Unknown, 'Action', Action, 'Adventure', Adventure, "
    "'Animation', Animation, 'Childrens', Childrens, 'Comedy', Comedy, "
    "'Crime', Crime, 'Documentary', Documentary, 'Drama', Drama, "
    "'Fantasy', Fantasy, 'FilmNoir', FilmNoir, 'Horror', Horror, "
    "'Musical', Musical, 'Mystery', Mystery, 'Romance', Romance, "
    "'SciFi', SciFi, 'Thriller', Thriller, 'War', War, "
    "'Western', Western) AS (Genre, Value)"
)

# unpivot so the genres are in a single column, keep only the genres each
# movie belongs to, and register the result as a temporary view
item_genres = (
    items.select("MovieId", expr(unpivot))
    .where("Value > 0")
    .select("MovieId", "Genre")
)
item_genres.createOrReplaceTempView("item_genres")

item_genres.show(5)
```

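For intuition, the unpivot-and-filter step can be sketched in plain Python. `unpivot_genres` is a hypothetical helper and the two wide rows are made-up toy data; the logic mirrors `stack(...)` followed by `where("Value > 0")`: each genre column becomes its own (Genre, Value) row, and rows with a zero flag are dropped.

```python
# hypothetical helper, not part of the notebook
def unpivot_genres(items, genres):
    """Turn wide rows (one 0/1 column per genre) into long (MovieId, Genre)
    pairs, keeping only the genres whose flag is set."""
    pairs = []
    for item in items:
        for genre in genres:
            value = item[genre]   # one stacked (Genre, Value) row
            if value > 0:         # same filter as where("Value > 0")
                pairs.append((item["MovieId"], genre))
    return pairs


# made-up toy data with three genre columns
genres = ["Action", "Comedy", "Drama"]
wide = [
    {"MovieId": 1, "Action": 0, "Comedy": 1, "Drama": 0},
    {"MovieId": 2, "Action": 1, "Comedy": 0, "Drama": 1},
]
print(unpivot_genres(wide, genres))
# [(1, 'Comedy'), (2, 'Action'), (2, 'Drama')]
```

A movie with several genres produces several rows, so after the join below a single rating counts toward every genre of its movie.
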
Now that we have the items and their associated genres in a "long" DataFrame, we can join it to the movie ratings to determine each user's average rating for each genre. To do that, we first need to read in the ratings file.

We begin by defining a mapping function that parses each tab-delimited line of `u.data` into a `Row`. Then, we read the file, apply the mapper, and convert the result to a DataFrame. Finally, we register the DataFrame as a temporary view for use with SQL queries.

```python
# define a function to parse one tab-delimited line of u.data into a Row
# (fields: user id, item id, rating, timestamp; the timestamp is not needed)
def rating_mapper(line):
    fields = line.split("\t")
    return Row(
        UserId=int(fields[0]),
        ItemId=int(fields[1]),
        Rating=int(fields[2]),
    )

# read the ratings text file and parse each line
ratings_rdd = sc.textFile("../ml-100k/u.data").map(rating_mapper)

# create the ratings DataFrame and register it as a temporary view
ratings = spark.createDataFrame(ratings_rdd).cache()
ratings.createOrReplaceTempView("ratings")

ratings.show(5)
```

```
+------+------+------+
|UserId|ItemId|Rating|
+------+------+------+
|   196|   242|     3|
|   186|   302|     3|
|    22|   377|     1|
|   244|    51|     2|
|   166|   346|     1|
+------+------+------+
only showing top 5 rows
```

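Each `u.data` line holds four tab-separated fields: user id, item id, rating, and a timestamp. A minimal plain-Python sketch of the same parsing, using a hypothetical `parse_rating` helper and a sample line in that format, shows that the timestamp is simply discarded:

```python
# hypothetical helper, not part of the notebook
def parse_rating(line):
    """Split one tab-delimited u.data line and keep user, item, and rating;
    the fourth field (a timestamp) is dropped, as in rating_mapper."""
    user_id, item_id, rating, _timestamp = line.rstrip("\n").split("\t")
    return {"UserId": int(user_id), "ItemId": int(item_id), "Rating": int(rating)}


print(parse_rating("196\t242\t3\t881250949"))
# {'UserId': 196, 'ItemId': 242, 'Rating': 3}
```

Unpacking into exactly four names also makes the helper fail loudly on a malformed line instead of silently misreading it.
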


We now join the `item_genres` DataFrame to the `ratings` DataFrame on the movie ID and register the result as the `joined` view. Using Spark SQL on that view, we compute the total number of ratings and the average rating for each combination of user and genre, keeping only the combinations with at least 10 ratings.
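
The join and aggregation can be sketched in plain Python to show exactly what is counted. `aggregate_ratings` is a hypothetical helper and the data is made-up toy data; it mirrors an inner join followed by `GROUP BY UserId, Genre` with `HAVING COUNT(*) >= 10`. One assumption to note: Python's `round()` rounds half to even, so exact ties at the third decimal could differ from Spark's `ROUND`, which rounds half up.

```python
from collections import defaultdict


# hypothetical helper, not part of the notebook
def aggregate_ratings(ratings, item_genres, min_ratings=10):
    """Join (UserId, ItemId, Rating) tuples to (MovieId, Genre) pairs, then
    return {(UserId, Genre): (TotalRatings, AverageRating)} for groups with
    at least min_ratings ratings."""
    genres_by_movie = defaultdict(list)
    for movie_id, genre in item_genres:
        genres_by_movie[movie_id].append(genre)

    totals = defaultdict(lambda: [0, 0])  # (user, genre) -> [count, sum]
    for user_id, item_id, rating in ratings:
        # inner join: a rating contributes once per genre of its movie
        for genre in genres_by_movie.get(item_id, []):
            acc = totals[(user_id, genre)]
            acc[0] += 1
            acc[1] += rating

    return {
        key: (count, round(total / count, 3))
        for key, (count, total) in totals.items()
        if count >= min_ratings
    }


# made-up toy data: user 1 rates ten comedies 4 stars and one drama 5 stars
item_genres = [(m, "Comedy") for m in range(1, 11)] + [(11, "Drama")]
ratings = [(1, m, 4) for m in range(1, 11)] + [(1, 11, 5)]
print(aggregate_ratings(ratings, item_genres))
# {(1, 'Comedy'): (10, 4.0)}
```

The single drama rating is filtered out, which is the point of the threshold: one highly rated movie should not make a genre a user's favorite.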

After storing the aggregated results as a DataFrame and registering it as the `agg` view, we use another SQL query to rank each user's genres by average rating with `ROW_NUMBER()` and keep the top 5 for each user. Finally, we save the top 5 genres as a CSV for use in the recommender application.

```python
# join the genres to the ratings (one row per rating per genre of the movie)
joined = ratings.join(item_genres, ratings.ItemId == item_genres.MovieId)
joined.createOrReplaceTempView("joined")

# aggregate ratings by user and genre, keeping genres with at least 10 ratings
agg = spark.sql(
    "SELECT UserId, Genre, COUNT(*) AS TotalRatings, ROUND(AVG(Rating), 3) AS AverageRating "
    "FROM joined "
    "GROUP BY UserId, Genre "
    "HAVING COUNT(*) >= 10 "
    "ORDER BY UserId, AverageRating DESC"
)

# register the agg DataFrame as a temporary view
agg.createOrReplaceTempView("agg")

# rank each user's genres by average rating and keep the top 5
user_genres = spark.sql(
    "SELECT UserId, Genre, "
    "ROW_NUMBER() OVER (PARTITION BY UserId ORDER BY AverageRating DESC) AS Rank, "
    "TotalRatings, AverageRating "
    "FROM agg "
    "ORDER BY UserId, AverageRating DESC"
).where("Rank <= 5")

user_genres.show(10)

# save the top 5 genres as a CSV for use in the recommender application
# (toPandas() collects the result to the driver and requires pandas)
user_genres.toPandas().to_csv("user_genres.csv", index=False)
```

```
+------+--------+----+------------+-------------+
|UserId|   Genre|Rank|TotalRatings|AverageRating|
+------+--------+----+------------+-------------+
|     1|   SciFi|   1|          43|          4.0|
|     1| Romance|   2|          44|        3.932|
|     1|   Drama|   3|         107|        3.925|
|     1|     War|   4|          25|         3.68|
|     1|Thriller|   5|          52|        3.615|
|     2| Romance|   1|          16|        4.125|
|     2|   Drama|   2|          35|        3.829|
|     2|  Comedy|   3|          16|        3.813|
|     2|  Action|   4|          10|          3.8|
|     2|Thriller|   5|          12|        3.583|
+------+--------+----+------------+-------------+
only showing top 10 rows
```

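The ranking step can also be sketched in plain Python. `top_n_genres` is a hypothetical helper and the input is made-up toy data in the shape produced by the aggregation (`(UserId, Genre) -> (TotalRatings, AverageRating)`). It mirrors `ROW_NUMBER() OVER (PARTITION BY UserId ORDER BY AverageRating DESC)` plus `Rank <= 5`, with one assumption added: ties on average rating are broken by genre name, because `ROW_NUMBER()` orders tied rows arbitrarily unless the window's `ORDER BY` includes a tie-breaker.

```python
from collections import defaultdict


# hypothetical helper, not part of the notebook
def top_n_genres(agg, n=5):
    """Return (UserId, Genre, Rank, TotalRatings, AverageRating) rows with
    each user's n highest-rated genres, ranked 1..n."""
    by_user = defaultdict(list)
    for (user_id, genre), (total, avg) in agg.items():
        by_user[user_id].append((genre, total, avg))

    ranked = []
    for user_id in sorted(by_user):
        # highest average first; ties broken by genre name (an added assumption)
        rows = sorted(by_user[user_id], key=lambda r: (-r[2], r[0]))
        for rank, (genre, total, avg) in enumerate(rows[:n], start=1):
            ranked.append((user_id, genre, rank, total, avg))
    return ranked


# made-up toy data
agg = {
    (1, "Drama"): (20, 4.1),
    (1, "Comedy"): (15, 3.9),
    (1, "Action"): (12, 3.9),
    (1, "War"): (11, 3.5),
    (1, "Horror"): (10, 3.2),
    (1, "Romance"): (14, 2.8),
    (2, "SciFi"): (10, 4.5),
}
for row in top_n_genres(agg):
    print(row)
```

User 1's sixth genre (Romance) falls outside the top 5, and user 2 keeps a single row because only one genre passed the rating threshold.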
