In [185]:
# Assignment 3 Part B -- DTW and HMM

# CWRT DTW

In [186]:
# Imports

import os
import numpy as np
import matplotlib.pyplot as plt
import heapq
import time
from collections import Counter

In [187]:
# Load Train and Dev MFCCs
# Structure of train_mfcc and dev_mfcc = {digit:{filename:np_array_of_MFCC}}
INF = 9999999
digits = [2,4,6,8,9]
train_mfcc = {}
dev_mfcc = {}

for dig in digits:
    train_path = f"./IsolatedDigits/{dig}/train/"
    dev_path = f"./IsolatedDigits/{dig}/dev/"
    
    # Train MFCCs
    train_fps = os.listdir(train_path)
    mfcc_fps = [fp for fp in train_fps if fp[len(fp)-4:len(fp)] == 'mfcc']
    train_mfcc[dig] = {}
    for fp in mfcc_fps:
        fn = fp.split('.')[0]
        train_mfcc[dig][fn] = np.loadtxt(train_path+fp, skiprows=1)
    
    # Dev MFCCs
    dev_fps = os.listdir(dev_path)
    mfcc_fps = [fp for fp in dev_fps if fp[len(fp)-4:len(fp)] == 'mfcc']
    dev_mfcc[dig] = {}
    for fp in mfcc_fps:
        fn = fp.split('.')[0]
        dev_mfcc[dig][fn] = np.loadtxt(dev_path+fp, skiprows=1)

In [188]:
# TODO Remove
print(train_mfcc.keys(), train_mfcc[2].keys())

dict_keys([2, 4, 6, 8, 9]) dict_keys(['ac_2', 'ag_2', 'ai_2', 'an_2', 'bh_2', 'bi_2', 'br_2', 'ca_2', 'cg_2', 'cl_2', 'cm_2', 'dc_2', 'dg_2', 'ea_2', 'ec_2', 'ee_2', 'eg_2', 'ei_2', 'ek_2', 'es_2', 'hg_2', 'hp_2', 'ig_2', 'ih_2', 'il_2', 'jc_2', 'ji_2', 'jj_2', 'jn_2', 'jp_2', 'kc_2', 'kf_2', 'kh_2', 'kk_2', 'kn_2', 'kt_2', 'la_2', 'ld_2', 'ls_2'])


In [200]:
def cost(v1,v2):
    '''
    Computes cost of difference between two vectors
    '''

    return np.linalg.norm(v1-v2)

def dtw(x, y):
    '''
    Computes DTW between sequences of MFCC vectors x and y 
    '''
    
    XLEN = x.shape[0]
    YLEN = y.shape[0]
    
    dp = [[INF for j in range(YLEN+1)] for i in range(XLEN+1)]
    dp[0][0] = 0

    for i in range(1,XLEN+1):
        for j in range(1,YLEN+1):
            cost_here = cost(x[i-1], y[j-1])
            dp[i][j] = cost_here + min([dp[i-1][j], dp[i][j-1], dp[i-1][j-1]])
    
    return dp[-1][-1]

def majorityVoting(lst):
    data = Counter(lst)
    return data.most_common(1)[0][0]

