In [1]:
import cv2
import ollama
import base64
from ultralytics import YOLO
from collections import Counter
from collections import deque
from typing import Any, List
import json
from datetime import datetime, timezone
import os
import asyncio
import telegram
from dotenv import load_dotenv

Run 'pip install torchvision==0.22' to fix torchvision or 'pip install -U torch torchvision' to update both.
For a full compatibility table see https://github.com/pytorch/vision#installation


In [2]:
# Models used by the pipeline:
#   - vlm_model_name: Ollama tag of the local vision-language model used for scene descriptions.
#   - ppe_model: Ultralytics YOLO detector loaded from the local PPE weights file.
vlm_model_name = "llava:7b-v1.6-mistral-q2_K"
ppe_model = YOLO("YOLO11n_PPE.pt")

In [3]:
def draw_boxes(image, detections, color=(0, 255, 0), thickness=2, font_scale=0.5):
    """Draw every detected box and its "<class> <confidence>" label onto ``image``.

    Args:
        image: image array drawn on with OpenCV. It is modified in place, so
            pass a copy (e.g. ``frame.copy()``) to keep the original.
        detections: Ultralytics results list; only the first result is drawn.
        color: colour tuple used for boxes and labels (default green).
        thickness: line thickness in pixels for boxes and label text.
        font_scale: OpenCV font scale for the labels.

    Returns:
        The same ``image`` object, annotated.
    """
    if not detections:
        return image

    result = detections[0]
    for box in result.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        cls_id = int(box.cls[0])
        conf = float(box.conf[0])
        label = f"{result.names[cls_id]} {conf:.2f}"

        cv2.rectangle(image, (x1, y1), (x2, y2), color, thickness)
        # Normally the label sits just above the box. When the box touches the
        # top of the frame, that position would be clipped, so draw it inside
        # the box instead.
        text_y = y1 - 10 if y1 >= 20 else y1 + 15
        cv2.putText(image, label, (x1, text_y), cv2.FONT_HERSHEY_SIMPLEX,
                    font_scale, color, thickness)

    return image

In [16]:
def log_detections(class_counts, datetimestamp, log_dir="logs"):
    """Append one detection snapshot as a single JSON line to a per-run JSONL log.

    Args:
        class_counts: mapping of class name -> count (e.g. a ``Counter``).
        datetimestamp: run identifier embedded in the filename, so each run
            writes its own ``ppe_detections_<datetimestamp>.jsonl`` file.
        log_dir: directory holding the log files. It is created if missing and
            is relative to the current working directory by default.

    Each record contains a UTC ``timestamp`` (``YYYYMMDD_HHMMSS_ffffff``) and
    the ``detections`` counts as a plain dict.
    """
    log_entry = {
        'timestamp': datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f"),
        'detections': dict(class_counts),
    }
    # JSON Lines needs exactly one object per line, so no indentation here.
    log_message = json.dumps(log_entry)

    os.makedirs(log_dir, exist_ok=True)
    # Write inside the directory that was just ensured to exist.
    log_path = os.path.join(log_dir, f"ppe_detections_{datetimestamp}.jsonl")
    with open(log_path, "a", encoding="utf-8") as log_file:
        log_file.write(log_message + "\n")

# Sanity-check the PPE detector on a still image before going live.
detections = ppe_model("con_site_2.jpg", device='cpu')

# Tally how many boxes of each class name the model found.
class_names = detections[0].names
class_ids = detections[0].boxes.cls.int().tolist()
detected_class_names = list(map(class_names.__getitem__, class_ids))
class_counts = Counter(detected_class_names)

print('\n\n')
detections[0].show()

