In [None]:
# Check GPU type
# !nvidia-smi

In [None]:
# Install ultralytics
# !pip -q install  ultralytics

In [None]:
from pathlib import Path
import pandas as pd
import os
from sklearn.model_selection import train_test_split
import shutil
from tqdm import tqdm
import cv2
import yaml
import matplotlib.pyplot as plt
from ultralytics import YOLO

In [None]:
# Set up directoris for training a yolo model

# Images directories
DATASET_DIR = Path('datasets/dataset')
IMAGES_DIR = DATASET_DIR / 'images'
TRAIN_IMAGES_DIR = IMAGES_DIR / 'train'
VAL_IMAGES_DIR = IMAGES_DIR / 'val'
TEST_IMAGES_DIR = IMAGES_DIR / 'test'

# Labels directories
LABELS_DIR = DATASET_DIR / 'labels'
TRAIN_LABELS_DIR = LABELS_DIR / 'train'
VAL_LABELS_DIR = LABELS_DIR / 'val'
TEST_LABELS_DIR = LABELS_DIR / 'test'

In [None]:
# Path to where your data is stored
DATA_DIR = Path('.')

# Preview data files available
os.listdir(DATA_DIR)

In [None]:
# Load train and test files
train = pd.read_csv(DATA_DIR / 'train.csv')
# test = pd.read_csv(DATA_DIR / 'Test.csv')
ss = pd.read_csv(DATA_DIR / 'sample_submission.csv')

# Add an image_path column
# train['image_path'] = [Path('images/' + x) for x in train.Image_ID]
# test['image_path'] = [Path('images/' + x) for x in test.Image_ID]

# Map str classes to ints (label encoding targets)
train['class_id'] = train['label'].map({'Jett': 0, 'Phoenix': 1, 'Sage': 2, 'Brimstone': 3})

# Preview the head of the train set
train.head()

In [None]:
# Split data into training and validation
train_unique_imgs_df = train.drop_duplicates(subset = ['filename'], ignore_index = True)
X_train, X_val = train_test_split(train_unique_imgs_df, test_size = 0.25, stratify=train_unique_imgs_df['label'], random_state=42)

X_train = train[train.filename.isin(X_train.filename)]
X_val = train[train.filename.isin(X_val.filename)]

# Check shapes of training and validation data
X_train.shape, X_val.shape

In [None]:
# Preview target distribution, seems there a class imbalance that needs to be handled
X_train['label'].value_counts(normalize = True), X_val['label'].value_counts(normalize = True)

In [None]:
# Check if dirs exist, if they do, remove them, otherwise create them.
# This only needs to run once
for DIR in [TRAIN_IMAGES_DIR,VAL_IMAGES_DIR, TEST_IMAGES_DIR, TRAIN_LABELS_DIR,VAL_LABELS_DIR,TEST_LABELS_DIR]:
    if DIR.exists():
        shutil.rmtree(DIR)
    DIR.mkdir(parents=True, exist_ok = True)

In [None]:
# Copy train, val and test images to their respective dirs
for img in tqdm(X_train.filepath.unique()):
    shutil.copy(img, TRAIN_IMAGES_DIR / img.split('/')[-1])

for img in tqdm(X_val.filepath.unique()):
    shutil.copy(img, VAL_IMAGES_DIR / img.split('/')[-1])

for img in tqdm(os.listdir(DATA_DIR / 'test/images')):
    shutil.copy(DATA_DIR / 'test/images' / img, TEST_IMAGES_DIR / img.split('/')[-1])

In [None]:
# Function to convert the bboxes to yolo format and save them
def save_yolo_annotation(row):
    image_path, class_id, output_dir = row['filepath'], row['class_id'], row['output_dir']

    # img = cv2.imread(image_path)
    # if img is None:
    #     raise ValueError(f"Could not read image from path: {image_path}")

    height, width, _ = row['height'], row['width'], row['depth']

    label_file = Path(output_dir) / f"{Path(image_path).stem}.txt"

    ymin, xmin, ymax, xmax = row['ymin'], row['xmin'], row['ymax'], row['xmax']

    # Normalize the coordinates
    x_center = (xmin + xmax) / 2 / width
    y_center = (ymin + ymax) / 2 / height
    bbox_width = (xmax - xmin) / width
    bbox_height = (ymax - ymin) / height

    with open(label_file, 'a') as f:
        f.write(f"{class_id} {x_center:.6f} {y_center:.6f} {bbox_width:.6f} {bbox_height:.6f}\n")

# Parallelize the annotation saving process
def process_dataset(dataframe, output_dir):
    dataframe['output_dir'] = output_dir
    # with multiprocessing.Pool() as pool:
    #     list(tqdm(pool.imap(save_yolo_annotation, dataframe.to_dict('records')), total=len(dataframe)))
    for row in tqdm(dataframe.to_dict('records'), total=len(dataframe)):
        save_yolo_annotation(row)

