<a href="https://colab.research.google.com/github/petros94/ATIA-eduCopter/blob/master/Thesis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Copy the MFCC archive into the working directory
# (source path presumably lives on a mounted Google Drive -- verify).
!cp drive/MyDrive/collab/thesis/MFCCs.zip MFCCs.zip
# Create the directory the archive is extracted into.
!mkdir mfccs 
# Extract the archive into mfccs/, the directory load_songs() reads from.
!unzip MFCCs.zip -d mfccs

In [None]:
## Create Neural Net

import torch
import torchvision
from torch import nn

class SiameseNet(nn.Module):
  """ResNet-18 based embedding network for single-channel inputs.

  The torchvision ResNet-18 is created without pretrained weights, its first
  convolution is replaced by one that accepts a single input channel, and its
  final fully connected layer is dropped. ``forward`` returns the flattened
  output of the remaining layers.

  Args:
    frame_size: stored unchanged on ``self.frame_size`` so callers can read
      it back; the network itself does not use it.
  """

  def __init__(self, frame_size) -> None:
    super().__init__()
    # get resnet model
    backbone = torchvision.models.resnet18(pretrained=False)
    # resnet18's stock first convolution takes 3 input channels; replace it
    # with an otherwise identical convolution that takes 1 input channel.
    backbone.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    self.fc_in_features = backbone.fc.in_features
    self.frame_size = frame_size

    # Drop the last child of resnet18 (its final linear layer) and keep
    # everything before it as the feature extractor.
    self.resnet = torch.nn.Sequential(*(list(backbone.children())[:-1]))

    # Linear head. NOTE: it is constructed and initialised here but is not
    # applied in forward()/forward_once(), which return the backbone features.
    self.fc = nn.Sequential(
        nn.Linear(self.fc_in_features, 256),
        nn.ReLU(inplace=True),
        nn.Linear(256, 256),
    )

    # initialize the weights (init_weights only touches nn.Linear modules)
    self.resnet.apply(self.init_weights)
    self.fc.apply(self.init_weights)

  def init_weights(self, m):
      """Xavier-initialise the weight of an nn.Linear and set its bias to 0.01.

      Modules that are not nn.Linear are left untouched. A linear layer
      created with ``bias=False`` only has its weight initialised.
      """
      if isinstance(m, nn.Linear):
          # xavier_uniform_ is the in-place, non-deprecated spelling.
          torch.nn.init.xavier_uniform_(m.weight)
          if m.bias is not None:
              m.bias.data.fill_(0.01)

  def forward_once(self, x):
      """Run x through the backbone and flatten everything but the batch dim."""
      output = self.resnet(x)
      output = output.view(output.size()[0], -1)
      return output

  def forward(self, x):
      """Return the embedding of x (same as forward_once)."""
      output1 = self.forward_once(x)
      return output1

# Define dataset
from torch.nn import functional as F

class MFCCDataset(torch.utils.data.Dataset):
  """Dataset whose items are the triplet descriptors built by generate_triplets.

  Args:
    songs: song collection handed to generate_triplets and kept on ``self.songs``.
    samples_per_song: forwarded to generate_triplets.
    frame_size: stored on ``self.frame_size`` for consumers of the dataset;
      the dataset itself does not use it.
  """

  def __init__(self, songs, samples_per_song=10, frame_size=400):
    self.songs = songs
    self.samples_per_song = samples_per_song
    self.frame_size = frame_size
    # The triplets are generated once, at construction time.
    self.triplets = generate_triplets(songs, samples_per_song)

  def __len__(self):
    """Number of generated triplets."""
    return len(self.triplets)

  def __getitem__(self, idx):
    """Return the triplet stored at position idx."""
    return self.triplets[idx]

In [None]:
#
# Generic Utils
#

import random
import torch
import os 
import numpy as np
import scipy.io

