In [None]:
!nvidia-smi
!pip install ipython

In [None]:
!pip install ultralytics

In [None]:
# Print the metadata pip has recorded for the installed torch package.
!pip show torch

In [None]:
import torch

# Quick sanity check: prints True when PyTorch can use a CUDA device.
cuda_ready = torch.cuda.is_available()
print(cuda_ready)

In [None]:
import torch

# Label/value pairs describing the PyTorch build and its CUDA support.
torch_report = (
    ("PyTorch version:", torch.__version__),
    ("CUDA available:", torch.cuda.is_available()),
    ("CUDA version:", torch.version.cuda),
)
for label, value in torch_report:
    print(label, value)

In [None]:
import torch

# CUDA version this PyTorch build was compiled against (None for CPU-only builds).
built_cuda_version = torch.version.cuda
print(built_cuda_version)


In [None]:
import torch

# One line each: the PyTorch version, then whether CUDA is usable.
print(torch.__version__, torch.cuda.is_available(), sep="\n")


In [None]:
import torch

# Report the PyTorch version, CUDA availability, and the name of GPU 0 if any.
has_cuda = torch.cuda.is_available()
print(torch.__version__)
print(has_cuda)
if has_cuda:
    print(torch.cuda.get_device_name(0))
else:
    print("No GPU available")


In [None]:
import torch

# Report the visible GPUs. get_device_name() raises when CUDA is unavailable,
# so it is only called when a device is actually present; otherwise the cell
# would fail before reaching the CPU fallback below.
cuda_available = torch.cuda.is_available()
print("Number of GPU: ", torch.cuda.device_count())
if cuda_available:
    print("GPU Name: ", torch.cuda.get_device_name())
else:
    print("GPU Name: ", "No GPU available")

# Prefer the GPU, fall back to the CPU.
device = torch.device('cuda' if cuda_available else 'cpu')
print('Using device:', device)

In [None]:
from ultralytics import YOLO

from IPython.display import display, Image

In [None]:
from google.colab import drive

# Mount Google Drive at the path the /content/drive/... paths in later cells expect.
DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

In [None]:
import os

# Folder to resolve, relative to the current working directory (edit as needed).
folder_name = 'dataset/train/images'

# Expand to an absolute path and show it.
absolute_path = os.path.abspath(folder_name)
print("Absolute path: " + absolute_path)


In [None]:
import os

# Build the paths with os.path.join so the cell works on Windows and on
# Linux/Colab alike (a hard-coded backslash path is one odd file name on POSIX).
train_dir = os.path.join('archive', 'dataset', 'dataset', 'train')
image_folder = os.path.join(train_dir, 'images')
label_folder = os.path.join(train_dir, 'labels')

# Every directory entry is counted, whatever its extension.
image_files = os.listdir(image_folder)
label_files = os.listdir(label_folder)

num_images = len(image_files)
num_labels = len(label_files)

# Last expression of the cell: displayed as (images, labels).
num_images, num_labels


In [None]:
# Import PIL's Image under an alias: a plain `from PIL import Image` would
# rebind the notebook-wide name `Image`, which the result cells further down
# use as IPython.display.Image(filename=..., width=...).
from PIL import Image as PILImage
import os

# Portable path (works on Windows and on Linux/Colab).
image_folder = os.path.join('archive', 'dataset', 'dataset', 'train', 'images')

# All PNG files in the folder, regardless of extension case.
png_files = [f for f in os.listdir(image_folder) if f.lower().endswith('.png')]

# Convert each PNG to a JPEG with the same base name, then remove the PNG.
# NOTE: an existing .jpg with the same base name is overwritten.
for png_file in png_files:
    png_path = os.path.join(image_folder, png_file)
    jpg_path = os.path.join(image_folder, os.path.splitext(png_file)[0] + '.jpg')

    # JPEG has no alpha channel, so force RGB before saving.
    with PILImage.open(png_path) as img:
        img.convert('RGB').save(jpg_path, 'JPEG')

    # The PNG is only deleted after the JPEG was written successfully.
    os.remove(png_path)
    print(f"Converted {png_file} to {os.path.basename(jpg_path)} and deleted the original.")

print("Conversion completed successfully.")


In [None]:
import os

# Portable paths (work on Windows and on Linux/Colab).
train_dir = os.path.join('archive', 'dataset', 'dataset', 'train')
image_folder = os.path.join(train_dir, 'images')
label_folder = os.path.join(train_dir, 'labels')

# Image and label files, matched on extension regardless of case.
image_extensions = ('.jpg', '.png', '.jpeg')
image_files = [f for f in os.listdir(image_folder) if f.lower().endswith(image_extensions)]
label_files = [f for f in os.listdir(label_folder) if f.lower().endswith('.txt')]

# Compare on base names (file name without extension).
image_basenames = {os.path.splitext(file)[0] for file in image_files}
label_basenames = {os.path.splitext(file)[0] for file in label_files}

# Images that have no label file with the same base name.
unmatched_images = image_basenames - label_basenames

# DESTRUCTIVE: remove exactly the listed image files whose base name has no
# label, instead of guessing which extension each one might have.
for image_file in image_files:
    if os.path.splitext(image_file)[0] in unmatched_images:
        image_path = os.path.join(image_folder, image_file)
        os.remove(image_path)
        print(f"Deleted unmatched image: {image_path}")

print("Unmatched images deleted successfully.")


In [None]:
# potholes66  # stray scratch text; evaluating this undefined name raises NameError and stops "Run All"

In [None]:
import os

# Image folders, one per class
pothole_folder = 'archive/dataset/dataset/classes/pothole/images'
crack_folder = 'archive/dataset/dataset/classes/cracks/images'
open_manhole_folder = 'archive/dataset/dataset/classes/open_manhole/images'


def count_images(folder, extensions=('.jpg', '.jpeg', '.png')):
    """Return the number of entries in `folder` whose name ends with an image extension.

    Args:
        folder: Directory to scan (not recursive).
        extensions: A suffix or tuple of suffixes to count. Matching ignores
            case, so `photo.JPG` is counted as well.

    Raises:
        OSError: If `folder` cannot be listed (propagated from os.listdir).
    """
    if isinstance(extensions, str):
        extensions = (extensions,)
    suffixes = tuple(ext.lower() for ext in extensions)
    return sum(1 for name in os.listdir(folder) if name.lower().endswith(suffixes))


# Count images in each category
pothole_count = count_images(pothole_folder)
crack_count = count_images(crack_folder)
open_manhole_count = count_images(open_manhole_folder)

# Output results
print(f"Number of pothole images: {pothole_count}")
print(f"Number of crack images: {crack_count}")
print(f"Number of open manhole images: {open_manhole_count}")

# Total count
total_images = pothole_count + crack_count + open_manhole_count
print(f"Total number of images: {total_images}")



# Data augmentation

## Method 1

In [None]:
import os
import cv2
import albumentations as A
from tqdm import tqdm

# Paths for images and labels
images_path = 'archive/dataset/dataset/classes/cracks/images'
labels_path = 'archive/dataset/dataset/classes/cracks/labels/txt'
output_images_path = 'archive/dataset/dataset/classes/cracks/aug_date/images'
output_labels_path = 'archive/dataset/dataset/classes/cracks/aug_date/labels'

# Ensure output directories exist
os.makedirs(output_images_path, exist_ok=True)
os.makedirs(output_labels_path, exist_ok=True)

# One combined pipeline: rotation and brightness/contrast always run, each
# flip runs with probability 0.5. Bounding boxes are transformed along with
# the image (YOLO format: x_center y_center width height, normalised).
augment = A.Compose(
    [
        A.Rotate(limit=30, border_mode=0, p=1.0),               # Rotate within ±30 degrees
        A.RandomBrightnessContrast(brightness_limit=0.3,
                                    contrast_limit=0.3, p=1.0), # Adjust brightness/contrast
        A.HorizontalFlip(p=0.5),                                # Random horizontal flip
        A.VerticalFlip(p=0.5),                                  # Random vertical flip
    ],
    bbox_params=A.BboxParams(format='yolo', label_fields=['category_ids'])
)

# Only pick up image files. Each label is looked up by the image's base name:
# pairing two independently sorted listings by position silently mismatches
# images and labels as soon as one file is missing on either side.
image_files = sorted(f for f in os.listdir(images_path)
                     if f.lower().endswith(('.jpg', '.png', '.jpeg')))

# Counters: keep generating until the dataset holds target_image_count images;
# new files are numbered after the existing ones.
current_image_count = len(image_files)
target_image_count = 500
new_image_id = current_image_count + 1

for image_file in tqdm(image_files):
    if current_image_count >= target_image_count:
        break

    image_path = os.path.join(images_path, image_file)
    label_path = os.path.join(labels_path, os.path.splitext(image_file)[0] + '.txt')
    if not os.path.isfile(label_path):
        tqdm.write(f"Skipping {image_file}: no label file at {label_path}")
        continue

    # cv2.imread returns None (no exception) for unreadable files
    image = cv2.imread(image_path)
    if image is None:
        tqdm.write(f"Skipping {image_file}: could not be read as an image")
        continue

    # Parse the YOLO label file, ignoring blank lines
    bboxes = []
    category_ids = []
    with open(label_path, 'r') as f:
        for line in f:
            fields = line.split()
            if not fields:
                continue
            class_id, x_center, y_center, width, height = map(float, fields)
            bboxes.append([x_center, y_center, width, height])
            category_ids.append(int(class_id))

    # Apply augmentation
    augmented = augment(image=image, bboxes=bboxes, category_ids=category_ids)

    # Save augmented image
    augmented_image_path = os.path.join(output_images_path, f'{new_image_id:04}.jpg')
    cv2.imwrite(augmented_image_path, augmented['image'])

    # Save augmented labels; the class id is forced to int so the file stays
    # valid YOLO even if the library hands the labels back as floats
    augmented_label_path = os.path.join(output_labels_path, f'{new_image_id:04}.txt')
    with open(augmented_label_path, 'w') as f:
        for bbox, category_id in zip(augmented['bboxes'], augmented['category_ids']):
            f.write(f"{int(category_id)} {' '.join(map(str, bbox))}\n")

    # Update counters
    new_image_id += 1
    current_image_count += 1

print(f"Augmentation completed! Total images: {current_image_count}")


## Method 2

In [None]:
import os
import cv2
import albumentations as A
from tqdm import tqdm

# Paths for images and labels
images_path = 'archive/dataset/dataset/classes/cracks/images'
labels_path = 'archive/dataset/dataset/classes/cracks/labels'
output_images_path = 'archive/dataset/dataset/classes/cracks/aug_date/images'
output_labels_path = 'archive/dataset/dataset/classes/cracks/aug_date/labels'

# Ensure output directories exist
os.makedirs(output_images_path, exist_ok=True)
os.makedirs(output_labels_path, exist_ok=True)

# One single-transform pipeline per augmentation; each is applied to the
# original image on its own. Boxes are in YOLO format.
yolo_bbox_params = A.BboxParams(format='yolo', label_fields=['category_ids'])
rotate_augment = A.Compose([A.Rotate(limit=30, border_mode=0, p=1.0)], bbox_params=yolo_bbox_params)
horizontal_flip_augment = A.Compose([A.HorizontalFlip(p=1.0)], bbox_params=yolo_bbox_params)
vertical_flip_augment = A.Compose([A.VerticalFlip(p=1.0)], bbox_params=yolo_bbox_params)
brightness_contrast_augment = A.Compose(
    [A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, p=1.0)],
    bbox_params=yolo_bbox_params,
)
augmenters = (
    rotate_augment,
    horizontal_flip_augment,
    vertical_flip_augment,
    brightness_contrast_augment,
)

# Only pick up image files; each label is looked up by the image's base name
# (pairing two sorted listings by position mismatches when a file is missing).
image_files = sorted(f for f in os.listdir(images_path)
                     if f.lower().endswith(('.jpg', '.png', '.jpeg')))

# Numbering: output files are named 0667, 0668, ... up to max_image_count
current_image_count = 667  # Start from 667 (your current count)
max_image_count = 1000
new_image_id = current_image_count

for image_file in tqdm(image_files):
    if new_image_id > max_image_count:
        break

    image_path = os.path.join(images_path, image_file)
    label_path = os.path.join(labels_path, os.path.splitext(image_file)[0] + '.txt')
    if not os.path.isfile(label_path):
        tqdm.write(f"Skipping {image_file}: no label file at {label_path}")
        continue

    # cv2.imread returns None (no exception) for unreadable files
    image = cv2.imread(image_path)
    if image is None:
        tqdm.write(f"Skipping {image_file}: could not be read as an image")
        continue

    # Parse the YOLO label file, ignoring blank lines
    bboxes = []
    category_ids = []
    with open(label_path, 'r') as f:
        for line in f:
            fields = line.split()
            if not fields:
                continue
            class_id, x_center, y_center, width, height = map(float, fields)
            bboxes.append([x_center, y_center, width, height])
            category_ids.append(int(class_id))

    # Every augmentation gets its OWN id. Previously all four results were
    # written to the same file name, so three of them were overwritten and
    # only the last (brightness/contrast) image and label survived.
    # NOTE: this produces up to four files per source image, so any cell that
    # hard-codes a start id based on the old one-file-per-image numbering
    # must be adjusted.
    for augmenter in augmenters:
        if new_image_id > max_image_count:
            break

        augmented = augmenter(image=image, bboxes=bboxes, category_ids=category_ids)

        augmented_image_path = os.path.join(output_images_path, f'{new_image_id:04}.jpg')
        cv2.imwrite(augmented_image_path, augmented['image'])

        # Class id forced to int so the label file stays valid YOLO
        augmented_label_path = os.path.join(output_labels_path, f'{new_image_id:04}.txt')
        with open(augmented_label_path, 'w') as f:
            for bbox, category_id in zip(augmented['bboxes'], augmented['category_ids']):
                f.write(f"{int(category_id)} {' '.join(map(str, bbox))}\n")

        new_image_id += 1

