Для своего эксперимента я выбрала статью Y-Vector: Multiscale Waveform Encoder for Speaker Embedding. В статье предложен новый  multi-scale waveform
encoder,который включает в себя три ветви, состоящие из сверточных слоёв,squeeze-and-excitation блоков и TDNN нейросетью.
Отличительной особенностью подхода, предложенного, в статье является то, что нейросеть построенная по представленной архитектуре позволяет извлекать фичи из необработанных данных, в то время как до этого они создавались вручную.
Для эксперимента я взяла датасет LIBRISPEECH и постаралась воспроизвести архитектуру из статьи : Multi-scale Filtering Layer,tf-SE Downsampling Block,Multi-level Feature Map Aggregation.
К сожалению в самом конце сталкнулась с ошибкой, котору пока что не могу разрешить, поэтому не удалось продолжить эксперимент.

  


In [67]:
from collections import Counter
from pathlib import Path
import torch
import glob
import torchaudio
import torch.nn as nn
import torchaudio.datasets
import torch.nn.functional as F
from torch import Tensor
#from torch.utils.data import Dataset
#from datasets import load_dataset, load_metric
import numpy as np
import torch.optim as optim
from torch.optim import SGD
#from transformers import get_sheduler
from tqdm.auto import tqdm
import logging
import math
import sys

In [54]:
train_loader = torchaudio.datasets.LIBRISPEECH("./", url="train-clean-100", download=True)
test_loader = torchaudio.datasets.LIBRISPEECH("./", url="test-clean", download=True)

In [55]:

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [56]:
class Fp32GroupNorm(nn.GroupNorm):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def forward(self, inputs):
        output = F.group_norm(
            inputs.float(),
            self.num_groups,
            self.weight.float() if self.weight is not None else None,
            self.bias.float() if self.bias is not None else None,
            self.eps,
        )
        return output.type_as(inputs)

class Fp32LayerNorm(nn.LayerNorm):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def forward(self, inputs):
        output = F.layer_norm(
            inputs.float(),
            self.normalized_shape,
            self.weight.float() if self.weight is not None else None,
            self.bias.float() if self.bias is not None else None,
            self.eps,
        )
        return output.type_as(inputs)

class TransposeLast(nn.Module):
    def __init__(self, deconstruct_idx=None):
        super().__init__()
        self.deconstruct_idx = deconstruct_idx

    def forward(self, x):
        if self.deconstruct_idx is not None:
            x = x[self.deconstruct_idx]
        return x.transpose(-2, -1)

def norm_block(is_layer_norm, dim, affine=True, is_instance_norm=False):
    if is_layer_norm:
        mod = nn.Sequential(
            TransposeLast(),
            Fp32LayerNorm(dim, elementwise_affine=affine),
            TransposeLast(),
        )
    else:
        if is_instance_norm:
            mod = Fp32GroupNorm(dim, dim, affine=False) # instance norm
        else:
            mod = Fp32GroupNorm(1, dim, affine=affine)  # layer norm

    return 

In [57]:
class Fp32GroupNorm(nn.GroupNorm):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def forward(self, inputs):
        output = F.group_norm(
            inputs.float(),
            self.num_groups,
            self.weight.float() if self.weight is not None else None,
            self.bias.float() if self.bias is not None else None,
            self.eps,
        )
        return output.type_as(inputs)

def norm_block(is_layer_norm, dim, affine=True, is_instance_norm=False):
    if is_layer_norm:
        mod = nn.Sequential(
            TransposeLast(),
            Fp32LayerNorm(dim, elementwise_affine=affine),
            TransposeLast(),
        )
    else:
        if is_instance_norm:
            mod = Fp32GroupNorm(dim, dim, affine=False) # instance norm
        else:
            mod = Fp32GroupNorm(1, dim, affine=affine)  # layer norm

    return mod

In [58]:
class SEBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.fgate = nn.Sequential(nn.Linear(channels, channels), nn.Sigmoid())
        self.tgate = nn.Sequential(nn.Linear(channels, 1), nn.Sigmoid())
    def forward(self, x):
        
        fg = self.fgate(x.mean(dim=-1))
        x = x * fg.unsqueeze(-1)
        
        tg = x.permute(0, 2, 1).contiguous().view(-1, x.shape[1])
        tg = self.tgate(tg).view(x.shape[0], x.shape[2]).unsqueeze(1)
        out = x * tg
        return out

