In [1]:
import json
import numpy as np
import random
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import shutil
import torch
import torchvision.transforms as T
import os
import re

from tifffile import tifffile 
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import ImageFolder
from PIL import Image

In [2]:
# to delete - Illustration of some samples coming from the dataset
json_file_path = 'Data/large_rock_dataset.json'

# Load the JSON file
with open(json_file_path, 'r') as file:
    data = json.load(file)
print('General information about the data:', data['info'])
dataset =data['dataset']
print('Number of samples  :', len(dataset) )
sample_info = dataset[10]
print('Looking at the the first images:', sample_info ['file_name'])
print('Looking at rocks annotations for the first images:\n', sample_info ['rocks_annotations'][0])

General information about the data: {'description': 'Large Rocks Detection Dataset ', 'version': '1.0', 'year': 2024, 'contributor': 'Valerie Zermatten', 'date_created': '2024/09/30'}
Number of samples  : 992
Looking at the the first images: 2581_1126_2_2.tif
Looking at rocks annotations for the first images:
 {'rock_id': 1459.0, 'abs_rock_coordinates': [2581767.93, 1126509.48], 'pixel_within_patch_coordinates': [608.0, 51.0], 'relative_within_patch_location': [0.95, 0.08]}


In [3]:
# to delete - vis
unique_splits = set(sample['split'] for sample in dataset)
print("Unique values in 'split':", unique_splits)

Unique values in 'split': {'train', 'test'}


In [4]:
# to delete - Dataset organisation for hillshade 

# TRAIN/TEST
# base_dir = 'dataset_surface_hillshade'
base_dir = 'dataset_swissimage'
train_images_folder = os.path.join(base_dir, 'train_images')
test_dir = os.path.join(base_dir, 'test')

