In [20]:
from recommender import Recommender
from pyspark.ml.evaluation import RegressionEvaluator
from eval_model import TopQuantileEvaluator, NDCGEvaluator, NDCG10Evaluator
from pyspark.sql import functions as F

In [21]:
# Evaluators shared by the cells below. The RMSE evaluator compares the
# "rating" label column with the "prediction" column; the other three come
# from the project's eval_model module and are built with their defaults.
rmse_evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)
quant_evaluator = TopQuantileEvaluator()
ndcg_evaluator = NDCGEvaluator()
ndcg10_evaluator = NDCG10Evaluator()

In [22]:
# Load restaurant reviews
reviews_df = spark.read.parquet('../data/ratings_ugt10_igt10')

# Randomly split data into train and test datasets (50/50). A fixed seed
# keeps the split -- and every metric derived from it -- repeatable across
# kernel restarts instead of changing on each run.
train_df, test_df = reviews_df.randomSplit(weights=[0.5, 0.5], seed=42)

# printSchema() prints the schema itself and returns None, so wrapping it in
# print() only appended a stray "None" line to the cell output.
train_df.printSchema()

root
 |-- user: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- rating: byte (nullable = true)

None


In [23]:
# Fit the project's Recommender (ALS and bias terms both enabled) on the
# training half. The hyperparameters are passed through unchanged; their
# exact meaning is defined by the Recommender class in the recommender module.
estimator = Recommender(
    useALS=True,
    useBias=True,
    lambda_1=7,
    lambda_2=12,
    userCol='user',
    itemCol='item',
    ratingCol='rating',
    rank=76,
    regParam=0.7,
    maxIter=15,
    nonnegative=True
)
model = estimator.fit(train_df)

# Score both halves so train and test metrics can be compared.
train_predictions_df = model.transform(train_df)
predictions_df = model.transform(test_df)

# printSchema() prints and returns None; wrapping it in print() only added
# a stray "None" line to the output.
predictions_df.printSchema()

# Expose both frames to spark.sql(...) under their variable names.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
train_predictions_df.createOrReplaceTempView("train_predictions_df")
predictions_df.createOrReplaceTempView("predictions_df")

root
 |-- user: integer (nullable = true)
 |-- item: integer (nullable = true)
 |-- rating: byte (nullable = true)
 |-- prediction: double (nullable = true)

None


In [24]:
# RMSE and top-quantile metrics are left disabled:
# print('rmse: ', rmse_evaluator.evaluate(predictions_df))
# print('quant: ', quant_evaluator.evaluate(predictions_df))

# Report both NDCG evaluators on the train and test predictions.
# NOTE(review): the "test ndcg10" value recorded for this cell equals the
# `1 - avg(dcg / idcg)` query computed later in this notebook, so these
# evaluators appear to report 1 - NDCG (lower is better) -- confirm in
# eval_model before reading the numbers as plain NDCG.
for label, evaluator, frame in (
    ('train ndcg: ', ndcg_evaluator, train_predictions_df),
    ('test ndcg: ', ndcg_evaluator, predictions_df),
    ('train ndcg10: ', ndcg10_evaluator, train_predictions_df),
    ('test ndcg10: ', ndcg10_evaluator, predictions_df),
):
    print(label, evaluator.evaluate(frame))

train ndcg:  0.040854358939851565
test ndcg:  0.043669841712116964
train ndcg10:  0.0634047824074252
test ndcg10:  0.06786467880199432


In [25]:
# Peek at 40 scored test rows (returned as a list of Row objects).
predictions_df.take(40)

