In [2]:
import torch
import torch.nn.functional as F

from model import Song2Vec

# Shape of one example without the batch dimension.
input_shape = (1024, 2048, 3)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Freshly constructed model: no checkpoint is loaded in this cell.
model = Song2Vec().to(DEVICE)

# Generate two random inputs (independent draws, batch size 1 each)
input1 = torch.randn(1, *input_shape).to(DEVICE)
input2 = torch.randn(1, *input_shape).to(DEVICE)

# Get embeddings. Only the second value returned by encode() is used here.
# This is a read-only comparison, so run it under no_grad to avoid building an
# autograd graph for the two forward passes.
with torch.no_grad():
    _, embed1 = model.encode(input1)
    _, embed2 = model.encode(input2)

# Compare embeddings: exact (within allclose tolerance) equality and cosine similarity.
print("Are embeddings equal?", torch.allclose(embed1, embed2))
print("Embedding cosine similarity:", F.cosine_similarity(embed1, embed2))


Are embeddings equal? False
Embedding cosine similarity: tensor([0.9713], grad_fn=<SumBackward1>)


In [3]:
# Replicate forward pass manually and check cosine similarity for intermediate outputs

def cosine_similarity(x1, x2):
    """Return the cosine similarity of two tensors treated as flat vectors.

    Both tensors are collapsed to 1-D before the comparison, so the result is
    a single scalar tensor regardless of the inputs' original shapes.
    """
    flat1 = x1.reshape(-1)
    flat2 = x2.reshape(-1)
    return F.cosine_similarity(flat1, flat2, dim=0)

# If input1/input2 are still the (1, 1024, 2048, 3) tensors created in the first cell,
# this permute reorders them to (1, 3, 2048, 1024) before the CNN encoder.
# NOTE(review): `model` is not put in eval mode and no torch.no_grad() is active in this
# cell, so any dropout/batch-norm layers run in training mode and every step tracks
# gradients — confirm that matches how embeddings are produced in real use.
x1 = input1.permute(0, 3, 2, 1)
x2 = input2.permute(0, 3, 2, 1)

# Encoder
x1 = model.encoder(x1)
x2 = model.encoder(x2)
print("After encoder:", cosine_similarity(x1, x2).item())

# Positional encoding
# The 3-way unpack requires the encoder output to be 3-D; the names suggest
# (batch, length, dim).
B, L, D = x1.shape
# The same `pos` tensor is added to both inputs.
# NOTE(review): the 0.01 scale is presumably copied from the model's own forward pass —
# verify it still matches model.py.
pos = model.w_pe(torch.arange(L, device=DEVICE)).unsqueeze(0).expand(B, -1, -1) * 0.01
context1 = x1 + pos
context2 = x2 + pos
print("After positional encoding:", cosine_similarity(context1, context2).item())

# Add query vector and apply transformer encoder
# The query vector is appended as the LAST position along dim=1, which is why it is read
# back with index -1 below.
context_w_query1 = torch.cat([context1, model.query_vector.expand(B, -1, -1)], dim=1)
context_w_query2 = torch.cat([context2, model.query_vector.expand(B, -1, -1)], dim=1)
context_w_query1 = model.transformer_encoder(context_w_query1)
context_w_query2 = model.transformer_encoder(context_w_query2)
print("After transformer encoder:", cosine_similarity(context_w_query1, context_w_query2).item())

# Separate context and z
# Everything except the last position is the context; the last position (the query slot)
# becomes z.
context1 = context_w_query1[:, :-1, :]
context2 = context_w_query2[:, :-1, :]
z1 = context_w_query1[:, -1, :]
z2 = context_w_query2[:, -1, :]
print("Context after separation:", cosine_similarity(context1, context2).item())
print("Z after separation:", cosine_similarity(z1, z2).item())

# Normalize z
# z1/z2 are already 2-D after the integer index above, so squeeze(1) only has an effect
# if the feature dimension has size 1; otherwise it is a no-op.
z1 = F.normalize(z1.squeeze(1), p=2, dim=1)
z2 = F.normalize(z2.squeeze(1), p=2, dim=1)
print("After normalization (final embedding):", cosine_similarity(z1, z2).item())