def load_songs(origin_path='mfccs/'):
  """Load every MFCC matrix found under origin_path, grouped by sub-directory.

  Each sub-directory of origin_path is treated as one song and each file in
  it as one cover of that song. The 'XMFCC' variable of every file is read
  with scipy.io.loadmat and standardised with its own global mean and
  standard deviation.

  Args:
    origin_path: root directory to scan (defaults to 'mfccs/').

  Returns:
    dict mapping the sub-directory name to a list of dicts with the keys
    'song_id' (sub-directory name), 'cover_id' (file name) and 'mfcc'
    (standardised numpy array).
  """
  songs = {}

  # os.listdir returns entries in arbitrary order; sorting makes the result,
  # and any positional split done on it later, reproducible.
  for song_id in sorted(os.listdir(origin_path)):
    song_dir = os.path.join(origin_path, song_id)
    if not os.path.isdir(song_dir):
      # Stray files next to the song directories are not songs.
      continue

    covers = []
    for cover_id in sorted(os.listdir(song_dir)):
      mat = scipy.io.loadmat(os.path.join(song_dir, cover_id))
      mfcc = np.array(mat['XMFCC'])
      mean, std = np.mean(mfcc), np.std(mfcc)
      # A constant matrix has zero std; only centre it instead of producing NaNs.
      mfcc = (mfcc - mean) / std if std > 0 else mfcc - mean
      covers.append({'song_id': song_id, 'cover_id': cover_id, 'mfcc': mfcc})
    songs[song_id] = covers
  return songs

def retrieve_mfcc(songs, song_id, cover_id):
  """Return the 'mfcc' entry of the cover cover_id of song song_id.

  The first cover of ``songs[song_id]`` whose 'cover_id' matches wins. None is
  returned when no cover matches; an unknown song_id raises KeyError.
  """
  matches = (cover['mfcc'] for cover in songs[song_id] if cover['cover_id'] == cover_id)
  return next(matches, None)

def generate_triplets(songs, samples_per_song=10):
  """Build random (anchor, positive, negative) triplet descriptors.

  Every cover in songs is used as an anchor samples_per_song times. For each
  sample the positive is a random other cover from the same group (different
  'cover_id') and the negative is a random cover whose 'song_id' differs from
  the anchor's: first a group is chosen uniformly, then a cover inside it.

  Anchors for which no positive or no negative exists are skipped. Drawing
  candidates by rejection would never terminate in that situation.

  Args:
    songs: dict mapping a song key to a list of cover dicts, each with at
      least 'song_id' and 'cover_id'.
    samples_per_song: number of triplets generated per anchor.

  Returns:
    list of dicts with the keys 'anchor', 'pos_song' and 'neg_song' (in that
    order), each holding the 'song_id' and 'cover_id' of the chosen cover.
  """
  triplets = []
  # Negative candidates depend only on the anchor's song_id, so cache them.
  negative_pools = {}

  for covers in songs.values():
    for anchor in covers:
      song_id, cover_id = anchor['song_id'], anchor['cover_id']

      positives = [c for c in covers if c['cover_id'] != cover_id]

      if song_id not in negative_pools:
        pools = ([c for c in group if c['song_id'] != song_id] for group in songs.values())
        negative_pools[song_id] = [pool for pool in pools if pool]
      negatives = negative_pools[song_id]

      if not positives or not negatives:
        # No valid triplet can be formed around this anchor.
        continue

      for _ in range(samples_per_song):
        pos_song = random.choice(positives)
        neg_song = random.choice(random.choice(negatives))

        triplets.append({
            'anchor': {
                'song_id': anchor['song_id'],
                'cover_id': anchor['cover_id'],
            },
            'pos_song': {
                'song_id': pos_song['song_id'],
                'cover_id': pos_song['cover_id'],
            },
            'neg_song': {
                'song_id': neg_song['song_id'],
                'cover_id': neg_song['cover_id'],
            },
        })
  return triplets

