In [58]:
import torch
import torchaudio
import torchaudio.functional as F
import torchaudio.transforms as T
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import os
import numpy as np
import torch.nn as nn
import pandas as pd
import librosa
import librosa.display
from torch.utils.data import sampler

# Record the library versions this notebook was executed with.
print(torch.__version__, torchaudio.__version__, sep="\n")

1.11.0
0.11.0


In [59]:
class DAPSDatasetHelper():
    """Index a DAPS-style dataset directory and produce (input, label) audio pairs.

    The dataset root is expected to hold one sub-directory per recording
    category, each containing ``.wav`` files. The category named
    ``"produced"`` is used as the label; every other category is a candidate
    input. Files are paired across categories by their position in the
    sorted file list of each category.
    """

    #Get the dataset dictionary
    def get_file_descriptors(self,dirpath):
        """Walk ``dirpath`` and collect the ``.wav`` files of every sub-directory.

        Args:
            dirpath: root directory of the dataset.

        Returns:
            dict mapping a directory's base name to ``{position: filepath}``,
            where positions number the sorted ``.wav`` paths from 0. Hidden
            files (leading ``.``), files directly in ``dirpath`` and
            directories without any ``.wav`` file are left out.
        """
        directory={}
        for current_dir, _subdirs, filenames in os.walk(dirpath):
            # files sitting directly in the dataset root belong to no category
            if(current_dir==dirpath):
                continue
            wav_paths=sorted(
                os.path.join(current_dir,name)
                for name in filenames
                if name.endswith('.wav') and not name.startswith('.')
            )
            if(len(wav_paths)>0):
                # basename instead of split("/") so the separator is not hard-coded
                category=os.path.basename(current_dir)
                directory[category]=dict(enumerate(wav_paths))
        return directory

    #initialization
    def __init__(self,dataset_dir="../dataset_daps/daps",sample_rate=8000,framesize=25):
        """Scan the dataset and derive the STFT configuration.

        Args:
            dataset_dir: root directory of the dataset.
            sample_rate: target sample rate in Hz for all returned audio.
            framesize: STFT frame length in milliseconds.

        Raises:
            KeyError: if no ``"produced"`` category with ``.wav`` files exists
                under ``dataset_dir``.
        """
        self.sample_rate=sample_rate
        self.dir=dataset_dir
        self.dataset_dict=self.get_file_descriptors(self.dir)

        #stft config
        #frame size in ms
        self.framesize=framesize
        self.fft_len=self.sample_rate*self.framesize//1000
        self.window_size=self.fft_len
        self.hop_len=self.fft_len//2

        # slot 1 is reserved for the label category, inputs are numbered from 2;
        # iterating in sorted order keeps the numbering independent of os.walk order
        indx=2
        self.keys={}
        for key in sorted(self.dataset_dict):
            if(key=="produced"):
                self.keys[1]=key
            else:
                self.keys[indx]=key
                indx+=1

        if("produced" not in self.dataset_dict):
            raise KeyError("no 'produced' category with .wav files found under %r" % (self.dir,))
        self.num_files_per_category=len(self.dataset_dict["produced"])

    def _random_input_category(self):
        """Return the name of one uniformly chosen non-label category.

        Raises:
            ValueError: if ``self.keys`` holds no category besides slot 1.
        """
        candidates=[name for slot,name in self.keys.items() if slot!=1]
        if(len(candidates)==0):
            raise ValueError("dataset has no input category besides the label category")
        # randint's upper bound is exclusive, so index the candidate list directly
        return candidates[np.random.randint(len(candidates))]

    #get the indexed file and sample rate
    def get_indxd_file(self,indx,isLabel=False):
        """Load the file at position ``indx`` of a category.

        Args:
            indx: position in the category's sorted file list.
            isLabel: when True read from the label category (slot 1),
                otherwise from a randomly chosen input category.

        Returns:
            ``(data, sr, Id)``: the samples and sample rate returned by
            ``librosa.load`` (called with its default arguments) and the file
            name up to its first ``.``.

        Raises:
            KeyError: if the selected category has no file at ``indx``.
        """
        if(isLabel):
            category=self.keys[1]
        else:
            category=self._random_input_category()
        filepath=self.dataset_dict[category][indx]
        data,sr= librosa.load(filepath)
        Id= os.path.basename(filepath).split('.')[0]
        return (data,sr, Id)

    def resample_audio(self,file,sr):
        """Resample ``file`` from ``sr`` Hz to ``self.sample_rate`` Hz."""
        out = librosa.resample(file, orig_sr=sr, target_sr=self.sample_rate)
        return out

    #get the train data and label at given index
    def get_data(self,indx):
        """Return the input/label pair at position ``indx``.

        Returns:
            ``(data, label, Id_data, Id_label)`` with both signals at
            ``self.sample_rate``.
        """
        data,data_sr,Id_data = self.get_indxd_file(indx)
        label,label_sr,Id_label = self.get_indxd_file(indx,True)
        # each signal is resampled from its own rate; the two loads are independent
        if(data_sr != self.sample_rate):
            data= self.resample_audio(data,data_sr)
        if(label_sr != self.sample_rate):
            label= self.resample_audio(label,label_sr)

        return (data,label,Id_data,Id_label)


    #get stft frames with 50% overlap
    def getFeatures(self,file):
        """Compute the power spectrogram of a waveform.

        Args:
            file: numpy array of audio samples.

        Returns:
            torch.Tensor produced by ``torchaudio.transforms.Spectrogram``
            with ``power=2.0``; per the torchaudio documentation the
            frequency axis has ``n_fft // 2 + 1`` bins.
        """
        # define transformation
        spectrogram = T.Spectrogram(
            n_fft=self.fft_len,
            win_length=self.window_size,
            hop_length=self.hop_len,
            center=True,
            pad_mode="reflect",
            power=2.0,
        )
        # Perform transformation
        waveform=torch.from_numpy(file)
        spec = spectrogram(waveform)

        return spec