# Create directories if they don't exist
os.makedirs(base_dir, exist_ok=True)
os.makedirs(train_images_folder, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

## DONE:

# Iterate through all samples
# for sample in dataset:
#     file_name = 'Data/swissImage_50cm_patches/' + sample['file_name']
#     split = sample['split']  # Assuming the "split" key indicates train/test/val

#     # Define source and destination paths
#     src_path = file_name  # Assuming file_name contains the full or relative path
#     if split == 'train':
#         dest_dir = train_images_folder
#     elif split == 'test':
#         dest_dir = test_dir
#     # elif split == 'val':
#     #     dest_dir = val_dir
#     else:
#         print(f"Unknown split '{split}' for file '{file_name}'. Skipping.")
#         continue

#     dest_path = os.path.join(dest_dir, os.path.basename(file_name))

#     # Copy file to the appropriate directory
#     try:
#         shutil.copy(src_path, dest_path)
#         print(f"Copied '{file_name}' to '{dest_dir}'")
#     except Exception as e:
#         print(f"Error copying '{file_name}': {e}")

# print("Dataset split completed.")


In [5]:
# to delete - Dataset organisation for hillshade 

# train_labels
train_labels_folder = os.path.join(base_dir, 'train_labels')

# Create the train_labels directory if it doesn't exist
os.makedirs(train_labels_folder, exist_ok=True)

# DONE:

# Process images with split == 'train'
# for sample in dataset:
#     if sample['split'] == 'train':
#         # Extract relevant details
#         file_name = sample['file_name']
#         annotations = sample.get('rocks_annotations', [])
        
#         # Create a .txt file for this image
#         base_name = os.path.splitext(os.path.basename(file_name))[0]
#         txt_file_path = os.path.join(train_labels_folder, f"{base_name}.txt")
        
#         # Write annotations to the .txt file
#         with open(txt_file_path, 'w') as txt_file:
#             for annotation in annotations:
#                 txt_file.write(f"{annotation}\n")
        
#         print(f"Created annotation file: {txt_file_path}")

# print("All train annotations have been saved to the 'train_labels' folder.")


In [6]:
# to delete - Dataset organisation for hillshade 

# Create Validation Set - images
file_count = len([file for file in os.listdir(train_images_folder) if os.path.isfile(os.path.join(train_images_folder, file))])
print(f"Number of images in train set: {file_count}")

# Define folder
val_images_folder = os.path.join(base_dir, 'val_images')
os.makedirs(val_images_folder, exist_ok=True)

# DONE:

# List all files in the source folder
# files = [file for file in os.listdir(train_images_folder) if os.path.isfile(os.path.join(train_images_folder, file))]

# # Calculate 10% of the total files
# num_files_to_move = max(1, int(len(files) * 0.1))  # Ensure at least one file is moved

# # Randomly select 10% of the files
# files_to_move = random.sample(files, num_files_to_move)

# # Move the selected files
# for file in files_to_move:
#     src_path = os.path.join(train_images_folder, file)
#     dest_path = os.path.join(val_images_folder, file)
#     shutil.move(src_path, dest_path)
#     print(f"Moved '{file}' to '{val_images_folder}'")

# print(f"Moved {len(files_to_move)} files to '{val_images_folder}'.")


Number of images in train set: 576


In [7]:
# to delete - Dataset organisation for hillshade 

# Create Validation Set - labels
val_labels_folder = os.path.join(base_dir, 'val_labels')
os.makedirs(val_labels_folder, exist_ok=True)

# DONE:

# List all image files in val_images folder (excluding extensions)
# val_image_files = {os.path.splitext(file)[0] for file in os.listdir(val_images_folder) if os.path.isfile(os.path.join(val_images_folder, file))}

# # Move matching label files from train_labels to val_labels
# for label_file in os.listdir(train_labels_folder):
#     # Get the base name (without extension) of the label file
#     base_name = os.path.splitext(label_file)[0]
    
#     if base_name in val_image_files:
#         src_path = os.path.join(train_labels_folder, label_file)
#         dest_path = os.path.join(val_labels_folder, label_file)
#         shutil.move(src_path, dest_path)
#         print(f"Moved '{label_file}' to '{val_labels_folder}'")

# print("Matching label files moved to 'val_labels' folder.")




In [8]:
# to delete (?) - DONE:

# Path to the folder containing .tif images
# folder_path = 'dataset_swissimage/val_images'

# Iterate through all files in the folder
# for file in os.listdir(folder_path):
#     if file.endswith('.tif'):
#         # Full path to the .tif file
#         tif_path = os.path.join(folder_path, file)
        
#         # Open the .tif file
#         try:
#             with Image.open(tif_path) as img:
#                 # Define the output path with the same name but .jpg extension
#                 jpg_path = os.path.join(folder_path, file.replace('.tif', '.jpg'))
                
#                 # Convert and save as JPG
#                 img.convert('RGB').save(jpg_path, 'JPEG')
                
#                 # Remove the original .tif file
#                 os.remove(tif_path)
#                 print(f"Converted and replaced: {file} -> {jpg_path}")
        
#         except Exception as e:
#             print(f"Error processing {file}: {e}")

# print("All .tif files have been converted to .jpg and replaced.")

In [9]:
# to delete (?) - label in form for Yolo8

# Input and output directories
label_input_folder = train_labels_folder  # Folder containing original label files
label_output_folder = os.path.join(base_dir, 'yolo_train_labels')  # Folder for YOLO-compliant labels
os.makedirs(label_output_folder, exist_ok=True)

# DONE:

# YOLOv8 assumes constant bbox size based on your description
# bbox_width = 10 / 640  # Normalized width
# bbox_height = 10 / 640  # Normalized height

# Process each label file
# for label_file in os.listdir(label_input_folder):
#     if label_file.endswith('.txt'):  # Process only text files
#         input_path = os.path.join(label_input_folder, label_file)
#         output_path = os.path.join(label_output_folder, label_file)

#         with open(input_path, 'r') as infile, open(output_path, 'w') as outfile:
#             # Read each line in the file and extract dictionaries
#             for line in infile:
#                 line = line.strip()
#                 if not line:
#                     continue
                
#                 # Parse the dictionary using regex
#                 match = re.search(r"'relative_within_patch_location': \[(\d+\.\d+), (\d+\.\d+)\]", line)
#                 if match:
#                     x_center = float(match.group(1))  # Normalized x_center
#                     y_center = float(match.group(2))  # Normalized y_center

#                     # Write to YOLO format: class_id, x_center, y_center, width, height
#                     class_id = 0  # Assuming 'rock' class is class 0
#                     outfile.write(f"{class_id} {x_center:.6f} {y_center:.6f} {bbox_width:.6f} {bbox_height:.6f}\n")
        
#         print(f"Processed: {label_file}")

# print("Conversion to YOLOv8 format completed.")

In [20]:
# DATA LOADER - to put in python

# Custom Dataset Class
class RockDetectionDataset(Dataset):
    def __init__(self, image_folder, label_folder):
        self.image_folder = image_folder
        self.label_folder = label_folder
        # Transforms for images
        self.transform = T.Compose([
            T.Resize((640, 640)), # Resize all images to 640x640
            T.ToTensor(),  # Convert image to [0, 1] range and tensor
        ])
        
        # List all images and sort to match labels
        self.image_files = sorted([f for f in os.listdir(image_folder) if f.endswith('.jpg')])
        self.label_files = sorted([f for f in os.listdir(label_folder) if f.endswith('.txt')])

    def __len__(self):
        return len(self.image_files)
    
    def __getitem__(self, idx):
        # Load image
        img_path = os.path.join(self.image_folder, self.image_files[idx])
        image = Image.open(img_path).convert('RGB')  # Ensure 3 channels (RGB)
        # image = self.transform(image)
        
        # Load label
        label_path = os.path.join(self.label_folder, self.label_files[idx])
        with open(label_path, 'r') as f:
            labels = []
            for line in f:
                class_id, x_center, y_center, width, height = map(float, line.strip().split())
                labels.append([class_id, x_center, y_center, width, height])
        
        # Convert labels to tensor
        labels = torch.tensor(labels, dtype=torch.float32)
        
        # Apply transformations to the image
        if self.transform:
            image = self.transform(image)
        
        return image, labels

In [21]:
# MAIN

# Paths to folders
train_images_folder = "dataset_swissimage/train_images"
train_labels_folder = "dataset_swissimage/yolo_train_labels"

# Create Dataset
train_dataset = RockDetectionDataset(train_images_folder, train_labels_folder)

# Use DataLoader to load the dataset in batches
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False, num_workers=0)

