In [None]:
%load_ext autoreload
%autoreload 2
%matplotlib notebook

import matplotlib.pyplot as plt

try:
    import pyfftw as fftw
except:
    pass

try:
    import arrayfire as af
except:
    pass

import scipy as sp
import numpy as np

import llops as yp
import ndoperators as ops

# Numpy-Based Operators

## Create Object

In [None]:
sz = (2000, 2000)
x = yp.rand(sz)

## CPU/Numpy FFT Function Test

In [None]:
fftw.interfaces.cache.enable()

def numpy_fft(x):
    return np.fft.fft2(x)

def scipy_fft(x):
    return sp.fftpack.fft2(x)

def fftw_fft(x):
    return fftw.interfaces.numpy_fft.fft2(x)

for f in [numpy_fft, scipy_fft, fftw_fft]:
    print(f)
    %timeit f(x)

## CPU/Numpy FFT Operator Test

In [None]:
F_np = ops.FourierTransform(sz, backend='numpy', fft_backend='numpy', center=False, normalize=False, pad=False)
F_sp = ops.FourierTransform(sz, backend='numpy', fft_backend='scipy', center=False, normalize=False, pad=False)
F_fftw = ops.FourierTransform(sz, backend='numpy', fft_backend='fftw', center=False, normalize=False, pad=False)

print('Unvectorized versions:')
for F in [F_np, F_sp, F_fftw]:
    print(F)
    %timeit F * x
    
print('Vectorized versions:')
for F in [F_np, F_sp, F_fftw]:
    print(F)
    %timeit F * x.ravel()

## Normalization Test

In [None]:
sz_0 = (100,100)
sz_1 = (50,50)

