In [1]:
!pip install --user opencv-python==4.8.1.78
!pip install --user numpy==1.25.2
!pip install --user tensorflow==2.15.0
!pip install --user tqdm==4.66.2
!pip install --user joblib==1.4.0




In [2]:
import cv2
import numpy as np
import tensorflow as tf
import tqdm
import joblib

# Report the installed version of every dependency in a single call;
# one line per library, in the same order and format as separate prints.
print(
    f"OpenCV version: {cv2.__version__}",
    f"NumPy version: {np.__version__}",
    f"TensorFlow version: {tf.__version__}",
    f"tqdm version: {tqdm.__version__}",
    f"Joblib version: {joblib.__version__}",
    sep="\n",
)


OpenCV version: 4.8.1
NumPy version: 1.25.2
TensorFlow version: 2.15.0
tqdm version: 4.66.2
Joblib version: 1.4.0


In [3]:
import os
import cv2
import numpy as np
import time
from glob import glob
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import tensorflow as tf
from tensorflow.keras import layers, Model

# Intended side length, in pixels, of the square images fed to the network
# (same value as data_generator's default `image_size`).
IMAGE_SIZE = 256
# Number of images per batch; used below to derive the steps per epoch.
BATCH_SIZE = 32
# NOTE(review): not referenced anywhere in the visible code -- presumably a cap
# on the number of training images; confirm whether it is still needed.
MAX_TRAIN_IMAGES = 185

def load_data(image_path, image_size):
    """Read one image from disk, resize it to a square and divide it by 255.

    Args:
        image_path: Path of the image file, handed to ``cv2.imread``.
        image_size: Side length in pixels; the image is resized to
            ``(image_size, image_size)``.

    Returns:
        The resized image divided by 255.0, or ``None`` when the file could
        not be read or processed (a message is printed in that case).

    Note:
        No colour conversion is performed, so the channel order is whatever
        ``cv2.imread`` produces (OpenCV decodes colour images as BGR).
    """
    try:
        image = cv2.imread(image_path)
        # cv2.imread does not raise for a missing or undecodable file; it
        # returns None. Report that directly instead of letting cv2.resize
        # fail with an unrelated-looking error.
        if image is None:
            print(f"Error loading image from {image_path}: file is missing or is not a readable image")
            return None
        image = cv2.resize(image, (image_size, image_size))
        return image / 255.0
    except Exception as e:
        # Best-effort loader: a bad file is reported and skipped by the caller.
        print(f"Error loading image from {image_path}: {str(e)}")
        return None

def get_image_paths(folder_path, extensions=('.png', '.jpg')):
    """List the image files directly inside ``folder_path``.

    Args:
        folder_path: Directory to scan (not recursive).
        extensions: File-name suffixes that count as images. Matching is
            case-insensitive, so ``photo.JPG`` is found as well.

    Returns:
        A list of ``os.path.join(folder_path, name)`` paths, sorted by file
        name so that the order (and therefore the batch composition) is the
        same on every run; ``os.listdir`` alone gives no ordering guarantee.

    Raises:
        OSError: Propagated from ``os.listdir`` when the folder is unreadable.
    """
    suffixes = tuple(ext.lower() for ext in extensions)
    return [
        os.path.join(folder_path, filename)
        for filename in sorted(os.listdir(folder_path))
        if filename.lower().endswith(suffixes)
    ]

def data_generator(folder_path, batch_size=32, image_size=256, augment=False):
    """Yield batches of preprocessed images read from ``folder_path``.

    The generator makes a single pass over the folder and then stops; it does
    not loop. Images that ``load_data`` cannot read are skipped, so a batch
    may hold fewer than ``batch_size`` images, and a batch in which every
    image failed is not yielded at all.

    Args:
        folder_path: Directory scanned by ``get_image_paths``.
        batch_size: Maximum number of images per yielded batch.
        image_size: Side length passed to ``load_data``.
        augment: When True, each loaded image is passed through
            ``ImageDataGenerator.random_transform`` (rotation, shifts, shear,
            zoom, horizontal flip). Defaults to False, which matches the
            previous behaviour where the augmenter was created but never used.

    Yields:
        ``np.array`` of the images loaded for the current batch.
    """
    print('Loading data generator...')
    image_paths = get_image_paths(folder_path)
    # Report the real count instead of a hard-coded number.
    print(f'Number of images found: {len(image_paths)}')

    # Only build the augmenter when it is actually going to be used.
    datagen = None
    if augment:
        datagen = ImageDataGenerator(
            rotation_range=20,
            width_shift_range=0.1,
            height_shift_range=0.1,
            shear_range=0.2,
            zoom_range=0.2,
            horizontal_flip=True,
            fill_mode='nearest')

    for start_idx in range(0, len(image_paths), batch_size):
        batch_images = []
        for image_path in image_paths[start_idx:start_idx + batch_size]:
            image_data = load_data(image_path, image_size)
            if image_data is None:
                continue
            if datagen is not None:
                image_data = datagen.random_transform(image_data)
            batch_images.append(image_data)
        if batch_images:
            yield np.array(batch_images)