print(f"Augmentation completed! Total images: {new_image_id - current_image_count}")


## Method 3

In [None]:
import os
import cv2
import albumentations as A
from tqdm import tqdm

# Paths for images and labels
images_path = 'archive/dataset/dataset/classes/cracks/images'
labels_path = 'archive/dataset/dataset/classes/cracks/labels'
output_images_path = 'archive/dataset/dataset/classes/cracks/aug_date/images'
output_labels_path = 'archive/dataset/dataset/classes/cracks/aug_date/labels'

# Ensure output directories exist
os.makedirs(output_images_path, exist_ok=True)
os.makedirs(output_labels_path, exist_ok=True)

# One combined pipeline: rotation and brightness/contrast always run, each
# flip runs with probability 0.5. Boxes are in YOLO format.
augment = A.Compose(
    [
        A.Rotate(limit=30, border_mode=0, p=1.0),               # Rotate within ±30 degrees
        A.RandomBrightnessContrast(brightness_limit=0.3,
                                    contrast_limit=0.3, p=1.0), # Adjust brightness/contrast
        A.HorizontalFlip(p=0.5),                                # Random horizontal flip
        A.VerticalFlip(p=0.5),                                  # Random vertical flip
    ],
    bbox_params=A.BboxParams(format='yolo', label_fields=['category_ids'])
)

# Only pick up image files; each label is looked up by the image's base name
# (pairing two sorted listings by position mismatches when a file is missing).
image_files = sorted(f for f in os.listdir(images_path)
                     if f.lower().endswith(('.jpg', '.png', '.jpeg')))

# Numbering continues after the last existing file (0832) and stops at 1000
current_image_count = 832  # Your last image is 0832
first_new_image_id = 833   # Start the new numbering from 0833
last_image_id = 1000
new_image_id = first_new_image_id

for image_file in tqdm(image_files):

    image_path = os.path.join(images_path, image_file)
    label_path = os.path.join(labels_path, os.path.splitext(image_file)[0] + '.txt')
    if not os.path.isfile(label_path):
        tqdm.write(f"Skipping {image_file}: no label file at {label_path}")
        continue

    # cv2.imread returns None (no exception) for unreadable files
    image = cv2.imread(image_path)
    if image is None:
        tqdm.write(f"Skipping {image_file}: could not be read as an image")
        continue

    # Parse the YOLO label file, ignoring blank lines
    bboxes = []
    category_ids = []
    with open(label_path, 'r') as f:
        for line in f:
            fields = line.split()
            if not fields:
                continue
            class_id, x_center, y_center, width, height = map(float, fields)
            bboxes.append([x_center, y_center, width, height])
            category_ids.append(int(class_id))

    # Apply augmentation
    augmented = augment(image=image, bboxes=bboxes, category_ids=category_ids)

    # Save augmented image with numeric naming convention
    augmented_image_path = os.path.join(output_images_path, f'{new_image_id:04}.jpg')
    cv2.imwrite(augmented_image_path, augmented['image'])

    # Save augmented labels; class id forced to int so the file stays valid YOLO
    augmented_label_path = os.path.join(output_labels_path, f'{new_image_id:04}.txt')
    with open(augmented_label_path, 'w') as f:
        for bbox, category_id in zip(augmented['bboxes'], augmented['category_ids']):
            f.write(f"{int(category_id)} {' '.join(map(str, bbox))}\n")

    # Update counters
    new_image_id += 1

    if new_image_id > last_image_id:  # Stop once image 1000 has been written
        break

print(f"Augmentation completed! Total images: {new_image_id - first_new_image_id}")


# Useless

In [None]:
# Change the notebook's working directory; relative paths used by later cells
# (e.g. data.yaml, runs/detect/...) are resolved against it.
%cd /content/drive/MyDrive/data

In [None]:
# List the contents of the current working directory.
!ls

# Model Training YOLOv8s **(don't touch it)** (bullshit)

## Results are fetched from dataset, Model created pictures

### stupid pictures

In [None]:
# List the files stored in the train3 run directory.
!ls /content/drive/MyDrive/data/runs/detect/train3

In [None]:
# Bind `Image` to IPython's display class here, so this cell (and the similar
# ones after it) does not depend on whether IPython's or PIL's `Image` was
# imported last in the kernel.
from IPython.display import Image

# Show results.png from the train3 run, 600 px wide.
Image(filename='/content/drive/MyDrive/data/runs/detect/train3/results.png', width=600)

In [None]:
# Show F1_curve.png from the train3 run, 600 px wide.
Image(filename='/content/drive/MyDrive/data/runs/detect/train3/F1_curve.png', width=600)

In [None]:
# Show P_curve.png, 600 px wide.
# NOTE(review): this reads from runs/detect/train while the neighbouring cells
# read from runs/detect/train3 — confirm this is intentional.
Image(filename='/content/drive/MyDrive/data/runs/detect/train/P_curve.png', width=600)

In [None]:
# Show confusion_matrix.png from the train3 run, 600 px wide.
Image(filename='/content/drive/MyDrive/data/runs/detect/train3/confusion_matrix.png', width=600)

### command line validation

In [None]:
# Run the Ultralytics CLI in validation mode with the best weights of the
# `train` run. `data=data.yaml` is a relative path, so it is resolved against
# the current working directory.
!yolo task=detect mode=val model=/content/drive/MyDrive/data/runs/detect/train/weights/best.pt data=data.yaml

In [None]:
# Run the Ultralytics CLI in predict mode (confidence threshold 0.25) on the
# test images with the best weights of the `train` run. `source` is a relative
# path, so it is resolved against the current working directory.
!yolo task=detect mode=predict model=/content/drive/MyDrive/data/runs/detect/train/weights/best.pt conf=0.25 source=archive/dataset/dataset/test/images


Test data

In [None]:
import os
import cv2
import matplotlib.pyplot as plt

# Path to the folder where YOLO saved the results
results_path = "runs/detect/predict"

# Predicted images, in name order
predicted_images = sorted(
    os.path.join(results_path, img)
    for img in os.listdir(results_path)
    if img.endswith(('.jpg', '.png'))
)

if not predicted_images:
    # plt.subplots(1, 0) would raise, so report and stop here
    print(f"No predicted images found in {results_path}")
else:
    # squeeze=False always yields a 2-D array of axes; without it a single
    # image produces a bare Axes object and axes[i] fails.
    fig, axes = plt.subplots(1, len(predicted_images), figsize=(20, 5), squeeze=False)

    for i, (ax, image_path) in enumerate(zip(axes[0], predicted_images)):
        ax.axis('off')  # Hide axes
        ax.set_title(f"Image {i+1}")

        # cv2.imread returns None for unreadable files; leave that panel empty
        image = cv2.imread(image_path)
        if image is None:
            continue

        # OpenCV loads BGR, matplotlib expects RGB
        ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

    plt.tight_layout()
    plt.show()


### **Result Show:**

In [None]:
import matplotlib.pyplot as plt
from PIL import Image
from ultralytics import YOLO
from google.colab import files

# Prompt for an image upload from the local machine.
# `uploaded` maps each uploaded file name to its content.
uploaded = files.upload()
if not uploaded:
    raise RuntimeError("No file was uploaded; re-run the cell and choose an image.")

# Use the first uploaded file
image_path = next(iter(uploaded))

# Load the trained YOLOv8 model
model = YOLO('/content/drive/MyDrive/data/runs/detect/train/weights/best.pt')

# Perform prediction on the uploaded image
results = model(image_path)

# Annotated image (boxes drawn) as a numpy array in BGR channel order
img_with_boxes = results[0].plot()

# matplotlib expects RGB: reverse the channel axis, otherwise red and blue
# are swapped in the displayed picture
plt.imshow(img_with_boxes[..., ::-1])
plt.axis('off')  # Turn off axis
plt.show()


### glob

In [None]:
import glob
from IPython.display import Image, display

# Default output folder of the prediction run
predicted_images_path = 'runs/detect/predict'

# Collect the .jpg files first, then the .jpeg files
predicted_images = []
for pattern in ('/*.jpg', '/*.jpeg'):
    predicted_images.extend(glob.glob(predicted_images_path + pattern))

# Print each path and render the image 300 px wide
for img_path in predicted_images:
    print(f"Detected image: {img_path}")
    thumbnail = Image(filename=img_path, width=300)
    display(thumbnail)

# YOLO (useless)

In [None]:
import os

# Train and validation image folders
base_dir = 'archive'
train_path = os.path.join(base_dir, 'dataset', 'dataset', 'train', 'images')
valid_path = os.path.join(base_dir, 'dataset', 'dataset', 'valid', 'images')

# Number of directory entries in each folder (every entry counts, whatever its type)
train_files = len(os.listdir(train_path))
valid_files = len(os.listdir(valid_path))

total_files = train_files + valid_files

if total_files == 0:
    # Avoid ZeroDivisionError when both folders are empty
    train_percentage = valid_percentage = 0.0
    print("No files found in the train/valid image folders.")
else:
    # Share of each split in percent
    train_percentage = (train_files / total_files) * 100
    valid_percentage = (valid_files / total_files) * 100

    print(f"Training Data: {train_files} images ({train_percentage:.2f}%)")
    print(f"Validation Data: {valid_files} images ({valid_percentage:.2f}%)")


In [None]:
# Run the Ultralytics CLI in predict mode (confidence threshold 0.25) on the
# test images with the best weights of the train9 run.
!yolo task=detect mode=predict model=/content/drive/MyDrive/data/runs/detect/train9/weights/best.pt conf=0.25 source=/content/drive/MyDrive/data/dataset/test/images


In [None]:
import os
import cv2
import matplotlib.pyplot as plt

# Path to the folder where YOLO saved the results
results_path = "runs/detect/predict"

# Predicted images, in name order
predicted_images = sorted(
    os.path.join(results_path, img)
    for img in os.listdir(results_path)
    if img.endswith(('.jpg', '.png'))
)

if not predicted_images:
    # plt.subplots(1, 0) would raise, so report and stop here
    print(f"No predicted images found in {results_path}")
else:
    # squeeze=False always yields a 2-D array of axes; without it a single
    # image produces a bare Axes object and axes[i] fails.
    fig, axes = plt.subplots(1, len(predicted_images), figsize=(20, 5), squeeze=False)

    for i, (ax, image_path) in enumerate(zip(axes[0], predicted_images)):
        ax.axis('off')  # Hide axes
        ax.set_title(f"Image {i+1}")

        # cv2.imread returns None for unreadable files; leave that panel empty
        image = cv2.imread(image_path)
        if image is None:
            continue

        # OpenCV loads BGR, matplotlib expects RGB
        ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

    plt.tight_layout()
    plt.show()


In [None]:
import os

from ultralytics import YOLO
import cv2
from IPython.display import Video  # For displaying the video in Colab

# Load your YOLO model
model = YOLO('/content/drive/MyDrive/data/runs/detect/train/weights/best.pt')

# Input video and destination of the annotated copy
video_path = '/content/drive/MyDrive/data/dataset/test_video/testvideo1.mp4'  # Replace with your video file path
output_path = '/content/drive/MyDrive/data/predicted/train9_video1.mp4'

# Open the input; fail early with a clear message instead of silently
# producing no output file
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise FileNotFoundError(f"Could not open input video: {video_path}")

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the fractional frame rate (e.g. 29.97); truncating it to an int changes
# the playback speed. Fall back to 30 when the container reports 0.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

# Video writer for the annotated frames; it cannot create missing folders
os.makedirs(os.path.dirname(output_path), exist_ok=True)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

try:
    if not out.isOpened():
        raise RuntimeError(f"Could not open output video for writing: {output_path}")

    # Process the video frame by frame until the stream ends
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Detect, draw the boxes, and append the annotated frame
        results = model(frame)
        out.write(results[0].plot())
finally:
    # Always release both handles, even if inference raises mid-video
    cap.release()
    out.release()

# Display the processed video in Colab
print(f"Processed video saved at: {output_path}")
Video(output_path, embed=True)

In [None]:
import os

from ultralytics import YOLO
import cv2
from IPython.display import Video  # For displaying the video in Colab

# Load your YOLO model
model = YOLO('/content/drive/MyDrive/data/runs/detect/train20/weights/best.pt')

# Input video and destination of the annotated copy
video_path = '/content/drive/MyDrive/data/dataset/test_video/testvideo1.mp4'  # Replace with your video file path
output_path = '/content/drive/MyDrive/data/predicted/train20_video1.mp4'

# Open the input; fail early with a clear message instead of silently
# producing no output file
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise FileNotFoundError(f"Could not open input video: {video_path}")

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the fractional frame rate (e.g. 29.97); truncating it to an int changes
# the playback speed. Fall back to 30 when the container reports 0.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

# Video writer for the annotated frames; it cannot create missing folders
os.makedirs(os.path.dirname(output_path), exist_ok=True)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

try:
    if not out.isOpened():
        raise RuntimeError(f"Could not open output video for writing: {output_path}")

    # Process the video frame by frame until the stream ends
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Detect, draw the boxes, and append the annotated frame
        results = model(frame)
        out.write(results[0].plot())
finally:
    # Always release both handles, even if inference raises mid-video
    cap.release()
    out.release()

# Display the processed video in Colab
print(f"Processed video saved at: {output_path}")
Video(output_path, embed=True)

In [None]:
import os
import cv2
import matplotlib.pyplot as plt

