In [1]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import activations
from tensorflow.keras import layers
from tensorflow.keras import optimizers
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession



# Create a TF1-compat interactive session whose GPU options have
# `allow_growth` enabled. The option must be set on `config` before the
# session is constructed from it.
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)

# Working directory at the moment this cell runs. Later cells call
# os.chdir(root) to return here after the data loaders have changed directory.
# NOTE(review): re-running this cell after a loader has run records whatever
# directory is current at that point, not necessarily the notebook folder.
root = os.getcwd()


In [2]:
def _append_frames(base_dir, folder_names, destination):
    """Append one 256x256 uint8 array per file found in the given folders.

    Only the first 256 * 256 bytes of each file are read (presumably the first
    plane of a raw YUV frame -- confirm against the files on disk). Files in a
    folder are visited in sorted name order so the result is deterministic.

    Args:
        base_dir: Directory that contains the folders.
        folder_names: Names of the sub-folders of ``base_dir`` to read, in order.
        destination: List that receives the arrays (mutated in place).

    Raises:
        ValueError: If a file holds fewer than 256 * 256 bytes (the reshape fails).
    """
    for folder_name in folder_names:
        folder_path = os.path.join(base_dir, folder_name)
        for file_name in sorted(os.listdir(folder_path)):
            with open(os.path.join(folder_path, file_name), 'rb') as yuv_file:
                raw = yuv_file.read(256 * 256)
            destination.append(np.frombuffer(raw, dtype=np.uint8).reshape((256, 256)))


def data_generator(encoded_images_array, ground_truth_array, start_index,
                   batch_size=10,
                   path_post_enc="C:\\Users\\Furkan\\Desktop\\BLG4901E\\nii_deney\\post_enc_frames",
                   path_pre_enc="C:\\Users\\Furkan\\Desktop\\BLG4901E\\nii_deney\\pre_enc_frames"):
    """Load one chunk of post-/pre-encoding frame pairs into the given lists.

    Folders ``start_index`` .. ``start_index + batch_size - 1`` of each tree are
    read. Both directory listings are sorted: ``os.listdir`` gives no ordering
    guarantee, and the two trees are matched purely by position, so an
    unsorted listing could pair an encoded frame with the wrong ground truth.

    The working directory is not changed by this function.

    Args:
        encoded_images_array: List that receives the frames read from
            ``path_post_enc`` (mutated in place).
        ground_truth_array: List that receives the frames read from
            ``path_pre_enc`` (mutated in place).
        start_index: Index of the first folder (in sorted order) to read.
        batch_size: Number of folders to read from each tree.
        path_post_enc: Root directory of the post-encoding frames.
        path_pre_enc: Root directory of the pre-encoding frames.

    Returns:
        ``start_index + batch_size``, i.e. the start index for the next call.

    Raises:
        IndexError: If a tree has fewer than ``start_index + batch_size`` folders.
    """
    sources = (
        (path_post_enc, encoded_images_array),
        (path_pre_enc, ground_truth_array),
    )
    for base_dir, destination in sources:
        folders = sorted(os.listdir(base_dir))
        # Explicit indexing (not slicing) so a too-short tree raises instead of
        # silently yielding a truncated chunk.
        chunk = [folders[i] for i in range(start_index, start_index + batch_size)]
        _append_frames(base_dir, chunk, destination)

    return start_index + batch_size


