# Imports

In [8]:
import os
import shutil
import numpy as np
import cv2
import random
import warnings
import tensorflow as tf
warnings.filterwarnings('ignore')
warnings.filterwarnings("ignore", category=DeprecationWarning, module="tensorflow")

## Blur, darken and augment photos

In [9]:
def random_blur(image): # blur the images
    ksize = np.random.choice(range(5, 35, 2))
    image = cv2.GaussianBlur(image, (ksize, ksize), 0)
    return image

def random_darken(image): # darken the images
    factor = np.random.uniform(0.3, 0.7)
    image = np.clip(image * factor, 0, 255).astype(np.uint8)
    return image

def distort_image(image): # Randomly decide to blur or darken
    if np.random.rand() > 0.5:
        image = random_blur(image)
    else:
        image = random_darken(image)
    return image
    
def random_augmentation(image): # augment the dataset with flips, rotation and contrast adjust for improving our results
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    image = tf.image.rot90(image, tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32))
    image = tf.image.random_contrast(image, lower=0.5, upper=1.5)
    return image

### Test if the functions are producing the desired output

In [10]:
image = cv2.imread('test.jpg')
blurred_image = random_blur(image)
darkened_image = random_darken(image)

cv2.imwrite('blurred_test.jpg', blurred_image)
cv2.imwrite('darkened_test.jpg', darkened_image)

error: OpenCV(4.10.0) D:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\smooth.dispatch.cpp:617: error: (-215:Assertion failed) !_src.empty() in function 'cv::GaussianBlur'


In [14]:
### Prepare the input data

In [36]:
def process_path(image_path):
    image_path = image_path.numpy().decode('utf-8')
    original_image = cv2.imread(image_path)
    original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)

    # distortions
    distorted_image = distort_image(original_image.copy())

    # resize to 256x256
    original_image = cv2.resize(original_image, (256, 256))
    distorted_image = cv2.resize(distorted_image, (256, 256))

    # normalize
    original_image = original_image / 255.0
    distorted_image = distorted_image / 255.0

    return distorted_image.astype(np.float32), original_image.astype(np.float32)

def tf_process_path(image_path):
    distorted_image, original_image = tf.py_function(process_path, [image_path], [tf.float32, tf.float32])
    distorted_image.set_shape([256, 256, 3])
    original_image.set_shape([256, 256, 3])
    return distorted_image, original_image

def load_dataset(image_dir, batch_size=16):
    image_paths = [os.path.join(image_dir, fname) for fname in os.listdir(image_dir)]
    dataset = tf.data.Dataset.from_tensor_slices(image_paths)
    dataset = dataset.map(tf_process_path, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(buffer_size=len(image_paths) * 2)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    return dataset   

train_dir = 'train_test'
validation_dir = 'val_test'

train_dataset = load_dataset(train_dir, batch_size=16)
validation_dataset = load_dataset(validation_dir, batch_size=16)

In [37]:
print(len(train_dataset), len(validation_dataset))

10 2


# UNet Architecture

In [38]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, Concatenate
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint

In [39]:
def unet_model(input_size=(256, 256, 3)):
    inputs = Input(input_size)

    # encoder
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(inputs)
    conv1 = Conv2D(64, 3, activation='relu', padding='same')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(128, 3, activation='relu', padding='same')(pool1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(256, 3, activation='relu', padding='same')(pool2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = Conv2D(512, 3, activation='relu', padding='same')(pool3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same')(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)

    # bottleneck
    conv5 = Conv2D(1024, 3, activation='relu', padding='same')(pool4)
    conv5 = Conv2D(1024, 3, activation='relu', padding='same')(conv5)

    # decoder
    up6 = Concatenate()([UpSampling2D(size=(2, 2))(conv5), conv4])
    conv6 = Conv2D(512, 3, activation='relu', padding='same')(up6)
    conv6 = Conv2D(512, 3, activation='relu', padding='same')(conv6)

    up7 = Concatenate()([UpSampling2D(size=(2, 2))(conv6), conv3])
    conv7 = Conv2D(256, 3, activation='relu', padding='same')(up7)
    conv7 = Conv2D(256, 3, activation='relu', padding='same')(conv7)

    up8 = Concatenate()([UpSampling2D(size=(2, 2))(conv7), conv2])
    conv8 = Conv2D(128, 3, activation='relu', padding='same')(up8)
    conv8 = Conv2D(128, 3, activation='relu', padding='same')(conv8)

    up9 = Concatenate()([UpSampling2D(size=(2, 2))(conv8), conv1])
    conv9 = Conv2D(64, 3, activation='relu', padding='same')(up9)
    conv9 = Conv2D(64, 3, activation='relu', padding='same')(conv9)

    outputs = Conv2D(3, 1, activation='sigmoid')(conv9)

    model = tf.keras.Model(inputs=[inputs], outputs=[outputs])

    return model
    
checkpoint_path = "../models/checkpoint_unet_model.keras"
checkpoint = ModelCheckpoint(checkpoint_path, monitor='val_loss', save_best_only=True, mode='min', verbose=1)

model = unet_model()
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['accuracy'])

In [None]:
history = model.fit(train_dataset, epochs=50, batch_size=16, validation_data=validation_dataset)

Epoch 1/50


In [None]:
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
evaluation = model.evaluate(validation_generator)
print(f"Validation Loss: {evaluation[0]}")
print(f"Validation Accuracy: {evaluation[1]}")

## TESTING HERE

In [None]:
model.save('../models/unet_model.keras')

In [None]:
from tensorflow.keras.models import load_model
model = load_model(''../models/unet_model.keras')

In [None]:
test_image = cv2.imread('path/to/test/image.jpg')
test_image = cv2.resize(test_image, (256, 256))  # Resize to the input size of the model
test_image = test_image / 255.0  # Normalize if necessary
test_image = np.expand_dims(test_image, axis=0)  # Add batch dimension

# Predict the restored image
restored_image = model.predict(test_image)

# Postprocess and display the restored image
restored_image = np.squeeze(restored_image)  # Remove batch dimension
restored_image = restored_image * 255.0  # Denormalize if necessary
restored_image = restored_image.astype(np.uint8)

# Display the result
cv2.imshow('Restored Image', restored_image)
cv2.waitKey(0)
cv2.destroyAllWindows()