# Recommender System

In [1]:
import pandas as pd
import matplotlib.pyplot as plt

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

# Obtain the notebook's Spark session (an existing one is reused if present).
spark = (
    SparkSession.builder
    .appName('red')
    .getOrCreate()
)

## Read in 12 datasets totaling 21.78 GB

In [3]:
# Explicit schema for the review TSV files: (column name, Spark type) pairs.
# Every field is declared nullable.
_review_columns = [
    ("marketplace",       StringType()),
    ("customer_id",       StringType()),
    ("review_id",         StringType()),
    ("product_id",        StringType()),
    ("product_parent",    StringType()),
    ("product_title",     StringType()),
    ("product_category",  StringType()),
    ("star_rating",       IntegerType()),
    ("helpful_votes",     IntegerType()),
    ("total_votes",       IntegerType()),
    ("vine",              StringType()),
    ("verified_purchase", StringType()),
    ("review_headline",   StringType()),
    ("review_body",       StringType()),
    ("review_date",       TimestampType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _review_columns]
)

# One TSV file per product category; all of them are read into one DataFrame.
path = [
    'archive/amazon_reviews_us_Apparel_v1_00.tsv',
    'archive/amazon_reviews_us_Automotive_v1_00.tsv',
    'archive/amazon_reviews_us_Baby_v1_00.tsv',
    'archive/amazon_reviews_us_Beauty_v1_00.tsv',
    'archive/amazon_reviews_us_Books_v1_02.tsv',
    'archive/amazon_reviews_us_Camera_v1_00.tsv',
    'archive/amazon_reviews_us_Electronics_v1_00.tsv',
    'archive/amazon_reviews_us_Furniture_v1_00.tsv',
    'archive/amazon_reviews_us_Sports_v1_00.tsv',
    'archive/amazon_reviews_us_Grocery_v1_00.tsv',
    'archive/amazon_reviews_us_Personal_Care_Appliances_v1_00.tsv',
    'archive/amazon_reviews_us_Music_v1_00.tsv',
]

# Tab-separated input with a header row. mode='DROPMALFORMED' asks Spark to
# discard rows it cannot parse against the schema instead of failing.
data = spark.read.csv(
    path,
    schema=schema,
    header=True,
    sep='\t',
    mode='DROPMALFORMED',
)

## Filter out non-product records in product_category column

In [4]:
# Category names that count as valid; rows with any other value in the
# product_category column are discarded.
product_category = [
    'Sports', 'Baby', 'Apparel', 'Grocery', 'Electronics', 'Automotive',
    'Books', 'Music', 'Furniture', 'Personal_Care_Appliances', 'Camera', 'Beauty',
]
data_filter = data.where(data['product_category'].isin(product_category))

## After filtering, how many records remain

In [5]:
# Row count of the filtered reviews (runs a Spark job over the whole dataset).
data_filter.count()

37172391

## Find out top 20 customers posting the most reviews

In [6]:
# Number of reviews written by each customer.
customer_count = data_filter.groupBy('customer_id').count()

# Collect the 20 rows with the largest review count to the driver.
by_review_count = customer_count.orderBy(col('count').desc())
top20 = by_review_count.head(20)

# Keep only the first field of each row, i.e. the customer_id.
top20_l = [row[0] for row in top20]

In [7]:
# Display the collected customer ids, ordered from most to fewest reviews.
top20_l

['50122160',
 '50732546',
 '50736950',
 '38214553',
 '51184997',
 '18116317',
 '23267387',
 '50345651',
 '52496677',
 '14539589',
 '15725862',
 '50913245',
 '50881246',
 '20018062',
 '22073263',
 '52615377',
 '19380211',
 '51381678',
 '37455882',
 '50441674']

## Find the top 20 customers whose average rating is close to 5 stars (above 4.8)

In [6]:
# Per-customer mean star rating and number of reviews. The dict form of agg()
# names the result columns `avg(star_rating)` and `count(customer_id)`.
# NOTE: this rebinds `customer_count`, replacing the plain per-customer count.
rating_aggregations = {'star_rating': 'avg', 'customer_id': 'count'}
customer_count = data_filter.groupBy('customer_id').agg(rating_aggregations)

