In [1]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
#Here you can find image shadow generation functions, along with the declaration and compilation of the U-Net model
def load_images_from_folder(folder, indices):
    images = []
    for filename in os.listdir(folder):
        if not filename.endswith("_radiance.tif"):
            print("Invalid filename format:", filename)
            continue
        image_index = filename.split('_')[-2]
        if int(image_index) not in indices:
           continue
        img = cv2.imread(os.path.join(folder, filename),cv2.IMREAD_UNCHANGED)
        if img is not None:
            img = (img / 1000).astype('float32')
            images.append(img)
            print("Loaded image:", filename)
    return  np.array(images)


def resize_images(images, target_size=(128, 128)):
    return np.array([cv2.resize(img, target_size) for img in images])

def uneven_illumination(image, max_illumination=1.2, min_illumination=-0.5, smoothness=75):
    image = image.astype(float)
    rows, cols = image.shape[:2]
    gradient_type = np.random.choice(['linear', 'circular', 'diagonal'])

    if gradient_type == 'linear':
        start, end = np.random.randint(0, cols, 2)
        start, end = min(start, end), max(start, end)
        mask = np.ones((rows, cols))
        mask[:, start:end] = np.linspace(min_illumination, max_illumination, end - start).reshape(1, -1)
    elif gradient_type == 'circular':
        center = [np.random.randint(low=0, high=rows), np.random.randint(low=0, high=cols)]
        mask = np.zeros((rows, cols))
        max_distance_to_center = max(np.sqrt((rows - center[0]) ** 2 + (cols - center[1]) ** 2), np.sqrt(center[0] ** 2 + center[1] ** 2))
        for i in range(rows):
            for j in range(cols):
                distance_to_center = np.sqrt((i - center[0]) ** 2 + (j - center[1]) ** 2)
                mask[i, j] = min_illumination + (max_illumination - min_illumination) * (distance_to_center / max_distance_to_center)
    elif gradient_type == 'diagonal':
        mask = np.zeros((rows, cols))
        diagonal_start = np.random.choice(['top_left', 'top_right', 'bottom_left', 'bottom_right'])
        for i in range(rows):
            for j in range(cols):
                if diagonal_start == 'top_left':
                    mask[i, j] = min_illumination + (max_illumination - min_illumination) * ((i + j) / (rows + cols - 2))
                elif diagonal_start == 'top_right':
                    mask[i, j] = min_illumination + (max_illumination - min_illumination) * ((i + (cols - j)) / (rows + cols - 2))
                elif diagonal_start == 'bottom_left':
                    mask[i, j] = min_illumination + (max_illumination - min_illumination) * (((rows - i) + j) / (rows + cols - 2))
                elif diagonal_start == 'bottom_right':
                    mask[i, j] = min_illumination + (max_illumination - min_illumination) * (((rows - i) + (cols - j)) / (rows + cols - 2))

    mask = cv2.GaussianBlur(mask, (smoothness, smoothness), 0)


    image_masked = cv2.multiply(image, mask)
    return image_masked, mask

def generate_noisy_images_and_masks(images,num_samples=1500):
    noisy_images = []
    illumination_masks = []
    original_indices = []
    for idx, img in enumerate(images):
        for _ in range(num_samples):
            max_illumination = np.random.uniform(0.7, 2)
            min_illumination = np.random.uniform(-0.5, max_illumination)
            noisy_image, illumination_mask = uneven_illumination(img, max_illumination, min_illumination)

            noisy_images.append(noisy_image)
            illumination_masks.append(illumination_mask)
            original_indices.append(idx)

    return np.array(noisy_images), np.array(illumination_masks), np.array(original_indices)

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.regularizers import l2


def unet_model(input_size=(128, 128, 1)):
    inputs = Input(input_size)

    conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(0.001))(inputs)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(0.001))(pool1)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(0.001))(pool2)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(0.001))(pool3)

    up1 = Conv2D(512, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(conv4))
    merge1 = concatenate([conv3, up1], axis=3)
    conv5 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge1)

    up2 = Conv2D(256, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(conv5))
    merge2 = concatenate([conv2, up2], axis=3)
    conv6 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge2)

    up3 = Conv2D(128, 2, activation='relu', padding='same', kernel_initializer='he_normal')(UpSampling2D(size=(2, 2))(conv6))
    merge3 = concatenate([conv1, up3], axis=3)
    conv7 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge3)

    conv8 = Conv2D(1, 1)(conv7)

    model = Model(inputs=inputs, outputs=conv8)
    return model

In [2]:
import os
import time

mainout="/content"
collection = "/content" #change this too
folder="/content"#change this too

collection_path =mainout+collection+folder
if not os.path.exists(collection_path):
    os.makedirs(collection_path)
    print(f"Created collection directory: {collection_path}")
else:
    print(f"Collection directory already exists: {collection_path}")
folder_path = os.path.join(collection_path, folder)
if not os.path.exists(folder_path):
    os.makedirs(folder_path)
    print(f"Created folder directory: {folder_path}")
else:
    print(f"Folder directory already exists: {folder_path}")

