## **Imports**

In [None]:
import glob
import os

import librosa
import numpy as np
import tensorflow as tf
from google.colab import files
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, models, optimizers

## **Data Download Directory Setup**

In [None]:
# Download is large (about 4 GB) - this may take a while to run (assuming the data is not already available)

# Opens the Colab file-upload prompt; upload the kaggle.json API token here.
files.upload()

# Copy the uploaded token to ~/.kaggle/ and restrict it to owner read/write only.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Download the dataset archive (.zip) into the current working directory.
!kaggle datasets download -d pythonafroz/electrical-motor-anomaly-detection-from-sound-data

Saving kaggle.json to kaggle.json
Dataset URL: https://www.kaggle.com/datasets/pythonafroz/electrical-motor-anomaly-detection-from-sound-data
License(s): CC-BY-NC-SA-4.0
Downloading electrical-motor-anomaly-detection-from-sound-data.zip to /content
100% 4.16G/4.16G [01:01<00:00, 59.2MB/s]
100% 4.16G/4.16G [01:01<00:00, 72.8MB/s]


In [None]:
# Extract the archive into ./motor_data.
#   -q  suppresses the per-file "inflating: ..." lines that would flood the cell output
#   -n  never overwrites existing files, so re-running the cell does not stop
#       at an interactive "replace?" prompt

!unzip -q -n electrical-motor-anomaly-detection-from-sound-data.zip -d motor_data

## **MFCC Data Conversion**

In [None]:
# Collect the WAV paths for the fan machine: the training split plus the two
# labelled test splits (source domain and target domain).
#
# glob returns paths in arbitrary, filesystem-dependent order. The lists are
# sorted so that everything derived from their order (array row order, the
# seeded train/validation split) is the same on every run and machine.
FAN_DATA_DIR = "/content/motor_data/dev_data_fan/fan"

wav_files_train = sorted(glob.glob(os.path.join(FAN_DATA_DIR, "train", "*.wav")))
print("Total training files:", len(wav_files_train))

wav_files_test_source = sorted(glob.glob(os.path.join(FAN_DATA_DIR, "source_test", "*.wav")))
print("Total testing files (source domain):", len(wav_files_test_source))

wav_files_test_target = sorted(glob.glob(os.path.join(FAN_DATA_DIR, "target_test", "*.wav")))
print("Total testing files (target domain):", len(wav_files_test_target))

"""
Converts a WAV file to an MFCC representation for easy usage within sequential
model training.
"""
def wav_to_mfcc(path, sr=16000, duration=2.0, n_mfcc=40):
    # Load audio file
    y, _ = librosa.load(path, sr=sr)

    # Fix length to 2 seconds
    samples = int(sr * duration)
    y = librosa.util.fix_length(data=y, size=samples)

    # Compute MFCC
    mfcc = librosa.feature.mfcc(
        y=y,
        sr=sr,
        n_mfcc=n_mfcc
    )

    # Add channel dimension (n_mfcc, time, 1)
    mfcc = mfcc[..., np.newaxis]

    return mfcc

def load_mfcc_batch(paths):
    """Convert every WAV file in ``paths`` to MFCCs and stack them.

    Args:
        paths: Sequence of WAV file paths.

    Returns:
        Array with one row per file, in the same order as ``paths``; each row
        is whatever ``wav_to_mfcc`` returns for that file.

    Raises:
        ValueError: If ``paths`` is empty. An empty list almost always means
            the dataset was not extracted where the glob patterns expect it,
            and would otherwise only fail later with an obscure shape error.
    """
    if len(paths) == 0:
        raise ValueError("No WAV files to convert - check the dataset paths.")
    return np.array([wav_to_mfcc(wav_path) for wav_path in paths])


# NOTE: despite the "y" prefix, y1_test / y2_test hold MFCC *features* of the
# test clips (the model inputs), not labels.

# X training data
x_train = load_mfcc_batch(wav_files_train)
print("X shape:", x_train.shape)

# Y1 target domain test data
y1_test = load_mfcc_batch(wav_files_test_target)
print("Y1 shape:", y1_test.shape)

# Y2 source domain test data
y2_test = load_mfcc_batch(wav_files_test_source)
print("Y2 shape:", y2_test.shape)

Total training files: 3009
Total testing files (source domain): 600
Total testing files (target domain): 600


KeyboardInterrupt: 

## **Model Setup & Training**

In [None]:
def pad_mfcc(X, multiple=4):
    """Zero-pad the height and width axes of a batch up to a multiple.

    The autoencoder in this notebook halves the spatial size twice and then
    rebuilds it from ``size // 4``, so its output only matches its input when
    both spatial dimensions are divisible by 4 (the default here).

    Args:
        X: 4-D array indexed as (batch, height, width, channels).
        multiple: Height and width are padded up to the next multiple of this
            value. Must be a positive integer.

    Returns:
        A new array padded with zeros at the end of axes 1 and 2. Axes 0 and 3
        are unchanged. Input that is already aligned gets zero padding, so the
        function is idempotent.

    Raises:
        ValueError: If ``multiple`` is less than 1.
    """
    if multiple < 1:
        raise ValueError("multiple must be a positive integer")

    height, width = X.shape[1:3]
    # (-n) % m is the distance from n up to the next multiple of m (0 if aligned).
    pad_h = (-height) % multiple
    pad_w = (-width) % multiple
    return np.pad(X, ((0, 0), (0, pad_h), (0, pad_w), (0, 0)), mode='constant')