# Path to the folder where YOLO saved the results
results_path = "runs/detect/predict7"

# Predicted images, in name order
predicted_images = sorted(
    os.path.join(results_path, img)
    for img in os.listdir(results_path)
    if img.endswith(('.jpg', '.png'))
)

# Five images per row; the number of rows follows from the image count
# (ceiling division, at least one row). A fixed 4x5 grid raised IndexError as
# soon as the folder held more than 20 images.
columns = 5
rows = max(1, -(-len(predicted_images) // columns))

# 3.75 inches of height per row keeps the previous 20x15 size for 20 images;
# squeeze=False guarantees a 2-D axes array even for a single row
fig, axes = plt.subplots(rows, columns, figsize=(20, 3.75 * rows), squeeze=False)

# Hide the axes of every panel up front, including the unused ones
for ax in axes.flat:
    ax.axis('off')

for i, (ax, image_path) in enumerate(zip(axes.flat, predicted_images)):
    # cv2.imread returns None for unreadable files; mark the panel and move on
    image = cv2.imread(image_path)
    if image is None:
        ax.set_title(f"Image {i+1} (unreadable)")
        continue

    # OpenCV loads BGR, matplotlib expects RGB
    ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    ax.set_title(f"Image {i+1}")

plt.tight_layout()
plt.show()


# ~

In [None]:
import os
import cv2
import albumentations as A
from tqdm import tqdm

# Paths for images and labels
images_path = '/content/drive/MyDrive/data/dataset/classes/cracks/images'
labels_path = '/content/drive/MyDrive/data/dataset/classes/cracks/labels/txt'
output_images_path = '/content/drive/MyDrive/data/dataset/classes/cracks/augmented/images'
output_labels_path = '/content/drive/MyDrive/data/dataset/classes/cracks/augmented/labels'

# Ensure output directories exist
os.makedirs(output_images_path, exist_ok=True)
os.makedirs(output_labels_path, exist_ok=True)

# One single-transform pipeline per augmentation; each is applied to the
# original image on its own. Boxes are in YOLO format.
yolo_bbox_params = A.BboxParams(format='yolo', label_fields=['category_ids'])
rotate_augment = A.Compose([A.Rotate(limit=30, border_mode=0, p=1.0)], bbox_params=yolo_bbox_params)
horizontal_flip_augment = A.Compose([A.HorizontalFlip(p=1.0)], bbox_params=yolo_bbox_params)
vertical_flip_augment = A.Compose([A.VerticalFlip(p=1.0)], bbox_params=yolo_bbox_params)
brightness_contrast_augment = A.Compose(
    [A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, p=1.0)],
    bbox_params=yolo_bbox_params,
)

# (output file suffix, pipeline), applied in this order to every image
augmenters = (
    ('aug_rotate', rotate_augment),
    ('aug_flip', horizontal_flip_augment),
    ('aug_vertical_flip', vertical_flip_augment),
    ('aug_brightness_contrast', brightness_contrast_augment),
)

# Only pick up image files; each label is looked up by the image's base name.
# Pairing two unfiltered, independently sorted listings by position mismatches
# images and labels as soon as either folder holds an extra or missing file.
image_files = sorted(f for f in os.listdir(images_path)
                     if f.lower().endswith(('.jpg', '.png', '.jpeg')))

for image_file in tqdm(image_files):
    base_name = os.path.splitext(image_file)[0]
    image_path = os.path.join(images_path, image_file)
    label_path = os.path.join(labels_path, base_name + '.txt')
    if not os.path.isfile(label_path):
        tqdm.write(f"Skipping {image_file}: no label file at {label_path}")
        continue

    # cv2.imread returns None (no exception) for unreadable files
    image = cv2.imread(image_path)
    if image is None:
        tqdm.write(f"Skipping {image_file}: could not be read as an image")
        continue

    # Parse the YOLO label file, ignoring blank lines
    bboxes = []
    category_ids = []
    with open(label_path, 'r') as f:
        for line in f:
            fields = line.split()
            if not fields:
                continue
            class_id, x_center, y_center, width, height = map(float, fields)
            bboxes.append([x_center, y_center, width, height])
            category_ids.append(int(class_id))  # Ensure class_id is an integer

    # Write one image/label pair per augmentation: <base>_<suffix>.jpg / .txt
    for suffix, augmenter in augmenters:
        augmented = augmenter(image=image, bboxes=bboxes, category_ids=category_ids)

        augmented_image_path = os.path.join(output_images_path, f'{base_name}_{suffix}.jpg')
        cv2.imwrite(augmented_image_path, augmented['image'])

        augmented_label_path = os.path.join(output_labels_path, f'{base_name}_{suffix}.txt')
        with open(augmented_label_path, 'w') as f:
            for bbox, category_id in zip(augmented['bboxes'], augmented['category_ids']):
                # Ensure class_id is saved as an integer
                f.write(f"{int(category_id)} {' '.join(map(str, bbox))}\n")

print(f"Augmentation completed!")


First augmentation

In [None]:
import os

# Directories holding the training images and their label files
image_dir = '/content/drive/MyDrive/data/dataset/train/images'
txt_dir = '/content/drive/MyDrive/data/dataset/train/labels'


def _regular_files(directory):
    """Return the names of the entries in `directory` that are regular files."""
    names = []
    for entry in os.listdir(directory):
        if os.path.isfile(os.path.join(directory, entry)):
            names.append(entry)
    return names


# Sub-directories are ignored; only files are counted
image_files = _regular_files(image_dir)
label_files = _regular_files(txt_dir)

# Report both counts
print(f"Number of image files: {len(image_files)}")
print(f"Number of label files: {len(label_files)}")


In [None]:
# Print the version of the nvcc CUDA compiler installed on this machine.
!nvcc --version

In [None]:
from ultralytics import YOLO

# Start from the pre-trained YOLOv8 small checkpoint
model = YOLO('yolov8s.pt')

# Training settings.
# NOTE(review): `data` is an absolute path on one Windows machine, while most
# other cells use /content/drive/... paths — confirm where this cell is meant to run.
train_options = dict(
    data="C:/Users/Ahsan/Downloads/data/data.yaml",  # dataset description file
    epochs=25,
    imgsz=640,   # input image size
    plots=True,  # ask the trainer to save plots
)

# Train; the return value is the cell's displayed output
model.train(**train_options)


In [None]:
# Run the Ultralytics CLI in predict mode (confidence threshold 0.25) on the
# test images with the best weights of the train20 run.
!yolo task=detect mode=predict model=/content/drive/MyDrive/data/runs/detect/train20/weights/best.pt conf=0.25 source=/content/drive/MyDrive/data/dataset/test/images


In [None]:
import os
import cv2
import matplotlib.pyplot as plt

# Path to the folder where YOLO saved the results
results_path = "runs/detect/predict8"

# Predicted images, in name order
predicted_images = sorted(
    os.path.join(results_path, img)
    for img in os.listdir(results_path)
    if img.endswith(('.jpg', '.png'))
)

# Five images per row; the number of rows follows from the image count
# (ceiling division, at least one row). A fixed 4x5 grid raised IndexError as
# soon as the folder held more than 20 images.
columns = 5
rows = max(1, -(-len(predicted_images) // columns))

# 3.75 inches of height per row keeps the previous 20x15 size for 20 images;
# squeeze=False guarantees a 2-D axes array even for a single row
fig, axes = plt.subplots(rows, columns, figsize=(20, 3.75 * rows), squeeze=False)

# Hide the axes of every panel up front, including the unused ones
for ax in axes.flat:
    ax.axis('off')

for i, (ax, image_path) in enumerate(zip(axes.flat, predicted_images)):
    # cv2.imread returns None for unreadable files; mark the panel and move on
    image = cv2.imread(image_path)
    if image is None:
        ax.set_title(f"Image {i+1} (unreadable)")
        continue

    # OpenCV loads BGR, matplotlib expects RGB
    ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    ax.set_title(f"Image {i+1}")

plt.tight_layout()
plt.show()


In [None]:
pip install -U ultralytics

In [None]:
import os
import json
import cv2
import numpy as np

# Class names from your data.yaml
class_names = ["pothole", "cracks", "open_manhole"]

# Function to convert YOLO annotations to COCO format
def convert_yolo_to_coco(image_dir, label_dir, output_json, categories=None,
                         image_extensions=(".jpg",)):
    """Convert a folder of YOLO label files into a single COCO-format JSON file.

    Every image in ``image_dir`` whose name ends with one of ``image_extensions``
    and that has a same-named ``.txt`` file in ``label_dir`` becomes one COCO image
    entry; every non-blank line of that label file becomes one annotation.

    Args:
        image_dir: Folder containing the images.
        label_dir: Folder containing the YOLO ``.txt`` label files.
        output_json: Path of the JSON file to write.
        categories: Ordered list of class names; index ``i`` becomes COCO category
            id ``i + 1``. Defaults to the module-level ``class_names`` list.
        image_extensions: File-name suffix (or tuple of suffixes) selecting the
            images to process. The default keeps the previous ".jpg"-only behavior.

    Raises:
        ValueError: If a non-blank label line does not contain exactly five
            values (``class x_center y_center width height``) or a value is not
            numeric.
    """
    if categories is None:
        categories = class_names
    if isinstance(image_extensions, str):
        image_extensions = (image_extensions,)
    image_extensions = tuple(image_extensions)

    coco_images = []
    coco_annotations = []
    coco_categories = [{"id": idx + 1, "name": name} for idx, name in enumerate(categories)]
    annotation_id = 1  # Start annotation ID from 1

    # Sorted so image/annotation ids are reproducible across runs and machines
    for image_filename in sorted(os.listdir(image_dir)):
        if not image_filename.endswith(image_extensions):
            continue

        image_path = os.path.join(image_dir, image_filename)
        label_path = os.path.join(label_dir, os.path.splitext(image_filename)[0] + ".txt")

        if not os.path.exists(label_path):  # Skip if no label file exists for the image
            continue

        # cv2.imread returns None instead of raising when the file cannot be decoded
        image = cv2.imread(image_path)
        if image is None:
            print(f"Skipping unreadable image: {image_path}")
            continue
        height, width = image.shape[:2]
        image_id = len(coco_images) + 1

        coco_images.append({
            "id": image_id,
            "file_name": image_filename,
            "width": width,
            "height": height
        })

        with open(label_path, "r") as label_file:
            for line_number, line in enumerate(label_file, start=1):
                fields = line.split()
                if not fields:
                    continue  # tolerate blank / trailing empty lines
                if len(fields) != 5:
                    raise ValueError(
                        f"{label_path}:{line_number}: expected 5 values "
                        f"(class x_center y_center width height), got {len(fields)}"
                    )
                class_id, x_center, y_center, w, h = map(float, fields)

                # Convert from normalized center/size to pixel corner coordinates
                x_min = (x_center - w / 2) * width
                y_min = (y_center - h / 2) * height
                x_max = (x_center + w / 2) * width
                y_max = (y_center + h / 2) * height

                coco_annotations.append({
                    "segmentation": [],  # Add segmentation data if available
                    "area": (x_max - x_min) * (y_max - y_min),  # Area of the bounding box
                    "iscrowd": 0,
                    "image_id": image_id,
                    "bbox": [x_min, y_min, x_max - x_min, y_max - y_min],
                    "category_id": int(class_id) + 1,  # COCO category ID starts at 1
                    "id": annotation_id
                })
                annotation_id += 1

    # Create final COCO dataset
    coco_data = {
        "images": coco_images,
        "annotations": coco_annotations,
        "categories": coco_categories
    }

    # Save to JSON file
    with open(output_json, "w") as output_file:
        json.dump(coco_data, output_file, indent=4)

# Directories for images and labels
image_dir = '/content/drive/MyDrive/data/dataset/valid/images'
label_dir = '/content/drive/MyDrive/data/dataset/valid/labels'

# Output JSON file for COCO annotations
output_json = '/content/drive/MyDrive/data/valid_annotations.json'

# Convert YOLO to COCO
convert_yolo_to_coco(image_dir, label_dir, output_json)


In [None]:
import json
import matplotlib.pyplot as plt
import cv2
import random
from pycocotools.coco import COCO

# Path to your COCO annotations JSON file
coco_annotation_file = 'F:/data/valid_annotations1.json'
# Folder (with trailing slash) holding the images the annotation file refers to
coco_image_dir = 'F:/data/dataset/valid/images/'


# Initialize COCO API
coco = COCO(coco_annotation_file)

# Check the categories (class names)
categories = coco.loadCats(coco.getCatIds())
category_names = [category['name'] for category in categories]
print(f"Categories: {category_names}")

# Look names up by the category id stored in the file. Indexing `category_names`
# by position (id - 1) only works while ids are exactly 1..N in list order.
category_name_by_id = {category['id']: category['name'] for category in categories}

# Define fixed colors for each category (RGB, since images are converted to RGB below)
category_colors = {
    "pothole": (255, 0, 0),        # Red for potholes
    "cracks": (0, 255, 0),         # Green for cracks
    "open_manhole": (0, 0, 255)    # Blue for open manholes
}
fallback_color = (255, 255, 0)     # Yellow for any category without a fixed color

# Load image IDs
image_ids = coco.getImgIds()  # Get all image IDs
num_images_to_show = 30  # Number of random images to display

# Shuffle the list of image IDs to display random images
random.shuffle(image_ids)
images_to_display = image_ids[:num_images_to_show]

# Grid sized from the number of images actually shown (30 images -> 6 rows x 5 columns)
grid_columns = 5
grid_rows = max(1, -(-len(images_to_display) // grid_columns))  # ceiling division

# Set up the matplotlib figure to show multiple images
plt.figure(figsize=(15, 10))

for i, image_id in enumerate(images_to_display):
    # Load the image
    image_info = coco.loadImgs(image_id)[0]
    image_path = coco_image_dir + image_info['file_name']
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals a missing/corrupt file by returning None
        print(f"Could not read image, skipping: {image_path}")
        continue
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Load the annotations for this image
    annotation_ids = coco.getAnnIds(imgIds=image_id)
    annotations = coco.loadAnns(annotation_ids)

    # Draw the bounding boxes on the image
    for ann in annotations:
        category_id = ann['category_id']
        category_name = category_name_by_id.get(category_id, f"id {category_id}")
        color = category_colors.get(category_name, fallback_color)

        # The bounding box is [x, y, width, height] in pixels
        x, y, w, h = ann['bbox']
        cv2.rectangle(image, (int(x), int(y)), (int(x + w), int(y + h)), color, 2)
        # Keep the label inside the image when the box touches the top edge
        label_y = max(int(y) - 10, 15)
        cv2.putText(image, category_name, (int(x), label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    # Display the image with its bounding boxes
    plt.subplot(grid_rows, grid_columns, i + 1)
    plt.imshow(image)
    plt.axis('off')
    plt.title(f"Image {i+1}")

# Show the plot with random images
plt.tight_layout()
plt.show()


# COCO object detection (Torch)

In [None]:
!pip install torch torchvision pycocotools tqdm

In [2]:
import torch
import torchvision
from torch.utils.data import DataLoader
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F
from pycocotools.coco import COCO
import os
from PIL import Image
from tqdm import tqdm

class COCODataset(torch.utils.data.Dataset):
    """Detection dataset backed by a COCO annotation file.

    Each item is ``(image, target)`` where ``target`` holds ``boxes`` (float32,
    ``[x1, y1, x2, y2]``), ``labels`` (int64 category ids) and ``image_id``.
    """

    def __init__(self, annotation_file, image_dir, transforms=None):
        """Index ``annotation_file`` with pycocotools and remember the image folder.

        Args:
            annotation_file: Path to the COCO JSON file.
            image_dir: Folder that ``file_name`` entries are relative to.
            transforms: Optional callable applied to the loaded RGB image only.
        """
        self.coco = COCO(annotation_file)
        self.image_dir = image_dir
        self.transforms = transforms
        self.image_ids = list(self.coco.imgs.keys())

    def __len__(self):
        """Number of images listed in the annotation file."""
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Load the ``idx``-th image and its boxes/labels."""
        image_id = self.image_ids[idx]
        ann_ids = self.coco.getAnnIds(imgIds=image_id)
        anns = self.coco.loadAnns(ann_ids)
        info = self.coco.loadImgs(image_id)[0]

        # Load image
        image = Image.open(os.path.join(self.image_dir, info["file_name"])).convert("RGB")

        # COCO stores [x, y, width, height]; convert to corner format and drop
        # boxes whose width or height is not strictly positive.
        boxes, labels = [], []
        for ann in anns:
            left, top, box_w, box_h = ann["bbox"]
            if not (box_w > 0 and box_h > 0):
                continue
            boxes.append([left, top, left + box_w, top + box_h])
            labels.append(ann["category_id"])

        if boxes:
            box_tensor = torch.as_tensor(boxes, dtype=torch.float32)
            label_tensor = torch.as_tensor(labels, dtype=torch.int64)
        else:
            # Explicit (0, 4) / (0,) shapes for images without usable boxes
            box_tensor = torch.zeros((0, 4), dtype=torch.float32)
            label_tensor = torch.zeros((0,), dtype=torch.int64)

        target = {
            "boxes": box_tensor,
            "labels": label_tensor,
            "image_id": torch.tensor([image_id])
        }

        if self.transforms:
            image = self.transforms(image)

        return image, target


### useless

In [None]:
from torch.utils.data import DataLoader

# Create datasets for train and validation
train_annotation_file = 'archive/train_annotations1.json'
val_annotation_file = 'archive/valid_annotations1.json'
train_image_dir = 'archive/dataset/dataset/train/images'
val_image_dir = 'archive/dataset/dataset/valid/images'

train_dataset = COCODataset(train_annotation_file, train_image_dir)
val_dataset = COCODataset(val_annotation_file, val_image_dir)

# Create DataLoader for both train and validation datasets
train_data_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))
val_data_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))


### useless/>

In [3]:
from torch.utils.data import DataLoader

# Define paths
train_annotation_file = 'archive/train_annotations1.json'
val_annotation_file = 'archive/valid_annotations1.json'
train_image_dir = 'archive/dataset/dataset/train/images'
val_image_dir = 'archive/dataset/dataset/valid/images'

# Define transformations (Optional, can be expanded as needed)
def get_transform():
    """Return the image transform used by both datasets (``ToTensor`` only)."""
    return torchvision.transforms.Compose([
        torchvision.transforms.ToTensor()
    ])

def collate_detection_batch(batch):
    """Regroup ``[(image, target), ...]`` into ``(images, targets)`` tuples.

    Images are kept as a tuple rather than stacked, so they may differ in size.
    """
    return tuple(zip(*batch))

# Create datasets
train_dataset = COCODataset(train_annotation_file, train_image_dir, transforms=get_transform())
val_dataset = COCODataset(val_annotation_file, val_image_dir, transforms=get_transform())

# DataLoaders
# Load in the main process. Worker processes have to pickle the dataset and the
# collate function; a lambda or a class defined in a notebook cell cannot be pickled
# when workers are spawned (the default on Windows), which stalls or crashes the
# first batch. Raise this only after moving COCODataset and the collate function
# into an importable .py module.
loader_workers = 0
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=loader_workers, collate_fn=collate_detection_batch)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, num_workers=loader_workers, collate_fn=collate_detection_batch)


loading annotations into memory...
Done (t=0.02s)
creating index...
index created!
loading annotations into memory...
Done (t=0.00s)
creating index...
index created!


In [4]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
# Load a pre-trained Faster R-CNN model
weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
model = fasterrcnn_resnet50_fpn(weights=weights)
num_classes = 4  # 3 classes (pothole, cracks, open_manhole) + 1 background
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Using device: {device}")


Using device: cuda


In [5]:
import os
import json
from PIL import Image

def validate_data(image_dir, annotation_file):
    """Sanity-check a COCO annotation file against an image folder.

    Two checks run in order, and the first failure stops the validation:
    every ``file_name`` listed under ``images`` must exist in ``image_dir``,
    and every annotation ``bbox`` must have positive width and height.

    Args:
        image_dir: Folder the annotation file's ``file_name`` values are relative to.
        annotation_file: Path to the COCO JSON file.

    Returns:
        True when both checks pass, False otherwise (the problem is printed).
    """
    with open(annotation_file) as handle:
        coco_dict = json.load(handle)

    # Unique file names referenced by the annotation file
    referenced = set(entry['file_name'] for entry in coco_dict['images'])

    missing_images = [
        name for name in referenced
        if not os.path.exists(os.path.join(image_dir, name))
    ]
    if missing_images:
        print(f"Missing images: {missing_images}")
        return False

    # bbox is [x, y, width, height]; collect ids of boxes with a non-positive side
    invalid_bboxes = []
    for record in coco_dict['annotations']:
        _x, _y, box_w, box_h = record['bbox']
        if box_w <= 0 or box_h <= 0:
            invalid_bboxes.append(record['id'])
    if invalid_bboxes:
        print(f"Invalid bounding boxes for annotations: {invalid_bboxes}")
        return False

    print("Data validation successful!")
    return True

# Define paths for your data
train_annotation_file = 'archive/train_annotations1.json'
val_annotation_file = 'archive/valid_annotations1.json'
train_image_dir = 'archive/dataset/dataset/train/images'
val_image_dir = 'archive/dataset/dataset/valid/images'

# Validate train and validation data
if not validate_data(train_image_dir, train_annotation_file):
    print("Training data validation failed. Please check your images and annotations.")
elif not validate_data(val_image_dir, val_annotation_file):
    print("Validation data validation failed. Please check your images and annotations.")
else:
    print("Starting training...")
    # Proceed with training after successful data validation
    model.train()  # Make sure your model is in training mode


Data validation successful!
Data validation successful!
Starting training...


In [6]:
def train_one_epoch(model, optimizer, data_loader, device):
    """Run one optimisation pass over ``data_loader``.

    For every batch the images and target tensors are moved to ``device``, the
    loss terms returned by ``model(images, targets)`` are summed, and one
    optimizer step is taken.

    Returns:
        The summed loss averaged over the number of batches.
    """
    model.train()
    running_loss = 0
    for batch_images, batch_targets in tqdm(data_loader, desc="Training"):
        batch_images = [img.to(device) for img in batch_images]
        batch_targets = [
            {key: value.to(device) for key, value in target.items()}
            for target in batch_targets
        ]

        optimizer.zero_grad()

        # The model's output is either a list of loss tensors or a dict of them
        outputs = model(batch_images, batch_targets)
        loss_terms = outputs if isinstance(outputs, list) else outputs.values()
        batch_loss = sum(loss_terms)  # stays a tensor so backward() works

        batch_loss.backward()
        optimizer.step()
        running_loss += batch_loss.item()  # plain float for reporting

    return running_loss / len(data_loader)

def validate_one_epoch(model, data_loader, device):
    """Compute the average validation loss over ``data_loader``.

    The model is switched to training mode for the forward passes because that is
    the mode in which it is expected to return a dict of losses (in eval mode the
    detection model returns predictions instead, and summing them raised a
    TypeError). Gradients are disabled, so no weights are updated. The mode the
    model had on entry is restored before returning.

    Args:
        model: Detection model called as ``model(images, targets)``.
        data_loader: Iterable of ``(images, targets)`` batches.
        device: Device the image and target tensors are moved to.

    Returns:
        Summed loss averaged over the batches; ``0.0`` for an empty loader.

    Raises:
        ValueError: If the model does not return a dict of losses.
    """
    was_training = model.training
    model.train()
    epoch_loss = 0.0
    num_batches = 0
    try:
        with torch.no_grad():  # No gradients needed for validation
            for images, targets in data_loader:
                images = [image.to(device) for image in images]
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

                loss_dict = model(images, targets)
                if not isinstance(loss_dict, dict):
                    raise ValueError("Model did not return a loss dictionary during validation.")

                epoch_loss += sum(loss.item() for loss in loss_dict.values())
                num_batches += 1
    finally:
        # Leave the model in whatever mode the caller had it in
        model.train(was_training)

    return epoch_loss / max(num_batches, 1)


In [7]:
optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

In [None]:
# check if using GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Using device: {device}")

num_epochs = 10
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")

    train_loss = train_one_epoch(model, optimizer, train_loader, device)
    val_loss = validate_one_epoch(model, val_loader, device)

    lr_scheduler.step()

    print(f"Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

    # Save model checkpoint
    torch.save(model.state_dict(), f"faster_rcnn_epoch_{epoch + 1}.pth")
    print(f"Model saved for epoch {epoch + 1}")

Using device: cuda
Epoch 1/10


Training:   0%|          | 0/559 [00:00<?, ?it/s]

# yolo

In [None]:
from ultralytics import YOLO

# Load a model
model = YOLO("yolo11n.pt")

# Train the model
train_results = model.train(
    data="/content/drive/MyDrive/data/data.yaml",  # path to dataset YAML
    epochs=25,  # number of training epochs
    imgsz=224,  # training image size
    device="cpu",  # device to run on, i.e. device=0 or device=0,1,2,3 or device=cpu
)

# Evaluate model performance on the validation set
metrics = model.val()

# Perform object detection on an image
results = model("/content/drive/MyDrive/data/dataset/test/images/images (1).jpeg")
results[0].show()

# Export the model to ONNX format
path = model.export(format="onnx")  # return path to exported model

In [None]:
from ultralytics import YOLO

# Load a model (pre-trained YOLOv8 small checkpoint)
model = YOLO("yolov8s.pt")

# Train the model
train_results = model.train(
    data="F:/data/data.yaml",  # path to dataset YAML file
    epochs=25,  # number of training epochs
    imgsz=416,  # training image size (416 here; the yolo11n cell above uses 224)
    device="cuda",  # device to run on; "cuda" selects the GPU
)


# Evaluate model performance on the validation set
metrics = model.val()

# Perform object detection on a single test image (make sure the path is correct)
results = model("F:/data/dataset/test/images/images (1).jpeg")  # Adjust the path
results[0].show()  # Show the results

# Export the model to ONNX format
path = model.export(format="onnx")  # return path to exported model


In [None]:
!yolo task=detect mode=predict model=C:/Users/Ahsan/runs/detect/train25/weights/best.pt conf=0.25 source=F:/data/dataset/test/images


In [None]:
import os
import cv2
import matplotlib.pyplot as plt

# Path to the folder where YOLO saved the results
results_path = "runs/detect/predict6"

# Collect the predicted images in sorted order. The check is case-insensitive and
# accepts .jpeg as well, since the test images in this notebook are .jpeg files.
predicted_images = sorted(
    os.path.join(results_path, img)
    for img in os.listdir(results_path)
    if img.lower().endswith(('.jpg', '.jpeg', '.png'))
)

if not predicted_images:
    # plt.subplots(1, 0) raises, so report the empty folder instead
    print(f"No predicted images found in {results_path}")
else:
    # One row, one column per image. squeeze=False keeps `axes` a 2-D array even
    # for a single image, where plt.subplots would otherwise return a bare Axes.
    fig, axes = plt.subplots(1, len(predicted_images), figsize=(20, 5), squeeze=False)

    # Loop through the images and display them
    for i, image_path in enumerate(predicted_images):
        ax = axes[0, i]
        ax.axis('off')  # Hide axes

        # cv2.imread returns None (it does not raise) for unreadable files
        image = cv2.imread(image_path)
        if image is None:
            ax.set_title(f"Image {i+1} (unreadable)")
            continue

        ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))  # Convert BGR to RGB for display
        ax.set_title(f"Image {i+1}")

    plt.tight_layout()
    plt.show()


In [None]:
import torch
from pycocotools.coco import COCO
import os
from PIL import Image

# Define the Dataset class
class COCODataset(torch.utils.data.Dataset):
    """Map-style dataset that serves images and detection targets from a COCO file."""

    def __init__(self, annotation_file, image_dir, transforms=None):
        """Load the COCO annotations and record where the images live.

        Args:
            annotation_file: Path to the COCO JSON file.
            image_dir: Folder joined with each image's ``file_name``.
            transforms: Optional callable applied to the image (not to the target).
        """
        self.coco = COCO(annotation_file)
        self.image_dir = image_dir
        self.transforms = transforms
        self.image_ids = list(self.coco.imgs.keys())

    def __len__(self):
        """Return how many images the annotation file lists."""
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Return ``(image, target)`` for position ``idx``.

        ``target`` contains ``boxes`` (float32, corner format), ``labels``
        (int64 category ids) and ``image_id`` (one-element tensor).
        """
        image_id = self.image_ids[idx]
        annotations = self.coco.loadAnns(self.coco.getAnnIds(imgIds=image_id))
        file_name = self.coco.loadImgs(image_id)[0]["file_name"]

        # Load the image as RGB
        image = Image.open(os.path.join(self.image_dir, file_name)).convert("RGB")

        # Pair each usable box (positive width and height) with its category id,
        # converting [x, y, w, h] to [x1, y1, x2, y2] on the way.
        kept = []
        for ann in annotations:
            x0, y0, w, h = ann["bbox"]
            if w > 0 and h > 0:
                kept.append(([x0, y0, x0 + w, y0 + h], ann["category_id"]))

        if len(kept) == 0:
            # No annotations: keep the (0, 4) / (0,) shapes explicit
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros((0,), dtype=torch.int64)
        else:
            boxes = torch.as_tensor([box for box, _ in kept], dtype=torch.float32)
            labels = torch.as_tensor([label for _, label in kept], dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([image_id])
        }

        # Apply transformations to the image, if any
        if self.transforms:
            image = self.transforms(image)

        return image, target


In [None]:
from torchvision import transforms

# Define the transformation function
def get_transform(train=True):
    """Build the image transform for the Faster R-CNN datasets.

    Only ``ToTensor`` is applied. ImageNet mean/std normalization is deliberately
    NOT added here: torchvision's detection models normalize their inputs
    internally, so normalizing in the dataset as well would apply it twice. It
    would also disagree with the inference cell of this notebook, which feeds
    plain ``to_tensor`` images to the model.

    Args:
        train: Kept for call-site compatibility; training and validation currently
            use the same transform because augmentation is done in preprocessing.

    Returns:
        A ``transforms.Compose`` converting a PIL image to a tensor.
    """
    transform_list = []
    if train:
        # Omit RandomHorizontalFlip since augmentation is already done in preprocessing
        pass
    transform_list.append(transforms.ToTensor())  # Converts image to Tensor

    return transforms.Compose(transform_list)



In [None]:
from tqdm import tqdm

# Define the training function
def train_one_epoch(model, optimizer, data_loader, device):
    """Train ``model`` for one pass over ``data_loader`` and return the mean loss.

    Each batch is moved to ``device``; the loss terms the model returns (a dict,
    or a list) are added up, back-propagated, and followed by an optimizer step.
    """
    model.train()
    loss_total = 0
    progress = tqdm(data_loader, desc="Training")
    for images, targets in progress:
        # Move the batch to the target device
        images = [picture.to(device) for picture in images]
        targets = [{name: tensor.to(device) for name, tensor in entry.items()} for entry in targets]

        optimizer.zero_grad()

        # Forward pass returns the individual loss terms
        loss_dict = model(images, targets)
        if isinstance(loss_dict, list):
            combined = sum(loss_dict)
        else:
            combined = sum(loss_dict.values())

        combined.backward()  # `combined` is still a tensor here
        optimizer.step()
        loss_total += combined.item()  # float copy for reporting only

    return loss_total / len(data_loader)

# Define the validation function
import torch

def validate_one_epoch(model, data_loader, device):
    """Return the mean summed loss of ``model`` over ``data_loader``.

    The model is put in training mode so that calling it with targets yields a
    loss dictionary; the forward pass itself runs under ``torch.no_grad()``.
    The model is left in training mode afterwards.

    Raises:
        ValueError: If the model returns anything other than a dict.
    """
    model.train()  # training mode is what makes the model report losses

    running = 0
    for batch_images, batch_targets in data_loader:
        batch_images = [img.to(device) for img in batch_images]
        batch_targets = [
            {key: value.to(device) for key, value in target.items()}
            for target in batch_targets
        ]

        with torch.no_grad():  # no gradients are needed for validation
            outputs = model(batch_images, batch_targets)

            if not isinstance(outputs, dict):
                raise ValueError("Model did not return a loss dictionary during validation.")

            running += sum(term.item() for term in outputs.values())

    return running / len(data_loader)


In [None]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights
from torch.utils.data import DataLoader
import torch
import os
import csv

# Create datasets for training and validation
# (COCODataset and get_transform come from earlier cells of this notebook)
train_images_path = "F:/data/dataset/train/images"
valid_images_path = "F:/data/dataset/valid/images"
train_annotations_path = "F:/data/train_annotations1.json"
valid_annotations_path = "F:/data/valid_annotations1.json"

train_dataset = COCODataset(train_annotations_path, train_images_path, transforms=get_transform(train=True))
val_dataset = COCODataset(valid_annotations_path, valid_images_path, transforms=get_transform(train=False))

# Create DataLoader for both train and validation datasets
# The collate_fn regroups [(image, target), ...] into (images, targets) tuples
# instead of stacking, so images in a batch may have different sizes.
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))

# Initialize the model, optimizer, and learning rate scheduler
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# NOTE(review): the COCO-pretrained box head is used as-is here; unlike the earlier
# cell that installs a FastRCNNPredictor with num_classes=4, no head replacement
# happens in this cell. Confirm this is intended.
model = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.COCO_V1)
model.to(device)

optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)
# Multiply the learning rate by 0.1 every 3 epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Ensure the model directory exists
os.makedirs("F:/data/model_cnn/faster_r_cnn_1", exist_ok=True)

# Create a CSV file to store mAP values
# NOTE(review): only the header row is written; nothing in this cell computes mAP
# or appends rows, and the file is closed when the `with` block ends.
with open("F:/data/model_cnn/faster_r_cnn_1/map_values.csv", "w", newline="") as csvfile:
    fieldnames = ["Epoch", "mAP"]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()

# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")

    train_loss = train_one_epoch(model, optimizer, train_loader, device)
    val_loss = validate_one_epoch(model, val_loader, device)

    # Advance the StepLR schedule once per epoch
    lr_scheduler.step()

    print(f"Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")

    # Save model checkpoint (weights only, one file per epoch)
    save_path = f"F:/data/model_cnn/faster_r_cnn_1/faster_rcnn_epoch_{epoch + 1}.pth"
    torch.save(model.state_dict(), save_path)
    print(f"Model saved for epoch {epoch + 1} at {save_path}")


# finetune fasterrcnn_resnet50_fpn

In [None]:
pip install torchmetrics

In [None]:
from tqdm import tqdm

# Define the training function
def train_one_epoch(model, optimizer, data_loader, device):
    """One training epoch: returns the per-batch summed loss, averaged over batches.

    Images and every tensor in each target dict are transferred to ``device``
    before the forward pass; the model's loss terms (dict values, or list items)
    are summed into a single tensor that drives ``backward()`` and the optimizer
    step.
    """
    model.train()
    accumulated = 0
    for raw_images, raw_targets in tqdm(data_loader, desc="Training"):
        device_images = [item.to(device) for item in raw_images]
        device_targets = []
        for raw_target in raw_targets:
            device_targets.append({field: data.to(device) for field, data in raw_target.items()})

        optimizer.zero_grad()

        result = model(device_images, device_targets)
        terms = result if isinstance(result, list) else result.values()
        step_loss = sum(terms)  # tensor, so it can be back-propagated

        step_loss.backward()
        optimizer.step()
        accumulated += step_loss.item()  # convert to float only for bookkeeping

    mean_loss = accumulated / len(data_loader)
    return mean_loss

# Define the validation function
import torch

def validate_one_epoch(model, data_loader, device):
    """Average the model's summed loss over every batch of ``data_loader``.

    Training mode is enabled so the model returns a loss dictionary when given
    targets; gradient tracking is switched off around the forward pass. The
    model stays in training mode when this function returns.

    Raises:
        ValueError: If a forward pass does not return a dict.
    """
    model.train()  # needed for the model to return losses

    loss_sum = 0
    for images, targets in data_loader:
        images = [tensor.to(device) for tensor in images]
        targets = [{k: v.to(device) for k, v in entry.items()} for entry in targets]

        with torch.no_grad():  # validation never back-propagates
            losses = model(images, targets)
            if isinstance(losses, dict):
                loss_sum += sum(value.item() for value in losses.values())
            else:
                raise ValueError("Model did not return a loss dictionary during validation.")

    return loss_sum / len(data_loader)



In [None]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights
from torch.utils.data import DataLoader
import torch
import os
import csv
from tqdm import tqdm
from torchmetrics import AveragePrecision, ConfusionMatrix, F1Score
import numpy as np

# ... (Your COCODataset and get_transform definitions) ...

# Detection-specific helpers needed by validate_one_epoch below
from torchmetrics.detection import MeanAveragePrecision
from torchvision.ops import box_iou


# Define the training function
# (functions are defined before the training loop so the cell runs on a fresh kernel)
def train_one_epoch(model, optimizer, data_loader, device):
    """Train for one pass over ``data_loader`` and return the mean summed loss."""
    model.train()
    epoch_loss = 0
    for images, targets in tqdm(data_loader, desc="Training"):
        images = [image.to(device) for image in images]  # Move images to device (GPU)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]  # Move targets to device

        optimizer.zero_grad()

        # Forward pass: in training mode the model returns its loss terms
        loss_dict = model(images, targets)

        # Sum all loss terms, keeping the result a tensor so backward() works
        if isinstance(loss_dict, list):
            losses = sum(loss_dict)
        else:
            losses = sum(loss_dict.values())

        losses.backward()
        optimizer.step()  # Update the weights
        epoch_loss += losses.item()  # float copy for reporting

    return epoch_loss / len(data_loader)


def _update_confusion_matrix(conf_matrix, prediction, target, score_threshold, iou_threshold):
    """Add one image's detections to ``conf_matrix`` (rows = true, cols = predicted).

    Detections scoring at least ``score_threshold`` are matched greedily, highest
    score first, to the not-yet-matched ground-truth box with the largest IoU; a
    match requires IoU >= ``iou_threshold``. Index 0 is the background class:
    unmatched detections are counted in row 0 and missed ground-truth boxes in
    column 0.
    """
    keep = prediction["scores"] >= score_threshold
    order = torch.argsort(prediction["scores"][keep], descending=True)
    pred_boxes = prediction["boxes"][keep][order]
    pred_labels = prediction["labels"][keep][order]
    true_boxes = target["boxes"]
    true_labels = target["labels"]

    matched_truth = set()
    ious = box_iou(pred_boxes, true_boxes) if len(pred_boxes) and len(true_boxes) else None

    for pred_idx in range(len(pred_boxes)):
        pred_label = int(pred_labels[pred_idx])
        truth_idx = -1
        if ious is not None:
            candidates = ious[pred_idx].clone()
            if matched_truth:
                candidates[list(matched_truth)] = -1.0  # each truth box matches once
            best_iou, best_idx = candidates.max(dim=0)
            if best_iou.item() >= iou_threshold:
                truth_idx = int(best_idx)

        if truth_idx >= 0:
            matched_truth.add(truth_idx)
            conf_matrix[int(true_labels[truth_idx]), pred_label] += 1
        else:
            conf_matrix[0, pred_label] += 1  # false positive

    for truth_idx in range(len(true_boxes)):
        if truth_idx not in matched_truth:
            conf_matrix[int(true_labels[truth_idx]), 0] += 1  # missed object


def _macro_f1(conf_matrix):
    """Macro-averaged F1 over the non-background classes that occur in ``conf_matrix``."""
    per_class = []
    for cls in range(1, conf_matrix.shape[0]):
        true_pos = conf_matrix[cls, cls].item()
        false_pos = conf_matrix[:, cls].sum().item() - true_pos
        false_neg = conf_matrix[cls, :].sum().item() - true_pos
        denominator = 2 * true_pos + false_pos + false_neg
        if denominator > 0:  # skip classes that never appear
            per_class.append(2 * true_pos / denominator)
    return sum(per_class) / len(per_class) if per_class else 0.0


# Define the validation function
def validate_one_epoch(model, data_loader, device, score_threshold=0.5, iou_threshold=0.5):
    """Evaluate ``model`` on ``data_loader``.

    Each batch is run twice under ``torch.no_grad()``: once in training mode to
    obtain the loss dictionary, once in eval mode to obtain detections for the
    metrics. The model is left in eval mode.

    Args:
        model: Faster R-CNN model (its ``roi_heads.box_predictor.cls_score`` layer
            is read to size the confusion matrix).
        data_loader: Iterable of ``(images, targets)`` batches.
        device: Device the batch tensors are moved to.
        score_threshold: Minimum score for a detection to enter the confusion matrix.
        iou_threshold: Minimum IoU for a detection to match a ground-truth box.

    Returns:
        ``(mean_loss, map_50, conf_matrix, macro_f1)`` where ``mean_loss``,
        ``map_50`` and ``macro_f1`` are floats and ``conf_matrix`` is a CPU int64
        tensor of shape ``(num_classes, num_classes)``.
    """
    num_classes = model.roi_heads.box_predictor.cls_score.out_features
    map_metric = MeanAveragePrecision()
    conf_matrix = torch.zeros((num_classes, num_classes), dtype=torch.int64)
    total_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for images, targets in data_loader:
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Losses are only returned in training mode
            model.train()
            loss_dict = model(images, targets)
            if not isinstance(loss_dict, dict):
                raise ValueError("Model did not return a loss dictionary during validation.")
            total_loss += sum(loss.item() for loss in loss_dict.values())
            num_batches += 1

            # Detections (one dict per image) are only returned in eval mode
            model.eval()
            predictions = model(images)

            cpu_predictions = [{k: v.cpu() for k, v in p.items()} for p in predictions]
            cpu_targets = [{"boxes": t["boxes"].cpu(), "labels": t["labels"].cpu()} for t in targets]

            map_metric.update(cpu_predictions, cpu_targets)
            for prediction, target in zip(cpu_predictions, cpu_targets):
                _update_confusion_matrix(conf_matrix, prediction, target, score_threshold, iou_threshold)

    model.eval()
    map_50 = map_metric.compute()["map_50"].item()
    return total_loss / max(num_batches, 1), map_50, conf_matrix, _macro_f1(conf_matrix)


# Create datasets for training and validation
train_images_path = "F:/data/dataset/train/images"
valid_images_path = "F:/data/dataset/valid/images"
train_annotations_path = "F:/data/train_annotations.json"
valid_annotations_path = "F:/data/valid_annotations.json"

train_dataset = COCODataset(train_annotations_path, train_images_path, transforms=get_transform(train=True))
val_dataset = COCODataset(valid_annotations_path, valid_images_path, transforms=get_transform(train=False))

# Create DataLoader for both train and validation datasets
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))

# Initialize the model, optimizer, and learning rate scheduler
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.COCO_V1)
model.to(device)

optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Ensure the output directories exist (makedirs also creates the parent model folder)
model_dir = "F:/data/model_cnn/faster_r_cnn_1"
results_dir = f"{model_dir}/results"
os.makedirs(results_dir, exist_ok=True)

# Training loop. It runs inside the `with` block so the CSV file is still open
# when each epoch's row is written.
num_epochs = 10
fieldnames = ["Epoch", "Train Loss", "Validation Loss", "mAP", "F1 Score"]
with open(f"{results_dir}/results.csv", "w", newline="") as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")

        train_loss = train_one_epoch(model, optimizer, train_loader, device)
        val_loss, mAP, conf_matrix, f1_score = validate_one_epoch(model, val_loader, device)

        lr_scheduler.step()

        print(f"Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}, mAP: {mAP:.4f}, F1 Score: {f1_score:.4f}")

        # Save results to CSV; flush so finished epochs survive an interrupted run
        writer.writerow({"Epoch": epoch + 1, "Train Loss": train_loss, "Validation Loss": val_loss, "mAP": mAP, "F1 Score": f1_score})
        csvfile.flush()

        # Save model checkpoint
        save_path = f"{model_dir}/faster_rcnn_epoch_{epoch + 1}.pth"
        torch.save(model.state_dict(), save_path)
        print(f"Model saved for epoch {epoch + 1} at {save_path}")

        # Save confusion matrix
        np.save(f"{results_dir}/confusion_matrix_epoch_{epoch + 1}.npy", conf_matrix.cpu().numpy())

