Step 1: Prior to uploading the data into pyspark, load the beerreviews.csv dataset into S3 bucket.  The csv can be downloaded from: https://www.kaggle.com/rdoume/beerreviews#__sid=js0

In [31]:
# Define a manual schema
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType,DoubleType, DateType

manual_schema = StructType([
    StructField("brewery_id", IntegerType(), False),
    StructField("brewery_name", StringType(), False),
    StructField("review_time", IntegerType(), False),
    StructField("review_overall", DoubleType(), False),
    StructField("review_aroma", DoubleType(), False),
    StructField("review_appearance", DoubleType(), False),
    StructField("review_profilename", StringType(), False),
    StructField("beer_style", StringType(), False),
    StructField("review_palate", DoubleType(), False),
    StructField("review_taste", DoubleType(), False),
    StructField("beer_name", StringType(), False),
    StructField("beer_abv", DoubleType(), False),
    StructField("beer_beerid", IntegerType(), False)])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
# read in the data from S3 bucket
beer = spark.read.format("csv").schema(manual_schema).option("header", "true").csv('s3://mboken-bigdata/beerreviews/beer_reviews.csv')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
# check that the schema was successful
beer.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- brewery_id: integer (nullable = true)
 |-- brewery_name: string (nullable = true)
 |-- review_time: integer (nullable = true)
 |-- review_overall: double (nullable = true)
 |-- review_aroma: double (nullable = true)
 |-- review_appearance: double (nullable = true)
 |-- review_profilename: string (nullable = true)
 |-- beer_style: string (nullable = true)
 |-- review_palate: double (nullable = true)
 |-- review_taste: double (nullable = true)
 |-- beer_name: string (nullable = true)
 |-- beer_abv: double (nullable = true)
 |-- beer_beerid: integer (nullable = true)

In [34]:
# check out the first few rows
beer.take(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(brewery_id=10325, brewery_name='Vecchio Birraio', review_time=1234817823, review_overall=1.5, review_aroma=2.0, review_appearance=2.5, review_profilename='stcules', beer_style='Hefeweizen', review_palate=1.5, review_taste=1.5, beer_name='Sausa Weizen', beer_abv=5.0, beer_beerid=47986), Row(brewery_id=10325, brewery_name='Vecchio Birraio', review_time=1235915097, review_overall=3.0, review_aroma=2.5, review_appearance=3.0, review_profilename='stcules', beer_style='English Strong Ale', review_palate=3.0, review_taste=3.0, beer_name='Red Moon', beer_abv=6.2, beer_beerid=48213), Row(brewery_id=10325, brewery_name='Vecchio Birraio', review_time=1235916604, review_overall=3.0, review_aroma=2.5, review_appearance=3.0, review_profilename='stcules', beer_style='Foreign / Export Stout', review_palate=3.0, review_taste=3.0, beer_name='Black Horse Black Beer', beer_abv=6.5, beer_beerid=48215), Row(brewery_id=10325, brewery_name='Vecchio Birraio', review_time=1234725145, review_overall=3.0, re

In [8]:
# find number of partitions
beer.rdd.getNumPartitions()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2

In [None]:
# possibly partition on a specific column?
# put that here

In [9]:
# get summary statistics for some of the numerical variables
dscr_cols = ['review_aroma', 'review_appearance', 'review_palate', 'review_taste', 'beer_abv']
beer.select(dscr_cols).describe().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|      review_aroma| review_appearance|     review_palate|      review_taste|          beer_abv|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|           1586614|           1586614|           1586614|           1586614|           1518829|
|   mean| 3.735636077836197|3.8416416973504584|3.7437013665579655| 3.792860456292457| 7.042386753219579|
| stddev|0.6976167287896318|0.6160927688920651|0.6822183633739453|0.7319696098919132|2.3225259927418085|
|    min|               1.0|               0.0|               1.0|               1.0|              0.01|
|    max|               5.0|               5.0|               5.0|               5.0|              57.7|
+-------+------------------+------------------+------------------+------------------+------------------+

In [10]:
# use Spark SQL to generate insights
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# read files into Spark SQL table
beer.registerTempTable("beer")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# TODO: input queries here

In [None]:
# TODO: visualizations

To be able to use Python libraries like Seaborn, type the following command into the shell:
sudo python3.7 -m pip install seaborn

In [35]:
# recommendation engine using ALS
# https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS
from pyspark.ml.recommendation import ALS

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# ALS requires that the userCol be an integer, but we currently have unique identifiers that are strings.
# solution: use string indexer from spark mllib to create a unique id for each user name
# https://spark.apache.org/docs/latest/ml-features.html#stringindexer 
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="review_profilename", outputCol="user_id")
# set setHandleInvalid to skip to avoid errors for nulls
beer = indexer.setHandleInvalid("skip").fit(beer).transform(beer)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
# split our data into training and test
(training, test) = beer.randomSplit([0.8, 0.2])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [73]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

als = ALS(
    # the users are represented by the column review_profilename
         userCol="user_id", 
    # the items we are recommending are in the beer_name column
         itemCol="beer_beerid",
    # we will use the overall review to make the predictions
         ratingCol="review_overall", 
    # the preferences are explicit (i.e. a user explictly rated beer X as Y)
         implicitPrefs = False, 
    # drop rows with NA values
    coldStartStrategy="drop")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [74]:
# define evaluator as RMSE
# we want to use RMSE to compare our predicted ratings for a user to their actual rating

evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="review_overall", 
           predictionCol="prediction")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [75]:
