In [1]:
import matplotlib.pyplot as plt
from scipy.fftpack import fft
from scipy import linalg
from scipy.linalg import eigh
import numpy as np
import pandas as pd
import librosa
import os

### Spectrogram Function

In [2]:
def spectrogram(path):
    data, sample_rate = librosa.load(path, sr = 16000) # Reading the file and decomposing into sample rate and amplitude vector
    
    # Defining Hamming window particulars
    window = 0.025
    shift = 0.01
    
    # Translating into number of samples from given Hamming window particulars
    frame_length, frame_step = int(window * sample_rate), int(shift * sample_rate)
    signal_length = len(data)

    # Calculating number of frames. Converting to inn tas the values are floats and the calculated values need to be discrete integers.
    num_frames = int((signal_length - frame_length) / frame_step)

    # Creating an envelope of array that shifts with Hamming window 
    envelope_length = num_frames * num_frames + frame_length

    # Storing data into a temporary array, to be used for appending
    temp = np.zeros((envelope_length - signal_length))
    envelope = np.append(data, temp) 

    # Using np.tile to create repeating arrays while using arange to set-up frame length and corresponding entries
    storage = np.tile(np.arange(0, frame_length), (num_frames, 1)) + np.tile(np.arange(0, num_frames * frame_step, frame_step), (frame_length, 1)).T

    # Storing the data into the frames
    frames = envelope[storage]
    
    point_FFT = 64

    fft_data = np.absolute(np.fft.fft(frames, point_FFT))  # Magnitude of the FFT
    
    fft_data = fft_data.T
    
    fft_data = fft_data[0:32,:] # Slicing into top 32 vectors 
    
    spectrogram = fft_data.T - np.mean(fft_data.T, axis=0)

    spectrogram = spectrogram.T
    
    return spectrogram

In [3]:
# Storing file names

names_speech = os.listdir('speech_music_classification/train/speech/')

names_music = os.listdir('speech_music_classification/train/music/')


# Creating the music data matrix
music_data = []

for name in names_music:
    music_data.append(spectrogram('speech_music_classification/train/music/'+name))

music_data = np.array(music_data)

music_data = music_data.reshape(music_data.shape[1], music_data.shape[2]*music_data.shape[0])
music_data = music_data.T


# Creating the speech data matrix
speech_data = []

for name in names_speech:
    speech_data.append(spectrogram('speech_music_classification/train/speech/'+name))

speech_data = np.array(speech_data)

speech_data = speech_data.reshape(speech_data.shape[1], speech_data.shape[2]*speech_data.shape[0])
speech_data = speech_data.T

### KMeans Implementation

In [52]:
np.random.seed(42)


class KMeans(object):
    def pairwise_dist(self, x, y): # get individual distances
        xSumSquare = np.sum(np.square(x),axis=1);
        ySumSquare = np.sum(np.square(y),axis=1);
        mul = np.dot(x, y.T);
        dists = np.sqrt(abs(xSumSquare[:, np.newaxis] + ySumSquare-2*mul))
        return dists

    def _init_centers(self, points, K, **kwargs): # compute centers
        row, col = points.shape
        retArr = np.empty([K, col])
        for number in range(K):
            randIndex = np.random.randint(row)
            retArr[number] = points[randIndex]
        
        return retArr

    def _update_assignment(self, centers, points): # update assignment of each data point of the particular cluster
        row, col = points.shape
        cluster_idx = np.empty([row])
        distances = self.pairwise_dist(points, centers)
        cluster_idx = np.argmin(distances, axis=1)

        return cluster_idx

    def _update_centers(self, old_centers, cluster_idx, points): # update cluster centroids based on new means
        K, D = old_centers.shape
        new_centers = np.empty(old_centers.shape)
        for i in range(K):
            new_centers[i] = np.mean(points[cluster_idx == i], axis = 0)
        return new_centers

    def _get_loss(self, centers, cluster_idx, points):  # find the loss value
        dists = self.pairwise_dist(points, centers)
        loss = 0.0
        N, D = points.shape
        for i in range(N):
            loss = loss + np.square(dists[i][cluster_idx[i]])
        
        return loss

    def __call__(self, points, K, max_iters=100, abs_tol=1e-16, rel_tol=1e-16, verbose=False, **kwargs): # implement KMeans on the data
        centers = self._init_centers(points, K, **kwargs)
        for it in range(max_iters):
            cluster_idx = self._update_assignment(centers, points)
            centers = self._update_centers(centers, cluster_idx, points)
            loss = self._get_loss(centers, cluster_idx, points)
            K = centers.shape[0]
            if it:
                diff = np.abs(prev_loss - loss)
                if diff < abs_tol and diff / prev_loss < rel_tol:
                    break
            prev_loss = loss
        return cluster_idx, centers, loss

