# Customer Segmentation Using MLlib ALS Algorithm

In [2]:
# Exploratory: print the Databricks file-system utility help. Nothing later in the notebook uses its result.
dbutils.fs.help()

In [3]:
# Automotive product metadata. Loaded and displayed for inspection; not referenced again in the cells shown.
automobile_category_df = spark.read.format("json").load("/FileStore/tables/cnfldt2b1504923605351/meta_Automotive_json-31ec7.gz")
display(automobile_category_df)

In [4]:
# Automotive review records; inspect column types, a couple of rows, and the nested schema.
automobile_df = spark.read.format("json").load("/FileStore/tables/0cpczmu11504368060244/reviews_Automotive_5_json-afd9e.gz")
print(automobile_df.dtypes)
automobile_df.show(n=2)
automobile_df.printSchema()

In [5]:
# Summary statistics from DataFrame.describe() over the raw review DataFrame.
display(automobile_df.describe())

In [6]:
# Keep only the review fields used downstream and give them model-friendly names.
# The source column is spelled `reviewerID`; the rename uses that exact case so it
# does not depend on Spark's case-insensitive resolution (spark.sql.caseSensitive).
# withColumnRenamed silently does nothing for a name that does not resolve.
review_df = (automobile_df
             .select('asin', 'helpful', 'overall', 'reviewerID', 'reviewText', 'unixReviewTime')
             .withColumnRenamed('overall', 'rating')
             .withColumnRenamed('asin', 'product')
             .withColumnRenamed('reviewerID', 'user')
             .sort('unixReviewTime', ascending=False))  # newest reviews first
review_df.show(2)

## Visualization

In [8]:
# Distribution of the raw star ratings.
display(review_df.select(review_df['rating']))

In [9]:
# Ratings against review time (Unix seconds).
display(review_df.select(['rating', 'unixReviewTime']))

In [10]:
# Distribution of review timestamps (Unix seconds).
display(review_df.select(review_df.unixReviewTime))

## Data preparation

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Derive a calendar date from the Unix-seconds review time; the time-decay step uses it.
# A new name is used so that `review_df` itself is left unchanged and this cell can be re-run.
dated_review_df = review_df.withColumn('timeStamp', review_df.unixReviewTime.cast("timestamp").cast("date"))

# ALS needs numeric user/item ids, so map the string `product` and `user` keys to indices.
# Unfitted StringIndexers go into the Pipeline; Pipeline.fit fits each one itself.
indexers = [StringIndexer(inputCol=column, outputCol=column + "Id") for column in ['product', 'user']]
pipeline = Pipeline(stages=indexers)
indexed_df = pipeline.fit(dated_review_df).transform(dated_review_df)

# StringIndexer outputs doubles; cast the ids to integers for ALS.
indexed_df = (indexed_df
              .withColumn("productId", indexed_df["productId"].cast('Int'))
              .withColumn("userId", indexed_df["userId"].cast('Int')))

In [13]:
# Review summary.
# avg_num_review_prod is also the "enough ratings" threshold for the baseline recommender.
review_count = indexed_df.count()
prod_count = indexed_df.select('product').distinct().count()
user_count = indexed_df.select('user').distinct().count()
# In Python 2, int / int truncates, so convert to float to get the true mean.
# Guard the empty case instead of raising ZeroDivisionError.
avg_num_review_prod = float(review_count) / prod_count if prod_count else 0.0
print('There are %s reviews and %s products in the datasets.  there are average %s reviews per product from %s users.' % (review_count, prod_count, avg_num_review_prod, user_count))

In [14]:
from pyspark.sql import functions as F

# Per-product number of ratings and mean raw `rating`, best-rated first (ties broken by count).
prod_avg_ratings_df = (indexed_df
                       .groupBy('productId')
                       .agg(F.count('rating').alias('count'),
                            F.avg('rating').alias('average'))
                       .orderBy('average', 'count', ascending=False))
prod_avg_ratings_df.show(3)

In [15]:
# Render the per-product raw-rating statistics.
display(prod_avg_ratings_df)

