In [5]:
import numpy as np
import librosa
import dtw
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Bidirectional, LSTM, Dense, Dropout
from mealpy.swarm_based.PSO import OriginalPSO
from mealpy.utils.problem import Problem
from mealpy.utils.space import FloatVar, IntegerVar
import os
import librosa.display
from scipy.signal import butter, lfilter
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Define dataset paths
BASE_DIR = "C:\\Users\\naikg\\keyword-spotting\\data\\google_speech_recognition_v2"
FILE_LIST_PATH = os.path.join(BASE_DIR, "testing_list.txt")

# Butterworth low-pass filter to reduce high-frequency noise
def butter_lowpass_filter(data, cutoff=4000, sr=16000, order=5):
    """Run a signal through a digital Butterworth low-pass filter.

    Args:
        data: Samples to filter (any array-like accepted by ``lfilter``).
        cutoff: Cutoff frequency, expressed in the same units as ``sr``.
        sr: Sampling rate of ``data``.
        order: Order of the Butterworth design.

    Returns:
        The filtered samples, as returned by ``scipy.signal.lfilter``.
    """
    # butter() takes the cutoff as a fraction of the Nyquist frequency (sr / 2).
    cutoff_fraction = cutoff / (0.5 * sr)
    numerator, denominator = butter(order, cutoff_fraction, btype='low', analog=False)
    return lfilter(numerator, denominator, data)

# Read file paths and labels
def read_file_paths_and_labels(file_list_path, base_directory):
    """Read a file list and return the listed paths with their labels.

    Every non-blank line of the list is expected to look like
    ``<label>/<filename>``. The label is the text before the first ``/``;
    the path is the whole (stripped) line joined onto ``base_directory``.

    Args:
        file_list_path: Path of the text file holding one entry per line.
        base_directory: Directory the listed entries are relative to.

    Returns:
        A ``(file_paths, labels)`` tuple of two equally long lists.

    Raises:
        ValueError: If a non-blank line contains no ``/`` separator.
    """
    file_paths, labels = [], []

    with open(file_list_path, 'r') as file:
        for line_number, line in enumerate(file, start=1):
            entry = line.strip()
            if not entry:
                continue  # blank lines carry no entry

            # Only the label is needed here; the filename part stays in `entry`.
            label, separator, _ = entry.partition('/')
            if not separator:
                raise ValueError(
                    f"{file_list_path}:{line_number}: expected '<label>/<filename>', "
                    f"got {entry!r}"
                )

            file_paths.append(os.path.join(base_directory, entry))
            labels.append(label)

    return file_paths, labels

# Extract MFCC with noise reduction & pre-emphasis
def extract_mfcc(file_path, sr=16000, n_mfcc=40):
    """Load an audio file and compute its MFCC features.

    The signal is loaded with librosa at the requested rate, low-pass
    filtered at 4000 via ``butter_lowpass_filter``, pre-emphasised, and
    then converted to MFCCs.

    Args:
        file_path: Audio file to load.
        sr: Sampling rate requested from ``librosa.load``.
        n_mfcc: Number of MFCC coefficients to compute.

    Returns:
        The transpose of the matrix produced by ``librosa.feature.mfcc``
        (presumably one row per frame and ``n_mfcc`` columns), or ``None``
        when any step raises; the error is printed, not propagated.
    """
    try:
        signal, rate = librosa.load(file_path, sr=sr)
        filtered = butter_lowpass_filter(signal, cutoff=4000, sr=rate)
        emphasised = librosa.effects.preemphasis(filtered)
        features = librosa.feature.mfcc(y=emphasised, sr=rate, n_mfcc=n_mfcc).T
    except Exception as e:
        # Best-effort: callers check for None instead of handling exceptions.
        print(f"Error processing {file_path}: {e}")
        return None
    return features

# Load and process dataset
file_paths, labels = read_file_paths_and_labels(FILE_LIST_PATH, BASE_DIR)

# Extract features once per file, and keep a label only when its file was
# processed successfully so X_train[i] always corresponds to y_train[i].
X_train, y_train = [], []
for fp, label in zip(file_paths, labels):
    mfcc = extract_mfcc(fp)
    if mfcc is not None:  # extract_mfcc returns None (and logs) on failure
        X_train.append(mfcc)
        y_train.append(label)

