In [2]:
import os, warnings
import numpy as np
import matplotlib.pyplot as plt
warnings.filterwarnings("ignore")
import torch
import torch.nn as nn
from torch.utils.data import DataLoader,random_split,Dataset
import torchaudio
from torchaudio import transforms
from torch import Tensor
from sklearn import preprocessing
from tqdm import tqdm
import torch.optim as optim
import torch.nn.functional as F

In [9]:
# Number of audio samples per segment; recordings are cut into chunks of
# exactly this length and any shorter tail is discarded.
segmendDataLength = 1200
# Root folder that is walked recursively when loading audio; the name of the
# directory directly containing each file is used as that file's class label.
path = "../input/dataset/splited"

In [5]:
# Build the dataset: cut every audio file under `path` into fixed-length
# segments and label each segment with the name of the file's directory.
dirlbl1 = []      # one directory-name label per segment
segmentData = []  # one (1, segmendDataLength) tensor per segment
for root, dirs, files in os.walk(path):
    # Sort in place so directories and files are visited in the same order on
    # every run; otherwise segment indices depend on filesystem listing order.
    dirs.sort()
    for pt in sorted(files):
        file_path = os.path.join(root, pt)
        # The class label is the name of the directory that contains the file.
        label = os.path.basename(os.path.dirname(file_path))
        waveform, _sample_rate = torchaudio.load(file_path)
        channel = waveform[0]  # only the first channel is used
        # Stop before the trailing partial segment instead of slicing it and
        # then throwing it away.
        full_length = len(channel) - len(channel) % segmendDataLength
        for k in range(0, full_length, segmendDataLength):
            segmentData.append(channel[k:k+segmendDataLength].view(1, segmendDataLength))
            dirlbl1.append(label)
# Map the directory-name labels to integer class ids.
le = preprocessing.LabelEncoder()
encodedLable1 = torch.tensor(le.fit_transform(dirlbl1))

In [6]:
# Sanity check: every segment was stored via .view(1, segmendDataLength).
segmentData[0].shape

In [7]:
# Listen to a single segment as a sanity check of the segmentation.
# NOTE(review): index 333 is hard-coded and fails if fewer segments were loaded.
# NOTE(review): rate=48000 assumes the recordings are sampled at 48 kHz — TODO confirm.
import IPython.display as pl
pl.Audio(segmentData[333][0],rate=48000)