In [None]:
import os
print("Current working directory:", os.getcwd())


In [None]:
import os
import torch
from PIL import Image
from torchvision.transforms import functional as F
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from torchvision.models.detection import fasterrcnn_resnet50_fpn

# Paths
test_images_path = "F:/data/dataset/test/images"
results_path = "F:/data/model_cnn/faster_r_cnn_1/results"
os.makedirs(results_path, exist_ok=True)

# Device setup
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Load the trained model
# NOTE(review): the network is built with the library's default head; if training used a
# different number of classes, load_state_dict will report a shape mismatch -- confirm.
model = fasterrcnn_resnet50_fpn(pretrained=False)  # Ensure pretrained=False since it's a custom-trained model
# map_location remaps the stored tensors onto the device chosen above, so a checkpoint
# written on a GPU machine still loads when only the CPU is available.
model.load_state_dict(
    torch.load('F:/data/model_cnn/faster_r_cnn_1/faster_rcnn_epoch_10.pth', map_location=device)
)  # Update with your trained model path
model.to(device)
model.eval()

# Threshold for filtering low-confidence predictions
threshold = 0.5

# Function to test and save results
def test_and_save_results(image_path, output_path):
    """Detect objects in one image and save an annotated copy of it.

    Opens ``image_path`` as RGB, runs the module-level ``model`` on it without gradients,
    keeps the detections whose score is at least the module-level ``threshold``, draws
    each kept box (red outline with a "label (score)" caption) and writes the figure to
    ``output_path``.

    Args:
        image_path: Path of the image to read.
        output_path: Path the annotated figure is written to.
    """
    rgb_image = Image.open(image_path).convert("RGB")
    batch = F.to_tensor(rgb_image).unsqueeze(0).to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        detections = model(batch)[0]

    boxes = detections['boxes'].cpu().numpy()
    labels = detections['labels'].cpu().numpy()
    scores = detections['scores'].cpu().numpy()

    # One boolean mask selects the confident detections in all three arrays.
    keep = scores >= threshold

    fig, ax = plt.subplots(1, figsize=(12, 9))
    ax.imshow(rgb_image)

    for (x_min, y_min, x_max, y_max), label, score in zip(boxes[keep], labels[keep], scores[keep]):
        outline = patches.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min,
                                    linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(outline)
        ax.text(x_min, y_min, f"{label} ({score:.2f})",
                bbox=dict(facecolor='yellow', alpha=0.5), fontsize=12, color='black')

    # Hide the axes, write the file, then close the figure so figures do not pile up.
    ax.axis('off')
    fig.savefig(output_path, bbox_inches='tight', pad_inches=0)
    plt.close(fig)

