# Optical flow Gaussian volume denoising

In [None]:
use_OpenCV = True
local_debug = True

In [None]:
import logging
import numpy as np
import scipy.ndimage
import matplotlib.pyplot as plt
#from ipywidgets import *
import cv2
import time
#import kernels
import skimage

In [None]:
import logging
logging.basicConfig(format="[%(filename)s:%(lineno)s %(funcName)s()] %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)

In [None]:
if local_debug:
    !ln -sf ../../information_theory/src/information_theory/ .
else:
    !pip install "information_theory @ git+https://github.com/vicente-gonzalez-ruiz/information_theory"
import information_theory  # pip install "information_theory @ git+https://github.com/vicente-gonzalez-ruiz/information_theory"

In [None]:
if local_debug:
    !ln -sf ../../motion_estimation/src/motion_estimation/ .
else:
    !pip install "motion_estimation @ git+https://github.com/vicente-gonzalez-ruiz/motion_estimation"
if use_OpenCV:
    from motion_estimation._2D.farneback_OpenCV import Estimator_in_CPU as Estimator
else:
    from motion_estimation._2D.farneback_python import Estimator
from motion_estimation._2D.project import project

In [None]:
from numpy.linalg import LinAlgError

In [None]:
noisy = skimage.io.imread("small_vol.tif", plugin="tifffile").astype(np.float32)

In [None]:
plt.imshow(noisy[16], cmap="gray")

In [None]:
def get_kernel(sigma=1.5):
    """Return the 1-D Gaussian kernel that SciPy uses for a given `sigma`.

    The kernel is obtained as the response of
    `scipy.ndimage.gaussian_filter1d` to a unit impulse. The impulse
    signal is grown one sample at a time until the response contains at
    least two exact zeros; the first and last samples of that response
    are then dropped and the rest is returned.

    Parameters
    ----------
    sigma : float
        Standard deviation forwarded to `gaussian_filter1d`.

    Returns
    -------
    numpy.ndarray
        1-D array with the kernel coefficients.
    """
    length = 3
    while True:
        # Unit impulse located at the middle sample.
        impulse = np.zeros(length)
        impulse[impulse.size // 2] = 1
        response = scipy.ndimage.gaussian_filter1d(impulse, sigma=sigma)
        zeros_found = response.size - np.count_nonzero(response)
        if zeros_found >= 2:
            # Drop the two border samples of the response.
            return response[1:-1]
        length += 1

In [None]:
def project_A_to_B(A, B):
    """Identity stand-in for the projection step: return `A` untouched.

    `B` is accepted only so that the signature matches the way the
    `filter_*` functions call this function (`A=..., B=...`); it is not
    used here.
    """
    return A

In [None]:
def filter_Z(vol, kernel, mean, projector=None):
    """Filter a 3-D volume along axis 0 (Z) with a 1-D kernel.

    For every slice `z`, the `kernel.size` neighbouring slices are each
    passed through `projector(A=neighbour, B=slice_z)` and accumulated
    with the corresponding kernel weight.

    Parameters
    ----------
    vol : numpy.ndarray
        3-D array indexed as (Z, Y, X).
    kernel : numpy.ndarray
        1-D array of weights. The window is centred on the filtered
        slice when `kernel.size` is odd.
    mean : scalar
        Constant used to pad the volume before and after the Z range.
    projector : callable, optional
        Function called as `projector(A=plane, B=target_plane)` that
        returns a plane with the shape of `A`. Defaults to the
        module-level `project_A_to_B`, looked up at call time.

    Returns
    -------
    numpy.ndarray
        float32 array with the same shape as `vol`.
    """
    if projector is None:
        projector = project_A_to_B
    taps = kernel.size
    half = taps // 2
    depth, height, width = vol.shape
    # Force a floating-point buffer: without an explicit dtype np.full()
    # derives it from `mean`, so an integer `mean` (e.g. 128) would build
    # an integer buffer and truncate the samples copied into it.
    padded = np.full(
        (depth + taps, height, width),
        mean,
        dtype=np.result_type(vol.dtype, np.float32),
    )
    padded[half:half + depth] = vol
    filtered_vol = np.empty((depth, height, width), dtype=np.float32)
    for z in range(depth):
        # vol[z] lives at padded[z + half]; that is the slice the
        # neighbours must be projected onto (not padded[z], which is the
        # first slice of the window).
        target = padded[z + half]
        YX_plane = np.zeros((height, width), dtype=np.float32)
        for i in range(taps):
            YX_plane += projector(A=padded[z + i], B=target) * kernel[i]
        filtered_vol[z] = YX_plane
    return filtered_vol

def _filter_Y(vol, kernel, mean):
    """Filter along axis 1 (Y) by delegating to `filter_Z`.

    The first two axes of `vol` are swapped so that Y becomes the leading
    axis, `filter_Z` is applied, and the swap is undone on the result.
    """
    swap_zy = (1, 0, 2)  # this permutation is its own inverse
    swapped = np.transpose(vol, swap_zy)
    return np.transpose(filter_Z(swapped, kernel, mean), swap_zy)

def filter_Y(vol, kernel, mean, projector=None):
    """Filter a 3-D volume along axis 1 (Y) with a 1-D kernel.

    For every plane `vol[:, y, :]`, the `kernel.size` neighbouring planes
    are each passed through `projector(A=neighbour, B=plane_y)` and
    accumulated with the corresponding kernel weight.

    Parameters
    ----------
    vol : numpy.ndarray
        3-D array indexed as (Z, Y, X).
    kernel : numpy.ndarray
        1-D array of weights. The window is centred on the filtered
        plane when `kernel.size` is odd.
    mean : scalar
        Constant used to pad the volume before and after the Y range.
    projector : callable, optional
        Function called as `projector(A=plane, B=target_plane)` that
        returns a plane with the shape of `A`. Defaults to the
        module-level `project_A_to_B`, looked up at call time.

    Returns
    -------
    numpy.ndarray
        float32 array with the same shape as `vol`.
    """
    if projector is None:
        projector = project_A_to_B
    taps = kernel.size
    half = taps // 2
    depth, height, width = vol.shape
    # Force a floating-point buffer: without an explicit dtype np.full()
    # derives it from `mean`, so an integer `mean` (e.g. 128) would build
    # an integer buffer and truncate the samples copied into it.
    padded = np.full(
        (depth, height + taps, width),
        mean,
        dtype=np.result_type(vol.dtype, np.float32),
    )
    padded[:, half:half + height, :] = vol
    filtered_vol = np.empty((depth, height, width), dtype=np.float32)
    for y in range(height):
        # vol[:, y, :] lives at padded[:, y + half, :]; that is the plane
        # the neighbours must be projected onto (not padded[:, y, :],
        # which is the first plane of the window).
        target = padded[:, y + half, :]
        ZX_plane = np.zeros((depth, width), dtype=np.float32)
        for i in range(taps):
            ZX_plane += projector(A=padded[:, y + i, :], B=target) * kernel[i]
        filtered_vol[:, y, :] = ZX_plane
    return filtered_vol

def _filter_X(vol, kernel, mean):
    """Filter along axis 2 (X) by delegating to `filter_Z`.

    The first and last axes of `vol` are swapped so that X becomes the
    leading axis, `filter_Z` is applied, and the swap is undone on the
    result.
    """
    swap_zx = (2, 1, 0)  # this permutation is its own inverse
    swapped = np.transpose(vol, swap_zx)
    return np.transpose(filter_Z(swapped, kernel, mean), swap_zx)

def filter_X(vol, kernel, mean, projector=None):
    """Filter a 3-D volume along axis 2 (X) with a 1-D kernel.

    For every plane `vol[..., x]`, the `kernel.size` neighbouring planes
    are each passed through `projector(A=neighbour, B=plane_x)` and
    accumulated with the corresponding kernel weight.

    Parameters
    ----------
    vol : numpy.ndarray
        3-D array indexed as (Z, Y, X).
    kernel : numpy.ndarray
        1-D array of weights. The window is centred on the filtered
        plane when `kernel.size` is odd.
    mean : scalar
        Constant used to pad the volume before and after the X range.
    projector : callable, optional
        Function called as `projector(A=plane, B=target_plane)` that
        returns a plane with the shape of `A`. Defaults to the
        module-level `project_A_to_B`, looked up at call time.

    Returns
    -------
    numpy.ndarray
        float32 array with the same shape as `vol`.
    """
    if projector is None:
        projector = project_A_to_B
    taps = kernel.size
    half = taps // 2
    depth, height, width = vol.shape
    # Force a floating-point buffer: without an explicit dtype np.full()
    # derives it from `mean`, so an integer `mean` (e.g. 128) would build
    # an integer buffer and truncate the samples copied into it.
    padded = np.full(
        (depth, height, width + taps),
        mean,
        dtype=np.result_type(vol.dtype, np.float32),
    )
    padded[..., half:half + width] = vol
    filtered_vol = np.empty((depth, height, width), dtype=np.float32)
    for x in range(width):
        # vol[..., x] lives at padded[..., x + half]; that is the plane
        # the neighbours must be projected onto (not padded[..., x],
        # which is the first plane of the window).
        target = padded[..., x + half]
        ZY_plane = np.zeros((depth, height), dtype=np.float32)
        for i in range(taps):
            ZY_plane += projector(A=padded[..., x + i], B=target) * kernel[i]
        filtered_vol[..., x] = ZY_plane
    return filtered_vol

def filter(vol, kernel, mean):
    """Apply the same 1-D kernel along Z, then Y, then X.

    Each pass receives the output of the previous one; `kernel` and
    `mean` are forwarded unchanged to `filter_Z`, `filter_Y` and
    `filter_X`.

    Note: this name shadows the built-in `filter` in this notebook.
    """
    for axis_pass in (filter_Z, filter_Y, filter_X):
        vol = axis_pass(vol, kernel, mean)
    return vol

In [None]:
kernel = get_kernel(2.5)
print(kernel)
print(np.sum(kernel))
plt.plot(kernel)
plt.show()

In [None]:
denoised = filter(noisy, kernel, mean=128)

In [None]:
plt.imshow(denoised[16], cmap="gray")

In [None]:
estimator = Estimator(logger, pyr_levels=3, sigma_poly=0.5, win_side=5, num_iters=3, flags=0)

def project_A_to_B(A, B):
    """Project plane `A` onto plane `B` using an estimated flow field.

    The flow is requested from the module-level `estimator` with
    `target=B, reference=A` and then applied to `A` through `project`.
    This definition replaces the identity placeholder of the same name,
    so the `filter_*` functions pick it up on their next call.

    If the estimator raises `LinAlgError`, the error is printed, the
    failure is counted in `project_A_to_B.failures`, and `A` is returned
    unprojected.

    NOTE(review): `A` is cast to uint8 before projection, which assumes
    its values fit in [0, 255] -- confirm against the input data.
    """
    try:
        flow = estimator.pyramid_get_flow(target=B, reference=A, flow=None)
    except LinAlgError as e:
        print(f"Caught exception: {e}")
        # The original incremented `self.counter`, but there is no `self`
        # in a module-level function (that line raised NameError inside
        # the handler). Keep the failure count on the function instead.
        project_A_to_B.failures = getattr(project_A_to_B, "failures", 0) + 1
        return A
    return project(logger, image=A.astype(np.uint8), flow=flow)

In [None]:
denoised = filter(noisy, kernel, mean=128)

In [None]:
plt.imshow(denoised[16], cmap="gray")

In [None]:
input()

In [None]:
import denoising.image.OF_gaussian as denoising

In [None]:
# Build the denoiser object; the class used depends on `use_OpenCV`.
# NOTE(review): `gaussian_denoising` is not bound anywhere in the visible
# cells (the preceding cell imports the module under the name `denoising`),
# and `sigma_gaussian` is only assigned in a much later cell -- with
# use_OpenCV = True this branch looks like it raises NameError when the
# notebook is run top to bottom. Confirm the intended class/module.
if use_OpenCV:
    denoiser = gaussian_denoising.Monochrome_Image_OF_Gaussian_Denoising(sigma_gaussian=sigma_gaussian, l=3, w=5, sigma_OF=1.0, verbosity=logging.DEBUG)
else:
    denoiser = denoising.Monochrome_Denoising(logger, pyr_levels=2, win_side=27, num_iters=3, sigma_poly=4.0)

In [None]:
kernel = denoiser.get_kernel(1.5)
print(kernel)
print(np.sum(kernel))
plt.plot(kernel)
plt.show()

In [None]:
# `vol` is not defined in this notebook and filter() requires the padding
# value `mean`; use the loaded volume and the same padding value as the
# earlier filter() calls.
denoised = filter(noisy, kernel, mean=128)

In [None]:
# Clamp the volume filtered in the previous cell to the 8-bit range.
# (The original clipped `denoised_img`, which is not assigned until a
# later cell.)
denoised = np.clip(denoised, a_min=0, a_max=255)

In [None]:
fig, axs = plt.subplots(1, 3, figsize=(16, 32))
axs[0].imshow(GT, cmap="gray")
axs[0].set_title("Original")
axs[1].imshow(noisy, cmap="gray")
axs[1].set_title("Noisy")
axs[2].imshow(denoised, cmap="gray")
axs[2].set_title(f"Denoised ({information_theory.distortion.avg_PSNR(denoised, GT):4.2f} dB)")
fig.tight_layout()
plt.show()

In [None]:
input()

In [None]:
# Experimental filter: redefines `filter` (replacing the filter defined
# earlier in this notebook) with one of two variants that walk along axis 0
# of `img`, depending on `use_OpenCV`.
if use_OpenCV:
    def filter(img, Estimator, l=3, w=5):
        """Average each `w`-row window of `img` with the projection of the previous window.

        `Estimator` is the estimator class to instantiate, `l` the number
        of pyramid levels and `w` the window height (also passed as
        `win_side`). Returns an int16 array shaped like `img`; rows that
        the loop never writes stay at zero.
        """
        w2 = (w + 1) // 2
        # NOTE(review): elsewhere in this notebook the estimator is built as
        # Estimator(logger, ..., sigma_poly=...); here there is no logger and
        # the keywords are `verbosity` / `poly_sigma` -- confirm the signature.
        estimator = Estimator(win_side=w, pyr_levels=l, verbosity=logging.WARNING, poly_sigma=0.5)
        A = np.zeros_like(img, dtype=np.int16)
        for i in range(img.shape[0] - w):
            print(i, end=' ')
            # Reference window (rows i .. i+w-1) and target window (shifted one row down).
            R = img[i:i + w]
            T = img[i + 1: i + 1 + w].astype(np.float32)
            initial_flow = np.zeros(shape=(R.shape[0], R.shape[1], 2), dtype=np.float32)
            flow = estimator.pyramid_get_flow(target=T, reference=R, flow=initial_flow)
            # NOTE(review): other cells call project(logger, image, flow);
            # this call passes no logger -- confirm.
            compensated_slice = project(R, flow)
            # (w + 1) >> 1 is the same value as w2.
            compensated_line = compensated_slice[(w + 1) >> 1, :]
            A[i + w2] = (T[(w + 1) >> 1, :] + compensated_line) / 2
        return A
else:
    from numpy.linalg import LinAlgError
    def filter(img, Estimator, l=3, sigma_poly=4.0, sigma_flow=4.0):
        """Average each element `img[i + 1]` with the projection of `img[i]` onto it.

        `Estimator` is the estimator class to instantiate; `l`,
        `sigma_poly` and `sigma_flow` are forwarded to its constructor.
        Returns an int16 array shaped like `img`; the last index along
        axis 0 is never written and stays at zero.
        """
        estimator = Estimator(logger, pyr_levels=l, sigma_poly=sigma_poly, sigma_flow=sigma_flow, num_iters=3)
        A = np.zeros_like(img, dtype=np.int16)
        for i in range(img.shape[0]-1):
            print(i, end=' ')
            R = img[i]
            T = img[i + 1].astype(np.float32)
            # This assignment is repeated as the first statement of the try block below.
            initial_flow = np.zeros_like(R, dtype=np.float32)
            try:
                initial_flow = np.zeros_like(R, dtype=np.float32)
                flow = estimator.pyramid_get_flow(target=T, reference=R, flow=initial_flow)
            except LinAlgError as e:
                print(f"Caught LinAlgError: {e}")
                # Fallback: all-zero flow with R's shape and dtype.
                # NOTE(review): the OpenCV branch builds flows with a trailing
                # axis of size 2; confirm this shape is what `project` expects.
                flow = np.zeros_like(R)
            projected_line = project(logger, R, flow)
            print("avg flow=", np.average(np.abs(flow)))
            print(np.max(projected_line))
            # Stored at index i, although T comes from index i + 1.
            A[i] = (T + projected_line) / 2
        return A

In [None]:
denoised = noisy.copy()

In [None]:
# Two-pass filtering: filter along axis 0, swap the two axes, filter again,
# then swap back and display the result.
# NOTE(review): np.transpose(..., (1, 0)) only accepts 2-D input, while
# `denoised` is copied from `noisy`, which earlier cells index as a 3-D
# volume -- confirm which image this cell is meant to run on.
if use_OpenCV:
    denoised = filter(denoised, Estimator, w=5, l=3)
else:
    denoised = filter(denoised, Estimator, l=3, sigma_poly=4.0, sigma_flow=4.0)
denoised = np.transpose(denoised, (1, 0))
# NOTE(review): the second OpenCV pass uses w=2 while the first uses w=5 -- confirm.
if use_OpenCV:
    denoised = filter(denoised, Estimator, w=2, l=3)
else:
    denoised = filter(denoised, Estimator, l=3, sigma_poly=4.0, sigma_flow=4.0)
denoised = np.transpose(denoised, (1, 0))
plt.imshow(denoised.astype(np.float32), cmap="gray")

In [None]:
plt.imshow(denoised[40:480,40:480], cmap="gray")

In [None]:
plt.imshow(d, cmap="gray")

In [None]:
np.min(denoised)

In [None]:
plt.imshow(noisy, cmap="gray")

In [None]:
input()

In [None]:
#import denoising.image.OF_gaussian as denoising
!pip install "image_denoising @ git+https://github.com/microscopy-processing/image_denoising"
!pip show image_denoising
import image_denoising
import logging
from image_denoising import OF_gaussian
image_denoising.OF_gaussian.logger.setLevel(logging.WARNING)
logging.getLevelName(image_denoising.OF_gaussian.logger.getEffectiveLevel())

In [None]:
%%time

OFGD__sigma_kernel = 2.5
OFGD__N_iters = 1
OFGD__l = 4
OFGD__w = 25
OFGD__sigma_OF = 2.0

denoised_img, _ = image_denoising.OF_gaussian.filter_gray_image(noisy_img, OFGD__sigma_kernel, OFGD__N_iters, OFGD__l, OFGD__w, OFGD__sigma_OF)

In [None]:
fig, axs = plt.subplots(1, 3, figsize=(16, 32))
axs[0].imshow(img, cmap="gray")
axs[0].set_title("Original")
axs[1].imshow(noisy_img, cmap="gray")
axs[1].set_title("Noisy")
axs[2].imshow(denoised_img, cmap="gray")
axs[2].set_title(f"Denoised ({information_theory.distortion.avg_PSNR(denoised_img, img):4.2f} dB)")
fig.tight_layout()
plt.show()

In [None]:
import denoising.image.OF_gaussian as denoising

In [None]:
import logging

In [None]:
sigma = 2.0

In [None]:
denoiser = denoising.Monochrome_Denoising(verbosity=logging.DEBUG, sigma_poly=4.0, sigma_flow=4.0, pyr_levels=2)
#denoiser = gaussian_denoising.Monochrome_Image_OF_Gaussian_Denoising(sigma_gaussian=sigma_gaussian, l=3, w=5, sigma_OF=1.0, verbosity=logging.DEBUG)

In [None]:
kernel = denoiser.get_kernel(sigma)
print(kernel)
print(np.sum(kernel))
plt.plot(kernel)
plt.show()

In [None]:
denoised_img, _ = denoiser.filter_iterate(noisy_img=img, GT=img, sigma=sigma)

In [None]:
denoised_img = np.clip(denoised_img, a_min=0, a_max=255)

In [None]:
fig, axs = plt.subplots(1, 3, figsize=(16, 32))
axs[0].imshow(img, cmap="gray")
axs[0].set_title("Original")
axs[1].imshow(noisy_img, cmap="gray")
axs[1].set_title("Noisy")
axs[2].imshow(denoised_img, cmap="gray")
axs[2].set_title(f"Denoised ({information_theory.distortion.avg_PSNR(denoised_img, img):4.2f} dB)")
fig.tight_layout()
plt.show()

In [None]:
# `skimage_io` is never defined in this notebook; load the image through
# the `skimage.io` submodule, imported explicitly so it is guaranteed to
# be available.
import skimage.io
img = skimage.io.imread("http://www.hpca.ual.es/~vruiz/images/lena.png")

In [None]:
plt.imshow(img)

In [None]:
# Synthesize a noisy version of `img`: add Gaussian noise with the given
# mean and variance, clamp to [0, 255] and store as uint8.
mean = 0
var = 1000
sigma = var**0.5
noise = np.random.normal(loc=mean, scale=sigma, size=img.shape)
noisy_float = img.astype(np.float32) + noise
noisy_img = np.clip(noisy_float, 0, 255).astype(np.uint8)

In [None]:
plt.imshow(noisy_img)

In [None]:
sigma_gaussian = 2.0

In [None]:
denoiser = gaussian_denoising.Color_Image_OF_Gaussian_Denoising(
    sigma_gaussian=sigma_gaussian, l=3, w=5, sigma_OF=1.0, verbosity=logging.DEBUG)

In [None]:
denoised_img, _ = denoiser.filter(noisy_image=noisy_img, GT=img)
denoised_img = denoised_img.astype(np.uint8)

In [None]:
fig, axs = plt.subplots(1, 3, figsize=(16, 32))
axs[0].imshow(img)
axs[0].set_title("Original")
axs[1].imshow(noisy_img)
axs[1].set_title("Noisy")
axs[2].imshow(denoised_img)
axs[2].set_title("Denoised")
fig.tight_layout()
plt.show()