<a href="https://colab.research.google.com/github/muskang48/Speaker-Diarization/blob/master/Change_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
%tensorflow_version 1.x
import librosa
import pandas as pd
import numpy as np
import warnings
# Blanket filter: suppresses every warning category for the rest of the session.
warnings.filterwarnings("ignore")
# Base names (without the ".wav" extension) of the recordings that
# load_dataset() passes to extract_feature().
file_list = ['Hindi1_01', 'Hindi1_02', 'Hindi1_03']



def _standardize_rows(features):
    """Return *features* with every row shifted to zero mean and unit std.

    A row whose standard deviation is exactly zero is only mean-centred (it
    becomes all zeros) instead of turning into NaN through a 0/0 division.
    """
    mean = np.mean(features, axis=1, keepdims=True)
    std = np.std(features, axis=1, keepdims=True)
    # `std` is a fresh array owned by this function, so patching it in place
    # is safe and keeps the input dtype.
    std[std == 0] = 1
    return (features - mean) / std


def _label_frames(change_windows, n_frames, frame_size, frame_shift):
    """Return one 0/1 integer label per frame.

    Frame ``k`` is taken to start at sample ``k * frame_shift`` and to end
    ``frame_size - 1`` samples later. It is labelled 1 when its first or its
    last sample lies strictly inside any ``(start, end)`` pair of
    *change_windows* (sample indices), otherwise 0.
    """
    first_sample = np.arange(n_frames) * frame_shift
    last_sample = first_sample + frame_size - 1
    hit = np.zeros(n_frames, dtype=bool)
    for lo, hi in change_windows:
        hit |= ((first_sample > lo) & (first_sample < hi)) | \
               ((last_sample > lo) & (last_sample < hi))
    return hit.astype(int)


def extract_feature(file_name):
    """Build training sub-sequences (features and labels) for one recording.

    The recording ``<file_name>.wav`` is loaded from the SRU Drive folder and
    turned into a 35-row feature matrix: MFCC coefficients 1..11 (coefficient
    0 is dropped), 12 delta rows and 12 delta-delta rows, each row
    z-normalised over time. Every frame is labelled 1 if it touches a window
    of +/- 75 ms around an annotated segment end, else 0. Features and labels
    are then cut into overlapping sub-sequences of ``int(3.2 * sr / 512)``
    frames taken every ``int(0.8 * sr / 512)`` frames.

    Args:
        file_name: base name of the recording, without directory or ".wav".

    Returns:
        A pair ``(sub_train_x, sub_train_y)`` of equally long lists.
        ``sub_train_x[k]`` has shape ``(1, sub_seq_len, 35)`` and
        ``sub_train_y[k]`` has shape ``(1, sub_seq_len)`` with 0/1 integers.
        Both lists are empty when the recording is shorter than one
        sub-sequence.
    """
    file = "/content/drive/My Drive/SRU/" + file_name + ".wav"
    frame_size = 2048
    frame_shift = 512
    y, sr = librosa.load(file)

    # MFCC extraction. Audio and sample rate are passed by keyword: librosa
    # >= 0.10 no longer accepts them positionally, and older versions accept
    # the keyword form as well.
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=12,
                                 hop_length=frame_shift, n_fft=frame_size)
    mfcc_delta = librosa.feature.delta(mfccs)
    mfcc_delta2 = librosa.feature.delta(mfccs, order=2)

    # The deltas are computed from all 12 coefficients; only the static
    # block drops coefficient 0, giving 11 + 12 + 12 = 35 rows.
    ac_feature = np.vstack((
        _standardize_rows(mfccs[1:, ]),
        _standardize_rows(mfcc_delta),
        _standardize_rows(mfcc_delta2),
    ))

    # Loading annotation file.
    # NOTE(review): the same CSV is read for every recording, regardless of
    # file_name - confirm that it really describes each file in file_list.
    # Presumably 'Offset' and 'Duration' are in seconds (they are multiplied
    # by the sample rate below) - verify against the annotation format.
    ann = pd.read_csv('/content/drive/My Drive/SRU/annotations1 (1).csv')
    end_points = ann['Duration'] + ann['Offset']

    # Each segment end becomes a (start, end) window in samples, reaching
    # 75 ms to the left and 75 ms to the right of the annotated end point.
    change_point = [
        (int((end - 0.075) * sr), int((end + 0.075) * sr))
        for end in end_points
    ]

    sub_seq_len = int(3.2*sr/frame_shift)
    sub_seq_step = int(0.8*sr/frame_shift)
    feature_len = ac_feature.shape[1]

    # Label every frame once; the overlapping sub-sequences below only slice
    # this vector instead of re-testing each frame for every window.
    # NOTE(review): frames are anchored at index * frame_shift here, while
    # librosa centres analysis frames by default - confirm this offset is
    # intended (and mirrored at inference time) before changing it.
    frame_labels = _label_frames(change_point, feature_len,
                                 frame_size, frame_shift)

    sub_train_x = []
    sub_train_y = []
    # "+ 1" keeps the last window that still fits completely; without it the
    # window starting at feature_len - sub_seq_len was silently skipped.
    for start in range(0, feature_len - sub_seq_len + 1, sub_seq_step):
        stop = start + sub_seq_len
        sub_seq_x = np.transpose(ac_feature[:, start:stop])
        sub_train_x.append(sub_seq_x[np.newaxis, :, :])
        # Copy so that overlapping windows do not share label memory.
        sub_train_y.append(
            frame_labels[start:stop].reshape(1, sub_seq_len).copy())
    return sub_train_x, sub_train_y


