In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-1.8.0-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [48]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

import pandas as pd


import requests
import os
import json
import textwrap

import PIL
import urllib
from getpass import getpass
import random

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [34]:
# initializing SparkSession

spark = SparkSession.builder.appName('MovieLensRecommender').getOrCreate()

#apikey
api_key = getpass('tmbd api key :')

tmbd api key :··········


In [None]:
# download MovieLens dataset

!wget https://files.grouplens.org/datasets/movielens/ml-latest-small.zip --no-check-certificate
!unzip /content/ml-latest-small.zip
!ls

--2021-05-17 14:05:38--  https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected.
  Unable to locally verify the issuer's authority.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2021-05-17 14:05:39 (4.64 MB/s) - ‘ml-latest-small.zip’ saved [978202/978202]

Archive:  /content/ml-latest-small.zip
   creating: ml-latest-small/
  inflating: ml-latest-small/links.csv  
  inflating: ml-latest-small/tags.csv  
  inflating: ml-latest-small/ratings.csv  
  inflating: ml-latest-small/README.txt  
  inflating: ml-latest-small/movies.csv  
ml-latest-small      sample_data		spark-2.4.7-bin-hadoop2.7.tgz
ml-latest-small.zip  spark-2.4.7-bin-hadoop2.7


In [None]:
def load_data():
    
    ratings = spark.read.csv('/content/ml-latest-small/ratings.csv',header=True)
    movies = spark.read.csv('/content/ml-latest-small/movies.csv',header=True)
    links = spark.read.csv('/content/ml-latest-small/links.csv',header=True)
    tags = spark.read.csv('/content/ml-latest-small/tags.csv',header=True)
    
    df = ratings.join(movies,on='movieId',how='left')
    df = df.join(links,on='movieId',how='left')
    df = df.join(tags.select('movieId','tag'), on='movieId',how='left')
    
    
    return df

In [None]:
load_data().show()

+-------+------+------+---------+--------------------+--------------------+-------+------+---------------+
|movieId|userId|rating|timestamp|               title|              genres| imdbId|tmdbId|            tag|
+-------+------+------+---------+--------------------+--------------------+-------+------+---------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|0114709|   862|            fun|
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|0114709|   862|          pixar|
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|0114709|   862|          pixar|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|0113228| 15602|            old|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|0113228| 15602|          moldy|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|0113277|   949|           null|
|     47|     1|   5.0|964983815|Seve

In [None]:
def get_sparsity(data):
    """
    Returns sparsity of dataframe  
    
    (1.0- (# of non-null ratings * 1.0)     /     (# of unique movies * # of uniqe users)) * 100
    """
    return (1.0 -((data.select('rating').count() *1.0)/(data.select('movieId').distinct().count()*data.select('userId').distinct().count())))* 100

get_sparsity(load_data())

95.18241160960544

In [None]:
def top_rated(data):
    """
    returns a dataframe of tmdbIds with more than 1500 ratings
    """
    df = data.groupBy('tmdbId').count().sort('count',ascending=False).filter(col('count')>1500)
    
    return df


In [None]:
def als_recommendation(data):
    """
    returns prediction
    """    
    # changing Dtype of 'userID','movieId', and 'rating'
    # data = data.select(data.userId.cast("integer"), data.movieId.cast("integer"), data.rating.cast("double"))
    # split dat into 70/30
    (train, test) = data.randomSplit([0.7,0.3], seed = 9)
    
    #create ALS model
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False,coldStartStrategy='drop')
    
    # model = als.fit(train)
    # pred = model.transform(test)
    
    return als


In [None]:
def searchmovie(title,year,api = api_key):
    """
    Takes 2 arguements(title of movie, year of release) will return a tuple
    of the Title of Movie and its TMDB ID (IF FOUND) 
    """
    params = {'api_key' : api,
             'query' :title,
             'year':year}
    response = requests.get(f'https://api.themoviedb.org/3/search/movie', params = params)
    if response.ok:
        try:
            content = json.loads(response.content)
            movie_title = content['results'][0]['title']
            movie_id = content['results'][0]['id']
            # print(f'Found {movie_title} with ID {movie_id}')
            print(content)
            return movie_id,movie_title
        except:
            return None
    else:
        print("Bad Response")
        print(response.content)
        return None