indices = [2]
#indices = [5]
# you can run several models given the image name has an indice
#In the case of the Miacasense Red-edge sensor, each individual band is numbered 1 to 5
model = unet_model()
model.compile(optimizer=Adam(lr=1e-4), loss='MeanSquaredError', metrics=['MAE'])

early_stopping = EarlyStopping(patience=15, verbose=1)

for index in indices:
    x_train = load_images_from_folder('/content', [index])
    x_train = resize_images(x_train)

    x_train_noisy, x_train_masks, original_indices = generate_noisy_images_and_masks(x_train, num_samples=1500)

    x_train_noisy, x_test_noisy, x_train_masks, x_test_masks, train_indices, test_indices = train_test_split(
        x_train_noisy, x_train_masks, original_indices, test_size=0.1)

    x_train = x_train[train_indices]
    x_test = x_train[test_indices]

    x_train_noisy = x_train_noisy[..., np.newaxis]
    x_test_noisy = x_test_noisy[..., np.newaxis]
    x_train_masks = x_train_masks[..., np.newaxis]
    x_test_masks = x_test_masks[..., np.newaxis]

    current_x_train_noisy = x_train_noisy
    current_x_train_masks = x_train_masks
    current_x_test_noisy = x_test_noisy
    current_x_test_masks = x_test_masks

    model_checkpoint = ModelCheckpoint(os.path.join(collection_path, f'unet_index_{index}.hdf5'),
                                   monitor='loss', verbose=1, save_best_only=True)
    time.sleep(8)
    model.fit(x_train_noisy, x_train_masks,
              epochs=25,
              batch_size=32,
              shuffle=True,
              callbacks=[model_checkpoint, early_stopping],
              validation_data=(x_test_noisy, x_test_masks))
    del x_train
    del x_train_noisy
    del x_train_masks
    del x_test_noisy
    del x_test_masks
    del train_indices
    del test_indices
    del current_x_train_noisy
    del current_x_train_masks
    del current_x_test_noisy
    del current_x_test_masks
    del model_checkpoint

Created collection directory: /content/content/content
Folder directory already exists: /content




Invalid filename format: .config
Invalid filename format: .ipynb_checkpoints
Invalid filename format: content
Loaded image: IMG_0458_2_radiance.tif
Invalid filename format: sample_data
Epoch 1/25
Epoch 1: loss improved from inf to 5.86107, saving model to /content/content/content/unet_index_2.hdf5
Epoch 2/25


  saving_api.save_model(


Epoch 2: loss improved from 5.86107 to 1.36723, saving model to /content/content/content/unet_index_2.hdf5
Epoch 3/25
Epoch 3: loss improved from 1.36723 to 1.22426, saving model to /content/content/content/unet_index_2.hdf5
Epoch 4/25
Epoch 4: loss improved from 1.22426 to 1.13360, saving model to /content/content/content/unet_index_2.hdf5
Epoch 5/25
Epoch 5: loss improved from 1.13360 to 1.07206, saving model to /content/content/content/unet_index_2.hdf5
Epoch 6/25
Epoch 6: loss improved from 1.07206 to 1.01759, saving model to /content/content/content/unet_index_2.hdf5
Epoch 7/25
Epoch 7: loss improved from 1.01759 to 0.97749, saving model to /content/content/content/unet_index_2.hdf5
Epoch 8/25
Epoch 8: loss improved from 0.97749 to 0.93737, saving model to /content/content/content/unet_index_2.hdf5
Epoch 9/25
Epoch 9: loss improved from 0.93737 to 0.90415, saving model to /content/content/content/unet_index_2.hdf5
Epoch 10/25
Epoch 10: loss improved from 0.90415 to 0.87591, saving

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
#process and save several images

def process_and_save_images(input_folder, output_folder, model):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for filename in os.listdir(input_folder):
        if filename.endswith('.tif'):
            img_path = os.path.join(input_folder, filename)

            original_image = cv2.imread(img_path, cv2.IMREAD_UNCHANGED)

            img = original_image.astype('float32') / 1000

            target_size = (128, 128)
            img_resized = cv2.resize(img, target_size)

            img_resized = img_resized[np.newaxis, ..., np.newaxis]

            predicted_mask = model.predict(img_resized)

            predicted_mask = np.squeeze(predicted_mask)

            predicted_mask_resized = cv2.resize(predicted_mask, (original_image.shape[1], original_image.shape[0]))

            corrected_image = original_image / predicted_mask_resized

            output_filename = os.path.splitext(filename)[0] + '_corrected.tif'
            output_path = os.path.join(output_folder, output_filename)

            cv2.imwrite(output_path, corrected_image.astype('uint16'))

            plt.figure(figsize=(10, 5))
            plt.imshow(predicted_mask_resized, cmap='gray')
            plt.title('Predicted Mask')
            plt.show()

            plt.figure(figsize=(10, 5))
            plt.imshow(corrected_image, cmap='gray')
            plt.title('Corrected Image')
            plt.show()

            plt.figure(figsize=(10, 5))
            plt.imshow(original_image, cmap='gray')
            plt.title('Original Image')
            plt.show()

input_folder = '/content/To_correct'
output_folder = '/content/corrected'

process_and_save_images(input_folder,output_folder, model)