In [None]:
!pip install  ultralytics

In [None]:
!pip install -q --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

In [None]:
# ===== IMPORTS =====
# Core libraries
import os
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import List, Tuple, Dict, Optional, Union  # Added Union here
from dataclasses import dataclass

# Computer Vision
import cv2
import numpy as np

# Machine Learning
import torch
from ultralytics import YOLO

# Google Drive API
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload

# Utilities
import json
import uuid

Mask, gloves, and person dataset

In [None]:
# ===== CONSTANTS & CONFIGURATION =====
class Config:
    """Central settings for the safety-violation pipeline: paths, thresholds, Drive."""

    # --- YOLO weight files ---
    CUSTOM_WEIGHTS = "/kaggle/input/mask-gloves-preson-weights/custom-weights.pt"
    CAP_WEIGHTS = "/kaggle/input/capsweights/my_weights.pt"
    LOGO_WEIGHTS = "/kaggle/input/logoweights/logo.pt"

    # --- Input footage and annotated output ---
    VIDEO_INPUT = "/kaggle/input/testvedio1/videolong.mp4"
    VIDEO_OUTPUT = "/kaggle/working/output_video3.mp4"

    # --- Minimum detection confidences and the NMS overlap cut-off ---
    PERSON_CONF = 0.4
    LOGO_CONF = 0.5
    CAP_CONF = 0.4
    IOU_THRESHOLD = 0.5

    # --- Violation snapshots ---
    VIOLATION_DIR = "violations7"
    VIOLATION_DURATION = 2  # seconds a violation must persist before it is saved

    # --- Google Drive upload ---
    SERVICE_ACCOUNT_FILE = "/kaggle/input/upload/wide-planet-449115-b2-c6af973cadb6.json"
    DRIVE_SCOPES = ["https://www.googleapis.com/auth/drive"]

    # Date the footage belongs to, formatted MM/DD/YYYY. Edit this per run, or
    # switch to today's UTC date with:
    #   PROCESSING_DATE = datetime.now(timezone.utc).date().strftime("%m/%d/%Y")
    PROCESSING_DATE = "04/01/2025"


# Shared settings object read by the cells below.
config = Config()

In [None]:
# ===== MODEL LOADING =====
def load_models():
    """Build the three YOLO detectors from the weight files named in ``config``.

    Returns:
        A 3-tuple ``(custom_model, cap_model, logo_model)``, in that order.

    Raises:
        Exception: whatever ``YOLO(...)`` raised; the error is printed first
        and then re-raised unchanged.
    """
    print("Loading models...")
    weight_files = (
        config.CUSTOM_WEIGHTS,  # person, mask, gloves
        config.CAP_WEIGHTS,     # hardhat detection
        config.LOGO_WEIGHTS,    # logo detection
    )
    try:
        detectors = tuple(YOLO(weights) for weights in weight_files)
    except Exception as e:
        print(f"Error loading models: {str(e)}")
        raise
    print("Models loaded successfully")
    return detectors


# Loaded once at cell execution; the detectors are used as module-level globals.
custom_model, cap_model, logo_model = load_models()

Accuracy

In [None]:
# ===== UTILITY FUNCTIONS =====
@dataclass
class BoundingBox:
    """One detection: four box coordinates plus confidence, class id and label.

    NOTE(review): none of the cells visible here construct this class (the
    pipeline passes plain tuples around) — confirm whether it is still needed.
    """
    # Box corners; presumably (x1, y1) is top-left and (x2, y2) bottom-right,
    # in pixels, matching the [x1, y1, x2, y2] order compute_iou reads — TODO confirm.
    x1: float
    y1: float
    x2: float
    y2: float
    # Detector score for this box.
    confidence: float
    # Numeric class index and its human-readable name.
    class_id: int
    label: str

