### Import libraries

In [None]:
from google.colab import drive

# Mount Google Drive so the ratings CSV under "My Drive" can be read by Spark.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
import pandas as pd
!pip install pyspark
from pyspark.sql.functions import col, explode
from pyspark import SparkContext

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=06f596c55e70bf8de2d03770dd438d000dfd6810167e41f1b6d676fbef66f3e1
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


### Initialize the Spark session

In [None]:
from pyspark.sql import SparkSession

# Create the session, or reuse an existing one, so re-running this cell is safe.
spark = SparkSession.builder.appName('Recommendations').getOrCreate()

# Handle to the live SparkContext backing the session. Assigning the
# `SparkContext` class itself would give a class object, not a running context.
sc = spark.sparkContext

# 1. Load data

In [None]:
# Read the ratings CSV from Drive; the first line holds the column names.
# No schema inference is requested, so every column arrives as a string.
products = (
    spark.read
    .option("header", True)
    .csv("/content/drive/My Drive/product_ratings.csv")
)

In [None]:
# Preview the first rows (long text columns are truncated for display)
products.show(n=20, truncate=True)

+--------------+----------+------+----------+-------------+----------------+---------+--------------------+
|        userId| productId|rating| timestamp|mapped_userId|mapped_productId|sentiment|      product_review|
+--------------+----------+------+----------+-------------+----------------+---------+--------------------+
|A2CX7LUOHB2NDG|0321732944|     5|1341100800|            0|               0| positive|Very satisfied wi...|
|A2NWSAGRHCP8N5|0439886341|     1|1367193600|            1|               1| positive|Encountered multi...|
|A2WNBOD3WNDNKT|0439886341|     3|1374451200|            2|               1| negative|The product is de...|
|A1GI0U4ZRJA8WN|0439886341|     1|1334707200|            3|               1| positive|Very disappointed...|
|A1QGNMC6O1VW39|0511189877|     5|1397433600|            4|               2|  neutral|Fantastic quality...|
|A3J3BRHTDRFJ2G|0511189877|     2|1397433600|            5|               2| positive|Not worth the mon...|
|A2TY0BTJOTENPG|0511189877| 

In [None]:
# Inspect column types: the CSV was read without schema inference, so these come back as strings
products.printSchema()

root
 |-- userId: string (nullable = true)
 |-- productId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- mapped_userId: string (nullable = true)
 |-- mapped_productId: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- product_review: string (nullable = true)



In [None]:
# Keep only what ALS needs: integer user/product ids and a numeric rating.
# Casts replace the string columns in place; every other column is dropped.
numeric_casts = {
    'mapped_userId': col('mapped_userId').cast('integer'),
    'mapped_productId': col('mapped_productId').cast('integer'),
    'rating': col('rating').cast('float'),
}
unused_columns = ['timestamp', 'userId', 'productId', 'sentiment', 'product_review']
products = products.withColumns(numeric_casts).drop(*unused_columns)
products.show()

+------+-------------+----------------+
|rating|mapped_userId|mapped_productId|
+------+-------------+----------------+
|   5.0|            0|               0|
|   1.0|            1|               1|
|   3.0|            2|               1|
|   1.0|            3|               1|
|   5.0|            4|               2|
|   2.0|            5|               2|
|   5.0|            6|               2|
|   5.0|            7|               2|
|   5.0|            8|               2|
|   5.0|            9|               2|
|   5.0|           10|               3|
|   1.0|           11|               3|
|   5.0|           12|               3|
|   1.0|           13|               3|
|   4.0|           14|               3|
|   3.0|           15|               3|
|   2.0|           16|               3|
|   2.0|           17|               3|
|   4.0|           18|               3|
|   5.0|           19|               3|
+------+-------------+----------------+
only showing top 20 rows



## Calculate sparsity

In [None]:
# Sparsity = percentage of the user x product matrix that has no rating.

# Count the total number of ratings in the dataset
numerator = products.select("rating").count()

# Count the number of distinct users and distinct products
num_users = products.select("mapped_userId").distinct().count()
num_products = products.select("mapped_productId").distinct().count()

# The denominator is the size of the full user x product matrix
denominator = num_users * num_products
if denominator == 0:
    # Empty input: sparsity is undefined (and the division below would fail)
    raise ValueError("Cannot compute sparsity: the ratings data has no users or products.")

