In [None]:
!pip install astropy==6.1.4 parfive==2.1.0 Glymur==0.13.6 aiapy==0.7.4 numpy hvpy opencv-python==4.10.0.84 ultralytics torch wandb matplotlib==3.7.1 sunpy "numpy<2.0" --no-cache-dir h5py pycocotools

In [None]:
# Built-in libraries
import os
import json
from datetime import datetime
from pathlib import Path

# Third-party libraries
import aiohttp
import astropy.units as u
import cv2
import hvpy
import numpy as np
import pandas as pd
import requests
from PIL import Image
from astropy.coordinates import SkyCoord
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from sunpy.map import Map
from aiapy.calibrate import normalize_exposure
from ultralytics import YOLO

In [None]:
# Directory setup: dataset root, raw JP2 cache, annotated output,
# and one images/labels folder pair per split.
base_folder = 'data/roloro'
jp2_folder = os.path.join(base_folder, 'jp2')
annotated_folder = os.path.join(base_folder, 'annotated')

train_images_folder = os.path.join(base_folder, 'train/images')
train_labels_folder = os.path.join(base_folder, 'train/labels')
val_images_folder = os.path.join(base_folder, 'val/images')
val_labels_folder = os.path.join(base_folder, 'val/labels')
test_images_folder = os.path.join(base_folder, 'test/images')
test_labels_folder = os.path.join(base_folder, 'test/labels')

# (images, labels) pairs in train / val / test order
output_folders = [
    (train_images_folder, train_labels_folder),
    (val_images_folder, val_labels_folder),
    (test_images_folder, test_labels_folder)
]

# Create folders if they do not exist (exist_ok keeps the cell re-runnable)
os.makedirs(jp2_folder, exist_ok=True)
for _split_images, _split_labels in output_folders:
    os.makedirs(_split_images, exist_ok=True)
    os.makedirs(_split_labels, exist_ok=True)
os.makedirs(annotated_folder, exist_ok=True)

# Load the dataset
data = pd.read_csv("ribbondb_v1.0.csv")
# Source id looked up under SDO / AIA / 1600 in hvpy's data-source listing
source_id = hvpy.getDataSources()['SDO']['AIA']['1600']['sourceId']

# Number of rows to process; set to None to process the entire dataset
num_rows = 10
num_rows = len(data) if num_rows is None else num_rows

print(f"Processing {num_rows} rows from the dataset...")

# Helper function to determine output folder based on index
def get_output_folder(index):
    """Return the ``output_folders`` entry that a dataset row belongs to.

    Rows are bucketed by ``index % 10``: buckets 0-6 map to entry 0 (train),
    buckets 7-8 to entry 1 (validation) and bucket 9 to entry 2 (test),
    i.e. a 70% / 20% / 10% split.
    """
    bucket = index % 10
    split_position = 0 if bucket < 7 else (1 if bucket < 9 else 2)
    return output_folders[split_position]

# Helper function to check files in folders
def check_files_in_folders(start_time, end_time):
    """Check that no expected PNG of an interval exists in more than one split.

    One file name is expected per minute between ``start_time`` and
    ``end_time`` (inclusive). Each image folder listed in ``output_folders``
    is probed for every expected name, and any name present in several
    folders is printed.

    Returns:
        True when every expected file is present in at most one folder
        (files that exist nowhere also count as fine), otherwise False.
    """
    expected_files = [
        f"{timestamp.strftime('%Y-%m-%dT%H-%M-%S')}_AIA1600.png"
        for timestamp in pd.date_range(start=start_time, end=end_time, freq='min')
    ]

    # file name -> list of image folders in which it was found
    file_locations = {name: [] for name in expected_files}
    for images_folder, _ in output_folders:
        found_here = (
            name for name in expected_files
            if os.path.exists(os.path.join(images_folder, name))
        )
        for name in found_here:
            file_locations[name].append(images_folder)

    # Report every file that shows up in more than one folder
    no_duplicates = True
    for file, locations in file_locations.items():
        if len(locations) > 1:
            print(f"File {file} is found in multiple folders: {locations}")
            no_duplicates = False

    return no_duplicates

