# Training Audio Recognition using Tensorflow
### Speech Recognition for Controlling Robot (THAI COMMAND)
#### By. Arunwat Moonbung
#### SPECIAL THANKS TO "Leandro Roser"
#### FOR AUDIO-PREPROCESSING e.g. AUDIO-AUGMENTATION TECHNIQUES, DATA LOADING, DATA INTEGRITY OBSERVE.

In [1]:
import gc
import glob
import IPython.display as ipd
import json
import math
import os
import pickle
import shutil

import librosa, librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler,MinMaxScaler,LabelBinarizer
import wandb
from wandb.keras import WandbCallback

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.python.client import device_lib


In [2]:
os.environ["CUDA_VISIBLE_DEVICES"]='0,1'
print(f"Currently using Tensorflow {tf.__version__}")
print(tf.config.list_physical_devices('GPU'))
print(tf.test.gpu_device_name())
tf.random.set_seed(6131501066)

Currently using Tensorflow 2.8.0
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
/device:GPU:0


### CLASS AND FUNCTION DEFINATION

In [3]:
DATASET_PATH = os.path.join("Data_Thai","train")
DATASET_JSON = os.path.join("Data_Thai","classmap.json")

In [4]:
def pad_audio(signal, NUM_SAMPLES_TO_CONSIDER):
    if len(signal) >= NUM_SAMPLES_TO_CONSIDER:
        return signal[:NUM_SAMPLES_TO_CONSIDER]
    else:
        return np.pad(signal, pad_width=(NUM_SAMPLES_TO_CONSIDER - len(signal), 0), mode='constant', constant_values=(0, 0))
    
def pad_audio_sec(self, signal, DURATION, NUM_SAMPLES_TO_CONSIDER):
        TOTAL_SAMPLE = DURATION*NUM_SAMPLES_TO_CONSIDER
        if len(signal) >= TOTAL_SAMPLE:
            return signal[:TOTAL_SAMPLE]
        else:
            #return np.pad(signal, pad_width=(0, TOTAL_SAMPLE - len(signal)), mode='constant', constant_values=(0, 0)) # PAD หลัง
            return np.pad(signal, pad_width=(TOTAL_SAMPLE - len(signal), 0), mode='constant', constant_values=(0, 0)) # PAD หน้า

def chop_audio(signal, NUM_SAMPLES_TO_CONSIDER=16000):
    while True:
        beg = np.random.randint(0, len(signal) - NUM_SAMPLES_TO_CONSIDER)
        yield signal[beg: beg + NUM_SAMPLES_TO_CONSIDER]

def choose_background_generator(signal, backgrounds, max_alpha=0.7):
    if backgrounds is None:
        return signal
    my_gen = backgrounds[np.random.randint(len(backgrounds))]
    background = next(my_gen) * np.random.uniform(0, max_alpha)
    augmented_data = signal + background
    augmented_data = augmented_data.astype(type(signal[0]))
    return augmented_data

def random_shift(signal, NUM_SAMPLES_TO_CONSIDER=16000, shift_max=0.2):
    shift = np.random.randint(NUM_SAMPLES_TO_CONSIDER * shift_max)
    out = np.roll(signal, shift)
    # Time shift
    if shift > 0:
        out[:shift] = 0
    else:
        out[shift:] = 0
    return out

def random_change_pitch(signal, NUM_SAMPLES_TO_CONSIDER=16000):
    pitch_factor = np.random.randint(1, 4)
    out = librosa.effects.pitch_shift(y=signal, sr=NUM_SAMPLES_TO_CONSIDER, n_steps=pitch_factor)
    return out

def random_speed_up(signal):
    where = ["start", "end"][np.random.randint(0, 1)]
    speed_factor = np.random.uniform(0, 0.5)
    up = librosa.effects.time_stretch(y=signal, rate=1 + speed_factor)
    up_len = up.shape[0]
    if where == "end":
        up = np.concatenate((up, np.zeros((signal.shape[0] - up_len,))))
    else:
        up = np.concatenate((np.zeros((signal.shape[0] - up_len,)), up))
    return up

