In [1]:
import os
import time
import tempfile
import numpy as np
import cv2
import sounddevice as sd
from scipy.io.wavfile import write as wav_write
import librosa
import joblib
import tensorflow as tf
from ultralytics import YOLO

In [2]:
DURATION = 8          # record length
AUDIO_FS = 16000              # audio sampling rate used by audio model
FRAME_RATE = 10  
FRAME_INTERVAL = 0.5             # how many frames/sec to process from webcam
W_IMG = 0.6                   # image weight for fusion
W_AUD = 0.4                   # audio weight for fusion
SHOW_PREVIEW = True           # show webcam preview while recording
PREVIEW_WINDOW_NAME = "Recording - press 'q' to quit early"

In [3]:
YOLO_WEIGHTS = r"C:/Users/joshd/Documents/btech_project_full/btech_project/facial_emotion_detection/facial_emotion_detection_9400/runs/train/facial_emotion_detection_model/weights/best.pt"
AUDIO_MODEL_PATH = r"C:/Users/joshd/Documents/btech_project_full/btech_project/audio_emotion_detection/audio_emotion_CNN_model.h5"
LABEL_ENCODER_PATH = r"C:/Users/joshd/Documents/btech_project_full/btech_project/audio_emotion_detection/emotion_audio_label_encoder.pkl"

In [4]:
# Unified class ordering we will use for fusion
UNIFIED = ["angry", "calm", "disgust", "fearful", "happy", "neutral", "sad", "surprised"]

# Map YOLO -> unified (update if you used different labels)
YOLO_TO_UNIFIED = {
    "anger": "angry",
    "content": "calm",
    "disgust": "disgust",
    "fear": "fearful",
    "happy": "happy",
    "neutral": "neutral",
    "sad": "sad",
    "surprise": "surprised"
}

In [5]:
STRESS_MAP = {
    "happy": "none",
    "content": "none",
    "neutral": "mild",
    "calm": "mild",
    "surprised": "mild",
    "sad": "mediocre",
    "fear": "high",
    "fearful": "high",
    "anger": "high",
    "angry": "high",
    "disgust": "high"
}

In [6]:
print("Loading models...")
yolo_model = YOLO(YOLO_WEIGHTS)
audio_model = tf.keras.models.load_model(AUDIO_MODEL_PATH)
label_encoder = joblib.load(LABEL_ENCODER_PATH)
audio_classes = list(label_encoder.classes_) 
print("YOLO classes:", yolo_model.names)
print("Audio encoder classes:", audio_classes)
print("Unified classes:", UNIFIED)

Loading models...




YOLO classes: {0: 'anger', 1: 'content', 2: 'disgust', 3: 'fear', 4: 'happy', 5: 'neutral', 6: 'sad', 7: 'surprise'}
Audio encoder classes: [np.str_('angry'), np.str_('calm'), np.str_('disgust'), np.str_('fearful'), np.str_('happy'), np.str_('neutral'), np.str_('sad'), np.str_('surprised')]
Unified classes: ['angry', 'calm', 'disgust', 'fearful', 'happy', 'neutral', 'sad', 'surprised']


In [7]:
def yolo_result_to_unified_probs(result):

    try:
        boxes = result.boxes
    except Exception:
        boxes = None

    unified_probs = np.zeros(len(UNIFIED), dtype=float)
    if boxes is None or len(boxes) == 0:
        # no detection -> zeros
        return unified_probs

    try:
        confs = boxes.conf.cpu().numpy()
    except Exception:
        confs = np.array(boxes.conf)

    best_idx = int(np.argmax(confs))
    # get class id
    try:
        cls_tensor = boxes.cls[best_idx]
        cls_id = int(cls_tensor.item())
    except Exception:
        # fallback: direct indexing
        cls_id = int(boxes.cls[best_idx])

    names = yolo_model.names
    if isinstance(names, dict):
        yolo_label = names[cls_id]
    else:
        yolo_label = names[cls_id]

    unified_label = YOLO_TO_UNIFIED.get(yolo_label, None)
    if unified_label is None:
        # unknown mapping -> return zeros
        return unified_probs

    try:
        conf_val = float(boxes.conf[best_idx].item())
    except Exception:
        conf_val = float(confs[best_idx])

    unified_probs[UNIFIED.index(unified_label)] = conf_val
    # normalize if sum > 0
    s = unified_probs.sum()
    if s > 0:
        unified_probs = unified_probs / s
    return unified_probs

