In [3]:
from sklearn.neighbors import KNeighborsClassifier

import numpy as np
import os.path

# import warnings filter
from warnings import simplefilter
# ignore all future warnings
simplefilter(action='ignore', category=FutureWarning)


suffix = ''

all_embeddings = []
all_labels = []
all_sequences = []

for database in ['kwitonda_portraits', 'martha_bwindi', 'martha_loango']:
    path = f'/workspace/data/crops_and_embeddings/{database}/embeddings'

    embeddings = np.load(f'{path}/embeddings{suffix}.npy')
    labels = np.load(f'{path}/individual_ids.npy')
    if os.path.exists(f'{path}/sequence_ids.npy'):
        sequences = np.load(f'{path}/sequence_ids.npy')
    else:
        sequences = np.load(f'{path}/file_names.npy')

    all_embeddings.append(embeddings)
    all_labels.append(labels)
    all_sequences.append(sequences)

embeddings = np.concatenate(all_embeddings)
labels = np.concatenate(all_labels)
sequences = np.concatenate(all_sequences)


assert len(embeddings) == len(labels) and len(embeddings) == len(sequences)

print(embeddings.shape)
print(labels.shape)

(1983, 256)
(1983,)


In [4]:
def predict_one(embeddings, labels, sequence_ids, test_idx, k=5):
    test_sequence = sequence_ids[test_idx]
    train_idx = [i for i in range(len(embeddings)) if i != test_idx and sequence_ids[i] != test_sequence]
    
    train_embeddings = embeddings[train_idx]
    train_labels = labels[train_idx]


    test_embedding = embeddings[test_idx]

    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(train_embeddings, train_labels)

    return knn.predict([test_embedding])[0], labels[test_idx]

# predict_one(embeddings, labels, 3)

# Calculate precision, recall, f1

def calculate_metrics(embeddings, labels, sequence_ids, min_samples=5, k=2):
    # only predicting on individuals with at least min_samples
    unique_labels, counts = np.unique(labels, return_counts=True)
    unique_labels = unique_labels[counts >= min_samples]
    
    mask = np.isin(labels, unique_labels)
    # embeddings = embeddings[mask]
    # labels = labels[mask]
    
    idx = [i for i in range(len(embeddings)) if labels[i] in unique_labels]
    
    print(f'Calculating metrics on {len(idx)} samples from {len(unique_labels)} individuals with {len(embeddings)} samples in total.')
    
    predictions = [predict_one(embeddings, labels, sequence_ids, i, k=k) for i in idx]
    predictions = np.array(predictions)
    
    tp = np.sum(predictions[:, 0] == predictions[:, 1])
    
    average_precision =  tp / len(embeddings)
    
    return average_precision
 

print(calculate_metrics(embeddings, labels, sequences, min_samples=1, k=3))
print(calculate_metrics(embeddings, labels, sequences, min_samples=5, k=3))

Calculating metrics on 1983 samples from 91 individuals with 1983 samples in total.
0.24659606656580937
Calculating metrics on 1884 samples from 34 individuals with 1983 samples in total.
0.24407463439233484