In [6]:
class ImageBuffer:
    """
    A fixed-size, on-disk buffer that keeps only the latest ``max_size`` images.

    Each added image is written as a JPEG file into ``save_dir``. When the number
    of managed files exceeds ``max_size``, the oldest file is deleted from disk.
    """
    def __init__(self, max_size: int, save_dir: str):
        """
        Args:
            max_size: maximum number of image files kept on disk (positive int).
            save_dir: directory for the image files; created if missing.

        Raises:
            ValueError: if ``max_size`` is not a positive int or ``save_dir`` is
                not a non-empty string.
        """
        if not isinstance(max_size, int) or max_size <= 0:
            raise ValueError("max_size must be a positive integer.")
        if not isinstance(save_dir, str) or not save_dir:
            raise ValueError("save_dir must be a non-empty string.")

        self._max_size = max_size
        self._save_dir = save_dir
        os.makedirs(self._save_dir, exist_ok=True)

        # Paths of the files currently kept on disk, oldest first.
        self._managed_files = deque()
        # Per-buffer counter appended to each filename. Two images saved within
        # the same clock tick then never share a path, so neither overwrites the
        # other and the deque never holds duplicate entries.
        self._sequence = 0

    def add(self, image: Any):
        """
        Saves a new image and deletes the oldest image if the buffer is full.

        Returns:
            The path of the saved file, or None if saving failed (the failure is
            printed and the buffer is left unchanged).
        """
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
        filename = f"img_{timestamp}_{self._sequence:06d}.jpg"
        self._sequence += 1
        filepath = os.path.join(self._save_dir, filename)

        try:
            success = cv2.imwrite(filepath, image)
            if not success:
                print(f"Failed to save image to: {filepath}")
                return None
        except Exception as e:
            print(f"Error saving image with OpenCV: {e}")
            return None

        self._managed_files.append(filepath)

        # If the buffer is over capacity, delete the oldest file.
        if len(self._managed_files) > self._max_size:
            oldest_file = self._managed_files.popleft()
            try:
                os.remove(oldest_file)
            except OSError as e:
                print(f"Error deleting file {oldest_file}: {e}")

        return filepath

    def latest(self):
        """Returns the path of the most recently saved image, or None if the buffer is empty."""
        return self._managed_files[-1] if self._managed_files else None

    def get_file_list(self) -> List[str]:
        """Returns a list of the current image file paths being managed (oldest first)."""
        return list(self._managed_files)

    def __len__(self) -> int:
        """Returns the current number of images being managed on disk."""
        return len(self._managed_files)

