# DTW

#### Main

In [63]:
from os import walk
import librosa
from dtaidistance import dtw_ndim 
import numpy as np
import statistics


# read db files 
filenames = next(walk("./db"), (None, None, []))[2]  # [] if no file

# load compared wav file 
mydata,samplerate=librosa.load("./db/Ahmed.wav",mono=True,duration=5)
# Extract mfcc feature
my_mfcc = librosa.feature.mfcc(mydata, samplerate)

Output=[]

for file in filenames:
    y,sr = librosa.load("./db/"+file,mono=True,duration=5)
    mfcc = librosa.feature.mfcc(y, sr,n_mfcc=13)
    distances=[]
    for i in range(13):
        d = dtw_ndim.distance(my_mfcc[i], mfcc[i])
        distances.append(d)
    my_mean = statistics.mean(distances)
    # print(my_mean)
    Output.append((file, my_mean))


print(sorted(Output,key=lambda l:l[1]))


[('Ahmed.wav', 0.0), ('Maram.wav', 79.28634417952661), ('Maryem.wav', 84.76229744737878), ('Hassan.wav', 86.57366553228464), ('Mayar.wav', 87.92548676581538), ('Amira.wav', 88.11779057299333), ('Raouf.wav', 91.0904175612278), ('Sara.wav', 99.47813219391566), ('Tarek.wav', 104.19648858575916), ('Dalia.wav', 108.15356819100323), ('Salah.wav', 115.27040278672031), ('Shady.wav', 115.9197291411085), ('Radwa.wav', 120.50508002276047), ('Zeyad.wav', 124.1582889858837), ('Yasmin.wav', 137.87250320638722), ('Mostafa.wav', 148.75218157074514), ('Ahmed1.wav', 151.01477704582447), ('Radwa2.wav', 160.46632554928877), ('Ahmed2.wav', 164.75968299061523), ('Tarek1.wav', 166.3467904149053), ('Dalia1.wav', 172.3775641723068), ('Amira1.wav', 177.61862796320872), ('Sara1.wav', 178.9869829877864), ('Hassan1.wav', 179.55352454735362), ('Shady1.wav', 180.13118364102687), ('Radwa1.wav', 184.03098586902334), ('ahmed4.wav', 198.9202862777289), ('Ahmed5.wav', 261.7594386858333)]


#### DTW - fastdtw

In [46]:
import numpy as np
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
x = np.array([1, 2, 3, 3, 7])
y = np.array([1, 2, 2, 2, 2, 2, 2, 4,7,7,7,7,7])
distance, path = fastdtw(x, y, dist=euclidean)
print(distance)
print(path)

2.0
[(0, 0), (1, 1), (1, 2), (1, 3), (1, 4), (1, 5), (1, 6), (2, 7), (3, 7), (4, 8), (4, 9), (4, 10), (4, 11), (4, 12)]


#### DTW - dtaidistance

In [47]:
import numpy as np
from dtaidistance import dtw_ndim

series1 = np.array([[0, 0],  # first 2-dim point at t=0
                    [0, 1],  # second 2-dim point at t=1
                    [2, 1],
                    [0, 1],
                    [0, 0]], dtype=np.double)
series2 = np.array([[0, 0],
                    [2, 1],
                    [0, 1],
                    [0, .5],
                    [0, .5],
                    [0, 0]], dtype=np.double)

d = dtw_ndim.distance(series1, series2)
print (d)

# dtw_i = 0
# for dim in range(ndim):
#     dtw_i += dtw.distance(s1[:,dim], s2[:,dim])

1.224744871391589


#### DTW - dtw

In [48]:
import numpy as np

# We define two sequences x, y as numpy array
# where y is actually a sub-sequence from x
x = np.array([2, 0, 1, 1, 2, 4, 2, 1, 2, 0]).reshape(-1, 1)
y = np.array([1, 1, 2, 4, 2, 1, 2, 0]).reshape(-1, 1)

from dtw import dtw

manhattan_distance = lambda x, y: np.abs(x - y)

d, cost_matrix, acc_cost_matrix, path = dtw(x, y, dist_method=manhattan_distance)

print(d)
# >>> 2.0 # Only the cost for the insertions is kept

# You can also visualise the accumulated cost and the shortest path
import matplotlib.pyplot as plt