In [8]:
class dataSet1(Dataset):
    """Map-style dataset pairing audio segments with their labels.

    Each item is ``(features, label)`` where ``features`` is the transform of
    the first row of the stored segment with a leading channel axis added.

    Args:
        segmentData: indexable collection of segments; ``segmentData[i][0]``
            is the 1-D waveform passed to the transform.
        dirlbl1: indexable collection of labels, aligned with ``segmentData``.
        sample_rate: sample rate given to the default mel-spectrogram
            transform (ignored when ``transform`` is supplied).
        hop_length: hop length given to the default mel-spectrogram transform
            (ignored when ``transform`` is supplied).
        transform: optional callable applied to each 1-D waveform. When
            omitted, ``transforms.MelSpectrogram(sample_rate,
            hop_length=hop_length)`` is used.
    """

    def __init__(self, segmentData, dirlbl1, sample_rate=48000, hop_length=10, transform=None):
        self.segmentData = segmentData
        self.lable1 = dirlbl1
        # Build the transform once here rather than on every __getitem__ call.
        if transform is None:
            transform = transforms.MelSpectrogram(sample_rate, hop_length=hop_length)
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(features, label)`` for the segment at ``index``."""
        # unsqueeze(0) adds the channel axis without hard-coding the feature
        # shape, so segments of a different length still work.
        mel_specgram = self.transform(self.segmentData[index][0]).unsqueeze(0)
        return mel_specgram, self.lable1[index]

    def __len__(self):
        """Number of segments in the dataset."""
        return len(self.segmentData)

In [9]:
# Wrap all segments and their encoded labels in a dataset, then expose it
# through a shuffling loader that yields batches of 32.
full_dataset = dataSet1(segmentData, encodedLable1)
train_loader1 = DataLoader(full_dataset, batch_size=32, shuffle=True)

In [10]:
# 80% of the segments (rounded down) go to training, the rest to validation.
total_segments = len(segmentData)
splitedLengh1 = int(total_segments * 0.8)
splitedLengh2 = total_segments - splitedLengh1

In [14]:
# Seed the split so the same segments land in the training and validation
# subsets on every run; an unseeded split changes membership each time the
# cell is executed, which makes validation numbers incomparable across runs.
split_generator = torch.Generator().manual_seed(42)
train_ds1, valid_ds1 = torch.utils.data.random_split(
    train_loader1.dataset,
    (splitedLengh1, splitedLengh2),
    generator=split_generator,
)

In [4]:
class sttModel(nn.Module):
    """Small CNN classifier: two conv/pool stages followed by three linear layers.

    ``fc1`` expects 16*12*13 flattened features, which is what two
    (3x3 conv -> 3x3 max-pool) stages produce from a 1x128x121 input.

    Args:
        num_class: number of output logits.
    """

    def __init__(self, num_class):
        super().__init__()

        drop_p = 0.3  # dropout probability shared by every dropout layer

        # Convolutional feature extractor.
        self.conv1 = nn.Conv2d(1, 8, kernel_size=3, stride=1)
        self.dropout1 = nn.Dropout(drop_p)
        self.conv2 = nn.Conv2d(8, 16, kernel_size=3, stride=1)
        self.dropout2 = nn.Dropout(drop_p)

        # Fully connected classifier head.
        self.fc1 = nn.Linear(16*12*13, 256)
        self.dropout5 = nn.Dropout(drop_p)
        self.fc2 = nn.Linear(256, 128)
        self.dropout6 = nn.Dropout(drop_p)
        self.fc3 = nn.Linear(128, num_class)

    def forward(self, x):
        """Return raw class logits for a batch of 4-D inputs."""
        x = self.dropout1(F.max_pool2d(F.relu(self.conv1(x)), kernel_size=3))
        x = self.dropout2(F.max_pool2d(F.relu(self.conv2(x)), kernel_size=3))

        # Flatten channels and spatial dims into one feature vector per sample.
        flat_features = x.shape[1] * x.shape[2] * x.shape[3]
        x = x.reshape(-1, flat_features)

        x = self.dropout5(F.relu(self.fc1(x)))
        x = self.dropout6(F.relu(self.fc2(x)))
        return self.fc3(x)

In [12]:
train_audio_transforms = nn.Sequential(torchaudio.transforms.MelSpectrogram())
# Size the output layer from the fitted label encoder instead of a hard-coded
# 5, so the number of logits always matches the number of label classes that
# CrossEntropyLoss will see.
net = sttModel(num_class=len(le.classes_))


In [18]:
# Batch the two subsets produced by the random split; both loaders shuffle
# and use batches of 32.
trainloader = DataLoader(train_ds1, batch_size=32, shuffle=True)
testloader = DataLoader(valid_ds1, batch_size=32, shuffle=True)

In [13]:
# Best validation accuracy (in percent) seen so far; test() declares it
# `global`, compares each epoch's accuracy against it and updates it.
best_acc=0

def train(net,trainloader,optim,scheduler,criterion,epoch,device):
    """Run one training epoch and report the mean loss and accuracy.

    Args:
        net: model to optimise; switched to training mode.
        trainloader: sized iterable yielding ``(inputs, targets)`` batches.
        optim: optimizer whose ``zero_grad``/``step`` are called per batch.
        scheduler: accepted for interface compatibility; it is NOT stepped
            here (see the note in the loop).
        criterion: loss function called as ``criterion(outputs, targets)``.
        epoch: zero-based epoch index, printed as ``epoch + 1``.
        device: device the batches are moved to.

    Returns:
        ``(avg_loss, accuracy)``: mean per-batch loss and accuracy in percent.
        Both are NaN when the loader yields no batches.
    """
    print("Training")
    net.train()
    train_loss = 0.0
    total = 0
    total_correct = 0

    for inputs, targets in tqdm(trainloader):
        inputs, targets = inputs.to(device), targets.to(device)

        optim.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optim.step()
        # NOTE(review): scheduler.step() stays disabled, as in the original.
        # Enabling it requires a scheduler whose total step count matches
        # len(trainloader) * number of epochs.

        train_loss += loss.item()
        # detach() replaces the deprecated `.data`; argmax gives the class id.
        predicted = outputs.detach().argmax(dim=1)
        total_correct += (predicted == targets).sum().item()
        total += targets.size(0)

    # Guard the averages so an empty loader reports NaN instead of raising
    # ZeroDivisionError inside the print call.
    num_batches = len(trainloader)
    avg_loss = train_loss / num_batches if num_batches else float("nan")
    accuracy = total_correct * 100 / total if total else float("nan")

    print("Epoch: [{}]  loss: [{:.2f}] Accuracy [{:.2f}] ".format(epoch+1, avg_loss, accuracy))
    return avg_loss, accuracy
    
def test(net,testloader,optim,criterion,epoch,device,results_txt,model_name):
    """Evaluate ``net`` on ``testloader``, log the result and checkpoint the best model.

    Args:
        net: model to evaluate; switched to eval mode.
        testloader: sized iterable yielding ``(inputs, targets)`` batches.
        optim: unused; kept so existing callers keep working.
        criterion: loss function called as ``criterion(outputs, targets)``.
        epoch: zero-based epoch index, reported as ``epoch + 1``.
        device: device the batches are moved to.
        results_txt: path prefix; one line is appended to ``results_txt + ".txt"``.
        model_name: checkpoint file stem; saved as ``./checkpoint/<model_name>.t7``.

    Returns:
        The module-level ``best_acc`` after this evaluation (percent).

    Side effects:
        Appends to the results file, may write a checkpoint, and updates the
        global ``best_acc`` when the accuracy improves.
    """
    global best_acc
    print("validation")
    net.eval()
    test_loss,total,total_correct = 0,0,0

    # Evaluation needs no gradients; no_grad avoids building autograd graphs
    # for every validation batch.
    with torch.no_grad():
        for inputs, targets in tqdm(testloader):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            total += targets.size(0)
            total_correct += (predicted == targets).sum().item()

    acc = 100. * total_correct / total
    avg_loss = test_loss/len(testloader)
    print("\nValidation Epoch #%d\t\t\tLoss: %.4f Acc@1: %.2f%%" %(epoch+1, avg_loss, acc))

    # Context manager guarantees the log file is closed even if write fails.
    with open(results_txt+".txt","a+") as f:
        f.write("Validation Epoch #%d\t\t\tLoss: %.4f Acc@1: %.2f%% \n" %(epoch+1, avg_loss, acc))

    # Save checkpoint when best model
    if acc > best_acc:
        if isinstance(net, torch.nn.DataParallel):
            print("multiple GPU")
            print('Saving Best model...\t\t\tTop1 = %.2f%%' %(acc))
            state = {
                'model':net.module.state_dict(),
                'model1': net.state_dict(),
                'model2': net,
                'acc':acc,
                'epoch':epoch,
            }

        else:
            print("not multiple GPU")
            # NOTE(review): this stores the module object itself, not a
            # state_dict; the format is kept so existing loaders still work.
            state = {
                    'model':net,
                    'acc':acc,
                    'epoch':epoch,
                    }

        save_point = './checkpoint/'
        # One call replaces the two isdir/mkdir checks on the same directory.
        os.makedirs(save_point, exist_ok=True)
        torch.save(state, save_point+model_name+'.t7')
        best_acc = acc

    return best_acc

In [14]:
num_epochs=100
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

net = net.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(),lr=0.001)
# steps_per_epoch is taken from the loader rather than a hard-coded 20, so the
# schedule length equals len(trainloader) * num_epochs if it is stepped once
# per batch.
# NOTE(review): per the PyTorch docs OneCycleLR starts at max_lr / div_factor
# (default 25) and applies that to the optimizer when constructed. train()
# never calls scheduler.step(), so training appears to run at that reduced
# learning rate throughout rather than the lr=0.001 given to Adam — confirm
# whether this is intended.
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                              steps_per_epoch=len(trainloader),
                                              epochs=num_epochs,
                                              anneal_strategy='linear')


In [21]:
# One training pass followed by one validation pass per epoch; test() returns
# the best validation accuracy seen so far, which is kept in best_acc.
for epoch in range(num_epochs):
    train(net, trainloader, optimizer, scheduler, criterion, epoch, device)
    best_acc = test(net, testloader, optimizer, criterion, epoch, device, "result", "model")

In [None]:
# Save the final model. This pickles the whole module object rather than a
# state_dict, so the sttModel class must be defined when the file is loaded.
torch.save(net,"model.pth")

In [7]:
# Reload the saved model with its tensors mapped onto the CPU.
# NOTE(review): torch.load unpickles the file and can execute arbitrary code;
# only load files from a trusted source.
# NOTE(review): recent PyTorch releases default torch.load to
# weights_only=True, which rejects a fully pickled module — confirm against
# the installed version.
net = torch.load("model.pth",map_location=torch.device('cpu'))
# NOTE(review): absolute, machine-specific path; it will not exist on another machine.
testPth = "C:\\Users\\k40\\Desktop\\thesis\\AudioData\\test data\\abebe.wav"

In [10]:
# Cut the first channel of the test recording into segments of
# segmendDataLength samples, dropping a shorter trailing chunk.
data = torchaudio.load(testPth)
first_channel = data[0][0]
candidate_chunks = (
    first_channel[start:start + segmendDataLength]
    for start in range(0, len(first_channel), segmendDataLength)
)
TestSegmentData = [
    chunk.view(1, segmendDataLength)
    for chunk in candidate_chunks
    if len(chunk) == segmendDataLength
]

In [11]:
# Convert every test segment to a mel spectrogram shaped as a single-item
# batch (1, 1, 128, 121), using the same transform settings as dataSet1.
to_mel = transforms.MelSpectrogram(48000, hop_length=10)
testedData = [to_mel(segment[0]).view(1, 1, 128, 121) for segment in TestSegmentData]

In [16]:
# Predict one class id per test segment.
out = []
lables = []  # kept for compatibility; no ground-truth targets exist for this file

# Resolve the device once and move the model to it. The model was loaded onto
# the CPU, so sending only the inputs to CUDA would raise a device mismatch.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net.to(device)
net.eval()  # make sure dropout is disabled during inference

with torch.no_grad():  # inference only: skip autograd bookkeeping
    for inputs in testedData:
        outputs = net(inputs.to(device))
        predicted = outputs.argmax(dim=1)
        out.append(predicted[0].item())

In [17]:
from collections import Counter

# Tally how often each class id was predicted; `key` holds the distinct class
# ids in first-seen order and `uniqueValue` the matching counts.
prediction_counts = Counter(out)
key = list(prediction_counts)
uniqueValue = list(prediction_counts.values())

In [19]:
key

[0, 1, 3, 2]

In [20]:
uniqueValue

[19, 55, 21, 11]