SinkSAM Data preparation

In [None]:
import os
import random
import shutil

def split_dataset(source_folder, train_folder, valid_folder, train_ratio=0.7, seed=None):
    """
    Randomly split the TIFF files of a folder into a train and a validation folder.

    Files are copied (not moved), so the source folder is left untouched. Progress
    counts are printed at the end.

    :param source_folder: Folder containing the source ``.tif`` / ``.tiff`` files
    :param train_folder: Destination folder for the training subset (created if missing)
    :param valid_folder: Destination folder for the validation subset (created if missing)
    :param train_ratio: Fraction of the files assigned to the training subset, in [0, 1]
    :param seed: Optional seed. When given, the same folder content always yields the
        same split; when ``None`` the module-level ``random`` state is used.
    :raises ValueError: If ``train_ratio`` is outside [0, 1]
    """
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError(f"train_ratio must be between 0 and 1, got {train_ratio!r}")

    # Ensure the output directories exist
    os.makedirs(train_folder, exist_ok=True)
    os.makedirs(valid_folder, exist_ok=True)

    # Collect TIFF files only (case-insensitive extension, directories excluded).
    # Sorting gives the shuffle a fixed starting order, because os.listdir order is arbitrary.
    tiff_files = sorted(
        f for f in os.listdir(source_folder)
        if f.lower().endswith(('.tif', '.tiff'))
        and os.path.isfile(os.path.join(source_folder, f))
    )

    # Shuffle: a dedicated generator when seeded, the shared module state otherwise
    if seed is None:
        random.shuffle(tiff_files)
    else:
        random.Random(seed).shuffle(tiff_files)

    # The first `split_index` shuffled files go to train, the rest to validation
    split_index = int(len(tiff_files) * train_ratio)
    train_files = tiff_files[:split_index]
    valid_files = tiff_files[split_index:]

    # Copy the files to their respective directories
    for destination, files in ((train_folder, train_files), (valid_folder, valid_files)):
        for file in files:
            shutil.copy(os.path.join(source_folder, file), os.path.join(destination, file))

    print(f"Total files: {len(tiff_files)}")
    print(f"Train files: {len(train_files)}")
    print(f"Validation files: {len(valid_files)}")

# Source folder and the train / validation destination folders
source_folder = "D:/Osher/ann_osher/sinkholes_yolo_dataset/images_6bands/"
train_folder = "D:/Osher/ann_osher/sinkholes_yolo_dataset/train_6bands/"
valid_folder = "D:/Osher/ann_osher/sinkholes_yolo_dataset/valid_6bands/"

# Run the split with the function's default train ratio
split_dataset(
    source_folder=source_folder,
    train_folder=train_folder,
    valid_folder=valid_folder,
)


In [None]:
import os ## new one
import numpy as np
import tifffile
import rasterio

# Input folder of TIFF images and the output folders for the RGB images and the masks
input_folder = "D:/Osher/ann_osher/sinkholes_yolo_dataset/valid_6bands/"
output_folder_rgb = "D:/Osher/ann_osher/sinkholes_yolo_dataset/input/valid_images"
output_folder_masks1 = "D:/Osher/ann_osher/sinkholes_yolo_dataset/input/valid_masks/Sinkholes"

# Create both output folders up front (no error if they already exist)
for _out_dir in (output_folder_rgb, output_folder_masks1):
    os.makedirs(_out_dir, exist_ok=True)

