In [0]:
# Databricks pre-creates the SparkContext (`sc`) and the SQLContext (`sqlContext`);
# echo both so the notebook output records which session it ran against.
for spark_handle in (sc, sqlContext):
    print(spark_handle)

### Getting the data
The data is already mounted for us under `dbfs:/mnt/Files/Validated/`.

In [0]:
# The two input files used for the analysis and for collaborative filtering.
ratings_filename, movies_filename = (
    "dbfs:/mnt/Files/Validated/ratings.csv",
    "dbfs:/mnt/Files/Validated/movies.csv",
)

In [0]:
# List the mounted input directory (the Python form of the `%fs ls` magic).
display(dbutils.fs.ls("dbfs:/mnt/Files/Validated/"))

In [0]:
# Preview the beginning of movies.csv (header and first rows) before defining its schema.
dbutils.fs.head(movies_filename)

### A Little analysis on the movies.csv
We will create 2 DataFrames for our analysis, which makes visualization with the Databricks `display` function straightforward:
1. `movies_with_year_df` - the genres are dropped and the year is extracted from the title; final schema is (ID, title, year)
2. `movies_with_one_genre_df` - one row per (movie, genre) pair; final schema is (Id, title, one_genre)

From the description on [kaggle](https://www.kaggle.com/grouplens/movielens-20m-dataset) we can see the schema of the files. To save computation we specify the schema explicitly. Spark can infer it itself, but inference requires an extra action over the data, which we generally want to minimize.

In [0]:
# Explicit imports instead of `import *`: this list covers every type used by the
# schema cells of this notebook (DoubleType is needed later by the ratings schema).
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

# Full schema of movies.csv: movie id, title (the release year is part of the
# title, in parentheses), and the '|'-separated genre list.
movies_with_genres_df_schema = StructType([
    StructField('ID', IntegerType()),
    StructField('title', StringType()),
    StructField('genres', StringType()),
])

# Same file with the genres column dropped; the year is extracted from the title later.
movies_df_schema = StructType([
    StructField('ID', IntegerType()),
    StructField('title', StringType()),
])

In [0]:
# Both DataFrames are read from the same movies.csv; only the schema differs.
# A user-supplied schema (inferSchema=False) avoids an inference pass over the file.
def _read_movies_csv(schema):
    return (sqlContext.read
            .format('com.databricks.spark.csv')
            .options(header=True, inferSchema=False)
            .schema(schema)
            .load(movies_filename))

movies_df = _read_movies_csv(movies_df_schema)
movies_with_genres_df = _read_movies_csv(movies_with_genres_df_schema)

### Inspecting the DataFrames before the transformations

In [0]:
# Preview both movie DataFrames (movies_df is reused later for collaborative filtering).
for movies_frame in (movies_df, movies_with_genres_df):
    movies_frame.show(4, truncate=False)

In [0]:
# Derive the two analysis DataFrames from the raw movie DataFrames.
from pyspark.sql.functions import split, regexp_extract, explode

# The release year is the 4-digit number in parentheses at the END of the title.
# Anchoring the pattern matters for titles containing other parenthesised numbers,
# e.g. "(500) Days of Summer (2009)", where an unanchored r'\((\d+)\)' returns "500".
# regexp_extract yields an empty string for titles without a trailing year.
movies_with_year_df = movies_df.select(
    'ID', 'title',
    regexp_extract('title', r'\((\d{4})\)\s*$', 1).alias('year'))

# One row per (movie, genre): split the '|'-separated genre list and explode it.
# Using DataFrame functions keeps the work in the JVM instead of round-tripping
# every row through Python via .rdd.map/.flatMap.
# split() takes a Java regex, so the '|' separator must be escaped.
movies_with_one_genre_df = movies_with_genres_df.select(
    movies_with_genres_df['ID'].alias('Id'),
    'title',
    explode(split('genres', r'\|')).alias('one_genre'))

### DataFrames after Transformation

In [0]:
# Inspect the exploded (movie, genre) rows and the extracted release years.
movies_with_one_genre_df.show(n=10, truncate=False)
movies_with_year_df.show(n=4, truncate=False)

### Now we will use the inbuilt functionality of Databricks for some insights

In [0]:
# Number of movies tagged with each genre, most frequent first (Drama comes out on top).
display(movies_with_one_genre_df
        .groupBy('one_genre')
        .count()
        .orderBy('count', ascending=False))

# display() renders a bar chart here; its plot options offer many other chart types.

In [0]:
# Number of movies per release year, busiest year first (2009 led when this was run).
movies_per_year = movies_with_year_df.groupBy('year').count()
display(movies_per_year.sort('count', ascending=False))

Two observations from movies.csv:
1. Drama is by far the most common genre.
2. A large number of movies is released every year.

### Now let's move to Ratings

We already have `movies_df`; now we need the ratings. Let's create that DataFrame.

In [0]:
# Explicit schema for ratings.csv, again so Spark needs no extra action to infer types.
# Only the first three columns are declared; the timestamp column is dropped.
ratings_df_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (('userId', IntegerType()),
                                     ('movieId', IntegerType()),
                                     ('rating', DoubleType()))
])

