### This notebook is an Apache Spark tutorial with two purposes
1. An analysis of the MovieLens dataset (showing that simple aggregation methods can be very effective)
2. A recommendation engine based on collaborative filtering (ALS from MLlib)

Links and references I have used:
1. The dataset can be found [here](https://grouplens.org/datasets/movielens/20m/) or on [Kaggle](https://www.kaggle.com/grouplens/movielens-20m-dataset)
2. edX Apache Spark course CS110, Assignment 2 (Movie Recommendation). I have reused my own code from that notebook (I am not linking it here in case the course is offered again with the same assignments). Side note: it is one of the best and most intense course series I have ever taken.
3. A More Scalable Way of Making Recommendations with MLlib - Xiangrui Meng [here](https://www.youtube.com/watch?v=Q0VXllYilM0&)
4. I have used the [Databricks Community Edition cloud](https://community.cloud.databricks.com), because the required dataset is already mounted there. Databricks Community Edition also has many features (I love the `display` function) and the whole system is preconfigured. One important (and differentiating) point is that we do not need to create the SparkContext or SQLContext objects; they are already created for us.

A note of caution:
We avoid `collect()` on large DataFrames, because it pulls all the data back to the driver, which can cause an out-of-memory error.

In [0]:
#We already have sc and sqlContext for us here
print(sc)
print(sqlContext)

<SparkContext master=local[8] appName=Databricks Shell>
<pyspark.sql.context.SQLContext object at 0x7f955d6dfbe0>


### Getting the data
The dataset is already mounted for us on DBFS.

In [0]:
import os
#from databricks_test_helper import Test

dbfs_dir = '/databricks-datasets/cs110x/ml-20m/data-001'

# We will use these two files for our analysis and for collaborative filtering
ratings_filename = dbfs_dir + '/ratings.csv' 
movies_filename = dbfs_dir + '/movies.csv'

In [0]:
print(ratings_filename)

/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv


In [0]:
# display() and dbutils are Databricks-specific features
display(dbutils.fs.ls(dbfs_dir))

path,name,size,modificationTime
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/README.txt,README.txt,8964,1471568961000
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/links.csv,links.csv,569517,1471568961000
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/links.csv.gz,links.csv.gz,245973,1471568961000
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/movies.csv,movies.csv,1397542,1471568961000
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/movies.csv.gz,movies.csv.gz,498839,1471568961000
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv,ratings.csv,533444411,1471568961000
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/ratings.csv.gz,ratings.csv.gz,132656084,1471568964000
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/tags.csv,tags.csv,16603996,1471568965000
dbfs:/databricks-datasets/cs110x/ml-20m/data-001/tags.csv.gz,tags.csv.gz,4787917,1471568965000


In [0]:
dbutils.fs.head(movies_filename)

[Truncated to first 65536 bytes]
Out[30]: 'movieId,title,genres\r\n1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy\r\n2,Jumanji (1995),Adventure|Children|Fantasy\r\n3,Grumpier Old Men (1995),Comedy|Romance\r\n4,Waiting to Exhale (1995),Comedy|Drama|Romance\r\n5,Father of the Bride Part II (1995),Comedy\r\n6,Heat (1995),Action|Crime|Thriller\r\n7,Sabrina (1995),Comedy|Romance\r\n8,Tom and Huck (1995),Adventure|Children\r\n9,Sudden Death (1995),Action\r\n10,GoldenEye (1995),Action|Adventure|Thriller\r\n11,"American President, The (1995)",Comedy|Drama|Romance\r\n12,Dracula: Dead and Loving It (1995),Comedy|Horror\r\n13,Balto (1995),Adventure|Animation|Children\r\n14,Nixon (1995),Drama\r\n15,Cutthroat Island (1995),Action|Adventure|Romance\r\n16,Casino (1995),Crime|Drama\r\n17,Sense and Sensibility (1995),Drama|Romance\r\n18,Four Rooms (1995),Comedy\r\n19,Ace Ventura: When Nature Calls (1995),Comedy\r\n20,Money Train (1995),Action|Comedy|Crime|Drama|Thriller\r\n21,Get Shorty

### A little analysis of movies.csv
We will create two DataFrames for our analysis, which makes visualization with the Databricks `display` function straightforward:
1. `movies_with_year_df`: the genres are dropped and the year is extracted from the title; the final schema is (ID, title, year)
2. `movies_with_one_genre_df`: one row per (movie, genre) pair; the final schema is (Id, title, one_genre)

From the description on [Kaggle](https://www.kaggle.com/grouplens/movielens-20m-dataset) we can see the schema of the files. To save computation we specify the schema explicitly (Spark can infer it, but inference needs an extra pass over the data, which in most cases we want to avoid).

In [0]:
from pyspark.sql.types import *
#working only on movies.csv right now
movies_with_genres_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType()),
   StructField('genres',StringType())]
  )

movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())]
  ) # dropping the genres; we will transform the df to include the year later

In [0]:
#Creating the dataframes 
movies_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=False).schema(movies_df_schema).load(movies_filename)
movies_with_genres_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=False).schema(movies_with_genres_df_schema).load(movies_filename)

### Inspecting the DataFrames before the transformations

In [0]:
movies_df.show(4,truncate = False) # we will also use this for collaborative filtering
movies_with_genres_df.show(4,truncate = False)

+---+------------------------+
|ID |title                   |
+---+------------------------+
|1  |Toy Story (1995)        |
|2  |Jumanji (1995)          |
|3  |Grumpier Old Men (1995) |
|4  |Waiting to Exhale (1995)|
+---+------------------------+
only showing top 4 rows

+---+------------------------+-------------------------------------------+
|ID |title                   |genres                                     |
+---+------------------------+-------------------------------------------+
|1  |Toy Story (1995)        |Adventure|Animation|Children|Comedy|Fantasy|
|2  |Jumanji (1995)          |Adventure|Children|Fantasy                 |
|3  |Grumpier Old Men (1995) |Comedy|Romance                             |
|4  |Waiting to Exhale (1995)|Comedy|Drama|Romance                       |
+---+------------------------+-------------------------------------------+
only showing top 4 rows



In [0]:
# #transforming the Dataframes
# from pyspark.sql.functions import split, regexp_extract

# # Side note, a very nice quote: "Some people, when confronted with a problem, think 'I know, I'll use regular expressions.' Now they have two problems." (attributed to Jamie Zawinski)
# movies_with_year_df = movies_df.select('ID','title',regexp_extract('title',r'\((\d+)\)',1).alias('year'))

# #one genre per row
# movies_with_one_genre_df = sqlContext.createDataFrame(movies_with_genres_df.rdd.map(lambda (a,b,c): [(a,b,i) for i in c.split('|')])\
#                                                       .flatMap(lambda x:x)).toDF('Id','title','one_genre') 

In [0]:
from pyspark.sql.functions import split, regexp_extract
from pyspark.sql import SparkSession

# In Databricks a SparkSession already exists as `spark`; getOrCreate() simply returns it
spark = SparkSession.builder.appName("example").getOrCreate()

# movies_df and movies_with_genres_df were created in the cells above

# Extracting the year from the 'title' column
movies_with_year_df = movies_df.select('ID', 'title', regexp_extract('title', r'\((\d+)\)', 1).alias('year'))

# Splitting genres and creating a DataFrame with one genre per row
movies_with_one_genre_df = (
    movies_with_genres_df.rdd.flatMap(lambda row: [(row.ID, row.title, genre) for genre in row.genres.split('|')])
    .toDF(['Id', 'title', 'one_genre'])
)

# Show the resulting DataFrames
movies_with_year_df.show()
movies_with_one_genre_df.show()



+---+--------------------+----+
| ID|               title|year|
+---+--------------------+----+
|  1|    Toy Story (1995)|1995|
|  2|      Jumanji (1995)|1995|
|  3|Grumpier Old Men ...|1995|
|  4|Waiting to Exhale...|1995|
|  5|Father of the Bri...|1995|
|  6|         Heat (1995)|1995|
|  7|      Sabrina (1995)|1995|
|  8| Tom and Huck (1995)|1995|
|  9| Sudden Death (1995)|1995|
| 10|    GoldenEye (1995)|1995|
| 11|American Presiden...|1995|
| 12|Dracula: Dead and...|1995|
| 13|        Balto (1995)|1995|
| 14|        Nixon (1995)|1995|
| 15|Cutthroat Island ...|1995|
| 16|       Casino (1995)|1995|
| 17|Sense and Sensibi...|1995|
| 18|   Four Rooms (1995)|1995|
| 19|Ace Ventura: When...|1995|
| 20|  Money Train (1995)|1995|
+---+--------------------+----+
only showing top 20 rows

+---+--------------------+---------+
| Id|               title|one_genre|
+---+--------------------+---------+
|  1|    Toy Story (1995)|Adventure|
|  1|    Toy Story (1995)|Animation|
|  1|    Toy Story (1

### DataFrames after Transformation

In [0]:
movies_with_one_genre_df.show(20,truncate = False)  # show() prints the table itself and returns None, so no print() is needed
movies_with_year_df.show(5,truncate = False)

+---+----------------------------------+---------+
|Id |title                             |one_genre|
+---+----------------------------------+---------+
|1  |Toy Story (1995)                  |Adventure|
|1  |Toy Story (1995)                  |Animation|
|1  |Toy Story (1995)                  |Children |
|1  |Toy Story (1995)                  |Comedy   |
|1  |Toy Story (1995)                  |Fantasy  |
|2  |Jumanji (1995)                    |Adventure|
|2  |Jumanji (1995)                    |Children |
|2  |Jumanji (1995)                    |Fantasy  |
|3  |Grumpier Old Men (1995)           |Comedy   |
|3  |Grumpier Old Men (1995)           |Romance  |
|4  |Waiting to Exhale (1995)          |Comedy   |
|4  |Waiting to Exhale (1995)          |Drama    |
|4  |Waiting to Exhale (1995)          |Romance  |
|5  |Father of the Bride Part II (1995)|Comedy   |
|6  |Heat (1995)                       |Action   |
|6  |Heat (1995)                       |Crime    |
|6  |Heat (1995)               
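The two transformations above can be sketched in plain Python on a few rows taken from the head of movies.csv shown earlier (the helper names `extract_year` and `one_genre_per_row` are hypothetical). `regexp_extract` with group 1 behaves like taking the first regex match, returning an empty string when nothing matches, and the RDD `flatMap` emits one `(ID, title, genre)` tuple per genre:

```python
import re

# Same pattern as the Spark cell: digits inside parentheses
YEAR_PATTERN = re.compile(r'\((\d+)\)')

def extract_year(title):
    # Like regexp_extract(title, pattern, 1): group 1 of the first match, or '' if none
    match = YEAR_PATTERN.search(title)
    return match.group(1) if match else ''

def one_genre_per_row(rows):
    # Like the flatMap: one (id, title, genre) tuple for each '|'-separated genre
    return [(movie_id, title, genre)
            for movie_id, title, genres in rows
            for genre in genres.split('|')]

sample = [
    (1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
    (2, 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
    (3, 'Grumpier Old Men (1995)', 'Comedy|Romance'),
]

years = [(movie_id, title, extract_year(title)) for movie_id, title, _ in sample]
exploded = one_genre_per_row(sample)
print(years[0])       # (1, 'Toy Story (1995)', '1995')
print(len(exploded))  # 10
```

Inside Spark, a native alternative that avoids shipping every row through Python is `F.explode(F.split('genres', r'\|'))`; `split` takes a regular expression, so the pipe must be escaped.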

### Now we use the built-in functionality of Databricks for some insights

In [0]:
display(movies_with_one_genre_df.groupBy('one_genre').count()) #people love drama

# Below we have a bar chart; display() offers many other chart types as well

one_genre,count
Crime,2939
Romance,4127
Thriller,4178
Adventure,2329
Drama,13344
War,1194
Documentary,2471
Fantasy,1412
Mystery,1514
Musical,1036


In [0]:
# Sorting by count shows that the largest number of movies in the dataset were released in 2009
display(movies_with_year_df.groupBy('year').count().orderBy('count',ascending = False))

year,count
2009.0,1112
2012.0,1022
2011.0,1016
2013.0,1011
2008.0,979
2010.0,962
2007.0,902
2006.0,855
2005.0,741
2014.0,740


Two observations from movies.csv:
1. Drama is by far the most common genre (13,344 movies), so people seem to love drama.
2. There are a lot of movies each year, peaking at 1,112 in 2009.
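The `groupBy(...).count()` and `orderBy('count', ascending=False)` pattern used for these observations can be sketched in plain Python with `collections.Counter`. The rows below are a small subset of the exploded `(Id, title, one_genre)` output shown earlier; the helper names are hypothetical:

```python
from collections import Counter

def count_by(rows, key_index):
    # Equivalent of df.groupBy(col).count(): number of rows per distinct key
    return Counter(row[key_index] for row in rows)

def order_by_count_desc(counts):
    # Equivalent of .orderBy('count', ascending=False)
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)

rows = [
    (1, 'Toy Story (1995)', 'Comedy'),
    (3, 'Grumpier Old Men (1995)', 'Comedy'),
    (3, 'Grumpier Old Men (1995)', 'Romance'),
    (4, 'Waiting to Exhale (1995)', 'Comedy'),
    (4, 'Waiting to Exhale (1995)', 'Drama'),
    (4, 'Waiting to Exhale (1995)', 'Romance'),
]
print(order_by_count_desc(count_by(rows, 2)))  # [('Comedy', 3), ('Romance', 2), ('Drama', 1)]
```

Because `movies_with_one_genre_df` has one row per (movie, genre) pair, a movie with several genres is counted once per genre, so the genre counts add up to more than the number of movies.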

### Now let's move to the ratings

We already have movies_df; now we need the ratings. Let's create that DataFrame.

In [0]:
# Again, we define the schema explicitly so that Spark does not need an extra pass over the data to infer it
ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)              # the timestamp column is dropped

In [0]:
#creating the df
ratings_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=False).schema(ratings_df_schema).load(ratings_filename)
ratings_df.show(4)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
+------+-------+------+
only showing top 4 rows



In [0]:
# Mark both DataFrames for caching (cache() is lazy: the data is stored in memory the first time an action runs on it)
ratings_df.cache()
movies_df.cache()
print("both dataframes are in cache now for easy accessibility")

both dataframes are in cache now for easy accessibility


### Global Popularity
It is good to know the most popular movies, and at times popularity alone is very hard to beat ([Xavier Amatriain Lecture](https://www.youtube.com/watch?v=bLhq63ygoU8)).
Below we look at the movies with the highest average ratings, with a constraint on the number of reviews: we discard movies that have fewer than 500 ratings.
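The aggregation, threshold, and sort can be sketched in plain Python on hypothetical toy ratings. The threshold matters because without it a movie with a single 5-star rating would top the list (`top_rated` is a hypothetical helper, not a Spark function):

```python
from collections import defaultdict

def top_rated(ratings, min_count):
    # Per-movie count and average (like groupBy('movieId').agg(count, avg)),
    # keep movies with at least min_count ratings, then sort by average, descending
    totals = defaultdict(lambda: [0, 0.0])  # movieId -> [count, sum of ratings]
    for _user, movie, rating in ratings:
        totals[movie][0] += 1
        totals[movie][1] += rating
    stats = [(movie, n, s / n) for movie, (n, s) in totals.items()]
    kept = [row for row in stats if row[1] >= min_count]
    return sorted(kept, key=lambda row: row[2], reverse=True)

# Hypothetical toy ratings: (userId, movieId, rating)
ratings = [
    (1, 10, 5.0), (2, 10, 4.0), (3, 10, 4.5),  # movie 10: 3 ratings, average 4.5
    (1, 20, 5.0),                              # movie 20: 1 rating, average 5.0
    (2, 30, 3.0), (3, 30, 4.0),                # movie 30: 2 ratings, average 3.5
]
print(top_rated(ratings, min_count=2))  # [(10, 3, 4.5), (30, 2, 3.5)]
```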

In [0]:
from pyspark.sql import functions as F

# From ratings_df, compute the number of ratings and the average rating for each movie
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(F.count(ratings_df.rating).alias("count"), F.avg(ratings_df.rating).alias("average"))
print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(4, truncate=False)

movie_ids_with_avg_ratings_df:
+-------+-----+------------------+
|movieId|count|average           |
+-------+-----+------------------+
|3997   |2047 |2.0703468490473864|
|1580   |35580|3.55831928049466  |
|3918   |1246 |2.918940609951846 |
|2366   |6627 |3.5492681454655197|
+-------+-----+------------------+
only showing top 4 rows



In [0]:
# Join with movies_df to add each movie's title, which makes the results easier to read
movie_names_with_avg_ratings_df = movie_ids_with_avg_ratings_df.join(movies_df, F.col('movieId') == F.col('ID')).drop('ID')
movie_names_with_avg_ratings_df.show(4,truncate = False)

+-------+-----+------------------+--------------------------------+
|movieId|count|average           |title                           |
+-------+-----+------------------+--------------------------------+
|3997   |2047 |2.0703468490473864|Dungeons & Dragons (2000)       |
|1580   |35580|3.55831928049466  |Men in Black (a.k.a. MIB) (1997)|
|3918   |1246 |2.918940609951846 |Hellbound: Hellraiser II (1988) |
|2366   |6627 |3.5492681454655197|King Kong (1933)                |
+-------+-----+------------------+--------------------------------+
only showing top 4 rows



In [0]:
#so let us see the global popularity
movies_with_500_ratings_or_more = movie_names_with_avg_ratings_df.filter(movie_names_with_avg_ratings_df['count'] >= 500).orderBy('average',ascending = False)
movies_with_500_ratings_or_more.show(truncate = False)

+-------+-----+------------------+---------------------------------------------------------------------------+
|movieId|count|average           |title                                                                      |
+-------+-----+------------------+---------------------------------------------------------------------------+
|318    |63366|4.446990499637029 |Shawshank Redemption, The (1994)                                           |
|858    |41355|4.364732196832306 |Godfather, The (1972)                                                      |
|50     |47006|4.334372207803259 |Usual Suspects, The (1995)                                                 |
|527    |50054|4.310175010988133 |Schindler's List (1993)                                                    |
|1221   |27398|4.275640557704942 |Godfather: Part II, The (1974)                                             |
|2019   |11611|4.2741796572216   |Seven Samurai (Shichinin no samurai) (1954)                                |
|

Notice that this list is quite similar to the [IMDB top 250](http://www.imdb.com/chart/top).

For the cold-start problem (a new user with no ratings), we can simply recommend these globally popular movies.

## Collaborative filtering now
See the [Wikipedia article](https://en.wikipedia.org/wiki/Collaborative_filtering). We will use the matrix factorization algorithm in Spark MLlib called ALS (Alternating Least Squares); see this [Quora explanation](https://www.quora.com/What-is-the-Alternating-Least-Squares-method-in-recommendation-systems).

<img alt="factorization" src="http://spark-mooc.github.io/web-assets/images/matrix_factorization.png" style="width: 885px"/>
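For reference, here is a standard formulation of the ALS problem in our own notation (not necessarily the figure's): $x_u \in \mathbb{R}^k$ is the factor vector of user $u$, $y_i \in \mathbb{R}^k$ that of item $i$, $k$ is the rank, $\Omega$ is the set of observed (user, item) pairs, and $\lambda$ plays the role of `regParam`. This is a sketch of the textbook objective; Spark's documentation notes that its implementation additionally scales $\lambda$ by the number of ratings of each user or item.

$$
\min_{X,\,Y} \sum_{(u,i)\in\Omega} \left(r_{ui} - x_u^\top y_i\right)^2 + \lambda\left(\sum_u \lVert x_u\rVert^2 + \sum_i \lVert y_i\rVert^2\right)
$$

Holding the item factors fixed, the objective separates into one ridge regression per user. Setting the gradient $-2Y_u^\top\left(r_u - Y_u x_u\right) + 2\lambda x_u$ to zero gives

$$
x_u = \left(Y_u^\top Y_u + \lambda I_k\right)^{-1} Y_u^\top r_u
$$

where $Y_u$ stacks the vectors $y_i$ of the items user $u$ rated and $r_u$ holds those ratings. The item update is symmetric, and ALS alternates between the two for `maxIter` iterations.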

### Splitting into training, validation, and test datasets

As with machine learning algorithms in general, in practice we have to tune parameters and then test accuracy. For this we split the data into three parts: training, validation (for optimizing hyperparameters), and test (for checking the final accuracy). For more information, see this [brilliant lecture by Nando](https://www.youtube.com/watch?v=PvuN23m7hhY).

In [0]:
# We'll use 60% of the data for training, 20% for validation, and 20% for testing
seed = 4
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.6,0.2,0.2],seed)

# Let's cache these datasets for performance
training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(
  training_df.count(), validation_df.count(), test_df.count())
)
training_df.show(4,truncate = False)
validation_df.show(4,truncate = False)
test_df.show(4,truncate = False)

Training: 11997031, validation: 4003826, test: 3999406

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |29     |3.5   |
|1     |32     |3.5   |
|1     |47     |3.5   |
|1     |50     |3.5   |
+------+-------+------+
only showing top 4 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |260    |4.0   |
|1     |318    |4.0   |
|1     |541    |4.0   |
|1     |589    |3.5   |
+------+-------+------+
only showing top 4 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|1     |2      |3.5   |
|1     |223    |4.0   |
|1     |367    |3.5   |
|1     |924    |3.5   |
+------+-------+------+
only showing top 4 rows



From the counts above we have approximately 12 million training samples, 4 million validation samples and 4 million test samples.
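The split sizes are only approximately 60/20/20 because `randomSplit` samples rows randomly instead of cutting the data at exact positions. A simplified plain-Python sketch of that idea: each row gets one uniform draw, which is compared against the cumulative normalized weights, so every row lands in exactly one split. This is an illustration under that assumption; Spark's actual implementation samples per partition and will assign rows differently.

```python
import random

def random_split(rows, weights, seed):
    # Normalize the weights into cumulative upper bounds, e.g. [0.6, 0.8, 1.0]
    total = sum(weights)
    bounds = []
    cumulative = 0.0
    for w in weights:
        cumulative += w / total
        bounds.append(cumulative)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw per row
        for index, upper in enumerate(bounds):
            if u < upper:
                splits[index].append(row)
                break
        else:  # guard against floating-point round-off in the last bound
            splits[-1].append(row)
    return splits

train, validation, test = random_split(range(100_000), [0.6, 0.2, 0.2], seed=4)
print(len(train), len(validation), len(test))  # roughly 60000 20000 20000
```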

### Alternating Least Squares (ALS)
The documentation can be found [here](http://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS)
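The alternating ridge-regression updates at the heart of ALS can be sketched in NumPy on a tiny, fully observed matrix. This is a hypothetical single-machine illustration (the `als` and `rmse` helpers are our own, not part of Spark); Spark distributes the same per-user and per-item least-squares solves across the cluster and differs in details such as regularization scaling and initialization.

```python
import numpy as np

def als(ratings, n_users, n_items, rank=2, reg=0.01, iters=20, seed=0):
    # ratings: list of (user, item, rating) triples with 0-based ids
    rng = np.random.default_rng(seed)
    X = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    Y = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    by_user = [[] for _ in range(n_users)]
    by_item = [[] for _ in range(n_items)]
    for u, i, r in ratings:
        by_user[u].append((i, r))
        by_item[i].append((u, r))
    ridge = reg * np.eye(rank)

    def solve(fixed, observations):
        # Ridge regression on observed entries only: (F^T F + reg*I)^-1 F^T r
        idx = [j for j, _ in observations]
        vals = np.array([v for _, v in observations])
        F = fixed[idx]
        return np.linalg.solve(F.T @ F + ridge, F.T @ vals)

    for _ in range(iters):
        for u, obs in enumerate(by_user):  # item factors fixed
            if obs:
                X[u] = solve(Y, obs)
        for i, obs in enumerate(by_item):  # user factors fixed
            if obs:
                Y[i] = solve(X, obs)
    return X, Y

def rmse(ratings, X, Y):
    errors = [(r - X[u] @ Y[i]) ** 2 for u, i, r in ratings]
    return float(np.sqrt(np.mean(errors)))

# Hypothetical 5 x 4 ratings generated from known rank-2 factors
true_users = np.array([[1, 0], [0, 1], [1, 1], [2, 1], [1, 2]], dtype=float)
true_items = np.array([[1, 2], [2, 1], [1.5, 0], [0, 1.5]], dtype=float)
full = true_users @ true_items.T
ratings = [(u, i, full[u, i]) for u in range(5) for i in range(4)]

X, Y = als(ratings, n_users=5, n_items=4, rank=2)
print(rmse(ratings, X, Y))
```

Because the toy matrix was built from rank-2 factors, a rank-2 fit should reproduce it closely. With real, sparse ratings each solve only uses the entries that user or item actually has, which is what the `idx` lookup does.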

Why not simply use cross-validation? The explanation of the problem and the workaround below is copied directly from the assignment notebook:

A challenge for collaborative filtering is how to provide ratings to a new user (a user who has not provided *any* ratings at all). Some recommendation systems choose to provide new users with a set of default ratings (e.g., an average value across all ratings), while others choose to provide no ratings for new users. Spark's ALS algorithm yields a NaN (`Not a Number`) value when asked to provide a rating for a new user.

Using the ML Pipeline's [CrossValidator](http://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) with ALS is thus problematic, because cross validation involves dividing the training data into a set of folds (e.g., three sets) and then using those folds for testing and evaluating the parameters during the parameter grid search process. It is likely that some of the folds will contain users that are not in the other folds, and, as a result, ALS produces NaN values for those new users. When the CrossValidator uses the Evaluator (RMSE) to compute an error metric, the RMSE algorithm will return NaN. This will make *all* of the parameters in the parameter grid appear to be equally good (or bad).

You can read the discussion on [Spark JIRA 14489](https://issues.apache.org/jira/browse/SPARK-14489) about this issue. There are proposed workarounds of having ALS provide default values or having RMSE drop NaN values. Both introduce potential issues. We have chosen to have RMSE drop NaN values. While this does not solve the underlying issue of ALS not predicting a value for a new user, it does provide some evaluation value. We manually implement the parameter grid search process using a for loop (below) and remove the NaN values before using RMSE.

For a production application, you would want to consider the tradeoffs in how to handle new users.
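Dropping NaN predictions before computing RMSE can be sketched in plain Python. One subtlety: in plain Python, `x != float('nan')` is always True, so it cannot filter anything; the Spark filter used in the cells below works only because Spark SQL treats NaN as equal to NaN. In Python, use `math.isnan`; in PySpark, `~F.isnan('prediction')` states the intent more explicitly. Since Spark 2.2, ALS also has a `coldStartStrategy` parameter, and setting it to `'drop'` removes those rows for you. The helper below is hypothetical:

```python
import math

def rmse_dropping_nan(pairs):
    # RMSE over (rating, prediction) pairs, skipping NaN predictions
    # (the same effect as filtering them out before RegressionEvaluator)
    kept = [(r, p) for r, p in pairs if not math.isnan(p)]
    if not kept:
        return float('nan')
    return math.sqrt(sum((r - p) ** 2 for r, p in kept) / len(kept))

nan = float('nan')
pairs = [(4.0, 3.5), (3.0, 3.5), (5.0, nan)]  # the NaN mimics a user unseen in training

# A naive RMSE is poisoned by the single NaN
naive = math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))
print(math.isnan(naive))         # True
print(rmse_dropping_nan(pairs))  # 0.5

# Pitfall: Python's != never matches NaN, so this "filter" keeps everything
print(len([p for _, p in pairs if p != nan]))  # 3
```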

I have tried to make the comments in the next cell as explicit as possible.

In [0]:
from pyspark.ml.recommendation import ALS

# our ALS learner
als = ALS()

# Now we set the parameters for the method
als.setMaxIter(5)\
   .setSeed(seed)\
   .setRegParam(0.1)\
   .setUserCol('userId')\
   .setItemCol('movieId')\
   .setRatingCol('rating')

# Now let's compute an evaluation metric for our test and validation dataset
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
#it will essentially calculate the rmse score based on these columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")


# To understand rank, assume the ratings matrix is 1000 x 1000 (1000 users and 1000 products; in practice it is very sparse).
# ALS learns two matrices, P (shape 1000 x rank) and Q (shape rank x 1000); their product approximates the original matrix,
# and storing them takes 2 * 1000 * rank numbers instead of 1000 * 1000 (for rank = 4, only 8000 numbers instead of 1000000).
ranks = [4, 8, 12] 
errors = [0, 0, 0]
models = [0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1
for rank in ranks:
  # Set the rank here:
  als.setRank(rank)
  # Create the model with these parameters.
  model = als.fit(training_df)
  # Run the model to create a prediction. Predict against the validation_df.
  predict_df = model.transform(validation_df)

  # Remove NaN values from prediction (due to SPARK-14489)
  predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

  # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df DataFrame
  error = reg_eval.evaluate(predicted_ratings_df)
  errors[err] = error
  models[err] = model
  print('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error = error
    best_rank = err
  err += 1

als.setRank(ranks[best_rank])
print('The best model was trained with rank %s' % ranks[best_rank])
my_model = models[best_rank]

For rank 4 the RMSE is 0.8231040294640697
For rank 8 the RMSE is 0.8148873231719284
For rank 12 the RMSE is 0.8301578761278824
The best model was trained with rank 8
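The storage arithmetic from the comments in the grid-search cell, and a slightly more idiomatic way to pick the best rank (a dictionary and `min` instead of parallel lists and a manual index), can be checked in plain Python. The RMSE values are the ones printed above; `factor_storage` is a hypothetical helper:

```python
def factor_storage(n_users, n_items, rank):
    # Numbers stored by the two factor matrices: (n_users x rank) + (rank x n_items)
    return n_users * rank + rank * n_items

print(factor_storage(1000, 1000, 4), 1000 * 1000)  # 8000 1000000

# Validation RMSEs printed by the grid search above
validation_rmse = {4: 0.8231040294640697, 8: 0.8148873231719284, 12: 0.8301578761278824}
best_rank = min(validation_rmse, key=validation_rmse.get)
print(best_rank)  # 8
```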


### Testing our Model

Again, we filter out the rows where the prediction is NaN.


In [0]:
# In ML Pipelines, this next step has a bug that produces unwanted NaN values. We
# have to filter them out. See https://issues.apache.org/jira/browse/SPARK-14489
predict_df = my_model.transform(test_df)

# Remove NaN values from prediction (due to SPARK-14489)
predicted_test_df = predict_df.filter(predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_df DataFrame
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))

The model had a RMSE on the test set of 0.8142072914578944


### It is always good to compare against a default model
Here the default model simply predicts the global average rating of the training dataset for every rating, and we compute the RMSE of that prediction on the test set.

In [0]:
default_value = training_df.agg(F.avg('rating')).collect()[0][0]
print(default_value) 

3.5256308831743453


In [0]:
# Add a column with the average rating -- getting the RMSE based on a default value (which is same throughout)
test_for_avg_df = test_df.withColumn('prediction', F.lit(default_value))

# Run the previously created RMSE evaluator, reg_eval, on the test_for_avg_df DataFrame
test_avg_RMSE = reg_eval.evaluate(test_for_avg_df)

print("The RMSE on the average set is {0}".format(test_avg_RMSE)) 

The RMSE on the average set is 1.051698647311728


Comparing the two values (0.814 for ALS vs 1.052 for the average baseline), the model clearly improves on the RMSE.
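Why is the baseline RMSE about 1.05? For a constant prediction $c$ the mean squared error decomposes as $\frac{1}{n}\sum_j (r_j - c)^2 = \operatorname{Var}(r) + (\bar r - c)^2$, so predicting the training mean gives an RMSE close to the standard deviation of the test ratings. A quick plain-Python check on hypothetical numbers (`constant_rmse` is a hypothetical helper):

```python
import math
import statistics

def constant_rmse(ratings, c):
    # RMSE of always predicting the constant c
    return math.sqrt(sum((r - c) ** 2 for r in ratings) / len(ratings))

test_ratings = [3.5, 4.0, 2.0, 5.0, 3.0]  # hypothetical test ratings
c = 3.2                                   # hypothetical training-set mean
mean = statistics.mean(test_ratings)
variance = statistics.pvariance(test_ratings)  # population variance

lhs = constant_rmse(test_ratings, c)
rhs = math.sqrt(variance + (mean - c) ** 2)
print(lhs, rhs)  # the two agree up to floating-point round-off
```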

### Predictions based on movies I have watched

In [0]:
# Look at the top movies first, since I have most likely seen some of them
display(movies_with_500_ratings_or_more)

movieId,count,average,title
318,63366,4.446990499637029,"Shawshank Redemption, The (1994)"
858,41355,4.364732196832306,"Godfather, The (1972)"
50,47006,4.334372207803259,"Usual Suspects, The (1995)"
527,50054,4.310175010988133,Schindler's List (1993)
1221,27398,4.275640557704942,"Godfather: Part II, The (1974)"
2019,11611,4.2741796572216,Seven Samurai (Shichinin no samurai) (1954)
904,17449,4.271333600779414,Rear Window (1954)
7502,4305,4.263182346109176,Band of Brothers (2001)
912,24349,4.258326830670664,Casablanca (1942)
922,6525,4.256934865900383,Sunset Blvd. (a.k.a. Sunset Boulevard) (1950)


Let me pick four movies and rate them:
<pre>
My pick                     - my movie_id - my stars
Shawshank Redemption          318           3
12 Angry Men                  1203          5
Forrest Gump                  356           5
The Godfather                 858           2 (sorry in advance if you like it and I don't)
</pre>

### Adding my ratings to the training dataset and training again
User ID 0 is not used in the dataset, so we use it for my ratings.


In [0]:
from pyspark.sql import Row
my_user_id = 0

# Each tuple is (my_user_id, movie ID, my rating), so the movie ID is the *middle* value.
# A common error is to use the number of ratings as the movie ID.
my_rated_movies = [
     (my_user_id, 318, 3), (my_user_id, 1203, 5), (my_user_id, 356, 5), (my_user_id, 858, 2)
     ]

my_ratings_df = sqlContext.createDataFrame(my_rated_movies, ['userId','movieId','rating'])
print('My movie ratings:')
my_ratings_df.show()

My movie ratings:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     0|    318|     3|
|     0|   1203|     5|
|     0|    356|     5|
|     0|    858|     2|
+------+-------+------+



In [0]:
# Now add my ratings to training_df (union() matches columns by position, like the deprecated unionAll())
training_with_my_ratings_df = training_df.union(my_ratings_df)
print("the train data has %s more entries now"%(training_with_my_ratings_df.count() - training_df.count()))

the train data has 4 more entries now


In [0]:

# Reset the parameters for the ALS object.
als.setPredictionCol("prediction")\
   .setMaxIter(5)\
   .setSeed(seed)\
   .setRegParam(0.1)\
   .setUserCol('userId')\
   .setItemCol('movieId')\
   .setRatingCol('rating')\
   .setRank(8)   #we got rank 8 as optimal


# Create the model with these parameters.
my_ratings_model = als.fit(training_with_my_ratings_df)

Computing the RMSE on the test set again

In [0]:
my_predict_df = my_ratings_model.transform(test_df)

# Remove NaN values from prediction
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))

# Run the previously created RMSE evaluator, reg_eval, on the predicted_test_my_ratings_df DataFrame
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-301378602049574>:1
----> 1 my_predict_df = my_ratings_model.transform(test_df)
      3 # Remove NaN values from prediction
      4 predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))

NameError: name 'my_ratings_model' is not defined

### Now finding the movies that suit me best :)

The steps to achieve this:
1. Create a DataFrame with all the movies except the ones I have already rated, with the user ID set to 0.
2. Predict the ratings for them.
3. Finally, choose the 50 best movies.
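These three steps can be sketched in plain Python with hypothetical rank-2 factors. In Spark the vectors would come from the fitted model (`ALSModel` exposes `userFactors` and `itemFactors` DataFrames), and ALS predicts a rating as the dot product of a user vector and an item vector. The helper `recommend` is hypothetical:

```python
def recommend(user_vec, item_vecs, rated_ids, top_n):
    # 1) skip already-rated items, 2) predict with a dot product, 3) keep the top_n
    predictions = [
        (item_id, sum(a * b for a, b in zip(user_vec, vec)))
        for item_id, vec in item_vecs.items()
        if item_id not in rated_ids
    ]
    predictions.sort(key=lambda pair: pair[1], reverse=True)
    return predictions[:top_n]

# Hypothetical factors; movie IDs 318 and 356 play the role of movies I already rated
my_factors = [1.0, 0.5]
item_factors = {318: [3.0, 1.0], 356: [2.0, 2.0], 1: [1.0, 4.0], 2: [2.5, 0.0], 3: [0.5, 0.5]}
print(recommend(my_factors, item_factors, rated_ids={318, 356}, top_n=2))  # [(1, 3.0), (2, 2.5)]
```

Spark 2.2 and later also provide `ALSModel.recommendForAllUsers(numItems)` for top-N recommendations at scale.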

In [0]:
my_rated_movies

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-301378602049576>:1
----> 1 my_rated_movies

NameError: name 'my_rated_movies' is not defined

In [0]:
# Create a list of my rated movie IDs
my_rated_movie_ids = [x[1] for x in my_rated_movies]

# Filter out the movies I already rated.'~' sign will make sure not to include them.
not_rated_df = movies_df.filter(~ movies_df['ID'].isin(my_rated_movie_ids))

# Rename the "ID" column to be "movieId", and add a column with my_user_id as "userId".
my_unrated_movies_df = not_rated_df.withColumnRenamed('ID','movieId').withColumn('userId',F.lit(0))

# Use my_rating_model to predict ratings for the movies that I did not manually rate.
raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)

predicted_ratings_df = raw_predicted_ratings_df.filter(raw_predicted_ratings_df['prediction'] != float('nan'))

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-301378602049577>:2
      1 # Create a list of my rated movie IDs
----> 2 my_rated_movie_ids = [x[1] for x in my_rated_movies]
      4 # Filter out the movies I already rated.'~' sign will make sure not to include them.
      5 not_rated_df = movies_df.filter(~ movies_df['ID'].isin(my_rated_movie_ids))

NameError: name 'my_rated_movies' is not defined

In [0]:
#some sample ratings
predicted_ratings_df.show(4,truncate = False)

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-301378602049578>:2
      1 #some sample ratings
----> 2 predicted_ratings_df.show(4,truncate = False)

NameError: name 'predicted_ratings_df' is not defined

One last trick: I don't want to see a movie that is very new or obscure, so it should have at least some reviews, here more than 400.

In [0]:
predicted_with_counts_df = predicted_ratings_df.join(movie_names_with_avg_ratings_df,predicted_ratings_df.movieId== movie_names_with_avg_ratings_df.movieId)
predicted_highest_rated_movies_df = predicted_with_counts_df.filter(predicted_with_counts_df['count'] > 400).sort('prediction',ascending = False)

print ('My 50 highest rated movies as predicted (for movies with more than 400 reviews):')
predicted_highest_rated_movies_df.show(50,truncate = False)

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-301378602049580>:1
----> 1 predicted_with_counts_df = predicted_ratings_df.join(movie_names_with_avg_ratings_df,predicted_ratings_df.movieId== movie_names_with_avg_ratings_df.movieId)
      2 predicted_highest_rated_movies_df = predicted_with_counts_df.filter(predicted_with_counts_df['count'] > 400).sort('prediction',ascending = False)
      4 print ('My 50 highest rated movies as predicted (for movies with more than 400 reviews)

Cool: *Sleepless in Seattle*, a Tom Hanks movie, is on the list. I will watch it.

Next steps:
1. Use another dataset (with content information) so that the recommender becomes a hybrid system
2. Possibly use deep learning in some way