## Step 1: Imports

In [1]:
import os
import json
import numpy as np
import librosa
import tensorflow as tf
from tqdm import tqdm
import matplotlib.pyplot as plt

## Step 2: Paths and Global Configuration

In [2]:
# Paths
# NOTE(review): these are absolute, machine-specific Windows paths; the
# notebook cannot run on another machine without editing them.
# Directory scanned for .wav files by the end-to-end loop.
TEST_AUDIO_ROOT = r"E:\InstruNet-AI\data\polyphonic_test_data"
# Keras model file loaded in the "Model Load" step.
MODEL_PATH = r"E:\InstruNet-AI\saved_models\best_l2_regularized_model.h5"
# Destination directories for the *_intensity.json and *_intensity.png
# outputs (currently the same directory).
OUTPUT_JSON_DIR = r"E:\InstruNet-AI\notebooks\reports\Task 16"
OUTPUT_PLOT_DIR = r"E:\InstruNet-AI\notebooks\reports\Task 16"

# Sample rate (Hz) passed to librosa.load and to the mel-spectrogram.
TARGET_SR = 16000
# Length of each analysis window, in seconds.
WINDOW_SEC = 3.0
# Stride between consecutive window starts, in seconds (half a window).
HOP_SEC = 1.5
# Number of mel bands in the spectrogram.
N_MELS = 128
# Number of time frames every spectrogram is zero-padded / cropped to.
# NOTE(review): a 3 s window at 16 kHz with hop_length=512 presumably yields
# fewer than 126 frames (about 94 with librosa's default centering), so the
# remainder would be zero padding - confirm this matches the training setup.
TARGET_FRAMES = 126
# Added to the standard deviation during normalisation to avoid division by zero.
EPS = 1e-8

## Step 3: Class Metadata

In [3]:
# Ordered class labels; the position of a label is its class id.
# The model output vector is indexed in this same order by the code below.
# NOTE(review): presumably short instrument codes from the training dataset
# ("pia" and "gac" are used as piano and guitar later on) - confirm the rest.
class_names = [
    "cel",
    "cla",
    "flu",
    "gac",
    "gel",
    "org",
    "pia",
    "sax",
    "tru",
    "vio",
    "voi",
]

# Reverse lookup: label -> integer class id.
class_to_id = dict(zip(class_names, range(len(class_names))))

## Step 4: Model Load

In [4]:
# Load the trained classifier for inference only.
# compile=False skips restoring the training configuration (optimizer, loss,
# metrics). This notebook only calls model.predict, which does not need it,
# and skipping it avoids load failures when training used custom objects.
model = tf.keras.models.load_model(MODEL_PATH, compile=False)
model.summary()



## Step 5: Preprocessing & Segmentation

In [5]:
def stereo_to_mono(audio):
    """Collapse multi-channel audio to a single channel.

    Args:
        audio: numpy array. A 1-D array is treated as already mono; for any
            other rank the first axis is averaged away.

    Returns:
        The input object itself when it is 1-D, otherwise the mean over axis 0.
    """
    already_mono = audio.ndim == 1
    return audio if already_mono else np.mean(audio, axis=0)

def peak_normalize(audio):
    """Scale the signal so that its largest absolute sample equals 1.

    Args:
        audio: numpy array of samples.

    Returns:
        ``audio / peak`` when the absolute peak is positive; otherwise the
        input object is returned unchanged (e.g. for an all-zero signal).
    """
    peak = np.abs(audio).max()
    if peak > 0:
        return audio / peak
    return audio

def trim_silence(audio, thresh=0.02):
    """Strip leading and trailing samples whose magnitude is at or below ``thresh``.

    Args:
        audio: 1-D numpy array of samples.
        thresh: absolute-amplitude threshold; samples with ``|x| > thresh``
            count as non-silent.

    Returns:
        The slice from the first to the last non-silent sample, both
        inclusive. If no sample exceeds the threshold, the input is returned
        unchanged.
    """
    idx = np.where(np.abs(audio) > thresh)[0]
    if len(idx) == 0:
        return audio
    # Slice ends are exclusive, so +1 keeps the last non-silent sample.
    # Without it a clip with a single loud sample would be trimmed to nothing.
    return audio[idx[0]: idx[-1] + 1]

