<a href="https://colab.research.google.com/github/rubyvanrooyen/NIFTyworkshop/blob/master/NIFTy_Example_WienerFilter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!git clone -b NIFTy_6 --single-branch https://gitlab.mpcdf.mpg.de/ift/nifty.git
!pip install ./nifty

In [None]:
import nifty6 as ift
import numpy as np
from matplotlib import pylab

In [None]:
# dummy function for demo
def pow_spec(k):
    P0, k0, gamma = [.2, 5, 6]
    return P0 * (1. + (k/k0)**2)**(- gamma / 2)

# generate example dataset
N_pixels = 512     # Number of pixels
# assume a periodic signal and define a regularly sampled grid (to start with)
# -- regular grid space = 1d with length N pixels
s_space = ift.RGSpace(N_pixels)
print(s_space)

# get the harmonic space (the default codomain of the position space)
h_space = s_space.get_default_codomain()
# harmonic transform to position space: similar to an FFT, but implemented as a Hartley transform
HT = ift.HarmonicTransformOperator(h_space, target=s_space)
print(HT)

# Operators
# diagonal operator with power spectrum on diagonal
S = ift.create_power_operator(h_space, power_spectrum=pow_spec)
print(S)

R = HT
print(R)

# fft = FFTOperator(s_space)
# print(fft.target)
# h_space = fft.target[0]
# print(h_space)
# R = HT*ift.create_harmonic_smoothing_operator((h_space,), 0, 0.02)
# print(R)

# Fields and data
# create a fake signal
# draw a sample from the zero-centered Gaussian distribution with S as covariance
s = S.draw_sample_with_dtype(dtype=np.float64)
# (ground truth - the signal we know, defined in harmonic space)
print(s.val.shape)
# apply the response to the signal to get the noiseless data
noiseless_data = R(s)
print(noiseless_data.val.shape)
# s is ground truth -- R(s) is in real space

# define the noise operator
noise_amplitude = np.sqrt(0.2) # noise standard deviation; replace with the value for your data
N = ift.ScalingOperator(s_space, noise_amplitude**2) # diagonal operator with the same value (the noise variance) everywhere on the diagonal
n0 = N.draw_sample_with_dtype(dtype=np.float64)
print(n0.val.shape)
n1 = ift.Field.from_random(domain=s_space, random_type='normal',
                          std=noise_amplitude, mean=0)
print(n1.val.shape)

# to calculate d you need to generate fields s and n with the given covariances
# -- both are fields in the IFT sense
d = noiseless_data + n1
print(d.val.shape)

fig, (ax0, ax1, ax2) = pylab.subplots(3, 1, figsize=(15, 11), facecolor='white')
ax0.plot(s.val, 'y-', noiseless_data.val, 'k')
ax1.plot(n0.val, 'b-', n1.val, 'r-')
ax2.plot(d.val, 'g-')
pylab.show()

IFT starting point: $d=Rs+n$
Typically, $s$ is a continuous field/map, $d$ a discrete data vector and $R$  is not invertible.

Assumption:
* $d=Rs+n$, with $R$ a linear response operator.
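The data model can be sketched with plain NumPy arrays instead of NIFTy fields. This is only an illustration: the sizes, the random response matrix and the noise level are hypothetical and chosen so that $R$ has fewer rows than columns, which makes it non-invertible.

```python
import numpy as np

rng = np.random.default_rng(0)

# hypothetical sizes: more signal degrees of freedom than data points
n_signal, n_data = 8, 5

R = rng.normal(size=(n_data, n_signal))   # hypothetical linear response
s = rng.normal(size=n_signal)             # signal
n = 0.1 * rng.normal(size=n_data)         # additive noise

# measurement equation d = R s + n
d = R @ s + n

# R maps 8 unknowns onto 5 measurements, so it cannot be inverted
rank = np.linalg.matrix_rank(R)
print(d.shape, rank)
```

Because the rank of $R$ is smaller than the number of signal components, the data alone cannot fix $s$; the prior on $s$ supplies the missing information.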


Example Wiener filter implementation with NIFTy

In [None]:
# we will now start off assuming data 'd' as given
data = d

$P(s) = G(s,S)$, $P(n)=G(n,N)$ where S, N are positive definite matrices.

The Posterior is given by:
$P (s|d) \propto P(s,d) = G(d-Rs,N) \, G(s,S) \propto G (s-m,D)$

where

$\begin{align}
m &= Dj \\
D^{-1}&= (S^{-1} +R^\dagger N^{-1} R )\\
j &= R^\dagger N^{-1} d
\end{align}$
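As a small check of these formulas, the posterior mean can be computed with dense NumPy matrices. This is a sketch under assumptions: the sizes, the random response and the covariances are hypothetical, and explicit matrices are only practical for small problems. It also compares $m = Dj$ with the equivalent data-space form $m = S R^\dagger (R S R^\dagger + N)^{-1} d$.

```python
import numpy as np

rng = np.random.default_rng(1)
n_signal, n_data = 6, 4   # hypothetical sizes

R = rng.normal(size=(n_data, n_signal))             # hypothetical response
S = np.diag(rng.uniform(0.5, 2.0, size=n_signal))   # prior covariance (positive definite)
N = 0.2 * np.eye(n_data)                            # noise covariance (positive definite)
d = rng.normal(size=n_data)                         # arbitrary data vector

# information source j = R^T N^{-1} d
j = R.T @ np.linalg.solve(N, d)
# inverse posterior covariance D^{-1} = S^{-1} + R^T N^{-1} R
D_inv = np.linalg.inv(S) + R.T @ np.linalg.solve(N, R)

# posterior mean m = D j, obtained by solving D^{-1} m = j
m = np.linalg.solve(D_inv, j)

# equivalent data-space form of the Wiener filter
m_alt = S @ R.T @ np.linalg.solve(R @ S @ R.T + N, d)
print(np.allclose(m, m_alt))
```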

We assume statistical homogeneity and isotropy.     
Therefore the signal covariance $S$ is diagonal in harmonic space, and is described by a one-dimensional power spectrum.    
* number of data points Npix=len(d).
* reconstruction in harmonic space.
* response operator: $R = FFT_{\text{harmonic} \rightarrow \text{position}}$
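The claim that a homogeneous covariance is diagonal in harmonic space can be checked numerically. The sketch below uses plain NumPy with a hypothetical correlation function on a small periodic grid; it builds the covariance in position space and transforms it with a unitary DFT matrix.

```python
import numpy as np

n_pix = 16
# hypothetical correlation function that depends only on the periodic distance
dist = np.minimum(np.arange(n_pix), n_pix - np.arange(n_pix))
corr = np.exp(-0.5 * (dist / 2.0) ** 2)

# homogeneous covariance in position space: S[i, j] = corr[(i - j) mod n_pix]
idx = (np.arange(n_pix)[:, None] - np.arange(n_pix)[None, :]) % n_pix
S_pos = corr[idx]

# unitary DFT matrix and the covariance expressed in harmonic space
F = np.fft.fft(np.eye(n_pix), norm="ortho")
S_harm = F @ S_pos @ F.conj().T

# the off-diagonal part vanishes up to rounding; the diagonal is the power spectrum
off_diag = S_harm - np.diag(np.diag(S_harm))
power = np.real(np.diag(S_harm))
print(np.max(np.abs(off_diag)))
```

The diagonal entries in `power` equal the Fourier transform of the correlation function, which is why a one-dimensional power spectrum is enough to describe $S$.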

$D$ is defined via:
$D^{-1} = S_h^{-1} + R^\dagger N^{-1} R$

In the end, we want to apply $D$ to $j$, i.e. we need the inverse action of $D^{-1}$.
This is done numerically, here with the conjugate gradient method.
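To illustrate what applying $D$ numerically means, here is a minimal conjugate gradient sketch in plain NumPy. It is not NIFTy's implementation: the function `conjugate_gradient`, the random response and the diagonal prior are hypothetical stand-ins. The solver only needs the action $x \mapsto D^{-1}x$, never the matrix $D$ itself.

```python
import numpy as np

def conjugate_gradient(apply_A, b, tol=1e-10, max_iter=1000):
    """Solve A x = b for symmetric positive definite A, given only x -> A x."""
    x = np.zeros_like(b)
    r = b - apply_A(x)          # residual
    p = r.copy()                # search direction
    rs_old = r @ r
    for _ in range(max_iter):
        if np.sqrt(rs_old) < tol:
            break
        Ap = apply_A(p)
        alpha = rs_old / (p @ Ap)
        x = x + alpha * p
        r = r - alpha * Ap
        rs_new = r @ r
        p = r + (rs_new / rs_old) * p
        rs_old = rs_new
    return x

rng = np.random.default_rng(2)
n_pix = 32
S_diag = 1.0 / (1.0 + np.arange(n_pix)) ** 2           # hypothetical diagonal prior covariance
R = rng.normal(size=(n_pix, n_pix)) / np.sqrt(n_pix)   # hypothetical response
noise_var = 0.2
j = rng.normal(size=n_pix)                             # stand-in information source

def apply_D_inv(x):
    # action of D^{-1} = S^{-1} + R^T N^{-1} R without forming the matrix
    return x / S_diag + R.T @ (R @ x) / noise_var

m = conjugate_gradient(apply_D_inv, j)

# dense reference solution for comparison
D_inv = np.diag(1.0 / S_diag) + R.T @ R / noise_var
m_direct = np.linalg.solve(D_inv, j)
print(np.max(np.abs(m - m_direct)))
```

The matrix-free form matters for large grids, where storing $D^{-1}$ explicitly would be too expensive.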

In [None]:
# One-dimensional regular grid
position_space = ift.RGSpace([len(data.val)])
# Specify harmonic space corresponding to signal
harmonic_space = position_space.get_default_codomain()
# Harmonic transform from harmonic space to position space
HT = ift.HarmonicTransformOperator(harmonic_space, target=position_space)

# Set the prior covariance via a power spectrum, which implies
# homogeneous and isotropic statistics
# (the power spectrum is defined on the 1D spectral space)
Sh = ift.create_power_operator(harmonic_space, power_spectrum=pow_spec)

# The response operator consists of
# - a harmonic transform (to get to image space)
R = HT

# Set the noise covariance N (noise is the variance, i.e. noise_amplitude**2)
noise = 0.2
N = ift.ScalingOperator(position_space, noise)

In [None]:
# Build inverse propagator D and information source j
j = R.adjoint_times(N.inverse_times(data))

# Invert D_inv numerically (via conjugate gradient)
# conjugate gradient solves Ax=b (given A and b) for x without forming the inverse of A
# it requires A to be symmetric and positive definite
ic = ift.GradientNormController(iteration_limit=50000, tol_abs_gradnorm=0.1)
# WienerFilterCurvature is (R.adjoint@N.inverse@R + Sh.inverse) plus some handy helper methods.
D = ift.WienerFilterCurvature(R, N, Sh, iteration_controller=ic, iteration_controller_sampling=ic).inverse

Run the Wiener filter


In [None]:
# Calculate WIENER FILTER solution
m = D(j)

In [None]:
# Get signal data and reconstruction data
# R(m) posterior mean -- reconstructed signal
m_data = R(m).val
# noisy measured data
d_data = data.val

# reconstruct and compare
fig, ax = pylab.subplots(1, 1, figsize=(15, 3), facecolor='white')
# R(s) ground truth -- the signal we know
s_data = R(s).val
ax.plot(s_data, 'r', label="Signal", linewidth=3)

ax.plot(d_data, 'k.', label="Data")
ax.plot(m_data, 'k', label="Reconstruction",linewidth=3)
ax.set_title("Reconstruction")
pylab.legend()
pylab.show()

In [None]:
fig, ax = pylab.subplots(1, 1, figsize=(15, 3), facecolor='white')
ax.plot(s_data - s_data, 'r', label="Signal", linewidth=3)
ax.plot(d_data - s_data, 'k.', label="Data")
ax.plot(m_data - s_data, 'k', label="Reconstruction",linewidth=3)
ax.axhspan(-noise_amplitude,noise_amplitude, facecolor='0.9', alpha=.5)
ax.set_title("Residuals")
pylab.legend()
pylab.show()

In [None]:
s_power_data = ift.power_analyze(s).val
m_power_data = ift.power_analyze(m).val
fig, ax = pylab.subplots(1, 1, figsize=(15, 3), facecolor='white')
pylab.loglog()
ax.set_xlim(1, int(N_pixels/2))
ymin = min(m_power_data)
ax.set_ylim(ymin, 1)
xs = np.arange(1,int(N_pixels/2),.1)
ax.plot(xs, pow_spec(xs), label="True Power Spectrum", color='k',alpha=0.5)
ax.plot(s_power_data, 'r', label="Signal")
ax.plot(m_power_data, 'k', label="Reconstruction")
ax.axhline(noise_amplitude**2 / N_pixels, color="k", linestyle='--', label="Noise level", alpha=.5)
ax.axhspan(noise_amplitude**2 / N_pixels, ymin, facecolor='0.9', alpha=.5)
ax.set_title("Power Spectrum")
pylab.legend()
pylab.show()