Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

In [None]:

import sys
# Print the interpreter version string so the run can be tied to a Python runtime.
print(sys.version)

In [None]:
import matplotlib.pyplot as plt
from datetime import datetime
from dateutil import parser
from pyspark.sql.functions import unix_timestamp

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
# Load the cleaned events table and register the checkpoint directory.
df = spark.read.table("retailaidb.cleaned_dataset")
spark.sparkContext.setCheckpointDir('checkpoint/')

# Quick overview of the loaded data: sample rows, column names, row count.
print("First 5 rows:")
preview_rows = df.head(5)
print(preview_rows)

print("Columns: ")
column_names = df.columns
print(column_names)

print("No. of rows:", df.count())

In [None]:
# Keep only the rows whose category code begins with "electronics".
# The leading 11 characters of category_code ("electronics" is 11 characters
# long) are stored in the helper column 'category_code_new' and matched exactly.
category_prefix = df['category_code'].substr(0, 11)
df = df.withColumn('category_code_new', category_prefix)
df = df.filter(df['category_code_new'] == 'electronics')

In [None]:
# Rank the detailed category codes by number of events and keep the five most
# frequent ones.  The grouping must be on 'category_code': grouping on the
# 'category_code_new' prefix column would leave no 'category_code' column to
# rename, so the rename below would silently do nothing and the
# 'category_code_tmp' column used by the later join would not exist.
top_category = (
    df.groupBy('category_code')
    .count()
    .sort('count', ascending=False)
    .limit(5)  # only keep top 5 categories
)
# Renamed so the join against item_to_save has two distinct key column names.
top_category = top_category.withColumnRenamed("category_code", "category_code_tmp")

In [None]:
# Number of events per (product, category code) pair, most frequent first.
events_per_item = df.groupBy('product_id', "category_code").count()
item_to_save = events_per_item.orderBy('count', ascending=False)

In [None]:
# Keep only the items that belong to one of the top categories and show the
# 20 items with the most events.
# The category-level count is renamed so that 'count' refers unambiguously to
# the per-item count, and the result is re-sorted after the join because a
# join does not guarantee that the ordering of its inputs is kept.
top_categories = top_category.withColumnRenamed('count', 'category_count')
item_to_save = (
    item_to_save
    .join(top_categories, top_categories.category_code_tmp == item_to_save.category_code)
    .orderBy('count', ascending=False)
    .limit(20)
)
item_to_save.show(20, False)

In [None]:
from pyspark.sql.functions import *

# Minimum number of events a product / a user must have to be kept.  The
# counts are taken over all event types, because they are computed before the
# "view" filter further down is applied.
MIN_PRODUCT_EVENTS = 30000
MIN_USER_EVENTS = 200

# Products and users that are active enough.  These frames are only used as
# join filters, so they are deliberately left unsorted.
product_count = df.groupBy('product_id').count().filter(col('count') >= MIN_PRODUCT_EVENTS)
user_count = df.groupBy('user_id').count().filter(col('count') >= MIN_USER_EVENTS)

# A left-semi join keeps the left-hand rows that have a match and adds no
# column from the right-hand side, so no temporary renames are needed and no
# duplicate 'count' columns appear.
raw_df = df.join(product_count, on='product_id', how='left_semi')
raw_df = raw_df.join(user_count, on='user_id', how='left_semi')

# Number of "view" events per (product, user) pair.  groupBy().count() returns
# only the grouping keys and the count, so the remaining columns do not have
# to be dropped explicitly.
df = (
    raw_df
    .where(col('event_type') == "view")
    .groupBy('product_id', 'user_id')
    .count()
)

# spark.write.table("retailaidb.cleaned_dataset_20000filter")

In [None]:
# Persist the aggregated DataFrame as a table, overwriting an existing one.
table_writer = df.write.mode("overwrite")
table_writer.saveAsTable("retailaidb.cleaned_dataset_electronics")

In [None]:
#import the required functions and libraries
from pyspark.sql.functions import *

