In [62]:
import argparse
import os
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from pathlib import Path
import re
from tqdm import tqdm
import pandas as pd
import csv


def list_files(path:str, allowed_ext:list) -> list:
    """Recursively collect the files under ``path`` that have an allowed extension.

    Args:
        path: Root directory; it is walked recursively with ``os.walk``.
        allowed_ext: Extensions to keep, each including the leading dot
            (e.g. ``[".npy"]``). The comparison is case-sensitive.

    Returns:
        A sorted list of ``pathlib.Path`` objects, one per matching file.
        ``os.walk`` yields entries in arbitrary, filesystem-dependent order, so
        the result is sorted to make downstream row/column order reproducible.
    """
    matches = [
        Path(dirpath) / filename
        for dirpath, _dirnames, filenames in os.walk(path)
        for filename in filenames
        if os.path.splitext(filename)[1] in allowed_ext
    ]
    return sorted(matches)


class Matcher:
    """Score every probe template against every gallery template.

    Templates are ``.npy`` files discovered recursively under a directory. Each
    template contributes a feature array, a label (the file stem) and a subject
    id (derived from the file stem).

    Attributes:
        p_features, g_features: Stacked template arrays for probe / gallery.
        p_subjects, g_subjects: Subject id per probe / gallery template.
        p_labels, g_labels: File stem per probe / gallery template.
        probe_equal_gallery: True when no gallery directory was given and the
            probe set is matched against itself.
        authentic_impostor: ``int8`` matrix of shape (probes, gallery) holding
            1 for authentic pairs, 0 for impostor pairs and -1 for pairs that
            are excluded from the results.
        matches: Score matrix returned by ``match_features``.
    """

    # Code stored in ``authentic_impostor`` for each kind of comparison.
    _AUTHENTIC = 1
    _IMPOSTOR = 0
    _REMOVED = -1

    def __init__(self, probe_directory:str, gallery_directory:str=None, subject_id_regexp:str=None):
        """Load the templates and compute the comparison and score matrices.

        Args:
            probe_directory: Directory searched recursively for ``.npy`` probes.
            gallery_directory: Directory searched recursively for ``.npy``
                gallery templates. When ``None`` the probe set is matched
                against itself.
            subject_id_regexp: Optional regular expression used to extract the
                subject id from each file stem (see ``get_subject_ids``).

        Raises:
            ValueError: If a directory contains no ``.npy`` templates.
            TypeError: If ``subject_id_regexp`` does not match a file stem.
        """
        p_templates = list_files(probe_directory, [".npy"])
        if not p_templates:
            raise ValueError(f"No .npy templates found in {probe_directory}")
        self.p_features = self.get_features(p_templates, "Probe Features: ")
        self.p_subjects = self.get_subject_ids(p_templates, subject_id_regexp, "Probe Subjects: ")
        self.p_labels = self.get_labels(p_templates, "Probe Labels: ")

        self.probe_equal_gallery = gallery_directory is None
        if self.probe_equal_gallery:
            # Self-matching: the gallery is the probe set itself.
            self.g_features = self.p_features
            self.g_subjects = self.p_subjects
            self.g_labels = self.p_labels
            print(f"Matching {probe_directory} to {probe_directory}")
        else:
            g_templates = list_files(gallery_directory, [".npy"])
            if not g_templates:
                raise ValueError(f"No .npy templates found in {gallery_directory}")
            self.g_features = self.get_features(g_templates, "Gallery Features: ")
            self.g_subjects = self.get_subject_ids(g_templates, subject_id_regexp, "Gallery Subjects: ")
            self.g_labels = self.get_labels(g_templates, "Gallery Labels: ")
            print(f"Matching {probe_directory} to {gallery_directory}")

        self.authentic_impostor = self._build_authentic_impostor()
        self.matches = self.match_features(self.p_features, self.g_features)


    def _build_authentic_impostor(self) -> np.ndarray:
        """Classify every (probe, gallery) pair as authentic, impostor or removed."""
        num_probes, num_gallery = len(self.p_labels), len(self.g_labels)

        # This matrix has:
        #   0:  Impostors, 1:  Authentic, -1: Marked for Removal
        authentic_impostor = np.zeros(shape=(num_probes, num_gallery), dtype=np.int8)
        for i in range(num_probes):
            row = authentic_impostor[i]  # view: writes land in the matrix
            row[self.p_subjects[i] == self.g_subjects] = self._AUTHENTIC
            row[self.p_labels[i] == self.g_labels] = self._REMOVED

            if self.probe_equal_gallery:
                # The self-match matrix is symmetric: drop the diagonal and
                # the lower triangle so each pair is reported once.
                row[: i + 1] = self._REMOVED

            # Find comparisons with itself: a gallery label contained in the
            # probe label (e.g. "ABC_123" inside "ABC_123_cloaked").
            # NOTE(review): plain substring matching also hits prefixes such as
            # "A_1" inside "A_10" -- confirm that the label scheme rules this out.
            # Done row by row so that no full probes x gallery integer matrix
            # has to be materialised.
            contained = np.char.find(self.p_labels[i : i + 1], self.g_labels) >= 0
            row[contained] = self._REMOVED

        return authentic_impostor


    def get_labels(self, paths:list, description:str="Labels:") -> np.ndarray:
        """Return the file stem of every path as a string array.

        Args:
            paths: ``pathlib.Path`` objects of the templates.
            description: Text shown next to the progress bar.
        """
        return np.asarray([str(p.stem) for p in tqdm(paths, desc=description)])


    def get_features(self, feature_paths:list, description:str="Features:") -> np.ndarray:
        """Load every template with ``np.load`` and stack the results.

        Args:
            feature_paths: Paths of the ``.npy`` templates.
            description: Text shown next to the progress bar.

        Returns:
            One array with a leading axis over the templates.
            NOTE(review): presumably every file holds a 1-D vector of the same
            length, giving the 2-D input that cosine similarity needs -- confirm.
        """
        return np.asarray([ np.load(str(fp)) for fp in tqdm(feature_paths, desc=description) ])


    def get_subject_ids(self, feature_paths:list, regexp:str=None, description:str="Subjects:") -> np.ndarray:
        """Derive the subject id of every template from its file stem.

        Args:
            feature_paths: Paths of the templates.
            regexp: Optional regular expression. When given, the subject id is
                the whole text matched by ``re.search`` on the file stem. When
                ``None``, it is the part of the stem before the first ``_``.
            description: Text shown next to the progress bar.

        Returns:
            Array with one subject id per path.

        Raises:
            TypeError: If ``regexp`` is given and does not match a file stem.
        """
        # Compile once instead of once per file.
        pattern = None if regexp is None else re.compile(regexp)

        def subject_of(path):
            filename = str(Path(path).stem)
            if pattern is None:
                return filename.split("_")[0]
            match = pattern.search(filename)
            if not match:
                raise TypeError(
                    f"Subject id pattern {pattern.pattern!r} does not match file name {filename!r}"
                )
            return match.group(0)

        return np.asarray([subject_of(f) for f in tqdm(feature_paths, desc=description)])


    def match_features(self, probes:np.ndarray, gallery:np.ndarray) -> np.ndarray:
        """Return the cosine similarity of every probe against every gallery row."""
        return cosine_similarity(probes, gallery)


    def create_label_indices(self, labels) -> np.ndarray:
        """Pair every label with its position.

        Args:
            labels: Sequence of labels.

        Returns:
            Array with one ``[index, label]`` row per label. For string labels
            the index column is stored as text as well.
        """
        labels = np.asarray(labels)
        indices = np.arange(len(labels))
        if labels.dtype.kind in "US":
            # Cast explicitly instead of relying on implicit int -> str
            # promotion, which recent NumPy releases deprecate (as far as I
            # recall); the resulting values are the same.
            indices = indices.astype(str)
        return np.column_stack([indices, labels])


    def get_indices_score(self, auth_or_imp):
        """Collect the index pairs and scores of one comparison class.

        Args:
            auth_or_imp: Value to select in ``authentic_impostor``
                (1 authentic, 0 impostor, -1 removed).

        Returns:
            Float array with one ``[probe_index, gallery_index, score]`` row per
            selected pair, in row-major order; scores are rounded to 6 decimals.
        """
        selected = self.authentic_impostor == auth_or_imp
        probe_idx, gallery_idx = np.where(selected)
        # Boolean indexing and np.where both walk the matrix in row-major
        # order, so the scores line up with the index pairs.
        scores = np.round(self.matches[selected], 6)
        return np.column_stack([probe_idx, gallery_idx, scores])


    @staticmethod
    def _write_rows(path, rows, delimiter:str):
        """Write ``rows`` to ``path`` as delimiter-separated text."""
        # newline="" lets the csv module control line endings; without it
        # Windows output ends every row with "\r\r\n".
        with open(path, "w", newline="") as out:
            csv.writer(out, delimiter=delimiter).writerows(rows)


    def save_matches(self, output_directory:str, group_name:str, match_type:str="all", file_type="csv"):
        """Write the authentic and/or impostor scores to ``output_directory``.

        Args:
            output_directory: Existing directory that receives the files.
            group_name: Prefix of every file name.
            match_type: ``"authentic"``, ``"impostor"`` or ``"all"`` (both).
            file_type: ``"csv"`` (comma separated) or ``"txt"`` (space
                separated) write ``probe_label, gallery_label, score`` rows to
                ``{group_name}_{kind}_scores.{file_type}``. ``"npy"`` saves
                only the scores there and writes the label pairs to
                ``{group_name}_{kind}_scores_labels.txt``.

        Raises:
            TypeError: If ``match_type`` is not one of the supported values.
            ValueError: If ``file_type`` is not one of the supported values.
        """
        if match_type not in ["all", "authentic", "impostor"]:
            raise TypeError(
                f"match_type must be 'all', 'authentic' or 'impostor', got {match_type!r}"
            )
        if file_type not in ("csv", "txt", "npy"):
            # Previously an unknown type silently wrote nothing.
            raise ValueError(f"file_type must be 'csv', 'txt' or 'npy', got {file_type!r}")

        print("Saving matches output to " + f"{str(Path(output_directory))}")

        codes = {"authentic": self._AUTHENTIC, "impostor": self._IMPOSTOR}
        kinds = list(codes) if match_type == "all" else [match_type]
        out_dir = Path(output_directory)
        p_labels, g_labels = np.asarray(self.p_labels), np.asarray(self.g_labels)

        for kind in kinds:
            data = self.get_indices_score(codes[kind])
            # The index columns come back as floats; cast before indexing.
            probe_labels = p_labels[data[:, 0].astype(np.int64)].tolist()
            gallery_labels = g_labels[data[:, 1].astype(np.int64)].tolist()
            scores = data[:, 2]

            scores_path = out_dir / f"{group_name}_{kind}_scores.{file_type}"
            if file_type == "npy":
                np.save(str(scores_path), scores)
                # The label file name is built from the kind itself, so a
                # group name containing "authentic_scores" cannot misroute it.
                labels_path = out_dir / f"{group_name}_{kind}_scores_labels.txt"
                self._write_rows(labels_path, zip(probe_labels, gallery_labels), " ")
            else:
                delimiting_character = "," if file_type == "csv" else " "
                self._write_rows(
                    scores_path,
                    zip(probe_labels, gallery_labels, scores.tolist()),
                    delimiting_character,
                )

    def save_score_matrix(self, output_directory:str, group_name:str, file_type="csv"):
        """Write the full score matrix to ``output_directory``.

        Args:
            output_directory: Existing directory that receives the files.
            group_name: Prefix of every file name.
            file_type: ``"csv"`` writes a table with probe labels as the index
                and gallery labels as the columns. ``"npy"`` saves the raw
                matrix plus two text files with the row and column labels.

        Raises:
            ValueError: If ``file_type`` is neither ``"csv"`` nor ``"npy"``.
        """
        if file_type not in ("csv", "npy"):
            # Previously an unknown type silently wrote nothing.
            raise ValueError(f"file_type must be 'csv' or 'npy', got {file_type!r}")

        print("Saving score matrix to " + f"{str(Path(output_directory))}")
        matrix_output = Path(output_directory) / f"{group_name}_score_matrix.{file_type}"

        if file_type == "csv":
            df = pd.DataFrame(self.matches, index=self.p_labels, columns=self.g_labels)
            df.to_csv(str(matrix_output))
        else:
            matrix_row_labels = str(Path(output_directory) / f"{group_name}_score_matrix_row_labels.txt")
            matrix_col_labels = str(Path(output_directory) / f"{group_name}_score_matrix_col_labels.txt")
            np.save(str(matrix_output), self.matches)
            np.savetxt(matrix_row_labels, self.p_labels, fmt="%s")
            np.savetxt(matrix_col_labels, self.g_labels, fmt="%s")

