## ** Building a large scale job Recommendation Engine using Implicit data in pyspark **

In [1]:
import os
import math
import datetime
import pyspark.sql.functions as sf
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
from pyspark import SparkConf, SparkContext
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark import SparkContext, SparkConf, SQLContext
# from pyspark.ml.evaluation import RegressionEvaluator
# from pyspark.ml.recommendation import ALS
# from pyspark.sql import Row
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.sql.types import TimestampType

In [2]:
conf = SparkConf().setAppName("jobRecommendationEngine")
sc = SparkContext(conf=conf)
sc

In [3]:
# load job Clicks file into rdd
datasets_path=os.getcwd() + "/RE_data"
ratings_file = os.path.join(datasets_path, 'job_clicks.csv')
ratings_raw_data = sc.textFile("file:///" + ratings_file)
ratings_raw_data_header = ratings_raw_data.take(1)[0]
ratings_data = ratings_raw_data.filter(lambda line: line != ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),int(float(tokens[2])))).cache()

print ("train size is ", ratings_data.count())
print('\nColumns are:', ratings_raw_data_header)
ratings_data.take(3)

train size is  100014

Columns are: userId,jobId,Clicks


[(0, 1248, 50), (0, 1027, 70), (0, 340, 53)]

In [4]:
# load jobs category file into rdd
jobs_file = os.path.join(datasets_path, 'jobs.csv')
jobs_raw_data = sc.textFile("file:///" + jobs_file)
jobs_raw_data_header = jobs_raw_data.take(1)[0]

jobs_data = jobs_raw_data.filter(lambda line: line!=jobs_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1])).cache()
print('\nColumns are:', jobs_raw_data_header)
jobs_data.take(3)


Columns are: jobID,job_category


[(1, 'IT'), (2, 'IT'), (3, 'IT')]

### Split data into train, validation and test datasets

In [5]:
#60% training, 20% validation, 20% test
rddTraining, rddValidating, rddTesting = ratings_data.randomSplit([6,2,2], seed=1001)
 
#Add user ratings in the training model
nbValidating = rddValidating.count()
nbTesting    = rddTesting.count()

print("Training: %d, validation: %d, test: %d" % (rddTraining.count(), nbValidating, rddTesting.count()))

Training: 59908, validation: 20115, test: 19991


### Model Training

Here, I am using RMSE but for these kind of problems where we have implicit
features, it is better to use ** Mean Percentage Ranking (MPR) **

In [6]:
#[START how_far]
def howFarAreWe(model, against, sizeAgainst):
    againstNoRatings = against.map(lambda x: (int(x[0]), int(x[1])) )
    againstWiRatings = against.map(lambda x: ((int(x[0]),int(x[1])), int(x[2])) )
    predictions = model.predictAll(againstNoRatings).map(lambda p: ( (p[0],p[1]), p[2]) )
    predictionsAndRatings = predictions.join(againstWiRatings).values()    
    return sqrt(predictionsAndRatings.map(lambda s: (s[0] - s[1]) ** 2).reduce(add) / float(sizeAgainst))
#[END how_far]

In [7]:
#finding best set of parameters
ranks  = [5,10,15,20]
reguls = [0.1, 1,10]
iters  = [5,10,20]
alpha = [10, 20, 40]

finalModel = None
finalRank  = 0
finalRegul = float(0)
finalIter  = -1
finalDist   = float(300)
finalAlpha = float(0)

#[START train_model]
for cRank, cRegul, cIter, cAlpha in itertools.product(ranks, reguls, iters, alpha):
    model = ALS.trainImplicit(rddTraining, cRank, cIter, float(cRegul),alpha=float(cAlpha))
    dist = howFarAreWe(model, rddValidating, nbValidating)
    if dist < finalDist:
        print(cIter, cRank,cAlpha,cRegul)
        print("Best so far:%f" % dist)
        finalModel = model
        finalRank  = cRank
        finalRegul = cRegul
        finalIter  = cIter
        finalDist  = dist
        finalAlpha  = cAlpha 
#[END train_model]

print("Rank %i" % finalRank) 
print("Regul %f" % finalRegul) 
print("Iter %i" % finalIter)  
print("Dist %f" % finalDist) 
print("Alpha %f" % finalAlpha) 


