In [None]:
import os
import cv2
import copy
import numpy as np
from os import listdir
from pathlib import Path
from os.path import isfile, join
from tqdm import tqdm
from sklearn.datasets import load_sample_image
from sklearn.feature_extraction import image
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf


In [None]:
def create(pp, img, p):
    """Randomly zero out pixels of each patch to build the degraded inputs.

    For every patch in ``img`` an independent 0/1 mask is drawn with
    ``np.random.choice`` (a pixel is selected with probability ``pp``) and the
    selected pixels of channel 0 are set to zero.

    Parameters
    ----------
    pp : float
        Probability, in [0, 1], that a given pixel is zeroed.
    img : numpy.ndarray
        Patches indexed as ``(patch, row, column, channel)``. Only channel 0
        is read and written. NOTE: the array is modified in place, so pass a
        copy if the clean patches are still needed afterwards.
    p : int
        Number of slots in the returned array. It must be at least
        ``len(img)``; any extra slots are left as zeros.

    Returns
    -------
    numpy.ndarray
        ``float32`` array of shape ``(p, rows, columns, 1)`` holding the
        masked patches.
    """
    # Take the patch size from the input instead of hard-coding 128x128.
    height, width = img.shape[1:3]
    img_ = np.zeros((p, height, width, 1), dtype=np.float32)

    for i in range(len(img)):
        # One random draw per patch, same call as before, so the random
        # stream (and therefore the result for 128x128 input) is unchanged.
        mask = np.random.choice([0, 1], size=(height, width), p=[1 - pp, pp])

        # Basic slicing returns a view, so the boolean assignment below
        # writes straight into ``img`` in a single vectorised step.
        channel = img[i, :, :, 0]
        channel[mask == 1] = 0

        img_[i, :, :, 0] = channel
    return img_

def _fill_patch_pairs(lt, x, y, n_patches, skip_names=()):
    """Fill ``x``/``y`` with (masked, clean) 128x128 patch pairs from folders.

    Every regular file found directly inside each folder of ``lt`` is read as
    a grayscale image, ``n_patches`` patches are extracted from it, a masked
    copy is produced with ``create(0.85, ...)`` and both versions are written
    into consecutive rows of ``x`` (masked) and ``y`` (clean).

    Parameters
    ----------
    lt : iterable of path-like
        Folders to scan (not recursive).
    x, y : numpy.ndarray
        Pre-allocated output buffers, written in place starting at row 0.
    n_patches : int
        Number of patches extracted per image.
    skip_names : collection of str
        File names to ignore; names that are not present are simply skipped.

    Raises
    ------
    ValueError
        If a file cannot be decoded as an image.
    IndexError
        If ``x`` or ``y`` is too small to hold the next group of patches.
    """
    cnt = 0
    for folder in tqdm(lt):
        onlyfiles = [
            f for f in listdir(folder)
            if f not in skip_names and isfile(join(folder, f))
        ]

        for name in onlyfiles:
            p = join(str(folder), name)
            img = cv2.imread(p, 0)
            if img is None:
                # cv2.imread signals failure by returning None; fail here
                # with the offending path instead of deeper in scikit-learn.
                raise ValueError('could not read image: {}'.format(p))

            y_patches = image.extract_patches_2d(img, (128, 128), max_patches=n_patches)
            y_patches = np.reshape(y_patches, (n_patches, 128, 128, -1))

            # create() zeroes pixels in place, so hand it a private copy and
            # keep y_patches as the clean target.
            x_patches = create(0.85, y_patches.copy(), n_patches)

            stop = cnt + n_patches
            if stop > len(x) or stop > len(y):
                raise IndexError(
                    'output buffers too small: need {} rows, have x={} y={}'.format(
                        stop, len(x), len(y)))

            x[cnt:stop] = x_patches
            y[cnt:stop] = y_patches
            cnt = stop


def curate(path, lt,  x, y):
    """Load 8 (masked, clean) patch pairs per image, ignoring ``Thumbs.db``.

    Parameters
    ----------
    path : any
        Unused; kept so existing calls keep working.
    lt : iterable of path-like
        Folders whose files are read as images.
    x, y : numpy.ndarray
        Pre-allocated buffers that receive the masked and clean patches.

    A folder without a ``Thumbs.db`` file is handled normally (previously
    this raised ``ValueError`` from ``list.remove``).
    """
    _fill_patch_pairs(lt, x, y, n_patches=8, skip_names=('Thumbs.db',))


