<a href="https://colab.research.google.com/github/piyush-c38/Gunshot_Direction_Estimation/blob/main/IDP_Gunshot_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Complete gunshot detection and direction-estimation workflow pipeline

In [None]:
import sounddevice as sd
import numpy as np
import scipy.signal as signal
import matplotlib.pyplot as plt
import soundfile as sf
import librosa
import librosa.display
import tensorflow as tf
import tensorflow_hub as hub
import joblib
import os
import noisereduce as nr
from multiprocessing import Process, Manager
from tensorflow.keras.models import load_model

# === Constants ===
# Recording and filtering parameters shared by the functions below.
DURATION = 5  # seconds recorded per microphone (see record_mic)
FS = 44100  # sample rate passed to sd.rec and used for filtering / WAV output
LOWCUT = 3000  # lower band-pass edge, normalised by the Nyquist rate in process_audio
HIGHCUT = 15000  # upper band-pass edge; must stay below FS / 2 for the filter design
GAIN_DB = 6  # NOTE(review): not referenced anywhere in the code visible here
MIC_INDICES = [5, 6, 7]  # passed as `device=` to sd.rec; presumably machine-specific -- verify
MIC_LABELS = ["mic1", "mic2", "mic3"]  # result-dict keys and WAV file-name parts, paired with MIC_INDICES
TEST_ANGLE = "test"  # prefix of the WAV file names written by process_audio

# === Model Paths ===
# Relative paths: they resolve against the current working directory.
LABEL_ENCODER_PATH = "./trained_models/label_encoder_510.pkl"
MODEL_1_PATH = "./trained_models/laptop_vggish_trained_model_510data.keras"
MODEL_2_PATH = './trained_models/gunshot_direction_model_v2.pkl'

# === Load ML Model 1 ===
# These loads run at import time. NOTE(review): recording uses
# multiprocessing.Process, so under the "spawn" start method every child
# re-imports this module and repeats the loads -- confirm this is acceptable.
model_1 = tf.keras.models.load_model(MODEL_1_PATH)
# joblib.load is pickle-based: only load files from a trusted source.
label_encoder = joblib.load(LABEL_ENCODER_PATH)
vggish_model = hub.load('https://tfhub.dev/google/vggish/1')

# === Load ML Model 2 ===
# The pickle holds a (model, features) pair; `model_2_features` is not
# referenced in the code visible here.
model_2, model_2_features = joblib.load(MODEL_2_PATH)

# === VGGish Embedding Extractor ===
def extract_vggish_embeddings(audio, sr=16000):
    """Return a single clip-level embedding vector for ``audio``.

    The waveform is zero-padded at the end, or truncated, to exactly ten
    seconds worth of samples at ``sr``. It is then passed through the
    module-level ``vggish_model`` and the model output is averaged along
    axis 0.

    Args:
        audio: 1-D sequence of samples (presumably mono audio already
            sampled at ``sr`` -- confirm against callers).
        sr: Sample rate used to size the minimum and target lengths.

    Returns:
        ``numpy.ndarray`` with the mean of the model output over axis 0.

    Raises:
        ValueError: If ``audio`` has fewer than ``int(sr * 0.96)`` samples.
    """
    n_samples = len(audio)
    if n_samples < int(sr * 0.96):
        raise ValueError("Audio too short for VGGish.")

    # Force a fixed ten-second length so every clip is embedded the same way.
    target_len = sr * 10
    missing = target_len - n_samples
    if missing > 0:
        audio = np.pad(audio, (0, missing), mode='constant')
    else:
        audio = audio[:target_len]

    waveform = tf.convert_to_tensor(audio, dtype=tf.float32)
    frame_embeddings = vggish_model(waveform)
    return frame_embeddings.numpy().mean(axis=0)

# === Predict Gunshot / Not ===
def predict_audio_array(audio, sr):
    """Classify a waveform and return its decoded class label.

    The audio is resampled from ``sr`` to 16 kHz, embedded with
    ``extract_vggish_embeddings``, scored by ``model_1``, and the
    highest-scoring class index is mapped back to a label with
    ``label_encoder``.

    Args:
        audio: 1-D waveform to classify.
        sr: Sample rate of ``audio``.

    Returns:
        The label produced by ``label_encoder.inverse_transform`` for the
        arg-max class.

    Raises:
        ValueError: Propagated from ``extract_vggish_embeddings`` when the
            resampled audio is too short.
    """
    resampled = librosa.resample(audio, orig_sr=sr, target_sr=16000)

    # The classifier expects a batch axis in front of the embedding vector.
    batch = extract_vggish_embeddings(resampled)[np.newaxis, ...]

    class_scores = model_1.predict(batch)
    best_class = np.argmax(class_scores)
    return label_encoder.inverse_transform([best_class])[0]