sparsity = (1.0 - numerator / denominator) * 100

# Extra decimals: with this many users and products the matrix is so sparse that
# two decimals round to a misleading "100.00%".
print(f"The ratings dataframe is {sparsity:.4f}% empty.")

The ratings dataframe is  100.00% empty.


## Interpret ratings

In [None]:
# Number of ratings per user, most active users first
userId_ratings = (
    products
    .groupBy("mapped_userId")
    .count()
    .orderBy(col("count").desc())
)
userId_ratings.show()

+-------------+-----+
|mapped_userId|count|
+-------------+-----+
|         2145|  250|
|        10837|  245|
|        38781|  147|
|        10529|   80|
|         7292|   73|
|         8616|   70|
|         7688|   68|
|       113226|   64|
|        10840|   61|
|        31912|   59|
|        12590|   57|
|        12117|   56|
|        17181|   56|
|        27396|   54|
|        30019|   52|
|         7351|   52|
|         8859|   50|
|         9342|   49|
|         9256|   41|
|        15104|   40|
+-------------+-----+
only showing top 20 rows



In [None]:
# Group data by mapped_productId and count ratings per product, most-rated products first
product_ratings = products.groupBy("mapped_productId").count().orderBy('count', ascending=False)
product_ratings.show()

+----------------+-----+
|mapped_productId|count|
+----------------+-----+
|           24438| 5345|
|           14182| 3523|
|           14779| 2608|
|            5129| 2547|
|            2110| 2075|
|           17468| 1978|
|           14179| 1962|
|           14633| 1816|
|           14318| 1735|
|            9387| 1714|
|            3267| 1692|
|            2247| 1586|
|           14300| 1568|
|            9389| 1304|
|            7344| 1296|
|            8103| 1287|
|           15096| 1139|
|           13776| 1134|
|           19326| 1117|
|            7087| 1107|
+----------------+-----+
only showing top 20 rows



## Build Out An ALS Model

In [None]:
# Import the required functions for running the model
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Cap the number of ratings used for training/tuning so the cross-validated grid
# search below stays affordable. Set MAX_RATINGS = None to use the full dataset.
# NOTE(review): limit() keeps the first rows in file order, not a random sample.
# The preview above suggests the CSV is ordered by mapped_productId, so this subset
# is likely biased toward low product ids. Consider a seeded sample instead.
MAX_RATINGS = 10_000
if MAX_RATINGS is not None:
    products = products.limit(MAX_RATINGS)

In [None]:
# One explicit seed for both the split and ALS factor initialisation. If no seed is
# given, PySpark derives the ALS default seed from hash() of the class name. String
# hashing is randomised per Python process, so results would change after every
# kernel restart.
SEED = 1234

# Create test and train set
(train, test) = products.randomSplit([0.8, 0.2], seed=SEED)

# Create ALS model: explicit ratings, non-negative factors, and rows whose user/product
# is unseen in training are dropped from predictions instead of yielding NaN.
als = ALS(
    userCol="mapped_userId",
    itemCol="mapped_productId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=SEED,
)

# Confirm that a model called "als" was created
type(als)

pyspark.ml.recommendation.ALS

In [None]:
# Check that the numeric casts from the cleaning step carried through to the training split
train.printSchema()

root
 |-- rating: float (nullable = true)
 |-- mapped_userId: integer (nullable = true)
 |-- mapped_productId: integer (nullable = true)



## Tell Spark how to tune your ALS model

In [None]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyperparameter grid: every rank x regParam combination is evaluated.
# maxIter is left at the ALS default. To tune it as well, add
# `.addGrid(als.maxIter, [...])` before `.build()`; runtime grows proportionally.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [.01, .05, .1, .15])
    .build()
)

# Score models by RMSE between the true rating and the predicted rating
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Number of parameter combinations the cross validator will try
print("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


## Build your cross validation pipeline

In [None]:
# 5-fold cross validation over every param map in param_grid, scored by `evaluator`.
# Without an explicit seed, the seed that assigns rows to folds is derived from
# hash() of the class name, which varies between Python processes. Pin it (same
# value as the train/test split) so the folds, and the chosen model, are reproducible.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=1234,
)

# Confirm the cross validator was built
print(cv)

CrossValidator_316d761cdc46


## Best Model and Best Model Parameters