def fix_duration(audio, sr=TARGET_SR, duration=WINDOW_SEC):
    """Force the signal to exactly ``int(sr * duration)`` samples.

    Args:
        audio: 1-D numpy array of samples.
        sr: sample rate in Hz.
        duration: target length in seconds.

    Returns:
        The signal truncated from the end when too long, otherwise
        right-padded with zeros up to the target length.
    """
    n_target = int(sr * duration)
    shortfall = n_target - len(audio)
    if shortfall < 0:
        return audio[:n_target]
    # shortfall == 0 still goes through np.pad, which returns a copy.
    return np.pad(audio, (0, shortfall), mode="constant")

def generate_log_mel(audio, sr=TARGET_SR):
    """Compute a standardised log-mel spectrogram of ``audio``.

    Args:
        audio: 1-D numpy array of samples.
        sr: sample rate in Hz.

    Returns:
        The dB-scaled mel power spectrogram (referenced to its own maximum),
        shifted by its global mean and divided by its global standard
        deviation plus ``EPS``.
    """
    stft_params = {
        "n_fft": 2048,
        "hop_length": 512,
        "win_length": 2048,
        "n_mels": N_MELS,
        "power": 2.0,
    }
    power_spec = librosa.feature.melspectrogram(y=audio, sr=sr, **stft_params)
    log_spec = librosa.power_to_db(power_spec, ref=np.max)

    # Per-spectrogram standardisation; EPS guards against a zero std.
    centered = log_spec - log_spec.mean()
    scale = log_spec.std() + EPS
    return centered / scale

def fix_mel_frames(mel):
    """Pad or crop a spectrogram along axis 1 to ``TARGET_FRAMES`` columns.

    Args:
        mel: 2-D numpy array; axis 1 is the one being resized.

    Returns:
        An array with exactly ``TARGET_FRAMES`` columns: zero-padded on the
        right when too short, cropped from the right when too long.
    """
    missing = TARGET_FRAMES - mel.shape[1]
    if missing > 0:
        mel = np.pad(mel, ((0, 0), (0, missing)))
    return mel[:, :TARGET_FRAMES]

def extract_features(y):
    """Turn one raw audio window into a fixed-size log-mel feature map.

    The waveform is passed, in order, through mono down-mixing, peak
    normalisation, silence trimming and duration fixing; the result is then
    converted to a log-mel spectrogram and resized to ``TARGET_FRAMES`` frames.

    Args:
        y: numpy array holding the samples of a single window.

    Returns:
        The output of ``fix_mel_frames`` for the processed window.
    """
    waveform_stages = (stereo_to_mono, peak_normalize, trim_silence, fix_duration)
    signal = y
    for stage in waveform_stages:
        signal = stage(signal)
    return fix_mel_frames(generate_log_mel(signal))

def sliding_windows(y):
    """Yield fixed-length, overlapping windows over a signal.

    Windows are ``int(TARGET_SR * WINDOW_SEC)`` samples long and start every
    ``int(TARGET_SR * HOP_SEC)`` samples. Only windows that fit entirely
    inside the signal are produced, so a trailing remainder shorter than one
    window is not covered.

    A non-empty signal shorter than one window yields a single window,
    zero-padded on the right to the full window length, so short clips still
    get one prediction instead of none. An empty signal yields nothing.

    Args:
        y: audio samples; assumed to be a 1-D (mono) numpy array, which is
            how the caller in this notebook passes it.

    Yields:
        Arrays of exactly one window length.
    """
    win_len = int(TARGET_SR * WINDOW_SEC)
    hop_len = int(TARGET_SR * HOP_SEC)
    n_samples = len(y)

    if 0 < n_samples < win_len:
        yield np.pad(y, (0, win_len - n_samples), mode="constant")
        return

    for start in range(0, n_samples - win_len + 1, hop_len):
        yield y[start:start + win_len]

## Step 6: Segment Predictions, Aggregation and Smoothing

### (a) Raw Segment-Level Predictions

