In [3]:
import numpy as np
import os
import torch
from sklearn.preprocessing import OneHotEncoder
from tqdm import tqdm

def load_and_process_embeddings(speaker_dir, num_layers=25, max_samples=50, m_max=100):
    embeddings = [[] for _ in range(num_layers)]
    for layer in range(num_layers):
        layer_dir = os.path.join(speaker_dir, f"layer_{layer}")
        layer_files = sorted(os.listdir(layer_dir))[:max_samples]
        for file in layer_files:
            file_path = os.path.join(layer_dir, file)
            embedding = np.load(file_path)  # Shape: (1024, m)
            if embedding.shape[1] < m_max:
                padded_embedding = np.pad(embedding, ((0, 0), (0, m_max - embedding.shape[1])), 'constant')
            else:
                padded_embedding = embedding[:, :m_max]
            embeddings[layer].append(padded_embedding.flatten())  # Flatten to 1D
    return [np.array(embeddings[layer]) for layer in range(num_layers)]

def cca_cpu(X, Y_batch, reg=1e-4):
    X_mean = np.mean(X, axis=0)
    Y_mean = np.mean(Y_batch, axis=0)
    X_centered = X - X_mean
    Y_centered = Y_batch - Y_mean

    cov_XY = np.dot(X_centered.T, Y_centered) / (X.shape[0] - 1)
    cov_XX = np.dot(X_centered.T, X_centered) / (X.shape[0] - 1) + reg * np.eye(X_centered.shape[1])
    cov_YY = np.dot(Y_centered.T, Y_centered) / (Y_batch.shape[0] - 1) + reg * np.eye(Y_centered.shape[1])

    eigvals, eigvecs = np.linalg.eig(np.dot(np.linalg.inv(cov_XX), np.dot(cov_XY, np.dot(np.linalg.inv(cov_YY), cov_XY.T))))

    return np.mean(np.real(eigvals[-min(X.shape[1], Y_batch.shape[1]):]))

# Main script
speaker_dirs = [
    "/content/drive/MyDrive/enhancing_speaker_recognition_evaluation/data/speaker1",
    "/content/drive/MyDrive/enhancing_speaker_recognition_evaluation/data/speaker2",
    "/content/drive/MyDrive/enhancing_speaker_recognition_evaluation/data/speaker3"
]

X_layers = [[] for _ in range(25)]
y = []

for i, speaker_dir in enumerate(speaker_dirs):
    print(f"Loading data for speaker {i+1} from {speaker_dir}")
    speaker_embeddings = load_and_process_embeddings(speaker_dir, num_layers=25, max_samples=50, m_max=100)
    for layer in range(25):
        X_layers[layer].extend(speaker_embeddings[layer])
    y.extend([i] * len(speaker_embeddings[0]))

y = np.array(y)
print(f"Total samples loaded: {len(y)}")

encoder = OneHotEncoder()
Y = encoder.fit_transform(y.reshape(-1, 1)).toarray()
print(f"Shape of Y after one-hot encoding: {Y.shape}")


Loading data for speaker 1 from /content/drive/MyDrive/enhancing_speaker_recognition_evaluation/data/speaker1
Loading data for speaker 2 from /content/drive/MyDrive/enhancing_speaker_recognition_evaluation/data/speaker2
Loading data for speaker 3 from /content/drive/MyDrive/enhancing_speaker_recognition_evaluation/data/speaker3
Total samples loaded: 150
Shape of Y after one-hot encoding: (150, 3)


In [2]:
canonical_correlations = []
for layer in tqdm(range(25), desc="Processing Layers"):
    X_layer = np.array(X_layers[layer])
    if X_layer.shape[0] == 0:
        canonical_correlations.append(0)
        continue

    layer_correlations = []
    num_batches = X_layer.shape[0] // 2  # Reducing batch size to 2
    for i in range(num_batches):
        X_batch = X_layer[i*2:(i+1)*2]
        Y_batch = Y[i*2:(i+1)*2]

        if np.var(X_batch) == 0 or np.var(Y_batch) == 0:
            print(f"Zero variance in batch {i} for layer {layer+1}. Skipping.")
            continue

        correlation = cca_cpu(X_batch, Y_batch)
        layer_correlations.append(correlation)

    if layer_correlations:
        canonical_correlations.append(np.mean(layer_correlations))
    else:
        canonical_correlations.append(0)

    # Free up memory
    del X_layer
    del X_batch
    del Y_batch
    torch.cuda.empty_cache()

best_layer_index = np.argmax(canonical_correlations)
print(f'The layer with the highest canonical correlation is Layer {best_layer_index + 1}')

print("Canonical Correlation Values for each layer:")
for i, correlation in enumerate(canonical_correlations):
    print(f"Layer {i+1}: {correlation}")

import matplotlib.pyplot as plt
plt.plot(range(1, 26), canonical_correlations, marker='o')
plt.xlabel('Layer')
plt.ylabel('Canonical Correlation')
plt.title('Canonical Correlation by Layer')
plt.show()

NameError: name 'tqdm' is not defined