In [60]:
class DAPS(Dataset):
    """Torch dataset yielding (input spectrogram, label spectrogram) pairs."""

    def __init__(self):
        super().__init__()
        self.daps= DAPSDatasetHelper()

    def __getitem__(self, index):
        """Return the spectrogram pair for ``index``.

        ``__len__`` counts one item per file of every input category, while
        the helper addresses files by their position inside a category, so
        the index is wrapped onto the per-category file range.

        Raises:
            IndexError: if ``index`` is outside ``range(len(self))``.
        """
        if not (0 <= index < len(self)):
            raise IndexError("index %d is out of range for a dataset of length %d" % (index, len(self)))
        file_index=index % self.daps.num_files_per_category
        data,label,_,_=self.daps.get_data(file_index)
        data_spec=self.daps.getFeatures(data)
        label_spec=self.daps.getFeatures(label)
        return (data_spec,label_spec)

    def __len__(self):
        """Number of input categories times the number of files per category."""
        return (len(self.daps.keys)-1)*self.daps.num_files_per_category

In [61]:
#Create Model classes
# create a class for linear layers 
class DenseLayer(nn.Module):
    """Fully connected layer followed by tanh and dropout.

    Args:
        input_size: number of input features.
        output_size: number of output features.
        dropOut_p: dropout probability applied after the activation.
    """
    def __init__(self,input_size,output_size,dropOut_p):
        # nn.Module must be initialised before sub-modules can be assigned
        super().__init__()
        self.dense=nn.Linear(input_size,output_size,bias=True)
        self.activation=nn.Tanh()
        self.dropOut=nn.Dropout(p=dropOut_p,inplace=False)

    def forward(self,x):
        """Apply linear -> tanh -> dropout to ``x`` and return the result."""
        y=self.dense(x)
        y=self.activation(y)
        y=self.dropOut(y)

        return y

#class for convolutional layers
class ConvLayer(nn.Module):
    """2-D convolution followed by ReLU and dropout.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
        kernel_size, stride, padding: forwarded to ``nn.Conv2d``.
        dropOut_p: dropout probability applied after the activation.
    """
    def __init__(self,in_ch,out_ch,kernel_size,stride,padding,dropOut_p):
        # nn.Module must be initialised before sub-modules can be assigned
        super().__init__()
        self.conv=nn.Conv2d(in_ch,out_ch,kernel_size, stride=stride,padding=padding)
        self.activation=nn.ReLU()
        self.dropOut=nn.Dropout(p=dropOut_p,inplace=False)

    def forward(self,x):
        """Apply conv -> ReLU -> dropout to ``x`` and return the result."""
        y=self.conv(x)
        y=self.activation(y)
        y=self.dropOut(y)

        return y

def Loss_SDR(yhat,y):
    """Negative signal-to-distortion ratio (in dB) between estimate and target.

    SDR = 10 * log10( sum(y^2) / sum((y - yhat)^2) ), computed over all
    elements of the tensors. The negated value is returned so that minimising
    the loss maximises the SDR.

    Args:
        yhat: estimated signal (floating-point tensor).
        y: target signal, same shape as ``yhat``; converted to the dtype and
            device of ``yhat`` before use.

    Returns:
        Scalar tensor that supports ``.backward()`` and ``.item()``.
    """
    eps=1e-8  # keeps the ratio finite for silent targets and perfect estimates
    y=y.to(device=yhat.device,dtype=yhat.dtype)
    signal_energy=torch.sum(y**2)
    distortion_energy=torch.sum((y-yhat)**2)
    sdr=10.0*torch.log10((signal_energy+eps)/(distortion_energy+eps))
    return -sdr