# Function to convert images to 8-bit
def convert_to_8bit(image):
    """
    Convert an array to ``uint8`` by saturating to the range [0, 255].

    A plain ``astype(np.uint8)`` wraps out-of-range values modulo 256 (256 -> 0,
    257 -> 1), which both corrupts imagery and hides invalid mask values. Here
    values below 0 become 0, values above 255 become 255, NaN becomes 0 and
    fractional values are truncated toward zero.

    NOTE(review): this saturates, it does not rescale. If the source imagery uses
    a wider range (e.g. 16-bit), a rescale may be what is wanted -- confirm.

    :param image: Array-like of numeric values
    :return: New ``uint8`` numpy array with the same shape as ``image``
    """
    array = np.asarray(image)
    if array.dtype == np.uint8:
        # Already 8-bit: return a copy, as astype always did
        return array.astype(np.uint8)

    # Go through float64 so the bounds 0/255 are valid for every input dtype
    as_float = np.nan_to_num(array.astype(np.float64), nan=0.0)
    return np.clip(as_float, 0, 255).astype(np.uint8)

# Export an 8-bit RGB image (bands 1-3) and a mask (band 5) for every usable TIFF
for filename in os.listdir(input_folder):
    # Anything that is not a TIFF is ignored
    if not filename.endswith((".tif", ".tiff")):
        continue

    image_path = os.path.join(input_folder, filename)

    try:
        # Load the TIFF image using tifffile
        image = tifffile.imread(image_path)

        # Bands are indexed along the last axis; files with fewer than 6 are reported and skipped
        if image.shape[-1] < 6:
            print(f"Image file {image_path} does not have at least 6 bands.")
            continue

        # Extract the bands as 8-bit arrays
        band_1, band_2, band_3 = (convert_to_8bit(image[..., idx]) for idx in range(3))
        band_5 = convert_to_8bit(image[..., 4])
        band_6 = convert_to_8bit(image[..., 5])

        # A usable mask has at least one non-zero pixel and no value above 1
        if (band_5 > 1).any() or not band_5.any():
            print(f"Mask1 (band 5) in file {image_path} contains values greater than 1 or all values are zero. Skipping RGB and mask output.")
            continue

        # Assemble the RGB image with the band axis last
        rgb_image = np.stack([band_1, band_2, band_3], axis=-1)

        # Write the RGB image, one band at a time (rasterio band indices start at 1)
        output_rgb_path = os.path.join(output_folder_rgb, filename)
        with rasterio.open(
            output_rgb_path,
            'w',
            driver='GTiff',
            height=rgb_image.shape[0],
            width=rgb_image.shape[1],
            count=3,
            dtype='uint8',
            photometric='RGB',
        ) as dst:
            for i in range(3):
                dst.write(rgb_image[:, :, i], i + 1)

        # Write the mask (band 5) as a single-band image under the same file name
        mask1_image = band_5
        output_mask1_path = os.path.join(output_folder_masks1, filename)
        with rasterio.open(
            output_mask1_path,
            'w',
            driver='GTiff',
            height=mask1_image.shape[0],
            width=mask1_image.shape[1],
            count=1,
            dtype='uint8',
        ) as dst:
            dst.write(mask1_image, 1)

    except Exception as e:
        # Best effort: report the failing file and carry on with the next one
        print(f"Error processing image file {image_path}: {e}")

print("Processing completed.")


In [None]:
import os ##convert labels to 8 bit
import numpy as np
import rasterio
from rasterio.enums import Resampling

def process_tiff_folder(input_folder, output_folder):
    """
    Binarize every TIFF in a folder and write the result as an 8-bit GeoTIFF.

    For each ``.tif`` / ``.tiff`` file in ``input_folder``, band 1 is read, every
    non-zero pixel becomes 1 and every zero pixel stays 0. The result is written
    under the same file name in ``output_folder``, carrying over the source CRS
    and transform.

    :param input_folder: Path to the folder containing the input TIFF files
    :param output_folder: Path to the folder receiving the processed TIFF files
        (created if missing)
    """
    os.makedirs(output_folder, exist_ok=True)

    tiff_names = [name for name in os.listdir(input_folder) if name.endswith((".tif", ".tiff"))]

    for filename in tiff_names:
        input_path = os.path.join(input_folder, filename)
        output_path = os.path.join(output_folder, filename)

        # The source is read and closed before anything is written,
        # so input and output may be the same file.
        with rasterio.open(input_path) as src:
            image = src.read(1)
            transform = src.transform
            crs = src.crs

        # Non-zero -> 1, zero -> 0, stored as uint8
        processed_image = (image != 0).astype(np.uint8)
        n_rows, n_cols = processed_image.shape[0], processed_image.shape[1]

        profile = {
            "driver": 'GTiff',
            "height": n_rows,
            "width": n_cols,
            "count": 1,
            "dtype": 'uint8',
            "crs": crs,
            "transform": transform,
        }
        with rasterio.open(output_path, 'w', **profile) as dst:
            dst.write(processed_image, 1)

        print(f"Processed {filename} and saved to {output_path}")


