In [15]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib notebook

import tensorflow as tf
from tensorflow.keras import layers, models, backend

import h5py
from scipy import ndimage

# block averaging for image degradation
def block_mean(ar, fact):
    """Downsample a 2-D array by averaging non-overlapping ``fact`` x ``fact`` blocks.

    Parameters
    ----------
    ar : array_like, 2-D
        Input image.
    fact : int
        Positive block edge length, i.e. the downsampling factor per axis.

    Returns
    -------
    numpy.ndarray
        Float64 array of shape ``(ar.shape[0] // fact, ar.shape[1] // fact)``
        containing the mean of every block. Trailing rows/columns that do not
        fill a complete block are dropped.

    Raises
    ------
    AssertionError
        If ``fact`` is not an integer.
    ValueError
        If ``fact`` is smaller than 1, or ``ar`` is not 2-D.
    """
    assert isinstance(fact, (int, np.integer)), type(fact)
    if fact < 1:
        raise ValueError(f"fact must be a positive integer, got {fact}")

    ar = np.asarray(ar)
    sx, sy = ar.shape  # unpacking fails with ValueError unless ar is 2-D
    nx, ny = sx // fact, sy // fact

    # Crop to a whole number of blocks. Without this, a side length that is not
    # a multiple of `fact` cannot be partitioned into equally sized blocks.
    cropped = ar[:nx * fact, :ny * fact]

    # Split each axis into (block index, offset inside the block) and average
    # over the two offset axes.
    return cropped.reshape(nx, fact, ny, fact).mean(axis=(1, 3), dtype=np.float64)



# Read the dataset, normalise and scale

In [16]:
###########################################################
# Read the spiral-galaxy images from the HDF5 file, convert them to grayscale,
# block-average them by `ndown` and divide by 255.
# (The train/test split is done further down in this cell.)
ndown = 4  # block size handed to block_mean
with h5py.File('data/raw_data/DECals_galaxies.hdf5', 'r') as F:
  # averaging over the last axis turns the images into grayscale
  images = np.array( F['images_spirals'] ).mean(axis=-1)
  # one block-averaged frame per galaxy
  n_gal, height, width = images.shape
  imagesd = np.zeros((n_gal, height//ndown, width//ndown))
  for idx, frame in enumerate( images ):
    imagesd[idx] = block_mean( frame, ndown )
  images = imagesd / 255

# IMAGE_SIZE is taken from the first image axis only
IMAGE_SIZE = images.shape[1]
CHANNELS   = 1
BATCH_SIZE = 128

###########################################################
# split into training and test data (first 32 galaxies are test data)
# the trailing None appends a channel axis of length 1 (CHANNELS = 1)
x_train = images[32:,:,:,None]
x_test  = images[:32,:,:,None]

###########################################################
# data augmentation: images are randomly flipped horizontally and vertically
# when training
data_augmentation = tf.keras.Sequential([
  layers.RandomFlip("horizontal_and_vertical"),
])

def _flip_pair(x, _):
  # An autoencoder has to reproduce exactly what it was shown, so the flipped
  # batch is used as input AND as target; the un-flipped target is discarded.
  flipped = data_augmentation(x, training=True)
  return flipped, flipped

###########################################################
# convert numpy arrays to tf.data.Dataset
# Model.fit ignores its `shuffle` argument for tf.data input, so shuffling has
# to be part of the pipeline. It happens before batching so that the batch
# composition changes between epochs. max(..., 1) keeps the buffer size valid
# even if x_train is empty.
train_dataset = ( tf.data.Dataset.from_tensor_slices((x_train,x_train))
                  .shuffle( max(len(x_train), 1) )
                  .batch( BATCH_SIZE ) )
aug_ds = train_dataset.map( _flip_pair )

test_dataset = tf.data.Dataset.from_tensor_slices((x_test,x_test)).batch( BATCH_SIZE )

# Define the encoder

In [17]:
# hyperparameters
EMBEDDING_DIM = 4   # width of the encoder's dense output layer
EPOCHS = 400        # passed as `epochs` when the autoencoder is trained

# Encoder: three stride-2 convolutions, then a dense projection to the embedding
encoder_input = layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, CHANNELS), name="encoder_input")
x = encoder_input
for n_filters in (32, 64, 128):
    x = layers.Conv2D(n_filters, (3, 3), strides=2, activation="relu", padding="same")(x)
# remember the feature-map shape (first axis dropped); the decoder reshapes back to it
shape_before_flattening = backend.int_shape(x)[1:]
x = layers.Flatten()(x)
encoder_output = layers.Dense(EMBEDDING_DIM, name="encoder_output")(x)
encoder = models.Model(encoder_input, encoder_output)
encoder.summary()

# Define the decoder

In [18]:
# Decoder: dense layer back to the encoder's pre-flatten shape, three stride-2
# transposed convolutions, and a final sigmoid convolution with CHANNELS filters
decoder_input = layers.Input(shape=(EMBEDDING_DIM,), name="decoder_input")
x = layers.Dense(np.prod(shape_before_flattening))(decoder_input)
x = layers.Reshape(shape_before_flattening)(x)
for n_filters in (128, 64, 32):
    x = layers.Conv2DTranspose( n_filters, (3, 3), strides=2, activation="relu", padding="same" )(x)
decoder_output = layers.Conv2D(CHANNELS, (3, 3), strides=1, activation="sigmoid", padding="same", name="decoder_output")(x)
decoder = models.Model(decoder_input, decoder_output)
decoder.summary()

# Assemble the autoencoder

