# Preprocessing

This file makes folders and processes the TIMIT dataset to filterbanks

In [1]:
import matplotlib.pyplot as plt

import os

In [2]:

import scipy.io.wavfile
from pathlib import Path
import re

import pandas as pd
import numpy as np
import librosa

Import of 'jit' requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.
  from numba.decorators import jit as optional_jit


In [3]:
# Change to change output folder
OUTPUT_PATH = Path('./MFCCS_DELTA_DELTA_NPY_LONGER')

TRAIN_PATH = Path('./TRAIN_WAV')
TEST_PATH = Path('./TEST_WAV')

# All of the sub directories, such as TRAIN_PATH/DR1
TRAIN_PATHS = [x for x in TRAIN_PATH.iterdir()]
TEST_PATHS = [x for x in TEST_PATH.iterdir()]

It would have been possible to use grub or something similar here.
But I figured that out quite late.

In [4]:
# All of the different accents/people talking
TRAIN_ACCENTS = [[x for x in xs.iterdir()] for xs in TRAIN_PATHS]
TEST_ACCENTS = [[x for x in xs.iterdir()] for xs in TEST_PATHS]

In [5]:
pat = r'^([A-Z0-9]+)\.[A-Z]+$'

def get_files(pathss):
    """
        Returns all of the files in a folder and
        all of the file-groupings in a path.
        For example if we have a folder with multiple files
        such as path/test.png, path/test.txt etc.
        Then this will return path/test
    """
    FILES = []
    for paths in pathss:
        for xs in paths:
            for x in xs.iterdir():
                FILES.append(x)
        #TRAIN_FILES.append([[x for x in xs.iterdir()] for xs in paths])
    FILES_2 = []
    matches = []

    for x in FILES:
        # I might be able to use x.stem insead of this
        # Path has .match for regex matching also. Might be quicker and better
        match = re.search(pat, x.name)
        if match == None or match.group(1) in matches:
            continue
        FILES_2.append(x.with_suffix(''))
        matches.append(match.group(1))
    return FILES, FILES_2 

# TRAIN_FILES here is the files while TRAIN_FILES_2 are the file groupings
(TRAIN_FILES_ALL, TRAIN_FILES_GROUPINGS) = get_files(TRAIN_ACCENTS)
(TEST_FILES_ALL, TEST_FILES_GROUPINGS) = get_files(TEST_ACCENTS)
print(TRAIN_FILES_GROUPINGS[:5])
print(TRAIN_FILES_ALL[:5])
#print(TEST_FILES_2[:5])
print(len(TRAIN_FILES_GROUPINGS))

[PosixPath('TRAIN_WAV/DR7/MMDG0/SX160'), PosixPath('TRAIN_WAV/DR7/MMDG0/SA1'), PosixPath('TRAIN_WAV/DR7/MMDG0/SA2'), PosixPath('TRAIN_WAV/DR7/MMDG0/SI2035'), PosixPath('TRAIN_WAV/DR7/MMDG0/SX340')]
[PosixPath('TRAIN_WAV/DR7/MMDG0/SX160.PHN'), PosixPath('TRAIN_WAV/DR7/MMDG0/SA1.TXT'), PosixPath('TRAIN_WAV/DR7/MMDG0/SA1.WAV.wav'), PosixPath('TRAIN_WAV/DR7/MMDG0/SA2.PHN'), PosixPath('TRAIN_WAV/DR7/MMDG0/SX70.WAV.wav')]
1718


In [6]:
path = Path(OUTPUT_PATH)
path.mkdir(parents=True, exist_ok=True)

In [7]:
bad_phones = ["h#"]
#from Lee & Hon, 1989
phone_transfers = {
    "ao": "aa",
    "ax": "ah",
    "ax-h": "ah",
    "axr": "er",
    "hv": "hh",
    "ix": "ih",
    "el": "l",
    "em": "m",
    "en": "n",
    "nx": "n",
    "eng": "ng",
    "zh": "sh",
    "ux": "uw",
    "pcl": "sil",
    "tcl": "sil",
    "kcl": "sil",
    "bcl": "sil",
    "dcl": "sil",
    "gcl": "sil",
    "h#": "sil",
    "pau": "sil",
    "epi": "sil"
}

In [8]:

def get_phn(file, i):
    """
        Gets the phone of file file at time i
    """
    phns = pd.read_csv(Path(file).with_suffix("").with_suffix('.PHN'), sep = " " , header=None)
    phn = phns[2][0]
    length = int(phns[1][0]) - int(phns[0][0])
    for idx, x in enumerate(phns[0]):
        #print(i, idx, x, phns[2][idx])
        if i >= x:
            phn = phns[2][idx]
            length = int(phns[1][idx]) - int(phns[0][idx])
            continue
        return phn, length
        break
    else:
        return phn, int(phns[1][idx - 1]) - int(phns[0][idx - 1])
            
def normalize(arr):
    # from : https://stackoverflow.com/questions/31152967/normalise-2d-numpy-array-zero-mean-unit-variance
    return (arr - arr.mean(axis=0)) / arr.std(axis=0)

