# YOLOv8 People Counting and Helmet Detection in Traffic

Collect Images From Video Dataset

In [None]:
import cv2
import os

def video_to_frames(video_path, output_folder, frame_interval=1):
    """Save every ``frame_interval``-th frame of a video as a JPEG image.

    Images are written to ``output_folder`` as ``frame_4NNNNN.jpeg``, where
    ``NNNNN`` is the zero-padded running index of the *saved* frames (not the
    position of the frame inside the video).

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory that receives the images; created if missing.
        frame_interval: Keep one frame out of this many (1 keeps every frame).

    Raises:
        ValueError: If ``frame_interval`` is smaller than 1.
        OSError: If the video cannot be opened or a frame cannot be written.
    """
    # Validate before touching the filesystem; an interval of 0 would
    # otherwise surface later as a ZeroDivisionError in the modulo below.
    if frame_interval < 1:
        raise ValueError(f"frame_interval must be >= 1, got {frame_interval}")

    os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    try:
        # A missing or unreadable file would otherwise yield zero frames and
        # still print the success message.
        if not cap.isOpened():
            raise OSError(f"Could not open video: {video_path}")

        count = 0  # index of the frame currently read from the video
        saved_frame_count = 0  # number of frames written so far

        while True:
            ret, frame = cap.read()

            # ret is False once no further frame can be read
            if not ret:
                break

            if count % frame_interval == 0:
                # NOTE(review): the literal "4" in the prefix is kept as-is;
                # presumably a per-video tag to avoid name clashes - confirm.
                frame_filename = os.path.join(output_folder, f"frame_4{saved_frame_count:05d}.jpeg")
                # imwrite reports failure through its return value only
                if not cv2.imwrite(frame_filename, frame):
                    raise OSError(f"Could not write frame to {frame_filename}")
                saved_frame_count += 1

            count += 1
    finally:
        # Release the capture even when reading or writing fails
        cap.release()

    print(f"Frames extracted and saved in {output_folder}")



# Example usage: extract frames from one source video into the dataset folder.
# NOTE(review): both paths are absolute paths on the author's machine; adjust
# them before running this cell elsewhere.
video_path = '/home/manikkamani/Downloads/test_s1/3553898991-preview.mp4'
output_folder = '/home/manikkamani/Downloads/test_s1/test_data'

# Keep one frame out of every 8 (frame_interval=8)
video_to_frames(video_path, output_folder, frame_interval=8)


Image Augmentation

In [50]:
import os
import random
from PIL import Image
from torchvision import transforms

# Define the path to your image folder
image_folder = '/home/manikkamani/Downloads/test_s1/test_data'

# List only source images. Anything else in the folder (e.g. label .txt files)
# would make Image.open fail, and files produced by an earlier run of this
# cell ('_augmented_' prefix) must not be augmented a second time.
image_extensions = ('.jpg', '.jpeg', '.png')
image_files = sorted(
    f for f in os.listdir(image_folder)
    if os.path.isfile(os.path.join(image_folder, f))
    and f.lower().endswith(image_extensions)
    and not f.startswith('_augmented_')
)

# Number of images to randomly select (30% of the source images)
num_images_to_augment = int(len(image_files)*0.3)

# Randomly select image files. A dedicated, seeded generator on the sorted
# listing makes the selection reproducible without touching the global
# random state used by other cells.
augmentation_seed = 42
selected_files = random.Random(augmentation_seed).sample(image_files, num_images_to_augment)


# Define the additional augmentations
additional_transform = transforms.Compose([
    transforms.ColorJitter(brightness=0.2),  # Adjust brightness
    transforms.RandomHorizontalFlip(),  # Optional: Random horizontal flip
    transforms.ToTensor()  # Convert to tensor
])

# Function to apply additional transformations
def process_image(image_path):
    """Load the image at ``image_path`` and return it after ``additional_transform``.

    The result is whatever the module-level ``additional_transform`` pipeline
    produces for the opened PIL image.
    """
    return additional_transform(Image.open(image_path))

# Augment every selected image and save the result next to its source.
# NOTE(review): RandomHorizontalFlip moves objects within the image, and no
# label file is written for the augmented copy here - confirm that labelling
# happens after this step.