+-----------+------------------+------------------+
|customer_id|  avg(star_rating)|count(customer_id)|
+-----------+------------------+------------------+
|   42948925|               5.0|                 2|
|   39105436|               5.0|                 5|
|   49344407| 4.253164556962025|               158|
|   52857386|               3.5|                 8|
|   22197844|               4.0|                 1|
|    9991894|               5.0|                 1|
|   52867206| 2.911111111111111|                45|
|   25046820|               2.5|                 2|
|   47619131|              4.75|                 4|
|   37124998| 4.714285714285714|                 7|
|   38417982|               5.0|                 1|
|    7559816|3.7142857142857144|                 7|
|   12904029| 4.420289855072464|                69|
|   21012418| 4.398095238095238|               525|
|    7038490| 4.142857142857143|                14|
|   36125906|               3.0|                 1|
|   23877395

In [7]:
# Customers whose mean rating exceeds 4.8, most prolific reviewers first.
near_five_star = customer_count.filter('avg(star_rating) > 4.8')
customer_5 = near_five_star.orderBy(col('count(customer_id)').desc())
customer_5.show()

+-----------+------------------+------------------+
|customer_id|  avg(star_rating)|count(customer_id)|
+-----------+------------------+------------------+
|   50122160|4.9980296448882715|             22331|
|   50345651| 4.980478917230609|              3842|
|   52496677| 4.812853812853813|              3003|
|   14539589|4.9883281840027465|              2913|
|   20018062|4.9157158630842845|              2717|
|   50199793| 4.847798204360838|              2339|
|   53037408| 4.920921825576141|              2213|
|   50776149|4.9949972206781545|              1799|
|   20595117| 4.813236118900729|              1783|
|   39569598| 4.843078758949881|              1676|
|   15536614| 4.998778998778999|              1638|
|    7080939|               5.0|              1530|
|   34247947| 4.861070911722142|              1382|
|   12201275| 4.938851603281133|              1341|
|   50205849| 4.801063022019742|              1317|
|   10608826| 4.992773261065944|              1107|
|   40857530

In [8]:
# Collect the first 20 rows of the ~5-star group and keep only the customer ids.
top20_5 = customer_5.head(20)
top20_5l = [row[0] for row in top20_5]
top20_5l

['50122160',
 '50345651',
 '52496677',
 '14539589',
 '20018062',
 '50199793',
 '53037408',
 '50776149',
 '20595117',
 '39569598',
 '15536614',
 '7080939',
 '34247947',
 '12201275',
 '50205849',
 '10608826',
 '40857530',
 '35110629',
 '51147790',
 '47883385']

## Find the top 20 customers whose average ratings are approximately 4, 3, 2, and 1 stars (within ±0.2)

In [9]:
def _top_customers_by_avg(lower, upper, n=20):
    """Select the most active reviewers whose mean rating lies in (lower, upper).

    Args:
        lower: exclusive lower bound on ``avg(star_rating)``.
        upper: exclusive upper bound on ``avg(star_rating)``.
        n: number of rows to collect to the driver.

    Returns:
        A ``(band_df, top_rows, top_ids)`` tuple: the filtered DataFrame sorted
        by review count (descending), its first ``n`` rows, and the
        ``customer_id`` (first field) of each of those rows.
    """
    band_df = (
        customer_count
        .filter(f'avg(star_rating) > {lower} and avg(star_rating) < {upper}')
        .sort(col('count(customer_id)').desc())
    )
    top_rows = band_df.head(n)
    top_ids = [row[0] for row in top_rows]
    return band_df, top_rows, top_ids


# One band of +/-0.2 around each whole-star rating; both bounds are exclusive.
customer_4, top20_4, top20_4l = _top_customers_by_avg(3.8, 4.2)
customer_3, top20_3, top20_3l = _top_customers_by_avg(2.8, 3.2)
customer_2, top20_2, top20_2l = _top_customers_by_avg(1.8, 2.2)
customer_1, top20_1, top20_1l = _top_customers_by_avg(0.8, 1.2)