# Step 1: Download and preprocess images
print("Starting image download and preprocessing...")
missing_jp2_count = 0
for index, row in data.iterrows():
    if index >= num_rows:
        break

    # The CSV stores times to the minute; append seconds before parsing
    start_time = f"{row[' TSTART [UT]']}:00"
    end_time = f"{row[' TFINAL [UT]']}:00"

    # One expected JP2 frame per minute of the interval
    for timestamp in pd.date_range(start=start_time, end=end_time, freq='min'):
        jp2_filename = os.path.join(
            jp2_folder, f"{timestamp.strftime('%Y-%m-%dT%H-%M-%S')}_AIA1600.jp2"
        )

        # Check if JP2 file already exists
        if os.path.exists(jp2_filename):
            print(f"File already exists: {jp2_filename}. Skipping download.")
            continue

        # NOTE(review): this cell contains no download call, so a missing file
        # stays missing. Count it so the gap is visible instead of silent.
        # TODO: restore the actual download step for missing frames.
        missing_jp2_count += 1

print("Image download completed.")
if missing_jp2_count:
    print(f"Warning: {missing_jp2_count} JP2 file(s) are not present in {jp2_folder} and were not downloaded.")

# Step 2: Process images to PNG and create YOLO annotations
print("Starting image processing and annotation creation...")
for index, row in data.iterrows():
    if index >= num_rows:
        break

    lat = row[' LAT [deg]']
    lon = row[' LON [deg]']
    # The CSV stores times to the minute; append seconds before parsing
    start_time = f"{row[' TSTART [UT]']}:00"
    end_time = f"{row[' TFINAL [UT]']}:00"

    # One frame per minute across the interval
    time_range = pd.date_range(start=start_time, end=end_time, freq='min')

    # Every frame of a row goes to the same split (train/val/test)
    images_folder, labels_folder = get_output_folder(index)

    for timestamp in time_range:
        date_str = timestamp.strftime('%Y-%m-%dT%H:%M:%S')
        file_stem = f'{date_str.replace(":", "-")}_AIA1600'
        jp2_filename = os.path.join(jp2_folder, f'{file_stem}.jp2')

        # Check if JP2 file exists
        if not os.path.exists(jp2_filename):
            print(f"JP2 file not found: {jp2_filename}. Skipping.")
            continue

        try:
            # Process image with sunpy
            solar_map = Map(jp2_filename)
            solar_map = normalize_exposure(solar_map)

            # Convert image data to 8-bit. The pixel arrays get their own names
            # so that `data` keeps referring to the DataFrame being iterated.
            pixels = solar_map.data.astype(np.float32)
            pixels = np.nan_to_num(pixels, nan=0.0, posinf=0.0, neginf=0.0)
            pixel_min = pixels.min()
            pixel_span = pixels.max() - pixel_min
            if pixel_span > 0:
                image_u8 = ((pixels - pixel_min) / pixel_span * 255).astype(np.uint8)
            else:
                # Constant frame: min-max scaling would divide by zero
                image_u8 = np.zeros(pixels.shape, dtype=np.uint8)
            image_filename = os.path.join(images_folder, f'{file_stem}.png')

            # Ensure the folder for the image exists
            os.makedirs(os.path.dirname(image_filename), exist_ok=True)

            # Save the image
            Image.fromarray(image_u8).save(image_filename)

            # Calculate YOLO annotations: a box spanning +/-20 around (lon, lat)
            # NOTE(review): lon/lat are given in degrees directly in the map's own
            # coordinate frame. If the CSV values are heliographic and the map
            # frame is helioprojective, the corners would need to be built in a
            # heliographic frame first -- confirm against the generated labels.
            orig_img_height, orig_img_width = image_u8.shape
            bottom_left = SkyCoord((lon + 20) * u.deg, (lat - 20) * u.deg, frame=solar_map.coordinate_frame)
            top_right = SkyCoord((lon - 20) * u.deg, (lat + 20) * u.deg, frame=solar_map.coordinate_frame)
            bottom_left_pix = solar_map.world_to_pixel(bottom_left)
            top_right_pix = solar_map.world_to_pixel(top_right)

            # Normalise centre and size by the image dimensions (YOLO format)
            x_center = ((bottom_left_pix.x.value + top_right_pix.x.value) / 2) / orig_img_width
            y_center = ((bottom_left_pix.y.value + top_right_pix.y.value) / 2) / orig_img_height
            box_width = abs(top_right_pix.x.value - bottom_left_pix.x.value) / orig_img_width
            box_height = abs(top_right_pix.y.value - bottom_left_pix.y.value) / orig_img_height

            # Save YOLO annotation (single class, id 0)
            yolo_filename = os.path.join(labels_folder, f'{file_stem}.txt')
            os.makedirs(os.path.dirname(yolo_filename), exist_ok=True)
            with open(yolo_filename, 'w') as f:
                f.write(f'0 {x_center} {y_center} {box_width} {box_height}\n')
        except Exception as e:
            # Best effort: report the failing frame and carry on with the rest
            print(f"Error processing image {jp2_filename}: {e}")
            continue