def generate_segments(song: np.ndarray, step=400, overlap=0.5):
  """Slice song along its second axis into windows of step columns.

  Window starts are spaced ``int(step*overlap)`` columns apart, so despite its
  name ``overlap`` is the hop expressed as a fraction of the window length.
  Only full windows are returned, including the one that ends exactly at the
  last column.

  Args:
    song: 2-D array sliced as ``song[:, start:start+step]``; anything that
      exposes ``.shape`` and supports that slicing works.
    step: window length in columns.
    overlap: hop between consecutive window starts, as a fraction of step.

  Returns:
    list of slices of song; empty when song has fewer than step columns.

  Raises:
    ValueError: if ``int(step*overlap)`` is smaller than 1.
  """
  hop = int(step * overlap)
  if hop < 1:
    raise ValueError(f"step*overlap must be at least 1, got step={step}, overlap={overlap}")

  # The last valid start is shape[1]-step; "+ 1" makes the range include it.
  last_start = song.shape[1] - step
  return [song[:, start:start + step] for start in range(0, last_start + 1, hop)]

# def triplet_2_segments(songs, triplet, frame_size):
#   segs = []
#   for t in ('anchor', 'pos_song', 'neg_song'):
#     song_id = triplet[t]['song_id']
#     cover_id = triplet[t]['cover_id']
#     mfcc = retrieve_mfcc(songs, song_id, cover_id)
#     frames = generate_segments(mfcc, step=frame_size)
#     segs.append(frames)

#   # Find minimum length
#   min_len = min(list(map(lambda i: len(i), segs)))

#   # Crop to minimum length
#   segs = [seg[:min_len-1] for seg in segs]

#   # zip samples
#   ret_value = []
#   (anchor, pos, neg) = segs
#   for (a, p, n) in zip(anchor, pos, neg):
#     ret_value.append((a, p, n))   

#   ret_value = torch.Tensor(np.array(ret_value))
#   return ret_value.unsqueeze(2).to(device)


def mfcc_triplet_2_segments(triplet, frame_size):
  """Cut the three MFCC tensors of a triplet into aligned stacks of frames.

  Each element of triplet is segmented with generate_segments, the three
  frame lists are cropped to the length of the shortest one, and the result
  is stacked so that index i holds the i-th frame of anchor, positive and
  negative together.

  Args:
    triplet: iterable of exactly three torch tensors (anchor, positive,
      negative), each sliceable by generate_segments.
    frame_size: window length passed to generate_segments as ``step``.

  Returns:
    Tensor of shape (n_frames, 3, 1, *frame_shape) on the module-level
    ``device``.

  Raises:
    ValueError: if one of the inputs is too short to yield a single frame.
  """
  segs = [generate_segments(mfcc, step=frame_size) for mfcc in triplet]

  # Find minimum length
  min_len = min(len(frames) for frames in segs)
  if min_len == 0:
    raise ValueError(f"every song must yield at least one frame of size {frame_size}")

  # Crop to minimum length. All frames are full-sized, so none has to be dropped.
  anchor, pos, neg = (torch.stack(frames[:min_len]) for frames in segs)

  # (3, n, ...) -> (n, 3, ...) -> (n, 3, 1, ...): one row per frame index,
  # plus a singleton channel axis.
  ret = torch.stack((anchor, pos, neg)).transpose(0,1).unsqueeze(2).to(device)
  return ret

def extract_frame(triplets, frame_size, triplet_idx, song_idx, frame_idx):
  """Return one frame of one member of a triplet.

  song_idx selects the member by position among the values of the triplet
  dict. The member's MFCC is looked up in the module-level ``songs`` and
  segmented with generate_segments; frame_idx selects the segment.
  """
  member = list(triplets[triplet_idx].values())[song_idx]
  mfcc = retrieve_mfcc(songs, member['song_id'], member['cover_id'])
  return generate_segments(mfcc, step=frame_size)[frame_idx]

#
# Prediction Utils
#

def embed(model, frames):
  """Return model(frames) computed in eval mode without gradient tracking.

  The model is switched to eval mode and stays in it after the call.
  """
  model.eval()
  with torch.no_grad():
    embedding = model(frames)
  return embedding

def distance(model, frame1, frame2):
  """Return the norm of the difference between the embeddings of two inputs."""
  first, second = (embed(model, frame) for frame in (frame1, frame2))
  return torch.norm(first - second)

