In [1]:
import torch
import numpy as np

# --- The following Python libraries are employed to validate the correctness of the metrics. --- #
import sklearn
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score

In [2]:
class metrics:
    """
        Top-k classification and ranking metrics for batched scores and 0/1 labels.

        Input: pred -> [B, N], true -> [B, N]
        where [B, N] denotes a two-dimensional array, with B representing BatchSize
        and N representing the length of the samples. `pred` holds scores and `true`
        holds binary labels (1 = positive / relevant).

        Common arguments:
            num:       k of the top-k cut. When num <= 0 no cut is applied and the
                       columns are used in the order they were given.
            threshold: scores strictly greater than this value are predicted positive.
                       Every metric accepts it so the signatures stay uniform, but only
                       Acc_k, Precision_k, Recall_k and F1_score_k use it.

        Output: every metric returns a 0-dim tensor on the device of its inputs.
            - Acc_k, Precision_k, Recall_k, F1_score_k, HR_k and ARHR are pooled over
              all kept entries of the batch.
            - CG_k, DCG_k and IDCG_k are summed over the whole batch.
            - NDCG_k is the SUM of the per-row NDCG values; divide by B for the mean.
    """
    def __init__(self):
        pass

    @staticmethod
    def _rank_discount(true):
        """
            Return log2(i + 1) for the ranks i = 1..k, where k = true.shape[1].

            The tensor is created directly on true's device. Comparing a torch.device
            with a string (e.g. `pred.device == 'cpu'`) is not a reliable device test,
            so the device is taken from the tensor the discount is combined with.
        """
        return torch.log2(torch.arange(2, true.shape[1] + 2, device=true.device))

    def clipping_k(self, pred, true, num = 0):
        """
            Select the top-k columns of every row, ordered by [pred] (descending).

            `true` is reordered with the same permutation so both outputs stay aligned.
            k is capped at N. With num <= 0 the inputs are returned unchanged.
        """
        if num > 0:
            num = min(num, true.shape[1])
            pred, order = torch.sort(pred, dim=1, descending=True)
            true = true.gather(dim=1, index=order)
            pred, true = pred[:, :num], true[:, :num]
        return pred, true

    def Acc_k(self, pred, true, num = 0, threshold = 0.5):
        """
            ACC = (TP + TN) / (TP + FP + FN + TN)
        """
        pred, true = self.clipping_k(pred, true, num)
        pred_binary = (pred > threshold).int()
        # `!=` counts mismatches exactly like XOR does for 0/1 integers, and it
        # additionally accepts float or bool label tensors.
        return 1 - (pred_binary != true).sum() / true.numel()

    def Precision_k(self, pred, true, num = 0, threshold = 0.5):
        """
            Precision = TP / (TP + FP)

            Returns 0 when nothing is predicted positive.
        """
        pred, true = self.clipping_k(pred, true, num)
        pred_binary = (pred > threshold).int()
        pred_positive_place = torch.where(pred_binary == 1)
        # With no predicted positives the numerator is 0, so clamping the
        # denominator to 1 yields 0 instead of 0 / 0 = nan.
        return true[pred_positive_place].sum() / max(len(pred_positive_place[0]), 1)

    def Recall_k(self, pred, true, num = 0, threshold = 0.5):
        """
            Recall = TP / (TP + FN)

            Returns 0 when there is no positive label.
        """
        pred, true = self.clipping_k(pred, true, num)
        pred_binary = (pred > threshold).int()
        true_positive_place = torch.where(true == 1)
        # Same clamp as in Precision_k: avoids nan when no label is positive.
        return pred_binary[true_positive_place].sum() / max(len(true_positive_place[0]), 1)

    def F1_score_k(self, pred, true, num = 0, threshold = 0.5):
        """
            F1 = 2*Recall*Precision / (Recall + Precision)
               = 2*TP / (2 * TP + FN + FP)
               = 2*TP / ((Pred_Positive) + (True_Positive))

            Returns 0 when there are neither positive labels nor positive predictions.
        """
        pred, true = self.clipping_k(pred, true, num)
        pred_binary = (pred > threshold).int()
        true_positive_place = torch.where(true == 1)
        n_true_positive = len(true_positive_place[0])
        n_pred_positive = len(torch.where(pred_binary == 1)[0])
        # Clamp the denominator so the degenerate case gives 0 rather than nan.
        return 2 * pred_binary[true_positive_place].sum() / max(n_true_positive + n_pred_positive, 1)

    def CG_k(self, pred, true, num = 0, threshold = 0.5):
        """
            CG@k = sum_k(rel_i), summed over every row of the batch.
        """
        pred, true = self.clipping_k(pred, true, num)
        return true.sum()

    def DCG_k(self, pred, true, num = 0, threshold = 0.5):
        """
            DCG@k = sum_k(rel_i / log2(i + 1)), summed over every row of the batch.
        """
        pred, true = self.clipping_k(pred, true, num)
        return (true / self._rank_discount(true)).sum()

    def IDCG_k(self, pred, true, num = 0, threshold = 0.5):
        """
            IDCG@k = sum_k([sort]->rel_i / log2(i + 1)), summed over every row.

            The ideal order is obtained by sorting the labels of the k retrieved
            items, i.e. labels outside the top-k cut do not take part.
        """
        pred, true = self.clipping_k(pred, true, num)
        ideal = torch.sort(true, dim=1, descending=True)[0]
        return (ideal / self._rank_discount(true)).sum()

    def NDCG_k(self, pred, true, num = 0, threshold = 0.5):
        """
            NDCG@k = DCG@k / IDCG@k, computed per row and then summed over the batch.

            A row whose top-k contains no relevant item (IDCG = 0) contributes 0.
        """
        pred, true = self.clipping_k(pred, true, num)
        discount = self._rank_discount(true)
        dcg = (true / discount).sum(1)
        idcg = (torch.sort(true, dim=1, descending=True)[0] / discount).sum(1)
        # Guard the 0 / 0 rows so one empty row cannot turn the batch total into nan.
        per_row = torch.where(idcg > 0, dcg / idcg, torch.zeros_like(dcg))
        return per_row.sum()

    def HR_k(self, pred, true, num = 0, threshold = 0.5):
        """
            HR@k = sum_k(rel_i) / len(rel)

            Note: The original definition of HR involves the probability of correctly recommending
            positive samples multiple times for an individual. However, during a single recommendation
            process of the model, evaluations for all or multiple items are generated. As a result,
            the top-k predictions from 'pred' are usually extracted to represent the items with the
            highest predicted likelihood of interaction, which are then compared to the ground truth.
            Therefore the calculation is analogous to CG, normalised by the number of kept entries.
        """
        pred, true = self.clipping_k(pred, true, num)
        return true.sum() / true.numel()

    def ARHR(self, pred, true, num = 0, threshold = 0.5):
        """
            ARHR@k = sum_k(rel_i / i) / len(rel)
        """
        pred, true = self.clipping_k(pred, true, num)
        # Reciprocal ranks 1/1, 1/2, ..., 1/k, built on the same device as `true`.
        reciprocal_rank = 1 / torch.arange(1, true.shape[1] + 1, device=true.device)
        return (true * reciprocal_rank).sum() / true.numel()