In [7]:
class ViolationTracker:
    """
    Tracks the persistence of specific safety violations based on object detection counts.

    This class identifies violations by comparing counts of detected body parts against
    their corresponding Personal Protective Equipment (PPE). It only tracks and alerts
    for violations that are explicitly assigned a duration threshold during initialization.

    Typical use: call ``update(counts)`` once per processed frame, then
    ``check_for_alerts()`` to collect violations whose duration has reached their
    threshold. Each violation episode yields at most one alert. The episode resets
    when the violation is no longer observed.
    """
    def __init__(self, thresholds: dict = None):
        """
        Initializes the tracker.

        Args:
            thresholds (dict, optional): A dictionary mapping violation types to their
                                         duration thresholds in seconds. If None or empty,
                                         no violations will be tracked.
                                         Example: {'HEAD_UNPROTECTED': 10, 'BODY_UNPROTECTED': 20}

        Raises:
            ValueError: if a key is not a known violation type, or a threshold is
                        not a non-negative number.
        """
        # Copy the dict so that later edits to the caller's dict cannot silently
        # change the thresholds of a running tracker.
        self.thresholds = dict(thresholds) if thresholds else {}

        # Master map of all possible violation types to their check functions.
        self._violation_check_map = {
            'HEAD_UNPROTECTED': self._check_head_protection,
            'FACE_UNPROTECTED': self._check_face_protection,
            'EARS_UNPROTECTED': self._check_ear_protection,
            'HANDS_UNPROTECTED': self._check_hand_protection,
            'FEET_UNPROTECTED': self._check_foot_protection,
            'BODY_UNPROTECTED': self._check_body_protection
        }

        # Validate keys and values before building any state.
        for v_type, limit in self.thresholds.items():
            if v_type not in self._violation_check_map:
                raise ValueError(f"'{v_type}' is not a valid violation type.")
            try:
                is_invalid = limit < 0
            except TypeError:  # e.g. None or a string
                is_invalid = True
            if is_invalid:
                raise ValueError(
                    f"Threshold for '{v_type}' must be a non-negative number of seconds, got {limit!r}."
                )

        # The state dictionary only contains entries for violations being actively tracked.
        self.violations = {
            v_type: {'active': False, 'start_time': None, 'alert_sent': False}
            for v_type in self.thresholds
        }

        print(f"ViolationTracker initialized. Tracking: {list(self.violations.keys()) or 'None'}")

    # --- Private Helpers ---

    @staticmethod
    def _resolve_time(now):
        """Returns ``now`` when given, otherwise the current UTC POSIX timestamp in seconds."""
        return datetime.now(timezone.utc).timestamp() if now is None else now

    @staticmethod
    def _clock(timestamp: float) -> str:
        """Formats a POSIX timestamp as a UTC ``HH:MM:SS`` string for log output."""
        return datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime('%H:%M:%S')

    # --- Private Violation Check Methods ---

    def _check_head_protection(self, counts: Counter) -> bool:
        """Violation: More heads are detected than helmets."""
        return counts.get('head', 0) > counts.get('helmet', 0)

    def _check_face_protection(self, counts: Counter) -> bool:
        """Violation: More faces are detected than face-guards or face-masks combined."""
        face_coverings = counts.get('face-guard', 0) + counts.get('face-mask', 0)
        return counts.get('face', 0) > face_coverings

    def _check_ear_protection(self, counts: Counter) -> bool:
        """Violation: More ears are detected than ear-muffs."""
        # 'ear-mufs' is kept as spelled; presumably it matches the model's class
        # name exactly — confirm against ppe_model.names before changing it.
        return counts.get('ear', 0) > counts.get('ear-mufs', 0)

    def _check_hand_protection(self, counts: Counter) -> bool:
        """Violation: More hands are detected than gloves."""
        return counts.get('hands', 0) > counts.get('gloves', 0)

    def _check_foot_protection(self, counts: Counter) -> bool:
        """Violation: More feet are detected than shoes."""
        return counts.get('foot', 0) > counts.get('shoes', 0)

    def _check_body_protection(self, counts: Counter) -> bool:
        """Violation: Persons are detected without a safety-vest or safety-suit."""
        persons = counts.get('person', 0)
        body_coverings = counts.get('safety-vest', 0) + counts.get('safety-suit', 0)
        return persons > 0 and persons > body_coverings

    # --- Public Methods ---

    def update(self, class_counts: Counter, now: float = None):
        """
        Updates the state of all tracked violations based on the latest detection counts.

        Args:
            class_counts: mapping of class name -> number of detections in the frame.
            now: optional POSIX timestamp to use as the current time (defaults to
                 the current UTC time). Useful for replaying or testing.
        """
        current_time = self._resolve_time(now)

        # Iterate only through the violations we were asked to track.
        for v_type, v_state in self.violations.items():
            is_violated = self._violation_check_map[v_type](class_counts)

            if is_violated:
                if not v_state['active']:
                    # A new violation has just started.
                    v_state['active'] = True
                    v_state['start_time'] = current_time
                    print(f"[{self._clock(current_time)}] New violation started: {v_type}")
            elif v_state['active']:
                # A previously active violation has just ended.
                duration = current_time - v_state['start_time']
                print(f"[{self._clock(current_time)}] Violation ended: {v_type} (duration: {duration:.1f}s)")
                # Reset its state.
                v_state['active'] = False
                v_state['start_time'] = None
                v_state['alert_sent'] = False

    def check_for_alerts(self, now: float = None) -> list:
        """
        Checks if any active violations have exceeded their time threshold.

        Args:
            now: optional POSIX timestamp to use as the current time (defaults to
                 the current UTC time).

        Returns:
            list: A list of alert dictionaries (keys: 'type', 'duration',
                  'threshold', 'message') for violations needing notification.
                  Returns an empty list if no alerts should be sent.
        """
        current_time = self._resolve_time(now)
        alerts_to_send = []
        for v_type, v_state in self.violations.items():
            if v_state['active'] and not v_state['alert_sent']:
                duration = current_time - v_state['start_time']
                if duration >= self.thresholds[v_type]:
                    alert_info = {
                        'type': v_type,
                        'duration': duration,
                        'threshold': self.thresholds[v_type],
                        'message': f"ALERT: {v_type} violation has persisted for over {self.thresholds[v_type]} seconds."
                    }
                    alerts_to_send.append(alert_info)
                    v_state['alert_sent'] = True  # Mark as sent to prevent spamming.
                    print(f"‼️  ALERT TRIGGERED: {alert_info['message']}")
        return alerts_to_send