5 5 10 0.1
Best so far:55.523455
5 5 20 0.1
Best so far:55.498406
5 5 40 0.1
Best so far:55.495280
10 5 20 0.1
Best so far:55.490687
20 5 20 0.1
Best so far:55.482580
20 5 40 1
Best so far:55.477242
20 5 40 10
Best so far:55.468791
Rank 5
Regul 10.000000
Iter 20
Dist 55.468791
Alpha 40.000000


In [8]:
model = ALS.trainImplicit(rddTraining, rank=finalRank, iterations=finalIter, lambda_= float(finalRegul),alpha=float(finalAlpha))
# Calculate all predictions
rddTesting_withoutclicks = rddTesting.map(lambda r: ((r[0], r[1])))
predictions = model.predictAll(rddTesting_withoutclicks).map(lambda r: ((r[0], r[1]), (r[2])))
predictions.take(3)
# user id, node_id, actual clickss,pred clickss -> df below
rates_and_preds = rddTesting.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions) 
rates_and_preds.take(3)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 56.819548213976084


In [9]:
from pyspark.sql import SparkSession
x = rates_and_preds.map(lambda x : (x[0][0],x[0][1],x[1][0],x[1][1]))
spark = SparkSession(sc)
hasattr(x, "toDF")
x.toDF().show(4)

+---+----+----+------------------+
| _1|  _2|  _3|                _4|
+---+----+----+------------------+
|  1|1129| 0.0|0.8724590465403006|
|  2| 144|94.0|0.8851383645065171|
|  2| 272|51.0|0.9867535964890282|
|  2| 364|67.0|0.9844975567752661|
+---+----+----+------------------+
only showing top 4 rows



### Get total clicks and average clicks given to each job by different users

In [10]:
def get_counts_and_averages(ID_and_ratings_tuple):    
    nratings = len(ID_and_ratings_tuple[1]) 
    return ID_and_ratings_tuple[0], (nratings, sum([float(val) for val in ID_and_ratings_tuple[1]])/nratings)

