In [None]:
import torch
from torch.utils.data import Dataset
from torchvision.transforms import v2

In [None]:
from pathlib import Path
import shutil
import tqdm
from ultralytics import YOLO
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from typing import Union
import os



In [None]:
# Choose the compute backend in order of preference: CUDA, then MPS, then CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

print(f"Using {device} device")


In [None]:
# Source directory of the dataset; it is printed and copied into ./data further down.
# NOTE(review): absolute, machine-specific path — the notebook only runs where this
# location exists. Consider making it configurable.
lidar_images = Path("/cluster/projects/vc/data/ad/open/Poles")

In [None]:

def print_directory_structure(startpath, max_files=1):
    """Print an indented tree of ``startpath`` with a sample of files per folder.

    Every directory is printed as ``name/``, indented four spaces per nesting
    level. Below each directory at most ``max_files`` of its files are listed,
    which keeps the output short for folders holding thousands of images.

    Args:
        startpath: Root directory to walk, as ``str`` or ``os.PathLike``.
            A trailing path separator is tolerated.
        max_files: Maximum number of file names shown per directory
            (default 1). Pass ``None`` to list every file.
    """
    # Normalise once so Path objects and trailing separators behave like a plain string path.
    top = os.path.normpath(os.fspath(startpath))

    for root, dirs, files in os.walk(top):
        # Sorting in place makes os.walk descend in a deterministic order.
        dirs.sort()

        # Depth comes from the path relative to the root. Stripping the root
        # with str.replace miscounts whenever the root text recurs in `root`.
        rel = os.path.relpath(root, top)
        level = 0 if rel == os.curdir else rel.count(os.sep) + 1

        indent = ' ' * 4 * level
        print(f'{indent}{os.path.basename(root)}/')

        subindent = ' ' * 4 * (level + 1)
        for f in sorted(files)[:max_files]:
            print(f'{subindent}{f}')

# Show the folder layout of the source dataset (path passed as a plain string).
print_directory_structure(str(lidar_images))


In [None]:
# Instantiate a YOLO model from the `yolo11s.pt` weights file.
# NOTE(review): `model` is reassigned to YOLO("yolov10n.pt") in the training cell before
# this instance is used anywhere in the visible notebook — confirm this load is still needed.
model = YOLO("yolo11s.pt")

In [None]:
# Copy the dataset into ./data once; later runs reuse the local copy.
data_dir = Path.cwd() / "data"

if not data_dir.exists():
    # Count the files first so the progress bar has a real total. Wrapping the
    # *result* of copytree in tqdm only creates a bar after the copy is done.
    n_files = sum(1 for path in Path(lidar_images).rglob("*") if path.is_file())
    try:
        with tqdm.tqdm(total=n_files, desc="Copying dataset", unit="file") as progress:

            def _copy_with_progress(src, dst):
                """Copy one file (like copytree's default copy2) and advance the bar."""
                shutil.copy2(src, dst)
                progress.update(1)

            shutil.copytree(lidar_images, data_dir, copy_function=_copy_with_progress)
    except BaseException:
        # A half-copied ./data would be reported as "already present" on the
        # next run. It did not exist before this cell, so removing it is safe.
        shutil.rmtree(data_dir, ignore_errors=True)
        raise
    print(f"Files copied to {data_dir}")
else:
    print(f"Files already present in {data_dir}")

In [None]:
# One image of the test split and the label file with the same stem.
img_path = "data/test/images/combined_image_5_png.rf.9372598b5abf9cfff473ec530fdbf7be.jpg"
coord_path = "data/test/labels/combined_image_5_png.rf.9372598b5abf9cfff473ec530fdbf7be.txt"

# Label file of a different image (combined_image_13); presumably holds two boxes,
# going by its name and the two-row "Expected coords" printed below — TODO confirm.
double_coords_path = "data/test/labels/combined_image_13_png.rf.7390c64ced5859039424843ebf4207f9.txt"

