In [1]:
import os

os.environ["CUDA_VISIBLE_DEVICES"] = "1"
import numpy as np


from sklearn.metrics.pairwise import euclidean_distances as ed
from sklearn.metrics.pairwise import cosine_distances as cd
from collections import defaultdict
from pyeer.eer_info import get_eer_stats
from sklearn.metrics.pairwise import manhattan_distances as md

from sklearn.metrics.pairwise import cosine_similarity as cs

from pytorch_metric_learning.distances import LpDistance, CosineSimilarity,SNRDistance

def EERf(genuine, impostor):
    """Return ``(eer, fmr100)`` for one set of genuine and impostor scores.

    Both inputs are copied into NumPy arrays and flattened to 1-D, so nested
    lists such as ``[[s1], [s2], ...]`` are accepted, and are then handed to
    ``get_eer_stats``.

    Args:
        genuine: array-like of genuine (same-identity) scores.
        impostor: array-like of impostor (different-identity) scores.

    Returns:
        tuple: the ``eer`` and ``fmr100`` attributes of the stats object.
    """
    flat_genuine, flat_impostor = (
        np.array(scores).reshape(-1) for scores in (genuine, impostor)
    )
    eer_stats = get_eer_stats(flat_genuine, flat_impostor)
    return eer_stats.eer, eer_stats.fmr100


def calculate_and_print_averages(genuine, impostor):
    """Summarise the per-key EER as ``(mean, std)``, both expressed in percent.

    For every key of ``genuine`` the scores stored under that key in
    ``genuine`` and in ``impostor`` are passed to ``EERf``; only the EER
    (first element of its result) is kept. Despite the name, nothing is
    printed: the two statistics are returned.

    Args:
        genuine: mapping key -> genuine scores.
        impostor: mapping indexed with the same keys -> impostor scores.

    Returns:
        tuple: ``(np.mean(eers) * 100, np.std(eers) * 100)``.
    """
    per_key_eer = [
        EERf(genuine_scores, impostor[key])[0]
        for key, genuine_scores in genuine.items()
    ]
    return np.mean(per_key_eer) * 100, np.std(per_key_eer) * 100

import torch.nn.functional as F





def calculate_similarity_scores_two(enrollment_embeddings, y_enrollment, verification_embeddings, y_verification, distance, top_k=10):
    """Score every verification sample against every enrolled class it does not belong to.

    The similarity between two embeddings is the negated pairwise distance, so
    a larger value means "closer". The score of a verification sample for a
    class is the mean of its ``top_k`` highest similarities to that class's
    enrollment samples. Only impostor comparisons (true label != class) are
    recorded; genuine comparisons are skipped.

    Args:
        enrollment_embeddings: 2-D array-like, one row per enrollment sample.
        y_enrollment: class label of each enrollment row.
        verification_embeddings: 2-D array-like, one row per verification sample.
        y_verification: class label of each verification row.
        distance: ``"cd"`` for cosine distance or ``"ed"`` for Euclidean distance.
        top_k: number of best similarities averaged per class (default 10).
            A class with fewer enrollment samples is averaged over the samples
            it actually has instead of being divided by ``top_k``.

    Returns:
        tuple ``(rows, by_class)`` where ``rows`` is a list of
        ``[score, 0, true_class, claimed_class, verification_index]`` ordered by
        verification index and then by class, and ``by_class`` is a
        ``defaultdict(list)`` mapping each claimed class to a list of ``[score]``.

    Raises:
        ValueError: if ``distance`` is not ``"cd"``/``"ed"`` or ``top_k < 1``.
    """
    if top_k < 1:
        raise ValueError(f"top_k must be >= 1, got {top_k}")

    # Negate the distances so that sorting/averaging works on similarities.
    if distance == "cd":
        similarity_matrix = -1 * cd(verification_embeddings, enrollment_embeddings)
    elif distance == "ed":
        similarity_matrix = -1 * ed(verification_embeddings, enrollment_embeddings)
    else:
        # Previously this fell through and failed later with an unbound local.
        raise ValueError(f"Unknown distance {distance!r}; expected 'cd' or 'ed'")

    y_enrollment = np.asarray(y_enrollment)
    unique_classes = np.unique(y_enrollment)
    n_verification = similarity_matrix.shape[0]

    # Column c holds, for every verification sample, its top-k mean similarity
    # to class unique_classes[c]; computed once per class instead of once per
    # (sample, class) pair.
    per_class_columns = []
    for cls in unique_classes:
        class_similarities = similarity_matrix[:, y_enrollment == cls]
        k = min(top_k, class_similarities.shape[1])
        # np.sort is ascending, so the k best similarities are the last k columns.
        per_class_columns.append(np.sort(class_similarities, axis=1)[:, -k:].mean(axis=1))
    class_scores = np.column_stack(per_class_columns)

    similarity_results_by_class = []
    similarity_results_by_class_dict = defaultdict(list)
    for i in range(n_verification):
        current_class = y_verification[i]
        for col, cls in enumerate(unique_classes):
            if current_class == cls:
                continue  # genuine comparison: not part of the impostor scores
            score = class_scores[i, col]
            similarity_results_by_class.append([score, 0, current_class, cls, i])
            similarity_results_by_class_dict[cls].append([score])

    return similarity_results_by_class, similarity_results_by_class_dict