# Decoder (optional, as it's not part of the embedding process)
# x1/x2 are reused from here on and hold decoder activations, not encoder outputs.
x1 = model.transformer_decoder(context1)
x2 = model.transformer_decoder(context2)
print("After transformer decoder:", cosine_similarity(x1, x2).item())

x1 = model.decoder(x1)
x2 = model.decoder(x2)
print("After CNN decoder:", cosine_similarity(x1, x2).item())

# Same (0, 3, 2, 1) permute as at the top of the cell; it swaps axes 1 and 3, so applying
# it again restores the original axis order.
x1 = x1.permute(0, 3, 2, 1)
x2 = x2.permute(0, 3, 2, 1)
print("Final output:", cosine_similarity(x1, x2).item())


After encoder: 0.8254242539405823
After positional encoding: 0.8261311650276184
After transformer encoder: 0.5842868685722351
Context after separation: 0.5835574865341187
Z after separation: 0.9577295184135437
After normalization (final embedding): 0.9577295780181885
After transformer decoder: 0.753706157207489
After CNN decoder: 0.9974474310874939
Final output: 0.9974244236946106


In [4]:
from baseline_model import Song2Vec

# Swap in the baseline model (Song2Vec now refers to the class imported from
# baseline_model just above). This rebinds the notebook-global `model`, `input1` and
# `input2`, so earlier cells that use those names will see the baseline objects if they
# are re-run after this point.
model = Song2Vec().to(DEVICE)

# Generate two random inputs
input1 = torch.randn(1, *input_shape).to(DEVICE)
input2 = torch.randn(1, *input_shape).to(DEVICE)


In [5]:
# Replicate forward pass manually and check cosine similarity for intermediate outputs

def cosine_similarity(x1, x2):
    """Cosine similarity between two tensors after collapsing each to 1-D.

    Returns a scalar tensor. Same behavior as the helper of the same name
    defined earlier in this notebook.
    """
    flat1 = x1.reshape(-1)
    flat2 = x2.reshape(-1)
    return F.cosine_similarity(flat1, flat2, dim=0)

# x shape: (B, H, W, C)
x1 = input1
x2 = input2

# Encoder
x1 = x1.permute(0, 3, 1, 2)  # Shape: (B, C, H, W)
x2 = x2.permute(0, 3, 1, 2)  # Shape: (B, C, H, W)
x1 = model.encoder(x1)        # Shape: (B, H_enc, W_enc)
x2 = model.encoder(x2)        # Shape: (B, H_enc, W_enc)
print("After encoder:", cosine_similarity(x1, x2).item())

# The 3-way unpack requires the encoder output to be 3-D.
B, H_enc, W_enc = x1.shape
x1 = x1.permute(0, 2, 1)     # Shape: (B, W_enc, H_enc)
x2 = x2.permute(0, 2, 1)     # Shape: (B, W_enc, H_enc)
# The tensor is already (B, W_enc, H_enc) after the permute, so the view keeps the same
# shape; contiguous() makes a contiguous copy so that view() is valid on the permuted data.
x1 = x1.contiguous().view(B, W_enc, -1)  # Shape: (B, W_enc, H_enc)
x2 = x2.contiguous().view(B, W_enc, -1)  # Shape: (B, W_enc, H_enc)
print("After reshape:", cosine_similarity(x1, x2).item())

# GRU Encoder
# NOTE(review): the sizes in the trailing comment look out of date — the RuntimeError
# recorded for this cell reports a last dimension of 512 for the tensor handed to
# gru_decoder (which is `out1`), not 1024. Confirm against baseline_model.
out1, h_n1 = model.gru_encoder(x1)  # out shape: (B, W_enc, 1024), h_n shape: (num_layers * num_directions, B, 512)
out2, h_n2 = model.gru_encoder(x2)
print("After GRU encoder (out):", cosine_similarity(out1, out2).item())
print("After GRU encoder (h_n):", cosine_similarity(h_n1, h_n2).item())

