In [None]:
# ----------------------------------------------------------
# Audience Analysis Script using Hailo-8, DeGirum SDK, and PiCamera2
# ----------------------------------------------------------
# Models: Face Detection, Age, Gender, Emotion, Embedding
# Hardware: Raspberry Pi 5 + Hailo-8 + Camera Module 3
# Filename: 000_audience_analysis_preview.ipynb
# Created date: 01 July 2025
# Last modified date: 06 July 2025
# Version: 1.0.0
# ----------------------------------------------------------

# ----------------------------------------------------------
# Sample JSON Output (Single Viewer Record)
# ----------------------------------------------------------
# {
#   "age_est": 50,
#   "age_score": 49.81,
#   "attention_duration": 3.4,
#   "emotion": "neutral",
#   "emotion_score": 0.93,
#   "env": {
#     "aqi_est": 115,
#     "humidity": 65.1,
#     "temp_c": 29.5
#   },
#   "gaze_at_screen": true,
#   "gender": "Female",
#   "gender_score": 0.98,
#   "location": {
#     "coordinates": "3.1319N, 101.6841E",
#     "mac_address": "88:A2:9E:1C:49:6F"
#   },
#   "timestamp": "2025-07-06T09:23:06.647690Z",
#   "viewer_id": "unknown"
# }

import hashlib
import json
import logging
import random
import time
import uuid
from datetime import datetime, timezone

import degirum as dg
import degirum_tools
import cv2
from picamera2 import Picamera2

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("audience_analysis_preview")

# ----------------------------------------------------------
# Camera Streaming Class (PiCamera2 with FPS control)
# ----------------------------------------------------------
class CameraStream:
    """Endless, rate-limited frame source backed by PiCamera2.

    Iterating over an instance yields frames from ``capture_array()`` and
    sleeps between frames so that at most ``fps`` frames per second are
    produced.

    Args:
        fps: Target frame rate; must be a positive number.

    Raises:
        ValueError: If ``fps`` is zero or negative.
    """

    def __init__(self, fps=5):  # Limit camera to 5 FPS for performance
        # Validate before touching the hardware so a bad argument cannot
        # leave the camera opened but unusable.
        if fps <= 0:
            raise ValueError(f"fps must be a positive number, got {fps!r}")
        self.fps_interval = 1.0 / fps

        self.picam2 = Picamera2()
        # NOTE(review): the Picamera2 manual describes "RGB888" arrays as
        # being stored in B, G, R byte order - confirm this matches what the
        # models and the OpenCV overlay expect.
        self.picam2.configure(self.picam2.create_preview_configuration(
            main={"format": "RGB888"}
        ))
        self.picam2.start(show_preview=False)
        time.sleep(2)  # Allow sensor to stabilize

    def __iter__(self):
        """Yield frames forever, pacing them to the configured frame rate."""
        while True:
            # Monotonic clock: pacing must not be disturbed by wall-clock
            # adjustments (e.g. NTP sync).
            start_time = time.monotonic()
            frame = self.picam2.capture_array()
            yield frame
            # The generator is suspended at the yield, so ``elapsed`` also
            # includes the time the consumer spent processing the frame.
            elapsed = time.monotonic() - start_time
            time.sleep(max(0.0, self.fps_interval - elapsed))  # Frame pacing

    def stop(self):
        """Stop streaming and release the camera (best effort, never raises)."""
        try:
            self.picam2.stop()
            print("Camera stopped.")
        except Exception as e:
            print(f"[Camera Stop Error] {e}")
        # Closing releases the camera device; without it a second
        # CameraStream in the same process (e.g. re-running the notebook
        # cell) cannot acquire the camera.
        try:
            self.picam2.close()
        except Exception as e:
            print(f"[Camera Close Error] {e}")

# ----------------------------------------------------------
# Inference Configuration
# ----------------------------------------------------------
inference_host_address = "@local"
zoo_url = "../models"
token = ""
device_type = "HAILORT/HAILO8"

# ----------------------------------------------------------
# Model Selection
# ----------------------------------------------------------
face_det_model_name     = "retinaface_mobilenet--736x1280_quant_hailort_hailo8_1"
face_embed_model_name   = "arcface_mobilefacenet--112x112_quant_hailort_hailo8_1"
age_model_name          = "yolov8n_relu6_age--256x256_quant_hailort_hailo8_1"
gender_model_name       = "yolov8n_relu6_fairface_gender--256x256_quant_hailort_hailo8_1"
# NOTE(review): unlike the other names this one ends in "multidevice" rather
# than "hailo8" - confirm it is the intended build for device_type above.
emotion_model_name      = "emotion_recognition_fer2013--64x64_quant_hailort_multidevice_1"

# ----------------------------------------------------------
# Load Models
# ----------------------------------------------------------
def _load_model(model_name):
    """Load one model from the zoo using the shared inference settings above."""
    return dg.load_model(
        model_name=model_name,
        inference_host_address=inference_host_address,
        zoo_url=zoo_url,
        token=token,
        device_type=device_type
    )

# Load Face Detection Model
face_det_model = _load_model(face_det_model_name)
face_det_model.overlay_color = [(255, 255, 0), (0, 255, 0)]

