### Spark Setup 

In [None]:
# Set up Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.mirrors.hoobly.com/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install py4j

!export JAVA_HOME=$(/usr/lib/jvm/java-8-openjdk-amd64 -v 1.8)
! echo $JAVA_HOME
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"
import findspark
findspark.init("spark-3.0.0-bin-hadoop2.7")# SPARK_HOME

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

/bin/bash: /usr/lib/jvm/java-8-openjdk-amd64: Is a directory



### Check the working directory and the Spark version

In [None]:
# List the contents of the current working directory.
!ls

links.csv    README.txt			spark-3.0.0-bin-hadoop2.7.tgz
movies.csv   sample_data		tags.csv
ratings.csv  spark-3.0.0-bin-hadoop2.7


In [None]:
# Display the version of the running Spark session.
spark.version

'3.0.0'

### Part1: Data ETL and Data Exploration
Read data from your machine

In [None]:
# Optional Colab-only upload helper, left disabled: when enabled it prompts for files
# through google.colab and prints the name and byte length of every uploaded file.
# from google.colab import files

# uploaded = files.upload()

# for fn in uploaded.keys():
#   print('User uploaded file "{name}" with length {length} bytes'.format(
#       name=fn, length=len(uploaded[fn])))

In [None]:
# List the current working directory (the CSV files are read from here below).
!ls

links.csv    README.txt			spark-3.0.0-bin-hadoop2.7.tgz
movies.csv   sample_data		tags.csv
ratings.csv  spark-3.0.0-bin-hadoop2.7


In [None]:
import os 
# List the entries of the current directory from Python.
os.listdir('./')

['.config',
 'README.txt',
 'tags.csv',
 '.ipynb_checkpoints',
 'ratings.csv',
 'movies.csv',
 'links.csv',
 'spark-3.0.0-bin-hadoop2.7.tgz',
 'spark-3.0.0-bin-hadoop2.7',
 'sample_data']

### Spark read data from drive

In [None]:
# Read the four CSV files from the working directory; the first line of each file
# supplies the column names. No schema is inferred, so every column is a string.
movies_df = spark.read.csv("movies.csv", header=True)
ratings_df = spark.read.csv("ratings.csv", header=True)
links_df = spark.read.csv("links.csv", header=True)
tags_df = spark.read.csv("tags.csv", header=True)

In [None]:
# Preview the first 5 rows of the movies table.
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Preview the first 5 rows of the ratings table.
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [None]:
# Preview the first 5 rows of the links table.
links_df.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



In [None]:
# Preview the first 5 rows of the tags table.
tags_df.show(5)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
|     2|  89774|   Boxing story|1445715207|
|     2|  89774|            MMA|1445715200|
+------+-------+---------------+----------+
only showing top 5 rows



Check for null values in each table

In [None]:
# For every table, compare the total row count with the row count after dropping
# all rows that contain a null. Equal counts (True) mean the table has no missing
# values; False means at least one row contains a null.
print("Is each table free of missing data?")
for label, table in [('movie_df', movies_df), ('ratings_df', ratings_df),
                     ('links_df', links_df), ('tags_df', tags_df)]:
    print('{}: {}'.format(label, table.count() == table.na.drop().count()))

If there is missing data in each table?
movie_df: True
ratings_df: True
links_df: False
tags_df: True


In [None]:
# Count the ratings per user and per movie, then keep the smallest count of each.
tmp1, tmp2 = (
    ratings_df.groupBy(key).count().toPandas()['count'].min()
    for key in ("userID", "movieId")
)
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [None]:
# tmp2: number of distinct rated movies; tmp1: how many of them have exactly one rating.
tmp2 = ratings_df.select('movieId').distinct().count()
tmp1 = (ratings_df.groupBy("movieId").count().toPandas()['count'] == 1).sum()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

3446 out of 9724 movies are rated by only one user


### Part1: Spark SQL and OLAP

In [None]:
# Expose each DataFrame to Spark SQL under a short table name.
# createOrReplaceTempView replaces the deprecated registerTempTable and is safe to re-run.
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")

Q1: The number of Users


In [None]:
# Count the distinct userId values that appear in the ratings table.
num_of_users = spark.sql("Select count(distinct userId) as Number_of_users from ratings")
num_of_users.show()

