# Preliminaries

In [1]:
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [34]:
config = {
    'model_path':'/gdrive/MyDrive/arman/verification-models/neural_net2.pth',
    'voice_sample1':'/content/sample1.wav',
    'voice_sample2':'/content/sample2.wav',
}

In [2]:
!pip install wave
!pip install ffmpeg-python
!pip install speechbrain
!pip install torchaudio
!pip install librosa

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting wave
  Downloading Wave-0.0.2.zip (38 kB)
Building wheels for collected packages: wave
  Building wheel for wave (setup.py) ... [?25l[?25hdone
  Created wheel for wave: filename=Wave-0.0.2-py3-none-any.whl size=1240 sha256=16d62300c2088c919edb3bcbcdaef14adde657584f98c198a0891510d10041ee
  Stored in directory: /root/.cache/pip/wheels/25/e8/fe/458c7dac00c6abedad6380b9d0ef1a5cbc7c21807df1d30915
Successfully built wave
Installing collected packages: wave
Successfully installed wave-0.0.2
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ffmpeg-python
  Downloading ffmpeg_python-0.2.0-py3-none-any.whl (25 kB)
Installing collected packages: ffmpeg-python
Successfully installed ffmpeg-python-0.2.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting speechbrain
  Down

In [85]:
from speechbrain.lobes.augment import EnvCorrupt
import librosa
import soundfile as sf
import torch
from torch import nn
import speechbrain as sb
import torchaudio
from speechbrain.pretrained import SpectralMaskEnhancement
from speechbrain.pretrained import SpeakerRecognition
from IPython.display import HTML, Audio
from google.colab.output import eval_js
from base64 import b64decode
import json
import random
from scipy.io.wavfile import read as wav_read
from difflib import get_close_matches
import io
import scipy
import ffmpeg
import wave
import os
import numpy as np
import IPython.display as ipd
from collections import Counter

In [32]:
AUDIO_HTML = """
<script>
var my_div = document.createElement("DIV");
var my_p = document.createElement("P");
var my_btn = document.createElement("BUTTON");
var t = document.createTextNode("Press to start recording");

my_btn.appendChild(t);
//my_p.appendChild(my_btn);
my_div.appendChild(my_btn);
document.body.appendChild(my_div);

var base64data = 0;
var reader;
var recorder, gumStream;
var recordButton = my_btn;

var handleSuccess = function(stream) {
  gumStream = stream;
  var options = {
    //bitsPerSecond: 8000, //chrome seems to ignore, always 48k
    mimeType : 'audio/webm;codecs=opus'
    //mimeType : 'audio/webm;codecs=pcm'
  };            
  //recorder = new MediaRecorder(stream, options);
  recorder = new MediaRecorder(stream);
  recorder.ondataavailable = function(e) {            
    var url = URL.createObjectURL(e.data);
    var preview = document.createElement('audio');
    preview.controls = true;
    preview.src = url;
    document.body.appendChild(preview);

    reader = new FileReader();
    reader.readAsDataURL(e.data); 
    reader.onloadend = function() {
      base64data = reader.result;
      //console.log("Inside FileReader:" + base64data);
    }
  };
  recorder.start();
  };

recordButton.innerText = "Recording... press to stop";

navigator.mediaDevices.getUserMedia({audio: true}).then(handleSuccess);


function toggleRecording() {
  if (recorder && recorder.state == "recording") {
      recorder.stop();
      gumStream.getAudioTracks()[0].stop();
      recordButton.innerText = "Saving the recording... pls wait!"
  }
}

// https://stackoverflow.com/a/951057
function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

var data = new Promise(resolve=>{
//recordButton.addEventListener("click", toggleRecording);
recordButton.onclick = ()=>{
toggleRecording()

sleep(2000).then(() => {
  // wait 2000ms for the data to be available...
  // ideally this should use something like await...
  //console.log("Inside data:" + base64data)
  resolve(base64data.toString())

});

}
});
      
</script>
"""

def get_audio():
  display(HTML(AUDIO_HTML))
  data = eval_js("data")
  binary = b64decode(data.split(',')[1])
  
  process = (ffmpeg
    .input('pipe:0')
    .output('pipe:1', format='wav')
    .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True, quiet=True, overwrite_output=True)
  )
  output, err = process.communicate(input=binary)
  
  riff_chunk_size = len(output) - 8
  # Break up the chunk size into four bytes, held in b.
  q = riff_chunk_size
  b = []
  for i in range(4):
      q, r = divmod(q, 256)
      b.append(r)

  # Replace bytes 4:8 in proc.stdout with the actual size of the RIFF chunk.
  riff = output[:4] + bytes(b) + output[8:]

  sr, audio = wav_read(io.BytesIO(riff))

  return audio, sr

# Loading the Models

In [4]:
class SpeakerVerification(SpeakerRecognition):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.similarity = torch.nn.CosineSimilarity(dim=-1, eps=1e-6)

    @classmethod
    def from_hparams(cls, *args, **kwargs):
        verification = super(cls,cls).from_hparams(*args, **kwargs)
        source = kwargs['source']
        if os.path.exists(os.path.join(source, 'imposter_embeddings.pt')):
            verification.imp_emb = torch.load(os.path.join(source, 'imposter_embeddings.pt'))
        
        return verification
    
    def compute_snorm(self, emb1, emb2):
        emb1 = emb1.squeeze(0)
        emb2 = emb2.squeeze(0)
        score_e1 = self.similarity(emb1, self.imp_emb)
        score_e2 = self.similarity(emb1, self.imp_emb)
        score_e1_e2 = self.similarity(emb1, emb2)
        score_e1_normed = (score_e1_e2 - score_e1.mean()) / score_e1.std()
        score_e2_normed = (score_e1_e2 - score_e2.mean()) / score_e2.std()
        return score_e1_normed + score_e2_normed
          
    def verify_files(self, path_x, path_y, threshold=0.25, mean_norm=True, snorm=True):
        """Speaker verification with cosine distance
        Returns the score and the decision (0 different speakers,
        1 same speakers).
        Returns
        -------
        score
            The score associated to the binary verification output
            (cosine distance).
        prediction
            The prediction is 1 if the two signals in input are from the same
            speaker and 0 otherwise.
        """
        batch_x, _ = torchaudio.load(path_x)
        batch_y, _ = torchaudio.load(path_y)
        # Verify:
        emb1 = self.encode_batch(batch_x, normalize=mean_norm)
        emb2 = self.encode_batch(batch_y, normalize=mean_norm)
        # SNorm
        if snorm and hasattr(self, 'imp_emb'):
            score = self.compute_snorm(emb1, emb2)
        else:
            score = self.similarity(emb1, emb2)
        decision = score > threshold
        # Squeeze:
        return score[0], decision[0]
    def get_output_embeddings(self, path_x, path_y, threshold=0.25, mean_norm=True, snorm=True):
        batch_x, _ = torchaudio.load(path_x)
        batch_y, _ = torchaudio.load(path_y)
        # Verify:
        emb1 = self.encode_batch(batch_x, normalize=mean_norm)
        emb2 = self.encode_batch(batch_y, normalize=mean_norm)
        return emb1, emb2
    def get_speaker_vector(self, path_x, mean_norm=True, snorm=True):
        batch_x, _ = torchaudio.load(path_x)
        embedding = self.encode_batch(batch_x, normalize=mean_norm)[0][0].numpy()
        return embedding
    def get_input_vector(self, path_x, mean_norm=True, snorm=True):
        batch_x, _ = torchaudio.load(path_x)
        return batch_x.numpy()[0]

In [5]:
!rm -rf /content/temp_verification_model/
!mkdir /content/temp_verification_model/
!cp -r /gdrive/MyDrive/shared_space/ecapa-tdnn/* /content/temp_verification_model/

In [6]:
# Note: This needs a GPU!
verification_model = SpeakerVerification.from_hparams(source="/content/temp_verification_model/", hparams_file='hparams_inference.yaml', savedir="/tmp")

In [7]:
class my_neural_net(torch.nn.Module):
    def __init__(self):
        super(my_neural_net, self).__init__() 
        self.flatten = torch.nn.Flatten()
        self.first_layer = torch.nn.Sequential( 
            nn.Linear(2*192, 192),
            nn.ReLU(),
            nn.Linear(192, 1),
            nn.Sigmoid()
        )
    def forward(self, x):
        x = self.flatten(x)
        output = self.first_layer(x)
        return output

In [14]:
model = my_neural_net()
model.load_state_dict(torch.load(config['model_path']))
model.eval() # use this line if you have Dropout and BatchNormalization layers in your model

my_neural_net(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (first_layer): Sequential(
    (0): Linear(in_features=384, out_features=192, bias=True)
    (1): ReLU()
    (2): Linear(in_features=192, out_features=1, bias=True)
    (3): Sigmoid()
  )
)

In [65]:
def verify_pair(path1, path2):
  vec1 = verification_model.get_speaker_vector(path1)
  vec2 = verification_model.get_speaker_vector(path2)
  features = np.array([vec1, vec2]).reshape(1, 2, 192)
  output = model(torch.from_numpy(features).to(torch.float32))
  output = output.detach().squeeze().numpy()
  num_speakers = 1 if abs(output-1) < abs(output) else 2
  return f'the voices belong to {num_speakers} speaker(s) - score: {output}'

In [108]:
def verify_vectors(vec1, vec2):
  features = np.array([vec1, vec2]).reshape(1, 2, 192)
  output = model(torch.from_numpy(features).to(torch.float32))
  output = output.detach().squeeze().numpy()
  output = 1 if abs(output-1) < abs(output) else 0
  return output

#Recording an audio file using the current device's microphone

In [62]:
audio, sr = get_audio()
scipy.io.wavfile.write(config['voice_sample1'],sr, audio)

In [63]:
audio, sr = get_audio()
scipy.io.wavfile.write(config['voice_sample2'],sr, audio)

In [66]:
verify_pair(config['voice_sample1'], config['voice_sample2'])

'the voices belong to 1 speaker(s) - score: 1.0'

# Testing on our own data

In [69]:
list_of_file_names = [str(i)+'.wav' for i in range(1,26)]
list_of_file_names.extend(['26-1.wav','26-2.wav','26-3.wav','26-4.wav','26-5.wav'])
list_of_file_names.extend([str(i)+'.wav' for i in range(27, 41)])

In [70]:
list_of_speakers = ['amirinezhad','chavoshi','dehghanmonfared','dehghanpoor','hosseini','jafarpisheh',
                    'maghsoodloo','malekzadeh','ramezani','razavi','shayanfar']
list_of_speakers.extend(['woman'+str(i) for i in range(1, 10)])

In [71]:
our_path = '/gdrive/MyDrive/shared_space/car-commands/'
vectors = []
labels = []
for speaker in list_of_speakers:
  for file_name in list_of_file_names:
    file_path = f'{our_path}/{speaker}/{file_name}'
    curr_out = verification_model.get_speaker_vector(file_path)
    vectors.append(curr_out)
    labels.append(speaker)

In [74]:
len(vectors)

880

In [94]:
labels[550]

'woman2'

In [107]:
person_counter = Counter()
idx_counter = Counter()
num_iterations = 0
unique_men = set()
unique_women = set()
pos_counter = 0
neg_counter = 0

pair_vectors = []
pair_labels = []

while num_iterations <= 100000:
  num_iterations += 1
  print(f'\r iter:{num_iterations}, {len(unique_men)} u-men, {len(unique_women)} u-women, {pos_counter} pos, {neg_counter} neg', end=' ')

  idx1, idx2 = random.sample([i for i in range(880)], 2)
  person1 = labels[idx1]
  gender1 = 'female' if person1.startswith('woman') else 'male'

  if person_counter[person1] >= 20:
    continue
  if gender1 == 'male' and len(unique_women)>0 and len(unique_men)/len(unique_women) > 1.5:
    continue
  if gender1 == 'female' and len(unique_men)>0 and len(unique_women)/len(unique_men) > 1.5:
    continue

  person2 = labels[idx2]
  gender2 = 'female' if person1.startswith('woman') else 'male'

  if person_counter[person2] >= 20:
    continue
  if gender1 == 'male' and len(unique_women)>0 and len(unique_men)/len(unique_women) > 1.5:
    continue
  if gender1 == 'female' and len(unique_men)>0 and len(unique_women)/len(unique_men) > 1.5:
    continue

  if person1 == person2 and neg_counter > 0 and pos_counter / neg_counter > 1.5:
    continue
  if person1 != person2 and pos_counter > 0 and neg_counter / pos_counter > 1.5:
    continue

  # if everything is ok

  if gender1 == 'male':
    unique_men.add(person1)
  else:
    unique_women.add(person1)
  if gender2 == 'male':
    unique_men.add(person2)
  else:
    unique_women.add(person2)

  if person1 == person2:
    pos_counter += 1
  else:
    neg_counter += 1

  pair_vectors.append([vectors[idx1], vectors[idx2]])
  pair_labels.append(0 if person1 != person2 else 1)

 iter:100001, 20 u-men, 20 u-women, 4918 pos, 7378 neg 

In [112]:
pred_labels = [verify_vectors(pair[0], pair[1]) for pair in pair_vectors]

In [116]:
accuracy = sum([1 if true == pred else 0 for true, pred in zip(pair_labels, pred_labels)])/len(pair_labels)

In [119]:
print(f'Accuracy on our own dataset : {accuracy*100} %')

Accuracy on our own dataset : 82.31945348080677 %