def build_dce_net():
    """Assemble the convolutional network wrapped by ``ZeroDCE``.

    Layout: four stacked 3x3 ReLU convolutions (32 filters each), followed by
    three more stages that each concatenate the running feature map with an
    earlier one (conv3, conv2, conv1 in that order) before convolving again.
    The last convolution has 24 filters and a tanh activation.

    Returns:
        A Keras ``Model`` mapping an input of shape ``(None, None, 3)`` (any
        spatial size, three channels) to the 24-channel tanh output.
    """

    def conv_relu(tensor):
        # 32 filters, 3x3 kernel, stride 1, 'same' padding, ReLU.
        return layers.Conv2D(
            32, (3, 3), strides=(1, 1), activation='relu', padding='same'
        )(tensor)

    def merge(late, early):
        # Channel-wise skip connection.
        return layers.Concatenate(axis=-1)([late, early])

    net_input = layers.Input(shape=[None, None, 3])

    # Plain encoder stack.
    feat1 = conv_relu(net_input)
    feat2 = conv_relu(feat1)
    feat3 = conv_relu(feat2)
    feat4 = conv_relu(feat3)

    # Symmetric skip connections back to the earlier feature maps.
    feat5 = conv_relu(merge(feat4, feat3))
    feat6 = conv_relu(merge(feat5, feat2))
    skip_out = merge(feat6, feat1)

    net_output = layers.Conv2D(
        24, (3, 3), strides=(1, 1), activation='tanh', padding='same'
    )(skip_out)
    return Model(inputs=net_input, outputs=net_output)

# Define custom loss functions
def color_constancy_loss(x):
    """Penalise differences between the spatial means of the first three channels.

    Args:
        x: 4-D tensor indexed as (batch, height, width, channel); only
            channels 0, 1 and 2 are read.

    Returns:
        Tensor of shape (batch, 1, 1): the square root of the summed fourth
        powers of the three pairwise differences of the channel means.
    """
    # Average every channel over the two spatial axes.
    channel_means = tf.reduce_mean(x, axis=(1, 2), keepdims=True)
    mean_c0 = channel_means[:, :, :, 0]
    mean_c1 = channel_means[:, :, :, 1]
    mean_c2 = channel_means[:, :, :, 2]

    # Squared pairwise differences; they are squared once more below.
    sq_01 = tf.square(mean_c0 - mean_c1)
    sq_02 = tf.square(mean_c0 - mean_c2)
    sq_12 = tf.square(mean_c2 - mean_c1)

    fourth_power_sum = tf.square(sq_01) + tf.square(sq_02) + tf.square(sq_12)
    return tf.sqrt(fourth_power_sum)

def exposure_loss(x, mean_val=0.6):
    """Mean squared deviation of local brightness from a target level.

    Args:
        x: 4-D tensor indexed as (batch, height, width, channel).
        mean_val: Target value for the per-patch average.

    Returns:
        Scalar tensor: the mean, over all 16x16 non-overlapping patches, of
        the squared difference between the patch average and ``mean_val``.
    """
    # Collapse the channel axis to a single intensity map.
    intensity = tf.reduce_mean(x, axis=3, keepdims=True)
    # Average over 16x16 patches (stride 16, no padding).
    patch_means = tf.nn.avg_pool2d(intensity, ksize=16, strides=16, padding='VALID')
    deviation = patch_means - mean_val
    return tf.reduce_mean(tf.square(deviation))