In [None]:
from pyspark.sql.types import IntegerType
# Cast both id columns to integers (they are used as userCol / itemCol of the
# ALS model below), then display the resulting schema.
for id_column in ("user_id", "product_id"):
    df = df.withColumn(id_column, df[id_column].cast(IntegerType()))
df.printSchema()

In [None]:
# Display the first 10 rows without truncating long values.
df.show(n=10, truncate=False)

In [None]:
# Split the data into a training (75%) and a test (25%) dataset.
# A fixed seed makes the split repeatable for the same input data, so the
# metrics computed later can be compared between runs.
train, test = df.randomSplit([0.75, 0.25], seed=42)

# The counts are printed explicitly: a bare expression in the middle of a
# notebook cell is evaluated but its value is never displayed.
print("Training Count:")
print(train.count())
print("Test Count:")
print(test.count())

In [None]:
#import ALS recommender function from pyspark ml library
from pyspark.ml.recommendation import ALS
# Configure the ALS recommender: implicit-feedback mode, with the per-pair
# view count used as the rating column.
als_params = dict(
    userCol='user_id',
    itemCol='product_id',
    ratingCol='count',
    implicitPrefs=True,
    rank=25,
    maxIter=40,
    regParam=0.20,
    nonnegative=True,
    coldStartStrategy="drop",
)
rec = ALS(**als_params)

# Fit on the training split, then score the held-out test split.
rec_model = rec.fit(train)
predicted_ratings = rec_model.transform(test)

# Show the columns of the scored DataFrame.
predicted_ratings.printSchema()

In [None]:
# Show 10 randomly ordered test rows: observed count next to the prediction.
shuffled_predictions = predicted_ratings.orderBy(rand())
shuffled_predictions.show(10)

In [None]:
# Absolute difference between the prediction and the observed count, stored in
# a new 'err' column.  `abs` is applied to a Column here, so it is expected to
# be the pyspark.sql.functions version brought in by the star import.
abs_error = abs(predicted_ratings["prediction"] - predicted_ratings["count"])
predicted_ratings_witherr = predicted_ratings.withColumn('err', abs_error)
predicted_ratings_witherr.show()

In [None]:
# Distribution of the view counts: number of (product, user) pairs for each
# count value, smallest value first.
# The column is renamed before grouping because groupBy('count').count() would
# produce two columns that are both called 'count', which makes the orderBy
# reference ambiguous.
view_count_histogram = (
    df.withColumnRenamed('count', 'view_count')
    .groupBy('view_count')
    .count()
    .orderBy('view_count', ascending=True)
)
view_count_histogram.show()

In [None]:
# Mean of the 'err' column for each observed count value, smallest count first.
mean_err_by_count = predicted_ratings_witherr.groupBy('count').agg({'err': 'mean'})
mean_err_by_count.orderBy('count', ascending=True).show()

In [None]:
#importing Regression Evaluator to measure RMSE
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Evaluator computing the RMSE between the 'prediction' and 'count' columns.
# NOTE(review): the model above is trained with implicitPrefs=True, so its
# predictions may not be on the same scale as the raw counts - confirm that
# RMSE against 'count' is the intended metric.
evaluator = RegressionEvaluator(
    labelCol='count',
    predictionCol='prediction',
    metricName='rmse',
)

In [None]:
# Apply the regression evaluator to the scored test set to obtain the RMSE,
# then print the value.
rmse=evaluator.evaluate(predicted_ratings)
print(rmse)

In [None]:
# Persist the fitted model, overwriting anything already stored at this path.
model_writer = rec_model.write().overwrite()
model_writer.save("retailai_recommendation_model")

