## Spark and ALS Training and in a Recommender System Problem considering Provider Fairness, Diversity, and, Popularity Metrics

In this notebook, we will recommend top-10 items to users using MovieLens-20M dataset. We have three metrics we work with and they are as follows in simple terms:

Popularity: Penalize the more popular items, so that unpopular items will get a chance to get recommendations. Otherwise, usually recommender systems over-recommend the already popular items, creating a popularity bias cycle where popular items become more popular and unpopular items become more unpopular.

Provider Fairness: Every provider will be recommended similar number of times. 

Diversity: Every user will be recommended movies with different genres.

We start with training an ALS model and predicting every item-user pair ratings. Then, considering the predicted items, we keep the best of the best items offered from every supplier for every user, keeping the optimal solution intact while reducing the size of the data we need to keep significantly. This is a very important point, we remove items in such a way that we do not lose optimality, the removed items would never be offered in the first place considering the metrics of fairness, diversity, and popularity. Therefore, we save memory and speed significantly while not reducing the solution quality.

In the last step, given the subset of items that are best of the best, we use a similar optimization model in "A Unified Optimization Toolbox for Solving Popularity Bias, Fairness, and Diversity in Recommender Systems" to solve our recommender system problem.

In [1]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, explode
from pyspark import SparkContext

from pyspark.sql import SparkSession
sc = SparkContext
# sc.setCheckpointDir('checkpoint')
spark2 = SparkSession.Builder().appName("Spark2").\
    config("spark.executor.memory", "8g").config("spark.driver.memory","8g").\
    getOrCreate()
spark = spark2.newSession()

In [2]:
import os
notebook_path = os.path.abspath("Spark_ALS_Training.ipynb")
file_loc = os.path.join(os.path.dirname(notebook_path), "OneDrive/Desktop/Res.Fall22/ML20m/")

movies = spark.read.csv(file_loc+"subratingsnew_includebadmovies.csv",header=True)
ratings = spark.read.csv(file_loc+"ratingsnew_includebadmovies.csv",header=True)

ratings = ratings.\
    withColumn('userId2', col('userId2').cast('integer')).\
    withColumn('movieId2', col('movieId2').cast('integer')).\
    withColumn('rating', col('rating').cast('float'))
    
movies = movies.\
    withColumn('movieId2', col('movieId2').cast('integer')).\
    withColumn('rgenre', col('rgenre').cast('string')).\
    withColumn('lognorm', col('lognorm').cast('float')).\
    drop('movieId').drop('genres')

In [3]:
ratings.show()
movies.show()


+-------+--------+------+
|userId2|movieId2|rating|
+-------+--------+------+
|      1|       2|   3.5|
|      5|       2|   3.0|
|     13|       2|   3.0|
|     29|       2|   3.0|
|     34|       2|   3.0|
|     54|       2|   3.0|
|     88|       2|   1.0|
|     91|       2|   3.5|
|    116|       2|   2.0|
|    119|       2|   4.0|
|    120|       2|   1.0|
|    124|       2|   2.0|
|    127|       2|   3.0|
|    128|       2|   3.0|
|    129|       2|   3.0|
|    131|       2|   1.0|
|    132|       2|   3.0|
|    137|       2|   3.0|
|    142|       2|   4.0|
|    152|       2|   3.0|
+-------+--------+------+
only showing top 20 rows

+--------+---------+----------+
|movieId2|   rgenre|   lognorm|
+--------+---------+----------+
|       2|  Fantasy|0.88353753|
|      29|Adventure| 0.7826069|
|      32|   Sci-Fi| 0.9576034|
|      47|  Mystery| 0.9534758|
|      50| Thriller| 0.9622373|
|     112|    Crime|  0.817661|
|     151|  Romance|0.82389474|
|     223|   Comedy| 0.8901887

Ratings file:
userId2: modified user_ids from MovieLens-20M dataset
movieId2:  modified item_ids from MovieLens-20M dataset
rating: rating of item-user pairs

Movies file:
movieId2:  modified item_ids from MovieLens-20M dataset
rgenre:  If one movie has multiple genre tags, I selected one randomly among them.
lognorm: A continuous value between [0, 1] which depicts the popularity of an item.


