This notebook is based on [DLA Seminar](https://github.com/markovka17/dla/blob/2022/week06/seminar.ipynb)

In [None]:
def train_one_epoch(model, dataloader, criterion, optimizer, scheduler, device, epoch):
    """Run one optimisation pass over ``dataloader`` and log per-step metrics to wandb.

    Args:
        model: Module being trained; it is switched to train mode here.
        dataloader: Iterable yielding ``(wav, label)`` batches; must support ``len()``.
        criterion: Loss callable invoked as ``criterion(preds, label)``.
        optimizer: Optimizer stepped once per batch.
        scheduler: LR scheduler stepped once per batch (not once per epoch).
        device: Device the batch tensors are moved to.
        epoch: Zero-based epoch index, used to derive the global wandb step.

    Returns:
        Mean of the per-batch loss values over the epoch.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    model.train()

    total_loss = 0.0
    num_batches = 0
    # Offset so that wandb steps keep increasing from one epoch to the next.
    step = epoch * len(dataloader)
    for batch_idx, (wav, label) in tqdm(enumerate(dataloader), total=len(dataloader)):
        wav, label = wav.to(device), label.to(device)

        preds = model(wav)
        loss = criterion(preds, label)

        loss.backward()
        optimizer.step()
        # Gradients are cleared right after the update, so the next backward starts clean.
        optimizer.zero_grad()
        scheduler.step()

        # Read the scalar once; every .item() call forces a device synchronisation.
        loss_value = loss.item()
        total_loss += loss_value
        num_batches += 1

        wandb.log({
            "train_step_loss": loss_value,
            "lr": scheduler.get_last_lr()[0], # get current lr for the 0th param group
            "acc_step": count_acc(preds, label),
            "fa_step": count_fa(preds, label),
            "fr_step": count_fr(preds, label),
        }, step=step + batch_idx)

        if batch_idx == 0:
            # Log one audio example per epoch together with its label and prediction.
            wandb.log({"train_image": wandb.Audio(wav[0].detach().cpu().numpy(), sample_rate=16000,
                                                  caption=f"Label: {label[0]}, Pred: {preds[0].argmax(-1)}")},
                      step=step+batch_idx)

    if num_batches == 0:
        raise ValueError("train_one_epoch() received an empty dataloader")

    return total_loss / num_batches


def evaluate(model, dataloader, criterion, device):
    """Compute validation loss and metrics, averaged over batches.

    The model is switched to eval mode and the whole pass runs under
    ``torch.no_grad()`` so no autograd graph is kept for validation batches.

    Args:
        model: Module to evaluate.
        dataloader: Iterable yielding ``(wav, label)`` batches.
        criterion: Loss callable invoked as ``criterion(preds, label)``.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(avg_loss, accuracy, fa, fr)``. Each value is the mean of the
        per-batch values, i.e. batches are weighted equally regardless of size.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    model.eval()

    total_loss = 0.0
    total_acc = 0
    total_fa = 0
    total_fr = 0
    num_batches = 0

    with torch.no_grad():
        for wav, label in dataloader:
            wav, label = wav.to(device), label.to(device)

            preds = model(wav)
            loss = criterion(preds, label)

            total_acc += count_acc(preds, label)
            total_fa += count_fa(preds, label)
            total_fr += count_fr(preds, label)
            total_loss += loss.item()
            num_batches += 1

    if num_batches == 0:
        raise ValueError("evaluate() received an empty dataloader")

    return (
        total_loss / num_batches,
        total_acc / num_batches,
        total_fa / num_batches,
        total_fr / num_batches,
    )


def train(model, train_dataloader, val_dataloader, criterion, optimizer, scheduler, device, n_epochs):
    """Train for ``n_epochs`` epochs, validating and logging to wandb after each one.

    Args:
        model: Module to train.
        train_dataloader: Loader passed to ``train_one_epoch``; must support ``len()``.
        val_dataloader: Loader passed to ``evaluate``.
        criterion: Loss callable shared by training and validation.
        optimizer: Optimizer forwarded to ``train_one_epoch``.
        scheduler: LR scheduler forwarded to ``train_one_epoch``.
        device: Device the batches are moved to.
        n_epochs: Number of epochs to run.

    Returns:
        None. Results are only reported through ``wandb.log``.
    """
    for epoch in range(n_epochs):
        train_avg_loss = train_one_epoch(model, train_dataloader, criterion, optimizer, scheduler, device, epoch)
        val_avg_loss, val_accuracy, val_fa, val_fr = evaluate(model, val_dataloader, criterion, device)

        # Epoch-level metrics are logged at the step right after the last
        # training step of this epoch, keeping the wandb step monotonic.
        wandb.log({
            "train_avg_loss": train_avg_loss,
            "val_avg_loss": val_avg_loss,
            "val_accuracy": val_accuracy,
            "val_fa": val_fa,
            "val_fr": val_fr,
        }, step=(epoch + 1) * len(train_dataloader))

In [None]:
import dataclasses
import torch
from typing import Tuple, Union, List, Callable, Optional

@dataclasses.dataclass
class TaskConfig:
    """Hyper-parameters shared by the data pipeline, the model and training.

    The defaults are also read directly from the class (for example
    ``TaskConfig.batch_size``), so every field keeps a plain class-level default.
    """

    # Target word; a single keyword is used.
    keyword: str = "sheila"

    # Optimisation settings.
    batch_size: int = 128
    learning_rate: float = 3e-4
    weight_decay: float = 1e-5
    num_epochs: int = 20

    # Number of mel bins produced by the spectrogram front end.
    n_mels: int = 40

    # Convolutional layer settings.
    cnn_out_channels: int = 8
    kernel_size: Tuple[int, int] = (5, 20)
    stride: Tuple[int, int] = (2, 8)

    # Recurrent layer settings.
    hidden_size: int = 64
    gru_num_layers: int = 2
    bidirectional: bool = False

    # Output classes.
    num_classes: int = 2

    # Audio sample rate in Hz.
    sample_rate: int = 16000

    # Resolved once, when the class body is executed.
    device: torch.device = torch.device(
        "cuda:0" if torch.cuda.is_available() else "cpu"
    )

In [None]:
# !wget http://download.tensorflow.org/data/speech_commands_v0.01.tar.gz -O speech_commands_v0.01.tar.gz
# !mkdir speech_commands && tar -C speech_commands -xvzf speech_commands_v0.01.tar.gz 1> log

In [None]:
from torch.utils.data import Dataset, DataLoader

In [None]:
import pandas as pd
from pathlib import Path
import json
from collections import OrderedDict

def read_json(fname):
    """Load a JSON file, decoding every JSON object as an ``OrderedDict``.

    Args:
        fname: Path (``str`` or ``Path``) of the file to read.

    Returns:
        The decoded JSON value; objects at any nesting level are ``OrderedDict``.
    """
    fname = Path(fname)
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with fname.open("rt", encoding="utf-8") as handle:
        return json.load(handle, object_hook=OrderedDict)


def write_json(content, fname):
    """Serialise ``content`` as 4-space-indented JSON into ``fname``.

    Keys are written in their existing order (no sorting).
    """
    target = Path(fname)
    with target.open("wt") as sink:
        sink.write(json.dumps(content, indent=4, sort_keys=False))

In [None]:
class SpeechCommandDataset(Dataset):
    """Binary keyword-spotting dataset backed by a JSON index of wav files.

    Every index entry is a dict with ``path``, ``keyword`` and ``label``
    (1 for a target keyword, 0 for any other word). The index is read from
    ``{part}_index.json`` in the current working directory and is built from
    ``path2dir`` the first time that file is missing.
    """

    def __init__(
        self,
        transform: Optional[Callable] = None,
        path2dir: str = None,
        keywords: Union[str, List[str]] = None,
        csv: Optional[pd.DataFrame] = None,
        part: "str" = "train",
    ):
        """Store the settings and load (or build) the index for ``part``.

        Args:
            transform: Optional callable applied to the waveform in ``__getitem__``.
            path2dir: Root directory containing one sub-directory per word.
            keywords: Word or list of words labelled as positives.
            csv: Not used by this class; kept so existing callers keep working.
            part: Selects the index file ``{part}_index.json``.
        """
        self.transform = transform

        self.path2dir = path2dir
        self.keywords = keywords
        self.index = self.create_or_load_index(part)

    def create_or_load_index(self, part):
        """Return the index for ``part``, creating the index files if needed."""
        index_path = Path(f"{part}_index.json")

        if not index_path.exists():
            self.create_index(part)

        return read_json(index_path)

    def create_index(self, part):
        """Scan ``path2dir`` and write ``train_index.json`` and ``val_index.json``.

        Both files are always written; ``part`` is accepted but not used.
        The split is 80/20 after a permutation drawn with the global torch
        seed reset to 0 (this reseeds the global torch RNG as a side effect).
        """
        path2dir = Path(self.path2dir)
        keywords = self.keywords if isinstance(self.keywords, list) else [self.keywords]

        # Directories whose name starts with '_' are not treated as words.
        all_keywords = [
            p.stem for p in path2dir.glob('*')
            if p.is_dir() and not p.stem.startswith('_')
        ]

        index = []
        for keyword in all_keywords:
            label = 1 if keyword in keywords else 0
            for path2wav in (path2dir / keyword).rglob('*.wav'):
                index.append({
                    "path": path2wav.as_posix(),
                    "keyword": keyword,
                    "label": label
                })

        torch.manual_seed(0)
        indexes = torch.randperm(len(index)).tolist()
        split = int(len(index) * 0.8)

        train_index = [index[i] for i in indexes[:split]]
        val_index = [index[i] for i in indexes[split:]]

        # The original referenced the un-imported ``pathlib`` module here,
        # which raised NameError; plain file names are all that is needed.
        write_json(train_index, "train_index.json")
        write_json(val_index, "val_index.json")

    def __getitem__(self, ind: int):
        """Load one example and return a dict with ``wav``, ``keywors`` and ``label``."""
        instance = self.index[ind]

        path2wav = instance['path']
        wav, _ = torchaudio.load(path2wav)
        # Collapse the first dimension of the loaded tensor
        # (presumably the channel axis - TODO confirm).
        wav = wav.sum(dim=0)

        if self.transform:
            wav = self.transform(wav)

        return {
            'wav': wav,
            # NOTE: misspelled key kept as-is because consumers may rely on it.
            'keywors': instance['keyword'],
            'label': instance['label']
        }

    def __len__(self):
        """Return the number of indexed examples."""
        return len(self.index)


In [None]:
class CRNNV3(nn.Module):
    """Conv -> GRU -> attention -> linear classifier on log-mel features.

    This cell previously held many byte-identical copies of this class (one of
    them fused onto a ``return`` line, which was a syntax error). Only the last
    definition was ever effective, so a single definition is kept.
    """

    def __init__(self, config: TaskConfig):
        """Build the layers from ``config``.

        Args:
            config: Provides ``n_mels``, ``cnn_out_channels``, ``kernel_size``,
                ``stride``, ``hidden_size``, ``gru_num_layers``,
                ``bidirectional`` and ``num_classes``.
        """
        super().__init__()
        self.config = config

        self.mel_spec = LogMelspec(config)

        self.conv = nn.Sequential(
            nn.Conv2d(
                in_channels=1, out_channels=config.cnn_out_channels,
                kernel_size=config.kernel_size, stride=config.stride
            ),
            # Merge the conv channel dimension with the one that follows it.
            nn.Flatten(start_dim=1, end_dim=2),
        )

        # Output size of an unpadded convolution along the first kernel axis.
        self.conv_out_frequency = (config.n_mels - config.kernel_size[0]) // \
            config.stride[0] + 1

        self.gru = nn.GRU(
            input_size=self.conv_out_frequency * config.cnn_out_channels,
            hidden_size=config.hidden_size,
            num_layers=config.gru_num_layers,
            dropout=0.1,
            bidirectional=config.bidirectional,
            batch_first=True
        )

        # NOTE(review): both layers are sized with ``hidden_size``; a
        # bidirectional GRU emits twice that many features - confirm before
        # enabling ``config.bidirectional``.
        self.attention = Attention(config.hidden_size)
        self.classifier = nn.Linear(config.hidden_size, config.num_classes)

    def forward(self, input):
        """Return the classifier output for a batch of waveforms.

        Args:
            input: Batch passed to ``LogMelspec`` (presumably raw waveforms of
                shape (batch, time) - TODO confirm).
        """
        input = self.mel_spec(input)

        # Add the single input-channel dimension expected by Conv2d.
        input = input.unsqueeze(dim=1)
        # Swap the last two dims so the batch-first GRU iterates over the last conv axis.
        conv_output = self.conv(input).transpose(-1, -2)
        gru_output, _ = self.gru(conv_output)
        contex_vector = self.attention(gru_output)
        output = self.classifier(contex_vector)
        return output

In [None]:
class SpeechCommandDatasetV3(Dataset):
    """Binary keyword-spotting dataset backed by a JSON index of wav files.

    Every index entry is a dict with ``path``, ``keyword`` and ``label``
    (1 for a target keyword, 0 for any other word). The index is read from
    ``{part}_index.json`` in the current working directory and is built from
    ``path2dir`` the first time that file is missing.
    """

    def __init__(
        self,
        transform: Optional[Callable] = None,
        path2dir: str = None,
        keywords: Union[str, List[str]] = None,
        csv: Optional[pd.DataFrame] = None,
        part: "str" = "train",
    ):
        """Store the settings and load (or build) the index for ``part``.

        Args:
            transform: Optional callable applied to the waveform in ``__getitem__``.
            path2dir: Root directory containing one sub-directory per word.
            keywords: Word or list of words labelled as positives.
            csv: Not used by this class; kept so existing callers keep working.
            part: Selects the index file ``{part}_index.json``.
        """
        self.transform = transform

        self.path2dir = path2dir
        self.keywords = keywords
        self.index = self.create_or_load_index(part)

    def create_or_load_index(self, part):
        """Return the index for ``part``, creating the index files if needed."""
        index_path = Path(f"{part}_index.json")

        if not index_path.exists():
            self.create_index(part)

        return read_json(index_path)

    def create_index(self, part):
        """Scan ``path2dir`` and write ``train_index.json`` and ``val_index.json``.

        Both files are always written; ``part`` is accepted but not used.
        The split is 80/20 after a permutation drawn with the global torch
        seed reset to 0 (this reseeds the global torch RNG as a side effect).
        """
        path2dir = Path(self.path2dir)
        keywords = self.keywords if isinstance(self.keywords, list) else [self.keywords]

        # Directories whose name starts with '_' are not treated as words.
        all_keywords = [
            p.stem for p in path2dir.glob('*')
            if p.is_dir() and not p.stem.startswith('_')
        ]

        index = []
        for keyword in all_keywords:
            label = 1 if keyword in keywords else 0
            for path2wav in (path2dir / keyword).rglob('*.wav'):
                index.append({
                    "path": path2wav.as_posix(),
                    "keyword": keyword,
                    "label": label
                })

        torch.manual_seed(0)
        indexes = torch.randperm(len(index)).tolist()
        split = int(len(index) * 0.8)

        train_index = [index[i] for i in indexes[:split]]
        val_index = [index[i] for i in indexes[split:]]

        # The original referenced the un-imported ``pathlib`` module here,
        # which raised NameError; plain file names are all that is needed.
        write_json(train_index, "train_index.json")
        write_json(val_index, "val_index.json")

    def __getitem__(self, ind: int):
        """Load one example and return a dict with ``wav``, ``keywors`` and ``label``."""
        instance = self.index[ind]

        path2wav = instance['path']
        wav, _ = torchaudio.load(path2wav)
        # Collapse the first dimension of the loaded tensor
        # (presumably the channel axis - TODO confirm).
        wav = wav.sum(dim=0)

        if self.transform:
            wav = self.transform(wav)

        return {
            'wav': wav,
            # NOTE: misspelled key kept as-is because consumers may rely on it.
            'keywors': instance['keyword'],
            'label': instance['label']
        }

    def __len__(self):
        """Return the number of indexed examples."""
        return len(self.index)


In [None]:
class SpeechCommandDatasetV4(Dataset):
    """Binary keyword-spotting dataset backed by a JSON index of wav files.

    Every index entry is a dict with ``path``, ``keyword`` and ``label``
    (1 for a target keyword, 0 for any other word). The index is read from
    ``{part}_index.json`` in the current working directory and is built from
    ``path2dir`` the first time that file is missing.
    """

    def __init__(
        self,
        transform: Optional[Callable] = None,
        path2dir: str = None,
        keywords: Union[str, List[str]] = None,
        csv: Optional[pd.DataFrame] = None,
        part: "str" = "train",
    ):
        """Store the settings and load (or build) the index for ``part``.

        Args:
            transform: Optional callable applied to the waveform in ``__getitem__``.
            path2dir: Root directory containing one sub-directory per word.
            keywords: Word or list of words labelled as positives.
            csv: Not used by this class; kept so existing callers keep working.
            part: Selects the index file ``{part}_index.json``.
        """
        self.transform = transform

        self.path2dir = path2dir
        self.keywords = keywords
        self.index = self.create_or_load_index(part)

    def create_or_load_index(self, part):
        """Return the index for ``part``, creating the index files if needed."""
        index_path = Path(f"{part}_index.json")

        if not index_path.exists():
            self.create_index(part)

        return read_json(index_path)

    def create_index(self, part):
        """Scan ``path2dir`` and write ``train_index.json`` and ``val_index.json``.

        Both files are always written; ``part`` is accepted but not used.
        The split is 80/20 after a permutation drawn with the global torch
        seed reset to 0 (this reseeds the global torch RNG as a side effect).
        """
        path2dir = Path(self.path2dir)
        keywords = self.keywords if isinstance(self.keywords, list) else [self.keywords]

        # Directories whose name starts with '_' are not treated as words.
        all_keywords = [
            p.stem for p in path2dir.glob('*')
            if p.is_dir() and not p.stem.startswith('_')
        ]

        index = []
        for keyword in all_keywords:
            label = 1 if keyword in keywords else 0
            for path2wav in (path2dir / keyword).rglob('*.wav'):
                index.append({
                    "path": path2wav.as_posix(),
                    "keyword": keyword,
                    "label": label
                })

        torch.manual_seed(0)
        indexes = torch.randperm(len(index)).tolist()
        split = int(len(index) * 0.8)

        train_index = [index[i] for i in indexes[:split]]
        val_index = [index[i] for i in indexes[split:]]

        # The original referenced the un-imported ``pathlib`` module here,
        # which raised NameError; plain file names are all that is needed.
        write_json(train_index, "train_index.json")
        write_json(val_index, "val_index.json")

    def __getitem__(self, ind: int):
        """Load one example and return a dict with ``wav``, ``keywors`` and ``label``."""
        instance = self.index[ind]

        path2wav = instance['path']
        wav, _ = torchaudio.load(path2wav)
        # Collapse the first dimension of the loaded tensor
        # (presumably the channel axis - TODO confirm).
        wav = wav.sum(dim=0)

        if self.transform:
            wav = self.transform(wav)

        return {
            'wav': wav,
            # NOTE: misspelled key kept as-is because consumers may rely on it.
            'keywors': instance['keyword'],
            'label': instance['label']
        }

    def __len__(self):
        """Return the number of indexed examples."""
        return len(self.index)


In [None]:
import torchaudio

class AugsCreation:
    """Callable that applies one randomly chosen waveform augmentation.

    The four options are: identity, additive Gaussian noise, a volume change
    via ``torchaudio.transforms.Vol``, and mixing in a background-noise clip.
    """

    def __init__(self):
        """Load every background-noise recording into memory."""
        self.background_noises = [
            'speech_commands/_background_noise_/white_noise.wav',
            'speech_commands/_background_noise_/dude_miaowing.wav',
            'speech_commands/_background_noise_/doing_the_dishes.wav',
            'speech_commands/_background_noise_/exercise_bike.wav',
            'speech_commands/_background_noise_/pink_noise.wav',
            'speech_commands/_background_noise_/running_tap.wav'
        ]

        self.noises = []
        for noise_path in self.background_noises:
            waveform = torchaudio.load(noise_path)[0]
            self.noises.append(waveform.squeeze())

    def add_rand_noise(self, audio):
        """Mix a random slice of a random background noise into ``audio``.

        The noise is scaled by the ratio of the two norms times
        ``10 ** (-noise_level / 20)`` and the sum is clamped to [-1, 1].
        A new tensor is returned; ``audio`` itself is not modified.
        """
        # First random draw: which noise recording to use.
        pick = torch.randint(
            low=0, high=len(self.background_noises), size=(1,)
        ).item()
        noise = self.noises[pick]

        noise_level = torch.Tensor([1])  # [0, 40]

        energy_ratio = torch.norm(audio) / torch.norm(noise)
        alpha = energy_ratio * torch.pow(10, -noise_level / 20)

        # Second random draw: where the noise slice starts.
        n_samples = audio.size(0)
        upper = max(int(noise.size(0) - n_samples - 1), 1)
        offset = torch.randint(low=0, high=upper, size=(1,)).item()

        mixed = audio + alpha * noise[offset: offset + n_samples]
        return mixed.clamp_(-1, 1)

    def __call__(self, wav):
        """Apply one of the four augmentations, chosen uniformly at random."""
        choice = torch.randint(low=0, high=4, size=(1,)).item()   # choose 1 random aug from augs

        if choice == 0:
            return wav
        if choice == 1:
            jitter = torch.distributions.Normal(0, 0.01).sample(wav.size())
            return (wav + jitter).clamp_(-1, 1)
        if choice == 2:
            return torchaudio.transforms.Vol(.25)(wav)
        return self.add_rand_noise(wav)

In [None]:
# The training split gets random waveform augmentations; validation has no transform.
train_dataset = SpeechCommandDataset(
    transform=AugsCreation(),
    path2dir='speech_commands',
    keywords=TaskConfig.keyword,
    part="train",
)
val_dataset = SpeechCommandDataset(
    path2dir='speech_commands',
    keywords=TaskConfig.keyword,
    part="val",
)

In [None]:
# Show the first two index entries as the cell output (sanity check of the index).
train_dataset.index[:2]

In [None]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(data):
    """Collate dataset items into a padded waveform batch and a label tensor.

    Args:
        data: iterable of dicts, each with a ``'wav'`` tensor and an integer
            ``'label'``.

    Returns:
        Tuple ``(wavs, labels)``: ``wavs`` is the result of
        ``pad_sequence(..., batch_first=True)`` (shorter waveforms are padded
        with 0.0) and ``labels`` is an int64 tensor with one entry per item.
    """
    wavs = []
    labels = []

    for el in data:
        wavs.append(el['wav'])
        labels.append(el['label'])

    # torch.nn.utils.rnn.pad_sequence takes list(Tensors) and returns padded (with 0.0) Tensor
    wavs = pad_sequence(wavs, batch_first=True)
    # Build the int64 tensor directly: torch.Tensor(...).long() would go
    # through float32 first and lose exactness for large integer labels.
    labels = torch.tensor(labels, dtype=torch.long)
    return wavs, labels

In [None]:
# Batches are assembled by collate_fn (zero-padded waveforms + label tensor).
# NOTE(review): shuffle=False and no sampler is passed for the training loader,
# so batches follow the dataset index order on every epoch -- confirm intended.
train_dataloader = DataLoader(train_dataset, batch_size=TaskConfig.batch_size,
                          shuffle=False, collate_fn=collate_fn,
                          num_workers=2, pin_memory=True)

# Validation loader: same settings, fixed order.
val_dataloader = DataLoader(val_dataset, batch_size=TaskConfig.batch_size,
                        shuffle=False, collate_fn=collate_fn,
                        num_workers=2, pin_memory=True)

In [None]:
from torch import nn

In [None]:
class LogMelspec(nn.Module):
    """Log-mel-spectrogram front end with masking augmentation in training mode.

    Args:
        config: object providing ``sample_rate`` and ``n_mels``.
    """

    def __init__(self, config):
        super().__init__()
        # 400-sample window with a 160-sample hop (25 ms / 10 ms at 16 kHz).
        self.melspec = torchaudio.transforms.MelSpectrogram(
                sample_rate=config.sample_rate,
                n_fft=400,
                win_length=400,
                hop_length=160,
                n_mels=config.n_mels
        )

        # Frequency and time masking, applied only while ``self.training``.
        self.spec_augs = nn.Sequential(
                torchaudio.transforms.FrequencyMasking(freq_mask_param=15),
                torchaudio.transforms.TimeMasking(time_mask_param=35),
        )

    def forward(self, batch):
        """Return the log-mel spectrogram of ``batch``.

        Implemented as ``forward`` (not ``__call__``) so that
        ``nn.Module.__call__`` keeps running its hook machinery; calling the
        module as ``module(batch)`` works exactly as before.

        Args:
            batch: waveform tensor accepted by ``MelSpectrogram``.

        Returns:
            ``log`` of the mel spectrogram clamped to ``[1e-9, 1e9]``, with the
            masking augmentations applied when the module is in training mode.
        """
        # Clamp before the log so that zero energy does not produce -inf.
        x = torch.log(self.melspec(batch).clamp_(min=1e-9, max=1e9))
        if self.training:
            x = self.spec_augs(x)
        return x

In [None]:
class SpeechCommandDatasetV2(Dataset):
    """Binary keyword-spotting dataset backed by a JSON index file.

    Every ``*.wav`` under a non-underscore sub-directory of ``path2dir`` is
    labelled 1 when its directory name is one of ``keywords`` and 0 otherwise.
    The index is split 80/20 into ``train_index.json`` / ``val_index.json``
    in the current working directory and reused on later runs.

    Args:
        transform: optional callable applied to each loaded waveform.
        path2dir: root directory of the dataset.
        keywords: a keyword or list of keywords treated as the positive class.
        csv: accepted for interface compatibility; not used by this class.
        part: index to load; ``"{part}_index.json"`` is read.
    """

    def __init__(
        self,
        transform: Optional[Callable] = None,
        path2dir: str = None,
        keywords: Union[str, List[str]] = None,
        csv: Optional[pd.DataFrame] = None,
        part: "str" = "train",
    ):
        self.transform = transform

        self.path2dir = path2dir
        self.keywords = keywords
        self.index = self.create_or_load_index(part)

    def create_or_load_index(self, part):
        """Return the index for ``part``, building the index files if missing."""
        index_path = Path(f"{part}_index.json")

        if not index_path.exists():
            self.create_index(part)

        return read_json(index_path)

    def create_index(self, part):
        """Scan ``path2dir`` and write ``train_index.json`` / ``val_index.json``.

        ``part`` is not used here: both split files are always written.
        """
        path2dir = Path(self.path2dir)
        keywords = self.keywords if isinstance(self.keywords, list) else [self.keywords]

        # Directory listings come back in arbitrary order, so sort them:
        # otherwise the seeded permutation below would still give a different
        # train/val split on every machine.
        all_keywords = sorted(
            p.stem for p in path2dir.glob('*')
            if p.is_dir() and not p.stem.startswith('_')
        )

        index = []
        for keyword in all_keywords:
            label = int(keyword in keywords)
            for path2wav in sorted((path2dir / keyword).rglob('*.wav')):
                index.append({
                    "path": path2wav.as_posix(),
                    "keyword": keyword,
                    "label": label
                })

        # NOTE: this reseeds torch's global RNG as a side effect.
        torch.manual_seed(0)
        indexes = torch.randperm(len(index)).tolist()
        split_at = int(len(index) * 0.8)

        train_index = [index[i] for i in indexes[:split_at]]
        val_index = [index[i] for i in indexes[split_at:]]

        write_json(train_index, str(Path("train_index.json")))
        write_json(val_index, str(Path("val_index.json")))

    def __getitem__(self, ind: int):
        """Load item ``ind``: waveform (after ``transform``), keyword and label."""
        instance = self.index[ind]

        path2wav = instance['path']
        wav, sr = torchaudio.load(path2wav)
        # Collapse dim 0 (the channel axis of torchaudio.load output) by summing.
        wav = wav.sum(dim=0)

        if self.transform:
            wav = self.transform(wav)

        return {
            'wav': wav,
            # NOTE(review): key is spelled 'keywors'; kept as-is because
            # consumers may rely on it.
            'keywors': instance['keyword'],
            'label': instance['label']
        }

    def __len__(self):
        """Number of entries in the loaded index."""
        return len(self.index)


In [None]:
class Attention(nn.Module):
    """Attention pooling: softmax-weighted sum over the second-to-last axis.

    A small MLP (Linear -> Tanh -> Linear) scores every position; the scores
    are normalised with softmax along dim -2 and used to average the input.
    """

    def __init__(self, hidden_size: int):
        super().__init__()

        scorer_layers = [
            nn.Linear(hidden_size, hidden_size),
            nn.Tanh(),
            nn.Linear(hidden_size, 1),
        ]
        self.energy = nn.Sequential(*scorer_layers)

    def forward(self, input):
        """Return ``input`` reduced over dim -2 with learned attention weights."""
        scores = self.energy(input)
        weights = scores.softmax(dim=-2)
        weighted = input * weights
        return torch.sum(weighted, dim=-2)

class SpeechCommandDatasetV5(Dataset):
    """Binary keyword-spotting dataset backed by a JSON index file.

    Every ``*.wav`` under a non-underscore sub-directory of ``path2dir`` is
    labelled 1 when its directory name is one of ``keywords`` and 0 otherwise.
    The index is split 80/20 into ``train_index.json`` / ``val_index.json``
    in the current working directory and reused on later runs.

    Args:
        transform: optional callable applied to each loaded waveform.
        path2dir: root directory of the dataset.
        keywords: a keyword or list of keywords treated as the positive class.
        csv: accepted for interface compatibility; not used by this class.
        part: index to load; ``"{part}_index.json"`` is read.
    """

    def __init__(
        self,
        transform: Optional[Callable] = None,
        path2dir: str = None,
        keywords: Union[str, List[str]] = None,
        csv: Optional[pd.DataFrame] = None,
        part: "str" = "train",
    ):
        self.transform = transform

        self.path2dir = path2dir
        self.keywords = keywords
        self.index = self.create_or_load_index(part)

    def create_or_load_index(self, part):
        """Return the index for ``part``, building the index files if missing."""
        index_path = Path(f"{part}_index.json")

        if not index_path.exists():
            self.create_index(part)

        return read_json(index_path)

    def create_index(self, part):
        """Scan ``path2dir`` and write ``train_index.json`` / ``val_index.json``.

        ``part`` is not used here: both split files are always written.
        """
        path2dir = Path(self.path2dir)
        keywords = self.keywords if isinstance(self.keywords, list) else [self.keywords]

        # Directory listings come back in arbitrary order, so sort them:
        # otherwise the seeded permutation below would still give a different
        # train/val split on every machine.
        all_keywords = sorted(
            p.stem for p in path2dir.glob('*')
            if p.is_dir() and not p.stem.startswith('_')
        )

        index = []
        for keyword in all_keywords:
            label = int(keyword in keywords)
            for path2wav in sorted((path2dir / keyword).rglob('*.wav')):
                index.append({
                    "path": path2wav.as_posix(),
                    "keyword": keyword,
                    "label": label
                })

        # NOTE: this reseeds torch's global RNG as a side effect.
        torch.manual_seed(0)
        indexes = torch.randperm(len(index)).tolist()
        split_at = int(len(index) * 0.8)

        train_index = [index[i] for i in indexes[:split_at]]
        val_index = [index[i] for i in indexes[split_at:]]

        write_json(train_index, str(Path("train_index.json")))
        write_json(val_index, str(Path("val_index.json")))

    def __getitem__(self, ind: int):
        """Load item ``ind``: waveform (after ``transform``), keyword and label."""
        instance = self.index[ind]

        path2wav = instance['path']
        wav, sr = torchaudio.load(path2wav)
        # Collapse dim 0 (the channel axis of torchaudio.load output) by summing.
        wav = wav.sum(dim=0)

        if self.transform:
            wav = self.transform(wav)

        return {
            'wav': wav,
            # NOTE(review): key is spelled 'keywors'; kept as-is because
            # consumers may rely on it.
            'keywors': instance['keyword'],
            'label': instance['label']
        }

    def __len__(self):
        """Number of entries in the loaded index."""
        return len(self.index)


class CRNN(nn.Module):
    """Conv -> GRU -> attention -> linear classifier over log-mel features.

    Args:
        config: provides ``cnn_out_channels``, ``kernel_size``, ``stride``,
            ``n_mels``, ``hidden_size``, ``gru_num_layers``, ``bidirectional``
            and ``num_classes`` (plus whatever ``LogMelspec`` reads).
    """

    def __init__(self, config: TaskConfig):
        super().__init__()
        self.config = config

        self.mel_spec = LogMelspec(config)

        # Flatten merges dims 1 and 2 of the conv output into one feature axis.
        self.conv = nn.Sequential(
            nn.Conv2d(
                in_channels=1, out_channels=config.cnn_out_channels,
                kernel_size=config.kernel_size, stride=config.stride
            ),
            nn.Flatten(start_dim=1, end_dim=2),
        )

        # Size of the mel axis after the (unpadded) convolution.
        self.conv_out_frequency = (config.n_mels - config.kernel_size[0]) // \
            config.stride[0] + 1

        self.gru = nn.GRU(
            input_size=self.conv_out_frequency * config.cnn_out_channels,
            hidden_size=config.hidden_size,
            num_layers=config.gru_num_layers,
            dropout=0.1,
            bidirectional=config.bidirectional,
            batch_first=True
        )

        # A bidirectional GRU concatenates both directions, so its output has
        # 2 * hidden_size features; size the downstream layers accordingly.
        gru_out_size = config.hidden_size * (2 if config.bidirectional else 1)
        self.attention = Attention(gru_out_size)
        self.classifier = nn.Linear(gru_out_size, config.num_classes)

    def forward(self, input):
        """Return class logits for a batch of waveforms.

        Args:
            input: waveform batch accepted by ``LogMelspec``
                (presumably ``(batch, samples)`` -- confirm against caller).

        Returns:
            Output of the final linear layer, ``num_classes`` values per item.
        """
        input = self.mel_spec(input)

        # Add the single input channel expected by Conv2d.
        input = input.unsqueeze(dim=1)
        # Swap the last two axes so the GRU (batch_first=True) iterates over
        # the last axis of the conv output.
        conv_output = self.conv(input).transpose(-1, -2)
        gru_output, _ = self.gru(conv_output)
        context_vector = self.attention(gru_output)
        output = self.classifier(context_vector)
        return output

In [None]:
class CRNNV2(nn.Module):
    """Conv -> GRU -> attention -> linear classifier over log-mel features.

    Args:
        config: provides ``cnn_out_channels``, ``kernel_size``, ``stride``,
            ``n_mels``, ``hidden_size``, ``gru_num_layers``, ``bidirectional``
            and ``num_classes`` (plus whatever ``LogMelspec`` reads).
    """

    def __init__(self, config: TaskConfig):
        super().__init__()
        self.config = config

        self.mel_spec = LogMelspec(config)

        # Flatten merges dims 1 and 2 of the conv output into one feature axis.
        self.conv = nn.Sequential(
            nn.Conv2d(
                in_channels=1, out_channels=config.cnn_out_channels,
                kernel_size=config.kernel_size, stride=config.stride
            ),
            nn.Flatten(start_dim=1, end_dim=2),
        )

        # Size of the mel axis after the (unpadded) convolution.
        self.conv_out_frequency = (config.n_mels - config.kernel_size[0]) // \
            config.stride[0] + 1

        self.gru = nn.GRU(
            input_size=self.conv_out_frequency * config.cnn_out_channels,
            hidden_size=config.hidden_size,
            num_layers=config.gru_num_layers,
            dropout=0.1,
            bidirectional=config.bidirectional,
            batch_first=True
        )

        # A bidirectional GRU concatenates both directions, so its output has
        # 2 * hidden_size features; size the downstream layers accordingly.
        gru_out_size = config.hidden_size * (2 if config.bidirectional else 1)
        self.attention = Attention(gru_out_size)
        self.classifier = nn.Linear(gru_out_size, config.num_classes)

    def forward(self, input):
        """Return class logits for a batch of waveforms.

        Args:
            input: waveform batch accepted by ``LogMelspec``
                (presumably ``(batch, samples)`` -- confirm against caller).

        Returns:
            Output of the final linear layer, ``num_classes`` values per item.
        """
        input = self.mel_spec(input)

        # Add the single input channel expected by Conv2d.
        input = input.unsqueeze(dim=1)
        # Swap the last two axes so the GRU (batch_first=True) iterates over
        # the last axis of the conv output.
        conv_output = self.conv(input).transpose(-1, -2)
        gru_output, _ = self.gru(conv_output)
        context_vector = self.attention(gru_output)
        output = self.classifier(context_vector)
        return output

In [None]:
# Build the baseline CRNN from a default TaskConfig; the bare `model`
# expression displays the module as the notebook cell output.
config = TaskConfig()
model = CRNN(config)
model

In [None]:
# Smoke test: run the model on the first training waveform, with a leading
# dimension of size 1 added by unsqueeze(0).
model(train_dataset[0]["wav"].unsqueeze(0))

In [None]:
from tqdm.auto import tqdm
import wandb

# Log in to Weights & Biases before any run is started.
wandb.login()

In [None]:
# Loss passed to train(...) as `criterion`; it is called as criterion(preds, label).
criterion = nn.CrossEntropyLoss()

In [None]:
# Train the CRNNV2 variant and log it as the "crnnv2" W&B run.
# CRNNV2 takes the task config; the former `...` placeholder passed Ellipsis,
# which fails as soon as __init__ reads a config attribute.
model = CRNNV2(config)

model.to(config.device)

NUM_EPOCHS = 2
optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate, weight_decay=config.weight_decay)
# The scheduler is stepped once per batch inside train_one_epoch, so T_max is
# the total number of optimisation steps.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS * len(train_dataloader), eta_min=1e-4)

