### Spoken Language Processing
В этом задании предлагается обучить классификатор класса возраста по голосу (пример с тем, как это можно сделать для пола см. в семинаре)

Подумайте, как лучше предсказывать возраст (может быть разбить на группы?) и какой лосс использовать

P.S. не забудьте, что если то вы работает в Colab, то вы можете поменять среду выполнения на GPU/TPU!

Вопросы по заданию/материалам: @Nestyme

In [1]:
!pip3 install timit-utils==0.9.0
!pip3 install torchaudio
! wget https://ndownloader.figshare.com/files/10256148 
!unzip -q 10256148

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting timit-utils==0.9.0
  Downloading timit_utils-0.9.0-py3-none-any.whl (11 kB)
Collecting python-speech-features
  Downloading python_speech_features-0.6.tar.gz (5.6 kB)
Building wheels for collected packages: python-speech-features
  Building wheel for python-speech-features (setup.py) ... [?25l[?25hdone
  Created wheel for python-speech-features: filename=python_speech_features-0.6-py3-none-any.whl size=5888 sha256=ce3752b7f50bd1e0f441c838521e8d0d25081cb396feb95f96b019a1c5d0a47b
  Stored in directory: /root/.cache/pip/wheels/b0/0e/94/28cd6afa3cd5998a63eef99fe31777acd7d758f59cf24839eb
Successfully built python-speech-features
Installing collected packages: python-speech-features, timit-utils
Successfully installed python-speech-features-0.6 timit-utils-0.9.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
--2022-08-11 16:17:02-

In [2]:
import timit_utils as tu
import os
import librosa
import numpy as np
from tqdm import tqdm

import torch
import torch.nn as nn
from torch.optim import Adam
import torch.nn.functional as F

import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score

import IPython
_TIMIT_PATH = 'data/lisa/data/timit/raw/TIMIT'

In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

## Задание 1
Загрузите данные для обучения. Для этого:
1. Скачайте датасет TIMIT (см семинар)
2. Соберите пары "голос"  — "класс возраста" также, как на семинаре собирались пары "голос"  — "пол". Аудиодорожки сконвертируйте в мелспектрограммы при помощи `torchaudio либо` `librosa`

P.S. вы можете использовать свою реализацию, а можете предложенную (см следующие ячейки)

In [77]:
import timit_utils as tu
import os
import librosa
import numpy as np
from tqdm import tqdm
import torch as t


class timit_dataloader:
    def __init__(self, data_path=_TIMIT_PATH, train_mode=True, age_mode=True):
        self.doc_file_path = os.path.join(data_path, 'DOC', 'SPKRINFO.TXT')
        self.corpus = tu.Corpus(data_path)
        with open(self.doc_file_path) as f:
            self.id_age_dict = dict(
                [(tmp.split(' ')[0], 86 - int(tmp.split('  ')[5].split('/')[-1].replace('??', '50'))) \
                 for tmp in f.readlines()[39:]])
        if train_mode:
            self.trainset = self.create_dataset('train', age_mode=age_mode)
            self.validset = self.create_dataset('valid', age_mode=age_mode)
        self.testset = self.create_dataset('test', age_mode=age_mode)

    def return_age(self, id):
        return self.id_age_dict[id]

    def return_data(self):
        return self.trainset, self.validset, self.testset

    def return_test(self):
        return self.testset

    def create_dataset(self, mode, age_mode=False):
        global people
        assert mode in ['train', 'valid', 'test']
        if mode == 'train':
            people = [self.corpus.train.person_by_index(i) for i in range(350)]
        if mode == 'valid':
            people = [self.corpus.train.person_by_index(i) for i in range(350, 400)]
        if mode == 'test':
            people = [self.corpus.test.person_by_index(i) for i in range(150)]
        spectrograms_and_targets = []
        for person in tqdm(people):
              try:
                  target = self.return_age(person.name)
                  for i in range(len(person.sentences)):
                      spectrograms_and_targets.append(
                          self.preprocess_sample(person.sentence_by_index(i).raw_audio, target, age_mode=True))
              except:
                  print(person.name, target)

        X, y = map(np.stack, zip(*spectrograms_and_targets))
        X = X.transpose([0, 2, 1])  # to [batch, time, channels]
        return X, y

    @staticmethod
    def spec_to_image(spec, eps=1e-6):
        mean = spec.mean()
        std = spec.std()
        spec_norm = (spec - mean) / (std + eps)
        spec_min, spec_max = spec_norm.min(), spec_norm.max()
        spec_scaled = 255 * (spec_norm - spec_min) / (spec_max - spec_min)
        spec_scaled = spec_scaled.astype(np.uint8)
        return spec_scaled

    @staticmethod
    def clasterize_by_age(age):
        if age < 25:
            return 0
        elif 25 <= age < 40:
            return 1
        elif age >= 40:
            return 2

    def preprocess_sample(self, amplitudes, target, age_mode=False, sr=16000, max_length=150):
        spectrogram = librosa.feature.melspectrogram(amplitudes, sr=sr, n_mels=128, fmin=1, fmax=8192)[:, :max_length]
        spectrogram = np.pad(spectrogram, [[0, 0], [0, max(0, max_length - spectrogram.shape[1])]], mode='constant')
        target = self.clasterize_by_age(target)
        return self.spec_to_image(np.float32(spectrogram)), target

    def preprocess_sample_inference(self, amplitudes, sr=16000, max_length=150, device='cpu'):
        spectrogram = librosa.feature.melspectrogram(amplitudes, sr=sr, n_mels=128, fmin=1, fmax=8192)[:, :max_length]
        spectrogram = np.pad(spectrogram, [[0, 0], [0, max(0, max_length - spectrogram.shape[1])]], mode='constant')
        spectrogram = np.array([self.spec_to_image(np.float32(spectrogram))]).transpose([0, 2, 1])

        return t.tensor(spectrogram, dtype=t.float).to(device, non_blocking=True)


class dataloader:
    def __init__(self, spectrograms, targets):
        self.data = list(zip(spectrograms, targets))

    def next_batch(self, batch_size, device):
        indices = np.random.randint(len(self.data), size=batch_size)

        input = [self.data[i] for i in indices]

        source = [line[0] for line in input]
        target = [line[1] for line in input]
        batch, label = self.torch_batch(source, target, device)
        label = label.long()
        
        return batch, label

    @staticmethod
    def torch_batch(source, target, device):
        return tuple(
            [
                t.tensor(val, dtype=t.float).to(device, non_blocking=True)
                for val in [source, target]
            ]
        )

    @staticmethod
    def padd_sequences(lines, pad_token=0):
        lengths = [len(line) for line in lines]
        max_length = max(lengths)

        return np.array(
            [
                line + [pad_token] * (max_length - lengths[i])
                for i, line in enumerate(lines)
            ]
        )

Простая сверточная сеть, ее можно дотюнить или поменять по желанию

In [78]:
import torch
import torch.nn as nn
import torch.nn.functional as F


N_CLASSES = 3

class Model(nn.Module):
    def __init__(self, window_sizes=(3, 4, 5)):
        super(Model, self).__init__()

        self.convs = nn.ModuleList([
            nn.Conv2d(1, 128, [window_size, 128], padding=(window_size - 1, 0))
            for window_size in window_sizes
        ])

        self.fc = nn.Linear(128 * len(window_sizes), N_CLASSES)

    def forward(self, x):
        x = torch.unsqueeze(x, 1)  # [B, C, T, E] Add a channel dim.
        xs = []
        for conv in self.convs:
            x2 = F.relu(conv(x))  # [B, F, T, 1]
            x2 = torch.squeeze(x2, -1)  # [B, F, T]
            x2 = F.max_pool1d(x2, x2.size(2))  # [B, F, 1]
            xs.append(x2)
        x = torch.cat(xs, 2)  # [B, F, window]

        # FC
        x = x.view(x.size(0), -1)  # [B, F * window]
        logits = self.fc(x)  # [B, class]
        return logits

In [79]:
# _timit_dataloader = timit_dataloader()
# train, valid, test = _timit_dataloader.return_data()

In [80]:
train_loader = dataloader(*train)
valid_loader = dataloader(*valid)
test_loader = dataloader(*test)

#Задание 2
1. Обучите свой классификатор категории возраста
2. Попробуйте улучшить результат. Можно попробовать усложнить сетку, подвигать границы категорий, поискать новые данные, что угодно, кроме учиться на тесте :)
3. Какой подход оказался самым эффективным? Как думаете, почему?
4. Как считаете, где можно было бы применить такой классификатор в качестве вспомогательной задачи?


In [81]:
lr = 1e-3 # Предполагаемый learning rate. Он может быть больше или меньше :)

model = Model()
model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr)
criterion = torch.nn.CrossEntropyLoss()

