In [1]:
import ast
import json
from pprint import pprint

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, types, functions

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS,ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [2]:
# Load the project settings (MongoDB location, collection names, recommender flags)
# from the JSON config file that sits one directory above this notebook.
with open("../config.json") as config_file:
    config = json.loads(config_file.read())

{'data': {'mongodb_uri': 'localhost:27017/', 'repository_tbl': 'cs5344.repos', 'user_tbl': 'cs5344.users', 'combined_tbl': 'cs5344.combined'}, 'recommender': {'_comment': 'save_preprocess is a flag to save output from preprocessing', 'save_preprocess': False}}


In [3]:
# Settings registered on the session builder, in the order they are applied.
spark_settings = [
    # Driver heap size for this local session.
    ("spark.driver.memory", "10g"),
    # "0" removes the cap on the total size of results collected to the driver.
    ("spark.driver.maxResultSize", "0"),
    # Pulls the MongoDB Spark connector (Scala 2.12 build) at session start-up.
    ("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0"),
]

session_builder = SparkSession.builder.appName("reviews")
for setting_key, setting_value in spark_settings:
    session_builder = session_builder.config(setting_key, setting_value)

# Reuses an already-running session if there is one, otherwise starts a new one.
spark = session_builder.getOrCreate()

21/10/30 11:03:51 WARN Utils: Your hostname, LAPTOP-TCC7ITKI resolves to a loopback address: 127.0.1.1; using 172.28.245.193 instead (on interface eth0)
21/10/30 11:03:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/azmimr/miniconda3/envs/main_env/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/azmimr/.ivy2/cache
The jars for the packages stored in: /home/azmimr/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8f58a9c1-a107-4bbb-acea-d5b43c074e40;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.0 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 626ms :: artifacts dl 20ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts

In [4]:
# All data-source settings live under the "data" section of the config.
data_settings = config['data']

# host:port/ prefix of the MongoDB instance (no scheme; that is added when reading)
mongodb_uri = data_settings['mongodb_uri']

# collection names for the various datasets
repos_coll = data_settings['repository_tbl']
users_coll = data_settings['user_tbl']
combined_coll = data_settings['combined_tbl']

## Recommendation Model using Collaborative Filtering

In [5]:
# Load the combined collection from MongoDB.
# Each row pairs a user id with the id of a repo that user starred.
combined_uri = f"mongodb://{mongodb_uri}{combined_coll}"

mongo_reader = spark.read.format("com.mongodb.spark.sql.DefaultSource")
combined = mongo_reader.option("uri", combined_uri).load()

                                                                                

In [6]:
# Every (user, repo) row represents a star, so attach a constant
# "starred" column of 1 to serve as the rating signal for ALS.
starred_flag = functions.lit(1)
combined = combined.withColumn("starred", starred_flag)

# Preview the first rows. The _id column is not used by the model
# (presumably the MongoDB document id) and is simply left in place.
combined.show()