def assessment_model_data_two(enrollment_data, ye, verification_data, yv, e_network, distance):
    """Embed both splits with ``e_network`` and score verification against enrollment.

    Args:
        enrollment_data: enrollment samples, labelled by ``ye``.
        ye: labels of the enrollment samples.
        verification_data: verification samples, labelled by ``yv``.
        yv: labels of the verification samples.
        e_network: forwarded to ``compute_embedding_batch_two``.
        distance: forwarded to ``calculate_similarity_scores_two``.

    Returns:
        tuple: the two values produced by ``calculate_similarity_scores_two``
        (per-comparison rows, per-class score dictionary).
    """
    embedded_enrollment = compute_embedding_batch_two(enrollment_data, e_network)
    embedded_verification = compute_embedding_batch_two(verification_data, e_network)

    rows, rows_by_class = calculate_similarity_scores_two(
        embedded_enrollment, ye, embedded_verification, yv, distance
    )
    return rows, rows_by_class




import torch
import numpy as np

def compute_embedding_batch_two(x_test_batch, embedding_network, batch_size=100, device="cuda"):
    """Return ``x_test_batch`` unchanged.

    Pass-through placeholder: ``embedding_network``, ``batch_size`` and
    ``device`` are accepted but never used, so callers effectively treat
    their inputs as embeddings already.
    """
    #print(x_test_batch.shape)
    return x_test_batch

In [2]:
# Load the enrollment (*_e) and verification (*_v) arrays from the working directory.
# As used by the later cells: x_* are the per-sample vectors, y_* the subject labels,
# s_* the session ids and h_* the headset names.
x_test_e = np.load('x_test_e.npy')
y_test_e = np.load('y_test_e.npy')
s_test_e = np.load('s_test_e.npy')
h_test_e = np.load('h_test_e.npy')

x_test_v = np.load('x_test_v.npy')
y_test_v = np.load('y_test_v.npy')
s_test_v = np.load('s_test_v.npy')
h_test_v = np.load('h_test_v.npy')

# Print the shapes of both splits as a quick sanity check (optional)
print(x_test_e.shape, y_test_e.shape, s_test_e.shape, h_test_e.shape)
print(x_test_v.shape, y_test_v.shape, s_test_v.shape, h_test_v.shape)
# Score every verification sample against the enrolled classes with cosine distance.
# `impostor` maps each enrolled class to the scores of verification samples from other
# classes. No network is passed (None) because compute_embedding_batch_two returns its
# input unchanged.
results2, impostor = assessment_model_data_two(x_test_e, y_test_e, x_test_v, y_test_v, None, distance='cd')


(10000, 128) (10000,) (10000,) (10000,)
(160051, 128) (160051,) (160051,) (160051,)


In [3]:
from collections import defaultdict
import numpy as np
from sklearn.metrics.pairwise import cosine_distances as cd

# Merge the two datasets into one pool (enrollment rows first, then verification rows);
# compute_similarity_per_pair re-splits this pool per subject by session.
x_test = np.concatenate((x_test_e, x_test_v), axis=0)
y_test = np.concatenate((y_test_e, y_test_v), axis=0)
s_test = np.concatenate((s_test_e, s_test_v), axis=0)
h_test = np.concatenate((h_test_e, h_test_v), axis=0)