def curate_(path, lt,  x, y):
    """Load 2 (masked, clean) patch pairs per image from every file.

    Parameters
    ----------
    path : any
        Unused; kept so existing calls keep working.
    lt : iterable of path-like
        Folders whose files are read as images.
    x, y : numpy.ndarray
        Pre-allocated buffers that receive the masked and clean patches.
    """
    _fill_patch_pairs(lt, x, y, n_patches=2)


In [None]:
def psnr_mean(y_true, y_pred):
    """Keras metric: PSNR averaged over the batch, for values scaled to [0, 1]."""
    per_image_psnr = tf.image.psnr(y_true, y_pred, max_val=1.0)
    return tf.reduce_mean(per_image_psnr)


def ssim_loss(true, pred):
    """Keras loss: one minus the batch-mean SSIM, for values scaled to [0, 1]."""
    mean_ssim = tf.reduce_mean(tf.image.ssim(true, pred, max_val=1.0))
    return 1 - mean_ssim


In [None]:
def SimpleCSNet2():
    """Build, compile and summarise a constant-width convolutional network.

    The network takes a 128x128 single-channel input, applies six
    3x3 convolutions with 32 filters (each followed by ReLU, orthogonal kernel
    initialisation) and ends with a 1x1 sigmoid convolution producing one
    channel. It is compiled with Adam, ``ssim_loss`` as the loss and
    ``psnr_mean`` as a metric. The model summary is printed as a side effect.

    Returns
    -------
    tf.keras.models.Model
        The compiled model.
    """
    inputs = tf.keras.layers.Input((128, 128, 1))

    features = inputs
    for _ in range(6):
        features = tf.keras.layers.Conv2D(
            32, (3, 3), padding='same', kernel_initializer='orthogonal'
        )(features)
        features = tf.keras.layers.ReLU()(features)

    outputs = tf.keras.layers.Conv2D(
        1, (1, 1), activation='sigmoid', padding='same', kernel_initializer='orthogonal'
    )(features)

    ae = tf.keras.models.Model(inputs=[inputs], outputs=[outputs])
    ae.compile(optimizer='adam', loss=ssim_loss, metrics=[psnr_mean])
    ae.summary()

    return ae


In [None]:
def SimpleCSNet():
    """Build, compile and summarise a widening convolutional network.

    The network takes a 128x128 single-channel input and applies six 3x3
    convolutions, each followed by ReLU, with 8, 8, 16, 16, 32 and 32 filters
    respectively. A final 1x1 sigmoid convolution produces one channel. It is
    compiled with Adam, ``ssim_loss`` as the loss and ``psnr_mean`` as a
    metric. The model summary is printed as a side effect.

    Returns
    -------
    tf.keras.models.Model
        The compiled model.
    """
    inputs = tf.keras.layers.Input((128, 128, 1))

    features = inputs
    for n_filters in (8, 8, 16, 16, 32, 32):
        features = tf.keras.layers.Conv2D(n_filters, (3, 3), padding='same')(features)
        features = tf.keras.layers.ReLU()(features)

    outputs = tf.keras.layers.Conv2D(
        1, (1, 1), padding='same', activation='sigmoid'
    )(features)

    ae = tf.keras.models.Model(inputs=[inputs], outputs=[outputs])
    ae.compile(optimizer='adam', loss=ssim_loss, metrics=[psnr_mean])
    ae.summary()

    return ae


In [None]:
def fire(x, squeeze, expand):
    """Apply a squeeze convolution followed by two parallel expand branches.

    ``x`` first goes through a 1x1 ReLU convolution with ``squeeze`` filters.
    The result feeds a 1x1 and a 3x3 ReLU convolution, each with
    ``expand // 2`` filters, whose outputs are concatenated and returned.
    """
    branch_filters = expand // 2

    squeezed = tf.keras.layers.Conv2D(
        filters=squeeze, kernel_size=1, activation='relu', padding='same'
    )(x)
    branch_1x1 = tf.keras.layers.Conv2D(
        filters=branch_filters, kernel_size=1, activation='relu', padding='same'
    )(squeezed)
    branch_3x3 = tf.keras.layers.Conv2D(
        filters=branch_filters, kernel_size=3, activation='relu', padding='same'
    )(squeezed)

    return tf.keras.layers.concatenate([branch_1x1, branch_3x3])

def fire_module(squeeze, expand):
    """Return a one-argument callable that applies ``fire`` with fixed sizes."""
    def apply(x):
        return fire(x, squeeze, expand)

    return apply