def dtwWithPath(x,y):
    '''
    Same as dtw() but also return the warped form of y wrt x
    '''

    XLEN = x.shape[0]
    YLEN = y.shape[0]
    
    dp = [[[INF,i,j] for j in range(YLEN+1)] for i in range(XLEN+1)]
    dp[0][0] = [0,None,None]

    # Find DTW Matrix
    for i in range(1, XLEN+1):
        for j in range(1, YLEN+1):
            c = cost(x[i-1], y[j-1])
            i_1 = max([i-1,0])
            j_1 = max([j-1,0])
            dp[i][j] = min([ [dp[i_1][j][0]+c, i_1, j], [dp[i][j_1][0]+c, i, j_1], [dp[i_1][j_1][0]+c, i_1, j_1]  ])

    # Extract Path
    curr = [XLEN, YLEN]
    path = [curr.copy()]
    dirs = []
    next = dp[curr[0]][curr[1]][1:]

    # ct = 0
    while next[0] != None and next[1] != None: 
        if next[0] < curr[0] and next[1] < curr[1]:
            dirs.append('J')
        elif next[1] < curr[1]:
            dirs.append('R')
        else:
            dirs.append('D')

        path.append(next.copy())
        curr = next.copy()
        next = dp[curr[0]][curr[1]][1:]

    path = path[::-1][1:]
    dirs = dirs[::-1][1:]
    
    # R : avg from curr to curr+runlength(R) inclusive
    # D : interp between curr and curr+runlength(D) inclusive, runlength(D) times

    # CWRT
    # Resolve all Rs first, they are avgs and their endpoints dont change
    # Then resolve Ds using the updated endpoints. Helps in case of ...DR...
    
    # Resolving Rs
    new_dirs = []
    warped_y = []
    R_run = 0
    D_run = 0
    ptr = 0
    for dir_ind in range(len(dirs)):
        dir = dirs[dir_ind]
        if dir == 'J':
            new_dirs.append('J')
            if R_run != 0:
                warped_y.append(np.mean(y[ptr-R_run:ptr+1], axis=0))
                R_run = 0
            elif D_run != 0:
                warped_y.append(y[ptr])
                D_run = 0
            else:
                warped_y.append(y[ptr])
            ptr+=1
        elif dir == 'R':
            R_run += 1
            ptr += 1
        else: # dir == 'D'
            D_run += 1
            new_dirs.append('D')
    if D_run != 0:
        warped_y.append(y[ptr])
        
    elif R_run != 0:
        warped_y.append(np.mean(y[ptr-R_run:ptr+1], axis=0))
    
    warped_y.append(y[-1])
    dirs = new_dirs.copy()

    # Resolving Ds
    y = warped_y.copy()
    warped_y = []
    D_run = 0
    ptr = 0
    for dir_ind in range(len(dirs)):
        dir = dirs[dir_ind]
        if dir == 'J':
            if D_run != 0:
                interps = np.linspace(y[ptr], y[ptr+1], D_run+2)[:-1]
                interps = [interps[ind] for ind in range(interps.shape[0])]
                warped_y.extend(interps)
                D_run = 0
                ptr+=1
            else:
                warped_y.append(y[ptr])
                ptr+=1
        else: # dir == 'D'
            D_run += 1
    if D_run != 0:
        interps = np.linspace(y[ptr], y[ptr+1], D_run+2)[:-1]
        interps = [interps[ind] for ind in range(interps.shape[0])]
        warped_y.extend(interps)
        D_run = 0
    else:
        warped_y.append(y[-1])
    
    return dp[-1][-1], warped_y


In [201]:
test_arr1 = np.array([1,7,3,4,1,10,5,4,7,4])
test_arr2 = np.array([1,4,5,10,9,3,2,6,8,4])
print(cost(test_arr1[1],test_arr2[1]))
res = dtwWithPath(test_arr1, test_arr2)
print(f"warped y: {res[1]}")

3.0
warped y: [1, 4.0, 4.333333333333333, 4.666666666666667, 5, 9.5, 3, 2, 7.0, 4]


In [203]:
def getTemplates(train_mfcc):
    '''
    Computes a reference template for each class using CWRT
    '''

    templ = {dig:None for dig in train_mfcc.keys()}
    for dig in train_mfcc.keys():
        
        dig_fns = sorted(train_mfcc[dig].keys())
        init_templ = train_mfcc[dig][dig_fns[0]]
        warped_collection = [init_templ]
        for fn in dig_fns[1:]:
            mfcc_here = train_mfcc[dig][fn]
            res = dtwWithPath(init_templ, mfcc_here)
            warped_mfcc_here = res[1]
            
            warped_collection.append(warped_mfcc_here)
            
        # print('shapes:', [len(arr) for arr in warped_collection])
        new_templ = np.mean(np.array(warped_collection), axis=0)
        templ[dig] = new_templ
    
    return templ
            

In [205]:
# TODO Template Generation

