### Create SPARK_HOME and PYLIB env var and update PATH env var

In [2]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

### Initializing Spark

Build __SparkConf__ object 

    Contains information about your application.  


Create __SparkContext__ object 
    
    Tells Spark how to access a cluster. 
    

Create __SparkSession__ object

    The entry point to programming Spark with the Dataset and DataFrame API.

    Used to create DataFrame, register DataFrame as tables and execute SQL over tables etc.

In [3]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Movie Recommendation Application").setMaster('local')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

### Loading the dependent libraries

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.sql.functions import isnan, when, count, col, countDistinct


#### Problem Statement
 Building a recommender system for movies with a data set from MovieLens.


#### Data Dictionary

Ratings Data File Structure (ratings.csv)
-----------------------------------------

All ratings are contained in the file `ratings.csv`. Each line of this file after the header row represents one rating of one movie by one user, and has the following format:

    userId, movieId, rating, timestamp

The lines within this file are ordered first by userId, then, within user, by movieId.

Ratings are made on a 5-star scale, with half-star increments (0.5 stars - 5.0 stars).

Timestamps represent seconds since midnight Coordinated Universal Time (UTC) of January 1, 1970.


Movies Data File Structure (movies.csv)
---------------------------------------

Movie information is contained in the file `movies.csv`. Each line of this file after the header row represents one movie, and has the following format:

    movieId, title, genres

Genres are a pipe-separated list, and are selected from the following:

* Action
* Adventure
* Animation
* Children's
* Comedy
* Crime
* Documentary
* Drama
* Fantasy
* Film-Noir
* Horror
* Musical
* Mystery
* Romance
* Sci-Fi
* Thriller
* War
* Western
* (no genres listed)


### Reading the movies and ratings data and creating a dataframe

In [6]:
## Read data and create a dataframe
ratingsData = spark.read.format("csv")\
       .option("header", "true")\
       .option("inferSchema", "true")\
       .load("file:///home/1867B39/20180701_Batch39_CSE7322c_Recommendation/ml-latest-small/ml-latest-small/ratings.csv")
    
moviesData = spark.read.format("csv")\
       .option("header","true")\
       .option("inferSchema", "true")\
       .load("file:///home/1867B39/20180701_Batch39_CSE7322c_Recommendation/ml-latest-small/ml-latest-small/movies.csv")
    

### Understanding Data

#### Print Schema

In [9]:
ratingsData.printSchema()
moviesData.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



#### Total number of Columns and Records

In [10]:
print("No. of Columns = {}".format(len(ratingsData.columns)))

print('No. of Records = {}'.format(ratingsData.count()))

print("No. of Columns = {}".format(len(moviesData.columns)))

print('No. of Records = {}'.format(moviesData.count()))

No. of Columns = 4
No. of Records = 100004
No. of Columns = 3
No. of Records = 9125


#### Look at first 3 row of the dataframe

In [11]:
ratingsData.show(3)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
+------+-------+------+----------+
only showing top 3 rows



In [13]:
moviesData.show(3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



#### Summary statistics

In [18]:
moviesData.describe().show()

+-------+------------------+--------------------+------------------+
|summary|           movieId|               title|            genres|
+-------+------------------+--------------------+------------------+
|  count|              9125|                9125|              9125|
|   mean|31123.291835616437|                null|              null|
| stddev| 40782.63360397416|                null|              null|
|    min|                 1|"""Great Performa...|(no genres listed)|
|    max|            164979| İtirazım Var (2014)|           Western|
+-------+------------------+--------------------+------------------+



Getting the count of Distinct usersIds and movieIDs

In [19]:
print "Number of different users: " + str(ratingsData.select('userId').distinct().count())
print "Number of different movies: " + str(ratingsData.select('movieId').distinct().count())
print "Number of different movies: " + str(moviesData.select('movieId').distinct().count())

Number of different users: 671
Number of different movies: 9066
Number of different movies: 9125


Checking for null values at each column

In [23]:
ratingsData.select([count(when(isnan(c), c)).alias(c) for c in ratingsData.columns]).show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+



In [24]:
moviesData.select([count(when(isnan(c), c)).alias(c) for c in moviesData.columns]).show()

+-------+-----+------+
|movieId|title|genres|
+-------+-----+------+
|      0|    0|     0|
+-------+-----+------+



#### Split the data into training and test sets (30% held out for testing)

In [31]:
#from sklearn.model_selection import train_test_split
#ratingData_train, ratingData_test = train_test_split(ratingsData, test_size=0.30,)
ratingsData.randomSplit([0.7, 0.2, 0.2])

[DataFrame[userId: int, movieId: int, rating: double, timestamp: int],
 DataFrame[userId: int, movieId: int, rating: double, timestamp: int],
 DataFrame[userId: int, movieId: int, rating: double, timestamp: int]]

### Model Building and Evaluation

#### ALS model params


1. numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
2. rank is the number of latent factors in the model (defaults to 10).
3. maxIter is the maximum number of iterations to run (defaults to 10).
4. regParam specifies the regularization parameter in ALS (defaults to 1.0).
5. implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
6. alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
7. nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).


In [14]:
from pyspark.ml.recommendation import ALS

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als= ALS(userCol="userId",itemCol="movieId",ratingCol="rating",coldStartStrategy='drop')

In [15]:
model = als.fit(trainingData)

In [16]:
# Predicting on the test data
predictions=model.transsform(testData)

In [17]:
predictions.show(5)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   575|    148|   4.0|1012605106|       NaN|
|   242|    463|   4.0| 956685706| 3.7861197|
|   460|    471|   5.0|1072836030| 3.8607845|
|   548|    471|   4.0| 857407799| 3.2966456|
|   491|    471|   3.0| 940797129|  4.539845|
+------+-------+------+----------+----------+
only showing top 5 rows



### Defining the evaluator

In [18]:
from pyspark.ml.evaluation import RegressionEvaluator

#### Evaluation on the test data

In [19]:
evaluator= RegressionEvaluator(metricName="rmse,labelCol="rating"), predictionCol="prediction"

Root-mean-square error = nan


In [None]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = 