# Beer Recommendation ALS Model Fitting

This notebook contains the code to train and save the ALS matrix factorization model, and to write a CSV of predicted ratings for each user's top beers.

## Preamble

In [0]:
# Importing Dependencies 
from  pyspark.sql.types import StructField, StructType, StringType, LongType, FloatType
import pyspark.sql.functions as f 
import pandas as pd
import numpy as np 
import datetime 

# Defining all dbfs file paths
base_dir = 'dbfs:/FileStore/tables'
project_dir = base_dir + '/Capstone'
beer_dir = project_dir + '/beer'

data_dir = beer_dir + '/data'
mf_model_dir = beer_dir + '/beer_mf_model'

rec_dir = beer_dir + '/beer_recs'
exp_rec_dir = rec_dir + '/experienced'
new_rec_dir = rec_dir + '/new'

# Making all requisite directories if needed.
# dbutils.fs.mkdirs creates the directory (plus any missing parents) and does
# nothing when it already exists, so no "ls and catch the error" probe is needed
# and no unrelated exception can be swallowed by a bare `except`.
for dirname in [project_dir, beer_dir, data_dir, mf_model_dir, rec_dir, exp_rec_dir, new_rec_dir]:
    dbutils.fs.mkdirs(dirname)

# Schema for the raw beer dataframe.
# Each entry is (column name, Spark type), listed in the order the schema
# declares them; every column is declared nullable.
_beer_columns = [
    ('brewery_id', LongType()),
    ('brewery_name', StringType()),
    ('review_time', LongType()),
    ('review_overall', FloatType()),
    ('review_aroma', FloatType()),
    ('review_apperance', FloatType()),
    ('review_profilename', StringType()),
    ('beer_style', StringType()),
    ('review_palate', FloatType()),
    ('review_taste', FloatType()),
    ('beer_name', StringType()),
    ('beer_abv', FloatType()),
    ('beer_beerid', LongType()),
]

beerSchema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _beer_columns]
)

# Make sure the raw reviews file is present in the data directory.
# If listing it fails, copy it there from the upload location (base_dir).
raw_csv_path = data_dir + '/beer_reviews.csv'
try:
    dbutils.fs.ls(raw_csv_path)
except Exception:
    # `except Exception` (not a bare except) so KeyboardInterrupt/SystemExit still propagate.
    # The destination is the full file path, so the copy lands exactly where
    # the check above and the read below look for it.
    dbutils.fs.cp(base_dir + '/beer_reviews.csv', raw_csv_path)

# Loading the raw data file with the explicit schema (the CSV has a header row)
raw = spark.read.load(path=raw_csv_path,
                      format='csv', header=True, schema=beerSchema)

## Displaying Missing Values for all columns 

In [0]:
def _null_count(col_name):
    """Build an aggregate that counts the null entries of `col_name`, aliased to that name."""
    null_flag = f.when(f.col(col_name).isNull(), 1).otherwise(0)
    return f.sum(null_flag).alias(col_name)


# One null-count aggregate per column, shown as a single vertical record
null_ct = [_null_count(col_name) for col_name in raw.columns]
raw.select(null_ct).show(vertical=True)

-RECORD 0-------------------
 brewery_id         | 0     
 brewery_name       | 15    
 review_time        | 0     
 review_overall     | 0     
 review_aroma       | 0     
 review_apperance   | 0     
 review_profilename | 348   
 beer_style         | 0     
 review_palate      | 0     
 review_taste       | 0     
 beer_name          | 0     
 beer_abv           | 67785 
 beer_beerid        | 0     



## Preprocessing Data

In [0]:
# Keep only the columns used for modelling (reviewer name is renamed to
# 'username') and discard any row with a null in one of them.
taste_revs_raw = (
    raw.select(
        f.col('review_profilename').alias('username'),
        'beer_beerid',
        'review_taste',
        'review_time',
    )
    .na.drop()
)

# A review is identified by its (user, beer) pair
review_keys = ['username', 'beer_beerid']

# Count the pairs that occur more than once
n_dups = (
    taste_revs_raw.groupBy(review_keys)
    .count()
    .filter("count > 1")
    .count()
)
print(f'''
Number of Duplicated Reviews: {n_dups}
(Instance of the same person reviewing the same beer more than once.)
''' )

# Collapse repeated reviews into one row per (user, beer) by averaging
# both the taste rating and the review timestamp.
taste_revs_raw = taste_revs_raw.groupBy(review_keys).agg(
    f.mean('review_taste').alias('review_taste'),
    f.mean('review_time').alias('review_time'),
)



Number of Duplicated Reviews: 14274
(Instance of the same person reviewing the same beer more than once.)



In [0]:
# Calculating Time Cutoff for 80-20 split: the 80th percentile of review_time
time_cutoff = taste_revs_raw.select(
    f.percentile(f.col('review_time'), 0.8).alias('percentile')
).first()['percentile']

# Conducting train-validate split by time: reviews at or before the cutoff are
# used for training, strictly later reviews for validation.
# The validation set is expressed as the complementary filter instead of
# `subtract(train_raw)`: subtract is a row-wise EXCEPT DISTINCT that needs a
# shuffle and compares the averaged (floating point) columns of a recomputed
# frame, while the filter is a cheap narrow operation that is disjoint from
# train_raw by construction.
train_raw = taste_revs_raw.filter(f.col('review_time') <= time_cutoff)
val_raw = taste_revs_raw.filter(f.col('review_time') > time_cutoff)


In [0]:
# Show the split point as a calendar date (timestamp interpreted in local time)
cutoff_date = datetime.date.fromtimestamp(time_cutoff)
print(cutoff_date)

2011-01-27


In [0]:
# review_time was only needed for the split; keep just the user, item and rating columns
model_cols = ['username', 'beer_beerid', 'review_taste']
train_raw = train_raw.select(*model_cols)
val_raw = val_raw.select(*model_cols)

In [0]:
### Setting the Min Reviews per Beer and Min positive reviews per User
# A review counts as "positive" when its taste rating is at least this value
minPositiveRating = 3
# Min positive user reviews
minRevsPerUser = 3
# Min Reviews per Beer
minRevsPerBeer = 50

# Usernames of experienced users: at least minRevsPerUser positive reviews in the training period
expUsers = train_raw.filter(f.col('review_taste') >= minPositiveRating).groupBy('username').count()\
    .filter(f.col('count') >= minRevsPerUser).select('username')

# IDs of beers above the review cutoff.
# NOTE: this comparison is strict (>), so a beer needs more than minRevsPerBeer
# reviews, whereas the user filter above is inclusive (>=). Left unchanged on
# purpose: altering it changes the training set the saved model was fitted on.
commonBeers = train_raw.groupBy('beer_beerid').count()\
    .filter(f.col('count') > minRevsPerBeer).select('beer_beerid')

# Keep only reviews of common beers written by experienced users.
# Both key sets have one row per key, so they are broadcast to avoid shuffling train_raw.
als_train = train_raw\
    .join(f.broadcast(commonBeers), 'beer_beerid', 'left_semi')\
        .join(f.broadcast(expUsers), 'username', 'left_semi')
als_train = als_train.persist() # persisting DFs to memory saves having to recalculate them

# Only keep validation rows whose beer and username both appear in the training set.
# The key columns are de-duplicated before broadcasting: als_train has one row
# per review, so without distinct() every key would be shipped once per review.
# A left_semi join returns the same rows either way.
train_beer_ids = als_train.select('beer_beerid').distinct()
train_usernames = als_train.select('username').distinct()
als_val = val_raw\
    .join(f.broadcast(train_beer_ids), 'beer_beerid', 'left_semi')\
        .join(f.broadcast(train_usernames), 'username', 'left_semi')
als_val = als_val.persist()

In [0]:
# Report the number of distinct users and beers, then the total number of
# reviews, in the training set (each count runs a Spark job, so this is slow)
print('Training Set Counts:')
for label, key_col in [('Users', 'username'), ('Beers', 'beer_beerid')]:
    print(f'Number of {label}: {als_train.select(key_col).distinct().count()}')
print(f'Number of Reviews: {als_train.count()}')