In [None]:
def get_coords(file_path):
    """Parse a label file into a list of boxes.

    Every non-blank line is split on whitespace and each field is converted
    to ``float``, so ``"0 0.5 0.5 0.1 0.2"`` becomes ``[0.0, 0.5, 0.5, 0.1, 0.2]``.
    Blank lines, a trailing newline and repeated spaces are tolerated.

    Args:
        file_path: Path of the text file to read.

    Returns:
        * ``[]`` when the file holds no boxes.
        * ``[row, None]`` when it holds exactly one box. The trailing ``None``
          is a legacy placeholder kept for backward compatibility; callers
          skip or strip it.
        * ``[row_1, row_2, ...]`` when it holds several boxes.

    Raises:
        ValueError: If a field cannot be converted to ``float``.
    """
    with open(file_path, 'r') as file:
        rows = [
            [float(value) for value in line.split()]
            for line in file
            if line.strip()  # ignore blank lines, e.g. after a trailing newline
        ]

    if len(rows) == 1:
        return [rows[0], None]
    return rows


# Run the parser on both sample label files and show what it returns.
for sample_label_path in (coord_path, double_coords_path):
    print(f"Function output:\n{get_coords(sample_label_path)}")

# Hand-written reference rows to compare against the output above.
print("Expected coords:\n0 0.42138671875 0.74609375 0.0068359375 0.2421875\n0 0.5244140625 0.6484375 0.00537109375 0.1640625")

In [None]:
def show_boxes(image, box_coords):
    """Display ``image`` with its bounding boxes outlined in red.

    Args:
        image: Array-like accepted by ``imshow`` whose first two ``shape``
            entries are ``(height, width)``.
        box_coords: Iterable of boxes ``(class_id, x_center, y_center, width,
            height)``, with the four geometry values given as fractions of
            the image width/height. ``None`` entries are skipped. An empty
            iterable just shows the image.

    Returns:
        None. The figure is rendered through ``plt.pause``.
    """
    # Read the size up front. When it was only set inside the loop, an image
    # without boxes raised NameError at the axis-limit calls below.
    img_height, img_width = image.shape[:2]

    fig, ax = plt.subplots()
    ax.imshow(image)

    for box in box_coords:
        if box is None:
            continue

        _, x_center, y_center, width, height = box

        # Scale the fractional coordinates to pixels.
        x_center *= img_width
        y_center *= img_height
        width *= img_width
        height *= img_height

        # Rectangle is anchored at its top-left corner, not its centre.
        x1 = x_center - width / 2
        y1 = y_center - height / 2

        rect = patches.Rectangle((x1, y1), width, height, linewidth=1, edgecolor='r', facecolor='none')
        ax.add_patch(rect)

    # Pin the view to the full image; the inverted y-limits keep the origin at the top.
    ax.set_xlim(0, img_width)
    ax.set_ylim(img_height, 0)

    plt.pause(0.001)
    
# Visual check of show_boxes on a sample image; the file is decoded once and reused.
# NOTE(review): the picture is `img_path` (combined_image_5) but the boxes come from
# `double_coords_path` (labels of combined_image_13), so the rectangles do not belong to
# this image — confirm this pairing is intended.
sample_image = plt.imread(img_path)
show_boxes(sample_image, get_coords(double_coords_path))
print(f"Image shape: {sample_image.shape}")