In [None]:
def parse_movie_details(movie):
    """
    parsing the returned json
    """
    movie_data = {}
    movie_data['tmdb_id'] = movie['id']
    movie_data['title'] = movie['title'] if 'title' in movie.keys() else None
    movie_data['budget'] = movie['budget'] if 'budget' in movie.keys() else None
    movie_data['revenue'] = movie['revenue'] if 'revenue' in movie.keys() else None
    movie_data['popularity'] = movie['popularity'] if 'popularity' in movie.keys() else None
    movie_data['imdb_id'] = movie['imdb_id'] if 'imdb_id' in movie.keys() else None
    movie_data['release_date'] = movie['release_date'] if 'release_date' in movie.keys() else None
    movie_data['poster_path'] = movie['poster_path'] if 'poster_path' in movie.keys() else None

    movie_data['production'] = parse_production(movie['production_companies']) if 'production_companies' in movie.keys() else None
    
    movie_data['runtime'] = movie['runtime'] if 'runtime' in movie.keys() else None
    movie_data['genres'] = parse_genres(movie['genres']) if 'genres' in movie.keys() else ''
    movie_data['dom_gross'] = movie['revenue'] if 'revenue' in movie.keys() else None
    return movie_data

def parse_genres(genre_data):
    genre_names = []
    for item in genre_data:
        if 'name' in item.keys():
            genre_names.append(item['name'])
    return ','.join(genre_names)
#     return genre_names
    
    
def parse_production(production_data):
    production = []
    for item in production_data:
        if 'name' in item.keys():
            production.append(item['name'])
    return ','.join(production)
#     return production

In [None]:
def get_movie_details(movieID,api = api_key):
    """
    When passed a movieID, if valid, makes a request to the API and returns a detailed info on that movie
    """
    response = requests.get(f'https://api.themoviedb.org/3/movie/{movieID}?api_key={api}&language=en-US')
    if response.ok:
        try:
            content = json.loads(response.content)
            return json.loads(response.content)
        except:
            return None
    else:
        return None

In [41]:
get_movie_details(111)