+--------------------+-------+--------+-------+
|                 _id|repo_id| user_id|starred|
+--------------------+-------+--------+-------+
|{616a414ee158a735...|  18944|    4524|      1|
|{616a414ee158a735...|  18944|  195597|      1|
|{616a414ee158a735...| 122484|   17833|      1|
|{616a414ee158a735...| 124647| 1849731|      1|
|{616a414ee158a735...| 129791|    6581|      1|
|{616a414ee158a735...| 129791|   78595|      1|
|{616a414ee158a735...| 129791|   89980|      1|
|{616a414ee158a735...| 129791|  135803|      1|
|{616a414ee158a735...| 129791|  263237|      1|
|{616a414ee158a735...| 129791|  307240|      1|
|{616a414ee158a735...| 129791|  800485|      1|
|{616a414ee158a735...| 129791| 1450340|      1|
|{616a414ee158a735...| 129791| 1793013|      1|
|{616a414ee158a735...| 129791| 3086940|      1|
|{616a414ee158a735...| 129791|10206753|      1|
|{616a414ee158a735...| 129791|16163477|      1|
|{616a414ee158a735...| 179115|  275239|      1|
|{616a414ee158a735...| 206378| 7951015| 

In [7]:
# Size of the interaction data: total rows, then distinct users and repos.
# Each count triggers its own Spark job over `combined`.
num_rows = combined.count()
num_uniq_users, num_uniq_repos = [
    combined.select(id_column).distinct().count()
    for id_column in ("user_id", "repo_id")
]

print(f"Total rows in datasets = {num_rows}")
print(f"Number of unique users = {num_uniq_users}")
print(f"Number of unique repos = {num_uniq_repos}")



Total rows in datasets = 71356676
Number of unique users = 1132411
Number of unique repos = 1146520


                                                                                

In [8]:
# Hold out a fixed fraction of the interactions for evaluation.
test_proportion = 0.2

# Weights are [train, test]; the seed keeps the split repeatable.
split_weights = [(1 - test_proportion), test_proportion]
train, test = combined.randomSplit(split_weights, seed=42)

In [9]:
# Instantiate the ALS estimator for collaborative filtering on star data.
#  - userCol / itemCol / ratingCol map onto the columns of `combined`.
#  - implicitPrefs=True treats "starred" as implicit feedback rather than an
#    explicit rating; nonnegative=True constrains the factors to be >= 0.
#  - coldStartStrategy="drop" removes rows whose prediction would be NaN
#    (users or repos not seen during training) from transform() output.
#  - seed is pinned (same value as the train/test split) so the factor
#    initialisation does not depend on PySpark's per-process default and the
#    fitted model is repeatable across kernel restarts.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="user_id",
    itemCol="repo_id",
    ratingCol="starred",
    nonnegative=True,
    implicitPrefs=True,
    coldStartStrategy="drop",
    seed=42,
)

In [10]:
%%time
# Train
model = als.fit(train)

                                                                                

CPU times: user 400 ms, sys: 83.9 ms, total: 484 ms
Wall time: 8min 20s


In [11]:
# Score the held-out interactions and report RMSE between the model's
# prediction and the "starred" label.
# NOTE(review): every "starred" label is the constant 1 and the model is fit
# with implicitPrefs=True, so this RMSE only measures how far the predicted
# preference scores are from 1 -- a ranking metric may be more informative; confirm.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="starred",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")



Root-mean-square error = 0.9432094786247853


                                                                                

In [12]:
# Preview the first 20 scored test rows (20 is the default row count of show())
predictions.show(n=20)

[Stage 235:>                                                        (0 + 1) / 1]

+--------------------+-------+-------+-------+------------+
|                 _id|repo_id|user_id|starred|  prediction|
+--------------------+-------+-------+-------+------------+
|{616a417de158a735...|     53| 144334|      1| 7.431414E-5|
|{616a4164e158a735...|     65|   1088|      1|0.0029117684|
|{616a418ce158a735...|     65|   2313|      1|2.2667405E-4|
|{616a4169e158a735...|     65|  12273|      1| 0.001912994|
|{616a4156e158a735...|     65|    748|      1|2.9926094E-5|
|{616a4178e158a735...|     65|   2155|      1|5.0180755E-4|
|{616a4156e158a735...|     65|   6996|      1|5.8697007E-4|
|{616a417de158a735...|     65|   1901|      1| 3.924508E-4|
|{616a4178e158a735...|     65|   2019|      1|  3.78215E-4|
|{616a4182e158a735...|     65|   4145|      1|1.5860506E-4|
|{616a4187e158a735...|     65|    439|      1| 5.594585E-4|
|{616a417de158a735...|     65|   9307|      1|0.0051337653|
|{616a4156e158a735...|     65|   7138|      1| 4.963239E-4|
|{616a414ee158a735...|     65|  12004|  

                                                                                

In [13]:
# Persist the fitted model, replacing any model already saved at this path
model_writer = model.write().overwrite()
model_writer.save("../models/recommender_als")

                                                                                

### Various recommendation outputs for the model

In [None]:
# # Get top N recommended repos for all users
# user_recs = model.recommendForAllUsers(3)

# # Use filtering to get the recommendation for a particular user
# user_recs.where(user_recs.user_id == 41994)

In [None]:
# # Get top N recommended users for all repos
# item_recs = model.recommendForAllItems(3)

# # Use filtering to get the recommendation for a particular repo
# item_recs.where(item_recs.repo_id == 1829)

In [14]:
# Draw one (user, repo) row from the test split, without replacement and
# with a fixed seed; takeSample returns a Python list of Row objects.
test_user = test.rdd.takeSample(withReplacement=False, num=1, seed=42)

# Turn the sampled row back into a DataFrame to display what was taken
sampled_row_df = spark.createDataFrame(test_user)
sampled_row_df.show()

                                                                                

+--------------------+---------+--------+-------+
|                 _id|  repo_id| user_id|starred|
+--------------------+---------+--------+-------+
|{616a418ee158a735...|151232910|29633025|      1|
+--------------------+---------+--------+-------+



In [15]:
# Top-3 repo recommendations for the user in the sampled row
sampled_user_df = spark.createDataFrame(test_user)
recommend_repos = model.recommendForUserSubset(sampled_user_df, 3)



In [16]:
# recommend_repos.withColumn('recommendations', functions.explode(recommend_repos['recommendations'])).printSchema()

In [17]:
# Flatten the per-user array of recommendations into one row per recommended repo
exploded_recs = recommend_repos.withColumn(
    'recommendations',
    functions.explode(recommend_repos['recommendations']),
)

# Pull the repo id and score out of each recommendation struct
flat_recs = exploded_recs.select(
    'user_id',
    functions.col("recommendations.repo_id"),
    functions.col("recommendations.rating"),
)
flat_recs.show()



+--------+--------+----------+
| user_id| repo_id|    rating|
+--------+--------+----------+
|29633025|10270250|0.41122252|
|29633025|14440270| 0.3534975|
|29633025| 6498492| 0.3428549|
+--------+--------+----------+



                                                                                

In [18]:
# Reuse the sampled row, this time to get the top-3 users recommended for its repo
sampled_repo_df = spark.createDataFrame(test_user)
recommend_users = model.recommendForItemSubset(sampled_repo_df, 3)
recommend_users.show()



+---------+--------------------+
|  repo_id|     recommendations|
+---------+--------------------+
|151232910|[{6391776, 0.0635...|
+---------+--------------------+



                                                                                

In [19]:
# Load the persisted ALS model back from disk
model_reload = ALSModel.read().load("../models/recommender_als")

In [20]:
# Sanity check: the reloaded model should return the same top-3 repos
# for the sampled user as the in-memory model did
reload_check_df = spark.createDataFrame(test_user)
reloaded_recs = model_reload.recommendForUserSubset(reload_check_df, 3)
reloaded_recs.show()



+--------+--------------------+
| user_id|     recommendations|
+--------+--------------------+
|29633025|[{10270250, 0.411...|
+--------+--------------------+



                                                                                