In [4]:
# Training ALS model
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
## Can use grids to find the best model

# Create test and train set
(train, test) = ratings.randomSplit([0.9,0.1], seed = 1)

# Create ALS model
als = ALS(userCol="userId2", itemCol="movieId2", ratingCol="rating", 
          nonnegative = True, implicitPrefs = False, coldStartStrategy="drop",
          maxIter =15, regParam=0.05, rank =100)

# Import the requisite items

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
model = als.fit(ratings)
# model.write().save('saved_model/1') 
# model = ALSModel.load('saved_model/1')

# View the predictions
test_predictions = model.transform(ratings)
RMSE = evaluator.evaluate(test_predictions)
print('RMSE', RMSE)

RMSE 0.6792096218051907


In [5]:
from pyspark.sql.types import StructType,StructField, FloatType, IntegerType
schema = StructType([ \
    StructField("userId2",IntegerType(),True), \
    StructField("movieId2",IntegerType(),True), \
    StructField("rating",FloatType(),True), \
  ])
    
# Let us consider only a thousand user, distribute items to G many distinct groups. This means that every distinct group is 
# one provider and items are divided among them. Pop_penal value is the penalty term we want to apply to lognorm values.
# Therefore, higher the lognorm value, higher the penalty applied to that item.

itemno = 18340; userno = 138493; u_subset = 1000
Group_no = 15; pop_penal = 1; topk = 10; genre_no=19
u_subset = u_subset; i_subset = itemno ###small test data is created here

## assign every item to a distinct provider group
movies = movies.sort('movieId2')
shuffled_groups = np.arange(1,1+Group_no).repeat(int(i_subset/Group_no))
np.random.seed(1)
np.random.shuffle(shuffled_groups)

schema_integer =StructType([ StructField("movieId2",IntegerType(),True), \
                            StructField("group",IntegerType(),True)  ])

###ratings2 incorporated into the pop. penalized predictions and movie information
cc = [(int(x+1), int(shuffled_groups[x])) for x in range(len(shuffled_groups))] 
df2 = spark.createDataFrame(data= cc, schema=schema_integer) 
movies = movies.join(df2,['movieId2'],'inner')


In [6]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

data2 = []    
for userx in range(1,u_subset):
    for i in range(1,i_subset+1): data2.append((userx,i,0.11)) ## Create data frame for user u for every item 

df = spark.createDataFrame(data=data2,schema=schema) 
test_predictions_for_this_user = model.transform(df) #Firstly we predict all the item-user pairs     

ratings2 = test_predictions_for_this_user.join(movies,['movieId2'],'inner')
ratings2 = ratings2.withColumn("prediction2", F.col("prediction") - F.col("lognorm")*pop_penal) # penalize popularity

ratings2.show() ## all the ratings for every user-item pair after the popularity penalization


+--------+-------+------+----------+-----------+----------+-----+-----------+
|movieId2|userId2|rating|prediction|     rgenre|   lognorm|group|prediction2|
+--------+-------+------+----------+-----------+----------+-----+-----------+
|     148|     31|  0.11| 2.6675463|  Adventure|0.49997655|    5|  2.1675696|
|     463|     31|  0.11| 2.2978752|      Drama| 0.4634931|    7|   1.834382|
|     471|     31|  0.11| 2.7329488|     Comedy|0.81200963|   11|  1.9209392|
|     496|     31|  0.11| 2.0440345|      Drama|0.46425992|    3|  1.5797746|
|     833|     31|  0.11| 2.3764446|     Comedy| 0.5946691|   15|  1.7817755|
|    1088|     31|  0.11| 2.4171207|    Musical| 0.8096021|    1|  1.6075187|
|    1238|     31|  0.11| 2.5431688|     Comedy| 0.6794116|    5|  1.8637571|
|    1342|     31|  0.11| 2.2901168|     Horror|0.68249434|   14|  1.6076224|
|    1580|     31|  0.11|  4.000636|     Action| 0.9329459|    4|  3.0676901|
|    1591|     31|  0.11| 3.6357775|     Sci-Fi| 0.7317807|    2

In [7]:
windowRate = Window.partitionBy('userId2',"group").orderBy(F.col("prediction2").desc())
topk_rates = ratings2.withColumn("row",F.row_number().over(windowRate)) \
  .filter(F.col("row") <= topk) 