In [63]:
# Template directories for this run.
# NOTE(review): absolute, machine-specific paths -- consider deriving them from
# a configurable base directory.
probe = "/home/xavier/Pictures/cf_cloaked/templates/"
gallery = "/home/xavier/Pictures/cf_og/templates/"

# Match every probe template against every gallery template.
m = Matcher(probe_directory=probe, gallery_directory=gallery)

Probe Features: 100%|██████████| 10940/10940 [00:00<00:00, 15148.47it/s]
Probe Subjects: 100%|██████████| 10940/10940 [00:00<00:00, 407778.52it/s]
Probe Labels: 100%|██████████| 10940/10940 [00:00<00:00, 2610404.24it/s]
Gallery Features: 100%|██████████| 10939/10939 [00:00<00:00, 15498.23it/s]
Gallery Subjects: 100%|██████████| 10939/10939 [00:00<00:00, 410980.85it/s]
Gallery Labels: 100%|██████████| 10939/10939 [00:00<00:00, 2843952.86it/s]


Matching /home/xavier/Pictures/cf_cloaked/templates/ to /home/xavier/Pictures/cf_og/templates/


In [64]:
# Save authentic and impostor scores as .npy files (plus label text files)
# in the current working directory.
m.save_matches(
    output_directory=os.getcwd(),
    group_name="test",
    match_type="all",
    file_type="npy",
)
print("Done!")

