In [1]:
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.sql.functions import *
import pyspark as ps    # for the pyspark suite
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, FloatType, DateType, TimestampType
import pyspark.sql.functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("df lecture") \
            .getOrCreate()

sc = spark.sparkContext  # for the pre-2.0 sparkContext

## Reading in Training, Test and Validation datasets (ratings)

In [2]:
ratings_schema = StructType( [
    StructField('userId',IntegerType(),True),
    StructField('movieId',IntegerType(),True),
    StructField('rating',FloatType(),True),
    StructField('count_x',IntegerType(),True),
    StructField('mean',FloatType(),True),
    StructField('std',FloatType(),True),
    StructField('stat_score',FloatType(),True),
    StructField('count_y',IntegerType(),True),
    StructField('_merge',StringType(),True)
] )

# read training CSV
train_data = spark.read.csv('data/train_ratings_df.csv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=False,
                         schema=ratings_schema)

train_data.show(5)
train_data.printSchema()
print("ratings has {} rows".format(train_data.count()))

+------+-------+------+-------+---------+----------+----------+-------+---------+
|userId|movieId|rating|count_x|     mean|       std|stat_score|count_y|   _merge|
+------+-------+------+-------+---------+----------+----------+-------+---------+
| 40630|    933|   4.0|   4827|3.9948208|0.76764643| 5.1462903|     43|left_only|
| 40630|   1035|   4.5|  15248|3.8015149| 1.0843617|  5.428057|     43|left_only|
| 40630|    922|   3.5|   7368| 4.206501| 0.8729187|  5.515879|     43|left_only|
| 40630|    342|   2.0|  10735|3.5076385| 1.0187078| 5.0357003|     43|left_only|
| 40630|   2724|   2.0|   8448|2.8626895|   1.08866| 4.4956794|     43|left_only|
+------+-------+------+-------+---------+----------+----------+-------+---------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- count_x: integer (nullable = true)
 |-- mean: float (nullable = true)
 |-- std: float (nullable = true)
 |-- st

In [3]:
# read validata CSV
validata = spark.read.csv('data/test_ratings_df.csv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=False,
                         schema=ratings_schema)

validata.show(5)
validata.printSchema()
print("ratings has {} rows".format(validata.count()))

+------+-------+------+-------+---------+---------+----------+-------+------+
|userId|movieId|rating|count_x|     mean|      std|stat_score|count_y|_merge|
+------+-------+------+-------+---------+---------+----------+-------+------+
|     3|    778|   5.0|  28702| 4.007665|0.8891922| 5.3414536|    223|  null|
|     3|    924|   5.0|  28820|3.9813497|1.0409104| 5.5427155|    223|  null|
|     3| 122882|   3.5|  13479|3.8550339|1.0573668|  5.441084|    223|  null|
|     3| 158238|   3.0|   3362| 3.752082|0.8232905|  4.987018|    223|  null|
|     3|  40815|   4.0|  18848|  3.76788|0.9804957| 5.2386236|    223|  null|
+------+-------+------+-------+---------+---------+----------+-------+------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- count_x: integer (nullable = true)
 |-- mean: float (nullable = true)
 |-- std: float (nullable = true)
 |-- stat_score: float (nullable = true)
 |

In [4]:
training, testing = train_data.randomSplit(weights = [0.80, 0.20], seed = 51)
training.cache()
testing.cache()

DataFrame[userId: int, movieId: int, rating: float, count_x: int, mean: float, std: float, stat_score: float, count_y: int, _merge: string]

Let's try a few things on that split...

In [5]:
print("users in training: {}".format(training.select('userId').distinct().count()))
print("users in testing: {}".format(testing.select('userId').distinct().count()))

print("users in both: {}".format(training.select('userId').distinct()\
                                         .join(testing.select('userId').distinct(),
                                               'userId', 'inner')\
                                         .count()))

