In [1]:
# install pyspark if needed
# !pip install pyspark

In [None]:
# lit, sum, when, count and row were only used to check values,
# consider dropping when cleaning up the notebook.
import pandas as pd

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, count, isnan, lit, sum, when

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [None]:
# Create (or reuse) the Spark session for this notebook.
# The driver memory setting is raised to 8g to prevent memory issues.
spark = (
    SparkSession.builder
    .config('spark.driver.memory', '8g')
    .appName('rec')
    .getOrCreate()
)

In [None]:
# print the current working directory of the notebook's shell
!pwd

/content


In [None]:
# Read the previously saved, cleaned dataset back in.
# No header/schema options are passed, so Spark's defaults apply.
transformed = spark.read.csv(
    path='/content/drive/My Drive/Colab Notebooks/data/transformed_data.csv'
)

In [None]:
# Load the original tab-separated reviews file (header row, inferred schema).
# For regular work, use the cleaned version stored in `transformed` instead.
data = spark.read.csv(
    path='/content/drive/My Drive/Colab Notebooks/data/amazon_reviews_us_Digital_Music_Purchase_v1_00.tsv',
    sep='\t',
    header=True,
    inferSchema=True,
)

In [None]:
# *Not sure why one would load the DataFrame this way, but this is another way to load the data.*
# line = spark.read.option('header','true').text('amazon_reviews_us_Digital_Music_Purchase_v1_00.tsv').rdd

# parts = line.map(lambda row: row.value.split("\t"))
# ratingsRDD = parts.map(lambda p: Row(customer_id=int(p[1]), musicId=int(p[5]),
#                                      rating=float(p[7])))

# ratings = spark.createDataFrame(ratingsRDD)

In [None]:
# *These lines were used to create the transformed dataset; use them when transformed_data.csv is missing.*
# data.show(10, truncate=True)

# rec_data = data.select(data['customer_id'], data['product_title'], data['star_rating'])
# rec_data.show(10, truncate=True)

# *indexing user and product because ALS model only takes numerical values*
# ***IMPORTANT***: issue with indexer, it generates different indices for the same customer id
# indexer = [StringIndexer(inputCol=column, outputCol=column+'_index')
#             for column in list(set(rec_data.columns)-set(['star_rating']))]

# pipeline = Pipeline(stages=indexer)
# transformed = pipeline.fit(rec_data).transform(rec_data)

In [None]:
# preview the first 10 rows, truncating long cell values
transformed.show(n=10, truncate=True)

