In [11]:
import numpy as np
import cv2
from google.colab.patches import cv2_imshow
from sklearn.model_selection import train_test_split
import imgaug.augmenters as iaa
from pathlib import Path
import random

In [4]:
# Constants
# Every dataset folder lives under one Google Drive root.
DATA_ROOT = Path('/content/drive/MyDrive/osteoporosis_data')
LABELED_ROI_IMAGES_PATH = DATA_ROOT / 'labeled_rois'
TRAIN_IMAGES_PATH = DATA_ROOT / 'train'
TEST_IMAGES_PATH = DATA_ROOT / 'test'

# Fraction of the labeled images held out as the test set.
TEST_SIZE = 0.2
# Fraction of the training images sampled for each augmentation type.
AUGMENTATION_SIZE = 0.3

### Load data

In [5]:
def load_images(path):
  """Load every file found under ``path`` (recursively) as a grayscale image.

  Args:
    path: ``pathlib.Path`` of the directory to scan.

  Returns:
    Dict mapping each file's stem (file name without extension) to the array
    returned by ``cv2.imread`` in grayscale mode, in sorted-path order.

  Raises:
    ValueError: If a file cannot be decoded as an image.
  """
  # Sort so the dict order (and any split derived from it downstream) does not
  # depend on the order in which the filesystem happens to list the files.
  file_list = sorted(f for f in path.glob('**/*') if f.is_file())
  images_dict = {}

  for image_path in file_list:
    image = cv2.imread(str(image_path), cv2.IMREAD_GRAYSCALE)
    # cv2.imread reports failure by returning None instead of raising; fail
    # here with the offending path rather than later with a cryptic error.
    if image is None:
      raise ValueError(f"Could not read image {image_path}")

    # NOTE: files that share a stem (e.g. in different sub-folders) overwrite
    # each other; the last one in sorted order wins.
    images_dict[image_path.stem] = image

  return images_dict

In [6]:
# Read all labeled ROI images into memory, keyed by file stem
images = load_images(path=LABELED_ROI_IMAGES_PATH)
print("Number of images:", len(images))

Number of images: 382


### Down-sample

In [7]:
# Bring every image to one fixed size so they can be processed uniformly.
# NOTE(review): cv2.resize's size argument is (width, height) - confirm that
# 300 wide by 350 high is the intended orientation.
TARGET_SIZE = (300, 350)
downsampled_images = {
    name: cv2.resize(img, TARGET_SIZE)
    for name, img in images.items()
}

### Train-test split

In [8]:
def validate_class_label(label):
  """Strip the digits from ``label`` and check that a known class remains.

  Returns the digit-free label when it is exactly "N", "OP" or "OS";
  raises ValueError (mentioning the original label) otherwise.
  """
  class_name = ''.join(ch for ch in label if not ch.isdigit())

  if class_name in ("N", "OP", "OS"):
    return class_name
  raise ValueError(f"Illegal label {label}")


# The class label is the part of each image name before the first underscore
label_prefixes = (image_name.partition('_')[0] for image_name in images)
labels = [validate_class_label(prefix) for prefix in label_prefixes]

In [12]:
# Show the distinct class labels and how many images carry each one
np.unique(labels, return_counts=True)

(array(['N', 'OP', 'OS'], dtype='<U2'), array([ 57, 250,  75]))

In [None]:
# Split (name, image) pairs so each image keeps its name through the split;
# stratifying on the labels keeps the class proportions in both parts.
image_items = list(downsampled_images.items())
train_items, test_items, train_labels, test_labels = train_test_split(
    image_items,
    labels,
    test_size=TEST_SIZE,
    random_state=42,
    stratify=labels,
)

# Rebuild name -> image dictionaries from the lists of pairs
train_images = dict(train_items)
test_images = dict(test_items)

print("Train images size:", len(train_images))
print("Test images size:", len(test_images))

Train images size: 305
Test images size: 77


### Augment training set

In [None]:
def extract_dict_sub_sample(images, percentage, seed=None):
  """Return a random subset of ``images`` as a new dict.

  Args:
    images: Dict to sample from.
    percentage: Fraction of the entries to keep, between 0 and 1. The number
      of entries is ``int(len(images) * percentage)``, i.e. rounded down.
    seed: Optional seed. When given, the sample is drawn from a dedicated
      ``random.Random(seed)`` so it is reproducible; when None, the global
      ``random`` module state is used.

  Returns:
    Dict holding the sampled ``(key, value)`` pairs.

  Raises:
    ValueError: If ``percentage`` is outside ``[0, 1]``.
  """
  if not 0 <= percentage <= 1:
    raise ValueError(f"percentage must be between 0 and 1, got {percentage}")

  sampler = random if seed is None else random.Random(seed)
  num_of_records = int(len(images) * percentage)
  return dict(sampler.sample(list(images.items()), num_of_records))