---
## Validate the correctness of the metrics

#### step 0. Generating Data

In [3]:
# Reproducible toy batch: one row (B = 1) holding SampleSize random scores and 0/1 labels.
seed = 7777
SampleSize = 15
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

pred = torch.rand(SampleSize)[None, :]
true = torch.randint(0, 2, (SampleSize,))[None, :]
pred_binary = pred.gt(0.5).to(torch.int32)

# Only echo the tensors while they are small enough to read.
if SampleSize <= 20:
    for name, value in (('pred: ', pred), ("true: ", true), ("pred_binary: ", pred_binary)):
        print(name, value)

# sklearn only accepts one-dimensional input arrays,
# so a one-dimensional copy of every tensor is prepared for it.
pred_ForSklearn = pred[0]
true_ForSklearn = true[0]
pred_binary_ForSklearn = pred_binary[0]

pred:  tensor([[0.7453, 0.7328, 0.2350, 0.8423, 0.8364, 0.5164, 0.1372, 0.0066, 0.8180,
         0.3887, 0.0759, 0.9314, 0.5957, 0.5706, 0.6584]])
true:  tensor([[1, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 0, 0]])
pred_binary:  tensor([[1, 1, 0, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 1]], dtype=torch.int32)


#### step 1. Demonstration of the Top-k Selection Function's Effectiveness

In [4]:
# @k = 2: keep the two highest-scored items together with their aligned labels.
ev = metrics()
topk_pred, topk_true = ev.clipping_k(pred, true, 2)
print(topk_pred)
print(topk_true)

