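Dataset split: 80% train, 10% validation, 10% test.
Each training image is expanded with 4 data augmentation operations (horizontal flip, vertical flip, transpose, and transpose followed by horizontal flip).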

In [None]:
import os
import cv2
from glob import glob
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from albumentations import *

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive; the dataset and
# output paths used further down all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
def create_dir(path):
    """Create the directory `path`, including missing parents, if it is absent.

    The call is a no-op when the directory already exists. `exist_ok=True`
    replaces the former "check, then create" sequence, which could fail if the
    directory appeared between the check and the creation.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: if `path` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)

In [None]:
def load_data(dataset_path='/content/drive/MyDrive/pfa-dataset1',
              test_fraction=0.1, valid_fraction=0.1, random_state=42):
    """List the dataset files and split them into train / test / validation sets.

    Images are collected from ``<dataset_path>/Images/*.BMP`` and the three
    mask families from ``GT_TE``, ``GT_ICM`` and ``GT_ZP`` (``*.bmp``). Each
    list is sorted, and the four lists are split together so that entries at
    the same position stay together.

    NOTE(review): images and masks are paired purely by position after
    sorting, so this assumes the sorted file names line up across the four
    folders -- verify against the dataset.

    Args:
        dataset_path: Root folder of the dataset.
        test_fraction: Fraction of the whole dataset held out for testing.
        valid_fraction: Fraction of the whole dataset held out for validation
            (taken from what remains after the test split).
        random_state: Seed passed to both `train_test_split` calls.

    Returns:
        Four tuples, for images, ZP masks, ICM masks and TE masks respectively.
        Each tuple is ordered ``(train, test, valid)`` -- note that test comes
        before valid.

    Raises:
        ValueError: if no images are found or the four folders do not contain
            the same number of files.
    """
    images = sorted(glob(f"{dataset_path}/Images/*.BMP"))
    masks_te = sorted(glob(f"{dataset_path}/GT_TE/*.bmp"))
    masks_icm = sorted(glob(f"{dataset_path}/GT_ICM/*.bmp"))
    masks_zp = sorted(glob(f"{dataset_path}/GT_ZP/*.bmp"))
    print("total dataset:",len(masks_zp), len(masks_icm), len(masks_te), len(images))

    # Fail early with an explicit message instead of a generic splitter error.
    if not images:
        raise ValueError(f"No images found under {dataset_path}/Images/*.BMP")
    counts = {"Images": len(images), "GT_TE": len(masks_te),
              "GT_ICM": len(masks_icm), "GT_ZP": len(masks_zp)}
    if len(set(counts.values())) != 1:
        raise ValueError(f"Image/mask counts differ: {counts}")

    # Both hold-out sizes are absolute counts computed from the full dataset,
    # so the default 0.1 / 0.1 yields an 80/10/10 split overall.
    test_count = round(test_fraction * len(images))
    valid_count = round(valid_fraction * len(images))

    # First split: carve the test set out of the whole dataset.
    train_i, test_i, train_te, test_te, train_zp, test_zp, train_icm, test_icm = train_test_split(
        images, masks_te, masks_zp, masks_icm,
        test_size=test_count, random_state=random_state)
    # Second split: carve the validation set out of the remaining samples.
    train_i, valid_i, train_te, valid_te, train_zp, valid_zp, train_icm, valid_icm = train_test_split(
        train_i, train_te, train_zp, train_icm,
        test_size=valid_count, random_state=random_state)

    print("train, valid, test")
    print(len(train_zp), len(valid_zp),len(test_zp))

    return (train_i, test_i, valid_i), (train_zp, test_zp, valid_zp), (train_icm, test_icm, valid_icm), (train_te, test_te, valid_te)

In [None]:
def read_this(image_file):
    """Read an image file and return it converted to single-channel grayscale.

    The file is loaded with `cv2.imread` (default flags) and then converted
    with `cv2.COLOR_BGR2GRAY`.

    Args:
        image_file: Path of the image to load.

    Returns:
        The grayscale image produced by `cv2.cvtColor`.

    Raises:
        OSError: if OpenCV could not read the file.
    """
    image_bgr = cv2.imread(image_file)
    if image_bgr is None:
        # cv2.imread reports a missing or undecodable file by returning None;
        # without this check the failure shows up later as an opaque cvtColor
        # error that does not mention the file name.
        raise OSError(f"Could not read image file: {image_file}")

    return cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)

In [None]:
def resize_enhance(i, zp, icm, te, size=(256, 256)):
    """Resize an image and its three masks to a common size.

    The image is resized with OpenCV's default interpolation and cast to
    ``uint8``. The masks are resized with nearest-neighbour interpolation so
    that values at region borders are not blended into intermediate grey
    levels that were not present in the original mask.
    NOTE(review): this assumes the masks are label/binary images -- confirm;
    if soft masks are wanted, drop the `interpolation` argument.

    Args:
        i: Source image array.
        zp, icm, te: Mask arrays belonging to `i`.
        size: Target ``(width, height)`` passed to `cv2.resize`.

    Returns:
        Tuple ``(image, zp, icm, te)`` of resized arrays.
    """
    W, H = size

    i = cv2.resize(i, (W, H))
    zp = cv2.resize(zp, (W, H), interpolation=cv2.INTER_NEAREST)
    icm = cv2.resize(icm, (W, H), interpolation=cv2.INTER_NEAREST)
    te = cv2.resize(te, (W, H), interpolation=cv2.INTER_NEAREST)
    # Contrast enhancement is currently disabled:
    #i = enhance_contrast(i)
    i = i.astype('uint8')

    return i, zp, icm, te

In [None]:
import csv
from google.colab.patches import cv2_imshow

def _paired_transform(transform):
    """Wrap one albumentations transform so the image and its three masks get the same operation."""
    return Compose(
        [transform],
        additional_targets={'mask_zp': 'mask', 'mask_icm': 'mask', 'mask_te': 'mask'},
    )


def _apply_paired(aug, i, zp, icm, te):
    """Run `aug` on an image/mask quadruple and return the results as (image, zp, icm, te)."""
    out = aug(image=i, mask_zp=zp, mask_icm=icm, mask_te=te)
    return out['image'], out['mask_zp'], out['mask_icm'], out['mask_te']


def augment_data(images, masks_zp, masks_icm, masks_te, new_path, train,
                 size_csv_path='/content/drive/MyDrive/PFA_Final/new_data/real_size.csv'):
    """Read, optionally augment, resize and store images together with their masks.

    Every image and its ZP / ICM / TE masks are loaded as grayscale. When
    `train` is true, five variants are produced per sample: the original,
    a horizontal flip, a vertical flip, a transpose, and the transpose flipped
    horizontally. Otherwise only the original is kept. Each variant goes
    through `resize_enhance` and is written below `new_path` into the
    ``images/``, ``GT_ZP/``, ``GT_ICM/`` and ``GT_TE/`` sub-folders, which
    must already exist.

    File names are ``<name>.bmp`` / ``<name> ZP_Mask.bmp`` etc. when a single
    variant is stored, and carry an ``_<index>`` suffix (0 = original) when
    several are stored.

    One CSV row per stored image is appended to `size_csv_path`:
    ``[image file name, shape[0], shape[1]]`` of the array *before* resizing.
    shape[0] is the row count and shape[1] the column count of the array; the
    column order is kept as it always was so existing readers of the file keep
    working.

    Args:
        images: Paths of the source images.
        masks_zp, masks_icm, masks_te: Mask paths, aligned by position with
            `images`.
        new_path: Output root containing the four sub-folders listed above.
        train: If true, store the augmented variants as well.
        size_csv_path: CSV file the pre-resize sizes are appended to.

    Raises:
        OSError: if an output image could not be written.
    """
    print(len(images))
    n_train = 0      # samples processed with augmentation
    n_other = 0      # samples processed without augmentation
    n_augmented = 0  # variants generated for training samples
    n_stored = 0     # images written to disk
    size_rows = []

    if train:
        # The pipelines are deterministic (p=1) and identical for every sample,
        # so they are built once instead of once per image.
        hflip = _paired_transform(HorizontalFlip(p=1))
        vflip = _paired_transform(VerticalFlip(p=1))
        # p is passed by keyword: the meaning of the first positional argument
        # is not the same across albumentations versions.
        transpose = _paired_transform(Transpose(p=1))

    for i, zp, icm, te in tqdm(zip(images, masks_zp, masks_icm, masks_te), total=len(images)):

        # Base name without directory and extension (any extension, any case).
        name = os.path.splitext(os.path.basename(i))[0]

        sample = (read_this(i), read_this(zp), read_this(icm), read_this(te))

        if train:
            n_train += 1
            transposed = _apply_paired(transpose, *sample)
            variants = [
                sample,
                _apply_paired(hflip, *sample),
                _apply_paired(vflip, *sample),
                transposed,
                _apply_paired(hflip, *transposed),
            ]
            n_augmented += len(variants)
        else:
            n_other += 1
            variants = [sample]

        for index, (i_aug, zp_aug, icm_aug, te_aug) in enumerate(variants):
            # Size of the variant before it is resized.
            n_rows, n_cols = i_aug.shape[0], i_aug.shape[1]

            i_res, zp_res, icm_res, te_res = resize_enhance(i_aug, zp_aug, icm_aug, te_aug)

            # Only multi-variant (training) output carries an index suffix.
            suffix = "" if len(variants) == 1 else f"_{index}"
            i_name = f"{name}{suffix}.bmp"
            zp_name = f"{name} ZP_Mask{suffix}.bmp"
            icm_name = f"{name} ICM_Mask{suffix}.bmp"
            te_name = f"{name} TE_Mask{suffix}.bmp"

            outputs = (
                (os.path.join(new_path, "images/", i_name), i_res),
                (os.path.join(new_path, "GT_ZP/", zp_name), zp_res),
                (os.path.join(new_path, "GT_ICM/", icm_name), icm_res),
                (os.path.join(new_path, "GT_TE/", te_name), te_res),
            )
            for out_path, array in outputs:
                # cv2.imwrite returns False on failure instead of raising;
                # surface it so the dataset is never silently incomplete.
                if not cv2.imwrite(out_path, array):
                    raise OSError(f"Could not write image file: {out_path}")

            size_rows.append([i_name, n_rows, n_cols])
            n_stored += 1

    # Append all rows in one go rather than reopening the file for every image.
    if size_rows:
        with open(size_csv_path, 'a+', newline='\n') as file:
            csv.writer(file).writerows(size_rows)

    print("nb train images ",n_train, "non train images",n_other)
    print("augmented total",n_augmented)
    print("images stored total",n_stored)

In [None]:
# Load the dataset file lists, already split into train / test / validation.
drive_path = "/content/drive/MyDrive/PFA_Final"

(train_i, test_i, valid_i), (train_zp, test_zp, valid_zp), (train_icm, test_icm, valid_icm), (train_te, test_te, valid_te)= load_data()

create_dir(drive_path+"/new_data/")

test_path = drive_path+"/new_data/test/"
valid_path= drive_path+"/new_data/valid/"
train_path= drive_path+"/new_data/train/"

# Every split gets the same four sub-folders: the images plus one per mask type.
for split_path in (train_path, test_path, valid_path):
    create_dir(split_path)
    for sub_dir in ("images/", "GT_ZP/", "GT_ICM/", "GT_TE/"):
        create_dir(split_path + sub_dir)

# augment_data appends to real_size.csv. Remove a file left over from a previous
# run so that re-running this cell does not accumulate duplicate rows.
size_csv = drive_path + "/new_data/real_size.csv"
if os.path.exists(size_csv):
    os.remove(size_csv)

# Only the training split is augmented; test and validation are stored as-is.
augment_data(test_i, test_zp, test_icm, test_te, test_path, train=False)
augment_data(valid_i, valid_zp, valid_icm, valid_te, valid_path, train=False)
augment_data(train_i, train_zp, train_icm, train_te, train_path, train=True)

total dataset: 249 249 249 249
train, valid, test
199 25 25
25


100%|██████████| 25/25 [00:21<00:00,  1.14it/s]


nb train images  0 non train images 25
augmented total 0
images stored total 25
25


100%|██████████| 25/25 [00:15<00:00,  1.63it/s]


nb train images  0 non train images 25
augmented total 0
images stored total 25
199


100%|██████████| 199/199 [03:10<00:00,  1.05it/s]

nb train images  199 non train images 0
augmented total 995
images stored total 995



