# Assignment 5 - Spark 

For this exercise, we'll use Spark on an AWS EC2 machine. 

### AWS Machine Specs 
The machine is a T2 Extra Large machine with four cores and 16GB of RAM.

### Spark Setup
We'll use PySpark with two partitions. We are running four cores, and Spark will use these to run jobs in parallel.

### Objective
The objective of this exercise is to compare the efficiency of an Apache Spark distributed setup with that of my local computer. We'll compare the PySpark model against our trusty Surprise Package. Let's once again use the [Beer Recommender](https://github.com/pburkard88/DS_BOS_06/tree/master/Data) data.

### Import Modules and Libraries for Use

In [1]:
import io
import json
import os
import sys
import warnings
import zipfile
from io import BytesIO
from urllib.request import urlopen
from zipfile import ZipFile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyspark
import requests
import seaborn as sns
import surprise

warnings.filterwarnings('ignore')
%matplotlib inline

## Set Up Spark

### Open Secrets File to Get Path to Spark Master 

(I don't want my internal IP in the public domain)

In [2]:
with open('data/secrets.json') as f:
    pyspark_config = json.load(f)

### Create Spark Session with Link to Master

In [3]:
spark = pyspark.sql.SparkSession.builder.master(pyspark_config['master']).getOrCreate()

### Get Default Partitions

In [4]:
spark.sparkContext.defaultMinPartitions

2

### Get Default Parallelism (since we have four cores)

We can see that Spark has automatically detected that we can run four parallel operations.

In [5]:
spark.sparkContext.defaultParallelism

4
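The two values above are related. To my knowledge, Spark derives `defaultMinPartitions` as the smaller of `defaultParallelism` and 2, which would explain why we see 2 partitions on a four-core machine. A minimal sketch of that rule in plain Python (the helper name is hypothetical, not a Spark API):

```python
def default_min_partitions(default_parallelism: int) -> int:
    """Hypothetical helper mirroring the rule min(defaultParallelism, 2)."""
    return min(default_parallelism, 2)


print(default_min_partitions(4))  # 2
print(default_min_partitions(1))  # 1
```

The number of partitions can still be raised explicitly when more parallel tasks are wanted.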

### Get Data and Import into a Spark DataFrame

In [6]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

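# Read the reviews into a Spark DataFrame, inferring column types
ratings = spark.read.csv('data/beer_reviews.csv',
                         sep=',',
                         inferSchema=True,
                         header=True)

# Cast the user and item ids to integers for ALS
for col in ['reviewer_id', 'beer_beerid']:
    ratings = ratings.withColumn(col, ratings[col].cast('int'))

# Cast the rating to a float
ratings = ratings.withColumn('review_overall', ratings['review_overall'].cast('float'))

# Keep only the columns the model needs
ratings = ratings[['reviewer_id', 'beer_beerid', 'review_overall']]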

### Create the Spark ALS instance

In [7]:
als = ALS(userCol='reviewer_id',
          itemCol='beer_beerid',
          ratingCol='review_overall',
          nonnegative=True,
          rank=20,
          seed=42)

### Split Data for Training

In [8]:
train, test = ratings.randomSplit([0.7, 0.3], seed=42)

### Let's check the fit time of ALS to compare later with non-distributed computing

In [9]:
# time the fit (variables assigned inside %timeit are not kept, so we refit below)
%timeit als_model = als.fit(dataset=train)

1 loop, best of 3: 11.8 s per loop


#### The model took about 12s to fit with rank 20 (that is, 20 latent factors). Let's remember that for later on!

In [10]:
# make predictions on the test set
als_model = als.fit(train)
als_pred = als_model.transform(test)

In [11]:
als_pred = als_pred.withColumn('diff_sq', (als_pred['review_overall'] - als_pred['prediction'])**2)

### Calculate RMSE for the fitted model

In [12]:
from pyspark.sql import functions as func

# the aggregate function can be created outside of the dataframe if desired
rms_calc = func.sqrt(func.mean('diff_sq'))

# drop rows with null or NaN values (e.g. missing predictions) before aggregating
als_pred.dropna().select(rms_calc.alias('rmse')).show()

+------------------+
|              rmse|
+------------------+
|0.6224995404916362|
+------------------+
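For reference, the aggregate above is the usual root-mean-square error: the square root of the mean squared difference between actual and predicted ratings. The sketch below reproduces the same calculation in plain Python on made-up values. Skipping NaN predictions plays the role of `dropna()`, on the assumption that the NaNs come from users or beers that were not seen during training.

```python
import math


def rmse(actual, predicted):
    """Root-mean-square error over paired ratings, skipping NaN predictions."""
    sq_errors = [(a - p) ** 2
                 for a, p in zip(actual, predicted)
                 if not math.isnan(p)]
    return math.sqrt(sum(sq_errors) / len(sq_errors))


# Toy values for illustration only (not taken from the beer data)
actual = [4.0, 3.5, 4.0, 3.0]
predicted = [3.5, 3.5, float('nan'), 4.0]
print(rmse(actual, predicted))
```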



### Get the Best Model with Grid Search

In [13]:
# The ALS instance
als = ALS(userCol='reviewer_id',
          itemCol='beer_beerid',
          ratingCol='review_overall',
          nonnegative=True,
          seed=42)

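# The parameter grid to search
# (alpha only applies to implicit-feedback ALS, i.e. implicitPrefs=True, so with the
# explicit ratings used here its two values are not expected to change the model)
als_paramgrid = (ParamGridBuilder()
                 .addGrid(als.rank, [10, 20])
                 .addGrid(als.maxIter, [10])
                 .addGrid(als.regParam, [0.1, 0.5])
                 .addGrid(als.alpha, [0.5, 1.0])
                 .build())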

# The evaluation function for determining the best model
rmse_eval = RegressionEvaluator(labelCol='review_overall',
                                predictionCol='prediction', 
                                metricName='rmse')

# The cross validation instance
cv = CrossValidator(estimator=als,
                    parallelism=4,
                    collectSubModels=True,
                    estimatorParamMaps=als_paramgrid,
                    evaluator=rmse_eval,
                    numFolds=3, 
                    seed=42)

# Fit the models and find the best one!
als_cv = cv.fit(train.dropna())
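To get a feel for how much work this cross-validation does, we can count the model fits it implies. The sketch below enumerates the same grid values in plain Python. It assumes one fit per parameter combination per fold, plus one final refit of the best combination on the full training data.

```python
from itertools import product

# Same grid values as in the ParamGridBuilder call
grid = {
    'rank': [10, 20],
    'maxIter': [10],
    'regParam': [0.1, 0.5],
    'alpha': [0.5, 1.0],
}
num_folds = 3

combos = [dict(zip(grid, values)) for values in product(*grid.values())]
cv_fits = len(combos) * num_folds  # one fit per combination per fold
total_fits = cv_fits + 1           # plus the final refit of the best combination

print(len(combos), cv_fits, total_fits)  # 8 24 25
```

At roughly 12 seconds per fit, this is why the grid search takes noticeably longer than a single model.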

### Find the best model and Get Rank

In [31]:
als_best = als_cv.bestModel
als_best.rank

10

### Get Best Regularization Param

In [30]:
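# Note: this reads regParam from the `als` estimator defined above (whose default is 0.1),
# not from the model selected by cross-validation
als.getRegParam()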

0.1
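To see which grid point cross-validation actually selected, the fitted `CrossValidatorModel` exposes the average metric per grid point in `avgMetrics`, in the same order as `getEstimatorParamMaps()`. The sketch below shows the selection logic in plain Python. The parameter maps and RMSE values are made up for illustration, and real parameter maps are keyed by `Param` objects rather than strings.

```python
# Hypothetical stand-ins for als_cv.getEstimatorParamMaps() and als_cv.avgMetrics;
# the metric values below are made up for illustration
param_maps = [
    {'rank': 10, 'regParam': 0.1},
    {'rank': 10, 'regParam': 0.5},
    {'rank': 20, 'regParam': 0.1},
    {'rank': 20, 'regParam': 0.5},
]
avg_rmse = [0.63, 0.91, 0.64, 0.92]

# RMSE is an error metric, so the best combination has the smallest average
best_index = min(range(len(avg_rmse)), key=avg_rmse.__getitem__)
best_params = param_maps[best_index]
print(best_params)  # {'rank': 10, 'regParam': 0.1}
```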

### Do a Visual Comparison of Predictions on Testset

In [15]:
als_pred_best = als_best.transform(test)
als_pred_best_df = als_pred_best.dropna().toPandas()

In [16]:
als_pred_best_df.head()

Unnamed: 0,reviewer_id,beer_beerid,review_overall,prediction
0,4219,148,4.0,3.867328
1,16916,148,3.5,3.472813
2,20596,148,4.0,4.213685
3,23607,148,4.0,3.851794
4,6482,148,3.5,3.817792


In [17]:
als_pred_best_df.tail()

Unnamed: 0,reviewer_id,beer_beerid,review_overall,prediction
461806,16338,75739,4.0,3.27044
461807,28855,75739,4.0,3.184555
461808,8430,75739,3.0,2.967432
461809,16894,75999,4.5,3.278557
461810,12378,76694,3.5,2.828838


#### The predictions don't seem too bad!

### Get RMSE on Best Model

In [18]:
als_pred_best = als_pred_best.withColumn('diff_sq', (als_pred_best['review_overall'] - als_pred_best['prediction'])**2)
als_pred_best.dropna().select(rms_calc.alias('rmse') ).show()

+------------------+
|              rmse|
+------------------+
|0.6234894689615749|
+------------------+



We can see that the RMSE of the best cross-validated model (0.6235) is almost identical to that of our initial rank-20 model (0.6225).

### Get Recommended Users for All Beers

(This'll require some preprocessing)

In [19]:
# Recommend top five users
recommendations_df = als_best.recommendForAllItems(5).toPandas()

# Get ids for users and ignore the predicted rating values
recommendations_df['recommendations'] = recommendations_df['recommendations'].apply(lambda x: [y[0] for y in x])

# Get column for each user
recommendations_df = recommendations_df[['beer_beerid']].join(
                     recommendations_df['recommendations'].apply(pd.Series))

# turn dataframe into long form
recommendations_df = pd.melt(recommendations_df,
                             id_vars='beer_beerid',
                             var_name='rec_index', 
                             value_name='Recommended User Ids'
                             ).drop(labels=['rec_index'],
                                    axis=1)

# Get dataframes for mapping ids to names (read the CSV once)
reviews_df = pd.read_csv('data/beer_reviews.csv')
beer_df = reviews_df[['beer_beerid',
                      'beer_name']].drop_duplicates().set_index('beer_beerid')
user_df = reviews_df[['reviewer_id',
                      'review_profilename']].drop_duplicates().set_index('reviewer_id')

# Get dictionaries mapping ids to names
beer_dict = beer_df['beer_name'].to_dict()
user_dict = user_df['review_profilename'].to_dict()

# map!
recommendations_df['Recommended Users'] = recommendations_df['Recommended User Ids'].map(user_dict)
recommendations_df['Beer'] = recommendations_df['beer_beerid'].map(beer_dict)

### View Dataframe

In [20]:
recommendations_df.head(10)

Unnamed: 0,beer_beerid,Recommended User Ids,Recommended Users,Beer
0,148,8328,MichiganMike,Brooklyn Lager
1,463,17643,crbauman,Oregon Honey Beer
2,833,22084,jams7611,Samuel Adams Pale Ale
3,1088,22084,jams7611,Old Whiskers Hefeweizen
4,1238,22084,jams7611,Pale Ale
5,1580,22084,jams7611,Eroica Ale
6,1591,22084,jams7611,Winter Ale
7,1645,29016,rumguzzler,Bruegel Amber Ale
8,1959,22084,jams7611,Farmhouse Summer Ale
9,2122,4590,Feliks,Blue Ridge Subliminator Dopplebock
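The preprocessing above boils down to turning one nested list of recommendations per beer into one row per (beer, recommended user) pair and then looking up names. The sketch below shows that reshaping in plain Python on a toy structure. All ids, ratings, and names are hypothetical.

```python
# Toy stand-in for the output of recommendForAllItems: each beer id maps to a
# list of (user_id, predicted_rating) pairs; ids and ratings are made up
recommendations = {
    101: [(7, 4.8), (3, 4.6)],
    102: [(3, 4.9), (9, 4.1)],
}
user_names = {3: 'user_c', 7: 'user_g', 9: 'user_i'}  # hypothetical names
beer_names = {101: 'Beer A', 102: 'Beer B'}           # hypothetical names

# One output row per (beer, recommended user), dropping the predicted rating
long_rows = [
    {'beer_beerid': beer_id,
     'Recommended User Ids': user_id,
     'Recommended Users': user_names.get(user_id),
     'Beer': beer_names.get(beer_id)}
    for beer_id, recs in recommendations.items()
    for user_id, _rating in recs
]

for row in long_rows:
    print(row)
```

Unlike `pd.melt`, which stacks the first recommendation for every beer before the second, this sketch keeps each beer's recommendations together.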


In [21]:
# Delete the dataframe to save memory
del recommendations_df

## Compare Spark ALS with Surprise Package

Let's compare Spark's computing efficiency with the Surprise Package's speed.

### Using the Surprise Package's Baseline Model to Fit the Data

In [24]:
# Read in data
data = pd.read_csv('data/beer_reviews.csv')

In [25]:
# Turn dataframe into a dataset readable by the Surprise Package
reader = surprise.Reader(rating_scale=(0, 5))
data_surprise = surprise.Dataset.load_from_df(data[['reviewer_id',
                                                    'beer_beerid',
                                                    'review_overall']], reader)

### Create Test Train split and Run Baseline Model

In [26]:
trainset, testset = surprise.model_selection.train_test_split(data_surprise, test_size=.3)

bsl_options = {'method': 'als',
               'n_epochs': 10,
               'reg_u': 12,
               'reg_i': 5
               }
algo = surprise.BaselineOnly(bsl_options=bsl_options)

# Time how long it takes to train the algorithm on the trainset
%timeit algo.fit(trainset)

Estimating biases using als...
Estimating biases using als...
Estimating biases using als...
Estimating biases using als...
1 loop, best of 3: 3.41 s per loop


#### We can see that the baseline model took a very short time to run. This is not surprising (no pun intended), given that we are not doing any factorization. Let's now do the actual factorization!
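To make concrete why the baseline is cheap, here is a rough sketch of a bias-only model: a global mean plus one bias per user and one per item, estimated by alternating regularized averages. It follows the spirit of the `als` baseline options used above (`n_epochs`, `reg_u`, `reg_i`), but it is an illustration under that assumption, not Surprise's actual implementation. The toy ratings are made up.

```python
from collections import defaultdict


def fit_baseline(ratings, n_epochs=10, reg_u=12, reg_i=5):
    """Estimate a global mean plus per-user and per-item biases by
    alternating regularized averages (a sketch, not Surprise's code)."""
    mu = sum(r for _, _, r in ratings) / len(ratings)
    by_user, by_item = defaultdict(list), defaultdict(list)
    for u, i, r in ratings:
        by_user[u].append((i, r))
        by_item[i].append((u, r))

    b_u = {u: 0.0 for u in by_user}
    b_i = {i: 0.0 for i in by_item}
    for _ in range(n_epochs):
        # Update item biases with user biases held fixed, then the reverse
        for i, rows in by_item.items():
            b_i[i] = sum(r - mu - b_u[u] for u, r in rows) / (reg_i + len(rows))
        for u, rows in by_user.items():
            b_u[u] = sum(r - mu - b_i[i] for i, r in rows) / (reg_u + len(rows))
    return mu, b_u, b_i


def predict(mu, b_u, b_i, user, item):
    # Unknown users or items fall back to a zero bias
    return mu + b_u.get(user, 0.0) + b_i.get(item, 0.0)


# Toy (user, item, rating) triples for illustration only
toy = [(1, 'a', 4.0), (1, 'b', 3.0), (2, 'a', 5.0), (2, 'b', 4.0)]
mu, b_u, b_i = fit_baseline(toy)
print(predict(mu, b_u, b_i, 1, 'a'))
```

Each epoch is a single pass over the ratings with one scalar per user and per item, whereas a factorization model updates a vector of latent factors for each of them.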

In [27]:
algo = surprise.SVD(n_factors=20,
                    n_epochs=10)

# Time how long it takes to train the algorithm on the trainset
%timeit algo.fit(trainset)

1 loop, best of 3: 18.9 s per loop


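#### We can see that the Spark distributed computing model (12 seconds) outperforms the Surprise package (19 seconds), likely thanks to parallel processing. However, the gains are small, so we may disregard them even for a dataset of this size. Note that the two fits use different algorithms (ALS in Spark, SGD-based SVD in Surprise), so the timings are only roughly comparable.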
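As a quick back-of-the-envelope check, we can turn the two timings into a speedup factor and a per-core efficiency. This treats the two fits as comparable work, which is a simplification since they are different algorithms.

```python
spark_fit_s = 11.8     # Spark ALS fit time reported above
surprise_fit_s = 18.9  # Surprise SVD fit time reported above
cores = 4

speedup = surprise_fit_s / spark_fit_s
per_core_efficiency = speedup / cores

print(f"speedup: {speedup:.2f}x, efficiency per core: {per_core_efficiency:.0%}")
# speedup: 1.60x, efficiency per core: 40%
```

A speedup of about 1.6x on four cores is well short of the ideal 4x, consistent with the modest gains noted above.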

## Summary

#### We did not use a super massive dataset (1.5 million rows), so the incremental benefit of using Spark was not terribly visible. The biggest benefit came from using the AWS EC2 instance. Perhaps if we had a dataset with 15-20 million rows, distributed computing would have been more beneficial.