def triplet_frame_distance(model, frame1, frame2, frame3):
  """Return the pairwise embedding distances of three frames.

  The result is the tuple (d(1,2), d(1,3), d(2,3)), where d is the norm of
  the difference between two embeddings.
  """
  emb_a, emb_b, emb_c = (embed(model, frame) for frame in (frame1, frame2, frame3))
  dist_ab = torch.norm(emb_a - emb_b)
  dist_ac = torch.norm(emb_a - emb_c)
  dist_bc = torch.norm(emb_b - emb_c)
  return dist_ab, dist_ac, dist_bc

def triplet_song_distance(model, song1, song2, song3):
  """Return the pairwise embedding distances of three inputs.

  Same computation as triplet_frame_distance: the tuple
  (d(1,2), d(1,3), d(2,3)) of norms of embedding differences.
  """
  emb_a, emb_b, emb_c = (embed(model, song) for song in (song1, song2, song3))
  dist_ab = torch.norm(emb_a - emb_b)
  dist_ac = torch.norm(emb_a - emb_c)
  dist_bc = torch.norm(emb_b - emb_c)
  return dist_ab, dist_ac, dist_bc

def extract_and_distance(model, triplets, triplet_idx, frame_idx):
  """Return the pairwise embedding distances for one frame index of a triplet.

  The frame at frame_idx is extracted from each of the three triplet members,
  reshaped to (1, 1, 20, -1), moved to the module-level ``device`` and passed
  to triplet_frame_distance.
  """
  inputs = []
  for song_idx in range(3):
    frame = extract_frame(triplets, model.frame_size, triplet_idx, song_idx, frame_idx)
    # (batch=1, channel=1, 20 rows, remaining columns)
    inputs.append(torch.Tensor(np.array(frame)).reshape(1,1,20,-1).to(device))

  return triplet_frame_distance(model, *inputs)


#
# Visualization utils
#

import plotly.express as px
import time

def visualize_frame(triplets, frame_size, triplet_idx, song_idx, frame_idx):
  """Print the time span of one frame of a triplet member and plot the frame.

  The member is selected by position (song_idx) among the values of the
  triplet dict and its MFCC is looked up in the module-level ``songs``.
  Frame positions are converted to seconds with the factor 512/22050
  (presumably hop length in samples over sample rate -- confirm against the
  feature extraction).
  """
  member = list(triplets[triplet_idx].values())[song_idx]
  mfcc = retrieve_mfcc(songs, member['song_id'], member['cover_id'])
  frame = generate_segments(mfcc, step=frame_size)[frame_idx]

  # Consecutive frames are assumed to start half a frame apart.
  hop = 0.5*frame_size
  start_seconds = frame_idx*hop*512/22050
  end_seconds = (frame_idx*hop + frame_size)*512/22050

  def as_clock(seconds):
    return time.strftime('%H:%M:%S', time.gmtime(seconds))

  duration = as_clock(end_seconds-start_seconds)
  time_start = as_clock(start_seconds)
  time_end = as_clock(end_seconds)

  print(f"Song id: {member['cover_id']}, timeframe: {time_start} - {time_end}, duration: {duration}")
  figure = px.imshow(frame, aspect='auto', range_color=[-3, 3], width=400, height=400)
  figure.show()

import matplotlib.pyplot as plt 

def visualize_frames(triplets, frame_size, triplet_idx, frame_idx):
  """Visualise frame frame_idx of all three members of a triplet, in order."""
  for song_idx in (0, 1, 2):
    visualize_frame(triplets, frame_size, triplet_idx, song_idx, frame_idx)




