In [None]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

import keras
from keras import backend as K
import numpy as np
import os
import matplotlib.pylab as plt

P_MODELSAVE = 'saved_models'
P_LOGS = 'logs'
P_IMGSAVE = 'saved_images'

dirs = [P_MODELSAVE, P_LOGS, P_IMGSAVE]

for d in dirs:
    if not os.path.exists(d):
        os.makedirs(d)
        

dataset_path = '/kaggle/input/dataset-explo/bsds500/images'
batch_size = 20
epochs = 150
input_shape = (256, 256)
noise_factor = 1


# the path to save the weight of the model
# saved_weight = os.path.join(P_MODELSAVE, 'dataweights.{epoch:02d}-{val_acc:.2f}.hdf5')
# saved_weight = os.path.join(P_MODELSAVE, 'dataweights.{epoch:02d}-{val_acc:.2f}.hdf5.keras')
saved_weight = os.path.join(P_MODELSAVE, 'dataweights.{epoch:02d}-{val_accuracy:.2f}.hdf5.keras')



from PIL import Image as pil_image

def random_crop(img, random_crop_size):
    width, height = img.size # PIL format
    dx, dy = random_crop_size
    x = np.random.randint(0, width - dx + 1)
    y = np.random.randint(0, height - dy + 1)
    return img.crop((x, y, x+dx, y+dy))


def load_img_extended(path, grayscale=False, color_mode='rgb', target_size=None,
                      interpolation='nearest'):
    if grayscale is True:
        warnings.warn('grayscale is deprecated. Please use '
                      'color_mode = "grayscale"')
        color_mode = 'grayscale'
    if pil_image is None:
        raise ImportError('Could not import PIL.Image. '
                          'The use of `array_to_img` requires PIL.')
    img = pil_image.open(path)
    if color_mode == 'grayscale':
        if img.mode != 'L':
            img = img.convert('L')
    elif color_mode == 'rgba':
        if img.mode != 'RGBA':
            img = img.convert('RGBA')
    elif color_mode == 'rgb':
        if img.mode != 'RGB':
            img = img.convert('RGB')
    else:
        raise ValueError('color_mode must be "grayscale", "rbg", or "rgba"')
    
    if target_size is not None:
        width_height_tuple = (target_size[1], target_size[0])
        if img.size != width_height_tuple:
            img = random_crop(img, width_height_tuple) # here comes the magic
    return img


keras.preprocessing.image.load_img = load_img_extended

# from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.image import ImageDataGenerator


data_gen_args = dict(
#     featurewise_center=True,
#     featurewise_std_normalization=True,
#     rotation_range=20,
#     width_shift_range=0.2,
#     height_shift_range=0.2,
#     brightness_range=[0.5, 1.2],
#     shear_range=0.01,
#     horizontal_flip=True,
    rescale=1/255,
    fill_mode='reflect',
    data_format='channels_last')

data_flow_args = dict(
    target_size=input_shape,
    batch_size=batch_size,
    class_mode='input') # Since we want to reconstruct the input


class AddGaussianNoise(object):
    def __init__(self, mean=0., std=1):
        self.std = std
        self.mean = mean
        
    def __call__(self, tensor):
        noise = tf.random.normal(tf.shape(tensor), mean=self.mean, stddev=self.std, dtype=tf.float32)
        return tensor + noise

class AddSaltAndPepperNoise(object):
    def __init__(self, prob=0.2):
        self.prob = prob
        
    def __call__(self, tensor):
        salt_pepper = tf.random.uniform(tf.shape(tensor), 0, 1)
        noisy_image = tf.where(salt_pepper < self.prob / 2, 0.0, tensor)
        noisy_image = tf.where(salt_pepper > 1 - self.prob / 2, 1.0, noisy_image)
        return noisy_image

class AddRayleighNoise(object):
    def __init__(self, scale=0.26):
        self.scale = scale
        
    def __call__(self, tensor):
        noise = tf.random.normal(tf.shape(tensor), mean=0.0, stddev=self.scale, dtype=tf.float32)
        noisy_image = tensor + noise
        noisy_image = tf.clip_by_value(noisy_image, 0.0, 1.0)
        return noisy_image

class AddSpeckleNoise(object):
    def __init__(self, mean=0., std=0.5):
        self.std = std
        self.mean = mean
        
    def __call__(self, tensor):
        noise = tf.random.normal(tf.shape(tensor), mean=self.mean, stddev=self.std, dtype=tf.float32)
        noisy_tensor = tensor + tensor * noise
        return noisy_tensor

import random

def noisy_generator(batches):
    noise_types = ['gaussian', 'rayleigh', 'salt_and_pepper']
    
    for batch_x, batch_y in batches:
        # Select a random noise type
        noise_type = random.choice(noise_types)
        
        if noise_type == 'salt_and_pepper':
            # Add Salt and Pepper Noise
            noisy_transform = AddSaltAndPepperNoise(prob=0.3)
        elif noise_type == 'gaussian':
            # Add Gaussian Noise
            noisy_transform = AddGaussianNoise(std=0.5)
