<a href="https://colab.research.google.com/github/nehaprasanth918/acoustic-pest-detection/blob/main/training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import gc

# 🚀 Enable High-RAM Mode in Colab before running: Runtime → Change runtime type → High-RAM
# 🚀 Mount Google Drive at /content/drive so the dataset files below are reachable
from google.colab import drive
drive.mount('/content/drive')

# Locations of the combined MFCC feature array and its label array on the mounted Drive.
# NOTE(review): these paths use "My Drive" while the checkpoint path later in the notebook
# uses "MyDrive" — confirm both spellings resolve on the target runtime.
combined_features_path = "/content/drive/My Drive/dataset/combined_mfcc_features.npy"
combined_labels_path = "/content/drive/My Drive/dataset/combined_mfcc_labels.npy"

# 🛠 Generator that streams a .npy file in batches instead of materialising it all in RAM
def load_large_npy(file_path, batch_size=1000):
    """Yield consecutive leading-axis slices of the array stored at ``file_path``.

    The file is memory-mapped when possible, so only the batch currently being
    yielded is copied into RAM. Arrays that cannot be memory-mapped (for
    example object-dtype arrays that need pickling) are loaded in full as a
    fallback and then sliced.

    Args:
        file_path: Path to a ``.npy`` file.
        batch_size: Maximum number of leading-axis entries per batch; must be
            greater than zero.

    Yields:
        numpy.ndarray: Batches covering the array in order. The last batch may
        hold fewer than ``batch_size`` entries.

    Raises:
        ValueError: If ``batch_size`` is not positive. Because this is a
            generator, the error surfaces when iteration starts.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    try:
        # Memory-map the file: pages are read from disk lazily, per slice.
        data = np.load(file_path, mmap_mode='r', allow_pickle=False)
    except ValueError:
        # Raised for object arrays, which can be neither memory-mapped nor read without pickle.
        # NOTE(security): allow_pickle=True can execute arbitrary code embedded in the file;
        # only use this on files you created yourself.
        data = np.load(file_path, allow_pickle=True)

    for start in range(0, len(data), batch_size):
        # Copy the slice so callers receive an ordinary in-memory array, not a memmap view.
        yield np.array(data[start:start + batch_size])

# ✅ Read the feature and label arrays from Drive.
# Features are cast to float32 to save memory; labels are kept exactly as stored.
# NOTE(security): allow_pickle=True will unpickle arbitrary objects — only load trusted files.
X = np.load(combined_features_path, allow_pickle=True)
X = X.astype(np.float32)
y = np.load(combined_labels_path, allow_pickle=True)

# Quick sanity report on what was loaded
print("✅ Dataset loaded successfully!")
print(f"📌 Total samples: {len(X)}")
print(f"📊 MFCC Feature Shape: {X[0].shape}")  # Shape of one sample
print(f"📋 Unique Classes: {np.unique(y)}")

# 🔄 Step 1: Pick the padded length from the 95th percentile of per-sample frame counts,
# so a few very long clips do not force excessive padding on every other sample.
mfcc_lengths = [sample.shape[1] for sample in X]  # axis 1 of each sample = number of frames
max_time_steps = int(np.percentile(mfcc_lengths, 95))
print(f"🔹 Optimal max_time_steps: {max_time_steps}")

# 🚀 Step 2: Efficiently Pad MFCCs
def pad_mfcc(mfcc, target_length):
    """Force a 2-D MFCC matrix to exactly ``target_length`` columns.

    Args:
        mfcc: 2-D array whose second axis is the time axis.
        target_length: Desired number of columns (time steps).

    Returns:
        The input right-padded with zeros along the time axis when it is
        shorter than ``target_length``; otherwise its first ``target_length``
        columns.
    """
    missing = target_length - mfcc.shape[1]
    if missing <= 0:
        # Already long enough: keep only the leading target_length frames.
        return mfcc[:, :target_length]
    # Too short: append zero-valued frames on the right; the coefficient axis is untouched.
    return np.pad(mfcc, ((0, 0), (0, missing)), mode='constant')

X_padded = np.array([pad_mfcc(mfcc, max_time_steps) for mfcc in X], dtype=np.float32)

# 🚀 Step 3: Encode labels as numbers
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

# 🚀 Step 4: Split sample indices into training (80%) and testing (20%)
# Indices are split (rather than the arrays) so the normalization statistics below can be
# fitted on training rows only; with the same random_state this selects the same rows as
# splitting the arrays directly.
train_idx, test_idx = train_test_split(
    np.arange(len(X_padded)), test_size=0.2, random_state=42
)

# 🚀 Step 5: Standardize each MFCC coefficient
# X_padded is (samples, n_mfcc, time_steps), so per-coefficient statistics reduce over
# samples and time (axes 0 and 2). Reducing over axes (0, 1) would instead mix all
# coefficients together per time frame. Fitting on training rows keeps test data out of
# the statistics. keepdims=True makes the result broadcast against X_padded.
train_features = X_padded[train_idx]
mfcc_mean = train_features.mean(axis=(0, 2), keepdims=True)
mfcc_std = train_features.std(axis=(0, 2), keepdims=True)
del train_features
# A constant coefficient has zero std; divide by 1 there instead of producing NaN/inf.
mfcc_std = np.where(mfcc_std > 0, mfcc_std, 1.0).astype(np.float32)
X_normalized = (X_padded - mfcc_mean) / mfcc_std

y_train, y_test = y_encoded[train_idx], y_encoded[test_idx]

# 🔄 Step 6: Reshape for Edge Impulse compatibility (Swap timesteps & features)
X_train = np.transpose(X_normalized[train_idx], (0, 2, 1))  # Shape: (samples, time_steps, features)
X_test = np.transpose(X_normalized[test_idx], (0, 2, 1))

# 🔥 Garbage Collection to Free Memory
gc.collect()

# ✅ Final Status
print("✅ Preprocessing completed!")
print(f"📌 Training samples: {X_train.shape[0]}, Testing samples: {X_test.shape[0]}")
print(f"🔢 Label encoding: {dict(enumerate(label_encoder.classes_))}")  # Show class mappings
print(f"✔ Final X_train shape: {X_train.shape}")  # Expected (samples, time_steps, features)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
✅ Dataset loaded successfully!
📌 Total samples: 62332
📊 MFCC Feature Shape: (13, 79)
📋 Unique Classes: ['bombus terrestris' 'bradysia difformis' 'coccilena septempunctata'
 'myzus persicae' 'nezara viridula' 'palomena prasina'
 'trialeurodes vaporariorum' 'tuta absoluta']
🔹 Optimal max_time_steps: 79
✅ Preprocessing completed!
📌 Training samples: 49865, Testing samples: 12467
🔢 Label encoding: {0: 'bombus terrestris', 1: 'bradysia difformis', 2: 'coccilena septempunctata', 3: 'myzus persicae', 4: 'nezara viridula', 5: 'palomena prasina', 6: 'trialeurodes vaporariorum', 7: 'tuta absoluta'}
✔ Final X_train shape: (49865, 79, 13)


In [None]:
# Requires X_train / X_test from the preprocessing cell, shaped (samples, time_steps, features).

# Collapse each sample's (time_steps, features) matrix into one flat row for Edge Impulse.
X_train_flattened = X_train.reshape(len(X_train), -1)
X_test_flattened = X_test.reshape(len(X_test), -1)

print("✅ Flattened MFCC Shape for Edge Impulse:", X_train_flattened.shape)


✅ Flattened MFCC Shape for Edge Impulse: (49865, 1027)


In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import GRU, Dense, Dropout, BatchNormalization, Bidirectional, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# ✅ Improved GRU Model
# Stacked bidirectional GRUs followed by a small dense classifier head.
# NOTE(review): recurrent_dropout > 0 makes Keras fall back from the cuDNN GRU kernel to the
# generic implementation, which is consistent with the ~500 s epochs in the training log.
# It is left unchanged here because removing it would alter the model's regularization.
model = Sequential([
    # (time_steps, n_mfcc) is read from the data so the model follows max_time_steps
    # instead of a hard-coded (79, 13).
    Input(shape=X_train.shape[1:]),

    # 🔥 Bidirectional GRUs
    Bidirectional(GRU(256, return_sequences=True, recurrent_dropout=0.2)),
    BatchNormalization(),
    Dropout(0.3),

    Bidirectional(GRU(256, return_sequences=True, recurrent_dropout=0.2)),
    BatchNormalization(),
    Dropout(0.4),

    Bidirectional(GRU(128, return_sequences=True, recurrent_dropout=0.2)),
    BatchNormalization(),
    Dropout(0.4),

    GRU(64),  # Last GRU layer: returns only the final state
    BatchNormalization(),
    Dropout(0.5),

    # 🔥 Swish activation for better gradient flow
    Dense(128, activation='swish', kernel_regularizer=l2(0.0005)),
    Dropout(0.5),
    Dense(64, activation='swish', kernel_regularizer=l2(0.0005)),

    Dense(len(label_encoder.classes_), activation='softmax')  # Output layer: one unit per class
])

# ✅ Low initial learning rate; it is halved on validation-loss plateaus by the callback below
optimizer = Adam(learning_rate=0.0003)
# Sparse loss because the labels are integer class ids, not one-hot vectors.
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# ✅ Callbacks
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=0.00001)

# ✅ Save the best model automatically
checkpoint = ModelCheckpoint("/content/drive/MyDrive/best_model.keras",
                             monitor="val_accuracy",
                             save_best_only=True,
                             mode="max")

# Fraction of the training data held out for validation. Early stopping, the LR schedule and
# checkpoint selection all tune on the validation set, so it must not be the test set;
# otherwise later test metrics are optimistically biased.
VALIDATION_FRACTION = 0.1

# ✅ Train the Model (Auto-Save)
history = model.fit(
    X_train, y_train,
    epochs=70,
    batch_size=128,
    # Keras takes the last VALIDATION_FRACTION of the arrays (before shuffling) as validation data.
    validation_split=VALIDATION_FRACTION,
    callbacks=[early_stop, lr_scheduler, checkpoint]  # Auto-save enabled!
)

print("✅ Model training completed and best model saved!")

# 📊 Training Visualization
def plot_training_results(history):
    """Show training vs. validation loss and accuracy curves side by side.

    Args:
        history: Object whose ``history`` attribute maps the keys ``'loss'``,
            ``'val_loss'``, ``'accuracy'`` and ``'val_accuracy'`` to per-epoch
            value sequences (as returned by ``model.fit``).
    """
    curves = history.history
    # One entry per panel: (metric key, axis/legend label, panel title)
    panels = (
        ('loss', 'Loss', 'Training & Validation Loss'),
        ('accuracy', 'Accuracy', 'Training & Validation Accuracy'),
    )

    plt.figure(figsize=(12, 4))

    for position, (metric, axis_label, heading) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        # Training curve in blue, validation curve in red, for both panels.
        plt.plot(curves[metric], label=f'Train {axis_label}', color='blue')
        plt.plot(curves[f'val_{metric}'], label=f'Val {axis_label}', color='red')
        plt.xlabel('Epochs')
        plt.ylabel(axis_label)
        plt.title(heading)
        plt.legend()

    plt.show()

# 📊 Draw the loss and accuracy curves from the History object returned by model.fit
plot_training_results(history)


Epoch 1/70
[1m390/390[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m500s[0m 1s/step - accuracy: 0.2495 - loss: 2.0486 - val_accuracy: 0.1591 - val_loss: 3.3897 - learning_rate: 3.0000e-04
Epoch 2/70
[1m390/390[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m489s[0m 1s/step - accuracy: 0.4253 - loss: 1.5556 - val_accuracy: 0.2817 - val_loss: 2.0525 - learning_rate: 3.0000e-04
Epoch 3/70
[1m390/390[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m502s[0m 1s/step - accuracy: 0.5281 - loss: 1.2863 - val_accuracy: 0.3463 - val_loss: 2.0933 - learning_rate: 3.0000e-04
Epoch 4/70
[1m390/390[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m502s[0m 1s/step - accuracy: 0.5814 - loss: 1.1093 - val_accuracy: 0.5120 - val_loss: 1.3571 - learning_rate: 3.0000e-04
Epoch 5/70
[1m390/390[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m476s[0m 1s/step - accuracy: 0.6193 - loss: 1.0165 - val_accuracy: 0.6376 - val_loss: 0.9675 - learning_rate: 3.0000e-04
Epoch 6/70
[1m390/390[0m [32m━━━━━━━━━━━━━