In [6]:
def get_segment_predictions(audio_path):
    """Run the model on every sliding window of an audio file.

    The file is loaded at ``TARGET_SR``, mixed down to mono and split with
    ``sliding_windows``. Each window is converted with ``extract_features``,
    and all windows are sent to the model in a single ``predict`` call.
    Calling ``predict`` once per window pays the per-call overhead for every
    segment; results may differ from the per-window form only by
    floating-point rounding.

    Args:
        audio_path: path of the audio file to analyse.

    Returns:
        A tuple ``(segment_probs, segment_times)``:
        ``segment_probs`` is a 2-D array with one row of model outputs per
        window, and ``segment_times`` holds each window's start time in
        seconds (``index * HOP_SEC``). When the file produces no windows,
        ``segment_probs`` has shape ``(0, len(class_names))`` so callers can
        still index it by class column.
    """
    y, _ = librosa.load(audio_path, sr=TARGET_SR, mono=False)
    y = stereo_to_mono(y)

    features = [extract_features(window) for window in sliding_windows(y)]

    if not features:
        # Keep the result 2-D so column indexing downstream does not fail.
        empty_probs = np.empty((0, len(class_names)), dtype=np.float32)
        return empty_probs, np.empty((0,), dtype=np.float64)

    # Stack to (n_windows, n_mels, n_frames) and add a trailing channel axis.
    batch = np.stack(features)[..., np.newaxis]
    segment_probs = np.asarray(model.predict(batch, verbose=0))
    segment_times = np.arange(len(features)) * HOP_SEC

    return segment_probs, segment_times

### (b) Aggregation (Pass-Through: Per-Segment Timeline Preserved)

In [7]:
def average_aggregation(segment_probs):
    """Return the per-segment predictions unchanged.

    Despite the name, no averaging is performed: the segment axis is kept
    intact so the result can be used as a timeline.

    Args:
        segment_probs: per-segment model outputs.

    Returns:
        The very same object that was passed in.
    """
    timeline = segment_probs  # timeline preserved (no collapse)
    return timeline

### (c) Optional Temporal Smoothing

In [8]:
def moving_average_smoothing(x, window=3):
    """Smooth each column of ``x`` along axis 0 with a centred moving average.

    Near the start and end of the timeline, fewer than ``window`` samples
    overlap the kernel. Each output value is divided by the number of samples
    that actually contributed, so edge values are true local means rather
    than being pulled toward zero by implicit zero padding.

    Args:
        x: 2-D numpy array of shape (n_steps, n_series).
        window: number of consecutive steps to average.

    Returns:
        An array of the same shape as ``x``. The input is returned unchanged
        when ``window <= 1`` or when there are fewer than ``window`` steps.
        Floating-point inputs keep their dtype; other dtypes are smoothed in
        float64 so averages are not truncated.
    """
    if window <= 1 or x.shape[0] < window:
        return x

    n_steps = x.shape[0]
    kernel = np.ones(window)

    # Number of real samples under the kernel at every position; computed
    # with the same convolution so the alignment matches for any window size.
    counts = np.convolve(np.ones(n_steps), kernel, mode="same")

    out_dtype = x.dtype if np.issubdtype(x.dtype, np.floating) else np.float64
    smoothed = np.empty(x.shape, dtype=out_dtype)

    for col in range(x.shape[1]):
        # n_steps >= window here, so mode="same" returns exactly n_steps values.
        sums = np.convolve(x[:, col], kernel, mode="same")
        smoothed[:, col] = sums / counts

    return smoothed

## Step 7: Serialize to JSON (WITH METADATA)

In [9]:
def save_intensity_json(
    wav_name,
    times,
    intensities,
    aggregation="average",
    smoothing="moving_average",
    smooth_window=3,
    threshold=0.25
):
    """Write a per-class intensity timeline and its metadata to a JSON file.

    Args:
        wav_name: file name of the analysed audio; its stem (name without the
            final extension) is used to build the output file name.
        times: sequence of segment start times in seconds.
        intensities: sequence of per-segment value vectors, indexable by
            class position in ``class_names``.
        aggregation: label recorded in the metadata.
        smoothing: smoothing-method label recorded in the metadata.
        smooth_window: smoothing window recorded in the metadata.
        threshold: threshold value recorded in the metadata.

    Returns:
        Path of the written file: ``<OUTPUT_JSON_DIR>/<stem>_intensity.json``.
    """
    timeline = [
        {
            "time_sec": float(t),
            "intensity": {
                cls: float(vals[i])
                for i, cls in enumerate(class_names)
            }
        }
        for t, vals in zip(times, intensities)
    ]

    data = {
        "audio_file": wav_name,
        "segment_duration_sec": WINDOW_SEC,
        "hop_duration_sec": HOP_SEC,
        "aggregation": aggregation,
        "smoothing": {
            "method": smoothing,
            "window": smooth_window
        },
        "threshold": threshold,
        "classes": class_names,
        "timeline": timeline
    }

    # splitext removes only the final extension. str.replace(".wav", ...)
    # would rewrite every occurrence and leave other names untouched, so the
    # output could be mangled or end up named exactly like the input.
    stem, _ = os.path.splitext(wav_name)

    # Create the report directory on first use instead of failing in open().
    os.makedirs(OUTPUT_JSON_DIR, exist_ok=True)
    out_path = os.path.join(OUTPUT_JSON_DIR, stem + "_intensity.json")

    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)

    return out_path