In [8]:
async def send_telegram_alert(bot_token: str, chat_id: str, message: str, image_path: str = None):
    """
    Sends a text message and an optional image to a Telegram chat.

    Args:
        bot_token (str): The token for your Telegram bot from BotFather.
        chat_id (str): The chat ID of the recipient.
        message (str): The text message to send. Text beyond Telegram's
                       4096-character message limit is truncated.
        image_path (str, optional): The file path to an image to send. Defaults to None.

    Returns:
        bool: True if successful, False otherwise. Errors are printed, not raised.
    """
    # The Bot API's sendMessage accepts at most 4096 characters of text.
    max_text_length = 4096
    try:
        # Using the bot as an async context manager initialises its HTTP client
        # and shuts it down afterwards, so each call releases its connections.
        async with telegram.Bot(token=bot_token) as bot:
            # Send the main text message
            await bot.send_message(chat_id=chat_id, text=message[:max_text_length])
            print(f"✅ Text message sent to Chat ID {chat_id}.")

            # If an image path is provided, send the photo
            if image_path:
                with open(image_path, 'rb') as photo_file:
                    await bot.send_photo(chat_id=chat_id, photo=photo_file)
                print(f"✅ Photo '{image_path}' sent to Chat ID {chat_id}.")

        return True

    except telegram.error.TelegramError as e:
        print(f"❌ Error sending Telegram message: {e}")
        return False
    except FileNotFoundError:
        print(f"❌ Error: Image file not found at '{image_path}'")
        return False
    except Exception as e:
        print(f"❌ An unexpected error occurred: {e}")
        return False

In [9]:
# Telegram credentials are read from the environment (a local .env file is loaded first).
load_dotenv()
BOT_TOKEN = os.getenv("BOT_TOKEN")
CHAT_ID = os.getenv("CHAT_ID")
if not BOT_TOKEN or not CHAT_ID:
    # Raise instead of calling exit(): in a Jupyter kernel exit() would shut the
    # kernel down rather than just failing this cell.
    raise ValueError("BOT_TOKEN and CHAT_ID must be set in the environment variables.")

# Optional smoke test (top-level await works in Jupyter):
# await send_telegram_alert(BOT_TOKEN, CHAT_ID, "Test message from PPE monitoring system.")

In [10]:
from langchain_ollama.chat_models import ChatOllama
from langchain.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda #, RunnablePassthrough
from langchain_google_genai import ChatGoogleGenerativeAI

# The Gemini key is only needed when the API backend is selected (USE_API = True).
# The local Ollama backend works without it, so warn instead of aborting Run All.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    print("Warning: GOOGLE_API_KEY not found in .env file; the Gemini backend (USE_API = True) will be unavailable.")