#RNN model for Residual echo suppression
class RES(nn.Module):
    """Recurrent network: stacked ``nn.RNN`` followed by one ``DenseLayer``.

    Args:
        input_dim: number of features per time step.
        hidden_dim: size of the RNN hidden state.
        layer_dim: number of stacked RNN layers.
        output_dim: number of output features.
        dropout_prob: dropout probability used between RNN layers and in the
            dense layer.
    """
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, dropout_prob):
        super().__init__()
        # Defining the number of layers and the nodes in each layer
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.rnn = nn.RNN(
            input_dim, hidden_dim, layer_dim, batch_first=True, dropout=dropout_prob
        )
        self.fc1= DenseLayer(hidden_dim,output_dim,dropOut_p=dropout_prob)

    def forward(self, x):
        """Run the RNN over ``x`` and map the last time step to the output.

        Args:
            x: tensor of shape (batch_size, seq_length, input_dim).

        Returns:
            Tensor of shape (batch_size, output_dim).
        """
        # Zero initial hidden state, created on the device and in the dtype of
        # the input so the model also runs on GPU or in non-float32 precision
        h0 = torch.zeros(
            self.layer_dim, x.size(0), self.hidden_dim, device=x.device, dtype=x.dtype
        )

        # Forward propagation by passing in the input and hidden state into the model
        out, _ = self.rnn(x, h0)

        # Keep only the hidden state of the last time step: (batch_size, hidden_dim)
        out = out[:, -1, :]

        # Convert the final state to our desired output shape (batch_size, output_dim)
        out = self.fc1(out)
        return out

In [62]:
# Floating-point precision used for tensors throughout this notebook.
dtype = torch.float32
USE_GPU = True
# Use CUDA only when it is both requested and available; otherwise run on the CPU.
device = torch.device('cuda' if USE_GPU and torch.cuda.is_available() else 'cpu')

In [63]:
def flatten(x):
    """Return a view of ``x`` with all dimensions after the first merged into one."""
    batch_size = x.size(0)
    # -1 lets view() infer the merged size from the remaining dimensions
    return x.view(batch_size, -1)

In [64]:
# Subset sizes; only NUM_TRAIN is consumed in this cell.
NUM_TRAIN, NUM_VAL, NUM_TEST = 60, 30, 10
# Logging interval (in iterations) read by train_model.
print_every = 100

dataset_train = DAPS()
# Draw single-item batches, in random order, from the first NUM_TRAIN indices.
loader_train = DataLoader(
    dataset_train,
    batch_size=1,
    sampler=sampler.SubsetRandomSampler(range(NUM_TRAIN)),
    num_workers=0,
)

In [66]:
# Sanity check: print the shape of every (input, label) batch the loader yields.
for t, (x, y) in enumerate(loader_train):
    # Move the data to the proper device (GPU or CPU)
    x = x.to(device=device, dtype=dtype)
    # the label is a real-valued spectrogram; casting it to an integer type
    # would truncate it, so it keeps the same float dtype as the input
    y = y.to(device=device, dtype=dtype)

    print(x.shape)
    print(y.shape)

torch.Size([1, 101, 13876])
torch.Size([1, 101, 13876])
torch.Size([1, 101, 13230])
torch.Size([1, 101, 13230])
torch.Size([1, 101, 11261])
torch.Size([1, 101, 11261])
torch.Size([1, 101, 10857])
torch.Size([1, 101, 10857])
torch.Size([1, 101, 15908])
torch.Size([1, 101, 15908])


KeyboardInterrupt: 

In [65]:
def train_model(model_fn, params, learning_rate):
    """
    Run one pass of plain SGD over ``loader_train``.

    Inputs:
    - model_fn: A Python function that performs the forward pass of the model.
      It should have the signature yhat = model_fn(x, params) where x is the
      input batch taken from loader_train, params is a list of PyTorch Tensors
      giving model weights, and yhat is the model's estimate of the label y.
    - params: List of PyTorch Tensors giving weights for the model
    - learning_rate: Python scalar giving the learning rate to use for SGD

    Relies on the notebook-level names loader_train, device, dtype, Loss_SDR
    and print_every.

    Returns: Nothing
    """
    for t, (x, y) in enumerate(loader_train):
        # Move the data to the proper device (GPU or CPU)
        x = x.to(device=device, dtype=dtype)
        # the label is a real-valued spectrogram, so it keeps the float dtype
        # of the input instead of being truncated to integers
        y = y.to(device=device, dtype=dtype)

        # Forward pass: compute the estimate and the loss
        yhat = model_fn(x, params)
        loss = Loss_SDR(yhat, y)

        # Backward pass: PyTorch figures out which Tensors in the computational
        # graph has requires_grad=True and uses backpropagation to compute the
        # gradient of the loss with respect to these Tensors, and stores the
        # gradients in the .grad attribute of each Tensor.
        loss.backward()

        # Update parameters. We don't want to backpropagate through the
        # parameter updates, so we scope the updates under a torch.no_grad()
        # context manager to prevent a computational graph from being built.
        with torch.no_grad():
            for w in params:
                # a parameter that did not take part in the forward pass has no gradient
                if w.grad is None:
                    continue
                w -= learning_rate * w.grad

                # Manually zero the gradients after running the backward pass
                w.grad.zero_()

        if t % print_every == 0:
            print('Iteration %d, loss = %.4f' % (t, loss.item()))
            print()