In [59]:
class TDNNLayer(nn.Module):
    
    def __init__(self, input_dim, output_dim,
                 context_size, dilation=1):
        '''
        TDNN as defined by https://www.danielpovey.com/files/2015_interspeech_multisplice.pdf
        Affine transformation not applied globally to all frames but smaller windows with local context
        batch_norm: True to include batch normalisation after the non linearity
        
        Context size and dilation determine the frames selected
        (although context size is not really defined in the traditional sense)
        For example:
            context size 5 and dilation 1 is equivalent to [-2,-1,0,1,2]
            context size 3 and dilation 2 is equivalent to [-2, 0, 2]
            context size 1 and dilation 1 is equivalent to [0]
        '''
        super(TDNNLayer, self).__init__()
        self.context_size = context_size
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.dilation = dilation
        self.kernel = nn.Linear(input_dim*context_size, output_dim)
        
    def forward(self, inputs):
        '''
        input: size (batch, input_features, seq_len)
        outpu: size (batch, new_seq_len, output_features)
        '''
        
        # ----------Convolution = unfold + matmul + fold
        x = inputs
        _, d, _ = x.shape
        assert (d == self.input_dim), 'Input dimension was wrong. Expected ({}), got ({})'.format(self.input_dim, d)
        x = x.unsqueeze(1)
        
        # Unfold input into smaller temporal contexts
        x = F.unfold(x, (self.input_dim, self.context_size), 
                     stride=(self.input_dim, 1), 
                     dilation=(1, self.dilation))

        # N, output_dim*context_size, new_t = x.shape
        x = x.transpose(1, 2)
        x = self.kernel(x) # matmul
        
        # transpose to channel first
        x = x.transpose(1, 2)

        return x
    

In [60]:
class FeatureExtractionModel(nn.Module):
    def __init__(self):
        non_affine_group_norm=False
        super(FeatureExtractionModel, self).__init__()
        self.branch1 = nn.Sequential(nn.Conv1d(1,90,12,16),
                                     norm_block(is_layer_norm=False, dim=90, affine=not non_affine_group_norm,
                                     is_instance_norm=True), 
                                     nn.ReLU(),
                                     nn.Conv1d(90,160,5,3),
                                     norm_block(is_layer_norm=False, dim=160, affine=not non_affine_group_norm,
                                     is_instance_norm=True), 
                                     nn.ReLU()
                                   )
        self.branch2 = nn.Sequential(nn.Conv1d(1,90,18,9),
                                     norm_block(is_layer_norm=False, dim=90, affine=not non_affine_group_norm,
                                     is_instance_norm=True), 
                                     nn.ReLU(),
                                     nn.Conv1d(90,160,5,2),
                                     norm_block(is_layer_norm=False, dim=160, affine=not non_affine_group_norm,
                           is_instance_norm=True), 
                                     nn.ReLU()
                                    )
        self.branch3 = nn.Sequential(nn.Conv1d(1,90, 36, 18),
                                     norm_block(is_layer_norm=False, dim=90, affine=not non_affine_group_norm,
                           is_instance_norm=True), 
                                     nn.ReLU(),
                                     nn.Conv1d(90,192, 5, 1,padding=2),
                                     norm_block(is_layer_norm=False, dim=160, affine=not non_affine_group_norm,
                           is_instance_norm=True), 
                                     nn.ReLU()
                                    )
       
        self.skip1 = nn.MaxPool1d(kernel_size=5, stride=8)
        self.skip2 = nn.MaxPool1d(kernel_size=3, stride=4, padding=1)

        def forward(self, x):
          # wave encoder
         enc = []
         ft_shape = []
         for conv in self.conv_front:
            enc.append(conv(x))
            ft_shape.append(conv(x).shape[-1])
            
            ft_max = np.min(np.array(ft_shape))
            enc = torch.cat((enc[0][:, :, :ft_max], enc[1][:, :, :ft_max], enc[2][:, :, :ft_max]), dim=1)
        
            skip1_out = self.skip1(enc)
            out1 = self.branch1(enc)
            skip2_out = self.skip2(out1)
            out2 = self.branch2(out1)
            out3= self.branch3(out2)
            t_max = np.min(np.array([skip1_out.shape[-1], skip2_out.shape[-1], out3.shape[-1]]))
            out = torch.cat((skip1_out[:, :, :t_max], skip2_out[:, :, :t_max], out3[:, :, :t_max]), dim=1)
            output = self.am4(out)
        
       
            return output