with wandb.init(
    project="seminar_wandb_kws",  # project name
    name="crnnv2",  # run name within the project
) as run:
    train(model, train_dataloader, val_dataloader, criterion, optimizer, scheduler, config.device, NUM_EPOCHS)

In [None]:
# FA - true: 0, model: 1
# FR - true: 1, model: 0

def count_fa(preds, labels):
    """Return the false-alarm share of a batch.

    Sums the argmax predictions at positions where ``labels == 0`` and divides
    by the number of predictions.
    """
    predicted = preds.argmax(dim=-1)
    negatives = labels == 0
    false_alarms = predicted[negatives].sum()

    return false_alarms.item() / predicted.numel()

def count_fr(preds, labels):
    """Return the false-reject share of a batch.

    Sums the labels at positions where the argmax prediction is 0 and divides
    by the number of predictions.
    """
    predicted = preds.argmax(dim=-1)
    rejected = predicted == 0
    false_rejects = labels[rejected].sum()

    return false_rejects.item() / predicted.numel()

def count_acc(preds, labels):
    """Return the share of items whose argmax prediction equals the label."""
    predicted = preds.argmax(dim=-1)
    hits = (predicted == labels).sum()

    return hits.item() / predicted.numel()

In [None]:
# Move the current `model` to the configured device and set up its optimiser.
model.to(config.device)