# Example usage: input and output are the same folder, so the masks are rewritten in place
input_folder = "D:/Osher/ann_osher/YOLO_DATABASE/input/val_masks/Sinkholes/"
output_folder = "D:/Osher/ann_osher/YOLO_DATABASE/input/val_masks/Sinkholes/"

process_tiff_folder(input_folder=input_folder, output_folder=output_folder)


In [None]:
import glob
import json
import os
import cv2

# Category name -> numeric label ID; each name is also the mask sub-folder that is searched
category_ids = {"Sinkholes": 1}

# Extension of the mask files that are globbed, and extension used for the image
# file names written to the JSON
MASK_EXT = ORIGINAL_EXT = 'tif'

# Running counters, shared with the functions below through `global`
image_id = annotation_id = 0

def images_annotations_info(maskpath):
    """
    Process the binary masks and generate images and annotations information.

    For every category in ``category_ids``, the masks matching
    ``<maskpath>/<category>/*.<MASK_EXT>`` are read, their contours extracted and
    one annotation created per contour with a positive area. The module-level
    counters ``image_id`` and ``annotation_id`` are advanced as entries are created.

    NOTE(review): ``cv2.RETR_TREE`` also returns inner (hole) contours, which are
    annotated exactly like outer ones -- confirm this is intended.

    :param maskpath: Path to the directory containing one sub-folder of masks per category
    :return: Tuple containing images info, annotations info, and annotation count
    """
    global image_id, annotation_id
    annotations = []
    images = []
    # file_name -> image entry, so a repeated file name is found without rescanning `images`
    images_by_name = {}

    # Iterate through categories and corresponding masks
    for category, category_id in category_ids.items():
        pattern = os.path.join(maskpath, category, f'*.{MASK_EXT}')
        # Sorted so that image/annotation ids do not depend on the arbitrary glob order
        for mask_image in sorted(glob.glob(pattern)):
            # splitext strips only the final extension, so names containing dots stay intact
            stem = os.path.splitext(os.path.basename(mask_image))[0]
            original_file_name = f'{stem}.{ORIGINAL_EXT}'
            mask_image_open = cv2.imread(mask_image)

            # Check if the image was loaded successfully
            if mask_image_open is None:
                print(f"Error loading image: {mask_image}")
                continue

            # Get image dimensions
            height, width = mask_image_open.shape[:2]

            # Create or find existing image entry (ids start at 1)
            image = images_by_name.get(original_file_name)
            if image is None:
                image_id += 1
                image = {
                    "id": image_id,
                    "width": width,
                    "height": height,
                    "file_name": original_file_name,
                }
                images.append(image)
                images_by_name[original_file_name] = image

            # Find contours in the mask image
            gray = cv2.cvtColor(mask_image_open, cv2.COLOR_BGR2GRAY)
            _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
            # [-2] selects the contour list for both the 2-tuple and the 3-tuple return form
            contours = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)[-2]

            # Create annotation for each contour
            for contour in contours:
                area = cv2.contourArea(contour)
                # Zero-area contours are not annotated
                if area <= 0:
                    continue

                # Every annotation receives a fresh id, so no duplicate check is needed
                annotations.append({
                    "iscrowd": 0,
                    "id": annotation_id,
                    "image_id": image['id'],
                    "category_id": category_id,
                    "bbox": cv2.boundingRect(contour),
                    "area": area,
                    "segmentation": [contour.flatten().tolist()],
                })
                annotation_id += 1

    return images, annotations, annotation_id


