In [13]:
import os
import cv2
import uuid
import numpy as np
from PIL import Image
import tensorflow as tf
from astropy.io import fits
import matplotlib.pyplot as plt
from astropy.stats import sigma_clip
from astropy.visualization import (MinMaxInterval, SqrtStretch, ImageNormalize)
import warnings
warnings.filterwarnings('ignore')

THE FOLLOWING SECTION WILL PERFORM THE DATA PREPROCESSING PHASE OF THE DATA PIPELINE

In [14]:
def convert_fits_to_jpg(directory_path, target_path):

    if not os.path.isdir(directory_path):
        raise NotADirectoryError(f"Directory not found: {directory_path}")

    file_list = os.listdir(directory_path)

    for file_name in file_list:
        file_path = os.path.join(directory_path, file_name)

        if file_name.lower().endswith(".fits"):
            with fits.open(file_path) as hdul:
                image_data = hdul[0].data
            # Close the FITS file
            hdul.close()

            # Apply sigma clipping
            sigma = 3.0
            sigma_data = sigma_clip(image_data, sigma=sigma)

            # Normalize data between [0,255] in order to save images as UINT8 and JPG
            sigma_data = (sigma_data - sigma_data.min()) / (sigma_data.max() - sigma_data.min()) * 255
            sigma_data = sigma_data.astype('uint8')

            # Used for colour ranges
            sigma_data = sigma_data[:, :, None].repeat(3, axis=2)

            # # Display original and processed images side by side
            # fig, axes = plt.subplots(1, 2, figsize=(10, 5))

            # # Original Image
            # ax = axes[0]
            # ax.set_title('Original')
            # norm = ImageNormalize(stretch=SqrtStretch())
            # im = ax.imshow(image_data, cmap='gray', origin='lower', norm=norm)
            # fig.colorbar(im, ax=ax)

            # # Sigma clipped/normalized Image
            # ax = axes[1]
            # ax.set_title('Sigma Clipped/Normalized')
            # im = ax.imshow(sigma_data, cmap='gray', origin='lower')
            # fig.colorbar(im, ax=ax)

            image = Image.fromarray(sigma_data)
            final_path = os.path.join(target_path, file_name + '.jpg')
            image.save(final_path)
        else:
            print('None .fits file found.')

In [15]:
anchor_class_ = 'anchor'
positive_class_ = 'positive'
negative_class_ = 'negative'
training_set_ = 'training'
validation_set_ = 'validation'
core = '/content/drive/MyDrive/Colab Notebooks/Final/data'

In [16]:
# EXPERIMENT ALPHA - ORIGINAL BENT_TAIL VS FRII [NON-AUGMENTED]
# source_path = os.path.join(core, 'alpha', 'base', training_set_, anchor_class_)
# target_path = os.path.join(core, 'alpha', 'sigma_non_augmented', training_set_, anchor_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'alpha', 'base', training_set_, positive_class_)
# target_path = os.path.join(core, 'alpha', 'sigma_non_augmented', training_set_, positive_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'alpha', 'base', training_set_, negative_class_)
# target_path = os.path.join(core, 'alpha', 'sigma_non_augmented', training_set_, negative_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'alpha', 'base', validation_set_, anchor_class_)
# target_path = os.path.join(core, 'alpha', 'sigma_non_augmented', validation_set_, anchor_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'alpha', 'base', validation_set_, positive_class_)
# target_path = os.path.join(core, 'alpha', 'sigma_non_augmented', validation_set_, positive_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'alpha', 'base', validation_set_, negative_class_)
# target_path = os.path.join(core, 'alpha', 'sigma_non_augmented', validation_set_, negative_class_)
# convert_fits_to_jpg(source_path, target_path)

