### Import dependencies

In [2]:
import numpy as np
from collections import Counter
import os
import json
import numpy as np
from scipy.special import softmax
from pybktree import BKTree
from tqdm import tqdm

### Check if the data exists

In [3]:
# Locations of the cached embedding tables (pickle files)
labeled_embeddings_file = "../outputs/cache/embeddings.pkl"
unlabeled_embeddings_file = "../outputs/cache/reddit_embeddings_alexnet.pkl"

# Locations of the perceptual-hash listings (text files)
labeled_phases_files = "./pHash/labeled_phashes.txt"
unlabeled_phases_files = "./pHash/unlabeled_phashes.txt"

# Each required input paired with the error raised when it is missing.
# Checked in this order; the first missing file aborts the cell.
required_inputs = [
    (labeled_phases_files, "Labeled phash file not found"),
    (unlabeled_phases_files, "Unlabeled phash file not found"),
    (labeled_embeddings_file, "Labeled embeddings file not found"),
    (unlabeled_embeddings_file, "Unlabeled embeddings file not found"),
]

for input_path, missing_message in required_inputs:
    if not os.path.exists(input_path):
        raise FileNotFoundError(missing_message)
    print("File found")


File found
File found
File found
File found


### Load the data

If phash is True, we load the phash data, otherwise we load the embeddings.

In [4]:

# Feature-source switch read by the loading code further down in this cell:
# True  -> load the perceptual hashes and expand them into bit vectors,
# False -> load the embedding tables instead.
phash = True

def load_embeddings(embedding_file, unlabeled=False):
    """Load image embeddings (and optionally their class names) from a pickle file.

    The pickle is read with ``pandas.read_pickle`` and must provide the columns
    ``'path'`` and ``'embedding'``; ``'class_name'`` is additionally required
    when ``unlabeled`` is False.

    Args:
        embedding_file: Path of the pickle file to read.
        unlabeled: When True, the ``'class_name'`` column is not read and
            ``None`` is returned in its place.

    Returns:
        A tuple ``(embeddings, class_names, path)``:
        ``embeddings`` is the row-wise stack of the ``'embedding'`` column cast
        to ``np.float16``; ``class_names`` is the ``'class_name'`` column as a
        numpy array (``None`` when ``unlabeled`` is True); ``path`` is the
        ``'path'`` column as a numpy array.
    """
    # pandas is not part of the notebook's import cell, so the function used to
    # raise NameError on a fresh kernel. Importing it here keeps the function
    # self-contained.
    import pandas as pd

    # NOTE(security): read_pickle unpickles arbitrary objects; only point this
    # at cache files produced by a trusted pipeline.
    df = pd.read_pickle(embedding_file)

    path = df['path'].to_numpy()
    class_names = None if unlabeled else df['class_name'].to_numpy()
    # Each cell of the 'embedding' column holds one vector; stack them into a
    # 2-D array and store as half precision.
    embeddings = np.vstack(df['embedding'].apply(np.array).to_numpy()).astype(np.float16)
    return embeddings, class_names, path

    
def load_phases(phases_files):
    """Read a tab-separated ``<image path>\\t<phash>`` listing.

    The class name of every image is taken to be the name of the directory
    that directly contains it. Both ``/`` and ``\\`` are accepted as path
    separators regardless of the operating system the notebook runs on; the
    previous implementation split on a hard-coded backslash in one branch and
    on ``os.sep`` in the other, which raised IndexError for paths written with
    the other separator.

    Args:
        phases_files: Path of the text file to read. Blank lines are ignored.

    Returns:
        A tuple of three numpy arrays ``(phashes, class_names, image_paths)``
        with one entry per non-blank line, in file order. ``phashes`` holds the
        hash strings exactly as written in the file.

    Raises:
        ValueError: If a non-blank line does not consist of exactly two
            tab-separated fields.
        IndexError: If a path has no parent directory component.
    """
    phashes = []
    image_paths = []
    class_names = []
    with open(phases_files, 'r') as f:
        for line_number, line in enumerate(f, start=1):
            record = line.strip()
            if not record:
                # Tolerate blank lines (e.g. a trailing newline at end of file)
                continue
            fields = record.split('\t')
            if len(fields) != 2:
                raise ValueError(
                    f"{phases_files}:{line_number}: expected '<path>\\t<phash>', got {record!r}"
                )
            path, phash_hex = fields

            # Unify separators before and after normpath so the split below
            # behaves the same on POSIX and Windows.
            unified = os.path.normpath(path.replace('\\', '/')).replace('\\', '/')
            parts = unified.split('/')

            image_paths.append(path)
            class_names.append(parts[-2])  # directory that contains the image
            phashes.append(phash_hex)
    return np.array(phashes), np.array(class_names), np.array(image_paths)

