In [1]:
import numpy as np
import torch

In [2]:
def get_max(logits):
    """ logits: B x L X V   --- >   items: argmax -> B x L X 1 """
    return torch.argmax(logits, dim=-1, keepdim=True)  # B x L X 1

In [3]:
"""
###########    EXAMPLE    ###########
logits = np.array([[[0.5, 0.2, 0.2], [0.3, 0.4, 0.2], [0.2, 0.84, 0.3], [0.87, 0.29, 0.1]],
                   [[0.5, 0.2, 0.2], [0.3, 0.4, 0.2], [0.2, 0.84, 0.3], [0.87, 0.29, 0.91]],
                   [[0.1, 0.2, 0.292], [0.3, 0.2, 0.2], [0.6, 0.36, 0.3], [0.37, 0.42, 0.122]]])
logits = torch.tensor(logits)



M = SRS_Metrics(reverse=True, k=2)
score = logits
true = torch.tensor([1, 2, 2])
M.HIT(true, score), M.NDCG_HIT(true, score), M.NDCG(true, score)

"""
import torch
import numpy as np


class SRS_Metrics:
    def __init__(self, device, myformat, reverse=True, k=10):
        self.target_position = -1 if reverse else 0
        self.device = device
        self.myformat = myformat  # np.array or torch.tensor
        self.k = k

    def check(self, true, score):
        """
        Returns the tensors and the number of dimensions.
        score: B x V or B x L x V
        true: B or B x V
        """
        # Data type correction
        if torch.is_tensor(score) and torch.is_tensor(true):
            length_score = len(score.size())
            length_true = len(true.size())
        elif isinstance(score, (np.ndarray, np.generic)) and isinstance(true, (np.ndarray, np.generic)):
            length_score = len(score.shape)
            length_true = len(true.shape)
            true = torch.tensor(true)
            score = torch.tensor(score)
        elif isinstance(true, (np.ndarray, np.generic)):
            true = torch.tensor(true)
            length_true = len(true.size())
            length_score = len(score.size())
        elif isinstance(score, (np.ndarray, np.generic)):
            score = torch.tensor(score)
            length_score = len(score.size())
            length_true = len(true.size())

        if true.get_device() != self.device:  true = true.to(self.device)
        if score.get_device() != self.device:  score = score.to(self.device)

        # Dimension correction
        if length_score == 3:
            score = score[:, self.target_position, :]
        if length_true == 2:
            true = true[:, self.target_position]
        # Top 
        _, indices = torch.topk(score, self.k)
        return score, true, indices

    def HIT(self, true, score, indices=None):
        """
        Returns the Hit Ratio on the top k for numpy arrays and torch tensors.
        score: B x V or B x L x V
        true: B or B x V
        """
        if indices is None:
            score, true, indices = self.check(true, score)
        return (indices == true.reshape(-1, 1)).any(1).float().mean()

    def NDCG(self, true, score, indices=None):
        """
        Returns the Normalized Discount Cumulative Gain (NDCG) on the top k for numpy arrays and torch tensors.
        score: B x V or B x L x V
        true: B or B x V
        """
        if indices is None:
            score, true, indices = self.check(true, score)
        matches_matrix = (indices == true.reshape(-1, 1)).float()
        idcg_normalizator = torch.div(1, torch.log2(torch.arange(2, 2 + matches_matrix.shape[-1]))).to(self.device)
        return torch.mul(matches_matrix, idcg_normalizator).sum(-1).mean()

    def NDCG_HIT(self, true, score, indices=None):
        """ Both previous metrics together """
        if indices is None:
            score, true, indices = self.check(true, score)
        print(f"true {true}")
        print(f"score {score}")
        HIT = self.HIT(true, score, indices)
        NDCG = self.NDCG(true, score, indices)
        if self.myformat == np.array:
            HIT = HIT.cpu().detach().numpy()
            NDCG = NDCG.cpu().detach().numpy()
        return NDCG, HIT


In [4]:
metrics_calculator = SRS_Metrics(device="cpu", myformat=np.array, reverse=True, k=4)

In [5]:
metrics_calculator = SRS_Metrics(device="cpu", myformat=np.array, reverse=True, k=4)
y_true = torch.tensor([[0, 2, 1], [1, 2, 1]]).float()