# Step 1: DTW-based initial keyword detection using sliding window
def dtw_filter(query_audio, target_audio, threshold=300, window_size=10, step_size=50):
    """Cheap DTW pre-filter: does any window of the target resemble the query?

    MFCCs are extracted from both files. Windows of ``window_size`` frames,
    taken every ``step_size`` frames of the target, are compared with the
    whole query by DTW using an L1 frame distance.

    Args:
        query_audio: Path of the keyword example.
        target_audio: Path of the audio to search.
        threshold: A window matches when its DTW distance is below this value.
        window_size: Number of target frames per window.
        step_size: Number of frames between consecutive window starts.

    Returns:
        ``True`` as soon as one window's distance is below ``threshold``;
        ``False`` if none is, or if features could not be extracted from
        either file or are empty.

    NOTE(review): with the defaults (window 10, step 50) only 10 out of every
    50 target frames are ever examined -- confirm this is intended.
    """
    query_mfcc = extract_mfcc(query_audio)
    target_mfcc = extract_mfcc(target_audio)

    if query_mfcc is None or target_mfcc is None:
        return False
    if len(query_mfcc) == 0 or len(target_mfcc) == 0:
        return False  # nothing to align

    def l1_distance(x, y):
        return np.linalg.norm(x - y, ord=1)

    # Last start that still yields a full window. Clamped at 0 so a target
    # shorter than one window is still compared once (as a shorter segment),
    # and "+ 1" so the window ending exactly at the last frame is reachable.
    last_start = max(len(target_mfcc) - window_size, 0)
    for start in range(0, last_start + 1, step_size):
        segment = target_mfcc[start:start + window_size]
        dtw_distance, _, _, _ = dtw.dtw(query_mfcc, segment, dist=l1_distance)
        if dtw_distance < threshold:
            return True

    return False

# Step 2: BiLSTM model with PSO-optimized hyperparameters
def create_bilstm_model(input_shape, lr, dropout):
    """Build and compile the BiLSTM binary classifier.

    The network is: Bidirectional LSTM(128, returning sequences) ->
    Bidirectional LSTM(64) -> Dropout -> Dense(32, relu) -> Dense(1, sigmoid).

    Args:
        input_shape: Shape of one input sample, passed to the first layer.
        lr: Value passed to the Adam optimizer (its learning rate).
        dropout: Rate passed to the Dropout layer.

    Returns:
        The compiled ``Sequential`` model (binary cross-entropy loss,
        accuracy metric).
    """
    layer_stack = [
        Bidirectional(LSTM(128, return_sequences=True), input_shape=input_shape),
        Bidirectional(LSTM(64)),
        Dropout(dropout),
        Dense(32, activation='relu'),
        Dense(1, activation='sigmoid'),
    ]
    network = Sequential(layer_stack)
    adam = tf.keras.optimizers.Adam(lr)
    network.compile(loss='binary_crossentropy', optimizer=adam, metrics=['accuracy'])
    return network

# PSO Objective Function
def pso_objective(solution):
    """Objective minimised by PSO: final training loss for one candidate.

    Args:
        solution: Sequence ``(lr, dropout, epochs)``; ``epochs`` is truncated
            to an int before use.

    Returns:
        The training loss recorded for the last epoch of the fit.

    NOTE(review): the model is built for input shape (100, 40), but the
    entries of ``X_train`` are per-file MFCC matrices whose frame count
    presumably varies with the audio -- confirm they are padded/truncated
    to 100 frames before this runs.
    NOTE(review): ``y_train`` comes from the label part of the file list
    (strings), while the model is compiled with binary cross-entropy --
    confirm the labels are converted to 0/1 somewhere.
    """
    lr, dropout, epochs = solution

    # The optimiser evaluates this function many times and each call builds a
    # brand-new model; reset Keras' global state first so memory held for
    # earlier candidate models does not keep accumulating.
    tf.keras.backend.clear_session()

    model = create_bilstm_model((100, 40), lr, dropout)
    history = model.fit(np.array(X_train), np.array(y_train), epochs=int(epochs), batch_size=32, verbose=0)
    return history.history['loss'][-1]

