# Install packages

In [None]:
#pip install pandas numpy torch torchvision yolo opencv-python ultralytics PIL glob2 -q

In [None]:
import torch
import torchvision

import os
import numpy
import cv2
from torchsummary import summary
from ultralytics import YOLO

# Select device

In [None]:
import torch
device=["cuda" if torch.cuda.is_available() else "cpu"] # gpu switch

# Data spliting --> train & validation & test

In [None]:
import cv2
import os
import random
import shutil
import numpy as np

def split_dataset(input_dir, output_dir, train_ratio=0.7, val_ratio=0.2):
    
    #Split the dataset into train, validation, and test subsets.
    
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    for subset in ['train', 'val', 'test']:
        os.makedirs(os.path.join(output_dir, subset,'' ), exist_ok=True)
        os.makedirs(os.path.join(output_dir, subset,'' ), exist_ok=True)

    image_files = [f for f in os.listdir(input_dir) if f.endswith(('.jpg', '.png'))]
    random.shuffle(image_files)
    
    total = len(image_files)
    train_end = int(total * train_ratio)
    val_end = train_end + int(total * val_ratio)
    
    subsets = {
        'train': image_files[:train_end],
        'val': image_files[train_end:val_end],
        'test': image_files[val_end:]
    }
    
    for subset, files in subsets.items():
        for image_file in files:
            base_name = os.path.splitext(image_file)[0]
            annotation_file = base_name + '.txt'
            
            # Copy images
            shutil.copy(os.path.join(input_dir, image_file), os.path.join(output_dir, subset, '', image_file))
            # Copy annotations
            if os.path.exists(os.path.join(input_dir, annotation_file)):
                shutil.copy(os.path.join(input_dir, annotation_file), os.path.join(output_dir, subset, '', annotation_file))

def augment_images_and_annotations(input_images_path, input_annotations_path, output_path, augmentations_count):
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    
    for image_name in os.listdir(input_images_path):
        if not image_name.endswith(('.jpg', '.png')):
            continue
        
        image_path = os.path.join(input_images_path, image_name)
        annotation_path = os.path.join(input_annotations_path, image_name.replace('.jpg', '.txt').replace('.png', '.txt'))
        
        image = cv2.imread(image_path)
        img_height, img_width = image.shape[:2]
        
        if not os.path.exists(annotation_path):
            print(f"Annotation file for {image_name} not found. Skipping.")
            continue
        
        bboxes = parse_yolo_annotations(annotation_path, img_width, img_height)
        
        for i in range(augmentations_count):
            augmented_image, augmented_bboxes = random_scaling(image, bboxes)
            augmented_image, augmented_bboxes = random_horizontal_flip(augmented_image, augmented_bboxes)
            augmented_image, augmented_bboxes = random_rotation(augmented_image, augmented_bboxes)
            augmented_image, augmented_bboxes = random_shearing(augmented_image, augmented_bboxes)
            augmented_image = adjust_brightness(augmented_image)
            
            # Save augmented image
            augmented_image_name = f"{os.path.splitext(image_name)[0]}_aug_{i}.jpg"
            augmented_image_path = os.path.join(output_path, augmented_image_name)
            cv2.imwrite(augmented_image_path, augmented_image)
            
            # Save augmented annotations
            augmented_annotation_path = os.path.join(output_path, augmented_image_name.replace('.jpg', '.txt'))
            yolo_bboxes = convert_to_yolo_format(augmented_bboxes, img_width, img_height)
            with open(augmented_annotation_path, 'w') as annotation_file:
                annotation_file.write("\n".join(yolo_bboxes))

# Example usage
# Step 1: Split the dataset
split_dataset(
    input_dir="dataset",
    output_dir="dataset",
    train_ratio=0.7,
    val_ratio=0.2  # Remaining 0.1 will be for the test set
)

print("Data split successfully completed.")


# Train data augmentation

In [None]:
import cv2
import os
import random
import numpy as np

# Define augmentation functions
def random_scaling(image, bboxes):
    scale = random.uniform(0.8, 1.2)
    height, width = image.shape[:2]
    new_width, new_height = int(width * scale), int(height * scale)
    scaled_image = cv2.resize(image, (new_width, new_height))
    
    # Adjust bounding boxes
    new_bboxes = []
    for bbox in bboxes:
        class_id, x_min, y_min, x_max, y_max = bbox
        x_min *= scale
        y_min *= scale
        x_max *= scale
        y_max *= scale
        new_bboxes.append([class_id, x_min, y_min, x_max, y_max])
    
    # Crop or pad to original size
    if scale > 1.0:
        start_x = (new_width - width) // 2
        start_y = (new_height - height) // 2
        cropped_image = scaled_image[start_y:start_y + height, start_x:start_x + width]
        for bbox in new_bboxes:
            bbox[1] -= start_x
            bbox[2] -= start_y
            bbox[3] -= start_x
            bbox[4] -= start_y
    else:
        padded_image = np.zeros_like(image)
        start_x = (width - new_width) // 2
        start_y = (height - new_height) // 2
        padded_image[start_y:start_y + new_height, start_x:start_x + new_width] = scaled_image
        cropped_image = padded_image
        for bbox in new_bboxes:
            bbox[1] += start_x
            bbox[2] += start_y
            bbox[3] += start_x
            bbox[4] += start_y
    
    return cropped_image, new_bboxes