def augment_images_dict(images, augmenter, aug_percentage, name_suffix, seed=None):
  """Augment a random subset of ``images`` and return it under new names.

  Args:
    images: Dict mapping image name to image.
    augmenter: Callable invoked as ``augmenter(image=image)`` for every
      sampled image; its return value is stored as the augmented image.
    aug_percentage: Fraction of ``images`` to sample, between 0 and 1.
    name_suffix: Text appended to each sampled name as ``<name>_<suffix>``.
    seed: Optional seed for choosing WHICH images are sampled. It does not
      control any randomness inside ``augmenter`` itself.

  Returns:
    Dict mapping ``<image_name>_<name_suffix>`` to the augmented image.

  Raises:
    ValueError: If ``aug_percentage`` is outside ``[0, 1]``.
  """
  sub_train_images = extract_dict_sub_sample(images, percentage=aug_percentage, seed=seed)
  return {
      f"{image_name}_{name_suffix}": augmenter(image=image)
      for image_name, image in sub_train_images.items()
  }

#### Rotate

In [None]:
# sub_train_images = extract_dict_sub_sample(train_images, percentage=0.2)
# print("Sub sample size:", len(sub_train_images))

In [None]:
# Rotation pipeline: a single affine step configured with rotate=(-5, 5)
rotation_step = iaa.Affine(rotate=(-5, 5))
rotator = iaa.Sequential([rotation_step])

# Rotate a random sample of the training images; results are named "<name>_rotated"
rotated_sub_train_images = augment_images_dict(
    images=train_images,
    augmenter=rotator,
    aug_percentage=AUGMENTATION_SIZE,
    name_suffix="rotated",
)

In [None]:
# Report how many rotated images were produced
rotated_count = len(rotated_sub_train_images)
print("Rotated images:", rotated_count)

Rotated images: 91


#### Flip horizontally

In [None]:
# Horizontal-flip augmenter, applied to a random sample of the training images;
# results are named "<name>_flipped"
flipper = iaa.Fliplr(1.0)
flipped_sub_train_images = augment_images_dict(
    images=train_images,
    augmenter=flipper,
    aug_percentage=AUGMENTATION_SIZE,
    name_suffix="flipped",
)

In [None]:
# Report how many flipped images were produced
flipped_count = len(flipped_sub_train_images)
print("Flipped images:", flipped_count)

Flipped images: 91


#### Shift

In [None]:
# Pixel-translation augmenter configured with (-10, 10) on both the x and y axes
shift_range_px = {"x": (-10, 10), "y": (-10, 10)}
shifter = iaa.Affine(translate_px=shift_range_px)

# Shift a random sample of the training images; results are named "<name>_shifted"
shifted_sub_train_images = augment_images_dict(
    images=train_images,
    augmenter=shifter,
    aug_percentage=AUGMENTATION_SIZE,
    name_suffix="shifted",
)

In [None]:
# Report how many shifted images were produced
shifted_count = len(shifted_sub_train_images)
print("Shifted images:", shifted_count)

Shifted images: 91


#### Add noise

In [None]:
# Additive Gaussian noise augmenter configured with scale=(0, 25), applied to a
# random sample of the training images; results are named "<name>_noisy"
noise = iaa.AdditiveGaussianNoise(scale=(0, 25))
noisy_sub_train_images = augment_images_dict(
    images=train_images,
    augmenter=noise,
    aug_percentage=AUGMENTATION_SIZE,
    name_suffix="noisy",
)

In [None]:
# Report how many noisy images were produced
noisy_count = len(noisy_sub_train_images)
print("Noisy images:", noisy_count)

Noisy images: 91


### Add augmented images to train set

In [None]:
# Start from the original training images and layer each augmented batch on
# top; the inputs are left untouched because a new dict is built.
augmented_train_images = {
    **train_images,
    **rotated_sub_train_images,
    **flipped_sub_train_images,
    **shifted_sub_train_images,
    **noisy_sub_train_images,
}
print("Augmented images size:", len(augmented_train_images))

Augmented images size: 669


### Save data

In [None]:
def save_images_dict(images, path):
  """Write every image in ``images`` into ``path`` as ``<image_name>.png``.

  Args:
    images: Dict mapping image name to the image handed to ``cv2.imwrite``.
    path: ``pathlib.Path`` of the output directory. It is created (including
      parents) when missing and must not already contain anything.

  Raises:
    ValueError: If the directory is not empty.
    OSError: If ``cv2.imwrite`` reports that a file could not be written.
  """
  # On a first run the output folder does not exist yet; create it instead of
  # letting iterdir() fail with FileNotFoundError.
  path.mkdir(parents=True, exist_ok=True)

  # Refuse to mix new files with whatever a previous run left behind
  if any(path.iterdir()):
    raise ValueError("The directory is not empty!")

  for image_name, image in images.items():
    file_path = str(path / f"{image_name}.png")

    # cv2.imwrite signals failure through its return value, not an exception
    if not cv2.imwrite(file_path, image):
      raise OSError(f"Failed saving {file_path}")

In [None]:
# Persist the held-out test images (never augmented)
save_images_dict(images=test_images, path=TEST_IMAGES_PATH)

In [None]:
# Persist the training images together with their augmented copies
save_images_dict(images=augmented_train_images, path=TRAIN_IMAGES_PATH)