In [None]:
from scipy import signal
import matplotlib.pyplot as plt
from numpy import *
import numpy as np
from scipy.io.wavfile import write, read
import control
import os
from scipy.fftpack import fft, dct
from scipy.cluster.vq import vq, kmeans, whiten
from python_speech_features import mfcc #To compare
from sklearn.metrics import confusion_matrix
import seaborn as sns
import random

from IPython import display

In [None]:
def pre_emphasis(x, alpha):
    y=np.zeros((x.size,))
    y[0]=x[0]
    for i in range(1,x.size):
        y[i]=x[i]-alpha*x[i-1]
    return y

def get_STE(signal, nFrames, sampsPerFrame):
    STEs = []  # list of short-time energies
    for k in range(nFrames):
        startIdx = k*sampsPerFrame
        stopIdx = startIdx + sampsPerFrame
        window = zeros(signal.shape)
        window[startIdx:stopIdx] = np.hamming(sampsPerFrame)  # Hamming window
        STE = sum((signal**2)*(window**2))
        STEs.append(STE)
    return STEs

def end_pointer(signal, STEs, n):
    folder='./cut_gender_separated_data/Female/'
    digits = ['0','0','1','1','2','2','3','3','4','4','5','5',
             '6','6','7','7','8','8','9','9']
    width_phone = 18  #For simplicity fix length of all phones 
    frame=8  #Start analysis from this frame
    count=0
    while(frame<len(STEs)):
        if(STEs[frame]>=0.5):  #When over a threshold 
            sound=signal1[(frame-2)*sampsPerFrame:(frame+width_phone)*sampsPerFrame]  #Extract signal of width 20 frames
            write(folder + digits[count] + '/digit_{}_{}_{}.wav'.format(digits[count], n, count%2), 8000, sound)
            frame+=27  #Skip certain amount onto further analysis
            count+=1
        else: frame+=1
    print(count)
    
def mel_freq(f):
    return 2595*np.log10(1 + (f/700))

def mel_inv(f):
    return 700*(10**(f/2595)-1)

def mel_bins(f_start, f_end, nfilts, fs):
    f_start_mel = mel_freq(f_start)
    f_end_mel = mel_freq(f_end)
    f=np.linspace(f_start_mel, f_end_mel, nfilts+2)
    for i in range(f.shape[0]):
        f[i]=mel_inv(f[i])
        f[i]=np.floor((257)*f[i]/fs)
    return f