# Test all images in the folder
# The suffixes carry their dot so only real extensions match; the undotted form also
# accepted any file whose name merely ended in the letters "jpg", "jpeg" or "png".
image_suffixes = ('.jpg', '.jpeg', '.png')
for image_name in os.listdir(test_images_path):
    image_path = os.path.join(test_images_path, image_name)
    if os.path.isfile(image_path) and image_name.lower().endswith(image_suffixes):
        output_path = os.path.join(results_path, f"results_{image_name}")
        print(f"Processing: {image_name}")
        test_and_save_results(image_path, output_path)

print(f"All results saved in {results_path}")


In [None]:
import torch
from PIL import Image
from torchvision.transforms import functional as F
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from torchvision.models.detection import fasterrcnn_resnet50_fpn

import os  # needed for the output-directory handling below; not among this cell's imports

# Check and set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the trained model
model = fasterrcnn_resnet50_fpn(pretrained=False)  # Custom-trained model, no pretrained weights
model.load_state_dict(torch.load('F:/data/model_cnn/faster_r_cnn/faster_rcnn_epoch_10.pth', map_location=device))
model.eval()  # Set model to evaluation mode
model.to(device)  # Move model to the device

# Load and preprocess the image
image_path = "F:/data/dataset/test/images/images (5).jpeg"
image = Image.open(image_path).convert("RGB")  # Open the image in RGB mode

# Convert the image to tensor
image_tensor = F.to_tensor(image).unsqueeze(0).to(device)  # Add batch dimension and move to the device

# Run inference
with torch.no_grad():  # Disable gradient computation
    prediction = model(image_tensor)

# Extract predictions
boxes = prediction[0]['boxes'].cpu().numpy()
labels = prediction[0]['labels'].cpu().numpy()
scores = prediction[0]['scores'].cpu().numpy()

# Set a confidence threshold; one mask filters all three arrays consistently
threshold = 0.5
confident = scores >= threshold
filtered_boxes = boxes[confident]
filtered_labels = labels[confident]
filtered_scores = scores[confident]

# Plot the image with bounding boxes
fig, ax = plt.subplots(1, figsize=(12, 9))
ax.imshow(image)

# Draw bounding boxes and labels
for box, label, score in zip(filtered_boxes, filtered_labels, filtered_scores):
    x_min, y_min, x_max, y_max = box  # Coordinates of the bounding box
    rect = patches.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min, linewidth=2, edgecolor='r', facecolor='none')
    ax.add_patch(rect)
    ax.text(x_min, y_min, f"{label} ({score:.2f})", bbox=dict(facecolor='yellow', alpha=0.5), fontsize=12, color='black')

# Save and display the result
output_path = "F:/data/dataset/test/images/results/p_images (5).jpeg"
# savefig does not create missing folders, so make sure the results folder exists first.
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path)
plt.show()


# YOLO

In [None]:
from ultralytics import YOLO

import torch  # used only to decide which device to train on

# Load a model
model = YOLO("yolov8m.pt")

# Ultralytics expects a CUDA index (0), "cpu", "mps", etc.; "gpu" is not one of the
# accepted values. Use the first GPU when PyTorch can see one, otherwise the CPU.
train_device = 0 if torch.cuda.is_available() else "cpu"

# Train the model
train_results = model.train(
    data=r"F:/data/data.yaml",  # Path to dataset YAML
    epochs=50,                  # Number of training epochs
    imgsz=640,                  # Training image size (default is 640 for YOLOv8)
    device=train_device         # Device to run on (CPU or GPU)
)

# Evaluate model performance on the validation set
metrics = model.val()

# Perform object detection on an image
results = model(r"F:/data/dataset/test/images/images (1).jpeg")
results[0].show()  # Display the first result

# Export the model to ONNX format
path = model.export(format="onnx")  # Save the model in ONNX format


# yolo

In [None]:
# Run detection from the command line with the exported ONNX weights on the test-image
# folder, using a confidence setting of 0.25. Both paths are absolute paths on one
# specific Windows machine and must be adapted elsewhere.
!yolo task=detect mode=predict model=C:\Users\Ahsan\runs\detect\train25\weights\best.onnx conf=0.25 source=F:\data\dataset\test\images

In [None]:
import torch
from torchvision import transforms
from PIL import Image
from models import YOLO  # Assuming your YOLO model is defined in 'models.py'

import os  # os.listdir / os.path.join are used below but this cell's imports omit os

# Load the trained model
# NOTE(review): this assumes best.pt holds a plain state_dict compatible with the local
# models.YOLO class -- confirm, since the file comes from a different training run layout.
model = YOLO()  # Replace with your model's class
model.load_state_dict(torch.load('C:/Users/Ahsan/runs/detect/train25/weights/best.pt'))
model.eval()

# Define image transformations
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Path to the test images directory
test_dir = 'data/dataset/test/images'

# Iterate through each image in the test directory
for image_path in os.listdir(test_dir):
    # Force three channels: Normalize above has 3-element mean/std, so grayscale or
    # RGBA files would otherwise fail inside the transform.
    image = Image.open(os.path.join(test_dir, image_path)).convert("RGB")
    image_tensor = transform(image).unsqueeze(0)

    with torch.no_grad():
        detections = model(image_tensor)

    # Process detections (e.g., filter by confidence, draw bounding boxes)
    # ...

    # Optionally, save or display the results
    # ...

In [None]:
from ultralytics import YOLO
import cv2
from IPython.display import Video  # For displaying the video in Colab

import os  # for creating the results folder; not among this cell's imports

# Load your YOLO model
# NOTE(review): this path points at the Faster R-CNN checkpoint used by the torchvision
# cells; confirm it is really meant to be loaded through ultralytics.YOLO.
model = YOLO('F:/data/model_cnn/faster_r_cnn_1/faster_rcnn_epoch_10.pth')

# Path to your input video
video_path = 'F:/data/dataset/test_video/testvideo1.mp4'  # Replace with your video file path
output_path = 'F:/data/dataset/test_video/results/testvideo_cnn1.mp4'
# VideoWriter does not create missing folders; make sure the target folder exists.
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Open video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail loudly instead of silently writing an empty output file.
    raise IOError(f"Could not open video: {video_path}")
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the fractional frame rate (e.g. 29.97) and fall back to 30 when the container
# reports 0, which would otherwise produce an unplayable file.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

# Define video writer to save the annotated video
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Process the video frame by frame
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Perform detection
        results = model(frame)

        # Annotate the frame
        annotated_frame = results[0].plot()

        # Write the annotated frame to the output video
        out.write(annotated_frame)
finally:
    # Release resources even if detection fails part-way through the video
    cap.release()
    out.release()

# Display the processed video in Colab
print(f"Processed video saved at: {output_path}")
Video(output_path, embed=True)

# fasterrcnn_resnet50_fpn

In [None]:
import cv2
import torch
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F
from PIL import Image
from IPython.display import Video  # For displaying the video in Colab

import os  # for creating the results folder; not among this cell's imports

# Load your Faster R-CNN model
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model = fasterrcnn_resnet50_fpn(weights=None)  # No pre-trained weights
# map_location lets a checkpoint saved on a GPU machine load when only the CPU is available.
model.load_state_dict(torch.load('F:/data/model_cnn/faster_r_cnn_1/faster_rcnn_epoch_10.pth', map_location=device))
model.to(device)
model.eval()

