In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F  
from DNN_utils import (flatten, column_str_to_numpy, check_accuracy, check_loss)
from torch.utils.data import Dataset, DataLoader, sampler

import numpy as np 
import pandas as pd
from joblib import load 
import matplotlib.pyplot as plt
from sklearn.utils import resample
from sklearn.model_selection import train_test_split

# Import helper functions.
import mfcc_label 
import get_prob

# Load the pre-processed train/validation frame and the held-out test frame.
df_train_val = pd.read_csv('processed_data/dnn_never_train.csv')
df_test = pd.read_csv('processed_data/dnn_never_test.csv')

# The array-valued columns are stored as strings in the CSV files; the helper
# is called for its effect on the frame (its return value is not used).
for frame in (df_train_val, df_test):
    for column in ('mfcc', 'label'):
        column_str_to_numpy(frame, column)

# Hold out 20% of the training frame for validation (fixed seed), then give
# both parts a fresh 0..n-1 index.
df_train_pre, df_val = train_test_split(df_train_val, test_size=0.2, random_state=42)
df_train_pre = df_train_pre.reset_index(drop=True)
df_val = df_val.reset_index(drop=True)

# Integer class id per row: position of the largest entry of the label vector.
for frame in (df_train_pre, df_val, df_test):
    frame['single_class_label'] = frame['label'].apply(lambda label_vec: np.argmax(label_vec))

# Configuration
device = 'cpu'
dtype = torch.float32

# DNN architecture hyperparameters
minibatch_size = 64

In [3]:
# Row counts: full training frame, training split, validation split (one per line).
print(len(df_train_val), len(df_train_pre), len(df_val), sep='\n')


15493
12394
3099


In [None]:
pd.set_option('display.max_colwidth', None)


def upsample_minority(df, mask):
    """Balance `df` by resampling the rows selected by `mask` with replacement.

    Args:
        df: DataFrame to balance.
        mask: Boolean selector; rows where it is True form the group that is
            resampled, all remaining rows are kept as they are.

    Returns:
        DataFrame made of the unselected rows followed by as many resampled
        selected rows as there are unselected rows. The original index labels
        are kept (the concatenation does not reset the index), so labels of
        rows drawn more than once are repeated.
    """
    minority_rows, majority_rows = df[mask], df[~mask]

    resampled_minority = resample(
        minority_rows,
        replace=True,                   # draw with replacement
        n_samples=len(majority_rows),   # end up with as many rows as the other group
        random_state=42,                # fixed seed for reproducible draws
    )

    return pd.concat([majority_rows, resampled_minority])

def _has_never_state(label_vec):
    """Return True when any of the first 12 entries of the label vector is positive."""
    return any(elem > 0 for elem in label_vec[:12])


# Rows that put positive probability on one of the 12 classes that correspond
# to 'never' are the ones that get upsampled.
mask = df_train_pre['label'].apply(_has_never_state)
#mask = df_train_pre['label'].apply(lambda x: all(elem == 0 for elem in x[12:]))
df_train = upsample_minority(df_train_pre, mask)

# Group sizes before and after upsampling.
print('Before upsampling:')
print(mask.value_counts())

print('After upsampling:')
print(df_train['label'].apply(_has_never_state).value_counts())



In [None]:
# Number of rows per arg-max class in the upsampled training frame.
print(df_train['single_class_label'].value_counts())

# TODO: Step 1 is unfinished -- "Sample from df_filtered such that ..." (the
# sampling criterion was never written down, and no df_filtered is defined in
# the visible part of this notebook).

# Open questions:
# 1- MFCCs that correspond to the same phoneme are closer in the feature space.
#    Should we take advantage of this while oversampling (e.g. by taking a
#    convex combination of two observations)?


In [None]:
# Display the 'test' entry of the stored train/test dataset description.
# NOTE(review): joblib files are pickle-based; only load files from a trusted source.
load('processed_data/train_test_dataset_never.joblib')['test'] 

