In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import pandas as pd
import numpy as np

In [115]:
# Reuse the active SparkSession if one exists; otherwise create one with default settings.
spark = SparkSession.builder.getOrCreate()

In [116]:
# Toy input: one row per CSNO, each carrying a list of integers in RANK.
# NOTE(review): get_MRR_k / get_MAP below treat these integers as 1-based rank
# positions of the relevant items — confirm against the real data source.
data = pd.DataFrame({'CSNO' : [1, 2, 3, 4], 'RANK' : [[5, 2, 7, 3, 4, 6], [1, 9, 3, 4, 5, 6], [1, 2, 3, 4, 8, 6], [3, 4, 6]]})

In [117]:
# Build a Spark DataFrame from the pandas frame; the list column is shown as an array by df.show().
df = spark.createDataFrame(data)

In [118]:
df.show()

+----+------------------+
|CSNO|              RANK|
+----+------------------+
|   1|[5, 2, 7, 3, 4, 6]|
|   2|[1, 9, 3, 4, 5, 6]|
|   3|[1, 2, 3, 4, 8, 6]|
|   4|         [3, 4, 6]|
+----+------------------+



## MRR@k

In [119]:
def get_MRR_k(rel, k):
    """Return the reciprocal rank at cut-off ``k`` for a single row.

    Parameters
    ----------
    rel : list of int or None
        Rank positions (1-based) at which relevant items were found.
        Entries smaller than 1 are not valid ranks and are ignored.
    k : int
        Cut-off: only a hit ranked at position ``k`` or better counts.

    Returns
    -------
    float
        ``1 / best_rank`` when the best (smallest) rank is within the first
        ``k`` positions, otherwise ``0.0``. Empty or null input also gives
        ``0.0``.

    Notes
    -----
    The result is a plain Python ``float`` (``np.float`` no longer exists in
    current NumPy releases), which Spark can serialise from a UDF. Averaging
    this value over all rows gives MRR@k.
    """
    # Ranks below 1 would divide by zero or yield a negative score.
    valid_ranks = [r for r in (rel or []) if r >= 1]
    if not valid_ranks:
        return 0.0

    best_rank = min(valid_ranks)
    # A first hit beyond the cut-off contributes nothing to MRR@k.
    if best_rank > k:
        return 0.0

    return float(1 / best_rank)

In [120]:
# Declare the return type explicitly: without it f.udf defaults to StringType,
# so the metric column would hold strings instead of numbers.
udf_MRR = f.udf(get_MRR_k, 'double')

In [121]:
# Per-row reciprocal rank with k fixed to 5 via f.lit; no aggregation across rows happens here.
df.withColumn('RR@5', udf_MRR(f.col('RANK'), f.lit(5))).show()

+----+------------------+------------------+
|CSNO|              RANK|              RR@5|
+----+------------------+------------------+
|   1|[5, 2, 7, 3, 4, 6]|               0.5|
|   2|[1, 9, 3, 4, 5, 6]|               1.0|
|   3|[1, 2, 3, 4, 8, 6]|               1.0|
|   4|         [3, 4, 6]|0.3333333333333333|
+----+------------------+------------------+



## MAP@k

In [122]:
def get_MAP(rel, k):
    """Return the mean of precision@1 .. precision@k for a single row.

    Parameters
    ----------
    rel : list of int or None
        Rank positions (1-based) at which relevant items were found.
        Ranks outside ``1..k`` are ignored; duplicates count once.
    k : int
        Cut-off: number of top positions to evaluate.

    Returns
    -------
    float
        ``(1 / k) * sum(hits_in_top_i / i for i in 1..k)``. Returns ``0.0``
        for empty or null input, or when ``k`` is not positive.

    Notes
    -----
    NOTE(review): precision is averaged over *every* cut-off ``1..k``, not
    only over the positions that hold a relevant item as the textbook
    average-precision definition does. The formula is kept as is so existing
    results stay unchanged — confirm this is the intended metric.
    """
    if k is None or k <= 0:
        return 0.0

    # Keep only ranks inside the evaluated window. Values below 1 are dropped
    # explicitly: used as list indices they would wrap around and mark the
    # wrong position.
    hit_positions = {v for v in (rel or []) if 1 <= v <= k}

    hits_so_far = 0
    precision_sum = 0
    for position in range(1, k + 1):
        if position in hit_positions:
            hits_so_far += 1
        # precision@position = relevant items seen so far / items seen so far
        precision_sum += hits_so_far / position

    return precision_sum / k