# Align the spatial dimensions with what the autoencoder can reconstruct.
x_train = pad_mfcc(x_train)

# Hold out 20% of the (normal-only) training clips for validation; the fixed
# random_state makes the split repeatable for a given input order.
X_train, X_val = train_test_split(
    x_train,
    test_size=0.2,
    random_state=42,
    shuffle=True,
)

# Per-sample shape, i.e. everything after the batch axis.
input_shape = X_train.shape[1:]

print("X_train shape:", X_train.shape)
print("X_val shape:", X_val.shape)

# Seed Python, NumPy and TensorFlow so weight initialisation and batch
# shuffling repeat from run to run. (Some GPU kernels are still
# non-deterministic, so tiny numerical differences can remain.)
tf.keras.utils.set_random_seed(42)

# small conv autoencoder
# Encoder: two stride-2 convolutions shrink height and width by 4 overall.
inp = layers.Input(shape=input_shape)
x = layers.Conv2D(16, (3,3), activation='relu', padding='same', strides=2)(inp)
x = layers.Conv2D(32, (3,3), activation='relu', padding='same', strides=2)(x)
x = layers.Conv2D(64, (3,3), activation='relu', padding='same')(x)

# bottleneck: collapse the feature map to one vector, then compress to 32 values
x = layers.GlobalAveragePooling2D()(x)
bottleneck = layers.Dense(32, activation='relu')(x)

