In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [3]:
sc = SparkContext()

In [4]:
spark = SparkSession(sc)

### Read the dataset

In [5]:
import pandas as pd
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [6]:
df = pd.read_csv('Files/Review.csv', index_col=0)
df.head()

Unnamed: 0,customer_id,product_id,customer_rating
0,709310,10001012,3
1,10701688,10001012,5
2,11763074,10001012,5
3,9909549,10001012,5
4,1827148,10001012,5


In [7]:
df_schema = StructType([
                        StructField("customer_id", StringType(), True),
                        StructField("product_id", StringType(), True),
                        StructField("customer_rating", IntegerType(), True),
                       ])
data = spark.createDataFrame(df, schema=df_schema)

In [8]:
data.show(5)

+-----------+----------+---------------+
|customer_id|product_id|customer_rating|
+-----------+----------+---------------+
|     709310|  10001012|              3|
|   10701688|  10001012|              5|
|   11763074|  10001012|              5|
|    9909549|  10001012|              5|
|    1827148|  10001012|              5|
+-----------+----------+---------------+
only showing top 5 rows



In [9]:
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import isnan, when, count, col

In [10]:
data.show(5, truncate=True)

+-----------+----------+---------------+
|customer_id|product_id|customer_rating|
+-----------+----------+---------------+
|     709310|  10001012|              3|
|   10701688|  10001012|              5|
|   11763074|  10001012|              5|
|    9909549|  10001012|              5|
|    1827148|  10001012|              5|
+-----------+----------+---------------+
only showing top 5 rows



In [11]:
data.select([count(when(col(c).isNull(), c)).alias(c) for c in
                data.columns]).toPandas()

Unnamed: 0,customer_id,product_id,customer_rating
0,0,0,0


- No null data

In [12]:
# Check duplicates
dup_rows = data.count() - data.distinct().count()
dup_rows

1539

In [13]:
# Drop duplicates
data = data.drop_duplicates()

- No duplicates 

In [14]:
data.select('customer_rating').describe().show()

+-------+------------------+
|summary|   customer_rating|
+-------+------------------+
|  count|            360211|
|   mean|4.4734752686619785|
| stddev| 1.018069608037956|
|    min|                 1|
|    max|                 5|
+-------+------------------+



In [15]:
# Distinct users and products
users = data.select('customer_id').distinct().count()
products = data.select('product_id').distinct().count()
numerator = data.count()

In [16]:
display(numerator, users, products)

360211

251491

4218

In [17]:
# Number of ratings matrix could contain if no empty cells
denominator = users * products
denominator

1060789038

In [18]:
#Calculating sparsity
sparsity = 1 - (numerator*1.0 / denominator)
print ("Sparsity: "), sparsity

Sparsity: 


(None, 0.9996604310686702)

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [20]:
# Converting String to index
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [21]:
# Create an indexer
indexer = StringIndexer(inputCol='product_id',
                        outputCol='product_id_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(data)

# Indexer creates a new column with numeric index values
data_indexed = indexer_model.transform(data)

# Repeat the process for the other categorical feature
indexer1 = StringIndexer(inputCol='customer_id',
                        outputCol='customer_id_idx')
indexer1_model = indexer1.fit(data_indexed)
data_indexed = indexer1_model.transform(data_indexed)

In [22]:
data_indexed.show(5, truncate=True)

+-----------+----------+---------------+--------------+---------------+
|customer_id|product_id|customer_rating|product_id_idx|customer_id_idx|
+-----------+----------+---------------+--------------+---------------+
|    7248606|  10001353|              5|        2041.0|         3028.0|
|   15191237|  10001355|              5|        1865.0|        33222.0|
|   13146900|  10001382|              5|         243.0|        29444.0|
|   15721783|  10001384|              2|         390.0|       120112.0|
|    6185696|  10062880|              5|         520.0|       204613.0|
+-----------+----------+---------------+--------------+---------------+
only showing top 5 rows



In [23]:
data_indexed.select([count(when(col(c).isNull(), c)).alias(c) for c in
                    data_indexed.columns]).toPandas().T

Unnamed: 0,0
customer_id,0
product_id,0
customer_rating,0
product_id_idx,0
customer_id_idx,0


- No null data

### Train-test split

In [24]:
(training, test) = data_indexed.randomSplit([0.8, 0.2])

### Build model

In [25]:
# Creating ALS model and fitting data
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [26]:
als = ALS(maxIter=5,
          regParam=0.09,
          rank=25,
          userCol='customer_id_idx',
          itemCol='product_id_idx',
          ratingCol='customer_rating',
          coldStartStrategy='drop',
          nonnegative=True)
model = als.fit(training)

### Testing

In [27]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

In [28]:
predictions.select(["product_id_idx", "customer_id_idx",
                    "customer_rating", "prediction",]).show(5)

+--------------+---------------+---------------+----------+
|product_id_idx|customer_id_idx|customer_rating|prediction|
+--------------+---------------+---------------+----------+
|        1280.0|           12.0|              5| 6.2430754|
|          79.0|           12.0|              5|  4.149614|
|          31.0|           12.0|              5| 4.5580273|
|         798.0|           12.0|              5| 5.5005198|
|         661.0|           12.0|              5| 5.2824526|
+--------------+---------------+---------------+----------+
only showing top 5 rows



In [29]:
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="customer_rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 2.137468867385508


### Tuning parameters

In [30]:
als_t = ALS(userCol='customer_id_idx', itemCol='product_id_idx', ratingCol='customer_rating', coldStartStrategy="drop", nonnegative=True)

In [31]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# define the grid of hyperparameters
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [50,75,100])\
    .addGrid(als_t.maxIter, [5, 10, 20]) \
    .addGrid(als_t.regParam, [0.1]) \
    .build()