In [11]:
def create_vlm_message(inputs) -> List[HumanMessage]:
    """Build the multimodal chat message sent to the VLM.

    Args:
        inputs: dict with
            - ``image_array``: the image to inspect; it is JPEG-encoded with
              ``cv2.imencode`` and embedded as a base64 data URL.
            - ``yolo_findings``: what the PPE detector reported. Either a string,
              or a list/tuple whose items are joined with "; " (an empty
              sequence becomes "nothing").

    Returns:
        A one-element list holding a ``HumanMessage`` with the text prompt and
        the image part.

    Raises:
        ValueError: if ``image_array`` is None (e.g. ``cv2.imread`` failed) or
            cannot be JPEG-encoded.
    """
    image_array = inputs['image_array']
    yolo_findings = inputs['yolo_findings']

    if image_array is None:
        raise ValueError("image_array is None (e.g. cv2.imread could not read the image file).")
    ok, buffer = cv2.imencode(".jpeg", image_array)
    if not ok:
        raise ValueError("Failed to JPEG-encode image_array.")
    encoded_image = base64.b64encode(buffer).decode("utf-8")

    # Without this, a sequence (e.g. a list of alert messages) would be rendered
    # into the prompt as a Python list repr such as "[]".
    if isinstance(yolo_findings, (list, tuple)):
        yolo_findings = "; ".join(str(item) for item in yolo_findings) or "nothing"

    prompt_text = f"""
You are an expert on-site safety inspector. A real-time PPE detection model has already reported the following objects: **{yolo_findings}**.

Your task is to analyze the provided image to identify all **other** safety violations. Focus on:
1. **Environmental Hazards:** Spills, trip hazards (cables, debris), blocked pathways, fire risks.
2. **Improper Equipment Use:** Misuse of tools, unsecured machinery, unsafe vehicle operation.
3. **Unsafe Practices:** Workers in unsafe positions (e.g., under suspended loads, near an edge without fall protection), improper lifting, lack of warning signs or barriers.
4. Filter your findings and only output ```high priority hazards that require immediate attention```.

**Output Instructions:**
- Provide a bulleted list of all identified hazards.
- Each bullet point must be concise (under 20 words).
- If no additional hazards are found, respond with the single word "None".
- Don't output any text other than the bulleted hazards or None.
"""

    message = HumanMessage(
        content=[
            {"type": "text", "text": prompt_text},
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"},
            },
        ]
    )
    return [message]

In [19]:
# Choose the VLM backend: Gemini through the Google API, or the local Ollama model.
USE_API = False

if USE_API:
    llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.0)
else:
    llm = ChatOllama(model=vlm_model_name, temperature=0.0)

# Agent: {"image_array", "yolo_findings"} dict -> prompt message -> VLM -> plain-text hazard list
describe_agent = (
    RunnableLambda(create_vlm_message)   # Input: {"image_array", "yolo_findings"} -> Output: [HumanMessage]
    | llm                                # Input: messages -> Output: AIMessage
    | StrOutputParser()                  # Input: AIMessage -> Output: string description
)

# Smoke test on a still image before starting the live loop.
img_arr = cv2.imread("con_site_2.jpg")
if img_arr is None:
    # cv2.imread signals a missing/unreadable file by returning None, not by raising.
    raise FileNotFoundError("Could not read test image 'con_site_2.jpg'.")
summary = describe_agent.invoke({
    "image_array": img_arr,
    "yolo_findings": "No-Facemask violation",
})
print(summary)


In [20]:
def _count_classes(results) -> Counter:
    """Count detected boxes per class name in the first Ultralytics result."""
    result = results[0]
    return Counter(result.names[i] for i in result.boxes.cls.int().tolist())


def _latest_file(buffer):
    """Return the newest file path managed by ``buffer``, or None if it holds none."""
    files = buffer.get_file_list()
    return files[-1] if files else None