def load_dataset():
    """Extract and concatenate the training data of every file in file_list.

    For each recording the sub-sequence chunks returned by extract_feature()
    are joined along the first axis; the per-file arrays are then joined the
    same way. The intermediate shapes, the number of files processed and the
    final shapes are printed along the way.

    Returns:
        A pair ``(all_x_stack, all_y_stack)`` of arrays holding the features
        and the labels of all recordings, joined along axis 0.
    """
    per_file_x, per_file_y = [], []
    for name in file_list:
        chunks_x, chunks_y = extract_feature(name)

        # Every chunk carries a leading axis of length 1, so joining along
        # axis 0 yields one row per sub-sequence.
        file_x = np.concatenate(chunks_x, axis=0)
        file_y = np.concatenate(chunks_y, axis=0)
        print(file_x.shape)
        print(file_y.shape)

        per_file_x.append(file_x)
        per_file_y.append(file_y)

    print(len(per_file_x))
    print(len(per_file_y))

    all_x_stack = np.concatenate(per_file_x, axis=0)
    all_y_stack = np.concatenate(per_file_y, axis=0)
    print(all_x_stack.shape, all_y_stack.shape)
    print('over')
    return all_x_stack, all_y_stack

In [3]:
from keras.legacy import interfaces
from keras.optimizers import Optimizer
from keras import backend as K
# SMORMS3 optimizer
class SMORMS3(Optimizer):
    """SMORMS3 optimizer.

    Keeps, per parameter, a running average of the gradient, a running
    average of the squared gradient and a "memory" value that controls how
    quickly both averages adapt. The defaults follow the blog post below.

    # Arguments
        learning_rate: float >= 0. Learning rate (upper bound of the
            per-element step factor).
        epsilon: float >= 0. Fuzz factor.
        decay: float >= 0. Learning rate decay over each update.

    # References
        - [RMSprop loses to SMORMS3 - Beware the Epsilon!](http://sifter.org/~simon/journal/20150420.html)
    """

    def __init__(self, learning_rate=0.001, epsilon=1e-16, decay=0.,
                 **kwargs):
        super(SMORMS3, self).__init__(**kwargs)
        scope = self.__class__.__name__
        with K.name_scope(scope):
            self.learning_rate = K.variable(learning_rate, name='learning_rate')
            self.decay = K.variable(decay, name='decay')
            self.iterations = K.variable(0, dtype='int64', name='iterations')
        # Plain Python values: initial_decay decides whether decay is applied
        # at all, epsilon is used directly in the update formulas.
        self.initial_decay = decay
        self.epsilon = epsilon

    @interfaces.legacy_get_updates_support
    def get_updates(self, loss, params):
        """Return the list of update ops for one optimisation step."""
        grads = self.get_gradients(loss, params)
        param_shapes = [K.shape(param) for param in params]

        # One zero-initialised slot of each kind per parameter.
        grad_avgs = [K.zeros(shape) for shape in param_shapes]
        sq_grad_avgs = [K.zeros(shape) for shape in param_shapes]
        memories = [K.zeros(shape) for shape in param_shapes]
        self.weights = [self.iterations] + grad_avgs + sq_grad_avgs + memories
        self.updates = [K.update_add(self.iterations, 1)]

        step_cap = self.learning_rate
        if self.initial_decay > 0:
            iteration = K.cast(self.iterations, K.dtype(self.decay))
            step_cap *= (1. / (1. + self.decay * iteration))

        slots = zip(params, grads, grad_avgs, sq_grad_avgs, memories)
        for param, grad, grad_avg, sq_grad_avg, memory in slots:
            # Mixing rate of the running averages, derived from the memory.
            rate = 1. / (1. + memory)
            next_grad_avg = (1. - rate) * grad_avg + rate * grad
            next_sq_grad_avg = (1. - rate) * sq_grad_avg + rate * K.square(grad)
            # Squared mean gradient relative to the mean squared gradient.
            denoise = K.square(next_grad_avg) / (next_sq_grad_avg + self.epsilon)
            scale = K.minimum(step_cap, denoise)
            next_param = param - grad * scale / (K.sqrt(next_sq_grad_avg) + self.epsilon)
            next_memory = 1. + memory * (1. - denoise)

            self.updates.append(K.update(grad_avg, next_grad_avg))
            self.updates.append(K.update(sq_grad_avg, next_sq_grad_avg))
            self.updates.append(K.update(memory, next_memory))

            # Apply constraints.
            constraint = getattr(param, 'constraint', None)
            if constraint is not None:
                next_param = param.constraint(next_param)

            self.updates.append(K.update(param, next_param))
        return self.updates

    def get_config(self):
        """Return the base optimizer config extended with this class's settings."""
        merged = dict(super(SMORMS3, self).get_config())
        merged.update({
            'learning_rate': float(K.get_value(self.learning_rate)),
            'decay': float(K.get_value(self.decay)),
            'epsilon': self.epsilon,
        })
        return merged

