In [None]:
!unzip /content/Cut.zip -d /content/New_Data

Archive:  /content/Cut.zip
   creating: /content/New_Data/Cut/
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_0_0.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_0_1200.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_0_1800.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_0_600.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_1200_0.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_1200_1200.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_1200_1800.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_1200_600.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_1800_0.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_1800_1200.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_1800_1800.png  
  inflating: /content/New_Data/Cut/denoised_equalized_image_0_1800_600.png  
  inflating: /content/New

In [None]:
import os
import numpy as np
from skimage.transform import resize
from skimage import io
import tensorflow as tf
from tqdm.keras import TqdmCallback

print(tf.__version__)
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

2.15.0
Num GPUs Available:  0


In [None]:
# Function to load images from a specified directory
def load_images_from_directory(directory, target_size=(512, 512)):
    images = []
    for filename in os.listdir(directory):
        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            img_path = os.path.join(directory, filename)
            img = io.imread(img_path).astype(np.float32) / 255.0
            img = resize(img, target_size, anti_aliasing=True)
            if len(img.shape) == 2:  # Grayscale to RGB
                img = np.stack((img,) * 3, axis=-1)
            images.append(img)
    return images

In [None]:
# Function to add noise to an image
def get_noisy_img(img, sig=30):
    sigma = sig / 255.0
    noise = np.random.normal(scale=sigma, size=img.shape)
    img_noisy = np.clip(img + noise, 0, 1).astype(np.float32)
    return img_noisy

In [None]:
# Function to create an inpainting mask
def get_inpainted_img(img, mask_size=0.25):
    mask = np.ones_like(img)[:, :, :1]
    sx, sy = int(mask_size * img.shape[0]), int(mask_size * img.shape[1])
    x = np.random.randint(0, img.shape[0] - sx)
    y = np.random.randint(0, img.shape[1] - sy)
    mask[x:x + sx, y:y + sy] = 0
    img_noisy = img * mask
    return img_noisy, mask

In [None]:
# Function to save images to a specified directory
def save_image(image, directory, filename):
    if not os.path.exists(directory):
        os.makedirs(directory)
    img_path = os.path.join(directory, filename)
    io.imsave(img_path, (image * 255).astype(np.uint8))

In [None]:
# Load your local images
image_directory = "/content/New_Data/Cut"  # Update this path
save_directory = "/content/New_Data1"  # Update this path

images = load_images_from_directory(image_directory)

In [None]:
for idx, img in enumerate(images):
    save_image(img, save_directory, f"ground_truth_{idx + 1}.png")

def dip_workflow(x0, x_true, f, f_input_shape, z_std=0.1, loss_mask=None, num_iters=5000, init_lr=0.01, save_filepath=None):
    """Deep Image Prior workflow"""
    shape = (1,) + f_input_shape
    z = tf.constant(np.random.uniform(size=shape).astype(np.float32) * z_std, name='net_input')

    def loss_fn(x_true, x):
        if loss_mask is None:
            return tf.keras.losses.MSE(x, x_true)  # Use x_true instead of x0
        else:
            return tf.keras.losses.MSE(x * loss_mask, x_true * loss_mask)

    def mse_to_gt(x_true, x):
        return tf.reduce_mean(tf.keras.losses.MeanSquaredError()(x, x_true))

    def psnr_to_gt(x_true, x, maxv=1.0):
        mse = tf.reduce_mean(tf.keras.losses.MeanSquaredError()(x, x_true))
        psnr_ = 10.0 * tf.math.log(maxv ** 2 / mse) / tf.math.log(10.0)
        return psnr_

    opt = tf.keras.optimizers.Adam(learning_rate=init_lr)
    f.compile(optimizer=opt, loss=loss_fn, metrics=[mse_to_gt, psnr_to_gt])

    callbacks = ()
    if save_filepath is not None:
        callbacks = create_saving_callback(save_filepath)

    # Create a dataset from the input tensor and repeat it indefinitely
    dataset = tf.data.Dataset.from_tensors((z, x_true[None, ...])).repeat()

    history = f.fit(dataset, epochs=num_iters, steps_per_epoch=1, verbose=0, callbacks=callbacks + (TqdmCallback(verbose=1),))

    x = f.predict(z)[0]
    if save_filepath is not None and os.path.exists(save_filepath):
        f.load_weights(save_filepath)
        x_opt = f.predict(z)[0]
    return x