In [None]:
# Wrap a DataFrame so that torch's DataLoader can consume it.
class CustomDataset(Dataset):
    """In-memory dataset built from the 'mfcc' and 'label' columns of a DataFrame.

    The per-row arrays of each column are stacked once at construction time:
    'mfcc' becomes a float32 tensor and 'label' a long (int64) tensor.

    Args:
        dataframe: DataFrame with array-valued 'mfcc' and 'label' columns.
        transform: Optional callable applied to the feature tensor of a sample
            each time it is fetched.
        train: Flag stored on the instance as `self.train`; it is not used by
            the dataset itself.
    """

    def __init__(self, dataframe, transform=None, train=True):
        stacked_features = np.vstack(dataframe['mfcc'].to_list())
        stacked_labels = np.vstack(dataframe['label'].to_list())
        self.mfcc = torch.tensor(stacked_features, dtype=torch.float32)
        self.label = torch.tensor(stacked_labels, dtype=torch.long)
        self.transform = transform
        self.train = train

    def __len__(self):
        # One sample per stacked feature row.
        return self.mfcc.shape[0]

    def __getitem__(self, idx):
        sample = self.mfcc[idx]
        target = self.label[idx]
        if self.transform:
            sample = self.transform(sample)
        return sample, target

# Wrap each split in a dataset. The validation split is flagged train=True and
# the test split train=False.
dataset_train = CustomDataset(df_train, train=True)
dataset_val = CustomDataset(df_val, train=True)
dataset_test = CustomDataset(df_test, train=False)

# Training batches are drawn in random order, minibatch_size rows at a time.
loader_train = DataLoader(
    dataset=dataset_train,
    batch_size=minibatch_size,
    sampler=sampler.SubsetRandomSampler(range(len(df_train))),
)

# Validation and test rows are visited one at a time, in their stored order.
loader_val = DataLoader(
    dataset=dataset_val,
    batch_size=1,
    sampler=sampler.SequentialSampler(range(len(df_val))),
)

loader_test = DataLoader(
    dataset=dataset_test,
    batch_size=1,
    sampler=sampler.SequentialSampler(range(len(df_test))),
)

In [None]:
# Sanity check: show the tensor sizes of the first few training batches.
# Pairing the loader with range(3) caps the preview at three batches;
# change the 3 to see more or fewer.
for i, (inputs, labels) in zip(range(3), loader_train):
    print(f"Batch {i + 1}")
    print(f"Features (MFCCs) size: {inputs.size()}")
    print(f"Labels size: {labels.size()}")
    print("\n")


In [None]:
class DNN_FC(nn.Module):
    """Fully connected network with four hidden layers and a linear output layer.

    Every hidden layer maps `input_size` features to `input_size` features and
    is followed by a ReLU; the last layer maps to `num_classes` raw scores
    (no softmax is applied inside the model). All weight matrices are
    initialised with Kaiming-normal initialisation.

    Args:
        input_size: Number of input features per example.
        num_classes: Number of output scores per example.
    """

    def __init__(self, input_size, num_classes):
        super().__init__()
        # (in_features, out_features) for fc1..fc5; layers are created and
        # initialised one after another, in this order.
        layer_shapes = [(input_size, input_size)] * 4 + [(input_size, num_classes)]
        for position, (n_in, n_out) in enumerate(layer_shapes, start=1):
            layer = nn.Linear(n_in, n_out)
            nn.init.kaiming_normal_(layer.weight)
            setattr(self, f"fc{position}", layer)

    def forward(self, x):
        """Return the class scores for a batch `x`."""
        hidden = flatten(x)
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            hidden = F.relu(layer(hidden))
        return self.fc5(hidden)


def test_DNN_FC():
    """Smoke-test DNN_FC: push an all-zero minibatch through it and print the output size.

    The network is sized from the first row of `df_train`; the printed size
    should be [minibatch_size, num_classes].
    """
    first_row = df_train.iloc[0]
    n_features = len(first_row['mfcc'])   # feature dimension of the MFCC vector
    n_classes = len(first_row['label'])   # number of phoneme classes
    dummy_batch = torch.zeros((minibatch_size, n_features), dtype=torch.float32)
    net = DNN_FC(n_features, n_classes)
    print(net(dummy_batch).size())
test_DNN_FC()