In [None]:
class LidarDataset(Dataset):
    """Dataset of LiDAR images paired with their bounding-box label files.

    Images and label files are matched by file stem (name without extension),
    so ``foo.jpg`` is always served together with ``foo.txt``. Hidden entries
    (names starting with ``.``) and sub-directories are ignored.

    Each item is a tuple ``(image, boxes)``:

    * ``image`` is whatever ``plt.imread`` returns for the file, passed through
      ``transform`` when one is given;
    * ``boxes`` is the list returned by ``get_coords`` with ``None``
      placeholders removed; it is empty for an image without boxes.

    Note:
        ``transform`` receives only the image. A transform that moves pixels
        (for example a horizontal flip) is not applied to the boxes, so they
        will no longer line up with the transformed image.

    Args:
        img_dir: Directory containing the image files.
        coord_dir: Directory containing one label file per image.
        transform: Optional callable applied to each loaded image.
        verbose: When True, print the image and label path on every access.

    Raises:
        ValueError: If the number of image and label files differs, or an
            image has no label file with the same stem.
    """

    def __init__(self, img_dir:Union[Path, str], coord_dir:Union[Path, str], transform=None, verbose:bool=False):

        self.img_dir = img_dir
        self.coord_dir = coord_dir
        self.transform = transform
        self.verbose = verbose

        self.images = sorted(self._visible_files(img_dir))
        label_names = self._visible_files(coord_dir)
        if len(self.images) != len(label_names):
            raise ValueError("Number of images and coordinates do not match")

        # Pair by stem. Sorting the two listings independently can order them
        # differently because the extensions take part in the comparison.
        labels_by_stem = {os.path.splitext(name)[0]: name for name in label_names}
        unlabeled = [name for name in self.images if os.path.splitext(name)[0] not in labels_by_stem]
        if unlabeled:
            raise ValueError(f"No label file found for image(s): {unlabeled[:5]}")
        self.coords = [labels_by_stem[os.path.splitext(name)[0]] for name in self.images]

    @staticmethod
    def _visible_files(directory):
        """Return the names of the regular, non-hidden files directly inside ``directory``."""
        with os.scandir(directory) as entries:
            return [
                entry.name
                for entry in entries
                if entry.is_file() and not entry.name.startswith(".")
            ]

    def __len__(self):
        """Return the number of image/label pairs."""
        return len(self.images)

    def __getitem__(self, idx):
        """Load the image and boxes stored at position ``idx``."""
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_path = os.path.join(self.img_dir, self.images[idx])
        coord_path = os.path.join(self.coord_dir, self.coords[idx])

        image = plt.imread(img_path)
        # Drop the None placeholder get_coords adds for single-box files; this
        # is also safe when the label file is empty.
        coords = [box for box in get_coords(coord_path) if box is not None]

        if self.transform is not None:
            image = self.transform(image)

        if self.verbose:
            print(f"Image path: {img_path}")
            print(f"Coord path: {coord_path}")
        return image, coords

In [None]:
# Untransformed view of the test split.
lidar_dataset = LidarDataset(img_dir=Path("data/test/images"), coord_dir=Path("data/test/labels"))

# Draw the first six samples (indices 0-5) with their boxes.
for index, (preview_image, preview_boxes) in enumerate(lidar_dataset):
    show_boxes(preview_image, preview_boxes)
    if index >= 5:
        break

In [None]:
# Image pipeline handed to LidarDataset through its `transform` argument.
# The steps run in the order listed.
# NOTE(review): LidarDataset passes only the image through `transform`, so the random
# horizontal flip is not mirrored onto the box coordinates — confirm the labels stay valid.
# NOTE(review): LidarDataset loads images with plt.imread (presumably a NumPy array); verify
# that these v2 transforms act on that input type — a v2.ToImage() first step may be needed.
transforms = v2.Compose([
    v2.Resize(size=(64, 512), antialias=True),  # target size
    v2.RandomHorizontalFlip(p=0.5),  # flip with probability 0.5
    v2.ToDtype(torch.float32, scale=True),  # convert to float32 with value scaling enabled
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # per-channel mean/std; these match the commonly used ImageNet statistics
]) 

In [None]:
# Same test split, with `transforms` applied to every image on access.
transformed_dataset = LidarDataset(Path("data/test/images"), Path("data/test/labels"), transform=transforms)

# Preview the first six samples of `transformed_dataset` (not `lidar_dataset`), so the
# transform pipeline is what is actually on display.
# NOTE(review): show_boxes reads `image.shape[:2]` as (height, width); if the pipeline
# returns channel-first tensors, the image must be permuted before it is displayed.
for i, sample in enumerate(transformed_dataset):
    show_boxes(sample[0], sample[1])
    if i == 5:
        break

