In [1]:
import os
import threading
import asyncio
from playsound import playsound
import cv2
from dotenv import load_dotenv, find_dotenv
import supervision as sv
import numpy as np
import openai
import time
from ultralytics import YOLO
from text_to_speech import text_to_speech

In [2]:
def call_tts_and_play(text):
    """Create a fresh loop so it can run inside a thread or notebook."""
    output_file = "output_audio.mp3"
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(text_to_speech(text, output_file))
        if os.path.exists(output_file):
            playsound(output_file)
        else:
            print("❌ Audio file was not created.", flush=True)
    except Exception as e:
        print(f"❌ TTS error: {e}", flush=True)
    finally:
        loop.close()
        if os.path.exists(output_file):
            os.remove(output_file)
            print("Temporary audio file removed.", flush=True)
        else:
            print("No temporary audio file to remove.", flush=True)
    print("TTS process completed.", flush=True)


In [3]:
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)

True

In [4]:
openai.api_key = os.getenv("OPENAI_API_KEY")

In [None]:
class_names = ["person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat",
"traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird",
"cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", 
"backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard",
"sports ball", "kite", "baseball bat", "baseball glove", "skateboard",
"ball", "tennis racket", "bottle", "wine glass", "cup", "fork", "knife",
"spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot",
"hot dog", "pizza", "donut", "cake", "chair", "couch", "potted plant", "bed",
"dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard",
"cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book",
"clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush", "watch"]
# buffer for summary data
scene_buffer = []
buffer_lock  = threading.Lock()
last_send    = time.time()
SEND_INTERVAL = 7  # seconds
latest_description = ""

In [13]:
def send_to_gpt():
    global last_send, latest_description
    while True:
        now = time.time()
        if now - last_send >= SEND_INTERVAL:
            # 1) pull & clear buffer
            with buffer_lock:
                batch = scene_buffer.copy()
                scene_buffer.clear()
                last_send = now

            if batch:
                prompt = "You are an assistant for a visually impaired user. Here is what was just seen:\n"
                for evt in batch:
                    prompt += (
                        f"- {evt['class']} (ID {evt['track_id']}) at "
                        f"x={evt['cx']:.0f}, y={evt['cy']:.0f} moving "
                        f"vx={evt['vx']:.1f}, vy={evt['vy']:.1f}\n"
                    )
                prompt += "\nDescribe this briefly in natural spoken language."
                print(f"\n[GPT Prompt]\n{prompt}\n", flush=True)
                try:
                    resp = openai.chat.completions.create(
                        model="gpt-4",
                        messages=[
                            {"role":"system","content":"You describe surroundings for a blind user. Be as brief as possible, while still being informative."},
                            {"role":"user",  "content":prompt}
                        ],
                        temperature=0.7,
                        max_tokens=150
                    )
                    text = resp.choices[0].message.content.strip()
                    # 2) update shared var
                    latest_description = text
                    print(f"\n[Descriptive update]\n{text}\n", flush=True)
                    # 3) kick off TTS in a fire-and-forget thread
                    threading.Thread(target=call_tts_and_play, args=(text,), daemon=True).start()

                except Exception as e:
                    print(f"[GPT Error] {e}", flush=True)

        time.sleep(0.5)

In [7]:
def create_kalman():
    kf = cv2.KalmanFilter(4, 2)               # 4 state vars (x, y, vx, vy), 2 measurements (x, y)
    # State transition: x' = x + vx, y' = y + vy
    kf.transitionMatrix = np.array([
        [1, 0, 1, 0],
        [0, 1, 0, 1],
        [0, 0, 1, 0],
        [0, 0, 0, 1]
    ], np.float32)
    # We directly observe x and y
    kf.measurementMatrix = np.array([
        [1, 0, 0, 0],
        [0, 1, 0, 0]
    ], np.float32)
    # Tune noise covariances as needed
    kf.processNoiseCov = np.eye(4, dtype=np.float32) * 1e-2
    kf.measurementNoiseCov = np.eye(2, dtype=np.float32) * 1e-1
    kf.errorCovPost = np.eye(4, dtype=np.float32)
    return kf

In [8]:
model = YOLO("yolov8m-world.pt")

In [9]:
model.set_classes(class_names)

