### init environment

In [6]:
import torch
import numpy as np
from functools import partial

In [7]:
good_count = 20        # общее кол-во товаров
user_count = 100         # кол-во пользователей для которых генерим данные
recommend_count = 10    # кол-во генерируемых рекомендаций для каждого пользователя
price_range = (199, 2490)

# генерируем проверочные данные для заданий
rnd = np.random.default_rng(11)
goods = np.arange(good_count) + 50      # список id существующих товаров
prices = np.round(rnd.random(good_count) * (max(price_range) - min(price_range)) + min(price_range), 2)     # цены товаров

recommends = []         # списки рекомендаций для пользователей
rec_prices = []         # списки цен рекомендованных товаров
boughts = []            # список покупок
b_prices = []           # список цен покупок
for _ in range(user_count):
    indexes = rnd.choice(goods.size, size=recommend_count, replace=False)    # индексы рекомендаций
    recommends.append(goods[indexes])
    rec_prices.append(prices[indexes])

    boughts_count = rnd.integers(good_count) + 1
    indexes = rnd.choice(goods.size, size=boughts_count, replace=False)
    boughts.append(goods[indexes])
    b_prices.append(prices[indexes])

### 1. hit rate at k

In [8]:
def hit_rate(recommended_list, bought_list):
    flags = np.isin(bought_list, recommended_list)
    hit_rate = (flags.sum() > 0) * 1
    return hit_rate


def hit_rate_at_k(recommended_list, bought_list, k=5):
    """ Hit rate@k = (был ли хотя бы 1 релевантный товар среди топ-k рекомендованных) """
    # с использованием numpy
    flags = np.isin(recommended_list[:k], bought_list)
    return (flags.sum() > 0) * 1

    # без использования numpy
    # return (len(set(bought_list) & set(recommended_list[:k])) > 0) * 1

In [9]:
np.isin(recommends[10][:5], boughts[10])

array([False,  True,  True, False,  True])

In [10]:
# check
user = 0
val = hit_rate_at_k(recommends[user], boughts[user], 5)
print(f'Hit rate@k value (k=5): {val}')

Hit rate@k value (k=5): 1


In [11]:
# также можно в hit_rate() как recommended_list передавать нужный slice:
user = 0
val = hit_rate(recommends[user][:5], boughts[user])
print(f'Hit rate@k value (k=5): {val}')

Hit rate@k value (k=5): 1


### 2. money precision at k

In [12]:
def money_precision_at_k(recommended_list, bought_list, prices_recommended, k=5):
    """ Доля дохода по рекомендованным объектам
    :param recommended_list - список id рекомендаций
    :param bought_list - список id покупок
    :param prices_recommended - список цен для рекомендаций
    """
    flags = np.isin(recommended_list[:k], bought_list)
    prices = np.array(prices_recommended[:k])
    return flags @ prices / prices.sum()

In [13]:
# check
user = 10
money_precision_at_k(recommends[user], boughts[user], rec_prices[user], k=5)

0.6854211885678256

In [14]:
recommends[user], boughts[user], rec_prices[user]

(array([53, 58, 67, 68, 51, 62, 63, 61, 57, 59]),
 array([58, 66, 63, 50, 57, 51, 61, 55, 56, 67, 59, 62, 52, 64, 65, 54, 60]),
 array([ 264.73, 2371.62, 1372.87, 2070.14, 1342.85, 1717.57,  829.73,
        1370.59,  496.31, 1623.74]))

### 3. recall at k

In [15]:
def recall_at_k(recommended_list, bought_list, k=5):
    """ Recall on top k items """
    flags = np.isin(bought_list, recommended_list[:k])
    return flags.sum() / len(bought_list)

In [16]:
# check
user = 10
recall_at_k(recommends[user], boughts[user], k=5)

0.17647058823529413

### 4. money recall at k

In [17]:
def money_recall_at_k(recommended_list, bought_list, prices_recommended, prices_bought, k=5):
    """ Доля дохода по релевантным рекомендованным объектам
    :param recommended_list - список id рекомендаций
    :param bought_list - список id покупок
    :param prices_recommended - список цен для рекомендаций
    :param prices_bought - список цен покупок
    """
    flags = np.isin(recommended_list[:k], bought_list)      # get recommend to bought matches
    prices = np.array(prices_recommended[:k])               # get prices of recommended items
    return flags @ prices / np.sum(prices_bought)