In [None]:
# Drop both preview datasets from the notebook namespace.
del lidar_dataset, transformed_dataset

In [None]:
# Set to True to write an 80/20 train/validation split of data/train to disk.
# NOTE(review): the flag is named DEBUG but gates the data split — confirm the name.
DEBUG = False


def pair_images_with_labels(images_dir, labels_dir):
    """Return parallel lists of image and label paths, matched by file stem.

    Args:
        images_dir: Directory containing ``*.jpg`` images.
        labels_dir: Directory containing ``*.txt`` label files.

    Returns:
        ``(image_files, label_files)``: two equally long lists of ``Path``.
        ``image_files`` is sorted and ``label_files[i]`` has the same stem as
        ``image_files[i]``.

    Raises:
        ValueError: If an image has no label with the same stem, or vice versa.
    """
    image_files = sorted(Path(images_dir).glob("*.jpg"))
    labels_by_stem = {path.stem: path for path in Path(labels_dir).glob("*.txt")}

    # Equal counts alone do not prove the files pair up, so compare the stems.
    unmatched = sorted({path.stem for path in image_files} ^ set(labels_by_stem))
    if unmatched:
        raise ValueError(f"Images and labels do not match; unpaired stems: {unmatched[:5]}")

    return image_files, [labels_by_stem[path.stem] for path in image_files]


if DEBUG:
    # Source folders and the destination folders of the split.
    images_folder = Path("data/train/images")
    labels_folder = Path("data/train/labels")
    train_images_folder = Path("data/training/images")
    train_labels_folder = Path("data/training/labels")
    val_images_folder = Path("data/valid/images")
    val_labels_folder = Path("data/valid/labels")

    # Pair every image with its own label before anything is written.
    image_files, label_files = pair_images_with_labels(images_folder, labels_folder)

    # Create directories if they don't exist
    train_images_folder.mkdir(parents=True, exist_ok=True)
    train_labels_folder.mkdir(parents=True, exist_ok=True)
    val_images_folder.mkdir(parents=True, exist_ok=True)
    val_labels_folder.mkdir(parents=True, exist_ok=True)

    # Split images and labels together so each pair lands in the same subset.
    # The fixed random_state keeps the split reproducible.
    train_images, val_images, train_labels, val_labels = train_test_split(
        image_files, label_files, test_size=0.2, random_state=42
    )

    def copy_files(files, dest_folder):
        """Copy every path in ``files`` into ``dest_folder``."""
        for file in files:
            shutil.copy(file, dest_folder)

    # Copy the files to the respective directories
    copy_files(train_images, train_images_folder)
    copy_files(train_labels, train_labels_folder)
    copy_files(val_images, val_images_folder)
    copy_files(val_labels, val_labels_folder)

    print("Data split and copied successfully.")

In [None]:
# Replace `model` with one built from the `yolov10n.pt` weights file.
model = YOLO("yolov10n.pt")

# Minimal smoke-test settings: one epoch, batch size 1, one worker.
# NOTE(review): `data` is an absolute path inside one user's home directory, so the
# cell only runs on that machine — confirm whether ./data/data.yaml is the same file.
train_settings = {
    "data": "/cluster/home/jofa/tdt17/TDT17-mini-project/data/data.yaml",
    "epochs": 1,
    "batch": 1,
    "workers": 1,
}
model.train(**train_settings)

# Validate the freshly trained model; kept as the last expression of the cell.
model.val(batch=1)


In [None]:
# data_yaml_path = Path.cwd() / "data" / "data.yaml"

# if data_yaml_path.exists():
# 	try:
# 		results = model.train(data=str(data_yaml_path), epochs=3)
# 	except Exception as e:
# 		print(f"An error occurred during training: {e}")
# else:
# 	print(f"data.yaml file not found at {data_yaml_path}")

In [None]:
# results = model.val()