In [1]:
"""
AI Assistant for Visually Impaired Users
=========================================

INSTALLATION INSTRUCTIONS:
1. Install required packages:
   pip install opencv-python ultralytics numpy pyttsx3

2. Run the script:
   python main.py

3. Press 'q' to quit the application

REQUIREMENTS:
- Webcam connected to the system
- Python 3.7 or higher
- Internet connection for first run (to download YOLOv8n model)

This script provides real-time object detection and audio guidance
for visually impaired users using a webcam feed.
"""

import cv2
import numpy as np
import pyttsx3
import time
from ultralytics import YOLO
from collections import defaultdict
import threading


class VisionAssistant:
    """Main class for the vision assistance system"""
    
    def __init__(self):
        """Initialize the vision assistant with all necessary components"""
        
        # Initialize text-to-speech engine
        try:
            self.tts_engine = pyttsx3.init()
            self.tts_engine.setProperty('rate', 180)  # Speed of speech
            self.tts_engine.setProperty('volume', 1.0)  # Volume (0.0 to 1.0)
        except Exception as e:
            print(f"Error initializing TTS engine: {e}")
            self.tts_engine = None
        
        # Load YOLOv8 nano model (will download on first run)
        try:
            print("Loading YOLOv8 model...")
            self.model = YOLO('yolov8n.pt')  # Nano model for CPU efficiency
            print("Model loaded successfully!")
        except Exception as e:
            print(f"Error loading YOLO model: {e}")
            raise
        
        # Define high-priority object classes (COCO dataset classes)
        self.priority_classes = {
            'person', 'chair', 'couch', 'bed', 'dining table', 
            'door', 'car', 'bicycle', 'motorcycle', 'bus', 'truck',
            'bottle', 'cup', 'laptop', 'cell phone', 'stairs'
        }
        
        # Tracking system to avoid repeated announcements
        self.last_announcement = {}  # {position_key: timestamp}
        self.announcement_cooldown = 3.0  # Seconds before re-announcing same object
        
        # TTS lock to prevent overlapping speech
        self.tts_lock = threading.Lock()
        self.is_speaking = False
        
        # Frame dimensions (will be set when video starts)
        self.frame_width = 0
        self.frame_height = 0
    
    def calculate_position(self, x_center, bbox_width):
        """
        Determine if object is on left, center, or right of frame
        
        Args:
            x_center: X coordinate of bounding box center
            bbox_width: Width of bounding box
            
        Returns:
            str: Position description ('left', 'center', 'right')
        """
        # Divide frame into three zones
        left_boundary = self.frame_width * 0.33
        right_boundary = self.frame_width * 0.67
        
        if x_center < left_boundary:
            return 'left'
        elif x_center > right_boundary:
            return 'right'
        else:
            return 'ahead'
    
    def estimate_proximity(self, bbox_width, bbox_height):
        """
        Estimate how close an object is based on bounding box size
        
        Args:
            bbox_width: Width of bounding box
            bbox_height: Height of bounding box
            
        Returns:
            str: Proximity description ('very close', 'close', 'moderate', 'far')
        """
        # Calculate bounding box area as percentage of frame
        bbox_area = bbox_width * bbox_height
        frame_area = self.frame_width * self.frame_height
        area_ratio = bbox_area / frame_area
        
        if area_ratio > 0.3:
            return 'very close'
        elif area_ratio > 0.15:
            return 'close'
        elif area_ratio > 0.05:
            return 'moderate'
        else:
            return 'far'
    
    def calculate_priority(self, class_name, proximity, position):
        """
        Calculate priority score for detected objects
        
        Args:
            class_name: Name of detected object
            proximity: Proximity level
            position: Position in frame
            
        Returns:
            int: Priority score (higher = more important)
        """
        priority = 0
        
        # Base priority for important objects
        if class_name in self.priority_classes:
            priority += 10
        
        # Extra priority for people
        if class_name == 'person':
            priority += 5
        
        # Proximity scoring
        proximity_scores = {
            'very close': 15,
            'close': 10,
            'moderate': 5,
            'far': 0
        }
        priority += proximity_scores.get(proximity, 0)
        
        # Position scoring (ahead is most important)
        position_scores = {
            'ahead': 10,
            'left': 5,
            'right': 5
        }
        priority += position_scores.get(position, 0)
        
        return priority
    
    def should_announce(self, announcement_key):
        """
        Check if enough time has passed to re-announce this object
        
        Args:
            announcement_key: Unique key for this announcement
            
        Returns:
            bool: True if should announce, False otherwise
        """
        current_time = time.time()
        
        if announcement_key not in self.last_announcement:
            self.last_announcement[announcement_key] = current_time
            return True
        
        time_since_last = current_time - self.last_announcement[announcement_key]
        
        if time_since_last >= self.announcement_cooldown:
            self.last_announcement[announcement_key] = current_time
            return True
        
        return False
    
    def speak_async(self, text):
        """
        Speak text asynchronously to avoid blocking video processing
        
        Args:
            text: Text to speak
        """
        if not self.tts_engine or self.is_speaking:
            return
        
        def speak_thread():
            with self.tts_lock:
                self.is_speaking = True
                try:
                    self.tts_engine.say(text)
                    self.tts_engine.runAndWait()
                except Exception as e:
                    print(f"TTS error: {e}")
                finally:
                    self.is_speaking = False
        
        # Start speech in separate thread
        thread = threading.Thread(target=speak_thread, daemon=True)
        thread.start()
    
    def process_detections(self, results):
        """
        Process YOLO detection results and generate guidance
        
        Args:
            results: YOLO detection results
            
        Returns:
            list: Processed detection data for visualization
        """
        detections = []
        
        # Extract detection information
        for result in results:
            boxes = result.boxes
            
            for box in boxes:
                # Get bounding box coordinates
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                
                # Calculate center and dimensions
                x_center = (x1 + x2) / 2
                y_center = (y1 + y2) / 2
                bbox_width = x2 - x1
                bbox_height = y2 - y1
                
                # Get class name and confidence
                class_id = int(box.cls[0])
                class_name = self.model.names[class_id]
                confidence = float(box.conf[0])
                
                # Only process detections with confidence > 0.5
                if confidence < 0.5:
                    continue
                
                # Determine position and proximity
                position = self.calculate_position(x_center, bbox_width)
                proximity = self.estimate_proximity(bbox_width, bbox_height)
                
                # Calculate priority
                priority = self.calculate_priority(class_name, proximity, position)
                
                detections.append({
                    'bbox': (int(x1), int(y1), int(x2), int(y2)),
                    'class_name': class_name,
                    'confidence': confidence,
                    'position': position,
                    'proximity': proximity,
                    'priority': priority,
                    'center': (int(x_center), int(y_center))
                })
        
        return detections
    
    def generate_guidance(self, detections):
        """
        Generate audio guidance based on detected objects
        
        Args:
            detections: List of processed detections
        """
        if not detections:
            # Clear path - only announce occasionally
            if self.should_announce('clear_path'):
                self.speak_async("Clear path")
            return
        
        # Sort by priority (highest first)
        detections.sort(key=lambda x: x['priority'], reverse=True)
        
        # Announce top priority objects (max 2 to avoid overwhelming)
        announced_count = 0
        max_announcements = 2
        
        for detection in detections:
            if announced_count >= max_announcements:
                break
            
            class_name = detection['class_name']
            position = detection['position']
            proximity = detection['proximity']
            
            # Create unique announcement key
            announcement_key = f"{class_name}_{position}_{proximity}"
            
            # Check if we should announce this
            if not self.should_announce(announcement_key):
                continue
            
            # Generate announcement text
            if proximity == 'very close':
                message = f"Warning! {class_name} very close {position}"
            elif proximity == 'close':
                message = f"{class_name} {position}"
            else:
                # Only announce moderate/far objects if they're high priority
                if detection['priority'] < 20:
                    continue
                message = f"{class_name} {position}"
            
            # Speak the message
            self.speak_async(message)
            announced_count += 1
    
    def draw_detections(self, frame, detections):
        """
        Draw bounding boxes and labels on frame for visualization
        
        Args:
            frame: Video frame
            detections: List of processed detections
            
        Returns:
            frame: Annotated frame
        """
        for detection in detections:
            x1, y1, x2, y2 = detection['bbox']
            class_name = detection['class_name']
            confidence = detection['confidence']
            position = detection['position']
            proximity = detection['proximity']
            
            # Color based on proximity
            color_map = {
                'very close': (0, 0, 255),    # Red
                'close': (0, 165, 255),       # Orange
                'moderate': (0, 255, 255),    # Yellow
                'far': (0, 255, 0)            # Green
            }
            color = color_map.get(proximity, (255, 255, 255))
            
            # Draw bounding box
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            
            # Create label
            label = f"{class_name} ({position})"
            
            # Draw label background
            (label_width, label_height), _ = cv2.getTextSize(
                label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2
            )
            cv2.rectangle(
                frame, 
                (x1, y1 - label_height - 10), 
                (x1 + label_width, y1), 
                color, 
                -1
            )
            
            # Draw label text
            cv2.putText(
                frame, 
                label, 
                (x1, y1 - 5), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.6, 
                (0, 0, 0), 
                2
            )
        
        return frame
    
    def run(self):
        """Main loop for the vision assistant"""
        
        # Initialize webcam
        print("Initializing webcam...")
        cap = cv2.VideoCapture(0)
        
        if not cap.isOpened():
            print("Error: Could not open webcam")
            print("Please ensure your webcam is connected and not in use by another application")
            return
        
        # Get frame dimensions
        self.frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        print(f"Webcam initialized: {self.frame_width}x{self.frame_height}")
        print("Press 'q' to quit")
        
        # Initial greeting
        self.speak_async("Vision assistant started")
        
        # Main processing loop
        try:
            while True:
                # Capture frame
                ret, frame = cap.read()
                
                if not ret:
                    print("Error: Failed to capture frame")
                    break
                
                # Run YOLOv8 inference
                results = self.model(frame, verbose=False, device='cpu')
                
                # Process detections
                detections = self.process_detections(results)
                
                # Generate audio guidance
                self.generate_guidance(detections)
                
                # Draw detections on frame
                annotated_frame = self.draw_detections(frame, detections)
                
                # Add instruction text
                cv2.putText(
                    annotated_frame,
                    "Press 'q' to quit",
                    (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.7,
                    (255, 255, 255),
                    2
                )
                
                # Display frame
                cv2.imshow('Vision Assistant', annotated_frame)
                
                # Check for quit key
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        
        except KeyboardInterrupt:
            print("\nInterrupted by user")
        
        except Exception as e:
            print(f"Error during execution: {e}")
        
        finally:
            # Cleanup
            print("Shutting down...")
            cap.release()
            cv2.destroyAllWindows()
            
            if self.tts_engine:
                self.speak_async("Vision assistant stopped")
                time.sleep(1)  # Allow final message to complete


def main():
    """Entry point for the application"""
    try:
        assistant = VisionAssistant()
        assistant.run()
    except Exception as e:
        print(f"Fatal error: {e}")
        print("\nTroubleshooting:")
        print("1. Ensure all packages are installed: pip install opencv-python ultralytics numpy pyttsx3")
        print("2. Check that your webcam is connected and accessible")
        print("3. Ensure you have internet connection for first run (model download)")


if __name__ == "__main__":
    main()

Loading YOLOv8 model...
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.4.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 8.1MB/s 0.8s.8s<0.0ss.9s
Model loaded successfully!
Initializing webcam...
Webcam initialized: 640x480
Press 'q' to quit
Shutting down...