def get_image_list(train_audio_path):
    classes = os.listdir(train_audio_path)
    classes = [thisclass for thisclass in classes if thisclass != '_background_noise_']
    index = [i for i,j in enumerate(classes)]
    outlist = []
    labels = []
    text_labels = dict(zip(classes, index))
    for thisindex, thisclass in zip(index, classes):
        filelist = [f for f in os.listdir(os.path.join(train_audio_path, thisclass)) if f.endswith('.wav')]
        filelist = [os.path.join(train_audio_path, thisclass, x) for x in filelist]
        outlist.append(filelist)
        labels.append(np.full(len(filelist), fill_value=thisindex))
    try:
        with open(DATASET_JSON, "w") as f:
            json.dump(text_labels, f, indent=4)
        print(f"#: SAVING CLASS LABEL MAPPING.. AT {train_audio_path}")
    except:
        print("!: ERROR WHILE SAVING .json file")
        
    return outlist, labels, text_labels

def split_train_test_stratified_shuffle(images_list, labels, train_size=0.7):
    classes_size = [len(x) for x in images_list]
    classes_vector = [np.arange(x) for x in classes_size]
    total = np.sum(classes_size)
    total_train = [int(train_size * total * x) for x in classes_size / total]
    train_index = [np.random.choice(x, y, replace=False) for x,y in zip(classes_size, total_train)]
    validation_index = [np.setdiff1d(i,j) for i,j in zip(classes_vector,train_index)]
    train_set = [np.array(x)[idx] for x,idx in zip(images_list, train_index)]
    validation_set = [np.array(x)[idx] for x,idx in zip(images_list, validation_index)]
    train_labels = [np.array(x)[idx] for x,idx in zip(labels,train_index)]
    validation_labels = [np.array(x)[idx] for x,idx in zip(labels, validation_index)]
    # ----------------------------------------------------------------------------------------- 
    train_set = np.array([element for array in train_set for element in array])
    validation_set = np.array([element for array in validation_set for element in array])
    train_labels = np.array([element for array in train_labels for element in array])
    validation_labels = np.array([element for array in validation_labels for element in array])
    # -----------------------------------------------------------------------------------------
    train_shuffle = np.random.permutation(len(train_set))
    validation_shuffle = np.random.permutation(len(validation_set))
    train_set = train_set[train_shuffle]
    validation_set = validation_set[validation_shuffle]
    train_labels = train_labels[train_shuffle]
    validation_labels = validation_labels[validation_shuffle]
    return train_set, train_labels, validation_set, validation_labels

def preprocess_data(file_path, background_generator, n_mfcc=40, hop_length=512, n_fft=4096, NUM_SAMPLES_TO_CONSIDER=16000, threshold=0.7):
    # Downsample to NUM_SAMPLES_TO_CONSIDER Hz
    signal, sr = librosa.load(file_path, sr=NUM_SAMPLES_TO_CONSIDER)
    signal = pad_audio(signal, sr)
    if np.random.uniform(0, 1) > threshold:
        # ADD NOISE TO 30% OF DATA
        signal = choose_background_generator(signal, background_generator)
    if np.random.uniform(0, 1) > threshold:
        signal = random_shift(signal)
    if np.random.uniform(0, 1) > threshold:
        signal = random_change_pitch(signal)
    if np.random.uniform(0, 1) > threshold:
        signal = random_speed_up(signal)
    MFCCs = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=n_mfcc) # Transpose for sklearn
    MFCCs = np.moveaxis(MFCCs, 1, 0)
    #scaler = MinMaxScaler() # OPTIONAL FOR Scaling
    scaler = StandardScaler() 
    MFCCs_scaled = scaler.fit_transform(MFCCs)
    # MFCCs Input Shape -> (NUM_SAMPLE x NUM_MFCC_COEFFICIENT x 1)
    return MFCCs_scaled.reshape(MFCCs_scaled.shape[0], MFCCs_scaled.shape[1], 1)