# Create fake object
x = yp.zeros(sz_0)
x[sz_1[0] // 4: 3* sz_1[0] // 4, sz_1[1] // 4: 3* sz_1[1] // 4] = yp.rand((sz_1[0] // 2, sz_1[1] // 2))
x /= yp.mean(yp.abs(x))

# Generate FFT operators
F_list_n = []
F_list_un = []
for fft_backend in yp.valid_fft_backends:
    F_list_n.append(ops.FourierTransform(sz_0, backend='numpy', fft_backend=fft_backend, center=True, normalize=True, pad=False))
    F_list_un.append(ops.FourierTransform(sz_0, backend='numpy', fft_backend=fft_backend, center=True, normalize=False, pad=False))

# Check energy in Fourier domain
x_energy = yp.sumb(yp.abs(x) ** 2)
energies_unnormalized = [yp.sumb(yp.abs(F * x) ** 2) / yp.size(x) for F in F_list_n]
energies_normalized = [yp.sumb(yp.abs(F * x) ** 2) / yp.size(x) for F in F_list_un]

print(x_energy)
print(energies_unnormalized)
print(energies_normalized)

assert all([abs(energy_normalized - x_energy) < 1e-5 * x_energy for energy_normalized in energies_normalized])
assert all([abs(energy_unnormalized * yp.size(x) - x_energy) < 1e-4 * x_energy for energy_unnormalized in energies_unnormalized])

# Check inversion
assert all([yp.scalar(yp.sumb(yp.abs(F.H * F * x - x))) < 1e-6  * x_energy for F in F_list_un])
assert all([yp.scalar(yp.sumb(yp.abs(F.H * F * x - x))) < 1e-6  * x_energy for F in F_list_n])


## Convolution Test

In [None]:
# Create blur kernel
h = yp.zeros(sz_0)
h[sz_1[0] // 2:sz_1[0] // 2 + 1, sz_1[1] // 4: 3* sz_1[1] // 4,] = 1

# Create fake object
x = yp.zeros(h.shape)
x[sz_1[0] // 4: 3* sz_1[0] // 4, sz_1[1] // 4: 3* sz_1[1] // 4] = yp.rand((sz_1[0] // 2, sz_1[1] // 2))

# Generate forward operators
A_list = [F.H * ops.Diagonalize(F * h) * F for F in F_list_un]

# Generate measurements
y_list = [A * x for A in A_list]
y_sum = [yp.scalar(yp.sumb(yp.abs(A * x))) for A in A_list]

# Check values
assert all([delta < 1e-3 for delta in yp.sumb(yp.abs(x)) * yp.sumb(yp.abs(h)) - y_sum])

plt.figure(figsize=(8,4))
plt.subplot(121)
plt.imshow(np.abs(x))
plt.colorbar()
plt.subplot(122)
plt.imshow(np.abs(y_list[0]))
plt.colorbar()

# Arrayfire-Based Operators

In [None]:
sz = (2000, 2000)

## GPU/Arrayfire FFT Functon test

In [None]:
# print('CPU:')
# af.set_backend('cpu')
# I1 = yp.rand(sz, backend='arrayfire')
# %timeit af.signal.fft2(I1)

if 'opencl' in af.get_available_backends():
    print('OpenCL:')
    af.set_backend('opencl')
    I2 = yp.rand(sz, backend='arrayfire')
    %timeit af.signal.fft(I2)

if 'cuda' in af.get_available_backends():
    print('CUDA:')
    af.set_backend('cuda')
    I3 = yp.rand(sz, backend='arrayfire')
    %timeit af.signal.fft(I3)

## GPU/Arrayfire Operator Test

In [None]:
# af.set_backend('cpu:')
# I1 = yp.rand(sz, backend='arrayfire')
# F_cpu = ops.FourierTransform(sz, backend='arrayfire', center=False, normalize=False, pad=False)
# print(af.get_backend_id(I1))
# %timeit F_cpu * I1

af.set_backend('opencl')
I2 = yp.rand(sz, backend='arrayfire')
F_ocl = ops.FourierTransform(sz, backend='arrayfire', center=False, normalize=False, pad=False)
print(af.get_backend_id(I2))
%timeit F_ocl * I2

af.set_backend('cuda')
I3 = yp.rand(sz, backend='arrayfire')
F_cuda = ops.FourierTransform(sz, backend='arrayfire', center=False, normalize=False, pad=False)
print(af.get_backend_id(I3))
%timeit F_cuda * I3

## GPU/Arrayfire Normalization Test

In [None]:
backend = 'numpy'
sz_0 = (100,100)
sz_1 = (50,50)

# Create fake object
x = yp.zeros(sz_0, backend=backend)
x[sz_1[0] // 4: 3* sz_1[0] // 4, sz_1[1] // 4: 3* sz_1[1] // 4] = yp.rand((sz_1[0] // 2, sz_1[1] // 2), backend=backend)
x /= yp.mean(yp.abs(x))

# Generate FFT operators
F_list_n = []
F_list_un = []

for fft_backend in yp.valid_fft_backends:
    F_list_n.append(ops.FourierTransform(sz_0, backend=backend, fft_backend=fft_backend, center=True, normalize=True, pad=False))
    F_list_un.append(ops.FourierTransform(sz_0, backend=backend, fft_backend=fft_backend, center=True, normalize=False, pad=False))

# Check energy in Fourier domain
x_energy = yp.sumb(yp.abs(x) ** 2)
energies_unnormalized = [yp.sumb(yp.abs(F * x) ** 2) / yp.size(x) for F in F_list_n]
energies_normalized = [yp.sumb(yp.abs(F * x) ** 2) / yp.size(x) for F in F_list_un]

print(x_energy)
print(energies_unnormalized)
print(energies_normalized)

assert all([abs(energy_normalized - x_energy) < 1e-5 * x_energy for energy_normalized in energies_normalized])
assert all([abs(energy_unnormalized * yp.size(x) - x_energy) < 1e-4 * x_energy for energy_unnormalized in energies_unnormalized])

# Check inversion
assert all([yp.scalar(yp.sumb(yp.abs(F.H * F * x - x))) < 1e-6  * x_energy for F in F_list_un])
assert all([yp.scalar(yp.sumb(yp.abs(F.H * F * x - x))) < 1e-6  * x_energy for F in F_list_n])



## Numpy / Arrayfire Transform Similarity

In [None]:
normalize = False
sz = yp.shape(x)
x = yp.changeBackend(x, 'arrayfire')
F_af = ops.FourierTransform(sz, center=True, backend='arrayfire', normalize=normalize)
F_np = ops.FourierTransform(sz, center=True, backend='numpy', normalize=normalize)

plt.figure()
plt.subplot(221)
plt.imshow(yp.abs(yp.changeBackend(F_af * x, 'numpy')))
plt.title('Foward, af')
plt.colorbar()

plt.subplot(222)
plt.imshow(yp.abs(F_np * yp.changeBackend(x, 'numpy')))
plt.title('Foward, numpy')
plt.colorbar()

plt.subplot(223)
plt.imshow(yp.abs(yp.changeBackend(F_af.H * x, 'numpy')))
plt.title('Inverse, af')
plt.colorbar()

plt.subplot(224)
plt.imshow(yp.abs(F_np.H * yp.changeBackend(x, 'numpy')))
plt.title('Inverse, numpy')
plt.colorbar()

## Convolution Test

In [None]:
backend = 'arrayfire'

h = yp.zeros(sz_0, backend=backend)
h[sz_0[0] // 4 * 3, sz_0[1] // 4 * 3] = 1

# Create blur kernel
h = yp.changeBackend(h, backend)
h /= yp.sumb(yp.abs(h))

# Generate forward operators
F = ops.FourierTransform(sz_0, backend=backend, center=True, normalize=False, pad=False)
A = F.H * ops.Diagonalize(h, inside_operator=F) * F

# Generate measurements
y = A * yp.changeBackend(x, backend)
y_sum = yp.scalar(yp.sumb(yp.abs(y)))

# Check values
assert yp.scalar(yp.sumb(yp.abs(x)) * yp.sumb(yp.abs(h))) - y_sum

plt.figure(figsize=(11,4))
plt.subplot(131)
plt.imshow(np.abs(x))
plt.colorbar()
plt.subplot(132)
plt.imshow(np.abs(h))
plt.colorbar()
plt.subplot(133)
plt.imshow(np.abs(y))
plt.colorbar()

In [None]:
h = yp.zeros(sz_0)
h[sz_1[0] // 2:sz_1[0] // 2 + 1, sz_1[1] // 4: 3* sz_1[1] // 4,] = 1

# Create blur kernel
h = yp.changeBackend(h, 'arrayfire')
h /= yp.sumb(yp.abs(h))

# Generate forward operators
F = ops.FourierTransform(sz_0, backend='arrayfire', center=False, normalize=False, pad=False)
A = F.H * ops.Diagonalize(h, inside_operator=F) * F

# Generate measurements
y = A * x
y_sum = yp.scalar(yp.sumb(yp.abs(y)))

# Check values
assert yp.scalar(yp.sumb(yp.abs(x)) * yp.sumb(yp.abs(h))) - y_sum

plt.figure(figsize=(8,4))
plt.subplot(121)
plt.imshow(np.abs(x))
plt.colorbar()
plt.subplot(122)
plt.imshow(np.abs(y))
plt.colorbar()