# fit the model to the training data
model = als.fit(training)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [76]:
# use our model to  make predictions for the test data
test_predictions = model.transform(test)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [77]:
# let's take a look at some of the predictions our model made
test_predictions.select(['review_profilename', 'review_overall', 'beer_name', 'prediction']).show(n=10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+--------------+--------------------+----------+
|review_profilename|review_overall|           beer_name|prediction|
+------------------+--------------+--------------------+----------+
|           Rhynes2|           4.0|     Pilsner Urquell| 3.3896742|
|         iloveyani|           2.5|         Purple Haze|  3.291819|
|        CortexBomb|           3.5|Bert Grant's Impe...| 3.5015614|
|        CortexBomb|           4.5|   Widmer Hefeweizen| 3.0609984|
|            Buebie|           4.5|    Drifter Pale Ale| 3.7207134|
|         beertunes|           4.0|     Pitch Black IPA|  3.687558|
|            Buebie|           4.5|      Drop Top Amber| 3.3385055|
|         beertunes|           3.0|Barrel Aged Brrrb...| 3.1888351|
|         beertunes|           4.0|W'11 (KGB Imperia...| 3.5367568|
|          BeefyMee|           1.5|     La Fin Du Monde| 3.9558308|
+------------------+--------------+--------------------+----------+
only showing top 10 rows

In [79]:
# it appears that the predictions are not too far from the true rating.  We can calculate the RMSE for the test data
rmse = evaluator.evaluate(test_predictions)
print('Root-mean-square error = ' + str(rmse))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Root-mean-square error = 0.549497728214305

The RMSE for this model is 0.554, meaning that on average, the difference between the actual rating and the prediction is 0.5

In [78]:
# now we can use the model to make recommendations for each user.  We can start by making 3 recommendations for each user
recommendations = model.recommendForAllUsers(numItems = 3)
recommendations.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|     12|[[32744, 4.999482...|
|     65|[[51494, 4.993389...|
|     76|[[51494, 5.148505...|
|     81|[[51494, 5.184389...|
|    122|[[32744, 5.090419...|
+-------+--------------------+
only showing top 5 rows

In [81]:
# these ratings are not interpretable.  Let's convert it into an interpretable format
# https://towardsdatascience.com/build-recommendation-system-with-pyspark-using-alternating-least-squares-als-matrix-factorisation-ebe1ad2e7679
from pyspark.sql.functions import explode, col

recommendations = recommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('user_id', col("rec_exp.beer_beerid"), col("rec_exp.rating"))

recommendations.limit(10).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------+---------+
|user_id|beer_beerid|   rating|
+-------+-----------+---------+
|      1|      51494|5.2669983|
|      1|      32744|5.1986384|
|      1|      42941| 5.146857|
|      3|      51494| 5.119087|
|      3|      76583| 4.885499|
|      3|      32744|4.8575177|
|      6|      32744| 4.863537|
|      6|      47661|  4.83607|
|      6|      42941|  4.83607|
|     64|      35361| 5.014617|
+-------+-----------+---------+

In [109]:
# register recommendations as a table
recommendations.registerTempTable("recommendations")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# try joining recommendations with predictions to get the user name back

In [119]:
# check the recommendations for a user to make sure that they make sense
recommendations.where(recommendations.user_id == 1).select("user_id", "beer_beerid", "rating").collect()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(user_id=1, beer_beerid=51494, rating=5.266998291015625), Row(user_id=1, beer_beerid=32744, rating=5.198638439178467), Row(user_id=1, beer_beerid=42941, rating=5.146856784820557)]

In [111]:
# let's compare the user's recommended beers to the beers they actually rated the highest
user1_preferences = sqlContext.sql("SELECT DISTINCT b.review_profilename, b.review_overall, b.brewery_name, b.beer_name FROM recommendations AS r JOIN beer AS b ON r.beer_beerid = b.beer_beerid WHERE review_profilename = 'beertunes' ")
user1.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+---------+--------------------+--------------------+
|review_profilename|   rating|        brewery_name|           beer_name|
+------------------+---------+--------------------+--------------------+
|         beertunes|5.9246755|Alpine Brewing Co...|              Märzen|
|         beertunes|5.5589724|      Big Al Brewing|        Summer Lager|
|         beertunes|3.9205563|Anacortes Brewery...|        Belgian Sour|
|         beertunes|2.3909724|Boundary Bay Brew...|Oak Aged Cabin Fe...|
|         beertunes|7.3559165|      Big Al Brewing|        Summer Lager|
|         beertunes| 5.201452|Anacortes Brewery...|Anacortes Hefewezien|
|         beertunes|5.8828206|Alpine Brewing Co...|              Märzen|
|         beertunes| 6.328036|Anacortes Brewery...|Anacortes Hefewezien|
|         beertunes|1.3913641|Anacortes Brewery...|        Belgian Sour|
|         beertunes|3.5028129|Boundary Bay Brew...|Oak Aged Cabin Fe...|
|         beertunes|5.6839643|Anacortes Brewery...|