class data_generator(keras.utils.Sequence):
    def __init__(self, x_set, y_set, batch_size, background_generator):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size
        self.background_generator = background_generator
    
    def __len__(self):
        return math.ceil(len(self.x) / self.batch_size)
    
    def __getitem__(self, idx):
        idx_from = idx * self.batch_size
        idx_to = (idx + 1) * self.batch_size
        batch_x = self.x[idx_from:idx_to]
        batch_y = self.y[idx_from:idx_to]
        x = [preprocess_data(elem, self.background_generator) for elem in batch_x]
        y = batch_y
        return np.array(x).astype(np.float32), np.array(y).astype(np.float32)
        #return np.array(x), np.array(y)
    
def build_model(num_classes, input_shape):
    model = keras.Sequential()
    model.add(keras.Input(shape=input_shape))
    model.add(keras.layers.Conv2D(filters=32, kernel_size=(3,3), padding="same"))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=(3,3), padding="same"))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=(3,3), padding="same"))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))
    model.add(keras.layers.Conv2D(filters=256, kernel_size=(3,3), padding="same"))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))
    ##############################################################################
    #model.add(keras.layers.Dropout(0.25))
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(128))
    model.add(keras.layers.Activation("relu"))
    #model.add(keras.layers.Dropout(0.5))
    #model.add(keras.layers.Dense(num_classes, activation='softmax'))
    model.add(keras.layers.Dense(num_classes))
    model.add(keras.layers.Activation("softmax"))
    return model

# kernel_regularizer=keras.regularizers.l2(0.001)

'''def build_model(num_classes, input_shape):
    model = keras.Sequential()
    model.add(keras.Input(shape=input_shape))
    model.add(keras.layers.Conv2D(filters=32, kernel_size=(3,3), 
                                padding="same", activation="relu",
                                kernel_regularizer=keras.regularizers.l2(0.001)))
    model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))
    model.add(keras.layers.Conv2D(filters=64, kernel_size=(3,3),
                                padding="same", activation="relu",
                                kernel_regularizer=keras.regularizers.l2(0.001)))
    model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))
    model.add(keras.layers.Conv2D(filters=128, kernel_size=(3,3),
                                padding="same", activation="relu",
                                kernel_regularizer=keras.regularizers.l2(0.001)))
    model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))
    model.add(keras.layers.Conv2D(filters=256, kernel_size=(3,3),
                                padding="same", activation="relu",
                                kernel_regularizer=keras.regularizers.l2(0.001)))
    model.add(keras.layers.MaxPooling2D(pool_size=(2,2)))
    model.add(keras.layers.Dropout(0.25))
    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(128, activation="relu"))
    model.add(keras.layers.Dropout(0.5))
    model.add(keras.layers.Dense(num_classes, activation='softmax'))
    return model'''

def multiclass_roc(y_test, y_pred, average="macro"):
    lb = LabelBinarizer()
    lb.fit(y_test)
    y_test = lb.transform(y_test)
    y_pred = lb.transform(y_pred)
    all_labels = np.unique(y_test)

    for (idx, c_label) in enumerate(all_labels):
        fpr, tpr, thresholds = roc_curve(y_test[:,idx].astype(int), y_pred[:,idx])
        c_ax.plot(fpr, tpr, label = '%s (AUC:%0.2f)'  % (c_label, auc(fpr, tpr)))
    c_ax.plot(fpr, fpr, 'b-', label = 'Random Guessing')
    return roc_auc_score(y_test, y_pred, average=average)

In [None]:
'''train_audio_sample = os.path.join("Data_Thai","train","backward","b1.wav")
x,sr = librosa.load(train_audio_sample, sr = 16000)
x = pad_audio(x, sr)
choose_background_generator(x, background_generator)'''

### LOADING DATASET / VERIFY LOADING DATASET

In [5]:
# LOAD DATA WITH BACKGROUNDS
wavfiles = glob.glob(os.path.join(DATASET_PATH, "_background_noise_/*wav"))
wavfiles = [librosa.load(elem, sr = 16000)[0] for elem in wavfiles]
# wavfile คือ array ของไฟล์เสียง
background_generator = [chop_audio(x) for x in wavfiles]

