In [1]:
ls

[0m[01;34msample_data[0m/


In [2]:
import os
import sys
import logging
import shutil
import numpy as np
# Root folder (on the mounted Google Drive) that holds one sub-folder per enrolled user.
# Each user's sub-folder receives a copy of their audio clip and the saved embeddings.npy.
DATA_DIR='/content/drive/MyDrive/data_files/'

In [3]:
!pip install python_speech_features

Collecting python_speech_features
  Downloading python_speech_features-0.6.tar.gz (5.6 kB)
Building wheels for collected packages: python-speech-features
  Building wheel for python-speech-features (setup.py) ... [?25l[?25hdone
  Created wheel for python-speech-features: filename=python_speech_features-0.6-py3-none-any.whl size=5888 sha256=f8ab89d86749109bf9b4ce0d1f50bdc7da26eeebeb18e80bc8191ff73951de5e
  Stored in directory: /root/.cache/pip/wheels/b0/0e/94/28cd6afa3cd5998a63eef99fe31777acd7d758f59cf24839eb
Successfully built python-speech-features
Installing collected packages: python-speech-features
Successfully installed python-speech-features-0.6


In [4]:
import librosa
import numpy as np
import python_speech_features as psf
from abc import abstractmethod
import torch
from torch import nn
import torch.nn.functional as F

In [5]:
def get_fbanks(audio_file):
    """Load an audio file and return its per-frame normalised log-free filter-bank features.

    The clip is loaded at 16 kHz. Clips shorter than one second are rejected;
    otherwise the first and last 0.25 s are dropped before feature extraction.

    Args:
        audio_file: path of the audio file to load.

    Returns:
        ``None`` when the clip is shorter than one second, otherwise an array
        of shape ``(num_frames, 64, 1)`` holding 64 filter-bank values per frame.
    """
    sample_rate = 16000
    waveform, loaded_rate = librosa.load(audio_file, sr=sample_rate)
    assert loaded_rate == 16000

    # Clips shorter than one second are not used at all.
    if waveform.shape[0] < 1 * loaded_rate:
        return None

    # Cut a quarter of a second from both ends of the clip.
    edge = int(0.25 * loaded_rate)
    waveform = waveform[edge:-edge]

    # 25 ms analysis window advanced in 10 ms steps, i.e. consecutive frames overlap by 15 ms.
    fbank_feats, _ = psf.fbank(waveform, samplerate=loaded_rate, nfilt=64, winlen=0.025, winstep=0.01)

    # Zero-mean / unit-variance normalisation of every frame on its own; the
    # small floor on the standard deviation avoids dividing by zero.
    normalized = np.array(
        [(frame - np.mean(frame)) / max(np.std(frame), 1e-12) for frame in fbank_feats]
    )

    return normalized.reshape((normalized.shape[0], 64, 1))


In [6]:
def extract_fbanks(path):
    """Split a clip's filter-bank features into non-overlapping 64-frame samples.

    Args:
        path: path of the audio file, forwarded to ``get_fbanks``.

    Returns:
        Array of shape ``(num_samples, 1, 64, 64)``; sample ``k`` holds frames
        ``64*k .. 64*k + 63``. Trailing frames that do not fill a whole sample
        are discarded.

    Raises:
        ValueError: if ``get_fbanks`` rejects the clip (returns ``None``) or the
            clip yields fewer than 64 frames, so no sample can be formed.
    """
    frames_per_sample = 64

    fbanks = get_fbanks(path)
    if fbanks is None:
        # get_fbanks returns None for clips it refuses to process (too short).
        raise ValueError('audio file is too short to extract features: {}'.format(path))

    assert fbanks.shape[1] == 64
    assert fbanks.shape[2] == 1

    num_samples = fbanks.shape[0] // frames_per_sample
    if num_samples == 0:
        raise ValueError(
            'audio file yields only {} frames, at least {} are required: {}'.format(
                fbanks.shape[0], frames_per_sample, path))

    # Drop the incomplete tail, then view the frames as (sample, channel, frame, filter).
    # The trailing singleton axis of `fbanks` becomes the channel axis.
    usable = fbanks[:num_samples * frames_per_sample]
    samples = usable.reshape((num_samples, 1, frames_per_sample, 64))

    print('num samples extracted: {}'.format(num_samples))
    return samples

In [7]:
# Calculate and return the delta of a given feature-vector matrix
def calculate_delta(array):
    """Compute delta (frame-to-frame difference) features for a feature matrix.

    For every row ``i`` the delta is::

        (a[i+1] - a[i-1] + 2 * (a[i+2] - a[i-2])) / 10

    where row indices falling outside the matrix are clamped to the first or
    last row.

    Args:
        array: 2-D NumPy array of shape ``(rows, cols)``, one feature vector per row.

    Returns:
        Float array of shape ``(rows, cols)`` with one delta vector per input row.
    """
    rows, cols = array.shape
    # Size the output from the input instead of assuming exactly 20 features.
    deltas = np.zeros((rows, cols))

    frame_idx = np.arange(rows)
    last = rows - 1

    # Differences between the neighbours one and two frames away, with the
    # neighbour indices clamped at the matrix edges.
    near = array[np.minimum(frame_idx + 1, last)] - array[np.maximum(frame_idx - 1, 0)]
    far = array[np.minimum(frame_idx + 2, last)] - array[np.maximum(frame_idx - 2, 0)]

    deltas[:] = (near + (2 * far)) / 10
    return deltas

# Convert audio to MFCC features
def extract_features(audio,rate):
    """Build a feature matrix of scaled MFCCs with their deltas appended.

    Args:
        audio: audio signal passed to ``python_speech_features.mfcc``.
        rate: sample rate of ``audio`` in Hz.

    Returns:
        2-D array in which each row is the scaled MFCC vector of a frame
        followed by that frame's delta vector (columns stacked side by side).
    """
    # Imported locally so the function does not depend on a later notebook
    # cell having been executed first.
    from sklearn import preprocessing

    # 25 ms window, 10 ms step, 20 cepstral coefficients.
    mfcc_feat = psf.mfcc(audio, rate, 0.025, 0.01, 20, appendEnergy=True, nfft=1103)
    mfcc_feat = preprocessing.scale(mfcc_feat)
    delta = calculate_delta(mfcc_feat)
    print(delta)  # diagnostic output kept from the original implementation

    # Combine the MFCC features and their deltas column-wise.
    combined = np.hstack((mfcc_feat, delta))
    return combined

In [8]:
class FBankResBlock(nn.Module):
    """Residual block: conv -> batch-norm -> ReLU -> conv -> batch-norm, plus a skip connection.

    The block input is added to the output of the convolutional path, so the
    path must preserve the input shape. That holds when ``in_channels ==
    out_channels``, ``stride == 1`` and ``kernel_size`` is odd.

    Args:
        in_channels: number of channels of the block input.
        out_channels: number of channels produced by both convolutions.
        kernel_size: side length of the square convolution kernels.
        stride: stride applied by both convolutions.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1):
        super().__init__()
        # "Same" padding for odd kernel sizes at stride 1.
        padding = (kernel_size - 1) // 2
        self.network = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, padding=padding, stride=stride),
            # Normalises the output of the first convolution, which has out_channels channels.
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=kernel_size, padding=padding, stride=stride),
            nn.BatchNorm2d(out_channels)
        )
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``relu(network(x) + x)``."""
        out = self.network(x)
        out = out + x  # skip connection
        out = self.relu(out)
        return out

class FBankNet(nn.Module):
    """Shared convolutional trunk and linear head for the filter-bank models.

    ``network`` is four stages of (5x5 stride-2 convolution, 3x3 residual
    block) with 32, 64, 128 and 256 channels, followed by 4x4 average pooling.
    ``linear_layer`` maps 256 features to 250 outputs. ``forward`` is left to
    subclasses.
    """

    def __init__(self):
        super().__init__()

        stages = []
        previous_width = 1  # the input has a single channel
        for width in (32, 64, 128, 256):
            stages.append(
                nn.Conv2d(in_channels=previous_width, out_channels=width, kernel_size=5, padding=(5 - 1) // 2, stride=2)
            )
            stages.append(FBankResBlock(in_channels=width, out_channels=width, kernel_size=3))
            previous_width = width
        stages.append(nn.AvgPool2d(kernel_size=4))

        # Module order is kept as conv, block, conv, block, ..., pool so the
        # Sequential indices (and therefore the state-dict keys) do not change.
        self.network = nn.Sequential(*stages)
        self.linear_layer = nn.Sequential(nn.Linear(256, 250))

    @abstractmethod
    def forward(self, *input_):
        """Not implemented here; subclasses provide the forward pass."""
        raise NotImplementedError('Call one of the subclasses of this class')

In [9]:
# Mount Google Drive so that the paths under /content/drive (such as DATA_DIR) become available.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [10]:
class FBankCrossEntropyNet(FBankNet):
    """FBankNet variant with a concrete forward pass and a cross-entropy loss.

    Args:
        reduction: reduction mode handed to ``nn.CrossEntropyLoss``.
    """

    def __init__(self, reduction='mean'):
        super().__init__()
        self.loss_layer = nn.CrossEntropyLoss(reduction=reduction)

    def forward(self, x):
        """Run the convolutional trunk, flatten per sample, and apply the linear head."""
        batch_size = x.shape[0]
        flat_features = self.network(x).reshape(batch_size, -1)
        return self.linear_layer(flat_features)

    def loss(self, predictions, labels):
        """Return the cross-entropy loss of ``predictions`` against ``labels``."""
        return self.loss_layer(predictions, labels)

In [11]:
ls


[0m[01;34mdrive[0m/  [01;34msample_data[0m/


In [12]:
from scipy.io.wavfile import read
from sklearn import preprocessing
import python_speech_features as mfcc
#read test file
def mfcc_feature_extract(audio,sr):
    """Return the MFCC + delta feature matrix for an audio clip.

    Args:
        audio: either the path of a WAV file or an already-loaded audio signal.
        sr: sample rate in Hz of ``audio`` when a signal is passed. When a path
            is passed, the sample rate stored in the file is used instead.

    Returns:
        The feature matrix produced by ``extract_features``.
    """
    if isinstance(audio, (str, os.PathLike)):
        # Local import keeps the function usable regardless of cell order.
        from scipy.io.wavfile import read
        sr, audio = read(audio)

    # Extract MFCC features (with deltas appended) from the signal.
    return extract_features(audio, sr)
 


In [13]:
def get_cosine_distance(a, b):
    """Return the cosine dissimilarity ``1 - cosine_similarity`` between rows of two arrays.

    Args:
        a: NumPy array of vectors, one per row.
        b: NumPy array of vectors, one per row.

    Returns:
        NumPy array with one dissimilarity value per compared row pair.
    """
    tensor_a, tensor_b = torch.from_numpy(a), torch.from_numpy(b)
    similarity = F.cosine_similarity(tensor_a, tensor_b)
    dissimilarity = 1 - similarity
    return dissimilarity.numpy()

# Path to the trained model weights (.pth file) on the mounted Drive.
MODEL_PATH = '/content/drive/MyDrive/triplet_loss_trained_model.pth'

# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
model_instance = FBankCrossEntropyNet()
model_instance.load_state_dict(
    torch.load(MODEL_PATH, map_location=lambda storage, loc: storage)
)
# Cast the parameters to float64 and switch to evaluation mode.
# Presumably float64 is used because the NumPy inputs are float64 - TODO confirm.
model_instance.double().eval()


def get_embeddings(x):
    """Run the module-level ``model_instance`` on a NumPy batch without tracking gradients.

    Args:
        x: NumPy array holding the input batch for the model.

    Returns:
        The model output converted to a NumPy array.
    """
    batch = torch.from_numpy(x)
    with torch.no_grad():
        return model_instance(batch).numpy()

In [14]:
import os
def trai(un,audio):
    """Enroll a user: store their clip and the mean embedding computed from it.

    The clip is copied into ``DATA_DIR/<un>/`` (created if missing), split into
    filter-bank samples, embedded, and the mean of the embeddings is saved as
    ``embeddings.npy`` in that folder.

    Args:
        un: username; used as the name of the user's sub-folder under DATA_DIR.
        audio: path of the user's voice clip.

    Returns:
        The per-sample embeddings (one row per extracted sample), not the mean.
    """
    user_dir = os.path.join(DATA_DIR, un)
    # exist_ok avoids the race between checking for the folder and creating it.
    os.makedirs(user_dir, exist_ok=True)

    try:
        dest = shutil.copy(audio, user_dir)
    except shutil.SameFileError:
        # The clip already lives in the user's folder (e.g. re-enrolling from the stored copy).
        dest = audio

    fb = extract_fbanks(dest)
    embed = get_embeddings(fb)
    print(embed.shape)

    # Only the mean embedding is persisted for later comparison.
    mean_embed = np.mean(embed, axis=0)
    np.save(os.path.join(user_dir, 'embeddings.npy'), mean_embed)

    print(embed)
    return embed

# Enroll the user 'Milly' from a sample clip (arguments: username, path of the voice file).
trai('Milly','/content/drive/MyDrive/test set audio/millie_bobby.wav') # provide username and corresponding voice filepath

num samples extracted: 239
(239, 250)
[[-15.02505005 -10.31992903   7.43854145 ... -19.16979343 -42.59248984
  -24.86764991]
 [ 40.5438627   -3.00685779  80.72839719 ... -32.1626715  -90.78324609
  -53.95828654]
 [ 25.6999827  -34.75219548  22.70737031 ... -42.85495609 -36.23036327
  -35.09802812]
 ...
 [-12.17522473  19.44586562  -5.09754773 ... -19.81713125  -7.72688135
  -23.92664872]
 [ 22.63363485   4.18643573  19.88879577 ...  -5.77168065 -28.66890852
    7.72692264]
 [-41.96184185 -20.5290955  -24.26242499 ... -28.58812491   8.73351376
   -5.99463302]]


array([[-15.02505005, -10.31992903,   7.43854145, ..., -19.16979343,
        -42.59248984, -24.86764991],
       [ 40.5438627 ,  -3.00685779,  80.72839719, ..., -32.1626715 ,
        -90.78324609, -53.95828654],
       [ 25.6999827 , -34.75219548,  22.70737031, ..., -42.85495609,
        -36.23036327, -35.09802812],
       ...,
       [-12.17522473,  19.44586562,  -5.09754773, ..., -19.81713125,
         -7.72688135, -23.92664872],
       [ 22.63363485,   4.18643573,  19.88879577, ...,  -5.77168065,
        -28.66890852,   7.72692264],
       [-41.96184185, -20.5290955 , -24.26242499, ..., -28.58812491,
          8.73351376,  -5.99463302]])

In [16]:
# Run enrollment again and keep the embeddings returned by trai() for inspection.
sound_emb = trai('Milly','/content/drive/MyDrive/test set audio/millie_bobby.wav') # provide username and corresponding voice filepath

num samples extracted: 239
(239, 250)
[[-15.02505005 -10.31992903   7.43854145 ... -19.16979343 -42.59248984
  -24.86764991]
 [ 40.5438627   -3.00685779  80.72839719 ... -32.1626715  -90.78324609
  -53.95828654]
 [ 25.6999827  -34.75219548  22.70737031 ... -42.85495609 -36.23036327
  -35.09802812]
 ...
 [-12.17522473  19.44586562  -5.09754773 ... -19.81713125  -7.72688135
  -23.92664872]
 [ 22.63363485   4.18643573  19.88879577 ...  -5.77168065 -28.66890852
    7.72692264]
 [-41.96184185 -20.5290955  -24.26242499 ... -28.58812491   8.73351376
   -5.99463302]]


In [17]:
# Display the embeddings returned by trai().
sound_emb

array([[-15.02505005, -10.31992903,   7.43854145, ..., -19.16979343,
        -42.59248984, -24.86764991],
       [ 40.5438627 ,  -3.00685779,  80.72839719, ..., -32.1626715 ,
        -90.78324609, -53.95828654],
       [ 25.6999827 , -34.75219548,  22.70737031, ..., -42.85495609,
        -36.23036327, -35.09802812],
       ...,
       [-12.17522473,  19.44586562,  -5.09754773, ..., -19.81713125,
         -7.72688135, -23.92664872],
       [ 22.63363485,   4.18643573,  19.88879577, ...,  -5.77168065,
        -28.66890852,   7.72692264],
       [-41.96184185, -20.5290955 , -24.26242499, ..., -28.58812491,
          8.73351376,  -5.99463302]])

In [15]:
# Cosine-dissimilarity cut-off below which a sample counts as a match.
# NOTE(review): the original note gives 0.45 while the value used is 0.5 - confirm which is intended.
THRESHOLD = 0.5 #μAP + 3𝜎AP = 0.45
def tes(audio, min_match_fraction=.65):
    """Identify the enrolled speaker of a voice clip.

    The clip is split into filter-bank samples and embedded. The embeddings are
    compared with the stored ``embeddings.npy`` of every folder below
    ``DATA_DIR``. The first folder for which at least ``min_match_fraction`` of
    the samples have a cosine dissimilarity below ``THRESHOLD`` is reported.

    Args:
        audio: path of the voice clip to identify.
        min_match_fraction: fraction of samples that must fall below
            ``THRESHOLD`` for a stored user to count as a match.

    Returns:
        The name of the matching user's folder, or ``None`` if nobody matched.
        The outcome is also printed.
    """
    t_fbanks = extract_fbanks(audio)
    t_embeddings = get_embeddings(t_fbanks)

    # A single walk visits every folder exactly once. Each embedding file is
    # loaded from the folder it was actually found in.
    for root, _dirs, files in os.walk(DATA_DIR):
        # Only sub-folders hold user data; files directly in DATA_DIR are ignored.
        if root == DATA_DIR or 'embeddings.npy' not in files:
            continue

        st_emb = np.load(os.path.join(root, 'embeddings.npy')).reshape((1, -1))
        distances = get_cosine_distance(t_embeddings, st_emb)

        # Fraction of samples that are close enough to the stored embedding.
        positives_mean = np.mean(distances < THRESHOLD)
        if positives_mean >= min_match_fraction:
            folder = os.path.basename(root)
            print(f"HEY , I KNOW YOU MAN!!...YOU ARE MR.{folder},right?")
            return folder

    print("OH,NO !! I DON'T KNOW YOU! BUT YOU CAN DEFINITELY TRAIN ME TO RECOGNIZE YOU!!")
    return None
                        

# Identify the speaker of a test voice clip.
tes('/content/drive/MyDrive/test set audio/millie_bobby.wav')#Test voice clip

# tes() extracts the features of the input clip, embeds them, and compares the embeddings with the
# embeddings.npy file of every user sub-folder under DATA_DIR. It prints the matching user's folder
# name when enough samples have a cosine dissimilarity below THRESHOLD.
                




num samples extracted: 239
HEY , I KNOW YOU MAN!!...YOU ARE MR.Milly,right?


In [None]:
from sklearn.svm import SVC
# NOTE(review): `metrics`, `clf`, `X_test` and `y_test` are not defined anywhere in the visible
# notebook, which matches the NameError recorded for this cell - TODO define them or remove the cell.
metrics.plot_roc_curve(clf, X_test, y_test)

NameError: ignored

In [None]:
from sklearn.metrics import classification_report

# NOTE(review): `decision_tree_classifier`, `X_test` and `y_test` are not defined anywhere in the
# visible notebook - TODO define them before running this cell.
predictions = decision_tree_classifier.predict(X_test)
# print is a function in Python 3; the statement form is a SyntaxError.
print(classification_report(y_test, predictions))

In [None]:
# Rebuild the network and load the trained weights into it, the same way model_instance is created.
# The checkpoint file holds a state dict, so it is passed to load_state_dict rather than used
# as a model directly.
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source.
model1 = FBankCrossEntropyNet()
model1.load_state_dict(torch.load(MODEL_PATH, map_location=lambda storage, loc: storage))
model1.eval()

In [None]:
# Put model1 into evaluation mode (requires the cell that creates model1 to have run).
model1.eval()    

In [None]:
import torch.nn.functional as F

embed = trai('Milly','/content/drive/MyDrive/test set audio/millie_bobby.wav')
# trai() returns a NumPy array, while F.normalize operates on tensors, so convert first.
# Each row is scaled to unit L2 norm.
embed = F.normalize(torch.from_numpy(embed), p=2, dim=1)