y_score = torch.tensor([[   [0.8700, 0.0000, 0.1000, 0.0223, 0.0223, 0.0223],
                            [0.2700, 0.1900, 0.0000,0.1700, 0.0000, 0.2100],
                            [0.3700, 0.4200, 0.1220, 0.2700, 0.1900,  0.1000]],
                           [[0.8700, 0.0000, 0.1000, 0.0223, 0.0223, 0.0223],
                            [0.2700, 0.1900, 0.0000,0.0000, 0.1200, 0.2100],
                            [0.3700, 0.2600, 0.0000, 0.2700, 0.1900,  0.0000]]]).float()


b = torch.tensor([[   [0.8700, 0.2900, 0.1000],
                            [0.2700, 0.1900, 0.0100],
                            [0.3700, 0.4200, 0.1220]],
                           [[0.8700, 0.2900, 0.1000],
                            [0.2700, 0.1900, 0.0100],
                            [0.3700, 0.2600, 0.1220]]]).float()
# b = torch.tensor([[0.8700,0.2700, 0.0223],
#                   [0.2700, 0.1900, 0.0100]])
a = torch.tensor([[3, 2, 1], [1, 2, 0]])
c = torch.tensor([[3, 2, 1], [1, 2, 0]]).reshape(2,3,1)
print(a.size(), b.size())
NDCG, HIT = metrics_calculator.NDCG_HIT(true=y_true, score=y_score)  # [B x L], [B x L x V]
NDCG, HIT

torch.Size([2, 3]) torch.Size([2, 3, 3])
true tensor([1., 1.])
score tensor([[0.3700, 0.4200, 0.1220, 0.2700, 0.1900, 0.1000],
        [0.3700, 0.2600, 0.0000, 0.2700, 0.1900, 0.0000]])


(array(0.75, dtype=float32), array(1., dtype=float32))

In [6]:
import tensorflow as tf

In [161]:
# a = a[:, :, None]
istarget = tf.reshape(tf.cast(tf.not_equal(a, 0), dtype=float), [tf.shape(a)[0] * tf.shape(a)[1]])
istarget, a

(<tf.Tensor: shape=(6,), dtype=float32, numpy=array([1., 1., 1., 1., 1., 0.], dtype=float32)>,
 tensor([[3, 2, 1],
         [1, 2, 0]]))

In [164]:
unum = 13
for _ in range(20):
    print(max([np.random.randint(1, unum + 1) for _ in range(100)]))

13
13
13
13
13
13
13
13
13
13
13
13
13
13
13
13
13
13
13
13


In [159]:
score = np.array([[1.0, 5, 2.0], [3.0, 3.1, 4.0]])
NDCG, HT, valid_user = 0.0, 0.0, 0.0
k = 2
B = score.shape[0]
# take the maximum of the negatives, i.e., the minimum, and then again to get the ranking value
rank_of_the_first_item = np.argsort(np.argsort(-score, axis=-1), axis=-1)[:, 0]
for i in rank_of_the_first_item:
    if i < k:
        NDCG += 1 / np.log2(i + 2)
        HT += 1
        print(HT, NDCG)
HT /= B
NDCG /= B

In [160]:
HT, NDCG

(0.0, 0.0)

In [117]:
NDCG, HT, valid_user = 0.0, 0.0, 0.0
predictions = np.array([[1.0, 5, 2.0], [3.0, 3.1, 4.0]])
predictions = -predictions
print(predictions)
# Take the first line
predictions = predictions
# score = -prediction. The real values
score = -predictions.copy()
print(score)
# take the maximum of the negatives, i.e., the minimum
rank_of_the_first_item = predictions.argsort().argsort()[0]
print("rank_of_the_first_item",rank_of_the_first_item)

valid_user += 1

if rank_of_the_first_item < 10:
    NDCG += 1 / np.log2(rank_of_the_first_item + 2)
    HT += 1

[[-1.  -5.  -2. ]
 [-3.  -3.1 -4. ]]
[[1.  5.  2. ]
 [3.  3.1 4. ]]
rank_of_the_first_item [2 0 1]


ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

In [110]:
rank_of_the_first_item

2

In [45]:
usernum = 10
size = 5
import random
if usernum > size:
    users = random.sample(range(1, usernum + 1), size)

In [70]:
users = (np.random.randint(1, usernum) for _ in range(size))
def random_neq(l, r, s):
    """ random integer between l and r but avoiding s """
    t = np.random.randint(l, r)
    while t in s:
        t = np.random.randint(l, r)
    return t
users = (random_neq(1,9, set({2,3,4,5})) for _ in range(10))