# Load Face Embedding Model
face_embed_model = _load_model(face_embed_model_name)

# Load Age Estimation Model
age_model = _load_model(age_model_name)

# Load Gender Classification Model
gender_model = _load_model(gender_model_name)

# Load Emotion Recognition Model
# The name string lives in ``emotion_model_name`` so that this variable only
# ever refers to the loaded model, never to its name.
emotion_cls_model = _load_model(emotion_model_name)

# ----------------------------------------------------------
# Create Compound Models (Face Crop + Classification)
# ----------------------------------------------------------
# Each pipeline pairs the shared face detector with one second-stage model
# that is run on the detected face crops.
_crop_and_classify = degirum_tools.CroppingAndClassifyingCompoundModel

face_emotion_model = _crop_and_classify(face_det_model, emotion_cls_model)
face_age_model = _crop_and_classify(face_det_model, age_model)
face_gender_model = _crop_and_classify(face_det_model, gender_model)
# NOTE(review): the embedding model is wrapped as if it were a classifier;
# verify that its output actually reaches the per-face results.
face_embed_model_comp = _crop_and_classify(face_det_model, face_embed_model)

# ----------------------------------------------------------
# Utility: Generate Unique Viewer ID from Face Embedding
# ----------------------------------------------------------
def get_viewer_id(embedding):
    """Derive a short viewer ID from a face embedding.

    The ID is the first 8 hex characters of the MD5 digest of the
    embedding's string form. It is a digest of the exact values, so two
    embeddings share an ID only when they are element-for-element identical.

    Args:
        embedding: Sequence of numbers (list, tuple, numpy array, ...) or
            ``None``.

    Returns:
        An 8-character hex string, or ``"unknown"`` when the embedding is
        ``None`` or empty.
    """
    if embedding is None:
        return "unknown"

    # ``not embedding`` raises ValueError for multi-element numpy arrays, so
    # test emptiness via the length whenever the object has one.
    try:
        is_empty = len(embedding) == 0
    except TypeError:
        is_empty = not embedding
    if is_empty:
        return "unknown"

    # Convert array-likes to plain lists so the hashed text does not depend
    # on numpy print options and matches the ID of the equivalent list.
    if hasattr(embedding, "tolist"):
        embedding = embedding.tolist()

    # MD5 is used only as a compact fingerprint here, not for security.
    h = hashlib.md5(str(embedding).encode()).hexdigest()
    return h[:8]

# ----------------------------------------------------------
# Utility: Draw Demographic Labels on Frame
# ----------------------------------------------------------
def draw_overlay(image, emotion_results, age_results, gender_results):
    """Draw a bounding box and a demographic label for every detected face.

    The face boxes come from ``emotion_results``; age and gender entries are
    matched to them by position. A face whose age or gender entry is missing
    is still drawn, using the same defaults as a missing key. A face that
    cannot be drawn (e.g. no ``bbox``) is reported and skipped.

    Args:
        image: Frame to draw on; it is modified in place.
        emotion_results: Per-face dicts read for ``bbox``, ``label``, ``score``.
        age_results: Per-face dicts read for ``score`` (used as the age).
        gender_results: Per-face dicts read for ``label`` and ``score``.

    Returns:
        The same ``image`` object, with the overlay drawn on it.
    """
    label_band = 22  # Height in pixels of the filled strip behind the text

    for i, emotion_entry in enumerate(emotion_results):
        try:
            x1, y1, x2, y2 = map(int, emotion_entry.get("bbox", []))

            # Result lists may differ in length; fall back to an empty entry
            # instead of failing on the index.
            age_entry = age_results[i] if i < len(age_results) else {}
            gender_entry = gender_results[i] if i < len(gender_results) else {}

            age_score     = age_entry.get("score", 0.0)
            age           = round(age_score)
            gender        = gender_entry.get("label", "")
            gender_score  = gender_entry.get("score", 0.0)
            emotion       = emotion_entry.get("label", "")
            emotion_score = emotion_entry.get("score", 0.0)

            label = f"{gender} ({gender_score:.2f}) | Age: {age} ({age_score:.2f}) | {emotion} ({emotion_score:.2f})"

            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 255), 2)
            (text_w, _), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.45, 1)

            # Keep the label inside the frame: for faces touching the top
            # edge the strip would otherwise be drawn at negative y.
            band_top = max(y1 - label_band, 0)
            cv2.rectangle(image, (x1, band_top),
                          (x1 + text_w, band_top + label_band), (0, 255, 255), -1)
            cv2.putText(image, label, (x1, band_top + label_band - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 0, 0), 1)
        except Exception as e:
            # Best effort: one bad face must not stop the others being drawn.
            print(f"[Overlay Error] Face {i}: {e}")
    return image