# Optimize with PSO
def optimize_hyperparameters():
    """Search learning rate, dropout and epoch count with PSO.

    The search space is, in the order ``pso_objective`` unpacks it:
    learning rate in [0.0001, 0.01], dropout in [0.1, 0.5] and epochs in
    [5, 50]. The swarm uses 10 particles for 20 epochs and minimises
    ``pso_objective``.

    Returns:
        The ``solution`` attribute of the best result returned by the solver.
    """
    search_space = [FloatVar(0.0001, 0.01), FloatVar(0.1, 0.5), IntegerVar(5, 50)]
    tuning_problem = Problem(obj_func=pso_objective, bounds=search_space, minmax="min")
    swarm = OriginalPSO(epoch=20, pop_size=10)
    return swarm.solve(tuning_problem).solution

# Train BiLSTM with optimized parameters
def train_bilstm_model():
    """Tune hyperparameters with PSO, then train the final BiLSTM model.

    Returns:
        The model fitted on the module-level ``X_train`` / ``y_train`` using
        the learning rate, dropout and epoch count found by
        ``optimize_hyperparameters``.
    """
    learning_rate, dropout_rate, n_epochs = optimize_hyperparameters()
    final_model = create_bilstm_model((100, 40), learning_rate, dropout_rate)
    features = np.array(X_train)
    targets = np.array(y_train)
    # The epoch count comes back from the optimiser as a number; fit() needs an int.
    final_model.fit(features, targets, epochs=int(n_epochs), batch_size=32)
    return final_model

# Final QbE detection using BiLSTM
def qbe_bilstm_detect(model, audio_file):
    """Run the trained model on one audio file and threshold its output.

    Args:
        model: Object with a ``predict`` method (the trained BiLSTM).
        audio_file: Path of the audio file to classify.

    Returns:
        ``model.predict(...) > 0.5`` for a batch holding this one file, or
        ``False`` when MFCC extraction failed for ``audio_file``.

    NOTE(review): the MFCC matrix is passed with whatever frame count the
    audio yields -- confirm it matches the input shape the model was built
    with.
    """
    mfcc = extract_mfcc(audio_file)
    if mfcc is None:
        # extract_mfcc has already printed the error; report "not detected"
        # (as dtw_filter does) instead of feeding None into the model.
        return False

    batch = np.expand_dims(mfcc, axis=0)  # add the leading batch axis
    prediction = model.predict(batch)
    return prediction > 0.5

# Two-stage detection: the DTW pre-filter decides whether the (expensive)
# BiLSTM training and classification step is run at all.
if not dtw_filter("cat.wav", "keyword_new_spotting_script_2000.wav"):
    print("DTW: No keyword detected.")
else:
    print("DTW: Possible keyword detected! Refining with BiLSTM...")
    bilstm_model = train_bilstm_model()
    found = qbe_bilstm_detect(bilstm_model, "target_audio.wav")
    if found:
        print("Final Detection:", "✅ Keyword Found!")
    else:
        print("Final Detection:", "❌ No Keyword")


DTW: No keyword detected.


In [None]:
import os
import pandas as pd

# Audio recording that is searched for every keyword
test_audio_path = "keyword_new_spotting_script_5sec.wav"
# NOTE(review): extract_mfcc returns None on failure; that case is not checked here.
test_mfcc = extract_mfcc(test_audio_path)

# Destination file for the detection report
csv_filename = "keyword_detection_5sec.csv"

# One detection result per processed keyword, plus the keywords whose
# sample recording was not found on disk
results = []
missing_keywords = []

# `keywords` and `detect_keywords_dtw` are not defined in this cell; they are
# expected to exist already in the notebook session.
for keyword in keywords:
    keyword_audio_path = f"C:/Users/naikg/Ltsmkeyword/keyword_samples_wav/{keyword}.wav"

    if not os.path.exists(keyword_audio_path):
        missing_keywords.append(keyword)
        continue

    print(f"🔍 Processing '{keyword}'...")
    keyword_mfcc = extract_mfcc(keyword_audio_path)  # MFCC features of the keyword sample
    print(keyword_mfcc)  # debug dump of the extracted features
    result = detect_keywords_dtw(test_mfcc, keyword_mfcc, keyword)  # occurrences in the test audio
    results.append(result)

# Write the report only when something was processed; mode="w" overwrites
# any CSV left over from an earlier run.
if results:
    df = pd.DataFrame(results)
    df.to_csv(csv_filename, index=False, mode="w")
    print(f"\n Detection results saved to '{csv_filename}'")

# List the keywords that had no sample recording
if missing_keywords:
    print("\n Missing keyword files:")
    print(", ".join(missing_keywords))
