In [None]:
# Mount Google Drive inside the Colab VM at /content/gdrive so that later
# cells can change into, and read files from, the Drive folder.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
%cd gdrive/My Drive/Colab Notebooks

/content/gdrive/My Drive/Colab Notebooks


In [None]:
! git clone https://github.com/Microsoft/Recommenders

Cloning into 'Recommenders'...
remote: Enumerating objects: 121, done.[K
remote: Counting objects: 100% (121/121), done.[K
remote: Compressing objects: 100% (93/93), done.[K
remote: Total 24400 (delta 62), reused 55 (delta 28), pack-reused 24279[K
Receiving objects: 100% (24400/24400), 196.53 MiB | 14.17 MiB/s, done.
Resolving deltas: 100% (15866/15866), done.
Checking out files: 100% (358/358), done.


In [None]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 63kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 40.9MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=332a73e4599671d2bbace46bbe9efdfbd0ba4b5957311d24b91dde74cef11ec2
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
# Make the cloned Recommenders checkout the working directory; relative paths
# used by later cells (e.g. 'regular.csv') are resolved against this folder.
import os
os.chdir('/content/gdrive/My Drive/Colab Notebooks/Recommenders')

In [None]:
import sys
sys.path.append("../../")
import pyspark
from pyspark.ml.recommendation import ALS
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType
from reco_utils.common.timer import Timer
from reco_utils.dataset import movielens
from reco_utils.common.notebook_utils import is_jupyter
from reco_utils.dataset.spark_splitters import spark_random_split
from reco_utils.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from reco_utils.common.spark_utils import start_or_get_spark
from pyspark.sql.types import IntegerType
# Record the interpreter and Spark versions alongside the results.
print("System version: {}".format(sys.version))
print("Spark version: {}".format(pyspark.__version__))

System version: 3.6.9 (default, Oct  8 2020, 12:12:24) 
[GCC 8.4.0]
Spark version: 3.0.1


Set the default parameters

In [None]:
# top k items to recommend
# NOTE(review): TOP_K is not referenced by any cell visible in this part of the
# notebook — confirm it is used further down, or remove it.
TOP_K = 10


**0. Set up Spark context**

In [None]:
# Obtain the Spark session through the reco_utils helper, passing the app name
# "ALS PySpark" and a memory setting of 16g.
# NOTE(review): confirm the Colab VM actually has 16g available for Spark.
spark = start_or_get_spark("ALS PySpark", memory="16g")

1. Load the dataset

In [None]:
# Read the raw CSV (first row is the header). No schema is supplied or
# inferred, so every column is loaded as a string.
data = spark.read.csv('regular.csv', header='true')

# Keep just the interaction columns and preview them.
df = data.select("Customer_ID", "ProductID", "Rating", "Order_Date")
df.show()

+-----------+---------+------+----------+
|Customer_ID|ProductID|Rating|Order_Date|
+-----------+---------+------+----------+
|          8|        1|   4.0|2014-01-01|
|          9|        1|   4.5|2014-01-01|
|         12|        1|   4.0|2014-01-01|
|         20|        1|   4.0|2014-01-01|
|         24|        1|   4.0|2014-01-01|
|         27|        1|   3.5|2014-01-01|
|         34|        1|   3.0|2014-01-02|
|         37|        1|   3.5|2014-01-02|
|         40|        1|   4.0|2014-01-02|
|         41|        1|   3.5|2014-01-02|
|         44|        1|   5.0|2014-01-02|
|         47|        1|   5.0|2014-01-02|
|         51|        1|   5.0|2014-01-02|
|         55|        1|   5.0|2014-01-02|
|         56|        1|   5.0|2014-01-02|
|         59|        1|   1.0|2014-01-02|
|         63|        1|   3.5|2014-01-02|
|         67|        1|   5.0|2014-01-02|
|         68|        1|   4.5|2014-01-03|
|         75|        1|   4.0|2014-01-03|
+-----------+---------+------+----

2. Split the data using the Spark random splitter provided in utilities

In [None]:
# Split the projected interaction frame `df` (user, item, rating, date) rather
# than the raw `data` frame: the model and evaluation only need these columns,
# and caching the full raw frame (all columns, all strings) wastes memory and
# makes every downstream .show() unreadably wide.
# 75% of rows go to train, the rest to test; the seed keeps the split repeatable.
train, test = spark_random_split(df, ratio=0.75, seed=123)

# cache() + count() materialises each split once so later cells reuse it.
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 5822
N test 2031


3. Train the ALS model on the training data, and get the top-k recommendations for our testing data

In [None]:
# Column-name mapping shared by the ALS estimator: which columns hold the
# user id, the item id and the rating.
header = dict(
    userCol="Customer_ID",
    itemCol="ProductID",
    ratingCol="Rating",
)

# Explicit-feedback ALS estimator.
als = ALS(
    **header,
    rank=10,                   # number of latent factors
    maxIter=15,                # ALS iterations
    regParam=0.05,             # regularisation strength
    implicitPrefs=False,       # treat ratings as explicit feedback
    nonnegative=False,         # no non-negativity constraint on the factors
    coldStartStrategy='drop',  # value passed through to ALS unchanged
    seed=42,                   # fixed seed for reproducible factor initialisation
)

In [None]:

# The CSV was read without a schema, so every column is a string; ALS needs
# numeric user/item ids and a numeric rating.
# Ratings are half-star values (e.g. 3.5, 4.5), so they must be cast to a
# floating-point type — an integer cast would silently truncate 4.5 to 4.
train = (
    train
    .withColumn("Customer_ID", F.col("Customer_ID").cast(IntegerType()))
    .withColumn("ProductID", F.col("ProductID").cast(IntegerType()))
    .withColumn("Rating", F.col("Rating").cast(FloatType()))
)

In [None]:
# Fit the ALS estimator on the training split, timing the fit with the
# reco_utils Timer context manager.
with Timer() as train_time:
    model = als.fit(train)

# Timer exposes the elapsed time as `.interval`.
print("Took {} seconds for training.".format(train_time.interval))

Took 15.512276572000019 seconds for training.


In [None]:
# Score every (user, item) combination seen in training, then drop the pairs
# the user has already rated, leaving predictions for unseen items only.
with Timer() as test_time:

    # Get the cross join of all user-item pairs and score them.
    users = train.select('Customer_ID').distinct()
    items = train.select('ProductID').distinct()
    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    # Remove seen items.
    # Outer-join predictions with the training rows on (user, item); the
    # "pred"/"train" aliases let the two sides' same-named columns be told apart.
    dfs_pred_exclude_train = dfs_pred.alias("pred").join(
        train.alias("train"),
        (dfs_pred['Customer_ID'] == train['Customer_ID']) & (dfs_pred['ProductID'] == train['ProductID']),
        how='outer'
    )

    # Rows whose train-side Rating is null had no match in train, i.e. the user
    # has not rated that item; keep only the prediction-side columns for them.
    top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train.Rating"].isNull()) \
        .select('pred.' + 'Customer_ID', 'pred.' + 'ProductID', 'pred.' + "prediction")

    # In Spark, transformations are lazily evaluated.
    # Use an action to force execution so the measured time includes the work.
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

Took 37.29452175999995 seconds for prediction.


In [None]:
# Preview the predicted ratings for user-item pairs not present in train.
top_all.show()

+-----------+---------+----------+
|Customer_ID|ProductID|prediction|
+-----------+---------+----------+
|          2|       80| 3.7612119|
|          8|       52| 2.9393995|
|         15|       14| 1.7130064|
|         15|       26|  2.291733|
|         18|       68| 4.3159866|
|         18|       95| 3.5058136|
|         20|      121| 3.1371145|
|         27|       65| 1.7354449|
|         28|       16| 2.6154988|
|         28|       63| 2.4449253|
|         36|       83|0.92783904|
|         41|      103| 2.6113977|
|         47|       24| 3.2844334|
|         47|       32| 4.0610013|
|         52|       58| 2.5323274|
|         53|      103| 2.9531116|
|         56|       26| 4.2257004|
|         56|       30| 3.8704646|
|         63|       81| 2.1452148|
|         64|       48| 1.6869694|
+-----------+---------+----------+
only showing top 20 rows



In [None]:
# The test split still has the string-typed columns from the CSV; give the ids
# and rating numeric types before scoring.
# Ratings are half-star values (e.g. 3.5, 4.5), so cast them to a
# floating-point type — an integer cast would truncate the ground truth
# (4.5 -> 4) and distort RMSE/MAE.
test = (
    test
    .withColumn("Customer_ID", F.col("Customer_ID").cast(IntegerType()))
    .withColumn("ProductID", F.col("ProductID").cast(IntegerType()))
    .withColumn("Rating", F.col("Rating").cast(FloatType()))
)

In [None]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()

+------+---------------+----------+----------+--------------+-----------+-----------------+-----------+----------------+-----------------+--------------------+-----------+------+--------------+---------+---------------+------------+--------------------+-----------------+--------+--------+------------------+-------------+--------------+------+-------+---------+------------------+------+----------+----------+----------+--------+-------+----------+
|Row_ID|       Order_ID|Order_Date| Ship_Date|     Ship_Mode|Customer_ID|    Customer_Name|    Segment|            City|            State|             Country|Postal_Code|Market|        Region|ProductID|       Category|Sub-Category|         ProductName|            Sales|Quantity|Discount|            Profit|Shipping_Cost|Order_Priority|Rating|Recency|Frequency|          Monetary|  Rank|R_Quartile|F_Quartile|M_Quartile|RFMScore|   Type|prediction|
+------+---------------+----------+----------+--------------+-----------+-----------------+---------

In [None]:
# Compare the true test ratings with the model's predictions.
rating_eval = SparkRatingEvaluation(
    test,
    prediction,
    col_user="Customer_ID",
    col_item="ProductID",
    col_rating="Rating",
    col_prediction="prediction",
)

# One metric per line: RMSE, MAE, explained variance and R squared.
print("\n".join([
    "Model:\tALS rating prediction",
    "RMSE:\t%f" % rating_eval.rmse(),
    "MAE:\t%f" % rating_eval.mae(),
    "Explained variance:\t%f" % rating_eval.exp_var(),
    "R squared:\t%f" % rating_eval.rsquared(),
]))

Model:	ALS rating prediction
RMSE:	1.382080
MAE:	1.105362
Explained variance:	-0.267233
R squared:	-0.779769