# h_n[-1] selects the last entry along the (num_layers * num_directions) axis.
# NOTE(review): the checkpoint keys listed later in this notebook include `*_reverse`
# GRU weights, so the encoder appears to be bidirectional; in that case index -1 is only
# the reverse direction of the last layer, not the whole last layer — confirm intended.
z1 = h_n1[-1]  # Taking the last layer's hidden state as the latent vector, shape: (B, 512)
z2 = h_n2[-1]
print("Z before normalization:", cosine_similarity(z1, z2).item())

z1 = F.normalize(z1, p=2, dim=1)  # Normalize the latent vector
z2 = F.normalize(z2, p=2, dim=1)
print("After normalization (final embedding):", cosine_similarity(z1, z2).item())

# Decoder (optional, as it's not part of the embedding process)
# NOTE(review): the recorded run stopped at the next call with
# "RuntimeError: input.size(-1) must be equal to input_size. Expected 1, got 512":
# gru_decoder expects inputs whose last dimension is 1 but receives the 512-wide encoder
# output. This manual replication therefore does not match the model's real decoder
# path, and none of the statements below ran in the saved output. Check
# baseline_model's forward() for what is actually fed to gru_decoder.
out1, _ = model.gru_decoder(out1)  # Shape: (B, W_enc, 1024)
out2, _ = model.gru_decoder(out2)
print("After GRU decoder:", cosine_similarity(out1, out2).item())

out1 = model.fc_dec(out1)
out2 = model.fc_dec(out2)
print("After fc_dec:", cosine_similarity(out1, out2).item())

x1 = model.decoder(out1.permute(0, 2, 1))  # Shape: (B, 3, H, W)
x2 = model.decoder(out2.permute(0, 2, 1))
print("After CNN decoder:", cosine_similarity(x1, x2).item())

x1 = x1.permute(0, 2, 3, 1)  # Shape: (B, H, W, C)
x2 = x2.permute(0, 2, 3, 1)
print("Final output:", cosine_similarity(x1, x2).item())


After encoder: -0.0015101489843800664
After reshape: -0.0015101489843800664
After GRU encoder (out): 0.07361114770174026
After GRU encoder (h_n): 0.13077013194561005
Z before normalization: 0.17049498856067657
After normalization (final embedding): 0.17049501836299896


RuntimeError: input.size(-1) must be equal to input_size. Expected 1, got 512

In [None]:
# Show the batch-norm running statistics stored in the checkpoint.
# NOTE: `wo_orig_mod` is only defined by the checkpoint-loading cells further down, and
# this cell has no execution count — it would raise NameError if reached in a fresh
# top-to-bottom run.
wo_orig_mod["batch_norm.running_mean"], wo_orig_mod["batch_norm.running_var"]


In [23]:
# Load the epoch-5 checkpoint and pull out the model weights.
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code — only
# use it on checkpoints you trust, or pass weights_only=True if the checkpoint contents
# allow it.
state_dict = torch.load("epoch_5.pt", map_location=DEVICE)["model_state_dict"]
# The checkpoint keys carry an "_orig_mod." prefix (see the key listing in the next
# cell; typically added when the model was wrapped by torch.compile — confirm). Strip it
# so the names can be loaded into a plain model instance.
wo_orig_mod = {k.replace("_orig_mod.", ""): v for k, v in state_dict.items()}

  state_dict = torch.load("epoch_5.pt", map_location=DEVICE)["model_state_dict"]


In [24]:
# Inspect the raw checkpoint parameter names (still carrying the "_orig_mod." prefix).
state_dict.keys()

