In [1]:
import queue
import statistics
import threading
import time

import cv2
import numpy as np
import torch
import sounddevice as sd
import whisper

from helper_functions import create_bold_stickman, load_gt_landmarks, sanitize_word

In [2]:
# sounddevice exposes its default devices as an (input, output) pair.
default_in, default_out = sd.default.device

# Report both indices with a single call; the text matches the two-line layout.
print(
    f"Default input device index : {default_in}",
    f"Default output device index: {default_out}",
    sep="\n",
)

Default input device index : 1
Default output device index: 3


In [3]:
# Lower-case words considered filler, grouped by category.
# NOTE(review): nothing in the visible notebook cells reads this set yet —
# confirm whether the transcription worker is meant to skip these words.
FILLER_WORDS = set(
    (
        # articles & determiners
        "a an the "
        # to-be verbs
        "is are am was were be being been "
        # conjunctions & prepositions
        "and or but so of to in for on at by "
        # common disfluencies
        "um uh like you know okay right i mean"
    ).split()
)

In [5]:
# Load the reference landmark data from the "landmark_data" directory.
gt_land = load_gt_landmarks("landmark_data")

# The "no" entry is required up front: it is the fallback for words that have
# no landmark data of their own, so fail fast if it is absent.
fallback_missing = "no" not in gt_land
if fallback_missing:
    raise RuntimeError(
        "You must have a 'no.json' in landmark_data for missing-word fallback"
    )

In [6]:
# —————————————————————————————————————————————
# 2) Set up Whisper tiny model on GPU if possible
# —————————————————————————————————————————————
# Pick the compute device: CUDA when torch reports it as available, else CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Load the "tiny" Whisper checkpoint and move it onto the chosen device.
model = whisper.load_model("tiny")
model = model.to(device)

In [9]:
# —————————————————————————————————————————————
# 2) Audio / word queues & latency tracking
# —————————————————————————————————————————————
audio_q   = queue.Queue()  # int16 sample blocks: audio_callback -> audio_worker
word_q    = queue.Queue()  # (word, timestamp) tuples: audio_worker -> display loop
LATENCIES = []  # list of floats (seconds between enqueue timestamp and display)

SAMPLERATE = 16000
BLOCK_SIZE = SAMPLERATE        # 1s blocks
OVERLAP    = int(0.2*SAMPLERATE)  # 0.2s worth of trailing samples kept between segments

def audio_callback(indata, frames, time_info, status):
    """Stream callback: hand one block of captured audio to ``audio_q``.

    Parameters
    ----------
    indata : buffer
        Raw bytes of the captured block, interpreted as int16 samples.
    frames : int
        Number of frames in ``indata``; used as the row count of the array.
    time_info
        Unused.
    status
        Printed when truthy (the audio backend's status flags).

    The enqueued item is an int16 array of shape ``(frames, 1)`` that owns its
    own memory.
    """
    if status:
        print("Audio status:", status)
    # np.frombuffer only creates a view onto the caller's buffer.  That buffer
    # belongs to the audio backend and may be reused once this callback
    # returns, while the worker reads the block later from another thread —
    # so take an explicit copy before queueing it.
    arr = np.frombuffer(indata, dtype=np.int16).reshape(frames, 1).copy()
    audio_q.put(arr)

def audio_worker():
    """Turn queued microphone blocks into words on ``word_q``.

    Runs until a ``None`` sentinel is read from ``audio_q``.  Each block is
    scaled from int16 to float32 (divided by 32768) and appended to a rolling
    buffer.  Whenever the buffer holds at least ``SAMPLERATE`` samples, the
    first ``SAMPLERATE`` samples are transcribed and only the last ``OVERLAP``
    samples of the buffer are retained.

    Every non-empty sanitized word is published as ``(word, t0)``, where
    ``t0`` is the wall-clock time taken just before transcription started.
    ``word_q`` is emptied before each put, so only the newest word stays queued.
    """

    def _discard_pending_words():
        # Drop everything currently waiting in word_q without ever blocking.
        try:
            while True:
                word_q.get_nowait()
        except queue.Empty:
            pass

    pending = np.empty((0,), dtype=np.float32)
    while True:
        block = audio_q.get()
        if block is None:
            # Shutdown sentinel.
            return

        samples = block[:, 0].astype(np.float32) / 32768.0
        pending = np.concatenate([pending, samples])
        if len(pending) < SAMPLERATE:
            # Not enough audio for a full segment yet.
            continue

        segment = pending[:SAMPLERATE]
        pending = pending[-OVERLAP:]

        started = time.time()
        transcript = model.transcribe(segment, word_timestamps=False)
        tokens = transcript["text"].strip().split()

        # map() is lazy, so each token is sanitized right before it is handled.
        for word in map(sanitize_word, tokens):
            if not word:
                continue
            # flush old, then enqueue with timestamp
            _discard_pending_words()
            word_q.put((word, started))
            print(f"[DEBUG] enqueue '{word}' @ {started:.3f}")