def SimpleCSNet_sq():
    """Build, compile and summarise a network made of ``fire`` blocks.

    The network takes a 128x128 single-channel input, applies one 3x3 ReLU
    convolution with 32 filters, then five ``fire_module(32, 32)`` blocks in
    sequence, and ends with a 1x1 sigmoid convolution producing one channel.
    It is compiled with Adam, ``ssim_loss`` as the loss and ``psnr_mean`` as a
    metric. The model summary is printed as a side effect.

    Returns
    -------
    tf.keras.models.Model
        The compiled model.
    """
    inputs = tf.keras.layers.Input(shape=[128, 128, 1])

    features = tf.keras.layers.Conv2D(
        kernel_size=3, filters=32, padding='same', use_bias=True, activation='relu'
    )(inputs)
    for _ in range(5):
        features = fire_module(32, 32)(features)

    outputs = tf.keras.layers.Conv2D(
        1, (1, 1), padding='same', activation='sigmoid'
    )(features)

    ae = tf.keras.models.Model(inputs=[inputs], outputs=[outputs])
    ae.compile(optimizer='adam', loss=ssim_loss, metrics=[psnr_mean])
    ae.summary()

    return ae


In [None]:
# --- Configuration -----------------------------------------------------------
SEED = 42
IMG_WIDTH = 128
IMG_HEIGHT = 128

Path1 = Path('/workspace/storage/cnn-cs/data/images')
Path2 = Path('/workspace/storage/cnn-cs/data/train')

# Patch sampling and pixel masking draw from NumPy's global RNG; seeding it
# makes the generated dataset repeatable from a fresh kernel.
# The TensorFlow RNG (weight init, batch shuffling) is NOT seeded here.
np.random.seed(SEED)


def _count_files(folders, skip=()):
    """Count regular files directly inside ``folders``, ignoring names in ``skip``.

    This mirrors the file selection done by the loaders (regular files only),
    so the buffers allocated below have exactly one slot group per image.
    """
    return sum(
        1
        for folder in folders
        for name in os.listdir(folder)
        if name not in skip and os.path.isfile(os.path.join(folder, name))
    )


# --- Discover class folders --------------------------------------------------
# The filter must test each entry, not the parent: otherwise stray files next
# to the class folders are kept and later break os.listdir().
lst  = [d for d in Path1.iterdir() if d.is_dir()]
lst_ = [d for d in Path2.iterdir() if d.is_dir()]

# --- First dataset: 8 patches per image, Thumbs.db excluded ------------------
count_n = _count_files(lst, skip=('Thumbs.db',))

x_train = np.zeros((count_n * 8, IMG_HEIGHT, IMG_WIDTH, 1), dtype=np.uint8)
y_train = np.zeros((count_n * 8, IMG_HEIGHT, IMG_WIDTH, 1), dtype=np.uint8)

curate(Path1, lst, x_train, y_train)

# Scale to [0, 1] in float32; dividing the uint8 array directly would promote
# it to float64 and double the memory footprint.
x_train = x_train.astype(np.float32) / 255.
y_train = y_train.astype(np.float32) / 255.

# --- Second dataset: 2 patches per image -------------------------------------
count_n = _count_files(lst_)

X_train = np.zeros((count_n * 2, IMG_HEIGHT, IMG_WIDTH, 1), dtype=np.uint8)
Y_train = np.zeros((count_n * 2, IMG_HEIGHT, IMG_WIDTH, 1), dtype=np.uint8)

curate_(Path2, lst_, X_train, Y_train)

X_train = X_train.astype(np.float32) / 255.
Y_train = Y_train.astype(np.float32) / 255.

# --- Train / validation split over both datasets combined --------------------
X_TRAIN, X_VAL, Y_TRAIN, Y_VAL = train_test_split(
    np.concatenate((x_train, X_train), axis=0),
    np.concatenate((y_train, Y_train), axis=0),
    test_size=0.2,
    random_state=SEED,
)

# --- Model and training ------------------------------------------------------
model_cnn = SimpleCSNet2()

# save_best_only=True: the checkpoint file is only overwritten when the
# callback's monitored quantity improves.
checkpointer = tf.keras.callbacks.ModelCheckpoint('/workspace/data/cs-simple-model-1000.h5', verbose=1, save_best_only=True)
history = model_cnn.fit(
    X_TRAIN, Y_TRAIN,
    epochs=500,
    batch_size=64,
    shuffle=True,
    validation_data=(X_VAL, Y_VAL),
    verbose=1,
    callbacks=[checkpointer],
)

# Persist the per-epoch loss/metric curves next to the checkpoint.
hist_df = pd.DataFrame(history.history)
hist_df.to_csv('/workspace/data/cs-simple-history-1000.csv')

print('End of training ...')