odict_keys(['_orig_mod.batch_norm.weight', '_orig_mod.batch_norm.bias', '_orig_mod.batch_norm.running_mean', '_orig_mod.batch_norm.running_var', '_orig_mod.batch_norm.num_batches_tracked', '_orig_mod.encoder.conv1.weight', '_orig_mod.encoder.conv1.bias', '_orig_mod.encoder.conv2.weight', '_orig_mod.encoder.conv2.bias', '_orig_mod.encoder.conv3.weight', '_orig_mod.encoder.conv3.bias', '_orig_mod.encoder.convlast.weight', '_orig_mod.encoder.convlast.bias', '_orig_mod.gru_encoder.weight_ih_l0', '_orig_mod.gru_encoder.weight_hh_l0', '_orig_mod.gru_encoder.bias_ih_l0', '_orig_mod.gru_encoder.bias_hh_l0', '_orig_mod.gru_encoder.weight_ih_l0_reverse', '_orig_mod.gru_encoder.weight_hh_l0_reverse', '_orig_mod.gru_encoder.bias_ih_l0_reverse', '_orig_mod.gru_encoder.bias_hh_l0_reverse', '_orig_mod.gru_encoder.weight_ih_l1', '_orig_mod.gru_encoder.weight_hh_l1', '_orig_mod.gru_encoder.bias_ih_l1', '_orig_mod.gru_encoder.bias_hh_l1', '_orig_mod.gru_encoder.weight_ih_l1_reverse', '_orig_mod.gru_enco

In [15]:
from baseline_model import Song2Vec

# Fresh baseline model, populated below from the step-560 checkpoint.
model = Song2Vec().to(DEVICE)
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code — only
# use it on checkpoints you trust, or pass weights_only=True if the checkpoint contents
# allow it.
state_dict = torch.load("step_560.pt", map_location=DEVICE)["model_state_dict"]
# Drop the "_orig_mod." prefix from every key so the names match the plain model.
wo_orig_mod = {k.replace("_orig_mod.", ""): v for k, v in state_dict.items()}
model.load_state_dict(wo_orig_mod)

# Switch to inference behavior before running the model.
model.eval()

# NOTE: `input1` is the random tensor left over from an earlier cell, not real audio.
# print(output) dumps the full model output; the saved output of this cell is truncated.
with torch.no_grad():
    output = model(input1)
    print(output)

  state_dict = torch.load("step_560.pt", map_location=DEVICE)["model_state_dict"]


(tensor([[[[-2.1421e-02,  1.2490e-02, -3.8730e-01],
          [-2.4051e-01, -3.7790e-01, -4.8709e-01],
          [-2.7372e-01, -4.4416e-01, -5.8825e-01],
          ...,
          [-2.7992e-01, -4.0753e-01, -5.0603e-01],
          [-2.7981e-01, -4.3660e-01, -5.6459e-01],
          [-1.1806e-04,  7.6912e-02, -4.5826e-01]],

         [[-1.4067e-01, -1.7305e-01, -5.0333e-01],
          [-5.5233e-01, -9.0192e-01, -6.2505e-01],
          [-5.3559e-01, -8.6621e-01, -6.3778e-01],
          ...,
          [-5.1352e-01, -8.3766e-01, -7.6831e-01],
          [-5.3875e-01, -9.2243e-01, -5.7940e-01],
          [-1.4377e-01, -1.7548e-01, -5.1291e-01]],

         [[-1.4946e-01, -1.8562e-01, -4.6224e-01],
          [-6.3752e-01, -1.0346e+00, -7.1471e-01],
          [-6.3791e-01, -1.0296e+00, -7.6039e-01],
          ...,
          [-6.6915e-01, -1.0856e+00, -7.3696e-01],
          [-6.6520e-01, -1.0835e+00, -7.2113e-01],
          [-1.7430e-01, -2.1340e-01, -5.2969e-01]],

         ...,

         [[ 1.6

In [16]:
import glob
import numpy as np
from preprocess import process_audio_file


# Get first 100 files from fma_small (sorted by path so the selection is deterministic).
# recursive=True is what gives "**" its "any number of nested directories" meaning;
# without it "**" behaves like a single "*" and matches exactly one directory level.
# For a layout with one sub-directory level (e.g. data/fma_small/000/000200.mp3) the
# result is the same; files directly under fma_small or nested deeper are now included too.
fma_small_files = sorted(glob.glob('data/fma_small/**/*.mp3', recursive=True))[:100]



In [17]:
from tqdm import tqdm
import torch
from torch.utils.data import DataLoader, Dataset

class AudioDataset(Dataset):
    """Map-style dataset that turns audio file paths into float tensors on demand."""

    def __init__(self, file_paths):
        # Paths are stored as given; a file is only processed when its item is requested.
        self.file_paths = file_paths

    def __len__(self):
        """Number of audio files in the dataset."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Process the file at position ``idx`` and return it as a float32 tensor.

        The array returned by ``process_audio_file`` is made C-contiguous before
        it is handed to ``torch.from_numpy``.
        """
        path = self.file_paths[idx]
        contiguous = np.ascontiguousarray(process_audio_file(path))
        tensor = torch.from_numpy(contiguous)
        return tensor.float()

# Dataset/loader over the selected files; shuffle stays off so that row i of
# `embeddings` corresponds to fma_small_files[i].
dataset = AudioDataset(fma_small_files)
batch_size = 16  # Adjust based on your GPU memory
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

# Encode every file with the current model and collect the embeddings on the CPU.
# Only the first value returned by encode() is kept.
model.eval()
embedding_chunks = []
with torch.no_grad():
    for audio_batch in tqdm(dataloader, desc="Processing audio files"):
        encoded, _ = model.encode(audio_batch.to(DEVICE))
        embedding_chunks.append(encoded.cpu().numpy())

# Stack the per-batch arrays into one array with one row per file.
embeddings = np.vstack(embedding_chunks)


Processing audio files: 100%|██████████| 7/7 [00:52<00:00,  7.52s/it]


In [18]:
# Calculate mean and variance of songs in the dataset.
# Each song is reduced to one scalar (the mean over its whole processed array); the
# variance reported below is the sample variance of those per-song means, not the
# variance of individual audio values.
num_songs = len(fma_small_files)

# Process every file exactly once and keep only its scalar mean. Audio processing is the
# expensive part, so a single pass roughly halves the runtime compared with one pass for
# the mean and a second pass for the variance.
per_song_means = np.array(
    [
        np.mean(process_audio_file(file_path))
        for file_path in tqdm(fma_small_files, desc="Calculating per-song means")
    ],
    dtype=np.float64,
)

# Mean of the per-song means.
song_mean = per_song_means.mean()

# Unbiased sample variance of the per-song means (divides by num_songs - 1).
song_variance = per_song_means.var(ddof=1)

print("Mean of songs in the dataset:")
print(song_mean)

print("\nVariance of songs in the dataset:")
print(song_variance)


Calculating mean:   3%|▎         | 3/100 [00:00<00:27,  3.57it/s]


KeyboardInterrupt: 

In [None]:
# Running mean/variance tracked by the loaded model's batch-norm layer (presumably shown
# for comparison with the dataset statistics computed in the previous cell).
model.batch_norm.running_mean, model.batch_norm.running_var

In [None]:
# audio_mean = np.mean(all_audio, axis=(0, 1, 2))
# audio_variance = np.var(all_audio, axis=(0, 1, 2))

# print("Mean of audio arrays per channel:")
# for i, mean in enumerate(audio_mean):
#     print(f"Channel {i}: {mean:.4f}")

# print("\nVariance of audio arrays per channel:")
# for i, var in enumerate(audio_variance):
#     print(f"Channel {i}: {var:.4f}")

In [19]:
import pandas as pd

from sklearn.metrics.pairwise import cosine_similarity

In [20]:
# Process a single song as the query (index 11 of fma_small_files, i.e. the 12th file)
query_file = fma_small_files[11]
query_audio = process_audio_file(query_file)
query_audio = np.ascontiguousarray(query_audio)  # Ensure contiguous memory layout
query_tensor = torch.from_numpy(query_audio).float().unsqueeze(0).to(DEVICE)
with torch.no_grad():
    query_embedding, _ = model.encode(query_tensor)

# scikit-learn works on NumPy arrays, so bring the query embedding back to host memory
# first. Passing the torch tensor directly only works while DEVICE is the CPU; a CUDA
# tensor cannot be converted implicitly. `embeddings` is already a NumPy array.
query_embedding_np = query_embedding.detach().cpu().numpy()

# Calculate cosine similarities
# `cosine_similarity` here is scikit-learn's pairwise function (imported in the cell
# above), which returns one row per query; [0] selects the single query's row.
similarities = cosine_similarity(query_embedding_np, embeddings)[0]

# Get top 5 most similar and least similar songs.
# The ranking is computed once and sliced at both ends. The query file is itself part of
# `embeddings`, so it is expected to appear first among the most similar songs.
ranking = np.argsort(similarities)
top_5_indices = ranking[-5:][::-1]
bottom_5_indices = ranking[:5]
top_5_similarities = similarities[top_5_indices]
bottom_5_similarities = similarities[bottom_5_indices]

# Song numbers printed below are 1-based positions in fma_small_files.
print(f"Top 5 most similar songs to {query_file}:")
for i, (index, similarity) in enumerate(zip(top_5_indices, top_5_similarities), 1):
    print(f"{i}. Song {index+1}: Similarity = {similarity:.4f}")

print(f"\nTop 5 least similar songs to {query_file}:")
for i, (index, similarity) in enumerate(zip(bottom_5_indices, bottom_5_similarities), 1):
    print(f"{i}. Song {index+1}: Similarity = {similarity:.4f}")

Top 5 most similar songs to data/fma_small/000/000200.mp3:
1. Song 12: Similarity = 1.0000
2. Song 75: Similarity = 0.9998
3. Song 73: Similarity = 0.9998
4. Song 32: Similarity = 0.9997
5. Song 86: Similarity = 0.9996

Top 5 least similar songs to data/fma_small/000/000200.mp3:
1. Song 62: Similarity = -0.9938
2. Song 61: Similarity = -0.8621
3. Song 60: Similarity = -0.8520
4. Song 58: Similarity = -0.8265
5. Song 37: Similarity = -0.6531


In [13]:
# NOTE: the saved output of this cell comes from an earlier execution (count 13) than
# the similarity cell above (count 20), and its last index no longer matches that cell's
# printed ranking — re-run to refresh.
bottom_5_indices

array([61, 60, 59, 57, 41])

In [21]:
import ipywidgets as widgets
from IPython.display import display, Audio
import librosa

def play_audio(file_path):
    """Load up to 30 seconds of ``file_path`` and wrap it in an IPython ``Audio`` object."""
    samples, sample_rate = librosa.load(file_path, duration=30)
    player = Audio(data=samples, rate=sample_rate)
    return player

def create_audio_player(file_path, title):
    """Build a widget box with a bold ``title`` above an embedded player for ``file_path``.

    The player markup is the HTML representation of the ``Audio`` object
    returned by ``play_audio``.
    """
    clip = play_audio(file_path)
    heading = widgets.HTML(f"<b>{title}</b>")
    embedded_player = widgets.HTML(clip._repr_html_())
    return widgets.VBox([heading, embedded_player])

# Player for the query track itself.
query_player = create_audio_player(query_file, "Query Song")

# One player per most-similar track, labelled with its 1-based rank and similarity score.
top_5_players = [
    create_audio_player(fma_small_files[song_idx], f"Top {rank} Similar (Similarity: {score:.4f})")
    for rank, (song_idx, score) in enumerate(zip(top_5_indices, top_5_similarities), start=1)
]

# Same for the least similar tracks.
bottom_5_players = [
    create_audio_player(fma_small_files[song_idx], f"Bottom {rank} Similar (Similarity: {score:.4f})")
    for rank, (song_idx, score) in enumerate(zip(bottom_5_indices, bottom_5_similarities), start=1)
]

# Layout: query player on top, then the two rankings side by side.
most_similar_column = widgets.VBox([widgets.HTML("<b>Top 5 Most Similar</b>"), *top_5_players])
least_similar_column = widgets.VBox([widgets.HTML("<b>Bottom 5 Least Similar</b>"), *bottom_5_players])
comparison_row = widgets.HBox([most_similar_column, least_similar_column])
display(widgets.VBox([query_player, comparison_row]))


VBox(children=(VBox(children=(HTML(value='<b>Query Song</b>'), HTML(value='\n                <audio  controls=…