tensor([[0.9314, 0.8423]])
tensor([[1, 1]])


#### step 2. Verification of the 'Accuracy' Computation

In [15]:
# Use the GPU when one is present; otherwise fall back to the CPU so that
# "Restart & Run All" also works on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
ev = metrics()
ev.Acc_k(pred=pred.to(device), true=true.to(device))

tensor(0.4000, device='cuda:0')

In [6]:
# scikit-learn reference value on the flattened labels and thresholded predictions.
accuracy_score(true_ForSklearn, pred_binary_ForSklearn)

0.4

#### step 3. Verification of 'Recall' Computation Accuracy

In [7]:
# Use the GPU when one is present; otherwise fall back to the CPU so that
# "Restart & Run All" also works on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
ev = metrics()
ev.Recall_k(pred=pred.to(device), true=true.to(device))

tensor(0.5556, device='cuda:0')

In [8]:
# scikit-learn reference value on the flattened labels and thresholded predictions.
recall_score(true_ForSklearn, pred_binary_ForSklearn)

0.5555555555555556

#### step 4. Verification of 'Precision' Computation Accuracy

In [9]:
# `pred` and `true` are already [B, N] = [1, SampleSize]; an extra unsqueeze(0)
# would make them 3-D, which breaks the [B, N] contract of the metrics class
# (a top-k cut would then act on a dimension of size 1 and be silently ignored).
# Use the GPU when one is present; otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
ev = metrics()
ev.Precision_k(pred=pred.to(device), true=true.to(device))

tensor(0.5000, device='cuda:0')

In [10]:
# scikit-learn reference value on the flattened labels and thresholded predictions.
precision_score(true_ForSklearn, pred_binary_ForSklearn)

0.5

#### step 5. Verification of 'F1_score' Computation Accuracy

In [11]:
# `pred` and `true` are already [B, N] = [1, SampleSize]; an extra unsqueeze(0)
# would make them 3-D, which breaks the [B, N] contract of the metrics class
# (a top-k cut would then act on a dimension of size 1 and be silently ignored).
# Use the GPU when one is present; otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
ev = metrics()
ev.F1_score_k(pred=pred.to(device), true=true.to(device))

tensor(0.5263, device='cuda:0')

In [12]:
# scikit-learn reference value on the flattened labels and thresholded predictions.
f1_score(true_ForSklearn, pred_binary_ForSklearn)

0.5263157894736842

#### step 6. Verification of 'HR/ARHR' Computation Accuracy

In [13]:
# Manual verification: print the top-5 slice next to both metric values so the
# numbers can be checked by hand.
#   HR@5   = (number of positives among the five highest-scored items) / 5
#   ARHR@5 = (sum of 1/rank over the positives in that slice) / 5
# Use the GPU when one is present; otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
pred_on_device, true_on_device = pred.to(device), true.to(device)

ev = metrics()
print(ev.clipping_k(pred=pred_on_device, true=true_on_device, num=5))
print(ev.HR_k(pred=pred_on_device, true=true_on_device, num=5))
print(ev.ARHR(pred=pred_on_device, true=true_on_device, num=5))

(tensor([[0.9314, 0.8423, 0.8364, 0.8180, 0.7453]], device='cuda:0'), tensor([[1, 1, 0, 0, 1]], device='cuda:0'))
tensor(0.6000, device='cuda:0')
tensor(0.3400, device='cuda:0')


#### step 7. Verification of 'NDCG' Computation Accuracy

In [14]:
# Manual verification: print the top-5 slice next to the NDCG value so the
# number can be checked by hand.
#   DCG@5  = sum of label_i / log2(i + 1) over the slice in predicted order
#   IDCG@5 = the same sum after sorting the slice's labels in descending order
# Use the GPU when one is present; otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
pred_on_device, true_on_device = pred.to(device), true.to(device)

ev = metrics()
print(ev.clipping_k(pred=pred_on_device, true=true_on_device, num=5))
print(ev.NDCG_k(pred=pred_on_device, true=true_on_device, num=5))

(tensor([[0.9314, 0.8423, 0.8364, 0.8180, 0.7453]], device='cuda:0'), tensor([[1, 1, 0, 0, 1]], device='cuda:0'))
tensor(0.9469, device='cuda:0')