In [None]:
#create function to recommend top 'n' products to any particular user
def top_recommendations(user_id,n):
    """
    Show the top 'n' products that the user has not viewed yet but might like.

    The function works on the notebook-level DataFrame `df` (one row per
    (product_id, user_id) pair with its view count) and on the fitted ALS
    model `rec_model`.  Product ids are used as they appear in `df`; no
    string-indexing step exists in this notebook, so there are no labels to
    map the ids back to.

    Args:
        user_id: id of the active user; converted with int().
        n: number of recommendations to display.

    Returns:
        None. The recommendations are printed with DataFrame.show().
    """
    user_id = int(user_id)

    # every distinct product present in the dataset
    all_products = df.select('product_id').distinct()

    # products the active user has already viewed
    viewed_products = df.filter(df['user_id'] == user_id).select('product_id')

    # anti join: keep only the products without a match among the viewed ones
    remaining_products = all_products.join(viewed_products, on='product_id', how='left_anti')

    # the model scores (user, item) pairs, so attach the active user's id
    remaining_products = remaining_products.withColumn("user_id", lit(user_id))

    # score the candidates and keep the 'n' highest predictions
    recommendations = (
        rec_model.transform(remaining_products)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # display the recommendations for the active user
    return recommendations.show(n, False)

In [None]:
# Show 10 recommendations for one example user.
top_recommendations(user_id=522313122, n=10)

In [None]:
from pyspark.ml.recommendation import ALSModel
# Load the ALS model back from the path it was saved to earlier in the notebook.
model_reload = ALSModel.load("retailai_recommendation_model")


In [None]:

# partly borrowed from https://github.com/akshitvjain/item-based-recommender
from pyspark.sql import Row
from pyspark.sql.functions import col
import numpy as np
from numpy import linalg as LA
from pyspark.ml.recommendation import ALSModel

class ItemBasedRecommender():
    """Item-to-item recommender built on the item factors of a fitted model.

    Items are compared through the cosine similarity of their factor vectors.
    """

    def __init__(self, model, spark):
        """Keep the model, the Spark session and the model's item factors.

        Args:
            model: object exposing an `itemFactors` DataFrame with `id` and
                `features` columns.
            spark: session used to build the result DataFrame.
        """
        self.model = model
        self.spark = spark
        self.itemFactors = self.model.itemFactors

    def compute_similarity(self, item_id):
        """Rank every other item by its cosine similarity to `item_id`.

        Args:
            item_id: value of the `id` column of the reference item.

        Returns:
            DataFrame with the columns `item_index` and `similarity_score`,
            ordered by descending score, with null/NaN rows dropped.  The
            same DataFrame is also stored in `self.similar_items_df`.

        Raises:
            ValueError: if `item_id` is not present in the item factors.
        """
        matches = self.itemFactors.where(
            col('id') == item_id).select('features').take(1)
        if not matches:
            # Explicit message instead of the opaque "RDD is empty" error.
            raise ValueError(
                "item id {!r} not found in the model's item factors".format(item_id))
        item_features = matches[0]['features']

        # The factor rows are streamed to the driver and scored there; the
        # reference item itself is left out of the result.
        scored = [
            (row['id'], float(self._cosine_similarity(row['features'], item_features)))
            for row in self.itemFactors.rdd.toLocalIterator()
            if row['id'] != item_id
        ]

        # An explicit schema keeps this working when `scored` is empty, where
        # schema inference would fail.
        similar_items = self.spark.createDataFrame(
            scored, schema='item_index bigint, similarity_score double')
        self.similar_items_df = similar_items.orderBy(
            col('similarity_score').desc()).na.drop()
        return self.similar_items_df

    def _cosine_similarity(self, vector_1, vector_2):
        """Return the cosine similarity of the two vectors.

        A zero-length vector leads to a division by zero in NumPy, which
        yields NaN; such rows are removed by the `na.drop()` in
        `compute_similarity`.
        """
        v1 = np.asarray(vector_1)
        v2 = np.asarray(vector_2)
        return v1.dot(v2) / (LA.norm(v1) * LA.norm(v2))

# Item id to look up; it has to match a value in the `id` column of the
# model's itemFactors.
# NOTE(review): the ALS model in this notebook is fit on product_id cast to
# int, with no visible re-indexing to 0-based ids, so 100 may not be a valid
# id here - confirm.
test_id = 100
item_based_rec = ItemBasedRecommender(model_reload, spark)
ret_df = item_based_rec.compute_similarity(test_id)
ret_df.show(5)