In [11]:
# Print the leading rows of the ~1-star band (sorted by review count, descending).
customer_1.show()

+-----------+------------------+------------------+
|customer_id|  avg(star_rating)|count(customer_id)|
+-----------+------------------+------------------+
|   47382371|1.0229885057471264|               174|
|   37141039|               1.0|               146|
|   48608140|               1.0|                99|
|   14308733|1.0736842105263158|                95|
|   34408569|               1.0|                64|
|   41491450|               1.0|                64|
|   51303174|               1.0|                58|
|    7251578|               1.0|                56|
|   52794733|1.1555555555555554|                45|
|   25951615|1.0476190476190477|                42|
|    2395441|               1.0|                41|
|   28317383|               1.0|                40|
|   17076145|               1.0|                38|
|   49045836| 1.105263157894737|                38|
|   52984709|1.1891891891891893|                37|
|   30295837|               1.0|                36|
|   40930386

## Merge all customer ID lists

In [12]:
# Concatenate the five per-band id lists, from the ~1-star band up to ~5-star.
top20_12345 = [*top20_1l, *top20_2l, *top20_3l, *top20_4l, *top20_5l]

## Calculate each customer's average rating for each product category

In [13]:
# Restrict the reviews to the selected customers, then average the star rating
# per (customer, product category) pair.
selected_reviews = data_filter.filter(data_filter['customer_id'].isin(top20_12345))
avg_c_p = selected_reviews.groupBy('customer_id', 'product_category').mean('star_rating')
avg_c_p.show()

+-----------+----------------+------------------+
|customer_id|product_category|  avg(star_rating)|
+-----------+----------------+------------------+
|   43135541|         Apparel|               1.6|
|   20595117|           Books| 4.805234327449787|
|   48608140|          Sports|               1.0|
|   23267387|           Books|               5.0|
|   38214553|          Sports|               4.0|
|   23267387|          Beauty|               4.0|
|   27852921|           Music|3.1784615384615384|
|   18116317|         Grocery| 4.309523809523809|
|   47667560|         Grocery|1.3333333333333333|
|   52398462|          Sports|               2.0|
|   40140997|            Baby|               4.0|
|   38106163|         Apparel|1.3333333333333333|
|   49335121|           Music|3.6666666666666665|
|   47883385|          Sports|               5.0|
|   38889875|           Music|             3.875|
|   50345651|           Books| 4.886363636363637|
|   45457807|     Electronics|               2.0|


In [14]:
# Number of (customer_id, product_category) rows in the aggregated frame.
avg_c_p.count()

418

## Convert string type to numeric type and Split data into training and test

In [15]:
# Make both ids numeric: customer_id is cast from string to integer, and
# product_category is encoded as a numeric index stored in `product_id`.
# These two columns are later used as the ALS userCol / itemCol.
avg_c_p = avg_c_p.withColumn("customer_id", col("customer_id").cast(IntegerType()))
indexer = StringIndexer(inputCol="product_category", outputCol="product_id")
category_index_model = indexer.fit(avg_c_p)
cus_prod = category_index_model.transform(avg_c_p)

### Retrieve the mapping between index and product category

In [16]:
# Refits the indexer on the same frame just to read its labels; the position
# of a category in this list is the numeric index stored in `product_id`.
indexer.fit(avg_c_p).labels

['Music',
 'Books',
 'Electronics',
 'Grocery',
 'Sports',
 'Apparel',
 'Beauty',
 'Camera',
 'Automotive',
 'Baby',
 'Furniture',
 'Personal_Care_Appliances']

### Split the data into training and test

In [17]:
# 80/20 train/test split. A fixed seed keeps the split from being re-drawn
# differently on every Restart & Run All, so the reported RMSE is comparable
# between runs.
(training, test) = cus_prod.randomSplit([0.8, 0.2], seed=42)

In [18]:
# Print the leading rows of the training split as a sanity check.
training.show()

