# Spark Recommender Pair Exercise

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
import pyspark
from pyspark.sql.types import *
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [3]:
# Start (or attach to) a Spark session; its SparkContext `sc` is used by the RDD section below.
spark = (pyspark.sql.SparkSession
         .builder
         .getOrCreate())
sc = spark.sparkContext

## Loading and Cleaning Data

We have two options for loading the data:

1. Load the data into a pandas DataFrame, then convert it to a Spark DataFrame.
    * Careful! This only works because our dataset is small. Usually when we use Spark, our datasets are too large to fit in memory.
2. Load the data into a Spark RDD, then convert it to a Spark DataFrame.

### Option 1: Pandas

In [4]:
# Load the ratings CSV into pandas (later cells rely on its user, movie, rating and timestamp columns).
ratings_pd_df = pd.read_csv(filepath_or_buffer='data/training.csv')

In [5]:
# Convert to a Spark DataFrame. The `timestamp` column is kept here because it is needed
# to order the ratings before the train/test split; it is dropped only from the split frames.
ratings_df = spark.createDataFrame(data=ratings_pd_df)

In [None]:
# Show Pandas DataFrame
# ratings_pd_df.head()

In [None]:
# Check to see we built the DataFrame
# ratings_df

# Take a look at the entries in the DataFrame
# ratings_df.show(5)

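Order the DataFrame by timestamp so the train/test split below is chronological: train on the earliest ratings, test on the latest.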


In [None]:
# ratings_df.orderBy('timestamp', ascending=True).show()

In [None]:
# ratings_pd_df.isna().sum()

In [7]:
# Chronological 80/20 split: train on the earliest ratings, test on the latest,
# so evaluation mimics predicting future ratings from past ones.
TRAIN_FRACTION = 0.8

ratings_df    = ratings_df.orderBy('timestamp')
ratings_pd_df = ratings_df.toPandas()

n_rows  = ratings_pd_df.shape[0]
n_train = int(n_rows * TRAIN_FRACTION)

# Slice at a single cut point so every row lands in exactly one split.
# (head(int(0.8 * n)) + tail(int(0.2 * n)) silently drops a row whenever both round down.)
train = ratings_pd_df.iloc[:n_train]
test  = ratings_pd_df.iloc[n_train:]

# The timestamp was only needed for ordering; ALS takes user/movie/rating.
sp_train = spark.createDataFrame(train.drop('timestamp', axis=1))
sp_test  = spark.createDataFrame(test.drop('timestamp', axis=1))

In [8]:
# Load the (user, movie) pairs that the trained model scores further down.
req_pd = pd.read_csv(filepath_or_buffer='data/requests.csv')

In [9]:
# Distribute the request pairs loaded in the previous cell as a Spark DataFrame.
# (A redundant re-read of the nonexistent 'data/requests.csv.csv' was removed: it raised
# FileNotFoundError and would have clobbered `req_pd`.)
sp_req = spark.createDataFrame(req_pd)

In [None]:
# Sanity check: number of rows in the train and test splits
(sp_train.count(), sp_test.count())

In [10]:
# ALS matrix-factorisation recommender.
# coldStartStrategy="nan" produces NaN predictions for users/movies not seen during
# training; those NaNs are filled in a later cell before evaluation/export.
# A fixed seed makes the random factor initialisation reproducible across re-runs.
ALS_SEED = 42
als_model = ALS(userCol='user',
                itemCol='movie',
                ratingCol='rating',
                nonnegative=True,
                regParam=0.01,
                maxIter=20,
                rank=10,
                coldStartStrategy="nan",
                seed=ALS_SEED,
               )

In [12]:
# Fit the ALS factors on the chronologically earlier training split.
recommender = als_model.fit(dataset=sp_train)

In [13]:
# Score the (user, movie) request pairs from data/requests.csv -- NOT the held-out test
# split. The resulting frame has only user/movie/prediction (see the preview below), so
# comparing against true ratings would require `recommender.transform(sp_test)` instead.
prediction_sp = recommender.transform(sp_req)

In [14]:
# Preview a few scored pairs; NaN marks a user or movie unseen during training.
prediction_sp.show(n=5)

+----+-----+----------+
|user|movie|prediction|
+----+-----+----------+
|  53|  148|       NaN|
|4169|  148| 3.0860252|
|5333|  148| 2.2447586|
|4387|  148|  2.671124|
| 840|  148|       NaN|
+----+-----+----------+
only showing top 5 rows



## Evaluation

Time to evaluate our model. We'll calculate the RMSE of our predicted ratings and also look at a violin plot of true ratings (x-axis) vs the predicted ratings (y-axis).

In [None]:
# Dump the predictions to Pandas DataFrames to make our final calculations easier
# predictions_df = predictions.toPandas()
# train_df = sp_train.toPandas()

In [None]:
# predictions_pd.shape

# predictions_pd.isna().sum()

# predictions_pd.isna().count()

In [15]:
# Pull the predictions onto the driver as a pandas DataFrame for the pandas-based steps below.
predictions_pd = (prediction_sp
                  .toPandas())

In [None]:
# train.drop('timestamp', axis=1, inplace=True)

In [16]:
# Fill missing predictions (cold-start NaNs from ALS) with a fixed fallback rating.
# NOTE: 4.5 is a hand-picked constant, not the data mean; train['rating'].mean() is
# one alternative. Only the `prediction` column is filled, so true ratings (when
# present) are never overwritten with a made-up value.
FALLBACK_RATING = 4.5
predictions_pd = predictions_pd.fillna({'prediction': FALLBACK_RATING})

In [None]:
# Boolean row selector: true rating strictly above 3.
# NOTE(review): needs a `rating` column; the request predictions previewed above only
# have user/movie/prediction, so this raises KeyError unless the frame holds test-set rows.
mask = predictions_pd['rating'].gt(3)


In [None]:
# Number of rows selected by `mask` (the denominator of the RMSE below)
np.count_nonzero(mask)

In [None]:
# True vs predicted rating for the masked rows (single .loc instead of chained indexing)
predictions_pd.loc[mask, ['rating', 'prediction']]

In [None]:
# predictions_pd['squared_error'] = (predictions_pd['rating'][mask] - predictions_pd['prediction'][mask])**2

In [None]:
# Squared error for rows selected by `mask`; every other row gets NaN.
predictions_pd['sq_err_45'] = (
    (predictions_pd['rating'] - predictions_pd['prediction']) ** 2
).where(mask)

In [None]:
# Summary statistics, one row per column
predictions_pd.describe().transpose()

In [None]:
# Rows outside `mask` have NaN squared error; record them as 0 (rebinding instead of
# mutating in place) so the exported CSV has no gaps.
predictions_pd = predictions_pd.fillna(0)

# RMSE over the masked rows only (true rating > 3), not the whole frame.
np.sqrt(predictions_pd.loc[mask, 'sq_err_45'].mean())

In [19]:
# Save the predictions without the pandas index column
predictions_pd.to_csv(path_or_buf='data/pred1.csv', sep=',', index=False)

1.4129060095515595

als_model = ALS(nonnegative=True, regParam=0.01, maxIter=20,
                rank=10, coldStartStrategy = "nan")
                
2.5177126250676602

als_model = ALS(nonnegative=True, regParam=0.01, maxIter=15,
                rank=10, coldStartStrategy = "nan")

In [None]:
# Group the predicted ratings by their true rating (1-5) and draw one violin per group.
# Rating levels with no rows are skipped: violinplot cannot compute statistics for an
# empty dataset and would raise instead of plotting the remaining groups.
rating_levels = [r for r in range(1, 6) if (predictions_pd['rating'] == r).any()]
predictions_by_rating = [
    predictions_pd.loc[predictions_pd['rating'] == r, 'prediction']
    for r in rating_levels
]

fig, ax = plt.subplots()
ax.violinplot(predictions_by_rating, rating_levels, showmeans=True)
ax.set_xlabel('True Ratings')
ax.set_ylabel('Predicted Ratings')
ax.set_title('True vs. ALS Recommender Predicted Ratings')
plt.show()

### Option 2: Spark

In [None]:
# Read the raw tab-separated ratings file into an RDD of text lines
rdd = sc.textFile(name='data/u.data')

In [None]:
# Peek at the first ten raw lines to confirm the file loaded
rdd.take(num=10)

In [None]:
# The raw lines are plain strings, so each one must be split and its fields cast to proper types.
def casting_function(row):
    """Convert one split ratings line into typed values, discarding the timestamp.

    Parameters
    ----------
    row : sequence of str
        Exactly four fields: user id, movie id, rating, timestamp.

    Returns
    -------
    tuple
        ``(user: int, movie: int, rating: float)``.

    Raises
    ------
    ValueError
        If ``row`` does not have exactly four fields or a field fails to parse.
    """
    user_str, movie_str, rating_str, _ = row
    return (int(user_str), int(movie_str), float(rating_str))

In [None]:
# Split each tab-separated line and cast its fields, dropping the timestamp column.
# The result stays a distributed RDD (no .collect()): createDataFrame accepts an RDD
# directly, so the data never has to be pulled onto the driver.
clean_rdd = (rdd
             .map(lambda line: line.split('\t'))
             .map(casting_function))

In [None]:
# Explicit schema matching the (user, movie, rating) tuples produced by casting_function.
schema = StructType([
    StructField(name='user', dataType=IntegerType(), nullable=True),
    StructField(name='movie', dataType=IntegerType(), nullable=True),
    StructField(name='rating', dataType=FloatType(), nullable=True),
])

In [None]:
# Build the Spark DataFrame from the cleaned records and the explicit schema.
# NOTE: this rebinds `ratings_df`, replacing the pandas-derived frame from Option 1.
ratings_df = spark.createDataFrame(data=clean_rdd, schema=schema)

In [None]:
# Display the DataFrame repr (column names and types) to confirm it was built
ratings_df

In [None]:
# Print the first five rows of the DataFrame
ratings_df.show(n=5)

In [None]:
# from our modeling block

# # Get the factors for user 4958 and movie 1924
# user_factor_df = recommender.userFactors.filter('id = 4958')
# item_factor_df = recommender.itemFactors.filter('id = 1924')

# user_factors = user_factor_df.collect()[0]['features']
# item_factors = item_factor_df.collect()[0]['features']

# # Manually (sorta) calculate the predicted rating
# np.dot(user_factors, item_factors)



# # Build a single row DataFrame
# data = [(4958, 1924)]
# columns = ('user', 'movie')
# one_row_spark_df = spark.createDataFrame(data, columns)

# # Check that it worked
# one_row_spark_df.show()

# # Get the recommender's prediction
# recommender.transform(one_row_spark_df).show()

# # Let's take a look all the user factors
# recommender.userFactors.show(2)