# Visual Sound Detection Model

This notebook builds a computer vision classifier that learns from images located in the `images` folder. Each image's label is derived from its file name (by default, the first token before an underscore, dash, or space). The pipeline covers environment setup, data ingestion, preprocessing, model training, evaluation, and an optional realtime webcam inference demo.

> **Note:** Add your images to `images/` before running the training steps. Files are picked up recursively when their extension is `.jpg`, `.jpeg`, `.png`, or `.bmp`, and every image is decoded as 3-channel RGB.


In [None]:
import os
import sys
import math
import json
from pathlib import Path
from typing import Callable, Dict, List, Tuple

import numpy as np
import pandas as pd
from PIL import Image

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

import matplotlib.pyplot as plt
import seaborn as sns

# Optional: used later for webcam capture
try:
    import cv2
except ImportError:
    cv2 = None
    print("OpenCV (cv2) not found. Webcam inference cell will guide you through installing it.")

# Configure paths.
# The project root defaults to the original checkout location. Set the
# NEXTGEN_PROJECT_ROOT environment variable to run the notebook from another
# machine or directory without editing this cell.
PROJECT_ROOT = Path(
    os.environ.get(
        "NEXTGEN_PROJECT_ROOT",
        "/Users/raoabdul/Documents/Development/NextGen-Innovators",
    )
).expanduser()
IMAGE_DIR = PROJECT_ROOT / "images"  # input images; labels come from file names
ARTIFACT_DIR = PROJECT_ROOT / "artifacts"  # output folder for saved artifacts
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)

# Basic reproducibility: seed the NumPy and TensorFlow random generators.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

print(f"TensorFlow version: {tf.__version__}")
print(f"Looking for images in: {IMAGE_DIR}")


## 1. Dataset Discovery & Label Parsing

This section scans the `images` directory, extracts labels from file names, and builds a dataframe describing the dataset. Customize `extract_label_from_name` if you prefer a different naming convention.


In [None]:
def extract_label_from_name(filename: str) -> str:
    """Derive a lowercase class label from an image file name.

    Strategy:
      1. Remove the file extension.
      2. Find the earliest underscore, dash, or space in the remaining stem.
      3. Use everything before that delimiter as the class name; when the
         stem contains no delimiter the whole stem is the class name.

    All three delimiters are treated equally, so ``"cat-1_a.jpg"`` yields
    ``"cat"`` (the dash comes first) rather than ``"cat-1"``.

    Adjust this heuristic to match your naming scheme.

    Args:
        filename: File name or path of the image; only the final component's
            stem is inspected.

    Returns:
        The lowercased first token of the stem. This is an empty string when
        the stem starts with a delimiter.
    """
    stem = Path(filename).stem
    # Position of the earliest delimiter of any kind; len(stem) when none exist.
    cut = min(
        (pos for pos in (stem.find(delim) for delim in ("_", "-", " ")) if pos != -1),
        default=len(stem),
    )
    return stem[:cut].lower()


def discover_dataset(image_dir: Path) -> pd.DataFrame:
    """Index every supported image found under ``image_dir``.

    The directory is searched recursively. Regular files whose extension
    (compared case-insensitively) is .jpg, .jpeg, .png or .bmp are kept, sorted
    by path, and paired with the label parsed from their file name.

    Args:
        image_dir: Root folder to scan.

    Returns:
        A dataframe with one row per image and the columns ``path`` and
        ``label``.

    Raises:
        FileNotFoundError: If no supported image is found.
    """
    allowed_suffixes = {".jpg", ".jpeg", ".png", ".bmp"}
    candidates = [
        entry
        for entry in image_dir.glob("**/*")
        if entry.is_file() and entry.suffix.lower() in allowed_suffixes
    ]
    candidates.sort()

    if len(candidates) == 0:
        raise FileNotFoundError(
            f"No images found in {image_dir}. Add files before running training."
        )

    rows = [
        {"path": entry, "label": extract_label_from_name(entry.name)}
        for entry in candidates
    ]
    return pd.DataFrame(rows)


