In [None]:
# Created and run on Google Colab: https://colab.research.google.com/drive/1_TdO-FcP1v4V9ltkZ-eiJo48M4lwfkUO?usp=sharing
#!pip install torchaudio
import itertools
import os
import random

import numpy
import soundfile
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
from google.colab import drive
from scipy.spatial import distance
from torch.nn import Parameter
from torch.utils.data import Dataset





Collecting torchaudio
[?25l  Downloading https://files.pythonhosted.org/packages/aa/55/01ad9244bcd595e39cea5ce30726a7fe02fd963d07daeb136bfe7e23f0a5/torchaudio-0.8.1-cp37-cp37m-manylinux1_x86_64.whl (1.9MB)
[K     |▏                               | 10kB 18.0MB/s eta 0:00:01[K     |▍                               | 20kB 25.9MB/s eta 0:00:01[K     |▌                               | 30kB 26.6MB/s eta 0:00:01[K     |▊                               | 40kB 18.1MB/s eta 0:00:01[K     |▉                               | 51kB 14.0MB/s eta 0:00:01[K     |█                               | 61kB 11.2MB/s eta 0:00:01[K     |█▏                              | 71kB 12.5MB/s eta 0:00:01[K     |█▍                              | 81kB 13.6MB/s eta 0:00:01[K     |█▌                              | 92kB 11.9MB/s eta 0:00:01[K     |█▊                              | 102kB 12.9MB/s eta 0:00:01[K     |█▉                              | 112kB 12.9MB/s eta 0:00:01[K     |██                    

In [None]:
class SEBasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions and an SE gate.

    The main path is conv -> ReLU -> BN -> conv -> BN -> SE; its result is
    added to the (optionally down-sampled) input and passed through ReLU.

    Args:
        inplanes: number of input channels.
        planes: number of output channels.
        stride: stride of the first convolution.
        downsample: optional module applied to the input so that it can be
            added to the main path; ``None`` means the input is used as is.
        reduction: channel reduction factor handed to ``SELayer``.
    """

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, reduction=8):
        super().__init__()
        # Attribute names end up in the state-dict keys, so they are part of
        # the checkpoint format and must not be renamed.
        self.conv1 = nn.Conv2d(
            inplanes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.se = SELayer(planes, reduction)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        # First stage: the activation is applied before the normalisation.
        features = self.bn1(self.relu(self.conv1(x)))
        # Second stage, then channel re-weighting by the SE gate.
        features = self.se(self.bn2(self.conv2(features)))

        shortcut = x if self.downsample is None else self.downsample(x)

        features += shortcut
        return self.relu(features)


class SEBottleneck(nn.Module):
    """Bottleneck residual block (1x1 -> 3x3 -> 1x1) with an SE gate.

    The last convolution widens the feature map to ``planes * 4`` channels;
    the SE-gated result is added to the (optionally down-sampled) input and
    passed through ReLU.

    Args:
        inplanes: number of input channels.
        planes: width of the two inner convolutions.
        stride: stride of the 3x3 convolution.
        downsample: optional module applied to the input before the addition;
            ``None`` means the input is used as is.
        reduction: channel reduction factor handed to ``SELayer``.
    """

    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, reduction=8):
        super().__init__()
        widened = planes * 4
        # Attribute names end up in the state-dict keys; keep them stable.
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, widened, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(widened)
        self.relu = nn.ReLU(inplace=True)
        self.se = SELayer(widened, reduction)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        # 1x1 reduce, 3x3 spatial, 1x1 expand; ReLU after the first two BNs.
        hidden = self.relu(self.bn1(self.conv1(x)))
        hidden = self.relu(self.bn2(self.conv2(hidden)))
        hidden = self.se(self.bn3(self.conv3(hidden)))

        shortcut = self.downsample(x) if self.downsample is not None else x

        hidden += shortcut
        return self.relu(hidden)


class SELayer(nn.Module):
    """Squeeze-and-excitation gate: rescales every channel by a learned weight.

    The input is averaged over its two trailing axes, pushed through a
    two-layer bottleneck MLP ending in a sigmoid, and the resulting
    per-channel weights multiply the input.

    Args:
        channel: number of channels of the input.
        reduction: the hidden layer has ``channel // reduction`` units.
    """

    def __init__(self, channel, reduction=8):
        super().__init__()
        hidden_units = channel // reduction
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, hidden_units),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_units, channel),
            nn.Sigmoid(),
        )

    def forward(self, x):
        # The unpacking requires a 4-D input.
        batch, channels, _rows, _cols = x.size()
        squeezed = self.avg_pool(x).view(batch, channels)
        gate = self.fc(squeezed).view(batch, channels, 1, 1)
        return x * gate

In [None]:
class PreEmphasis(torch.nn.Module):
    """First-order pre-emphasis filter, ``y[t] = x[t] - coef * x[t - 1]``.

    The filter is applied as a 1-D convolution over the last axis. One sample
    of reflect padding is added on the left, so the output has the same length
    as the input.

    Args:
        coef: pre-emphasis coefficient.
    """

    def __init__(self, coef: float = 0.97):
        super().__init__()
        self.coef = coef
        # make kernel
        # In pytorch, the convolution operation uses cross-correlation. So, filter is flipped.
        self.register_buffer(
            'flipped_filter',
            torch.tensor([-self.coef, 1.], dtype=torch.float32).unsqueeze(0).unsqueeze(0)
        )

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """Filter a 2-D tensor along its last axis.

        Args:
            input: 2-D tensor; the first axis is treated as the batch axis of
                the convolution.

        Returns:
            Tensor with the same shape as ``input``.

        Raises:
            AssertionError: if ``input`` is not 2-D.
        """
        assert input.dim() == 2, 'The number of dimensions of input tensor must be 2!'
        # Add a singleton channel axis for conv1d, then reflect-pad one sample
        # on the left to match lengths of in/out.
        padded = F.pad(input.unsqueeze(1), (1, 0), 'reflect')
        return F.conv1d(padded, self.flipped_filter).squeeze(1)
        
class ResNetSE(nn.Module):
    """ResNet-style embedding network with SE blocks and attentive pooling.

    ``forward`` turns a waveform tensor into log-mel features, runs them
    through four residual stages, pools the last axis with learned attention
    weights ("SAP": weighted mean, "ASP": weighted mean and standard
    deviation) and projects the result to ``nOut`` dimensions.

    Args:
        block: residual block class; must expose an ``expansion`` attribute and
            accept ``(inplanes, planes, stride, downsample)``.
        layers: sequence of four ints, the number of blocks in each stage.
        num_filters: sequence of four ints, the ``planes`` value of each stage.
        nOut: size of the output embedding.
        encoder_type: ``'SAP'`` or ``'ASP'``; anything else raises ``ValueError``.
        n_mels: number of mel bins of the front-end.
        log_input: if true, the mel spectrogram is log-compressed.
        **kwargs: accepted and ignored.
    """

    def __init__(self, block, layers, num_filters, nOut, encoder_type='SAP', n_mels=40, log_input=True, **kwargs):
        super(ResNetSE, self).__init__()

        print('Embedding size is %d, encoder %s.'%(nOut, encoder_type))
        
        # Running channel count; _make_layer reads and updates it.
        self.inplanes   = num_filters[0]
        self.encoder_type = encoder_type
        self.n_mels     = n_mels
        self.log_input  = log_input

        # Stem: single-channel input, spatial size preserved.
        self.conv1 = nn.Conv2d(1, num_filters[0] , kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU(inplace=True)
        self.bn1 = nn.BatchNorm2d(num_filters[0])
        

        # Four residual stages; stages 2-4 start with a stride-2 block.
        self.layer1 = self._make_layer(block, num_filters[0], layers[0])
        self.layer2 = self._make_layer(block, num_filters[1], layers[1], stride=(2, 2))
        self.layer3 = self._make_layer(block, num_filters[2], layers[2], stride=(2, 2))
        self.layer4 = self._make_layer(block, num_filters[3], layers[3], stride=(2, 2))

        # Feature front-end: pre-emphasis followed by a mel spectrogram
        # configured for 16 kHz audio (512-point FFT, 400-sample Hamming
        # window, 160-sample hop).
        self.instancenorm   = nn.InstanceNorm1d(n_mels)
        self.torchfb        = torch.nn.Sequential(
                PreEmphasis(),
                torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_fft=512, win_length=400, hop_length=160, window_fn=torch.hamming_window, n_mels=n_mels)
                )

        # Size of the mel axis after the three stride-2 stages.
        # NOTE(review): this equals the real feature-map height only when
        # n_mels is a multiple of 8 - confirm before using other values.
        outmap_size = int(self.n_mels/8)

        # Attention head: one weight per (row, position) of the flattened
        # feature map, normalised by softmax along the last axis (dim=2).
        self.attention = nn.Sequential(
            nn.Conv1d(num_filters[3] * outmap_size, 128, kernel_size=1),
            nn.ReLU(),
            nn.BatchNorm1d(128),
            nn.Conv1d(128, num_filters[3] * outmap_size, kernel_size=1),
            nn.Softmax(dim=2),
            )

        # ASP concatenates mean and standard deviation, hence twice the width.
        if self.encoder_type == "SAP":
            out_dim = num_filters[3] * outmap_size
        elif self.encoder_type == "ASP":
            out_dim = num_filters[3] * outmap_size * 2
        else:
            raise ValueError('Undefined encoder')

        self.fc = nn.Linear(out_dim, nOut)

        # Re-initialise every Conv2d (Kaiming normal, fan-out) and BatchNorm2d
        # (weight 1, bias 0) in the module tree, including those inside blocks.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one residual stage of ``blocks`` consecutive ``block`` modules.

        Only the first block receives ``stride`` and the projection shortcut;
        the remaining blocks use the block's defaults. ``self.inplanes`` is
        updated to the stage's output channel count.

        Args:
            block: residual block class.
            planes: ``planes`` argument passed to every block.
            blocks: number of blocks in the stage.
            stride: stride of the first block (int or tuple).

        Returns:
            ``nn.Sequential`` holding the blocks.
        """
        downsample = None
        # A 1x1 conv + BN shortcut is needed whenever the first block changes
        # the resolution or the channel count. A tuple stride such as (2, 2)
        # also compares unequal to 1.
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes * block.expansion,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def new_parameter(self, *size):
        """Create an ``nn.Parameter`` of the given size with Xavier-normal init.

        Not called anywhere in this class.
        """
        out = nn.Parameter(torch.FloatTensor(*size))
        nn.init.xavier_normal_(out)
        return out

    def forward(self, x):
        """Compute embeddings for a batch of waveforms.

        Args:
            x: 2-D tensor (``PreEmphasis`` asserts two dimensions); the first
                axis is the batch axis.

        Returns:
            Tensor of shape ``(x.size(0), nOut)``.
        """

        # Feature extraction runs without gradient tracking and with CUDA
        # autocast disabled.
        with torch.no_grad():
            with torch.cuda.amp.autocast(enabled=False):
                # The small offset is added before the optional log -
                # presumably to keep the log finite for all-zero bins.
                x = self.torchfb(x)+1e-6
                if self.log_input: x = x.log()
                # Normalise, then add the singleton channel axis conv1 expects.
                x = self.instancenorm(x).unsqueeze(1)

        # Stem applies ReLU before batch norm.
        x = self.conv1(x)
        x = self.relu(x)
        x = self.bn1(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # Flatten everything between the batch axis and the last axis.
        x = x.reshape(x.size()[0],-1,x.size()[-1])

        # Attention weights; softmax makes them sum to 1 along the last axis.
        w = self.attention(x)

        if self.encoder_type == "SAP":
            # Attention-weighted mean over the last axis.
            x = torch.sum(x * w, dim=2)
        elif self.encoder_type == "ASP":
            # Attention-weighted mean and standard deviation; the variance is
            # clamped at 1e-5 before the square root.
            mu = torch.sum(x * w, dim=2)
            sg = torch.sqrt( ( torch.sum((x**2) * w, dim=2) - mu**2 ).clamp(min=1e-5) )
            x = torch.cat((mu,sg),1)

        x = x.view(x.size()[0], -1)
        x = self.fc(x)

        return x


def MainModel(nOut=256, **kwargs):
    """Build the ResNetSE configuration used in this notebook.

    The network uses ``SEBasicBlock`` with 3/4/6/3 blocks and 32/64/128/256
    filters in its four stages.

    Args:
        nOut: size of the output embedding.
        **kwargs: forwarded unchanged to ``ResNetSE``.

    Returns:
        The constructed ``ResNetSE`` instance.
    """
    blocks_per_stage = [3, 4, 6, 3]
    filters_per_stage = [32, 64, 128, 256]
    return ResNetSE(SEBasicBlock, blocks_per_stage, filters_per_stage, nOut, **kwargs)


In [None]:
# Build the embedding network: 512-dim output, 64 mel bins, ASP pooling.
model = MainModel(512, n_mels = 64, encoder_type='ASP')

# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
loaded_state = torch.load("baseline_v2_ap.model", map_location=torch.device('cpu'))
len_S = len('__S__.')

d = loaded_state

# Drop the first len_S characters of every checkpoint key so that the names
# line up with the attribute names of `model`.
d1 = {key[len_S:]: value for key, value in d.items()}

# These four entries have no counterpart in `model` and are removed before
# loading; a missing entry raises KeyError.
for extra_key in ("softmax.fc.weight", "softmax.fc.bias", "angleproto.w", "angleproto.b"):
    d1.pop(extra_key)

model.load_state_dict(d1)
model.eval()

Embedding size is 512, encoder ASP.


ResNetSE(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu): ReLU(inplace=True)
  (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): SEBasicBlock(
      (conv1): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (se): SELayer(
        (avg_pool): AdaptiveAvgPool2d(output_size=1)
        (fc): Sequential(
          (0): Linear(in_features=32, out_features=4, bias=True)
          (1): ReLU(inplace=True)
          (2): Linear(in_features=4, out_features=32, bias=True)
          (3): Sigmoid()
        )
      )
    )
    (1): SEBasicBlock(
      (c

In [None]:
def loadWAV(filename, max_frames, evalmode=True, num_eval=10):
    """Read an audio file and cut it into fixed-length segments.

    Each segment has ``max_frames * 160 + 240`` samples (presumably matching a
    160-sample hop and a 400-sample analysis window - confirm against the
    feature front-end). Audio that is not longer than one segment is extended
    by wrapping around to the start.

    Args:
        filename: path passed to ``soundfile.read``.
        max_frames: number of frames per segment. With ``evalmode=True`` a
            value of 0 returns the whole (possibly wrapped) signal as a single
            segment.
        evalmode: if true, take ``num_eval`` segments whose start offsets are
            evenly spaced over the signal; otherwise take one segment at a
            random offset (drawn with ``random.random``).
        num_eval: number of segments in evaluation mode.

    Returns:
        ``numpy.ndarray`` of dtype float64 with one segment per row.

    Raises:
        ValueError: if the file contains no samples.
    """
    # Maximum audio length
    max_audio = max_frames * 160 + 240

    # Read wav file; the sample rate is not used.
    audio, _sample_rate = soundfile.read(filename)
    audiosize = audio.shape[0]

    if audiosize == 0:
        # numpy.pad(..., 'wrap') cannot extend an empty signal; fail clearly.
        raise ValueError("Audio file contains no samples: {}".format(filename))

    if audiosize <= max_audio:
        # Pad one sample past max_audio so that at least two distinct start
        # offsets (0 and 1) exist.
        shortage = max_audio - audiosize + 1
        audio = numpy.pad(audio, (0, shortage), 'wrap')
        audiosize = audio.shape[0]

    if evalmode and max_frames == 0:
        feats = [audio]
    else:
        if evalmode:
            startframe = numpy.linspace(0, audiosize - max_audio, num=num_eval)
        else:
            startframe = numpy.array([numpy.int64(random.random() * (audiosize - max_audio))])
        feats = [audio[int(asf):int(asf) + max_audio] for asf in startframe]

    # numpy.float was only an alias of the builtin float (float64) and has
    # been removed from NumPy; name the dtype explicitly.
    return numpy.stack(feats, axis=0).astype(numpy.float64)

In [None]:
def _segment_embeddings(wav_path, max_frames=400):
    """Return the model's embeddings for the evaluation segments of one WAV file."""
    segments = loadWAV(wav_path, max_frames, evalmode=True)
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        return model(torch.FloatTensor(segments))


def _utterance_distance(emb_a, emb_b):
    """Cosine distance between the segment-averaged embeddings of two files."""
    return distance.cosine(
        numpy.mean(emb_a.detach().numpy(), axis=0),
        numpy.mean(emb_b.detach().numpy(), axis=0),
    )


#Trying to compare two speech fragments of the same person
tens1 = _segment_embeddings("female1.wav")
tens2 = _segment_embeddings("female2.wav")
res1 = _utterance_distance(tens1, tens2)
txt = "Distance for same person's speech case is {}".format(str(res1))
print(txt)

#Trying to compare two speech fragments of different people
tens3 = _segment_embeddings("male1.wav")
res2 = _utterance_distance(tens1, tens3)
txt = "Distance for different people's speech case is {}".format(str(res2))
print(txt)

Distance for same person's speech case is 0.03420382738113403
Distance for different people's speech case is 0.32118064165115356


In [None]:
class test_dataset_loader(Dataset):
    """Dataset that yields the evaluation segments of each listed audio file.

    Args:
        test_list: sequence of file names, relative to ``test_path``.
        test_path: directory that contains the files.
        eval_frames: stored as ``max_frames``.
        num_eval: stored as ``num_eval``.
        **kwargs: accepted and ignored.

    Note:
        ``__getitem__`` always loads with 400 frames and ``loadWAV``'s default
        number of segments; the stored ``max_frames`` and ``num_eval`` values
        are not read there.
    """

    def __init__(self, test_list, test_path, eval_frames, num_eval, **kwargs):
        self.max_frames = eval_frames
        self.num_eval = num_eval
        self.test_path = test_path
        self.test_list = test_list

    def __getitem__(self, index):
        """Return ``(segments as FloatTensor, file name)`` for entry ``index``."""
        relative_name = self.test_list[index]
        wav_path = os.path.join(self.test_path, relative_name)
        segments = loadWAV(wav_path, 400, evalmode=True)
        return torch.FloatTensor(segments), relative_name

    def __len__(self):
        """Number of files in the list."""
        return len(self.test_list)


In [None]:
def evaluateFromList(model, test_list, test_path, nDataLoaderThread, print_interval=100, num_eval=10, **kwargs):
    """Score every trial of a verification list with cosine distance.

    Every non-blank line of ``test_list`` is split on whitespace and read as
    ``<label> <file_a> <file_b>``. Each distinct file is embedded once: its
    segments are passed through ``model`` and the segment embeddings are
    averaged. The score of a trial is the cosine distance between the two
    averaged embeddings, so lower means more similar.

    Args:
        model: callable mapping a float tensor of segments to one embedding
            per segment.
        test_list: path of the trial list file.
        test_path: directory the file names are relative to.
        nDataLoaderThread: accepted for interface compatibility and not used;
            loading runs with ``num_workers=0``.
        print_interval: accepted and not used.
        num_eval: accepted and not used; the dataset is constructed with the
            fixed arguments ``2, num_eval=2``.
        **kwargs: forwarded to ``test_dataset_loader``.

    Returns:
        Tuple ``(all_scores, all_labels)`` of two lists in trial order: the
        cosine distances and the integer labels.
    """
    ## Read all trials, skipping blank lines
    with open(test_list) as f:
        trials = [line.split() for line in f if line.strip()]

    ## Get a sorted list of unique file names (last two fields of each trial)
    setfiles = sorted(set(itertools.chain.from_iterable(trial[-2:] for trial in trials)))

    ## Define test data loader
    test_dataset = test_dataset_loader(setfiles, test_path, 2, num_eval=2, **kwargs)
    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=1,
        shuffle=False,
        num_workers=0,
        drop_last=False,
    )

    ## Embed every file exactly once; trials only look the result up
    embeddings = {}
    with torch.no_grad():
        for segments, names in test_loader:
            # batch_size=1: index 0 removes the batch axis added by the loader.
            segment_embeddings = model(segments[0].float())
            embeddings[names[0]] = numpy.mean(
                segment_embeddings.detach().cpu().numpy(), axis=0
            )

    ## Compute all scores
    all_scores = []
    all_labels = []
    for trial in trials:
        all_scores.append(distance.cosine(embeddings[trial[1]], embeddings[trial[2]]))
        all_labels.append(int(trial[0]))

    return (all_scores, all_labels)



In [None]:
# Score every trial of the list, then compare the distance distributions of
# the trials labelled 0 and the trials labelled > 0.
scores, labels = evaluateFromList(model, "test_list1.txt", "vox1_test_wav/wav", 5)
# The file imports NumPy as `numpy`; there is no `np` alias.
scores = numpy.array(scores)
labels = numpy.array(labels)
boolLabels = labels > 0
# (label, score) pairs; not used by the plot below.
plotArr = numpy.array([labels, scores]).transpose()
plotArr0 = scores[~boolLabels]
plotArr1 = scores[boolLabels]

# NOTE(review): `pyplot` is not imported in any cell visible here; presumably
# `from matplotlib import pyplot` belongs in the import cell - confirm.
bins = numpy.linspace(0, 1.5, 2000)
pyplot.hist(plotArr0, density=True, bins=bins, label='False')
pyplot.hist(plotArr1, density=True, bins=bins, label='True')
pyplot.ylabel('Amount of samples')
pyplot.xlabel('Distance');
pyplot.legend(loc='upper right')
pyplot.show()