In [16]:
# Per-user number of ratings given and mean raw `rating`, highest average first.
user_avg_ratings_df = (indexed_df
                       .groupBy('userId')
                       .agg(F.count('rating').alias('user_count'),
                            F.avg('rating').alias('user_average'))
                       .orderBy('user_average', 'user_count', ascending=False))
user_avg_ratings_df.show(3)

In [17]:
# Render the per-user raw-rating statistics.
display(user_avg_ratings_df)

In [18]:
'''
most ratings are above 4.5, How to deal with highly skewed towards very positive reviews?
# 1. Add an adjustment factor by a helpful score: percentage of helpful
'''
from pyspark.sql.functions import when
epsilon = 1e-16
adj = 1.0
min_helpful_vote = 2.0

# `helpful` is read as [helpful votes, total votes] (index 0 divided by index 1) -- assumed
# from its use here; confirm against the source data.
helpful_yes = indexed_df.helpful[0]
helpful_total = indexed_df.helpful[1].cast("Double")

# Positive reviews (>= 4 stars) with at least `min_helpful_vote` votes get the helpful share added.
is_endorsed_positive = (indexed_df['rating'] >= 4.0) & (indexed_df.helpful[1] >= min_helpful_vote)
boosted = indexed_df.rating + adj * (helpful_yes / helpful_total)
# Every other review has the helpful share subtracted; epsilon keeps 0/0 from becoming null.
# NOTE(review): this branch also catches >= 4-star reviews with too few votes
# (e.g. 5 stars with 1/1 helpful becomes 4) -- confirm that is intended.
penalised = indexed_df.rating - adj * (helpful_yes / (helpful_total + epsilon))

adjusted_df = indexed_df.withColumn('adjusted_rating', when(is_endorsed_positive, boosted).otherwise(penalised))

display(adjusted_df)

In [19]:
# Min-max rescale adjusted_rating onto [0, 5] (the smallest value maps to 0).
# Both extremes come from one aggregation, i.e. one Spark job instead of two.
adjusted_range = adjusted_df.agg(F.min('adjusted_rating').alias('min_val'),
                                 F.max('adjusted_rating').alias('max_val')).first()
min_val, max_val = adjusted_range['min_val'], adjusted_range['max_val']
if min_val is None or max_val == min_val:
    # An empty input or a constant column would otherwise yield an invalid / all-null rescale.
    raise ValueError('cannot rescale adjusted_rating: observed range is [%s, %s]' % (min_val, max_val))
scaled_df = (adjusted_df
             .withColumn("scaled_rating", 5 * (adjusted_df.adjusted_rating - min_val) / (max_val - min_val))
             .drop('adjusted_rating'))
display(scaled_df.select('scaled_rating'))

In [20]:
# Per-product count and mean of the helpfulness-adjusted, rescaled rating; best-rated first.
prod_avg_ratings_scaled_df = (scaled_df
                              .groupBy('productId')
                              .agg(F.count('scaled_rating').alias('count'),
                                   F.avg('scaled_rating').alias('average'))
                              .orderBy('average', 'count', ascending=False))
prod_avg_ratings_scaled_df.show(3)

In [21]:
# Render the per-product statistics on the helpfulness-adjusted scale.
display(prod_avg_ratings_scaled_df)

In [22]:
# Per-user count and mean of the helpfulness-adjusted, rescaled rating; highest average first.
user_avg_ratings_scaled_df = (scaled_df
                              .groupBy('userId')
                              .agg(F.count('scaled_rating').alias('user_count'),
                                   F.avg('scaled_rating').alias('user_average'))
                              .orderBy('user_average', 'user_count', ascending=False))
user_avg_ratings_scaled_df.show(3)

In [23]:
# Render the per-user statistics on the helpfulness-adjusted scale.
display(user_avg_ratings_scaled_df)

In [24]:
# Show the reviews with the rescaled helpfulness-adjusted rating.
# NOTE: scaled_df is reassigned by the later time-decay rescale cell, so re-running this
# cell afterwards shows the time-adjusted version instead.
display(scaled_df)