In [61]:
class TDNNBlock(nn.Module):
    
    def __init__(self, input_dim, bn_dim,
                 skip, context_size, dilation=1, 
                 bottleneck=False):
        '''
        TDNNBlock
        '''
        super(TDNNBlock, self).__init__()

        # bn conv
        self.bottleneck = bottleneck
        if bottleneck:
            self.bnconv1d = nn.Conv1d(input_dim, bn_dim, 1)
            self.nonlinear1 = nn.PReLU()
            self.norm1 = nn.GroupNorm(1, bn_dim, eps=1e-08)
            self.tdnnblock = TDNNLayer(bn_dim, input_dim, context_size, dilation)
        else:
            self.tdnnblock = TDNNLayer(input_dim, input_dim, context_size, dilation)
        
        # tdnn
        self.nonlinear2 = nn.PReLU()
        self.norm2 = nn.GroupNorm(1, input_dim, eps=1e-08)
        
        # skip connection
        self.skip = skip
        if self.skip:
            self.skip_out = nn.MaxPool1d(kernel_size=context_size, 
                                         stride=1, dilation=dilation)

    def forward(self, x):
        '''
        input: size (batch, seq_len, input_features)
        outpu: size (batch, new_seq_len, output_features)
        '''
        out = x
        if self.bottleneck:
            out = self.nonlinear1(self.bnconv1d(out))
            out = self.norm1(out)
        
        out = self.nonlinear2(self.tdnnblock(out))
        out = self.norm2(out)

        if self.skip:
            skip = self.skip_out(x)
            return out, skip
        else:
            return out

class TDNN(nn.Module):
    
    def __init__(self, filter_dim, input_dim, bn_dim,
                 skip, context_size=3, layer=9, stack=1, 
                 bottleneck=False):
        '''
        stacked TDNN Blocks
        '''
        super(TDNN, self).__init__()
        
#         # BottleNeck Layer
#         self.LN = nn.GroupNorm(1, filter_dim, eps=1e-8)
#         self.BN_conv = nn.Conv1d(filter_dim, input_dim, 1)
        
        # Residual Connection
        self.skip = skip
        
        # TDNN for feature extraction
        self.receptive_field = 0
        
        self.tdnn = nn.ModuleList([])
        for s in range(stack):
            for i in range(layer):
                self.tdnn.append(TDNNBlock(input_dim, bn_dim, self.skip, 
                                           context_size=3, dilation=2**i, 
                                           bottleneck=bottleneck))
                
            if i == 0 and s == 0:
                self.receptive_field += context_size
            else:
                self.receptive_field += (context_size - 1) * 2 ** i
                
        print("Receptive field: {:3d} frames.".format(self.receptive_field))
        
        
    def forward(self, x):
        '''
        input: size (batch, seq_len, input_features)
        outpu: size (batch, new_seq_len, output_features)
        '''
        
#         output = self.BN_conv(self.LN(x))
        
        for i in range(len(self.tdnn)):
            if self.skip:
                output, skips = self.tdnn[i](x)
                output = skips + output
            else:
                output = self.tdnn[i](output)

        return output

In [62]:
class TDNN_Block(nn.Module):
    def __init__(self, input_dim, output_dim=512, context_size=5, dilation=1, norm='bn', affine=True):
        super(TDNN_Block, self).__init__()
        if norm == 'bn':
            norm_layer = nn.BatchNorm1d(output_dim, affine=affine)
        elif norm == 'ln':
#             norm_layer = nn.GroupNorm(1, output_dim, affine=affine)
            norm_layer = Fp32GroupNorm(1, output_dim, affine=affine)
        elif norm == 'in':
            norm_layer = nn.GroupNorm(output_dim, output_dim, affine=False)
        else:
            raise ValueError('Norm should be {bn, ln, in}.')
        self.tdnn_layer = nn.Sequential(
            TDNNLayer(input_dim, output_dim, context_size, dilation),
            norm_layer,
            nn.ReLU()
        )
    def forward(self, x):
        return self.tdnn_layer(x)

In [63]:
class Downsampling(nn.Module):
  def __init__(self):
      super(Downsampling, self).__init__()
      self.DS1= nn.Sequential(nn.Conv1d(512, 5, 2),
                              nn.ReLU()
                              )
                               
                               
      self.DS1= nn.Sequential(nn.Conv1d(512, 3, 2),
                              nn.ReLU())
      self.DS1= nn.Sequential(nn.Conv1d(512, 3, 2),
                                  nn.ReLU())
      self.am2 = SEBlock(512)
      self.am3 = SEBlock(512)
      self.am4 = SEBlock(512*3)
        
      
  def forward(self, x):
      se=self.am2(x)
      x=self.am2(se)
      se=self.am2(se)
      x=self.DS1(se)
      out1 = self.am1(x)
      return out1
      