In [123]:
# Declare the return type explicitly: without it f.udf defaults to StringType,
# so the metric column would hold strings instead of numbers.
udf_pre = f.udf(get_MAP, 'double')

In [125]:
# Per-row get_MAP score with k fixed to 5 via f.lit; no aggregation across rows happens here.
df.withColumn('MAP@5', udf_pre(f.col('RANK'), f.lit(5))).show()

+----+------------------+-------------------+
|CSNO|              RANK|              MAP@5|
+----+------------------+-------------------+
|   1|[5, 2, 7, 3, 4, 6]| 0.5433333333333333|
|   2|[1, 9, 3, 4, 5, 6]| 0.7433333333333334|
|   3|[1, 2, 3, 4, 8, 6]|               0.96|
|   4|         [3, 4, 6]|0.24666666666666667|
+----+------------------+-------------------+



## nDCG@k

In [149]:
# Second toy dataset for nDCG: one list of integers per CSNO in REL.
# NOTE(review): presumably graded relevance scores listed in ranked order — confirm.
# This rebinds `data`, replacing the RANK dataset defined earlier.
data = pd.DataFrame({'CSNO' : [1, 2, 3, 4, 5, 6], 'REL' : [[7, 4, 2, 1], [3, 5, 8, 10], [5, 2, 7, 4, 10, 1], [1, 9, 3, 5, 6], [7, 2, 3, 4, 8, 6], [3, 4, 6]]})

In [150]:
# Rebinds `df` to the REL dataset. The earlier MRR/MAP cells read a RANK column,
# so they will not work against this frame if re-run after this point.
df = spark.createDataFrame(data)

In [151]:
df.show()

+----+-------------------+
|CSNO|                REL|
+----+-------------------+
|   1|       [7, 4, 2, 1]|
|   2|      [3, 5, 8, 10]|
|   3|[5, 2, 7, 4, 10, 1]|
|   4|    [1, 9, 3, 5, 6]|
|   5| [7, 2, 3, 4, 8, 6]|
|   6|          [3, 4, 6]|
+----+-------------------+



In [152]:
def get_nDCG(rel, k):
    """Return the normalised discounted cumulative gain at cut-off ``k``.

    Parameters
    ----------
    rel : list of numbers or None
        Relevance scores in the order the items were ranked.
    k : int
        Cut-off: only the first ``k`` positions are scored.

    Returns
    -------
    float
        ``DCG@k / IDCG@k``, where each position ``i`` (0-based) contributes
        ``score / log2(i + 2)`` and IDCG uses the scores sorted in descending
        order. Returns ``0.0`` for empty or null input, a non-positive ``k``,
        or when the ideal DCG is zero (e.g. all scores are 0).

    Notes
    -----
    The result is a plain Python ``float`` (``np.float`` no longer exists in
    current NumPy releases), which Spark can serialise from a UDF.
    """
    if not rel or k is None or k <= 0:
        return 0.0

    def _dcg(gains):
        # Discounted cumulative gain of an already-truncated score list.
        total = 0
        for position, gain in enumerate(gains):
            total += gain / np.log2(position + 2)
        return total

    dcg = _dcg(rel[:k])
    ideal_dcg = _dcg(sorted(rel, reverse=True)[:k])

    # Nothing relevant at all: the ratio is undefined, so report 0.
    if ideal_dcg == 0:
        return 0.0

    return float(dcg / ideal_dcg)

In [153]:
# Declare the return type explicitly: without it f.udf defaults to StringType,
# so the metric column would hold strings instead of numbers.
get_ndcg = f.udf(get_nDCG, 'double')

In [154]:
# Add the per-row nDCG with k fixed to 5 as column 'nDCG@5', rebinding `df` to the widened frame.
df = df.withColumn('nDCG@5', get_ndcg(f.col('REL'), f.lit(5)))

In [155]:
df.show()

+----+-------------------+------------------+
|CSNO|                REL|            nDCG@5|
+----+-------------------+------------------+
|   1|       [7, 4, 2, 1]|               1.0|
|   2|      [3, 5, 8, 10]| 0.767612682944761|
|   3|[5, 2, 7, 4, 10, 1]|0.7908698802386929|
|   4|    [1, 9, 3, 5, 6]|0.7458455304749564|
|   5| [7, 2, 3, 4, 8, 6]|0.7966977652608278|
|   6|          [3, 4, 6]|0.8503549433237109|
+----+-------------------+------------------+