def compute_iou(box1: Union[List[float], np.ndarray], boxes: np.ndarray) -> np.ndarray:
    """Intersection-over-Union of one reference box against a batch of boxes.

    Args:
        box1: Reference box; elements 0..3 are read as ``x1, y1, x2, y2``.
        boxes: 2-D array-like, one box per row, columns 0..3 as ``x1, y1, x2, y2``.

    Returns:
        1-D array with one IoU value per row of ``boxes``.
    """
    ref = np.array(box1)
    candidates = np.array(boxes)

    # Corners of the overlap rectangle between the reference and every candidate.
    left = np.maximum(ref[0], candidates[:, 0])
    top = np.maximum(ref[1], candidates[:, 1])
    right = np.minimum(ref[2], candidates[:, 2])
    bottom = np.minimum(ref[3], candidates[:, 3])

    # Disjoint boxes give a negative extent; clamp it so their overlap is 0.
    overlap_w = np.maximum(0, right - left)
    overlap_h = np.maximum(0, bottom - top)
    intersection = overlap_w * overlap_h

    ref_area = (ref[2] - ref[0]) * (ref[3] - ref[1])
    candidate_areas = (candidates[:, 2] - candidates[:, 0]) * (candidates[:, 3] - candidates[:, 1])
    union = ref_area + candidate_areas - intersection

    # The small epsilon keeps zero-area boxes from dividing by zero.
    return intersection / (union + 1e-6)

def non_max_suppression(boxes: List[List[float]],
                       scores: List[float],
                       iou_threshold: float = 0.5) -> List[int]:
    """Greedy Non-Maximum Suppression over a set of boxes.

    Boxes are visited from highest to lowest score; each visited box is kept
    and every remaining box whose IoU with it exceeds ``iou_threshold`` is
    dropped.

    Args:
        boxes: Sequence (list or NumPy array) of ``[x1, y1, x2, y2]`` boxes.
        scores: One score per box; used only to order the boxes.
        iou_threshold: Boxes overlapping a kept box by more than this are removed.

    Returns:
        Indices into ``boxes`` of the kept boxes, as plain Python ints, in
        descending score order. Empty list for empty input.
    """
    # len() works for lists and arrays alike; `not boxes` raises on a
    # non-empty NumPy array.
    if len(boxes) == 0:
        return []

    boxes_array = np.asarray(boxes)
    # Scores are deliberately not coerced to float: they only need to be sortable.
    scores_array = np.asarray(scores)

    # Sort by descending score
    order = np.argsort(scores_array)[::-1]

    keep = []
    while order.size > 0:
        current = order[0]
        # Convert np.int64 to int so the result matches the List[int] annotation.
        keep.append(int(current))

        if order.size == 1:
            break

        # Compare the kept box against every box still in play.
        rest = order[1:]
        ious = compute_iou(boxes_array[current], boxes_array[rest])

        # Only boxes that do not overlap too much survive to the next round.
        order = rest[ious <= iou_threshold]

    return keep