def process_masks(mask_path, dest_json):
    """
    Build a COCO-style JSON file from a folder of binary masks.

    The module-level ``image_id`` / ``annotation_id`` counters are reset to 0, the
    images and annotations are collected with ``images_annotations_info`` and the
    result is written to ``dest_json``. A summary line is printed at the end.

    :param mask_path: Directory handed to ``images_annotations_info``
    :param dest_json: Path of the JSON file to write
    """
    global image_id, annotation_id
    image_id, annotation_id = 0, 0

    # One category entry per configured label; the name doubles as its supercategory
    categories = []
    for name, label_id in category_ids.items():
        categories.append({"id": label_id, "name": name, "supercategory": name})

    # Create images and annotations sections
    images, annotations, annotation_cnt = images_annotations_info(mask_path)

    coco_format = {
        "info": {},
        "licenses": [],
        "images": images,
        "categories": categories,
        "annotations": annotations,
    }

    # Save the COCO JSON to a file
    with open(dest_json, "w") as outfile:
        json.dump(coco_format, outfile, sort_keys=True, indent=4)

    print("Created %d annotations for images in folder: %s" % (annotation_cnt, mask_path))


if __name__ == "__main__":
    # Mask folders and the JSON files generated from them
    train_mask_path = "D:/Osher/ann_osher/sinkholes_yolo_dataset/input/train_masks/"
    train_json_path = "D:/Osher/ann_osher/sinkholes_yolo_dataset/input/train_images/train.json"
    val_mask_path = "D:/Osher/ann_osher/sinkholes_yolo_dataset/input/val_masks/"
    val_json_path = "D:/Osher/ann_osher/sinkholes_yolo_dataset/input/val_images/val.json"

    # Training set first, then validation
    for _mask_dir, _json_file in (
        (train_mask_path, train_json_path),
        (val_mask_path, val_json_path),
    ):
        process_masks(_mask_dir, _json_file)


In [None]:
import json
import os
import shutil
import yaml

# Function to convert images to YOLO format
def convert_to_yolo(input_images_path, input_json_path, output_images_path, output_labels_path):
    """
    Copy images and write one polygon label file per annotated image.

    Every ``.tif`` file in ``input_images_path`` is copied to ``output_images_path``.
    For each copied image that has annotations in the JSON, a ``<name>.txt`` file is
    written to ``output_labels_path`` with one line per annotation:
    ``<category_id - 1> x1 y1 x2 y2 ...``, where x is divided by the image width and
    y by the image height (6 decimals). Only the first polygon of each annotation's
    ``segmentation`` is used.

    Label files are overwritten, so running the conversion again does not duplicate
    lines. Images without an entry in the JSON are copied but get no label file.

    :param input_images_path: Folder containing the source ``.tif`` images
    :param input_json_path: JSON file with ``images`` and ``annotations`` lists
    :param output_images_path: Folder receiving the copied images (created if missing)
    :param output_labels_path: Folder receiving the label files (created if missing)
    """
    # Open JSON file containing image annotations
    with open(input_json_path) as f:
        data = json.load(f)

    # Create directories for output images and labels
    os.makedirs(output_images_path, exist_ok=True)
    os.makedirs(output_labels_path, exist_ok=True)

    # Index the JSON once instead of scanning it for every image.
    # setdefault keeps the first entry when a file name appears more than once.
    images_by_name = {}
    for img in data['images']:
        images_by_name.setdefault(img['file_name'], img)

    anns_by_image = {}
    for ann in data['annotations']:
        anns_by_image.setdefault(ann['image_id'], []).append(ann)

    for filename in os.listdir(input_images_path):
        if not filename.endswith(".tif"):
            continue

        shutil.copy(
            os.path.join(input_images_path, filename),
            os.path.join(output_images_path, filename),
        )

        img = images_by_name.get(filename)
        if img is None:
            # Nothing is known about this image, so there is nothing to normalize against
            print(f"No entry for {filename} in {input_json_path}; no label file written.")
            continue

        img_ann = anns_by_image.get(img['id'])
        if not img_ann:
            continue

        img_w = img['width']
        img_h = img['height']
        label_path = os.path.join(output_labels_path, f"{os.path.splitext(filename)[0]}.txt")

        # Write normalized polygon data to a text file ("w": replace any previous labels)
        with open(label_path, "w") as file_object:
            for ann in img_ann:
                current_category = ann['category_id'] - 1
                polygon = ann['segmentation'][0]
                # Even positions are x coordinates, odd positions are y coordinates
                normalized_polygon = [
                    format(coord / img_w if i % 2 == 0 else coord / img_h, '.6f')
                    for i, coord in enumerate(polygon)
                ]
                file_object.write(f"{current_category} " + " ".join(normalized_polygon) + "\n")

