In [None]:
from pyotf.otf import HanserPSF, apply_aberration, apply_named_aberration
import matplotlib.pyplot as plt
from data.visualise import grid_psfs
import numpy as np

# Settings forwarded verbatim to HanserPSF.
# NOTE(review): wl/res/zres look like nanometre quantities and size/zsize like
# pixel / slice counts — confirm against the pyotf documentation.
kwargs = {
    "wl": 647,
    "na": 1.3,
    "ni": 1.51,
    "res": 106,
    "zres": 10,
    "size": 32,
    "zsize": 200,
    "vec_corr": "none",
    "condition": "none",
}

# Build the model and run it through apply_aberration in one expression.
# NOTE(review): the two arrays are presumably coefficient vectors whose
# meaning is defined by pyotf — confirm which is which.
psf = apply_aberration(
    HanserPSF(**kwargs),
    np.array([0, 0, 0, 0, 0]),
    np.array([0, 0, 0, 0, 1]),
)

# Keep the intensity stack under its own name and show it (transposed) as a grid.
blank_psf = psf.PSFi
plt.imshow(grid_psfs(blank_psf).T)
plt.show()

In [None]:
from data.visualise import show_psf_axial
# Use a wide default figure, then hand the noise-free PSF to the project's
# show_psf_axial helper (arguments passed through unchanged).
plt.rcParams.update({'figure.figsize': [15, 5]})
show_psf_axial(blank_psf, '', 15)

In [None]:
# Scale the PSF intensities by 2**16.
# The scaled array is derived from `psf.PSFi` rather than multiplying
# `blank_psf` in place: the in-place form applied another factor of 2**16 on
# every re-run of this cell (tripping the assert below) and also modified the
# very array object that `psf.PSFi` had returned.
blank_psf = psf.PSFi * (2**16)
# Sanity check: the brightest pixel must stay below the uint16 maximum.
assert blank_psf.max() < np.iinfo(np.uint16).max

In [None]:
from experiments.noise.noise_psf import generate_noisy_psf, EMCCD

def noise_psf(psf, nsr, baseline):
    """Return ``psf`` with noise added by a randomly configured EMCCD model.

    Args:
        psf: Image array; its maximum value sets the background level.
        nsr: Factor multiplied with ``psf.max()`` to obtain the background
            noise level. A negative product is clamped to 0.
        baseline: Mean of the normal distribution (std 5) from which the
            camera baseline is drawn.

    Returns:
        The result of ``EMCCD(...).add_noise(psf)`` with every negative
        value replaced by 0.
    """
    # Background level follows the brightest pixel and is never negative.
    floor_level = max(psf.max() * nsr, 0)

    # Keyword arguments are evaluated left to right, so the random draws
    # happen in the order listed here.
    camera = EMCCD(
        noise_background=floor_level,
        quantum_efficiency=np.random.normal(0.9, 0.1),
        read_noise=np.random.normal(74.4, 1),
        spurious_charge=np.random.normal(0.0002, 1e-5),
        em_gain=np.random.normal(300.0, 5),
        baseline=np.random.normal(baseline, 5),
        e_per_adu=45.0,
    )

    result = camera.add_noise(psf)
    # Zero out negative values in place.
    result[result < 0] = 0
    return result

# Test pool: each noisy stack uses a background ratio drawn from N(0.4, 0.3),
# a baseline mean of 250, and is halved afterwards.
n_test_datasets = 100
test_dataset = np.concatenate(
    [noise_psf(blank_psf, np.random.normal(0.4, 0.3), 250) / 2 for _ in range(n_test_datasets)]
)
# 200 all-zero (x, y) rows and 200 z values (-1000 ... 990, step 10) per stack.
coords = np.zeros((200 * n_test_datasets, 2))
zs = np.tile(np.arange(-1000, 1000, 10), n_test_datasets)


# Training pool: background ratio from N(0.4, 0.5), baseline mean itself
# drawn from N(100, 5); no halving.
n_train_datasets = 1000
train_dataset = np.concatenate(
    [
        noise_psf(blank_psf, np.random.normal(0.4, 0.5), np.random.normal(100, 5))
        for _ in range(n_train_datasets)
    ]
)
train_coords = np.zeros((200 * n_train_datasets, 2))
train_zs = np.tile(np.arange(-1000, 1000, 10), n_train_datasets)