# === Predict Direction ===
def predict_direction(m1, m2, m3):
    """Predict the gunshot direction from the three microphone peak values.

    Builds a single 10-column feature row -- the three peaks, their
    pairwise differences, each peak's share of the total, and the largest
    peak -- and returns the first prediction of ``model_2`` for that row.

    NOTE(review): the column order presumably has to match the order the
    model was trained with -- confirm against ``model_2_features``.

    Args:
        m1: Peak value of microphone 1.
        m2: Peak value of microphone 2.
        m3: Peak value of microphone 3.

    Returns:
        ``model_2.predict(features)[0]``.

    Raises:
        ZeroDivisionError: If ``m1 + m2 + m3`` is zero, so the normalised
            shares are undefined. Raised explicitly so that NumPy scalars
            (which would otherwise yield NaN features) behave the same as
            plain Python numbers.
    """
    total = m1 + m2 + m3
    if total == 0:
        raise ZeroDivisionError(
            "Cannot normalise microphone peaks: m1 + m2 + m3 is zero."
        )

    diff12 = m1 - m2
    diff13 = m1 - m3
    diff23 = m2 - m3
    norm1, norm2, norm3 = m1 / total, m2 / total, m3 / total
    peak_intensity = max(m1, m2, m3)

    features = np.array([[
        m1, m2, m3,
        diff12, diff13, diff23,
        norm1, norm2, norm3,
        peak_intensity,
    ]])
    return model_2.predict(features)[0]

# === Process Each Mic Audio ===
def _find_positive_peak(samples, fs):
    """Return ``(peak_value, peak_time)`` of the largest positive sample, or ``(0, None)``."""
    positive_idx = np.flatnonzero(samples > 0)
    if positive_idx.size == 0:
        return 0, None
    peak_idx = positive_idx[np.argmax(samples[positive_idx])]
    # Sample i occurs at i / fs seconds; no full time axis is needed.
    return samples[peak_idx], peak_idx / fs


def process_audio(audio, fs, label, results_dict, test_number):
    """Filter, denoise and save one microphone recording, then store its peak.

    Steps:
      1. Band-pass the signal between ``LOWCUT`` and ``HIGHCUT`` with a
         4th-order Butterworth filter applied forwards and backwards
         (``filtfilt``).
      2. Run ``noisereduce`` on the filtered signal.
      3. Write the raw and the processed signal to WAV files named
         ``{TEST_ANGLE}_{test_number}_{label}_raw.wav`` and
         ``..._filtered.wav`` in the current working directory.
      4. Find the largest positive sample of the processed signal and
         print it together with its time.
      5. Store the audio, sample rate and peak under
         ``results_dict[label]``.

    Args:
        audio: 1-D array of samples for one microphone.
        fs: Sample rate of ``audio``.
        label: Microphone label; used as result key and in file names.
        results_dict: Mutable mapping that receives the result entry.
        test_number: Identifier embedded in the output file names.

    Returns:
        None. The result is written into ``results_dict``.
    """
    # Keep an untouched copy for the classifier.
    raw_audio = audio.copy()

    # Bandpass Filter (edges are normalised to the Nyquist frequency)
    order = 4
    nyq = 0.5 * fs
    band = [LOWCUT / nyq, HIGHCUT / nyq]
    b, a = signal.butter(order, band, btype='band')
    filtered_audio = signal.filtfilt(b, a, audio)

    # Noise Reduction
    denoised_audio = nr.reduce_noise(y=filtered_audio, sr=fs)

    # Save raw and processed (filtered + denoised) audio
    sf.write(f"{TEST_ANGLE}_{test_number}_{label}_raw.wav", raw_audio, fs)
    sf.write(f"{TEST_ANGLE}_{test_number}_{label}_filtered.wav", denoised_audio, fs)

    # Find peak from the processed audio
    peak_val, peak_time = _find_positive_peak(denoised_audio, fs)
    if peak_time is not None:
        print(f"🔍 {label.upper()} Peak: {peak_val:.4f} at {peak_time:.4f}s")
    else:
        print(f"⚠️ {label.upper()} No positive peak detected.")

    # Save both audios and peak to results dict
    results_dict[label] = {
        "raw_audio": raw_audio,           # For gunshot classification
        "filtered_audio": denoised_audio,  # For peak detection
        "sr": fs,
        "peak": peak_val,
    }

# === Record Audio from Mic ===
def record_mic(index, label, results_dict, test_number):
    """Record ``DURATION`` seconds from one input device and process it.

    Args:
        index: Device identifier passed to ``sd.rec`` as ``device``.
        label: Microphone label forwarded to ``process_audio``.
        results_dict: Mapping that ``process_audio`` fills with the result.
        test_number: Identifier forwarded to ``process_audio``.

    Returns:
        None.
    """
    print(f"🎙️ Recording {label}...")

    frame_count = int(DURATION * FS)
    recording = sd.rec(
        frame_count,
        samplerate=FS,
        channels=1,
        dtype='float32',
        device=index,
    )
    sd.wait()  # block until the recording has finished

    # sd.rec returns a (frames, channels) array; collapse it to 1-D.
    process_audio(recording.flatten(), FS, label, results_dict, test_number)

