In [None]:
import sys
sys.path.append('/nvme/zhiyong/ARPL')
import os
import os.path as osp
import numpy as np

import torch
from torch.autograd import Variable
import torch.nn.functional as F

from core import evaluation

from my_datasets.osr_dataloader_spk_n import SpeakerDataloader

def load_and_average_embeddings(dir_path, num_speakers=5, num_utterances=20):
    mat_enrolled = torch.zeros(num_speakers, 192)  
    for spk_id in range(num_speakers):
        spk_embeddings = []
        for utt_id in range(num_utterances):
            file_path = os.path.join(dir_path, str(spk_id), f"{utt_id}.npy")
            embedding = np.load(file_path, allow_pickle=True)
            embedding_tensor = torch.from_numpy(embedding)
            spk_embeddings.append(embedding_tensor)
        spk_embeddings_tensor = torch.stack(spk_embeddings)
        mat_enrolled[spk_id] = torch.mean(spk_embeddings_tensor, dim=0)
    return mat_enrolled

def test4Cosine(testloader, outloader, dir_path, use_gpu=True):
    correct, total = 0, 0
    _pred_k, _pred_u, _labels = [], [], []
    _pred_emb,_pred_emb_u = [], []
    _outlabels=[]

    # Load and average embeddings to create mat_enrolled
    mat_enrolled = load_and_average_embeddings(dir_path)
    mat_enrolled = torch.tensor(mat_enrolled).float()
    if use_gpu:
        mat_enrolled = mat_enrolled.cuda()

    with torch.no_grad():
        for data, labels in testloader:
            if use_gpu:
                data, labels = data.cuda(), labels.cuda()
            
            logits = F.cosine_similarity(data.unsqueeze(1), mat_enrolled.unsqueeze(0), dim=2)
            predictions = logits.data.max(1)[1]
            total += labels.size(0)
            correct += (predictions == labels.data).sum()

            _pred_k.append(logits.data.cpu().numpy())
            _labels.append(labels.data.cpu().numpy())
            _pred_emb.append(data.data.cpu().numpy())

        for batch_idx, (data, labels) in enumerate(outloader):
            if use_gpu:
                data, labels = data.cuda(), labels.cuda()

            logits = F.cosine_similarity(data.unsqueeze(1), mat_enrolled.unsqueeze(0), dim=2)

            _pred_u.append(logits.data.cpu().numpy())
            _pred_emb_u.append(data.data.cpu().numpy())
            _outlabels.append(labels.data.cpu().numpy())
            

    acc = float(correct) * 100. / float(total)
    print('Acc: {:.5f}'.format(acc))

    _pred_k = np.concatenate(_pred_k, 0)
    _pred_u = np.concatenate(_pred_u, 0)
    _labels = np.concatenate(_labels, 0)
    _outlabels = np.concatenate(_outlabels, 0)
    
    # Out-of-Distribution detction evaluation
    x1, x2 = np.max(_pred_k, axis=1), np.max(_pred_u, axis=1)
    results = evaluation.metric_ood(x1, x2)['Bas']
    
    # OSCR
    _oscr_socre = evaluation.compute_oscr(_pred_k, _pred_u, _labels)

    results['ACC'] = acc
    results['OSCR'] = _oscr_socre * 100.

    return results, _pred_emb, _labels, _pred_emb_u,_pred_k,_pred_u,_outlabels



In [None]:
Data = SpeakerDataloader(known=list(range(5)), train_root='/server9/speech_group/wsh/datasets/Qualcomm_eresnet/emb-eres2netv2_train', 
                            test_root='/server9/speech_group/wsh/datasets/Qualcomm_eresnet/emb-eres2netv2_test5', batch_size=16)
trainloader, testloader, outloader = Data.train_loader, Data.test_loader, Data.out_loader

results, _pred_emb, _labels, _pred_emb_u,_pred_k,_pred_u,_outlabels = test4Cosine(testloader, outloader, '/server9/speech_group/wsh/datasets/Qualcomm_eresnet/emb-eres2netv2_train', use_gpu=True)
print("Acc (%): {:.3f}\t AUROC (%): {:.3f}\t OSCR (%): {:.3f}\t".format(results['ACC'], results['AUROC'], results['OSCR']))

In [None]:
from score.my_scorer import score_me
num_samples = _pred_k.shape[0]
num_classes = _pred_k.shape[1]

score_list = []
label_list = []

for i in range(num_samples):
    logits = _pred_k[i] 
    true_label = _labels[i] 

    for cls in range(num_classes):
        score_list.append(logits[cls]) 
        label_list.append(1 if cls == true_label else 0)  
                    
num_out_samples = _pred_u.shape[0] 
num_out_classes = _pred_u.shape[1]

for i in range(num_out_samples):
    logits = _pred_u[i] 

    for cls in range(num_out_classes):
        score_list.append(logits[cls]) 
        label_list.append(0) 

score_list = np.array(score_list)
label_list = np.array(label_list)


configuration = {
                'p_target': [0.1], 
                'c_miss': 1,           
                'c_fa': 1            
            }
eer, min_c, act_c = score_me(score_list, label_list, configuration)

print(f"EER: {eer * 100:.2f}%")
print(f"Min_C: {min_c:.3f}")
print(f"Act_C: {act_c:.3f}")