def illumination_smoothness_loss(x):
    """Total-variation style smoothness penalty on a (batch, H, W, C) tensor.

    Sums the squared differences between vertically and horizontally adjacent
    values, normalises each sum by the number of neighbour pairs in one
    feature map, and averages over the batch.

    Args:
        x: 4-D tensor indexed as (batch, height, width, channel).

    Returns:
        Scalar tensor ``2 * (h_tv / count_h + w_tv / count_w) / batch_size``.
    """
    shape = tf.shape(x)
    batch_size, h_x, w_x = shape[0], shape[1], shape[2]

    # Neighbour pairs per feature map. These must come from the two spatial
    # axes (1 and 2 in channels-last layout); deriving them from axes 2 and 3
    # mixes in the channel count and yields a zero divisor for C == 1.
    count_h = (h_x - 1) * w_x
    count_w = h_x * (w_x - 1)

    h_tv = tf.reduce_sum(tf.square(x[:, 1:, :, :] - x[:, :h_x - 1, :, :]))
    w_tv = tf.reduce_sum(tf.square(x[:, :, 1:, :] - x[:, :, :w_x - 1, :]))

    # Cast the batch size itself (previously count_h was cast into this name,
    # so the result was divided by count_h twice and never by the batch size).
    batch_size = tf.cast(batch_size, dtype=tf.float32)
    count_h = tf.cast(count_h, dtype=tf.float32)
    count_w = tf.cast(count_w, dtype=tf.float32)
    return 2 * (h_tv / count_h + w_tv / count_w) / batch_size

class SpatialConsistencyLoss(tf.keras.losses.Loss):
    """Compare local intensity gradients of two images.

    Both inputs are averaged over channels and over 4x4 patches; the
    difference between each patch and its left / right / upper / lower
    neighbour is then computed for both images, and the squared mismatches of
    the four directions are summed.
    """

    def __init__(self, **kwargs):
        """Create the four directional difference filters.

        Args:
            **kwargs: Forwarded unchanged to ``tf.keras.losses.Loss``.
        """
        super(SpatialConsistencyLoss, self).__init__(**kwargs)
        # Each stencil is "centre minus one neighbour".
        self.left_kernel = self._neighbour_kernel([[0, 0, 0], [-1, 1, 0], [0, 0, 0]])
        self.right_kernel = self._neighbour_kernel([[0, 0, 0], [0, 1, -1], [0, 0, 0]])
        self.up_kernel = self._neighbour_kernel([[0, -1, 0], [0, 1, 0], [0, 0, 0]])
        self.down_kernel = self._neighbour_kernel([[0, 0, 0], [0, 1, 0], [0, -1, 0]])

    @staticmethod
    def _neighbour_kernel(stencil):
        """Turn a 3x3 stencil into a conv2d filter of shape (3, 3, 1, 1).

        tf.nn.conv2d expects filters laid out as
        (height, width, in_channels, out_channels). The earlier nested-list
        form had shape (1, 3, 1, 3), i.e. a 1x3 filter with three output
        channels, which does not compute the intended neighbour differences.
        """
        return tf.reshape(tf.constant(stencil, dtype=tf.float32), (3, 3, 1, 1))

    @staticmethod
    def _pooled_intensity(image):
        """Channel mean followed by 4x4 average pooling (stride 4, no padding)."""
        intensity = tf.reduce_mean(image, 3, keepdims=True)
        return tf.nn.avg_pool2d(intensity, ksize=4, strides=4, padding="VALID")

    def call(self, y_true, y_pred):
        """Return the per-patch sum of squared gradient mismatches.

        Args:
            y_true: Reference image batch, indexed (batch, height, width, channel).
            y_pred: Image batch to compare against ``y_true``; same layout.

        Returns:
            Tensor of shape (batch, height // 4, width // 4, 1).
        """
        original_pool = self._pooled_intensity(y_true)
        enhanced_pool = self._pooled_intensity(y_pred)

        kernels = (self.left_kernel, self.right_kernel, self.up_kernel, self.down_kernel)
        squared_mismatches = [
            tf.square(
                tf.nn.conv2d(original_pool, kernel, strides=[1, 1, 1, 1], padding="SAME")
                - tf.nn.conv2d(enhanced_pool, kernel, strides=[1, 1, 1, 1], padding="SAME")
            )
            for kernel in kernels
        ]
        return tf.add_n(squared_mismatches)

