In [None]:
# Init: model, camera, helpers (simplified, latency-focused)
import time
from collections import deque, Counter

import numpy as np
import cv2
from PIL import Image, ImageEnhance, ImageFilter
from joblib import load
import ipywidgets as widgets
import traitlets
from IPython.display import display
from jetbot import Camera, bgr8_to_jpeg

# --- Load model bundle ---
model_data = load('traffic_sign_mlp.joblib')
model = model_data.get('model', model_data)
class_names = model_data.get('class_names', ['Green','Sheep','Speed','Stop','Yellow'])
image_size = tuple(model_data.get('image_size', (224, 224)))

# --- Preprocessing (simple & consistent with demo) ---
def crop_image_array(img_bgr: np.ndarray) -> np.ndarray:
    """Crop image array: remove 25% from bottom, 20% from left and right"""
    height, width = img_bgr.shape[:2]
    # Calculate crop boundaries
    left = int(width * 0.20)
    top = 0  # Keep top unchanged
    right = int(width * 0.80)
    bottom = int(height * 0.75)
    return img_bgr[top:bottom, left:right]

def enhance_image(img):
    if img.mode != 'RGB':
        img = img.convert('RGB')
    img = ImageEnhance.Contrast(img).enhance(1.2)
    img = ImageEnhance.Sharpness(img).enhance(1.1)
    return img.filter(ImageFilter.MedianFilter(size=3))

def extract_features(arr: np.ndarray) -> np.ndarray:
    arr = arr.astype(np.float32) / 255.0
    hist_r = np.histogram(arr[:, :, 0], bins=8, range=(0, 1))[0]
    hist_g = np.histogram(arr[:, :, 1], bins=8, range=(0, 1))[0]
    hist_b = np.histogram(arr[:, :, 2], bins=8, range=(0, 1))[0]
    flat = arr.reshape(-1)
    return np.concatenate([flat, hist_r, hist_g, hist_b]).astype(np.float32)

def preprocess(frame_bgr: np.ndarray) -> np.ndarray:
    # Apply cropping first (before conversion)
    frame_bgr = crop_image_array(frame_bgr)
    x_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    # Use cropped size directly (no resize needed - matches training)
    img = Image.fromarray(x_rgb)
    # Only resize if model expects different size than natural crop
    if img.size != image_size:
        img = img.resize(image_size)
    img = enhance_image(img)
    feats = extract_features(np.asarray(img).astype(np.float32))
    return feats.reshape(1, -1)

# --- Majority vote ---
def majority(window, default='UNKNOWN'):
    return Counter(window).most_common(1)[0][0] if window else default

# --- Robot helpers (import inside to avoid errors on non-robot hosts) ---
LAST_SPEED = None
CURRENT_MOTION = 'stopped'

def set_speed(forward: float, turn: float):
    global LAST_SPEED
    try:
        import lfrobot
        if LAST_SPEED != (forward, turn):
            lfrobot.lfSpeed(float(forward))
            lfrobot.lfTurnSpeed(float(turn))
            LAST_SPEED = (forward, turn)
    except Exception as e:
        print('[set_speed] error:', e)

def set_motion(moving: bool):
    global CURRENT_MOTION
    try:
        import lfrobot
        if moving and CURRENT_MOTION != 'moving':
            lfrobot.lfStart()
            CURRENT_MOTION = 'moving'
        elif (not moving) and CURRENT_MOTION != 'stopped':
            lfrobot.lfStop()
            CURRENT_MOTION = 'stopped'
    except Exception as e:
        print('[set_motion] error:', e)