# Open the microphone as a raw mono int16 stream; every captured block is
# delivered to audio_callback.
stream = sd.RawInputStream(
    samplerate=SAMPLERATE,
    blocksize=BLOCK_SIZE,
    channels=1,
    dtype="int16",
    callback=audio_callback,
)
stream.start()

# Run the transcription worker as a daemon thread so it cannot keep the
# process alive on its own.
t_audio = threading.Thread(daemon=True, target=audio_worker)
t_audio.start()

# —————————————————————————————————————————————
# 3) Webcam loop with latency overlay
# —————————————————————————————————————————————
cap = cv2.VideoCapture(0)

STICKMAN_SIZE = 300  # preferred side length (px) of the square stickman overlay

current_word = None
display_text = ""
sequence     = []
seq_idx      = 0
last_latency = 0.0

# Everything that can fail runs inside try/finally so the microphone stream,
# the worker thread, the camera and the preview window are always released —
# including when the webcam cannot be opened or the kernel is interrupted.
try:
    if not cap.isOpened():
        raise RuntimeError("Cannot open webcam")

    FPS   = cap.get(cv2.CAP_PROP_FPS) or 30.0
    # waitKey(0) waits for a key press indefinitely, so keep the delay >= 1 ms.
    DELAY = max(1, int(1000/FPS))

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # grab any new word + its enqueue timestamp
        try:
            w, t_enqueue = word_q.get_nowait()
        except queue.Empty:
            pass  # nothing new this frame; keep animating the current sequence
        else:
            t_display = time.time()
            last_latency = (t_display - t_enqueue)
            LATENCIES.append(last_latency)
            print(f"[DEBUG] display '{w}' latency={last_latency*1000:.1f}ms")

            current_word = w
            if w in gt_land:
                sequence = gt_land[w]
                display_text = w
            else:
                # Unknown word: fall back to the "no" sequence.
                sequence = gt_land["no"]
                display_text = f"{w} not found"
            seq_idx = 0

        # draw stickman
        if sequence:
            # Cycle through the sequence, one entry per video frame.
            flm = sequence[seq_idx % len(sequence)]
            seq_idx += 1

            H, W = frame.shape[:2]
            # Shrink the overlay when the frame is smaller than the preferred
            # size; otherwise the destination slice would not match the overlay.
            # NOTE(review): assumes create_bold_stickman honours any
            # width/height — only matters for frames under 300 px.
            side = min(STICKMAN_SIZE, H, W)
            ov = create_bold_stickman(flm, width=side, height=side)

            # Bottom-centre placement.
            x_off = (W-side)//2; y_off = H-side

            # Alpha-blend the first three overlay channels onto the frame in
            # one broadcasted operation; channel 3 of the overlay is the alpha.
            alpha = ov[:, :, 3:4]/255.0
            roi = frame[y_off:y_off+side, x_off:x_off+side, :3]
            roi[:] = (alpha*ov[:, :, :3] + (1-alpha)*roi).astype(np.uint8)

            cv2.putText(frame, display_text,
                        (x_off+5, y_off-10),
                        cv2.FONT_HERSHEY_SIMPLEX,1.0,(255,255,255),2,cv2.LINE_AA)

        # overlay latency in ms top-left
        cv2.putText(frame, f"Latency: {last_latency*1000:.1f} ms",
                    (10,30),
                    cv2.FONT_HERSHEY_SIMPLEX,0.8,(0,255,0),2,cv2.LINE_AA)

        cv2.imshow("Live ASL Translation w/ Latency", frame)
        if cv2.waitKey(DELAY)&0xFF==ord('q'):
            break
finally:
    # —————————————————————————————————————————————
    # 4) Cleanup
    # —————————————————————————————————————————————
    try:
        stream.stop()
        # Release the audio stream as well, so re-running the cell does not
        # accumulate stopped-but-open streams.
        stream.close()
    finally:
        cap.release()
        cv2.destroyAllWindows()
        # Ask the worker to exit and wait for it.
        audio_q.put(None)
        t_audio.join()

# —————————————————————————————————————————————
# 5) Report stats (only reached after a normal exit from the loop)
# —————————————————————————————————————————————
if LATENCIES:
    print("Latency stats (ms):")
    print(f"  min: {min(LATENCIES)*1000:.1f}")
    print(f"  avg: {statistics.mean(LATENCIES)*1000:.1f}")
    print(f"  max: {max(LATENCIES)*1000:.1f}")
else:
    print("No latency samples collected.")

[DEBUG] enqueue 'after' @ 1746165211.496
[DEBUG] enqueue 'now' @ 1746165211.496
[DEBUG] display 'now' latency=243.7ms
[DEBUG] enqueue 'good' @ 1746165222.494
[DEBUG] display 'good' latency=114.7ms
[DEBUG] enqueue 'funny' @ 1746165236.496
[DEBUG] display 'funny' latency=526.4ms
[DEBUG] enqueue 'problem' @ 1746165255.495
[DEBUG] display 'problem' latency=151.5ms
Latency stats (ms):
  min: 114.7
  avg: 259.1
  max: 526.4