# Save train and validation labels to their respective dirs
process_dataset(X_train, TRAIN_LABELS_DIR)
process_dataset(X_val, VAL_LABELS_DIR)

In [None]:
# Train images dir
TRAIN_IMAGES_DIR

In [None]:
# # Create a data.yaml file required by yolo
# class_names = train['label'].unique().tolist()
# num_classes = len(class_names)
#
# data_yaml = {
#     'train': str(TRAIN_IMAGES_DIR),
#     'val': str(VAL_IMAGES_DIR),
#     'test': str(TEST_IMAGES_DIR),
#     'nc': num_classes,
#     'names': class_names
# }
#
# yaml_path = 'data.yaml'
# with open(yaml_path, 'w') as file:
#     yaml.dump(data_yaml, file, default_flow_style=False)
#
# # Preview data yaml file
# data_yaml

In [None]:
# Plot some images and their bboxes to ensure the conversion was done correctly
def load_annotations(label_path):
    with open(label_path, 'r') as f:
        lines = f.readlines()
    boxes = []
    for line in lines:
        class_id, x_center, y_center, width, height = map(float, line.strip().split())
        boxes.append((class_id, x_center, y_center, width, height))
    return boxes

# Function to plot an image with its bounding boxes
def plot_image_with_boxes(image_path, boxes):
    # Load the image
    image = cv2.imread(str(image_path))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Get image dimensions
    h, w, _ = image.shape

    # Plot the image
    plt.figure(figsize=(10, 10))
    plt.imshow(image)

    # Plot each bounding box
    for box in boxes:
        class_id, x_center, y_center, width, height = box
        # Convert YOLO format to corner coordinates
        xmin = int((x_center - width / 2) * w)
        ymin = int((y_center - height / 2) * h)
        xmax = int((x_center + width / 2) * w)
        ymax = int((y_center + height / 2) * h)

        # Draw the bounding box
        plt.gca().add_patch(plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                                          edgecolor='red', facecolor='none', linewidth=2))
        plt.text(xmin, ymin - 10, f'Class {int(class_id)}', color='red', fontsize=12, weight='bold')

    plt.axis('off')
    plt.show()

# Directories for images and labels
IMAGE_DIR = TRAIN_IMAGES_DIR
LABEL_DIR = TRAIN_LABELS_DIR

# Plot a few images with their annotations
for image_name in os.listdir(IMAGE_DIR)[:3]:
    image_path = IMAGE_DIR / image_name
    label_path = LABEL_DIR / (image_name.replace('.jpg', '.txt').replace('.png', '.txt'))

    if label_path.exists():
        boxes = load_annotations(label_path)
        print(f"Plotting {image_name} with {len(boxes)} bounding boxes.")
        plot_image_with_boxes(image_path, boxes)
    else:
        print(f"No annotations found for {image_name}.")


In [None]:
# Set the high watermark ratio to 0.0 (disabling the upper limit)
# os.environ['PYTORCH_MPS_HIGH_WATERMARK_RATIO'] = '0.0'

In [None]:
# Load a yolo pretrained model
# model = YOLO('yolo11m.pt')
#
# # # Fine tune model to our data
# model.train(
#     data='data.yaml',          # Path to the dataset configuration
#     epochs=30,                 # Number of epochs
#     imgsz=640,                # Image size (height, width)
#     batch=20,                   # Batch size
#     device='cuda',                  # Device to use (0 for the first GPU)
#     patience=5)

In [None]:
# inference
ss.head()

In [None]:
# Load the trained YOLO model
model = YOLO('runs/detect/train/weights/best.pt')

# Path to the test images directory
test_dir_path = 'datasets/dataset/images/test'

# Get a list of all image files in the test directory
image_files = os.listdir(test_dir_path)

# Initialize an empty list to store the results for all images
all_data = []

# Iterate through each image in the directory
for image_file in tqdm(image_files):
    # Full path to the image
    img_path = os.path.join(test_dir_path, image_file)

    # Make predictions on the image
    results = model(img_path)

    # Extract bounding boxes, confidence scores, and class labels
    boxes = results[0].boxes.xyxy.tolist()  # Bounding boxes in xyxy format
    classes = results[0].boxes.cls.tolist()  # Class indices
    # confidences = results[0].boxes.conf.tolist()  # Confidence scores
    names = results[0].names  # Class names dictionary

    # Iterate through the results for this image
    prediction_string = ""

    for cls, box in zip(classes, boxes):
        prediction_string += names[int(cls)] + " " + " ".join(str(_) for _ in list(map(int, box))) + " "

    # Combine preds or add None for no prediction
    if prediction_string == "":
        prediction_string = "None 0 0 1 1"

    all_data.append({
        'Id': image_file.split(".")[0],
        'PredictionString': prediction_string.strip(),
    })

# Convert the list to a DataFrame for all images
sub = pd.DataFrame(all_data)
sub.head()

In [None]:
sub = sub.sort_values(by=['Id'])
sub.to_csv('benchmark_submission.csv', index=False)