In [None]:
# ===== WORKER TRACKING =====
class WorkerTracker:
    """Track workers across frames and time how long each one is in violation.

    Identity is positional: a detection is matched to the known worker whose
    last recorded box centre is nearest, provided it lies within
    ``match_distance`` pixels. ``assign_id`` only looks IDs up; the caller is
    responsible for writing the new box into ``worker_history``.

    Attributes:
        fps: Frame rate used to convert frame counts into seconds.
        worker_id_counter: Highest worker ID handed out so far.
        worker_history: ``worker_id -> last box (x1, y1, x2, y2)``.
        violation_state: ``worker_id -> {"start_frame", "types", "saved"}`` for
            workers currently in violation.
    """

    # Used when the caller passes a non-positive frame rate (OpenCV reports 0
    # when a container carries no FPS), which would otherwise divide by zero.
    FALLBACK_FPS = 30.0

    def __init__(self, fps: float,
                 violation_duration: Optional[float] = None,
                 match_distance: float = 50.0):
        """
        Args:
            fps: Frames per second of the video being processed.
            violation_duration: Seconds a violation must persist before it is
                reported. ``None`` means "read ``config.VIOLATION_DURATION`` at
                check time".
            match_distance: Maximum centre-to-centre distance, in pixels, for a
                detection to be matched to an existing worker.
        """
        self.fps = fps if fps and fps > 0 else self.FALLBACK_FPS
        self.violation_duration = violation_duration
        self.match_distance = match_distance
        self.worker_id_counter = 0
        self.worker_history = {}  # worker_id: last_box
        self.violation_state = {} # worker_id: violation_data

    def assign_id(self, new_box: List[float]) -> int:
        """Return the ID of the nearest known worker, or a fresh ID.

        Args:
            new_box: ``(x1, y1, x2, y2)`` of the current detection.

        Returns:
            The ID of the closest worker in ``worker_history`` whose centre is
            strictly closer than ``match_distance``; otherwise a newly
            allocated ID. On an exact tie the earlier-registered worker wins.
        """
        new_cx = (new_box[0] + new_box[2]) / 2
        new_cy = (new_box[1] + new_box[3]) / 2

        # Compare squared distances: same ordering as the Euclidean distance,
        # without a square root per candidate.
        best_id = None
        best_dist_sq = self.match_distance ** 2
        for wid, last_box in self.worker_history.items():
            dx = new_cx - (last_box[0] + last_box[2]) / 2
            dy = new_cy - (last_box[1] + last_box[3]) / 2
            dist_sq = dx * dx + dy * dy
            # Keep the closest candidate rather than the first within range, so
            # two nearby workers are not collapsed onto the older ID.
            if dist_sq < best_dist_sq:
                best_id, best_dist_sq = wid, dist_sq

        if best_id is not None:
            return best_id

        self.worker_id_counter += 1
        return self.worker_id_counter

    def update_violation(self, worker_id: int, frame_count: int,
                        violations: set) -> bool:
        """Advance a worker's violation timer and say whether to report now.

        Args:
            worker_id: ID returned by ``assign_id``.
            frame_count: Index of the frame being processed.
            violations: Violation labels seen on this worker in this frame.

        Returns:
            True when the worker has been in violation for at least the
            configured duration and that episode is not yet marked ``"saved"``.
            The caller sets ``violation_state[worker_id]["saved"] = True`` after
            persisting it. False otherwise, including on the first violating
            frame and whenever ``violations`` is empty (which also clears the
            worker's state).
        """
        if not violations:
            # A clean frame ends the episode; the next violation restarts the timer.
            self.violation_state.pop(worker_id, None)
            return False

        state = self.violation_state.get(worker_id)
        if state is None:
            self.violation_state[worker_id] = {
                "start_frame": frame_count,
                "types": tuple(sorted(violations)),
                "saved": False
            }
            return False

        threshold = (config.VIOLATION_DURATION
                     if self.violation_duration is None
                     else self.violation_duration)
        duration = (frame_count - state["start_frame"]) / self.fps
        return duration >= threshold and not state["saved"]

