In [None]:
# Importing necessary packages
import numpy as np
from scipy.io import wavfile
import math
import matplotlib.pyplot as plt
import ntpath
import os
from sklearn.cluster import KMeans
import sys
import random
sys.setrecursionlimit(5000)

In [None]:
speechTrainPath = r"C:\Users\ujjaw\Desktop\MLSP_Assignments\Ass3\Q3\speech_music_classification\train\speech"
musicTrainPath = r"C:\Users\ujjaw\Desktop\MLSP_Assignments\Ass3\Q3\speech_music_classification\train\music"

In [None]:
musicTrainData = []
for filename in os.scandir(musicTrainPath):
    if filename.is_file():
        filename2 = musicTrainPath + "\\" + ntpath.basename(filename)
        musicTrainData.append(filename2)

speechTrainData = []
for filename in os.scandir(speechTrainPath):

    if filename.is_file():
        filename2 = speechTrainPath + "\\" + ntpath.basename(filename)
        speechTrainData.append(filename2)
# print(len(musicTrainData))
# print(len(speechTrainData))

In [None]:
F = np.zeros((64,400), np.complex64)
def spectrogram(data):
    hammingWindowSize=int(25*16000/1000)
    shiftLength = int(10*16000/1000)
#     nFFT=256
    for i in range(64):
        for j in range(400):
            F[i,j] =  np.exp(-2j *i * j * math.pi/400)
    spec = np.zeros(((1 + np.int(np.floor((len(data) - hammingWindowSize) / float(shiftLength)))), 64), np.complex64)
    for i in range((1 + np.int(np.floor((len(data) - hammingWindowSize) / float(shiftLength))))):
        spec[i] = np.matmul(F,np.multiply(data[i * shiftLength: i * shiftLength + hammingWindowSize],np.hamming(hammingWindowSize)))
    spec = np.log(np.abs(spec[:, :64 // 2])+1e-8)
    return spec

In [None]:
musicFrames = []
for i in range(len(musicTrainData)):
    sampleRate, data = wavfile.read(musicTrainData[i])
    tempSpecData = spectrogram(data)
    musicFrames.extend(tempSpecData)
musicFrames = np.array(musicFrames)
print(musicFrames.shape)

In [None]:
speechFrames = []
for i in range(len(speechTrainData)):
    sampleRate, data = wavfile.read(speechTrainData[i])
    tempSpecData = spectrogram(data)
    speechFrames.extend(tempSpecData)
speechFrames = np.array(speechFrames)
print(speechFrames.shape)

In [None]:
T = 2998
N = 3
K = 8
no_of_files = 40

In [None]:
kmeansMusic = KMeans(n_clusters=K, random_state=0).fit(musicFrames)
kmeansSpeech = KMeans(n_clusters=K, random_state=0).fit(speechFrames)

In [None]:
obsMusic = np.array(kmeansMusic.labels_).reshape(40,2998)
obsSpeech = np.array(kmeansSpeech.labels_).reshape(40,2998)

print(obsMusic.shape)
print(obsSpeech.shape)

Implementing HMM; All the functions for part a,b,c,d are defined in HMM Class

In [None]:
# Implementing HMM
class HMM:
    
    def __init__(self):
        #Given inital lambda
        self.pi = np.array([0.5, 0.5, 0]).reshape(1,N)
        self.A = np.array([0.6, 0.4, 0.0, 0.3, 0.5, 0.2, 0.0, 0.1, 0.9]).reshape(N,N)
        self.B = np.array([0.25, 0.25, 0.25, 0.25, 0.0, 0.0, 0.0, 0.0, 0.125, 0.125, 0.125, 0.125, 0.125, 0.125, 0.125, 0.125, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5, 0.5, 0.0]).reshape(N,K)
                
        #Matrices for DP
        self.alphaStored = np.full((T, N), -1)
        self.betaStored = np.full((T, N), -1)
        self.deltaStored = np.full((T, N), -1)
        self.psiStored = np.full((T, N), -1)
        
        #Array for scaling factor
        self.c = np.full((1,T), -1)
        
    def funcAlpha(self,t,j,o):
        if self.alphaStored[t][j] == -1:
            if t == 0:
                self.alphaStored[t][j] = self.pi[0][j] * self.B[j][o[0]] 
                
                #Handling Scaling
                ####################################################
                if (self.c[0][t] != -1):
                    if (j==N): 
                        for x in range(N):
                            self.c[0][t] += self.alphaStored[t][x]
                        for x in range(N):
                            self.alphaStored[t][x] /= self.c[0][t]
                #####################################################
            else:
                temp = 0
                for i in range(N):
                    temp += self.funcAlpha(t-1,i,o) * self.A[i][j] * self.B[j][o[t]]
                self.alphaStored[t][j] = temp
                
                #Handling Scaling
                #####################################################
                if (j==N):
                    if (self.c[0][t] != -1):
                        for x in range(N):
                            self.c[0][t] += self.alphaStored[t][x]
                        for x in range(N):
                            self.alphaStored[t][x] /= self.c[0][t]
                ####################################################
                
        return self.alphaStored[t][j]

    def findForwardLikelihood(self,obSeq):
        tempLikelihoodForward = 0
        for i in range(N):
            tempLikelihoodForward += self.funcAlpha(T-1,i,obSeq)
        self.alphaStored = np.full((T, N), -1)
        return tempLikelihoodForward
    
    def funcBeta(self,t,i,o):
        if self.betaStored[t][i] == -1:
            if t == T-1:
                self.betaStored[t][i] = 1
            else:
                temp = 0
                for j in range(N):
                    temp += self.funcBeta(t+1,j,o) * self.A[i][j] * self.B[j][o[t+1]]
                self.betaStored[t][i] = temp
        return self.betaStored[t][i]

    def findBackwardLikelihood(self,obSeq):
        tempLikelihoodBackward = 0
        for i in range(N):
            tempBeta = self.funcBeta(1,i,obSeq)
            tempLikelihoodBackward += tempBeta * self.pi[0][i] * self.B[i][obSeq[0]]
        self.betaStored = np.full((T, N), -1)
        return tempLikelihoodBackward
    
    #Implementing Alternate Viterbi Algorithm 
    def viterbiAlgorithm(self,t,j,o):
        # 0:Preprocessing
        pi_log = np.log(self.pi, out=np.zeros_like(self.pi), where=(self.pi!=0))
        A_log = np.log(self.A, out=np.zeros_like(self.A), where=(self.A!=0))
        B_log = np.log(self.B, out=np.zeros_like(self.B), where=(self.B!=0))
        if self.deltaStored[t][j] == -1:
            if t == 0:
                # 1:Initialization
                self.deltaStored[t][j] = pi_log[0][j] + B_log[j][o[0]]
                self.psiStored[t][j] = 0
            else:
                # 2:Recursion
                Max = -1
                maxIndex = -1
                for i in range(N):
                    tempVA = self.viterbiAlgorithm(t-1,i,o) 
                    temp = tempVA + A_log[i][j]
                    if (temp>Max):
                        Max = temp
                        maxIndex = i
                self.deltaStored[t][j] = Max + B_log[j][o[t]]
                self.psiStored[t][j] = maxIndex
        return self.deltaStored[t][j]

    def findStateSequence(self,obSeq):
        p = -1
        q = np.zeros((1,T))
        # 3:Termination
        for i in range(N):
            temp = self.viterbiAlgorithm(T-1,i,obSeq)
            if (temp>p):
                p = temp
                q[0][T-1] = i
        # 4:Backtracking
        c = T-2
        while (c>-1):
            x = int(q[0][c+1])
            q[0][c] = self.psiStored[c+1][x]
            c -= 1
        self.deltaStored = np.full((T, N), -1)
        self.psiStored = np.full((T, N), -1)
        return q
    
    def funcGamma(self,t,i,o):
        numerator = 0
        denominator = 0
        for j in range(N):
            temp = self.funcAlpha(t,j,o) * self.funcBeta(t,j,o)
            if (i == j):
                numerator = temp
            denominator += temp
        return numerator/denominator
    
    def funcZita(self,t,i,j,o):
        numerator = 0
        denominator = 0
        for x in range(N):
            for y in range(N):
                temp = self.funcAlpha(t,x,o) * self.A[x][y] * self.B[y][o[t+1]] * self.funcBeta(t+1,y,o)
                if (x==i and y==j):
                    numerator = temp
                denominator += temp
        return numerator/denominator
    
    def reconstructPI(self,obsComplete):
        E = obsComplete.shape[0]
        pi_new = np.zeros((1,N))
        for i in range(N):
            for e in range(E):
                pi[0][i] = self.funcGamma(1,i,obsComplete[e])
        pi_new = pi_new/E
        return pi_new

    def reconstructA(self,obsComplete):
        A_new = np.zeros((N,N))
        E = obsComplete.shape[0]
        for i in range(N):
            for j in range(N):
                numerator = 0
                denominator = 0
                for e in range(E):
                    for t in range(T-1):
                        numerator += self.funcZita(t,i,j,obsComplete[e])
                        denominator += self.funcGamma(t,i,obsComplete[e])
                A_new[i][j] = numerator/denominator
        return A_new
    
    def reconstructB(self,obsComplete):
        B_new = np.zeros((N,K))
        E = obsComplete.shape[0]
        for i in range(N):
            for k in range(K):
                numerator = 0
                denominator = 0
                for e in range(E):
                    for t in range(T-1):
                        tempGamma = self.funcGamma(t,i,obsComplete[e])
                        numerator += self.viterbiAlgorithm(t,i,obsComplete[e]) * tempGamma
                        denominator += tempGamma
                B_new[i][k] = numerator/denominator
        return B_new

    def HMM_Model(self,obsComplete):
        d = 0.0001
        LikelihoodList = []
        for i in range(20):
            temp_new_pi = self.reconstructPI(obsComplete)
            temp_new_A = self.reconstructA(obsComplete)
            temp_new_B = self.reconstructB(obsComplete)
            
            prevLikelihood = self.findLogLikelihood(obsComplete)
            self.A = temp_new_A
            self.B = temp_new_B
            self.pi = temp_new_pi
            currLikelihood = self.findLogLikelihood(obsComplete)
            
            LikelihoodList.append(prevLikelihood)
            self.plotLikelihood(LikelihoodList)
            
            #Convergence of Baum-Welch
            if(currLikelihood - prevLikelihood<d):
                break
            
    def findLogLikelihood(self,obsComplete):
        E = obsComplete.shape[0]
        sumLikelihood = 0
        for e in range(E):
            temp = self.findForwardLikelihood(obsComplete[e])
            sumLikelihood += temp
        sumLikelihood = sumLikelihood / E
        return sumLikelihood
    
    def plotLikelihood(self,likelihoodList):
        plt.xlabel("#Iterations")
        plt.ylabel("Log Likelihood")
        plt.plot(np.array(logLikeliHoodList) * (-1))
        
            

# Part A

In [None]:
# Computing Likelihood
rm = random.randint(0, no_of_files-1)
rs = random.randint(0, no_of_files-1)
obSeqMusic = obsMusic[rm]
obSeqSpeech = obsSpeech[rs]

partAMusic = HMM()
partASpeech = HMM()

likelihoodForwardMusic = partAMusic.findForwardLikelihood(obSeqMusic)
likelihoodForwardSpeech = partASpeech.findForwardLikelihood(obSeqSpeech)
print(likelihoodForwardMusic)
print(likelihoodForwardSpeech)

if(likelihoodForwardMusic>likelihoodForwardSpeech):
    print("Music File is more likely")
else:
    print("Speech File is more likely")

# Part B

In [None]:
# Computing Likelihood
rm = random.randint(0, no_of_files-1)
rs = random.randint(0, no_of_files-1)
obSeqMusic = obsMusic[rm]
obSeqSpeech = obsSpeech[rs]

partBMusic = HMM()
partBSpeech = HMM()
likelihoodBackwardMusic = partBMusic.findBackwardLikelihood(obSeqMusic)
likelihoodBackwardSpeech = partBSpeech.findBackwardLikelihood(obSeqSpeech)
print(likelihoodBackwardMusic)
print(likelihoodBackwardSpeech)

if(likelihoodBackwardMusic>likelihoodBackwardSpeech):
    print("Music File is more likely")
else:
    print("Speech File is more likely")

# Part C: Viterbi Algorithm

In [None]:
# Testing Viterbi Algorithm
obSeq = obsMusic[0]
partC = HMM()

q = partC.findStateSequence(obSeq)
print (q[0])

# Part D

In [None]:
#Training the Models
lambda1 = HMM()
lambda1.HMM_Model(obsMusic)

lambda2 = HMM()
lambda2.HMM_Model(obsSpeech)

# PART E:Testing the data

In [None]:
#Testing the test data
testPath = r"C:\Users\ujjaw\Desktop\MLSP_Assignments\Ass3\Q3\speech_music_classification\test"
testData = []
testFiles = []
for filename in os.scandir(testPath):
    if filename.is_file():
        filename2 = testPath + "\\" + ntpath.basename(filename)
        testFiles.append(filename2)
for i in testFiles:
    if(i.split('\\')[-1].split('_')[0] == 'music'):
        testData.append(0)
    else:
        testData.append(1)
no_of_test_files = len(testFiles)
# print(testData)

In [None]:
testFrames = []
for i in range(len(testFiles)):
    sampleRate, data = wavfile.read(testFiles[i])
    tempSpecData = spectrogram(data)
    testFrames.extend(tempSpecData)
testFrames = np.array(testFrames)
# print(testFrames.shape)

In [None]:
kmeansTest = KMeans(n_clusters=K, random_state=0).fit(testFrames)
obsTest = np.array(kmeansTest.labels_).reshape(no_of_test_files,2998)

In [None]:
predicted = []
for i in range(no_of_test_files):
    
    testMusicLikelihood = lambda1.findForwardLikelihood(obsTest[i])
    testSpeechLikelihood = lambda2.findForwardLikelihood(obsTest[i])
    
    if(testMusicLikelihood > testSpeechLikelihood):
        predicted.append(0)
    else:
        predicted.append(1)

In [None]:
for i in range(no_of_test_files):
    if (predicted[i] == 0):
        print(testFiles[i].split("\\")[-1], " is classified as music")
    else:
        print(testFiles[i].split("\\")[-1], " is classified as speech")

In [None]:
correctPrediction = 0
totalPrediction = no_of_test_files
for i in range(totalPrediction):
    if(predicted[i] == testData[i]):
        correctPrediction += 1
print("Accuracy = ", "{:.2f}".format(correctPrediction/totalPrediction*100), "%")