In [1]:
dbutils.widgets.removeAll()

In [2]:
%sql
describe ratings


col_name,data_type,comment
user_id,int,
movie_id,int,
rating,int,
timestamp,string,


In [3]:
%sql
select count(movie_id), count(user_id), count(rating)
from ratings 
where rating is NULL

count(movie_id),count(user_id),count(rating)
0,0,0


In [4]:
spark.sql("""
    select 
      movies.movie_id, movies.name, count(*) as  number_of_timesrated
    from 
      ratings
    join 
      movies on ratings.movie_id = movies.movie_id
    group by 
      movies.movie_id, movies.name
    order by 
      number_of_timesrated desc
    limit
      200
    """
).createOrReplaceTempView("most_rated")

In [5]:
%sql
select count(movie_id), rating 
from ratings
group by rating

count(movie_id),rating
56174,1
261197,3
226310,5
348971,4
107557,2


In [6]:
%sql
select count(movie_id) as number_of_movies 
from ratings
group by rating

number_of_movies
56174
261197
226310
348971
107557


In [7]:
# Sample of first 20 movies based on the number of times rated
most_rated_sample = spark.table("most_rated").rdd.takeSample(True, 10)
display(most_rated_sample)

movie_id,name,number_of_timesrated
1036,Die Hard (1988),1666
3176,"Talented Mr. Ripley, The (1999)",1331
1370,Die Hard 2 (1990),1135
1387,Jaws (1975),1697
111,Taxi Driver (1976),1240
2985,Robocop (1987),1229
2011,Back to the Future Part II (1989),1158
2628,Star Wars,2250
1544,Lost World,1255
593,"Silence of the Lambs, The (1991)",2578


In [8]:
for i in range(0, len(most_rated_sample)):
  dbutils.widgets.dropdown("movie_%i" % i, "5", ["1", "2", "3", "4", "5"], most_rated_sample[i].name)

In [9]:
# Dataframe creation based on my ratings

from datetime import datetime
from pyspark.sql import Row
ratingsList = []
for i in range(0, len(most_rated_sample)):
  ratingsList.append(
    Row(user_id = 0,
        movie_id = most_rated_sample[i].movie_id,
        rating = float(dbutils.widgets.get("movie_%i" %i)),
        number_of_timesrated=most_rated_sample[i].number_of_timesrated
        )
  )
myRatingsDataFr = spark.createDataFrame(ratingsList)

#Creating the dataframe called myRatingsTable with column ordered so that it matches our ratings
myRatingsTable = myRatingsDataFr.select("user_id", "movie_id", "rating", "number_of_timesrated")
myRatingsTable.createOrReplaceTempView("myRatingsTable")

In [10]:
display(ratingsList)

movie_id,number_of_timesrated,rating,user_id
1036,1666,4.0,0
3176,1331,3.0,0
1370,1135,2.0,0
1387,1697,2.0,0
111,1240,5.0,0
2985,1229,2.0,0
2011,1158,5.0,0
2628,2250,2.0,0
1544,1255,1.0,0
593,2578,3.0,0


In [11]:
display(myRatingsTable)

user_id,movie_id,rating,number_of_timesrated
0,1036,4.0,1666
0,3176,3.0,1331
0,1370,2.0,1135
0,1387,2.0,1697
0,111,5.0,1240
0,2985,2.0,1229
0,2011,5.0,1158
0,2628,2.0,2250
0,1544,1.0,1255
0,593,3.0,2578


In [12]:
%sql
select * from most_rated

movie_id,name,number_of_timesrated
2858,American Beauty (1999),3428
260,Star Wars,2991
1196,Star Wars,2990
1210,Star Wars,2883
480,Jurassic Park (1993),2672
2028,Saving Private Ryan (1998),2653
589,Terminator 2,2649
2571,"Matrix, The (1999)",2590
1270,Back to the Future (1985),2583
593,"Silence of the Lambs, The (1991)",2578


In [13]:
%sql
-- Display ratings
select f.movie_id, m.name, f.rating from myRatingsTable f inner join most_rated m on m.movie_id = f.movie_id

movie_id,name,rating
1036,Die Hard (1988),4.0
3176,"Talented Mr. Ripley, The (1999)",3.0
1370,Die Hard 2 (1990),2.0
1387,Jaws (1975),2.0
111,Taxi Driver (1976),5.0
2985,Robocop (1987),2.0
2011,Back to the Future Part II (1989),5.0
2628,Star Wars,2.0
1544,Lost World,1.0
593,"Silence of the Lambs, The (1991)",3.0


In [14]:
display(ratingsList)

movie_id,number_of_timesrated,rating,user_id
1036,1666,4.0,0
3176,1331,3.0,0
1370,1135,2.0,0
1387,1697,2.0,0
111,1240,5.0,0
2985,1229,2.0,0
2011,1158,5.0,0
2628,2250,2.0,0
1544,1255,1.0,0
593,2578,3.0,0


In [15]:
from pyspark.sql import functions

ratingsList = spark.table("ratings")

# Split our data for our training and test datasets
(training, test) = ratingsList.randomSplit([0.8, 0.2])


In [16]:
display(training)

user_id,movie_id,rating,timestamp
1,1,5,978824268
1,48,5,978824351
1,150,5,978301777
1,260,4,978300760
1,527,5,978824195
1,531,4,978302149
1,588,4,978824268
1,594,4,978302268
1,661,3,978302109
1,720,3,978300760


