2022 Kosuke Mori, Keisuke Toyoda, Kent Hino, Yusuke Kimura, and Takahiro Shinozaki

Speech command recognition tutorial

# Install toolkits

In [None]:
# TODO: install the modules required to run on TSUBAME
!python3 -m pip install --user torch
!python3 -m pip install --user torchaudio
!python3 -m pip install --user numpy
!python3 -m pip install --user matplotlib
!python3 -m pip install --user "git+https://github.com/tqdm/tqdm.git@devel#egg=tqdm"
!python3 -m pip install --user ipywidgets

# Restart the kernel once at this point: Kernel tab => Restart Kernel

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch.optim as optim
import torchaudio
import os
import time
import numpy as np
import matplotlib.pyplot as plt
import tqdm
import IPython.display

# Check if GPU is available

In [None]:
# Pick the compute device once: 'cuda' when a usable GPU is reported, else 'cpu'.
cuda_ready = torch.cuda.is_available()
print('CUDA（GPU）is available' if cuda_ready else 'CUDA（GPU）is not available')
device = 'cuda' if cuda_ready else 'cpu'

In [None]:
# Check the GPU status
! nvidia-smi

# Load Speech Commands dataset

Reference: https://pytorch.org/audio/stable/datasets.html

In [None]:
# You can control the usage rate of the training data (1.0 = use all of it).
util_rate = 1.0