In [25]:
# 2. Time factor: for automotive reviews, recency matters, so decay the rating further with
# review age using a variant of the Hacker News ranking formula
# (https://medium.com/hacking-and-gonzo/how-hacker-news-ranking-algorithm-works-1d9b0cf2c08d):
#
#     Score = (P - 1) / (T + 2)^G
#
# where P = points of an item (the -1 negates the submitter's own vote),
#       T = time since submission (hours in HN; age in years is used here),
#       G = gravity (HN default 1.8; a much gentler 0.01 is used here).
import time
import datetime


def calculate_score(votes, item_hour_age, gravity=1.8):
    """Hacker-News-style decayed score: (votes - 1) / (item_hour_age + 2) ** gravity.

    Uses only -, + and pow/division, so it accepts plain numbers as well as Spark Columns.
    """
    return (votes - 1) / pow((item_hour_age + 2), gravity)


gravity = 0.01
# Column the decay is applied to. 'scaled_rating' carries forward the helpfulness adjustment
# from step 1, so the time factor adjusts the rating *further* rather than starting over
# from the raw stars. Set to 'rating' to decay the raw rating instead.
time_adjust_source_col = 'scaled_rating'
current_time = datetime.date.fromtimestamp(time.time())
# Review age in years (datediff returns whole days). The current date is passed as a literal,
# so scaled_df is not mutated with a temporary column.
review_age_years = F.datediff(F.lit(current_time), scaled_df['timeStamp']) / 365
time_adjusted_df = scaled_df.withColumn(
    'time_adjusted_rating',
    calculate_score(scaled_df[time_adjust_source_col], review_age_years, gravity))
time_adjusted_df.show(2)

In [26]:
# Min-max rescale time_adjusted_rating onto [0, 5], with both extremes from one aggregation.
time_range = time_adjusted_df.agg(F.min('time_adjusted_rating').alias('min_val'),
                                  F.max('time_adjusted_rating').alias('max_val')).first()
min_val, max_val = time_range['min_val'], time_range['max_val']
if min_val is None or max_val == min_val:
    # An empty input or a constant column would otherwise yield an invalid / all-null rescale.
    raise ValueError('cannot rescale time_adjusted_rating: observed range is [%s, %s]' % (min_val, max_val))
# DataFrame.drop silently ignores names that do not exist, so the intermediate column
# name must match exactly (no stray whitespace) for it to actually be removed.
scaled_df = (time_adjusted_df
             .withColumn("scaled_time_adjusted_rating",
                         5 * (time_adjusted_df.time_adjusted_rating - min_val) / (max_val - min_val))
             .drop('time_adjusted_rating'))
display(scaled_df.select('scaled_time_adjusted_rating'))

In [27]:
# Per-product count and mean of the time-adjusted, rescaled rating; best-rated first.
prod_avg_ratings_time_scaled_df = (scaled_df
                                   .groupBy('productId')
                                   .agg(F.count('scaled_time_adjusted_rating').alias('count'),
                                        F.avg('scaled_time_adjusted_rating').alias('average'))
                                   .orderBy('average', 'count', ascending=False))
display(prod_avg_ratings_time_scaled_df)

In [28]:
# Per-user count and mean of the time-adjusted, rescaled rating; highest average first.
user_avg_ratings_time_scaled_df = (scaled_df
                                   .groupBy('userId')
                                   .agg(F.count('scaled_time_adjusted_rating').alias('user_count'),
                                        F.avg('scaled_time_adjusted_rating').alias('user_average'))
                                   .orderBy('user_average', 'user_count', ascending=False))
display(user_avg_ratings_time_scaled_df)

In [29]:
# Split the reviews into training / validation / test sets with a seeded random 70/15/15 split,
# caching each part because several later cells reuse it.
# A chronological split was considered but is not used here:
#   test: after 2014-03-12; validation: 2013-12-12 to 2014-03-12; training: before 2013-12-12.
seed = 100009193
split_70_df, split_a_15_df, split_b_15_df = scaled_df.randomSplit([0.7, 0.15, 0.15], seed)
training_df, validation_df, test_df = [part.cache() for part in (split_70_df, split_a_15_df, split_b_15_df)]