In [11]:
job_ID_with_ratings_RDD = (ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
job_ID_with_ratings_RDD_updated = job_ID_with_ratings_RDD.map(lambda x : (x[0], list(x[1])))
job_ID_with_avg_ratings_RDD = job_ID_with_ratings_RDD_updated.map(get_counts_and_averages)  # count and average rating
job_rating_counts_RDD = job_ID_with_avg_ratings_RDD.map(lambda x: (int(x[0]), x[1][0]))    # rating count per job
job_rating_counts_RDD.cache()
job_rating_counts_RDD.take(3)

[(1248, 20), (340, 9), (60040, 18)]

In [12]:
# get user-wise jobs clicked
all_users_ratings_RDD = ratings_data.map(lambda x: (x[0], x[1])).groupByKey()
all_users_ratings_RDD1 = all_users_ratings_RDD.map(lambda x : (x[0], list(x[1])))    # jobs clicked by each user

### finding unrated jobs by each user- we will use this set for model's prediction/recommendations
job_ids = set(jobs_data.map(lambda x : x[0]).toLocalIterator()) # list of all job ids
unrated_jobs_RDD = all_users_ratings_RDD1.map(lambda x: (x[0], list((job_ids) - set(x[1]))))

# #create user_id and unrated job id pairs
unrated_userjobs_RDD = unrated_jobs_RDD.flatMap(lambda x : [(x[0],i) for i in x[1]])

# # #model predictions for each user and not clicked job pairs
recommendations_RDD = model.predictAll(unrated_userjobs_RDD)
recommended_jobs_rating_RDD = recommendations_RDD.map(lambda x: (x.product,(x.user, x.rating)))
recommended_jobs_rating_RDD.cache()
print(recommended_jobs_rating_RDD.take(10))

[(1084, (384, 0.951018851618679)), (1084, (386, 1.0361604903013872)), (1084, (454, 0.943636857628553)), (1084, (574, 0.9526263070748273)), (1084, (534, 1.012799342786141)), (1084, (656, 1.0184441570557023)), (1084, (324, 0.9242567294344457)), (1084, (180, 0.9640708082731622)), (1084, (340, 0.9412727527174267)), (1084, (130, 1.0113882343770575))]


### Joining job title and total number of clicks received by each job for further filtering recommendations

In [13]:
# #     # converting id into int for job_clicks_count RDD to perform join
# job_clicks_counts_RDD_updated = job_clicks_counts_RDD.map(lambda x: (int(x[0]), x[1]))

# join job name with job id, predicted rating for job and total number of ratings received by each job
recommendations_rating_title_and_count_RDD = recommended_jobs_rating_RDD.join(jobs_data).join(job_rating_counts_RDD)
recommendations_rating_title_and_count_RDD = recommendations_rating_title_and_count_RDD.map(lambda r: (r[0], r[1][0][1], r[1][0][0][0],round(r[1][0][0][1],2),r[1][1]))
recommendations_rating_title_and_count_RDD = recommendations_rating_title_and_count_RDD.map(lambda x: (x[2],(x[0],x[1], x[3],x[4])))
recommendations_rating_title_and_count_RDD.take(3)

[(384, (4992, 'Finance', 0.72, 10)),
 (386, (4992, 'Finance', 0.4, 10)),
 (454, (4992, 'Finance', 0.12, 10))]

## ** Top 5 recommendations **

In [14]:
# filter only those jobs which have been clicked by atleast 20 users
# take only top5 jobs by sorting based on preference confidence
top_jobs = recommendations_rating_title_and_count_RDD.groupBy(lambda x : x[0])\
                               .map(lambda x : list(x[1]))\
                               .map(lambda r: [i for i in r if i[1][3] > 20])\
                               .map(lambda a: [i for i in sorted(a, key=lambda x: -x[1][2])[:5]])   

#preparing dataframe to insert in Database
rec_jobs_df = top_jobs.map(lambda x: [(i[0],i[1][0],i[1][1],i[1][2]) for i in x]).flatMap(lambda x: x).toDF()\
                                .withColumnRenamed("_1", "user_id")\
                                .withColumnRenamed("_2", 'job_recommendations')\
                                .withColumnRenamed("_3", 'job_category')\
                                .withColumnRenamed("_4", 'preference_confidence')\
                                .withColumnRenamed("_5", "total_clicks")
                
# #final recommendation engine dataframe to be saved in Database
final_df_rec_eng = rec_jobs_df.withColumn("rec_date", sf.lit(datetime.datetime.now()).cast(TimestampType()))   
final_df_rec_eng = final_df_rec_eng.withColumn("rec_number", sf.row_number().over(Window.partitionBy("user_id").orderBy(desc("preference_confidence"))))    
final_df_rec_eng.show(15)

+-------+-------------------+--------------------+---------------------+--------------------+----------+
|user_id|job_recommendations|        job_category|preference_confidence|            rec_date|rec_number|
+-------+-------------------+--------------------+---------------------+--------------------+----------+
|     26|              27611|Business Intellig...|                 1.12|2019-06-11 06:12:...|         1|
|     26|             106489|              Retial|                 1.06|2019-06-11 06:12:...|         2|
|     26|               5971|             Banking|                 1.05|2019-06-11 06:12:...|         3|
|     26|               7022|                  HR|                 1.05|2019-06-11 06:12:...|         4|
|     26|               7502|                  HR|                 1.05|2019-06-11 06:12:...|         5|
|     29|                434|             Finance|                  1.1|2019-06-11 06:12:...|         1|
|     29|                165|                  IT|     

**Before showing the top 5 recommendations, we can filter them based on job category to show only 
recommendations from the categories users have previously looked at.
Also, Here, I am using RMSE but for these kind of problems where we have implicit
features, it is better to use Mean Percentage Ranking (MPR) **

### Saving the model

In [15]:
# from pyspark.mllib.recommendation import MatrixFactorizationModel

# model_path = os.path.join('..', 'models', 'job_lens_als')

# # Save and load model
# model.save(sc, model_path)
# same_model = MatrixFactorizationModel.load(sc, model_path)

In [16]:
#20% training, 30% validation, 50% test
rddTraining, rddValidating, rddTesting = ratings_data.randomSplit([2,3,5], seed=1001)
 
#Add user ratings in the training model
nbValidating = rddValidating.count()
nbTesting    = rddTesting.count()

print("Training: %d, validation: %d, test: %d" % (rddTraining.count(), nbValidating, rddTesting.count()))

Training: 19996, validation: 29883, test: 50135


In [17]:
#finding best set of parameters
ranks  = [5,10,15,20]
reguls = [0.1, 1,10]
iters  = [5,10,20]
alpha = [10, 20, 40]

finalModel = None
finalRank  = 0
finalRegul = float(0)
finalIter  = -1
finalDist   = float(300)
finalAlpha = float(0)

#[START train_model]
for cRank, cRegul, cIter, cAlpha in itertools.product(ranks, reguls, iters, alpha):
    model = ALS.trainImplicit(rddTraining, cRank, cIter, float(cRegul),alpha=float(cAlpha))
    dist = howFarAreWe(model, rddValidating, nbValidating)
    if dist < finalDist:
        print(cIter, cRank,cAlpha,cRegul)
        print("Best so far:%f" % dist)
        finalModel = model
        finalRank  = cRank
        finalRegul = cRegul
        finalIter  = cIter
        finalDist  = dist
        finalAlpha  = cAlpha 
#[END train_model]

print("Rank %i" % finalRank) 
print("Regul %f" % finalRegul) 
print("Iter %i" % finalIter)  
print("Dist %f" % finalDist) 
print("Alpha %f" % finalAlpha) 

5 5 10 0.1
Best so far:54.106427
5 5 20 0.1
Best so far:54.089740
5 5 40 0.1
Best so far:54.065282
10 5 40 0.1
Best so far:54.051065
10 5 40 1
Best so far:54.037789
20 5 20 1
Best so far:54.031673
10 5 40 10
Best so far:54.030458
20 5 20 10
Best so far:54.006490
Rank 5
Regul 10.000000
Iter 20
Dist 54.006490
Alpha 20.000000


In [18]:
model = ALS.trainImplicit(rddTraining, rank=finalRank, iterations=finalIter, lambda_= float(finalRegul),alpha=float(finalAlpha))
# Calculate all predictions
rddTesting_withoutclicks = rddTesting.map(lambda r: ((r[0], r[1])))
predictions = model.predictAll(rddTesting_withoutclicks).map(lambda r: ((r[0], r[1]), (r[2])))
predictions.take(3)
# user id, node_id, actual clickss,pred clickss -> df below
rates_and_preds = rddTesting.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions) 
rates_and_preds.take(3)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    
print ('For testing data the RMSE is %s' % (error))