In [None]:
# ===== VIDEO PROCESSING =====
class VideoProcessor:
    """Run the detectors over the input video and record safety violations.

    For every frame the three module-level YOLO models are run, detections are
    filtered by confidence and NMS, and each person that contains a logo box is
    treated as a worker: tracked, annotated on the frame, and checked for
    mask / gloves / hardhat violations. A violation that persists long enough
    (see ``WorkerTracker.update_violation``) is saved as a JPEG in
    ``config.VIOLATION_DIR``. The annotated frames are written to
    ``config.VIDEO_OUTPUT``.
    """

    # Frame rate assumed when the container reports none.
    FALLBACK_FPS = 30.0

    # Labels that count as violations, per source model.
    PPE_VIOLATION_LABELS = ("no-mask", "no-gloves")
    HARDHAT_LABELS = ("Hardhat", "NO-Hardhat")
    HARDHAT_VIOLATION_LABELS = ("NO-Hardhat",)

    # BGR drawing colours.
    WORKER_COLOR = (0, 255, 0)      # green: worker box
    VIOLATION_COLOR = (0, 0, 255)   # red: violation
    COMPLIANT_COLOR = (0, 255, 255) # yellow: proper equipment

    def __init__(self):
        """Open the input video, set up the tracker and the snapshot folder.

        Raises:
            IOError: if ``config.VIDEO_INPUT`` cannot be opened.
        """
        self.cap = cv2.VideoCapture(config.VIDEO_INPUT)
        if not self.cap.isOpened():
            self.cap.release()
            raise IOError(f"Could not open input video: {config.VIDEO_INPUT}")

        # Keep the fractional rate (e.g. 29.97); truncating it skews both the
        # output playback speed and the violation-duration arithmetic.
        reported_fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.fps = reported_fps if reported_fps and reported_fps > 0 else self.FALLBACK_FPS

        self.tracker = WorkerTracker(self.fps)
        os.makedirs(config.VIOLATION_DIR, exist_ok=True)

    def process_frame(self, frame: np.ndarray, frame_count: int) -> np.ndarray:
        """Detect, annotate and record violations for one frame.

        Args:
            frame: Image to analyse; annotations are drawn onto it in place.
            frame_count: Index of this frame, used for violation timing.

        Returns:
            The same ``frame`` object, now annotated.
        """
        # Run all model predictions
        custom_results = custom_model(frame, verbose=False)[0]
        cap_results = cap_model(frame, verbose=False)[0]
        logo_results = logo_model(frame, verbose=False)[0]

        person_boxes = []      # (x1, y1, x2, y2, conf)
        other_detections = []  # (x1, y1, x2, y2, label, conf)
        logo_boxes = []        # (x1, y1, x2, y2, conf)
        cap_boxes = []         # (x1, y1, x2, y2, label, conf)

        # Custom model: person, mask, gloves. PERSON_CONF gates every class it emits.
        for x1, y1, x2, y2, conf, cls in custom_results.boxes.data.tolist():
            if conf < config.PERSON_CONF:
                continue
            label = custom_results.names[int(cls)]
            if label == "person":
                person_boxes.append((x1, y1, x2, y2, conf))
            else:
                other_detections.append((x1, y1, x2, y2, label, conf))

        # Logo model: the class id is irrelevant, any detection is a logo.
        for x1, y1, x2, y2, conf, _ in logo_results.boxes.data.tolist():
            if conf >= config.LOGO_CONF:
                logo_boxes.append((x1, y1, x2, y2, conf))

        # Hardhat model: keep only the two hardhat classes.
        for x1, y1, x2, y2, conf, cls in cap_results.boxes.data.tolist():
            if conf < config.CAP_CONF:
                continue
            label = cap_results.names[int(cls)]
            if label in self.HARDHAT_LABELS:
                cap_boxes.append((x1, y1, x2, y2, label, conf))

        # Apply NMS to each detection type
        person_boxes = self._apply_nms_to_detections(person_boxes)
        logo_boxes = self._apply_nms_to_detections(logo_boxes)
        cap_boxes = self._apply_nms_to_detections(cap_boxes)
        other_detections = self._apply_nms_to_detections(other_detections)

        # Process each detected person
        for x1, y1, x2, y2, _ in person_boxes:
            self._process_worker(frame, x1, y1, x2, y2,
                                 logo_boxes, other_detections, cap_boxes,
                                 frame_count)

        return frame

    @staticmethod
    def _detection_score(detection):
        """Return the confidence of a detection tuple (always its last element)."""
        return detection[-1]

    @staticmethod
    def _is_inside(inner, outer) -> bool:
        """True when box ``inner`` lies fully within box ``outer`` (edges may touch)."""
        return (inner[0] >= outer[0] and inner[1] >= outer[1]
                and inner[2] <= outer[2] and inner[3] <= outer[3])

    @staticmethod
    def _draw_labeled_box(frame, box, text, color, font_scale):
        """Draw a 2 px rectangle for ``box`` with ``text`` just above its top-left corner."""
        left, top, right, bottom = (int(v) for v in box[:4])
        cv2.rectangle(frame, (left, top), (right, bottom), color, 2)
        cv2.putText(frame, text, (left, top - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, 2)

    def _apply_nms_to_detections(self, detections):
        """Apply NMS to a list of detection tuples and return the survivors.

        Works for both tuple layouts used here, ``(x1, y1, x2, y2, conf)`` and
        ``(x1, y1, x2, y2, label, conf)``: the box is the first four elements
        and the confidence is the last one.
        """
        if not detections:
            return []

        boxes = [d[:4] for d in detections]
        # The confidence is last in both layouts. Index 4 of a 6-tuple is the
        # label string, which would make NMS rank boxes alphabetically.
        scores = [self._detection_score(d) for d in detections]
        keep = non_max_suppression(boxes, scores, config.IOU_THRESHOLD)
        return [detections[i] for i in keep]

    def _process_worker(self, frame, x1, y1, x2, y2, logo_boxes,
                       other_detections, cap_boxes, frame_count):
        """Handle one person detection; only people carrying a logo are workers.

        Returns:
            The worker ID assigned to this person, or ``None`` when no logo box
            lies inside the person box (the person is then ignored).
        """
        person_box = (x1, y1, x2, y2)

        # Check if person has logo (is a worker)
        if not any(self._is_inside(logo, person_box) for logo in logo_boxes):
            return None

        worker_id = self.tracker.assign_id(person_box)
        # assign_id only looks the ID up; the position is recorded here.
        self.tracker.worker_history[worker_id] = person_box

        self._draw_labeled_box(frame, person_box, f"Worker {worker_id}",
                               self.WORKER_COLOR, 0.6)

        violations = self._check_violations(x1, y1, x2, y2, other_detections, cap_boxes, frame)

        if self.tracker.update_violation(worker_id, frame_count, violations):
            self._save_violation(frame, worker_id, violations, frame_count)
        return worker_id

    def _check_violations(self, x1, y1, x2, y2, other_detections, cap_boxes, frame):
        """Collect violation labels found inside a worker's box and draw them.

        Every mask/glove/hardhat detection that lies fully inside the worker box
        is drawn on ``frame`` (red for a violation, yellow otherwise).

        Returns:
            Set of violation labels (``"no-mask"``, ``"no-gloves"``,
            ``"NO-Hardhat"``) seen on this worker.
        """
        person_box = (x1, y1, x2, y2)
        violations = set()

        # Mask/glove detections first, then hardhat detections.
        sources = (
            (other_detections, self.PPE_VIOLATION_LABELS),
            (cap_boxes, self.HARDHAT_VIOLATION_LABELS),
        )
        for detections, bad_labels in sources:
            for detection in detections:
                if not self._is_inside(detection, person_box):
                    continue
                label = detection[4]
                if label in bad_labels:
                    violations.add(label)
                    color = self.VIOLATION_COLOR
                else:
                    color = self.COMPLIANT_COLOR
                self._draw_labeled_box(frame, detection, label, color, 0.5)

        return violations

    def _save_violation(self, frame, worker_id, violations, frame_count):
        """Write the annotated frame to ``config.VIOLATION_DIR`` and mark it saved."""
        violation_str = "_".join(sorted(violations))
        filename = f"violation_worker{worker_id}_{violation_str}_frame{frame_count}.jpg"
        filepath = os.path.join(config.VIOLATION_DIR, filename)
        # imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(filepath, frame):
            print(f"Warning: could not write violation image {filepath}")
        # Marked saved either way so a persistent write failure is not retried every frame.
        self.tracker.violation_state[worker_id]["saved"] = True

    def process_video(self):
        """Process every frame of the input and write the annotated output video.

        The capture and the writer are always released, even if processing
        raises part-way through.
        """
        frame_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")

        # cv2.VideoWriter is not a context manager, so it is released in `finally`.
        out = cv2.VideoWriter(config.VIDEO_OUTPUT, fourcc, self.fps,
                             (frame_width, frame_height))
        if not out.isOpened():
            # Keep going: violation snapshots are still useful without the video.
            print(f"Warning: could not open output video {config.VIDEO_OUTPUT}")

        frame_count = 0
        try:
            while self.cap.isOpened():
                ret, frame = self.cap.read()
                if not ret:
                    break

                processed_frame = self.process_frame(frame, frame_count)
                out.write(processed_frame)
                frame_count += 1
        finally:
            # Ensure resources are properly released
            self.cap.release()
            out.release()
            print(f"Processed {frame_count} frames")

In [None]:
# ===== GOOGLE DRIVE INTEGRATION =====
class DriveUploader:
    """Upload the annotated video and violation snapshots to Google Drive.

    Files go into a per-date sub-folder of a single root folder. A small JSON
    file in the root folder (``violation_metadata.json``) remembers which
    sub-folder belongs to which date so later runs reuse it.
    """

    ROOT_FOLDER_NAME = 'Safety_Violation_System1'
    METADATA_FILENAME = 'violation_metadata.json'
    FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'
    # Account the root folder is shared with unless upload_files() is told otherwise.
    DEFAULT_SHARE_EMAIL = "nadiamaged2003@gmail.com"
    UPLOAD_ATTEMPTS = 3
    RETRY_DELAY_SECONDS = 5

    def __init__(self):
        """Authenticate, locate/create the root folder and load the metadata."""
        self.service = self._authenticate()
        self.root_folder = self._get_or_create_root_folder()
        self.metadata = self._load_metadata()

    def _authenticate(self):
        """Build a Drive v3 client from the service-account file in ``config``."""
        creds = service_account.Credentials.from_service_account_file(
            config.SERVICE_ACCOUNT_FILE,
            scopes=config.DRIVE_SCOPES
        )
        return build('drive', 'v3', credentials=creds)

    def _get_or_create_root_folder(self):
        """Return the root folder resource (``id``, ``name``), creating it if absent."""
        # Without `trashed=false` a folder sitting in the trash would match and
        # every upload would land in the trash with it.
        query = (f"name='{self.ROOT_FOLDER_NAME}' and "
                 f"mimeType='{self.FOLDER_MIME_TYPE}' and trashed=false")
        results = self.service.files().list(q=query, fields="files(id,name)").execute()

        if results.get('files'):
            folder = results['files'][0]
            print(f"Using existing folder: {folder['name']} ({folder['id']})")
            return folder

        folder_metadata = {
            'name': self.ROOT_FOLDER_NAME,
            'mimeType': self.FOLDER_MIME_TYPE
        }
        folder = self.service.files().create(body=folder_metadata, fields='id,name').execute()
        print(f"Created new folder: {folder['name']} ({folder['id']})")
        return folder

    def _find_metadata_file_id(self):
        """Return the Drive ID of the metadata file in the root folder, or ``None``."""
        query = (f"name='{self.METADATA_FILENAME}' and "
                 f"'{self.root_folder['id']}' in parents and trashed=false")
        results = self.service.files().list(q=query, fields="files(id)").execute()
        files = results.get('files')
        return files[0]['id'] if files else None

    def _load_metadata(self):
        """Load the metadata dict from Drive, or start a fresh one."""
        file_id = self._find_metadata_file_id()
        if file_id:
            raw = self.service.files().get_media(fileId=file_id).execute()
            if isinstance(raw, bytes):
                raw = raw.decode('utf-8')
            metadata = json.loads(raw)
            # Tolerate an older/hand-edited file that lacks the date index.
            metadata.setdefault('date_folders', {})
            return metadata

        return {
            'root_folder_id': self.root_folder['id'],
            'date_folders': {},
            'created_at': datetime.now(timezone.utc).isoformat()
        }

    def _get_date_folder(self, date_str):
        """Return the metadata entry for ``date_str``, creating its folder if needed.

        Args:
            date_str: Date key, e.g. ``"04_01_2025"``.

        Returns:
            Dict with at least ``folder_id``; newly created entries also carry
            ``folder_name``, ``display_date`` and ``created_at``.
        """
        if date_str in self.metadata['date_folders']:
            return self.metadata['date_folders'][date_str]

        # The random suffix keeps folder names unique within the root folder.
        folder_name = f"violations_{date_str}_{uuid.uuid4().hex[:8]}"
        folder_metadata = {
            'name': folder_name,
            'mimeType': self.FOLDER_MIME_TYPE,
            'parents': [self.root_folder['id']]
        }
        folder = self.service.files().create(body=folder_metadata, fields='id,name').execute()

        self.metadata['date_folders'][date_str] = {
            'folder_id': folder['id'],
            'folder_name': folder['name'],
            'display_date': config.PROCESSING_DATE,
            'created_at': datetime.now(timezone.utc).isoformat()
        }
        return self.metadata['date_folders'][date_str]

    def upload_files(self, share_email: Optional[str] = None):
        """Upload the output video and every ``.jpg`` in ``config.VIOLATION_DIR``.

        Args:
            share_email: Account to grant writer access on the root folder.
                Defaults to ``DEFAULT_SHARE_EMAIL``.

        Any failure is reported with a printed message rather than raised, so a
        failed upload does not abort the notebook run.
        """
        recipient = share_email or self.DEFAULT_SHARE_EMAIL
        try:
            date_str = datetime.strptime(config.PROCESSING_DATE, "%m/%d/%Y").strftime("%m_%d_%Y")
            date_folder = self._get_date_folder(date_str)

            # Persist the date-to-folder mapping before the slow uploads. If an
            # upload fails, the next run still finds this folder instead of
            # creating a second one for the same date.
            self._save_metadata()

            # Upload video
            self._upload_file(config.VIDEO_OUTPUT, date_folder['folder_id'], 'video/mp4')

            # Upload violation images (sorted for a reproducible order)
            for filename in sorted(os.listdir(config.VIOLATION_DIR)):
                if filename.endswith('.jpg'):
                    filepath = os.path.join(config.VIOLATION_DIR, filename)
                    self._upload_file(filepath, date_folder['folder_id'], 'image/jpeg')

            # Share with email
            self._share_folder(self.root_folder['id'], recipient)

            print("Upload completed successfully")
        except Exception as e:
            print(f"Upload failed: {str(e)}")

    def _upload_file(self, filepath, folder_id, mime_type):
        """Upload one file into ``folder_id``, retrying on failure.

        Raises:
            Exception: the last error, once ``UPLOAD_ATTEMPTS`` attempts have failed.
        """
        filename = os.path.basename(filepath)
        file_metadata = {
            'name': filename,
            'parents': [folder_id]
        }
        # Built outside the retry loop so a missing local file fails immediately
        # instead of being retried.
        media = MediaFileUpload(filepath, mimetype=mime_type)

        for attempt in range(self.UPLOAD_ATTEMPTS):
            try:
                self.service.files().create(
                    body=file_metadata,
                    media_body=media,
                    fields='id'
                ).execute()
                print(f"Uploaded {filename}")
                return
            except Exception:
                if attempt == self.UPLOAD_ATTEMPTS - 1:
                    raise
                time.sleep(self.RETRY_DELAY_SECONDS)

    def _share_folder(self, folder_id, email):
        """Grant ``email`` writer access to ``folder_id``."""
        permission = {
            'type': 'user',
            'role': 'writer',
            'emailAddress': email
        }
        self.service.permissions().create(
            fileId=folder_id,
            body=permission,
            fields='id'
        ).execute()
        print(f"Shared folder with {email}")

    def _save_metadata(self):
        """Write ``self.metadata`` to the metadata file in Drive (update or create)."""
        temp_file = 'temp_metadata.json'
        with open(temp_file, 'w') as f:
            json.dump(self.metadata, f)

        try:
            file_id = self._find_metadata_file_id()
            media = MediaFileUpload(temp_file, mimetype='application/json')
            if file_id:
                self.service.files().update(
                    fileId=file_id,
                    media_body=media
                ).execute()
            else:
                file_metadata = {
                    'name': self.METADATA_FILENAME,
                    'parents': [self.root_folder['id']]
                }
                self.service.files().create(
                    body=file_metadata,
                    media_body=media,
                    fields='id'
                ).execute()
        finally:
            # Remove the scratch file even when the Drive call fails.
            if os.path.exists(temp_file):
                os.remove(temp_file)

In [None]:
# ===== MAIN EXECUTION =====
if __name__ == "__main__":
    # End-to-end run: annotate the video first, then push the results to Drive.
    # NOTE(review): presumably this guard is always true when the cell runs in a
    # notebook kernel — confirm before relying on it to skip execution.
    print("Starting safety violation detection...")
    
    # Process video: writes the annotated video to config.VIDEO_OUTPUT and
    # violation snapshots to config.VIOLATION_DIR.
    processor = VideoProcessor()
    processor.process_video()
    print("Video processing completed")
    
    # Upload results: upload_files() prints "Upload failed: ..." instead of
    # raising, so the final message below is printed even if the upload failed.
    uploader = DriveUploader()
    uploader.upload_files()
    
    print("Processing complete!")