### EM Algorithm

In [8]:
#### Defining Gaussian Function
def Gaussian(x, mu, sigma):
    d = len(x)  # Number of entries in the vector
    N = (1/((2*np.pi)**(d/2)))*(np.exp(-0.5*(x-mu)@(np.linalg.inv(sigma))@((x-mu).T)))
    return N


#### Expectation step
def E_step(data, weight, mu, sigma, K):
    
    n = data.shape[0]
    
    num = np.empty([n,K])
    R = np.empty([n,K])
    den = np.zeros(n)
    
    for n, sample in enumerate(data):
        for k in range(K):
            num[n][k] = weight[k]*Gaussian(sample, mu[k], sigma[k])
            den[n] = den[n] + num[n][k]
        
        R[n] = num[n]/den[n]
    
    return R


#### Maximization step
def M_step(R,data, weight, mu, sigma):
    
    N = data.shape[0]
    
    D = data.shape[1]
    
    R = np.where(np.isnan(R), np.ma.array(R, mask=np.isnan(R)).mean(axis=0), R)  # Making sure that NaNs if created are replaced by mean of their columns
    
    assignment = []  # Array which stores the assigned cluster for all n data points
    for sample in R:
        assignment.append(np.argmax(sample))
        
    N = []  # N stores the count assigned for each Gaussian clusters
    
    for k in range(K):
        N.append(assignment.count(k))
        
    
    # Updating means
    for k in range(K):
        p = 0
        for n in range(len(R)):
            p += R[n][k]*data[n]
        mu[k] = p/N[k]
    
        
    # Updating covariance matrices
    for k in range(K):
        p = np.zeros([D,D])
        for n in range(len(R)):
            X = ((data[n]-mu[k])).reshape(D,1)
            p +=(R[n][k]*X*X.T)
        sigma[k] = p/N[k]
        sigma[k]= np.diag(np.diag(sigma[k]))
    
    
    # Updating weights
    tot = len(data)
    for k in range(K):
        weight[k] = N[k]/tot
        
    
    return assignment, weight, mu, sigma
            

### Initialization

In [9]:
def initialize(data, K):

    n = data.shape[0]
    d = data.shape[1]

    # Initializing weights from priors
    np.random.seed(42)
    weight = [np.random.rand() for i in range(K)]
    s = sum(weight)
    weight = [ i/s for i in weight ]


    # Initializing mean and sigma using K-Means algorithm
    kmeans = KMeans(K)
    identified_clusters = kmeans._call_(data)

    mu = np.empty([K,d])
    sigma = np.zeros([K,d,d])
    for k in range(K):
        tmp = data[np.argwhere(identified_clusters==k)]
        tmp = tmp.reshape(tmp.shape[0],tmp.shape[2])
        mu[k] = tmp.mean(axis=0)
        sigma[k] = (tmp.T@tmp)/tmp.shape[0]
        sigma[k]= np.diag(np.diag(sigma[k]))
    
    return weight, mu, sigma

### Training GMM for Music for K = 2

In [57]:
K = 2
weight_m, mu_m, sigma_m = initialize(music_data, K)
R_m = E_step(music_data, weight_m, mu_m, sigma_m, K)
assignment_m, weight_m, mu_m, sigma_m = M_step(R_m,music_data, weight_m, mu_m, sigma_m)

  R[n] = num[n]/den[n]


### Training GMM for Speech for K = 2

In [58]:
K = 2
weight_s, mu_s, sigma_s = initialize(speech_data, K)
R_s = E_step(speech_data, weight_s, mu_s, sigma_s, K)
assignment_s, weight_s, mu_s, sigma_s = M_step(R_s,speech_data, weight_s, mu_s, sigma_s)

  R[n] = num[n]/den[n]


### Function to implement testing

In [54]:
def test(tst):
    
    likelihood_s = []   
    for k in range(K):
        for ele in t:
            num = weight_s[k]*Gaussian(ele, mu_s[k], sigma_s[k])
            likelihood_s.append(num)
            
    likelihood_m = []  
    for k in range(K):
        for ele in t:
            num = weight_m[k]*Gaussian(ele, mu_m[k], sigma_m[k])
            likelihood_m.append(num)
            
    if((sum(likelihood_s)/len(likelihood_s))>(sum(likelihood_m)/len(likelihood_m))):
        return "S"
    else:
        return "M"

The idea here is to calculate individual likelihood of each data point for each test file and average it over the columns. For whichever class the average value is higher, the final decision list will be appended accordingly.

In [17]:
names_test = os.listdir('speech_music_classification/test/')

test_data = []

for name in names_test:
    test_data.append((spectrogram('speech_music_classification/test/'+name)).T)
    
test_data = np.array(test_data)

decision = []

for data in test_data:
    decision.append(test(data))