# --- Simple UI factory ---
def make_ui(show_camera: bool, show_pred: bool, show_bars: bool):
    image = widgets.Image(format='jpeg', width=224, height=224) if show_camera else None
    pred_html = widgets.HTML(value='<b>Prediction:</b>') if show_pred else None
    sliders = None
    if show_bars:
        sliders = {name: widgets.FloatSlider(description=name, min=0.0, max=1.0, step=0.01, value=0.0, readout_format='.2%') for name in class_names}
    status = widgets.HTML(value='<span style="color: gray;">Idle</span>')
    log_area = widgets.Textarea(value='', description='Log', layout=widgets.Layout(width='480px', height='160px'))
    # Buttons
    btn_pause = widgets.Button(description='Pause', button_style='')
    btn_resume = widgets.Button(description='Resume', button_style='success')
    btn_estop = widgets.Button(description='E-STOP', button_style='danger')
    btn_clear = widgets.Button(description='Clear E-STOP', button_style='warning')
    buttons = widgets.HBox([btn_pause, btn_resume, btn_estop, btn_clear])
    parts = []
    if image is not None and sliders is not None:
        parts.append(widgets.HBox([image, widgets.VBox([sliders[n] for n in class_names])]))
    elif image is not None:
        parts.append(image)
    elif sliders is not None:
        parts.append(widgets.VBox([sliders[n] for n in class_names]))
    if pred_html is not None:
        parts.append(pred_html)
    parts.extend([status, buttons, log_area])
    main = widgets.VBox(parts)
    return {
        'image': image,
        'pred': pred_html,
        'sliders': sliders,
        'status': status,
        'buttons': {
            'pause': btn_pause, 'resume': btn_resume, 'estop': btn_estop, 'clear': btn_clear
        },
        'log': log_area,
        'main': main,
    }

# --- Logging helpers ---
log_buffer = deque(maxlen=50)
ui = None  # set in Start cell

def append_log(msg: str):
    ts = time.strftime('%H:%M:%S')
    line = f'[{ts}] {msg}'
    log_buffer.append(line)
    if isinstance(ui, dict) and ui.get('log') is not None:
        ui['log'].value = '\n'.join(list(log_buffer)[-12:])

def set_status_html(html: str):
    if isinstance(ui, dict) and ui.get('status') is not None:
        ui['status'].value = html

# --- Camera (match working demo: 224x224) ---
camera = Camera.instance(width=224, height=224)
print('Model and camera initialized. Classes:', class_names)

In [None]:
# Start: controller + UI (sign-based control only)
import lfrobot

# --- Options (tune for latency) ---
SHOW_CAMERA_FEED = False
SHOW_PREDICTION = True
SHOW_CONFIDENCE_BARS = True
UI_INTERVAL_S = 0.20   # throttle UI updates (smaller -> more UI load)

# --- Controller config ---
NORMAL_SPEED = 0.12
SLOW_SPEED   = 0.08
TURN_SPEED   = 0.15 
MIN_CONF     = 0.95 # minimum confidence to accept prediction
SMOOTH_N     = 5 # majority vote window size
STOP_SECONDS = 5.0 # duration to hold stop state
STOP_COOLDOWN_S = 3.0 # cooldown before resuming after stop
SHEEP_CLEAR_FRAMES = 8  # consecutive non-Sheep frames required to clear Sheep stop
YELLOW_CLEAR_FRAMES = 6  # consecutive Green frames required to clear Yellow stop

# --- UI ---
ui = make_ui(SHOW_CAMERA_FEED, SHOW_PREDICTION, SHOW_CONFIDENCE_BARS)
display(ui['main'])

# --- Robot init ---
lfrobot.lfInit()
set_speed(NORMAL_SPEED, TURN_SPEED)
set_motion(True)
append_log('Robot initialized and started (NORMAL speed).')
set_status_html('<span style="color: green;">Moving</span>')

# --- State ---
_pred_window = deque(maxlen=SMOOTH_N)
_last_ui_time = 0.0
slow_mode_active = False
stop_state_active = False
stop_until_ts = 0.0
stop_cooldown_until = 0.0
sheep_stop_active = False
yellow_stop_active = False
e_stop_active = False
sheep_clear_count = 0
yellow_clear_count = 0  # new: counts consecutive Green frames while Yellow is active

# --- Button handlers ---
def on_pause(_):
    set_motion(False)
    append_log('Paused via UI button.')
    set_status_html('<span style="color: orange;">Paused</span>')

