In [1]:
# Input CSV locations, relative to the notebook's working directory.
# Both files live in the same dataset folder (presumably the MovieLens 20M
# dump, judging by the folder name -- confirm).
DATA_DIR = 'ml-20m'
RATINGS_PATH = DATA_DIR + '/ratings.csv'
MOVIE_PATH = DATA_DIR + '/movies.csv'

In [2]:
import findspark
# Called before the pyspark imports below so that the Spark installation
# located by findspark is the one those imports resolve against.
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Create (or reuse) the session for this notebook. Driver memory, executor
# memory and the driver's max result size are all set to 8g.
spark = (
    SparkSession.builder
    .appName('EE551')
    .config('spark.driver.memory', '8g')
    .config('spark.executor.memory', '8g')
    .config('spark.driver.maxResultSize', '8g')
    .getOrCreate()
)

# Handle to the SparkContext, obtained via getOrCreate() after the session
# has been built.
sc = SparkContext.getOrCreate()

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Explicit schema for the ratings CSV: integer user and item ids, a
# floating-point rating, and an integer timestamp column.
ratings_schema = (
    StructType()
    .add(StructField('userId', IntegerType()))
    .add(StructField('itemId', IntegerType()))
    .add(StructField('rating', DoubleType()))
    .add(StructField('timestamp', IntegerType()))
)

# Load the ratings CSV using the explicit ratings_schema.
#
# header=True makes Spark skip the file's first line instead of parsing it as
# a data row. With header=False and an all-numeric schema, that line turns
# into a row of nulls that has to be filtered out afterwards. Column names
# still come from ratings_schema, not from the file's own header.
# NOTE(review): assumes this copy of ratings.csv starts with a header line, as
# the stock MovieLens files do -- confirm before relying on the row count.
ratings = spark.read.csv(RATINGS_PATH,
                         sep=',',
                         header=True,
                         schema=ratings_schema)

In [4]:
# Discard every row that contains at least one null (for example, lines that
# could not be parsed under the numeric schema).
ratings = ratings.na.drop()

# Preview the first 20 rows.
ratings.show(n=20)

+------+------+------+----------+
|userId|itemId|rating| timestamp|
+------+------+------+----------+
|     1|     2|   3.5|1112486027|
|     1|    29|   3.5|1112484676|
|     1|    32|   3.5|1112484819|
|     1|    47|   3.5|1112484727|
|     1|    50|   3.5|1112484580|
|     1|   112|   3.5|1094785740|
|     1|   151|   4.0|1094785734|
|     1|   223|   4.0|1112485573|
|     1|   253|   4.0|1112484940|
|     1|   260|   4.0|1112484826|
|     1|   293|   4.0|1112484703|
|     1|   296|   4.0|1112484767|
|     1|   318|   4.0|1112484798|
|     1|   337|   3.5|1094785709|
|     1|   367|   3.5|1112485980|
|     1|   541|   4.0|1112484603|
|     1|   589|   3.5|1112485557|
|     1|   593|   3.5|1112484661|
|     1|   653|   3.0|1094785691|
|     1|   919|   3.5|1094785621|
+------+------+------+----------+
only showing top 20 rows



# Sparsity

In [5]:
# Distinct users and distinct items that appear in at least one rating.
num_users = ratings.select('userId').distinct().count()
num_movies = ratings.select('itemId').distinct().count()

# Percentage of the (users x items) grid that has no rating:
#   100 * (1 - observed_ratings / (num_users * num_movies))
observed_ratings = float(ratings.select('rating').count())
possible_ratings = num_users * num_movies
sparsity = 100 * (1.0 - observed_ratings / possible_ratings)

In [6]:
# Report the sparsity percentage computed in the previous cell.
sparsity_report = ('The ratings dataframe is: ', sparsity, '% empty')
print(*sparsity_report)

The ratings dataframe is:  99.46001521864456 % empty


# Number of movies each user rated

In [7]:
# One row per user with the number of rating rows that user has; show()
# prints only the first 20 of them.
ratings_per_user = ratings.groupBy('userId').count()
ratings_per_user.show()

+------+-----+
|userId|count|
+------+-----+
|   148|  128|
|   463|   80|
|   471|  548|
|   496|  168|
|   833|   47|
|  1088|   60|
|  1238|   97|
|  1342|   25|
|  1580|   42|
|  1591|   50|
|  1645|  108|
|  1829|  288|
|  1959|  226|
|  2122|  115|
|  2142|   29|
|  2366|   42|
|  2659|  101|
|  2866|  940|
|  3175|   22|
|  3749|   44|
+------+-----+
only showing top 20 rows



# Create test/train splits

In [8]:
# Random 80% / 20% split of the ratings; the fixed seed keeps the split
# repeatable across runs.
train, test = ratings.randomSplit(weights=[0.8, 0.2], seed=1234)

# Build ALS model

In [9]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Explicit-feedback ALS estimator over (userId, itemId, rating) with
# non-negative factors.
#
# coldStartStrategy='drop': after a random split (and inside every
# cross-validation fold) some users or items appear only in the held-out
# part. ALS cannot score those rows and emits NaN predictions, which turns the
# RMSE into NaN and makes model comparison meaningless. Dropping those rows
# from the prediction output keeps the evaluation metric well-defined.
als = ALS(userCol='userId',
          itemCol='itemId',
          ratingCol='rating',
          nonnegative=True,
          implicitPrefs=False,
          coldStartStrategy='drop')

## Hyperparameters to tune

In [10]:
# Search space for the ALS estimator: 2 ranks x 2 iteration limits x
# 2 regularisation strengths = 8 candidate parameter maps.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 20])
    .addGrid(als.maxIter, [5, 10])
    .addGrid(als.regParam, [0.01, 0.05])
    .build()
)

# Scores a predictions dataframe by RMSE between the 'rating' label column
# and the 'prediction' column.
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)

## Cross Validation Pipeline

In [11]:
# 5-fold cross-validation of the ALS estimator over every parameter map in
# param_grid, scored with the RMSE evaluator.
cv = CrossValidator(
    numFolds=5,
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
)

## Find the best model

In [None]:
# Run the cross-validated grid search on the training split. This fits one
# model per (parameter map, fold) pair, so it is the expensive cell.
model = cv.fit(dataset=train)

# Keep the model that the cross-validator selected as best.
best_model = model.bestModel

In [None]:
# Score the held-out test split with the selected model; the result carries
# the 'prediction' column that the evaluator reads.
predictions = best_model.transform(dataset=test)

In [None]:
# Root-mean-square error of the selected model on the test-split predictions.
test_rmse = evaluator.evaluate(predictions)
print('RMSE: ', test_rmse)