In [6]:
# TRAINING DATA PATHS, 
# SPLIT TRAIN-TEST VIA STRATIFIED SAMPLING, 
# CALL A DATA GENERATOR FOR KERAS
# LOAD TRAIN
images_list, labels, classes_map = get_image_list(DATASET_PATH)
train_set, train_labels, validation_set, validation_labels = split_train_test_stratified_shuffle(images_list, labels)
train_datagen = data_generator(x_set=train_set, y_set=train_labels, batch_size=32, background_generator=background_generator)
validation_datagen = data_generator(x_set=validation_set, y_set=validation_labels, batch_size=32, background_generator=None)


#: SAVING CLASS LABEL MAPPING.. AT Data_Thai\train


In [None]:
#display(train_datagen[0][0].shape)

In [None]:
'''display(images_list)
display(labels)
display(classes_map)'''

In [7]:
# CHECK IF TRAINING DATASET ALSO CONTAIN IN VALIDATION DATASET? // ควรเป็น False เพราะต้องไม่มีตัวไหนที่ซ้ำกัน
inv_map =  {v: k for k, v in classes_map.items()}
any_present=[i in validation_set for i in train_set]
np.any(any_present)

False

In [8]:
# CHECK MATCHING FILE .WAV FROM (train_set) and LABELS FROM (train_labels) 
# โฟลเดอร์ไฟล์ ต้องตรงกับ Label ข้างล่างตามลำดับ ไม่งั้นแสดงว่า Data-pre ผิดพลาด
test1 = np.random.randint(10, 100, 10)
train_set[test1],[inv_map[int(i)] for i in train_labels[test1]]

(array(['Data_Thai\\train\\backward\\b108.wav',
        'Data_Thai\\train\\forward\\f315.wav',
        'Data_Thai\\train\\turnright\\r905.wav',
        'Data_Thai\\train\\grab\\g205.wav',
        'Data_Thai\\train\\forward\\f211.wav',
        'Data_Thai\\train\\stop\\s415.wav',
        'Data_Thai\\train\\release\\re809.wav',
        'Data_Thai\\train\\release\\re806.wav',
        'Data_Thai\\train\\search\\se106.wav',
        'Data_Thai\\train\\release\\re809.wav'], dtype='<U34'),
 ['backward',
  'forward',
  'turnright',
  'grab',
  'forward',
  'stop',
  'release',
  'release',
  'search',
  'release'])

In [9]:
# STRATIFIED SAMPLING WORKS
# CHECK UNIQUE VALUE IN NUM_CLASSES (unique) [0, 1, ...]
# COUNT NUM_SAMPLE IN EACH UNIQUE VALUE (counts) [500, 500, ...]
unique, counts = np.unique(validation_labels, return_counts=True)
x = dict(zip(unique, counts)) # CONVERT IT INTO DICTIONARY {label: counts}
out = pd.DataFrame(sorted(x.items(), key=lambda kv: kv[0])) #CREATE DATAFRAME FROM DICTIONARY (x)
out.drop(0, inplace = True, axis = 1) # DROP EXEEDS(INDEX) COLUMNS
out = out.apply(lambda x: 100 * x/sum(x)) # CONVERT COUNT TO PERCENTAGE OF EACH UNIQUE LABEL COUNT เช่น 0:49.89 1:50.10 (%)

total_labels = [y for x in labels for y in x] # LIST TO CONTAIN ALL LABELS OF DATA[0,0,0,1,0,0,1]
unique, counts = np.unique(total_labels, return_counts=True)
y=dict(zip(unique, counts)) #CONVERT INTO DICT {label: counts} (FOR ENTIRE DATASET NO SPLIT)
out2 = pd.DataFrame(sorted(y.items(), key=lambda kv: kv[0]))
out2.drop(0, inplace = True, axis = 1)
out2 = out2.apply(lambda x: 100 * x/sum(x))