In [None]:
# Size the network from the first row of the training frame.
# Positional access (.iloc[0]) is required here: df_train is the result of
# upsampling, whose concatenation keeps the original index labels, so the
# label 0 may be missing or repeated. A label lookup such as
# df_train['mfcc'][0] can then raise KeyError or return several rows, which
# would make len(...) the number of duplicates, not the feature dimension.
input_size = len(df_train.iloc[0]['mfcc'])
num_classes = len(df_train.iloc[0]['label'])
learning_rate = 1e-2
model = DNN_FC(input_size, num_classes)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Multiply the learning rate by 0.9 on every scheduler step.
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)

# NOTE(review): `train` is neither defined nor imported in the visible part of
# this notebook (only flatten, column_str_to_numpy, check_accuracy and
# check_loss are imported from DNN_utils) -- confirm where it comes from,
# otherwise this cell fails on a fresh kernel.
train_loss_lst, val_loss_lst = train(model, optimizer, scheduler, epochs = 10)



In [None]:
print(train_loss_lst)
print(val_loss_lst)

# Plot the loss curves. The first two recorded values of each list are skipped
# (presumably so the large initial losses do not dominate the scale -- confirm).
plt.plot(val_loss_lst[2:], label='Validation Loss')
plt.plot(train_loss_lst[2:], label='Training Loss')

# Add labels and title to the plot. The curves are losses, so the title says
# so (it previously read "Accuracy").
plt.xlabel('Iteration')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()




In [None]:
# Saving the model parameters:
#   torch.save(model.state_dict(), 'model_parameters.pth')
#
# Loading them again:
#   new_model = DNN_FC(input_size, num_classes)            # fresh network instance
#   model_parameters = torch.load('model_parameters.pth')  # read the saved parameters
#   new_model.load_state_dict(model_parameters)            # assign them to the model

# Accuracy on the validation and test splits.
for eval_loader in (loader_val, loader_test):
    check_accuracy(eval_loader, model)

# Accuracy on the training rows, visited one at a time in stored order.
loader_train_fortest = DataLoader(
    dataset=dataset_train,
    batch_size=1,
    sampler=sampler.SequentialSampler(range(len(df_train))),
)

check_accuracy(loader_train_fortest, model)



In [None]:
# Accuracy on the validation and test splits, then on the training loader
# (random order, minibatch_size rows per batch).
for eval_loader in (loader_val, loader_test):
    check_accuracy(eval_loader, model)
check_accuracy(loader_train, model)


In [None]:
def infer_probabilities(loader, model):
    """Run `model` over every batch of `loader` and collect per-row class probabilities.

    The model is switched to evaluation mode and gradients are disabled.
    Inputs are moved to the notebook-level `device` and cast to the
    notebook-level `dtype` before the forward pass; the labels yielded by the
    loader are ignored.

    Args:
        loader: DataLoader yielding (features, labels) batches. Its dataset
            must expose a boolean `train` attribute, which only selects the
            progress message that is printed.
        model: Network returning one row of class scores per input row.

    Returns:
        dict mapping the running row position (0, 1, 2, ... in the order the
        loader yields rows) to a 1-D numpy array holding the softmax of that
        row's scores. The positions coincide with dataset row numbers only
        when the loader visits the rows sequentially.
    """
    if loader.dataset.train:
        print('Getting estimated probabilities on validation set')
    else:
        print('Getting estimated probabilities on test set')
    model.eval()  # set model to evaluation mode
    probabilities_dict = {}
    # A running counter is used (not batch_index * batch_size) so the keys
    # stay correct even when loader.batch_size is None.
    next_row = 0
    with torch.no_grad():
        for x, _ in loader:
            x = x.to(device=device, dtype=dtype)  # move to device, e.g. GPU
            scores = model(x)
            # .cpu() makes the conversion to numpy valid when `device` is not the CPU.
            batch_probabilities = torch.softmax(scores, dim=1).cpu().numpy()

            # Save the probabilities with the corresponding row position.
            for row_probabilities in batch_probabilities:
                probabilities_dict[next_row] = row_probabilities
                next_row += 1

    return probabilities_dict