# Function to create a YAML file for the dataset
def create_yaml(input_json_path, output_yaml_path, train_path, val_path, test_path=None):
    """
    Write a dataset YAML file listing the class names and the split locations.

    Class names are taken, in order, from the ``categories`` list of the JSON file.
    The written mapping has the keys ``names``, ``nc`` (number of names), ``train``,
    ``val`` and ``test``; ``test`` is an empty string when ``test_path`` is falsy.

    :param input_json_path: JSON file providing the ``categories`` list
    :param output_yaml_path: Path of the YAML file to write
    :param train_path: Value stored under ``train``
    :param val_path: Value stored under ``val``
    :param test_path: Value stored under ``test`` (optional)
    """
    with open(input_json_path) as json_file:
        coco = json.load(json_file)

    # Collect the category names in file order
    class_names = []
    for category in coco['categories']:
        class_names.append(category['name'])

    yaml_data = dict(
        names=class_names,
        nc=len(class_names),
        test=test_path or '',
        train=train_path,
        val=val_path,
    )

    # Block style (one key per line) rather than inline flow style
    with open(output_yaml_path, 'w') as yaml_file:
        yaml.dump(yaml_data, yaml_file, default_flow_style=False)


if __name__ == "__main__":
    base_input_path = "D:/Osher/ann_osher/sinkholes_yolo_dataset/input/"
    base_output_path = "D:/Osher/ann_osher/sinkholes_yolo_dataset/yolo_dataset/"

    # Per split: (images folder, annotation JSON, output images folder, output labels folder).
    # The validation split is converted first, then the training split.
    for _images, _json, _out_images, _out_labels in (
        ("val_images", "val_images/val.json", "valid/images", "valid/labels"),
        ("train_images", "train_images/train.json", "train/images", "train/labels"),
    ):
        convert_to_yolo(
            input_images_path=os.path.join(base_input_path, _images),
            input_json_path=os.path.join(base_input_path, _json),
            output_images_path=os.path.join(base_output_path, _out_images),
            output_labels_path=os.path.join(base_output_path, _out_labels),
        )

    # Creating the YAML configuration file; class names come from the training JSON
    create_yaml(
        input_json_path=os.path.join(base_input_path, "train_images/train.json"),
        output_yaml_path=os.path.join(base_output_path, "data.yaml"),
        train_path="D:/Osher/ann_osher/sinkholes_yolo_dataset/yolo_dataset/train/images/",
        val_path="D:/Osher/ann_osher/sinkholes_yolo_dataset/yolo_dataset/valid/images/",
        test_path='../test/images',  # or None if not applicable
    )

In [None]:
import os ## generate bbox from computed closed depressions
import numpy as np
import rasterio
from scipy.ndimage import label