print('Training: {0}, validation: {1}, test: {2}\n'.format(
    *[part.count() for part in (training_df, validation_df, test_df)]))
for part in (training_df, validation_df, test_df):
    part.show(2)

In [30]:
# Baseline recommender: rank training-set products by their mean scaled_time_adjusted_rating,
# keeping only products that received at least the average number of ratings per product.
train_avg_ratings_df = (training_df
                        .groupBy('productId')
                        .agg(F.count('scaled_time_adjusted_rating').alias('count'),
                             F.avg('scaled_time_adjusted_rating').alias('average')))
train_with_avg_or_more_num_rating = (train_avg_ratings_df
                                     .filter(F.col('count') >= avg_num_review_prod)
                                     .orderBy('average', 'count', ascending=False))
train_with_avg_or_more_num_rating.show(3)

In [31]:
# RMSE of a constant baseline: predict, for every review, the mean of the per-product averages
# in train_with_avg_or_more_num_rating. Those averages are on the scaled_time_adjusted_rating
# scale, so that same column is the label; scoring them against raw `rating` would compare
# two different scales.
from pyspark.ml.evaluation import RegressionEvaluator

baseline_label_col = 'scaled_time_adjusted_rating'
pred_avg = train_with_avg_or_more_num_rating.agg(F.avg(train_with_avg_or_more_num_rating.average).alias('avg'))
baseline_value = pred_avg.first()['avg']  # computed once, reused for both splits
if baseline_value is None:
    raise ValueError('no product met the rating-count threshold; the baseline is undefined')

base_reg_eval = RegressionEvaluator(predictionCol="avg", labelCol=baseline_label_col, metricName="rmse")

train_with_avg_pred_df = training_df.withColumn('avg', F.lit(baseline_value))
base_training_RMSE = base_reg_eval.evaluate(train_with_avg_pred_df)
print('The base line RMSE on the training set of {0}'.format(base_training_RMSE))

test_with_avg_pred_df = test_df.withColumn('avg', F.lit(baseline_value))
base_test_RMSE = base_reg_eval.evaluate(test_with_avg_pred_df)
print('The base line RMSE on the test set of {0}'.format(base_test_RMSE))

In [32]:
# Recommendation using the ALS (alternating least squares) algorithm
from pyspark.ml.recommendation import ALS
from pyspark.sql import functions as F

seed = 1800009193
ranks = [4, 8, 12, 16]
# Overwritten by every als_model() call: validation RMSE and fitted model per entry of `ranks`.
# Later cells read `errors` right after each call.
errors = [0] * len(ranks)
models = [0] * len(ranks)


def als_model(training_df, validation_df, rating, num_iter, reg_param=0.1):
    """Grid-search the ALS rank and return the model with the lowest validation RMSE.

    Fits one ALS model per value in the module-level `ranks` list, seeded with the
    module-level `seed`. Each model is evaluated on `validation_df`, and its RMSE and
    model are stored at the same index in the module-level `errors` and `models` lists.

    Args:
        training_df: DataFrame with integer `userId` / `productId` columns and the
            `rating` column.
        validation_df: DataFrame with the same columns; used to select the rank.
        rating: name of the column ALS trains on and RMSE is measured against.
        num_iter: maximum number of ALS iterations.
        reg_param: ALS regularization parameter (default 0.1).

    Returns:
        The fitted ALSModel whose rank gave the lowest validation RMSE
        (first one wins on ties).

    Raises:
        ValueError: if no rank produced a non-NaN RMSE.
    """
    import math

    als = (ALS()
           .setMaxIter(num_iter)
           .setSeed(seed)
           .setRegParam(reg_param)
           .setItemCol('productId')
           .setRatingCol(rating)
           .setUserCol('userId'))
    reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol=rating, metricName="rmse")

    best_idx = None
    for idx, rank in enumerate(ranks):
        model = als.setRank(rank).fit(training_df)
        predict_df = model.transform(validation_df).select("userId", "productId", rating, "prediction")
        # Users/products unseen during training get NaN predictions (SPARK-14489); drop them.
        predicted_ratings_df = predict_df.filter(~F.isnan(predict_df.prediction))
        error = reg_eval.evaluate(predicted_ratings_df)
        errors[idx] = error
        models[idx] = model
        print('For rank %s the RMSE is %s' % (rank, error))
        # Skip NaN explicitly so it can never be picked as "best", and never fall back silently.
        if not math.isnan(error) and (best_idx is None or error < errors[best_idx]):
            best_idx = idx

    if best_idx is None:
        raise ValueError('no rank produced a finite validation RMSE for column %r' % rating)
    print('The best model was trained with rank %s' % ranks[best_idx])
    return models[best_idx]