In [19]:
# Autoencoder: apply the decoder to the encoder output and wrap both in one model
reconstruction = decoder(encoder_output)
autoencoder = models.Model( inputs=encoder_input, outputs=reconstruction )
autoencoder.summary()

# Compile with the Adam optimizer and a mean-squared-error loss
autoencoder.compile(loss="mean_squared_error", optimizer="adam")

# Train the autoencoder

In [20]:
# train the autoencoder on the randomly flipped images (aug_ds); the plain
# train_dataset would leave the augmentation pipeline unused.
# Every batch is re-paired as (x, x) so that the (possibly flipped) input is
# always its own reconstruction target.
# `shuffle=True` was removed: Model.fit ignores it for tf.data.Dataset input.
autoencoder.fit( aug_ds.map( lambda x, _: (x, x) ), epochs=EPOCHS, validation_data=test_dataset )

Epoch 1/400
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 247ms/step - loss: 0.1235 - val_loss: 0.1257
Epoch 2/400
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 144ms/step - loss: 0.1219 - val_loss: 0.1236
Epoch 3/400
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 134ms/step - loss: 0.1198 - val_loss: 0.1210
Epoch 4/400
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 140ms/step - loss: 0.1171 - val_loss: 0.1174
Epoch 5/400
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 140ms/step - loss: 0.1134 - val_loss: 0.1123
Epoch 6/400
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 147ms/step - loss: 0.1083 - val_loss: 0.1048
Epoch 7/400
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 140ms/step - loss: 0.1005 - val_loss: 0.0926
Epoch 8/400
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 145ms/step - loss: 0.0875 - val_loss: 0.0693
Epoch 9/400
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━

<keras.src.callbacks.history.History at 0x164a4dd95e0>

In [21]:
# Write the trained autoencoder and its encoder / decoder parts to disk
for net, target in (
    (autoencoder, "./data/results/ex_03/models/autoencoder.keras"),
    (encoder, "./data/results/ex_03/models/encoder.keras"),
    (decoder, "./data/results/ex_03/models/decoder.keras"),
):
    net.save(target)

In [22]:
# number of test images to push through the autoencoder for the comparison plot
n_to_predict = 5
# leading slice of the test set
example_images = x_test[:n_to_predict, ...]
# reconstructions of those images
predictions = autoencoder.predict( example_images )

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 170ms/step


In [29]:

# Top row: test images, bottom row: their autoencoder reconstructions.
# squeeze=False keeps `ax` two-dimensional even when n_to_predict == 1.
fig, ax = plt.subplots(2, n_to_predict, figsize=(8, 3), squeeze=False)

fig.suptitle(f'latent space embedding dimensions: {EMBEDDING_DIM}')

for i in range(n_to_predict):
  ax[0, i].imshow(example_images[i, ...], cmap='viridis', vmin=0, vmax=1)
  ax[0, i].axis('off')
  ax[1, i].imshow(predictions[i, ...], cmap='viridis', vmin=0, vmax=1)
  ax[1, i].axis('off')

fig.subplots_adjust(wspace=0.1, hspace=0.1)
# The file name follows EMBEDDING_DIM (as the title does), so a run with a
# different latent size does not overwrite this figure.
fig.savefig(f'./data/results/ex_03/autoencoder/autoencoder_results_dims_{EMBEDDING_DIM}.png')


<IPython.core.display.Javascript object>

In [28]:
import matplotlib.pyplot as plt
import scienceplots

plt.style.use(['science', 'ieee'])

# Encode the test images with the encoder
latent_space_values = encoder.predict(x_test)

# One histogram figure per latent dimension, all on the same fixed x-range
for i in range(EMBEDDING_DIM):
    hist_fig, hist_ax = plt.subplots(figsize=(6, 4))
    hist_ax.hist(latent_space_values[:, i], bins=30)
    hist_ax.set_xlim(-1.3, 3.1)
    hist_ax.set_title(f'Latent space dimension {i+1}')
    hist_fig.savefig(f'./data/results/ex_03/autoencoder/latent_space_histogram_{i+1}.png')

# Function to generate an image from a latent space vector
def generate_image(latent_vector, dim, value):
    """Decode a latent vector that is zero everywhere except at one index.

    Parameters
    ----------
    latent_vector : numpy.ndarray
        Template for the probe vector. Only its shape and dtype are used
        (via ``np.zeros_like``); its values are ignored.
    dim : int
        Index of the latent entry to set.
    value : scalar
        Value written at index ``dim``.

    Returns
    -------
    The first (and only) element of ``decoder.predict`` for the one-row batch.
    """
    # NOTE(review): an integer-typed latent_vector would make the probe integer
    # too and truncate a fractional `value` -- confirm callers pass floats.
    probe = np.zeros_like(latent_vector)
    probe[dim] = value

    # the module-level `decoder` is fed a batch, so prepend a batch axis of length 1
    batch = probe[np.newaxis, :]
    return decoder.predict(batch)[0]

# For every latent dimension, decode the vector that has a 1 in that dimension
# (zeros elsewhere) and save the resulting image
for i in range(EMBEDDING_DIM):
    img_fig, img_ax = plt.subplots(figsize=(6, 4))
    generated_image = generate_image(latent_space_values[0], i, 1)
    img_ax.imshow(generated_image, cmap='viridis')
    img_ax.set_title(f'Generated image for dimension {i+1}')
    img_fig.savefig(f'./data/results/ex_03/autoencoder/generated_image_{i+1}.png')

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step


<IPython.core.display.Javascript object>

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step


<IPython.core.display.Javascript object>

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step


<IPython.core.display.Javascript object>

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