print(out)
print(out2)
display(out2.join(out, lsuffix='VALIDATION_SET', rsuffix='ENTIRE_SET')[:5])
np.allclose(out.iloc[:,0].values, out2.iloc[:,0].values,  atol=0.01) 
# Returns True if two arrays are element-wise equal within a tolerance.
# ใช้ดูว่าการแบ่งสัดส่วนของ Class target ของ VALIDATION_SET เมื่อเทียบกับ ENTIRE_SET ใกล้เคียงกันไหม

           1
0  12.590799
1  12.590799
2  11.864407
3  12.590799
4  12.106538
5  12.348668
6  12.832930
7  13.075061
           1
0  12.509144
1  12.655450
2  11.923921
3  12.582297
4  12.143380
5  12.435991
6  12.728603
7  13.021214


Unnamed: 0,1VALIDATION_SET,1ENTIRE_SET
0,12.509144,12.590799
1,12.65545,12.590799
2,11.923921,11.864407
3,12.582297,12.590799
4,12.14338,12.106538


False

### TRAINING PROCESS
#### MODEL BUILDING

In [10]:
# check format, channel last, (x_train.shape[0], rows, cols, 1)
print(keras.backend.image_data_format())

channels_last


In [11]:
ROWS = 32
COLUMNS = 40
BATCH_SIZE = 32
EPOCHS = 100
LEARNING_RATE = 0.001
optimizer = keras.optimizers.Adam(learning_rate = LEARNING_RATE)
loss_fn = keras.losses.SparseCategoricalCrossentropy()
acc_metric = keras.metrics.SparseCategoricalAccuracy()

MODEL_PATH = "models"
if not os.path.exists(MODEL_PATH):
    os.makedirs(MODEL_PATH)
train_size = train_set.shape[0]
validation_size = validation_set.shape[0]
steps_per_epoch = train_size//BATCH_SIZE

checkpoint_filepath = os.path.join(MODEL_PATH, 
                                'model.{epoch:02d}-{val_sparse_categorical_accuracy:.2f}-{val_loss:.2f}.h5')

checkpoint_callback = keras.callbacks.ModelCheckpoint(filepath=checkpoint_filepath,
                                                    save_weights_only=False,
                                                    monitor='val_sparse_categorical_accuracy',
                                                    mode='max',
                                                    save_best_only=True,
                                                    verbose=1)

reduce_lr_callback = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                                                    patience=3, min_lr=1e-5, vebose=1)
earlystop_callback = keras.callbacks.EarlyStopping(monitor="val_loss",
                                                min_delta=1e-3,
                                                patience=5,
                                                verbose=1)

In [12]:
tf.keras.backend.clear_session()
model = build_model(len(classes_map), (ROWS, COLUMNS, 1))
model.compile(optimizer=optimizer, loss=loss_fn, metrics=[acc_metric])   
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 32, 40, 32)        320       
                                                                 
 activation (Activation)     (None, 32, 40, 32)        0         
                                                                 
 max_pooling2d (MaxPooling2D  (None, 16, 20, 32)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 16, 20, 64)        18496     
                                                                 
 activation_1 (Activation)   (None, 16, 20, 64)        0         
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 8, 10, 64)        0         
 2D)                                                    

#### TRAINING PROCESS

