# Generating Images from Audio Data

In [1]:
from __future__ import print_function
import csv
import numpy as np
import random
import librosa
import wave
import os
import matplotlib.pyplot as plt
from matplotlib import cm
import pickle

from ipywidgets import interactive
import ipywidgets as widgets

from PIL import Image
import IPython.display as displayImg

from ipywidgets import interact, widgets
import glob
import IPython.display as ipd

from torch import nn
from torch import optim
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
from torchvision import datasets, transforms, models
import torchvision
import time
from sklearn.metrics import confusion_matrix

import sys

def imshow(img):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

def listdir_nohidden(path):
    return glob.glob(os.path.join(path, '*'))

def GenerateSpectrums(MainFile):
    SpectrumVariables={}
    with open('SpectrumVarialbes.csv', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            for k in row:
                SpectrumVariables[k]=int(row[k])

    x ,sample_rate_in = librosa.load(MainFile,mono=True)
    audio_data = librosa.resample(x, sample_rate_in, SpectrumVariables['SAMPLE_RATE'])
    mel_spec_power = librosa.feature.melspectrogram(audio_data, sr=SpectrumVariables['SAMPLE_RATE'],
                                                    n_fft=SpectrumVariables['N_FFT'],
                                                    hop_length=SpectrumVariables['HOP_LENGTH'],
                                                    n_mels=SpectrumVariables['N_MELS'],
                                                    power=SpectrumVariables['POWER'],
                                                   fmin=SpectrumVariables['FMIN'],
                                                    fmax=SpectrumVariables['FMAX'])
    mel_spec_db = np.float32(librosa.power_to_db(mel_spec_power, ref=np.max))
    mel_spec_db-=mel_spec_db.min()
    mel_spec_db/=mel_spec_db.max()
    im = np.uint8(cm.gist_earth(mel_spec_db)*255)[:,:,:3]
    ArrayofPictures = []
    RESOLUTION = SpectrumVariables['RESOLUTION']
    for i in range(int(np.floor(im.shape[1]/RESOLUTION))):
        startx=RESOLUTION*i
        stopx=RESOLUTION*(i+1)
        ArrayofPictures.append(im[:,startx:stopx,:])
    return ArrayofPictures

def log_mel_spec_tfm(dataInput):
    src_path=dataInput[0]
    dst_path=dataInput[1]
    #print(src_path, dst_path)
    print('Starting on',os.path.split(src_path)[1])
    pictures = GenerateSpectrums(src_path)
    print(len(pictures))
    fname = os.path.split(src_path)[-1]
    count=0
    for pic in pictures:
        plt.imsave(os.path.join(dst_path,(fname.replace(".flac",'-')\
                                          .replace(".aif",'-').replace(".wav",'-')\
                                          .replace(".m4a",'-').replace(".mp3",'-')\
                                          +str(count)+'.png')), pic)
        count+=1
    if(count==0):
        print(src_path)


try:
    Type
except NameError:
    if(len(sys.argv)>1):
        print("FoundArguments, will start converting")
        source = str(sys.argv[1])
        target = str(sys.argv[2])
        log_mel_spec_tfm((source,target))
else:
    if(Type=="INTERFACE"):
        SOURCE_DATA_ROOT='../AudioData/'

        style = {'description_width': 'initial'}

        ClassSelection = widgets.Dropdown(options=listdir_nohidden(SOURCE_DATA_ROOT), description='Source for Training Data:',style=style)
        FileSelection = widgets.Dropdown(description='Audio file to visualize',style=style)

        def updateLocation(*args):
            FileSelection.options=listdir_nohidden(os.path.join(SOURCE_DATA_ROOT,ClassSelection.value))

        ClassSelection.observe(updateLocation)

        display(ClassSelection)
        display(FileSelection)
        updateLocation();
    elif(Type=="TRAINING"):
        SPECTRUM_IMAGES_ROOT="../GeneratedData/"
        class SpectrumDataset(torch.utils.data.Dataset):
            """Face Landmarks dataset."""
            def __init__(self,ClassName,root_dir,transform=None):
                """
                Args:
                    root_dir (string): Directory with all the images.
                    transform (callable, optional): Optional transform to be applied
                        on a sample.
                """
                self.root_dir = root_dir
                self.ClassName=ClassName
                self.fileList= [f for f in os.listdir(root_dir) if f.endswith('.png')]
                print(root_dir,len(self.fileList))
                self.transform = transform
            def ReduceSize(self,ItemCount):
                self.fileList = random.choices(self.fileList, k=ItemCount)
            def __len__(self):
                return len(self.fileList)
            def __getitem__(self, idx):
                if torch.is_tensor(idx):
                    idx = idx.tolist()
                img_path = os.path.join(self.root_dir,
                                        self.fileList[idx])
                image = Image.open(img_path)
                image=image.convert('RGB')
                if self.transform:
                    image = self.transform(image)
                return image,self.ClassName
        classes = [os.path.split(c)[1] for c in listdir_nohidden(SPECTRUM_IMAGES_ROOT)]
        widgetDict={}
        print("Select classes to use for training:");
        for c in classes:
            widgetDict[c]=widgets.Checkbox(
            value=False,
            description=c,
            disabled=False,
            indent=False)
            display(widgetDict[c])

FoundArguments, will start converting
Starting on -f


FileNotFoundError: [Errno 2] No such file or directory: '../SpectrumVarialbes.csv'

In [5]:
SOURCE_DATA_ROOT='../AudioData/BeatBox' 
GENERATED_DATA_ROOT='../GeneratedData/BeatBox'

# Split the dataset - 80% of the audio files into a training folder, while 20% into testing folder.

In [8]:
ToDoList=[]
SourceFoldersLabels = [f.path for f in os.scandir(SOURCE_DATA_ROOT) if f.is_dir()]
for path in SourceFoldersLabels:
    FileList = np.array([f.path for f in os.scandir(path) if f.is_file() and (f.name.endswith(".aif") or f.name.endswith(".flac") or f.name.endswith(".wav") or f.name.endswith(".m4a") or f.name.endswith(".mp3"))])
    Label = os.path.split(path)[-1]
    OutFolderTrain = os.path.join(GENERATED_DATA_ROOT,Label,'train')
    OutFolderTest = os.path.join(GENERATED_DATA_ROOT,Label,'test')
    if not os.path.exists(OutFolderTrain):
        os.makedirs(OutFolderTrain)
    if not os.path.exists(OutFolderTest):
        os.makedirs(OutFolderTest)
    np.random.shuffle(FileList)
    trainCount =np.int(np.floor(0.8*FileList.shape[0]))
    train_set = FileList[:trainCount]
    test_set = FileList[trainCount:]
    for f in train_set:
        ToDoList.append((os.path.abspath(f),os.path.abspath(OutFolderTrain)))
    for f in test_set:
        ToDoList.append((os.path.abspath(f),os.path.abspath(OutFolderTest)))
    print("Finished class",Label,". Going to the next.")
Commands = [[sys.executable, "../helperFunctions.py",t[0],t[1]] for t in ToDoList]
print("Done Creating our ToDoList. I'll start computing now, hold on.")
tempArray=[]
for i in range(len(Commands)):
    tempArray.append(Commands[i])
    if(len(tempArray)>=12):  ## <= To optimize you can type in here how many CPU cores/threads you have 
        procs = [Popen(j) for j in tempArray ]
        for p in procs:
            p.wait()
        tempArray=[]

procs = [ Popen(j) for j in tempArray ]
for p in procs:
    p.communicate() 
print("All Done.")

FileNotFoundError: [Errno 2] No such file or directory: '../AudioData/BeatBox'

# Starting the Training Session

In [None]:
UsedClasses=[]
for k in widgetDict:
    if widgetDict[k].value==True:
        UsedClasses.append(k)
if(len(UsedClasses)<3): # kick drum, snare drum, high hat
    print("Something is wrong here. we need at least 3 classes!")

In [None]:
classes =  tuple(UsedClasses)

transform = transforms.Compose(
    [transforms.Resize(224),
    transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

trainSets=[]
testSets=[]

for cl in classes:   
    trainPath=os.path.join(SPECTRUM_IMAGES_ROOT,cl,'train')
    if os.path.isdir (trainPath):
        trainSets.append(SpectrumDataset(classes.index(cl),trainPath,transform))
    else:
        print('Coud not find path',trainPath);
    
    testPath=os.path.join(SPECTRUM_IMAGES_ROOT,cl,'test')
    if os.path.isdir (testPath):
        testSets.append(SpectrumDataset(classes.index(cl),testPath,transform))
    else:
        print('Coud not find path',testPath);


lowestItemCount=np.inf
classID=None

for i,train in enumerate(trainSets):
    if(lowestItemCount>len(train)):
        lowestItemCount=len(train)
        classID=i
        lowestItemCount=len(train)
for i in range(len(trainSets)):
    trainSets[i].ReduceSize(lowestItemCount)
    

TrainDataSet = torch.utils.data.ConcatDataset(trainSets)
TestDataSet = torch.utils.data.ConcatDataset(testSets)

trainloader = torch.utils.data.DataLoader(TrainDataSet, batch_size=16, shuffle=True)
testloader = torch.utils.data.DataLoader(TestDataSet, batch_size=16, shuffle=False)

In [None]:
#Getting some random training images and showing them
dataiter = iter(trainloader)
images, labels = dataiter.next()
for i in range(trainloader.batch_size):
    imshow(images[i])
    print(classes[labels[i]])

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = models.resnet18(pretrained=False)

In [None]:
# LOAD PRE-TRAINED MODEL for UrbanSound dataset

ModelData = torch.load('../Models/MainModelUrban.pth',map_location='cpu')
model.load_state_dict(ModelData['model'])
print(ModelData["classes"])

# Re-training only the last layer of Model for Beatbox sound classficiation

In [None]:
'''
For each parameter in the network we turn of training, 
by setting  .requires_grad  to `False`.

This makes sure that the computer will not try to adjust thos variables when "training".
'''
for param in model.parameters(): #
    param.requires_grad = False #...
    
# Configuring model for training
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
train_epoch_losses=[]
test_epoch_losses=[]
epoch=0
    
'''
'fc' stands for "fully connected" and it is the very last layer in the neural net.
We replace this layer with a new fully connected layer, that connects the 512 input neurons to neurons for our classes.
'''    
model.fc = nn.Linear(512, len(classes)) 

In [None]:
#Training the network on the training dataset

for i in range(20):  # loop over the dataset multiple (5) times 
    epoch+=1
    print("Starting epoch:",epoch)
    epochLoss=0.0
    t0 = time.time()
    model.train()
    for i, data in enumerate(trainloader, 0):
        #print("Running Batches",i)
        # get the inputs
        inputs, labels = data
        if device.type=='cuda':
            inputs, labels = inputs.to(device), labels.to(device)
        
        # zero the parameter gradients
        optimizer.zero_grad()
        if((i+i)%200==0):
            if(i>0):
                print('Processed images:',i*trainloader.batch_size,'. Running Timer @ {:.2f}sec.'.format(time.time()-t0))
        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        epochLoss+=loss.item()
        
    
    model.eval()
    testLoss=0
    print("About to test the performance on the test set.")
    with torch.no_grad():
        for i, data in enumerate(testloader, 0):
            # get the inputs
            inputs, labels = data
            if device.type=='cuda':
                inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            testLoss+=loss.item()
            if(i%50==0):
                if(i>0):
                    print('Tested images:',i*testloader.batch_size,'. Running Timer @ {:.2f}sec.'.format(time.time()-t0))


    train_epoch_losses.append(epochLoss/len(trainloader))
    test_epoch_losses.append(testLoss/len(testloader))
    EpochLength = time.time()-t0
    print('{} train loss: {:.3f} and test loss: {:.3f}, and it took us: {:.2f} seconds.'.format (epoch + 1, epochLoss / len(trainloader),testLoss/len(testloader),EpochLength))  # DAVID CHanged it to 1000 from 2000 not sure if thats totally done
print('Finished Training')

# Saving the Trained Model and performing analytics

In [None]:
# saving the learnd model in file that can be loaded in for inference
torch.save({
    'model':model.state_dict(),
    'classes':classes,
    'resolution':224,
    'modelType':"resnet18" # <= If you try out different models make sure to change this too
},"../models/BeatBox.pth") # <=Edit file name here 

#Displaying how the loss progresses over time.
plt.plot(train_epoch_losses, label='Training Loss',c='r')
plt.plot(test_epoch_losses, label='Test Loss',c='g')
plt.legend()
plt.show()

In [None]:
# Print predicted and acual labels for Spectragrams just to verify trained model is working correctly

dataiter = iter(testloader)
model.eval()
for j in range (2):
    images, labels = dataiter.next()
    if device == 'cuda':
        images, labels = images.to(device), labels.to(device)
    outputs = model(images)
    _, predicted = torch.max(outputs, 1)

    for i in range(len(images)):
        imshow(images[i])
        print('GroundTruth: ',classes[labels[i]])
        print('Predicted: ',  classes[predicted[i]])

In [None]:
# Network analytics

class_correct = list(0. for i in range(len(classes)))
class_total = list(0. for i in range(len(classes)))
model.eval()
allLabels=[]
allPrediction=[]
with torch.no_grad():
    for data in testloader:
        images, labels = data
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        c = (predicted == labels).squeeze()
        if (c.dim()==0):
            continue
        for i in range(testloader.batch_size):
            if(len(labels)<=i):
                continue;
            label = labels[i]
            allLabels.append(labels[i].to('cpu').numpy())
            allPrediction.append(predicted[i].to('cpu').numpy())
            #print (c.shape)
            if(testloader.batch_size>1):

                class_correct[label] += c[i].item()
            else:
                class_correct[label] += c.item()
            class_total[label] += 1

print(confusion_matrix(allLabels, allPrediction))
for i in range(len(classes)):
    print('Accuracy of %5s : %2d %%' % (
        classes[i], 100 * class_correct[i] / class_total[i]))