Import libraries

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, FloatType, StringType, LongType, StructField
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

Create SparkSession

In [2]:
spark = SparkSession.builder.master("local[6]").appName('Book Ratings')\
                            .config('spark.executor.memory', '8g')\
                            .config('spark.driver.memory', '4g')\
                            .getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/19 09:48:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Import Dataset into Spark DF

In [3]:
# Get underlying Spark Context
sc = spark.sparkContext

In [4]:
# define new schema
schema = StructType()\
        .add('bookID', IntegerType(), nullable=False)\
        .add('userID', StringType(), nullable=False)\
        .add('rating', FloatType(), nullable=False)\
        .add('timestamp', LongType(), nullable=False)

In [5]:
# Import data into PySpark DF
books = spark.read.format('csv').schema(schema).load('../data/Books.csv')
books.head()

Row(bookID=1713353, userID='A1C6M8LCIX4M6M', rating=5.0, timestamp=1123804800)

In [6]:
# Verify data types
books.printSchema()

root
 |-- bookID: integer (nullable = true)
 |-- userID: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: long (nullable = true)



The data is returned in the format `(int(bookID), str(userID), float(rating), long(timestamp))`.

To parse into a `PySpark` `Rating` object, it is expected to be in the format `(int(userID), int(bookID), float(rating), long(timestamp))`.

<hr>

### Check for `NoneType` Values

In [7]:
# Check for None values in the DF
none_count = books.filter(
    (col("bookID").isNull()) |
    (col("userID").isNull()) |
    (col("rating").isNull()) |
    (col("timestamp").isNull())
).count()
has_none_values = none_count > 0

# Print the result
if has_none_values:
    print(f"The DF has at least {none_count} rows with a None value.")
else:
    print("The DF does not have any rows with None values.")



The DF has at least 6671993 rows with a None value.


                                                                                

In [8]:
none_df = books.filter(
    (col('bookID').isNull()) |
    (col('userID').isNull()) |
    (col('rating').isNull()) |
    (col('timestamp').isNull())
)
none_df.take(10)

[Row(bookID=None, userID='ASS457AQPDIFZ', rating=5.0, timestamp=1409443200),
 Row(bookID=None, userID='A3NMH1KTLG7CWX', rating=5.0, timestamp=1398816000),
 Row(bookID=None, userID='A2LI5026JCXQBA', rating=4.0, timestamp=1398729600),
 Row(bookID=None, userID='AHNMXYVRDN1R9', rating=5.0, timestamp=1394323200),
 Row(bookID=None, userID='A2CAVTNQA2Y3IJ', rating=5.0, timestamp=1384560000),
 Row(bookID=None, userID='A2685NTFXLJJ1T', rating=5.0, timestamp=1377475200),
 Row(bookID=None, userID='A17TBLPM7H401J', rating=4.0, timestamp=1374364800),
 Row(bookID=None, userID='A1840OJGNFSBSN', rating=5.0, timestamp=900460800),
 Row(bookID=None, userID='A3ONKN7GMHG6K2', rating=5.0, timestamp=1400630400),
 Row(bookID=None, userID='A4LSI6PTX23BE', rating=5.0, timestamp=1400284800)]

The number of rows with missing values represent about 13% of the entire dataset.   
Since there isn't a straightforward way of handling the missing values without affecting the results of the recommendations, the rows will simply be dropped.

In [9]:
# Drop missing values
books = books.dropna()

# Verify count
books.count()

                                                                                

44639628

After dropping rows with missing values, we still have around ~44.6M rows of data left to work with.

<hr>

### Convert Alphanumeric UserID to Int
For use in the collaborative filtering algorithm, the `productID` and `userID` must be of type `int`.  

To convert the alphanumeric UserIDs to unique integer IDs, we can first map each of the unique user IDs to a unique Integer ID, returning a new dictionary object in the k:v format --> `{'alpha_numID': 'new_intID'}`.  

After this, we can parse the lines of data into `Rating` objects for use in Pyspark's collaborative filtering algorithm by using the dict obj to map the originial alphanumeric `userID` to the newly defined numeric IDs -- fulfilling the `int` dtype requirement.

Convert DF to RDD for Mapping UserIDs

In [10]:
# Convert DF to RDD 
books_rdd = books.rdd.map(list)
books_rdd.first()

[1713353, 'A1C6M8LCIX4M6M', 5.0, 1123804800]

In [11]:
# Get all user IDs
user_ids = books_rdd.map(lambda x: x[1])

# Verify first line is user ID
user_ids.first()

'A1C6M8LCIX4M6M'

Get all unique userIDs for mapping.

In [12]:
# Get unique user IDs
user_ids = user_ids.distinct()

# Verify users total 14M+
user_ids.count() # resource intensive line



CodeCache: size=131072Kb used=22104Kb max_used=22106Kb free=108967Kb
 bounds [0x00000001091e0000, 0x000000010a7a0000, 0x00000001111e0000]
 total_blobs=8899 nmethods=7950 adapters=861
 compilation: disabled (not enough contiguous free space left)


                                                                                

14212140

The number of unique users in the dataset is 14M+. Now each unique alphanumeric userID can be mapped to a new unique integer ID.

In [13]:
# map user IDs to unique Int ID -> returns a dictionary
user_ids_mapped = user_ids.zipWithUniqueId().collectAsMap()

                                                                                

In [14]:
# broadcast to all nodes for efficient lookup
broadcasted_dict = sc.broadcast(user_ids_mapped)

