In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy import ndimage
import multiprocessing as mp
import cv2
from tqdm import tqdm_notebook
from tqdm import tnrange
import torch.optim as optim
from torch.autograd import Variable
import torchvision.transforms as transforms
import os
from PIL import Image
import torch
import torch.nn 
import torchvision
import torch.nn.functional as F

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**LOADING THE DATA**

In [None]:
file="/content/drive/MyDrive/spectrograms"
files_in_directory = os.listdir(file) #EDIT THIS FOR ANOTHER DIRECTORY
examples = [file for file in files_in_directory]

print(examples)
XXtrain=[]
YYtrain=[]

for e in range(len(examples)-3):
  path=file+"/"+"class_"+str(e)
  f=os.listdir(path)
  for foo in f:
    jk=np.array(Image.open(path+"/"+foo))
    XXtrain.append(jk)
    YYtrain.append(e)
lxtrain=np.array(XXtrain)
lytrain=np.array(YYtrain)

In [None]:
file="/content/drive/MyDrive/spectrograms/val"
files_in_directory = os.listdir(file)#EDIT THIS FOR ANOTHER DIRECTORY
examples = [file for file in files_in_directory]
XXtest=[]
YYtest=[]
for e in range(len(examples)):
  path=file+"/"+"class_"+str(e+6)
  f=os.listdir(path)
  for foo in f:
    jk=np.array(Image.open(path+"/"+foo))
    XXtest.append(jk)
    YYtest.append(e+6)
lxtest=np.array(XXtest)
lytest=np.array(YYtest)

In [None]:
np.save('/content/drive/MyDrive/lxtrain', lxtrain)
np.save('/content/drive/MyDrive/lytrain', lytrain)
np.save('/content/drive/MyDrive/lxtest', lxtest)
np.save('/content/drive/MyDrive/lytest', lytest)

In [None]:
lxtrain.shape, lytrain.shape, lxtest.shape, lytest.shape

**PROTOTYPICAL NETWORKS**

In [None]:
def euclidean_dist(x, y):
  a = x.size(0)
  b = y.size(0)
  c = x.size(1)
  x = x.unsqueeze(1).expand(a, b, c)
  y = y.unsqueeze(0).expand(a, b, c)

  return torch.pow(x - y, 2).sum(2)
  #for finding the euclidean distance between the prototype and the query

In [None]:
class Flattening(torch.nn.Module):
  def __init__(self):
    super(Flattening, self).__init__()

  def forward(self, x):
    return x.view(x.size(0), -1)

In [None]:
def Protonet_conv2d(xshape, hiddenshape, yshape):
#protonet emdeddings and the main model
  embed = torch.nn.Sequential(
    torch.nn.Conv2d(xshape[0], hiddenshape, 3, padding=1),
    torch.nn.BatchNorm2d(hiddenshape),
    torch.nn.ReLU(),
    torch.nn.MaxPool2d(3),
    torch.nn.Conv2d(hiddenshape, hiddenshape, 3, padding=1),
    torch.nn.BatchNorm2d(hiddenshape),
    torch.nn.ReLU(),
    torch.nn.MaxPool2d(3),
    torch.nn.Conv2d(hiddenshape, hiddenshape, 3, padding=1),
    torch.nn.BatchNorm2d(hiddenshape),
    torch.nn.ReLU(),
    torch.nn.MaxPool2d(3),
    torch.nn.Conv2d(hiddenshape, yshape, 3, padding=1),
    torch.nn.BatchNorm2d(yshape),
    torch.nn.ReLU(),
    torch.nn.MaxPool2d(3),
    Flattening()
    )
    
  return load_samples(embed)

In [None]:
class load_samples(torch.nn.Module):
  def __init__(self, embed):
    
    super(load_samples, self).__init__()
    self.embed = embed.cuda()
#for predicting loss, accuracy and prediction  
  def loss_and_pred(self, spectograms):
    specs = spectograms['specs'].cuda()
    n_way = spectograms['n_way']
    n_support = spectograms['support']
    n_query = spectograms['query']
    support_data = specs[:, :support]
    labels = torch.arange(0, n_way).view(n_way, 1, 1).expand(n_way, query, 1).long()
    query_data = specs[:, support:]
    labels = Variable(labels, requires_grad=False)
    # print(labels)
    labels = labels.cuda()
    x = torch.cat([support_data.contiguous().view(n_way * support, *support_data.size()[2:]),query_data.contiguous().view(n_way * n_query, *query_data.size()[2:])], 0)
    y = self.embed.forward(x)
    # print(y.size)
    yshape = y.size(-1) 
    query_y = y[n_way*support:]
    prototype = y[:n_way*support].view(n_way, support, yshape).mean(1)
    # print("prototype")
    # print(prototype)
    distance = euclidean_dist(query_y, prototype)
    #finding the probability and the distance between prototype and the query
    probab = F.log_softmax(-distance,dim=1).view(n_way, query, -1)
    print(probab)
    lossprobab = -probab.gather(2, labels).squeeze().view(-1).mean()
    m, yk = probab.max(2)
    accuracy = torch.eq(yk, labels.squeeze()).float().mean()
    print(accuracy)
    return lossprobab, {'loss': lossprobab.item(),'acc': accuracy.item(),'yk': yk}