print("Image processing and annotation creation completed.")

# Step 3: Validate file organization
print("Starting validation of file organization...")
# Reload the CSV so `data` is the full table; every row is checked here,
# not only the first `num_rows`.
data = pd.read_csv("ribbondb_v1.0.csv")
for idx, row in data.iterrows():
    start_time, end_time = (
        pd.to_datetime(row[column]) for column in (' TSTART [UT]', ' TFINAL [UT]')
    )

    # Check files for this time interval; nothing to report when it is clean
    if check_files_in_folders(start_time, end_time):
        continue
    print(f"Error: Files for row {idx} (interval {start_time} to {end_time}) are distributed across multiple folders!")

print("Validation completed.")



In [None]:
# Directories: image folders scanned by check_files_in_folders in this cell.
# NOTE(review): this rebinds `output_folders` from the list of (images, labels)
# tuples built in the preprocessing cell to a flat list of folder strings, and
# these paths sit under 'data/' rather than 'data/roloro/' -- confirm that this
# is the location the images were actually written to.
output_folders = ['data/train/images', 'data/val/images', 'data/test/images']

# Helper function to check files in folders
def check_files_in_folders(start_time, end_time):
    """Check that no expected PNG of an interval exists in more than one folder.

    One file name is expected per minute between ``start_time`` and
    ``end_time`` (inclusive). Every folder in ``output_folders`` (a list of
    folder paths in this cell) is probed for each name, and names found in
    several folders are printed.

    Returns:
        True when each expected file is present in at most one folder
        (files found nowhere also pass), otherwise False.
    """
    minutes = pd.date_range(start=start_time, end=end_time, freq='min')
    expected_files = [f"{timestamp.strftime('%Y-%m-%dT%H-%M-%S')}_AIA1600.png" for timestamp in minutes]

    # file name -> folders in which that file exists
    file_locations = {name: [] for name in expected_files}
    for folder in output_folders:
        for name in expected_files:
            if not os.path.exists(os.path.join(folder, name)):
                continue
            file_locations[name].append(folder)

    # Keep only the files that were seen in several folders and report them
    duplicated = {
        name: folders for name, folders in file_locations.items() if len(folders) > 1
    }
    for file, locations in duplicated.items():
        print(f"File {file} is found in multiple folders: {locations}")

    return not duplicated

# Iterate over each row in the dataset and perform the check
for idx, row in data.iterrows():
    start_time = pd.to_datetime(row[' TSTART [UT]'])
    end_time = pd.to_datetime(row[' TFINAL [UT]'])
    print(f"Checking row {idx} with interval {start_time} to {end_time}...")

    # True means no file of this interval was found in more than one folder
    interval_is_clean = check_files_in_folders(start_time, end_time)
    if interval_is_clean:
        print(f"All files for row {idx} are in a single folder.")
    else:
        print(f"Files for row {idx} are distributed across multiple folders!")

print("Check completed.")

In [1]:
def check_duplicate_images(data, jp2_folder, output_folders, num_rows=None):
    """Find timestamps that are claimed by more than one dataset row.

    Each row expands to one timestamp per minute between its
    ``' TSTART [UT]'`` and ``' TFINAL [UT]'`` values. The first row to claim a
    timestamp records its split folder; any later row hitting the same
    timestamp is reported as a duplicate. No files are read: the check works
    purely on the table.

    Args:
        data: DataFrame with ``' TSTART [UT]'`` and ``' TFINAL [UT]'`` columns
            holding times to the minute.
        jp2_folder: Accepted for interface compatibility; not used by the check.
        output_folders: Three (images, labels) pairs in train / val / test order.
        num_rows: Stop at the first row whose index label is >= this value.
            ``None`` uses ``len(data)``.

    Returns:
        List of ``(timestamp_string, first_images_folder, later_images_folder)``
        tuples, one per repeated timestamp, in encounter order.
    """
    if num_rows is None:
        num_rows = len(data)

    print(f"Checking {num_rows} rows from the dataset for duplicates...")

    # Helper function to determine output folder based on index
    def get_output_folder(index):
        if index % 10 < 7:
            return output_folders[0]  # Train
        elif index % 10 < 9:
            return output_folders[1]  # Validation
        else:
            return output_folders[2]  # Test

    # timestamp string -> images folder of the first row that produced it
    processed_files = {}
    duplicate_files = []

    for index, row in data.iterrows():
        if index >= num_rows:
            break

        # The table stores times to the minute; append seconds before parsing
        start_time = f"{row[' TSTART [UT]']}:00"
        end_time = f"{row[' TFINAL [UT]']}:00"

        # Determine the output folder (train/val/test) for this row
        images_folder, _ = get_output_folder(index)

        for timestamp in pd.date_range(start=start_time, end=end_time, freq='min'):
            date_str = timestamp.strftime('%Y-%m-%dT%H:%M:%S')

            # A timestamp seen before means two rows map to the same image
            if date_str in processed_files:
                duplicate_files.append((date_str, processed_files[date_str], images_folder))
            else:
                processed_files[date_str] = images_folder

    # Report duplicates if any
    if duplicate_files:
        print("Duplicate files detected:")
        for date_str, folder1, folder2 in duplicate_files:
            print(f"File {date_str} found in both {folder1} and {folder2}")
    else:
        print("No duplicate files detected.")

    return duplicate_files