In [17]:
display(myRatingsTable)

user_id,movie_id,rating,number_of_timesrated
0,1036,4.0,1666
0,3176,3.0,1331
0,1370,2.0,1135
0,1387,2.0,1697
0,111,5.0,1240
0,2985,2.0,1229
0,2011,5.0,1158
0,2628,2.0,2250
0,1544,1.0,1255
0,593,3.0,2578


In [18]:
from pyspark.ml.recommendation import ALS

ranks=[4, 8, 12]
rmse=[0,0,0]
r=0

# Running Alternating least square(ALS) collaborative filtering
for rank in ranks:
  als = ALS(maxIter=5, regParam=0.01, rank=rank, userCol="user_id", itemCol="movie_id", ratingCol="rating")

  # Running training model which includes the ratings provided by us
  model = als.fit(training.unionAll(myRatingsTable))

  Predict_Test = model.transform(test).dropna()
  Predict_Test.createOrReplaceTempView("Predict_Test")
#   display(Predict_Test)
  
  # Evaluating the model
  from pyspark.ml.evaluation import RegressionEvaluator
  evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
  rmse_train = evaluator.evaluate(Predict_Test)
  rmse[r] = rmse_train
  r+=1

  # Display the root mean square error
  displayHTML("<span style='font-size:12pt;color:Red'>For rank %s RMSE</span>" %str(rank))
  displayHTML("<span style='font-size:12pt;color:Red'>RMSE is %s</span>" %str(rmse))

In [19]:
%sql
select * from ratings 

user_id,movie_id,rating,timestamp
1,1193,5,978300760
1,661,3,978302109
1,914,3,978301968
1,3408,4,978300275
1,2355,5,978824291
1,1197,3,978302268
1,1287,5,978302039
1,2804,5,978300719
1,594,4,978302268
1,919,4,978301368


In [20]:
display(myRatingsDataFr)

movie_id,number_of_timesrated,rating,user_id
1036,1666,4.0,0
3176,1331,3.0,0
1370,1135,2.0,0
1387,1697,2.0,0
111,1240,5.0,0
2985,1229,2.0,0
2011,1158,5.0,0
2628,2250,2.0,0
1544,1255,1.0,0
593,2578,3.0,0


In [21]:
myRatingsTable = myRatingsDataFr.select("user_id", "movie_id", "rating")

# Run the same ALS model on my personalized Ratings
PersonalizedMovies = model.transform(myRatingsTable).dropna()
PersonalizedMovies.createOrReplaceTempView("PersonalizedMovies")

# Displaying ALS predicted ratings
display(PersonalizedMovies)

user_id,movie_id,rating,prediction
0,593,3.0,3.67056
0,111,5.0,4.295476
0,2628,2.0,2.2084453
0,2985,2.0,2.6981692
0,1036,4.0,3.3689387
0,1544,1.0,0.787525
0,1370,2.0,2.3573844
0,2011,5.0,4.19901
0,3176,3.0,2.921299
0,1387,2.0,2.2207465


In [22]:
# Evaluate model for my personalized ratings movies
rmse_test = evaluator.evaluate(PersonalizedMovies)

# Displaying the RMSE
displayHTML("<span style='font-size:12pt;color:Red'>The RMSE is %s</span>" % str(rmse_test))

In [23]:
# Take the list of the movies you already rated (myRatings), the list of most_rated_movies (removing the ones you already rated)
# and assign them to yourself (user_id = 0) 
sqlQuery = """
select null as user_id, null as movie_id, null as rating union all 
select user_id, movie_id, rating from myRatingsTable union all
select cast(0 as bigint) as user_id, movie_id, cast(0 as float) as rating from most_rated where movie_id not in (select movie_id from myRatingsTable)
"""
n_most_rated_movies = spark.sql(sqlQuery)
n_most_rated_movies.createOrReplaceTempView("n_most_rated_movies")

# Applying and then removing NULL to ensure original ratings and most_rated_movies_0 schema match
most_rated_movies_by_user_0 = spark.sql("select * from n_most_rated_movies where user_id = 0")

# Re-apply our ALS model for all movies
movies_predicted_for_user_0 = model.transform(most_rated_movies_by_user_0).dropna()
movies_predicted_for_user_0.createOrReplaceTempView("movies_predicted_for_user_0")
display(movies_predicted_for_user_0)

user_id,movie_id,rating,prediction
0,1580,0.0,2.55616
0,3175,0.0,3.7206476
0,858,0.0,4.3471665
0,1127,0.0,4.169235
0,1721,0.0,1.7524321
0,1270,0.0,3.9859564
0,1265,0.0,4.2741923
0,588,0.0,4.203801
0,296,0.0,4.486774
0,2396,0.0,3.9354858


In [24]:
%sql
-- Show Your Top 10 movies
select m.name
  from movies_predicted_for_user_0 f
    inner join most_rated m
      on m.movie_id = f.movie_id
  order by f.prediction desc
  limit 10

name
Magnolia (1999)
Gattaca (1997)
Twelve Monkeys (1995)
Boogie Nights (1997)
High Fidelity (2000)
Being John Malkovich (1999)
Rear Window (1954)
Chinatown (1974)
"Breakfast Club, The (1985)"
North by Northwest (1959)