In [None]:
def train(model, train_set, valid_set, n_epochs, batch_size):
  """Train model with a triplet margin loss and report the loss every epoch.

  Batches are built by looking up the MFCCs of every triplet in the
  module-level ``songs``, segmenting them with mfcc_triplet_2_segments and
  concatenating the frames of all triplets of the batch. Optimisation uses
  Adam (lr=5e-4) and torch.nn.TripletMarginLoss with default settings. After
  each epoch the model is evaluated on valid_set and the mean per-batch
  validation and training losses are printed.

  Args:
    model: network mapping a batch of frames to embeddings.
    train_set: dataset of triplet descriptors; must expose ``frame_size``.
    valid_set: dataset of triplet descriptors used for evaluation.
    n_epochs: number of passes over train_set.
    batch_size: number of triplets (not frames) per batch.
  """
  frame_size = train_set.frame_size

  def collate_fn(batch):
    # Turn each triplet descriptor into a (frames, 3, 1, ...) tensor.
    per_triplet = []
    for triplet in batch:
      # .float() matches the model's default parameter dtype; it is a no-op
      # when the stored MFCCs are already float32.
      mfccs = [
          torch.from_numpy(retrieve_mfcc(songs, v['song_id'], v['cover_id'])).float()
          for v in triplet.values()
      ]
      per_triplet.append(mfcc_triplet_2_segments(mfccs, frame_size))
    # (total_frames, 3, ...) -> (3, total_frames, ...) so the batch unpacks
    # into anchor / positive / negative.
    return torch.cat(per_triplet).transpose(0,1).to(device)

  model.train()
  model.to(device)

  optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)
  criterion = torch.nn.TripletMarginLoss()

  train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
  valid_dataloader = torch.utils.data.DataLoader(valid_set, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)

  # Use the loaders' real batch counts (the last batch may be partial), and
  # never divide by zero when a dataset is empty.
  train_batches = max(len(train_dataloader), 1)
  valid_batches = max(len(valid_dataloader), 1)

  def triplet_loss(x):
    # Tensor.to returns the moved tensor, so the result has to be used.
    anchor, pos, neg = x.to(device)
    return criterion(model(anchor), model(pos), model(neg))

  for epoch in range(n_epochs):
    print(32*"=")
    print(f"Epoch {epoch}")
    epoch_loss = 0
    model.train()

    for batch, x in enumerate(train_dataloader):
      optimizer.zero_grad()
      loss = triplet_loss(x)
      loss.backward()
      optimizer.step()

      epoch_loss += loss.item()

      if batch%16==0:
        print(f'batch {batch}/{train_batches}, loss: {loss.item()}')

    print('Evaluating model')
    model.eval()
    valid_loss = 0
    with torch.no_grad():
      for x in valid_dataloader:
        valid_loss += triplet_loss(x).item()
    print(f"Epoch {epoch} valid loss: {valid_loss/valid_batches}")

    print(f"Epoch {epoch} train loss: {epoch_loss/train_batches}")


def embed(model, frames):
  """Return model(frames) computed in eval mode without gradient tracking.

  The model is switched to eval mode and stays in it after the call.
  """
  model.eval()
  with torch.no_grad():
    embedding = model(frames)
  return embedding

def distance(model, frame1, frame2):
  """Return the norm of the difference between the embeddings of two inputs."""
  first, second = (embed(model, frame) for frame in (frame1, frame2))
  return torch.norm(first - second)

def triplet_frame_distance(model, frame1, frame2, frame3):
  """Return the pairwise embedding distances of three frames.

  The result is the tuple (d(1,2), d(1,3), d(2,3)), where d is the norm of
  the difference between two embeddings.
  """
  emb_a, emb_b, emb_c = (embed(model, frame) for frame in (frame1, frame2, frame3))
  dist_ab = torch.norm(emb_a - emb_b)
  dist_ac = torch.norm(emb_a - emb_c)
  dist_bc = torch.norm(emb_b - emb_c)
  return dist_ab, dist_ac, dist_bc

def triplet_song_distance(model, song1, song2, song3):
  """Return the pairwise embedding distances of three inputs.

  Same computation as triplet_frame_distance: the tuple
  (d(1,2), d(1,3), d(2,3)) of norms of embedding differences.
  """
  emb_a, emb_b, emb_c = (embed(model, song) for song in (song1, song2, song3))
  dist_ab = torch.norm(emb_a - emb_b)
  dist_ac = torch.norm(emb_a - emb_c)
  dist_bc = torch.norm(emb_b - emb_c)
  return dist_ab, dist_ac, dist_bc