#=== Plot Radar ===
def plot_gunshot_direction(angle):
    """Draw a radar-style polar plot highlighting the detected 60° sector.

    Args:
        angle: Predicted direction in degrees. After normalisation modulo
            360 it must be one of 0, 60, 120, 180, 240 or 300 (so 360 is
            accepted as 0). Anything convertible with ``float()`` is
            accepted.

    Returns:
        None. The figure is shown with ``plt.show()``.

    Raises:
        ValueError: If ``angle`` is not numeric or does not map to one of
            the six supported sectors. The check runs before any figure
            is created.
    """
    # Predicted angle -> (start, end) of the highlighted sector, in plot
    # degrees. The sector is centred on (360 - angle) % 360, i.e. mirrored
    # -- presumably because the model's angles run counter-clockwise while
    # the plot runs clockwise; confirm against the microphone layout.
    sector_bounds = {
        0: (-30, 30),
        60: (270, 330),
        120: (210, 270),
        180: (150, 210),
        240: (90, 150),
        300: (30, 90),
    }

    try:
        bearing = float(angle) % 360
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Unsupported direction angle: {angle!r}") from exc
    if bearing not in sector_bounds:
        raise ValueError(
            f"Unsupported direction angle: {angle!r} "
            f"(expected one of {sorted(sector_bounds)} or 360)"
        )
    start_angle, end_angle = sector_bounds[bearing]

    fig, ax = plt.subplots(figsize=(6, 6), subplot_kw={'projection': 'polar'})
    fig.patch.set_facecolor('black')  # Background of figure

    # Radar settings
    ax.set_facecolor('darkgreen')    # Radar (circle) background
    ax.set_theta_zero_location('N')  # 0 degrees at the top
    ax.set_theta_direction(-1)       # Clockwise
    ax.set_rticks([])                # Remove radial ticks
    ax.set_xticks(np.deg2rad(np.arange(0, 360, 60)))  # Gridlines every 60°
    ax.grid(color='lightgreen', linestyle='--', linewidth=0.7)

    # Plot detected sector
    theta = np.linspace(np.deg2rad(start_angle), np.deg2rad(end_angle), 100)
    r = np.ones_like(theta)
    ax.fill_between(theta, 0, r, color='lime', alpha=0.6)  # Highlighted detected region

    # Circle border
    # NOTE(review): `transData._b` is a private matplotlib attribute and may
    # change between releases.
    circle = plt.Circle((0, 0), 1, transform=ax.transData._b, color='lightgreen', fill=False, linewidth=2)
    ax.add_artist(circle)
    plt.show()

# === Main Pipeline ===
def main():
    """Record from all microphones, classify the loudest one, plot the direction.

    Workflow:
      1. Ask for a test number (used in the saved WAV file names).
      2. Start one ``record_mic`` process per entry of ``MIC_INDICES`` /
         ``MIC_LABELS`` and wait for all of them.
      3. Pick the microphone with the highest peak and classify its raw
         audio with ``predict_audio_array``.
      4. If the label is "gunshot" (case-insensitive), predict the
         direction from the three peaks and plot it.

    Raises:
        RuntimeError: If any recorder process finished without storing a
            result for its microphone.
    """
    test_number = input("Enter test number: ")

    print("🎬 Starting 3-mic recording session...")
    with Manager() as manager:
        results = manager.dict()
        processes = [
            Process(target=record_mic, args=(idx, label, results, test_number))
            for idx, label in zip(MIC_INDICES, MIC_LABELS)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        # Every lookup on the manager proxy transfers the whole per-mic
        # entry (both audio arrays), so take one local snapshot instead.
        recordings = results.copy()

    missing = [label for label in MIC_LABELS if label not in recordings]
    if missing:
        raise RuntimeError(
            "No recording result for: " + ", ".join(missing)
            + ". Check the recorder process output above for errors."
        )

    # Extract peak values and audios
    mic_peaks = {label: recordings[label]["peak"] for label in MIC_LABELS}
    mic_raw_audio = {label: recordings[label]["raw_audio"] for label in MIC_LABELS}
    mic_sr = recordings[MIC_LABELS[0]]["sr"]

    # Find mic with max peak
    max_mic_label = max(mic_peaks, key=mic_peaks.get)
    selected_raw_audio = mic_raw_audio[max_mic_label]

    print(f"\n📌 Highest peak from: {max_mic_label.upper()}")

    # Predict gunshot using raw audio
    predicted_label = predict_audio_array(selected_raw_audio, mic_sr)

    if predicted_label.lower() == "gunshot":
        print("✅ Gunshot Detected!")
        # Predict direction using filtered peaks
        direction = predict_direction(mic_peaks["mic1"], mic_peaks["mic2"], mic_peaks["mic3"])
        print(f"🎯 Predicted Direction: {direction}°")
        plot_gunshot_direction(direction)
    else:
        print("❌ Not a gunshot.")

if __name__ == "__main__":
    main()