# decoder: expand back to the (H/4, W/4, 64) feature map and upsample twice.
# The integer divisions only reproduce the input size when H and W are
# multiples of 4, which is why the inputs are padded beforehand.
x = layers.Dense((input_shape[0]//4)*(input_shape[1]//4)*64, activation='relu')(bottleneck)
x = layers.Reshape((input_shape[0]//4, input_shape[1]//4, 64))(x)
x = layers.Conv2DTranspose(32, (3,3), strides=2, padding='same', activation='relu')(x)
x = layers.Conv2DTranspose(16, (3,3), strides=2, padding='same', activation='relu')(x)
# Linear output because MFCC values are unbounded and can be negative.
out = layers.Conv2D(1, (3,3), activation='linear', padding='same')(x)

autoencoder = models.Model(inputs=inp, outputs=out)
autoencoder.compile(optimizer='adam', loss='mse')

# train (only normal data): the target is the input itself, so clips unlike
# the training data should reconstruct poorly. Keeping the History object
# preserves the loss curves and avoids printing its repr as cell output.
history = autoencoder.fit(X_train, X_train,
                          validation_data=(X_val, X_val),
                          epochs=40, batch_size=16)

X_train shape: (2407, 40, 64, 1)
X_val shape: (602, 40, 64, 1)
Epoch 1/40
[1m151/151[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 76ms/step - loss: 708.8404 - val_loss: 38.8773
Epoch 2/40
[1m151/151[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19s[0m 70ms/step - loss: 36.8085 - val_loss: 32.4769
Epoch 3/40
[1m151/151[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 72ms/step - loss: 32.6942 - val_loss: 31.5425
Epoch 4/40
[1m151/151[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 73ms/step - loss: 32.2026 - val_loss: 31.2218
Epoch 5/40
[1m151/151[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 78ms/step - loss: 32.5461 - val_loss: 31.4273
Epoch 6/40
[1m151/151[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 85ms/step - loss: 32.4896 - val_loss: 31.0916
Epoch 7/40
[1m151/151[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 69ms/step - loss: 32.6038 - val_loss: 31.0821
Epoch 8/40
[1m151/151[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [

<keras.src.callbacks.history.History at 0x7d8c8bbadfd0>

In [None]:
# Print the layer-by-layer architecture with output shapes and parameter counts.

autoencoder.summary()

In [None]:
# Calibrate the anomaly threshold on the held-out validation clips.
# The right value differs from device to device, so treat this as a starting
# point for experiments rather than a value to deploy unchanged.

# Per-clip reconstruction error: mean squared difference over every axis
# except the batch axis.
recons = autoencoder.predict(X_val)
val_errors = np.square(recons - X_val).mean(axis=(1, 2, 3))

# Use 95th percentile; above = anomalous, below = normal
threshold_source = np.percentile(val_errors, q=95)

print(threshold_source)

[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 40ms/step
40.008354


## **Quantization / Compression**

In [None]:
model_path = "autoencoder_baseline.keras"
autoencoder.save(model_path)


def representative_dataset(num_samples=100):
    """Yield single-clip float32 batches used to calibrate activation ranges.

    Args:
        num_samples: Number of training clips (taken from the start of
            ``X_train``) to feed to the converter.

    Yields:
        A one-element list holding an array of shape ``(1, H, W, C)``, the
        format the TFLite converter expects for a single-input model.
    """
    for sample in X_train[:num_samples]:
        yield [sample[np.newaxis, ...].astype(np.float32)]


# Convert baseline to INT8 quantized.
# Optimize.DEFAULT on its own only quantizes the weights (dynamic-range
# quantization). Supplying a representative dataset lets the converter
# calibrate activations as well, giving integer quantization with float
# fallback for unsupported ops. Model input and output stay float32.
converter = tf.lite.TFLiteConverter.from_keras_model(autoencoder)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_dataset
tflite_model = converter.convert()

quant_model_path = "autoencoder_int8.tflite"
with open(quant_model_path, "wb") as f:
    f.write(tflite_model)

# Sizes in megabytes (1e6 bytes).
# NOTE(review): the .keras archive may hold more than the weights (e.g.
# optimizer state), which would overstate the reduction - confirm before
# quoting this number.
original_size = os.path.getsize(model_path) / 1e6
quant_size = os.path.getsize(quant_model_path) / 1e6

print(f"Original model size: {original_size:.2f} MB")
print(f"Quantized INT8 model size: {quant_size:.2f} MB")
print(f"Size reduction: {100*(original_size - quant_size)/original_size:.1f}%")

Saved artifact at '/tmp/tmpiimdfvoe'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 40, 64, 1), dtype=tf.float32, name='keras_tensor')
Output Type:
  TensorSpec(shape=(None, 40, 64, 1), dtype=tf.float32, name=None)
Captures:
  138042571985488: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138042571982224: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138042576845904: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138042576839376: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138042571993552: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138042576842832: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138042576841104: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138042576840336: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138042576844368: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138042576845328: TensorSpec(shape=(), dtype=tf.resource, name=None)
  13804257

## **F1 Score**

In [None]:
def labels_from_paths(paths):
    """Derive ground-truth labels from the test file names.

    A clip is labelled anomalous (1) when its file name contains the substring
    'anomaly', otherwise normal (0). Only the base name is inspected, so a
    directory whose name contains 'anomaly' (for example a folder named after
    the dataset) cannot mark every clip as anomalous.

    Args:
        paths: Sequence of WAV file paths.

    Returns:
        1-D array of 0/1 labels in the same order as ``paths``.
    """
    return np.array([int('anomaly' in os.path.basename(p)) for p in paths])


# y1 = target-domain test clips, y2 = source-domain test clips.
y1_true_labels = labels_from_paths(wav_files_test_target)
y2_true_labels = labels_from_paths(wav_files_test_source)

print("y1_true_labels shape:", y1_true_labels.shape)
print("y2_true_labels shape:", y2_true_labels.shape)

y1_true_labels shape: (600,)
y2_true_labels shape: (600,)


In [None]:
# The autoencoder was built for padded inputs (height and width multiples of
# 4), but the test features are still at their raw MFCC size. Pad them the
# same way as the training data, otherwise predict() rejects the shape.
# pad_mfcc leaves already-aligned arrays untouched, so this is safe either way.
y1_inputs = pad_mfcc(y1_test)
y2_inputs = pad_mfcc(y2_test)

# Per-clip reconstruction error (mean squared error over all non-batch axes),
# computed exactly as for the validation set so the threshold is comparable.
y1_recons = autoencoder.predict(y1_inputs)
y1_errors = np.mean((y1_recons - y1_inputs)**2, axis=(1,2,3))

y2_recons = autoencoder.predict(y2_inputs)
y2_errors = np.mean((y2_recons - y2_inputs)**2, axis=(1,2,3))

print("y1_errors shape:", y1_errors.shape)
print("y2_errors shape:", y2_errors.shape)

[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 98ms/step
[1m19/19[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 62ms/step
y1_errors shape: (600,)
y2_errors shape: (600,)


In [None]:
# Flag a clip as anomalous (1) when its reconstruction error exceeds the
# threshold calibrated on the validation errors, instead of a hard-coded
# constant that does not follow the trained model.
# NOTE(review): the threshold comes from the validation split of the training
# data; the target domain may need its own calibration - confirm.
y1_pred_labels = (y1_errors > threshold_source).astype(int)
y2_pred_labels = (y2_errors > threshold_source).astype(int)

print("y1_pred_labels shape:", y1_pred_labels.shape)
print("y2_pred_labels shape:", y2_pred_labels.shape)

y1_pred_labels shape: (600,)
y2_pred_labels shape: (600,)


In [None]:
# F1 of the anomaly class (label 1) for each domain. y1 is the target domain,
# y2 the source domain.
f1_target = f1_score(y1_true_labels, y1_pred_labels)
f1_source = f1_score(y2_true_labels, y2_pred_labels)

# Unweighted arithmetic mean of the two domain scores.
average_f1 = (f1_target + f1_source) / 2

print(f"F1 Score for Target Domain: {f1_target:.4f}")
print(f"F1 Score for Source Domain: {f1_source:.4f}")
print(f"F1 Score across both domains: {average_f1:.4f}")

F1 Score for Target Domain: 0.6712
F1 Score for Source Domain: 0.6619
F1 Score across both domains: 0.6666