def mel_filterbank(f, nfilts):
    fbank = np.zeros([nfilts,512//2+1])
    for m in range(nfilts):
        for k in range(int(f[m]), int(f[m+1])):
            fbank[m,k] = (k - f[m]) / (f[m+1]-f[m])
        for k in range(int(f[m+1]), int(f[m+2])):
            fbank[m,k] = (f[m+2]-k) / (f[m+2]-f[m+1])
    return fbank

def feature_extractor(digit):
    features = []  # list of MFCC features
    fs=8000
    nfilts=26
    
    window_length = 10  #in ms
    sampsPerFrame = int(window_length*fs/1000)
    nFrames = int(len(digit)/sampsPerFrame)
    
    for k in range(nFrames):
        startIdx = k*sampsPerFrame
        stopIdx = startIdx + sampsPerFrame
        window = np.hamming(sampsPerFrame)  # Hamming window
        frame = window*digit[startIdx:stopIdx]
        frame_fft = np.fft.fft(frame, n=512) #Compute a 512 point DFT
        p=(np.abs(frame_fft)**2)/frame.shape[0] #Compute the Periodogram
        
        bins = mel_bins(300, 4000, nfilts, fs) 
        fbank = mel_filterbank(bins, nfilts)  #Creating the Mel Filterbank
        c=np.dot(p[:257],fbank.T) 
        c=20*log10(c)  
        c=dct(c)  #Discrete Cosine Transform
        features.append(c[:13])

    return features

def get_accuracy(actual, predicted):
    correct = 0
    for i in range(len(actual)):
        if actual[i] == predicted[i]:
            correct += 1
    return 100.0*correct/len(actual)

def euclidean_distance(v1, v2):
    distance = 0.0
    for i in range(v1.shape[0]):
        distance += (v1[i] - v2[i])**2
    return sqrt(distance)

def vq_codebook(codebook, clusters):
    vq_cb=np.zeros((codebook.shape[0],clusters,codebook.shape[2]))
    for i in range(codebook.shape[0]):
        vq_cb[i] = kmeans(codebook[i],clusters)[0]
    return vq_cb

In [None]:
######### QUESTION 1 #########

directory = './CA4-data/'
done=0
for file in os.listdir(directory):
    fs, signal1 = read(directory+file)
    signal = signal1/max(abs(signal1))  #Scale the audio file

    # For a window with sampling frequency fs
    window_length = 30  #in ms
    sampsPerFrame = int(window_length*fs/1000)
    nFrames = int(len(signal)/sampsPerFrame)
    
    # Get the STE of the input signal
    STEs=get_STE(signal, nFrames, sampsPerFrame)
    
    # Cut the digits and save in the respective folder
    end_pointer(signal1, STEs, done)
    
    done+=1
    print("Done: ", done, file)

In [None]:
file='./CA4-data/zero_to_nine_Shradha1.wav'

fs, signal1 = read(file)
signal = signal1/max(abs(signal1))  #Scale the audio file

# For a window with sampling frequency fs
window_length = 30  #in ms
sampsPerFrame = int(window_length*fs/1000)
nFrames = int(len(signal)/sampsPerFrame)

STEs=get_STE(signal, nFrames, sampsPerFrame)
plt.plot(STEs[:15])
plt.title('Short-Time Energy')
plt.ylabel('ENERGY')
plt.xlabel('FRAME')

width_phone=18
frame=7
count=0
while(frame<len(STEs)):
    if(STEs[frame]>=0.5):
        count+=1
        sound=signal1[(frame-2)*sampsPerFrame:(frame+width_phone)*sampsPerFrame]
        write('./sounds/test_{}.wav'.format(frame), 8000, sound)
        frame+=27
    else: frame+=1
print(count)

In [None]:
########### Question 2 ###########

file1 ='./cut_dataset/7/digit_7_0_0.wav'

fs, digit1 = read(file1)
digit = digit1/max(abs(digit1))  #Scale the audio file

features=np.array(feature_extractor(digit))

In [None]:
########### Question 3 ##########
# Creating a codebook with training data
directory = './cut_dataset/'
codebook = np.zeros((10,108*60,13))
for label in range(10):  #Iterate over each digit
    folder = os.path.join(directory,str(label)+'/')
    i=0
    for file in os.listdir(folder):
        fs, digit1 = read(folder+file)
        digit = digit1/max(abs(digit1))  #Scale the audio file
        features=np.array(feature_extractor(digit))
        codebook[label,i*60:(i+1)*60,:] = features
        i+=1
    print('done', label)

In [None]:
# Splits a codebook with all data into training and test data
# Test ranges from 0 to 27
def split_codebook(codebook, test): 
    train_codebook = np.zeros((codebook.shape[0],codebook.shape[1]-(4*60),codebook.shape[2]))
    testset = np.zeros((codebook.shape[0],4*60,codebook.shape[2]))
    for label in range(10):  #Iterate over each digit
        i=0
        j=0
        for count in range(codebook.shape[1]//60):
            if(count < 4*test or count >= 4*test+4): #if does not belong to test set add to codebook
                train_codebook[label,i*60:(i+1)*60,:] = codebook[label,count*60:(count+1)*60,:]
                i+=1
            else: #add to test set
                testset[label,j*60:(j+1)*60,:] = codebook[label,count*60:(count+1)*60,:]
                j+=1             
    return codebook, testset

In [None]:
def predict_with_features(features, vq_cb):
    min_distance=100000000 
    pred=10
    for label in range(vq_cb.shape[0]):  #Iterate overall all digit vector clouds
        d_label=0
        for i in range(features.shape[0]):  #For the ith frame of test sample
            d_min=10000000
            for k in range(vq_cb.shape[1]):  #Compare it with the K vectors per digit
                d = euclidean_distance(features[i],vq_cb[label,k])  #Calculate Euclidean distance
                if(d<d_min):
                    d_min=d
            d_label +=d_min  #Accumulate minimum Euclidean distances for all frames of test sample
        d_label/=60  #Average out 
        if(d_label < min_distance):
            min_distance = d_label
            pred = label
    return pred

In [None]:
accuracy_list = []
k_list = [4, 8, 16, 64]
for K in k_list:
    pred_list = []
    target_list = []
    for test in range(27):
        cb, testset = split_codebook(codebook, test)
        cb = vq_codebook(cb, K)
        directory = './cut_dataset/'
        for label in range(10):  #Iterate over each digit
            for i in range(4):
                test_features = testset[label, i*60:(i+1)*60:]
                pred = predict_with_features(test_features, cb)
                pred_list.append(pred)
                target_list.append(label)
                total+=1
    accuracy = get_accuracy(target_list, pred_list)
    accuracy_list.append(accuracy)    
    print('For {} clusters:'.format(K), "Accuracy = ", accuracy)
    conf = confusion_matrix(target_list, pred_list)
    print(conf)
    
fig = plt.figure()
plt.title("Accuracy vs K clusters")
plt.xlabel('K clusters')
plt.ylabel('Accuracy')
plt.plot(k_list, accuracy_list)

fig = plt.figure()
plt.title("WER (%) vs K clusters")
plt.xlabel('K clusters')
plt.ylabel('WER')
plt.plot(k_list, list(np.array([100,100,100,100])-np.array(accuracy_list)))

In [None]:
def get_dynamic_features(digit):
    mfcc_features = np.array(feature_extractor(digit))
    dynamic_features = np.zeros((mfcc_features.shape[0], mfcc_features.shape[1]*3))
    dynamic_features[:,:13] = mfcc_features
    dynamic_features[:,13:26] = mfcc_features
    dynamic_features[:,26:39] = mfcc_features
    for j in range(1,12):
        dynamic_features[:,13+j] = (mfcc_features[:,j+1] - mfcc_features[:,j-1])/2.0
    for j in range(2,11):
        dynamic_features[:,26+j] = (mfcc_features[:,j+2] - mfcc_features[:,j-2])/2.0
    return dynamic_features

In [None]:
list(get_dynamic_features(digit))[0]

In [None]:
####### Question 4 ############
# Creating a codebook with 39 features + Pre Emphasis
directory = './cut_dataset/'
codebook_1 = np.zeros((10,108*60,39))
for label in range(10):  #Iterate over each digit
    folder = os.path.join(directory,str(label)+'/')
    i=0
    for file in os.listdir(folder):
        fs, digit1 = read(folder+file)
        digit = digit1/max(abs(digit1))  #Scale the audio file
        digit = pre_emphasis(digit, 0.9)  #Pre emphasis of 0.9 alpha
        features=get_dynamic_features(digit)
        codebook_1[label,i*60:(i+1)*60,:] = features
        i+=1
    print('done', label)

pred_list = []
target_list = []
for test in range(27):
    cb, testset = split_codebook(codebook_1, test)
    cb = vq_codebook(cb, 64)
    directory = './cut_dataset/'
    for label in range(10):  #Iterate over each digit
        for i in range(4):
            test_features = testset[label, i*60:(i+1)*60:]
            pred = predict_with_features(test_features, cb)
            pred_list.append(pred)
            target_list.append(label)
            total+=1
accuracy = get_accuracy(target_list, pred_list)  
print("\nAccuracy = ", accuracy)
conf = confusion_matrix(target_list, pred_list)
print(conf)

In [None]:
# Creating a codebook with 13 features on PRE EMPHASISED SOUND
directory = './cut_dataset/'
codebook_2 = np.zeros((10,108*60,13))
for label in range(10):  #Iterate over each digit
    folder = os.path.join(directory,str(label)+'/')
    i=0
    for file in os.listdir(folder):
        fs, digit1 = read(folder+file)
        digit = digit1/max(abs(digit1))  #Scale the audio file
        digit = pre_emphasis(digit, 0.9)  #Pre emphasis of 0.9 alpha
        features=np.array(feature_extractor(digit))
        codebook_2[label,i*60:(i+1)*60,:] = features
        i+=1
    print('done', label)

pred_list = []
target_list = []
for test in range(27):
    cb, testset = split_codebook(codebook_2, test)
    cb = vq_codebook(cb, 64)
    directory = './cut_dataset/'
    for label in range(10):  #Iterate over each digit
        for i in range(4):
            test_features = testset[label, i*60:(i+1)*60:]
            pred = predict_with_features(test_features, cb)
            pred_list.append(pred)
            target_list.append(label)
            total+=1
accuracy = get_accuracy(target_list, pred_list)  
print("\nAccuracy = ", accuracy)
conf = confusion_matrix(target_list, pred_list)
print(conf)

In [None]:
# Creating a codebook with 13 dynamic features (delta delta mfccs)
directory = './cut_dataset/'
codebook_3 = np.zeros((10,108*60,13))
for label in range(10):  #Iterate over each digit
    folder = os.path.join(directory,str(label)+'/')
    i=0
    for file in os.listdir(folder):
        fs, digit1 = read(folder+file)
        digit = digit1/max(abs(digit1))  #Scale the audio file
        features=get_dynamic_features(digit)[:,26:39]
        codebook_3[label,i*60:(i+1)*60,:] = features
        i+=1
    print('done', label)

pred_list = []
target_list = []
for test in range(27):
    cb, testset = split_codebook(codebook_3, test)
    cb = vq_codebook(cb, 64)
    directory = './cut_dataset/'
    for label in range(10):  #Iterate over each digit
        for i in range(4):
            test_features = testset[label, i*60:(i+1)*60:]
            pred = predict_with_features(test_features, cb)
            pred_list.append(pred)
            target_list.append(label)
            total+=1
accuracy = get_accuracy(target_list, pred_list)  
print("\nAccuracy = ", accuracy)
conf = confusion_matrix(target_list, pred_list)
print(conf)

In [None]:
#### Gender based separation in data

directory = './Digits_female/'
done=0
for file in os.listdir(directory):
    fs, signal1 = read(directory+file)
    signal = signal1/max(abs(signal1))  #Scale the audio file

    # For a window with sampling frequency fs
    window_length = 30  #in ms
    sampsPerFrame = int(window_length*fs/1000)
    nFrames = int(len(signal)/sampsPerFrame)
    
    # Get the STE of the input signal
    STEs=get_STE(signal, nFrames, sampsPerFrame)
    
    # Cut the digits and save in the respective folder
    end_pointer(signal1, STEs, done)
    
    done+=1
    print("Done: ", done, file)

In [None]:
directory = './cut_gender_separated_data/Male'
codebook_male = np.zeros((10,68*60,13))
for label in range(10):  #Iterate over each digit
    folder = os.path.join(directory,str(label)+'/')
    i=0
    for file in os.listdir(folder):
        fs, digit1 = read(folder+file)
        digit = digit1/max(abs(digit1))  #Scale the audio file
        features=np.array(feature_extractor(digit))
        codebook_male[label,i*60:(i+1)*60,:] = features
        i+=1
    print('done', label)

directory = './cut_gender_separated_data/Female'
codebook_female = np.zeros((10,40*60,13))
for label in range(10):  #Iterate over each digit
    folder = os.path.join(directory,str(label)+'/')
    i=0
    for file in os.listdir(folder):
        fs, digit1 = read(folder+file)
        digit = digit1/max(abs(digit1))  #Scale the audio file
        features=np.array(feature_extractor(digit))
        codebook_female[label,i*60:(i+1)*60,:] = features
        i+=1
    print('done', label)

In [None]:
#### Training data: Males, Testing data: Males ########
print('Training data: Males, Testing data: Males\n')
pred_list = []
target_list = []
for test in range(17):
    cb, testset = split_codebook(codebook_male, test)
    cb = vq_codebook(cb, 64)
    directory = './cut_dataset/'
    for label in range(10):  #Iterate over each digit
        for i in range(4):
            test_features = testset[label, i*60:(i+1)*60:]
            pred = predict_with_features(test_features, cb)
            pred_list.append(pred)
            target_list.append(label)
            total+=1
accuracy = get_accuracy(target_list, pred_list)  
print("\nAccuracy = ", accuracy)
conf = confusion_matrix(target_list, pred_list)
print(conf)

#### Training data: Females, Testing data: Females ########
print('\nTraining data: Females, Testing data: Females\n')
pred_list = []
target_list = []
for test in range(10):
    cb, testset = split_codebook(codebook_female, test)
    cb = vq_codebook(cb, 64)
    directory = './cut_dataset/'
    for label in range(10):  #Iterate over each digit
        for i in range(4):
            test_features = testset[label, i*60:(i+1)*60:]
            pred = predict_with_features(test_features, cb)
            pred_list.append(pred)
            target_list.append(label)
            total+=1
accuracy = get_accuracy(target_list, pred_list)  
print("\nAccuracy = ", accuracy)
conf = confusion_matrix(target_list, pred_list)
print(conf)

In [None]:
#### Training data: Males, Testing data: Females ########
print('Training data: Males, Testing data: Females\n')
pred_list = []
target_list = []
for test in range(10):
    _, testset = split_codebook(codebook_female, test)
    cb = codebook_male
    cb = vq_codebook(cb, 64)
    directory = './cut_dataset/'
    for label in range(10):  #Iterate over each digit
        for i in range(4):
            test_features = testset[label, i*60:(i+1)*60:]
            pred = predict_with_features(test_features, cb)
            pred_list.append(pred)
            target_list.append(label)
            total+=1
accuracy = get_accuracy(target_list, pred_list)  
print("\nAccuracy = ", accuracy)
conf = confusion_matrix(target_list, pred_list)
print(conf)

#### Training data: Females, Testing data: Males ########
print('Training data: Females, Testing data: Males\n')
pred_list = []
target_list = []
for test in range(17):
    _, testset = split_codebook(codebook_male, test)
    cb = codebook_female
    cb = vq_codebook(cb, 64)
    directory = './cut_dataset/'
    for label in range(10):  #Iterate over each digit
        for i in range(4):
            test_features = testset[label, i*60:(i+1)*60:]
            pred = predict_with_features(test_features, cb)
            pred_list.append(pred)
            target_list.append(label)
            total+=1
accuracy = get_accuracy(target_list, pred_list)  
print("\nAccuracy = ", accuracy)
conf = confusion_matrix(target_list, pred_list)
print(conf)

In [None]:
def distance_cost_plot(distances):
    im = plt.imshow(distances, interpolation='nearest', cmap='Blues') 
    plt.gca().invert_yaxis()
    plt.xlabel("Template Features")
    plt.ylabel("Test Features")
    plt.grid()
    plt.colorbar();

In [None]:
def path_cost(x, y, accumulated_cost, distances):
    #Maps frames between x and y for minimum cost
    path = [[len(x)-1, len(y)-1]]
    cost = 0
    i = len(x)-1
    j = len(y)-1
    while i>0 and j>0:
        if i==0:
            j = j - 1
        elif j==0:
            i = i - 1
        else:
            if accumulated_cost[i-1, j] == min(accumulated_cost[i-1, j-1],
                                               accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
                i = i - 1
            if accumulated_cost[i, j-1] == min(accumulated_cost[i-1, j-1],
                                               accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
                j = j-1
            else:
                i = i - 1
                j = j - 1
        path.append([i, j])
    path.append([0,0])
    for [x, y] in path:
        cost = cost + distances[x, y]
    return path, cost

In [None]:
def dynamic_time_warping(template_features, test_features):
    x=template_features
    y=test_features
    distances = np.zeros((len(x), len(y)))
    
    #Create a distance matrix for all combinations of test and template frames
    for i in range(len(x)):
        for j in range(len(y)):
            distances[i,j] = euclidean_distance(x[i],y[j])
    
    #Develop the accumulated cost array i.e. the minimum total cost to get to a point x,y
    accumulated_cost = np.zeros((len(x), len(y)))
    accumulated_cost[0,0] = distances[0,0]
    for i in range(1, len(y)):
        accumulated_cost[0,i] = distances[0,i] + accumulated_cost[0, i-1] 
    for i in range(1, len(x)):
        accumulated_cost[i,0] = distances[i, 0] + accumulated_cost[i-1, 0]    
    for i in range(1, len(x)):
        for j in range(1, len(y)):
            accumulated_cost[i, j] = min(accumulated_cost[i-1, j-1],
                                         accumulated_cost[i-1, j], accumulated_cost[i, j-1]) + distances[i, j]
            
    path, cost = path_cost(x, y, accumulated_cost, distances)      
    
#     path_x = [point[0] for point in path]
#     path_y = [point[1] for point in path]

#     distance_cost_plot(accumulated_cost)
#     plt.plot(path_x, path_y, 'r-')
    
    return path, cost

In [None]:
file1 ='./cut_dataset/1/digit_1_37_1.wav'
file2 ='./cut_dataset/1/digit_1_37_1.wav'
_, digit1 = read(file1)
_, digit2 = read(file2)

template = np.array(feature_extractor(digit1/max(abs(digit1))))
test = np.array(feature_extractor(digit2/max(abs(digit2))))

path,cost= dynamic_time_warping(template, test)

In [None]:
print(cost)
print(len(path))
path

In [None]:
def predict_with_DTW(test_features, cb):
    min_distance=100000000 
    pred=10
    for label in range(cb.shape[0]):  #Iterate overall all digit vector clouds
        d_label=0
        d_min=10000000
        for i in range(3):  #Compare with 3 random template samples from the class
            n = random.randint(0,103)
            template_features = cb[label, n*60:(n+1)*60,:]
            _, cost = dynamic_time_warping(template_features, test_features) 
            if(cost<d_min):
                d_min = cost
        d_label = d_min
        if(d_label < min_distance):
            min_distance = d_label
            pred = label
    return pred

In [None]:
directory = './cut_dataset/'
codebook_dtw = np.zeros((10,108*60,13))
for label in range(10):  #Iterate over each digit
    folder = os.path.join(directory,str(label)+'/')
    i=0
    for file in os.listdir(folder):
        fs, digit1 = read(folder+file)
        digit = digit1/max(abs(digit1))  #Scale the audio file
        features=np.array(feature_extractor(digit))
        codebook_dtw[label,i*60:(i+1)*60,:] = features
        i+=1
    print('done', label)

In [None]:
pred_list = []
target_list = []
for test in range(27):
    print(test)
    cb, testset = split_codebook(codebook_dtw, test)
    for label in range(10):  #Iterate over each digit
        print(label)
        for i in range(4):  #Size of testset
            test_features = testset[label, i*60:(i+1)*60:]
            pred = predict_with_DTW(test_features, cb)
            pred_list.append(pred)
            target_list.append(label)
accuracy = get_accuracy(target_list, pred_list)  
print("\nAccuracy = ", accuracy)
conf = confusion_matrix(target_list, pred_list)
print(conf)