In [18]:
# check
user = 10
money_recall_at_k(recommends[user], boughts[user], rec_prices[user], b_prices[user], k=5)

0.23424252219100425

### 5. map at k

In [19]:
def precision_at_k(recommended_list, bought_list, k=5):
    flags = np.isin(bought_list, recommended_list[:k])
    return flags.sum() / k

def ap_k(recommended_list, bought_list, k=5):
    # переработано
    flags = np.isin(recommended_list, bought_list)
    if sum(flags) == 0:
        return 0

    sum_ = 0
    for i in range(0, k-1):
        if flags[i]:
            sum_ += precision_at_k(recommended_list, bought_list, k=i+1)
    result = sum_ / flags.sum()
    return result

    # func = partial(precision_at_k, recommended_list, bought_list)
    # rel_items = np.arange(1, k + 1)[flags[:k]]                  # получаем номера релевантных объектов
    # return np.sum(list(map(func, rel_items))) / flags.sum()     # считаем avg precision@k для этих объектов

In [20]:
# v1
def map_k_v1(recommended_list, bought_list, k=5, u=1, w=None):
    """ Среднее AP@k по u пользователям """
    apk = []
    w = w if w is not None else np.ones(shape=(u, ))
    for user, user_weight in zip(range(u), w):
        apk.append(ap_k(recommended_list[user], bought_list[user], k) * user_weight)
    
    return np.mean(apk)

In [21]:
# v2
def map_k_v2(recommended_list, bought_list, k=5, u=1, w=None):
    """ Среднее AP@k по u пользователям """
    func = partial(ap_k, k=k)
    weights = w if w is not None else np.ones(shape=(u, ))
    apk = np.array(list(map(func, recommended_list[:u], bought_list[:u]))) * weights
    return apk.mean()

In [22]:
%%time
# check
map_k_v1(recommends, boughts, u=50, k=6)

CPU times: user 4.33 ms, sys: 4.15 ms, total: 8.49 ms
Wall time: 7.08 ms


0.35443373015873014

In [23]:
%%time
# check
map_k_v2(recommends, boughts, u=50, k=6)

CPU times: user 7.79 ms, sys: 0 ns, total: 7.79 ms
Wall time: 6.61 ms


0.35443373015873014

### 6. mean reciprocal rank

Mean Reciprocal Rank

- Считаем для первых k рекоммендаций
- Найти ранк первого релевантного предсказания $k_u$
- Посчитать reciprocal rank = $\frac{1}{k_u}$

$$MRR = mean(\frac{1}{k_u})$$

In [24]:
def reciprocal_rank(recommended_list, bought_list, n=1, k=5):    
    """ обратный ранг n релевантных рекомендаций среди первых k рекомендаций
    Это не совсем канонический reciprocal rank, но при n=1 должен работать как принято
    
    :param recommended_list - список рекомендаций
    :param bought_list - список покупок
    :param n - учитывать первые n релевантных рекомендаций
    :param k - искать релевантные среди первых k рекомендаций
    """
    flags = np.isin(recommended_list[:k], bought_list)
    ranks = np.arange(1, k + 1)[flags][:n]      # ранги первых n рекомендаций из первых k. равен 0 если рекомендация нерелевантна
    ideal_ranks = np.arange(1, n + 1)
    return (1 / ranks).sum() / (1 / ideal_ranks).sum() if flags.any() else 0

In [25]:
# check
user = 10
a = reciprocal_rank(recommends[user], boughts[user], n=1, k=5)
b = reciprocal_rank(recommends[user], boughts[user], n=5, k=5)
a, b

(0.5, 0.4525547445255474)

In [26]:
recommends[user], sorted(boughts[user])

(array([53, 58, 67, 68, 51, 62, 63, 61, 57, 59]),
 [50, 51, 52, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67])

In [27]:
def mean_reciprocal_rank_v1(recommended_list, bought_list, n=1, k=5):
    """ Среднеобратный ранг """
    ranks = []
    for data in zip(recommended_list, bought_list):
        ranks.append(reciprocal_rank(*data, k=k, n=n))
    return np.mean(ranks)