def mask_to_bboxes(mask, class_id=0, confidence=1.0):
    """
    Convert a mask to bounding boxes for each connected component.

    Components are the connected groups of non-zero pixels found by
    ``scipy.ndimage.label`` with its default connectivity. Box coordinates are
    pixel indices and both corners are inclusive, so a single-pixel component at
    column 3, row 2 gives ``x_min == x_max == 3`` and ``y_min == y_max == 2``.

    :param mask: 2-D numpy array of the mask
    :param class_id: Class ID to assign to each bounding box
    :param confidence: Confidence score to assign to each bounding box
    :return: List of bounding boxes in the format
        [class_id, x_min, y_min, x_max, y_max, confidence], with the four
        coordinates as plain Python ints
    """
    # Local import: only `label` is imported at file level
    from scipy.ndimage import find_objects

    labeled_mask, _ = label(mask)

    bboxes = []
    # find_objects yields one (row_slice, col_slice) per label in a single pass,
    # instead of rescanning the whole mask for every component
    for region in find_objects(labeled_mask):
        if region is None:
            continue
        rows, cols = region
        # slice.stop is exclusive; the boxes use inclusive maxima
        bboxes.append([
            class_id,
            int(cols.start),
            int(rows.start),
            int(cols.stop) - 1,
            int(rows.stop) - 1,
            confidence,
        ])
    return bboxes

def save_bboxes_to_txt(bboxes, file_path):
    """
    Write bounding boxes to a text file, one box per line.

    The first six entries of each box are written separated by single spaces, in
    the order class_id, x_min, y_min, x_max, y_max, confidence. An existing file
    is overwritten.

    :param bboxes: List of bounding boxes in the format [class_id, x_min, y_min, x_max, y_max, confidence]
    :param file_path: Path to the text file
    """
    with open(file_path, 'w') as handle:
        handle.writelines(
            " ".join(format(bbox[k]) for k in range(6)) + "\n"
            for bbox in bboxes
        )

def process_masks_and_save_bboxes(mask_folder, output_folder):
    """
    Process masks and save bounding boxes to text files.

    For every ``.tif`` / ``.tiff`` file in ``mask_folder``, band 1 is read, converted
    to bounding boxes with ``mask_to_bboxes`` and saved with ``save_bboxes_to_txt``
    as ``<name>.txt`` in ``output_folder``. A mask that yields no box gets a single
    fallback box around its center pixel, so every mask produces a non-empty file.

    :param mask_folder: Path to the folder containing masks
    :param output_folder: Path to the folder to save the bounding boxes files
        (created if missing)
    """
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(mask_folder):
        if not filename.endswith((".tif", ".tiff")):
            continue

        mask_path = os.path.join(mask_folder, filename)

        # Read the mask
        with rasterio.open(mask_path) as src:
            mask = src.read(1)

        # Convert mask to bounding boxes
        bboxes = mask_to_bboxes(mask)

        # Ensure at least one bbox is created
        if not bboxes:
            # If no bboxes, create a bbox around the center of the mask as a fallback
            center_y, center_x = mask.shape[0] // 2, mask.shape[1] // 2
            bboxes = [[0, center_x-1, center_y-1, center_x+1, center_y+1, 1.0]]

        # Swap only the final extension. Chained str.replace calls turned "a.tiff"
        # into "a.txtf" and also rewrote ".tif" occurring elsewhere in the name.
        txt_name = os.path.splitext(filename)[0] + ".txt"
        output_txt_path = os.path.join(output_folder, txt_name)
        save_bboxes_to_txt(bboxes, output_txt_path)

        print(f"Processed {filename} and saved bounding boxes to {output_txt_path}")

# Folder holding the masks, and the sub-folder of it that receives the box files
mask_folder = "D:/Osher/ann_osher/sinkholes_yolo_dataset/final_test_512_new/images2/"
output_folder = "D:/Osher/ann_osher/sinkholes_yolo_dataset/final_test_512_new/images2/boxes_from_masks"

# Convert every mask in the folder to a bounding-box text file
process_masks_and_save_bboxes(mask_folder=mask_folder, output_folder=output_folder)