# Combine into a single data structure, keyed the way compute_similarity_per_pair reads it:
# 'X' samples, 'Y' subject labels, 'S' session ids, 'H' headset names
data = {'X': x_test, 'Y': y_test, 'S': s_test, 'H': h_test}

def compute_similarity_per_pair(data, top_k=10):
    """Genuine scores per headset pair, enrolling each subject on its lowest session.

    For every subject, the samples of the lowest session form the enrollment
    set and every sample of any other session is a verification sample. The
    score of a verification sample is the mean of its ``top_k`` highest
    negated cosine distances to the enrollment samples.

    NOTE(review): a later cell of this notebook defines a function with the
    same name, which replaces this one as soon as that cell runs.

    Args:
        data: dict with aligned arrays ``'X'`` (samples; assumed 2-D, one row
            per sample), ``'Y'`` (subject labels), ``'S'`` (session ids) and
            ``'H'`` (headset names).
        top_k: number of best similarities averaged per verification sample
            (default 10). With fewer enrollment samples the mean is taken over
            the samples that exist instead of being divided by ``top_k``.

    Returns:
        ``defaultdict``: sorted ``(headset, headset)`` pair -> subject -> list
        of float scores, in the order of the subject's verification samples.
        Subjects that have a single session contribute nothing.

    Raises:
        ValueError: if ``top_k < 1``.
    """
    if top_k < 1:
        raise ValueError(f"top_k must be >= 1, got {top_k}")

    similarity_results = defaultdict(lambda: defaultdict(list))  # Pair -> Subject -> Scores

    for subject in np.unique(data['Y']):
        # Restrict every array to the current subject
        subject_mask = data['Y'] == subject
        subject_data = data['X'][subject_mask]
        subject_sessions = data['S'][subject_mask]
        subject_headsets = data['H'][subject_mask]

        # Lowest session = enrollment; everything else = verification
        min_session = np.min(subject_sessions)
        enrollment_mask = subject_sessions == min_session
        verification_mask = subject_sessions != min_session
        if not verification_mask.any():
            continue  # single-session subject: nothing to verify against

        enrollment_data = subject_data[enrollment_mask]
        # The headset of the first enrollment sample stands for the whole
        # enrollment session (assumes one headset per session).
        enrollment_headset = subject_headsets[enrollment_mask][0]

        # One pairwise call per subject: rows = enrollment, columns = verification
        similarities = -1 * cd(enrollment_data, subject_data[verification_mask])
        k = min(top_k, similarities.shape[0])
        # np.sort is ascending, so the k best similarities are the last k rows.
        top_k_means = np.sort(similarities, axis=0)[-k:].mean(axis=0)

        for verification_headset, score in zip(subject_headsets[verification_mask], top_k_means):
            # Sorted so that (A, B) and (B, A) share one key
            pair = tuple(sorted((enrollment_headset, verification_headset)))
            similarity_results[pair][subject].append(float(score))

    return similarity_results

# Example usage: score the merged data (binds the notebook-level `similarity_results`)
similarity_results = compute_similarity_per_pair(data)

# Print, for each headset pair, every subject's mean score and how many scores it has
for pair, subjects in similarity_results.items():
    print(f"Pair {pair}:")
    for subject, scores in subjects.items():
        avg_score = np.mean(scores)
        print(f"  Subject {subject}: Average Similarity = {avg_score:.4f}, Count = {len(scores)}")




Pair (b'Geodisi', b'HydroCe'):
  Subject 70: Average Similarity = -0.2521, Count = 1200
  Subject 85: Average Similarity = -0.3966, Count = 100
  Subject 86: Average Similarity = -0.2053, Count = 500
  Subject 96: Average Similarity = -0.3184, Count = 900
  Subject 103: Average Similarity = -0.2078, Count = 800
  Subject 106: Average Similarity = -0.3533, Count = 1400
  Subject 109: Average Similarity = -0.2674, Count = 100
  Subject 111: Average Similarity = -0.2902, Count = 600
  Subject 115: Average Similarity = -0.3648, Count = 1500
  Subject 118: Average Similarity = -0.2801, Count = 800
  Subject 119: Average Similarity = -0.2946, Count = 400
  Subject 125: Average Similarity = -0.2214, Count = 1100
  Subject 129: Average Similarity = -0.2673, Count = 100
  Subject 130: Average Similarity = -0.1539, Count = 700
  Subject 131: Average Similarity = -0.3005, Count = 1000
  Subject 136: Average Similarity = -0.3141, Count = 1300
  Subject 145: Average Similarity = -0.3384, Count = 90

