In [1]:
import numpy as np 
import os
import matplotlib
from PIL import Image, ImageOps
import random
import cv2
import csv
import shutil

In [2]:
def set_img_name(origin_path, destination_path, i = ""):
    file_name = os.path.basename(origin_path)
    file_name_splitted = file_name.split(".")
    file_name = file_name_splitted[0] + "_" + str(i) 
    file_name_extension = file_name + "." + file_name_splitted[-1]
    destination_path = destination_path + "/" + file_name_extension

    return destination_path, file_name

def random_translation(width, height):
    limits_x = int(width * .75 ) // 2
    limits_y = int(height * .65 ) // 2

    x = random.randint(-limits_x, limits_x)
    y = random.randint(-limits_y, limits_y)

    return x, y

In [3]:
def translate_img(image, width, height):
    new_image = Image.new("RGB", (width, height))
    
    #translating the image
    displacement_x, displacement_y = random_translation(width, height)
    paste_position = (displacement_x, displacement_y)
    new_image.paste(image, paste_position)

    return new_image, displacement_x, displacement_y

def mirror_image(image, width, height):
    new_image, displacement_x, displacement_y = translate_img(image, width, height)
    new_image = ImageOps.mirror(new_image)
    displacement_x = (-displacement_x)
    return new_image, displacement_x, displacement_y

def zoom_and_crop(image):
    # Making a random zoom factor and keeping it between the range (1,2]
    zoom_factor = 2 - random.random()
    
    # Calculate the new dimensions after zooming
    width, height = image.size
    new_width = int(width * zoom_factor)
    new_height = int(height * zoom_factor)

    # zoom image
    zoomed_image = image.resize((new_width, new_height))

    # Calculate the coordinates for cropping
    left = (new_width - width) / 2
    top = (new_height - height) / 2
    right = (new_width + width) / 2
    bottom = (new_height + height) / 2

    # Crop the image to the original resolution
    cropped_image = zoomed_image.crop((left, top, right, bottom))

    return cropped_image, zoom_factor

def save_img(image, image_path, new_destination, i):
    image_path, image_name = set_img_name(image_path, new_destination, i)
    try:
        image.save(image_path)
    except:
        print("\n\n\n\n\n\n\n\n", image_path, "\n\n\n\n\n\n\n\n")

    return image_name

In [4]:
def data_augmentation(image_path, destination_folder = "./"):
    original_image = Image.open(image_path)
    width,height = original_image.size 

    imgs = []


    #Translating image
    new_image, displacement_x, displacement_y = translate_img(original_image, width, height)
    new_image_name = save_img(new_image, image_path, destination_folder, 0)
    imgs.append({"image": new_image_name, "displacement_x": displacement_x, "displacement_y": displacement_y, "zoom_factor": 1})

    # Mirroring one image
    new_image, displacement_x, displacement_y = mirror_image(original_image, width, height)
    new_image_name = save_img(new_image, image_path, destination_folder, 1)
    imgs.append({"image": new_image_name, "displacement_x": displacement_x, "displacement_y": displacement_y, "zoom_factor": 1})

    #Zooming image
    new_image, zoom_factor = zoom_and_crop(original_image)
    new_image_name = save_img(new_image, image_path, destination_folder, 2)
    imgs.append({"image": new_image_name, "displacement_x": 0, "displacement_y": 0, "zoom_factor": zoom_factor})

    return imgs

In [5]:
def read_dataset(dataset = "LG4000", train_verification = "train"):
    assert dataset == "LG4000" or dataset == "AD100"
    assert train_verification == "train" or train_verification == "verification"

    train_verification = "verification-subject-disjoint" if train_verification == "verification" else "train"

    data = []

    csvreader = None
    with open(r"D:/Datasets/ND-Contact-Lens/LG4000/"+train_verification+".csv", "r") as file:
        csvreader = list(csv.reader(file))

    csvreader.pop(0) # Removing the header line of the file

    for row in csvreader:
        data.append({"image": row[0], "x": int(row[6]), "y":int(row[7]), "radius": int(row[8])})

    del csvreader

    return data

dataset = read_dataset()

In [6]:
def mk_data_augmentation(dataset, thread = None, origin_img_folder_path = "D:/Datasets/ND-Contact-Lens/LG4000/images/", new_images_folder_path = "D:/Datasets/ND-Contact-Lens/LG4000-augmented/images"):

    csv_file_path = r"D:/Datasets/ND-Contact-Lens/LG4000-augmented/train.csv"
    file = open(csv_file_path, "w", newline="\n")
    writer = csv.writer(file)

    writer.writerow(["Image","Subject","Eye","Gender","Race","Contacts","PupilX","PupilY","PupilR","IrisX","IrisY","IrisR","Fold"])

    for data in dataset:
        # Writting the original image into the csv file
        writer.writerow([data["image"], "_","_","_","_","_", data["x"], data["y"], data["radius"], "_","_","_"])
        shutil.copy(origin_img_folder_path+data["image"]+".tiff", new_images_folder_path)


        # Generating new images from the original and saving them
        new_imgs = data_augmentation(origin_img_folder_path + data["image"] + ".tiff", new_images_folder_path)
        for img in new_imgs:
            x = data["x"] + img["displacement_x"]
            y = data["y"] + img["displacement_y"]
            radius = data["radius"] * img["zoom_factor"]

            write = [img["image"], "_","_","_","_","_", str(x), str(y), str(radius), "_","_","_"]
            writer.writerow(write)

    file.close()

dataset = read_dataset()
mk_data_augmentation(dataset, 1)

In [7]:
# import threading

# threads = []
# total_images = len(dataset)
# total_threads = os.cpu_count()

# batch_size = total_images // total_threads

# for i in range(total_threads):

#     begin = i * batch_size 
#     end = i * batch_size + batch_size
#     if i == total_threads-1:
#         end += 1

#     print("Begin: %4d - End: %4d" % (begin, end))

#     thread = threading.Thread(target=mk_data_augmentation, args=(dataset[begin : end], i+1))
#     threads.append(thread)

# print("Initializing threads")
# for thread in threads:
#     thread.start()

# print("Finishing threads")
# for thread in threads:
#     thread.join()

In [11]:
origin_img_folder_path = "D:/Datasets/ND-Contact-Lens/LG4000/images/"

dataset = read_dataset(train_verification="verification")
print(len(dataset))

destination = "D:/Datasets/ND-Contact-Lens/LG4000-augmented/"

file = open(destination + "verification-subject-disjoint.csv", "w", newline="\n")
writer = csv.writer(file)
writer.writerow(["Image","Subject","Eye","Gender","Race","Contacts","PupilX","PupilY","PupilR","IrisX","IrisY","IrisR","Fold"])

for i, data in enumerate(dataset):

    shutil.copy(origin_img_folder_path+data["image"]+".tiff", destination)

    write = [data["image"], "_","_","_","_","_", str(data["x"]), str(data["y"]), str(data["radius"]), "_","_","_"]
    writer.writerow(write)

    percentage = 100 * (i+1)/len(dataset) 
    if percentage % 10 == 0:
        print(str(percentage) + "%")

file.close()

1200
10.0%
20.0%
30.0%
40.0%
50.0%
60.0%
70.0%
80.0%
90.0%
100.0%
