AquaGuard: Quality Control for Bottled Water

Requirements to run this code:
+ Python version >= 3.10
+ Ultralytics
+ CUDA 11.8
+ cuDNN
+ PyTorch built with CUDA 11.8 support

In [None]:
import csv
import pandas as pd
import os
from ultralytics import YOLO
import cv2
import numpy as np
import torch
from datetime import datetime

# Load YOLO models for detection and segmentation
device = "cuda" if torch.cuda.is_available() else "cpu"  # Fall back to CPU when no GPU is available
# Build the weight paths with os.path.join rather than a backslash literal:
# "\y" is not a valid escape sequence (recent Python versions warn about it),
# and a hard-coded backslash separator only resolves on Windows.
model_detection = YOLO(os.path.join("models", "yolo11n.pt")).to(device)  # YOLOv11 model for detection
model_segmentation = YOLO(os.path.join("models", "yolo11_seg.pt")).to(device)  # Model for segmentation

# Open the input video
video_path = 'test.mp4'
cap = cv2.VideoCapture(video_path)

# Tracking state
object_id_counter = 1  # Next ID to hand out to a newly seen bottle
active_objects, object_status = {}, {}

# Tuning parameters
exit_threshold, missed_threshold = 100, 20  # Max match distance / max frames an object may go unseen
extension_factor_x, extension_factor_y = 1.50, 1.30  # Extend bounding box by 50% in x-axis and 30% in y-axis

# Counters shown on the information panel
total_wrong_bottles = total_product = 0

# Frame geometry: bottles are only inspected between the two vertical
# boundaries placed at 1/5 and 4/5 of the frame width
frame_width, frame_height = (
    int(cap.get(prop))
    for prop in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT)
)
left_boundary = frame_width // 5
right_boundary = (4 * frame_width) // 5

# Start every run from a fresh result file
file_path = "result.csv"
if os.path.exists(file_path):
    os.remove(file_path)

# Create the CSV file and write its header row
with open(file_path, mode="w", newline="") as csv_file:
    csv_writer = csv.writer(csv_file)
    csv_writer.writerow([
        "Bottle ID",
        "Label Status",
        "Bottle Cap Status",
        "Bottle Body Status",
        "Monitoring date",
    ])

# Function to create information panel
def create_info_panel(bottle_id, label_status, bottle_cap_status, bottle_body_status):
    """Render the status side panel for one bottle.

    Args:
        bottle_id: ID to display; ``None`` is shown as ``N/A``.
        label_status: Label text; "Wrong Label" is drawn red, "No Label"
            yellow, anything else green.
        bottle_cap_status: Cap text; green when equal to "OK", red otherwise.
        bottle_body_status: Body text; green when equal to "OK", red otherwise.

    Returns:
        A 720x300 three-channel ``uint8`` image on a light-grey background.
        The running totals are read from the module-level counters
        ``total_wrong_bottles`` and ``total_product``.
    """
    # Colours are in OpenCV's BGR channel order
    green, red, yellow, black = (0, 255, 0), (0, 0, 255), (0, 255, 255), (0, 0, 0)

    canvas = np.full((720, 300, 3), 220, dtype=np.uint8)

    shown_id = 'N/A' if bottle_id is None else bottle_id
    label_color = {"Wrong Label": red, "No Label": yellow}.get(label_status, green)
    cap_color = green if bottle_cap_status == "OK" else red
    body_color = green if bottle_body_status == "OK" else red

    # One entry per text line: (text, y position, font scale, colour, thickness)
    rows = (
        (f"Bottle ID: {shown_id}", 40, 0.7, red, 2),
        (f"Label: {label_status}", 120, 0.6, label_color, 2),
        (f"Bottle Cap: {bottle_cap_status}", 160, 0.6, cap_color, 2),
        (f"Bottle Body: {bottle_body_status}", 200, 0.6, body_color, 2),
        (f"Total wrong bottles: {total_wrong_bottles}", 260, 0.5, black, 1),
        (f"Total products: {total_product}", 290, 0.5, black, 1),
    )
    for text, y, scale, color, thickness in rows:
        cv2.putText(canvas, text, (10, y), cv2.FONT_HERSHEY_SIMPLEX, scale, color, thickness)

    return canvas

# Process video frames
# ID of the most recently inspected bottle. It must exist before the first
# frame: the panel lookup at the end of every iteration reads it even when no
# bottle has been detected yet.
bottle_id = None

