In [1]:
import random
import numpy as np
import pandas as pd
from pathlib import Path
import albumentations as A
import cv2
import os

In [2]:
# Folder holding the original images and folder receiving the augmented copies.
images_path = Path('../data/fundus/MuReD/images/images')
da_images_path = Path('../data/fundus/MuReD/images/lpros030')

# Label table for the training split; the row count is shown as the cell output.
labels_df = pd.read_csv('../data/fundus/MuReD/train_data.csv')
labels_df.shape[0]

1764

In [3]:
# Candidate augmentations. Each one is built with p=1.0 so that it is always
# applied once it has been picked by the surrounding OneOf.
_augmentation_choices = [
    # Rotation with an angle limit of 45 degrees.
    A.Rotate(limit=45, p=1.0),
    # Horizontal flip.
    A.HorizontalFlip(p=1.0),
    # Random brightness / contrast adjustment.
    A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=1.0),
    # Hue / saturation / value adjustment.
    A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=1.0),
]

# The pipeline consists of a single OneOf step that itself runs with p=1.
transform = A.Compose([A.OneOf(_augmentation_choices, p=1)])

In [4]:
def get_da_image(img_name, ID_name):
    """Create one augmented copy of a source image and save it as ``DA_<ID_name>.png``.

    The source is looked up in ``images_path`` as ``<img_name>.png``; when no
    such file exists, ``<img_name>.tif`` is used instead. The module-level
    ``transform`` is applied to the RGB image and the result is written into
    ``da_images_path`` (created on demand).

    Args:
        img_name: File name of the source image without its extension.
        ID_name: Identifier used to build the output file name.

    Returns:
        The output file name without its extension, i.e. ``"DA_<ID_name>"``.

    Raises:
        FileNotFoundError: If the source image is missing or cannot be decoded.
        OSError: If the augmented image cannot be written to disk.
    """
    png_path = images_path / f"{img_name}.png"
    image_path = png_path if png_path.exists() else images_path / f"{img_name}.tif"

    # cv2.imread signals failure by returning None instead of raising.
    image = cv2.imread(str(image_path))
    if image is None:
        raise FileNotFoundError(f"Could not read source image: {image_path}")

    # OpenCV loads images as BGR; the transform is fed RGB.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    transformed_image = transform(image=image)["image"]

    new_img_stem = f"DA_{ID_name}"
    new_img_path = da_images_path / f"{new_img_stem}.png"

    # cv2.imwrite returns False (no exception) when the target folder is missing,
    # so make sure it exists and check the result explicitly.
    da_images_path.mkdir(parents=True, exist_ok=True)
    written = cv2.imwrite(str(new_img_path), cv2.cvtColor(transformed_image, cv2.COLOR_RGB2BGR))
    if not written:
        raise OSError(f"Could not write augmented image: {new_img_path}")

    # Return the stem actually used for the file so the label table and the
    # image folder cannot drift apart.
    return new_img_stem

In [5]:
def LP_ROS(D, size_addition=0.25, seed=None):
    """Oversample minority labelsets by cloning rows (LP-ROS style).

    The first column of ``D`` is treated as the image name and all remaining
    columns together form the row's labelset. Rows are grouped by labelset;
    groups smaller than the mean group size are "minority bags". Each minority
    bag receives ``int(min(mean_size - len(bag), mean_increment))`` clones,
    where ``mean_increment = int(len(D) * size_addition) / number_of_minority_bags``.
    Because of this per-bag cap and the truncation to int, the number of rows
    actually added can be lower than ``len(D) * size_addition``.

    Every clone gets a freshly augmented image via ``get_da_image`` and keeps
    the labels of the row it was cloned from.

    Args:
        D: Label table; column 0 is the image name, columns 1.. are labels.
        size_addition: Requested growth as a fraction of ``len(D)``.
        seed: Optional seed for choosing which rows are cloned. ``None`` (the
            default) uses the global ``random`` state, as before. This does
            not seed the image augmentation done inside ``get_da_image``.

    Returns:
        ``D`` with the cloned rows appended (index reset), or ``D`` itself
        when there is nothing to oversample.
    """
    rng = random.Random(seed) if seed is not None else random
    samples_to_add = int(len(D) * size_addition)

    # Group row positions by labelset (all columns after the first).
    label_set_bags = {}
    label_rows = D.iloc[:, 1:].itertuples(index=False, name=None)
    for position, labelset in enumerate(label_rows):
        label_set_bags.setdefault(labelset, []).append(position)

    # Empty table (or no label columns to group on): nothing to do.
    if not label_set_bags:
        return D

    # Average number of samples per labelset.
    mean_size = sum(len(bag) for bag in label_set_bags.values()) / len(label_set_bags)

    # Minority bags are the ones strictly below the mean size.
    min_bag = [bag for bag in label_set_bags.values() if len(bag) < mean_size]

    # Perfectly balanced labelsets have no minority bag; avoid dividing by zero.
    if not min_bag:
        return D

    mean_increment = samples_to_add / len(min_bag)

    # Process bags from largest to smallest (stable for equal sizes).
    min_bag.sort(key=len, reverse=True)

    new_samples = []
    ID_name = 1
    for bag in min_bag:
        # Never grow a bag past the mean size, nor past its share of the budget.
        increment = min(mean_size - len(bag), mean_increment)
        for _ in range(int(increment)):
            # Clone a random row of this bag and give it a new augmented image.
            sample_to_clone = D.iloc[rng.choice(bag)]
            new_image_name = get_da_image(sample_to_clone.iloc[0], ID_name)
            new_samples.append([new_image_name] + sample_to_clone.iloc[1:].tolist())
            ID_name += 1

    if new_samples:
        new_samples_df = pd.DataFrame(new_samples, columns=D.columns)
        D = pd.concat([D, new_samples_df], ignore_index=True, axis=0)

    return D

# Oversample the minority labelsets, requesting a 30% size increase;
# the resulting row count is shown as the cell output.
da_labels_df = LP_ROS(D=labels_df, size_addition=0.30)
da_labels_df.shape[0]

2242

In [6]:
# Persist the oversampled label table (without the DataFrame index column).
augmented_csv_path = '../data/fundus/MuReD/lpros030_train_data.csv'
da_labels_df.to_csv(augmented_csv_path, index=False)