### Spark HW2 Movie Recommendation
In this notebook, we use the Alternating Least Squares (ALS) algorithm with Spark APIs to predict ratings for the movies in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
%matplotlib inline

In [3]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

In [4]:
dbutils.library.installPyPI("mlflow")
dbutils.library.restartPython()
import mlflow

## Part 1: Data ETL and Data Exploration

In [6]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [7]:
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [8]:
movies_df.show(5)

In [9]:
ratings_df.show(5)

In [10]:
links_df.show(5)

In [11]:
tags_df.show(5)

In [12]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [13]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

## Part 1: Spark SQL and OLAP

In [15]:
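# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")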

### Q1: The number of Users

In [17]:
%sql 
select count( distinct userId  )
from ratings

count(DISTINCT userId)
610


### Q2: The number of Movies

In [19]:
%sql 
select count(distinct movieId)
from movies

count(DISTINCT movieId)
9742


### Q3:  How many movies are rated by users? List movies not rated before

In [21]:
%sql 
select count(*)
from movies 
where movieID in (
  select distinct movieID
  from ratings
)

count(1)
9724


In [22]:
%sql 
select *
from movies 
where movieID in (
  select distinct movieID
  from ratings)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [23]:
%sql 
select *
from movies 
where movieID not in (
  select distinct movieID
  from ratings)

movieId,title,genres
1076,"Innocents, The (1961)",Drama|Horror|Thriller
2939,Niagara (1953),Drama|Thriller
3338,For All Mankind (1989),Documentary
3456,"Color of Paradise, The (Rang-e khoda) (1999)",Drama
4194,I Know Where I'm Going! (1945),Drama|Romance|War
5721,"Chosen, The (1981)",Drama
6668,"Road Home, The (Wo de fu qin mu qin) (1999)",Drama|Romance
6849,Scrooge (1970),Drama|Fantasy|Musical
7020,Proof (1991),Comedy|Drama|Romance
7792,"Parallax View, The (1974)",Thriller


In [24]:
%sql 
select count(*)
from movies 
where movieID not in (
  select distinct movieID
  from ratings)

count(1)
18


### Q4: List Movie Genres

In [26]:
%sql
select distinct genres
from movies


genres
Comedy|Horror|Thriller
Adventure|Sci-Fi|Thriller
Action|Adventure|Drama|Fantasy
Action|Drama|Horror
Action|Animation|Comedy|Sci-Fi
Animation|Children|Drama|Musical|Romance
Action|Adventure|Drama
Adventure|Sci-Fi
Documentary|Musical|IMAX
Adventure|Children|Fantasy|Sci-Fi|Thriller


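## Method 1: Split the genres with pandas get_dummies (convert the Spark DataFrame to a pandas DataFrame first, then convert it back)
## Besides being cumbersome, this approach flattens the data by turning rows into columns, which makes it hard to aggregate.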

In [28]:
movies_pd=movies_df.toPandas()

In [29]:
genres_pd=movies_pd['genres'].str.get_dummies(sep='|')

In [30]:
import pandas as pd
movies_pd=pd.concat([movies_pd,genres_pd],axis=1)
movies_pd=movies_pd.drop(columns=['genres'])

In [31]:
movies_pd.head(3)

Unnamed: 0,movieId,title,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film-Noir,Horror,IMAX,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
0,1,Toy Story (1995),0,0,1,1,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0
1,2,Jumanji (1995),0,0,1,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0
2,3,Grumpier Old Men (1995),0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0


In [32]:
new_movies_df=spark.createDataFrame(movies_pd)

In [33]:
display(new_movies_df)

movieId,title,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film-Noir,Horror,IMAX,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
1,Toy Story (1995),0,0,1,1,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0
2,Jumanji (1995),0,0,1,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0
3,Grumpier Old Men (1995),0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0
4,Waiting to Exhale (1995),0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,1,0,0,0,0
5,Father of the Bride Part II (1995),0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
6,Heat (1995),0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0
7,Sabrina (1995),0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0
8,Tom and Huck (1995),0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
9,Sudden Death (1995),0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
10,GoldenEye (1995),0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0


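## Method 2: After some searching, I found that SQL has a CROSS APPLY method that can turn a row like (a; b, c) into two rows, (a, b) and (a, c).
## Unfortunately, Spark does not have this function. The instructor later suggested the explode function, which appears to give the result I want. I referred to the article below.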
https://sparkbyexamples.com/pyspark/pyspark-explode-array-and-map-columns-

In [35]:
spark.catalog.listTables()

In [36]:
from pyspark.sql.functions import explode, col

In [37]:
from pyspark.sql.functions import split
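# The split pattern is a regex, so the pipe must be escaped; a raw string avoids an invalid Python escape
movies_df=movies_df.withColumn("genres",split(col("genres"),r"\|"))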
movies_df.show()

In [38]:
new_movie_df =movies_df.select(movies_df.movieId,movies_df.title,explode(movies_df.genres).alias("genres"))
new_movie_df.printSchema()
new_movie_df.show()
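
To make the effect of `split` followed by `explode` concrete, here is a minimal plain-Python sketch of the same reshaping, with no Spark involved. The helper name `split_and_explode` is made up for illustration; the two sample rows are taken from the movies table shown earlier.

```python
def split_and_explode(rows, sep="|"):
    """Yield one (movie_id, title, genre) tuple per genre in each input row."""
    for movie_id, title, genres in rows:
        for genre in genres.split(sep):
            yield (movie_id, title, genre)


# Two rows in the same shape as movies.csv (all columns are strings)
rows = [
    ("1", "Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"),
    ("9", "Sudden Death (1995)", "Action"),
]

exploded = list(split_and_explode(rows))
for row in exploded:
    print(row)
```

Each movie now appears once per genre, which is why a simple `group by genres` works in the next query.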

### Q5: Movie for Each Category

In [40]:
new_movie_df.createOrReplaceTempView("new_movies")

In [41]:
%sql
select genres, count(*) as number
from new_movies
group by genres
order by number desc

genres,number
Drama,4361
Comedy,3756
Thriller,1894
Action,1828
Romance,1596
Adventure,1263
Crime,1199
Sci-Fi,980
Horror,978
Fantasy,779


In [42]:
%sql 
select title
from new_movies
where genres in('Crime')

title
Heat (1995)
Casino (1995)
Money Train (1995)
Get Shorty (1995)
Copycat (1995)
Assassins (1995)
Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)
Dead Man Walking (1995)
Dead Presidents (1995)
"Usual Suspects, The (1995)"


## Part 2: Spark ALS based approach for training model
We will use Spark ML to predict the ratings, so let's reload "ratings.csv" using ``sc.textFile`` and then convert it to the form of (user, item, rating) tuples.

In [44]:
new_ratings_df= sc.textFile("/FileStore/tables/ratings.csv")

In [45]:
new_ratings_df.getNumPartitions() 

In [46]:
new_ratings_df.count()

In [47]:
movie_ratings=ratings_df.drop('timestamp')
movie_ratings.show()

In [48]:
movie_ratings.printSchema() 

In [49]:
# Data type convert
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [50]:
movie_ratings.show()

### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [52]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.sql import Row

In [53]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])

In [54]:
#Create ALS model
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, rank=5, regParam=0.09, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop" )
model = als.fit(training)
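
As a rough illustration of what the fit step does conceptually, the sketch below runs a toy rank-1 ALS in plain Python: it holds the item factors fixed and solves each user factor in closed form, then swaps roles and repeats. The function name `als_rank1`, the toy ratings, and the default regularization value are assumptions made for illustration; Spark's distributed implementation supports higher ranks and differs in many details.

```python
def als_rank1(ratings, n_iters=10, reg=0.1):
    """Toy rank-1 ALS.

    ratings: dict mapping (user, item) -> rating.
    Returns (user_factors, item_factors) as dicts of floats.
    """
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    x = {u: 1.0 for u in users}  # user factors
    y = {i: 1.0 for i in items}  # item factors

    for _ in range(n_iters):
        # Fix item factors; each user factor is a 1-D ridge regression solution.
        for u in users:
            num = sum(r * y[i] for (uu, i), r in ratings.items() if uu == u)
            den = reg + sum(y[i] ** 2 for (uu, i) in ratings if uu == u)
            x[u] = num / den
        # Fix user factors; update each item factor the same way.
        for i in items:
            num = sum(r * x[u] for (u, ii), r in ratings.items() if ii == i)
            den = reg + sum(x[u] ** 2 for (u, ii) in ratings if ii == i)
            y[i] = num / den
    return x, y


# Hypothetical ratings; the pair (2, 20) is unobserved and gets predicted below
toy_ratings = {(1, 10): 4.0, (1, 20): 2.0, (2, 10): 2.0}
user_f, item_f = als_rank1(toy_ratings)
print("predicted rating for user 2, item 20:", user_f[2] * item_f[20])
```

The predicted rating is the product of the two learned factors, which is the rank-1 case of the dot product that ALS uses at higher ranks.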

In [55]:
# Evaluate the ALS model on the training data
# Use RMSE as the evaluation metric
predictions = model.transform(training)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

In [56]:
# Evaluate the ALS model on the test data
# Use RMSE as the evaluation metric
predictions_test = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions_test)
print("Root-mean-square error = " + str(rmse))
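
For reference, the RMSE reported by the evaluator is the square root of the mean squared difference between the label and the prediction. The sketch below computes it in plain Python; the function name `rmse` and the toy numbers are illustrative only and are not taken from the model output.

```python
import math


def rmse(labels, predictions):
    """Root-mean-square error between two equal-length sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and equal in length")
    squared_errors = [(p - y) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical ratings and predictions
toy_labels = [3.0, 4.0, 5.0]
toy_predictions = [2.5, 4.0, 4.0]
print("RMSE =", rmse(toy_labels, toy_predictions))
```

Comparing the training RMSE with the test RMSE gives a first indication of overfitting: a test error well above the training error suggests the model fits the training ratings more closely than it generalizes.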

In [57]:
#Tune model using ParamGridBuilder
paramGrid = ParamGridBuilder()\
  .addGrid(als.rank,  [1,3] )\
  .addGrid(als.maxIter, [5,10] )\
  .addGrid(als.regParam,[ 0.05, .1] )\
  .build()

In [58]:
# Build the cross-validator
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

# Confirm cv was built
print(cv)
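
To get a feel for the cost of this search, the sketch below enumerates the same grid in plain Python and counts the fits that 3-fold cross-validation implies. The dictionary layout and variable names are assumptions for illustration; only the parameter values come from the grid above.

```python
from itertools import product

# Same candidate values as the ParamGridBuilder call above
grid = {"rank": [1, 3], "maxIter": [5, 10], "regParam": [0.05, 0.1]}
num_folds = 3

# Cartesian product of all candidate values, one dict per combination
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]
total_fits = len(param_maps) * num_folds

for params in param_maps:
    print(params)
print(f"{len(param_maps)} combinations x {num_folds} folds = {total_fits} fits")
```

Each combination is trained once per fold, and the cross-validator then refits the best combination on the full training set, so widening the grid multiplies the running time quickly.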

In [59]:
#Extract best model from the tuning exercise using ParamGridBuilder
#Fit cross validator to the 'train' dataset
best_model =cv.fit(training)

In [60]:
best_model = best_model.bestModel

In [61]:
print(type(model))

### Model testing
And finally, make a prediction and check the testing error.

In [63]:
# Evaluate the best model on the training data
# Use RMSE as the evaluation metric
new_predictions = best_model.transform(training)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")


In [64]:
#Generate predictions and evaluate using RMSE
new_rmse = evaluator.evaluate(new_predictions)
print("Root-mean-square error = " + str(new_rmse))

In [65]:
#Print evaluation metrics and model parameters
print ("RMSE = "+str(new_rmse))
print ("**Best Model**")
print (" Rank:"+str(best_model.rank))
print (" MaxIter:"+str(best_model._java_obj.parent().getMaxIter()))
print (" RegParam:"+str(best_model._java_obj.parent().getRegParam()))
# Print best_model
print(type(best_model))

In [66]:
# Evaluate the best model on the test data
# Use RMSE as the evaluation metric
predictions_test_bestModel = best_model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions_test_bestModel)
print("Root-mean-square error = " + str(rmse))

In [67]:
predictions_test_bestModel.show()

### Apply the model and check its performance

In [69]:
full_ratings_df = spark.read.load("/FileStore/tables/ratings_full.csv", format='csv', header = True)

In [70]:
full_ratings_df.show()

In [71]:
all_ratings=full_ratings_df.drop('timestamp')

In [72]:
all_ratings.createOrReplaceTempView("all_ratings")

In [73]:
%sql
select count(*)
from all_ratings

count(1)
27753444


In [74]:
%sql
select * from all_ratings where movieId=463

userId,movieId,rating
114,463,4.0
758,463,3.0
1927,463,2.0
2481,463,3.0
2629,463,2.5
3246,463,3.0
3832,463,3.0
4796,463,2.0
5276,463,4.0
5323,463,3.0


In [75]:
from pyspark.sql.types import IntegerType, FloatType
all_ratings = all_ratings.withColumn("userId", all_ratings["userId"].cast(IntegerType()))
all_ratings = all_ratings.withColumn("movieId", all_ratings["movieId"].cast(IntegerType()))
all_ratings = all_ratings.withColumn("rating", all_ratings["rating"].cast(FloatType()))

In [76]:
# Apply the best model to the full dataset
# Use RMSE as the evaluation metric
alldata= best_model.transform(all_ratings)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(alldata)
print("Root-mean-square error = " + str(rmse))

In [77]:
alldata.createOrReplaceTempView("alldata")

In [78]:
alldata.show()

In [79]:
%sql
select count(*)
from alldata

count(1)
60774
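
The drop from 27,753,444 input rows to 60,774 output rows is consistent with `coldStartStrategy="drop"`: with that setting, `transform` removes every row whose prediction is NaN, which happens when the user or the movie was not seen during fitting. The plain-Python sketch below mimics only that filtering step; `toy_transform`, the IDs, and the placeholder score are hypothetical and not part of Spark.

```python
import math


def toy_transform(known_users, known_items, rows, cold_start_strategy="nan"):
    """Mimic how unseen users or items lead to NaN predictions and dropped rows.

    rows: list of (user, item, rating) tuples.
    """
    out = []
    for user, item, rating in rows:
        if user in known_users and item in known_items:
            prediction = 3.5  # placeholder; a real model would use learned factors
        else:
            prediction = math.nan  # no factors available for this user or item
        if cold_start_strategy == "drop" and math.isnan(prediction):
            continue
        out.append((user, item, rating, prediction))
    return out


# Hypothetical model that has only seen users 1 and 2 and movie 471
known_users, known_items = {1, 2}, {471}
rows = [(1, 471, 3.0), (1, 463, 4.0), (999, 471, 5.0)]

kept = toy_transform(known_users, known_items, rows, cold_start_strategy="drop")
print(len(rows), "input rows ->", len(kept), "rows after drop")
```

If this is the cause, fitting with `coldStartStrategy="nan"` would keep every input row and show NaN predictions for the users and movies the model has no factors for.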


In [80]:
%sql
select count(distinct movieId)
from all_ratings

count(DISTINCT movieId)
53889


In [81]:
%sql
select count(distinct movieId)
from alldata

count(DISTINCT movieId)
6198


In [82]:
%sql
select * from alldata where movieId=463

userId,movieId,rating,prediction


In [83]:
%sql select * from movies join alldata on movies.movieId=alldata.movieId

movieId,title,genres,userId,movieId.1,rating,prediction
148,"Awfully Big Adventure, An (1995)",Drama,165,148,3.0,2.5097415
471,"Hudsucker Proxy, The (1994)",Comedy,251,471,3.0,4.1424904
471,"Hudsucker Proxy, The (1994)",Comedy,593,471,3.0,3.4580667
471,"Hudsucker Proxy, The (1994)",Comedy,81,471,3.5,3.0215242
471,"Hudsucker Proxy, The (1994)",Comedy,332,471,2.0,3.3657987
471,"Hudsucker Proxy, The (1994)",Comedy,336,471,4.0,3.96483
471,"Hudsucker Proxy, The (1994)",Comedy,417,471,3.0,4.2116575
471,"Hudsucker Proxy, The (1994)",Comedy,360,471,5.0,5.2912974
471,"Hudsucker Proxy, The (1994)",Comedy,185,471,4.0,3.5891848
471,"Hudsucker Proxy, The (1994)",Comedy,429,471,4.0,4.2484083


## Recommend movies to users with IDs 575 and 232
You can choose some users and recommend movies to them.

In [85]:
# show() returns None, so keep the DataFrame and display it separately
randomuser_recs = best_model.recommendForAllUsers(20)
randomuser_recs.show(10)

In [86]:
# userId was cast to an integer column, so filter with integer literals
user_subset = alldata.where(alldata.userId.isin(575, 232))

In [87]:
user_recs=best_model.recommendForUserSubset(user_subset,10)

In [88]:
user_recs.createOrReplaceTempView("user_recs")

In [89]:
user_recs.show()

In [90]:
user_recs.printSchema()

In [91]:
user_recs =user_recs.select(user_recs.userId,explode(user_recs.recommendations).alias("recommendations"))
user_recs.printSchema()
user_recs.show()

In [92]:
user_recs = user_recs.select(user_recs["userId"], user_recs["recommendations.movieId"], user_recs["recommendations.rating"])
user_recs.show()

In [93]:
user_recs.createOrReplaceTempView("user_recs")

In [94]:
%sql 
select userId, movies.movieId, title, rating from movies join user_recs on movies.movieId=user_recs.movieId

userId,movieId,title,rating
232,104875,"History of Future Folk, The (2012)",5.204411
232,3379,On the Beach (1959),4.951416
232,59018,"Visitor, The (2007)",4.9338446
232,60943,Frozen River (2008),4.9338446
232,77846,12 Angry Men (1997),4.9050493
232,93008,"Very Potter Sequel, A (2010)",4.9050493
232,25906,Mr. Skeffington (1944),4.9050493
232,67618,Strictly Sexual (2008),4.8008194
232,102217,Bill Hicks: Revelations (1993),4.7856746
232,33779,Eddie Izzard: Dress to Kill (1999),4.7856746


## Find similar movies for the movies with IDs 463 and 471
You can find similar movies based on the ALS results.

In [96]:
item_subset = alldata.where(alldata.movieId.isin(463, 471))
item_recs=best_model.recommendForItemSubset(item_subset, 100)
item_recs.show()

In [97]:
item_recs =item_recs.select(item_recs.movieId,explode(item_recs.recommendations).alias("recommendations"))
item_recs.printSchema()
item_recs.show()

In [98]:
item_recs = item_recs.select(item_recs["movieId"], item_recs["recommendations.userId"], item_recs["recommendations.rating"])
item_recs.show()

In [99]:
item_recs.createOrReplaceTempView("item_recs")

In [100]:
%sql 
select  movies.movieId, title, userId, rating from movies join item_recs on movies.movieId=item_recs.movieId

movieId,title,userId,rating
471,"Hudsucker Proxy, The (1994)",360,5.2912974
471,"Hudsucker Proxy, The (1994)",558,4.966321
471,"Hudsucker Proxy, The (1994)",99,4.794973
471,"Hudsucker Proxy, The (1994)",258,4.7661834
471,"Hudsucker Proxy, The (1994)",53,4.7304034
471,"Hudsucker Proxy, The (1994)",250,4.6426454
471,"Hudsucker Proxy, The (1994)",538,4.6003923
471,"Hudsucker Proxy, The (1994)",12,4.5880632
471,"Hudsucker Proxy, The (1994)",515,4.568069
471,"Hudsucker Proxy, The (1994)",188,4.5572786
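
Note that `recommendForItemSubset` returns the top users for each movie rather than similar movies. One common way to get movie-to-movie similarity from an ALS model is to compare the learned item factor vectors (exposed by the fitted model as `itemFactors`) using cosine similarity. The sketch below shows that comparison in plain Python; the factor values, the movie IDs other than 471, and the helper names are all hypothetical.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def most_similar(target_id, item_factors, top_n=2):
    """Return the top_n (movie_id, similarity) pairs closest to target_id."""
    target = item_factors[target_id]
    scores = [
        (other_id, cosine_similarity(target, vec))
        for other_id, vec in item_factors.items()
        if other_id != target_id
    ]
    return sorted(scores, key=lambda pair: pair[1], reverse=True)[:top_n]


# Hypothetical 3-dimensional item factors keyed by movieId
toy_item_factors = {
    471: [1.0, 0.0, 1.0],
    101: [2.0, 0.0, 2.0],
    102: [0.0, 1.0, 0.0],
    103: [1.0, 1.0, 0.0],
}
print(most_similar(471, toy_item_factors))
```

With a real model, the factor vectors would be collected from `best_model.itemFactors`. A movie missing from the training data, as movie 463 appears to be in the query above, would have no vector to compare.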


## Write the report 
motivation
1. step1
2. step2
3. step3
4. step4  
output and conclusion

# Professor, my main question is about cmd 70-80: rows go missing after applying the ALS model.