Saving matches output to /home/xavier/Documents/git/FeatureMatcher
Done!


In [22]:
# Toy probe labels (`a`) and gallery labels (`b`) for trying out the substring
# lookup: three probe labels contain a gallery label, "ZZZ_123_cloaked" does not.
a = np.array(
    [
        "ABC_123_cloaked",
        "XYZ_123_cloaked",
        "ABC_234_cloaked",
        "ZZZ_123_cloaked",
    ]
)
b = np.array(
    [
        "XYZ_123",
        "ABC_123",
        "ABC_234",
    ]
)

In [31]:
# (row, column) positions where gallery label b[column] occurs inside probe
# label a[row]. np.char is the public home of `find`; the np.core namespace is
# deprecated in NumPy 2.
locations = np.where(np.char.find(a[:, np.newaxis], b) >= 0)

In [32]:
# Start offset of each gallery label inside each probe label (-1 when absent).
# Uses the public np.char namespace; np.core is deprecated in NumPy 2.
np.char.find(a[:, np.newaxis], b)

array([[-1,  0, -1],
       [ 0, -1, -1],
       [-1, -1,  0],
       [-1, -1, -1]])

In [30]:
# The two operands of the broadcast lookup: `a` as a column, `b` as a row.
a.reshape(-1, 1), b

(array([['ABC_123_cloaked'],
        ['XYZ_123_cloaked'],
        ['ABC_234_cloaked'],
        ['ZZZ_123_cloaked']], dtype='<U15'),
 array(['XYZ_123', 'ABC_123', 'ABC_234'], dtype='<U7'))

In [39]:
# Show the np.where result: a tuple of (row indices, column indices).
locations

(array([0, 1, 2]), array([1, 0, 2]))

In [48]:
# Put the two index arrays side by side so that each row is one (i, j) pair.
loc = np.column_stack(locations)
loc

array([[0, 1],
       [1, 0],
       [2, 2]])

In [52]:
# Print every (i, j) index pair on its own line.
for i, j in loc:
    print(i, j)

0 1
1 0
2 2


In [37]:
# `x` is not assigned in any visible cell, so a fresh kernel raises NameError.
# Presumably it was the row-index half of `locations` (the recorded output
# matches) -- TODO confirm.
x = locations[0]
x

array([0, 1, 2])

In [38]:
# `y` is not assigned in any visible cell, so a fresh kernel raises NameError.
# Presumably it was the column-index half of `locations` (the recorded output
# matches) -- TODO confirm.
y = locations[1]
y

array([1, 0, 2])