In [8]:
def image_frame_to_unified_probs(frame):

    results = yolo_model(frame)       
    res0 = results[0]
    return yolo_result_to_unified_probs(res0)

In [9]:
def audio_file_to_unified_probs(audio_path):

    y, sr = librosa.load(audio_path, sr=AUDIO_FS)
    mel = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=120)
    mel_db = librosa.power_to_db(mel, ref=np.max)
    # resize to (120,174)
    mel_db_resized = cv2.resize(mel_db, (174, 120))
    # standardize
    mel_db_resized = (mel_db_resized - mel_db_resized.mean()) / (mel_db_resized.std() + 1e-6)
    inp = mel_db_resized[..., np.newaxis].astype(np.float32)
    inp = np.expand_dims(inp, axis=0)   # (1,120,174,1)

    # model predict -> returns probabilities in audio_classes order
    preds = audio_model.predict(inp, verbose=0)[0]   # shape (num_audio_classes,)
    # convert into unified ordering
    unified_probs = np.zeros(len(UNIFIED), dtype=float)
    for i, cls in enumerate(audio_classes):
        if cls in UNIFIED:
            unified_idx = UNIFIED.index(cls)
            unified_probs[unified_idx] = preds[i]
    # normalize
    s = unified_probs.sum()
    if s > 0:
        unified_probs = unified_probs / s
    return unified_probs

def probs_to_label_and_conf(probs):
    idx = int(np.argmax(probs))
    return UNIFIED[idx], float(probs[idx])