+-----------+--------------------+-----------+-------------------+-----------------+
|customer_id|       product_title|star_rating|product_title_index|customer_id_index|
+-----------+--------------------+-----------+-------------------+-----------------+
|   10293599|Knockin' On Heave...|          5|              82560|             1397|
|    6087195|            Flawless|          5|             777773|             1392|
|   33717063|    Scandal of Grace|          4|             106759|             6787|
|   14948020|  I Surrender (Live)|          5|              10969|             3363|
|   21694522|We Are Young (Jer...|          4|               7349|            47101|
|   14948020|        Lord, I Live|          5|              10969|           339136|
|   20031966|  I'll Never Be Free|          5|               5671|            86772|
|   12223745|    All of the Stars|          5|             282897|             5076|
|   30231876|Exile On Main Str...|          5|             510931

In [None]:
# Restore the expected column names after loading (the CSV reader call above
# passes no header option, so the loaded names cannot be relied on).
# toDF renames every column positionally in one step. This avoids the name
# collisions that chained withColumnRenamed calls can hit when an old name
# equals one of the new names, and the explicit length check replaces the
# IndexError / silent partial rename of the index-based loop.
newnames = ['customer_id', 'product_title', 'star_rating', 'product_title_index', 'customer_id_index']
oldnames = transformed.schema.names

if len(oldnames) != len(newnames):
    raise ValueError(
        'expected {} columns but found {}: {}'.format(len(newnames), len(oldnames), oldnames)
    )

transformed = transformed.toDF(*newnames)

In [None]:
# list the current column names to confirm the rename
transformed.columns

['customer_id',
 'product_title',
 'star_rating',
 'product_title_index',
 'customer_id_index']

In [None]:
# Cast the id, index and rating columns to int in case their dtype is causing
# issues when fitting the model.
for int_column in ('customer_id_index', 'product_title_index', 'star_rating', 'customer_id'):
    transformed = transformed.withColumn(int_column, transformed[int_column].cast('int'))

In [None]:
# inspect the (column name, dtype) pairs to verify the casts
transformed.dtypes

[('customer_id', 'int'),
 ('product_title', 'string'),
 ('star_rating', 'int'),
 ('product_title_index', 'int'),
 ('customer_id_index', 'int')]

In [None]:
# Count NaN values per column (one output column per input column).
transformed.select(
    *(count(when(isnan(name), name)).alias(name) for name in transformed.columns)
).show()

+-----------+-------------+-----------+-------------------+-----------------+
|customer_id|product_title|star_rating|product_title_index|customer_id_index|
+-----------+-------------+-----------+-------------------+-----------------+
|          0|            0|          0|                  0|                0|
+-----------+-------------+-----------+-------------------+-----------------+



In [None]:
# IMPORTANT: there were three null values in the data, so check with BOTH
# isNull and isnan - isnan alone does not count nulls.
transformed.select(
    *(
        count(when(isnan(name) | col(name).isNull(), name)).alias(name)
        for name in transformed.columns
    )
).show()

+-----------+-------------+-----------+-------------------+-----------------+
|customer_id|product_title|star_rating|product_title_index|customer_id_index|
+-----------+-------------+-----------+-------------------+-----------------+
|          0|            0|          0|                  0|                0|
+-----------+-------------+-----------+-------------------+-----------------+



In [None]:
# Remove every row that contains a null value.
transformed = transformed.na.drop()

In [None]:
# Save the final dataset as CSV, replacing any previous export at this path.
# DataFrameWriter has no `overwrite` attribute; the save mode is selected
# with mode('overwrite') before calling csv().
transformed.write.mode('overwrite').csv('transformed_data.csv')

In [None]:
# First of two splits: set aside 20% as a holdout set for later testing.
# A fixed seed keeps the split reproducible across kernel restarts (88 matches
# the seed used in the cross-validation grid).
(work_set, holdout) = transformed.randomSplit([0.8, 0.2], seed=88)

In [None]:
# Split the working set 80/20 into training and test data.
# The seed is fixed so the same rows land in each split on every run.
(training, test) = work_set.randomSplit([0.8, 0.2], seed=88)

In [None]:
# Parameters for the baseline ALS model.
# coldStartStrategy='drop' removes rows whose prediction would be NaN (users or
# items not seen during fitting) so the evaluation metrics stay defined.
# The seed is fixed (same value as the cross-validation grid) so that refitting
# the baseline gives repeatable factors.
baseline_als = ALS(maxIter=5, regParam=0.01,
                   userCol='customer_id', itemCol='product_title_index',
                   ratingCol='star_rating', coldStartStrategy='drop',
                   nonnegative=True, seed=88)

In [None]:
# fit the baseline ALS estimator on the training split
baseline_model = baseline_als.fit(training)

In [None]:
# Save the baseline model.
# save() raises if the target directory already exists, which breaks a re-run
# of the notebook; write().overwrite() replaces the earlier copy instead.
baseline_model.write().overwrite().save('baseline_model_v2')

In [None]:
# score the test split with the baseline model (adds a 'prediction' column)
baseline_predictions = baseline_model.transform(test)

In [None]:
# score the holdout split with the same baseline model
baseline_holdoutpred = baseline_model.transform(holdout)

In [None]:
# use ALSModel not ALS to load persisted model
from pyspark.ml.recommendation import ALSModel

In [None]:
# Load a previously persisted ALS model from Drive.
# NOTE(review): this path ends in 'baseline_model', not the 'baseline_model_v2'
# directory written by the save cell in this notebook - confirm which model is intended.
path = '/content/drive/My Drive/Colab Notebooks/data/baseline_model'
persistedModel = ALSModel.load(path)

In [None]:
# score the test split with the model loaded from disk
baseline_predictions2 = persistedModel.transform(test)

In [None]:
# list the columns of the scored test set
baseline_predictions.columns

['customer_id',
 'product_title',
 'star_rating',
 'product_title_index',
 'customer_id_index',
 'prediction']

In [None]:
# show a sample of the distinct predicted values (show() prints the first 20 rows)
baseline_predictions.select('prediction').distinct().show()

+----------+
|prediction|
+----------+
|  4.777439|
|  4.665684|
| 4.9995794|
| 4.9250536|
| 4.5987654|
|  4.498161|
| 4.7985516|
| 3.9973154|
| 2.6655726|
| 3.9982824|
|  4.198654|
|  4.999115|
|  4.998976|
| 4.9985247|
|  4.998269|
| 4.7988133|
| 4.8867493|
|  4.999251|
|  4.998774|
|  4.999388|
+----------+
only showing top 20 rows



In [None]:
# RMSE evaluator: compares the 'prediction' column against 'star_rating'.
evaluator = RegressionEvaluator(
    labelCol='star_rating',
    predictionCol='prediction',
    metricName='rmse',
)

In [None]:
map_eval = RegressionEvaluator(metricName='map', labelCol='star_rating', predictionCol='prediction')

In [None]:
rmse = evaluator.evaluate(baseline_predictions)

In [None]:
rmse_holdout = evaluator.evaluate(baseline_holdoutpred)

In [None]:
map_score = evaluator.evaluate(baseline_predictions)

In [None]:
# display the test-split score stored in rmse
rmse

0.8038922504895376

In [None]:
# display the holdout-split score stored in rmse_holdout
rmse_holdout

0.8020742127639935

In [None]:
# display the value stored in map_score
map_score

0.8038922504895375

Baseline v2 has an RMSE of 0.8038922504895376 on the test set after fixing the customer ID index issue.

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Base ALS estimator used for cross-validation; only the column mapping and the
# cold-start handling are fixed here.
als_cv_model = ALS(
    userCol='customer_id',
    itemCol='product_title_index',
    ratingCol='star_rating',
    coldStartStrategy='drop',
)

In [None]:
# Hyperparameter grid searched by the cross-validator.
# alpha is deliberately not part of the grid: it only affects the
# implicit-feedback variant of ALS, and als_cv_model leaves implicitPrefs at its
# default (explicit feedback), so varying alpha would double the number of fits
# without changing any result.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als_cv_model.rank, [5, 15])
    .addGrid(als_cv_model.maxIter, [5, 15])
    .addGrid(als_cv_model.regParam, [0.1, 0.01])
    .addGrid(als_cv_model.nonnegative, [True, False])
    .addGrid(als_cv_model.seed, [88])  # single value: fixes the ALS seed for every fit
    .build()
)

In [None]:
# 5-fold cross-validator over the ALS grid, selecting by the RMSE evaluator.
# The seed fixes how rows are assigned to folds so repeated runs compare the
# same folds (88 matches the ALS seed used in the grid).
cv = CrossValidator(estimator=als_cv_model,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=88)

In [None]:
# run the cross-validated grid search on the training split (one fit per grid point per fold)
cv_fitted = cv.fit(training)

In [None]:
# keep the model selected by cross-validation
best_cv_model = cv_fitted.bestModel

In [None]:
# score the test split with the best cross-validated model
best_cv_pred = best_cv_model.transform(test)

In [None]:
# RMSE of the best cross-validated model on the test split
best_cv_rmse = evaluator.evaluate(best_cv_pred)

In [None]:
# display the test-split RMSE of the best cross-validated model
best_cv_rmse

0.794830886083656

In [None]:
# Persist the best cross-validated model.
# save() raises if the target directory already exists, which breaks a re-run
# of the notebook; write().overwrite() replaces the earlier copy instead.
best_cv_model.write().overwrite().save('best_cv_model')

In [None]:
# print the current working directory (relative save paths above resolve against it)
!pwd

/content