# Directory setup (note: this cell uses 'data' as the dataset root)
base_folder = 'data'
jp2_folder = os.path.join(base_folder, 'jp2')
train_images_folder = os.path.join(base_folder, 'train/images')
train_labels_folder = os.path.join(base_folder, 'train/labels')
val_images_folder = os.path.join(base_folder, 'val/images')
val_labels_folder = os.path.join(base_folder, 'val/labels')
test_images_folder = os.path.join(base_folder, 'test/images')
test_labels_folder = os.path.join(base_folder, 'test/labels')
annotated_folder = os.path.join(base_folder, 'annotated')

# (images, labels) pairs in train / val / test order
output_folders = [
    (train_images_folder, train_labels_folder),
    (val_images_folder, val_labels_folder),
    (test_images_folder, test_labels_folder)
]

data = pd.read_csv("ribbondb_v1.0.csv")

# Pass the JP2 folder built above rather than a hand-typed path string
check_duplicate_images(data, jp2_folder, output_folders)

In [None]:
# Define paths
train_folder = 'data/train'

# Target precision to stop training
# NOTE: this value is not referenced by the training call in this cell.
target_map = 0.8  # Replace with your desired mAP threshold

# Keyword arguments forwarded unchanged to model.train()
train_settings = dict(
    data='custom_data.yaml',
    epochs=50,
    imgsz=512,
    batch=8,
    hsv_h=0.0,
    hsv_s=0.1,
    hsv_v=0.1,
    translate=0.0,
    scale=0.0,
    flipud=0.5,
    mosaic=0.0,
    erasing=0.0,
    crop_fraction=0.0,
    verbose=True,
    patience=10,
    save_period=5,
    workers=0,
)

# Only train when the training folder is present
if os.path.exists(train_folder):
    # Start from the epoch-10 weights saved under runs/detect/train36
    model = YOLO("runs/detect/train36/weights/epoch10.pt")

    # Train the model on the custom dataset
    results = model.train(**train_settings)
    print("Training process completed.")
else:
    print("Train folder does not exist. Please ensure the path is correct.")

In [None]:
model = YOLO("best/best.pt")