# The tensor -> PIL converter does not depend on the image, so build it once
# instead of once per iteration.
to_pil = transforms.ToPILImage()

for file in selected_files:
    image_path = os.path.join(image_folder, file)
    augmented_image = process_image(image_path)

    # additional_transform ends with ToTensor, so convert back before saving
    augmented_image_pil = to_pil(augmented_image)

    # The '_augmented_' prefix keeps the copy from overwriting its source image
    augmented_image_pil.save(os.path.join(image_folder, f'_augmented_{file}'))

Split Data into Train, Validation, and Test Sets

In [51]:
import os
import shutil
import random

def split_dataset(image_dir, label_dir, output_dir, train_size=0.7, val_size=0.2, seed=None):
    """Copy images and their label files into train/val/test folders.

    The resulting layout is::

        output_dir/images/{train,val,test}/
        output_dir/labels/{train,val,test}/

    Files are copied, so ``image_dir`` and ``label_dir`` are left untouched.
    A label is looked up as ``<image stem>.txt`` in ``label_dir``; an image
    without a label file is still copied, just without a label.

    Args:
        image_dir: Folder holding the source images (.jpg, .jpeg or .png).
        label_dir: Folder holding the ``.txt`` label files.
        output_dir: Root folder of the split dataset; created if missing.
        train_size: Fraction of the images assigned to the training split.
        val_size: Fraction of the images assigned to the validation split.
            The remainder goes to the test split.
        seed: Optional seed for a reproducible shuffle. When ``None`` the
            global ``random`` state is used.

    Raises:
        ValueError: If a fraction is negative or the two fractions exceed 1.
    """
    if train_size < 0 or val_size < 0 or train_size + val_size > 1 + 1e-9:
        raise ValueError(
            f"train_size and val_size must be >= 0 and sum to at most 1, "
            f"got {train_size} and {val_size}"
        )

    # Create directories for splits (also creates output_dir itself)
    for split in ('train', 'val', 'test'):
        for subdir in ('images', 'labels'):
            os.makedirs(os.path.join(output_dir, subdir, split), exist_ok=True)

    # The frame-extraction step writes '.jpeg', so matching '.jpg' alone would
    # find nothing. Sort first so a given seed always yields the same split.
    image_extensions = ('.jpg', '.jpeg', '.png')
    all_files = sorted(f for f in os.listdir(image_dir) if f.lower().endswith(image_extensions))
    if seed is None:
        random.shuffle(all_files)
    else:
        random.Random(seed).shuffle(all_files)

    # Round away float noise before truncating: (0.7 + 0.2) * 10 evaluates to
    # 8.999999999999998, which int() alone would cut down to 8.
    total_files = len(all_files)
    train_end = int(round(train_size * total_files, 6))
    val_end = min(int(round((train_size + val_size) * total_files, 6)), total_files)

    files_by_split = {
        'train': all_files[:train_end],
        'val': all_files[train_end:val_end],
        'test': all_files[val_end:],
    }

    def copy_files(file_list, split):
        """Copy each image, and its label when one exists, into ``split``."""
        for file_name in file_list:
            src_image = os.path.join(image_dir, file_name)
            dst_image = os.path.join(output_dir, 'images', split, file_name)
            shutil.copy(src_image, dst_image)

            # Swap only the extension; str.replace would also rewrite a
            # '.jpg' occurring in the middle of the file name.
            label_name = os.path.splitext(file_name)[0] + '.txt'
            src_label = os.path.join(label_dir, label_name)
            dst_label = os.path.join(output_dir, 'labels', split, label_name)
            if os.path.exists(src_label):
                shutil.copy(src_label, dst_label)

    for split, file_list in files_by_split.items():
        copy_files(file_list, split)

# Example usage
# Images and labels are read from the same folder; the split dataset is
# written to a separate output folder.
# (The image path previously read '/home/manikkamaDownloads/...', which does
# not match the other two paths and does not exist.)
image_directory = '/home/manikkamani/Downloads/test_s1/test_data'
label_directory = '/home/manikkamani/Downloads/test_s1/test_data'
output_directory = '/home/manikkamani/Downloads/test_s1/data2'

split_dataset(image_directory, label_directory, output_directory)


Create Training Job Config YAML file

In [56]:
import os
import yaml