def phash_to_bin(phashes):
    """Expand hexadecimal hash strings into rows of individual bits.

    Args:
        phashes: Iterable of hexadecimal strings.

    Returns:
        ``np.uint8`` array with one row per hash. Each row lists the bits of
        the hash value, most significant first, left-padded with zeros to at
        least 64 entries.
    """
    bit_rows = []
    for hex_digest in phashes:
        # Zero-padded binary rendering of the hash value, at least 64 characters wide
        bit_string = format(int(hex_digest, 16), '064b')
        bit_rows.append([int(ch) for ch in bit_string])
    return np.array(bit_rows, dtype=np.uint8)

if phash:
    # pHash route: read the hex digests, then expand each one into a row of bits
    labeled_hashes, labeled_templates, _ = load_phases(labeled_phases_files)
    labeled_embeddings = phash_to_bin(labeled_hashes)
    unlabeled_hashes, _, image_paths = load_phases(unlabeled_phases_files)
    unlabeled_embeddings = phash_to_bin(unlabeled_hashes)
else:
    # Embedding route: read the cached embedding tables
    print("Loading labeled embeddings")
    labeled_embeddings, labeled_templates, _ = load_embeddings(labeled_embeddings_file)
    print("Loading unlabeled embeddings")
    unlabeled_embeddings, _, image_paths = load_embeddings(unlabeled_embeddings_file, unlabeled=True)

# Quick look at what was loaded
print("finished loading")
print(type(unlabeled_embeddings))
print(unlabeled_embeddings)


finished loading
<class 'numpy.ndarray'>
[[0 0 0 ... 1 0 1]
 [0 1 1 ... 1 0 1]
 [1 0 0 ... 1 1 0]
 ...
 [0 1 1 ... 0 1 0]
 [1 1 1 ... 0 1 0]
 [1 1 1 ... 0 1 1]]


Assign labels to phashes
------------------------

For phashes we build a BK-tree over the labeled hashes and use the Hamming distance to find each unlabeled hash's nearest neighbors.

In [6]:
def hamming_distance_tuple(a, b):
    """Hamming distance between two tuples, comparing only their second elements.

    The first element of each tuple is ignored; the distance is delegated to
    ``hamming_distance``.
    """
    bits_a = a[1]
    bits_b = b[1]
    return hamming_distance(bits_a, bits_b)

def hamming_distance(a, b):
    """Count the positions at which ``a`` and ``b`` differ.

    Both arguments are expected to be numpy arrays of 0s and 1s.
    """
    mismatches = a != b  # element-wise comparison
    return np.count_nonzero(mismatches)

# Class names of the labeled items; indexed in step with the rows of labeled_embeddings below
labels = labeled_templates

# BK-tree whose items are (label, bits) pairs, compared via the Hamming distance of their bits
bk_tree = BKTree(hamming_distance_tuple)
for row_index, bits in enumerate(labeled_embeddings):
    bk_tree.add((labels[row_index], bits))

# Queries with no labeled item within this many differing bits are classified as "templateless"
distance_threshold = 28

# Containers for the predictions
assigned_labels, confidence_scores = [], []