In [13]:
# WANDB LOGING
# INITIALIZE WANDB PROJECT AND SPECIFY HYPERPARAMETER DATA
run = wandb.init(project='NLP_SpeechControlTH',entity="thmlbdshoichi") #entity = username wandb
wandb.config = {
    "learning_rate": LEARNING_RATE,
    "epochs": EPOCHS,
    "batch_size": BATCH_SIZE
}
config = wandb.config # CONFIGURE OF EXPERIMENT

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mthmlbdshoichi[0m (use `wandb login --relogin` to force relogin)


In [14]:
history = model.fit(train_datagen,
                    steps_per_epoch=steps_per_epoch,
                    epochs=EPOCHS,
                    validation_data=validation_datagen,
                    validation_steps=validation_size//BATCH_SIZE,
                    callbacks=[earlystop_callback, reduce_lr_callback, checkpoint_callback, WandbCallback()])

Epoch 1/100
Epoch 1: val_sparse_categorical_accuracy improved from -inf to 0.34375, saving model to models\model.01-0.34-1.91.h5
Epoch 2/100
Epoch 2: val_sparse_categorical_accuracy improved from 0.34375 to 0.57292, saving model to models\model.02-0.57-1.09.h5
Epoch 3/100
Epoch 3: val_sparse_categorical_accuracy improved from 0.57292 to 0.84635, saving model to models\model.03-0.85-0.47.h5
Epoch 4/100
Epoch 4: val_sparse_categorical_accuracy improved from 0.84635 to 0.92448, saving model to models\model.04-0.92-0.24.h5
Epoch 5/100
Epoch 5: val_sparse_categorical_accuracy improved from 0.92448 to 0.93490, saving model to models\model.05-0.93-0.20.h5
Epoch 6/100
Epoch 6: val_sparse_categorical_accuracy improved from 0.93490 to 0.96354, saving model to models\model.06-0.96-0.11.h5
Epoch 7/100
Epoch 7: val_sparse_categorical_accuracy improved from 0.96354 to 0.97656, saving model to models\model.07-0.98-0.08.h5
Epoch 8/100
Epoch 8: val_sparse_categorical_accuracy improved from 0.97656 to 0

In [15]:
# WANDB.log
run.finish()




0,1
epoch,▁▁▂▂▂▃▃▃▄▄▅▅▅▆▆▆▇▇▇██
loss,█▆▄▂▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
lr,███████████████▂▂▂▂▁▁
sparse_categorical_accuracy,▁▃▆▇▇████████████████
val_loss,█▅▃▂▂▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁
val_sparse_categorical_accuracy,▁▃▆▇▇████████████████

0,1
best_epoch,15.0
best_val_loss,0.0096
epoch,20.0
loss,0.03692
lr,4e-05
sparse_categorical_accuracy,0.98698
val_loss,0.01663
val_sparse_categorical_accuracy,0.9974


In [16]:
#CUT LAST LAYER
#new_model = model.layers[-1].output
model_new = keras.models.Model(inputs=model.inputs, outputs=model.layers[-2].output)
model_new.set_weights(model.get_weights())
optimizer = keras.optimizers.Adam(learning_rate = 0.01)
loss_fn = keras.losses.SparseCategoricalCrossentropy()
acc_metric = keras.metrics.SparseCategoricalAccuracy()
model_new.compile(optimizer=optimizer, loss=loss_fn, metrics=[acc_metric])
model_new.summary()
model_new.save('model_nsmv3x.h5')

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 32, 40, 1)]       0         
                                                                 
 conv2d (Conv2D)             (None, 32, 40, 32)        320       
                                                                 
 activation (Activation)     (None, 32, 40, 32)        0         
                                                                 
 max_pooling2d (MaxPooling2D  (None, 16, 20, 32)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 16, 20, 64)        18496     
                                                                 
 activation_1 (Activation)   (None, 16, 20, 64)        0         
                                                             

#### EVALUDATE THE MODEL PERFORMANCE

In [None]:
'''test_error, test_accuracy = model.evaluate(x_test, y_test)
print(f"LOSS ERROR: {test_error*100:.3f}% | ACCURACY: {test_accuracy*100:.3f}%")'''