In [None]:
from sklearn.model_selection import train_test_split

# `dataset` maps a split name to [[images, coords], zs].
dataset = {}

# Test subsets: frames whose z lies strictly within 100 of each target depth.
for i in [-500, 0, 500]:
    # Boolean mask instead of argwhere(...).squeeze(): squeeze() turns a single
    # match into a 0-d index, which would drop the sample axis of the selection.
    in_window = np.abs(zs - i) < 100
    dataset[f'test_{i}nm'] = [[test_dataset[in_window], coords[in_window]], zs[in_window]]

# 80 % of the training pool for training, the rest for validation
# (fixed random_state so the split is repeatable).
psf_train, psf_other, coords_train, coords_other, zs_train, zs_other = train_test_split(
    train_dataset, train_coords, train_zs, train_size=0.8, random_state=42
)

dataset['test'] = [[test_dataset, coords], zs]
dataset['train'] = [[psf_train, coords_train], zs_train]
dataset['val'] = [[psf_other, coords_other], zs_other]

# Same depth windows, taken from the training split.
for i in [-500, 0, 500]:
    in_window = np.abs(zs_train - i) < 100
    dataset[f'train_{i}nm'] = [
        [psf_train[in_window], coords_train[in_window]],
        zs_train[in_window],
    ]

In [None]:
# Print shape and value range of every split: images, coordinates, z targets.
for name, entry in dataset.items():
    (images, positions), depths = entry
    print(name)
    print('     Imgs')
    print('\t', images.shape)
    print('\t', images.min(), images.max())
    print('     Coords')
    print('\t', positions.shape)
    print('     Zs')
    print('\t', depths.shape)
    print('\t', depths.min(), depths.max())

In [None]:
# Show the first ten images of every split as a grid.
for name, entry in dataset.items():
    print(name)
    first_ten = entry[0][0][:10]
    plt.rcParams['figure.figsize'] = [15, 5]
    plt.imshow(grid_psfs(first_ten))
    plt.show()

In [None]:
import re


def plot_hists(dataset, query):
    """Overlay pixel-value histograms of every entry whose key matches ``query``.

    Args:
        dataset: Mapping of name -> ``[[images, coords], zs]``; only the
            images (``v[0][0]``) are read.
        query: Regular expression applied with ``re.match`` (i.e. anchored
            at the start of the key).
    """
    # Set the default size *before* the first plotting call: pyplot reads
    # rcParams when it creates the figure, so setting it after drawing (as
    # this function used to) only affected the next figure.
    plt.rcParams['figure.figsize'] = [15, 5]
    for k, v in dataset.items():
        if re.match(query, k):
            # ravel() avoids the extra copy flatten() always makes.
            plt.hist(v[0][0].ravel(), label=k, bins=50, histtype='step', density=True)
    plt.legend()
    plt.yscale('log')
    plt.ylabel('Frequency')
    plt.xlabel('Pixel value')
    plt.show()

def plot_all_hists():
    """Draw one histogram figure per key pattern, reading the notebook-level ``dataset``."""
    # Keys without an underscore first, then the three depth-window groups.
    key_patterns = (
        r'^[^_]+$',
        r'.+_-500nm',
        r'.+_500nm',
        r'.+_0nm',
    )
    for key_pattern in key_patterns:
        plot_hists(dataset, key_pattern)


plot_all_hists()



In [None]:
# min_max_norm
def norm_zero_one(img):
    """Min-max normalise ``img`` to the range [0, 1].

    Args:
        img: Numeric array.

    Returns:
        ``(img - min) / (max - min)``. When every element of ``img`` is equal
        the result is an all-zero float array of the same shape.
    """
    img_min = img.min()
    value_range = img.max() - img_min
    if value_range == 0:
        # Constant image: the plain formula would compute 0 / 0 and return NaNs.
        return np.zeros_like(img, dtype=float)
    return (img - img_min) / value_range