# Path to your input video
video_path = 'F:/data/dataset/test_video/testvideo7.mp4'  # Replace with your video file path
output_path = 'F:/data/dataset/test_video/results/testvideo_cnn7.mp4'
# VideoWriter does not create missing folders; make sure the target folder exists.
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Open video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail loudly instead of silently writing an empty output file.
    raise IOError(f"Could not open video: {video_path}")
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the fractional frame rate and fall back to 30 when the container reports 0.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

# Define video writer to save the annotated video
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Build the validation transform once instead of once per frame.
# NOTE(review): get_transform is defined elsewhere in the notebook; this assumes the
# object it returns can be reused across frames -- confirm.
transform = get_transform(train=False)  # Use the same transform as validation

# Process the video frame by frame
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # OpenCV delivers frames in BGR order, while the image cells feed the model RGB
        # (PIL .convert("RGB")); convert before building the PIL image so the channels match.
        pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        # Apply the same transformations as in training
        input_batch = transform(pil_image).unsqueeze(0).to(device)  # Add batch dimension and send to device

        # Perform detection
        with torch.no_grad():
            prediction = model(input_batch)  # Get predictions (boxes, labels, scores)

        # Draw bounding boxes on the frame
        boxes = prediction[0]['boxes'].cpu().numpy()
        labels = prediction[0]['labels'].cpu().numpy()
        scores = prediction[0]['scores'].cpu().numpy()

        # Iterate over the detections and draw boxes (on the original BGR frame)
        for box, label, score in zip(boxes, labels, scores):
            if score > 0.5:  # Filter out weak detections
                x1, y1, x2, y2 = box
                frame = cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                label_text = f"Label: {label}, Score: {score:.2f}"
                frame = cv2.putText(frame, label_text, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

        # Write the annotated frame to the output video
        out.write(frame)
finally:
    # Release resources even if inference fails part-way through the video
    cap.release()
    out.release()

# Display the processed video in Colab
print(f"Processed video saved at: {output_path}")
Video(output_path, embed=True)


In [None]:
import os
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import Sequence
import cv2

# Load COCO annotations
def load_coco_annotations(annotation_file):
    """Parse the JSON file at ``annotation_file`` and return the decoded object."""
    with open(annotation_file, 'r') as handle:
        return json.load(handle)

# COCO Data Generator
class CocoDataGenerator(Sequence):
    """Batch generator yielding (images, one-hot labels), one sample per COCO annotation.

    Every annotation contributes its whole source image (converted to RGB, resized to
    ``target_size`` and divided by 255) labelled with that annotation's category.
    Annotations whose image file is missing or cannot be decoded are reported with
    ``print`` and left out, so a batch can hold fewer than ``batch_size`` samples.

    NOTE(review): the label position is ``category_id - 1``, which presumes ids run
    1..N -- confirm against the annotation files.
    """

    def __init__(self, image_dir, annotations, batch_size, target_size):
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.target_size = target_size
        self.annotations = annotations['annotations']
        # Lookup tables keyed by the COCO ids.
        self.images = {entry['id']: entry for entry in annotations['images']}
        self.categories = {entry['id']: entry for entry in annotations['categories']}
        self.num_samples = len(self.annotations)
        self.indices = list(range(self.num_samples))
        # Shuffle once up front so the first epoch is already randomised.
        self.on_epoch_end()

    def __len__(self):
        # Number of batches; the last one may be partial.
        return int(np.ceil(self.num_samples / self.batch_size))

    def __getitem__(self, index):
        first = index * self.batch_size
        batch_images, batch_labels = [], []
        for sample_idx in self.indices[first:first + self.batch_size]:
            annotation = self.annotations[sample_idx]
            file_name = self.images[annotation['image_id']]['file_name']
            img_path = os.path.join(self.image_dir, file_name)
            if not os.path.exists(img_path):
                print(f"Image not found: {img_path}")
                continue
            pixels = cv2.imread(img_path)
            if pixels is None:
                print(f"Failed to load image: {img_path}")
                continue
            pixels = cv2.cvtColor(pixels, cv2.COLOR_BGR2RGB)
            pixels = cv2.resize(pixels, self.target_size) / 255.0

            # One-hot vector with one slot per category in the annotation file.
            one_hot = np.zeros(len(self.categories))
            one_hot[annotation['category_id'] - 1] = 1  # Category IDs in annotations start from 1
            batch_images.append(pixels)
            batch_labels.append(one_hot)

        return np.array(batch_images), np.array(batch_labels)

    def on_epoch_end(self):
        # Reshuffle the sample order in place.
        np.random.shuffle(self.indices)

# Directories and parameters
# Absolute paths on the F: drive; adjust when running on another machine.
train_annotations_file = "F:/data/train_annotations1.json"
valid_annotations_file = "F:/data/valid_annotations1.json"
train_image_dir = "F:/data/dataset/train/images"
valid_image_dir = "F:/data/dataset/valid/images"
target_size = (224, 224)  # matches the input_shape given to VGG16 below
batch_size = 32
num_classes = 3  # size of the softmax output layer

# Load annotations
train_annotations = load_coco_annotations(train_annotations_file)
valid_annotations = load_coco_annotations(valid_annotations_file)

# Create data generators
train_generator = CocoDataGenerator(train_image_dir, train_annotations, batch_size, target_size)
valid_generator = CocoDataGenerator(valid_image_dir, valid_annotations, batch_size, target_size)

# Load VGG16 model
# ImageNet-pretrained convolutional base without its classifier, followed by a new
# Flatten -> Dense(256) -> Dropout(0.5) -> softmax head.
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
x = base_model.output
x = Flatten()(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.5)(x)
predictions = Dense(num_classes, activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=predictions)

# Freeze base model layers
# Done before compile, so only the new head is trained.
for layer in base_model.layers:
    layer.trainable = False

# Compile the model
model.compile(optimizer=Adam(learning_rate=1e-4), loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model
# Step counts are taken from the generators' own lengths.
model.fit(
    train_generator,
    validation_data=valid_generator,
    epochs=10,
    steps_per_epoch=len(train_generator),
    validation_steps=len(valid_generator)
)


In [None]:
import os
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Flatten, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import Sequence
from tensorflow.keras.callbacks import ModelCheckpoint
import cv2

# Load COCO annotations
def load_coco_annotations(annotation_file):
    """Read ``annotation_file`` as JSON and hand back the decoded content."""
    with open(annotation_file, 'r') as json_file:
        parsed = json.load(json_file)
        return parsed

# COCO Data Generator
class CocoDataGenerator(Sequence):
    """Batch generator yielding (images, one-hot labels), one sample per COCO annotation.

    Every annotation contributes its whole source image (converted to RGB, resized to
    ``target_size`` and divided by 255) labelled with that annotation's category.
    Annotations whose image file is missing or cannot be decoded are reported with
    ``print`` and left out, so a batch can hold fewer than ``batch_size`` samples.

    NOTE(review): the label position is ``category_id - 1``, which presumes ids run
    1..N -- confirm against the annotation files.
    """

    def __init__(self, image_dir, annotations, batch_size, target_size):
        self.image_dir = image_dir
        self.batch_size = batch_size
        self.target_size = target_size
        self.annotations = annotations['annotations']
        # Lookup tables keyed by the COCO ids.
        self.images = {entry['id']: entry for entry in annotations['images']}
        self.categories = {entry['id']: entry for entry in annotations['categories']}
        self.num_samples = len(self.annotations)
        self.indices = list(range(self.num_samples))
        # Shuffle once up front so the first epoch is already randomised.
        self.on_epoch_end()

    def __len__(self):
        # Number of batches; the last one may be partial.
        return int(np.ceil(self.num_samples / self.batch_size))

    def __getitem__(self, index):
        first = index * self.batch_size
        batch_images, batch_labels = [], []
        for sample_idx in self.indices[first:first + self.batch_size]:
            annotation = self.annotations[sample_idx]
            file_name = self.images[annotation['image_id']]['file_name']
            img_path = os.path.join(self.image_dir, file_name)
            if not os.path.exists(img_path):
                print(f"Image not found: {img_path}")
                continue
            pixels = cv2.imread(img_path)
            if pixels is None:
                print(f"Failed to load image: {img_path}")
                continue
            pixels = cv2.cvtColor(pixels, cv2.COLOR_BGR2RGB)
            pixels = cv2.resize(pixels, self.target_size) / 255.0

            # One-hot vector with one slot per category in the annotation file.
            one_hot = np.zeros(len(self.categories))
            one_hot[annotation['category_id'] - 1] = 1  # Category IDs in annotations start from 1
            batch_images.append(pixels)
            batch_labels.append(one_hot)

        return np.array(batch_images), np.array(batch_labels)

    def on_epoch_end(self):
        # Reshuffle the sample order in place.
        np.random.shuffle(self.indices)

# Directories and parameters
train_annotations_file = r"archive\train_annotations1.json"
valid_annotations_file = r"archive\valid_annotations1.json"
train_image_dir = r"archive\dataset\dataset\train\images"
valid_image_dir = r"archive\dataset\dataset\valid\images"
target_size = (224, 224)  # matches the input_shape given to VGG16 below
batch_size = 32
num_classes = 3  # size of the softmax output layer

# Load annotations
train_annotations = load_coco_annotations(train_annotations_file)
valid_annotations = load_coco_annotations(valid_annotations_file)

# Create data generators
train_generator = CocoDataGenerator(train_image_dir, train_annotations, batch_size, target_size)
valid_generator = CocoDataGenerator(valid_image_dir, valid_annotations, batch_size, target_size)

# Load VGG16 model
# ImageNet-pretrained convolutional base without its classifier, followed by a new
# Flatten -> Dense(256) -> Dropout(0.5) -> softmax head.
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
x = base_model.output
x = Flatten()(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.5)(x)
predictions = Dense(num_classes, activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=predictions)

# Freeze base model layers
for layer in base_model.layers:
    layer.trainable = False

# Compile the model
model.compile(optimizer=Adam(learning_rate=1e-4), loss='categorical_crossentropy', metrics=['accuracy'])

# Checkpoint: keep only the weights with the lowest validation loss.
# Raw string: the path contains "\d", which is not a valid escape sequence in a normal
# string literal. The resulting path text is exactly the same as before.
checkpoint_path = r'archive\dataset\dataset/vgg/best_model.keras'
# Create the folder up front so a bad location fails before training starts.
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
checkpoint = ModelCheckpoint(checkpoint_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1)

# Train the model with the checkpoint callback
model.fit(
    train_generator,
    validation_data=valid_generator,
    epochs=10,
    steps_per_epoch=len(train_generator),
    validation_steps=len(valid_generator),
    callbacks=[checkpoint]  # Add the checkpoint callback here
)


In [None]:
import cv2
import numpy as np
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing import image
from PIL import Image
from IPython.display import Video  # For displaying the video in Colab

import os  # os.path.join is used below but this cell's imports omit os

# Load the VGG16 model
# Raw string: "\d" is not a valid escape sequence; the path text itself is unchanged.
vgg_model_path = r'archive\dataset\dataset/vgg/best_model.keras'  # Path to your saved VGG model
model = load_model(vgg_model_path)

# Path to your input video
video_path  = os.path.join('archive','dataset','dataset','test','video','testvideo1.mp4')
output_path = os.path.join('archive','dataset','dataset','test','video','results','testvideo_vgg1.mp4')
# VideoWriter does not create missing folders; make sure the target folder exists.
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Open video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail loudly instead of silently writing an empty output file.
    raise IOError(f"Could not open video: {video_path}")
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the fractional frame rate and fall back to 30 when the container reports 0.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

# Define video writer to save the annotated video
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Define the categories
categories = ['pothole', 'cracks', 'open_manhole']  # Assuming you have 3 categories

# Process the video frame by frame
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Preprocess exactly like the training generator: BGR -> RGB, cv2.resize to
        # 224x224, scale to [0, 1]. Feeding the raw BGR frame would swap the red and
        # blue channels relative to what the network was trained on.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img_array = cv2.resize(rgb_frame, (224, 224)).astype(np.float32) / 255.0
        img_array = np.expand_dims(img_array, axis=0)  # Add batch dimension

        # Perform prediction (verbose=0 avoids one progress bar per frame)
        prediction = model.predict(img_array, verbose=0)

        # Get the predicted class
        predicted_class = np.argmax(prediction, axis=1)
        predicted_label = categories[predicted_class[0]]

        # Draw label on the frame
        label_text = f"Prediction: {predicted_label}, Score: {prediction[0][predicted_class[0]]:.2f}"
        frame = cv2.putText(frame, label_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 0, 0), 2)

        # Write the annotated frame to the output video
        out.write(frame)
finally:
    # Release resources even if prediction fails part-way through the video
    cap.release()
    out.release()

# Display the processed video in Colab
print(f"Processed video saved at: {output_path}")
Video(output_path, embed=True)


# Fine-tune Classification Model (Torch)

In [None]:
!pip install efficientnet-pytorch


In [None]:
import json
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

class COCODataset(Dataset):
    """Classification dataset with one sample per COCO annotation.

    Item ``idx`` is the image referenced by annotation ``idx`` (opened as RGB and passed
    through ``transform`` when one is given) together with ``category_id - 1``.

    Args:
        annotations_file: Path to a COCO-format JSON file.
        img_dir: Directory the ``file_name`` entries are relative to.
        transform: Optional callable applied to the PIL image.
    """

    def __init__(self, annotations_file, img_dir, transform=None):
        with open(annotations_file) as handle:
            coco = json.load(handle)
        self.coco_data = coco
        self.img_dir = img_dir
        self.transform = transform
        self.annotations = coco["annotations"]
        # id -> file name and id -> category name lookups.
        self.image_id_to_filename = {}
        for image_entry in coco["images"]:
            self.image_id_to_filename[image_entry["id"]] = image_entry["file_name"]
        self.categories = {}
        for category_entry in coco["categories"]:
            self.categories[category_entry["id"]] = category_entry["name"]

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        record = self.annotations[idx]
        file_name = self.image_id_to_filename[record["image_id"]]

        # Load the image
        picture = Image.open(f"{self.img_dir}/{file_name}").convert("RGB")

        # Apply transformations
        if self.transform:
            picture = self.transform(picture)

        # Return image and label (0-based index)
        return picture, record["category_id"] - 1


In [None]:
# Paths to data
# Both splits live under archive/dataset/dataset/<split>/images; the annotation files
# sit directly in archive/.
_split_root = os.path.join("archive", "dataset", "dataset")
train_annotations_file = os.path.join("archive", "train_annotations1.json")
valid_annotations_file = os.path.join("archive", "valid_annotations1.json")
train_image_dir = os.path.join(_split_root, "train", "images")
valid_image_dir = os.path.join(_split_root, "valid", "images")

# Image transformations (shared by both splits)
_preprocessing_steps = [
    transforms.Resize((224, 224)),  # Resize to match model input size
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
transform = transforms.Compose(_preprocessing_steps)

# Datasets
train_dataset = COCODataset(train_annotations_file, train_image_dir, transform=transform)
valid_dataset = COCODataset(valid_annotations_file, valid_image_dir, transform=transform)

# DataLoaders: only the training split is shuffled
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)

for _split_name, _split_dataset in (("Train", train_dataset), ("Valid", valid_dataset)):
    print(f"{_split_name} dataset size: {len(_split_dataset)}")


In [None]:
import torch
from efficientnet_pytorch import EfficientNet

# Load EfficientNet model
# Pretrained EfficientNet-B0 requested with a 3-class output; keep num_classes in sync
# with the number of categories in the dataset.
model = EfficientNet.from_pretrained('efficientnet-b0', num_classes=3)  # Adjust num_classes to match your categories
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
model = model.to(device)

# Optimizer and loss function
# Cross-entropy over the class logits; Adam updates every model parameter.
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


In [None]:
# Train for a fixed number of epochs and report the mean batch loss of each epoch.
for epoch in range(25):  # Adjust epochs as needed
    model.train()  # training mode
    running_loss = 0.0  # sum of batch losses for this epoch

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        # Gradients are cleared before backward so they do not accumulate across batches.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean loss per batch (divides by the number of batches, not samples).
    print(f"Epoch {epoch + 1}, Loss: {running_loss / len(train_loader)}")


In [None]:
import torch
from efficientnet_pytorch import EfficientNet

# Load EfficientNet model
# This cell sits between the training loop and the validation/save cells. Rebuilding
# the network unconditionally here would replace the model trained just above with a
# fresh pretrained one, so validation and the saved file would describe an untrained
# network. Only build a new model (and its optimizer) when no EfficientNet instance
# is currently bound to `model`.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if not isinstance(globals().get('model'), EfficientNet):
    model = EfficientNet.from_pretrained('efficientnet-b0', num_classes=3)  # Adjust num_classes to match your categories
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
else:
    model = model.to(device)

# Loss function (holds no training state, so re-creating it is harmless)
criterion = torch.nn.CrossEntropyLoss()


In [None]:
# Measure top-1 accuracy on the validation loader.
model.eval()
correct, total = 0, 0

with torch.no_grad():
    for inputs, labels in valid_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        # Index of the largest logit per sample.
        preds = torch.max(outputs, 1)[1]
        total += labels.size(0)
        correct += (preds == labels).sum().item()

print(f"Validation Accuracy: {correct / total:.2f}")


In [None]:
import os  # for creating the target folder

# Raw string: "\d" and "\e" are not valid escape sequences in a normal string literal.
# The path text itself is unchanged.
weights_path = r"archive\dataset\dataset\efficientnet/model.pth"
# torch.save does not create missing folders; make sure the parent exists.
os.makedirs(os.path.dirname(weights_path), exist_ok=True)
torch.save(model.state_dict(), weights_path)


In [None]:
# inference using the trained model
import torch
from efficientnet_pytorch import EfficientNet
from PIL import Image
from torchvision import transforms
# Load the trained model
# Pick the device first so the checkpoint can be mapped onto it while loading; without
# map_location a file saved from a GPU fails to load on a CPU-only machine.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = EfficientNet.from_pretrained('efficientnet-b0', num_classes=3)  # Adjust num_classes to match your categories
# Raw string: "\d" and "\e" are not valid escape sequences; the path text is unchanged.
model.load_state_dict(torch.load(r"archive\dataset\dataset\efficientnet/model.pth", map_location=device))
model.eval()
model = model.to(device)
# Define image transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to match model input size
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Function to predict the class of an image
def predict_image(image_path):
    """Return the predicted class index (an ``int``) for the image at ``image_path``.

    The image is opened as RGB, passed through the module-level ``transform``, moved to
    ``device`` as a batch of one and scored by the module-level ``model`` without
    gradients; the index of the largest output is returned.
    """
    rgb_image = Image.open(image_path).convert("RGB")
    batch = transform(rgb_image).unsqueeze(0).to(device)  # batch of one on the model's device

    with torch.no_grad():
        logits = model(batch)
        best_index = torch.max(logits, 1)[1]

    return best_index.item()
# Example usage
import os  # not among this cell's imports

# `base_dir` is only assigned in a later cell of this notebook, so on a fresh
# top-to-bottom run it may not exist yet. Use it when defined, otherwise fall back
# to the same 'archive' root the neighbouring cells use.
_data_root = globals().get("base_dir", "archive")
image_path = os.path.join(_data_root, "dataset", "dataset", "test", "images", "360_F_916445076_KAGVDE2nuHOIxNARmepMJTgHQqd9Y7y0.jpg")  # Replace with your image path
predicted_class = predict_image(image_path)
print(f"Predicted class: {predicted_class}")

# Image Classification (Torch)

In [None]:
!pip install efficientnet-pytorch
!pip install pillow


In [None]:
import os
import json
from PIL import Image
from torch.utils.data import Dataset
import torchvision.transforms as transforms

class CustomDataset(Dataset):
    """Image-level classification dataset built from a COCO-format annotation file.

    There is one sample per entry in ``images``. Its label is the 0-based category of the
    FIRST annotation listed for that image; images without any annotation get label 0.
    """

    def __init__(self, annotations_file, images_dir, transform=None):
        """
        Args:
            annotations_file (string): Path to the annotations file (COCO format).
            images_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied on an image.
        """
        with open(annotations_file) as f:
            self.annotations = json.load(f)

        self.images_dir = images_dir
        self.transform = transform

        # Map category id to label (adjust if you have different categories)
        self.category_map = {
            1: "pothole",
            2: "cracks",
            3: "open_manhole"
        }

        # image_id -> 0-based class of the first annotation for that image, built once.
        # Looking labels up here replaces a scan over every annotation on each
        # __getitem__ call. setdefault keeps the earliest annotation in file order.
        self._first_label_by_image = {}
        for anno in self.annotations['annotations']:
            self._first_label_by_image.setdefault(anno['image_id'], anno['category_id'] - 1)

    def __len__(self):
        return len(self.annotations['images'])

    def _label_for(self, image_id):
        """Return the 0-based label for ``image_id``; 0 when the image has no annotation."""
        return self._first_label_by_image.get(image_id, 0)

    def __getitem__(self, idx):
        # Get the image information
        image_info = self.annotations['images'][idx]
        image_path = os.path.join(self.images_dir, image_info['file_name'])

        # Open the image
        image = Image.open(image_path).convert("RGB")

        # NOTE(review): unannotated images silently become class 0 -- confirm this is intended.
        label = self._label_for(image_info['id'])

        # Apply the transformation if provided
        if self.transform:
            image = self.transform(image)

        return image, label


In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets

# Define data transformations for training and validation
def _make_preprocessing():
    """Build the resize -> tensor -> ImageNet-normalisation pipeline used by both splits."""
    return transforms.Compose([
        transforms.Resize((224, 224)),  # Resize image to match EfficientNet input size
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # ImageNet mean and std
    ])


# Each split gets its own pipeline object.
transform_train = _make_preprocessing()
transform_valid = _make_preprocessing()

base_dir = 'archive'
_split_root = os.path.join(base_dir, 'dataset', 'dataset')

# Define Dataset and DataLoader
train_dataset = CustomDataset(
    annotations_file=os.path.join(base_dir, 'train_annotations1.json'),
    images_dir=os.path.join(_split_root, 'train', 'images'),
    transform=transform_train,
)
valid_dataset = CustomDataset(
    annotations_file=os.path.join(base_dir, 'valid_annotations1.json'),
    images_dir=os.path.join(_split_root, 'valid', 'images'),
    transform=transform_valid,
)

# Only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)


In [None]:
!pip install efficientnet-pytorch

In [None]:
from efficientnet_pytorch import EfficientNet
import torch
import torch.nn as nn

# Load pre-trained EfficientNet model (EfficientNet-B0 in this case)
model = EfficientNet.from_pretrained('efficientnet-b0')

# Modify the classifier layer to match your number of classes
# The final fully connected layer is replaced by a new Linear layer with the same input
# width and num_classes outputs.
num_classes = 3  # You have 3 classes: pothole, cracks, open_manhole
model._fc = nn.Linear(model._fc.in_features, num_classes)

# Move the model to GPU if available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)


In [None]:
import torch.optim as optim
from torch import nn

# Define the optimizer (Adam)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Define the loss function (CrossEntropyLoss for classification)
criterion = nn.CrossEntropyLoss()

# Training loop
# Each epoch trains over train_loader, prints mean batch loss and training accuracy,
# then measures accuracy on valid_loader without gradients.
num_epochs = 10  # Set the number of epochs
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0  # sum of batch losses this epoch
    correct = 0         # correctly classified training samples this epoch
    total = 0           # training samples seen this epoch

    for images, targets in train_loader:
        images, targets = images.to(device), targets.to(device)

        # Clear gradients left over from the previous batch.
        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, targets)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Predicted class = index of the largest output per sample.
        _, predicted = torch.max(outputs, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

    # Accuracy in percent; the loss is averaged over batches, not samples.
    train_accuracy = 100 * correct / total
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {train_accuracy:.2f}%")

    # Validation phase
    model.eval()
    valid_correct = 0
    valid_total = 0
    with torch.no_grad():
        for images, targets in valid_loader:
            images, targets = images.to(device), targets.to(device)

            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            valid_total += targets.size(0)
            valid_correct += (predicted == targets).sum().item()

    valid_accuracy = 100 * valid_correct / valid_total
    print(f"Validation Accuracy: {valid_accuracy:.2f}%")


In [None]:
# Print the version string of the installed PyTorch package.
import torch
print(torch.__version__)

In [None]:
# Save the model
# Raw string: "\d" and "\e" are not valid escape sequences in a normal string literal.
# The path text itself is unchanged.
save_dir = r'archive\dataset\dataset\efficientnet'
# torch.save does not create missing folders; make sure the target folder exists.
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, 'efficientnet_model.pth')
torch.save(model.state_dict(), save_path)
print(f"Model saved to {save_path}")