topk_rates.show() ### Get top 10 items for user every u provider s according to penalized predictions ('prediction2')

+--------+-------+------+----------+-----------+-----------+-----+-----------+---+
|movieId2|userId2|rating|prediction|     rgenre|    lognorm|group|prediction2|row|
+--------+-------+------+----------+-----------+-----------+-----+-----------+---+
|    9684|      2|  0.11|  5.075377|    Romance|0.049434382|    2|   5.025943|  1|
|   10393|      2|  0.11|  4.770469|     Sci-Fi|        0.0|    2|   4.770469|  2|
|   14175|      2|  0.11|  4.754931|Documentary|        0.0|    2|   4.754931|  3|
|   12048|      2|  0.11|  4.819127|  Animation| 0.10049947|    2|  4.7186275|  4|
|   17137|      2|  0.11|  4.755494|  Animation| 0.09208068|    2|  4.6634135|  5|
|    9317|      2|  0.11|  4.693764|      Drama| 0.09208068|    2|  4.6016836|  6|
|   14195|      2|  0.11|  4.710457|Documentary| 0.16927862|    2|   4.541178|  7|
|   11083|      2|  0.11| 4.5048018|      Drama|        0.0|    2|  4.5048018|  8|
|   15316|      2|  0.11|  4.753965|  Animation| 0.25220758|    2|   4.501757|  9|
|   

In [8]:
windowRate2 = Window.partitionBy('userId2',"group")
genre_things2 = topk_rates.withColumn("cnt" ,F.size(F.collect_set("rgenre").over(windowRate2))).\
        drop('movieID2').drop("rating").drop("prediction").drop("rgenre").drop("lognorm").\
        drop("prediction2").drop("row").dropDuplicates()

windowRate3 = Window.partitionBy('userId2',"group","rgenre").orderBy(F.col("prediction2").desc())
top_genres = ratings2.withColumn("row",F.row_number().over(windowRate3)) \
  .filter(F.col("row") <= 1) 

top_genres.show() ## get the best item for each user and provider for each genre g



+--------+-------+------+----------+---------+-----------+-----+-----------+---+
|movieId2|userId2|rating|prediction|   rgenre|    lognorm|group|prediction2|row|
+--------+-------+------+----------+---------+-----------+-----+-----------+---+
|   15406|      1|  0.11| 4.0552235|  Musical|0.019176349|    2|   4.036047|  1|
|   14939|      1|  0.11|  3.220876|Film-Noir| 0.07290433|    5|  3.1479716|  1|
|    7945|      1|  0.11| 3.6485958|   Horror|        0.0|    5|  3.6485958|  1|
|   13681|      1|  0.11| 3.4138377|Film-Noir| 0.03538972|   12|   3.378448|  1|
|    8923|      1|  0.11| 3.8737524|Animation| 0.07290433|   15|   3.800848|  1|
|   14167|      1|  0.11| 3.8388336| Thriller| 0.11555064|   15|  3.7232828|  1|
|   11170|      2|  0.11|  4.338201|Animation| 0.08292894|    1|   4.255272|  1|
|    6537|      2|  0.11| 3.9814568|     IMAX|  0.6525875|    1|  3.3288693|  1|
|    9684|      2|  0.11|  5.075377|  Romance|0.049434382|    2|   5.025943|  1|
|   10393|      2|  0.11|  4

In [9]:
top_genres_temp = top_genres.join(genre_things2,["userId2","group"])

top_genres2 = top_genres_temp.withColumn("row",F.row_number().over(windowRate)) \
  .filter((F.col("cnt") < F.col("row")) & (F.col("row") <= topk)) 

ratings_last = topk_rates.unionByName(top_genres2.drop("cnt")).\
    orderBy('userId2',"group",F.col("prediction2").desc())

ratings_last.show() ## Create a new list by items with best predicted ratings which also has at least 10 distinct genres. 
#Therefore, at this point we need all the items to find the optimal solution whether we care more about the predicted rating 
#or the diversity of the system