#         elif noise_type == 'speckle':
            # Add Gaussian Noise
#             noisy_transform = AddSpeckleNoise(std=0.5)
        elif noise_type == 'rayleigh':
            # Add Rayleigh Noise
            noisy_transform = AddRayleighNoise(scale=0.1)
        else:
            raise ValueError("Invalid noise type. Choose from 'salt_and_pepper', 'gaussian', or 'rayleigh'.")
        
        # Apply the noise transform
        noisy_batch_x = tf.stack([noisy_transform(tensor) for tensor in batch_x])
        
        yield (noisy_batch_x, batch_y)



import keras.layers as layers
import keras.models as models
from keras.initializers import orthogonal

def Conv2DLayer(x, filters, kernel, strides, padding, block_id, kernel_init=orthogonal()):
    prefix = f'block_{block_id}_'
    x = layers.Conv2D(filters, kernel_size=kernel, strides=strides, padding=padding,
                      kernel_initializer=kernel_init, name=prefix+'conv')(x)
    x = layers.ReLU(name=prefix+'relu')(x)
    x = layers.Dropout(0.1, name=prefix+'drop')((x))
    return x

def Transpose_Conv2D(x, filters, kernel, strides, padding, block_id, kernel_init=orthogonal()):
    prefix = f'block_{block_id}_'
    x = layers.Conv2DTranspose(filters, kernel_size=kernel, strides=strides, padding=padding,
                               kernel_initializer=kernel_init, name=prefix+'de-conv')(x)
    x = layers.ReLU(name=prefix+'relu')(x)
    x = layers.Dropout(0.1, name=prefix+'drop')((x))
    return x

def AutoEncdoer(input_shape):
    inputs = layers.Input(shape=input_shape)
    
    # 256 x 256
    conv1 = Conv2DLayer(inputs, 64, 3, strides=1, padding='same', block_id=1)
    conv2 = Conv2DLayer(conv1, 64, 3, strides=2, padding='same', block_id=2)
    
    # 128 x 128
    conv3 = Conv2DLayer(conv2, 128, 5, strides=2, padding='same', block_id=3)
    
    # 64 x 64
    conv4 = Conv2DLayer(conv3, 128, 3, strides=1, padding='same', block_id=4)
    conv5 = Conv2DLayer(conv4, 256, 5, strides=2, padding='same', block_id=5)
    
    # 32 x 32
    conv6 = Conv2DLayer(conv5, 512, 3, strides=2, padding='same', block_id=6)
    
    # 16 x 16
    deconv1 = Transpose_Conv2D(conv6, 512, 3, strides=2, padding='same', block_id=7)
    
    # 32 x 32
    skip1 = layers.concatenate([deconv1, conv5], name='skip1')
    conv7 = Conv2DLayer(skip1, 256, 3, strides=1, padding='same', block_id=8)
    deconv2 = Transpose_Conv2D(conv7, 128, 3, strides=2, padding='same', block_id=9)
    
    # 64 x 64
    skip2 = layers.concatenate([deconv2, conv3], name='skip2')
    conv8 = Conv2DLayer(skip2, 128, 5, strides=1, padding='same', block_id=10)
    deconv3 = Transpose_Conv2D(conv8, 64, 3, strides=2, padding='same', block_id=11)
    
    # 128 x 128
    skip3 = layers.concatenate([deconv3, conv2], name='skip3')
    conv9 = Conv2DLayer(skip3, 64, 5, strides=1, padding='same', block_id=12)
    deconv4 = Transpose_Conv2D(conv9, 64, 3, strides=2, padding='same', block_id=13)
    
    # 256 x 256
    skip3 = layers.concatenate([deconv4, conv1])
    conv10 = layers.Conv2D(3, 3, strides=1, padding='same', activation='sigmoid',
                       kernel_initializer=orthogonal(), name='final_conv')(skip3)

    
    return models.Model(inputs=inputs, outputs=conv10)
#     decoded = layers.Conv2D(3, 3, activation='sigmoid', padding='same')(conv10)
    
#     return models.Model(inputs=inputs, outputs=decoded)



In [None]:

dataset_path = '/kaggle/input/aaaaaaaaaaa/owndataset'


                                            
test_datagen = ImageDataGenerator(**data_gen_args)

test_batches = test_datagen.flow_from_directory(
    dataset_path ,
    **data_flow_args)

test_noisy_batches = noisy_generator(test_batches)




X, y = next(test_noisy_batches)



score = model.evaluate(X, y, verbose=1)
print('Test loss:', score[0])
print('Test accuracy:', score[1])


decoded_imgs = model.predict(X)

In [None]:




train_datagen = ImageDataGenerator(**data_gen_args)
val_datagen = ImageDataGenerator(**data_gen_args)