In [None]:
import torch
from torchvision import transforms
from PIL import Image
import os
from efficientnet_pytorch import EfficientNet
import numpy as np


In [None]:
# Define the model (assuming you are using EfficientNet-B0, change if necessary)
model = EfficientNet.from_pretrained('efficientnet-b0', num_classes=3)

# Load the trained weights
# Raw string: "\d" and "\e" are not valid escape sequences; the path text is unchanged.
model_path = r'archive\dataset\dataset\efficientnet\efficientnet_model.pth'
# The model stays on the CPU in this cell, so map the checkpoint to the CPU as well;
# otherwise a file saved from a GPU fails to load on a machine without CUDA.
model.load_state_dict(torch.load(model_path, map_location='cpu'))
model.eval()  # Set the model to evaluation mode


In [None]:
# Define the transformation for the input images
# Same steps as the training/validation transforms defined earlier in this section.
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize the image to the size expected by EfficientNet
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # ImageNet mean and std, as used for training
])


In [None]:
# Define class labels
class_names = ['pothole', 'cracks', 'open_manhole']  # Update based on your dataset

def classify_image(image_path):
    """Return the class name the module-level ``model`` predicts for ``image_path``.

    The image is opened as RGB, passed through the module-level ``transform`` and scored
    as a batch of one without gradients; the index of the largest output selects the
    entry of ``class_names`` that is returned.
    """
    rgb_image = Image.open(image_path).convert('RGB')

    # Preprocess and add the batch dimension
    batch = transform(rgb_image).unsqueeze(0)

    # Inference only: no gradients needed
    with torch.no_grad():
        logits = model(batch)
        best_index = torch.max(logits, 1)[1]

    return class_names[best_index.item()]


In [None]:
# Directory containing the test images
test_dir = 'archive/dataset/dataset/test/images'

# List all the images in the directory
# Match extensions case-insensitively and include .jpeg: the test set contains .jpeg
# files, which the previous '.jpg'/'.png' check skipped without any message.
image_extensions = ('.jpg', '.jpeg', '.png')
test_images = [f for f in os.listdir(test_dir) if f.lower().endswith(image_extensions)]

# Iterate over all test images and classify them
for image_name in test_images:
    image_path = os.path.join(test_dir, image_name)

    # Classify the image
    predicted_class = classify_image(image_path)

    # Print the result
    print(f"Image: {image_name} - Predicted Class: {predicted_class}")