plt.imshow(acc_cost_matrix.T, origin='lower', cmap='gray', interpolation='nearest')
plt.plot(path[0], path[1], 'w')
plt.show()

TypeError: cannot unpack non-iterable DTW object

#### DTW - Librosa

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
y, sr = librosa.load(librosa.ex('brahms'), offset=10, duration=15)
X = librosa.feature.chroma_cens(y=y, sr=sr)
noise = np.random.rand(X.shape[0], 200)
Y = np.concatenate((noise, noise, X, noise), axis=1)
D, wp = librosa.sequence.dtw(X, Y, subseq=True)
fig, ax = plt.subplots(nrows=2, sharex=True)
img = librosa.display.specshow(D, x_axis='frames', y_axis='frames',
                               ax=ax[0])
ax[0].set(title='DTW cost', xlabel='Noisy sequence', ylabel='Target')
ax[0].plot(wp[:, 1], wp[:, 0], label='Optimal path', color='y')
ax[0].legend()
fig.colorbar(img, ax=ax[0])
ax[1].plot(D[-1, :] / wp.shape[0])
ax[1].set(xlim=[0, Y.shape[1]], ylim=[0, 2],
          title='Matching cost function')

#### DTW - Fast 2

In [None]:
import pandas as pd
import numpy as np

# Plotting Packages
import matplotlib.pyplot as plt
import seaborn as sbn

import matplotlib as mpl
mpl.rcParams['figure.dpi'] = 300
savefig_options = dict(format="png", dpi=300, bbox_inches="tight")


# Computation packages
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw

def compute_euclidean_distance_matrix(x, y) -> np.array:
    """Calculate distance matrix
    This method calcualtes the pairwise Euclidean distance between two sequences.
    The sequences can have different lengths.
    """
    dist = np.zeros((len(y), len(x)))
    for i in range(len(y)):
        for j in range(len(x)):
            dist[i,j] = (x[j]-y[i])**2
    return dist


def compute_accumulated_cost_matrix(x, y) -> np.array:
    """Compute accumulated cost matrix for warp path using Euclidean distance
    """
    distances = compute_euclidean_distance_matrix(x, y)

    # Initialization
    cost = np.zeros((len(y), len(x)))
    cost[0,0] = distances[0,0]
    
    for i in range(1, len(y)):
        cost[i, 0] = distances[i, 0] + cost[i-1, 0]  
        
    for j in range(1, len(x)):
        cost[0, j] = distances[0, j] + cost[0, j-1]  

    # Accumulated warp path cost
    for i in range(1, len(y)):
        for j in range(1, len(x)):
            cost[i, j] = min(
                cost[i-1, j],    # insertion
                cost[i, j-1],    # deletion
                cost[i-1, j-1]   # match
            ) + distances[i, j] 
            
    return cost


# Create two sequences
x = [3, 1, 2, 2, 1]
y = [2, 0, 0, 3, 3, 1, 0]


# fig, ax = plt.subplots(figsize=(14, 10))

# # Remove the border and axes ticks
# fig.patch.set_visible(False)
# ax.axis('off')

# xx = [(i, x[i]) for i in np.arange(0, len(x))]
# yy = [(j, y[j]) for j in np.arange(0, len(y))]

# for i, j in zip(xx, yy[:-2]):
#     ax.plot([i[0], j[0]], [i[1], j[1]], '--k', linewidth=4)

# ax.plot(x, '-ro', label='x', linewidth=4, markersize=20, markerfacecolor='lightcoral', markeredgecolor='lightcoral')
# ax.plot(y, '-bo', label='y', linewidth=4, markersize=20, markerfacecolor='skyblue', markeredgecolor='skyblue')
# ax.set_title("Euclidean Distance!??", fontsize=28, fontweight="bold")

# fig.savefig("ex1_euclidean_distance.png", **savefig_options)

dtw_distance, warp_path = fastdtw(x, y, dist=euclidean)
cost_matrix = compute_accumulated_cost_matrix(x, y)

fig, ax = plt.subplots(figsize=(12, 8))
ax = sbn.heatmap(cost_matrix, annot=True, square=True, linewidths=0.1, cmap="YlGnBu", ax=ax)
ax.invert_yaxis()

# Get the warp path in x and y directions
path_x = [p[0] for p in warp_path]
path_y = [p[1] for p in warp_path]

# Align the path from the center of each cell
path_xx = [x+0.5 for x in path_x]
path_yy = [y+0.5 for y in path_y]