[Row(user=148, item=1342, rating=3, prediction=3.540774848167022),
 Row(user=148, item=623, rating=3, prediction=3.513993506770806),
 Row(user=148, item=137, rating=5, prediction=3.905822759236992),
 Row(user=148, item=2027, rating=4, prediction=3.5372629508440667),
 Row(user=148, item=321, rating=4, prediction=3.2196287628480587),
 Row(user=148, item=1160, rating=4, prediction=3.2945274173632004),
 Row(user=148, item=2797, rating=2, prediction=2.565378822933188),
 Row(user=148, item=368, rating=3, prediction=3.591911341813706),
 Row(user=148, item=642, rating=4, prediction=3.1399276932696516),
 Row(user=148, item=1183, rating=3, prediction=2.9878156642558267),
 Row(user=148, item=784, rating=4, prediction=3.3050335809953797),
 Row(user=148, item=914, rating=5, prediction=3.3181439113424034),
 Row(user=148, item=2678, rating=4, prediction=3.1182221027665733),
 Row(user=148, item=236, rating=5, prediction=3.8167642708677896),
 Row(user=148, item=4200, rating=4, prediction=3.389878863929

In [26]:
# Ten users with the most rows in the test predictions, largest first.
(
    predictions_df
    .groupBy('user')
    .count()
    .orderBy(F.desc('count'))
    .take(10)
)

[Row(user=0, count=456),
 Row(user=2, count=347),
 Row(user=1, count=336),
 Row(user=3, count=304),
 Row(user=4, count=295),
 Row(user=6, count=242),
 Row(user=5, count=240),
 Row(user=7, count=216),
 Row(user=9, count=212),
 Row(user=8, count=209)]

In [27]:
# Per-user NDCG over ALL of a user's rows in the predictions_df temp table.
#   dcg  term: rating / log2(1 + rank of the row when sorted by prediction desc)
#   idcg term: rating / log2(1 + rank of the row when sorted by rating desc)
# The raw rating is used as the gain. Both terms are summed per user and the
# ratio sum(dcg) / sum(idcg) is returned as `ndcg`.
# NOTE(review): row_number() orders tied predictions arbitrarily, so users
# with tied predictions may get a run-dependent dcg -- confirm whether a
# tie-breaker is wanted.
df2a = spark.sql(
'''
    select
        user,
        sum(dcg) / sum(idcg) as ndcg
    from (
        select
            user,
            rating / log(2, 1 + 
                row_number() OVER (
                    PARTITION BY user
                    ORDER BY prediction DESC
                )
            ) as dcg,
            rating / log(2, 1 + 
                row_number() OVER (
                        PARTITION BY user
                        ORDER BY rating DESC
                    )
            ) as idcg
        from predictions_df
    ) x
    group by user

'''
)


# Per-user NDCG@10 on the predictions_df temp table.
#   subquery p: DCG summed over each user's 10 highest-predicted rows
#   subquery a: ideal DCG summed over each user's 10 highest-rated rows
# The two are joined on user and divided to give `ndcg10`. A user with fewer
# than 10 rows simply contributes all of their rows to both sums.
df2b = spark.sql(
'''
    select 
        p.user,
        p.dcg / a.idcg as ndcg10
    from (
        select
            x.user,
            sum(x.rating / log(2, 1 + x.pred_row_num)) as dcg
        from (
            select
                user,
                rating,
                row_number() OVER (
                    PARTITION BY user
                    ORDER BY prediction DESC
                ) as pred_row_num
            from predictions_df
        ) x 
        where x.pred_row_num <= 10
        group by x.user
    ) p
    join (
        select
            x.user,
            sum(x.rating / log(2, 1 + x.actual_row_num)) as idcg
        from (
            select
                user,
                rating,
                row_number() OVER (
                    PARTITION BY user
                    ORDER BY rating DESC
                ) as actual_row_num
            from predictions_df
        ) x 
        where x.actual_row_num <= 10
        group by x.user
    ) a on a.user = p.user
'''
)

# DataFrame.show() prints the table itself and returns None; wrapping it in
# print() only emitted an extra "None" line after each table.
df2a.show(10)
df2b.show(10)

+----+------------------+
|user|              ndcg|
+----+------------------+
| 148|0.9646962597956135|
| 463|0.9596764020655281|
| 471|0.9678051020325722|
| 496|0.9621525950828473|
| 833|0.9631342487318908|
|1088|0.9739746963777636|
|1238|0.9768922971648027|
|1342| 0.886232397232901|
|1580|0.9561892451394816|
|1591|0.9786135657109796|
+----+------------------+
only showing top 10 rows

None
+----+------------------+
|user|            ndcg10|
+----+------------------+
| 148|0.8345057637681018|
| 463|0.8605202897368526|
| 471|0.8717908420343268|
| 496|0.8608970003302908|
| 833|0.8511244491581771|
|1088|0.8932880058765676|
|1238|0.9311331006419016|
|1342|0.6935498527651609|
|1580|0.8704978084093802|
|1591|0.9162164076223135|
+----+------------------+
only showing top 10 rows

None


In [28]:
# Row-level breakdown of the NDCG ingredients for user 148 in the test
# predictions, sorted by predicted rank. For every row it shows:
#   dcg           = rating / log2(1 + pred_row_num)
#   idcg          = rating / log2(1 + ideal_row_num)
#   pred_row_num  = rank of the row by prediction (descending)
#   ideal_row_num = rank of the row by true rating (descending)
# Useful for checking by eye how the per-user sums above are built up.
df3 = spark.sql(
'''
select
    user,
    item,
    rating,
    prediction,
    rating / log(2, 1 + 
        row_number() OVER (
            PARTITION BY user
            ORDER BY prediction DESC
        )
    ) as dcg,
    rating / log(2, 1 + 
        row_number() OVER (
                PARTITION BY user
                ORDER BY rating DESC
            )
    ) as idcg,
    row_number() OVER (
        PARTITION BY user
        ORDER BY prediction DESC
    ) as pred_row_num,
    row_number() OVER (
        PARTITION BY user
        ORDER BY rating DESC
    ) as ideal_row_num
from predictions_df
where user = 148
order by pred_row_num
'''
)
df3.show(100)

+----+----+------+------------------+-------------------+-------------------+------------+-------------+
|user|item|rating|        prediction|                dcg|               idcg|pred_row_num|ideal_row_num|
+----+----+------+------------------+-------------------+-------------------+------------+-------------+
| 148| 137|     5| 3.905822759236992|                5.0|                5.0|           1|            1|
| 148| 236|     5|3.8167642708677896|  3.154648767857287|  3.154648767857287|           2|            2|
| 148| 462|     4| 3.795090406609635|                2.0|  0.884258917830015|           3|           22|
| 148|  45|     4| 3.725317086765978| 1.7227062322935722| 0.8724171679421261|           4|           23|
| 148| 128|     5| 3.705003941581893|  1.934264036172708|                2.5|           5|            3|
| 148|  43|     5|3.6721713139637284| 1.7810359355401109| 2.1533827903669653|           6|            4|
| 148|  74|     1| 3.626755793383918| 0.333333333333333

In [29]:
# Toy input: a single user with ten items whose predictions are all tied at
# 3.8 while the true ratings vary. Rows are (user, item, rating, prediction).
toy_df = spark.createDataFrame([
    (1,1,1,3.8), (1,2,3,3.8), (1,3,1,3.8), (1,4,1,3.8), (1,5,5,3.8),
    (1,6,4,3.8), (1,7,5,3.8), (1,8,5,3.8), (1,9,5,3.8), (1,10,5,3.8),
],['user','item','rating', 'prediction'])

# Make the frame queryable from spark.sql(...). createOrReplaceTempView
# replaces registerTempTable, which has been deprecated since Spark 2.0.
toy_df.createOrReplaceTempView("toy_df")

In [30]:
# Same row-level dcg / idcg / rank breakdown, run on the toy_df temp table
# where every prediction is identical.
# NOTE: this rebinds `df3`, replacing the user-148 breakdown assigned earlier.
# NOTE(review): with all predictions tied, pred_row_num depends entirely on
# how row_number() happens to order ties. In the rows of the recorded output
# it coincides with ideal_row_num, which would score a constant predictor as
# a perfect ranking -- confirm whether ties need an explicit tie-breaker.
# There is no ORDER BY on the outer query, so the displayed row order is not
# guaranteed.
df3 = spark.sql(
'''
select
    user,
    item,
    rating,
    prediction,
    rating / log(2, 1 + 
        row_number() OVER (
            PARTITION BY user
            ORDER BY prediction DESC
        )
    ) as dcg,
    rating / log(2, 1 + 
        row_number() OVER (
                PARTITION BY user
                ORDER BY rating DESC
            )
    ) as idcg,
    row_number() OVER (
        PARTITION BY user
        ORDER BY prediction DESC
    ) as pred_row_num,
    row_number() OVER (
        PARTITION BY user
        ORDER BY rating DESC
    ) as ideal_row_num
from toy_df
'''
)
df3.show(100)

+----+----+------+----------+-------------------+-------------------+------------+-------------+
|user|item|rating|prediction|                dcg|               idcg|pred_row_num|ideal_row_num|
+----+----+------+----------+-------------------+-------------------+------------+-------------+
|   1|   5|     5|       3.8|                5.0|                5.0|           1|            1|
|   1|   7|     5|       3.8|  3.154648767857287|  3.154648767857287|           2|            2|
|   1|   8|     5|       3.8|                2.5|                2.5|           3|            3|
|   1|   9|     5|       3.8| 2.1533827903669653| 2.1533827903669653|           4|            4|
|   1|  10|     5|       3.8|  1.934264036172708|  1.934264036172708|           5|            5|
|   1|   6|     4|       3.8| 1.4248287484320887| 1.4248287484320887|           6|            6|
|   1|   2|     3|       3.8|                1.0|                1.0|           7|            7|
|   1|   1|     1|       3.8| 

In [31]:
# Single-row frame holding the mean rating of the training set.
avg_rating_df = (
    train_df
    .agg(
        F.avg('rating').alias('avg_rating')
    )
)

# Random baseline for the training rows: prediction = global mean rating plus
# standard-normal noise. The cross join attaches the one-row average to every
# training row. The noise is seeded so the baseline metrics do not change on
# every kernel restart.
train_predict_df = (
    train_df
    .crossJoin(avg_rating_df)
    .withColumn(
        'prediction',
        F.col('avg_rating') + F.randn(seed=42)
    )
    .select(
        'user',
        'item',
        'rating',
        'prediction'
    )
)

# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
train_predict_df.createOrReplaceTempView("train_predict_df")

train_predict_df.show()

+----+----+------+------------------+
|user|item|rating|        prediction|
+----+----+------+------------------+
|   0|  22|     4| 3.438706702931956|
|   0|  34|     3| 2.472287134629834|
|   0|  43|     5| 2.800809337009305|
|   0|  62|     4|2.8652366697503315|
|   0|  74|     5| 4.203724077491838|
|   0| 106|     4|3.6165322950273247|
|   0| 134|     3|3.4806515832663814|
|   0| 146|     3|1.0257466319226944|
|   0| 149|     5|  5.40518003004515|
|   0| 188|     3| 3.016204829162767|
|   0| 190|     4|3.1782809346052545|
|   0| 222|     5|  2.96560250164553|
|   0| 230|     4| 4.012016542731463|
|   0| 350|     5| 3.745525760959535|
|   0| 399|     4| 3.892543386994308|
|   0| 403|     2|  3.68719142950705|
|   0| 434|     4|5.1836311444517555|
|   0| 457|     4| 3.593295770876345|
|   0| 464|     4| 4.961416887205116|
|   0| 533|     4|  4.65880147889484|
+----+----+------+------------------+
only showing top 20 rows



In [32]:
# Row-level dcg / idcg / rank breakdown for user 148 using the random-baseline
# predictions on the training rows (train_predict_df temp table), sorted by
# predicted rank. Columns:
#   dcg           = rating / log2(1 + pred_row_num)
#   idcg          = rating / log2(1 + ideal_row_num)
#   pred_row_num  = rank by prediction (descending)
#   ideal_row_num = rank by true rating (descending)
df4 = spark.sql(
'''
select
    user,
    item,
    rating,
    prediction,
    rating / log(2, 1 + 
        row_number() OVER (
            PARTITION BY user
            ORDER BY prediction DESC
        )
    ) as dcg,
    rating / log(2, 1 + 
        row_number() OVER (
                PARTITION BY user
                ORDER BY rating DESC
            )
    ) as idcg,
    row_number() OVER (
        PARTITION BY user
        ORDER BY prediction DESC
    ) as pred_row_num,
    row_number() OVER (
        PARTITION BY user
        ORDER BY rating DESC
    ) as ideal_row_num
from train_predict_df
where user = 148
order by pred_row_num
'''
)
df4.show(20)

+----+----+------+------------------+------------------+-------------------+------------+-------------+
|user|item|rating|        prediction|               dcg|               idcg|pred_row_num|ideal_row_num|
+----+----+------+------------------+------------------+-------------------+------------+-------------+
| 148|2304|     4|6.1706119023937624|               4.0| 0.7798360875751452|           1|           34|
| 148|1649|     4| 5.853989483210963|2.5237190142858297| 0.6918787617803084|           2|           54|
| 148| 369|     4|5.6701715093340965|               2.0|  0.884258917830015|           3|           22|
| 148| 221|     3|   5.5749159610576|1.2920296742201793|0.48163225630206424|           4|           74|
| 148| 633|     3| 5.563554499918556|1.1605584217036249|  0.480159219764464|           5|           75|
| 148|2441|     3| 5.262028353850321|1.0686215613240666|0.47187974204864064|           6|           81|
| 148| 379|     4| 5.122374288143945|1.3333333333333333| 0.87241

In [33]:
# Random baseline for the test rows: prediction = mean TRAINING rating
# (avg_rating_df, computed in an earlier cell) plus standard-normal noise.
# The noise is seeded so the baseline metrics do not change on every kernel
# restart.
test_predict_df = (
    test_df
    .crossJoin(avg_rating_df)
    .withColumn(
        'prediction',
        F.col('avg_rating') + F.randn(seed=43)
    )
    .select(
        'user',
        'item',
        'rating',
        'prediction'
    )
)

# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
test_predict_df.createOrReplaceTempView("test_predict_df")

test_predict_df.show()

+----+----+------+------------------+
|user|item|rating|        prediction|
+----+----+------+------------------+
|   0|  18|     4|   5.8382427406166|
|   0|  32|     4|4.6595378220695975|
|   0|  35|     5| 3.666596910823167|
|   0|  36|     3|3.5493592850388094|
|   0|  50|     5| 5.249635815300059|
|   0|  70|     4|3.4895007637050472|
|   0|  77|     3|  2.83960383925089|
|   0|  78|     5|2.5888604065867105|
|   0|  98|     5| 4.375817741178096|
|   0| 116|     5| 4.519717794780622|
|   0| 136|     2| 4.503841947780726|
|   0| 157|     4| 5.584036452988005|
|   0| 161|     4|2.2316745115703567|
|   0| 198|     5| 4.448128710449876|
|   0| 217|     3|3.5708312971283434|
|   0| 235|     4| 4.017185579566163|
|   0| 236|     4| 3.036628914714359|
|   0| 243|     4| 3.810116619652029|
|   0| 266|     4| 2.517439511208432|
|   0| 294|     2| 5.199446980791228|
+----+----+------+------------------+
only showing top 20 rows



In [34]:
# Row-level dcg / idcg / rank breakdown for user 148 using the random-baseline
# predictions on the test rows (test_predict_df temp table). Columns:
#   dcg           = rating / log2(1 + pred_row_num)
#   idcg          = rating / log2(1 + ideal_row_num)
#   pred_row_num  = rank by prediction (descending)
#   ideal_row_num = rank by true rating (descending)
# The final `order by pred_row_num` matches the sibling user-148 queries;
# without it the row order printed by show() is not guaranteed.
df5 = spark.sql(
'''
select
    user,
    item,
    rating,
    prediction,
    rating / log(2, 1 + 
        row_number() OVER (
            PARTITION BY user
            ORDER BY prediction DESC
        )
    ) as dcg,
    rating / log(2, 1 + 
        row_number() OVER (
                PARTITION BY user
                ORDER BY rating DESC
            )
    ) as idcg,
    row_number() OVER (
        PARTITION BY user
        ORDER BY prediction DESC
    ) as pred_row_num,
    row_number() OVER (
        PARTITION BY user
        ORDER BY rating DESC
    ) as ideal_row_num
from test_predict_df
where user = 148
order by pred_row_num
'''
)
df5.show(200)

+----+----+------+------------------+-------------------+-------------------+------------+-------------+
|user|item|rating|        prediction|                dcg|               idcg|pred_row_num|ideal_row_num|
+----+----+------+------------------+-------------------+-------------------+------------+-------------+
| 148|1945|     3|5.8026854996111155|                3.0|0.47187974204864064|           1|           81|
| 148|1055|     3| 5.779313255718588| 1.8927892607143721|0.49111696633564683|           2|           68|
| 148| 275|     4| 5.757909587955016|                2.0| 0.7371553325948247|           3|           42|
| 148|1571|     4| 5.629198979997861| 1.7227062322935722| 0.7201253066267705|           4|           46|
| 148|2334|     3| 5.523136737160017| 1.1605584217036249| 0.4705853245783229|           5|           82|
| 148|1183|     3| 5.470647577132097| 1.0686215613240666| 0.4759048767467881|           6|           78|
| 148|1147|     4| 5.363699977681501| 1.333333333333333

In [35]:
# Dataset-level score on the predictions_df temp table: per-user NDCG@10
# (DCG of the 10 highest-predicted rows divided by the ideal DCG of the 10
# highest-rated rows), averaged over users and subtracted from 1.
# NOTE: the result column is aliased `ndcg` but actually holds
# 1 - mean NDCG@10, i.e. a loss where lower is better.
df6 = spark.sql(
'''
select 1 - avg(p.dcg / a.idcg) as ndcg
from (
    select
        x.user,
        sum(x.rating / log(2, 1 + x.pred_row_num)) as dcg
    from (
        select
            user,
            rating,
            row_number() OVER (
                PARTITION BY user
                ORDER BY prediction DESC
            ) as pred_row_num
        from predictions_df
    ) x 
    where x.pred_row_num <= 10
    group by x.user
) p
join (
    select
        x.user,
        sum(x.rating / log(2, 1 + x.actual_row_num)) as idcg
    from (
        select
            user,
            rating,
            row_number() OVER (
                PARTITION BY user
                ORDER BY rating DESC
            ) as actual_row_num
        from predictions_df
    ) x 
    where x.actual_row_num <= 10
    group by x.user
) a on a.user = p.user
''')

# Pull the single scalar out of the one-row, one-column result.
df6.collect()[0][0]

0.06786467880199432

In [36]:
# test top N ndcg implementation
def eval_ndcg(df, k=10):
    """Return 1 - mean per-user NDCG@k for a scored ratings DataFrame.

    For each user, DCG is summed over the ``k`` rows with the highest
    ``prediction`` and the ideal DCG over the ``k`` rows with the highest
    ``rating``. Each row contributes ``rating / log2(1 + rank)``. The per-user
    ratios are averaged and subtracted from 1, so despite the function name
    the result is a loss: 0 is a perfect ranking and lower is better.

    Parameters
    ----------
    df : Spark DataFrame
        Must provide ``user``, ``rating`` and ``prediction`` columns.
    k : int, optional
        Ranking cutoff. Defaults to 10, the value that was previously
        hard-coded.

    Returns
    -------
    The scalar ``1 - avg(dcg / idcg)`` taken from the query's single result
    row. Presumably None (SQL NULL) if ``df`` has no rows -- verify if that
    case matters.

    Raises
    ------
    ValueError
        If ``k`` is not a positive whole number.

    Notes
    -----
    Side effect: ``df`` is registered as the temp view ``"df"`` on the global
    ``spark`` session, replacing any existing view of that name.
    row_number() orders tied predictions arbitrarily, so inputs with many
    tied predictions can score differently from run to run.
    """
    if int(k) != k or k < 1:
        raise ValueError("k must be a positive integer, got {!r}".format(k))
    k = int(k)

    # createOrReplaceTempView replaces registerTempTable, which has been
    # deprecated since Spark 2.0.
    df.createOrReplaceTempView("df")

    # k is validated and cast to int above, so formatting it into the query
    # text cannot inject arbitrary SQL.
    score_df = spark.sql(
    '''
    select 1 - avg(p.dcg / a.idcg) as ndcg
    from (
        select
            x.user,
            sum(x.rating / log(2, 1 + x.pred_row_num)) as dcg
        from (
            select
                user,
                rating,
                row_number() OVER (
                    PARTITION BY user
                    ORDER BY prediction DESC
                ) as pred_row_num
            from df
        ) x
        where x.pred_row_num <= {k}
        group by x.user
    ) p
    join (
        select
            x.user,
            sum(x.rating / log(2, 1 + x.actual_row_num)) as idcg
        from (
            select
                user,
                rating,
                row_number() OVER (
                    PARTITION BY user
                    ORDER BY rating DESC
                ) as actual_row_num
            from df
        ) x
        where x.actual_row_num <= {k}
        group by x.user
    ) a on a.user = p.user
    '''.format(k=k)
    )

    # The query yields exactly one row with one column.
    return score_df.collect()[0][0]

In [37]:
# Cross-check: score the model's predictions with the project evaluator and
# with the hand-written eval_ndcg (top-10) query.
for label, score, frame in (
    ('train ndcg: ', ndcg_evaluator.evaluate, train_predictions_df),
    ('test ndcg: ', ndcg_evaluator.evaluate, predictions_df),
    ('train ndcg_10: ', eval_ndcg, train_predictions_df),
    ('test ndcg_10: ', eval_ndcg, predictions_df),
):
    print(label, score(frame))

train ndcg:  0.040854358939851565
test ndcg:  0.043669841712116964
train ndcg_10:  0.0634047824074252
test ndcg_10:  0.06786467880199432


In [38]:
# Same two metrics for the random baseline (mean rating + noise), to compare
# against the model's scores. eval_ndcg returns 1 - NDCG@10, so lower is
# better for the ndcg_10 lines.
# NOTE(review): presumably the same holds for ndcg_evaluator -- confirm in
# eval_model.
for label, score, frame in (
    ('random train ndcg: ', ndcg_evaluator.evaluate, train_predict_df),
    ('random test ndcg: ', ndcg_evaluator.evaluate, test_predict_df),
    ('random train ndcg_10: ', eval_ndcg, train_predict_df),
    ('random test ndcg_10: ', eval_ndcg, test_predict_df),
):
    print(label, score(frame))

random train ndcg:  0.06848897846773916
random test ndcg:  0.06809032973399232
random train ndcg_10:  0.10590540704842988
random test ndcg_10:  0.10565139486116693