def assign_labels(unlabeled_embeddings):
    """Label each unlabeled item by weighted voting among its nearest labeled neighbours.

    For every row, the module-level ``bk_tree`` is queried for all labeled
    items within ``distance_threshold``. If none are found, the row is labeled
    ``"templateless"`` with confidence ``0.0``. Otherwise the (up to) three
    closest neighbours vote with weights ``softmax(1 / (1 + distance))``; the
    label with the largest summed weight wins and that sum is the confidence.

    Results are collected in lists local to the call. The earlier version
    appended to module-level lists, which the notebook later rebinds to numpy
    arrays, so calling the function a second time failed on ``.append`` (and
    repeated calls would have mixed results of separate runs).

    Args:
        unlabeled_embeddings: 2-D numpy array, one row per item to label.

    Returns:
        ``(labels, scores)``: two lists, index-aligned with the rows processed.
        If the run is interrupted with Ctrl-C, the results gathered so far are
        returned, so both lists may be shorter than the input.
    """
    predicted_labels = []
    predicted_scores = []

    try:
        for unlabeled_embedding in tqdm(unlabeled_embeddings, desc="Assigning labels", total=unlabeled_embeddings.shape[0]):
            # Tree items are (label, bits) tuples; the query carries no label
            neighbors = bk_tree.find((None, unlabeled_embedding), distance_threshold)
            if not neighbors:
                # If no neighbors within distance threshold, classify as "templateless"
                predicted_labels.append("templateless")
                predicted_scores.append(0.0)
                continue

            # Consider top 3 nearest neighbors
            top_neighbors = sorted(neighbors, key=lambda x: x[0])[:3]

            # Unpack labels and distances from the (distance, (label, bits)) results
            nearest_labels = [label for _, (label, _) in top_neighbors]
            distances = np.array([dist for dist, _ in top_neighbors])

            # Closer neighbours get larger weights; softmax makes the weights sum to 1
            weights = softmax(1 / (1 + distances))
            weighted_votes = Counter()
            for label, weight in zip(nearest_labels, weights):
                weighted_votes[label] += weight

            # The label with the highest summed weight wins; that sum is the confidence
            most_common_label, highest_weighted_vote = weighted_votes.most_common(1)[0]
            predicted_labels.append(most_common_label)
            predicted_scores.append(highest_weighted_vote)
    except KeyboardInterrupt:
        # Deliberate best effort: keep whatever has been labeled so far
        pass

    return predicted_labels, predicted_scores

# Run the labeling pass, then turn both result lists into numpy arrays
predicted_labels, predicted_scores = assign_labels(unlabeled_embeddings)
assigned_labels = np.array(predicted_labels)
confidence_scores = np.array(predicted_scores)


Assigning labels: 100%|██████████| 1000/1000 [12:22<00:00,  1.35it/s]


After all the phashes have been assigned a label, we can evaluate the quality of the clustering.

In [7]:
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score

# Convert labels to integers for the metrics functions.
# The distinct labels are sorted first so that the label -> integer mapping is
# the same on every run; enumerating a bare set of strings gives an order that
# can change between interpreter sessions.
label_to_int = {label: i for i, label in enumerate(sorted(set(assigned_labels)))}
labels_int = np.array([label_to_int[label] for label in assigned_labels])

print('Calculating Silhouette Coefficient...')
silhouette = silhouette_score(unlabeled_embeddings, labels_int)
print('Silhouette Coefficient:', silhouette)

print('Calculating Calinski-Harabasz Index...')
calinski_harabasz = calinski_harabasz_score(unlabeled_embeddings, labels_int)
print('Calinski-Harabasz Index:', calinski_harabasz)

print('Calculating Davies-Bouldin Index...')
davies_bouldin = davies_bouldin_score(unlabeled_embeddings, labels_int)
print('Davies-Bouldin Index:', davies_bouldin)


Calculating Silhouette Coefficient...
Silhouette Coefficient: -0.02024418170567639
Calculating Calinski-Harabasz Index...
Calinski-Harabasz Index: 2.4129534652132167
Calculating Davies-Bouldin Index...
Davies-Bouldin Index: 0.8584082928801149


Lastly we can save the results to a JSON file.

In [80]:
import json
import pickle

# NOTE(review): the embeddings results cell of this notebook writes the same two
# .npy file names, so whichever cell runs last overwrites the other's arrays.
np.save('assigned_labels.npy', assigned_labels)
np.save('confidence_scores.npy', confidence_scores)


# Step 5: Save results to JSON file
print(image_paths.shape)

