# Movie Recommender

This Jupyter Notebook is part 3 of 3 in building a recommender system with PySpark and the [MovieLens](https://grouplens.org/datasets/movielens/) dataset from GroupLens. It uses the small dataset intended for education and development, which contains ~100,000 ratings of ~9,000 movies by ~600 users. The dataset was last updated in September 2018 (as of 3/3/2022), and the ratings were created between March 29th, 1996 and September 24th, 2018. More information can be found in the [dataset README](https://files.grouplens.org/datasets/movielens/ml-latest-small-README.html).

We are interested in creating a recommender system that can accurately predict the ratings of movies for a given user. Collaborative filtering is covered in Part 2; this part turns to content-based filtering built on the movie tags.

**Note**: The culmination of this project is a separate journal-formatted paper, so this Jupyter Notebook will have less text than usual.

Notebook breakdown:
- **Part 1:** Importing and EDA
- **Part 2:** Collaborative Filtering
- **Part 3:** Content-based Filtering

## Configuration:

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#INPUT_DIRECTORY = "/content/drive/MyDrive/Grad School/DSCI 632/MovieRecommender/data/" #for google mount
INPUT_DIRECTORY = "/content/drive/MyDrive/DSCI632/Project/data/"
#/content/drive/MyDrive/DSCI632/Project/data
# INPUT_DIRECTORY = "./data/" #for jupyter notebook

In [None]:
%%capture 
#prevent large printout with %%capture

#Download Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

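#Download Apache Spark 3.2.1 with Hadoop 3.2 as a zipped folder
#(dlcdn only hosts current releases; if this URL returns 404, older releases are kept under https://archive.apache.org/dist/spark/)
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz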

#Unzip folder
!tar xvf spark-3.2.1-bin-hadoop3.2.tgz

#Install findspark, pyspark 3.2.1
!pip install -q findspark
!pip install pyspark==3.2.1

#Set variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.1-bin-hadoop3.2"

## Import Data to PySpark


In [None]:
#create SparkSession and SparkContext objects
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [None]:
#Import data
ratings_file_path = INPUT_DIRECTORY + "ratings.txt"
ratings = spark.read.csv(ratings_file_path, header=True, inferSchema=True)
ratings.show()

tags_file_path = INPUT_DIRECTORY + "tags.txt"
tags = spark.read.csv(tags_file_path, header=True, inferSchema=True)
tags.show()

movies_file_path = INPUT_DIRECTORY + "movies.txt"
movies = spark.read.csv(movies_file_path, header=True, inferSchema=True)
movies.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows

+------+-------+-----------------+----------+
|userId|movieId|              tag| timestamp|
+------+-------+-----------------+----------+
|     2|  60756|    

In [None]:
#setting up Content based recommender
from pyspark.sql.functions import collect_list, col, array_distinct, lit, size


#merge spark dataframes
tags_and_movies = movies.join(tags, "movieId")
tags_and_movies.show()




+-------+--------------------+--------------------+------+----------------+----------+
|movieId|               title|              genres|userId|             tag| timestamp|
+-------+--------------------+--------------------+------+----------------+----------+
|      1|    Toy Story (1995)|Adventure|Animati...|   567|             fun|1525286013|
|      1|    Toy Story (1995)|Adventure|Animati...|   474|           pixar|1137206825|
|      1|    Toy Story (1995)|Adventure|Animati...|   336|           pixar|1139045764|
|      2|      Jumanji (1995)|Adventure|Childre...|   474|            game|1137375552|
|      2|      Jumanji (1995)|Adventure|Childre...|    62|  Robin Williams|1528843907|
|      2|      Jumanji (1995)|Adventure|Childre...|    62|magic board game|1528843932|
|      2|      Jumanji (1995)|Adventure|Childre...|    62|         fantasy|1528843929|
|      3|Grumpier Old Men ...|      Comedy|Romance|   289|             old|1143424860|
|      3|Grumpier Old Men ...|      Comedy|

In [None]:
#create a tag list for each movie

df_tag_list = tags_and_movies.groupby("movieId", "title", "genres").agg(array_distinct(collect_list(col("tag"))).alias("tag_list"))

#df_tag_list.where(size(df_tag_list.tag_list) > 5).orderBy(size(df_tag_list.tag_list)).count() #135
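
For readers less familiar with Spark aggregations, the `groupby` / `collect_list` / `array_distinct` step above can be pictured in plain Python. This is only an illustrative sketch: the rows are copied from the joined output shown earlier, and the helper name `build_tag_lists` is hypothetical and not part of the notebook.

```python
from collections import defaultdict

def build_tag_lists(rows):
    """Group (movieId, tag) pairs into one de-duplicated tag list per movie."""
    tag_lists = defaultdict(list)
    for movie_id, tag in rows:
        # mimic array_distinct: keep only the first occurrence of each tag
        if tag not in tag_lists[movie_id]:
            tag_lists[movie_id].append(tag)
    return dict(tag_lists)

# (movieId, tag) pairs taken from the tags_and_movies output above
rows = [
    (1, "fun"), (1, "pixar"), (1, "pixar"),
    (2, "game"), (2, "Robin Williams"), (2, "magic board game"), (2, "fantasy"),
]

tag_lists = build_tag_lists(rows)
print(tag_lists[1])
print(tag_lists[2])
```

Note that Spark does not guarantee the order in which `collect_list` gathers values, so the order of tags inside `tag_list` may differ from this sketch. The Jaccard similarity used later depends only on set membership, so ordering does not affect the scores.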

In [None]:

#setup function to determine movie similarities
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, DoubleType, StringType

def getSimilarMoviesByJacard(movie_in_id, df_with_tags):
  #look up the title and tag list of the input movie
  movie_in_tags_row = df_with_tags[df_with_tags.movieId == movie_in_id].select(['title', 'tag_list']).collect()

  #empty frame returned when the input movie has no tags
  schema = StructType([
    StructField('movieId', IntegerType(), True),
    StructField('tag_list', ArrayType(StringType()), True),
    StructField('jacSim', DoubleType(), True)
  ])
  all_jacards = spark.createDataFrame([], schema=schema)

  if movie_in_tags_row and len(movie_in_tags_row[0][1]) > 0:
    movie_in_tag_list = movie_in_tags_row[0][1] #['fun', 'pixar']

    movies_tags_list = df_with_tags[['movieId', 'tag_list']]

    print("Tag List for selected Movie:" + str(movie_in_tags_row[0][0]) + str(movie_in_tag_list))

    #Jaccard similarity = |intersection| / |union| of the two tag sets
    #the return type is declared explicitly; without it the udf returns strings
    jac = udf(lambda x: len(set(x).intersection(set(movie_in_tag_list))) / len(set(x).union(set(movie_in_tag_list))), DoubleType())

    movies_tags_list_jac = movies_tags_list.withColumn('jacSim', jac(movies_tags_list["tag_list"]))

    #keep only movies that share at least one tag, then attach their titles
    all_jacards = movies_tags_list_jac.filter(movies_tags_list_jac.jacSim > 0.0)
    all_jacards = all_jacards.alias("a").join(movies.alias("b"), col('a.movieId') == col('b.movieId')).select('a.*', "b.title")

    all_jacards = all_jacards.sort("jacSim", ascending=False)
  return all_jacards

test = getSimilarMoviesByJacard(541,df_tag_list )

test.show()



Tag List for selected Movie:Blade Runner (1982)['philosophical', 'existentialism', 'dreamlike', 'atmospheric', 'Philip K. Dick', 'mindfuck', 'future', 'cyberpunk', 'artificial intelligence', 'androids', 'robots', 'sci-fi']
+-------+--------------------+-------------------+--------------------+
|movieId|            tag_list|             jacSim|               title|
+-------+--------------------+-------------------+--------------------+
|    541|[philosophical, e...|                1.0| Blade Runner (1982)|
| 176371|[philosophical, m...|               0.25|Blade Runner 2049...|
|  99917|[existentialism, ...|                0.2|Upstream Color (2...|
|   1237|[reflective, phil...|             0.1875|Seventh Seal, The...|
| 180031|[dreamlike, atmos...|0.16666666666666666|The Shape of Wate...|
|  68791|[sequel, sci-fi, ...|0.14285714285714285|Terminator Salvat...|
|   4370|[robots, Steven S...|0.13333333333333333|A.I. Artificial I...|
|    924|[visually appeali...|0.13043478260869565|2001: A
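
The score in the `jacSim` column is the Jaccard similarity of two tag sets: the number of shared tags divided by the number of distinct tags across both movies. A minimal plain-Python sketch of the same arithmetic is given below. The Blade Runner tag list is copied from the printout above, while `candidate_tags` is a hypothetical tag list made up for illustration and does not correspond to any row in the table.

```python
def jaccard(tags_a, tags_b):
    """Jaccard similarity |A ∩ B| / |A ∪ B| of two tag collections."""
    a, b = set(tags_a), set(tags_b)
    union = a | b
    if not union:
        # two empty tag lists: define the similarity as 0 to avoid dividing by zero
        return 0.0
    return len(a & b) / len(union)

blade_runner_tags = [
    "philosophical", "existentialism", "dreamlike", "atmospheric",
    "Philip K. Dick", "mindfuck", "future", "cyberpunk",
    "artificial intelligence", "androids", "robots", "sci-fi",
]

# hypothetical candidate movie, for illustration only
candidate_tags = ["sci-fi", "cyberpunk", "androids"]

# 3 shared tags out of 12 distinct tags overall
print(jaccard(blade_runner_tags, candidate_tags))  # 0.25
```

A movie compared with itself always scores 1.0, which is why the input movie appears at the top of the table.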

In [None]:

from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, DoubleType, StringType

#Create a user vector for the movies this user has seen
def get_movies_by_user(userId):
  movies_by_user = ratings[ratings.userId==userId].select(['movieId','userId']).distinct().collect()
  return [row.movieId for row in movies_by_user]


def get_all_jacards(movieId_list, df_movie):
  schema = StructType([
    StructField('movieId', IntegerType(), True),
    StructField('tag_list', ArrayType(StringType()), True),
    StructField('jacSim', DoubleType(), True)
  ])
  all_jacards = spark.createDataFrame([], schema=schema)

  for movieId in movieId_list:
    #union is positional, so keep only the three schema columns;
    #getSimilarMoviesByJacard also returns a title column for movies that have tags
    newDf = getSimilarMoviesByJacard(movieId, df_movie).select('movieId', 'tag_list', 'jacSim')
    all_jacards = all_jacards.union(newDf)

  #drop movies the user has already seen
  ajc = all_jacards.filter(~all_jacards.movieId.isin(movieId_list))

  #attach titles and rank by similarity
  finalRecs = ajc.alias("a").join(movies.alias("b"), col('a.movieId') == col('b.movieId')).select('a.*', "b.title")
  finalRecs = finalRecs.sort("jacSim", ascending=False)
  return finalRecs


mvu = get_movies_by_user(406)
print(mvu)

allJacFrame = get_all_jacards(mvu, df_tag_list)
allJacFrame.show()





[2125, 261, 277, 33669, 531, 5620, 56949, 1586, 46972, 135, 724, 2100, 2722, 2137, 1022, 44840, 1282, 2005, 2694, 1644]
Tag List for selected Movie:Little Women (1994)['Louisa May Alcott']
Tag List for selected Movie:Miracle on 34th Street (1994)['Christmas']
Tag List for selected Movie:Secret Garden, The (1993)['In Netflix queue']
Tag List for selected Movie:G.I. Jane (1997)['military']
Tag List for selected Movie:Night at the Museum (2006)['Robin Williams', 'Ben Stiller']
Tag List for selected Movie:Splash (1984)['mermaid']
Tag List for selected Movie:Cinderella (1950)['Disney']
Tag List for selected Movie:Fantasia (1940)['Disney']
Tag List for selected Movie:Big Daddy (1999)['Adam Sandler']
+-------+------------------+------+--------------------+
|movieId|          tag_list|jacSim|               title|
+-------+------------------+------+--------------------+
|   1542|[In Netflix queue]|   1.0|  Brassed Off (1996)|
|    953|       [Christmas]|   1.0|It's a Wonderful ...|
|    317|   
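
Because `get_all_jacards` unions one result frame per movie the user has seen, a candidate that shares tags with several of those movies can appear more than once in the final frame. One possible way to collapse such duplicates, sketched here in plain Python, is to keep the highest similarity per candidate. The function name `best_score_per_movie`, the movie ids, and the scores are all hypothetical and chosen only for illustration. In Spark, a roughly equivalent step would be a `groupBy("movieId")` with a `max` aggregate over `jacSim`.

```python
def best_score_per_movie(scored):
    """Collapse repeated (movie_id, score) pairs, keeping the best score per movie."""
    best = {}
    for movie_id, score in scored:
        if score > best.get(movie_id, 0.0):
            best[movie_id] = score
    # highest similarity first
    return sorted(best.items(), key=lambda kv: kv[1], reverse=True)

# hypothetical candidates: movie 20 was matched by two different seed movies
scored = [(10, 0.5), (20, 0.25), (20, 0.75), (30, 0.125)]

ranked = best_score_per_movie(scored)
print(ranked)
```

Keeping the maximum is only one option; summing or averaging the scores would instead favor candidates that resemble many of the user's movies.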

In [None]:
df_ratings_count = ratings.groupBy("userId").count()
df_ratings_count = df_ratings_count.sort("count", ascending=True)
df_ratings_count.show(30)

+------+-----+
|userId|count|
+------+-----+
|    53|   20|
|   442|   20|
|   278|   20|
|   431|   20|
|   194|   20|
|   569|   20|
|   257|   20|
|   320|   20|
|   576|   20|
|   189|   20|
|   595|   20|
|   147|   20|
|   406|   20|
|   207|   20|
|   157|   21|
|   507|   21|
|   547|   21|
|    87|   21|
|    26|   21|
|   598|   21|
|   324|   21|
|   281|   21|
|   439|   21|
|   245|   21|
|   549|   21|
|    37|   21|
|    49|   21|
|   293|   21|
|   364|   21|
|   467|   22|
+------+-----+
only showing top 30 rows



In [None]:
#evaluation of content-based filtering for a single user
from sklearn.model_selection import train_test_split

def evaluate_users_ratings(userId):

  #split the user's movies
  umv = get_movies_by_user(userId)
  #we split 50/50 in the hope that some recommendations generated from one half
  #are movies this user already rated in the other half
  x_train, x_test = train_test_split(umv, test_size=0.5, random_state=7)

  #get recommendations for the first split

  jac_frame = get_all_jacards(x_train, df_tag_list)
  jac_frame = jac_frame.sort("jacSim", ascending=False)

  total_movies_rated_with_tags = jac_frame.count()

  #keep only the recommended movies in jac_frame that also appear in x_test
  movies_in_test = jac_frame.filter(jac_frame.movieId.isin(x_test))
  movies_in_test = movies_in_test.withColumn("userId", lit(userId))

  #look up this user's actual ratings for the recommended movies found in x_test
  movies_recommended_rating = movies_in_test.join(ratings, (movies_in_test.movieId == ratings.movieId) & (movies_in_test.userId == ratings.userId))

  movies_test_count = movies_in_test.count()
  
  print("Total Rated with tags: " + str(total_movies_rated_with_tags))
  print("Total movies recommended that are in test: " + str(movies_test_count))

  movies_recommended_rating.show()
  jac_frame.show()

evaluate_users_ratings(467)


Tag List for selected Movie:Boot, Das (Boat, The) (1981)['submarine']
Tag List for selected Movie:Prince of Egypt, The (1998)['Moses', 'Bible']
Tag List for selected Movie:Star Trek: Insurrection (1998)['Star Trek', 'space opera', 'cameo:Whoopi Goldberg']
Tag List for selected Movie:Postman, The (Postino, Il) (1994)['writing']
Tag List for selected Movie:English Patient, The (1996)['adultery']
Tag List for selected Movie:Richard III (1995)['Shakespeare']
Tag List for selected Movie:Life Is Beautiful (La Vita è bella) (1997)['tearjerking', 'tear jerker', 'sentimental', 'poignant', 'Heartwarming', 'emotional', 'bittersweet', 'Holocaust']
Tag List for selected Movie:Shine (1996)['music']
Total Rated with tags: 78
Total movies recommended that are in test: 1
+-------+--------------------+-------------------+--------------------+------+------+-------+------+---------+
|movieId|            tag_list|             jacSim|               title|userId|userId|movieId|rating|timestamp|
+-------+----
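
The two counts printed above can be folded into simple hit-based metrics. Taking the printed numbers at face value, 1 of the 78 recommended rows falls in the held-out half, which is roughly 0.013. The sketch below shows one way to compute such ratios in plain Python. The function name `hit_metrics` and the movie ids are hypothetical, and the sketch treats the recommendations as a set, so repeated movie ids are counted once.

```python
def hit_metrics(recommended_ids, held_out_ids):
    """Return (hits, precision, recall) for recommendations against held-out movies."""
    recommended = set(recommended_ids)
    held_out = set(held_out_ids)
    hits = recommended & held_out
    # share of recommendations that the user had actually rated
    precision = len(hits) / len(recommended) if recommended else 0.0
    # share of held-out movies that the recommender recovered
    recall = len(hits) / len(held_out) if held_out else 0.0
    return len(hits), precision, recall

# hypothetical ids, for illustration only
recommended_ids = [1, 2, 3, 4]
held_out_ids = [4, 5]

hits, precision, recall = hit_metrics(recommended_ids, held_out_ids)
print(hits, precision, recall)  # 1 0.25 0.5
```

These ratios only measure whether held-out movies are recovered; they say nothing about whether the user rated the recovered movies highly, which is what the join against `ratings` in the cell above is meant to show.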