class ZeroDCE(Model):
    """Keras model that trains the DCE network with four self-supervised losses.

    ``call`` returns the raw 24-channel output of the wrapped network. Use
    ``get_enhanced_image`` to turn that output into an enhanced image.
    """

    # The 24 output channels are consumed as 8 groups of 3 (one per iteration).
    _CURVE_ITERATIONS = 8

    def __init__(self, **kwargs):
        """Build the wrapped network and the spatial loss.

        Args:
            **kwargs: Forwarded unchanged to ``tf.keras.Model``.
        """
        super(ZeroDCE, self).__init__(**kwargs)
        self.dce_model = build_dce_net()
        self.spatial_constancy_loss = SpatialConsistencyLoss()

    def call(self, data):
        """Return the raw network output (24 tanh channels) for ``data``."""
        return self.dce_model(data)

    def get_enhanced_image(self, data, output):
        """Apply the predicted curves to the input image.

        Each group of three channels ``a`` in ``output`` is applied in turn as
        ``x <- x + a * (x**2 - x)``, starting from ``x = data``.

        Args:
            data: Input image batch, indexed (batch, height, width, 3).
            output: Network output for ``data`` with 24 channels.

        Returns:
            Enhanced image batch with the same shape as ``data``.
        """
        # Guard against a dtype mismatch when called with non-float32 input.
        enhanced = tf.cast(data, output.dtype)
        for step in range(self._CURVE_ITERATIONS):
            curve = output[:, :, :, 3 * step:3 * step + 3]
            enhanced = enhanced + curve * (tf.square(enhanced) - enhanced)
        return enhanced

    def compute_losses(self, data, output):
        """Compute the weighted loss terms for one batch.

        The spatial, colour and exposure terms describe image properties, so
        they are evaluated on the enhanced image rather than on the raw
        24-channel network output; only the smoothness term is applied to the
        raw output.

        Args:
            data: Input image batch.
            output: Raw network output for ``data``.

        Returns:
            Dict with ``total_loss`` and the four individual weighted terms.
        """
        enhanced_image = self.get_enhanced_image(data, output)
        loss_spatial_constancy = tf.reduce_mean(
            self.spatial_constancy_loss(data, enhanced_image)
        )
        loss_color_constancy = 5 * tf.reduce_mean(color_constancy_loss(enhanced_image))
        loss_exposure = 10 * tf.reduce_mean(exposure_loss(enhanced_image))
        loss_illumination = 200 * illumination_smoothness_loss(output)
        total_loss = loss_spatial_constancy + loss_color_constancy + loss_exposure + loss_illumination
        return {
            "total_loss": total_loss,
            "illumination_smoothness_loss": loss_illumination,
            "spatial_constancy_loss": loss_spatial_constancy,
            "color_constancy_loss": loss_color_constancy,
            "exposure_loss": loss_exposure,
        }

    def train_step(self, data):
        """Run one optimisation step on ``data`` and return the loss dict."""
        with tf.GradientTape() as tape:
            output = self.dce_model(data)
            losses = self.compute_losses(data, output)
        # Only the wrapped network holds trainable weights.
        gradients = tape.gradient(losses["total_loss"], self.dce_model.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.dce_model.trainable_weights))
        return losses

    def test_step(self, data):
        """Evaluate the losses on ``data`` without updating any weights."""
        output = self.dce_model(data)
        return self.compute_losses(data, output)

# NOTE(review): training and validation read the same folder, so the
# validation losses are not an independent estimate.
train_image_folder = 'first_zero_de_images'
val_image_folder = 'first_zero_de_images'

# data_generator is lazy: this interval only covers creating the generator
# objects. The images themselves are read while fit() consumes the batches.
start_time = time.perf_counter()
# Pass the configured sizes explicitly so the generators and the step counts
# below are always derived from the same BATCH_SIZE.
train_data_generator = data_generator(
    train_image_folder, batch_size=BATCH_SIZE, image_size=IMAGE_SIZE)
val_data_generator = data_generator(
    val_image_folder, batch_size=BATCH_SIZE, image_size=IMAGE_SIZE)

end_time = time.perf_counter()
res1 = end_time-start_time
print("data time: " + str(res1) + " seconds")
# Whole batches only; at least one step so a folder holding fewer than
# BATCH_SIZE images still trains on its single partial batch.
train_steps_per_epoch = max(1, len(get_image_paths(train_image_folder)) // BATCH_SIZE)
val_steps_per_epoch = max(1, len(get_image_paths(val_image_folder)) // BATCH_SIZE)

zero_dce_model = ZeroDCE()
zero_dce_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4))
# The generators make a single pass, which is enough for exactly one epoch.
start2_time = time.perf_counter()
history = zero_dce_model.fit(train_data_generator, steps_per_epoch=train_steps_per_epoch,
                             validation_data=val_data_generator, validation_steps=val_steps_per_epoch,
                             epochs=1)
end2_time = time.perf_counter()
res2 = end2_time - start2_time
print("Execution time: " + str(res2) + " seconds")


data time: 0.0 seconds

Loading data generator...
Number of images found: 200
Number of images found: 200
Execution time: 468.3081376552582 seconds