try:
    # Keep a single append handle open for the whole run instead of reopening
    # the file for every detection; duplicates are removed after the loop.
    with open(file_path, mode="a", newline="") as csv_file:
        csv_writer = csv.writer(csv_file)

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Draw detection boundaries
            cv2.line(frame, (left_boundary, 0), (left_boundary, frame_height), (0, 255, 255), 2)
            cv2.line(frame, (right_boundary, 0), (right_boundary, frame_height), (0, 255, 255), 2)

            # Run object detection
            detection_results = model_detection.track(frame, persist=True)  # Persist to track objects
            new_active_objects = {}

            # Loop through detected objects in detection results
            for obj in detection_results[0].boxes:
                # Only class 39 is inspected (presumably "bottle" in the COCO
                # label set; confirm against the detection model's class names).
                if obj.cls.item() != 39:
                    continue

                bbox = tuple(obj.xyxy[0].int().tolist())
                center_x, center_y = (bbox[0] + bbox[2]) // 2, (bbox[1] + bbox[3]) // 2

                # Ignore bottles outside the inspection zone
                if not left_boundary <= center_x <= right_boundary:
                    continue

                # Re-use the ID of the first known object whose last position
                # is closer than exit_threshold to this detection.
                found_existing_id = False
                for object_id, (last_position, _missed_frames) in active_objects.items():
                    last_x, last_y = last_position
                    distance = ((center_x - last_x) ** 2 + (center_y - last_y) ** 2) ** 0.5

                    if distance < exit_threshold:
                        new_active_objects[object_id] = ((center_x, center_y), 0)  # Seen again: reset missed count
                        found_existing_id = True
                        bottle_id = object_id
                        break

                if not found_existing_id:  # No match: register a new bottle
                    new_active_objects[object_id_counter] = ((center_x, center_y), 0)
                    bottle_id = object_id_counter
                    object_id_counter += 1

                    # Initial status for a new bottle
                    object_status[bottle_id] = {
                        "label_status": "No Label",
                        "bottle_cap_status": "No Bottle Cap",
                        "bottle_body_status": "OK",
                        "is_counted": False,
                    }

                    total_product += 1  # Increment total_product when a new bottle is detected

                # Extend bounding box around its center, clamped to the frame
                width = bbox[2] - bbox[0]
                height = bbox[3] - bbox[1]
                new_width = int(width * extension_factor_x)
                new_height = int(height * extension_factor_y)
                new_x1 = max(0, center_x - new_width // 2)
                new_y1 = max(0, center_y - new_height // 2)
                new_x2 = min(frame_width, center_x + new_width // 2)
                new_y2 = min(frame_height, center_y + new_height // 2)

                roi = frame[new_y1:new_y2, new_x1:new_x2]  # Extract ROI for segmentation

                # A degenerate (zero-area) box yields an empty ROI; skip the
                # segmentation model for it rather than feeding it no pixels.
                if roi.size:
                    seg_results = model_segmentation.predict(  # Run segmentation on the ROI
                        source=roi,
                        show=False,
                        save=False,
                        conf=0.3
                    )
                else:
                    seg_results = []

                is_bottle_wrong = False  # Flag to check if bottle has any defect

                if seg_results:
                    annotated_roi = seg_results[0].plot()
                    frame[new_y1:new_y2, new_x1:new_x2] = annotated_roi

                    status = object_status[bottle_id]

                    # Map segmentation classes onto the bottle status. Only
                    # class 0 was named (BrokenCap) in the original comments;
                    # the other meanings are read off the status strings set
                    # below - confirm against the segmentation model's classes.
                    for seg_obj in seg_results[0].boxes:
                        seg_cls = seg_obj.cls.item()
                        seg_conf = seg_obj.conf.item()

                        if seg_cls == 0:  # BrokenCap
                            status["bottle_cap_status"] = "Broken"
                            is_bottle_wrong = True
                        elif seg_cls == 1 and seg_conf >= 0.5:
                            status["bottle_cap_status"] = "OK"
                        elif seg_cls == 3 and seg_conf >= 0.5:
                            # A wrong-label verdict is never downgraded to OK
                            if status["label_status"] != "Wrong Label":
                                status["label_status"] = "OK"
                        elif seg_cls == 6 and seg_conf >= 0.3:
                            status["label_status"] = "Wrong Label"
                            is_bottle_wrong = True
                        elif seg_cls == 5 and seg_conf >= 0.3:
                            status["label_status"] = "No Label"
                            is_bottle_wrong = True
                        elif seg_cls == 2:
                            status["bottle_body_status"] = "Dented"
                            is_bottle_wrong = True

                # Count bottle as wrong if it has any defect and not counted yet
                if is_bottle_wrong and not object_status[bottle_id]["is_counted"]:
                    total_wrong_bottles += 1
                    object_status[bottle_id]["is_counted"] = True  # Mark bottle as counted

                # Draw bounding box and ID (the bottle is already known to be
                # inside the boundaries at this point)
                cv2.rectangle(frame, (new_x1, new_y1), (new_x2, new_y2), (0, 255, 0), 2)
                cv2.putText(frame, f"ID: {bottle_id}",
                            (new_x1, new_y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                # Get current date and time to save monitoring date
                monitoring_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

                # Save bottle information to CSV file
                csv_writer.writerow([
                    bottle_id,
                    object_status[bottle_id]["label_status"],
                    object_status[bottle_id]["bottle_cap_status"],
                    object_status[bottle_id]["bottle_body_status"],
                    monitoring_date
                ])

            # Keep objects that were not seen this frame, with an incremented
            # missed count, until they reach missed_threshold
            for object_id, (last_position, missed_frames) in active_objects.items():
                if object_id not in new_active_objects and missed_frames < missed_threshold:
                    new_active_objects[object_id] = (last_position, missed_frames + 1)

            # Drop objects whose last known position left the inspection zone
            active_objects = {
                k: v for k, v in new_active_objects.items()
                if left_boundary <= v[0][0] <= right_boundary
            }

            # Display information panel for the most recently inspected bottle
            if bottle_id in object_status:
                status = object_status[bottle_id]
                info_panel = create_info_panel(
                    bottle_id,
                    status["label_status"],
                    status["bottle_cap_status"],
                    status["bottle_body_status"],
                )
            else:
                info_panel = create_info_panel(None, "N/A", "N/A", "N/A")

            frame_resized = cv2.resize(frame, (640, 720))
            stacked_frame = np.hstack((frame_resized, info_panel))

            cv2.imshow("Bottle Inspection", stacked_frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Release the capture and close the window even if processing fails
    cap.release()
    cv2.destroyAllWindows()

# Save cleaned CSV file without duplicate entries: a row is written for every
# frame a bottle is seen in, so keep only the last row per bottle
df = pd.read_csv(file_path)
df_cleaned = df.drop_duplicates(subset=['Bottle ID'], keep='last')
df_cleaned.to_csv(file_path, index=False)