NUM_EPOCHS = 2
optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate, weight_decay=config.weight_decay)
# The scheduler is stepped once per batch inside train_one_epoch, so T_max is
# the total number of optimisation steps.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS * len(train_dataloader), eta_min=1e-4)

In [None]:
# Train the current `model` inside a W&B run named "crnn".
with wandb.init(
                project="seminar_wandb_kws", # project name
                name="crnn" # run name within the project
            ) as run:
    train(model, train_dataloader, val_dataloader, criterion, optimizer, scheduler, config.device, NUM_EPOCHS)

In [None]:
# Train the CRNNV3 variant and log it as the "crnnv3" W&B run.
# NOTE(review): `CRNNV3(...)` passes the Ellipsis literal as its only argument
# and CRNNV3 is not defined in this part of the notebook -- this looks like a
# placeholder to be filled in before the cell can run.
model = CRNNV3(...)

model.to(config.device)

NUM_EPOCHS = 2
optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate, weight_decay=config.weight_decay)
# The scheduler is stepped once per batch inside train_one_epoch, so T_max is
# the total number of optimisation steps.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS * len(train_dataloader), eta_min=1e-4)

with wandb.init(
                project="seminar_wandb_kws", # project name
                name="crnnv3" # run name within the project
            ) as run:
    train(model, train_dataloader, val_dataloader, criterion, optimizer, scheduler, config.device, NUM_EPOCHS)