In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import numpy as np
from glob import glob
from PIL import Image, ImageOps
import matplotlib.pyplot as plt
import keras
from keras import layers
from keras.layers import Input,Conv2D,Concatenate
from keras.models import Model
import tensorflow as tf

In [3]:
IMAGE_SIZE = 256
BATCH_SIZE = 32
MAX_TRAIN_IMAGES = 400

PRE_REQUISITE FUNCTION TO GENERATE DATA FROM FOLDERS

In [4]:
def load_data(image_path):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_png(img, channels=3)
    img = tf.image.resize(images=img, size=[IMAGE_SIZE, IMAGE_SIZE])
    img = img / 255.0  #scaling between [0,1]
    return img

In [9]:
def data_generator(low_light_images):
    data = tf.data.Dataset.from_tensor_slices((low_light_images))
    data = data.map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
    data = data.batch(BATCH_SIZE, drop_remainder=True)
    return data

DATASET PREPARATION

In [21]:
import glob
import tensorflow as tf

# Define the maximum number of training images
MAX_TRAIN_IMAGES = 1000  # example value, set it to your actual requirement

# Sort and split the images for training and validation
train_low_light_images = sorted(glob.glob("/content/drive/MyDrive/Train/low/*"))[:MAX_TRAIN_IMAGES]
val_low_light_images = sorted(glob.glob("/content/drive/MyDrive/Train/low/*"))[MAX_TRAIN_IMAGES:]

# Load test images
test_low_light_images = sorted(glob.glob("/content/drive/MyDrive/Train/low/*"))
test_high_light_images = sorted(glob.glob("/content/drive/MyDrive/Train/high/*"))