Training Set Counts:
Number of Users: 14662
Number of Beers: 4189
Number of Reviews: 974199


In [0]:
# Report the number of distinct users and beers, then the total number of
# reviews, in the validation set (each count runs a Spark job, so this is slow)
print('Validation Set Counts')
for label, key_col in [('Users', 'username'), ('Beers', 'beer_beerid')]:
    print(f'Number of {label}: {als_val.select(key_col).distinct().count()}')
print(f'Number of Reviews: {als_val.count()}')

Validation Set Counts
Number of Users: 4125
Number of Beers: 3870
Number of Reviews: 127716


## Fitting and Saving ALS Model

In [0]:
# Importing dependencies 
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml.pipeline import Pipeline 
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALSModel

# Adding ID Numbers for each username
# ALS model prefers numeric column for User column
strIndex = StringIndexer(inputCol = 'username', outputCol = 'user_id')
# The username -> user_id mapping is learned from the training set only
str_idxer = strIndex.fit(als_train)

# Apply the same mapping to both splits and cache the indexed frames
fin_als_train = str_idxer.transform(als_train).persist()
fin_als_val = str_idxer.transform(als_val).persist()

# Release the caches of the un-indexed frames.
# Each frame is unpersisted by its own name and nothing is rebound: the previous
# `als_train = als_val.unpersist()` left the training cache in place and made
# `als_train` point at the validation data, so re-running this cell would have
# fitted the indexer on the validation set.
als_train.unpersist()
als_val.unpersist()

# Retrain or Load Model
# True  -> fit ALS on fin_als_train and overwrite the model stored at mf_model_dir
# False -> load the model previously stored at mf_model_dir
retrain = False

if not retrain:
    # Load the model so it doesn't have to refit.
    # NOTE(review): the loaded model is applied to user_id values produced by the
    # StringIndexer fitted in this run; presumably that mapping must match the one
    # used when the model was trained - confirm the training data is unchanged.
    model = ALSModel.load(mf_model_dir)
else:
    ### Code to perform grid search and/or cross validation, saved here for illustration
    ### very slow so commented out
    ### NOTE(review): the regParam grid lists 0.5 next to 0.1 and 0.15; possibly 0.05 was meant - confirm.

    # als = ALS(userCol = 'user_id',
    #         itemCol = 'beer_beerid',
    #         ratingCol = 'review_taste',
    #         coldStartStrategy = 'drop' ,
    #         maxIter=20,
    #         nonnegative=True)

    # paramGrid = ParamGridBuilder()\
    #         .addGrid(als.rank, [10, 15, 20])\
    #         .addGrid(als.regParam, [0.5, 0.1, 0.15])\
    #         .build()

    # evaluator = RegressionEvaluator(metricName='rmse',
    #                                 labelCol='review_taste',
    #                                 predictionCol='prediction')

    # cv = CrossValidator(
    #         estimator=als,
    #         estimatorParamMaps=paramGrid,
    #         evaluator=evaluator,
    #         parallelism=10,
    #         numFolds=3)

    # cv_model = cv.fit(fin_als_train)

    # best_model = cv_model.bestModel
    # print('rank: ', best_model.rank)
    # print('RegParam: ', best_model._java_obj.parent().getRegParam())

    # sorted(cv_model.avgMetrics, reverse=False)

    # ALS configuration: 15 latent factors and a regularization of 0.1
    als_params = dict(
        # Hyperparameters
        rank=15,        # best_model.rank
        regParam=0.1,   # best_model._java_obj.parent().getRegParam()
        maxIter=20,
        # Columns from training data
        userCol='user_id',
        itemCol='beer_beerid',
        ratingCol='review_taste',
        # Rows that cannot be scored are dropped from predictions
        coldStartStrategy='drop',
        # No prediction should be negative
        nonnegative=True,
    )
    als = ALS(**als_params)

    # Fit the model and write to dbfs, replacing any previously saved model
    model = als.fit(fin_als_train)
    model.write().overwrite().save(mf_model_dir)

## Evaluate Fitted ALS Model by RMSE