In [32]:
# define the cross-validator with number of folds and parallelism
crossval = CrossValidator(estimator=als_t,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

In [33]:
# fit the cross-validator to the training data
cvModel = crossval.fit(training)

In [34]:
# best ALS model from the cross-validator
bestModel = cvModel.bestModel

In [35]:
# Evaluate the model by computing the RMSE on the test data
predictions_t = bestModel.transform(test)

In [36]:
rmse_t = evaluator.evaluate(predictions_t)
print('Root-mean-square-error = ', str(rmse_t))

Root-mean-square-error =  1.2766821563392612


- Select bestModel because rmse of bestModel is 1.27, it's less than model's rmse ~2.14

### Save the model

In [37]:
bestModel.save('RecommendationSystem_ALS')

### Recommend for specific customers

In [38]:
def recommend_product(customer_id):
    # Get 10 recommendations which have the highest rating
    customer_recs = bestModel.recommendForAllUsers(10)

    # create a df of distinct 'customer_id_idx' & 'customer_id'
    df_customer_customer_id = data_indexed.select('customer_id_idx', 'customer_id').distinct()

    # join with customer_recs
    new_customer_recs = customer_recs.join(df_customer_customer_id, on='customer_id_idx', how='left')

    # create a df of distinct 'product_id_idx' & 'product_id'
    df_product_product_idx = data_indexed.select('product_id_idx', 'product_id').distinct()

    # Recommendation for customer_id
    find_customer_rec = new_customer_recs.filter(new_customer_recs['customer_id'] == int(customer_id))
    customer = find_customer_rec.first()
    lst = []
    for row in customer['recommendations']:
        row_f = df_product_product_idx.filter(df_product_product_idx.product_id_idx == row['product_id_idx'])
        row_f_first = row_f.first()
        lst.append((row['product_id_idx'], row_f_first['product_id'], row['rating']))
    df_rec = spark.createDataFrame(lst, ['product_id_idx', 'product_id', 'rating'])

    # join with product name
    df_product = spark.read.csv('Files/Product.csv', header=True, inferSchema=True)
    df_joined = df_rec.join(df_product, on='product_id')
    df_joined = df_joined.select('product_id', 'product_name', 'rating')

    # join with customer id
    customer_id_idx = df_customer_customer_id.filter(df_customer_customer_id.customer_id == int(customer_id)).select('customer_id_idx').collect()[0][0]
    df_final = df_joined.withColumn('customer_id', lit(customer_id_idx))
    return df_final

In [39]:
df_top10 = recommend_product('5145760')
df_top10.show()

+----------+--------------------+-----------------+-----------+
|product_id|        product_name|           rating|customer_id|
+----------+--------------------+-----------------+-----------+
|  66251373|Flycam Bugs 20 EI...|6.492493629455566|     8620.0|
|  52699359|Máy Giặt Cửa Trên...|6.004734039306641|     8620.0|
|  56399103|Amply Bluetooth K...|5.947179794311523|     8620.0|
|  39720996|Loa đứng karaoke ...|5.848761081695557|     8620.0|
|  29248122|Máy sấy Electrolu...|5.846507549285889|     8620.0|
|  73831096|Laptop Dell Vostr...|5.789547920227051|     8620.0|
|   2383179|Máy Ảnh Lấy Liền ...|5.706010818481445|     8620.0|
|  50592905|Smart Tivi QLED S...|5.665002822875977|     8620.0|
|   9687395|Thẻ Nhớ Lexar SDX...|5.630174160003662|     8620.0|
|   4597127|Giá Treo Tivi Sát...| 5.56638765335083|     8620.0|
+----------+--------------------+-----------------+-----------+



### Save the dataframe

In [40]:
from pyspark.sql.functions import *

def recommend_products_all():
    # Get 10 recommendations which have the highest rating
    customer_recs = bestModel.recommendForAllUsers(10)

    # create a df of distinct 'customer_id_idx' & 'customer_id'
    df_customer_customer_id = data_indexed.select('customer_id_idx', 'customer_id').distinct()

    # join with customer_recs
    new_customer_recs = customer_recs.join(df_customer_customer_id, on='customer_id_idx', how='left')

    # create a df of distinct 'product_id_idx' & 'product_id'
    df_product = spark.read.csv('Files/Product_image.csv', header=True, inferSchema=True)
    df_product_indexed = data_indexed.select('product_id', 'product_id_idx').distinct().join(df_product, on='product_id', how='left')

    # join with product name
    df_joined = new_customer_recs.withColumn('recommendation', explode('recommendations')) \
        .selectExpr('customer_id', 'recommendation.product_id_idx as product_id_idx', 'recommendation.rating as rating')
    df_joined = df_joined.join(df_product_indexed, on='product_id_idx').select('customer_id', 'product_id', 'product_name', 'rating', 'image')
    

    return df_joined

In [42]:
# call the function and store the result in a variable
recommendations = recommend_products_all()

# save the DataFrame as parquet
recommendations.write.parquet('Files/ALS_recommendation')