print("movies in training: {}".format(training.select('movieId').distinct().count()))
print("movies in testing: {}".format(testing.select('movieId').distinct().count()))

print("movies in both: {}".format(training.select('movieId').distinct()\
                                         .join(testing.select('movieId').distinct(),
                                               'movieId', 'inner')\
                                         .count()))

users in training: 104964
users in testing: 104560
users in both: 104560
movies in training: 12594
movies in testing: 12404
movies in both: 12404


## Train and fit and `ALS` algorithm

In [35]:
cv_als = ALS(userCol="userId",
          itemCol="movieId",
          ratingCol="rating")

default_als = cv_als.fit(training)

In [36]:
# Normalizing -- tried this--didn't add improvement to results
# mean_rating, sttdev_rating = training.select(mean("rating"), stddev("rating")).first()
# max_rating,min_rating = training.select(max("rating"), min("rating")).first()

# training=training.withColumn("rating_Normalized", (col("rating") - mean_rating) / sttdev_rating)
# training=training.withColumn("rating_minmax", (col("rating") - min_rating) /(max_rating-min_rating))


In [37]:
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.

# Tried to do the full param search but not enough memory. Notes on individual searches are below.
paramMapExplicit = ParamGridBuilder() \
                    .addGrid(cv_als.rank, [10]) \
                    .addGrid(cv_als.maxIter, [5]) \
                    .addGrid(cv_als.regParam, [0.1, 0.15, 0.2]) \
                    .addGrid(cv_als.alpha, [1, 2.0]) \
                    .build()

# rank [5,10,20] were searched with little difference between them
# maxIter [5,10,20] were searched with little difference between them
# regParam [0.01, 0.1, 0.2, 1] searched. Big falloff on shoulders.
# alpha [1,2] were searched with little difference between them

evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")


# Run cross-validation, and choose the best set of parameters.
CVALSExplicit = CrossValidator(estimator=cv_als,
                            estimatorParamMaps=paramMapExplicit,
                            evaluator=evaluatorR,
                           numFolds=5)

In [38]:
CVModelEXplicit = CVALSExplicit.fit(training)

In [None]:
CVM_predictions = CVModelEXplicit.transform(testing)
CVM_predictions.show(5)

In [None]:
# Evaluate this model using RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

rmse = evaluator.evaluate(CVM_predictions.na.fill({'prediction':2.5}))
print("Root-mean-square error = " + str(rmse))
# Optimal results

## ALS fit model

In [10]:
als = ALS(rank=20,
          maxIter=20,
          regParam=0.1,
          alpha=2,
          userCol="userId",
          itemCol="movieId",
          ratingCol="rating")

ALS_fit_model = als.fit(training)

In [11]:
predictions = ALS_fit_model.transform(testing)
predictions.show(5)

+------+-------+------+-------+---------+---------+----------+-------+---------+----------+
|userId|movieId|rating|count_x|     mean|      std|stat_score|count_y|   _merge|prediction|
+------+-------+------+-------+---------+---------+----------+-------+---------+----------+
| 35969|    148|   2.0|    335|2.9089553|1.0417528| 4.4715843|     67|left_only|  2.835187|
|135438|    148|   2.0|    335|2.9089553|1.0417528| 4.4715843|     24|left_only|  2.944143|
|138573|    148|   3.0|    335|2.9089553|1.0417528| 4.4715843|     82|left_only| 2.6523845|
|100706|    148|   3.0|    335|2.9089553|1.0417528| 4.4715843|    118|left_only| 2.6532512|
| 65981|    148|   3.5|    335|2.9089553|1.0417528| 4.4715843|    334|left_only| 2.7382565|
+------+-------+------+-------+---------+---------+----------+-------+---------+----------+
only showing top 5 rows



In [12]:
# Evaluate this model using RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

rmse = evaluator.evaluate(predictions.na.fill({'prediction':2.5})) # na.fill really isn't needed b/c of how train/test datasets were formed