def norm_one_one(img):
    """Rescale ``img`` to [-1, 1] by stretching its [0, 1] min-max normalisation."""
    unit_scaled = norm_zero_one(img)
    return unit_scaled * 2 - 1

# Normalise every image on its own (sample-wise), for every split in `dataset`.
for k, entry in dataset.items():
    entry[0][0] = np.stack(list(map(norm_zero_one, entry[0][0])))

In [None]:
plot_all_hists()


In [None]:
import tensorflow as tf
# Give every image a trailing channel axis of length 3 by repeating the single
# grey channel. np.repeat on the whole stack produces the same values and dtype
# as converting each image through tf.image.grayscale_to_rgb, without a
# per-image TensorFlow round trip.
for k in dataset.keys():
    gray = dataset[k][0][0]
    # Only expand stacks that are still (N, H, W); this keeps the cell safe to
    # re-run, whereas adding a second channel axis made the conversion fail.
    if gray.ndim == 3:
        dataset[k][0][0] = np.repeat(gray[:, :, :, np.newaxis], 3, axis=-1)
    print(k, dataset[k][0][0].shape)

In [2]:
from skimage.transform import resize
import tensorflow as tf
# NOTE(review): the recorded output of this cell is a NameError for
# `train_dataset`, and elsewhere in this notebook `train_dataset` is assigned
# the result of np.concatenate rather than an object exposing a `.data`
# mapping. This cell looks like a leftover from a different notebook version —
# confirm whether it should be ported to `dataset` or deleted.
for k in train_dataset.data.keys():
    # Shape before conversion.
    print(k, train_dataset.data[k][0][0].shape)
    # Convert each image to RGB through TensorFlow and stack the results back into one array.
    train_dataset.data[k][0][0] = np.stack([tf.image.grayscale_to_rgb(tf.convert_to_tensor(img)).numpy() for img in train_dataset.data[k][0][0]])
    # Resize every image to (32, 32, 1) with anti-aliasing.
    # NOTE(review): this runs after the RGB conversion, so the target shape has
    # a single channel again — verify that this is intended.
    train_dataset.data[k][0][0] = np.stack([resize(img, (32, 32, 1), anti_aliasing=True) for img in train_dataset.data[k][0][0]])
    # Shape after conversion and resize.
    print(k, train_dataset.data[k][0][0].shape)


NameError: name 'train_dataset' is not defined

In [None]:
%load_ext autoreload
%autoreload 2

from model.model import load_new_model, train_model, get_resnet_101

# Shallow snapshot of the split dictionary (new outer dict, same inner lists and arrays).
dataset2 = dict(dataset)

# NOTE(review): the meaning of (32, 0.01) is defined by get_resnet_101 —
# presumably input size and learning rate; confirm against model.model.
model = get_resnet_101(32, 0.01)
train_model(model, dataset)

In [None]:
# no norm
# Mean error 334.7929205786267
# std error 311.9182186726581

# sample-wise [0,1]
# Mean error 18.256286726335162
# std error 13.497500530378288

# sample-wise [-1, 1]
# Mean error 20.571977236271884
# std error 15.336221691580258

In [None]:
import seaborn as sns
import random

# Inputs are the [images, coords] pair, targets the z positions.
test_x, test_y = dataset['test']

# Absolute z error for every test frame.
pred_y = model.predict(test_x).squeeze()
error = np.abs(test_y - pred_y)

print(f'Mean error {np.mean(error)}')
print(f'std error {np.std(error)}')

# Error distribution on a log-scaled count axis.
plt.rcParams['figure.figsize'] = [5, 5]
sns.histplot(error)
plt.xlabel('Error (nm)')
plt.yscale('log')

plt.show()

# Up to 1000 frames with the largest error, worst first, averaged over the
# last (channel) axis for display.
idx = np.flip(np.argsort(error))[:1000]
test_imgs = test_x[0].squeeze()[idx]
plt.rcParams['figure.figsize'] = [30, 50]
plt.imshow(grid_psfs(test_imgs.mean(axis=-1)))
plt.show()

