In [8]:
import librosa
import numpy as np
import os

### Audio Preprocessing ###

In [5]:
def preprocess_audio(file_path, duration=30, sr=16000):
    """Load an audio file and force it to exactly ``duration`` seconds.

    Args:
        file_path: Path of the audio file, passed to ``librosa.load``.
        duration: Target clip length in seconds; may be fractional.
        sr: Sample rate requested from ``librosa.load``. When ``None``, the
            rate returned by ``librosa.load`` is used to size the clip.

    Returns:
        The waveform returned by ``librosa.load``, cut or right-padded with
        zeros to ``round(sample_rate * duration)`` samples.
    """
    y, loaded_sr = librosa.load(file_path, sr=sr)
    # Use the rate librosa actually returned so ``sr=None`` also works, and
    # coerce to int so fractional durations give a valid slice/pad length.
    target_len = int(round(loaded_sr * duration))
    if len(y) > target_len:
        y = y[:target_len]
    else:
        # Zero-pad at the end (a no-op when the clip already has the target length).
        y = np.pad(y, (0, target_len - len(y)), 'constant')
    return y

Make sure `AUDIO_DIR` contains 30-second snippets of your songs (`.wav` or `.mp3` files).

In [None]:
# Directory that is scanned for the input .wav/.mp3 snippets.
AUDIO_DIR = "/content/wav_snippets"
# Directory where the preprocessed waveforms are written as .npy files.
PROCESSED_AUDIO_DIR = "/content/processed_audio"
# Create the output directory up front; no error if it already exists.
os.makedirs(PROCESSED_AUDIO_DIR, exist_ok=True)

In [9]:
# Collect every file name in AUDIO_DIR that ends with .mp3 or .wav (case-sensitive).
audio_files = [f for f in os.listdir(AUDIO_DIR) if f.endswith(('.mp3', '.wav'))]

for file_name in audio_files:
    file_path = os.path.join(AUDIO_DIR, file_name)
    # Load the clip and bring it to a fixed length via preprocess_audio.
    y = preprocess_audio(file_path)
    # Saved as "<original file name>.npy", so the audio extension stays in the name.
    np.save(os.path.join(PROCESSED_AUDIO_DIR, file_name + '.npy'), y)

### Original Embeddings ###

In [12]:
from torchvggish import vggish, vggish_input
import torch

In [None]:
# Pick the compute device in order of preference: Apple MPS, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
# Fixed: the original read the misspelled attribute `torch.cuda.is_availble`
# without calling it, which raises AttributeError whenever MPS is unavailable.
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
# Build the VGGish network, move it to the chosen device and switch it to
# inference mode. The last expression also displays the model in the notebook.
model = vggish()
model.to(device)
model.eval()

Downloading: "https://github.com/harritaylor/torchvggish/releases/download/v0.1/vggish_pca_params-970ea276.pth" to /root/.cache/torch/hub/checkpoints/vggish_pca_params-970ea276.pth
100%|██████████| 177k/177k [00:00<00:00, 1.21MB/s]
Downloading: "https://github.com/harritaylor/torchvggish/releases/download/v0.1/vggish-10086976.pth" to /root/.cache/torch/hub/checkpoints/vggish-10086976.pth
100%|██████████| 275M/275M [00:10<00:00, 26.9MB/s]


VGG(
  (features): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (11): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): ReLU(inplace=True)
    (13): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (14): ReLU(inplace=True)
    (15): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
 

In [14]:
def extract_embedding(audio, sr=16000):
    """Run a waveform through the global VGGish ``model`` and return its output.

    Args:
        audio: Waveform samples, passed to ``vggish_input.waveform_to_examples``.
        sr: Sample rate of ``audio``.

    Returns:
        The model output for all examples, copied to the CPU as a NumPy array.
    """
    examples = vggish_input.waveform_to_examples(audio, sr)

    # Prefer MPS, then CUDA, then CPU. The original only distinguished MPS from
    # CPU, so on a CUDA machine it moved the model back to the CPU on every call.
    if torch.backends.mps.is_available():
        device = torch.device("mps")
    elif torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")

    model.to(device)
    # The PCA post-processing tensors are moved explicitly, in addition to model.to().
    model.pproc._pca_matrix = model.pproc._pca_matrix.to(device)
    model.pproc._pca_means = model.pproc._pca_means.to(device)
    examples = examples.to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        embedding = model(examples)

    return embedding.cpu().numpy()

In [15]:
from tqdm import tqdm

In [16]:
# Directory where one embedding file per audio clip is written.
EMBEDDINGS_DIR = '/content/embeddings'
os.makedirs(EMBEDDINGS_DIR, exist_ok=True)

for file_name in tqdm(audio_files):
    # Read back the preprocessed waveform stored as "<file_name>.npy".
    audio_path = os.path.join(PROCESSED_AUDIO_DIR, file_name + '.npy')
    audio = np.load(audio_path)
    embedding = extract_embedding(audio)
    # Stored as "<file_name>_embedding.npy"; later cells look files up by this suffix.
    np.save(os.path.join(EMBEDDINGS_DIR, file_name + '_embedding.npy'), embedding)

100%|██████████| 30/30 [00:34<00:00,  1.14s/it]


### Training with Feedback ###

In [21]:
import torch
from torch import nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os

In [19]:
def prepare_feedback_pairs(feedback):
    """Build ``(embedding_a, embedding_b, score)`` training triples from feedback.

    Args:
        feedback: Iterable of mappings with the keys ``'song_a'``, ``'song_b'``
            and ``'similarity_score'``. The song names are used to locate
            ``<name>_embedding.npy`` files inside ``EMBEDDINGS_DIR``.

    Returns:
        A list of ``(embedding_a, embedding_b, score)`` tuples. Entries that
        fail (missing keys, missing files, unexpected embedding shape) are
        reported with ``print`` and skipped.
    """
    expected_shape = (31, 128)
    pairs = []
    for item in feedback:
        # Reset per item so the error message never shows names left over from
        # an earlier entry, and never hits an unbound variable on the first one.
        song_a = song_b = None
        try:
            song_a = item['song_a']
            song_b = item['song_b']
            score = item['similarity_score']

            embedding_a = np.load(os.path.join(EMBEDDINGS_DIR, song_a + '_embedding.npy'))
            embedding_b = np.load(os.path.join(EMBEDDINGS_DIR, song_b + '_embedding.npy'))

            # Explicit checks instead of ``assert`` so they still run under ``python -O``.
            for name, embedding in ((song_a, embedding_a), (song_b, embedding_b)):
                if embedding.shape != expected_shape:
                    raise ValueError(f"Unexpected shape for {name}: {embedding.shape}")

            pairs.append((embedding_a, embedding_b, score))

        except Exception as e:
            # Best effort: report the bad entry and keep the rest of the feedback.
            print(f"Error processing pair {song_a} - {song_b}: {str(e)}")
            continue

    return pairs

In [22]:
class FeedbackDataset(Dataset):
    """Dataset view over precomputed ``(embedding_a, embedding_b, score)`` triples."""

    def __init__(self, pairs):
        # Keep a reference to the caller's sequence of triples.
        self.pairs = pairs

    def __len__(self):
        """Number of stored triples."""
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return the triple at ``idx`` with each element converted to a float32 tensor."""
        first, second, target = self.pairs[idx]
        return (
            torch.tensor(first, dtype=torch.float32),   # expected [31, 128]
            torch.tensor(second, dtype=torch.float32),  # expected [31, 128]
            torch.tensor(target, dtype=torch.float32),
        )

In [24]:
def collate_fn(batch):
    """Stack per-sample ``(embedding_a, embedding_b, score)`` tensors into batch tensors.

    Args:
        batch: Sequence of 3-tuples of tensors, one tuple per sample.

    Returns:
        Three tensors, each stacked along a new leading batch dimension.
    """
    # Transpose the samples into three columns, then stack each column in turn.
    stacked_a, stacked_b, stacked_scores = (
        torch.stack(column) for column in zip(*batch)
    )
    return stacked_a, stacked_b, stacked_scores

In [25]:
class SiameseNetwork(nn.Module):
    """Siamese model that scores the similarity of two embedding sequences.

    Both inputs go through the same encoder (per-frame MLP, transformer
    encoder, mean pooling over the sequence). The absolute difference of the
    two pooled vectors is mapped to a score in (0, 1) by a small MLP ending in
    a sigmoid.
    """

    def __init__(self):
        super().__init__()

        self.seq_length = 31
        self.embedding_dim = 128
        self.hidden_dim = 256

        # Per-frame MLP: 128 -> 256 -> 128, followed by layer normalisation.
        self.frame_encoder = nn.Sequential(
            nn.Linear(self.embedding_dim, self.hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(self.hidden_dim, self.embedding_dim),
            nn.LayerNorm(self.embedding_dim)
        )

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=self.embedding_dim,
            nhead=8,
            dim_feedforward=512,
            dropout=0.2,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(
            encoder_layer,
            num_layers=3
        )

        # Maps |out1 - out2| (128 values) to a single sigmoid-activated score.
        self.similarity_net = nn.Sequential(
            nn.Linear(self.embedding_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def forward_one(self, x):
        """Encode one batch of sequences into one pooled vector per sequence.

        Args:
            x: Float tensor, expected ``[batch_size, 31, 128]``.

        Returns:
            Tensor of shape ``[batch_size, 128]``.
        """
        encoded = self.frame_encoder(x)  # [batch_size, 31, 128]
        transformed = self.transformer(encoded)  # [batch_size, 31, 128]
        pooled = torch.mean(transformed, dim=1)  # [batch_size, 128]

        return pooled

    def forward(self, x1, x2):
        """Return one similarity score per pair, as a tensor of shape ``[batch_size]``."""
        out1 = self.forward_one(x1)  # [batch_size, 128]
        out2 = self.forward_one(x2)  # [batch_size, 128]

        diff = torch.abs(out1 - out2)  # [batch_size, 128]

        similarity = self.similarity_net(diff)  # [batch_size, 1]
        # Drop only the trailing unit dimension. A bare squeeze() would also
        # remove the batch dimension when batch_size == 1, producing a 0-d
        # tensor that no longer matches a target of shape [1].
        return similarity.squeeze(-1)

In [26]:
def train_epoch(model, dataloader, optimizer, criterion, device):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Callable module taking two embedding batches and returning predictions.
        dataloader: Sized iterable of ``(embedding_a, embedding_b, scores)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(predictions, scores)``.
        device: Device every batch tensor is moved to.

    Returns:
        The sum of the per-batch losses divided by the number of batches.
    """
    model.train()
    num_batches = len(dataloader)
    running_loss = 0

    for batch_idx, (first, second, targets) in enumerate(dataloader):
        first, second, targets = first.to(device), second.to(device), targets.to(device)

        optimizer.zero_grad()
        loss = criterion(model(first, second), targets)
        loss.backward()
        # Cap the total gradient norm at 1.0 before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        running_loss += loss.item()

        # Report progress on every fifth batch.
        if not batch_idx % 5:
            print(f'Batch {batch_idx}/{num_batches}, Loss: {loss.item():.4f}')

    return running_loss / num_batches

In [65]:
# Device for the Siamese model: CUDA when available, otherwise CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# NOTE(review): this rebinds the global `model`, which until this cell referred
# to the VGGish network that extract_embedding() reads. Re-running the
# embedding cells after this one would call the Siamese network instead.
model = SiameseNetwork().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)
# Mean squared error between predicted and target similarity scores.
criterion = nn.MSELoss()

In [32]:
import json

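Set `file_path` in the cell below to the path of your similarity-scores JSON file (a list of entries with `song_a`, `song_b`, and `similarity_score` keys).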

In [None]:
# Placeholder: set this to the location of the similarity-scores JSON file.
file_path = "..."

# Parsed feedback entries; a later cell passes `data` to prepare_feedback_pairs.
with open(file_path, "r") as f:
    data = json.load(f)

In [61]:
# Turn the raw feedback entries into (embedding_a, embedding_b, score) triples.
feedback_pairs = prepare_feedback_pairs(data)
feedback_dataset = FeedbackDataset(feedback_pairs)
# Shuffled mini-batches of 8 pairs, loaded in the main process (num_workers=0)
# and assembled by the notebook's own collate_fn.
feedback_dataloader = DataLoader(
    feedback_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0,
    collate_fn=collate_fn
)

In [62]:
# Learning-rate scheduler bound to `optimizer`: minimisation mode, factor 0.5,
# patience 5. The training loop steps it with each epoch's training loss.
# NOTE(review): newer PyTorch releases appear to deprecate the `verbose`
# argument — confirm against the installed version.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5,
    verbose=True
)



In [63]:
# Number of passes over the feedback data.
num_epochs = 100
# Lowest epoch loss seen so far; starts at +inf so any finite first-epoch loss is an improvement.
best_loss = float('inf')

In [66]:
for epoch in range(num_epochs):
    train_loss = train_epoch(model, feedback_dataloader, optimizer, criterion, device)

    # The scheduler is driven by the training loss; no validation split is used here.
    scheduler.step(train_loss)

    # Checkpoint whenever this epoch's training loss is the lowest so far.
    # The file name is relative, so it is written to the current working directory.
    if train_loss < best_loss:
        best_loss = train_loss
        # Note: the stored 'epoch' is zero-based, unlike the 1-based number printed below.
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': best_loss,
        }, 'best_siamese_model.pt')

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {train_loss:.4f}')

Batch 0/23, Loss: 0.0785
Batch 5/23, Loss: 0.1235
Batch 10/23, Loss: 0.0805
Batch 15/23, Loss: 0.1026
Batch 20/23, Loss: 0.0882
Epoch 1/100, Loss: 0.0899
Batch 0/23, Loss: 0.1095


  return F.mse_loss(input, target, reduction=self.reduction)


Batch 5/23, Loss: 0.0498
Batch 10/23, Loss: 0.0483
Batch 15/23, Loss: 0.0857
Batch 20/23, Loss: 0.0763
Epoch 2/100, Loss: 0.0807
Batch 0/23, Loss: 0.0350
Batch 5/23, Loss: 0.0393
Batch 10/23, Loss: 0.0942
Batch 15/23, Loss: 0.1017
Batch 20/23, Loss: 0.0545
Epoch 3/100, Loss: 0.0691
Batch 0/23, Loss: 0.0669
Batch 5/23, Loss: 0.0534
Batch 10/23, Loss: 0.0618
Batch 15/23, Loss: 0.0716
Batch 20/23, Loss: 0.0287
Epoch 4/100, Loss: 0.0602
Batch 0/23, Loss: 0.1010
Batch 5/23, Loss: 0.0706
Batch 10/23, Loss: 0.0602
Batch 15/23, Loss: 0.0525
Batch 20/23, Loss: 0.0577
Epoch 5/100, Loss: 0.0627
Batch 0/23, Loss: 0.0804
Batch 5/23, Loss: 0.0914
Batch 10/23, Loss: 0.0553
Batch 15/23, Loss: 0.0726
Batch 20/23, Loss: 0.0884
Epoch 6/100, Loss: 0.0552
Batch 0/23, Loss: 0.0302
Batch 5/23, Loss: 0.0329
Batch 10/23, Loss: 0.0436
Batch 15/23, Loss: 0.1198
Batch 20/23, Loss: 0.0417
Epoch 7/100, Loss: 0.0541
Batch 0/23, Loss: 0.0470
Batch 5/23, Loss: 0.0778
Batch 10/23, Loss: 0.0386
Batch 15/23, Loss: 0.0370

### Update Embeddings ###

In [39]:
def update_embeddings(embeddings, file_names, siamese_model, device, batch_size=32):
    """Re-encode embeddings with the Siamese encoder branch, ``batch_size`` at a time.

    Args:
        embeddings: Sliceable collection of embeddings that ``torch.tensor`` can convert.
        file_names: Not used by this function; kept in the signature for callers.
        siamese_model: Object providing ``eval()`` and ``forward_one(tensor)``.
        device: Device each batch is moved to before encoding.
        batch_size: Number of embeddings encoded per ``forward_one`` call.

    Returns:
        ``np.array`` built from every row returned by ``forward_one``.
    """
    siamese_model.eval()
    collected = []

    for start in range(0, len(embeddings), batch_size):
        chunk = torch.tensor(
            embeddings[start:start + batch_size], dtype=torch.float32
        ).to(device)

        # A 2-D chunk is treated as a single embedding and given a batch axis.
        if chunk.dim() == 2:
            chunk = chunk.unsqueeze(0)

        with torch.no_grad():
            encoded = siamese_model.forward_one(chunk)
        collected.extend(encoded.cpu().numpy())

    return np.array(collected)

In [40]:
def cosine_similarity(a, b):
    """Cosine similarity of two 1-D vectors.

    Args:
        a: First vector (array-like).
        b: Second vector (array-like), same length as ``a``.

    Returns:
        ``dot(a, b) / (|a| * |b|)``, or ``0.0`` when either vector has zero
        norm. Without that guard the division yields NaN, which makes any
        later sort on the scores unreliable.
    """
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0:
        return 0.0
    return np.dot(a, b) / denominator

In [42]:
def get_similar_songs(track_id, embeddings_dict, siamese_model, device, top_n=10):
    """Rank every other song by cosine similarity to ``track_id``.

    Each embedding is passed through ``siamese_model.forward_one`` and the
    flattened outputs are compared with ``cosine_similarity``.

    Args:
        track_id: Key of the query song in ``embeddings_dict``.
        embeddings_dict: Mapping from song id to its embedding.
        siamese_model: Object providing ``eval()`` and ``forward_one(tensor)``.
        device: Device the tensors are moved to.
        top_n: Maximum number of results to print and return.

    Returns:
        Up to ``top_n`` ``(song_id, similarity)`` tuples, most similar first.

    Raises:
        KeyError: If ``track_id`` is not a key of ``embeddings_dict``.
    """
    siamese_model.eval()

    if track_id not in embeddings_dict:
        raise KeyError(f"No embedding found for track {track_id!r}")

    def encode(embedding):
        # Add a batch axis, run the encoder branch, return a flat NumPy vector.
        tensor = torch.tensor(embedding, dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            return siamese_model.forward_one(tensor).cpu().numpy().flatten()

    # The query is loop-invariant: encode it once instead of once per candidate.
    query_vector = encode(embeddings_dict[track_id])

    similarities = []
    for song_id, comp_embedding in embeddings_dict.items():
        # Skip the query itself.
        if song_id == track_id:
            continue
        similarities.append((song_id, cosine_similarity(query_vector, encode(comp_embedding))))

    similarities.sort(key=lambda x: x[1], reverse=True)

    print(f"\nUpdated similar songs to {track_id}:")
    for song_id, score in similarities[:top_n]:
        print(f"{song_id}: Similarity Score = {score:.3f}")

    return similarities[:top_n]

### Recommendation ###

In [44]:
import glob

In [45]:
def load_embeddings(embeddings_dir):
    """Load every ``*_embedding.npy`` file in a directory into a dictionary.

    Args:
        embeddings_dir: Directory containing the embedding files.

    Returns:
        Dict mapping track id (the file name without the trailing
        ``_embedding.npy``) to the loaded array. Files that fail to load are
        reported with ``print`` and skipped.
    """
    suffix = '_embedding.npy'
    embeddings_dict = {}
    # Escape the directory so glob metacharacters in its name ('[', '*', '?')
    # are matched literally, and sort so the load order is deterministic.
    pattern = os.path.join(glob.escape(os.fspath(embeddings_dir)), '*' + suffix)
    embedding_files = sorted(glob.glob(pattern))

    print(f"Loading {len(embedding_files)} embedding files...")

    for file_path in embedding_files:
        try:
            embedding = np.load(file_path)
        except Exception as e:
            # Best effort: report the unreadable file and continue with the rest.
            print(f"Error loading {file_path}: {str(e)}")
            continue

        # Strip only the trailing suffix; str.replace would also remove the
        # same text if it appeared earlier in the file name.
        track_id = os.path.basename(file_path)[:-len(suffix)]
        embeddings_dict[track_id] = embedding

    print(f"Successfully loaded {len(embeddings_dict)} embeddings")
    return embeddings_dict

In [67]:
embeddings_dict = load_embeddings(EMBEDDINGS_DIR)

Loading 30 embedding files...
Successfully loaded 30 embeddings


In [None]:
# The training loop saves the checkpoint under the relative name
# 'best_siamese_model.pt', so load it from the same location instead of from
# the filesystem root ('/best_siamese_model.pt').
model_path = 'best_siamese_model.pt'
# map_location keeps loading working when the checkpoint was written on a
# different device. weights_only restricts unpickling to tensors and plain
# containers, which is all this checkpoint holds.
checkpoint = torch.load(model_path, map_location=device, weights_only=True)
model.load_state_dict(checkpoint['model_state_dict'])
# Last expression of the cell: also displays the model in the notebook.
model.to(device)

  checkpoint = torch.load(model_path)


SiameseNetwork(
  (frame_encoder): Sequential(
    (0): Linear(in_features=128, out_features=256, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.2, inplace=False)
    (3): Linear(in_features=256, out_features=128, bias=True)
    (4): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
  )
  (transformer): TransformerEncoder(
    (layers): ModuleList(
      (0-2): 3 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=128, out_features=128, bias=True)
        )
        (linear1): Linear(in_features=128, out_features=512, bias=True)
        (dropout): Dropout(p=0.2, inplace=False)
        (linear2): Linear(in_features=512, out_features=128, bias=True)
        (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.2, inplace=False)
        (dropout2): Dropout(p=0.2, inplace=False)
      )
    )
  

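Replace `"....wav"` in the cell below with the file name of the query song, exactly as it appears as a key in `embeddings_dict` (including the `.wav` extension).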

In [None]:
# "....wav" is a placeholder: pass a key of `embeddings_dict` for the query song.
# Prints the ranking and keeps the top 10 (song_id, similarity) pairs.
similar_songs = get_similar_songs(
    "....wav",
    embeddings_dict,
    model,
    device,
    top_n=10
)


Updated similar songs to Drake - Hotline Bling_snippet.wav:
Future - WAIT FOR U (feat. Drake & Tems)_snippet.wav: Similarity Score = 0.459
Kendrick Lamar - luther_snippet.wav: Similarity Score = 0.438
Mac Miller - My Favorite Part_snippet.wav: Similarity Score = 0.415
Drake - Too Good_snippet.wav: Similarity Score = 0.389
Drake - One Dance_snippet.wav: Similarity Score = 0.378
Frank Ocean - In My Room_snippet.wav: Similarity Score = 0.308
Drake - Controlla_snippet.wav: Similarity Score = 0.308
French Montana - Unforgettable (feat. Swae Lee)_snippet.wav: Similarity Score = 0.305
Mac Miller - Wings_snippet.wav: Similarity Score = 0.294
Mac Miller - Knock Knock_snippet.wav: Similarity Score = 0.256