def random_horizontal_flip(image, bboxes):
    if random.random() < 0.5:
        image = cv2.flip(image, 1)
        width = image.shape[1]
        new_bboxes = []
        for bbox in bboxes:
            class_id, x_min, y_min, x_max, y_max = bbox
            new_bboxes.append([class_id, width - x_max, y_min, width - x_min, y_max])
        return image, new_bboxes
    return image, bboxes

def parse_yolo_annotations(annotation_path, img_width, img_height):
    with open(annotation_path, 'r') as file:
        annotations = []
        for line in file:
            parts = line.strip().split()
            class_id = int(parts[0])
            x_center, y_center, bbox_width, bbox_height = map(float, parts[1:])
            x_min = (x_center - bbox_width / 2) * img_width
            y_min = (y_center - bbox_height / 2) * img_height
            x_max = (x_center + bbox_width / 2) * img_width
            y_max = (y_center + bbox_height / 2) * img_height
            annotations.append([class_id, x_min, y_min, x_max, y_max])
        return annotations

def convert_to_yolo_format(bboxes, img_width, img_height):
    yolo_bboxes = []
    for bbox in bboxes:
        class_id, x_min, y_min, x_max, y_max = bbox
        x_center = (x_min + x_max) / 2 / img_width
        y_center = (y_min + y_max) / 2 / img_height
        bbox_width = (x_max - x_min) / img_width
        bbox_height = (y_max - y_min) / img_height
        yolo_bboxes.append(f"{class_id} {x_center} {y_center} {bbox_width} {bbox_height}")
    return yolo_bboxes

def augment_images_and_annotations(input_images_path, input_annotations_path, output_path, augmentations_count):
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    
    for image_name in os.listdir(input_images_path):
        if not image_name.endswith(('.jpg', '.png')):
            continue
        
        image_path = os.path.join(input_images_path, image_name)
        annotation_path = os.path.join(input_annotations_path, image_name.replace('.jpg', '.txt').replace('.png', '.txt'))
        
        image = cv2.imread(image_path)
        img_height, img_width = image.shape[:2]
        
        if not os.path.exists(annotation_path):
            print(f"Annotation file for {image_name} not found. Skipping.")
            continue
        
        bboxes = parse_yolo_annotations(annotation_path, img_width, img_height)
        
        for i in range(augmentations_count):
            augmented_image, augmented_bboxes = random_scaling(image, bboxes)
            augmented_image, augmented_bboxes = random_horizontal_flip(augmented_image, augmented_bboxes)
            
            # Save augmented image
            augmented_image_name = f"{os.path.splitext(image_name)[0]}_aug_{i}.jpg"
            augmented_image_path = os.path.join(output_path, augmented_image_name)
            cv2.imwrite(augmented_image_path, augmented_image)
            
            # Save augmented annotations
            augmented_annotation_path = os.path.join(output_path, augmented_image_name.replace('.jpg', '.txt'))
            yolo_bboxes = convert_to_yolo_format(augmented_bboxes, img_width, img_height)
            with open(augmented_annotation_path, 'w') as annotation_file:
                annotation_file.write("\n".join(yolo_bboxes))

# Example usage
augment_images_and_annotations(
    input_images_path="dataset/train",
    input_annotations_path="dataset/train",
    output_path="dataset/train",
    augmentations_count=2
)


# Create a YAML file

In [None]:
import os

# Path to the images directory
images_dir = '/dataset'

# Path to the classes file
classes_file = 'classes.txt'

# Load classes from the classes file
with open(classes_file, 'r') as f:
    classes = [line.strip() for line in f]

# Create a YAML file
with open('data.yaml', 'w') as f:
    f.write(f'train: {images_dir+"/train"}\n')
    f.write(f'val: {images_dir+"/val"}\n')  # Same directory used for validation (can be modified)
    f.write(f'nc: {len(classes)}\n')
    f.write(f'names: {classes}\n')

# Load the YAML file using PyYAML
import yaml
with open('data.yaml', 'r') as file:
   prime_service = yaml.safe_load(file)
prime_service

# Import YOLO model

In [None]:
from ultralytics import YOLO
model = YOLO('best.pt')

# Model training

In [None]:
training=model.train(
   data='data.yaml',
   epochs=199,
   batch=37,
   plots=True,
    imgsz=640,
   name='YOLO_11x_train')

In [None]:
model.val()

In [None]:
pred=model.predict("dataset/test", save=True, imgsz=640, conf=0.5)

# Pictures prediction

In [None]:
pred = model("tr3.jpg") 
for result in pred:
    boxes = result.boxes  # Boxes object for bounding box outputs
    masks = result.masks  # Masks object for segmentation masks outputs
    keypoints = result.keypoints  # Keypoints object for pose outputs
    probs = result.probs  # Probs object for classification outputs
    obb = result.obb  # Oriented boxes object for OBB outputs
    result.show()  # display to screen
      # save to disk