In [None]:
class Keyword_Demo_v2:
    def __init__(self, model_path, text_labels, plot=False):
        if os.path.exists(model_path):
            self.model = keras.models.load_model(model_path)
            #self.model.summary()
        else:
            self.model = None
        if os.path.exists(text_labels):
            with open(text_labels, "r") as f:
                data = json.load(f)
            self.text_labels = [k for k in data.keys()]
        else:
            self.txt_labels = None
        self.plot = plot
    
    def predict(self, file_path):
        # EXTRACT MFCCs
        MFCCs = self.preprocess_data_test(file_path)
        #MFCCs = MFCCs[np.newaxis, ...]
        print(f"INPUT SHAPE: {MFCCs.shape}")
        # PREDICT -> OUTPUT PROBABILITY
        predictions = self.model.predict(MFCCs)
        predicted_index = np.argmax(predictions)
        predicted_conf = predictions[0][predicted_index]
        predicted_label = self.text_labels[predicted_index]
        if predicted_conf < 0.7:
            predicted_label = 'OTHER_KEYWORDS'
            print(f"Keyword Detected '{predicted_label}' | ORIGINAL: '{self.text_labels[predicted_index]}' NUM:'{predicted_index}' Confidence: {predicted_conf*100:.2f}%")
        else:
            print(f"Keyword Detected '{predicted_label}' | 'NUM:{predicted_index}' Confidence: {predicted_conf*100:.2f}%")
        return predicted_label, predicted_conf
    
    def preprocess_data_test(self, file_path, n_mfcc=40, n_fft=4096, hop_length=512, NUM_SAMPLES_TO_CONSIDER=16000):
        # LOAD AUDIO FILE
        signal, sr = librosa.load(file_path, sr=NUM_SAMPLES_TO_CONSIDER)
        signal = self.pad_audio(signal, sr)
        # EXTRACT MFCCs
        MFCCs = librosa.feature.mfcc(y=signal, n_mfcc=n_mfcc,
                                    hop_length=hop_length,
                                    n_fft=n_fft)
        
        # PLOT OR NOT
        if self.plot:
            librosa.display.specshow(MFCCs, sr=sr, hop_length=hop_length)
            plt.title(f"MFCCs Sample {file_path} (BEFORE MFCCs.T)")
            plt.xlabel("Time (sec)")
            plt.ylabel("MFCC")
            plt.colorbar()
            plt.show()
            
        MFCCs = MFCCs.T # MFCCs = np.moveaxis(MFCCs, 1, 0)
        scaler = StandardScaler()
        MFCCs_scaled = scaler.fit_transform(MFCCs)
        MFCCs_scaled = MFCCs_scaled.reshape(MFCCs_scaled.shape[0], MFCCs_scaled.shape[1], 1)
        MFCCs_scaled = MFCCs_scaled[np.newaxis, ...]
        # MFCCs Input Shape -> (NUM_SAMPLE x NUM_MFCC_COEFFICIENT x 1)
        return MFCCs_scaled #IF ERROR BRING MFCCs_scaled back to this line

    def pad_audio(self, signal, NUM_SAMPLES_TO_CONSIDER):
        if len(signal) >= NUM_SAMPLES_TO_CONSIDER:
            return signal[:NUM_SAMPLES_TO_CONSIDER]
        else:
            return np.pad(signal, pad_width=(NUM_SAMPLES_TO_CONSIDER - len(signal), 0), mode='constant', constant_values=(0, 0))

In [None]:
model_path = 'models_v2/model-best.h5'
model_pred = keras.models.load_model(model_path)

In [None]:
TEST_PATH = os.path.join("Data_Thai","test")
pred_demo = Keyword_Demo_v2(model_path, DATASET_JSON, plot=False)
file_test_list = [filenames for _,_,filenames in os.walk(TEST_PATH)][0]
for idx, file in enumerate(file_test_list):
    print(f"# {idx+1} {file}---------------------------------------------------")
    AUDIO_DATA_INPUT = os.path.join("Data_Thai/test/",file)
    keyword_result = pred_demo.predict(AUDIO_DATA_INPUT)


In [None]:
pred_demo2 = Keyword_Demo_v2(model_path, DATASET_JSON, plot=False)
for idx, (dirpath, dirnames, filenames) in enumerate(os.walk(DATASET_PATH)):
    print(filenames)

### REAL-TIME SPEECH COMMAND RECOGNITION (INFERENCE PROCESS)

### DEBUG - TEST PART

In [None]:
from vscode_audio import Audio
x,sr = librosa.load(train_audio_sample, sr = 16000)
x_augmented = random_shift(x)
x_augmented = random_speed_up(x_augmented)
x_augmented = random_change_pitch(x_augmented)
Audio(x_augmented, sr)