For testing data the RMSE is 56.98523107288847


In [19]:
from pyspark.sql import SparkSession
x = rates_and_preds.map(lambda x : (x[0][0],x[0][1],x[1][0],x[1][1]))
spark = SparkSession(sc)
hasattr(x, "toDF")
x.toDF().show(4)

+---+----+----+-------------------+
| _1|  _2|  _3|                 _4|
+---+----+----+-------------------+
|  1|1029|66.0|-0.1551305171165323|
|  1|1129| 0.0|0.09516518060023466|
|  2|  52|98.0| 0.9753075319549064|
|  2| 144|94.0| 0.5255806590999219|
+---+----+----+-------------------+
only showing top 4 rows



In [20]:
job_ID_with_ratings_RDD = (ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
job_ID_with_ratings_RDD_updated = job_ID_with_ratings_RDD.map(lambda x : (x[0], list(x[1])))
job_ID_with_avg_ratings_RDD = job_ID_with_ratings_RDD_updated.map(get_counts_and_averages)  # count and average rating
job_rating_counts_RDD = job_ID_with_avg_ratings_RDD.map(lambda x: (int(x[0]), x[1][0]))    # rating count per job
job_rating_counts_RDD.cache()
job_rating_counts_RDD.take(3)

[(1248, 20), (340, 9), (60040, 18)]

In [21]:
# get user-wise jobs clicked
all_users_ratings_RDD = ratings_data.map(lambda x: (x[0], x[1])).groupByKey()
all_users_ratings_RDD1 = all_users_ratings_RDD.map(lambda x : (x[0], list(x[1])))    # jobs clicked by each user

### finding unrated jobs by each user- we will use this set for model's prediction/recommendations
job_ids = set(jobs_data.map(lambda x : x[0]).toLocalIterator()) # list of all job ids
unrated_jobs_RDD = all_users_ratings_RDD1.map(lambda x: (x[0], list((job_ids) - set(x[1]))))

# #create user_id and unrated job id pairs
unrated_userjobs_RDD = unrated_jobs_RDD.flatMap(lambda x : [(x[0],i) for i in x[1]])

# # #model predictions for each user and not clicked job pairs
recommendations_RDD = model.predictAll(unrated_userjobs_RDD)
recommended_jobs_rating_RDD = recommendations_RDD.map(lambda x: (x.product,(x.user, x.rating)))
recommended_jobs_rating_RDD.cache()
print(recommended_jobs_rating_RDD.take(10))

[(1084, (384, 1.1352492817551871)), (1084, (386, 0.9286078834729442)), (1084, (454, 0.9061540580481768)), (1084, (574, 1.079442540164557)), (1084, (534, 0.9371451295085724)), (1084, (656, 0.8658506163838839)), (1084, (324, 1.0606029130383046)), (1084, (180, 0.9168045455156122)), (1084, (340, 0.8368054906757925)), (1084, (130, 1.0251591421569208))]


In [22]:
# #     # converting id into int for job_clicks_count RDD to perform join
# job_clicks_counts_RDD_updated = job_clicks_counts_RDD.map(lambda x: (int(x[0]), x[1]))

# join job name with job id, predicted rating for job and total number of ratings received by each job
recommendations_rating_title_and_count_RDD = recommended_jobs_rating_RDD.join(jobs_data).join(job_rating_counts_RDD)
recommendations_rating_title_and_count_RDD = recommendations_rating_title_and_count_RDD.map(lambda r: (r[0], r[1][0][1], r[1][0][0][0],round(r[1][0][0][1],2),r[1][1]))
recommendations_rating_title_and_count_RDD = recommendations_rating_title_and_count_RDD.map(lambda x: (x[2],(x[0],x[1], x[3],x[4])))
recommendations_rating_title_and_count_RDD.take(3)

[(384, (81132, 'Business Intelligence', 0.52, 1)),
 (386, (81132, 'Business Intelligence', -0.32, 1)),
 (454, (81132, 'Business Intelligence', 0.21, 1))]

In [23]:
# filter only those jobs which have been clicked by atleast 20 users
# take only top5 jobs by sorting based on preference confidence
top_jobs = recommendations_rating_title_and_count_RDD.groupBy(lambda x : x[0])\
                               .map(lambda x : list(x[1]))\
                               .map(lambda r: [i for i in r if i[1][3] > 20])\
                               .map(lambda a: [i for i in sorted(a, key=lambda x: -x[1][2])[:5]])   

#preparing dataframe to insert in Database
rec_jobs_df = top_jobs.map(lambda x: [(i[0],i[1][0],i[1][1],i[1][2]) for i in x]).flatMap(lambda x: x).toDF()\
                                .withColumnRenamed("_1", "user_id")\
                                .withColumnRenamed("_2", 'job_recommendations')\
                                .withColumnRenamed("_3", 'job_category')\
                                .withColumnRenamed("_4", 'preference_confidence')\
                                .withColumnRenamed("_5", "total_clicks")
                
# #final recommendation engine dataframe to be saved in Database
final_df_rec_eng = rec_jobs_df.withColumn("rec_date", sf.lit(datetime.datetime.now()).cast(TimestampType()))   
final_df_rec_eng = final_df_rec_eng.withColumn("rec_number", sf.row_number().over(Window.partitionBy("user_id").orderBy(desc("preference_confidence"))))    
final_df_rec_eng.show(15)

+-------+-------------------+--------------------+---------------------+--------------------+----------+
|user_id|job_recommendations|        job_category|preference_confidence|            rec_date|rec_number|
+-------+-------------------+--------------------+---------------------+--------------------+----------+
|     26|               4369|             Finance|                 1.16|2019-06-11 06:24:...|         1|
|     26|              46578|                  IT|                 1.13|2019-06-11 06:24:...|         2|
|     26|              35836|                  IT|                 1.12|2019-06-11 06:24:...|         3|
|     26|               8970|Business Intellig...|                 1.11|2019-06-11 06:24:...|         4|
|     26|               1957|                  IT|                 1.11|2019-06-11 06:24:...|         5|
|     29|                849|                  HR|                 0.98|2019-06-11 06:24:...|         1|
|     29|              68237|             Banking|     