# Warp kc of Nc train examples of class c to same audio, then average
templ = getTemplates(train_mfcc)
print("Templates Built")

Templates Built


In [213]:
# Compute DTW error, get majority votes of top K, get class

def predictCWRT(train_mfcc, dev_mfcc):

    t1 = time.time()
    correct = 0
    total = 0
    for dev_dig in dev_mfcc.keys():
        for dev_fn in dev_mfcc[dev_dig].keys():
            dev_frames = dev_mfcc[dev_dig][dev_fn]
            print(f"Dev {dev_fn} prediction: ", end = '')
            
            dig_best = None
            err_best = INF
            errs = []
            for train_dig in train_mfcc.keys():
                template = templ[train_dig]
                err_here = dtw(template, dev_frames)
                if err_here < err_best:
                    err_best = err_here
                    dig_best = train_dig
            
            print(dig_best)
            if dig_best == dev_dig:
                correct+=1
            
            total += 1
    
    t2 = time.time()
    acc = correct/total
    print(f"Acc: {acc}")
    print(f"Time taken: {int(t2-t1)}")

def predict(train_mfcc, dev_mfcc):
    t1 = time.time()
    K = 5
    correct = 0
    total = 0
    for dev_dig in dev_mfcc.keys():
        for dev_fn in dev_mfcc[dev_dig].keys():
            dev_frames = dev_mfcc[dev_dig][dev_fn]
            print(f"Dev {dev_fn} prediction: ", end = '')
            
            dig_best = None
            err_best = INF
            errs = []
            for train_dig in train_mfcc.keys():
            
                for train_fn in train_mfcc[train_dig].keys():
                    
                    train_frames = train_mfcc[train_dig][train_fn]
                    err_here = dtw(train_frames, dev_frames)
                    errs.append([err_here, train_dig])
            
            topK = heapq.nsmallest(K, errs)
            topK_digs = [val[1] for val in topK]
            dig_best = majorityVoting(topK_digs)
            
            print(dig_best)
            if dig_best == dev_dig:
                correct+=1
            else:
                print("Failed, votes:", topK_digs)
            total += 1
    
    t2 = time.time()
    acc = correct/total
    print(f"Acc: {acc}")
    print("Time taken: {:.4f} sec".format(t2-t1))


In [215]:
predict(train_mfcc, dev_mfcc)
predictCWRT(train_mfcc, dev_mfcc)

Dev mk_2 prediction: 2
Dev mm_2 prediction: 2
Dev ms_2 prediction: 2
Dev mw_2 prediction: 2
Dev nc_2 prediction: 2
Dev ng_2 prediction: 2
Dev nh_2 prediction: 2
Dev pe_2 prediction: 2
Dev pk_2 prediction: 2
Dev pm_2 prediction: 2
Dev pp_2 prediction: 2
Dev ra_2 prediction: 2
Dev mk_4 prediction: 4
Dev mm_4 prediction: 4
Dev ms_4 prediction: 4
Dev mw_4 prediction: 4
Dev nc_4 prediction: 4
Dev ng_4 prediction: 4
Dev nh_4 prediction: 4
Dev pe_4 prediction: 4
Dev pk_4 prediction: 4
Dev pm_4 prediction: 4
Dev pp_4 prediction: 4
Dev ra_4 prediction: 4
Dev mk_6 prediction: 6
Dev mm_6 prediction: 6
Dev ms_6 prediction: 6
Dev mw_6 prediction: 6
Dev nc_6 prediction: 6
Dev ng_6 prediction: 6
Dev nh_6 prediction: 6
Dev pe_6 prediction: 6
Dev pk_6 prediction: 6
Dev pm_6 prediction: 6
Dev pp_6 prediction: 6
Dev ra_6 prediction: 6
Dev mk_8 prediction: 8
Dev mm_8 prediction: 8
Dev ms_8 prediction: 8
Dev mw_8 prediction: 8
Dev nc_8 prediction: 8
Dev ng_8 prediction: 8
Dev nh_8 prediction: 8
Dev pe_8 pr

In [None]:
# Online Handwritten Character Recognition