def create_yaml(dataset_dir, num_classes, class_names, output_yaml):
    """Write a ``data.yaml`` dataset description into the ``output_yaml`` folder.

    The file lists the ``train``/``val``/``test`` image folders (each resolved
    as ``<dataset_dir>/images/<split>``), followed by the class count ``nc``
    and the class ``names``, in that order.

    Args:
        dataset_dir: Root folder that contains ``images/<split>``.
        num_classes: Value stored under ``nc``.
        class_names: Value stored under ``names``.
        output_yaml: Folder in which ``data.yaml`` is created.
    """
    # Insertion order is the order written to the file (sort_keys=False).
    config = {
        split: os.path.join(dataset_dir, 'images', split)
        for split in ('train', 'val', 'test')
    }
    config['nc'] = num_classes
    config['names'] = class_names

    target = f'{output_yaml}/data.yaml'
    with open(target, 'w') as handle:
        yaml.safe_dump(config, handle, sort_keys=False)

    print(f'YAML configuration file saved to {output_yaml}')


# NOTE(review): the list order presumably has to match the class ids used in
# the label files - confirm against the labelling tool's export.
class_names_list = ['person', 'rider', 'with_helmet', 'without_helmet']  # Replace with your actual class names
number_of_classes = len(class_names_list)


# split_dataset writes the images to output_directory/images/<split>, so the
# dataset root is output_directory. Passing the raw image folder would make
# the YAML point at folders that do not exist.
create_yaml(output_directory, number_of_classes, class_names_list, output_directory)


YAML configuration file saved to /home/manikkamani/Downloads/test_s1/data2


Model Training

In [None]:
from ultralytics import YOLO

# Start from pretrained YOLOv8 weights
# (swap in yolov8m.pt, yolov8l.pt, or yolov8x.pt for larger models)
model = YOLO('yolov8s.pt')

# Training settings, collected in one place so they are easy to tune.
# NOTE(review): 'dataset/data.yaml' is resolved relative to the working
# directory, whereas the config cell writes data.yaml into output_directory -
# confirm that this path points at the intended file.
train_args = dict(
    data='dataset/data.yaml',
    epochs=100,
    imgsz=640,
    lr0=0.01,
    batch=8,
    weight_decay=0.0005,
)
model.train(**train_args)


Inference

In [None]:
import cv2
from ultralytics import YOLO
 
# Load the YOLOv8 model
model = YOLO("runs/detect/train7/weights/best.pt")

# Open the video file
video_path = "test_videos/3418603233-preview.mp4"
cap = cv2.VideoCapture(video_path)

# Define class labels (also used for the on-screen text)
rider_label = "rider"
people_label = "people"
without_helmate_label = 'without_helmate'

# The training config in this notebook names the classes 'person' and
# 'without_helmet'. Accept both spellings so the counters do not silently
# stay at zero when the model was trained from that config.
people_labels = {people_label, "person"}
without_helmate_labels = {without_helmate_label, "without_helmet"}

try:
    # Loop through the video frames
    while cap.isOpened():
        # Read a frame from the video
        success, frame = cap.read()

        # Stop once the end of the video is reached
        if not success:
            break

        # Run YOLOv8 inference on the frame; one frame in, one result out
        result = model(frame)[0]
        detections = result.boxes

        # Counters are per frame, so reset them for every frame
        rider_count = 0
        people_count = 0
        without_helmate = 0

        # Count detections by class name
        for box in detections:
            class_id = int(box.cls)  # Get the class ID for this detection
            class_name = result.names[class_id]  # Get the class name from the class ID

            if class_name == rider_label:
                rider_count += 1
            elif class_name in people_labels:
                people_count += 1
            elif class_name in without_helmate_labels:
                without_helmate += 1

        # Visualize the results on the frame
        annotated_frame = result.plot()

        # Add text to display counts; riders are included in the people total
        cv2.putText(annotated_frame, f"{without_helmate_label.capitalize()} Count: {without_helmate}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        cv2.putText(annotated_frame, f"{people_label.capitalize()} Count: {people_count+rider_count}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Display the annotated frame
        cv2.imshow("Inference", annotated_frame)

        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Release the capture and close the window even if inference raises
    cap.release()
    cv2.destroyAllWindows()