## Step 8: Render Intensity Graphs

In [10]:
def plot_intensity(times, intensities, wav_name, instruments=("pia", "gac"), threshold=0.25):
    """Plot intensity-over-time curves for selected classes and save as PNG.

    Args:
        times: 1-D sequence of segment start times in seconds (x axis).
        intensities: 2-D array indexed as ``[segment, class]``.
        wav_name: file name of the analysed audio; used in the title and,
            without its final extension, in the output file name.
        instruments: class labels to draw; each must be in ``class_names``.
        threshold: y position of the dashed horizontal reference line.

    Returns:
        Path of the written file: ``<OUTPUT_PLOT_DIR>/<stem>_intensity.png``.

    Raises:
        ValueError: if an entry of ``instruments`` is not in ``class_names``.
    """
    fig, ax = plt.subplots(figsize=(12, 4))
    try:
        for inst in instruments:
            idx = class_names.index(inst)
            ax.plot(times, intensities[:, idx], label=inst)

        ax.axhline(threshold, linestyle="--", color="red", alpha=0.5)
        ax.set_xlabel("Time (seconds)")
        ax.set_ylabel("Intensity / Confidence")
        ax.set_title(f"Instrument Intensity Timeline — {wav_name}")
        ax.legend()
        fig.tight_layout()

        # Strip only the final extension; str.replace(".wav", ...) would
        # rewrite every occurrence and do nothing for other names.
        stem, _ = os.path.splitext(wav_name)
        os.makedirs(OUTPUT_PLOT_DIR, exist_ok=True)
        out_path = os.path.join(OUTPUT_PLOT_DIR, stem + "_intensity.png")

        fig.savefig(out_path)
    finally:
        # Always release the figure, even when plotting or saving fails;
        # this function is called once per file in a long loop.
        plt.close(fig)

    return out_path

## Step 9: End-to-End Intensity Graphs Generation

In [11]:
# Sorted so the processing order is reproducible; os.listdir returns entries
# in arbitrary order.
test_files = sorted(f for f in os.listdir(TEST_AUDIO_ROOT) if f.endswith(".wav"))
print(f"Found {len(test_files)} test files.")

# Files for which no segment predictions could be produced.
skipped_files = []

for wav in tqdm(test_files, desc="Generating intensity graphs"):
    audio_path = os.path.join(TEST_AUDIO_ROOT, wav)

    # Step 1: per-segment model outputs and their start times
    seg_probs, times = get_segment_predictions(audio_path)

    # Without segments there is no timeline to write or plot, and indexing
    # the empty result by class column would abort the whole batch.
    if len(seg_probs) == 0:
        skipped_files.append(wav)
        continue

    # Step 2: aggregation (timeline is kept per segment)
    aggregated = average_aggregation(seg_probs)

    # Step 3: temporal smoothing
    smoothed = moving_average_smoothing(aggregated, window=3)

    # Step 4: JSON report
    json_path = save_intensity_json(
        wav, times, smoothed,
        aggregation="average",
        smoothing="moving_average",
        smooth_window=3
    )

    # Step 5 (example: piano & guitar)
    plot_intensity(times, smoothed, wav, instruments=("pia", "gac"))

if skipped_files:
    print(f"Skipped {len(skipped_files)} file(s) that produced no segments.")

print("Task 16 completed: JSON + intensity graphs generated.")

Found 1573 test files.


Generating intensity graphs: 100%|███████████████████████████████████████████████| 1573/1573 [1:29:47<00:00,  3.42s/it]

Task 16 completed: JSON + intensity graphs generated.



