# Imports and global vars

In [2]:
import torch
import math
import numpy as np
import tensorflow as tf
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import torchvision.transforms 
import cv2
import gc
matplotlib.use('Agg')


from torchvision.io import decode_image
from pathlib import Path
from torch import nn
from tqdm import tqdm
from typing import Union

# pyplot style
plt.rcParams["savefig.bbox"] = 'tight'
# Select data path
# DATA_PATH = Path('../../data/raw/test_data')
DATA_PATH = Path('../../data/raw/back_h0_128mm_test015')
# Colormap
cmap1_strict = matplotlib.colors.ListedColormap(['green'])
# Number of images used for testing
max_number_of_images = 709
# Convolutional network
conv_fn = tf.nn.conv2d

# Gaussian and sobel kernel calculations functions

In [3]:
def create_gaussian_filter(sigma:float=1.4, k:int=2):
    """
    Creates a gaussian kernel with determined sigma and size values

    Parameters
    ----------
    sigma : float
        The standard deviation of the gaussian

    k : int
        The size of the kernel

    Returns
    -------
    tf.constant
        The filter as a constant tensorflow value
    """
    size = (2*k)+1
    kernel = np.fromfunction(lambda x, y:(math.e ** (-(((x-(k))**2) + ((y-(k))**2))/(2*sigma**2)))/(2*math.pi*sigma**2), (size, size))
    return tf.constant(kernel)