In [28]:
def mean_reciprocal_rank_v2(recommended_list, bought_list, n=1, k=5):
    """ Среднеобратный ранг - без for-loop """
    func = partial(reciprocal_rank, n=n, k=k)
    ranks = list(map(func, recommended_list, bought_list))
    return np.mean(ranks)

In [29]:
%%time
# check
mean_reciprocal_rank_v1(recommends, boughts, n=1, k=5)

CPU times: user 7.2 ms, sys: 0 ns, total: 7.2 ms
Wall time: 6.23 ms


0.6926666666666667

In [30]:
%%time
# check
mean_reciprocal_rank_v2(recommends, boughts, n=1, k=5)

CPU times: user 10.2 ms, sys: 681 µs, total: 10.9 ms
Wall time: 9.09 ms


0.6926666666666667

### 7. NDCG@k

In [31]:
N = 5

torch.random.manual_seed(29)
ys_true = torch.randint(0, 5, size=(N, ))
ys_pred = torch.rand(N)

# rnd = np.random.default_rng(77)
# ys_true = rnd.integers(5, size=N)
# ys_pred = rnd.random(size=N)

ys_true, ys_pred


(tensor([0, 3, 4, 2, 2]), tensor([0.3418, 0.3887, 0.5124, 0.8961, 0.3233]))

In [32]:
# def cumulative_gain(y_true, y_pred, k=3) -> float:
#     """ Cumulative gain at k """
#     _, argsort = torch.sort(y_pred, descending=True, dim=0)
#     return float(y_true[argsort[:k]].sum())

# # check
# cumulative_gain(ys_true, ys_pred, k=3)

In [33]:
def compute_gain(y_value, gain_scheme: str):
    # vectorized
    if gain_scheme == "exp2":
        gain = 2 ** y_value - 1
    elif gain_scheme == "const":
        gain = y_value
    else:
        raise ValueError(f"{gain_scheme} method not supported, only exp2 and const.")
    return gain

In [34]:
def dcg(ys_pred, ys_true, gain_scheme: str = 'const', k=3) -> float:
    """ Discounted Cumulative Gain at K """
    # argsort = np.argsort(np.array(ys_pred))[::-1]   # sort @k with numpy
    _, argsort = torch.sort(ys_pred, descending=True, dim=0)      # the same with torch
    ys_true_sorted = ys_true[argsort[:k]]

    gains = compute_gain(ys_true_sorted, gain_scheme)
    log_weights = np.log2(np.arange(k) + 2)
    return float((gains / log_weights).sum())

    # ret = 0
    # for idx, cur_y in enumerate(ys_true_sorted, 1):
    #     gain = compute_gain(cur_y, gain_scheme)
    #     ret += gain / (np.log2(idx + 1))
    # return float(ret)


In [35]:
def idcg(ys_true, gain_scheme: str = 'const', k=3) -> float:
    """ Ideal DCG at K """
    y_true_sorted, _ = torch.sort(ys_true, descending=True, dim=0)
    gains = compute_gain(y_true_sorted[:k], gain_scheme)        
    log_weights = np.log2(np.arange(k) + 2)
    return float((gains / log_weights).sum())

# эта функция не обязательна - она 1 в 1 повторяет dcg(), а значит, можно использовать его:
print(idcg(ys_true, 'const', k=3), dcg(ys_true, ys_true, 'const', k=3))
print(idcg(ys_true, 'exp2', k=3), dcg(ys_true, ys_true, 'exp2', k=3))

6.892789260714372 6.892789260714372
20.916508275000204 20.916508275000204


In [36]:
def ndcg(ys_pred, ys_true, gain_scheme: str = 'const', k=3) -> float:
    """ Normalized Discounted Cumulative Gain at K """
    pred_dcg = dcg(ys_pred, ys_true, gain_scheme, k)
    ideal_dcg = dcg(ys_true, ys_true, gain_scheme, k)
    return pred_dcg / ideal_dcg

In [38]:
ndcg(ys_pred, ys_true, 'const', k=3), ndcg(ys_pred, ys_true, 'exp2', k=3)

(0.8739160282497203, 0.7632223358547978)

### useful links

1. https://habr.com/ru/company/econtenta/blog/303458/