ax.plot(path_xx, path_yy, color='blue', linewidth=3, alpha=0.2)

fig.savefig("ex1_heatmap2.png", **savefig_options)


print("DTW distance: ", dtw_distance)
print("Warp path: ", warp_path)

cost_matrix = compute_accumulated_cost_matrix(x, y)
print(np.flipud(cost_matrix)) # Flipping the cost matrix for easier comparison with heatmap values!

cost_matrix = compute_accumulated_cost_matrix(x, y)
print(np.flipud(cost_matrix)) # Flipping the cost matrix for easier comparison with heatmap values!

#=============================2

time1 = np.linspace(start=0, stop=1, num=50)
time2 = time1[0:40]

x1 = 3 * np.sin(np.pi * time1) + 1.5 * np.sin(4*np.pi * time1)
x2 = 3 * np.sin(np.pi * time2 + 0.5) + 1.5 * np.sin(4*np.pi * time2 + 0.5)
distance, warp_path = fastdtw(x1, x2, dist=euclidean)
fig, ax = plt.subplots(figsize=(16, 12))

# Remove the border and axes ticks
fig.patch.set_visible(False)
ax.axis('off')

for [map_x, map_y] in warp_path:
    ax.plot([map_x, map_y], [x1[map_x], x2[map_y]], '-k')

ax.plot(x1, color='blue', marker='o', markersize=10, linewidth=5)
ax.plot(x2, color='red', marker='o', markersize=10, linewidth=5)
ax.tick_params(axis="both", which="major", labelsize=18)

fig.savefig("ex2_dtw_distance.png", **savefig_options)

#### DTW - without Library

In [None]:
"""
A very simple implementation of Dynamic Time Warping
by Simon King, 2015
https://speech.zone

License: Creative Commons Attribution 3.0 Unported
https://creativecommons.org/licenses/by/3.0/

You can re-use and distribute as you like, provided you retain this header.
"""

import math

def pretty_print_array(a):
	"""
	Print an array with nice line wrapping
	"""

	# Exercise: make it print out row and column indices, correctly aligned

	for i in range(len(a)):
		for j in range(len(a[i])):
			if a[i][j] == (None,None):
				print ("(-, -)")
			else:
				print (a[i][j])
		print


def local_distance(template_frame, test_frame):
	"""
	Compute the Euclidean distance between two feature 'vectors' (which must actually be ints)
	"""

	# Exercise: improve the function to accept vectors instead of scalars

	# sanity check, assuming we are working with ints
	assert type(template_frame) == type(test_frame) == int

	# Exercise: what sanity check would you perform when using floats?
	# Exercise: what sanity check(s) could you perform when using vectors of floats?

	# in one dimension, taking the square and then the square root gives the
	#  same result as taking the absolute value
	#  but we will implement it this way to make it obvious how to make the function
	#  work in higher dimensions
	return math.sqrt(  pow(template_frame - test_frame , 2)  )