+---------------+
|Number_of_users|
+---------------+
|            610|
+---------------+



Q2: The number of Movies

In [None]:
# Count the distinct movieId values in the movies table.
num_of_movies = spark.sql("Select count(distinct movieId) as Number_of_movies from movies")
num_of_movies.show()

+----------------+
|Number_of_movies|
+----------------+
|            9742|
+----------------+



Q3: How many movies are rated by users? List movies not rated before

In [None]:
# List the title and genres of every movie whose movieId never appears in the ratings table.
not_rated_movies = spark.sql("Select title, genres from movies where movieId not in (Select movieId from ratings)")
not_rated_movies.show()

+--------------------+--------------------+
|               title|              genres|
+--------------------+--------------------+
|Innocents, The (1...|Drama|Horror|Thri...|
|      Niagara (1953)|      Drama|Thriller|
|For All Mankind (...|         Documentary|
|Color of Paradise...|               Drama|
|I Know Where I'm ...|   Drama|Romance|War|
|  Chosen, The (1981)|               Drama|
|Road Home, The (W...|       Drama|Romance|
|      Scrooge (1970)|Drama|Fantasy|Mus...|
|        Proof (1991)|Comedy|Drama|Romance|
|Parallax View, Th...|            Thriller|
|This Gun for Hire...|Crime|Film-Noir|T...|
|Roaring Twenties,...|Crime|Drama|Thriller|
|Mutiny on the Bou...|Adventure|Drama|R...|
|In the Realms of ...|Animation|Documen...|
|Twentieth Century...|              Comedy|
|Call Northside 77...|Crime|Drama|Film-...|
|Browning Version,...|               Drama|
|  Chalet Girl (2011)|      Comedy|Romance|
+--------------------+--------------------+



In [None]:
# Count the distinct movies that have at least one rating.
num_of_rated_movies = spark.sql("Select count(distinct movieId) as Number_movies_are_rated_by_users from ratings")
num_of_rated_movies.show()

+--------------------------------+
|Number_movies_are_rated_by_users|
+--------------------------------+
|                            9724|
+--------------------------------+



Q4: List Movie Genres

In [None]:
# Distinct raw values of the genres column; each value is still a '|'-separated
# combination of genres, which the next query splits apart.
movie_genres = spark.sql("Select distinct genres from movies")
movie_genres.show()

+--------------------+
|              genres|
+--------------------+
|Comedy|Horror|Thr...|
|Adventure|Sci-Fi|...|
|Action|Adventure|...|
| Action|Drama|Horror|
|Action|Animation|...|
|Animation|Childre...|
|Action|Adventure|...|
|    Adventure|Sci-Fi|
|Documentary|Music...|
|Adventure|Childre...|
| Adventure|Animation|
| Musical|Romance|War|
|Action|Adventure|...|
|Adventure|Childre...|
|Comedy|Crime|Horr...|
|Crime|Drama|Fanta...|
|Comedy|Mystery|Th...|
|   Adventure|Fantasy|
|Horror|Romance|Sc...|
|Drama|Film-Noir|R...|
+--------------------+
only showing top 20 rows



In [None]:
# Split every genres string on '|' and explode the pieces into one row per genre
# (aliased Category), then list the distinct genres in alphabetical order.
movie_genres = spark.sql("Select distinct Category from movies \
lateral view explode(split(genres,'[|]')) as Category order by Category")
movie_genres.show()

+------------------+
|          Category|
+------------------+
|(no genres listed)|
|            Action|
|         Adventure|
|         Animation|
|          Children|
|            Comedy|
|             Crime|
|       Documentary|
|             Drama|
|           Fantasy|
|         Film-Noir|
|            Horror|
|              IMAX|
|           Musical|
|           Mystery|
|           Romance|
|            Sci-Fi|
|          Thriller|
|               War|
|           Western|
+------------------+



Q5: Movies for each category

In [None]:
# Count the movies in each individual genre, largest genre first.
# A movie listed under several genres is counted once in each of them.
movie_category = spark.sql("Select Category, count(movieId) as number from movies \
lateral view explode(split(genres,'[|]')) as Category group by Category order by number desc")
movie_category.show()

+------------------+------+
|          Category|number|
+------------------+------+
|             Drama|  4361|
|            Comedy|  3756|
|          Thriller|  1894|
|            Action|  1828|
|           Romance|  1596|
|         Adventure|  1263|
|             Crime|  1199|
|            Sci-Fi|   980|
|            Horror|   978|
|           Fantasy|   779|
|          Children|   664|
|         Animation|   611|
|           Mystery|   573|
|       Documentary|   440|
|               War|   382|
|           Musical|   334|
|           Western|   167|
|              IMAX|   158|
|         Film-Noir|    87|
|(no genres listed)|    34|
+------------------+------+



In [None]:
# For each genre, join the titles of its movies into one comma-separated string.
# The inner query yields one row per distinct (Category, title) pair; collect_set
# then gathers the titles of each category before concat_ws joins them.
movie_category = spark.sql("Select Category, concat_ws(',', collect_set(t.title)) as list_of_movies from \
  (Select Category, title from movies lateral view explode(split(genres, '[|]')) as Category \
  group by Category, title) as t group by t.Category")

movie_category.show()

+------------------+--------------------+
|          Category|      list_of_movies|
+------------------+--------------------+
|             Crime|Stealing Rembrand...|
|           Romance|Vampire in Brookl...|
|          Thriller|Element of Crime,...|
|         Adventure|Ice Age: Collisio...|
|             Drama|Airport '77 (1977...|
|               War|General, The (192...|
|       Documentary|Jim & Andy: The G...|
|           Fantasy|Masters of the Un...|
|           Mystery|Before and After ...|
|           Musical|U2: Rattle and Hu...|
|         Animation|Ice Age: Collisio...|
|         Film-Noir|Rififi (Du rififi...|
|(no genres listed)|T2 3-D: Battle Ac...|
|              IMAX|Harry Potter and ...|
|            Horror|Tormented (1960),...|
|           Western|Man Who Shot Libe...|
|            Comedy|Hysteria (2011),H...|
|          Children|Ice Age: Collisio...|
|            Action|Stealing Rembrand...|
|            Sci-Fi|Push (2009),SORI:...|
+------------------+--------------

Part 2: Spark ALS based approach for training model 

We will use Spark ML to predict the ratings, so we reuse the `ratings_df` DataFrame loaded above, drop its `timestamp` column, and cast the remaining (userId, movieId, rating) columns to numeric types.

In [None]:
# Preview the ratings table (show() prints the first 20 rows by default).
ratings_df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [None]:
# Drop the timestamp column, leaving userId, movieId and rating.
movie_ratings = ratings_df.drop('timestamp')

In [None]:
# Convert the three columns to numeric types: integer ids and a float rating.
from pyspark.sql.types import IntegerType, FloatType
for column_name, spark_type in (("userId", IntegerType()),
                                ("movieId", IntegerType()),
                                ("rating", FloatType())):
    movie_ratings = movie_ratings.withColumn(column_name, movie_ratings[column_name].cast(spark_type))
movie_ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
|     1|    163|   5.0|
|     1|    216|   5.0|
|     1|    223|   3.0|
|     1|    231|   5.0|
|     1|    235|   4.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    349|   4.0|
+------+-------+------+
only showing top 20 rows



ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [None]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Randomly split the ratings into roughly 80% training and 20% test rows;
# the fixed seed keeps the split reproducible between runs.
(training, test) = movie_ratings.randomSplit([0.8,0.2], seed = 1)

In [None]:
# Base ALS estimator. maxIter, rank and regParam given here are the values used by a
# direct als.fit(); the parameter grid below supplies its own values for the search.
# coldStartStrategy="drop" is Spark's option for discarding rows whose prediction is NaN,
# so the RMSE evaluation is not poisoned by users/movies unseen during training.
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

In [None]:
# Hyper-parameter grid: 3 regParam values x 1 maxIter value x 3 rank values
# = 9 candidate parameter combinations.
paramGrid = ParamGridBuilder()\
            .addGrid(als.regParam, [0.1, 0.01, 0.001])\
            .addGrid(als.maxIter, [5])\
            .addGrid(als.rank, [5, 10, 15])\
            .build()

In [None]:
# Score predictions by the root-mean-square error between the `rating` (label)
# and `prediction` columns.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")


In [None]:
# 5-fold cross-validation of the ALS estimator over every combination in paramGrid,
# scored with the RMSE evaluator.
crossval = CrossValidator(estimator = als, 
                          estimatorParamMaps = paramGrid,
                          evaluator = evaluator,
                          numFolds = 5)

In [None]:
# Fit the base ALS estimator, with its un-tuned constructor parameters, on the training data.
model = als.fit(training)

In [None]:
# Run the cross-validated grid search on the training data, then report the RMSE of
# the resulting model on that same training data (i.e. this is the training error).
cvModel = crossval.fit(training)
predictions = cvModel.transform(training)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.6535070060296998


Model testing

And finally, make a prediction and check the testing error.

In [None]:
# Take the best model found by cross-validation, predict the held-out test ratings,
# and compute the test RMSE (this overwrites the training `predictions` and `rmse`).
best_model = cvModel.bestModel
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [None]:
# Print the test RMSE and the hyper-parameters of the best model.
# The fitted model is asked for its parent estimator through the Java object
# because that is where rank, maxIter and regParam are read from here.
print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank:" + str(best_model._java_obj.parent().getRank()))
print(" MaxIter:" + str(best_model._java_obj.parent().getMaxIter()))
print(" RegParam:" + str(best_model._java_obj.parent().getRegParam()))

RMSE = 0.8768334444841349
**Best Model**
 Rank:5
 MaxIter:5
 RegParam:0.1


In [None]:
# Preview the test-set predictions next to the actual ratings.
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    91|    471|   1.0| 2.5507083|
|   462|    471|   2.5|  3.108667|
|   520|    471|   5.0|  3.633205|
|   411|    471|   4.0|   3.15352|
|    32|    471|   3.0|  3.457068|
|   469|    471|   5.0|   3.59625|
|   104|    471|   4.5| 3.5028918|
|   169|   1088|   4.5|  4.432019|
|   387|   1088|   1.5|  2.659665|
|   381|   1088|   3.5| 3.7212386|
|    84|   1088|   3.0|   3.46345|
|    51|   1088|   4.0| 3.5365894|
|   391|   1088|   1.0|  2.795803|
|   188|   1088|   4.0| 3.8472464|
|   600|   1088|   3.5| 2.5938923|
|    42|   1088|   3.0|  3.699719|
|   325|   1238|   4.0| 4.6691504|
|   425|   1342|   3.5|  2.305586|
|   600|   1342|   2.5| 2.4082193|
|    34|   1580|   2.5| 3.6282887|
+------+-------+------+----------+
only showing top 20 rows



Apply the model to all ratings and check its performance

In [None]:
# Apply the best model to the complete ratings set (training and test rows together),
# print the resulting DataFrame's schema summary, and compute the overall RMSE.
alldata = best_model.transform(movie_ratings)
print(alldata)
rmse = evaluator.evaluate(alldata)
print("RMSE = " + str(rmse))

DataFrame[userId: int, movieId: int, rating: float, prediction: float]
RMSE = 0.702660546619583


In [None]:
# Expose the predictions to Spark SQL as `alldata`.
# createOrReplaceTempView replaces the deprecated registerTempTable and is safe to re-run.
alldata.createOrReplaceTempView("alldata")

In [None]:
# Preview the predictions through the `alldata` SQL view.
output = spark.sql("Select * from alldata")
output.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   191|    148|   5.0|  4.981635|
|   133|    471|   4.0| 3.3381379|
|   597|    471|   2.0| 3.7802052|
|   385|    471|   4.0| 3.2818017|
|   436|    471|   3.0|  3.522771|
|   602|    471|   4.0| 3.8617244|
|    91|    471|   1.0| 2.5507083|
|   409|    471|   3.0|  4.060608|
|   372|    471|   3.0| 2.5710945|
|   599|    471|   2.5| 2.6305625|
|   603|    471|   4.0| 3.4769447|
|   182|    471|   4.5|  3.918007|
|   218|    471|   4.0| 3.3713999|
|   474|    471|   3.0| 3.7102542|
|   500|    471|   1.0|  2.700399|
|    57|    471|   3.0| 3.5570793|
|   462|    471|   2.5|  3.108667|
|   387|    471|   3.0|  2.876326|
|   610|    471|   4.0|  3.183628|
|   217|    471|   2.0|  2.311636|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# Attach title and genres to every prediction by joining movies and alldata on movieId.
# `Select *` keeps the columns of both tables, so movieId appears twice in the result.
output = spark.sql("Select * from movies join alldata on movies.movieId = alldata.movieId")
output.show()

+-------+--------------------+------+------+-------+------+----------+
|movieId|               title|genres|userId|movieId|rating|prediction|
+-------+--------------------+------+------+-------+------+----------+
|    148|Awfully Big Adven...| Drama|   191|    148|   5.0|  4.981635|
|    471|Hudsucker Proxy, ...|Comedy|   133|    471|   4.0| 3.3381379|
|    471|Hudsucker Proxy, ...|Comedy|   597|    471|   2.0| 3.7802052|
|    471|Hudsucker Proxy, ...|Comedy|   385|    471|   4.0| 3.2818017|
|    471|Hudsucker Proxy, ...|Comedy|   436|    471|   3.0|  3.522771|
|    471|Hudsucker Proxy, ...|Comedy|   602|    471|   4.0| 3.8617244|
|    471|Hudsucker Proxy, ...|Comedy|    91|    471|   1.0| 2.5507083|
|    471|Hudsucker Proxy, ...|Comedy|   409|    471|   3.0|  4.060608|
|    471|Hudsucker Proxy, ...|Comedy|   372|    471|   3.0| 2.5710945|
|    471|Hudsucker Proxy, ...|Comedy|   599|    471|   2.5| 2.6305625|
|    471|Hudsucker Proxy, ...|Comedy|   603|    471|   4.0| 3.4769447|
|    4

Recommend movies to the users with id 575 and 232.


You can choose any users you like and recommend movies to them.


In [None]:
# Install pyarrow — presumably required by Koalas (installed next); TODO confirm and pin a version.
!pip install pyarrow



In [None]:
# Install Koalas, imported below as `ks` (databricks.koalas).
!pip install koalas



In [None]:
# NOTE(review): Spark 3.0.0 is already set up through findspark at the top of the notebook;
# this unpinned install may bring in a different PySpark version — confirm it is intended.
!pip install pyspark



In [None]:
import databricks.koalas as ks

In [None]:
# Top 10 recommended movies for every user, produced by the cross-validated best model.
userRecs = best_model.recommendForAllUsers(10)

In [None]:
# Show the recommendation row that belongs to user 575.
userRecs.filter(userRecs.userId == 575).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   575|[[6201, 5.2072964...|
+------+--------------------+



In [None]:
# Inspect the first row: a userId plus a list of (movieId, rating) recommendation rows.
userRecs.head()

Row(userId=471, recommendations=[Row(movieId=8477, rating=5.117608070373535), Row(movieId=148881, rating=5.07659912109375), Row(movieId=40491, rating=4.868337154388428), Row(movieId=6818, rating=4.868337154388428), Row(movieId=3379, rating=4.856972694396973), Row(movieId=4495, rating=4.80442476272583), Row(movieId=6201, rating=4.80442476272583), Row(movieId=33649, rating=4.7364277839660645), Row(movieId=58301, rating=4.729963302612305), Row(movieId=53, rating=4.717412948608398)])

In [None]:
# Convert the Spark recommendations DataFrame into a Koalas DataFrame.
user_rec = userRecs.to_koalas()

In [None]:
# Preview the first 5 rows of the Koalas recommendations frame.
user_rec.head(5)

Unnamed: 0,userId,recommendations
0,471,"[(8477, 5.117608070373535), (148881, 5.0765991..."
1,463,"[(3379, 5.16340446472168), (33649, 5.082136631..."
2,496,"[(58301, 5.119971752166748), (8254, 5.08419561..."
3,148,"[(33090, 5.134688377380371), (69211, 5.0945620..."
4,540,"[(3379, 5.755271911621094), (33649, 5.69312095..."


In [None]:
# Koalas copy of the movies table, used to look up titles and genres by movieId.
movies_koalas = movies_df.to_koalas()

In [None]:
def movie_recommendation(user_rec, userId, movies_koalas):
  """Look up the movies recommended to one user.

  Args:
    user_rec: frame with a `userId` column and a `recommendations` column whose
      cells are sequences of (movieId, rating) pairs.
    userId: id of the user whose recommendations are wanted.
    movies_koalas: frame of movies with a `movieId` column.

  Returns:
    The rows of `movies_koalas` whose movieId is among the user's recommendations;
    an empty selection of `movies_koalas` when the user has no recommendation row.
  """
  # Select the row by comparing the userId *column* with the requested id.
  # (Comparing the literal string 'userId' with the id is always False and made
  # every call return the recommendations of the first row.)
  user_rows = user_rec.loc[user_rec.userId == userId].recommendations.to_numpy()
  if len(user_rows) == 0:
    return movies_koalas.head(0)
  # The first element of every recommendation pair is the movieId.
  rec_movieId = [item[0] for item in user_rows[0]]
  return movies_koalas[movies_koalas.movieId.isin(rec_movieId)]

In [None]:
# Movies recommended to user 575.
movie_recommendation(user_rec, 575, movies_koalas)

Unnamed: 0,movieId,title,genres
48,53,Lamerica (1994),Adventure|Drama
2523,3379,On the Beach (1959),Drama
3320,4495,Crossing Delancey (1988),Comedy|Romance
4251,6201,Lady Jane (1986),Drama|Romance
4590,6818,Come and See (Idi i smotri) (1985),Drama|War
5202,8477,"Jetée, La (1962)",Romance|Sci-Fi
5906,33649,Saving Face (2004),Comedy|Drama|Romance
6051,40491,"Match Factory Girl, The (Tulitikkutehtaan tytt...",Comedy|Drama
6697,58301,Funny Games U.S. (2007),Drama|Thriller
9170,148881,World of Tomorrow (2015),Animation|Comedy


In [None]:
# Movies recommended to user 232.
movie_recommendation(user_rec, 232, movies_koalas)

Unnamed: 0,movieId,title,genres
48,53,Lamerica (1994),Adventure|Drama
2523,3379,On the Beach (1959),Drama
3320,4495,Crossing Delancey (1988),Comedy|Romance
4251,6201,Lady Jane (1986),Drama|Romance
4590,6818,Come and See (Idi i smotri) (1985),Drama|War
5202,8477,"Jetée, La (1962)",Romance|Sci-Fi
5906,33649,Saving Face (2004),Comedy|Drama|Romance
6051,40491,"Match Factory Girl, The (Tulitikkutehtaan tytt...",Comedy|Drama
6697,58301,Funny Games U.S. (2007),Drama|Thriller
9170,148881,World of Tomorrow (2015),Animation|Comedy


Find movies similar to the movies with id 463 and 471

You can find the similar movies based on the ALS results

In [None]:
import numpy as np

In [None]:
# For every movie, the 10 users the model would most strongly recommend it to.
# Uses the cross-validated best model, like the rest of this section, rather than
# the un-tuned `model` fitted directly from the base estimator.
movieRecs = best_model.recommendForAllItems(10)

In [None]:
# Latent factor vector of every item (movie) as learned by the best model.
item_factors = best_model.itemFactors

In [None]:
# Preview 3 rows: the item id and its feature (factor) vector.
item_factors.show(3)

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[-0.36941087, -0....|
| 20|[-0.21831185, -0....|
| 30|[-1.6858807, -0.2...|
+---+--------------------+
only showing top 3 rows



In [None]:
# Koalas copy of the item factors; read as a global by similar_movies below.
movie_factors = item_factors.to_koalas()

In [None]:
# Inspect the factor vector stored at index label 0.
movie_factors.features[0]

[-0.3694108724594116,
 -0.5022423267364502,
 -1.4914720058441162,
 -1.125838279724121,
 0.6240119338035583]

In [None]:
def similar_movies(features, movieId):
  """Rank movies by the cosine similarity of their ALS factors to one movie.

  Reads the notebook-level `movie_factors` frame (columns `id` and `features`)
  and the `movies_koalas` frame (columns `movieId`, `title`, `genres`).

  Args:
    features: Not used by the computation; kept so existing calls keep working.
      The factor vectors are always read from `movie_factors`.
    movieId: id of the reference movie.

  Returns:
    A frame with columns movieId, title and genres holding the ranked matches
    from position 1 onwards (at most 10 rows), or the message string
    'There is no movie with id <movieId>' when `movie_factors` has no such id.
  """
  try:
    target_id_feature = movie_factors.loc[movie_factors.id == movieId].features.to_numpy()[0]
  except IndexError:
    # The selection was empty: this id has no factor vector. Any other error
    # (missing globals, Spark failures, interrupts) is allowed to propagate
    # instead of being reported as a missing movie.
    return 'There is no movie with id ' + str(movieId)

  # The norm of the reference vector does not change, so compute it once.
  target_norm = np.linalg.norm(target_id_feature)
  similarities = [
      np.dot(target_id_feature, feature) / (target_norm * np.linalg.norm(feature))
      for feature in movie_factors['features'].to_numpy()
  ]

  ks_similarity = ks.DataFrame({'similarity' : similarities}, index = movie_factors.id.to_numpy())
  # The 11 most similar movies include the reference movie itself (similarity = 1),
  # which is removed below by skipping position 0.
  # NOTE(review): this assumes the reference movie ranks first and is present in
  # movies_koalas — confirm for ties with other similarity-1 movies.
  top_11 = ks_similarity.sort_values(by = ['similarity'], ascending = False).head(11)
  joint = top_11.merge(movies_koalas, left_index = True, right_on = 'movieId', how = 'inner')
  joint.sort_values(by = ['similarity'], ascending = False, inplace = True)
  joint.reset_index(inplace = True)
  # Keep positions 1..10: the top 10 similar movies.
  return joint.loc[1:,['movieId','title','genres']]

In [None]:
# Movies similar to movie 463.
similar_movies(features = movie_factors['features'], movieId = 463)

'There is no movie with id 463'

In [None]:
# Movies similar to movie 471.
similar_movies(features = movie_factors['features'], movieId = 471)

Unnamed: 0,movieId,title,genres
1,5325,Dogtown and Z-Boyz (2001),Documentary
2,569,Little Big League (1994),Comedy|Drama
3,6252,View from the Top (2003),Comedy|Romance
4,87194,The Way (2010),Adventure|Comedy|Drama
5,2142,"American Tail: Fievel Goes West, An (1991)",Adventure|Animation|Children|Musical|Western
6,446,Farewell My Concubine (Ba wang bie ji) (1993),Drama|Romance
7,1018,That Darn Cat! (1965),Children|Comedy|Mystery
8,3181,Titus (1999),Drama
9,49772,"Painted Veil, The (2006)",Drama|Romance
10,5572,Barbershop (2002),Comedy


In [None]:
# Movies similar to movie 500.
similar_movies(features = movie_factors['features'], movieId = 500)

Unnamed: 0,movieId,title,genres
1,36363,Kin-Dza-Dza! (1986),Comedy|Drama|Sci-Fi
2,27320,"Nine Lives of Tomas Katz, The (2000)",Comedy|Drama|Fantasy
3,757,Ashes of Time (Dung che sai duk) (1994),Drama
4,55253,"Lust, Caution (Se, jie) (2007)",Drama|Romance|Thriller|War
5,4019,Finding Forrester (2000),Drama
6,5159,Ferngully: The Last Rainforest (1992),Animation|Children|Comedy|Musical
7,74508,Persuasion (2007),Drama|Romance
8,2585,"Lovers of the Arctic Circle, The (Los Amantes ...",Drama|Romance
9,74510,"Girl Who Played with Fire, The (Flickan som le...",Action|Crime|Drama|Mystery|Thriller
10,43396,"World's Fastest Indian, The (2005)",Drama


### Write the report

Motivation: In this notebook, I use the Alternating Least Squares (ALS) algorithm with the Spark APIs to predict movie ratings in the MovieLens small dataset.

Step 1: Data ETL and Data Exploration

* Conducted data preprocessing like splitting genres into categories

* Conducted exploratory data analysis such as counting the number of movies for each category

Step 2: Build a recommendation model based on historical movie ratings and solve it by matrix factorization and alternating least squares (ALS).

Step 3: Tune the model parameters through grid search and cross-validation, using the root-mean-square error (RMSE) as the metric.

Step 4: Recommend 10 movies to some certain users and find top 10 similar movies in terms of some specific movies.

Conclusion: The best model has rmse = 0.65 on the training dataset and rmse = 0.88 on the test dataset. The best model I got has 5 latent factors (hidden features). These features describe a movie in 5 dimensions. Based on the feature, I can define the cosine similarity between movies.