conf_values = [0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
iou_values = [0.0001, 0.001, 0.1, 0.5]

# Every (conf, iou) pairing, with conf varying slowest
threshold_grid = [(conf, iou) for conf in conf_values for iou in iou_values]

for conf, iou in threshold_grid:
    metrics = model.val(conf=conf, iou=iou, save_json=True)

    # Record the thresholds next to the run's outputs so each run folder
    # can be matched back to its settings
    val_folder = metrics.save_dir
    file_path = os.path.join(val_folder, "arguments.txt")
    with open(file_path, "w") as f:
        print(f"conf={conf}, iou={iou}", file=f)

In [None]:
# Output dictionary in COCO format
coco = {
    "images": [],
    "annotations": [],
    "categories": [{"id": 1, "name": "event"}]
}

labels_path = "data/val/labels"
images_path = "data/val/images"
annotation_id = 1

# Loop through all .txt files in labels/. Sorting makes the image order and
# the annotation ids reproducible from run to run.
for fname in sorted(os.listdir(labels_path)):
    # Split off only the final extension so a ".txt" inside the stem is kept
    image_id, extension = os.path.splitext(fname)
    if extension != ".txt":
        continue

    image_file = image_id + ".png"
    image_full_path = os.path.join(images_path, image_file)
    label_full_path = os.path.join(labels_path, fname)

    if not os.path.exists(image_full_path):
        print(f"Image {image_file} not found, skipping...")
        continue

    # Get image dimensions (needed to turn normalised boxes into pixels)
    with Image.open(image_full_path) as img:
        width, height = img.size

    # Add image metadata; the file stem doubles as the image id
    coco["images"].append({
        "id": image_id,
        "file_name": image_file,
        "width": width,
        "height": height
    })

    # Read all objects from .txt
    with open(label_full_path, "r") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != 5:
                continue  # skip invalid lines

            cls_id, x_center, y_center, w, h = map(float, parts)

            # Convert YOLO (normalised centre/size) -> COCO (pixel x, y, width, height)
            abs_x = (x_center - w / 2) * width
            abs_y = (y_center - h / 2) * height
            abs_w = w * width
            abs_h = h * height

            coco["annotations"].append({
                "id": annotation_id,
                "image_id": image_id,
                "category_id": int(cls_id) + 1,  # Class IDs start from 1 in COCO
                "bbox": [abs_x, abs_y, abs_w, abs_h],
                "area": abs_w * abs_h,
                "iscrowd": 0
            })
            annotation_id += 1

# Save to JSON
with open("ground_truth.json", "w") as f:
    json.dump(coco, f, indent=2)

print(f"Done! Total GT annotations: {len(coco['annotations'])}")

In [None]:
# Inputs: ground truth written by the conversion cell, and the predictions
# file of one specific validation run (YOLOv8 output with save_json=True)
ground_truth_file = "ground_truth.json"
predictions_file = "runs/detect/val99/predictions.json"

# Load ground truth annotations, then attach the predictions to them
coco_gt = COCO(ground_truth_file)
coco_dt = coco_gt.loadRes(predictions_file)

# Evaluate predictions using COCO bounding-box metrics
coco_eval = COCOeval(coco_gt, coco_dt, iouType="bbox")
for run_stage in (coco_eval.evaluate, coco_eval.accumulate, coco_eval.summarize):
    run_stage()

In [None]:
# Folder paths
image_folder = "data/test/images"  # Folder with input images
output_folder = "output"  # Folder to save processed images
model_path = "best/best.pt"  # YOLOv8 model file
labels_folder = "data/test/labels"  # Folder with label files

# Prediction settings
iou_threshold = 0.0001  # IOU threshold (for filtering overlapping boxes)
conf_threshold = 0.3  # Confidence threshold (for removing weak predictions)

# Load YOLOv8 model
model = YOLO(model_path)

# Create output folder if it doesn't exist
Path(output_folder).mkdir(parents=True, exist_ok=True)

# Get a list of all image files: the extension match ignores case and the
# list is sorted so images are processed in a reproducible order
image_files = sorted(
    f for f in os.listdir(image_folder)
    if f.lower().endswith(('.jpg', '.png', '.jpeg'))
)

# Process each image
for image_name in image_files:
    image_path = os.path.join(image_folder, image_name)
    label_path = os.path.join(labels_folder, os.path.splitext(image_name)[0] + ".txt")  # Corresponding label file

    # Load the image; cv2.imread returns None instead of raising when the
    # file cannot be decoded, so guard before touching .shape
    image = cv2.imread(image_path)
    if image is None:
        print(f"Could not read image: {image_path}. Skipping.")
        continue
    h, w = image.shape[:2]  # Get image dimensions

    # Check if label file exists
    if os.path.exists(label_path):
        with open(label_path, "r") as f:
            lines = f.readlines()

        # Process bounding boxes from the label file
        for line in lines:
            parts = line.strip().split()
            if len(parts) == 5:
                _, x_center, y_center, width, height = map(float, parts)

                # Convert normalized coordinates to pixels
                x1 = int((x_center - width / 2) * w)
                y1 = int((y_center - height / 2) * h)
                x2 = int((x_center + width / 2) * w)
                y2 = int((y_center + height / 2) * h)

                # Draw bounding box from labels (green)
                cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

    # Run YOLOv8 prediction on the file on disk, not on `image`, which
    # already has the label boxes drawn onto it
    results = model(image_path, conf=conf_threshold, iou=iou_threshold)

    # Add predicted YOLO bounding boxes
    for result in results:
        for box in result.boxes.xyxy:
            x1, y1, x2, y2 = map(int, box[:4])
            cv2.rectangle(image, (x1, y1), (x2, y2), (255, 0, 0), 2)  # YOLO boxes (blue)

    # Save the annotated image
    output_image_path = os.path.join(output_folder, image_name)
    cv2.imwrite(output_image_path, image)

    print(f"Result saved: {output_image_path}")

print("Processing completed!")