# Function to load and preprocess images
def load_and_preprocess_image(path):
    image = tf.io.read_file(path)
    image = tf.image.decode_image(image, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    return image

# Assuming data_generator is a function that prepares datasets
def data_generator(image_paths):
    # Debug: Check types of the elements in image_paths
    print(f"Type of image_paths: {type(image_paths)}")
    print(f"Type of elements in image_paths: {type(image_paths[0]) if image_paths else 'No elements'}")

    if not image_paths:
        print("Warning: Empty image paths list passed to data_generator.")
        return tf.data.Dataset.from_tensor_slices([])  # Return an empty dataset

    # Ensure paths are strings
    image_paths = [str(path) for path in image_paths]

    # Create a TensorFlow dataset from the image paths
    dataset = tf.data.Dataset.from_tensor_slices(image_paths)
    dataset = dataset.map(load_and_preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset

# Create training and validation datasets
train_dataset = data_generator(train_low_light_images)
val_dataset = data_generator(val_low_light_images)

# Print datasets (this will print the structure, not the actual images)
print("Train Dataset:", train_dataset)
print("Validation Dataset:", val_dataset)


Type of image_paths: <class 'list'>
Type of elements in image_paths: <class 'str'>
Type of image_paths: <class 'list'>
Type of elements in image_paths: No elements
Train Dataset: <_ParallelMapDataset element_spec=TensorSpec(shape=<unknown>, dtype=tf.float32, name=None)>
Validation Dataset: <_TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.float32, name=None)>


In [22]:
def Build_DCE_NET():
    input_img = Input(shape=[None, None, 3])
    x1 = Conv2D(32, (3, 3), strides=(1, 1), activation="relu", padding="same")(input_img)
    x2 = Conv2D(32, (3, 3), strides=(1, 1), activation="relu", padding="same")(x1)
    x3 = Conv2D(32, (3, 3), strides=(1, 1), activation="relu", padding="same")(x2)
    x4 = Conv2D(32, (3, 3), strides=(1, 1), activation="relu", padding="same")(x3)

    int_x1 = Concatenate(axis=-1)([x4, x3])
    x5 = layers.Conv2D(32, (3, 3), strides=(1, 1), activation="relu", padding="same")(int_x1)

    int_x2 = Concatenate(axis=-1)([x5, x2])
    x6 = Conv2D(32, (3, 3), strides=(1, 1), activation="relu", padding="same")(int_x2)

    int_x3 = Concatenate(axis=-1)([x6, x1])
    y = Conv2D(24, (3, 3), strides=(1, 1), activation="tanh", padding="same")(int_x3)

    return Model(inputs=input_img, outputs=y)

CUSTOM LOSS FUNCTIONS

In [23]:
def color_constancy_loss(x):
    mean_rgb = tf.reduce_mean(x, axis=(1, 2), keepdims=True)
    jr, jg, jb = (
        mean_rgb[:, :, :, 0],
        mean_rgb[:, :, :, 1],
        mean_rgb[:, :, :, 2],
    )
    #ji denotes average intensity of ith channel
    #pairwise taking squares
    diff_rg = tf.square(jr - jg)
    diff_rb = tf.square(jr - jb)
    diff_gb = tf.square(jb - jg)

    L_col = tf.sqrt(tf.square(diff_rg) + tf.square(diff_rb) + tf.square(diff_gb))
    return  L_col

In [24]:
def exposure_loss(x, E=0.6):  # E is the grey level in RGB color generally taken as 0.6 for experiments
    x = tf.reduce_mean(x, axis=3, keepdims=True)
    mean = tf.nn.avg_pool2d(x, ksize=16, strides=16, padding="VALID") #averaging over 16x16 non-overlapping regions
    L_exp = tf.reduce_mean(tf.square(mean - E)) #took the mean of all the values
    return L_exp

In [25]:
def illumination_smoothness_loss(x):
    batch_size = tf.shape(x)[0]
    h_x = tf.shape(x)[1]
    w_x = tf.shape(x)[2]
    count_h = (w_x - 1) * tf.shape(x)[3]
    count_w = w_x * (tf.shape(x)[3] - 1)
    h_tv = tf.reduce_sum(tf.square((x[:, 1:, :, :] - x[:, : h_x - 1, :, :])))
    w_tv = tf.reduce_sum(tf.square((x[:, :, 1:, :] - x[:, :, : w_x - 1, :])))
    batch_size = tf.cast(batch_size, dtype=tf.float32)
    count_h = tf.cast(count_h, dtype=tf.float32)
    count_w = tf.cast(count_w, dtype=tf.float32)
    return 2 * (h_tv / count_h + w_tv / count_w) / batch_size

CUSTOM ZERO DCE MODEL ----> MODIFYING IT'S Properties

In [52]:
class ZeroDCE(keras.Model):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.dce_model = Build_DCE_NET()

    #custom compile function to accomadate custom loss functions
    def compile(self, learning_rate, **kwargs):
        super().compile(**kwargs)
        self.optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
        #self.spatial_constancy_loss = SpatialConsistencyLoss(reduction="none")
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.illumination_smoothness_loss_tracker = keras.metrics.Mean(name="illumination_smoothness_loss")
        self.color_constancy_loss_tracker = keras.metrics.Mean(name="color_constancy_loss")
        self.exposure_loss_tracker = keras.metrics.Mean(name="exposure_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.illumination_smoothness_loss_tracker,
            self.color_constancy_loss_tracker,
            self.exposure_loss_tracker,
        ]

    def get_enhanced_image(self, data, output):
        r1 = output[:, :, :, :3]
        r2 = output[:, :, :, 3:6]
        r3 = output[:, :, :, 6:9]
        r4 = output[:, :, :, 9:12]
        r5 = output[:, :, :, 12:15]
        r6 = output[:, :, :, 15:18]
        r7 = output[:, :, :, 18:21]
        r8 = output[:, :, :, 21:24]
        x = data + r1 * (tf.square(data) - data)
        x = x + r2 * (tf.square(x) - x)
        x = x + r3 * (tf.square(x) - x)
        enhanced_image = x + r4 * (tf.square(x) - x)
        x = enhanced_image + r5 * (tf.square(enhanced_image) - enhanced_image)
        x = x + r6 * (tf.square(x) - x)
        x = x + r7 * (tf.square(x) - x)
        enhanced_image = x + r8 * (tf.square(x) - x)
        return enhanced_image

    def call(self, data):
        dce_net_output = self.dce_model(data)
        return self.get_enhanced_image(data, dce_net_output)

    #lossed are calcualted by multiplying them with their corrosponding weights
    def compute_losses(self, data, output):
        enhanced_image = self.get_enhanced_image(data, output)
        loss_illumination = 200 * illumination_smoothness_loss(output)
        #loss_spatial_constancy = tf.reduce_mean(
            #self.spatial_constancy_loss(enhanced_image, data)
        #)
        loss_color_constancy = 5 * tf.reduce_mean(color_constancy_loss(enhanced_image))
        loss_exposure = 10 * tf.reduce_mean(exposure_loss(enhanced_image))
        total_loss = (loss_illumination+ loss_color_constancy+ loss_exposure)

        return {
            "total_loss": total_loss,
            "illumination_smoothness_loss": loss_illumination,
            "color_constancy_loss": loss_color_constancy,
            "exposure_loss": loss_exposure,
        }

    def train_step(self, data):
        with tf.GradientTape() as tape:
            output = self.dce_model(data)
            losses = self.compute_losses(data, output)

        gradients = tape.gradient(losses["total_loss"], self.dce_model.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.dce_model.trainable_weights)) #weights are updated accroding to the gradients

        #every loss is updated
        self.total_loss_tracker.update_state(losses["total_loss"]) #total loss status updated
        self.illumination_smoothness_loss_tracker.update_state(losses["illumination_smoothness_loss"])
        self.color_constancy_loss_tracker.update_state(losses["color_constancy_loss"])
        self.exposure_loss_tracker.update_state(losses["exposure_loss"])

        return {metric.name: metric.result() for metric in self.metrics}

    def test_step(self, data):
        output = self.dce_model(data)
        losses = self.compute_losses(data, output)

        self.total_loss_tracker.update_state(losses["total_loss"])
        self.illumination_smoothness_loss_tracker.update_state(
            losses["illumination_smoothness_loss"]
        )
        self.color_constancy_loss_tracker.update_state(losses["color_constancy_loss"])
        self.exposure_loss_tracker.update_state(losses["exposure_loss"])

        return {metric.name: metric.result() for metric in self.metrics}

