### This notebook serves two purposes with Apache Spark
1. It analyzes the MovieLens dataset, specifically the movies by release year and genre.
2. It builds a recommendation engine based on collaborative filtering.

Some of the links and references I have used:
1. The dataset can be found [here](https://grouplens.org/datasets/movielens/20m/) or at [kaggle](https://www.kaggle.com/grouplens/movielens-20m-dataset).
2. edX Apache Spark course cs110, Assignment 2 (Movie Recommendation). I have used my own code from that notebook. (I am not providing my notebook link here in case the course is offered again with the same assignments.) Side note: it is one of the best and most intense course series I have ever done.
3. A More Scalable Way of Making Recommendations with MLlib, by Xiangrui Meng, [here](https://www.youtube.com/watch?v=Q0VXllYilM0&).
4. I have used [Databricks community edition cloud](https://community.cloud.databricks.com) because the required dataset is already mounted there. Databricks community edition also has tons of features (I love the display feature) and the whole system is preconfigured. One important, differentiating point: we do not need to create a Spark context or SQL context object, since both are already created for us.

A note of caution:
We won't call collect() here, as that would pull all the data back to the driver, which might cause an out-of-memory error.

In [2]:
# sc and sqlContext are already created for us here
print(sc)
print(sqlContext)

### Getting the data
It is already mounted for us

In [4]:
import os
from databricks_test_helper import Test

dbfs_dir = '/databricks-datasets/cs110x/ml-20m/data-001'

# We will use these 2 files for our analysis and collaborative filtering
ratings_filename = dbfs_dir + '/ratings.csv' 
movies_filename = dbfs_dir + '/movies.csv'

In [5]:
#This is a databricks feature
display(dbutils.fs.ls(dbfs_dir))

In [6]:
dbutils.fs.head(movies_filename)

### A little analysis of movies.csv
We will create 2 DataFrames for our analysis, which makes visualization with the Databricks display function pretty straightforward:
1. movies_based_on_time: we drop the genres here; the final schema will be (movie_id, name, year)
2. movies_based_on_genres: the final schema will look like (movie_id, name_with_year, one_genre)
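
As a rough illustration of the two target shapes, the plain-Python sketch below (no Spark required) reshapes one sample row in the `movies.csv` layout. The helper names `to_time_row` and `to_genre_rows` are hypothetical and exist only for this example; the notebook itself does this work with DataFrame and RDD operations.

```python
import re

# One sample row in the movies.csv layout: (movieId, title, pipe-separated genres)
sample_row = (1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy')

def to_time_row(row):
    """Shape 1: drop the genres and add the year -> (movie_id, name, year)."""
    movie_id, title, _genres = row
    match = re.search(r'\((\d+)\)', title)
    year = match.group(1) if match else ''  # empty string when nothing matches
    return (movie_id, title, year)

def to_genre_rows(row):
    """Shape 2: one output row per genre -> (movie_id, name_with_year, one_genre)."""
    movie_id, title, genres = row
    return [(movie_id, title, genre) for genre in genres.split('|')]

print(to_time_row(sample_row))
for genre_row in to_genre_rows(sample_row):
    print(genre_row)
```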

From the description at [kaggle](https://www.kaggle.com/grouplens/movielens-20m-dataset) we can see the schema of both files. To save computation, we specify the schema explicitly. (Spark can infer it on its own, but inference requires an extra pass over the data, which in most cases we want to avoid.)
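
To make the cost argument concrete, here is a plain-Python analogy (not Spark code): inferring column types means inspecting the values first, while a declared schema can be applied directly. The functions `infer_types` and `apply_schema` are hypothetical names for this sketch, and the two CSV lines are only illustrative.

```python
import csv

# Illustrative header plus one row in the movies.csv layout.
raw = ('movieId,title,genres\n'
       '1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy\n')

def infer_types(rows):
    """Inference: every value in every column is inspected before any casting."""
    types = []
    for column in zip(*rows):
        is_int = all(value.lstrip('-').isdigit() for value in column)
        types.append(int if is_int else str)
    return types

def apply_schema(rows, types):
    """Explicit schema: cast each value directly, with no inspection pass."""
    return [tuple(cast(value) for cast, value in zip(types, row)) for row in rows]

rows = list(csv.reader(raw.splitlines()))[1:]  # skip the header line
declared = [int, str, str]  # mirrors IntegerType, StringType, StringType

print(infer_types(rows) == declared)
print(apply_schema(rows, declared))
```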

In [8]:
from pyspark.sql.types import *
#working only on movies.csv right now
movies_with_genres_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType()),
   StructField('genres',StringType())]
  )

movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())]
  ) # dropping the genres; we will transform the df to include the year later

In [9]:
#Creating the dataframes 
raw_movies_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=False).schema(movies_df_schema).load(movies_filename)
movies_with_genres_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True, inferSchema=False).schema(movies_with_genres_df_schema).load(movies_filename)

### Inspecting the DataFrames before the transformations

In [11]:
raw_movies_df.show(4,truncate = False) # we will also use this for collaborative filtering
movies_with_genres_df.show(4,truncate = False)

In [12]:
# Transforming the DataFrames
from pyspark.sql.functions import split, regexp_extract

# Side note, a very nice quote: Some people, when confronted with a problem, think "I know, I'll use regular expressions." Now they have two problems. (attributed to Jamie Zawinski)
movies_with_year_df = raw_movies_df.select('ID','title',regexp_extract('title',r'\((\d+)\)',1).alias('year'))

# One genre per row. Tuple-unpacking lambdas only work in Python 2, so we index into the row instead.
movies_with_one_genre_df = sqlContext.createDataFrame(movies_with_genres_df.rdd.flatMap(lambda row: [(row[0], row[1], genre) for genre in row[2].split('|')]))\
                                                      .toDF('Id','title','one_genre')
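
One thing worth checking about the year pattern: `\((\d+)\)` captures the first parenthesized run of digits, which is not necessarily the year when the title itself contains such a group. The plain-Python sketch below uses `re.search`, which also returns the first match, on two illustrative titles. The stricter pattern (four digits closing the title) is a suggested assumption, not what the notebook uses.

```python
import re

LOOSE = r'\((\d+)\)'          # pattern used in the notebook
STRICT = r'\((\d{4})\)\s*$'   # assumed alternative: four digits at the end of the title

def extract(pattern, title):
    """Return capture group 1 of the first match, or '' when nothing matches."""
    match = re.search(pattern, title)
    return match.group(1) if match else ''

# Illustrative titles; the second one has digits in parentheses before the year.
titles = ['Toy Story (1995)', '(500) Days of Summer (2009)']
for title in titles:
    print(title, '->', extract(LOOSE, title), 'vs', extract(STRICT, title))
```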

### DataFrames after Transformation

In [14]:
movies_with_one_genre_df.show(10,truncate = False)
movies_with_year_df.show(4,truncate = False)

### Now we will use the inbuilt functionality of Databricks for some insights

In [16]:
display(movies_with_one_genre_df.groupBy('one_genre').count()) #people love drama

# Below is a bar chart; Databricks lets us choose from many other chart options

In [17]:
# From the counts we can see that the largest number of movies was produced in 2009
display(movies_with_year_df.groupBy('year').count().orderBy('count',ascending = False))

### Creating DataFrames for the ratings and movies
From the description at [kaggle](https://www.kaggle.com/grouplens/movielens-20m-dataset) we can see the schema of both files. To save computation, we specify the schema explicitly. (Spark can infer it on its own, but inference requires an extra pass over the data, which in most cases we want to avoid.)

In [19]:
from pyspark.sql.types import *

ratings_df_schema = StructType(
  [StructField('userId', IntegerType()),
   StructField('movieId', IntegerType()),
   StructField('rating', DoubleType())]
)              # we are dropping the timestamp column
movies_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('title', StringType())]
) #dropping genres here
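
As a small plain-Python sketch of what the ratings schema above amounts to, the snippet below casts the first three fields of a ratings line and ignores the trailing timestamp. The sample line is only illustrative, and `casts` is a hypothetical stand-in for the Spark types.

```python
import csv

# Illustrative header plus one row in the ratings.csv layout.
raw = 'userId,movieId,rating,timestamp\n1,2,3.5,1112486027\n'

# Mirrors IntegerType, IntegerType, DoubleType; there is no entry for the timestamp.
casts = (int, int, float)

reader = csv.reader(raw.splitlines())
next(reader)  # skip the header line

# zip stops at the shorter sequence, so the fourth field (timestamp) is dropped.
ratings = [tuple(cast(value) for cast, value in zip(casts, row)) for row in reader]
print(ratings)
```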