async def main(ppe_rate_s=1, vlm_rate_m=3, img_buffer_size=50, camera_index=1):
    """Run the live PPE-monitoring loop on a camera feed.

    Every ``ppe_rate_s`` seconds a frame goes through the YOLO PPE model.
    Changes in detections are logged, annotated and saved to the image buffer,
    the violation tracker is updated, and Telegram alerts are sent for
    violations that have exceeded their threshold. Every ``vlm_rate_m`` minutes
    the current frame is also described by the VLM, and the description is sent
    to Telegram.

    Args:
        ppe_rate_s: seconds between PPE detections.
        vlm_rate_m: minutes between VLM descriptions.
        img_buffer_size: number of most recent images kept on disk.
        camera_index: OpenCV device index passed to ``cv2.VideoCapture``.

    Raises:
        RuntimeError: if the camera cannot be opened.

    The loop ends when a frame cannot be read or 'q' is pressed in the
    "Detection Feed" window. The camera and windows are released in every case.
    """
    violation_thresholds = {
        'HEAD_UNPROTECTED': 10,  # seconds
        'BODY_UNPROTECTED': 30,  # seconds
    }
    tracker = ViolationTracker(thresholds=violation_thresholds)

    cap = cv2.VideoCapture(camera_index)
    try:
        if not cap.isOpened():
            # Raise instead of exit(): exit() inside a notebook would shut the
            # kernel down rather than just failing this cell.
            raise RuntimeError(f"Error: Could not open webcam (camera index {camera_index}).")
        print("Starting video capture...")

        vlm_rate_s = vlm_rate_m * 60  # rate in seconds

        # A single start time names both the log file and the image folder, so
        # every run gets its own matching artefacts.
        start = datetime.now(timezone.utc)
        start_datetime = start.strftime("%Y%m%d_%H%M%S_%f")
        last_processed_time_ppe = start.timestamp()
        last_processed_time_vlm = start.timestamp()

        print(f"The system will be performing PPE detection once every {ppe_rate_s} seconds... and VLM description once every {vlm_rate_m} minutes.")

        previous_counts = Counter()
        img_buffer = ImageBuffer(max_size=img_buffer_size, save_dir=f"image_buffer/{start_datetime}")

        while True:
            success, frame = cap.read()
            if not success:
                break

            # Check if it's time to process a new frame for PPE detection
            current_time = datetime.now(timezone.utc).timestamp()
            if current_time - last_processed_time_ppe >= ppe_rate_s:
                print("-----------------------------------")
                last_processed_time_ppe = current_time

                results = ppe_model(frame, device='cpu', verbose=False, show=False)
                class_counts = _count_classes(results)

                # Log and save changes before alerting, so an alert raised on
                # this frame is accompanied by an image of this frame.
                if class_counts != previous_counts:
                    previous_counts = class_counts
                    print(f"Detected changes at {datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S_%f')}: {dict(class_counts)}")
                    log_detections(class_counts, start_datetime)
                    annotated_frame = draw_boxes(frame.copy(), results)
                    img_buffer.add(annotated_frame)
                    cv2.imshow("Detection Feed", annotated_frame)

                # Update the violation tracker state and send any due alerts.
                tracker.update(class_counts)
                violation_alerts = tracker.check_for_alerts()
                for alert in violation_alerts:
                    await send_telegram_alert(BOT_TOKEN, CHAT_ID, alert['message'])
                latest_image = _latest_file(img_buffer)
                if violation_alerts and latest_image is not None:
                    await send_telegram_alert(BOT_TOKEN, CHAT_ID, "Latest image of the scene:", latest_image)

                # Check if it's time to process a new frame for VLM description
                current_time = datetime.now(timezone.utc).timestamp()
                if current_time - last_processed_time_vlm >= vlm_rate_s:
                    last_processed_time_vlm = current_time
                    # Single quotes inside the f-string keep this valid before Python 3.12.
                    print(f"Sending image to VLM at {datetime.now(timezone.utc).strftime('%Y_%m_%d__%H_%M_%S_%f')}")
                    img_buffer.add(frame)
                    # Give the VLM the current PPE picture (this tick's alert list is
                    # almost always empty because each alert fires only once).
                    active_violations = [v_type for v_type, state in tracker.violations.items() if state['active']]
                    findings = (f"{dict(class_counts)}; active PPE violations: "
                                f"{', '.join(active_violations) or 'none'}")
                    # ainvoke avoids blocking the event loop for other asyncio tasks;
                    # this loop itself still waits for the description.
                    vlm_description = await describe_agent.ainvoke({
                        "image_array": frame,
                        "yolo_findings": findings,
                    })
                    await send_telegram_alert(BOT_TOKEN, CHAT_ID, vlm_description, _latest_file(img_buffer))

            # Poll the GUI on every iteration so the window stays responsive and
            # 'q' works even when detections are not changing.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()

In [None]:
await main(ppe_rate_s=1, vlm_rate_m=5, img_buffer_size=50)

ViolationTracker initialized. Tracking: ['HEAD_UNPROTECTED', 'BODY_UNPROTECTED']
Starting video capture...
The system will be performing PPE detection once every 1 seconds... and VLM description once every 5 minutes.
-----------------------------------
[08:36:52] New violation started: HEAD_UNPROTECTED
Detected changes at 20251008_083652_059597: {'head': 1}
Sending image to VLM at 2025_10_08__08_36_52_123548