def on_resume(_):
    global e_stop_active, stop_state_active, sheep_stop_active, yellow_stop_active, sheep_clear_count, yellow_clear_count, stop_cooldown_until
    if e_stop_active:
        append_log('Cannot resume: E-STOP is active. Use Clear E-STOP.')
        return
    # Clear all stop states
    stop_state_active = False
    sheep_stop_active = False
    yellow_stop_active = False
    sheep_clear_count = 0
    yellow_clear_count = 0
    stop_cooldown_until = 0.0  # clear cooldown
    # Resume movement
    target = SLOW_SPEED if slow_mode_active else NORMAL_SPEED
    set_speed(target, TURN_SPEED)
    set_motion(True)
    append_log('Resumed via UI button (all stops cleared).')
    set_status_html('<span style="color: green;">Moving</span>')

def on_estop(_):
    global e_stop_active
    e_stop_active = True
    set_motion(False)
    append_log('E-STOP engaged! Robot stopped.')
    set_status_html('<span style="color: red;">E-STOP</span>')

def on_clear(_):
    global e_stop_active
    e_stop_active = False
    append_log('E-STOP cleared.')
    set_status_html('<span style="color: gray;">Idle</span>')

ui['buttons']['pause'].on_click(on_pause)
ui['buttons']['resume'].on_click(on_resume)
ui['buttons']['estop'].on_click(on_estop)
ui['buttons']['clear'].on_click(on_clear)