In [0]:
# RMSE between the observed taste rating and the model's prediction
rmse_settings = {
    'metricName': 'rmse',
    'labelCol': 'review_taste',
    'predictionCol': 'prediction',
}
evaluator = RegressionEvaluator(**rmse_settings)

# Score both splits with the fitted model
trainPreds = model.transform(fin_als_train)
valPreds = model.transform(fin_als_val)

# Evaluate each set of predictions
train_rmse = evaluator.evaluate(trainPreds)
val_rmse = evaluator.evaluate(valPreds)

print(f'Training RMSE: {round(train_rmse, 4)}')
print(f'Validation RMSE: {round(val_rmse, 4)}')

Training RMSE: 0.5277
Validation RMSE: 0.5329


## Save Recommendations for Experienced Users

In [0]:
# Lookup from the numeric user_id used by the model back to the username
user_map = fin_als_train.select('username', 'user_id').distinct()

# Top 10 recommended beers for every user known to the model
all_recs = model.recommendForAllUsers(10)

# Add username to recommendations
all_recs = all_recs.join(f.broadcast(user_map), 'user_id', 'inner')

# Explode the recommendations array to get one prediction per line,
# then select and rename the needed columns (usernames are lower-cased)
all_recs = all_recs.select('username', f.explode('recommendations').alias('recs'))\
        .select(f.lower(f.col('username')).alias('username'),
                'recs.beer_beerid',
                f.col('recs.rating').alias('predRating'))

# Beer and brewery names, exactly one row per beer id.
# A plain distinct() over the three columns can leave several rows for one
# beer_beerid when a name is missing or differs between reviews, and each
# extra row would duplicate a recommendation in the join below. Taking the
# first non-null name per beer keeps the join one-to-one.
beer_names = raw.groupBy('beer_beerid').agg(
    f.first('brewery_name', ignorenulls=True).alias('brewery_name'),
    f.first('beer_name', ignorenulls=True).alias('beer_name'))

# Add beer and brewery names to dataframe
all_recs = all_recs.join(f.broadcast(beer_names), 'beer_beerid', 'left_outer')

# Write the recommendations to csv for future queries (overwrites any previous export)
all_recs.write.option("header",True)\
    .format("csv") \
        .mode("overwrite") \
            .save(exp_rec_dir + "/exp_user_recommendations.csv")


## Display Examples of Predicted Ratings CSV

Shows 20 examples of predicted ratings for users. These are drawn from each user's top 10 predicted ratings and include the brewery name, beer name, username and beer ID, which is all the information needed to query recommendations by username.

In [0]:
# Load the saved ratings from dbfs instead of re-evaluating the lazy
# recommendation pipeline that produced them
saved_recs = spark.read.load(path = exp_rec_dir + '/exp_user_recommendations.csv',
                      format='csv', header=True)
# Shuffle and display 20 examples.
# limit(20) makes the cell show the 20 rows it promises; without it the whole
# shuffled table is handed to display().
saved_recs.orderBy(f.rand()).limit(20).display()

beer_beerid,username,predRating,brewery_name,beer_name
41928,ddean,4.692143,Russian River Brewing Company,Deviation - Bottleworks 9th Anniversary
42664,jalfpsu,5.3495865,Närke Kulturbryggeri AB,Kaggen! Stormaktsporter
41928,kevinphish,4.6472697,Russian River Brewing Company,Deviation - Bottleworks 9th Anniversary
47658,jwhite1979,4.7570186,Founders Brewing Company,Founders CBS Imperial Stout
45957,nrrd,4.9891343,The Lost Abbey,Veritas 004
37114,trevorgw,4.663975,North Coast Brewing Co.,Old Stock Cellar Reserve (Aged In Brandy Barrels)
42664,jibbyvonjibb,4.570468,Närke Kulturbryggeri AB,Kaggen! Stormaktsporter
47658,jkaufman,4.5515723,Founders Brewing Company,Founders CBS Imperial Stout
1545,herrfaust,5.082619,Brouwerij Westvleteren (Sint-Sixtusabdij van Westvleteren),Trappist Westvleteren 12
42349,wrx2ndregime,5.413293,Three Floyds Brewing Co. & Brewpub,Vanilla Bean Aged Dark Lord
