In [1]:
import warnings
warnings.simplefilter("ignore")

In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
from torch.utils.data import DataLoader, Dataset
import os

In [27]:
class SpeechCommandsDataset(Dataset):
    def __init__(self, subset):
        self.dataset = torchaudio.datasets.SPEECHCOMMANDS(root='../data', download=True, subset=subset)
        self.label_dict = os.listdir('../data/SpeechCommands/speech_commands_v0.02')

        self.data = []
        self.labels = []
        for i in range(len(self.dataset)):
            if self.dataset[i][0].shape == (1,16000):
                self.data.append(self.dataset[i][0])
                self.labels.append(self.dataset[i][2])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        waveform, label = self.data[index], self.labels[index]
        return waveform, self.label_dict.index(label)

In [51]:
class WaveformPyramidDataset(Dataset):
    def __init__(self, dataset):
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def downsample(self, waveform, factor):
        return F.interpolate(waveform.unsqueeze(1), size=waveform.size(-1) // factor, mode='linear', align_corners=True).squeeze(1)

    def __getitem__(self, idx):
        waveform, label = self.dataset[idx]
        
        g1 = waveform
        g2 = self.downsample(g1, 2)
        g3 = self.downsample(g2, 2)
        g4 = self.downsample(g3, 2)

        # Laplacian pyramid differences
        l1 = g1
        l2 = torch.abs(g1 - F.interpolate(g2.unsqueeze(1), size=g1.size(-1), mode='linear', align_corners=True).squeeze(1))
        l3 = torch.abs(g2 - F.interpolate(g3.unsqueeze(1), size=g2.size(-1), mode='linear', align_corners=True).squeeze(1))
        l4 = torch.abs(g3 - F.interpolate(g4.unsqueeze(1), size=g3.size(-1), mode='linear', align_corners=True).squeeze(1))

        # Ensure all Laplacians are of the same length
        l2 = F.interpolate(l2.unsqueeze(1), size=g2.size(-1), mode='linear', align_corners=True).squeeze(1)
        l3 = F.interpolate(l3.unsqueeze(1), size=g3.size(-1), mode='linear', align_corners=True).squeeze(1)
        l4 = F.interpolate(l4.unsqueeze(1), size=g4.size(-1), mode='linear', align_corners=True).squeeze(1)

        return [l1, l2, l3, l4], label

In [56]:
class PyramidBlock1D(nn.Module):
    def __init__(self, in_channels, out_channels, input_dim):
        super(PyramidBlock1D, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1)
        self.ln1 = nn.LayerNorm([out_channels, input_dim])
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1)
        self.ln2 = nn.LayerNorm([out_channels, input_dim // 2])
        self.dropout = nn.Dropout(0.1)  
        self.pool = nn.MaxPool1d(2)

    def forward(self, x):
        x = torch.relu(self.ln1(self.conv1(x)))
        x = self.pool(x)
        x = torch.relu(self.ln2(self.conv2(x)))
        x = self.dropout(x)
        return x
    
class LaplacianNet1D(nn.Module):
    def __init__(self, input_shape, num_classes):
        super(LaplacianNet1D, self).__init__()
        self.fc_input = (256 + input_shape[0]) * (input_shape[1] // 8)  
        self.pyramid_block1 = PyramidBlock1D(input_shape[0], 64, input_shape[1])
        self.pyramid_block2 = PyramidBlock1D(64 + input_shape[0], 128, input_shape[1] // 2)
        self.pyramid_block3 = PyramidBlock1D(128 + input_shape[0], 256, input_shape[1] // 4)
        self.fc = nn.Linear(self.fc_input, 128)
        self.output = nn.Linear(128, num_classes)
        print('Using LaplacianNet1D')

    def forward(self, inputs):
        x_l1, x_l2, x_l3, x_l4 = inputs

        x1 = self.pyramid_block1(x_l1)
        x2 = self.pyramid_block2(torch.cat((x1, x_l2), dim=1))
        x3 = self.pyramid_block3(torch.cat((x2, x_l3), dim=1))
        x = torch.cat([x3, x_l4], dim=1)
        x = x.view(x.size(0), -1)
        x = nn.functional.relu(self.fc(x))
        return self.output(x)

In [64]:
def train(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = [d.to(device) for d in data], target.to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        if batch_idx % 10 == 0:
            print(f'Epoch: {epoch} [{batch_idx * len(data):5d}/{len(train_loader.dataset)} '
                  f'({100. * batch_idx / len(train_loader):2.0f}%)]\tLoss: {loss.item():.6f}', end='\r')

def test(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = [d.to(device) for d in data], target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # Sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # Get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct:5d}/{len(test_loader.dataset)} '
          f'({100. * correct / len(test_loader.dataset):2.0f}%)\n')

In [58]:
train_dataset = SpeechCommandsDataset(subset='training')
test_dataset = SpeechCommandsDataset(subset='testing')

In [59]:
print(train_dataset.label_dict)

['happy', 'no', 'bird', 'five', 'six', 'marvin', 'seven', '_background_noise_', 'testing_list.txt', 'one', 'visual', 'four', 'LICENSE', 'wow', 'go', 'forward', 'down', 'tree', 'cat', 'yes', 'validation_list.txt', 'eight', 'house', 'up', 'off', 'README.md', 'stop', 'zero', 'learn', 'left', 'dog', 'two', 'right', 'backward', 'nine', 'three', 'bed', 'follow', '.DS_Store', 'on', 'sheila']


In [60]:
waveform, label = train_dataset.__getitem__(0)
print(waveform, label)

tensor([[-0.0658, -0.0709, -0.0753,  ..., -0.0700, -0.0731, -0.0704]]) 33


In [61]:
train_pyramid_dataset = WaveformPyramidDataset(train_dataset)
test_pyramid_dataset = WaveformPyramidDataset(test_dataset)

train_loader = DataLoader(train_pyramid_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_pyramid_dataset, batch_size=32, shuffle=False)

In [62]:
input_shape = train_dataset[0][0].shape 
num_classes = len(train_dataset.label_dict)
print("Input Shape:", input_shape)
print("Num Classes:", num_classes)

Input Shape: torch.Size([1, 16000])
Num Classes: 41


In [65]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = LaplacianNet1D(input_shape=input_shape, num_classes=num_classes).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.0003)
criterion = nn.CrossEntropyLoss()

for epoch in range(1, 11): 
    train(model, device, train_loader, optimizer, criterion, epoch)
    test(model, device, test_loader, criterion)

Using LaplacianNet1D