# zip() below stops at the shortest input, so a length mismatch (for example
# after an interrupted labeling run) would otherwise go unnoticed.
if not (len(image_paths) == len(assigned_labels) == len(confidence_scores)):
    print(
        f"Warning: {len(image_paths)} paths, {len(assigned_labels)} labels and "
        f"{len(confidence_scores)} scores; only the first "
        f"{min(len(image_paths), len(assigned_labels), len(confidence_scores))} are saved"
    )

# Collecting the results: one entry per cluster, mapping image path -> confidence
results = dict()

for path, label, confidence in zip(image_paths, assigned_labels, confidence_scores):
    cluster_name = str(label)  # plain str instead of numpy string
    cluster = results.setdefault(cluster_name, {"cluster_name": cluster_name, "images": {}})
    cluster["images"][str(path)] = float(confidence)  # convert numpy float to Python float

# Metrics are coerced to builtin floats so json.dump never meets a numpy scalar
# type it cannot serialize.
results['metrics'] = {
    'silhouette_score': float(silhouette),
    'calinski_harabasz_score': float(calinski_harabasz),
    'davies_bouldin_score': float(davies_bouldin),
}

# Save the results to a JSON file, creating the output directory when needed so
# the cell does not fail after the expensive labeling step.
output_path = '../outputs/clusters/jsons/phash_results.json'
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'w') as f:
    json.dump(list(results.values()), f, indent=4)

(17436,)


Assign labels to reddit embeddings
---

In [35]:
from vptree import VPTree
import numpy as np
from collections import Counter
from scipy.special import softmax
from sklearn.preprocessing import normalize
from scipy.spatial import distance

def cosine_similarity_tuple(a, b):
    """Cosine distance between the second elements of two tuples.

    Despite the function's name, the value returned is what
    ``scipy.spatial.distance.cosine`` computes, i.e. a distance (0 for vectors
    pointing the same way), not a similarity. The first element of each tuple
    is ignored.

    NOTE(review): cosine distance does not obey the triangle inequality in
    general; confirm that the tree this function is handed to tolerates that.
    """
    vector_a = a[1]
    vector_b = b[1]
    return distance.cosine(vector_a, vector_b)

# Read the labeled embedding table
print("Loading labeled embeddings")
labeled_embeddings, labeled_templates, _ = load_embeddings(labeled_embeddings_file)

# Class names of the labeled items
labels = labeled_templates

# Scale every row to unit L2 norm
labeled_embeddings = normalize(labeled_embeddings, norm='l2', axis=1)

# VP-tree over (label, vector) pairs, compared with cosine_similarity_tuple
labeled_points = list(zip(labels, labeled_embeddings))
vp_tree = VPTree(labeled_points, cosine_similarity_tuple)

Loading labeled embeddings


  arr = asarray(arr)


In [36]:
# Sanity check: display the shape of the labeled embedding matrix
labeled_embeddings.shape

(800, 2048)

In [62]:
from tqdm import tqdm

print("Loading unlabeled embeddings")
unlabeled_embeddings, _, image_paths = load_embeddings(unlabeled_embeddings_file, unlabeled=True)

# Largest cosine distance at which a labeled item still counts as a neighbour.
# Cosine similarity lies in [-1, 1], so cosine distances lie in [0, 2].
distance_threshold = 0.32

# Predictions, one entry per unlabeled embedding
assigned_labels = []
confidence_scores = []

progress = tqdm(unlabeled_embeddings, desc="Assigning labels", total=unlabeled_embeddings.shape[0])
for unlabeled_embedding in progress:
    neighbors = vp_tree.get_all_in_range((None, unlabeled_embedding), distance_threshold)

    if neighbors:
        # Keep the three closest hits; each hit is a (distance, (label, vector)) pair
        top_neighbors = sorted(neighbors, key=lambda hit: hit[0])[:3]
        nearest_labels = [point[0] for _dist, point in top_neighbors]
        distances = np.array([dist for dist, _point in top_neighbors])

        # Closer neighbours get larger weights; softmax makes the weights sum to 1
        weights = softmax(1 / (1 + distances))
        weighted_votes = Counter()
        for label, weight in zip(nearest_labels, weights):
            weighted_votes[label] += weight

        # Winning label and its summed weight, which doubles as the confidence
        most_common_label, highest_weighted_vote = weighted_votes.most_common(1)[0]
    else:
        # Nothing within the threshold: no template matches this image
        most_common_label, highest_weighted_vote = "templateless", 0.0

    assigned_labels.append(most_common_label)
    confidence_scores.append(highest_weighted_vote)