def dtw(template, test):
	"""
	Perform Dynamic Time Warping between two sequences 
	"""

	global_distance=0
	alignment=[]

	# the backpointer and global_distance datastructures will have dimensions of len(template) x len (test)
	# 
	# they could be combined as an array of some class that we would have to define
	#  and that class would contain both the global distance and the backpointer for each position

	# Exercise: improve the datastructure along the lines suggested above


	# a data structure to hold the backpointers
	#  this is an array (implemented as a list of lists)
	# 
	# the backpointers are tuples of two values, pointing to the previous cell
	#  we will initialise them with empty values here
	backpointer=[]
	empty_backpointer=(None,None)
	for i in range(len(template)):
		this_row=[]
		for j in range(len(test)):
			this_row.append(empty_backpointer)
		backpointer.append(this_row)

	# Exercise: make the verbosity of the code controllable
	#  without having to comment lines such as the following out
	#  e.g., using a global variable "verbosity_level"

	# print "Initialised the backpointer data structure:"
	# pretty_print_array(backpointer)

	# a data structure to hold the shortest "global distance so far"
	#  this is an array (implemented as a list of lists) 
	global_distance=[]
	dummy_value=-1
	for i in range(len(template)):
		this_row=[]
		for j in range(len(test)):
			this_row.append(dummy_value)
		global_distance.append(this_row)

	# print "Initialised the global distance data structure:"
	# pretty_print_array(global_distance)

	# visit every position in the global distance matrix
	#  in order
	for i in range(len(template)):
		for j in range(len(test)):

			# deal with the edge cases first

			if (i==0) and (j==0):
				# this is the starting point
				# the global distance is just the local distance here
				# because there are no incoming paths
				global_distance[i][j] = local_distance( template[i], test[j] )
				backpointer[i][j] = (None,None)

			elif (i==0):
				# incoming paths can only come from one direction: j-1

				# check that the necessary previous position has already been visited
				assert global_distance[i][j-1] >= 0

				global_distance[i][j] = global_distance[i][j-1] + local_distance( template[i], test[j] )
				backpointer[i][j] = (i,j-1)

			elif (j==0):
				# incoming paths can only come from one direction: i-1

				# check that the necessary previous position has already been visited
				assert global_distance[i-1][j] >= 0

				global_distance[i][j] = global_distance[i-1][j] + local_distance( template[i], test[j] )
				backpointer[i][j] = (i-1,j)


			else:
				# the general case where paths can come from three directions

				# check that the necessary previous positions have already been visited
				assert global_distance[i][j-1]   >= 0
				assert global_distance[i-1][j]   >= 0
				assert global_distance[i-1][j-1] >= 0


				# this is where the Dynamic Programming happens !

				lowest_global_distance = global_distance[i-1][j]
				backpointer[i][j] = (i-1,j)

				if global_distance[i][j-1] < lowest_global_distance:
					lowest_global_distance = global_distance[i][j-1]
					backpointer[i][j] = (i,j-1)

				if global_distance[i-1][j-1] < lowest_global_distance:
					lowest_global_distance = global_distance[i-1][j-1]
					backpointer[i][j] = (i-1,j-1)


				global_distance[i][j] = lowest_global_distance + local_distance( template[i], test[j] )



	# the best possible global distance is just the value in the "last" corner of the matrix
	#  (remembering that everything is indexed from 0)
	D = global_distance[len(template)-1][len(test)-1]


	# Exercise: perform the backtracing using a recursive function instead of a while loop

	# now do the backtrace
	alignment=[]

	# start at the end - the last frame of the template aligns with the last frame of the test signal
	i,j = len(template)-1 , len(test)-1
	alignment.append( (i,j) )

	# only stop backtracing when BOTH i and j are 0
	#  if only ONE of them is 0, we are simply at an edge, but not the beginning
	while ( (i!=0) or (j!=0) ):

		alignment.append(backpointer[i][j])
		i,j = backpointer[i][j]

	#  reverse the alignment list so it is in the same order as the template and test signals
	#  (i.e., so that time runs forwards!)
	alignment.reverse()

	# return the global distance and the alignment
	return D, alignment



def main():
	"""
	The main function.
	Sets up some test templates and a test signal and uses DTW to compare them
	"""
	# hardcoded data to avoid having to load from a file
	templates=[]
	templates.append( [3, 2, 1, 2, 3, 3, 3] )
	templates.append( [1, 1, 2, 3, 4, 3, 2] )
	test =            [1, 2, 3, 2, 1]

	# Exercise: improve the code so that it will handle sequences of vectors, such as
	# templates=[]
	# templates.append( [ [3,3,2], [2,1,2], [1,2,2], [2,2,3] ] )
	# templates.append( [ [1,1,1], [1,1,2], [2,2,3], [3,2,4], [4,1,5] ] )
	# test =            [ [1,1,2], [2,2,4], [3,3,4], [2,4,3] ]

	# Exercise: add the ability to load the templates and the test signal from files

	for t in templates:
		D, alignment = dtw(t, test)
		print ("Distance from template",t,"to test",test,"is", D)

		for (i,j) in alignment:
			print ("Alignment",(i,j),": ",t[i]," aligns with", test[j])

if __name__ == "__main__":
	main()

In [None]:
   # manhattan_distance = lambda mymfcc2, mfcc2: np.abs(my_mfcc[i] - mfcc[i]) #trial2
    # d, cost_matrix, acc_cost_matrix, path = dtw(my_mfcc[i], mfcc[i], dist_method=euclidean) #trial2
    # distance, warp_path = fastdtw(my_mfcc[i], mfcc[i], dist=euclidean)#trial4
    # D, wp = librosa.sequence.dtw(mymfcc2, mfcc2, subseq=True)#trial5