In [35]:
import numpy as np
import random
import os
import pickle

from numpy import fft
from astropy.io import fits

from matplotlib import pyplot as plt
from matplotlib import rc


import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Layer, Dense, Flatten, BatchNormalization

BELOW IS A FN THAT PULLS FITS FROM FOLDER IN DIRECTORY AND SAVES THE FITS IMAGE, IMAGE LABEL, AND LABEL DATA OF EACH FITS FILE AND TRUTHCAT FILE

In [25]:
images_folder_path = '/Users/sofie/Desktop/Projects/rwl_sims/ellip_images'
truthcat_folder_path = '/Users/sofie/Desktop/Projects/rwl_sims/ellip_txts'

# Define a function to get the fits file data
def fft_image_e(images_folder_path, truthcat_folder_path):
    # Initialize dictionaries to store values: (item1_name: fits1 image, output_label), (item2_name: fits2 image, output_label), etc.
    fft_data_dict = {}
    real_image_dict = {}
    ellipticity_data_dict = {}

    # Iterate through the images directory to obtain data
    for item in sorted(os.listdir(images_folder_path)):
        if item.endswith('.png') and not item.endswith('psf.png'):
            image_path = os.path.join(images_folder_path, item)
            image = plt.imread(image_path)

            # Fourier transform the  image
            fourier_image_data = fft.fftshift(fft.fft2(image))            

            # Store the fft data in a dictionary
            if item not in fft_data_dict:
                fft_data_dict[item] = {}
            fft_data_dict[item]['fourier_image_data'] = fourier_image_data

            # Store the real image data in a dictionary
            if item not in real_image_dict:
                real_image_dict[item] = {}
            real_image_dict[item]['real_image_data'] = image

    # Extract the output labels (mod_e header in truthcat FITS file)
    for item in os.listdir(truthcat_folder_path):
        if item.startswith('truthcat'):
            truthcat_file_path = os.path.join(truthcat_folder_path, item)
            truthcat_file = fits.open(truthcat_file_path)
            hdr_data = truthcat_file[1].data
           
            # Grab mod_e values
            ellipticity = hdr_data['mod_e']

            # Add values to ellipticity value dictionary
            if item not in ellipticity_data_dict:
                ellipticity_data_dict[item] = {}
            ellipticity_data_dict[item] = ellipticity

            # Close the FITS file
            truthcat_file.close()

    # Add fourier and ellipticity data together to initialize input output label dict
    for key in fft_data_dict:
        fft_first_six_letters = key[:7]
        # print(fft_first_six_letters)
        for elli_key, elli_value in ellipticity_data_dict.items():
            elli_first_six_letters = elli_key[9:16]
            # print(elli_first_six_letters)
            if fft_first_six_letters == elli_first_six_letters:
                for item in elli_value:
                    fft_data_dict[key]['ellipticity'] = item
  
    # print(fft_data_dict)
    
    # Define the path to save the pickle file
    pickle_file_path = '/Users/sofie/Desktop/Projects/Dissertation_Program/test_pkl/example_test_run1.pkl'

    # Dump all dictionaries into a single pickle file
    with open(pickle_file_path, 'wb') as f:
        pickle.dump((fft_data_dict, real_image_dict, ellipticity_data_dict), f)

    return fft_data_dict, real_image_dict, ellipticity_data_dict

INITIALIZING THE FUNCTION

In [26]:
data = fft_image_e(images_folder_path, truthcat_folder_path)
input_output_data_dict = data[0]

INITIALIZING THE INPUT, OUTPUT, AND IMAGE DATA, CONVERTING THE DATA INTO ARRAYS

In [27]:
input_output_data_list = []

# Populate the list with data from the dictionaries
for key, value in input_output_data_dict.items():

    # Extract fourier_image_data value and ellipticity value from the nested dictionaries
    fourier_image_data = value.get('fourier_image_data', None)
    ellipticity = value.get('ellipticity', None)
    
    # Ensure both values are present before appending to input_output_data_array
    if fourier_image_data is not None and ellipticity is not None:
        input_output_data_list.append((fourier_image_data, ellipticity))

APPEND THE DATA TO LISTS TO MAKE IT EASIER TO DEAL WITH

In [28]:
input_fft_data = []
output_e_data = []
for data_tuple in input_output_data_list:
    fourier_image_data = data_tuple[0]
    ellipticity = data_tuple[1]
    input_fft_data.append(fourier_image_data)
    output_e_data.append(ellipticity)

CONVERT THE DATA TO ARRAYS