In [4]:
from sklearn.metrics.pairwise import cosine_distances as cd
from collections import defaultdict
import numpy as np

def compute_similarity_per_pair(data, top_k=10):
    """Genuine scores per headset pair over all ordered (session, headset) combinations.

    For every subject, each distinct (session, headset) combination is treated
    as a recording condition. Every condition is used as the enrollment set
    for every condition with a strictly higher session number. The score of a
    verification sample is the mean of its ``top_k`` highest negated cosine
    distances to the enrollment samples.

    Args:
        data: dict with aligned arrays ``'X'`` (samples; assumed 2-D, one row
            per sample), ``'Y'`` (subject labels), ``'S'`` (session ids) and
            ``'H'`` (headset names).
        top_k: number of best similarities averaged per verification sample
            (default 10). With fewer enrollment samples the mean is taken over
            the samples that exist instead of being divided by ``top_k``.

    Returns:
        ``defaultdict``: sorted ``(headset, headset)`` pair -> subject -> list
        of float scores.

    Raises:
        ValueError: if ``top_k < 1``.
    """
    if top_k < 1:
        raise ValueError(f"top_k must be >= 1, got {top_k}")

    similarity_results = defaultdict(lambda: defaultdict(list))  # Pair -> Subject -> Scores

    for subject in np.unique(data['Y']):
        # Restrict every array to the current subject
        subject_mask = data['Y'] == subject
        subject_data = data['X'][subject_mask]
        subject_sessions = data['S'][subject_mask]
        subject_headsets = data['H'][subject_mask]

        # One (session, headset, samples) entry per recording condition, ordered
        # by session and then headset; the samples are selected once here
        # instead of once per pair of conditions.
        conditions = [
            (
                session,
                headset,
                subject_data[(subject_sessions == session) & (subject_headsets == headset)],
            )
            for session, headset in sorted(set(zip(subject_sessions, subject_headsets)))
        ]

        for enroll_session, enrollment_hw, enrollment_data in conditions:
            for verif_session, verification_hw, verification_data in conditions:
                if not enroll_session < verif_session:
                    continue  # enforce the lower session as enrollment

                # One pairwise call per pair of conditions:
                # rows = enrollment samples, columns = verification samples
                similarities = -1 * cd(enrollment_data, verification_data)
                k = min(top_k, similarities.shape[0])
                # np.sort is ascending, so the k best similarities are the last k rows.
                top_k_means = np.sort(similarities, axis=0)[-k:].mean(axis=0)

                # Sorted so that (A, B) and (B, A) share one key
                pair = tuple(sorted((enrollment_hw, verification_hw)))
                similarity_results[pair][subject].extend(top_k_means.tolist())

    return similarity_results

# Example usage: re-score the merged data with the function defined in this cell.
# This rebinds `similarity_results`, replacing the value assigned by the previous cell.
similarity_results = compute_similarity_per_pair(data)

# Print, for each headset pair, every subject's mean score and how many scores it has
for pair, subjects in similarity_results.items():
    print(f"Pair {pair}:")
    for subject, scores in subjects.items():
        avg_score = np.mean(scores)
        print(f"  Subject {subject}: Average Similarity = {avg_score:.4f}, Count = {len(scores)}")