In [None]:
# Run the grid search: each param map is fit and scored on every fold, then the
# best-scoring param map is refit on the whole training split.
model = cv.fit(train)

# The refit winner exposed by the cross-validation model
best_model = model.bestModel

In [None]:

# Print best_model
print(type(best_model))

print("**Best Model**")

# Find the winning param map through the public CrossValidatorModel API instead of
# the private `_java_obj` handle (which is unavailable e.g. under Spark Connect).
# avgMetrics[i] is the mean fold metric for getEstimatorParamMaps()[i]. Pick the
# first best index, matching how CrossValidator chooses bestModel.
cv_metrics = model.avgMetrics
pick_best = max if evaluator.isLargerBetter() else min
best_index = pick_best(range(len(cv_metrics)), key=cv_metrics.__getitem__)
best_params = model.getEstimatorParamMaps()[best_index]

# Rank is exposed directly on the fitted ALSModel
print("  Rank:", best_model.rank)

# maxIter is not part of the grid, so fall back to the estimator's value
print("  MaxIter:", best_params.get(als.maxIter, als.getMaxIter()))

# regParam comes from the winning grid point
print("  RegParam:", best_params.get(als.regParam, als.getRegParam()))

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 100
  MaxIter: 10
  RegParam: 0.01


In [None]:
# Score the held-out test split with the best model and report its RMSE.
# coldStartStrategy="drop" (set on the ALS estimator and carried onto the fitted
# model) removes test rows whose user/product was not seen during training.
test_pred = best_model.transform(test)
RMSE = evaluator.evaluate(test_pred)
print(RMSE)

2.5294613217256985


In [None]:
# Side-by-side view of actual vs. predicted ratings for the first test rows
test_pred.show(n=20, truncate=True)

+------+-------------+----------------+----------+
|rating|mapped_userId|mapped_productId|prediction|
+------+-------------+----------------+----------+
|   1.0|         1270|              46|0.96472883|
|   5.0|         7333|            1196|  4.497788|
|   1.0|         9035|            1263| 0.9752621|
|   4.0|         7838|            1218| 2.2418604|
|   5.0|         3017|             297|0.47522694|
|   4.0|         3010|            1262|0.68607277|
|   1.0|         7823|            1301| 1.6133895|
|   5.0|         2227|              59| 1.6761053|
|   5.0|         4277|             252| 1.2059135|
|   5.0|         2612|              64| 2.2413585|
|   5.0|         2138|              62| 2.4554198|
|   4.0|         8009|            1227| 2.0455399|
|   4.0|         7349|            1235|  2.904678|
|   4.0|         4544|             280| 1.3838326|
|   5.0|         7739|            1215| 1.2970033|
|   3.0|         8673|            1256| 3.9187438|
|   5.0|         1360|         

## Make Recommendations

In [None]:
# Top-N product recommendations for every user the model was trained on
num_recommendations = 10
nrec = best_model.recommendForAllUsers(num_recommendations)

# Preview the recommendations of the first 10 users
nrec.limit(10).show()

+-------------+--------------------+
|mapped_userId|     recommendations|
+-------------+--------------------+
|            1|[{273, 3.4106715}...|
|            5|[{2, 1.9997269}, ...|
|            6|[{2, 4.999318}, {...|
|           12|[{3, 4.998765}, {...|
|           13|[{3, 0.999753}, {...|
|           15|[{3, 2.9992588}, ...|
|           16|[{3, 1.999506}, {...|
|           17|[{3, 1.999506}, {...|
|           19|[{3, 4.998765}, {...|
|           20|[{3, 3.999012}, {...|
+-------------+--------------------+



### Check whether the recommendations make sense

In [None]:
# Inspect one user's recommendations. The original `userId` column was dropped
# during cleaning; the ALS user column (and nrec's key) is `mapped_userId`.
# NOTE: recommendForAllUsers only covers users the model was trained on, so an
# empty result means user 100 was not in the training split.
sample_user_recs = (
    nrec
    .filter(col('mapped_userId') == 100)
    # One row per recommended product instead of a truncated array of structs
    .select('mapped_userId', explode('recommendations').alias('rec'))
    .select(
        'mapped_userId',
        col('rec.mapped_productId').alias('mapped_productId'),
        col('rec.rating').alias('predicted_rating'),
    )
)
sample_user_recs.show()