In [None]:
# Define a dataset class to arrange the features and labels
class TutorialDataset(Dataset):
    """Speech Commands dataset for this tutorial.

    The requested subset is loaded eagerly: every sample whose command is a
    key of ``label_dict`` has its features extracted once and is then kept in
    memory.

    Args:
        subset (str): Subset type ['training', 'validation', 'testing'].
        label_dict (dict): Label dictionary ({'command1': 0, 'command2': 1, ...}).
        util_rate (float): Data utilization rate. A value other than 1.0 is
            only accepted for the 'training' subset.
        transform (callable): Feature extraction function applied to each
            waveform. Required, despite the ``None`` default kept for
            signature compatibility.

    Raises:
        ValueError: If ``subset`` is not one of the supported names, or if
            ``util_rate`` differs from 1.0 for a subset other than 'training'.
        TypeError: If ``transform`` is not callable.

    """

    _SUBSETS = ('training', 'validation', 'testing')

    def __init__(self, subset, label_dict, util_rate=1.0, transform=None):
        # Validate before loading: the load below iterates over the whole
        # subset, so a bad argument should fail immediately, not afterwards.
        if subset not in self._SUBSETS:
            raise ValueError(f'subset must be one of {self._SUBSETS}, got {subset!r}')
        if util_rate != 1.0 and subset != 'training':
            raise ValueError("util_rate other than 1.0 is only supported for the 'training' subset")
        if not callable(transform):
            raise TypeError('transform must be a callable feature extraction function')

        self.subset = subset
        self.label_dict = label_dict
        self.util_rate = util_rate
        self.transform = transform
        self.dataset = self._load_dataset()

    def _load_dataset(self):
        """Load the subset, keep the commands in ``label_dict``, extract features.

        Returns:
            list: One ``[feats, label, waveform, sample_rate, command]`` entry
            per kept sample, thinned according to ``util_rate``.

        """
        # Load all data
        dataset_all = torchaudio.datasets.SPEECHCOMMANDS(
            root='./', url='speech_commands_v0.02', download=True, subset=self.subset
        )

        # Keep only the samples whose command has a label
        dataset = []
        for data in tqdm.tqdm(dataset_all, desc=f'{self.subset} dataset'):
            waveform, sample_rate, command, *_ = data
            if command not in self.label_dict:
                continue

            # Get label info
            label = torch.tensor(self.label_dict[command], dtype=torch.long)

            # Extract features
            feats = self.transform(waveform).squeeze(0)  # (f, t)
            feats = feats.transpose(0, 1)  # (f, t) -> (t, f)

            dataset.append([feats, label, waveform.squeeze(0), sample_rate, command])

        # Adjust the number of samples with util_rate
        return self._subsample(dataset, self.util_rate)

    @staticmethod
    def _subsample(dataset, util_rate):
        """Keep ``int(len(dataset) * util_rate)`` evenly spaced samples.

        A plain prefix would be class-biased whenever the underlying data is
        grouped by command, so the kept samples are spread over the whole list.
        NOTE(review): torchaudio's SPEECHCOMMANDS appears to walk sorted file
        paths (i.e. grouped by command directory) -- confirm for the installed
        version.

        Args:
            dataset (list): Samples in loading order.
            util_rate (float): Fraction of samples to keep.

        Returns:
            list: The kept samples in their original relative order. The input
            list itself is returned when nothing has to be dropped.

        """
        total = len(dataset)
        # Clamp so rates above 1.0 keep everything and negative rates keep nothing.
        n_keep = max(0, min(total, int(total * util_rate)))
        if n_keep == total:
            return dataset
        return [dataset[(i * total) // n_keep] for i in range(n_keep)]

    def get_info(self, index):
        """Return every stored field of one sample.

        Args:
            index (int): Index for dataset.

        Returns:
            list: [feats, label, waveform, sample_rate, command]

        """
        return self.dataset[index]

    def __getitem__(self, index):
        """Return the training pair of one sample.

        Args:
            index (int): Index for the dataset.

        Returns:
            tuple: (feats, label)

        """
        sample = self.dataset[index]
        return sample[0], sample[1]

    def __len__(self):
        """Return the number of stored samples."""
        return len(self.dataset)

In [None]:
# Define a command-label dictionary: each command name maps to its class index
label_dict = {
    command: index
    for index, command in enumerate(
        ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go']
    )
}

# Feature extractor shared by all three subsets: 13 MFCCs per frame
mel_options = {'n_fft': 400, 'hop_length': 160}
transform = torchaudio.transforms.MFCC(sample_rate=16000, n_mfcc=13, melkwargs=mel_options)

# Prepare training, development, and test dataset
# This process may take more than 10 minutes
dataset_train = TutorialDataset(
    subset='training', label_dict=label_dict, util_rate=util_rate, transform=transform
)
dataset_dev = TutorialDataset(
    subset='validation', label_dict=label_dict, util_rate=1.0, transform=transform
)
dataset_test = TutorialDataset(
    subset='testing', label_dict=label_dict, util_rate=1.0, transform=transform
)

# Report how many samples each subset ended up with
print(f'# train samples: {len(dataset_train)} (using {util_rate * 100}%)')
print(f'# development samples: {len(dataset_dev)}')
print(f'# test samples: {len(dataset_test)}')

In [None]:
def custom_collate(batch):
    """Collate function that pads the feature sequences of a mini-batch.

    Args:
        batch: Sequence of samples; element 0 of each sample is its feature
            tensor and element 1 its label tensor.

    Returns:
        tuple: (padded sequences, labels). The features are zero-padded to the
        longest sequence in the batch with the batch dimension first; the
        labels are stacked along a new first dimension.

    """
    feats, labels = [], []
    for sample in batch:
        feats.append(sample[0])
        labels.append(sample[1])

    padded_feats = pad_sequence(feats, batch_first=True, padding_value=0.0)
    stacked_labels = torch.stack(labels, dim=0)
    return padded_feats, stacked_labels

BatchSize = 256

# Settings shared by every loader: mini-batch size and the padding collate function
loader_options = {'batch_size': BatchSize, 'collate_fn': custom_collate}

# Prepare data loader for mini-batch training
# (shuffled, and the last incomplete mini-batch is dropped)
loader_train = DataLoader(dataset_train, shuffle=True, drop_last=True, **loader_options)

# Development and test loaders keep the dataset order and every sample
loader_dev = DataLoader(dataset_dev, **loader_options)
loader_test = DataLoader(dataset_test, **loader_options)

# Display data

In [None]:
# Load a training sample
feats, label, waveform, sample_rate, command = dataset_train.get_info(0)

# One figure, two panels: waveform on the left, extracted features on the right
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
fig.suptitle(f'Command: {command}')

# Plot the waveform against time in seconds
n_samples = len(waveform)
ax1.plot(np.arange(n_samples) / sample_rate, waveform.numpy())
ax1.set(xlabel='Time [sec]', xlim=(0, n_samples / sample_rate), title='Waveform')
ax1.grid(True, linestyle='--')

# Plot the extracted features (transposed so that frames run along the x axis)
img = ax2.imshow(feats.numpy().T, origin='lower', aspect='auto', cmap='viridis')
ax2.set(xlabel='Frame', title='Features')
fig.colorbar(img, ax=ax2)
plt.show()

# Display the audio (must stay the last expression so the notebook renders it)
IPython.display.Audio(waveform, rate=sample_rate)

# Define a neural network model

References:
* S. Karita, N.E.Y. Soplin, S. Watanabe, M. Delcroix, A. Ogawa and T. Nakatani, "Improving Transformer-Based End-to-End Speech Recognition with Connectionist Temporal Classification and Language Model Integration," in *Proceedings of Interspeech*, 2019, pp. 1408-1412.
* https://github.com/espnet/espnet



In [None]:
class NeuralNetworkModel(nn.Module):
    """Neural network model (Transformer-based).

    The input is subsampled, given a positional encoding, passed through a
    stack of encoder layers and a final layer normalization, then averaged
    over the time axis and classified by a single linear layer.

    Args:
        idim (int): Input feature dimension.
        d_att (int): Attention dimension.
        n_heads (int): The number of attention heads.
        d_ff (int): Dimension of feed forward network.
        dropout_rate (float): Dropout rate.
        n_layers (int): The number of encoder layers.
        n_classes (int): The number of the output classes.

    """

    def __init__(
        self,
        idim=13,
        d_att=64,
        n_heads=2,
        d_ff=512,
        dropout_rate=0.1,
        n_layers=3,
        n_classes=10
    ):
        super().__init__()

        # Front end
        self.subsampling = Subsampling(idim=idim, d_att=d_att)
        self.positional_encoding = PositionalEncoding()

        # Encoder stack; the layers are registered as 'EncoderLayer0', 'EncoderLayer1', ...
        self.encoder_layers = nn.Sequential()
        for layer_idx in range(n_layers):
            encoder_layer = TransformerEncoderLayer(d_att, n_heads, d_ff, dropout_rate)
            self.encoder_layers.add_module(f'EncoderLayer{layer_idx}', encoder_layer)
        self.norm = nn.LayerNorm(d_att)

        # Classifier head
        self.dropout = nn.Dropout(dropout_rate)
        self.out = nn.Linear(d_att, n_classes)

    def forward(self, x):
        """Recognize the input speech commands.

        Args:
            x (torch.Tensor): Input features (batch, tmax, idim).

        Returns:
            torch.Tensor: Per-class output scores (batch, num_classes).

        """
        # Transformer encoder
        encoded = self.subsampling(x)
        encoded = self.positional_encoding(encoded)
        encoded = self.norm(self.encoder_layers(encoded))

        # Classifier: average over time, then dropout and the output layer
        pooled = torch.mean(encoded, dim=1)  # (b, t, d_att) -> (b, d_att)
        return self.out(self.dropout(pooled))

class TransformerEncoderLayer(nn.Module):
    """A Transformer encoder layer.

    Layer normalization is applied before each sub-block (self-attention,
    then feed-forward), and each sub-block output goes through dropout
    before being added back to its input.

    Args:
        d_att (int): Attention dimension.
        d_head (int): The number of attention heads (passed on to
            MultiHeadAttention as its head count).
        d_ff (int): Dimension of feed forward network.
        dropout_rate (float): Dropout rate.

    """

    def __init__(self, d_att, d_head, d_ff, dropout_rate=0.1):
        super().__init__()
        self.mha = MultiHeadAttention(d_att, d_head, dropout_rate)
        self.ff = FeedForward(d_att, d_ff, dropout_rate)
        self.norm_mha = nn.LayerNorm(d_att)
        self.norm_ff = nn.LayerNorm(d_att)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        """Encode the inputs with self-attention followed by feed-forward.

        Args:
            x (torch.Tensor): Pre-encoded inputs (batch, tmax, d_att).

        Returns:
            torch.Tensor: Encoded outputs (batch, tmax, d_att).

        """
        # Multi-Head Attention: query, key and value are the same normalized input
        normed = self.norm_mha(x)
        x = x + self.dropout(self.mha(normed, normed, normed))

        # Feed-Forward
        x = x + self.dropout(self.ff(self.norm_ff(x)))

        return x

class Subsampling(nn.Module):
    """Convolutional Subsampling.

    Two 3x3 convolutions with stride 2 (each followed by ReLU) shrink both the
    time and the feature axis; the channel and feature axes are then merged
    and projected to the attention dimension.

    Args:
        idim (int): Input feature dimension.
        d_att (int): Attention dimension.

    """

    def __init__(self, idim, d_att):
        super().__init__()
        conv_layers = [
            nn.Conv2d(1, d_att, kernel_size=(3, 3), stride=(2, 2)),
            nn.ReLU(),
            nn.Conv2d(d_att, d_att, kernel_size=(3, 3), stride=(2, 2)),
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*conv_layers)

        # Size of the feature axis left after the two stride-2 convolutions
        reduced_dim = ((idim - 1) // 2 - 1) // 2
        self.linear = nn.Linear(d_att * reduced_dim, d_att)

    def forward(self, x):
        """Subsample the input features.

        Args:
            x (torch.Tensor): Input features (batch, tmax, idim).

        Returns:
            torch.Tensor: Subsampled features (batch, tmax', d_att).

        """
        # Add a channel axis for the 2-D convolutions: (b, t, idim) -> (b, c=1, t, idim)
        conv_out = self.conv(x[:, None])

        # Put time before channels and merge channels with features:
        # (b, c, t, f) -> (b, t, c, f) -> (b, t, c * f)
        merged = conv_out.permute(0, 2, 1, 3).flatten(start_dim=2)

        return self.linear(merged)

class PositionalEncoding(nn.Module):
    """Positional Encoding.

    Adds a sinusoidal position table to the input: for position ``pos`` and
    even feature index ``i`` the value ``sin(pos / 10000 ** (i / d_att))`` is
    written to feature ``i`` and the matching cosine to feature ``i + 1``.
    The module has no parameters and takes no constructor arguments; the
    table is rebuilt from the input shape on every call.

    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Add the positional encoding to the input.

        Args:
            x: (torch.Tensor): Subsampled features (batch, tmax, d_att).

        Returns:
            torch.Tensor: Encoded features (batch, tmax, d_att).

        """
        _, tmax, d_att = x.size()

        # Build everything directly on the input's device so no host-to-device
        # copy is needed on each forward pass.
        pos = torch.arange(tmax, dtype=torch.float32, device=x.device).unsqueeze(1)  # (tmax, 1)
        exponent = torch.arange(0, d_att, 2, dtype=torch.float32, device=x.device) / d_att
        angle = pos / torch.pow(10000.0, exponent)  # (tmax, ceil(d_att / 2))

        pe = torch.zeros(1, tmax, d_att, dtype=torch.float32, device=x.device)
        pe[0, :, 0::2] = torch.sin(angle)
        # For an odd d_att there is one cosine slot fewer than sine slots.
        pe[0, :, 1::2] = torch.cos(angle[:, : d_att // 2])

        return x + pe

class MultiHeadAttention(nn.Module):
    """Multi-Head Attention.

    Query, key and value are linearly projected, split into ``n_heads`` heads
    of size ``d_att // n_heads``, combined by scaled dot-product attention
    (with dropout on the attention weights), concatenated and projected again.

    Args:
        d_att (int): Dimension of attention.
        n_heads (int): The number of attention heads.
        dropout_rate (float): Dropout rate.

    Raises:
        ValueError: If ``d_att`` is not divisible by ``n_heads``.

    """

    def __init__(self, d_att, n_heads, dropout_rate):
        super().__init__()
        # Fail at construction rather than with a reshape error in forward().
        if d_att % n_heads != 0:
            raise ValueError(f'd_att ({d_att}) must be divisible by n_heads ({n_heads})')
        self.linear_q = nn.Linear(d_att, d_att)
        self.linear_k = nn.Linear(d_att, d_att)
        self.linear_v = nn.Linear(d_att, d_att)
        self.linear_head = nn.Linear(d_att, d_att)
        self.dropout = nn.Dropout(dropout_rate)
        self.n_heads = n_heads
        self.d_comn = d_att // self.n_heads  # Dimension of each head

    def forward(self, q, k, v):
        """Compute multi-head scaled dot-product attention.

        Args:
            q: (torch.Tensor): Query (batch, tmax_q, d_att).
            k: (torch.Tensor): Key (batch, tmax_k, d_att).
            v: (torch.Tensor): Value (batch, tmax_k, d_att).

        Returns:
            torch.Tensor: Output shape (batch, tmax_q, d_att).

        """
        b, t_q, _ = q.size()

        # Linear, then reshape (b, t, d_att) -> (b, n_heads, t, d_comn).
        # Key and value infer their own length (-1), so it may differ from the query's.
        qw = self.linear_q(q).view(b, t_q, self.n_heads, self.d_comn).transpose(1, 2)
        kw = self.linear_k(k).view(b, -1, self.n_heads, self.d_comn).transpose(1, 2)
        vw = self.linear_v(v).view(b, -1, self.n_heads, self.d_comn).transpose(1, 2)

        # Dot-attention; the scale is a plain Python float, so no tensor is
        # created (or moved between devices) on each call.
        scores = torch.matmul(qw, kw.transpose(2, 3)) / (self.d_comn ** 0.5)
        weights = torch.softmax(scores, dim=-1)  # (b, n_heads, t_q, t_k)
        att = torch.matmul(self.dropout(weights), vw)  # (b, n_heads, t_q, d_comn)

        # Concatenate the heads: (b, n_heads, t_q, d_comn) -> (b, t_q, d_att)
        att = att.transpose(1, 2).contiguous().view(b, t_q, self.n_heads * self.d_comn)

        # Linear
        return self.linear_head(att)

class FeedForward(nn.Module):
    """Feed-Forward Network.

    Linear expansion to ``d_ff``, ReLU, dropout, then a linear projection back
    to ``d_att``.

    Args:
        d_att (int): Dimension of the input and output features.
        d_ff (int): Dimension of feed-forward network.
        dropout_rate (float): Dropout rate applied after the activation.

    """

    def __init__(self, d_att, d_ff, dropout_rate):
        super().__init__()
        layers = [
            nn.Linear(d_att, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(d_ff, d_att),
        ]
        self.ff = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the feed-forward network.

        Args:
            x: (torch.Tensor): Input shape (batch, tmax, d_att).

        Returns:
            torch.Tensor: Output shape (batch, tmax, d_att).

        """
        out = self.ff(x)
        return out

# Train the network

In [None]:
# Build a model
model = NeuralNetworkModel().to(device)
print(f"# model parameters: {sum(p.numel() for p in model.parameters()):,}")

# Define an optimizer and a loss function
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)
criterion = nn.CrossEntropyLoss()

epochs = 50
train_acc_list = []
train_loss_list = []
dev_acc_list = []
dev_loss_list = []
best_loss = float("inf")

# Make a directory to save the results
os.makedirs("result", exist_ok=True)

# The training loader drops the last incomplete mini-batch, so a training set
# smaller than one mini-batch yields no batches at all. Fail with a clear
# message instead of a ZeroDivisionError after the first epoch.
if len(loader_train) == 0:
    raise RuntimeError(
        "The training loader has no mini-batches; "
        "increase util_rate or decrease the batch size."
    )

# Train the model
training_start_time = time.time()
for epoch in tqdm.trange(epochs, desc='Overall'):

    # Per-epoch accumulators
    batch_train_loss = 0
    batch_train_acc = 0
    n_train_samples = 0
    batch_dev_loss = 0
    batch_dev_acc = 0
    n_dev_samples = 0

    # Training
    model.train()
    for feats, labels in loader_train:
        feats, labels = feats.to(device), labels.to(device)

        # Reset gradients
        optimizer.zero_grad()

        # Forward
        outputs = model(feats)
        loss = criterion(outputs, labels)

        # Backward
        loss.backward()

        # Update weights
        optimizer.step()

        batch_train_loss += loss.item()
        batch_train_acc += (outputs.max(1)[1] == labels).sum().item()
        n_train_samples += labels.size(0)

    # All training mini-batches have the same size, so the mean of the
    # per-batch mean losses is the per-sample mean loss.
    train_loss_list.append(batch_train_loss / len(loader_train))
    # Divide by the samples actually seen: the dropped last mini-batch must
    # not count as misclassified.
    train_acc_list.append(batch_train_acc / n_train_samples)

    # Validation
    model.eval()
    with torch.no_grad():
        for feats, labels in loader_dev:
            feats, labels = feats.to(device), labels.to(device)

            # Forward
            outputs = model(feats)
            loss = criterion(outputs, labels)

            # The last mini-batch may be smaller, so weight each per-batch
            # mean loss by its batch size to obtain a per-sample mean.
            batch_dev_loss += loss.item() * labels.size(0)
            batch_dev_acc += (outputs.max(1)[1] == labels).sum().item()
            n_dev_samples += labels.size(0)

    dev_loss_list.append(batch_dev_loss / n_dev_samples)
    dev_acc_list.append(batch_dev_acc / n_dev_samples)

    # Save the best model (lowest development loss so far)
    if best_loss > dev_loss_list[-1]:
        torch.save(model.state_dict(), "result/model_best_loss.pth")
        best_loss = dev_loss_list[-1]

    # Print statistics
    print(
        "Epoch: {}/{} - train_loss: {:.4f} - train_acc: {:.4f} - dev_loss: {:.4f} - dev_acc: {:.4f}".format(
            epoch+1, epochs, train_loss_list[-1], train_acc_list[-1], dev_loss_list[-1], dev_acc_list[-1]
        )
    )

# Save the training time
training_time = time.time() - training_start_time
with open("result/training_time.txt", "w") as time_f:
    time_f.write(f"training time = {int(training_time)} (sec)\n")

# **Plot the learning curve**

In [None]:
# Two panels: accuracy on the left, loss on the right
fig = plt.figure(figsize=(12, 4))
fig.suptitle('Learning curve')

# Plot the accuracy.
# Each curve gets an x axis of its own length, so the plot also works when
# training was stopped before all `epochs` epochs were completed.
ax1 = fig.add_subplot(1, 2, 1)
for values, name in ((train_acc_list, "train_acc"), (dev_acc_list, "dev_acc")):
    ax1.plot(np.arange(1, len(values) + 1), values, label=name)
ax1.set_title("Accuracy")
ax1.set_xlabel("Epoch")
ax1.set_ylabel("Accuracy")
ax1.grid(True, linestyle='--')
ax1.legend(loc='lower right')

# Plot the loss
ax2 = fig.add_subplot(1, 2, 2)
for values, name in ((train_loss_list, "train_loss"), (dev_loss_list, "dev_loss")):
    ax2.plot(np.arange(1, len(values) + 1), values, label=name)
ax2.set_title("Loss")
ax2.set_xlabel("Epoch")
ax2.set_ylabel("Loss")
ax2.grid(True, linestyle='--')
ax2.legend(loc='upper right')

# Save through the figure object and before plt.show(): once the figure has
# been shown, pyplot may no longer hold it as the current figure, and
# plt.savefig() would then write an empty canvas.
os.makedirs("result", exist_ok=True)
fig.savefig("result/learning_curve.pdf")
plt.show()

# **Test the network**

In [None]:
# Load the trained model.
# map_location places the stored tensors on the current device, so a checkpoint
# written on a GPU can also be loaded in a CPU-only session.
model = NeuralNetworkModel()
model.load_state_dict(torch.load("result/model_best_loss.pth", map_location=device))
model = model.to(device)

# Test the model: count the correctly classified test samples
batch_test_acc = 0
model.eval()
with torch.no_grad():
    for feats, labels in loader_test:
        feats, labels = feats.to(device), labels.to(device)

        # Forward
        outputs = model(feats)

        # The predicted class is the index of the largest output score
        batch_test_acc += (outputs.max(1)[1] == labels).sum().item()

test_acc = batch_test_acc / len(loader_test.dataset)

# Print the test result
print(f"test accuracy: {test_acc*100:.2f} %")

# Save the test result
with open("result/score.txt", "w") as score_f:
    score_f.write(f"test accuracy = {test_acc*100:.2f} %")

In [None]:
! cat result/training_time.txt
! cat result/score.txt

# **Display a recognition result**

In [None]:
# You can change the audio id of the test set from 0 to 4073.
test_id = 0

In [None]:
# Load a test sample
feats, label, waveform, sample_rate, command = dataset_test.get_info(test_id)

# Recognize the test sample.
# map_location places the stored tensors on the current device, so a checkpoint
# written on a GPU can also be loaded in a CPU-only session.
model = NeuralNetworkModel()
model.load_state_dict(torch.load("result/model_best_loss.pth", map_location=device))
model = model.to(device)
model.eval()
with torch.no_grad():
    # Add a batch axis of size 1; the prediction is the index of the largest score
    output = model(feats.unsqueeze(0).to(device))
    # Convert to a plain int so the label lookup compares int with int
    # instead of int with a tensor.
    pred_class = output.max(1)[1].item()

# Invert the command -> index dictionary to look up the predicted command
index_to_command = {index: name for name, index in label_dict.items()}
pred_command = index_to_command[pred_class]
print(f"groundtruth: {command}")
print(f"prediction   : {pred_command}")

# Plot the waveform against time in seconds
fig = plt.figure(figsize=(12, 4))
ax1 = fig.add_subplot(1, 2, 1)
ax1.plot(np.arange(len(waveform)) / sample_rate, waveform.numpy())
ax1.set_xlabel('Time [sec]')
ax1.set_xlim(0, len(waveform)/sample_rate)
ax1.set_title('Waveform')
ax1.grid(True, linestyle='--')

# Plot the extracted features (transposed so that frames run along the x axis)
ax2 = fig.add_subplot(1, 2, 2)
img = ax2.imshow(feats.numpy().T, origin='lower', aspect='auto', cmap=plt.get_cmap('viridis'))
ax2.set_xlabel('Frame')
ax2.set_title('Features')
plt.colorbar(img)
plt.show()

# Display the audio (must stay the last expression so the notebook renders it)
IPython.display.Audio(waveform, rate=sample_rate)