In [111]:
class FrameAgregation(nn.Module):
  def __init__(self, feature_dim=512, embed_dim=512, norm='bn', p_dropout=0.0):
        super(FrameAgregation, self).__init__()
        self.tdnn = nn.Sequential(
            TDNN_Block(feature_dim, 512, 5, 1, norm=norm),
            TDNN_Block(512, 512, 3, 2, norm=norm),
            TDNN_Block(512, 512, 3, 3, norm=norm),
            TDNN_Block(512, 512, 1, 1, norm=norm),
            TDNN_Block(512, 1500, 1, 1, norm=norm),
        )
        
        self.fc1 = nn.Linear(3000, 512)
        self.bn = nn.LayerNorm(512)
        self.dropout_fc1 = nn.Dropout(p=p_dropout)
        self.lrelu = nn.LeakyReLU(0.2)
        self.fc2 = nn.Linear(512, embed_dim)
    
def forward(self, x):
        # Note: x must be (batch_size, feat_dim, chunk_len)
        x = self.tdnn(x)
        
        stats = torch.cat((x.mean(dim=2), x.std(dim=2)), dim=1)
        
        x = self.dropout_fc1(self.lrelu(self.bn(self.fc1(stats))))
        x = self.fc2(x)
        
        return x


In [138]:
class architecture(nn.Module):
    def __init__(self, embed_dim=512):
        super(architecture, self).__init__()
        
        self.feature_encoder = FeatureExtractionModel()
        self.downsampler=Downsampling()
        self.aggregator = FrameAgregation(feature_dim=512*3, embed_dim=128, norm='ln')
    def forward(self, x):
        out = self.feature_encoder(x)
        out= self.downsampler(out)
        out = self.aggregator(out)
        
        return out

In [139]:
my_model=architecture()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(my_model.parameters(), lr=0.01, momentum=0.9)
epochs=20
batch_num =64



In [140]:
class IterMeter(object):
    """keeps track of total iterations"""
    def __init__(self):
        self.val = 0

    def step(self):
        self.val += 1

    def get(self):
        return self.val


def train(model, device, train_loader, criterion, optimizer, scheduler, epoch, iter_meter, experiment):
    model.train()
    data_len = len(train_loader.dataset)
    with experiment.train():
        for batch_idx, _data in enumerate(train_loader):
            spectrograms, labels, input_lengths, label_lengths = _data 
            spectrograms, labels = spectrograms.to(device), labels.to(device)

            optimizer.zero_grad()

            output = model(spectrograms)  # (batch, time, n_class)
            output = F.log_softmax(output, dim=2)
            output = output.transpose(0, 1) # (time, batch, n_class)

            loss = criterion(output, labels, input_lengths, label_lengths)
            loss.backward()

            experiment.log_metric('loss', loss.item(), step=iter_meter.get())
            experiment.log_metric('learning_rate', scheduler.get_lr(), step=iter_meter.get())

            optimizer.step()
            iter_meter.step()
            if batch_idx % 100 == 0 or batch_idx == data_len:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(spectrograms), data_len,
                    100. * batch_idx / len(train_loader), loss.item()))


def test(model, device, test_loader, criterion, epoch, iter_meter, experiment):
    print('\nevaluating…')
    model.eval()
    test_loss = 0
    test_cer, test_wer = [], []
    with experiment.test():
        with torch.no_grad():
            for I, _data in enumerate(test_loader):
                spectrograms, labels, input_lengths, label_lengths = _data 
                spectrograms, labels = spectrograms.to(device), labels.to(device)

                output = model(spectrograms)  # (batch, time, n_class)
                output = F.log_softmax(output, dim=2)
                output = output.transpose(0, 1) # (time, batch, n_class)

                loss = criterion(output, labels, input_lengths, label_lengths)
                test_loss += loss.item() / len(test_loader)
                                
    avg_cer = sum(test_cer)/len(test_cer)
    avg_wer = sum(test_wer)/len(test_wer)
    
    print('Test set: Average loss: {:.4f}, Average CER: {:4f} Average WER: {:.4f}\n'.format(test_loss, avg_cer, avg_wer))


In [141]:
def main(model,train_dataset,test_dataset,learning_rate= 0.01, batch_size=batch_num , epochs=epochs):
  use_cuda = torch.cuda.is_available()
      #torch.manual_seed(7) 
  model().to(device)
  print(model)
  print('Num Model Parameters', sum([param.nelement() for param in model.parameters()]))
  iter_meter = IterMeter()
  for epoch in range(1, epochs + 1):
        train(model, device, train_loader, criterion, optimizer, epoch, iter_meter)
        test(model, device, test_loader, criterion, epoch, iter_meter )


In [142]:
experement=main(my_model,train_loader,test_loader)

TypeError: ignored