train_batches = train_datagen.flow_from_directory(
    dataset_path + '/train',
    **data_flow_args)

val_batches = val_datagen.flow_from_directory(
    dataset_path + '/val',
    **data_flow_args)


train_noisy_batches = noisy_generator(train_batches)
val_noisy_batches = noisy_generator(val_batches)

from keras.optimizers import SGD, Adam

model = AutoEncdoer((*input_shape, 3))
# model_opt = SGD(lr=0.005, decay=1-0.995, momentum=0.7, nesterov=False)
model_opt = Adam(learning_rate=0.001)



In [None]:
import tensorflow as tf
from skimage.metrics import peak_signal_noise_ratio, structural_similarity

def psnr(y_true, y_pred):
    return tf.image.psnr(y_true, y_pred, max_val=1.0)

def ssim(y_true, y_pred):
    return tf.image.ssim(y_true, y_pred, max_val=1.0)

model.compile(optimizer=model_opt, loss='mse', metrics=['accuracy', psnr, ssim])


In [None]:


import tensorflow as tf

def tv_loss(y_true, y_pred):
    # Calculate the total variation loss
    dy = tf.abs(y_pred[:, 1:, :, :] - y_pred[:, :-1, :, :])
    dx = tf.abs(y_pred[:, :, 1:, :] - y_pred[:, :, :-1, :])
    tv_loss = tf.reduce_mean(dx) + tf.reduce_mean(dy)
    
    # Define your main loss function (e.g., MSE)
    mse_loss = tf.reduce_mean(tf.square(y_true - y_pred))
    
    # Combine the two losses with a weighting factor
    alpha = 0.1  # Weighting factor for TV regularization
    total_loss = mse_loss + alpha * tv_loss
    
    return total_loss

# model.compile(optimizer=model_opt, loss=tv_loss, metrics=['accuracy'])
# modelchk = keras.callbacks.ModelCheckpoint(
#                                       monitor='accuracy',
#                                       verbose=1,
#                                       save_best_only=True,
#                                       save_weights_only=False,
#                                       mode='auto',
#                                       save_freq=2)
# Define the filepath where the model will be saved
saved_model_path = './content/saved_models/my_model.keras'

# Define ModelCheckpoint callback
modelchk = keras.callbacks.ModelCheckpoint(
    filepath=saved_model_path,  # specify the filepath here
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
    mode='auto',
    save_freq=2
)

tensorboard = keras.callbacks.TensorBoard(log_dir=P_LOGS,
                                          histogram_freq=0,
                                          write_graph=True,
                                          write_images=True)

csv_logger = keras.callbacks.CSVLogger(f'{P_LOGS}/keras_log.csv',
                                       append=True)

import tensorflow as tf
train_dataset = tf.data.Dataset.from_generator(
    lambda: train_noisy_batches,
    output_signature=(
        tf.TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32)
    )
)

val_dataset = tf.data.Dataset.from_generator(
    lambda: val_noisy_batches,
    output_signature=(
        tf.TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(None, 256, 256, 3), dtype=tf.float32)
    )
)

epochs=100

history=model.fit(train_dataset,
          steps_per_epoch=train_batches.samples // batch_size,
          epochs=epochs,
          verbose=1, 
          validation_data=val_dataset,
          validation_steps=train_batches.samples // batch_size,
          callbacks=[modelchk, tensorboard, csv_logger])






In [None]:

dataset_path = '/kaggle/input/aaaaaaaaaaa/owndataset'


                                            
test_datagen = ImageDataGenerator(**data_gen_args)

test_batches = test_datagen.flow_from_directory(
    dataset_path ,
    **data_flow_args)

test_noisy_batches = noisy_generator(test_batches)




X, y = next(test_noisy_batches)



score = model.evaluate(X, y, verbose=1)
print('Test loss:', score[0])
print('Test accuracy:', score[1])


decoded_imgs = model.predict(X)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from skimage.metrics import peak_signal_noise_ratio, structural_similarity

for i in range(len(y)):
    plt.figure(figsize=(10, 5))

    # Display the input image
    plt.subplot(1, 2, 1)
    plt.imshow(X[i])
    plt.title('Input Image')
    plt.axis('off')

    # Display the output (decoded) image
    plt.subplot(1, 2, 2)
    plt.imshow(decoded_imgs[i])
    plt.title('Output (Decoded) Image')
    plt.axis('off')

    # Calculate PSNR
    psnr_val = tf.image.psnr(y[i], decoded_imgs[i], max_val=1.0)

    # Calculate SSIM
    ssim_val = tf.image.ssim(y[i], decoded_imgs[i], max_val=1.0)

    # Display PSNR and SSIM
    plt.text(1.5, 0.5, f"PSNR: {psnr_val.numpy():.4f}\nSSIM: {ssim_val.numpy():.4f}", 
             horizontalalignment='center', verticalalignment='center', 
             fontsize=12, color='black', transform=plt.gca().transAxes)

    plt.show()