In [13]:
def wavfile_to_mfccs(path, labels_file, idx="TRAIN_WAV"):
    #(sampling_rate, _sig) = scipy.io.wavfile.read(path)
    file = path
    (_sig, sampling_rate) = librosa.load(path, sr = 16000)
    window = int(400 * 2.8 / 2)
    for i in range(0, len(_sig) - 150, window):
        for mult in range(2):
            i = int(i + mult * (window / 2))
            if len(_sig) - i < window + 1:
                break
                
            sig = _sig[i: i + window]  
            
            segment_duration_ms = 25
            n_fft = int((segment_duration_ms / 1000.) * sampling_rate)

            # To get 15 x 3 samples
            hop_duration_ms = 5/2
            hop_length = int((hop_duration_ms / 1000.) * sampling_rate)

            mfcc_count = 40

            mfccs = librosa.feature.mfcc(
                y=sig,
                sr=sampling_rate,
                n_mfcc=mfcc_count,
                hop_length=hop_length,
                n_fft=n_fft
            )
            mfcc_delta = librosa.feature.delta(mfccs, width = 3)
            mfcc_delta2 = librosa.feature.delta(mfccs, order=2, width = 3)
            mfccs_and_deltas = np.hstack([mfccs, mfcc_delta, mfcc_delta2])
            
            mfccs_and_deltas = normalize(mfccs_and_deltas)
            
            label, phn_length = get_phn(path, i)
            
            if label in phone_transfers.keys():
                    label = phone_transfers[label]
                    
            if (label == "q"): 
                continue
            yield mfccs_and_deltas, label
        

In [None]:
def _parse_timit_line(self, line):
    start_frame, end_frame, label = line.split(' ')

    return int(start_frame), int(end_frame), label.strip('\n')


In [13]:
def wavfile_to_mfccs(path, labels_file, idx="TRAIN_WAV"):
    #(sampling_rate, _sig) = scipy.io.wavfile.read(path)
    file = path
    (_sig, sampling_rate) = librosa.load(path, sr = 16000)
    window = int(400 * 2.8 / 2)
    for i in range(0, len(_sig) - 150, window):
        for mult in range(2):
            i = int(i + mult * (window / 2))
            if len(_sig) - i < window + 1:
                break
                
            sig = _sig[i: i + window]  
            
            segment_duration_ms = 25
            n_fft = int((segment_duration_ms / 1000.) * sampling_rate)

            # To get 15 x 3 samples
            hop_duration_ms = 5/2
            hop_length = int((hop_duration_ms / 1000.) * sampling_rate)

            mfcc_count = 40

            mfccs = librosa.feature.mfcc(
                y=sig,
                sr=sampling_rate,
                n_mfcc=mfcc_count,
                hop_length=hop_length,
                n_fft=n_fft
            )
            mfcc_delta = librosa.feature.delta(mfccs, width = 3)
            mfcc_delta2 = librosa.feature.delta(mfccs, order=2, width = 3)
            mfccs_and_deltas = np.hstack([mfccs, mfcc_delta, mfcc_delta2])
            
            mfccs_and_deltas = normalize(mfccs_and_deltas)
            
            labels = []

            with open(labels_file, 'r') as f:
                for line in f.readlines():
                    start_frame, end_frame, label = self._parse_timit_line(line)

                    phn_frames = end_frame - start_frame
                    labels.extend([label] * phn_frames)

            label, phn_length = get_phn(path, i)
            
            labels.extend([label] * phn_length)
                    
            if (len(labels[i: i+window]) > 0) :
                label = max(labels[i: i + window])
            if label in phone_transfers.keys():
                    label = phone_transfers[label]
            if (label == "q"): 
                continue
        
            yield mfccs_and_deltas, label
        

In [14]:
def read_wavfiles_to_mfccs(files):
    print(len(files))
    
    X, y = [], []
    divider = len(files)
    print("Starting converting...")
    for i, file in enumerate(files):
        if (i % int(divider/20) == 0): print(f"{i / divider :.2f}")
        for mfccs, label in wavfile_to_mfccs(file.with_suffix(".WAV.wav"), file.with_suffix(".PHN")):
            if (label != "q"):
                X.append(normalize(mfccs))
                y.append(label)
    print("Done")
    return X, y

In [None]:
X, y = read_wavfiles_to_mfccs(TRAIN_FILES_GROUPINGS)

OUTPUT_PATH = "TEST_HIS_METHOD"

np.save(OUTPUT_PATH / "X_train", X)
np.save(OUTPUT_PATH / "y_train", y)

X, y= read_wavfiles_to_mfccs(TEST_FILES_GROUPINGS)

np.save(OUTPUT_PATH / "X_test", X)
np.save(OUTPUT_PATH / "y_test", y)

X,y = "", ""

1718
Starting converting...
0.00
0.05
0.10
0.15
0.20
0.25
0.30
0.35
0.40
0.45
0.49
0.54
0.59
0.64
0.69
0.74
0.79
0.84
0.89
0.94
0.99
Done
626
Starting converting...
0.00
0.05
0.10
0.15
0.20
0.25
0.30
0.35
0.40
0.45
0.50
0.54
0.59
0.64
0.69
0.74
0.79
0.84


In [23]:
print(len(TEST_FILES_GROUPINGS))

626