def snr(img):
    """Return the ratio of the largest value in ``img`` to its median."""
    peak = img.max()
    middle = np.median(img)
    return peak / middle

# Predicted versus true z for the test set.
plt.rcParams['figure.figsize'] = [3, 5]
plt.scatter(test_y, pred_y)
plt.show()

# Spread of the absolute error as a box plot.
error = np.abs(pred_y - test_y)
plt.boxplot(error)
plt.show()



In [None]:
# Predict z for the whole training split and compare with the targets.
pred_z = model.predict(dataset['train'][0]).squeeze()
# The training split comes out of train_test_split, which shuffles by default,
# so a line plot would join the points in random order. Draw unconnected
# markers instead, as the test-set comparison does.
plt.scatter(dataset['train'][1], pred_z)
plt.xlabel('True z (nm)')
plt.ylabel('Predicted z (nm)')
plt.show()

In [None]:
# Take up to ten training images whose target z is exactly 0 and report the batch shape.
# A separate name is used for the split's z values: `train_zs` already holds the
# z array of the full, un-split training pool that is passed to
# train_test_split, and overwriting it here would make a re-run of the split
# cell see arrays of mismatched length.
split_zs = dataset['train'][1]
# flatnonzero always yields a 1-D index array, even for a single match.
idx = np.flatnonzero(split_zs == 0)
img = dataset['train'][0][0][idx][0:10]
print(img.shape)
# model.predict((img, np.zeros((10, 2))))

In [None]:
# Predict on the validation split and measure the absolute z error.
val_inputs, val_zs = dataset['val']
val_imgs, val_coords = val_inputs[0], val_inputs[1]
pred_z = model.predict((val_imgs, val_coords)).squeeze()
error = np.abs(val_zs - pred_z)

# Keep the sample with the smallest error as length-1 slices so that the
# leading batch axis is preserved.
best_img_idx = np.argmin(error)
best_slice = slice(best_img_idx, best_img_idx + 1)
img = val_imgs[best_slice]
coord = val_coords[best_slice]
z = val_zs[best_slice]


In [None]:

def create_circular_mask(h, w, center=None, radius=None):
    """Build a boolean disc mask of shape ``(h, w, 3)``.

    Args:
        h: Mask height.
        w: Mask width.
        center: ``(x, y)`` centre of the disc. Defaults to
            ``(int(w/2), int(h/2))``.
        radius: Disc radius. Defaults to the smallest distance between the
            centre and any image edge.

    Returns:
        Boolean array that is True wherever the distance to ``center``,
        truncated to an integer, is ``<= radius``. The 2-D mask is repeated
        along a trailing axis of length 3.
    """
    if center is None:
        # Fall back to the middle of the image.
        center = (int(w / 2), int(h / 2))
    cx, cy = center[0], center[1]
    if radius is None:
        # Largest disc that still fits between the centre and the borders.
        radius = min(cx, cy, w - cx, h - cy)

    # Column vector of row indices and row vector of column indices; they
    # broadcast to an (h, w) grid in the distance computation.
    row_idx = np.arange(h)[:, np.newaxis]
    col_idx = np.arange(w)[np.newaxis, :]
    truncated_dist = np.sqrt((col_idx - cx) ** 2 + (row_idx - cy) ** 2).astype(int)

    disc = truncated_dist <= radius
    return np.stack([disc, disc, disc], axis=-1)


mask = create_circular_mask(32, 32, radius=8)

In [None]:
# Perturb one image with fresh Gaussian noise ten times and print the model's
# prediction for each perturbed copy.
img = img[0:1]
nsr = 0.01  # mean of the added noise; constant, so set once outside the loop
for _ in range(10):
    noise = np.random.normal(loc=nsr, scale=0.01, size=img.shape[1:])
    print(noise.shape)
    print(mask.shape)
    # noise[~mask] = 0
    # Use a dedicated name for the perturbed image: assigning it to `psf`
    # replaced the HanserPSF model object created in the first cell with an array.
    noisy_img = norm_zero_one(img + noise)
    print(noisy_img.min(), noisy_img.max())
    print(model.predict((noisy_img, np.zeros((1, 2)))))
    