In [10]:
def live_record_and_fuse(duration=DURATION, frame_interval=FRAME_INTERVAL, w_img=W_IMG, w_aud=W_AUD):
    # temporary files
    tmp_dir = tempfile.mkdtemp(prefix="live_fusion_")
    audio_tmp = os.path.join(tmp_dir, "live_audio.wav")

    # Start audio recording (non-blocking)
    print(f"[Live] Starting {duration}s audio recording (fs={AUDIO_FS})...")
    audio_rec = sd.rec(int(duration * AUDIO_FS), samplerate=AUDIO_FS, channels=1, dtype='float32')
    time.sleep(0.2)  # small delay to ensure recording started

    # Start capturing frames at intervals
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        raise RuntimeError("Cannot open webcam (0).")
    print("[Live] Capturing frames from webcam...")
    frames = []
    start = time.time()
    next_capture = start
    while True:
        now = time.time()
        if now >= next_capture:
            ret, frame = cap.read()
            if not ret:
                print("[Live] Warning: failed to read frame from webcam.")
            else:
                # store frame (BGR)
                frames.append(frame.copy())
                # optional: show small preview (comment out if not desired)
                # cv2.imshow("Live (press q to quit)", frame)
            next_capture += frame_interval
        # break when duration exceeded
        if now - start >= duration:
            break
        # small sleep to avoid busy loop
        time.sleep(0.01)

    # Stop camera and audio
    cap.release()
    # if showing preview: cv2.destroyAllWindows()
    sd.wait()  # wait for audio to finish
    # write audio to disk (convert float32 [-1,1] to int16)
    audio_array = (audio_rec.squeeze() * 32767).astype('int16')
    wav_write(audio_tmp, AUDIO_FS, audio_array)
    print(f"[Live] Saved audio to: {audio_tmp}")
    print(f"[Live] Captured {len(frames)} frames.")

    # IMAGE PREDICTION: average unified probs across captured frames
    if len(frames) == 0:
        img_unified_avg = np.zeros(len(UNIFIED), dtype=float)
    else:
        probs_list = []
        for f in frames:
            p = image_frame_to_unified_probs(f)
            probs_list.append(p)
        img_unified_avg = np.mean(np.stack(probs_list, axis=0), axis=0)
        # normalize
        s = img_unified_avg.sum()
        if s > 0:
            img_unified_avg = img_unified_avg / s

    # AUDIO PREDICTION
    aud_unified = audio_file_to_unified_probs(audio_tmp)

    # Convert probs -> labels + confidences
    img_label, img_conf = probs_to_label_and_conf(img_unified_avg)
    aud_label, aud_conf = probs_to_label_and_conf(aud_unified)

    # Fusion
    fused = (w_img * img_unified_avg) + (w_aud * aud_unified)
    s = fused.sum()
    if s > 0:
        fused = fused / s
    fused_label, fused_conf = probs_to_label_and_conf(fused)

    # Print nicely
    print("\n========== LIVE FUSION RESULTS ==========")
    print(f"Duration recorded : {duration:.1f}s")
    print(f"Image Model → {img_label} (confidence={img_conf:.3f})")
    print(f"Audio Model → {aud_label} (confidence={aud_conf:.3f})")
    print(f"FUSED RESULT → {fused_label} (confidence={fused_conf:.3f})")
    print("Detailed probabilities (UNIFIED order):")
    for cls_name, p in zip(UNIFIED, fused):
        print(f"  {cls_name:10s} : {p:.3f}")
    print("=========================================\n")

    # cleanup temp files
    try:
        os.remove(audio_tmp)
        os.rmdir(tmp_dir)
    except Exception:
        pass

    return {
        'image_label': img_label,
        'image_conf': img_conf,
        'image_probs': img_unified_avg,
        'audio_label': aud_label,
        'audio_conf': aud_conf,
        'audio_probs': aud_unified,
        'fused_label': fused_label,
        'fused_conf': fused_conf,
        'fused_probs': fused
    }

def stress_level_from_label(label):
    STRESS_ORDER = {
        "none": 0,
        "mild": 1,
        "mediocre": 2,
        "high": 3
    }
    return STRESS_MAP.get(label, "mild"), STRESS_ORDER[STRESS_MAP.get(label, "mild")]

def assess_stress(img_label, audio_label):
    stress_img, score_img = stress_level_from_label(img_label)
    stress_audio, score_audio = stress_level_from_label(audio_label)

    # Take the stronger stress estimation
    if score_img > score_audio:
        return stress_img
    else:
        return stress_audio

In [11]:
if __name__ == "__main__":
    print("Starting live webcam + mic demo.")
    result = live_record_and_fuse(duration=DURATION)
    stress = assess_stress(result['image_label'], result['audio_label'])
    print(f"Stress Level: {stress.capitalize()}")


Starting live webcam + mic demo.
[Live] Starting 8s audio recording (fs=16000)...
[Live] Capturing frames from webcam...
[Live] Saved audio to: C:\Users\Admin\AppData\Local\Temp\live_fusion_u16p26tg\live_audio.wav
[Live] Captured 17 frames.

0: 480x640 1 anger, 1 sad, 126.5ms
Speed: 10.9ms preprocess, 126.5ms inference, 144.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 neutral, 1 sad, 56.1ms
Speed: 1.6ms preprocess, 56.1ms inference, 13.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 neutral, 1 sad, 68.7ms
Speed: 1.0ms preprocess, 68.7ms inference, 3.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 neutral, 1 sad, 75.6ms
Speed: 1.1ms preprocess, 75.6ms inference, 13.5ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 neutral, 1 sad, 81.7ms
Speed: 5.8ms preprocess, 81.7ms inference, 6.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 neutral, 1 sad, 86.1ms
Speed: 9.6ms preprocess, 86.1ms inference, 9.4m