print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8016189594801708


## Write model to file

In [13]:
# save best model to specified path
mPath =  "als_recommender"
ALS_fit_model.write().overwrite().save(mPath)

In [14]:
# read pickled model via pipeline api
from pyspark.ml import recommendation
persisted_ALS_model = recommendation.ALSModel.load(mPath)

## Get predictions

In [24]:
validata =validata.toPandas()

In [25]:
sample = validata.sample(1)
sampled_user = sample['userId'].iloc[0] # userId, not index number
sampled_user_movies = validata[validata['userId'] == sampled_user][['movieId','rating']]#'movie Id numbers, not indices'

In [29]:
movie_samps = []
for movie in sampled_user_movies['movieId']:
    movie_samps.append(movie)

In [42]:
d = {'userId': sampled_user, 'movieId': movie_samps[0]}

In [45]:
df = pd.DataFrame(data=d, index='userId')

TypeError: Index(...) must be called with a collection of some kind, 'userId' was passed

In [40]:
df2=spark.createDataFrame(df)
df2

ValueError: If using all scalar values, you must pass an index

In [39]:
single_pred=ALS_fit_model.transform(df2).show()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42705)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:42705)

In [36]:
single_pred.withColumn("prediction", ("prediction")).show()

AssertionError: col should be Column

## How does this model compare to those models made on case study day? 
**Case study day best in class was 3.7.**
- Our objective is to integrate the scoring function into a CrossValidation loop of Spark.
- To do that, we create our own spark object based on `RegressionEvaluator` with an overcasted `evaluate()` method of our own. - This method just calls the exact same function used in `src/submit.py`.

In [15]:
class RecoRegressionEvaluation(RegressionEvaluator):
    """ copy/pasted from submit.py """
    @staticmethod
    def _compute_casestudy_score(predictions, actual):
        """Look at 5% of most highly predicted movies for each user.
        Return the average actual rating of those movies.
        """
        df = pd.merge(predictions, actual, on=['userId','movieId']).fillna(1.0)
        #df = pd.concat([predictions.fillna(1.0), actual.actualrating], axis=1)

        # for each user
        g = df.groupby('userId')

        # detect the top_5 movies as predicted by your algorithm
        top_5 = g.rating.transform(
            lambda x: x >= x.quantile(.95)
        )

        # return the mean of the actual score on those
        return df.actualrating[top_5==1].mean()

    def evaluate(self, dataset):
        # experimental
        print("evaluate based on: {}, {}".format(self.getLabelCol(),
                                                 self.getPredictionCol()))
        
        # create a pandas dataframe that corresponds to argument predictions
        pd_pred = dataset.select('userId','movieId',self.getPredictionCol())\
                         .withColumnRenamed(self.getPredictionCol(),'rating')\
                         .toPandas()
                
        # create a pandas dataframe that corresponds to argument actual
        pd_actual = dataset.select('userId','movieId',self.getLabelCol())\
                         .withColumnRenamed(self.getLabelCol(),'actualrating')\
                         .toPandas()
        
        # call the exact same function from submit.py
        return(self._compute_casestudy_score(pd_pred, pd_actual))

**Testing that on that loop 1 ALS.**

In [17]:
rec_evaluator = RecoRegressionEvaluation(labelCol="rating",
                                         predictionCol="prediction")

rec_score = rec_evaluator.evaluate(predictions)

print(rec_score)

evaluate based on: rating, prediction
4.240729331970215


## 3.4. integrate into a `MovieRecommender`-like class

Doing that here will let us integrate this loop more easily with the next loops. All we have learned in this loop is thus contained in a single class.

In [21]:
class MovieRecommender_loop1():
    def fit(self, training_df):
        self.als = ALS(rank=100,
              maxIter=10,
              regParam=0.1,
              userCol="user",
              itemCol="movie",
              ratingCol="rating")

        self.loop1_model = self.als.fit(training_df)

    def transform(self, requests_df):
        return(self.loop1_model.transform(requests_df))