In [19]:
# NORMALISATION - python 

# Variables to accumulate pixel statistics
mean_sum = torch.zeros(3)
std_sum = torch.zeros(3)
pixel_count = 0

# Calculate mean
print("Calculating mean and standard deviation...")
for batch in train_loader:
    images, _ = batch  # Unpack the images and labels
    # images shape: [batch_size, 3, H, W]
    batch_pixels = images.size(0) * images.size(2) * images.size(3)  # Number of pixels per batch
    mean_sum += torch.sum(images, dim=[0, 2, 3])  # Sum across batch, height, width
    std_sum += torch.sum(images ** 2, dim=[0, 2, 3])  # Sum of squared values
    pixel_count += batch_pixels

# Final mean and std calculation
mean = mean_sum / pixel_count
std = torch.sqrt((std_sum / pixel_count) - mean ** 2)

print(f"Mean: {mean}")
print(f"Standard Deviation: {std}")


Calculating mean and standard deviation...


TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'PIL.Image.Image'>

In [None]:
# Data Augmentation (PHOUF)

def transform_train_with_labels(image, label):
    
    # Geometric transformation applied to both image and label
    geometric_transforms =[T.RandomVerticalFlip(p=1),
                           T.RandomHorizontalFlip(p=1),
                           T.RandomRotation(degrees = (90,90)),
                           T.RandomRotation(degrees = (180,180)),
                           T.RandomRotation(degrees = (270,270))]
    
    geometric_transforms_names = ["vertical_flip", "horizontal_flip", "rotation_90", "rotation_180", "rotation_270"]
    
    # Randomly select 3 transformations
    selected_transforms = random.sample(list(zip(geometric_transforms, geometric_transforms_names)), 3)

    # Extract the selected transformations and their names
    geometric_transforms, geometric_transforms_names = zip(*selected_transforms)
    geometric_transforms = list(geometric_transforms)
    geometric_transforms_names = list(geometric_transforms_names)

    new_images = [transform(image) for transform in geometric_transforms]
    new_labels = [transform(label) for transform in geometric_transforms]
    
    # Other transformation applied only to the image
    if random.random() < 0.5:
      other_transforms = [T.GaussianBlur(kernel_size=3, sigma=(0.1, 0.5))]
      other_transforms_name = ["gaussian_blur"]
    else:
      other_transforms = []
      other_transforms_name = []
    
    new_images = new_images +  [transform(image) for transform in other_transforms]
    new_labels = new_labels + [label for _ in other_transforms]     
          
    return new_images, new_labels, geometric_transforms_names , other_transforms_name

In [None]:
# DATA AUG - EXO 7

import torchvision.transforms as T
import numpy as np

# mean and standard deviation of the dataset
mean=torch.tensor([0.504, 0.504, 0.503])
std=torch.tensor([0.019 , 0.018, 0.018])

# normalize image [0-1] (or 0-255) to zero-mean unit standard deviation
normalize = T.Normalize(mean, std)
# we invert normalization for plotting later
std_inv = 1 / (std + 1e-7)
unnormalize = T.Normalize(-mean * std_inv, std_inv)

transforms_train = T.Compose([
  #TODO: add your own transforms here
  T.RandomResizedCrop((200, 200)),
  T.RandomGrayscale(),
  T.RandomHorizontalFlip(),
  T.RandomApply([T.GaussianBlur(kernel_size=7)]),
  T.RandomPosterize(bits=8),
  T.RandomVerticalFlip(),
  T.ColorJitter(),
  T.Resize((224, 224)),
  T.ToTensor(),
  normalize
])

# we do not augment the validation dataset (aside from resizing and tensor casting)
transforms_val = T.Compose([
  T.Resize((224, 224)),
  T.ToTensor(),
  normalize
])

# Test
dataset_index = 500

img, label = dataset[dataset_index]

fig, axs = plt.subplots(1,2, figsize=(12,6))
axs[0].imshow(unnormalize(transforms_val(img)).permute(1,2,0))
axs[0].set_title("validation transform (no augmentation)")

axs[1].imshow(unnormalize(transforms_train(img)).permute(1,2,0))
axs[1].set_title("training transform")
[ax.axis("off") for ax in axs] # removes ticks