**EPISODIC TRAINING**

In [None]:
def train(model, optimizer, train_x, train_y, n_way, support, query, max_epoch, epoch_size):
  
  #divide the learning rate by 2 at each epoch, as suggested in paper
  scheduler = optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.5, last_epoch=-1)
  epoch = 0 #epochs done so far
  #here n_way =5 and training is done in episodic manner
  while epoch < max_epoch:
    loss_total = 0.0
    accuracy = 0.0

    for episode in tnrange(epoch_size, desc="Epoch {:d} train".format(epoch+1)):
      batch = []
      K = np.random.choice(np.unique(train_y), n_way)
      # print(K)
      for cls in K:
        # print(cls)
        datax_cls = train_x[train_y == cls]
        perm = np.random.permutation(datax_cls)
        batch_cls = perm[:(support+query)]
        # print(batch_cls)
        batch.append(batch_cls)
      batch = np.array(batch)
      batch = torch.from_numpy(batch).float()
      batch = batch.permute(0,1,4,2,3)
      subset = {'specs': batch,'n_way': n_way,'support': support,'query': query}
      optimizer.zero_grad()
      loss, output = model.loss_and_pred(subset)
      # print(loss)
      loss_total += output['loss']
      accuracy += output['acc']
      # print(accuracy)
      loss.backward()
      optimizer.step()
    epoch_loss = loss_total / epoch_size
    # print(epoch_size)
    epoch_acc = accuracy / epoch_size
    print('Epoch {:d} -- Loss: {:.4f} Acc: {:.4f}'.format(epoch+1,epoch_loss, epoch_acc))
    epoch += 1
    # print(epoch)
    scheduler.step()
    

In [None]:
#for loading the dataset from the drive
lxtrain = np.load('/content/drive/MyDrive/urbansound8k/lxtrain.npy')
lytrain = np.load('/content/drive/MyDrive/urbansound8k/lytrain.npy')
lxtest = np.load('/content/drive/MyDrive/urbansound8k/lxtest.npy')
lytest = np.load('/content/drive/MyDrive/urbansound8k/lytest.npy')

In [None]:
model = Protonet_conv2d(xshape=(4, 120, 128),hiddenshape=128,yshape=128)
optimizer = optim.Adam(model.parameters(), lr = 0.003)
#adam optimizer

In [None]:
filename="/content/drive/MyDrive/urbansound8k/bestmodel.pt"
n_way = 5 # number of classes taken at a time
support = 5 # number of examples in support set
query = 5 # number of examples in query set
max_epoch = 10
epoch_size = 2000
train(model, optimizer, lxtrain, lytrain, n_way, support, query, max_epoch, epoch_size)
torch.cuda.empty_cache()
torch.cuda.memory_summary(device=None, abbreviated=False)
#for saving the best model to be used in future
torch.save(model,filename)

In [None]:
model = torch.load('/content/drive/MyDrive/urbansound8k/bestmodel.pt')
#loading the saved model use only if the model is saved else dont use this command

**TESTING AND EVALUATION**

In [None]:
##TESTTINGGG
def test(model, test_x, test_y, n_way, n_support, n_query, test_episode):
  accuracy = 0.0
  loss_total = 0.0
  for episode in tnrange(test_episode):
    batch = []
    K = np.random.choice(np.unique(test_y), n_way)
    # print(K)
    for cls in K:
      # print(cls)
      datax_cls = test_x[test_y == cls]
      perm = np.random.permutation(datax_cls)
      batch_cls = perm[:(support+query)]
      batch.append(batch_cls)
      # episodic training in progress and in batchwise
    batch = np.array(batch)
    # print(batch)
    batch = torch.from_numpy(batch).float()
    batch = batch.permute(0,1,4,2,3)
    subset = {'specs': batch,'n_way': n_way,'support': n_support,'query': n_query}
    loss, output = model.loss_and_pred(subset)
    # print(output)
    loss_total += output['loss']
    accuracy += output['acc']
  avg_loss = loss_total / test_episode
  avg_acc = accuracy / test_episode
  print('Test results -- Loss: {:.4f} Acc: {:.4f}'.format(avg_loss, avg_acc))

EVALUATION METRICS: The average accuracy calculated over the test episodes

In [None]:
n_way = 5 # number of class selection at random
support = 5
query = 10 
test_x = lxtest
test_y = lytest
test_episode = 2000
test(model, test_x, test_y, n_way, support, query, test_episode)
#this command calculated the prediction and weigted accuracy across each episodes and averages them same goes for the loss we can get the evaluation and accuracy either from this or generated a predicted csv file which is later taken as input by the evaluation file to calculate the accuracya and the F1 Score.

 **CODE ENDS**