In [22]:
mr1 = MovieRecommender_loop1()

%time mr1.fit(training)

rec_evaluator_loop1 = RecoRegressionEvaluation(labelCol="rating",
                                         predictionCol="prediction")

%time predictions_loop1 = mr1.transform(testing)

print(rec_evaluator_loop1.evaluate(predictions_loop1))

CPU times: user 26.2 ms, sys: 48.2 ms, total: 74.4 ms
Wall time: 1min 31s
CPU times: user 421 µs, sys: 1.12 ms, total: 1.54 ms
Wall time: 105 ms
evaluate based on: rating, prediction
3.53959567516


## 3.5. develop a function to create submission [`loop1.csv`]

We'll use this function based on our loop1 model.

_Note: In the final submission we would train this model on the whole dataset, not on training only._

In [23]:
def write_submission(submission_df, output_filepath):
    with open(output_filepath, 'w') as submissionfile:
        submissionfile.write("user,movie,rating\n")
        for row in submission_df.collect():
            submissionfile.write("{},{},{}\n".format(row['user'],
                                                     row['movie'],
                                                     row['prediction']))

_**Note: When run on the testing set, this one will obtain a score of 3.62**_

In [25]:
write_submission(mr1.transform(requests),
                 '../submissions/loop1.csv')

# 4. Loop 2 : complete `NaN`s with something

**Loop roadmap**:
1. create an average score for each movie
2. use average to fill predicted `NaN`
3. integrate averaging into a `MovieRecommender`-like class
4. develop a pipeline between loop1 and loop2
5. generate a new submission file

Let's first try to figure out how many predictions are `NaN`.

In [26]:
predictions_loop1.select('prediction').filter(F.isnan('prediction')).count()

73743

## 4.1. create an average score for each movie

In [27]:
movies_avgratings = training.select('movie','rating')\
                            .groupBy('movie')\
                            .agg(F.avg('rating'))\
                            .withColumnRenamed('avg(rating)','avg_rating')

movies_avgratings.show(5)

+-----+-----------------+
|movie|       avg_rating|
+-----+-----------------+
|  858|4.524121500893389|
|  593|4.364773319437207|
| 2384|3.217877094972067|
| 1961|4.033648790746582|
| 2019|4.582795698924731|
+-----+-----------------+
only showing top 5 rows



## 4.2. use average to fill predicted `NaN`

Now let's use that to complete predictions when there's `NaN`.
1. We rename `prediction` as `prediction_als`.
2. We join with `avg_rating`
3. We define `prediction` as `avg_rating` if `prediction_als` is `NaN`.

In [28]:
predictions_loop2 = \
    predictions_loop1.withColumnRenamed('prediction','prediction_als')\
                     .join(movies_avgratings, 'movie', 'left')\
                     .withColumn('prediction',
                                 F.when(F.isnan('prediction_als'),
                                        F.col('avg_rating'))\
                                       .otherwise(F.col('prediction_als')))

predictions_loop2.show(5)

+-----+----+------+---------+--------------+------------------+------------------+
|movie|user|rating|timestamp|prediction_als|        avg_rating|        prediction|
+-----+----+------+---------+--------------+------------------+------------------+
|  148| 673|   5.0|975620824|           NaN|2.7777777777777777|2.7777777777777777|
|  148|1242|   3.0|974909976|     2.6576235|2.7777777777777777| 2.657623529434204|
|  148|1069|   2.0|974945135|           NaN|2.7777777777777777|2.7777777777777777|
|  148|1605|   2.0|974930221|     2.1516573|2.7777777777777777|2.1516573429107666|
|  148|1150|   2.0|974875106|           NaN|2.7777777777777777|2.7777777777777777|
+-----+----+------+---------+--------------+------------------+------------------+
only showing top 5 rows



Let's see how THAT performs better.