In [33]:
# Tune ALS on the raw star rating (15 iterations per rank).
my_model = als_model(training_df, validation_df, rating='rating', num_iter=15)

In [34]:
# Validation RMSE per candidate rank from the als_model() run on 'rating'.
model_rank_rmse = spark.createDataFrame(list(zip(ranks, errors)), ["Rank", "RMSE_rating"])
display(model_rank_rmse)

In [35]:
# Score the selected 'rating' model on the held-out test set.
reg_eval = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
predict_df = (my_model
              .transform(test_df)
              .select("userId", "productId", "rating", "prediction"))
# Drop NaN predictions for users/products that never appeared in training.
predicted_test_df = predict_df.where(predict_df.prediction != float('nan'))
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))

In [36]:
# Tune ALS on the helpfulness-adjusted, rescaled rating.
my_model = als_model(training_df, validation_df, rating='scaled_rating', num_iter=15)

In [37]:
# Validation RMSE per candidate rank from the als_model() run on 'scaled_rating'.
model_rank_rmse = spark.createDataFrame(list(zip(ranks, errors)), ["Rank", "RMSE_scaled_rating"])
display(model_rank_rmse)

In [38]:
# Score the selected 'scaled_rating' model on the held-out test set.
reg_eval = RegressionEvaluator(metricName="rmse", labelCol="scaled_rating", predictionCol="prediction")
predict_df = (my_model
              .transform(test_df)
              .select("userId", "productId", "scaled_rating", "prediction"))
# Drop NaN predictions for users/products that never appeared in training.
predicted_test_df = predict_df.where(predict_df.prediction != float('nan'))
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))

In [39]:
# Tune ALS on the time-decayed, rescaled rating.
my_model = als_model(training_df, validation_df, rating='scaled_time_adjusted_rating', num_iter=15)

In [40]:
# Validation RMSE per candidate rank from the als_model() run on 'scaled_time_adjusted_rating'.
model_rank_rmse = spark.createDataFrame(list(zip(ranks, errors)), ["Rank", "RMSE_scaled_time_adjusted_rating"])
display(model_rank_rmse)

In [41]:
# Score the selected 'scaled_time_adjusted_rating' model on the held-out test set.
reg_eval = RegressionEvaluator(metricName="rmse", labelCol="scaled_time_adjusted_rating", predictionCol="prediction")
predict_df = (my_model
              .transform(test_df)
              .select("userId", "productId", "scaled_time_adjusted_rating", "prediction"))
# Drop NaN predictions for users/products that never appeared in training.
predicted_test_df = predict_df.where(predict_df.prediction != float('nan'))
test_RMSE = reg_eval.evaluate(predicted_test_df)

print('The model had a RMSE on the test set of {0}'.format(test_RMSE))

In [42]:
# Top-5 product recommendations for every user who appears in the test set.
# test_df has one row per review, so de-duplicate user ids before joining; otherwise each
# user's recommendation row is repeated once per test review and the count is inflated.
userRecs = my_model.recommendForAllUsers(5)
recommend_prod_df = test_df.select("userId").distinct().join(userRecs, on="userId")
print(recommend_prod_df.count())
display(recommend_prod_df)

In [43]:
# Top-50 user recommendations for every product that appears in the test set.
# test_df has one row per review, so de-duplicate product ids before joining to avoid
# repeating each product's recommendation row once per test review.
prodRecs = my_model.recommendForAllItems(50)
recommend_user_df = test_df.select("productId").distinct().join(prodRecs, on="productId")
display(recommend_user_df)