In [29]:
input_fft_data = np.array(input_fft_data)
output_e_data = np.array(output_e_data)

SPLIT THE DATA INTO TESTING, TRAINING, AND VALIDATION DATA

In [30]:
# Split the data into training and testing data
X_train, X_test, y_train, y_test = train_test_split(
        input_fft_data, output_e_data, test_size=0.33, random_state=42)

#Split the training data into training and validation data
X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, test_size=0.25, random_state=42)

INITIALIZE THE HYPERPARAMETERS

In [31]:
# Set the hyperparameters:
NumClasses = 10
BatchLength = 16
Size = [28, 28, 1]
NumIteration = 40001
LearningRate = 1e-4
EvalFreq = 1000
NumKernels = [16, 32, 64]

INITIALIZE THE SPECTRAL POOLING SIZE

In [32]:
# spectral pooling size: (0 is 1/2, 1 is 6/8)
specPoolSize = 0

DEFINE THE FOURIER ACTIVATION FUNCTION

In [33]:
# No change from the template found on GitHub
def fourier_complex_relu(x):
    real = tf.math.real(x)
    imag = tf.math.imag(x)
    return tf.complex(tf.cast(real*real+imag*imag > 0.1, tf.float32)*real, tf.cast(real*real+imag*imag > 0.1, tf.float32)*imag)


DEFINE THE FOURIER CONVOLUTION

In [34]:
# No change from the template found on GitHub
# Setting up a convolution layer to process data already in the Fourier domain
# This has been updated using the automated software put out by TF to go from TF1.x to TF2.x as best it can
# Obvi since this is a custom code, although the fns have been updated, the syntax isn't TF2.x recommended yet
def convolution_in_freq_domain_without_ifft(f_input, out_channels):
    in_shape = f_input.get_shape()

    # Define trainable biases
    bias_r = tf.compat.v1.get_variable('BiasReal', [out_channels], dtype=tf.float32)
    bias_c = tf.compat.v1.get_variable('BiasComp', [out_channels], dtype=tf.float32)
    bias = tf.complex(bias_r, bias_c)

    # Spectral pooling:
    if specPoolSize == 0:
        f_input = tf.slice(f_input, [0, int(in_shape[1] // 4), int(in_shape[2] // 4), 0], 
                           [-1, int(in_shape[1] // 2), int(in_shape[2] // 2), in_shape[-1]])
    elif specPoolSize == 1:
        f_input = tf.slice(f_input, [0, int(in_shape[1] // 8), int(in_shape[2] // 8), 0], 
                           [-1, int(in_shape[1]) - int(2 * in_shape[1] // 8), 
                            int(in_shape[2]) - int(2 * in_shape[2] // 8), in_shape[-1]])
   
    in_shape = f_input.get_shape()
    w_r = tf.compat.v1.get_variable('w_r', [int(in_shape[1]), int(in_shape[2]), int(in_shape[3]), out_channels])
    w_i = tf.compat.v1.get_variable('w_i', [int(in_shape[1]), int(in_shape[2]), int(in_shape[3]), out_channels])
    w = tf.complex(w_r, w_i)

    fourier_kernel = w
    fourier_kernel = tf.tile(tf.expand_dims(fourier_kernel, 0), [BatchLength, 1, 1, 1, 1])
    out = []

    for ind in range(out_channels):
        res = tf.multiply(f_input[:, :, :, :], fourier_kernel[:, :, :, :, ind])
        res = tf.add(res, bias[ind])
        res = tf.expand_dims(tf.reduce_sum(res, 3), -1)
        out.append(res)
    out = tf.concat(out, 3)
    
    norm_real_layer = tf.keras.layers.BatchNormalization()
    norm_comp_layer = tf.keras.layers.BatchNormalization()
    
    norm_real = norm_real_layer(tf.math.real(out), training=True)
    norm_comp = norm_comp_layer(tf.math.imag(out), training=True)
    
    out = tf.complex(norm_real, norm_comp)
    out = fourier_complex_relu(out)
    return out

In [15]:
# Define the Class Model
class FourierCNN(tf.keras.Model):
    def __init__(self):
        super(FourierCNN, self).__init__()
        self.dense1 = keras.layers.Dense(32, activation="relu")
        self.dense2 = keras.layers.Dense(5, activation="softmax")
        self.dropout = keras.layers.Dropout(0.5)

    def call(self, inputs, training=False):
        x = self.dense1(inputs)
        x = self.dropout(x, training=training)
        return self.dense2(x)

model = FourierCNN()

SCRATCH