# EXPERIMENT BETA - ORIGINAL BENT_TAIL VS FRII [AUGMENTED DATA]
# source_path = os.path.join(core, 'beta', 'base', training_set_, anchor_class_)
# target_path = os.path.join(core, 'beta', 'sigma_augmented', training_set_, anchor_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'beta', 'base', training_set_, positive_class_)
# target_path = os.path.join(core, 'beta', 'sigma_augmented', training_set_, positive_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'beta', 'base', training_set_, negative_class_)
# target_path = os.path.join(core, 'beta', 'sigma_augmented', training_set_, negative_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'beta', 'base', validation_set_, anchor_class_)
# target_path = os.path.join(core, 'beta', 'sigma_augmented', validation_set_, anchor_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'beta', 'base', validation_set_, positive_class_)
# target_path = os.path.join(core, 'beta', 'sigma_augmented', validation_set_, positive_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'beta', 'base', validation_set_, negative_class_)
# target_path = os.path.join(core, 'beta', 'sigma_augmented', validation_set_, negative_class_)
# convert_fits_to_jpg(source_path, target_path)

# EXPERIMENT CHARLIE - ORC IMAGES [NON-AUGMENTED]
# source_path = os.path.join(core, 'charlie', 'base', training_set_, anchor_class_)
# target_path = os.path.join(core, 'charlie', 'sigma_non_augmented', training_set_, anchor_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'charlie', 'base', training_set_, positive_class_)
# target_path = os.path.join(core, 'charlie', 'sigma_non_augmented', training_set_, positive_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'charlie', 'base', training_set_, negative_class_)
# target_path = os.path.join(core, 'charlie', 'sigma_non_augmented', training_set_, negative_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'charlie', 'base', validation_set_, anchor_class_)
# target_path = os.path.join(core, 'charlie', 'sigma_non_augmented', validation_set_, anchor_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'charlie', 'base', validation_set_, positive_class_)
# target_path = os.path.join(core, 'charlie', 'sigma_non_augmented', validation_set_, positive_class_)
# convert_fits_to_jpg(source_path, target_path)
# source_path = os.path.join(core, 'charlie', 'base', validation_set_, negative_class_)
# target_path = os.path.join(core, 'charlie', 'sigma_non_augmented', validation_set_, negative_class_)
# convert_fits_to_jpg(source_path, target_path)

# EXPERIMENT DELTA - ORC IMAGES [AUGMENTED DATA]
source_path = os.path.join(core, 'delta', 'base', training_set_, anchor_class_)
target_path = os.path.join(core, 'delta', 'sigma_augmented', training_set_, anchor_class_)
convert_fits_to_jpg(source_path, target_path)
source_path = os.path.join(core, 'delta', 'base', training_set_, positive_class_)
target_path = os.path.join(core, 'delta', 'sigma_augmented', training_set_, positive_class_)
convert_fits_to_jpg(source_path, target_path)
source_path = os.path.join(core, 'delta', 'base', training_set_, negative_class_)
target_path = os.path.join(core, 'delta', 'sigma_augmented', training_set_, negative_class_)
convert_fits_to_jpg(source_path, target_path)
source_path = os.path.join(core, 'delta', 'base', validation_set_, anchor_class_)
target_path = os.path.join(core, 'delta', 'sigma_augmented', validation_set_, anchor_class_)
convert_fits_to_jpg(source_path, target_path)
source_path = os.path.join(core, 'delta', 'base', validation_set_, positive_class_)
target_path = os.path.join(core, 'delta', 'sigma_augmented', validation_set_, positive_class_)
convert_fits_to_jpg(source_path, target_path)
source_path = os.path.join(core, 'delta', 'base', validation_set_, negative_class_)
target_path = os.path.join(core, 'delta', 'sigma_augmented', validation_set_, negative_class_)
convert_fits_to_jpg(source_path, target_path)

THE FOLLOWING SECTION WILL COMPLETE THE DATA AUGMENTATION PHASE OF THE DATA PIPELINE