Pair (b'Geodisi', b'HydroCe'):
  Subject 70: Average Similarity = -0.2451, Count = 8400
  Subject 85: Average Similarity = -0.3566, Count = 1800
  Subject 86: Average Similarity = -0.1960, Count = 7000
  Subject 96: Average Similarity = -0.2903, Count = 8100
  Subject 103: Average Similarity = -0.2103, Count = 8000
  Subject 106: Average Similarity = -0.3181, Count = 36700
  Subject 109: Average Similarity = -0.2614, Count = 400
  Subject 111: Average Similarity = -0.2636, Count = 7800
  Subject 115: Average Similarity = -0.3721, Count = 40300
  Subject 118: Average Similarity = -0.2940, Count = 8800
  Subject 119: Average Similarity = -0.2968, Count = 1600
  Subject 125: Average Similarity = -0.2303, Count = 8800
  Subject 129: Average Similarity = -0.3274, Count = 400
  Subject 130: Average Similarity = -0.1812, Count = 8400
  Subject 131: Average Similarity = -0.3695, Count = 9000
  Subject 136: Average Similarity = -0.2934, Count = 7800
  Subject 145: Average Similarity = -0.2796, 

In [6]:
# Mean and std (in percent) of the per-subject EER for the Geodisi/HydroCe pair:
# genuine scores come from `similarity_results`, impostor scores from `impostor` (cell 2).
calculate_and_print_averages(similarity_results[(b'Geodisi', b'HydroCe')],impostor)

(13.25206861038949, 6.383754496119846)

In [7]:
# Same statistic for the BioSemi/HydroCe pair.
# NOTE(review): the saved output of this cell carries a warning about score orientation,
# and the pair-count cell at the end of the notebook lists only 2 subjects for this
# pair, so treat this figure with caution.
calculate_and_print_averages(similarity_results[(b'BioSemi', b'HydroCe')],impostor)

  warn("It is possible that you had set the wrong score"


(48.20999544620841, 28.876722623711732)

In [23]:
# Function to calculate all possible hardware pairs from unique triplets and count unique subjects per pair
def calculate_hardware_pairs_with_subjects(unique_triplets):
    """Count headset pairs within each subject and the subjects that exhibit each pair.

    For every subject, every unordered pair of that subject's rows contributes
    one occurrence of the sorted ``(hardware, hardware)`` pair, so two rows
    with the same hardware also form a pair.

    Args:
        unique_triplets: table indexable by the field names ``'label'`` and
            ``'hardware'`` and by a boolean mask (presumably a NumPy
            structured array with one row per unique triplet; confirm
            against the cell that builds it).

    Returns:
        tuple ``(pair_totals, pair_subject_counts)``: two dicts keyed by the
        sorted hardware pair, holding the total number of occurrences and the
        number of distinct subjects respectively.
    """
    pair_totals = {}
    pair_members = {}
    labels = unique_triplets['label']

    for subject in np.unique(labels):
        hardware = unique_triplets[labels == subject]['hardware']
        n_rows = len(hardware)

        # Every unordered pair of rows (first < second) of this subject
        for first in range(n_rows):
            for second in range(first + 1, n_rows):
                key = tuple(sorted((hardware[first], hardware[second])))
                pair_totals[key] = pair_totals.get(key, 0) + 1
                pair_members.setdefault(key, set()).add(subject)

    # Collapse each subject set to its size
    pair_subject_counts = {key: len(members) for key, members in pair_members.items()}
    return pair_totals, pair_subject_counts

# Run the updated function
# NOTE(review): `unique_triplets` is not created in any cell visible here, and the
# execution counter jumps from 7 to 23, so this cell appears to rely on state from cells
# that are not shown. Confirm it survives Restart & Run All.
hardware_pairs, subject_counts = calculate_hardware_pairs_with_subjects(unique_triplets)

# Print results: total occurrences and number of distinct subjects for each headset pair
print("All possible hardware pairs used by subjects (including same hardware in different sessions):")
for pair, count in hardware_pairs.items():
    print(f"Pair {pair}: {count} times, Unique Subjects: {subject_counts[pair]}")


All possible hardware pairs used by subjects (including same hardware in different sessions):
Pair (b'Geodisi', b'HydroCe'): 5559 times, Unique Subjects: 52
Pair (b'Geodisi', b'Geodisi'): 1963 times, Unique Subjects: 49
Pair (b'HydroCe', b'HydroCe'): 7989 times, Unique Subjects: 72
Pair (b'BioSemi', b'HydroCe'): 331 times, Unique Subjects: 2
Pair (b'BioSemi', b'BioSemi'): 3270 times, Unique Subjects: 26