# Turn the result lists into numpy arrays
assigned_labels = np.array(assigned_labels)
confidence_scores = np.array(confidence_scores)


Loading unlabeled embeddings


Assigning labels: 100%|██████████| 200/200 [00:14<00:00, 14.27it/s]


In [None]:
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score

# Convert labels to integers for the metrics functions.
# The distinct labels are sorted first so that the label -> integer mapping is
# the same on every run; enumerating a bare set of strings gives an order that
# can change between interpreter sessions.
label_to_int = {label: i for i, label in enumerate(sorted(set(assigned_labels)))}
labels_int = np.array([label_to_int[label] for label in assigned_labels])

print('Calculating Silhouette Coefficient...')
silhouette = silhouette_score(unlabeled_embeddings, labels_int)
print('Silhouette Coefficient:', silhouette)

print('Calculating Calinski-Harabasz Index...')
calinski_harabasz = calinski_harabasz_score(unlabeled_embeddings, labels_int)
print('Calinski-Harabasz Index:', calinski_harabasz)

print('Calculating Davies-Bouldin Index...')
davies_bouldin = davies_bouldin_score(unlabeled_embeddings, labels_int)
print('Davies-Bouldin Index:', davies_bouldin)


In [63]:
# Boolean mask marking the images that received no template
is_templateless = assigned_labels == "templateless"

# How many images ended up without a template
num_templateless = np.sum(is_templateless)
print(f"Number of templateless images: {num_templateless}")

# Positions of those images, and the corresponding file paths
templateless_indices = np.where(is_templateless)[0]
templateless_image_paths = image_paths[templateless_indices]

print(templateless_image_paths)

Number of templateless images: 2
['/storage/kym-datasets/Memes2023_splitted_resized/finetuning/val/ytmnd/ytmnd_15.jpg'
 '/storage/kym-datasets/Memes2023_splitted_resized/finetuning/val/yume-nikki/yume-nikki_2.png']


In [None]:
import json
import pickle

# NOTE(review): the phash results cell of this notebook writes the same two
# .npy file names, so whichever cell runs last overwrites the other's arrays.
np.save('assigned_labels.npy', assigned_labels)
np.save('confidence_scores.npy', confidence_scores)


# Step 5: Save results to JSON file
print(image_paths.shape)

# zip() below stops at the shortest input, so a length mismatch would otherwise
# go unnoticed.
if not (len(image_paths) == len(assigned_labels) == len(confidence_scores)):
    print(
        f"Warning: {len(image_paths)} paths, {len(assigned_labels)} labels and "
        f"{len(confidence_scores)} scores; only the first "
        f"{min(len(image_paths), len(assigned_labels), len(confidence_scores))} are saved"
    )

# Collecting the results: one entry per cluster, mapping image path -> confidence
results = dict()

for path, label, confidence in zip(image_paths, assigned_labels, confidence_scores):
    cluster_name = str(label)  # plain str instead of numpy string
    cluster = results.setdefault(cluster_name, {"cluster_name": cluster_name, "images": {}})
    cluster["images"][str(path)] = float(confidence)  # convert numpy float to Python float

# Metrics are coerced to builtin floats so json.dump never meets a numpy scalar
# type it cannot serialize.
results['metrics'] = {
    'silhouette_score': float(silhouette),
    'calinski_harabasz_score': float(calinski_harabasz),
    'davies_bouldin_score': float(davies_bouldin),
}

# Save the results to a JSON file, creating the output directory when needed so
# the cell does not fail after the labeling step.
output_path = '../outputs/clusters/jsons/embeddings_results.json'
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'w') as f:
    json.dump(list(results.values()), f, indent=4)

(17436,)