In [3]:
def validation_data(X_val, y_val, start_index=570, end_index=578,
                    path_post_enc="C:\\Users\\Furkan\\Desktop\\BLG4901E\\nii_deney\\post_enc_frames",
                    path_pre_enc="C:\\Users\\Furkan\\Desktop\\BLG4901E\\nii_deney\\pre_enc_frames"):
    """Load the held-out post-/pre-encoding frame pairs into the given lists.

    Folders ``start_index`` .. ``end_index - 1`` of each tree are read, and the
    first 256 * 256 bytes of every file in them become one (256, 256) uint8
    array. Directory listings are sorted because ``os.listdir`` gives no
    ordering guarantee and the two trees are paired purely by position.

    The working directory is not changed by this function.

    Args:
        X_val: List that receives the frames from ``path_post_enc`` (mutated in place).
        y_val: List that receives the frames from ``path_pre_enc`` (mutated in place).
        start_index: Index of the first folder (in sorted order) to read.
        end_index: Index one past the last folder to read.
        path_post_enc: Root directory of the post-encoding frames.
        path_pre_enc: Root directory of the pre-encoding frames.

    Raises:
        IndexError: If a tree has fewer than ``end_index`` folders.
        ValueError: If a file holds fewer than 256 * 256 bytes (the reshape fails).
    """
    for base_dir, destination in ((path_post_enc, X_val), (path_pre_enc, y_val)):
        folders = sorted(os.listdir(base_dir))
        for i in range(start_index, end_index):
            folder_path = os.path.join(base_dir, folders[i])
            for file_name in sorted(os.listdir(folder_path)):
                with open(os.path.join(folder_path, file_name), 'rb') as yuv_file:
                    raw = yuv_file.read(256 * 256)
                destination.append(np.frombuffer(raw, dtype=np.uint8).reshape((256, 256)))

In [None]:

def SpatialAttention(input):
    """Build a single-channel sigmoid attention mask from a feature map.

    The input passes through a 3x3 convolution with 16 filters, a ReLU, a 3x3
    convolution with 1 filter and a sigmoid. Both convolutions use stride 1
    and 'same' padding.

    The mask is returned on its own; it is not multiplied into ``input`` here.
    ResidualAttentionBlock applies that multiplication itself.

    Args:
        input: Keras tensor to derive the mask from. (The name shadows the
            built-in ``input``; it is kept so keyword callers keep working.)

    Returns:
        The Keras tensor produced by the final sigmoid activation.
    """
    x = layers.Conv2D(filters=16, kernel_size=3, strides=1, padding='same')(input)
    x = layers.ReLU()(x)
    x = layers.Conv2D(filters=1, kernel_size=3, strides=1, padding='same')(x)
    return layers.Activation('sigmoid')(x)

In [7]:

def ChannelAttention(input):
    """Build per-channel sigmoid weights from a feature map.

    The input is globally average-pooled, passed through a Dense layer with
    8 units and a ReLU, then through a Dense layer with as many units as the
    input's last dimension, and finally through a sigmoid.

    Args:
        input: Keras tensor whose static shape is available through
            ``get_shape().as_list()``; its last dimension sets the output width.

    Returns:
        The Keras tensor produced by the final sigmoid activation.
    """
    # Read the channel count up front, before any layer is created.
    channel_count = input.get_shape().as_list()[-1]

    pooled = layers.GlobalAveragePooling2D()(input)
    hidden = layers.Dense(8)(pooled)
    hidden = layers.ReLU()(hidden)
    logits = layers.Dense(channel_count)(hidden)

    return layers.Activation('sigmoid')(logits)


In [8]:
def ResidualAttentionBlock(x):
    """Apply one residual block with a spatial-attention gate.

    Steps, in order:
      1. 3x3 convolution down to 1 filter.
      2. SpatialAttention mask computed from that result and multiplied into it.
      3. 3x3 convolution up to 64 filters, LeakyReLU (alpha=0.05), then a 3x3
         convolution back to 1 filter.
      4. Element-wise addition of the block input (skip connection).

    No batch normalisation or output activation is applied.

    Args:
        x: Keras tensor entering the block. The final Add combines it with a
            1-filter tensor (CNN feeds a (256, 256, 1) input).

    Returns:
        The Keras tensor produced by the residual addition.
    """
    skip = x

    # conv 1, then gate it with its own spatial-attention mask
    squeezed = layers.Conv2D(filters=1, kernel_size=3, padding="same")(x)
    gate = SpatialAttention(squeezed)
    gated = layers.Multiply()([squeezed, gate])

    # conv 2 -> leaky relu -> conv 3
    expanded = layers.Conv2D(filters=64, kernel_size=3, padding="same")(gated)
    expanded = layers.LeakyReLU(alpha=0.05)(expanded)
    restored = layers.Conv2D(filters=1, kernel_size=3, padding="same")(expanded)

    # residual connection
    return layers.Add()([restored, skip])