In [15]:
# Transform to Mllib Rating object
ratings = books_rdd.map(lambda r: Row(userID=int(broadcasted_dict.value.get(r[1])), 
                                      bookID=int(r[0]), 
                                      rating=float(r[2]), 
                                      timestamp=r[3]))

In [16]:
# Verify correctly formatted Row object
ratings.first()

                                                                                

Row(userID=7, bookID=1713353, rating=5.0, timestamp=1123804800)

The data has now been formatted appropriately for the collaborative filtering algorithm:  
Rows of `Rating(int(user), int(product), float(rating), long(timestamp))`

Convert Back to DF

In [17]:
schema = StructType([
    StructField('userID', IntegerType(), False),
    StructField('bookID', IntegerType(), False),
    StructField('rating', FloatType(), False),
    StructField('timestamp', LongType(), False),
])

In [18]:
# Convert back to DF for more efficient processing
ratings_df = spark.createDataFrame(ratings, schema=schema)

In [19]:
ratings_df.show(10)

                                                                                

+------+-------+------+----------+
|userID| bookID|rating| timestamp|
+------+-------+------+----------+
|     7|1713353|   5.0|1123804800|
|    15|1713353|   5.0|1112140800|
|    12|1713353|   5.0|1081036800|
|     3|1713353|   5.0|1077321600|
|    31|1713353|   5.0|1475452800|
|     5|1713353|   5.0|1469750400|
|     8|1713353|   5.0|1466380800|
|     0|1713353|   5.0|1461456000|
|     6|1713353|   5.0|1455408000|
|    14|1713353|   5.0|1453593600|
+------+-------+------+----------+
only showing top 10 rows



In [20]:
# Confirm dtypes
ratings_df.dtypes

[('userID', 'int'),
 ('bookID', 'int'),
 ('rating', 'float'),
 ('timestamp', 'bigint')]

In [21]:
type(ratings_df)

pyspark.sql.dataframe.DataFrame

Subset Data for Testing with Various Sizes

In [29]:
# Get Fractions for 20M, 10M, 1M and 500K rows
frac_20M, frac_10M, frac_1M, frac_500K, frac_10K = [0.4477, 0.2238, 0.0224, 0.0112, 0.0224]


# Subset data 
books_subset_10K = ratings_df.sample(withReplacement=False, fraction=frac_10K)
books_subset_500K = ratings_df.sample(withReplacement=False, fraction=frac_500K)
books_subset_1M = ratings_df.sample(withReplacement=False, fraction=frac_1M)
books_subset_10M = ratings_df.sample(withReplacement=False, fraction=frac_10M)
books_subset_20M = ratings_df.sample(withReplacement=False, fraction=frac_20M)

### Split data into Train/Test Sets


In [31]:
train, test = books_subset_10K.randomSplit([0.7, 0.3], seed=42)

## Build Recommendation Model using ALS

These params are chosen as a baseline based on benchmarking results [here.](http://mymedialite.net/examples/datasets.html)


In [32]:
# Define model hyperparametes
RANK = 10
MAX_ITER = 15
REG_PARAM = 0.05

# Define columns
COL_USER = "userID"
COL_ITEM = "bookID"
COL_RATING = "rating"
COL_PREDICTION = "prediction"
COL_TIMESTAMP = "timestamp"

In [33]:
# num of recommended books
K = 5

In [34]:
# Build a baseline model with 10K rows only
als = ALS(
    maxIter=MAX_ITER,
    rank=RANK,
    regParam=REG_PARAM,
    userCol=COL_USER,
    itemCol=COL_ITEM,
    ratingCol=COL_RATING,
    coldStartStrategy="drop"
)

model = als.fit(train)

23/11/19 11:02:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/11/19 11:02:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [35]:
# save model
model.save('../models/baseline')

                                                                                

### Model Evaluation

In [None]:
# Load model 
# loaded_model = ALS.load('../models/model1')

In [36]:
# Predict with model
predictions = model.transform(test)

In [37]:
# Evaluate the model
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

                                                                                

In [38]:
print(f"The root mean squared error for the model is: {rmse}")

The root mean squared error for the model is: 4.719012752097297


In [40]:
# Evaluate the model on test data
# predicted_ratings = model.predictAll(test).map(lambda x: ((x[0], x[1]), x[2]))

In [None]:

# print(
#     "RMSE score = {}".format(evaluations.rmse()),
#     "MAE score = {}".format(evaluations.mae()),
#     "R2 score = {}".format(evaluations.rsquared()),
#     "Explained variance score = {}".format(evaluations.exp_var()),
#     sep="\n"
# )

Tuning Model Hyperparameters

In [41]:
# Build generic ALS Model
als2 = ALS(userCol=COL_USER, 
          itemCol=COL_ITEM, 
          ratingCol=COL_RATING,
          coldStartStrategy="drop", 
          nonnegative=True, 
          implicitPrefs=False)

In [42]:
# Define hyperparams for CV
param_grid = ParamGridBuilder()\
             .addGrid(als2.rank, [5, 20, 40, 80])\
             .addGrid(als2.maxIter, [5, 100, 250, 300])\
             .addGrid(als2.regParam, [0.05, 0.1, 1.5])\
             .build()

In [43]:
# Evaluate Model Performance
evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol="rating", 
                                predictionCol="prediction")

In [44]:
# Perform Cross-Validation for best hyperparams
cv = CrossValidator(estimator=als2,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5, 
                    parallelism=4)

In [None]:
# Run the cv on training data
model = cv.fit(train)

In [None]:
# Extract best combinations of values from CV
# best_model = model.bestModel