# --- Frame callback ---
def on_frame(change):
    global slow_mode_active, stop_state_active, stop_until_ts, stop_cooldown_until, sheep_stop_active, yellow_stop_active, _last_ui_time, sheep_clear_count, yellow_clear_count
    try:
        frame = change['new']
        feats = preprocess(frame)
        probs = model.predict_proba(feats)[0]
        idx = int(np.argmax(probs))
        top_p = float(probs[idx])
        raw_label = class_names[idx]
        # Use raw label directly (no normalization)
        label_for_vote = raw_label if top_p >= MIN_CONF else 'UNKNOWN'
        _pred_window.append(label_for_vote)
        vote = majority(_pred_window)
        now = time.time()

        # UI
        if now - _last_ui_time >= UI_INTERVAL_S:
            if ui.get('pred') is not None:
                color = 'green' if top_p > 0.8 else ('orange' if top_p > 0.6 else 'red')
                ui['pred'].value = f"<b>Prediction:</b> <span style='color:{color};'>{raw_label}</span> (vote: {vote}) — {top_p:.1%}"
            if ui.get('sliders') is not None:
                for i, name in enumerate(class_names):
                    ui['sliders'][name].value = float(probs[i])
            if ui.get('image') is not None:
                ui['image'].value = bgr8_to_jpeg(frame)
            _last_ui_time = now

        # Controller logic 
        if e_stop_active:
            set_motion(False)
            return

        # Check if ANY stop is currently active (only one at a time allowed)
        any_stop_active = sheep_stop_active or yellow_stop_active or stop_state_active

        # 1) SHEEP: stop while present; clear after N consecutive non-Sheep frames
        if sheep_stop_active:
            # SHEEP stop is active - handle its clearing logic
            if vote == 'Sheep':
                sheep_clear_count = 0  # reset counter when Sheep is seen again
                set_motion(False)
                return
            else:
                # increment clear counter when Sheep not seen
                sheep_clear_count += 1
                if sheep_clear_count >= int(SHEEP_CLEAR_FRAMES):
                    sheep_stop_active = False
                    sheep_clear_count = 0
                    target = SLOW_SPEED if slow_mode_active else NORMAL_SPEED
                    set_speed(target, TURN_SPEED)
                    set_motion(True)
                    append_log('SHEEP cleared; resuming.')
                    set_status_html('<span style="color: green;">Moving</span>')
                else:
                    # keep stopped until threshold reached
                    set_motion(False)
                return
        elif vote == 'Sheep' and not any_stop_active:
            # New SHEEP detected and no other stop is active
            sheep_stop_active = True
            sheep_clear_count = 0
            set_motion(False)
            append_log('Stopping: SHEEP detected.')
            set_status_html('<span style="color: red;">SHEEP</span>')
            return

        # 2) YELLOW: stop until YELLOW is cleared by N consecutive GREEN frames
        if yellow_stop_active:
            # YELLOW stop is active - handle its clearing logic
            if vote == 'Yellow':
                yellow_clear_count = 0  # reset counter when Yellow is seen again
                set_motion(False)
                return
            elif vote == 'Green':
                yellow_clear_count += 1
                if yellow_clear_count >= int(YELLOW_CLEAR_FRAMES):
                    yellow_stop_active = False
                    yellow_clear_count = 0
                    target = SLOW_SPEED if slow_mode_active else NORMAL_SPEED
                    set_speed(target, TURN_SPEED)
                    set_motion(True)
                    append_log('YELLOW cleared after consecutive GREEN; resuming.')
                    set_status_html('<span style="color: green;">Moving</span>')
                else:
                    # still waiting for enough GREEN frames; remain stopped
                    set_motion(False)
                return
            else:
                # any other non-GREEN frame while Yellow active resets the counter and stays stopped
                yellow_clear_count = 0
                set_motion(False)
                return
        elif vote == 'Yellow' and not any_stop_active:
            # New YELLOW detected and no other stop is active
            yellow_stop_active = True
            yellow_clear_count = 0
            set_motion(False)
            append_log('Stopping: YELLOW light.')
            set_status_html('<span style="color: orange;">YELLOW</span>')
            return

        # 3) STOP: timed stop for STOP_SECONDS, then resume with cooldown
        if stop_state_active:
            # STOP is active - handle timed duration
            if now < stop_until_ts:
                set_motion(False)
                return
            else:
                stop_state_active = False
                slow_mode_active = False
                set_speed(NORMAL_SPEED, TURN_SPEED)
                set_motion(True)
                stop_cooldown_until = now + float(STOP_COOLDOWN_S)
                append_log('Releasing STOP; cooldown active.')
                set_status_html('<span style="color: green;">Moving</span>')
                return
        elif vote == 'Stop' and not any_stop_active and now >= stop_cooldown_until:
            # New STOP detected and no other stop is active and not in cooldown
            stop_state_active = True
            stop_until_ts = now + float(STOP_SECONDS)
            set_motion(False)
            append_log(f'Stopping: STOP sign. Holding for {STOP_SECONDS:.1f}s.')
            set_status_html('<span style="color: red;">STOP</span>')
            return

        # 4) SPEED: engage slow mode until next STOP
        if vote == 'Speed':
            if not slow_mode_active:
                append_log('SLOW mode engaged: SPEED sign.')
            slow_mode_active = True
            if not (stop_state_active or sheep_stop_active or yellow_stop_active):
                set_speed(SLOW_SPEED, TURN_SPEED)
                set_motion(True)
                set_status_html('<span style="color: green;">Moving (SLOW)</span>')
            return

        # 5) GREEN: go (respect slow mode)
        if vote == 'Green':
            if not (stop_state_active or sheep_stop_active or yellow_stop_active):
                target = SLOW_SPEED if slow_mode_active else NORMAL_SPEED
                set_speed(target, TURN_SPEED)
                set_motion(True)
                set_status_html('<span style="color: green;">Moving</span>')
            return
        # 6) UNKNOWN: keep current state
    except Exception as e:
        try:
            set_motion(False)
        except Exception:
            pass
        append_log(f'Controller error: {e}')

camera.observe(on_frame, names='value')
append_log('Controller started. Run the Stop cell to detach and deinitialize.')

In [None]:
# Stop: detach callback, stop and deinit robot, stop camera
try:
    camera.unobserve(on_frame, names='value')
    append_log('Controller detached.')
except Exception as e:
    print('Detach error:', e)

try:
    import lfrobot
    lfrobot.lfStop()
    lfrobot.lfDeinit()
    append_log('Robot stopped and deinitialized.')
except Exception as e:
    print('Robot stop/deinit error:', e)

try:
    camera.stop()
    append_log('Camera stopped.')
except Exception as e:
    print('Camera stop error:', e)