In [9]:
def CNN(num_blocks):
    """Assemble the network as a chain of ResidualAttentionBlock stages.

    Args:
        num_blocks: How many ResidualAttentionBlock stages to stack after the
            (256, 256, 1) input. With 0 the model's output is its input tensor.

    Returns:
        An uncompiled ``tf.keras.Model`` from the input layer to the output of
        the last block.
    """
    frame_in = layers.Input(shape=(256, 256, 1))

    features = frame_in
    for _ in range(num_blocks):
        features = ResidualAttentionBlock(features)

    return tf.keras.Model(inputs=frame_in, outputs=features)


In [10]:
# Number of ResidualAttentionBlock stages stacked by CNN().
num_blocks = 30

# Adam with a fixed learning rate of 1e-3.
optimizer = optimizers.Adam(learning_rate=0.001)

# Build the network and attach the optimizer with a mean-absolute-error loss.
# Call Model.summary() here to inspect the layer stack.
Model = CNN(num_blocks)
Model.compile(loss='mae', optimizer=optimizer)

In [11]:
# validation_data() appends frames to the two lists it is given.
val_inputs, val_targets = [], []
validation_data(val_inputs, val_targets)

# Stack the uint8 frames into arrays of shape (N, 256, 256) and scale to [0, 1].
X_val = np.array(val_inputs) / 255.0
y_val = np.array(val_targets) / 255.0

# The intermediate lists are no longer needed once the arrays exist.
del val_inputs, val_targets

In [None]:
# `epochs` counts outer rounds, not Keras epochs: each round loads the next
# chunk of folders through data_generator() and fits on it for 3 Keras epochs.
epochs = 20
st_ix = 0
for chunk_index in range(epochs):
    X_train = []
    y_train = []

    # data_generator() fills both lists in place and returns the start index
    # of the next chunk.
    st_ix = data_generator(X_train, y_train, st_ix)

    # Stack the uint8 frames and scale them to [0, 1].
    X_train = np.array(X_train) / 255.0
    y_train = np.array(y_train) / 255.0

    # The loader may have changed the working directory; go back so the
    # relative checkpoint path below resolves under `root`.
    os.chdir(root)

    # The in-memory model is trained across rounds. Reloading the checkpoint
    # written at the end of the previous round would only restore the state
    # the model already holds, so no reload happens here.
    Model.fit(
        x=X_train,
        y=y_train,
        epochs=3,
        batch_size=2,
        shuffle=False,
        validation_data=(X_val, y_val)
    )

    # Checkpoint after every round.
    Model.save('./MyModel_tf', save_format='tf')




In [None]:
# Reload the model from the './MyModel_tf' checkpoint written by the training
# cell. The path is relative, so it is resolved against the current working
# directory.
Model = tf.keras.models.load_model('./MyModel_tf')

In [None]:
# Run the trained model on the post-encoding validation frames.
# NOTE(review): X_val (the [0, 1]-scaled validation inputs) is assumed to be the
# intended input; no module-level `encoded_images_array` exists in this
# notebook, so the previous call failed on a fresh kernel -- confirm.
predictions = Model.predict(X_val)

In [None]:
# The model is trained on frames scaled to [0, 1], and the next cell multiplies
# by 255 before writing the image, so the valid range here is [0, 1].
predictions = np.clip(predictions, 0.0, 1.0)

In [None]:
# Return to the directory recorded in `root` by the first cell.
os.chdir(root)

# Rescale the first predicted frame by 255 and write it as a PNG. The value
# returned by cv2.imwrite is the last expression, so it is shown as cell output.
output_png = "C:\\Users\\Furkan\\Desktop\\BLG4901E\\CNN\\as.png"
first_frame = predictions[0] * 255
cv2.imwrite(output_png, first_frame)