def summarize_dataset(df: pd.DataFrame) -> None:
    """Report dataset size and per-label counts, then chart the counts.

    Prints the total number of rows and the label frequencies (most frequent
    first), and shows a bar chart of the same frequencies.

    Args:
        df: Dataframe with a ``label`` column.
    """
    print(f"Total images: {len(df)}")

    counts_by_label = df["label"].value_counts().sort_values(ascending=False)
    print("\nLabel distribution:")
    print(counts_by_label)

    # One bar per label, in the same order as the printed table.
    plt.figure(figsize=(10, 4))
    sns.barplot(
        x=counts_by_label.index,
        y=counts_by_label.values,
        palette="viridis",
    )
    plt.xticks(rotation=45, ha="right")  # slanted so long labels stay readable
    plt.xlabel("Label")
    plt.ylabel("Count")
    plt.title("Image count per label")
    plt.tight_layout()
    plt.show()


def train_val_test_split(
    df: pd.DataFrame,
    train_frac: float = 0.7,
    val_frac: float = 0.2,
    test_frac: float = 0.1,
    seed: int = 42,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split the dataframe into train/validation/test subsets stratified by label.

    Rows are shuffled once with ``seed`` and then each label group is cut into
    three consecutive slices. Every label keeps at least one training row.
    A label needs at least three rows to contribute to validation, and
    validation stays empty when ``val_frac`` is 0. Whatever remains after the
    train and validation slices goes to the test subset.

    Args:
        df: Dataframe with a ``label`` column.
        train_frac: Fraction of each label group used for training.
        val_frac: Fraction of each label group used for validation.
        test_frac: Fraction of each label group used for testing; only used
            to validate that the three fractions sum to 1.
        seed: Random state for the shuffle.

    Returns:
        ``(train_df, val_df, test_df)``, each with a fresh 0..n-1 index.

    Raises:
        ValueError: If the fractions do not sum to 1.0, if any fraction is
            negative, or if ``df`` is empty.
    """
    if not math.isclose(train_frac + val_frac + test_frac, 1.0, rel_tol=1e-5):
        raise ValueError("train_frac + val_frac + test_frac must sum to 1.0")
    if min(train_frac, val_frac, test_frac) < 0:
        raise ValueError("train_frac, val_frac and test_frac must be non-negative")
    if df.empty:
        # Without this guard pd.concat fails later with an opaque
        # "No objects to concatenate" error.
        raise ValueError("Cannot split an empty dataframe.")

    df_shuffled = df.sample(frac=1.0, random_state=seed).reset_index(drop=True)
    splits: Dict[str, List[pd.DataFrame]] = {"train": [], "val": [], "test": []}

    for _, group in df_shuffled.groupby("label"):
        n = len(group)
        n_train = min(n, max(1, int(n * train_frac)))
        # Only force a validation row when validation was actually requested.
        if val_frac > 0 and n >= 3:
            n_val = min(n - n_train, max(1, int(n * val_frac)))
        else:
            n_val = 0
        val_end = n_train + n_val
        splits["train"].append(group.iloc[:n_train])
        splits["val"].append(group.iloc[n_train:val_end])
        splits["test"].append(group.iloc[val_end:])

    train_df = pd.concat(splits["train"]).reset_index(drop=True)
    val_df = pd.concat(splits["val"]).reset_index(drop=True)
    test_df = pd.concat(splits["test"]).reset_index(drop=True)

    print(
        f"Split counts -> train: {len(train_df)}, val: {len(val_df)}, test: {len(test_df)}"
    )
    return train_df, val_df, test_df


In [None]:
# Index the image folder and show a quick overview. When the folder holds no
# usable images, dataset_df is set to None so the later cells can skip their work.
try:
    dataset_df = discover_dataset(IMAGE_DIR)
    display(dataset_df.head())
    summarize_dataset(dataset_df)
except FileNotFoundError as missing_images:
    print(missing_images)
    print("Populate the images directory and rerun this cell when ready.")
    dataset_df = None


## 2. Preprocessing Pipeline

This section builds TensorFlow `tf.data.Dataset` pipelines with on-the-fly preprocessing and optional augmentation. Adjust the image size or augmentations to suit your dataset.


In [None]:
# Target size passed to tf.image.resize and used for the model's input shape.
IMG_SIZE = (224, 224)
# Number of examples per batch in every tf.data pipeline built below.
BATCH_SIZE = 16
# Lets tf.data pick the parallelism for map() and the buffer size for prefetch().
AUTOTUNE = tf.data.AUTOTUNE


def load_image(path: tf.Tensor) -> tf.Tensor:
    """Read an image file and return it as a float32 tensor of size ``IMG_SIZE``.

    The file is decoded to three channels (animations are not expanded into
    frames), converted to float32 with ``tf.image.convert_image_dtype``, and
    resized to ``IMG_SIZE``.

    Args:
        path: Scalar string tensor holding the file path.

    Returns:
        The resized float32 image tensor.
    """
    raw_bytes = tf.io.read_file(path)
    decoded = tf.io.decode_image(raw_bytes, channels=3, expand_animations=False)
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)
    return tf.image.resize(as_float, IMG_SIZE)


def build_dataset(
    df: pd.DataFrame,
    class_to_index: Dict[str, int],
    augment: bool = False,
) -> tf.data.Dataset:
    """Turn a dataframe of image paths and labels into a batched ``tf.data`` pipeline.

    The pipeline shuffles all rows (buffer size ``len(df)``, seeded with
    ``SEED``), loads each image, optionally augments it, one-hot encodes the
    label, batches by ``BATCH_SIZE``, and prefetches.

    Args:
        df: Dataframe with ``path`` and ``label`` columns.
        class_to_index: Mapping from label string to integer class id.
        augment: When true, ``augment_image`` is applied to every image.

    Returns:
        A dataset yielding ``(image_batch, one_hot_label_batch)`` pairs.
    """
    file_paths = df["path"].astype(str).values
    class_ids = df["label"].map(class_to_index).values

    def _to_example(file_path, class_id):
        pixels = load_image(file_path)
        if augment:
            pixels = augment_image(pixels)
        return pixels, tf.one_hot(class_id, depth=len(class_to_index))

    pipeline = tf.data.Dataset.from_tensor_slices((file_paths, class_ids))
    # Shuffle the cheap (path, id) pairs before the expensive image decoding.
    pipeline = pipeline.shuffle(len(df), seed=SEED)
    pipeline = pipeline.map(_to_example, num_parallel_calls=AUTOTUNE)
    return pipeline.batch(BATCH_SIZE).prefetch(AUTOTUNE)


def augment_image(image: tf.Tensor) -> tf.Tensor:
    """Apply random flips plus small brightness and contrast jitter.

    The transformations run in this order: horizontal flip, vertical flip,
    brightness shift of at most 0.1, contrast factor between 0.9 and 1.1.

    Args:
        image: Image tensor to perturb.

    Returns:
        The augmented image tensor.
    """
    flipped = tf.image.random_flip_up_down(
        tf.image.random_flip_left_right(image)
    )
    brightened = tf.image.random_brightness(flipped, max_delta=0.1)
    return tf.image.random_contrast(brightened, 0.9, 1.1)


def prepare_datasets(df: pd.DataFrame):
    """Build the label mappings, the three splits, and their ``tf.data`` pipelines.

    Class ids are assigned to the sorted unique labels. Only the training
    pipeline is augmented. A validation or test pipeline is ``None`` when its
    split contains no rows.

    Args:
        df: Dataframe with ``path`` and ``label`` columns.

    Returns:
        A dict with the keys ``train``, ``val``, ``test`` (datasets or
        ``None``), ``class_to_index``, ``index_to_class``, ``train_df``,
        ``val_df`` and ``test_df``.
    """
    class_names = sorted(df["label"].unique())
    index_to_class = dict(enumerate(class_names))
    class_to_index = {name: idx for idx, name in index_to_class.items()}

    train_df, val_df, test_df = train_val_test_split(df)

    train_ds = build_dataset(train_df, class_to_index, augment=True)
    val_ds = None
    if len(val_df):
        val_ds = build_dataset(val_df, class_to_index, augment=False)
    test_ds = None
    if len(test_df):
        test_ds = build_dataset(test_df, class_to_index, augment=False)

    bundle = {
        "train": train_ds,
        "val": val_ds,
        "test": test_ds,
        "class_to_index": class_to_index,
        "index_to_class": index_to_class,
        "train_df": train_df,
        "val_df": val_df,
        "test_df": test_df,
    }
    return bundle


In [None]:
# Build the pipelines only when the image scan above succeeded.
data_bundle = None if dataset_df is None else prepare_datasets(dataset_df)
if data_bundle is None:
    print("Dataset not prepared yet. Populate images and rerun.")


## 3. Model Architecture

We define a compact convolutional neural network suitable for small to medium datasets. Swap in transfer learning (e.g., EfficientNet) if you need higher performance.


In [None]:
def build_classifier(num_classes: int) -> keras.Model:
    """Build and compile the compact CNN classifier.

    The network has three Conv-BatchNorm-MaxPool stages (32, 64, 128 filters),
    dropout, a 256-filter Conv-BatchNorm stage with global average pooling,
    dropout, a 128-unit dense layer, dropout, and a softmax output.

    Inputs must already be float images scaled to [0, 1]. Both callers in this
    notebook do that: ``load_image`` uses ``tf.image.convert_image_dtype`` and
    the webcam cell divides by 255. The model therefore applies no rescaling
    of its own; an extra ``Rescaling(1/255)`` here would normalize twice and
    squeeze pixel values into [0, 1/255].

    Args:
        num_classes: Number of output classes (size of the softmax layer).

    Returns:
        A model compiled with Adam (learning rate 1e-3), categorical
        cross-entropy loss, and the accuracy metric.
    """
    inputs = keras.Input(shape=(*IMG_SIZE, 3))
    x = inputs

    # Three downsampling stages with increasing filter counts.
    for filters in (32, 64, 128):
        x = layers.Conv2D(filters, 3, activation="relu", padding="same")(x)
        x = layers.BatchNormalization()(x)
        x = layers.MaxPooling2D()(x)
    x = layers.Dropout(0.3)(x)

    # Final feature stage, collapsed to one vector per image.
    x = layers.Conv2D(256, 3, activation="relu", padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.3)(x)

    x = layers.Dense(128, activation="relu")(x)
    x = layers.Dropout(0.2)(x)

    outputs = layers.Dense(num_classes, activation="softmax")(x)

    model = keras.Model(inputs, outputs, name="visual_classifier")
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=1e-3),
        # Labels are one-hot encoded by the data pipeline.
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


# Build the network only once the datasets exist; the class count comes from them.
if data_bundle is None:
    model = None
    print("Model not built yet. Prepare datasets first.")
else:
    num_classes = len(data_bundle["class_to_index"])
    model = build_classifier(num_classes)
    model.summary()


## 4. Training Configuration

Define callbacks, train the model, and capture the training history. Adjust epochs or callbacks for your dataset size.


In [None]:
# Upper bound on training epochs; early stopping may end training sooner.
EPOCHS = 25

if data_bundle is not None and model is not None:
    # The validation dataset is None when no class has enough images to
    # contribute validation rows. Callbacks that watch val_* metrics would then
    # never see their metric, so fall back to the training metrics.
    has_validation = data_bundle["val"] is not None
    loss_monitor = "val_loss" if has_validation else "loss"
    accuracy_monitor = "val_accuracy" if has_validation else "accuracy"
    if not has_validation:
        print("No validation data available; callbacks will monitor training metrics instead.")

    callbacks = [
        # Keep the best-scoring model on disk.
        keras.callbacks.ModelCheckpoint(
            filepath=str(ARTIFACT_DIR / "visual_classifier.keras"),
            monitor=accuracy_monitor,
            save_best_only=True,
            verbose=1,
        ),
        # Halve the learning rate after 3 epochs without loss improvement.
        keras.callbacks.ReduceLROnPlateau(
            monitor=loss_monitor,
            factor=0.5,
            patience=3,
            min_lr=1e-6,
            verbose=1,
        ),
        # Stop after 6 stagnant epochs and roll back to the best weights.
        keras.callbacks.EarlyStopping(
            monitor=loss_monitor, patience=6, restore_best_weights=True, verbose=1
        ),
    ]

    history = model.fit(
        data_bundle["train"],
        validation_data=data_bundle["val"],
        epochs=EPOCHS,
        callbacks=callbacks,
    )
else:
    history = None
    print("Training skipped. Ensure datasets and model are initialized above.")


In [None]:
def plot_history(history: keras.callbacks.History) -> None:
    """Plot accuracy and loss curves side by side.

    Each panel shows the training curve and, when the history contains it,
    the matching ``val_`` curve.

    Args:
        history: The ``History`` object returned by ``model.fit``.
    """
    recorded = history.history
    plt.figure(figsize=(12, 4))
    for position, name in enumerate(("accuracy", "loss"), start=1):
        plt.subplot(1, 2, position)
        plt.plot(recorded[name], label=f"train_{name}")
        validation_key = f"val_{name}"
        # Validation curves are absent when training ran without validation data.
        if validation_key in recorded:
            plt.plot(recorded[validation_key], label=validation_key)
        plt.xlabel("Epoch")
        plt.ylabel(name.capitalize())
        plt.legend()
        plt.grid(True)
    plt.tight_layout()
    plt.show()


# Plot the curves only when the training cell actually produced a history.
if history is None:
    print("No training history to plot.")
else:
    plot_history(history)


## 5. Evaluation & Diagnostics

Run evaluation on the held-out test set, visualize predictions, and inspect the confusion matrix.


In [None]:
from sklearn.metrics import classification_report, confusion_matrix


def evaluate_model(model: keras.Model, data_bundle: dict) -> Dict[str, np.ndarray]:
    """Evaluate the model on the test split and show diagnostics.

    Prints the Keras evaluation metrics and a per-class classification report,
    and displays a confusion-matrix heatmap. The report and the matrix always
    cover every known class, including classes that have no test examples, so
    the rows, columns, and names stay aligned with the class ids.

    Args:
        model: Trained classifier.
        data_bundle: Dict with a ``test`` dataset of
            ``(images, one_hot_labels)`` batches and an ``index_to_class``
            mapping from class id to label.

    Returns:
        A dict with ``y_true`` and ``y_pred`` (integer class ids per test
        example) and ``cm`` (the confusion matrix).

    Raises:
        ValueError: If the bundle holds no test dataset.
    """
    test_ds = data_bundle["test"]
    if test_ds is None:
        raise ValueError("Test dataset is empty. Provide more images per class.")

    index_to_class = data_bundle["index_to_class"]
    class_ids = list(range(len(index_to_class)))
    class_names = [index_to_class[class_id] for class_id in class_ids]

    test_results = model.evaluate(test_ds, verbose=1)
    print(dict(zip(model.metrics_names, test_results)))

    # Take labels and predictions from the same batch so they stay paired even
    # though the dataset is shuffled on each pass.
    y_true = []
    y_pred = []
    for images, labels in test_ds:
        probabilities = model.predict(images, verbose=0)
        y_true.extend(tf.argmax(labels, axis=1).numpy())
        y_pred.extend(tf.argmax(probabilities, axis=1).numpy())

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    # labels= pins the class set. Without it scikit-learn infers the classes
    # from the data, which breaks target_names and the heatmap ticks whenever a
    # class is missing from the test split.
    report = classification_report(
        y_true,
        y_pred,
        labels=class_ids,
        target_names=class_names,
        digits=4,
        zero_division=0,
    )
    print("\nClassification report:\n", report)

    cm = confusion_matrix(y_true, y_pred, labels=class_ids)
    plt.figure(figsize=(6, 5))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix")
    plt.tight_layout()
    plt.show()

    return {"y_true": y_true, "y_pred": y_pred, "cm": cm}


# Evaluate only when a model, a data bundle, and a non-empty test set all exist.
if model is None or data_bundle is None or data_bundle["test"] is None:
    evaluation_outputs = None
    print("Evaluation skipped. Ensure model is trained and test set is available.")
else:
    evaluation_outputs = evaluate_model(model, data_bundle)


## 6. Single Image Inference Helper

Use the helper below to classify individual images. Provide a path to any image file after training.


In [None]:
def predict_image(path: Path, model: keras.Model, index_to_class: Dict[int, str]) -> Dict[str, float]:
    """Classify one image file and return the score for every class.

    Args:
        path: Location of the image to classify.
        model: Classifier used for the prediction.
        index_to_class: Mapping from class id to label.

    Returns:
        A dict mapping each label to the model's output value for that class.
    """
    path_tensor = tf.convert_to_tensor(str(path))
    # The model expects a batch, so wrap the single image in a batch of one.
    batch = tf.expand_dims(load_image(path_tensor), axis=0)
    scores = model.predict(batch, verbose=0)[0]

    result = {}
    for class_idx, score in enumerate(scores):
        result[index_to_class[class_idx]] = float(score)
    return result


def display_prediction(predictions: Dict[str, float]) -> None:
    """Print one ``label: score`` line per class, highest score first.

    Scores are formatted with three decimals. Labels with equal scores keep
    their original dict order.

    Args:
        predictions: Mapping from label to score.
    """
    ranked_labels = sorted(predictions, key=predictions.get, reverse=True)
    for label in ranked_labels:
        print(f"{label}: {predictions[label]:.3f}")


# Example usage: point example_image_path at a real file once images exist.
example_image_path = IMAGE_DIR / "sample_image.jpg"
if model is None or not example_image_path.exists():
    print("Set `example_image_path` to an existing image after training to see predictions.")
else:
    probs = predict_image(example_image_path, model, data_bundle["index_to_class"])
    display_prediction(probs)


## 7. Realtime Webcam Inference (Optional)

The cell below captures frames from your default webcam, runs classification, and overlays the predicted label. Install OpenCV (`pip install opencv-python`) if it's not already available. Press `q` to quit the loop.


In [None]:
# Realtime demo: grab webcam frames, classify each one, and overlay the top
# prediction until the user presses "q". Needs OpenCV, a model, and the
# data bundle (for the class mappings).
if cv2 is None:
    print("OpenCV is not installed. Run `pip install opencv-python` and restart the kernel before executing this cell.")
elif model is None or data_bundle is None:
    print("Train the model before running webcam inference.")
else:
    # One random overlay color per label; each channel is drawn from [0, 255).
    label_colors = {
        label: tuple(np.random.randint(0, 255, size=3).tolist())
        for label in data_bundle["class_to_index"].keys()
    }

    # Device index 0 is the default camera.
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Unable to access the webcam. Check camera permissions or device connection.")
    else:
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("Failed to read frame from webcam.")
                    break

                # OpenCV delivers BGR frames; convert to RGB before resizing.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(rgb_frame)
                resized = pil_image.resize(IMG_SIZE)
                # Scale pixel values by 1/255 and add a leading batch axis.
                input_arr = np.array(resized, dtype=np.float32) / 255.0
                input_arr = np.expand_dims(input_arr, axis=0)

                # Keep only the highest-scoring class for the overlay.
                preds = model.predict(input_arr, verbose=0)[0]
                best_idx = int(np.argmax(preds))
                best_label = data_bundle["index_to_class"][best_idx]
                best_prob = float(preds[best_idx])

                # Draw "label: score" on the original (BGR) frame.
                text = f"{best_label}: {best_prob:.2f}"
                color = label_colors[best_label]
                cv2.putText(
                    frame,
                    text,
                    (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    1.0,
                    color,
                    2,
                    cv2.LINE_AA,
                )

                cv2.imshow("Realtime Classification", frame)
                # waitKey(1) also services the window; "q" ends the loop.
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
        finally:
            # Always free the camera and close the preview window, even on error.
            cap.release()
            cv2.destroyAllWindows()


## 8. Save and Reload Model

Persist the trained model and provide a simple loader. This allows you to reuse the classifier without retraining every session.


In [None]:
# Output locations for the trained model and its class-id -> label mapping.
MODEL_PATH = ARTIFACT_DIR / "visual_classifier.keras"
METADATA_PATH = ARTIFACT_DIR / "class_mapping.json"

if model is None or data_bundle is None:
    print("Model not saved. Train the model first.")
else:
    model.save(MODEL_PATH)
    # JSON object keys are strings, so the integer class ids are stored as text.
    mapping_json = json.dumps(data_bundle["index_to_class"], indent=2)
    with open(METADATA_PATH, "w") as mapping_file:
        mapping_file.write(mapping_json)
    print(f"Model saved to {MODEL_PATH}")
    print(f"Class mapping saved to {METADATA_PATH}")


def load_model_and_mapping(model_path: Path, metadata_path: Path):
    """Load a saved model together with its class mapping.

    Args:
        model_path: File to load with ``keras.models.load_model``.
        metadata_path: JSON file holding the class-id to label mapping.

    Returns:
        ``(model, index_to_class)``, where the mapping's keys are converted
        back from JSON strings to integers.
    """
    with open(metadata_path) as mapping_file:
        raw_mapping = json.load(mapping_file)
    restored_mapping = dict(
        (int(class_id), label) for class_id, label in raw_mapping.items()
    )
    restored_model = keras.models.load_model(model_path)
    return restored_model, restored_mapping


# Example reload usage (uncomment after training and saving)
# reloaded_model, index_to_class = load_model_and_mapping(MODEL_PATH, METADATA_PATH)
# display_prediction(predict_image(example_image_path, reloaded_model, index_to_class))


## 9. Next Steps Checklist

- Add class-balanced images into `images/`.
- Run the notebook sequentially from top to bottom.
- Monitor training curves and adjust hyperparameters as needed.
- Test with realtime webcam inference once satisfied with model accuracy.
- Consider augmenting the dataset or using transfer learning for improved performance.