# Implement video

In [None]:
# Open the video file
import cv2
cap = cv2.VideoCapture('video.mp4')

while cap.isOpened():
    # Read a frame qqrom the video
    success, frame = cap.read()

    if success:
        # Perform object detection on the frame
        results = model(frame)

        # Visualize the results on the frame
        annotated_frame = results[0].plot()

        # Display the annotated frame
        cv2.imshow('YOLOv11 Detection', annotated_frame)

        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    else:
        # Break the loop if the end of the video is reached
        break

# Release the video capture object and close all windows
cap.release()
cv2.destroyAllWindows()

# Horizontal line counting -accumulated

In [None]:
#Horizontal

import cv2
from ultralytics import YOLO

# Load the trained YOLOv11 model
model = YOLO('best.pt')  # Replace with your model path

# Initialize video capture
video_path = 'video.mp4'  # Replace with your video path
cap = cv2.VideoCapture(video_path)

# Define ROI line position
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
roi_line_position = int(frame_width * 0.2)  # Adjust as needed

# Initialize variables
truck_count = 0
counted_ids = set()

# Main loop
while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Perform detection and tracking
    results = model.track(frame, tracker='bytetrack.yaml')

    # Process results
    for result in results:
        boxes = result.boxes
        for box in boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            class_id = int(box.cls[0])
            confidence = float(box.conf[0])
            track_id = int(box.id[0]) if box.id is not None else None

            # Filter for trucks
            if class_id == 0 and confidence > 0.5:
                # Draw bounding box and track ID
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f'ID: {track_id}', (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

                # Calculate centroid
                cx = int((x1 + x2) / 2)
                cy = int((y1 + y2) / 2)

                # Check if the truck has crossed the ROI line
                if track_id not in counted_ids and cy > roi_line_position:
                    truck_count += 1
                    counted_ids.add(track_id)
                    print(f"Truck count: {truck_count}")

    # Draw ROI line
    cv2.line(frame, (0, roi_line_position), (frame_width, roi_line_position),
             (255, 0, 0), 2)

    # Display the truck count
    cv2.putText(frame, f"Truck Count: {truck_count}", (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

    # Show the frame
    cv2.imshow('Advanced Truck Counter', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()


# Vertical line counting -accumulated

In [None]:
#Vertical
import cv2
from ultralytics import YOLO

# Load the trained YOLOv11 model
model = YOLO('best.pt')  # Replace with your model path

# Initialize video capture
video_path = 'video.mp4'  # Replace with your video path
cap = cv2.VideoCapture(video_path)

# Define ROI line position (vertical line)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
roi_line_position = int(frame_width * 0.2)  # Adjust as needed

# Initialize variables
truck_count = 0
counted_ids = set()

# Main loop
while True:
    ret, frame = cap.read()
    if not ret:
        break

    # Perform detection and tracking
    results = model.track(frame, tracker='bytetrack.yaml')

    # Process results
    for result in results:
        boxes = result.boxes
        for box in boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            class_id = int(box.cls[0])
            confidence = float(box.conf[0])
            track_id = int(box.id[0]) if box.id is not None else None

            # Filter for trucks
            if class_id == 0 and confidence > 0.5:
                # Draw bounding box and track ID
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f'ID: {track_id}', (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

                # Calculate centroid
                cx = int((x1 + x2) / 2)
                cy = int((y1 + y2) / 2)

                # Check if the truck has crossed the ROI line
                if track_id not in counted_ids and cx > roi_line_position:
                    truck_count += 1
                    counted_ids.add(track_id)
                    print(f"Truck count: {truck_count}")

    # Draw ROI line (vertical line)
    cv2.line(frame, (roi_line_position, 0), (roi_line_position, frame_height),
             (255, 0, 0), 2)

    # Display the truck count
    cv2.putText(frame, f"Truck Count: {truck_count}", (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

    # Show the frame
    cv2.imshow('Advanced Truck Counter', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()


# Momentary counting

In [None]:
# Object counter

import cv2
from ultralytics import YOLO

# Load the YOLO model
model = YOLO('best.pt')  # Pre-trained YOLOv11 model (or replace with your own model)

# Open the video file
cap = cv2.VideoCapture('video.mp4')

# Store the total object count
total_objects_count = 0

while cap.isOpened():
    # Read a frame from the video
    success, frame = cap.read()

    if success:
        # Perform object detection on the current frame
        results = model(frame)

        # Get the number of objects detected in the current frame
        current_objects_count = len(results[0].boxes)

        # Update the total object count to the current count
        total_objects_count = current_objects_count

        # Generate an annotated frame with detected objects
        annotated_frame = results[0].plot()

        # Display the total object count on the frame
        cv2.putText(
            annotated_frame,
            f"Total Objects: {total_objects_count}",
            (10, 50),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (0, 255, 0),
            2,)

        # Display the frame in a window
        cv2.imshow('YOLOv11 Detection', annotated_frame)

        # Exit the loop if the 'q' key is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    else:
        # Exit the loop if the end of the video is reached
        break

# Release the video resource and close the window
cap.release()
cv2.destroyAllWindows()