In [None]:
class GaussianNoiseWithDecay(tf.keras.layers.GaussianNoise):
    def __init__(self, stddev, decayrate=0.99999, decaysteps=1, **kwargs):
        super(GaussianNoiseWithDecay, self).__init__(stddev, **kwargs)
        self.num_calls = 0
        self.decayrate = decayrate
        self.decaysteps = decaysteps

    def call(self, inputs, training=None):
        def noised():
            self.num_calls += 1
            stddev = self.stddev * self.decayrate ** (self.num_calls // self.decaysteps)
            return inputs + tf.keras.backend.random_normal(shape=tf.shape(inputs), mean=0.0, stddev=stddev, dtype=inputs.dtype)

        return tf.keras.backend.in_train_phase(noised, inputs, training=training)


In [None]:
def create_saving_callback(filepath):
    return (tf.keras.callbacks.ModelCheckpoint(filepath=filepath, monitor='loss', verbose=0, save_best_only=True, mode='min'),)

def deep_image_prior(input_shape, noise_reg=None, layers=(128, 128, 128, 128, 128), kernel_size_down=3, kernel_size_up=3, skip=(0, 4, 4, 4, 4)):
    def norm_and_active(x):
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.LeakyReLU()(x)
        return x

    model = tf.keras.models.Sequential(name="Deep Image Prior")
    inputs = tf.keras.Input(shape=input_shape)
    x = inputs
    if noise_reg is not None:
        x = GaussianNoiseWithDecay(**noise_reg)(x)

    down_layers = []
    for i, (num_filters, do_skip) in enumerate(zip(layers, skip)):
        if do_skip > 0:
            down_layers.append(norm_and_active(tf.keras.layers.Conv2D(filters=do_skip, kernel_size=1, strides=1, name=f"conv_skip_depth_{i}")(x)))
        for j, strides in enumerate([2, 1]):
            x = norm_and_active(tf.keras.layers.Conv2D(num_filters, kernel_size_down, strides=strides, padding='same', name=f"conv_down_{j + 1}_depth_{i}")(x))

    for i, (num_filters, do_skip) in enumerate(zip(layers[::-1], skip[::-1])):
        x = tf.keras.layers.UpSampling2D(interpolation='bilinear', name=f"upsample_depth_{i}")(x)
        if do_skip:
            x = tf.keras.layers.Concatenate(axis=-1)([x, down_layers.pop()])
        for j, kernel_size in enumerate([kernel_size_up, 1]):
            x = norm_and_active(tf.keras.layers.Conv2D(num_filters, kernel_size, strides=1, padding='same', name=f"conv_up_{j + 1}_depth_{i}")(x))

    x = tf.keras.layers.Conv2D(filters=3, kernel_size=1, strides=1, name="conv_out")(x)
    x = tf.keras.layers.Activation('sigmoid')(x)
    return tf.keras.Model(inputs=inputs, outputs=x, name="deep_image_prior")


In [None]:
def display_dip_model(input_shape=(256, 256, 3)):
    model = deep_image_prior(input_shape)
    model.build(input_shape)
    print(model.summary())


In [None]:
# Example usage:
# Define input shape and create DIP model
input_shape = (512, 512, 3)
model = deep_image_prior(input_shape)

# Prepare test images (assuming images is a list of loaded images)
for idx, img in enumerate(images):
    noisy_img = get_noisy_img(img)
    output_img = dip_workflow(noisy_img, img, model, input_shape)
    save_image(output_img, save_directory, f"denoised_{idx + 1}.png")


0epoch [00:00, ?epoch/s]

0batch [00:00, ?batch/s]

In [None]:
!zip -r /content/New_Data/Cut /content/NewData