{'adult': False,
 'backdrop_path': '/cCvp5Sni75agCtyJkNOMapORUQV.jpg',
 'belongs_to_collection': None,
 'budget': 25000000,
 'genres': [{'id': 28, 'name': 'Action'},
  {'id': 80, 'name': 'Crime'},
  {'id': 18, 'name': 'Drama'},
  {'id': 53, 'name': 'Thriller'}],
 'homepage': '',
 'id': 111,
 'imdb_id': 'tt0086250',
 'original_language': 'en',
 'original_title': 'Scarface',
 'overview': 'After getting a green card in exchange for assassinating a Cuban government official, Tony Montana stakes a claim on the drug trade in Miami. Viciously murdering anyone who stands in his way, Tony eventually becomes the biggest drug lord in the state, controlling nearly all the cocaine that comes through Miami. But increased pressure from the police, wars with Colombian drug cartels and his own drug-fueled paranoia serve to fuel the flames of his eventual downfall.',
 'popularity': 31.224,
 'poster_path': '/iQ5ztdjvteGeboxtmRdXEChJOHh.jpg',
 'production_companies': [{'id': 33,
   'logo_path': '/8lvHyhjr

In [None]:
# def poster(tmdb_id):
#     """
#     returns poster

#     """
#     movie = get_movie_details(tmdb_id)
#     link = movie['poster_path'] if 'poster_path' in movie.keys() else None
    
    
#     urllib.request.urlretrieve(f'https://image.tmdb.org/t/p/original/{link}','poster.jpg')
#     img = PIL.Image.open('poster.jpg')
#     return img.show()

In [None]:
def poster(tmdb_id):
    """
    returns poster

    """
    movie = get_movie_details(tmdb_id)
    link = movie['poster_path'] if 'poster_path' in movie.keys() else None
    
    print(f'https://image.tmdb.org/t/p/original/{link}')

In [42]:
poster(111)

https://image.tmdb.org/t/p/original//iQ5ztdjvteGeboxtmRdXEChJOHh.jpg


In [None]:
def random_pick(data):
    # list of tmdbIds with more than 1500 ratings
    toprated_tmdbIds = [int(i.tmdbId) for i in top_rated(data).collect()]
    
    # randomly pick 5 tmdbIds
    return(random.choice(toprated_tmdbIds))

In [37]:
def main():
    #Creating a new userId
    new_user_ID = 0

    # # load data
    df = load_data()

    # remove titles with <25 reviews
    df = df.withColumn("count", F.count("title").over(W.partitionBy('title')))\
    .filter(F.col("count") > 25).drop("count")

    #######Cold-Start User Survey###################################
    lst = [] 
    past_id = []
    while len(lst) < 5 :
        random_id = random_pick(df)
        if random_id in past_id:
            continue
        else:
            print(f"Title : {get_movie_details(random_id)['title']}")
            print(f"Genre : {df.filter(col('tmdbId')== random_id).select('genres').collect()[0][0]}")
            poster(random_id)
            print('\nSummary : ')
            print("\n".join(textwrap.wrap(get_movie_details(random_id)['overview'],100)))
            rating = float(input('Rate this movie 1 to 5: '))
            print('\n')
            if (rating in range(0,6) ):    
                lst.append((new_user_ID,random_id,rating))
                past_id.append(random_id)
            else:
                continue
    #######Cold-Start User Survey###################################
    print(lst)
    # lst = [(0, 27205, 4.0), (0, 13, 3.0), (0, 11, 2.0), (0, 19995, 5.0), (0, 680, 4.0)]

    lst_rdd = spark.sparkContext.parallelize(lst)
    lst_rdd = spark.createDataFrame(lst,['userId','movieId','rating'])
    
    df = df.select(df.userId.cast("integer"), df.movieId.cast("integer"), df.rating.cast("double"))
    
    df = df.union(lst_rdd)

    
    # # Create test and train set
    (train, test) = df.randomSplit([0.8, 0.2])
    
    # # Create ALS model
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False,coldStartStrategy='drop')

    model = als.fit(train)
    
    pred = model.transform(test)
    evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol= 'prediction')
    rmse = evaluator.evaluate (pred)
    print(rmse)
    
    # Generate 5 movie recommendations for each user
    userRecs = model.recommendForAllUsers(10)
    
    # Creating a temp table to store recommendation
    userRecs.registerTempTable('temp_table')
    
    rec_table = spark.sql('SELECT userId,\
                            movieIds_and_ratings.movieId AS movieId, \
                            movieIds_and_ratings.rating AS prediction \
                    FROM temp_table\
                    LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings')
    
    rec_table.filter(col('userId') == 0).show()

    list_of_recc_movies = rec_table.filter(col('userId') == 0).select('movieId').collect()
    
    for i in list_of_recc_movies:
      try: 

        print(get_movie_details(i[0])['title'])
        print(f"Title : {get_movie_details(i[0])['title']}")
        poster(i[0])
        print('\nSummary : ')
        print("\n".join(textwrap.wrap(get_movie_details(i[0])['overview'],100)))
        print("\n")
        print("--------------------------------------------------------------------------------------------------")

      except:
        print(load_data().where(col('movieId') == i[0]).select('title').distinct().collect()[0].asDict()['title'])
        print("---------------------------------------------------------------------------------------------------")

In [38]:
main()

Title : Avatar
Genre : Action|Adventure|Sci-Fi|IMAX
https://image.tmdb.org/t/p/original//6EiRUJpuoeQPghrs3YNktfnqOVh.jpg

Summary : 
In the 22nd century, a paraplegic Marine is dispatched to the moon Pandora on a unique mission, but
becomes torn between following orders and protecting an alien civilization.
Rate this movie 1 to 5: 4


Title : Braveheart
Genre : Action|Drama|War
https://image.tmdb.org/t/p/original//or1gBugydmjToAEq7OZY0owwFk.jpg

Summary : 
Enraged at the slaughter of Murron, his new bride and childhood love, Scottish warrior William
Wallace slays a platoon of the local English lord's soldiers. This leads the village to revolt and,
eventually, the entire country to rise up against English rule.
Rate this movie 1 to 5: 5


Title : Forrest Gump
Genre : Comedy|Drama|Romance|War
https://image.tmdb.org/t/p/original//h5J4W4veyxMXDMjeNxZI46TsHOb.jpg

Summary : 
A man with a low IQ has accomplished great things in his life and been present during significant
historic events—in 

In [49]:
def main():
    #Creating a new userId
    new_user_ID = 0

    # # load data
    df = load_data()

    # remove titles with <25 reviews
    df = df.withColumn("count", F.count("title").over(W.partitionBy('title')))\
    .filter(F.col("count") > 25).drop("count")

    #######Cold-Start User Survey###################################
    # lst = [] 
    # past_id = []
    # while len(lst) < 5 :
    #     random_id = random_pick(df)
    #     if random_id in past_id:
    #         continue
    #     else:
    #         print(f"Title : {get_movie_details(random_id)['title']}")
    #         print(f"Genre : {df.filter(col('tmdbId')== random_id).select('genres').collect()[0][0]}")
    #         poster(random_id)
    #         print('\nSummary : ')
    #         print("\n".join(textwrap.wrap(get_movie_details(random_id)['overview'],100)))
    #         rating = float(input('Rate this movie 1 to 5: '))
    #         print('\n')
    #         if (rating in range(0,6) ):    
    #             lst.append((new_user_ID,random_id,rating))
    #             past_id.append(random_id)
    #         else:
    #             continue
    #######Cold-Start User Survey###################################

    lst = [(0, 27205, 4.0), (0, 13, 3.0), (0, 11, 2.0), (0, 19995, 5.0), (0, 680, 4.0)]

    lst_rdd = spark.sparkContext.parallelize(lst)
    lst_rdd = spark.createDataFrame(lst,['userId','movieId','rating'])
    
    df = df.select(df.userId.cast("integer"), df.movieId.cast("integer"), df.rating.cast("double"))
    
    df = df.union(lst_rdd)

    
    # # Create test and train set
    (train, test) = df.randomSplit([0.8, 0.2], seed = 1234)
    
    # # Create ALS model
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False,coldStartStrategy='drop')



    #Tune model using ParamGrid
    param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50]) \
            .addGrid(als.regParam, [.01, .05]) \
            .build()
    evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 
    # print ("Num models to be tested: ", len(param_grid)) #16

    # Build cross validation using CrossValidator
    cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

    # print("**Best Model**")# Print "Rank"
    # print("  Rank:", best_model._java_obj.parent().getRank())# Print "MaxIter"
    # print("  MaxIter:", best_model._java_obj.parent().getMaxIter())# Print "RegParam"
    # print("  RegParam:", best_model._java_obj.parent().getRegParam())

    model = cv.fit(train)
    best_model = model.best_model
    test_preditction = best_model.transform(test)

    # model = als.fit(train)
    
    # # pred = model.transform(test)
    
    # # Generate 5 movie recommendations for each user
    userRecs = best_model.recommendForAllUsers(10)
    
    # Creating a temp table to store recommendation
    userRecs.registerTempTable('temp_table')
    
    rec_table = spark.sql('SELECT userId,\
                            movieIds_and_ratings.movieId AS movieId, \
                            movieIds_and_ratings.rating AS prediction \
                    FROM temp_table\
                    LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings')
    
    rec_table.filter(col('userId') == 0).show()

    list_of_recc_movies = rec_table.filter(col('userId') == 0).select('movieId').collect()
    
    for i in list_of_recc_movies:
      try: 

        print(get_movie_details(i[0])['title'])
        print(f"Title : {get_movie_details(i[0])['title']}")
        poster(i[0])
        print('\nSummary : ')
        print("\n".join(textwrap.wrap(get_movie_details(i[0])['overview'],100)))
        print("\n")
        print("--------------------------------------------------------------------------------------------------")

      except:
        print(load_data().where(col('movieId') == i[0]).select('title').distinct().collect()[0].asDict()['title'])