In [0]:
# Load ratings.csv with the explicit schema and preview a few rows.
ratings_df = (sqlContext.read
              .format('com.databricks.spark.csv')
              .options(header=True, inferSchema=False)
              .schema(ratings_df_schema)
              .load(ratings_filename))
ratings_df.show(n=4)

In [0]:
# Mark the ratings and movies DataFrames for caching; several later cells reuse them.
for cached_frame in (ratings_df, movies_df):
    cached_frame.cache()
print("both dataframes are in cache now for easy accessibility")

### Global Popularity 
It is good to know the most popular movies, and at times it is very hard to beat plain popularity ([Xavier Amatriain Lecture](https://www.youtube.com/watch?v=bLhq63ygoU8)).
Here we list the movies with the highest average ratings, with a constraint on the number of reviews: we discard movies with fewer than 500 ratings.

In [0]:
from pyspark.sql import functions as F

# Per-movie statistics computed from ratings_df alone: number of ratings and mean rating.
rating_stats = [F.count(ratings_df.rating).alias("count"),
                F.avg(ratings_df.rating).alias("average")]
movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(*rating_stats)
print('movie_ids_with_avg_ratings_df:')
movie_ids_with_avg_ratings_df.show(n=4, truncate=False)

In [0]:
# Attach movie titles to the per-movie rating statistics to make them readable.
# The join uses explicit column references with the exact column names
# ('movieId' on the ratings side, 'ID' on movies_df). The previous
# F.col('movieID') only resolved because Spark matches column names
# case-insensitively by default.
movie_names_with_avg_ratings_df = (
    movie_ids_with_avg_ratings_df
    .join(movies_df, movie_ids_with_avg_ratings_df['movieId'] == movies_df['ID'])
    .drop('ID')
)
movie_names_with_avg_ratings_df.show(4, truncate=False)

In [0]:
# Global popularity: only movies with at least 500 ratings, highest average first.
movies_with_500_ratings_or_more = (
    movie_names_with_avg_ratings_df
    .where(movie_names_with_avg_ratings_df['count'] >= 500)
    .sort('average', ascending=False)
)
movies_with_500_ratings_or_more.show(truncate=False)

## Collaborative filtering now
[Wikipedia article here](https://en.wikipedia.org/wiki/Collaborative_filtering). We will use the matrix factorization algorithm available in Spark MLlib, called ALS ([Quora explanation](https://www.quora.com/What-is-the-Alternating-Least-Squares-method-in-recommendation-systems)).

<img alt="factorization" src="http://spark-mooc.github.io/web-assets/images/matrix_factorization.png" style="width: 885px"/>

### Splitting into Train, Test and Validation datasets

As with all machine learning algorithms, in practice we have to tune parameters and then test accuracy. For this we split the data into 3 parts: Train, Validation (for optimizing hyperparameters) and Test (for checking the final accuracy). For more information, see this [brilliant lecture by Nando](https://www.youtube.com/watch?v=PvuN23m7hhY).

In [0]:
# Split the ratings 60/20/20 into training, validation (rank selection) and test
# (final accuracy); the fixed seed makes the split reproducible.
seed = 4
split_60_df, split_a_20_df, split_b_20_df = ratings_df.randomSplit([0.6, 0.2, 0.2], seed)

# Cache every split: each one is scanned several times below.
training_df, validation_df, test_df = (
    split_df.cache() for split_df in (split_60_df, split_a_20_df, split_b_20_df)
)

split_sizes = (training_df.count(), validation_df.count(), test_df.count())
print('Training: {0}, validation: {1}, test: {2}\n'.format(*split_sizes))
for split_df in (training_df, validation_df, test_df):
    split_df.show(4, truncate=False)

From the output above we can see approximately 12 million training samples, 4 million validation samples and 4 million test samples. This is a 60/20/20 split of the ~20 million ratings.

### Alternating Least Square (ALS)
the documentation can be found [here](http://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS)

Why we need cross validation, with some problems and their solutions. This part is copied directly from the assignment notebook:

A challenge for collaborative filtering is how to provide ratings to a new user (a user who has not provided *any* ratings at all). Some recommendation systems choose to provide new users with a set of default ratings (e.g., an average value across all ratings), while others choose to provide no ratings for new users. Spark's ALS algorithm yields a NaN (`Not a Number`) value when asked to provide a rating for a new user.

Using the ML Pipeline's [CrossValidator](http://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) with ALS is thus problematic, because cross validation involves dividing the training data into a set of folds (e.g., three sets) and then using those folds for testing and evaluating the parameters during the parameter grid search process. It is likely that some of the folds will contain users that are not in the other folds, and, as a result, ALS produces NaN values for those new users. When the CrossValidator uses the Evaluator (RMSE) to compute an error metric, the RMSE algorithm will return NaN. This will make *all* of the parameters in the parameter grid appear to be equally good (or bad).

You can read the discussion on [Spark JIRA 14489](https://issues.apache.org/jira/browse/SPARK-14489) about this issue. There are proposed workarounds of having ALS provide default values or having RMSE drop NaN values. Both introduce potential issues. We have chosen to have RMSE drop NaN values. While this does not solve the underlying issue of ALS not predicting a value for a new user, it does provide some evaluation value. We manually implement the parameter grid search process using a for loop (below) and remove the NaN values before using RMSE.

For a production application, you would want to consider the tradeoffs in how to handle new users.

I will try to make the comments in the next cell as explicit as possible.

In [0]:
from pyspark.ml.recommendation import ALS

# ALS learner configured through constructor keywords, equivalent to chaining the setters.
# The user/item/rating columns match ratings_df; the rank is chosen by the grid search below.
als = ALS(maxIter=5, seed=seed, regParam=0.1,
          userCol='userId', itemCol='movieId', ratingCol='rating')
als


In [0]:

# Now let's compute an evaluation metric for our test and validation dataset
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the 'rating' (label) and 'prediction' columns; reused by later cells.
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

tolerance = 0.03  # NOTE(review): not used anywhere below; kept only so the name stays defined.

# The rank is the number of latent factors. For a 1000 x 1000 user/item matrix, ALS
# learns P (1000 x rank) and Q (rank x 1000) whose product approximates the ratings.
# That stores 2 * 1000 * rank numbers (8000 for rank 4) instead of 1,000,000.
ranks = [4, 8, 12]
errors = [0] * len(ranks)   # validation RMSE per rank, aligned with `ranks`
models = [0] * len(ranks)   # fitted model per rank, aligned with `ranks`
min_error = float('inf')
best_rank = -1              # INDEX into `ranks` of the lowest validation RMSE
for err, rank in enumerate(ranks):
  # Fit on the training split with this rank, then score the validation split.
  als.setRank(rank)
  model = als.fit(training_df)
  predict_df = model.transform(validation_df)

  # ALS predicts NaN for users/items absent from the training data (SPARK-14489).
  # Drop those rows explicitly so the RMSE is not NaN.
  predicted_ratings_df = predict_df.filter(~F.isnan(predict_df.prediction))

  error = reg_eval.evaluate(predicted_ratings_df)
  errors[err] = error
  models[err] = model
  print('For rank %s the RMSE is %s' % (rank, error))
  if error < min_error:
    min_error = error
    best_rank = err

als.setRank(ranks[best_rank])
print('The best model was trained with rank %s' % ranks[best_rank])
my_model = models[best_rank]

### Testing our Model

Again we will filter out where the prediction is NaN

In [0]:
# Score the model selected on the validation split against the held-out test split.
predict_df = my_model.transform(test_df)

# ALS predicts NaN for users/items it never saw during training
# (https://issues.apache.org/jira/browse/SPARK-14489).
# Drop those rows explicitly so the RMSE is not NaN.
predicted_test_df = predict_df.filter(~F.isnan(predict_df.prediction))

test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))

### It is always good to compare against a simple baseline model
The baseline predicts the global average rating of the training dataset for every rating; we then compute the RMSE of that constant prediction.

In [0]:
# Baseline prediction: the global mean rating over the training split.
default_value = training_df.agg(F.avg('rating')).first()[0]
print(default_value)

In [0]:
# Baseline RMSE: predict the same constant (default_value) for every test rating.
test_for_avg_df = test_df.select('*', F.lit(default_value).alias('prediction'))

test_avg_RMSE = reg_eval.evaluate(test_for_avg_df)

print("The RMSE on the average set is {0}".format(test_avg_RMSE))

Comparing the two RMSE values, we can see that the ALS model improves on this baseline.

### Prediction based on our watched Movies

In [0]:
# Show the popular movies (at least 500 ratings, best average first); these are the
# ones I am most likely to have watched, so I can pick a few to rate myself.
display(movies_with_500_ratings_or_more)

Let me pick 4 movies and put the ratings in:
<pre>
My pick                     - my movie_id - my stars
Shawshank Redemption          318           5
12 angry men                  1203          4
Forrest Gump                  356           5
GodFather                     858           5
</pre>

### Putting the values into Training dataset and training it again
User ID 0 is not used in the dataset, so we will use it for our own ratings.

In [0]:
from pyspark.sql import Row
my_user_id = 0

# My own ratings, one tuple per movie: (userId, movieId, rating).
# The movie ID is the MIDDLE element. Ratings are floats to match the DoubleType
# 'rating' column, so the later union with training_df lines up type-for-type.
my_rated_movies = [
    (my_user_id, 318, 5.0),   # Shawshank Redemption
    (my_user_id, 1203, 4.0),  # 12 Angry Men
    (my_user_id, 356, 5.0),   # Forrest Gump
    (my_user_id, 858, 5.0),   # The Godfather
]

# Reuse the ratings schema so the column names and types match ratings_df exactly.
my_ratings_df = sqlContext.createDataFrame(my_rated_movies, ratings_df_schema)
print('My movie ratings:')
my_ratings_df.show()

In [0]:
# Append my ratings to the training split and report how many rows that added.
training_with_my_ratings_df = training_df.unionAll(my_ratings_df)
added_rows = training_with_my_ratings_df.count() - training_df.count()
print("the train data has %s more entries now"%(added_rows))

In [0]:
# Refit ALS on the training split plus my ratings. Use the same settings as the
# grid search, and the rank it selected on the validation split instead of a
# hard-coded value.
als.setPredictionCol("prediction")\
   .setMaxIter(5)\
   .setSeed(seed)\
   .setRegParam(0.1)\
   .setUserCol('userId')\
   .setItemCol('movieId')\
   .setRatingCol('rating')\
   .setRank(ranks[best_rank])

# Create the model with these parameters.
my_ratings_model = als.fit(training_with_my_ratings_df)

Checking the RMSE again

In [0]:
# Score the refitted model (which includes my ratings) on the test split.
my_predict_df = my_ratings_model.transform(test_df)

# Drop NaN predictions for users/items unseen in training (SPARK-14489).
predicted_test_my_ratings_df = my_predict_df.filter(~F.isnan(my_predict_df.prediction))

test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))

### Now finding the movies which best suit me :)

Some steps how we would achieve this:
1. First, create a DataFrame with all the movies except the ones I already rated, with the user ID set to 0.
2. Then predict my rating for each of them.
3. Finally, choose the 50 best movies.

In [0]:
# Echo my ratings, (userId, movieId, rating) tuples, for reference.
my_rated_movies

In [0]:
# IDs of the movies I rated myself (the movieId is the middle tuple element).
my_rated_movie_ids = [rated[1] for rated in my_rated_movies]

# Keep only the movies I have NOT rated ('~' negates the isin() condition).
not_rated_df = movies_df.filter(~movies_df['ID'].isin(my_rated_movie_ids))

# Shape the rows like ratings input: rename ID -> movieId and attach my user id
# (my_user_id rather than a literal 0, so the two cannot drift apart).
my_unrated_movies_df = (not_rated_df
                        .withColumnRenamed('ID', 'movieId')
                        .withColumn('userId', F.lit(my_user_id)))

# Predict my rating for every movie I did not rate.
raw_predicted_ratings_df = my_ratings_model.transform(my_unrated_movies_df)

# Drop NaN predictions (movies with no ratings in the training data).
predicted_ratings_df = raw_predicted_ratings_df.filter(~F.isnan(raw_predicted_ratings_df['prediction']))

In [0]:
# A few of my predicted ratings.
predicted_ratings_df.show(n=4, truncate=False)

One last trick: I don't want recommendations for very new movies with only a handful of reviews, so a movie must have more than 400 reviews.

In [0]:
# Only recommend movies with more than this many ratings.
min_review_count = 400

# Joining on the column NAME keeps a single movieId column. Only the rating
# statistics come from the right side; predicted_ratings_df already has the title.
# This avoids the duplicate movieId/title columns of an expression join.
rating_counts_df = movie_names_with_avg_ratings_df.select('movieId', 'count', 'average')
predicted_with_counts_df = predicted_ratings_df.join(rating_counts_df, 'movieId')
predicted_highest_rated_movies_df = (predicted_with_counts_df
                                     .filter(predicted_with_counts_df['count'] > min_review_count)
                                     .sort('prediction', ascending=False))

print('My 50 highest rated movies as predicted (for movies with more than {0} reviews):'.format(min_review_count))
# show() prints the table itself and returns None, so it must not be wrapped in display().
predicted_highest_rated_movies_df.show(50, truncate=False)