+-----------+--------------------+------------------+----------+
|customer_id|    product_category|  avg(star_rating)|product_id|
+-----------+--------------------+------------------+----------+
|   20595117|               Books| 4.805234327449787|       1.0|
|   48608140|              Sports|               1.0|       4.0|
|   23267387|              Beauty|               4.0|       6.0|
|   38214553|              Sports|               4.0|       4.0|
|   27852921|               Music|3.1784615384615384|       0.0|
|   40140997|                Baby|               4.0|       9.0|
|   47667560|             Grocery|1.3333333333333333|       3.0|
|   52398462|              Sports|               2.0|       4.0|
|   47883385|              Sports|               5.0|       4.0|
|   49335121|               Music|3.6666666666666665|       0.0|
|   50345651|               Books| 4.886363636363637|       1.0|
|   52569116|               Music|               4.2|       0.0|
|    2395441|            

## Build the recommendation model using ALS on the training data

In [19]:
# ALS on (customer, category index, mean rating) triples.
# coldStartStrategy="drop" is set so that rows ALS cannot score are dropped
# from the predictions; the seed fixes the random factor initialisation so
# the fitted model is reproducible.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="customer_id",
    itemCol="product_id",
    ratingCol="avg(star_rating)",
    coldStartStrategy="drop",
    seed=42,
)
model = als.fit(training)

## Evaluate the model by computing the RMSE on the test data

In [20]:
# Score the held-out rows and discard any row containing a null.
predictions = model.transform(test).dropna()
predictions.show()

# The label must be the rating the model was trained on (`avg(star_rating)`).
# Using `product_id` here would compare predicted ratings against the
# category index and produce a meaningless RMSE.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="avg(star_rating)",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

+-----------+----------------+------------------+----------+----------+
|customer_id|product_category|  avg(star_rating)|product_id|prediction|
+-----------+----------------+------------------+----------+----------+
|   39569598|           Books| 4.839689265536723|       1.0|  4.802179|
|   23267387|           Books|               5.0|       1.0| 4.0730057|
|   35110629|           Books|               5.0|       1.0|  5.068819|
|   34639163|           Books| 4.368421052631579|       1.0| 4.2211094|
|   52932081|           Books| 3.160621761658031|       1.0| 3.6676617|
|   50503261|           Books| 4.157894736842105|       1.0| 4.9542913|
|   50122160|           Books| 4.997992883860961|       1.0| 4.8402967|
|   44617291|           Books|               4.0|       1.0| 3.8545032|
|   50881246|           Books| 3.958036421219319|       1.0| 3.4014323|
|   52496677|          Beauty| 4.904109589041096|       6.0|  4.394989|
|   35689076|          Beauty|               3.0|       6.0| 2.7

In [21]:
K = 3
userRecs = model.recommendForAllUsers(K)
userRecs.show(userRecs.count(), False)

+-----------+---------------------------------------------------+
|customer_id|recommendations                                    |
+-----------+---------------------------------------------------+
|47667560   |[[3, 1.3322111], [4, 1.2130852], [10, 1.0492013]]  |
|7080939    |[[8, 5.537457], [3, 5.3056493], [1, 5.3042455]]    |
|47375452   |[[0, 2.8856785], [4, 2.8499315], [1, 2.7823439]]   |
|29490909   |[[9, 2.5491278], [8, 2.3140297], [4, 2.0385966]]   |
|52421866   |[[7, 3.860292], [2, 3.041908], [1, 3.0339704]]     |
|37141039   |[[9, 0.9998093], [10, 0.99946404], [8, 0.96031696]]|
|49635896   |[[1, 4.3396564], [4, 3.9630327], [3, 3.949873]]    |
|52496677   |[[8, 5.0212307], [3, 4.998295], [9, 4.984811]]     |
|52932081   |[[4, 4.992174], [3, 4.9425707], [10, 4.3153152]]   |
|41228222   |[[6, 4.994157], [9, 4.100984], [2, 4.0705853]]     |
|2395441    |[[9, 1.089569], [8, 1.0002432], [10, 0.999448]]    |
|51184997   |[[1, 4.9975934], [3, 4.4241796], [4, 4.4059277]]   |
|20781656 