In [1]:
# import the necessary libs
import pandas
import numpy
import os
import zipfile

import matplotlib.pyplot as plt 
import h5py
import types 

import librosa # we will make use of `features` and `filters` modules
import hmmlearn # make use of hmm

import sklearn.linear_model
import sklearn.pipeline
import sklearn.preprocessing
import sklearn.metrics

import functools
import itertools
import multiprocessing

import dask_jobqueue
import dask.distributed

# Data Preparation.

In [2]:
rpm_lower = 715
rpm_uppper = 815

win_len_ms = 1000 # sample length (ms)

skip = 5000 # the number of measurements to skip from the start of each recording
smapling_freq = 4096 # sampling rate (Hz)

In [3]:
# the recordings to be used during training
# w_unb - without unbalance, unb - with unbalance
wo_unb,unb = '0D', '3D'
# define the file path of the datasets
infile = '/home/ix502iv/Documents/Probabilistic_Graphical_Models/DataSet/dataset_hmm.zip'

In [4]:
def load_zipfile(zfile, n):
    win_len = int(win_len_ms/1000*smapling_freq)
    with zfile.open(n + '.csv', 'r') as f:
        data = pandas.read_csv(f).iloc[skip:, :]

    n = (data.shape[0]//win_len) * win_len
    data = data.iloc[:n, :]

    rpm = numpy.reshape(data['Measured_RPM'].values, (-1, win_len), order='C')
    vibr = numpy.reshape(data['Vibration_3'].values, (-1, win_len), order='C')
    #choosing rpm based on the sensitivity aspect of hmm_mfcc : rpm_lw < rpm < rpm_up.
    ind, = numpy.nonzero(numpy.all(rpm>rpm_lower, axis=1) & numpy.all(rpm<rpm_uppper, axis=1))
    # randomly permutate a sequence : return a permutated range
    numpy.random.seed(170287); ind = numpy.random.permutation(ind)
    return vibr[ind, :].copy()

In [5]:
def load_data(filename, n_good, n_bad):
    with zipfile.ZipFile(filename, 'r') as zfile:
        good = load_zipfile(zfile, wo_unb) # 0D
        bad = load_zipfile(zfile, unb) # 3D
    return good, bad

In [6]:
# lets load the files
wo_unb, unb = load_data(infile, wo_unb, unb)

In [7]:
print("Without Unb. Samples '#0D'", wo_unb.shape[0])

Without Unb. Samples '#0D' 325


In [8]:
print("Unb. Samples '3D'", unb.shape[0])

Unb. Samples '3D' 331


# Training Routine

In [16]:
# 9 args within the train function
def train(
    smapling_freq,
    wo_unb, # load the data without unbalance
    unb, # data with unbalance
    *,

    train_ratio = 0.5, # ratio of data used for training the HMM

    # Hyperparams.
    fft_win = 31.25, # length of one fft window in milliseconds
    hop_len = 8.0, # displacement of consecutive windows (ms)
    n_mels = 15, # number of mel filters
    hmm_states = 5, # number of states in the HMM
):
    wo_unb = int(wo_unb/1000 * smapling_freq)
    hop_len = int(hop_len/1000 * smapling_freq)
    mfcc_args = dict(
        sr=smapling_freq,
        n_fft = nfft,
        n_mels = n_mels,
        hop_length = hop_len
    )

    # extract the features : without the unbalance
    tmp = [librosa.feature.mfcc(wo_unb[i,:], **mfcc_args).T
            for i in range(wo_unb)]
    feat_train = numpy.concatenate(tmp, axis=0)
    train_len = [m.shape[0] for m in tmp]

    # scale the features
    scaler = sklearn.preprocessing.StandardScaler()
    scaler.fit(feat_train)

    # train the HMM, on the without unb data
    model = hmmlearn.hmm.GaussianHMM(n_components=hmm_states)
    model.fit(scaler.transform(feat_train), length=train_len)

    # compute hmm output/score for the training data : without_unbal/with_unb.
    tmp1 = [
        model.score(scaler.transform(
            librosa.feature.mfcc(wo_unb[i, :], **mfcc_args).T))
        for i in range(wo_unb, wo_unb.shape[0])]
    tmp2 = [
        model.score(scaler.transform(
            librosa.feature.mfcc(unb[i,:], **mfcc_args).T))
        for i in range(unb, unb.shape[0])]

    # LogisticRegression
    log_reg = sklearn.pipeline.make_pipeline(
        sklearn.preprocessing.StandardScaler(),
        sklearn.linear_model.LogisticRegression(),
    )

    log_reg.fit(
        numpy.concatenate([numpy.reshape(tmp1, (-1,1)), numpy.reshape(tmp2, (-1,1))], axis=0),
        numpy.array([0]*len(tmp1)+[1]*len[tmp2])
    )
    return mfcc_args, model, scaler, log_reg

# Testing Routine

In [10]:
def test(models, wo_unb, unb):
    mfcc_args, model, scaler, lr = models

    tmp1 = [model.score(scaler.transform(
                librosa.feature.mfcc(wo_unb[i, :], **mfcc_args).T))
            for i in range(wo_unb.shape[0])]
    tmp2 = [model.score(scaler.transform(
                librosa.feature.mfcc(unb[i,:], **mfcc_args).T))
            for i in range(bad.shape[0])]
            
    result = log_reg.predict(
        numpy.concatenate([
            np.reshape(tmp1, (-1,1)), 
            np.reshape(tmp2, (-1,1))], axis=0))
    actual = numpy.array([0]*len(tmp1)+[1]*len(tmp2))

    return sklearn.metrics.balanced_accuracy_score(actual, result)

In [11]:
def get_score(
    smapling_freq, wo_unb, unb, *,
    train_samples = 200, # the number of samples for training
    **kwargs
):
    m = train(smapling_freq, wo_unb[:train_samples,:], unb[:train_samples,:], **kwargs)
    return test(m,wo_unb[train_samples:,:], unb[train_samples:,:])

In [12]:
hmm_states = [1,2,3,4,5]
n_mels = [10,15,20]
fft_win = [10,30,100,300]
hop_len = [5,10,20,50]

args = list(itertools.product(hmm_states, n_mels, fft_win, hop_len))

In [17]:
get_score(
    smapling_freq, wo_unb, unb,
    hmm_states = hmm_states,
    n_mels = n_mels,
    fft_win = fft_win,
    hop_len = hop_len
)

TypeError: only size-1 arrays can be converted to Python scalars