def find_emission(loader, model, scale = False):
    '''
    Find emission probabilities for a given data loader and model.
    Runs one pass over the loader plus one pass over the collected rows.
    Args:
        loader: torch Data loader
        model: torch DNN model
        scale: Boolean: Set true to scale the output probability of DNN.
    Returns:
        emission_df: Dataframe with a single 'Emission' column, indexed by the
            row position returned by infer_probabilities. With scale=False each
            cell holds the probability array unchanged. With scale=True each
            cell holds the log-probabilities without their last two entries
            (background and silence), minus the prior vector.
    '''
    # Get the inferred probabilities for each class (12 states, background and silence)
    probabilities_dict = infer_probabilities(loader, model)
    # Get the prior vector and the transition probabilities. We don't need the transition probabilities.
    # NOTE(review): the prior is fetched even when scale is False, as before; it is only used when scaling.
    prior_vector, _ = get_prob.main(rerun=False)

    # Build a fresh mapping; the dict returned by infer_probabilities is not modified.
    emission = {}
    for key, val in probabilities_dict.items():
        if scale:
            # np.where evaluates np.log on every entry, including zeros, so the
            # resulting divide/invalid warnings are silenced; the values are unchanged.
            with np.errstate(divide='ignore', invalid='ignore'):
                log_prob = np.where(val > 0, np.log(val), -np.inf)   # Get the log probabilities.
            log_prob = log_prob[:-2]  # Exclude the background and silence in the emission probability calculation.
            # Subtracting in log space is a division by the prior.
            # NOTE(review): assumes prior_vector is already in log space and matches the sliced length -- confirm in get_prob.
            emission[key] = [log_prob - prior_vector]
        else:
            emission[key] = [val]

    emission_df = pd.DataFrame.from_dict(emission, orient='index', columns=['Emission'])
    return emission_df

In [None]:
# Fetch the prior vector with rerun=True (presumably forces get_prob to recompute -- confirm);
# the second returned value is discarded.
prior_vector, _ = get_prob.main(rerun=True)
# Per-row softmax outputs of the model on the test loader, keyed by row position.
estimate_prob = infer_probabilities(loader_test, model)
# The same rows as a one-column 'Emission' DataFrame (scale keeps its default of False, so no prior scaling).
emission_data = find_emission(loader_test, model)


In [None]:
def path_to_emission(file_path_wav: str, file_path_phn: str):
    '''
    Compute the emission table for a single audio file.
    Uses the notebook-level `model`.
    Args:
        file_path_wav: Path of the audio (.WAV) file as a string.
        file_path_phn: Path of the matching phoneme (.PHN) file as a string.
    Returns:
        emit: pd.dataframe
            Output of find_emission for the rows prepared from the file.
    '''
    # The helper is called with the phoneme path first, then the audio path.
    frames = mfcc_label.prepare_data(file_path_phn, file_path_wav)
    for column in ('mfcc', 'label'):
        column_str_to_numpy(frames, column)

    # Wrap the frame in a loader that visits the rows one at a time, in order.
    file_dataset = CustomDataset(frames, train=False)
    file_loader = DataLoader(
        dataset=file_dataset,
        batch_size=1,
        sampler=sampler.SequentialSampler(range(len(frames))),
    )

    return find_emission(file_loader, model)


path_to_emission('timit/data/TRAIN/DR4/MDCD0/SX425.WAV','timit/data/TRAIN/DR4/MDCD0/SX425.PHN')

In [None]:
def get_emission_all_paths(path_type: str = 'test'):
    """Compute the emission table of every file listed under `path_type`.

    Args:
        path_type: Key into the stored train/test dataset description
            ('test' by default).

    Returns:
        dict mapping each (wav path, phn path, word path) triple to the
        result of path_to_emission for that file.
    """
    paths = load('processed_data/train_test_dataset_never.joblib')[path_type]
    emissions_by_file = {}
    for position in range(len(paths)):
        wav_path, phn_path, word_path = paths[position]
        emissions_by_file[(wav_path, phn_path, word_path)] = path_to_emission(wav_path, phn_path)

    return emissions_by_file
data = get_emission_all_paths()

# Store the per-file emission tables for the HMM stage.
from joblib import dump
dump(data, "processed_data/test_data_for_hmm.joblib")