In [17]:
def crop_and_resize(img, crop_size=(250,250)):
    # Read the image file
    img = tf.io.read_file(img)
    # Decode the image to a tensor
    img = tf.image.decode_image(img, channels=3)
    # Convert to float32 and normalize to [0, 1]
    img = tf.cast(img, tf.float32)

    # Define the bounding box coordinates for cropping (normalized to [0, 1])
    boxes = tf.constant([[0.2, 0.2, 0.8, 0.8]])  # Example: Crop center 60% of the image

    # Generate the crop and resize operation
    cropped_resized_image = tf.image.crop_and_resize(
        img[tf.newaxis, ...],  # Add batch dimension
        boxes,
        box_indices=[0],  # Index of the box to use (in this case, we only have one box)
        crop_size=crop_size
    )

    # Convert to uint8
    cropped_resized_image = tf.cast(cropped_resized_image, tf.uint8)

    return cropped_resized_image[0]  # Remove batch dimension before returning

In [18]:
def flip_left_right(img):
    return tf.image.stateless_random_flip_left_right(img, seed=(np.random.randint(100), np.random.randint(100)))

In [19]:
def flip_up_down(img):
    return tf.image.stateless_random_flip_up_down(img, seed=(np.random.randint(100), np.random.randint(100)))

In [20]:
def random_rotation(img, max_angle=180):
    # Generate a random angle within the specified range
    angle = tf.random.uniform([], -max_angle, max_angle) * (3.141592653589793 / 180)  # Convert degrees to radians

    # Apply rotation using tf.image
    rotated_img = tf.image.rot90(img, k=tf.cast(angle // (3.141592653589793 / 2), tf.int32))

    return rotated_img

In [21]:
def augment_data(im_path):
    augmented_images = []
    for file_name in os.listdir(os.path.join(im_path)):
        img_path = os.path.join(im_path, file_name)
        if img_path.endswith('.jpg'):
            # img = cv2.imread(img_path)
            img = crop_and_resize(img_path)
            # Loop 9 Times => 1440 augmented images + 32 original = 1184
            # Formula (4xloop_counter)x32+32
            for idx in range(5):
                augmented_images.append(img)
                augmented_images.append(flip_left_right(img))
                augmented_images.append(flip_up_down(img))
                augmented_images.append(random_rotation(img))
        else:
            continue

    for image in augmented_images:
        cv2.imwrite(os.path.join(im_path, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [22]:
def augment_val_data(im_path):
    augmented_images = []
    for file_name in os.listdir(os.path.join(im_path)):
        img_path = os.path.join(im_path, file_name)
        if img_path.endswith('.jpg'):
            # img = cv2.imread(img_path)
            img = crop_and_resize(img_path)
            # Loop 9 Times => 1440 augmented images + 32 original = 1184
            # Formula (4xloop_counter)x32+32
            for idx in range(1):
                augmented_images.append(img)
        else:
            continue

    for image in augmented_images:
        cv2.imwrite(os.path.join(im_path, '{}.jpg'.format(uuid.uuid1())), image.numpy())

In [24]:
# EXPERIMENT BETA - ORIGINAL BENT_TAIL VS FRII [AUGMENTED DATA]
# img_path = os.path.join(core, 'beta', 'sigma_augmented', training_set_, anchor_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'beta', 'sigma_augmented', training_set_, positive_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'beta', 'sigma_augmented', training_set_, negative_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'beta', 'sigma_augmented', validation_set_, anchor_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'beta', 'sigma_augmented', validation_set_, positive_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'beta', 'sigma_augmented', validation_set_, negative_class_)
# augment_data(img_path)

# EXPERIMENT DELTA - ORC IMAGES [AUGMENTED DATA]
# img_path = os.path.join(core, 'delta', 'sigma_augmented', training_set_, anchor_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'delta', 'sigma_augmented', training_set_, positive_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'delta', 'sigma_augmented', training_set_, negative_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'delta', 'sigma_augmented', validation_set_, anchor_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'delta', 'sigma_augmented', validation_set_, positive_class_)
# augment_data(img_path)
# img_path = os.path.join(core, 'delta', 'sigma_augmented', validation_set_, negative_class_)
# augment_data(img_path)