# ----------------------------------------------------------
# Utility: Get MAC Address as Device ID
# ----------------------------------------------------------
def get_mac_address():
    """Return the node ID from ``uuid.getnode()`` as ``AA:BB:CC:DD:EE:FF``.

    NOTE(review): the Python docs say ``uuid.getnode()`` can fall back to a
    random number when no hardware address is found - confirm that is
    acceptable for a device ID.
    """
    node = uuid.getnode() & 0xFFFFFFFFFFFF  # keep the low 48 bits only
    octets = node.to_bytes(6, byteorder="big")
    return ":".join(f"{octet:02X}" for octet in octets)

mac_address = get_mac_address()

# ----------------------------------------------------------
# Generator: Run Multi-Model Inference per Frame
# ----------------------------------------------------------
def run_inference(video_source):
    """Run every compound model on each frame of ``video_source``.

    Yields:
        Tuples ``(frame, emotion_result, age_result, gender_result,
        embed_result)``, one per input frame.

    NOTE(review): all four pipelines are built on the same face detector, so
    detection presumably runs four times per frame - confirm whether that
    cost is acceptable.
    """
    for frame in video_source:
        # Models are called in the same order as the yielded tuple.
        outputs = [
            pipeline.predict(frame)
            for pipeline in (
                face_emotion_model,
                face_age_model,
                face_gender_model,
                face_embed_model_comp,
            )
        ]
        yield (frame, *outputs)

# ----------------------------------------------------------
# Main Runtime
# ----------------------------------------------------------
def _build_viewer_record(emotion, age, gender, embed):
    """Assemble the JSON-serializable record for one detected face.

    Each argument is the per-face result dict of the corresponding model.
    Location coordinates and environment readings are hard-coded values.
    """
    # NOTE(review): the key "embedding" is assumed here - verify against the
    # actual result layout of the embedding pipeline. When it is absent the
    # viewer ID falls back to "unknown".
    viewer_id = get_viewer_id(embed.get("embedding", []))

    return {
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the text format (ISO 8601 with a trailing "Z") is unchanged.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "location": {
            "mac_address": mac_address,
            "coordinates": "3.1319N, 101.6841E"
        },
        "env": {
            "temp_c": 29.5,
            "humidity": 65.1,
            "aqi_est": 115
        },
        "viewer_id": viewer_id,
        "age_est": round(age.get("score", 0)),
        "age_score": age.get("score", 0.0),
        "gender": gender.get("label", ""),
        "gender_score": gender.get("score", 0.0),
        "emotion": emotion.get("label", ""),
        "emotion_score": emotion.get("score", 0.0),

        # NOTE: Placeholder simulation. Replace with real gaze/attention inference if available.
        "attention_duration": round(random.uniform(2.0, 7.5), 1),
        "gaze_at_screen": random.choice([True, False])
    }


video_source = CameraStream(fps=5)

try:
    with degirum_tools.Display("Audience Analysis") as display:
        for frame, emotion_result, age_result, gender_result, embed_result in run_inference(video_source):

            if emotion_result.results:
                overlayed_image = draw_overlay(
                    emotion_result.image,
                    emotion_result.results,
                    age_result.results,
                    gender_result.results
                )
                display.show(overlayed_image)
            else:
                # Keep the preview live (and the quit key responsive) even
                # when no face is in view.
                display.show(frame)

            # zip() stops at the shortest list, which aligns the results
            # across all models.
            faces = zip(
                emotion_result.results,
                age_result.results,
                gender_result.results,
                embed_result.results
            )

            for i, (emotion, age, gender, embed) in enumerate(faces):
                try:
                    json_output = _build_viewer_record(emotion, age, gender, embed)

                    # Records are emitted at DEBUG level only; with the INFO
                    # level configured for this file nothing is printed.
                    if logger.isEnabledFor(logging.DEBUG):
                        logger.debug("%s", json.dumps(json_output, separators=(",", ":")))

                except Exception as e:
                    print(f"[JSON Output Error] Face {i}: {e}")

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

except KeyboardInterrupt:
    print("Interrupted by user.")

finally:
    video_source.stop()
    cv2.destroyAllWindows()




INFO:degirum.zoo_manager:Local inference with local zoo from '../models' dir
INFO:degirum.zoo_manager:Local inference with local zoo from '../models' dir
INFO:degirum.zoo_manager:Local inference with local zoo from '../models' dir
INFO:degirum.zoo_manager:Local inference with local zoo from '../models' dir
INFO:degirum.zoo_manager:Local inference with local zoo from '../models' dir
[1:51:24.428782919] [11602] [1;32m INFO [1;37mCamera [1;34mcamera_manager.cpp:326 [0mlibcamera v0.5.0+59-d83ff0a4
INFO:picamera2.picamera2:Initialization successful.
[1:51:24.436922060] [11693] [1;32m INFO [1;37mRPI [1;34mpisp.cpp:720 [0mlibpisp version v1.2.1 981977ff21f3 29-04-2025 (14:13:50)
[1:51:24.447677021] [11693] [1;32m INFO [1;37mRPI [1;34mpisp.cpp:1179 [0mRegistered camera /base/axi/pcie@1000120000/rp1/i2c@88000/imx708@1a to CFE device /dev/media0 and ISP device /dev/media2 using PiSP variant BCM2712_D0
INFO:picamera2.picamera2:Camera now open.
INFO:picamera2.picamera2:Camera configura