In [29]:
rec_evaluator_loop2 = RecoRegressionEvaluation(labelCol="rating",
                                         predictionCol="prediction")

rec_score_loop2 = rec_evaluator_loop2.evaluate(predictions_loop2)

print(rec_score_loop2)

evaluate based on: rating, prediction
4.30716103235


## 4.3. integrate averaging into a `MovieRecommender`-like class

Let's put that recommendation into a distinct class, and work on the integration with `MovieRecommender_loop1`.

In [30]:
class MovieRecommender_loop2():
    def fit(self, training_df):
        self.avg_ratings = training_df.select('movie','rating')\
                                      .groupBy('movie')\
                                      .agg(F.avg('rating'))\
                                      .withColumnRenamed('avg(rating)','avg_rating')

    def transform(self, requests_df):
        return(requests_df.join(self.avg_ratings, 'movie', 'left')\
                          .withColumnRenamed('avg_rating','prediction'))

In [31]:
mr2 = MovieRecommender_loop2()

%time mr2.fit(training)

rec_evaluator_loop2 = RecoRegressionEvaluation(labelCol="rating",
                                         predictionCol="prediction")

%time predictions_loop2 = mr2.transform(testing)

print(rec_evaluator_loop2.evaluate(predictions_loop2))

CPU times: user 4.76 ms, sys: 1.77 ms, total: 6.53 ms
Wall time: 42.2 ms
CPU times: user 1.2 ms, sys: 224 µs, total: 1.42 ms
Wall time: 23 ms
evaluate based on: rating, prediction
4.27444565613


## 4.4. develop a pipeline between loop1 and loop2

Now let's build a class that would aggregate ratings from loop1 and loop2.

In [32]:
class MovieRecommender_agg_loop2():
    def __init__(self):
        self.mr1 = MovieRecommender_loop1()
        self.mr2 = MovieRecommender_loop2()
        
    def fit(self, training_df):
        self.mr1.fit(training_df)
        self.mr2.fit(training_df)
        
    def transform(self, requests_df):
        pred_loop1 = self.mr1.transform(requests_df)\
                        .withColumnRenamed('prediction','prediction_loop1')

        pred_loop2 = self.mr2.transform(pred_loop1)\
                        .withColumnRenamed('prediction','prediction_loop2')

        results_loop2 = pred_loop2.withColumn('prediction',
                                      F.when(F.isnan('prediction_loop1'),
                                             F.col('prediction_loop2'))\
                                             .otherwise(F.col('prediction_loop1')))

        #results_loop2.show(5)
        return(results_loop2)

In [33]:
mr_agg2 = MovieRecommender_agg_loop2()

%time mr_agg2.fit(training)

CPU times: user 23.2 ms, sys: 36.3 ms, total: 59.5 ms
Wall time: 1min 9s


In [34]:
rec_evaluator_loop2 = RecoRegressionEvaluation(labelCol="rating",
                                         predictionCol="prediction")

%time predictions_agg2 = mr_agg2.transform(testing)

print(rec_evaluator_loop2.evaluate(predictions_agg2))

CPU times: user 6.53 ms, sys: 2.2 ms, total: 8.72 ms
Wall time: 207 ms
evaluate based on: rating, prediction
4.30716103235


## 4.5. generate a new submission file [`loop2.csv`]

This time, we train on the whole `ratings.csv` dataset instead.

_**Note: When run on the testing set, this one will obtain a score of 4.34**_

In [35]:
mr_agg2 = MovieRecommender_agg_loop2()

%time mr_agg2.fit(ratings)

rec_evaluator_loop2 = RecoRegressionEvaluation(labelCol="rating",
                                         predictionCol="prediction")

write_submission(mr_agg2.transform(requests).na.fill({'prediction':1.0}),
                 '../submissions/loop2.csv')

CPU times: user 27.2 ms, sys: 17.9 ms, total: 45.1 ms
Wall time: 54.7 s