+--------+-------+------+----------+-----------+-----------+-----+-----------+---+
|movieId2|userId2|rating|prediction|     rgenre|    lognorm|group|prediction2|row|
+--------+-------+------+----------+-----------+-----------+-----+-----------+---+
|   11658|      1|  0.11|  4.792809|Documentary| 0.03538972|    1|   4.757419|  1|
|   11819|      1|  0.11| 4.3358383|      Drama|        0.0|    1|  4.3358383|  2|
|   10129|      1|  0.11| 4.3320255|Documentary| 0.03538972|    1|  4.2966356|  3|
|   14613|      1|  0.11| 4.3614945|     Comedy| 0.07290433|    1|  4.2885904|  4|
|   16619|      1|  0.11| 4.3326073|     Action| 0.08292894|    1|   4.249678|  5|
|   14385|      1|  0.11|  4.279734|Documentary|0.061822653|    1|  4.2179117|  6|
|    8896|      1|  0.11|  4.171053|    Mystery|        0.0|    1|   4.171053|  7|
|   10945|      1|  0.11| 4.1559725|      Crime|        0.0|    1|  4.1559725|  8|
|    1790|      1|  0.11| 4.2657275|    Western| 0.15094036|    1|   4.114787|  9|
|   

At this point, rather than considering thousands of items for each user, we will consider around 15 items offered from every supplier (15) which is approximately 225 items. We already have a high quality subset of items, and can use heuristics to find a competitive solution. However, I would like to find the optimal solution by using optimization software Gurobi next.

In [10]:
from gurobipy import *
import pandas as pd

df = ratings_last.toPandas() # with the rest of the calculations, I found it faster to work with pandas

In [11]:
mm = Model('Opt')

u_list = list(set(df['userId2'].values)); i_list = list(set(df['movieId2'].values))
U = len(u_list)

genre_no_dict = {}; string_genre = list(set(df['rgenre'].values))
R = len(string_genre)
for j in range(R): genre_no_dict[j] = [j]

dvar_tuple = tuple(df[['userId2','movieId2']].itertuples(index=False, name=None))
x = mm.addVars(dvar_tuple, obj=df['prediction2'].values, vtype=GRB.BINARY, name ="x")
y = mm.addVars(U,R, vtype=GRB.BINARY)

####CONSTRAINTS
mm.addConstrs(quicksum(x[(j,i)] for i in df[df['userId2'] == j]['movieId2'].values) == topk for j in u_list) ###Top-k list for every user

w_constraint = 8 # recommend at least this amount of distinct items for every user
mm.addConstrs(quicksum(y[j,r] for r in range(R)) >= w_constraint for j in range(U))
for r in range(R):
    r_string = string_genre[r]
    temp = df[df['rgenre'] == str(r_string)]
    mm.addConstrs(quicksum(x[(u_list[j],i)] for i in temp[temp['userId2'] == u_list[j]]['movieId2'].values) >= y[j,r] for j in range(U) ) 

## decide on lower bound and upper bound on number of recommendations of items from each supplier
avg_rec = (U*topk)/Group_no ###number of total recommendations divided by the number of providers
upper_alpha= 1.2
lower_alpha= 0.8
mm.addConstrs(quicksum(x[(j,i)]  for j in u_list for i in df[(df['group'] == g) & (df['userId2'] == j)]['movieId2'].values) <= np.ceil(upper_alpha*avg_rec) for g in range(1,Group_no+1))
mm.addConstrs(quicksum(x[(j,i)]  for j in u_list for i in df[(df['group'] == g) & (df['userId2'] == j)]['movieId2'].values) >= np.floor(lower_alpha*avg_rec) for g in range(1,Group_no+1))
#for bigger data, this part could be made faster by preprocessing the groups beforehand

mm.optimize()

Set parameter Username
Academic license - for non-commercial use only - expires 2023-11-25
Gurobi Optimizer version 10.0.0 build v10.0.0rc2 (win64)

CPU model: 12th Gen Intel(R) Core(TM) i7-12700H, instruction set [SSE2|AVX|AVX2]
Thread count: 14 physical cores, 20 logical processors, using up to 20 threads