Using TensorFlow backend.


In [0]:
from keras.layers.core import Dense
from keras.models import Sequential
from keras.layers import Bidirectional, TimeDistributed, Dropout
from keras.layers import LSTM
import numpy as np
import keras

def train_bilstm():
    """Train the Bi-LSTM change-detection model and store it on disk.

    Steps:
      1. Build and compile a two-layer bidirectional LSTM with a per-frame
         sigmoid output, optimised with SMORMS3 on binary cross-entropy.
      2. Load all sub-sequences via load_dataset() and keep only those that
         contain more than 5 positive (change) frames.
      3. Shuffle, use the first 97 % for training and the rest for
         validation, and fit for 50 epochs with batch size 256.
      4. Write the architecture to ``model_hindi_2.json`` and the weights to
         ``model_hindi_2.h5`` in the SRU Drive folder.

    Raises:
        ValueError: if no sub-sequence has more than 5 positive frames.
    """
    model = Sequential()

    model.add(Bidirectional(LSTM(128, return_sequences=True)))
    model.add(Dropout(0.3))
    model.add(Bidirectional(LSTM(128, return_sequences=True)))
    model.add(Dropout(0.3))
    model.add(TimeDistributed(Dense(32)))
    model.add(TimeDistributed(Dense(32)))
    model.add(TimeDistributed(Dense(1, activation='sigmoid')))

    # 137 frames x 35 features per sub-sequence: this must match what
    # extract_feature() produces (int(3.2 * sr / 512) frames at librosa's
    # default 22050 Hz sample rate, and 11 + 12 + 12 feature rows).
    model.build(input_shape=(None, 137, 35))

    model.compile(loss=keras.losses.binary_crossentropy, optimizer=SMORMS3(), metrics=['accuracy'])
    model.summary()

    all_x, all_y = load_dataset()
    print(all_y.shape, np.sum(all_y))

    # Keep only the sub-sequences with more than 5 change frames. A boolean
    # mask selects exactly the rows the former per-row Python loop kept.
    keep = np.sum(all_y, axis=1) > 5
    if not np.any(keep):
        raise ValueError(
            'no sub-sequence contains more than 5 change frames; '
            'nothing to train on')
    all_x = all_x[keep]
    all_y = all_y[keep]
    print(all_y.shape, np.sum(all_y))

    # Add a trailing axis so the labels match the (frames, 1) model output.
    all_y = all_y[:, :, np.newaxis]

    indices = np.random.permutation(all_x.shape[0])
    all_x_random = all_x[indices]
    all_y_random = all_y[indices]

    datasize = all_x_random.shape[0]
    train_size = int(datasize*0.97)
    train_x = all_x_random[0:train_size]
    valid_x = all_x_random[train_size:]

    train_y = all_y_random[0:train_size]
    valid_y = all_y_random[train_size:]
    print('train over')

    model.fit(x=train_x, y=train_y, batch_size=256, epochs=50,
              validation_data=(valid_x, valid_y), shuffle=True)

    json_model_file = '/content/drive/My Drive/SRU/model_hindi_2'+'.json'
    h5_model_file = '/content/drive/My Drive/SRU/model_hindi_2'+'.h5'

    # Serialize the architecture to JSON and the weights to HDF5.
    # A full model.save() to this same .h5 path used to run first and was
    # immediately overwritten by save_weights(); it is dropped because only
    # the weights file ever survived. The files left on disk are unchanged.
    with open(json_model_file, "w") as json_file:
        json_file.write(model.to_json())
    model.save_weights(h5_model_file)
    print("Saved model to disk")

In [0]:
# Call the function to train the change-detection model based on Bi-LSTM.
train_bilstm()