def create_sobel_kernel(k=3, transposed = False):
    """
    Creates a sobel kernel with determined size value

    Parameters
    ----------
    k : int
        The size of the kernel

    transposed : bool
        Should the kernel be transposed


    Returns
    -------
    tf.constant
        The filter as a constant tensorflow value
    """
    grid_range = np.linspace(-(k // 2), k // 2, k)
    x, y = np.meshgrid(grid_range, grid_range)
    numer = x
    denom = (x**2 + y**2)
    denom[:, k // 2] = 1
    sobel_2D = numer / denom
    return tf.constant(sobel_2D.T) if transposed else tf.constant(sobel_2D)

In [4]:
# Show example kernels
kernel = [create_gaussian_filter(1.4,2), create_sobel_kernel(5, True)]
for style in kernel:
    plt.matshow(style)

# Well known kernel examples and custom ones

In [5]:
# Dictionary with kernels
kernels = {
    "Edge Detect" : tf.constant([
                    [-1,-1,-1],
                    [-1,8,-1],
                    [-1,-1,-1],
                    ]),
    "Bottom Sobel" : tf.constant([
                    [-1,-2,-1],
                    [0,0,0],
                    [1,2,1],
                    ]),
    "Top Sobel" : tf.constant([
        [1,2,1],
        [0,0,0],
        [-1,-2,-1],
    ]),
    "Emboss" : tf.constant([
                    [-3,-2,-1],
                    [1,0,-1],
                    [3,2,1],
                    ]),
    "Sharpen" : tf.constant([
                    [0,-1,0],
                    [-1,5,-1],
                    [0,-1,0],
                    ]),
    "Mean" : tf.constant([
        [1, 1, 1],
        [1, 1, 1],
        [1, 1, 1]
    ]),
    "Gaussian" : tf.constant([
        [1, 2, 1],
        [2, 4, 2],
        [1, 2, 1]
    ]),
    "Smoothing 5x5" : tf.constant([ #Its still just Gaussian filter
        [2,4,5,4,2],
        [4,9,12,9,4],
        [5,12,15,12,5],
        [4,9,12,9,4],
        [2,4,5,4,2]
    ]),
    "Top Sobel 5x5" : tf.constant([
        [2,2,4,2,2],
        [1,1,2,1,1],
        [0,0,0,0,0],
        [-1,-1,-2,-1,-1],
        [-2,-2,-4,-2,-2]
    ]),
    "Bottom Sobel 5x5" : tf.constant([
        [-2,-2,-4,-2,-2],
        [-1,-1,-2,-1,-1],
        [0,0,0,0,0],
        [1,1,2,1,1],
        [2,2,4,2,2]
    ]),
    "Custom2" : tf.constant([
        [-1, -1, -1],
        [1, 1,  1],
        [-1, -1, -1],
    ])
}

# Kernel keys list
kernels_keys = list(kernels.keys())

In [6]:
# Show kernels using matplotlib
plt.figure(figsize=(16,16))
fig, plots = plt.subplots(1, len(kernels))
fig.suptitle('Examples of kernels')
fig.set_figwidth(15)
for i, (name, kernel) in enumerate(kernels.items()):
    plots[i].matshow(kernel,)
    plots[i].set_title(name)
plt.tight_layout()


# Kernel and convolutional layer helper functions

In [5]:
t1_work=None
t2_not=None

In [23]:
def get_kernel(name:str="Edge Detect") -> tf.Tensor:
    """
    Gets the kernel from a dictionary and sets it to the correct shape

    Parameters
    ----------
    name : str
        The name of the kernel within the previously defined kernels dictionary

    Returns
    -------
    tf.Tensor
        The kernel with the correct shape.
    """
    krnl = kernels[name]
    krnl = tf.reshape(krnl, [*krnl.shape, 1, 1])
    return tf.cast(krnl, dtype=tf.float32)

def filter_over_image(input_image:np.array, kernel:Union[tf.constant, str], normalize:bool = True) -> np.array:
    """
    Gets the kernel from a dictionary and sets it to the correct shape

    Parameters
    ----------
    input_image : np.array
        The image to be filtered over

    kernel : tf.constant | str
        The kernel as a tensorflow constant or as a string with the name of kernel from kernels dict

    normalize : boolean
        Boolean asking if the output should be normalized

    Returns
    -------
    np.array
        The new transformed image
    """
    if type(kernel) == str:
        krnl = get_kernel(kernel)
    else:
        if len(kernel.shape) == 3:
            krnl = tf.reshape(kernel, [kernel.shape[1], kernel.shape[2], 1, kernel.shape[0]])
        else:
            krnl = tf.reshape(kernel, [*kernel.shape, 1, 1])
        krnl = tf.cast(krnl, dtype=tf.float32)
        
    image_filter = conv_fn(
    input=input_image,
    filters=krnl,
    strides=1,
    padding='SAME',
    )

    return (image_filter.numpy()-np.min(image_filter.numpy()))/(np.max(image_filter.numpy())-np.min(image_filter.numpy())) if normalize else image_filter.numpy()

def threshold_over_normalised_image(input_image:np.array, threshold_percent:float, replacement_val:float) -> tf.Tensor:
    """
    Gets the kernel from a dictionary and sets it to the correct shape

    Parameters
    ----------
    input_image : np.array
        The image to be filtered over

    threshold_percent : float
        The percent below which values should be thresholded

    replacement_val : float
        What value to replace the thresholded values with

    Returns
    -------
    tf.Tensor
        Thresholded image with correct shape
    """
    thld = nn.Threshold(np.max(input_image) * threshold_percent, replacement_val)
    thld_image = thld(torch.from_numpy(input_image)).numpy()
    return tf.squeeze(tf.transpose(thld_image, perm=[1, 2, 0, 3]))


def apply_kernels(used_kernels = ["Edge Detect"], kernel_names = ["Edge Detect"], image_loc = DATA_PATH, image_range = [271, 272], save_location = "../", threshold_value=0.540, inverse_image_alphas=False, save_image=False, alpha_bounds=(100,553), verbose=True):
    """
    Applies given kernels over an image, returns the output as an image and optinally saves it.

    Parameters
    ----------
    used_kernels : list[str, tf.constant]
        List of kernels as either strings, tf.constants, or both.

    kernel_names : float
        The names of the used kernels, which will be used as the titles of images

    image_loc : path
        The path where the images are located

    image_range : list
        The range/list of numbers representing the current frame being processed

    save_location : string 
        The location to which the image should be saved
    
    threshold_value :float 
        A value, typically in the range [0,1], below which all values will be set to 0
    
    inverse_image_alphas :bool 
        Should the alpha (transparency) values be inversed. 
    
    save_image : bool
        Should the image be saved
    
    alpha_bounds : tuple
        The row bounds between which the image will be checked for processing.
        Useful since our images have lots of unused space.
    
    verbose: bool
        Should every step of preprocessing be shown
    """
    for i in image_range:
        if i%500==0:
            print(f"Frame #{i}")
    # for i in tqdm(range(max_number_of_images)):
        # Used for image show
        # wave_raw = torchvision.transforms.Grayscale(1)(decode_image(str(image_loc / f'{str(i).zfill(5)}.jpg'))/255.0) 
        wave_raw = decode_image(str(image_loc / f'{str(i).zfill(5)}.jpg'))/255.0
        image_list = [wave_raw]
        image_names = ["Raw Image"] + kernel_names
        # Used for processing
        processed_image = tf.concat([tf.expand_dims(wave_raw, 3)], 3)
        processed_image = tf.math.divide(tf.math.subtract(
                                    processed_image, 
                                    tf.reduce_min(processed_image)
                                ), 
                                tf.math.subtract(
                                    tf.reduce_max(processed_image), 
                                    tf.reduce_min(processed_image)
                                ))
        
        for krn in used_kernels:
            processed_image = filter_over_image(processed_image, krn, normalize=True)
            image_list.append(processed_image)

        # Thresholding
        processed_image = threshold_over_normalised_image(processed_image, threshold_value, 0)
        global t2_not
        t2_not=processed_image
        image_list.append(processed_image)
        image_names.append("thresholed")
        
        alphas = np.where(tf.squeeze(processed_image) > 0, 1, 0.0)
        ###########################
        # Alpha edits for inverse #
        if inverse_image_alphas:
            alphas = alphas.max() - alphas + alphas.min()
        alphas[:alpha_bounds[0], :] = 0
        # alphas[553:, :] = 0
        alphas[alpha_bounds[1]:, :] = 0
        ###########################
        ###########################
        

        ############ Morphological transformations ############
        # closing = cv2.morphologyEx(alphas, cv2.MORPH_CLOSE, np.ones((5,5),np.uint8))
        # opening = cv2.morphologyEx(alphas, cv2.MORPH_OPEN, np.ones((3,3),np.uint8))
        closing = cv2.morphologyEx(alphas, cv2.MORPH_CLOSE, np.ones((3,3),np.uint8))
        # closing = cv2.morphologyEx(closing, cv2.MORPH_GRADIENT, np.ones((3,3),np.uint8))
        # closing = cv2.morphologyEx(closing, cv2.MORPH_OPEN, np.ones((9,9),np.uint8))
        # closing = cv2.morphologyEx(closing, cv2.MORPH_CLOSE, np.ones((5,5),np.uint8))
        # closing = cv2.morphologyEx(closing, cv2.MORPH_OPEN, np.ones((5,5),np.uint8))
        # closing = cv2.morphologyEx(closing, cv2.MORPH_GRADIENT, np.ones((3,3),np.uint8))

        alphas = np.where(tf.squeeze(closing) > 0.0, 1, 0.0)
        # closing = alphas
        # image_list.append(closing)
        image_list.append(closing)
        image_names.append("with open and close")
        #######################################################
        #######################################################

        # print(type(tf.expand_dims(closing, 0).numpy()))
        # print(test.shape)
        # test = closing.astype(np.uint8) * 255
        # test2 = test
        # print((test2 == test).all())
        # # test = tf.expand_dims(test, 0).numpy()
        # # test = tf.transpose(tf.expand_dims(test, 0), perm=[1, 2, 0]).numpy()
        # print(test.dtype, test.max(), test.shape)
        # contours, _ = cv2.findContours(test, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        # cnt = contours[4]
        # cv2.drawContours(test, [cnt], 0, (0,255,0), 3)
        # print((test2 == test).all())
        test = np.zeros(closing.shape)
        top_positions = []
        # print(closing[251].max())
        prev_row = None
        for column in range(closing.shape[1]):
            val_changed = False
            for row in range(alpha_bounds[0], alpha_bounds[1]):
                if closing[row][column] > 0:
                    # if (prev_row is None or prev_row - 50 <= row <= prev_row + 50):
                        test[row][column] = 1
                        val_changed = True
                        # print(row)
                        prev_row = row
                        # top_positions.append(row)
                        # print(f"found wave at row {row} column {column}")
                        break
            if prev_row is None:
                prev_row = alpha_bounds[0]
            if not val_changed:
                # print(prev_row, column)
                test[prev_row][column] = 1
            top_positions.append(prev_row)
            
        for pos in range(1, len(top_positions)):
            height_change = top_positions[pos] - top_positions[pos-1]
            if height_change == 0:
                continue
            elif height_change < 0:
                test[top_positions[pos]:top_positions[pos-1]+1,pos] = 1
            else:
                test[top_positions[pos-1]:top_positions[pos]+1,pos] = 1

        image_list.append(test)
        image_names.append("modified contour image")
        image_list.append(test)
        image_names.append("Combined")

        if verbose:
            grid_size_x = math.floor((len(image_list))**(1/2))
            grid_size_y = math.ceil((len(image_list))/grid_size_x)  
        else:
            grid_size_x = 1
            grid_size_y = 3 
        gs = gridspec.GridSpec(grid_size_x, grid_size_y, bottom=0.1, top=0.45, left=0.1, right=0.66)   
        fig = plt.figure()
        fig.set_figwidth(55)
        fig.set_figheight(55)
        
        # Process image
        verbose_count = 0
        for j, krn in enumerate(image_names):
            # Display image
            if verbose:
                ax = fig.add_subplot(gs[(j)//grid_size_y, (j)%grid_size_y])
            else:
                ax = fig.add_subplot(gs[(verbose_count)//grid_size_y, (verbose_count)%grid_size_y])
            if j == 0:
                grayscaled_raw = tf.squeeze(image_list[j])*-1
                ax.imshow(grayscaled_raw, cmap="binary", alpha = 1)
                ax.set_title(f"Step {j}: {krn}")
                verbose_count += 1

            elif j >= len(image_names) -2:
                if j == len(image_names) -1:
                    ax.imshow(tf.squeeze(image_list[0]), alpha=0.75, cmap='gray')

                ax.imshow(tf.squeeze(image_list[j]), alpha = test, cmap=cmap1_strict)
                # ax.imshow(tf.squeeze(backup_test), alpha = backup_test, cmap=cmap1_strict)
                ax.set_title(f"Step {j}: {krn}")
                verbose_count += 1

            elif verbose:
                ax.imshow(tf.squeeze(image_list[j]))
                ax.set_title(f"Step {j}: {krn}")
        
        if save_image:
            plt.savefig(save_location + f"output_{i}.png") # Multi kernels
            ax.clear()
            fig.clear()
            plt.close()
    # Transform to correct format:
    # $VAR1=0
    # $END=7560
    # $i=1800
    # for ((;i<=END;i+=10)); do mv output_$i.png output_$VAR1.png; VAR1=$(($VAR1+1)); done
    # for ((i=1800;i<=END;i+=10)); do mv output_$i.png output_$VAR1.png; VAR1=$(($VAR1+1)); done
    # Turn into video: 
    # ffmpeg -r 60 -f image2 -s 1024x1024 -i output_%d.png -vcodec libx264 -crf 25 test.mp4
    # This does not really create github compatible gifs and videos

# Apply kernels, show and/or save final results

In [26]:
# Time can be saved by creating and saving the gaussian filters as a separate variable (think in the same style as the kernels dictionary)
my_kernels = [create_gaussian_filter(1.4,2),"Edge Detect"]
my_kernel_names = ["Gaussian", "Edge Detect"]
# DATA_PATH was defined at the start
# my_image_location = DATA_PATH # Path('../../data/raw/test_data')
my_image_location = Path('../../data/raw/back_h0_128mm_test015')
# image_numbers = range(1800, 15001)
# image_numbers = range(1800, 1801)
image_numbers = range(4400, 4401)
# image_numbers = range(1800, 15001, 20) # An example can be range(0, 708) and it will create a list of 709 values
# Another example can be range(1800, 15001, 10), it will go from 1800, 15000 but increase by 10 every step instead of by 1 (1800, 1810, 1820... 14980, 14990, 15000)
save_path = '../../Outputs/Multi_Kernel/Gaussian-Edge-Close-Close-Open-Gradient-Backlight/'
# threshold_value = 0.32
threshold_value = 0.4
inverse_image_alphas = False
save_image = True
verbose = True

# for r in tqdm(image_numbers):
apply_kernels(used_kernels=my_kernels, kernel_names=my_kernel_names, image_loc=my_image_location, image_range=image_numbers, save_location=save_path,
            threshold_value=threshold_value, inverse_image_alphas=inverse_image_alphas, save_image=save_image, alpha_bounds=(200, 750), verbose = verbose)