Optimize a model with 21009 rows, 241904 columns and 929654 nonzeros
Model fingerprint: 0x31e0bfa1
Variable types: 0 continuous, 241904 integer (241904 binary)
Coefficient statistics:
  Matrix range     [1e+00, 1e+00]
  Objective range  [2e+00, 7e+00]
  Bounds range     [1e+00, 1e+00]
  RHS range        [8e+00, 8e+02]
Found heuristic solution: objective 40057.639706
Presolve removed 2138 rows and 2138 columns
Presolve time: 0.87s
Presolved: 18871 rows, 239766 columns, 925378 nonzeros
Variable types: 0 continuous, 239766 integer (239766 binary)
Found heuristic solution: objective 39496.699337
Deterministic concurrent LP optimizer: primal simplex, dual simplex, and barrier
Showing ba

In [12]:
solution_val = mm.getVars()

In [13]:
df['answer'] = mm.x[0:len(x)]
df2 = df[df['answer'] > 0.95]
df2.head(5)

Unnamed: 0,movieId2,userId2,rating,prediction,rgenre,lognorm,group,prediction2,row,answer
12,17420,1,0.11,3.703664,Adventure,0.0,1,3.703664,10,1.0
52,10933,1,0.11,3.646404,Animation,0.0,4,3.646404,9,1.0
53,16192,1,0.11,3.694795,Western,0.049434,4,3.645361,10,1.0
66,7945,1,0.11,3.648596,Horror,0.0,5,3.648596,9,1.0
67,12959,1,0.11,3.75588,War,0.145809,5,3.610071,10,1.0


In [14]:
## Print metricsdf2['logmatch'] = df2['movieId2'].map(load_for_pop.set_index('movieId2')['lognorm'])    
        
Div = np.mean(df2.groupby('userId2')['rgenre'].nunique()) ## for each user check the number of distinct groups offered
Group = df2.groupby('group').agg({'answer': 'count'}).values ## number of recommendations made from each provider
Pop = sum(df2['lognorm'])/(U*10)  ## Total popularity value of the recommendation, smaller is better
Util = sum(df2['prediction']) 

Fair = 0
for group_fair in Group:
    Fair -= (group_fair[0]/(U*10))*math.log(group_fair/(U*10),len(Group)) ##Z-metric for fairness, similar to entropy error.


print('Popularity', Pop)
print('Diversity', Div)
print('Fairness',Fair)
print('Util',Util)

Popularity 0.1524306163531136
Diversity 8.20920920920921
Fairness 0.9939569198332971
Util 37805.42073702812


In [18]:
from pyspark.sql.functions import avg
ratings2.select(avg('lognorm')).collect()

[Row(avg(lognorm)=0.31906942299628643)]

We see that with popularity penalized and diversity incentivized we offer 8 distinct items in a top-10 list with much lower average popularity value than the average of the overall items provided. 

Overall, the procedure is as follows:
    
   - Assume we know the items, groups, item popularity, item genre, and which provider group the items belong to.
   - Train ALS and predict item-user utilities for all unknown values. 
   - Keep the best k many items offered for every user considering every provider. Note the number of different genres in this list as number n.
   - Keep the best items offered from each genre for every user and provider.
   - Combine the kept items for each user and provider noting that we only need k many distinct genres in the combined group.
   - In this way, we obtain the most desirable items considering popularity, diversity, and predicted utility of the items.
   - Time to use optimization software Gurobi to find the best solution for the recommender system problem. Our paper "A Unified Optimization Toolbox for Solving Popularity Bias, Fairness, and Diversity in Recommender Systems" explains the problem and its constraints in detail.

Note that, we can have a lot of control over the metrics. We can decide how much diversity we want to include in our lists, or how much we want to penalize the popular items. Similarly, we can increase or decrease the constraints on the provider fairness according to the problem at hand. 

Scalability is an issue when we increase the number of users, however, this model can still solve up to tens of thousand of users at the same time. If we want to scale to millions, two options can be considered:
-Large-scale optimization techniques such as Dantzig-Wolfe algorithm which I am currently working on.
-Smart heuristics, such as, separating some of the users according to their preferences and solve smaller and more manageable chunks of optimization problems. This procedure will lead to suboptimal solutions, but applied correctly should not lead to significant objective function value drops. We discussed more on heuristics in our paper "Making smart recommendations for perishable and stockout products."