# Setup

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import sys
import datetime
import numpy as np
import tensorflow as tf
from tensorflow.keras.regularizers import l2
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, BatchNormalization, \
    Add, Input, Conv1D, MaxPooling1D, Flatten, \
    Lambda

from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.models import load_model
import tensorflow.keras.backend as K

from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter
from sklearn.metrics import classification_report

from tqdm.auto import tqdm

In [None]:
os.chdir("/content/drive/MyDrive/Work/Justin Gatau Maules/AlarmWaterClassification")

# Utilies

In [None]:
def print_M(conf_M, class_names):
    s = "activity," + ",".join(class_names)
    print(s)
    for i, row in enumerate(conf_M):
        print(class_names[i] + "," + ",".join(map(str, row)))

def print_M_P(conf_M, class_names):
    s = "activity," + ",".join(class_names)
    print(s)
    for i, row in enumerate(conf_M):
        total = sum(row)
        percentages = [str(round(val / total, 2)) if total > 0 else '0' for val in row]
        print(class_names[i] + "," + ",".join(percentages))

def showResult(result, y_test, class_names):
    predictions = [np.argmax(y) for y in result]
    expected = [np.argmax(y) for y in y_test]

    num_labels = y_test.shape[1]
    conf_M = [[0] * num_labels for _ in range(num_labels)]

    for e, p in zip(expected, predictions):
        conf_M[e][p] += 1

    print_M(conf_M, class_names)
    print_M_P(conf_M, class_names)

def load_weight(path):
    model = load_model(path)
    print(model.summary())
    return model

# Model

In [None]:
def build_improved_model(input_shape, num_labels):
    model = tf.keras.models.Sequential([
        Input(shape=(input_shape, 1)),
        Conv1D(32, 6, activation='relu'),
        MaxPooling1D(pool_size=(3)),
        Conv1D(16, 3, activation='relu'),
        MaxPooling1D(pool_size=(3)),
        Flatten(),
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        Dense(18, activation='relu'),
        Dropout(0.5),
        Dense(num_labels, activation='softmax')
    ])

    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    return model

# Data

## Data loading

In [None]:
model_weight_out = os.path.join('weights', 'exp_model_16k_1.3.weights.h5')

# if os.path.exists(model_weight_out):
#     sys.exit(f"The same file name exists already: {model_weight_out}")

############### Loading the datasets #####################

print('\nLoading the data\n')

featuresPath = "data/features/"

class_names = np.load(os.path.join(featuresPath, 'class_names.npy'))

X_train = np.load(os.path.join(featuresPath, 'X_train.npy'))
y_train = np.load(os.path.join(featuresPath, 'y_train.npy'))

X_val = np.load(os.path.join(featuresPath, 'X_val.npy'))
y_val = np.load(os.path.join(featuresPath, 'y_val.npy'))

X_test = np.load(os.path.join(featuresPath, 'X_test.npy'))
y_test = np.load(os.path.join(featuresPath, 'y_test.npy'))

num_labels = y_train.shape[1]


Loading the data



## Data Balancing

In [None]:
print("\nBalancing the data\n")

print("Train Class distribution before balancing:", Counter(np.argmax(y_train, axis=1)))

# Upsampling using SMOTE
smote = SMOTE(sampling_strategy={2: 9000})
oversampled_features, oversampled_labels = smote.fit_resample(X_train, y_train)

# Downsampling using RandomUnderSampler
undersampler = RandomUnderSampler(sampling_strategy={0: 7300, 1: 7300})
undersampled_features, undersampled_labels = undersampler.fit_resample(
    oversampled_features, oversampled_labels)

print("Train Class distribution after balancing:", Counter(
    np.argmax(undersampled_labels, axis=1)))

X_train = undersampled_features
y_train = undersampled_labels


Balancing the data

Train Class distribution before balancing: Counter({1: 7377, 0: 7334, 2: 834})




Train Class distribution after balancing: Counter({2: 9000, 0: 7300, 1: 7300})


# Training

In [None]:
###################### Training the model ###########################3
print("\nTraining the model\n")

model = build_improved_model(X_train.shape[1], num_labels)
# model.summary()

def scheduler(epoch, lr):
    if epoch < 10:
        return lr
    else:
        return float(lr * tf.math.exp(-0.1))

callback = LearningRateScheduler(scheduler)

model.fit(X_train, y_train, batch_size=32, epochs=30,
          validation_data=(X_val, y_val),
          callbacks=[callback])


Training the model

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.src.callbacks.History at 0x7fcf8f003dc0>

# Testing

In [None]:
print("\nTesting the model\n")

result = model.predict(X_test)

cnt, cnt_alarm, cnt_other, cnt_water = 0, 0, 0, 0
alarm_num, other_num, water_num = (sum(np.argmax(y_test, axis=1) == 0),
                                    sum(np.argmax(y_test, axis=1) == 1),
                                    sum(np.argmax(y_test, axis=1) == 2))

for i in range(len(y_test)):
    pred = np.argmax(result[i])
    if np.argmax(y_test[i]) == pred:
        cnt += 1
        if pred == 0:
            cnt_alarm += 1
        elif pred == 1:
            cnt_other += 1
        else:
            cnt_water += 1

acc = round(cnt * 100 / len(y_test), 2)
acc_alarm = round(cnt_alarm * 100 / alarm_num, 2)
acc_other = round(cnt_other * 100 / other_num, 2)
acc_water = round(cnt_water * 100 / water_num, 2)

print(
    f"Total Accuracy: {acc}%, Alarm Accuracy: {acc_alarm}%, Others Accuracy: {acc_other}%, Water Accuracy: {acc_water}%")

showResult(result, y_test, class_names)

print("\n")
print(classification_report(
    np.argmax(y_test, axis=1),
    np.argmax(result, axis=1),
    target_names=list(class_names)
))


Testing the model

Total Accuracy: 89.16%, Alarm Accuracy: 88.99%, Others Accuracy: 91.86%, Water Accuracy: 65.96%
activity,Alarm,Water,Other
Alarm,380,46,1
Water,28,395,7
Other,6,10,31
activity,Alarm,Water,Other
Alarm,0.89,0.11,0.0
Water,0.07,0.92,0.02
Other,0.13,0.21,0.66


              precision    recall  f1-score   support

       Alarm       0.92      0.89      0.90       427
       Water       0.88      0.92      0.90       430
       Other       0.79      0.66      0.72        47

    accuracy                           0.89       904
   macro avg       0.86      0.82      0.84       904
weighted avg       0.89      0.89      0.89       904



# Saving

In [None]:
if not os.path.exists("Models"):
    os.makedirs("Models")
path = os.path.join("Models", f"audio_NN_New{datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}_acc_{acc}")
model_json = model.to_json()
with open(f"{path}.json", "w") as json_file:
    json_file.write(model_json)
model.save_weights(f"{path}.weights.h5")

if not os.path.exists("weights"):
    os.makedirs("weights")
# model.save(model_weight_out, overwrite=True, include_optimizer=False)

model.save('weights/89%_model_16k_1.3.weights.h5', overwrite=True, include_optimizer=False)

  saving_api.save_model(


In [None]:
loaded_model = tf.keras.models.load_model('weights/89%_model_16k_1.3.weights.h5')