In [46]:
def plot(images,titles,figure_size = (10,10)):
    fig = plt.figure(figsize=figure_size)
    for i in range(len(images)):
        fig.add_subplot(1, len(images), i + 1).set_title(titles[i])
        _fig = plt.imshow(images[i])
        plt.axis("off")
    plt.show()

In [47]:
def calculate_psnr(original_image, enhanced_image):
  # Convert the images to numpy arrays.
  original_image = np.array(original_image)
  enhanced_image = np.array(enhanced_image)
  # Calculate the mean squared error (MSE) between the two images.
  mse = np.mean((original_image - enhanced_image) ** 2)
  # Calculate the peak signal-to-noise ratio (PSNR).
  psnr = 10 * np.log10(255 ** 2 / mse)
  return psnr

In [55]:
Image_Enhancer = ZeroDCE()
Image_Enhancer.compile(learning_rate = 1e-4)
history = Image_Enhancer.fit(train_dataset, validation_data=val_dataset, epochs=50)


Epoch 1/50


ValueError: in user code:

    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1401, in train_function  *
        return step_function(self, iterator)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1384, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 1373, in run_step  **
        outputs = model.train_step(data)
    File "<ipython-input-52-d00de7689abc>", line 46, in train_step
        self.optimizer.apply_gradients(zip(gradients, self.dce_model.trainable_weights))
    File "/usr/local/lib/python3.10/dist-packages/keras/src/optimizers/optimizer.py", line 1222, in apply_gradients
        grads_and_vars = self.aggregate_gradients(grads_and_vars)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/optimizers/optimizer.py", line 1184, in aggregate_gradients
        return optimizer_utils.all_reduce_sum_gradients(grads_and_vars)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/optimizers/utils.py", line 33, in all_reduce_sum_gradients
        filtered_grads_and_vars = filter_empty_gradients(grads_and_vars)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/optimizers/utils.py", line 77, in filter_empty_gradients
        raise ValueError(

    ValueError: No gradients provided for any variable: (['conv2d_49/kernel:0', 'conv2d_49/bias:0', 'conv2d_50/kernel:0', 'conv2d_50/bias:0', 'conv2d_51/kernel:0', 'conv2d_51/bias:0', 'conv2d_52/kernel:0', 'conv2d_52/bias:0', 'conv2d_53/kernel:0', 'conv2d_53/bias:0', 'conv2d_54/kernel:0', 'conv2d_54/bias:0', 'conv2d_55/kernel:0', 'conv2d_55/bias:0'],). Provided `grads_and_vars` is ((None, <tf.Variable 'conv2d_49/kernel:0' shape=(3, 3, 3, 32) dtype=float32>), (None, <tf.Variable 'conv2d_49/bias:0' shape=(32,) dtype=float32>), (None, <tf.Variable 'conv2d_50/kernel:0' shape=(3, 3, 32, 32) dtype=float32>), (None, <tf.Variable 'conv2d_50/bias:0' shape=(32,) dtype=float32>), (None, <tf.Variable 'conv2d_51/kernel:0' shape=(3, 3, 32, 32) dtype=float32>), (None, <tf.Variable 'conv2d_51/bias:0' shape=(32,) dtype=float32>), (None, <tf.Variable 'conv2d_52/kernel:0' shape=(3, 3, 32, 32) dtype=float32>), (None, <tf.Variable 'conv2d_52/bias:0' shape=(32,) dtype=float32>), (None, <tf.Variable 'conv2d_53/kernel:0' shape=(3, 3, 64, 32) dtype=float32>), (None, <tf.Variable 'conv2d_53/bias:0' shape=(32,) dtype=float32>), (None, <tf.Variable 'conv2d_54/kernel:0' shape=(3, 3, 64, 32) dtype=float32>), (None, <tf.Variable 'conv2d_54/bias:0' shape=(32,) dtype=float32>), (None, <tf.Variable 'conv2d_55/kernel:0' shape=(3, 3, 64, 24) dtype=float32>), (None, <tf.Variable 'conv2d_55/bias:0' shape=(24,) dtype=float32>)).


In [None]:
def low_to_high_light(original_image):
    image = keras.utils.img_to_array(original_image)
    image = image.astype("float32") / 255.0
    image = np.expand_dims(image, axis=0)
    output_image = Image_Enhancer(image)
    output_image = tf.cast((output_image[0, :, :, :] * 255), dtype=np.uint8)
    output_image = Image.fromarray(output_image.numpy())
    return output_image

Displaying Results for 10 images

In [None]:
for val_image_file in test_low_light_images[:10]:
    original_image = Image.open(val_image_file)
    enhanced_image = low_to_high_light(original_image)
    plot(
        [original_image,enhanced_image],
        ["Original","Enhanced"],
        (20, 12),
    )

In [None]:
psnr_ratio = []
for i in range(len(test_low_light_images)):
    low_light_image = Image.open(test_low_light_images[i])
    enhanced_image = low_to_high_light(low_light_image)
    high_light_image = Image.open(test_high_light_images[i])
    psnr = calculate_psnr(high_light_image, enhanced_image)
    psnr_ratio.append(psnr)
    #print(f"PSNR for {test_low_light_images[i]}: {psnr}")

In [None]:
np.average(psnr_ratio)