In [10]:
bounding_box_annotator = sv.BoxAnnotator(
    thickness=2,
)
label_annotator = sv.LabelAnnotator(
    text_thickness=2,
    text_scale=1)

In [11]:

kalman_filters = {} 

In [None]:
# start the background sender
threading.Thread(target=send_to_gpt, daemon=True).start()


# ─── MAIN TRACK & KALMAN LOOP ───────────────────────────────────────────────────

cap = cv2.VideoCapture(0)
if not cap.isOpened():
    raise RuntimeError("Could not open camera")

while True:
    ret, frame = cap.read()
    if not ret:
        break

    results = model.track(
        frame,
        tracker="custom_bytetrack.yaml",
        conf=0.25,
        iou=0.5,
        persist=True,
        verbose=False
    )
    result = results[0]

    annotated = frame.copy()
    # supply a default `ids` list if ByteTrack hasn't assigned any yet
    ids = result.boxes.id
    if ids is None:
        ids = [0] * len(result.boxes.cls)

    for box, cls, tid in zip(result.boxes.xyxy,
                            result.boxes.cls,
                            ids):
        x1, y1, x2, y2 = map(int, box)
        cx, cy = (x1 + x2) / 2, (y1 + y2) / 2

        # Kalman init/predict/correct
        if tid not in kalman_filters:
            kf = create_kalman()
            kf.statePost = np.array([[cx],[cy],[0],[0]], np.float32)
            kalman_filters[tid] = kf
        else:
            kf = kalman_filters[tid]

        pred = kf.predict().flatten()
        kf.correct(np.array([[cx],[cy]], np.float32))
        px, py, pvx, pvy = pred

        # draw box + ID
        label = f"{result.names[int(cls)]} ID:{int(tid)}"
        cv2.rectangle(annotated, (x1, y1), (x2, y2), (0,255,0), 2)
        cv2.putText(annotated, label, (x1, y1-10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2)

        # draw Kalman prediction arrow
        cv2.circle(annotated, (int(px),int(py)), 4, (0,0,255), -1)
        cv2.arrowedLine(annotated,
                        (int(cx),int(cy)),
                        (int(px),int(py)),
                        (0,0,255), 2, tipLength=0.3)

        # buffer for GPT…
        with buffer_lock:
            scene_buffer.append({
                "track_id": tid,
                "class":    result.names[int(cls)],
                "cx":       cx,
                "cy":       cy,
                "vx":       pvx,
                "vy":       pvy
            })

    # ─── Overlay the latest GPT description ───────────────────────────────────────
    if latest_description:
        y0 = annotated.shape[0] - 20
        cv2.putText(annotated,
                    latest_description[:50] + "...",  # trim to fit
                    (10, y0),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, (255,255,255), 2)

    cv2.imshow("YOLOv8 + ByteTrack + Kalman", annotated)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()


[GPT Prompt]
You are an assistant for a visually impaired user. Here is what was just seen:
- person (ID 6.0) at x=346, y=295 moving vx=0.0, vy=0.0
- bed (ID 9.0) at x=566, y=324 moving vx=0.0, vy=0.0
- person (ID 6.0) at x=348, y=296 moving vx=0.0, vy=0.0
- bed (ID 9.0) at x=566, y=324 moving vx=0.0, vy=0.0
- person (ID 6.0) at x=344, y=296 moving vx=0.0, vy=0.0
- bed (ID 9.0) at x=565, y=324 moving vx=0.0, vy=0.0
- person (ID 6.0) at x=344, y=295 moving vx=0.0, vy=0.0
- bed (ID 9.0) at x=565, y=324 moving vx=0.0, vy=0.0
- person (ID 6.0) at x=348, y=295 moving vx=0.0, vy=0.0
- bed (ID 9.0) at x=565, y=323 moving vx=0.0, vy=0.0
- person (ID 6.0) at x=348, y=295 moving vx=0.0, vy=0.0
- bed (ID 9.0) at x=565, y=323 moving vx=0.0, vy=0.0
- person (ID 6.0) at x=346, y=295 moving vx=0.0, vy=0.0
- bed (ID 9.0) at x=565, y=324 moving vx=0.0, vy=0.0
- person (ID 6.0) at x=348, y=295 moving vx=0.0, vy=0.0
- bed (ID 9.0) at x=566, y=324 moving vx=0.0, vy=0.0
- person (ID 6.0) at x=348, y=295 m