In [1]:
#import urllib.request
#import os
#os.makedirs("models", exist_ok=True)
#urllib.request.urlretrieve("https://raw.githubusercontent.com/opencv/opencv/3.4.0/samples/dnn/face_detector/deploy.prototxt", "models/deploy.prototxt")
#urllib.request.urlretrieve("https://github.com/opencv/opencv_3rdparty/raw/dnn_samples_face_detector_20170830/res10_300x300_ssd_iter_140000.caffemodel", "models/res10_300x300_ssd_iter_140000.caffemodel")

In [2]:
#pip install ipywidgets

In [3]:
#pip install jupyter_contrib_nbextensions

In [5]:
#pip install tensorflow==2.17.0 deepface

In [19]:
import ipywidgets as widgets
from IPython.display import display

# Smoke test: confirms ipywidgets renders and that click callbacks reach the kernel.
def on_test_button_click(_):
    """Swap the label text so a successful click is visible in the UI."""
    test_output.value = "Widgets are working!"


test_output = widgets.Label(value="Click the button to test widgets")
test_button = widgets.Button(description="Test Widget")
test_button.on_click(on_test_button_click)

display(widgets.VBox(children=[test_button, test_output]))

VBox(children=(Button(description='Test Widget', style=ButtonStyle()), Label(value='Click the button to test w…

In [51]:
import cv2
import numpy as np
import asyncio
from IPython.display import display, clear_output
import ipywidgets as widgets
import os
import logging

# Logging setup: timestamped INFO-and-above records, plus a logger for this notebook.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Load face detection and age estimation models
face_proto = "models/deploy.prototxt"
face_model = "models/res10_300x300_ssd_iter_140000.caffemodel"
age_proto = "models/age_deploy.prototxt"
age_model = "models/age_net.caffemodel"

# Check the files up front so a missing download is reported by name instead of
# surfacing only as a low-level OpenCV error from readNet.
missing_model_files = [
    model_path
    for model_path in (face_proto, face_model, age_proto, age_model)
    if not os.path.isfile(model_path)
]
if missing_model_files:
    logger.error(f"Failed to load models: missing files {missing_model_files}")
    raise RuntimeError(f"Model loading failed. Missing files: {', '.join(missing_model_files)}")

try:
    face_net = cv2.dnn.readNet(face_model, face_proto)
    age_net = cv2.dnn.readNet(age_model, age_proto)
except Exception as e:
    logger.error(f"Failed to load models: {str(e)}")
    # Chain the original error so the traceback shows why OpenCV rejected the files.
    raise RuntimeError("Model loading failed.") from e

# Age bucket labels for the Caffe age model; predict_age_caffe indexes this list
# with the argmax of the network output.
AGE_INTERVALS = [
    '(0-2)',
    '(4-6)',
    '(8-12)',
    '(15-20)',
    '(25-32)',
    '(38-43)',
    '(48-53)',
    '(60-100)',
]
# Mean values handed to cv2.dnn.blobFromImage when preparing the age model input.
MODEL_MEAN_VALUES = (
    78.4263377603,
    87.7689143744,
    114.895847746,
)

# Initialize webcam: probe device indices 0-2 and keep the first one that opens.
cap = None
for index in range(3):
    cap = cv2.VideoCapture(index)
    if not cap.isOpened():
        # This index is unusable; free the handle before trying the next one.
        cap.release()
        continue
    logger.info(f"Webcam opened successfully on index {index}")
    break
else:
    # The loop finished without a break, so no index produced an open capture.
    logger.error("Webcam initialization failed on all indices")
    raise RuntimeError("Webcam initialization failed on all indices")

2025-05-20 16:30:41,377 - INFO - Webcam opened successfully on index 0


In [53]:
def get_faces(frame, confidence_threshold=0.4):
    """Run the SSD face detector on a frame and return clamped bounding boxes.

    Args:
        frame: Image array to search. ``None`` or an empty array is logged as
            an error and yields an empty list.
        confidence_threshold: Only detections scoring strictly above this
            value are considered.

    Returns:
        A list of ``(start_x, start_y, end_x, end_y)`` tuples clamped to the
        frame. Boxes that are not more than 5 pixels wide and tall after
        clamping are logged and dropped. Any exception raised along the way is
        logged and results in an empty list.
    """
    try:
        if frame is None or frame.size == 0:
            logger.error("Invalid input frame.")
            return []

        net_input = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), (104.0, 177.0, 123.0))
        face_net.setInput(net_input)
        detections = face_net.forward()

        height, width = frame.shape[:2]
        # Detection box values are multiplied by the frame size to get pixel coordinates.
        frame_scale = np.array([width, height, width, height])

        boxes = []
        for detection in detections[0, 0]:
            score = detection[2]
            if not (score > confidence_threshold):
                continue

            left, top, right, bottom = (detection[3:7] * frame_scale).astype(int)
            # Clamp the box so later slicing never reaches outside the frame.
            left, top = max(0, left), max(0, top)
            right, bottom = min(width - 1, right), min(height - 1, bottom)

            if right > left + 5 and bottom > top + 5:
                boxes.append((left, top, right, bottom))
            else:
                logger.warning(f"Invalid face region: ({left}, {top}, {right}, {bottom})")
        return boxes
    except Exception as e:
        logger.error(f"Face detection failed: {str(e)}")
        return []

def predict_age_caffe(face_img):
    """Classify a cropped face image into one of the ``AGE_INTERVALS`` buckets.

    Args:
        face_img: Cropped face image array. ``None``, empty, or smaller than
            5 pixels along either of its first two axes is rejected.

    Returns:
        ``(interval_label, confidence)`` for the highest-scoring bucket, or
        ``(None, 0.0)`` when the input is rejected, the top score is below
        0.3, or any exception occurs (the failure is logged).
    """
    try:
        unusable = face_img is None or face_img.size == 0
        if not unusable:
            unusable = face_img.shape[0] < 5 or face_img.shape[1] < 5
        if unusable:
            logger.error("Empty or too small face image.")
            return None, 0.0

        net_input = cv2.dnn.blobFromImage(
            face_img, 1.0, (227, 227), MODEL_MEAN_VALUES, swapRB=False, crop=False
        )
        age_net.setInput(net_input)
        scores = age_net.forward()[0]

        best_index = scores.argmax()
        best_score = scores[best_index]
        # The 0.3 cut-off is deliberately low so that more frames get a prediction.
        if best_score < 0.3:
            logger.warning("Age prediction confidence too low.")
            return None, 0.0
        return AGE_INTERVALS[best_index], best_score
    except Exception as e:
        logger.error(f"Caffe age prediction error: {str(e)}")
        return None, 0.0

In [55]:
import glob

def parse_utkface_filename(filename):
    """Return the age encoded at the start of a UTKFace file name.

    The age is the text before the first underscore of the path's base name.

    Args:
        filename: Path or bare file name of a UTKFace image.

    Returns:
        The age as an ``int``, or ``None`` (after logging a warning) when the
        leading token is not an integer.
    """
    age_token, _, _ = os.path.basename(filename).partition('_')
    try:
        return int(age_token)
    except (IndexError, ValueError):
        logger.warning(f"Invalid UTKFace filename format: {filename}")
        return None

def filter_utkface_by_age(min_age, max_age, utkface_dir="UTKFace"):
    """Return the ``*.jpg`` paths in ``utkface_dir`` whose age is within range.

    Args:
        min_age: Lowest age to keep (inclusive).
        max_age: Highest age to keep (inclusive).
        utkface_dir: Directory that is searched (non-recursively) for ``*.jpg``.

    Returns:
        A list of matching paths in the order ``glob.glob`` produced them.
        Files whose names cannot be parsed by ``parse_utkface_filename`` are
        skipped.
    """
    matching_paths = []
    for image_path in glob.glob(os.path.join(utkface_dir, "*.jpg")):
        # Parse each name once and reuse the result for both checks.
        age = parse_utkface_filename(image_path)
        if age is not None and min_age <= age <= max_age:
            matching_paths.append(image_path)
    return matching_paths

def run_accuracy_test(utkface_dir="UTKFace", max_images=100, age_range=None):
    """Measure age-prediction error against UTKFace images.

    For each selected image the first detected face is cropped, classified
    with ``predict_age_caffe``, and the midpoint of the predicted interval is
    compared with the age parsed from the file name.

    Args:
        utkface_dir: Directory containing UTKFace ``*.jpg`` images.
        max_images: Maximum number of candidate paths taken before parsing.
        age_range: Optional ``(min_age, max_age)`` pair; when given, only
            images whose true age lies in that inclusive range are used.

    Returns:
        A human-readable string: the mean absolute error in years, or a
        message saying that no images or no predictions were available.
    """
    if age_range:
        min_age, max_age = age_range
        logger.info(f"Filtering UTKFace images for age range {min_age}-{max_age}")
        candidate_paths = filter_utkface_by_age(min_age, max_age, utkface_dir)
    else:
        candidate_paths = glob.glob(os.path.join(utkface_dir, "*.jpg"))

    # Single collection pass shared by both selection modes.
    test_images = []
    for img_path in candidate_paths[:max_images]:
        age = parse_utkface_filename(img_path)
        if age is not None:
            test_images.append({"path": img_path, "true_age": age})

    if not test_images:
        logger.error(f"No valid UTKFace images found in {utkface_dir}")
        return "No valid UTKFace images found"

    errors = []
    processed_images = 0
    for item in test_images:
        try:
            img = cv2.imread(item["path"])
            if img is None:
                logger.error(f"Failed to load image: {item['path']}")
                continue

            faces = get_faces(img)
            if not faces:
                logger.warning(f"No faces detected in {item['path']}")
                continue

            # Only the first detection is evaluated for each image.
            start_x, start_y, end_x, end_y = faces[0]
            face_img = img[start_y:end_y, start_x:end_x]
            if face_img.size == 0:
                logger.warning(f"Empty face region in {item['path']}")
                continue

            predicted_interval, confidence = predict_age_caffe(face_img)
            if not predicted_interval:
                logger.warning(f"No valid age prediction for {item['path']}")
                continue

            # Turn a label such as "(25-32)" into its midpoint. A separate local
            # name is used so the ``age_range`` argument is never overwritten.
            interval_bounds = predicted_interval.strip('()').split('-')
            predicted_age = (int(interval_bounds[0]) + int(interval_bounds[1])) / 2
            error = abs(predicted_age - item["true_age"])
            errors.append(error)
            processed_images += 1
            logger.info(f"Image: {item['path']}, True Age: {item['true_age']}, Predicted Age: {predicted_age}, Error: {error}, Confidence: {confidence:.2f}")
        except Exception as e:
            logger.error(f"Error processing {item['path']}: {str(e)}")
            continue

    if errors:
        mean_error = np.mean(errors)
        logger.info(f"Processed {processed_images} images successfully")
        return f"Mean Absolute Error: {mean_error:.2f} years"
    logger.error("No valid predictions made")
    return "No valid predictions made"

In [57]:
# UI Widgets
output = widgets.Image(format='jpeg')  # receives the JPEG bytes of each processed frame
status_label = widgets.Label(value="Status: Ready")

start_button = widgets.Button(description="Start Stream")
stop_button = widgets.Button(description="Stop Stream", disabled=True)  # enabled by start_stream
test_button = widgets.Button(description="Run Accuracy Test")
quit_button = widgets.Button(description="Quit")

ui = widgets.VBox(
    children=[
        output,
        status_label,
        start_button,
        stop_button,
        test_button,
        quit_button,
    ]
)
display(ui)

# Streaming state: the frame loop keeps running while this flag is True.
running = False

async def stream_frames():
    """Read webcam frames, annotate detected faces with an age label, and display them.

    Sets the module-level ``running`` flag to True and loops until it is set
    to False or a failure occurs. If the capture is not open, device indices
    0-2 are tried again first. On failure (webcam unavailable, frame capture
    failure, or an unexpected exception) the status label shows the error,
    ``running`` is cleared and the Start/Stop buttons are reset so the stream
    can be started again.
    """
    global running, cap
    running = True
    failure_status = None
    try:
        if not cap.isOpened():
            logger.error("Webcam not available. Attempting to reinitialize...")
            cap.release()
            for index in [0, 1, 2]:
                cap = cv2.VideoCapture(index)
                if cap.isOpened():
                    logger.info(f"Webcam reinitialized on index {index}")
                    break
                # Free the handle that failed before probing the next index.
                cap.release()
            if not cap.isOpened():
                failure_status = "Status: Error - Webcam unavailable"
                return

        while running:
            ret, frame = cap.read()
            if not ret:
                failure_status = "Status: Error - Failed to capture frame"
                logger.error("Failed to capture frame.")
                break

            faces = get_faces(frame)
            for (start_x, start_y, end_x, end_y) in faces:
                face_img = frame[start_y:end_y, start_x:end_x]
                if face_img.size == 0:
                    logger.warning("Empty face region detected")
                    continue

                try:
                    age, confidence = predict_age_caffe(face_img)
                    if age is not None:
                        label = f"Age: {age} ({confidence*100:.1f}%)"
                        cv2.rectangle(frame, (start_x, start_y), (end_x, end_y), (255, 0, 0), 2)
                        # Keep the label at least 15 px from the top edge of the frame.
                        cv2.putText(frame, label, (start_x, max(start_y - 10, 15)),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                except Exception as e:
                    logger.error(f"Age prediction failed: {str(e)}")
                    continue

            # imencode reports success separately; only publish a valid buffer.
            encoded, buffer = cv2.imencode('.jpg', frame)
            if encoded:
                output.value = buffer.tobytes()
            else:
                logger.warning("Failed to encode frame; skipping display update.")
            # Yield to the event loop so button clicks are handled; the 25 ms
            # pause caps the rate at about 40 FPS before processing time.
            await asyncio.sleep(0.025)
    except Exception as e:
        # Without this the task would die with ``running`` stuck at True and
        # the Start button disabled, so the stream could not be restarted.
        failure_status = "Status: Error - Streaming stopped unexpectedly"
        logger.error(f"Streaming stopped unexpectedly: {str(e)}")
    finally:
        # A normal stop or quit is left alone: those handlers already set the UI state.
        if failure_status is not None:
            running = False
            status_label.value = failure_status
            start_button.disabled = False
            stop_button.disabled = True

def start_stream(_):
    """Start the webcam stream unless one is already running.

    Disables the Start button, enables the Stop button, updates the status
    label and schedules ``stream_frames`` on the event loop.

    Args:
        _: The clicked button (unused).
    """
    global running, _stream_task
    if not running:
        start_button.disabled = True
        stop_button.disabled = False
        status_label.value = "Status: Streaming..."
        # Keep a reference to the scheduled task: the event loop only holds weak
        # references, so an unreferenced task can be garbage-collected mid-run.
        _stream_task = asyncio.ensure_future(stream_frames())

def stop_stream(_):
    """Ask the frame loop to stop and put the controls back in the idle state.

    Args:
        _: The clicked button (unused).
    """
    global running
    # The frame loop checks this flag at the top of each iteration.
    running = False

    stop_button.disabled = True
    start_button.disabled = False
    status_label.value = "Status: Stopped"

def run_test(_):
    """Run the UTKFace accuracy test for ages 15-20 and show the result.

    Args:
        _: The clicked button (unused).
    """
    status_label.value = "Status: Running accuracy test..."
    # Restricted to the 15-20 range in the hope of a better MAE.
    outcome = run_accuracy_test(age_range=(15, 20))
    status_label.value = "Status: {}".format(outcome)

def quit_app(_):
    """Stop streaming, release the webcam, and clear the cell output.

    Args:
        _: The clicked button (unused).
    """
    global running, cap
    running = False  # lets the frame loop exit on its next check

    if cap:
        cap.release()

    clear_output(wait=True)
    status_label.value = "Status: Application closed"

# Bind buttons to functions
for button, handler in (
    (start_button, start_stream),
    (stop_button, stop_stream),
    (test_button, run_test),
    (quit_button, quit_app),
):
    button.on_click(handler)

VBox(children=(Image(value=b'', format='jpeg'), Label(value='Status: Ready'), Button(description='Start Stream…