def extract_and_distance(model, triplets, triplet_idx, frame_idx):
  """Return the pairwise embedding distances for one frame index of a triplet.

  The frame at frame_idx is extracted from each of the three triplet members
  (anchor, positive, negative by position), reshaped to (1, 1, 20, -1), moved
  to the module-level ``device`` and passed to triplet_frame_distance.

  Returns:
    Tuple (d(anchor, pos), d(anchor, neg), d(pos, neg)).
  """
  f1 = extract_frame(triplets, model.frame_size, triplet_idx, 0, frame_idx)
  f2 = extract_frame(triplets, model.frame_size, triplet_idx, 1, frame_idx)
  f3 = extract_frame(triplets, model.frame_size, triplet_idx, 2, frame_idx)

  # (batch=1, channel=1, 20 rows, remaining columns)
  f1 = torch.Tensor(np.array(f1)).reshape(1,1,20,-1).to(device)
  f2 = torch.Tensor(np.array(f2)).reshape(1,1,20,-1).to(device)
  f3 = torch.Tensor(np.array(f3)).reshape(1,1,20,-1).to(device)

  # The helper defined in this file is triplet_frame_distance; no function
  # named triplet_distance exists.
  return triplet_frame_distance(model, f1, f2, f3)

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

songs = load_songs()

# Positional split of the songs: the first train_perc go to training, the
# rest to validation.
train_perc = 0.8
split_idx = int(len(songs)*train_perc)
song_items = list(songs.items())
valid_songs = dict(song_items[split_idx:])
train_songs = dict(song_items[:split_idx])

samples_per_song=6
frame_size=1600

valid_dataset = MFCCDataset(valid_songs, samples_per_song, frame_size)
train_dataset = MFCCDataset(train_songs, samples_per_song, frame_size)

print(f"train set size: {len(train_dataset)}")

train set size: 4770


In [None]:
# Build the embedding network for the configured frame size and train it
# for 5 epochs with 4 triplets per batch.
model = SiameseNet(frame_size=frame_size)
train(model, train_dataset, valid_dataset, n_epochs=5, batch_size=4)


In [None]:

# Define the inputs here so the cell also works when the notebook is run
# top to bottom (same values as the inspection cell that follows).
triplet_idx = 5
dataset = valid_dataset

# Frame 0 of the anchor, frame 1 of the positive, frame 0 of the negative.
f1 = extract_frame(dataset.triplets, model.frame_size, triplet_idx, 0, 0)
f2 = extract_frame(dataset.triplets, model.frame_size, triplet_idx, 1, 1)
f3 = extract_frame(dataset.triplets, model.frame_size, triplet_idx, 2, 0)

f1 = torch.Tensor(np.array(f1)).reshape(1,1,20,-1).to(device)
f2 = torch.Tensor(np.array(f2)).reshape(1,1,20,-1).to(device)
f3 = torch.Tensor(np.array(f3)).reshape(1,1,20,-1).to(device)

# triplet_frame_distance is the helper defined in this notebook;
# triplet_distance does not exist.
triplet_frame_distance(model, f1, f2, f3)

(tensor(1.8102, device='cuda:0'),
 tensor(3.1204, device='cuda:0'),
 tensor(2.6324, device='cuda:0'))

In [None]:
# Pick one validation triplet and one frame position to inspect.
dataset = valid_dataset
triplet_idx = 5
frame_idx = 0

# Pairwise embedding distances of the three frames, then their plots.
distances = extract_and_distance(model, dataset.triplets, triplet_idx, frame_idx)
print("Distances:", distances)
visualize_frames(dataset.triplets, model.frame_size, triplet_idx, frame_idx)

Distances: (tensor(2.5757, device='cuda:0'), tensor(3.1204, device='cuda:0'), tensor(1.2286, device='cuda:0'))
Song id: 291617_MFCC.mat, timeframe: 00:00:00 - 00:00:37, duration: 00:00:37


Song id: 458810_MFCC.mat, timeframe: 00:00:00 - 00:00:37, duration: 00:00:37


Song id: 416505_MFCC.mat, timeframe: 00:00:00 - 00:00:37, duration: 00:00:37