In [82]:
len(train_loader.data)

3500

In [83]:
from tqdm.notebook import tqdm

batch_size = 32
num_epochs = 10
# Train loop
for e in range(num_epochs):
    model.train()
    train_loss = 0
    for _ in tqdm(range(len(train_loader.data) // batch_size)):
        batch, label = train_loader.next_batch(batch_size, device)
        optimizer.zero_grad()
        output = model(batch)
        loss = criterion(output, label.to(device))
        train_loss += loss.item()
        loss.backward()
        optimizer.step()
        
        
    valid_loss = 0
    valid_acc = 0
    model.eval()
    with torch.no_grad():
        for _ in tqdm(range(len(valid_loader.data) // batch_size)):
            batch, label = valid_loader.next_batch(batch_size, device)
            optimizer.zero_grad()
            output = model(batch)
            loss = criterion(output, label.to(device))
            valid_loss += loss.item()
            valid_acc += (label.to(device) == output.argmax(axis=1)).sum().item()
            

    print(f"Train Loss: {train_loss / (len(train_loader.data) * batch_size)}, "
          f"Valid Loss: {valid_loss / (len(valid_loader.data) * batch_size)}, "
          f"Valid Acc: {valid_acc / (len(valid_loader.data) * batch_size)}")
    
        
# Testing
test_acc = 0
model.eval()
with torch.no_grad():
    for _ in tqdm(range(len(valid_loader.data) // batch_size)):
        batch, label = test_loader.next_batch(batch_size, device)
        output = model(batch.to(device))
        test_acc += (label.to(device) == output.argmax(axis=1)).sum().item()
        
print(f"\n\nTest Acc: {test_acc / len(test_loader.data)}")

  0%|          | 0/109 [00:00<?, ?it/s]

  0%|          | 0/15 [00:00<?, ?it/s]

Train Loss: 0.0021941091255950076, Valid Loss: 0.0021968719586730003, Valid Acc: 0.01225


  0%|          | 0/109 [00:00<?, ?it/s]

  0%|          | 0/15 [00:00<?, ?it/s]

Train Loss: 0.0009111301877668925, Valid Loss: 0.001591060671955347, Valid Acc: 0.017375


  0%|          | 0/109 [00:00<?, ?it/s]

KeyboardInterrupt: ignored

In [45]:
output

tensor([[-10.6408,  10.4873,   5.7788],
        [ -5.9964,  11.4990,   0.1581],
        [ -7.7907,  11.0476,   1.1530],
        [ -7.0995,   9.6965,   0.4297],
        [ -3.6649,   5.2626,   1.0697],
        [ -9.8489,  15.3502,   0.5224],
        [ -3.9091,  16.7425,   1.3721],
        [-12.2200,  14.2283,   1.1577],
        [ -5.2460,   7.4208,   2.0428],
        [-13.4973,  13.2735,  -1.1485],
        [ -5.0260,  10.0439,   1.9646],
        [ -3.7016,  11.8879,   2.7470],
        [-10.1612,  14.1866,   5.0858],
        [ -6.4035,  11.0872,   2.1518],
        [ -6.1621,   7.4851,   1.2029],
        [ -9.9743,  16.2831,  -0.6941],
        [ -5.8429,   7.5712,   2.8951],
        [ -4.6402,  13.4553,   4.8520],
        [ -3.1549,  13.6726,   0.7645],
        [-12.4719,  17.4741,   0.3005],
        [ -4.1342,   5.0192,   1.4031],
        [-10.6124,   9.1830,   5.4277],
        [ -8.6404,  13.1591,  -2.3923],
        [ -9.2389,   9.7900,   2.6508],
        [ -5.3542,  10.1725,  -0.0357],