In [72]:
list(users)

[8, 7, 8, 8, 1, 6, 7, 6, 1]

In [36]:
predictions.argsort()

array([1, 2, 0])

In [87]:
def binary_cross_entropy(pred, y): return -((pred + 1e-24).log() * y + (1 - y) * (1 - pred + 1e-24).log()).sum()
pos_emb = torch.tensor([[3, 2, 1], [1, 2, 0]])
# pos_emb = pos_emb.view(-1)
pos_emb = torch.sigmoid(pos_emb)
binary_cross_entropy(pos_emb, pos_emb), pos_emb

(tensor(2.7791),
 tensor([[0.9526, 0.8808, 0.7311],
         [0.7311, 0.8808, 0.5000]]))

In [84]:
def get_mask(tensor):
    return tensor[tensor != 0].unsqueeze(-1)
pos_emb.to(torch.bool).float()

tensor([[1., 1., 1.],
        [1., 1., 0.]])

In [47]:

from typing import Optional, Tuple, Any
def _expand_mask(mask: torch.Tensor, dtype: torch.dtype, tgt_len: Optional[int] = None):
    """
    Expands attention_mask from `[bsz, seq_len]` to `[bsz, 1, tgt_seq_len, src_seq_len]`.
    """
    bsz, src_len = mask.size()
    tgt_len = tgt_len if tgt_len is not None else src_len

    expanded_mask = mask[:, None, None, :].expand(bsz, 1, tgt_len, src_len).to(dtype)

    inverted_mask = 1.0 - expanded_mask

    return inverted_mask.masked_fill(inverted_mask.to(torch.bool), torch.finfo(dtype).min)

def get_attetion_mask(inputs_ids, inputs_embeds):
    attention_mask = inputs_ids.clone()
    attention_mask[attention_mask != 0] = 1
    return _expand_mask(attention_mask, inputs_embeds.dtype)

In [48]:
get_attetion_mask(y_true, y_true)

tensor([[[[-3.4028e+38,  0.0000e+00,  0.0000e+00],
          [-3.4028e+38,  0.0000e+00,  0.0000e+00],
          [-3.4028e+38,  0.0000e+00,  0.0000e+00]]],


        [[[ 0.0000e+00,  0.0000e+00,  0.0000e+00],
          [ 0.0000e+00,  0.0000e+00,  0.0000e+00],
          [ 0.0000e+00,  0.0000e+00,  0.0000e+00]]]])

In [54]:
y_true * y_true.to(torch.bool).float()

tensor([[0., 4., 2.],
        [2., 4., 2.]])

In [9]:
torch.permute(y_score, (0, 2, 1)).size()

torch.Size([2, 6, 3])

In [207]:
y_score = np.array([[0.5, 0.2, 0.2],  # 0 is in top 2
                 [0.3, 0.4, 0.2],  # 1 is in top 2
                 [0.2, 0.4, 0.3],  # 2 is in top 2
                 [0.7, 0.2, 0.1]]) # 2 isn't in top 2
np.argpartition(y_score,-2)

array([[1, 2, 0],
       [2, 0, 1],
       [0, 2, 1],
       [2, 1, 0]])

In [None]:

   test_seq: tensor([47484, 16289,  7157,  2045,  7369, 19901, 30601, 46404, 25635,  9169,
        20962, 49880, 44097, 46088, 23853, 33111, 32175, 34216, 26956,  6582,
        44620,  6318, 49679, 42857, 45715, 26650, 23133, 16790, 14568, 14869,
        24371, 42534, 30484, 52893, 12467, 28804, 18231, 17863, 43356, 31260,
        33540, 21910, 30796, 49069,  8771, 11094, 46572,   194, 34776, 17570,
        51547, 11191, 42753, 32328, 42071, 20390, 40198,  2262, 52889, 28370,
        26513,  6717, 21260, 28934, 41690,  4379, 40584, 46547, 19836, 29034,
        24001, 45497, 10333, 42605,  2801], device='cuda:0')
   seq: tensor([    0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0, 12888, 49583,     1,  4733,  5761,
        10845, 11210, 26875, 37882, 39167, 39988, 43922, 45974, 47202, 47483,
        47849, 48131, 48895, 52222, 34944, 50059,  4489,  5910, 11663, 12147,
        12956, 32489, 37163, 44593, 48076, 54035,  3180,  4824, 14847, 17019,
        31989, 33412, 40168, 45056, 48406], device='cuda:0')