# Inference with experimental and synthetic patterns

by Victor Hugo Flores Muñoz

In [None]:
from IPython import display
from tensorflow.python.ops.numpy_ops import np_config
from sklearn.utils import shuffle
from zernike import RZern
import cv2
import json
import matplotlib.pyplot as plt
import numpy as np
import os
import pickle
import random
import tensorflow as tf
import time

## 1. Create Datasets

In [None]:
def normaliza(A):
    mask = np.isnan(A)
    B = np.nan_to_num(A, nan=0)
    C = (B - B.min())/(B.max() - B.min()) * 2 - 1
    C[mask] = 0.0
    return C

def normaliza_pos(A):
    mask = np.isnan(A)
    B = np.nan_to_num(A, nan=0)
    C = (B - B.min())/(B.max() - B.min())
    C[mask] = 0.0
    return C

def generate_mask(reference):
    mask = np.isnan(reference)
    mask = np.logical_not(mask)
    return mask

HEIGHT = 256
WIDTH = 256
cart = RZern(2)
ddx = np.linspace(-1.0, 1.0, WIDTH)
ddy = np.linspace(-1.0, 1.0, HEIGHT)
xv, yv = np.meshgrid(ddx, ddy)
cart.make_cart_grid(xv, yv)
num_coef = cart.nk
print("Numero de coeficientes: ", num_coef)
Phi = cart.eval_grid(np.array([0,1,0,0,0,0]), matrix=True)
MASK = generate_mask(Phi)

### 1.1 Synthetic Dataset

In [None]:
def GeneradorZ(num_samples=50000, num_coef=10, h=256, w=256):
    y = np.empty((num_samples, num_coef))
    X = np.empty((num_samples, h, w))
    Xcos = np.empty((num_samples, h, w))
    for k in range(num_samples):
        y[k] = [
            random.random() * 20 - 10,
            random.random() * 30 - 15,
            random.random() * 30 - 15,
            random.random() * 30 - 15,
            random.random() * 30 - 15,
            random.random() * 30 - 15
        ]
        Phi = cart.eval_grid(y[k], matrix=True)
        X[k,:,:] = normaliza_pos(Phi)
        Xcos[k,:,:] = normaliza_pos(np.cos(Phi))
    return X, y, Xcos

def GeneradorZ_sparse(num_samples=50000, num_coef=10, h=256, w=256):
    y = np.empty((num_samples,num_coef))
    X = np.empty((num_samples, h, w))
    Xcos = np.empty((num_samples, h, w))
    for k in range(num_samples):
        y[k] = np.zeros(num_coef)
        n_terms = np.random.randint(1,num_coef)
        for cont in range(n_terms):
            index = np.random.randint(num_coef)
            if index == 0:
                y[k, index] = random.random() * 20 - 10
            else:
                y[k, index] = random.random() * 30 - 15
        Phi = cart.eval_grid(y[k], matrix=True)
        X[k,:,:] = normaliza_pos(Phi)
        Xcos[k,:,:] = normaliza_pos(np.cos(Phi))
    return X, y, Xcos

In [None]:
NUM_SAMPLES = 1000

X1, y1, Xcos1 = GeneradorZ(num_samples=NUM_SAMPLES//2, 
                           num_coef=num_coef, 
                           h=HEIGHT, 
                           w=WIDTH)
X2, y2, Xcos2 = GeneradorZ_sparse(num_samples=NUM_SAMPLES//2, 
                                  num_coef=num_coef, 
                                  h=HEIGHT, 
                                  w=WIDTH)
X = np.concatenate((X1, X2))
y = np.concatenate((y1, y2))
Xcos = np.concatenate((Xcos1, Xcos2))

In [None]:
BATCH_SIZE = 64
X, y, Xcos= shuffle(X, y, Xcos)
synthetic_dataset = tf.data.Dataset.from_tensor_slices((X, y))
synthetic_dataset = synthetic_dataset.batch(BATCH_SIZE)

### 1.2 Experimental Dataset

In [None]:
experimental_path = '../Datasets/Zernikes/Train/'
fnames = os.listdir(experimental_path)
clean_fnames = [fname for fname in fnames if '_c' not in fname]
clean_fnames.sort()
len(clean_fnames)

In [None]:
def json_to_dict(path, destination_dict):
    loaded_dict = json.load(open(path, 'r'))
    for key in loaded_dict.keys():
        destination_dict[key] = loaded_dict[key]

In [None]:
path_combinacion = '../Datasets/Zernikes/Zernikes_combinacion/Zernikes_coef_combinacion.json'
path_puros = '../Datasets/Zernikes/Zernikes_puros/Zernikes_coef_puros.json'
path_random = '../Datasets/Zernikes/Zernikes_random/Zernikes_coef_random.json'
mega_json = {}
json_to_dict(path_combinacion, mega_json)
json_to_dict(path_puros, mega_json)
json_to_dict(path_random, mega_json)
print(len(mega_json))

In [None]:
X = []
y = []
for fname in clean_fnames:
    X.append(cv2.imread(experimental_path + fname, cv2.IMREAD_GRAYSCALE) / 255.0)
    y.append(mega_json[fname])
print("Loaded images: ", len(X))
print("Loaded labels: ", len(y))
X = np.array(X)
y = np.array(y)
print("X shape: ", X.shape) 
print("y shape: ", y.shape)

In [None]:
def swap_columns(arr, frm, to):
    arr[:,[frm, to]] = arr[:,[to, frm]]

swap_columns(y, 1, 2)

In [None]:
BATCH_SIZE = 64
X, y = shuffle(X, y)
experimental_dataset = tf.data.Dataset.from_tensor_slices((X, y))
experimental_dataset = experimental_dataset.batch(BATCH_SIZE)

## 2. Load Models

In [None]:
loss_object = tf.keras.losses.MeanSquaredError()
loss_object_zernike = tf.keras.losses.MeanAbsoluteError()
loss_object_phase = tf.keras.losses.MeanAbsoluteError()

autoencoder_optimizer = tf.keras.optimizers.Adam(learning_rate=0.002)
zernike_decoder_optimizer = tf.keras.optimizers.Adam(learning_rate=0.002)

BETA = 100
ALPHA = 1
LAMBDA = 10

def ae_loss(y_true, y_pred):
    return BETA * loss_object(y_true, y_pred)


def generate_zernike(coef, normalize=True):
    Phi = cart.eval_grid(coef, matrix=True)
    if normalize:
        Phi = normaliza_pos(Phi)
    return Phi

def zernike2phi(coef):
    Phi = tf.map_fn(
        lambda x: generate_zernike(x, normalize=False),
        coef
    )
    B = np.nan_to_num(Phi, nan=0)
    return B

def zernike2cos(coef):
    Phi = tf.map_fn(
        lambda x: generate_zernike(x, normalize=False),
        coef
    )
    Phi = tf.math.cos(Phi)
    Phi = tf.map_fn(lambda x: normaliza_pos(x), Phi)
    return Phi

def phase_loss(y_true, y_pred):
    phi = zernike2phi(y_true)
    hat_phi = zernike2phi(y_pred)
    return ALPHA * loss_object_phase(phi, hat_phi)

def cos_loss(y_true, y_pred):
    phi = zernike2cos(y_true)
    hat_phi = zernike2cos(y_pred)
    return LAMBDA * loss_object_phase(phi, hat_phi)

def zernike_loss(y_true, y_pred):
    return ALPHA * loss_object_zernike(y_true, y_pred)

def zernike2gradient(coef):
    Phi = tf.map_fn(
        lambda x: generate_zernike(x, normalize=False),
        coef
    )
    Phi = tf.convert_to_tensor(
        np.expand_dims(
            np.nan_to_num(Phi), 
            axis=3
        )
    )
    dx, dy = tf.image.image_gradients(Phi)
    return dx, dy

def grad_loss(y_true, y_pred):
    dx_true, dy_true = zernike2gradient(y_true)
    dx_pred, dy_pred = zernike2gradient(y_pred)
    return LAMBDA * (0.5*loss_object(dx_true, dx_pred) + 0.5*loss_object(dy_true, dy_pred))

def total_zernike_loss(y_true, y_pred):
    return phase_loss(y_true, y_pred) + grad_loss(y_true, y_pred) + zernike_loss(y_true, y_pred)

def model_loader(path, optimizer, loss):
    model = tf.keras.models.load_model(path, compile=False)
    model.compile(optimizer=optimizer, 
                  loss=loss)
    return model

In [None]:
encoder = model_loader('../models/ae_encoder.h5',
                       autoencoder_optimizer,
                       ae_loss)
decoder = model_loader('../models/ae_decoder.h5',
                       autoencoder_optimizer,
                       ae_loss)
zernike_decoder = model_loader('../models/zae_zernike_decoder.h5',
                              zernike_decoder_optimizer,
                              total_zernike_loss)

In [None]:
zautoencoder = tf.keras.models.Model(
    inputs=encoder.inputs, 
    outputs=zernike_decoder(encoder.outputs)
)
zautoencoder.compile(optimizer=zernike_decoder_optimizer, 
                     loss=total_zernike_loss)

## 3. Predict with synthetic patterns

In [None]:
np_config.enable_numpy_behavior()
tf.config.run_functions_eagerly(True)

def error_estimator(y_true, y_pred):
    return np.sqrt(np.abs(y_true - y_pred))

In [None]:
samples = [1, 2, 3, 4, 5]
example_input, example_target = next(iter(synthetic_dataset.take(100)))
zernike_coefs = zautoencoder.predict(example_input)
generated_zernike = zernike2phi(zernike_coefs)
real_zernike = zernike2phi(example_target)
plt.figure(figsize=(20, 16))
for i in range(5):
    y_true = normaliza_pos(real_zernike[samples[i]])*MASK
    y_pred = normaliza_pos(generated_zernike[samples[i]])*MASK
    error = error_estimator(y_true, y_pred)*MASK
    experimental_pattern = example_input[samples[i]]
    plt.subplot(4, 5, i + 1)
    plt.imshow(experimental_pattern, origin='lower')
    # plt.colorbar(fraction=0.046)
    plt.axis('off')
    plt.title(r'$I_{}$'.format(i + 1), fontdict={'fontsize': 20}, y=-0.15)
    plt.subplot(4, 5, i + 6)
    plt.imshow(y_true, origin='lower')
    plt.colorbar(fraction=0.046)
    plt.axis('off')
    plt.title(r'$\phi_{}$'.format(i + 1), fontdict={'fontsize': 20}, y=-0.15)
    plt.subplot(4, 5, i + 11)
    plt.imshow(y_pred, origin='lower')
    plt.colorbar(fraction=0.046)
    plt.axis('off')
    plt.title(r'$\hat\phi_{}$'.format(i + 1), fontdict={'fontsize': 20}, y=-0.18)
    plt.subplot(4, 5, i + 16)
    plt.imshow(error, origin='lower')
    plt.colorbar(fraction=0.046)
    plt.axis('off')
    plt.title(r'$\epsilon_{}$'.format(i + 1), fontdict={'fontsize': 20}, y=-0.15)
plt.show()

## 4. Predict With Experimental Patterns

In [None]:
samples = [1, 2, 3, 4, 5]
example_input, example_target = next(iter(experimental_dataset.take(100)))
zernike_coefs = zautoencoder.predict(example_input)
generated_zernike = zernike2phi(zernike_coefs)
real_zernike = zernike2phi(example_target)
plt.figure(figsize=(20, 16))
for i in range(5):
    y_true = normaliza_pos(real_zernike[samples[i]])*MASK
    y_pred = normaliza_pos(generated_zernike[samples[i]])*MASK
    error = error_estimator(y_true, y_pred)*MASK
    experimental_pattern = example_input[samples[i]]
    plt.subplot(4, 5, i + 1)
    plt.imshow(experimental_pattern, origin='lower')
    # plt.colorbar(fraction=0.046)
    plt.axis('off')
    plt.title(r'$I_{}$'.format(i + 1), fontdict={'fontsize': 20}, y=-0.15)
    plt.subplot(4, 5, i + 6)
    plt.imshow(y_true, origin='lower')
    plt.colorbar(fraction=0.046)
    plt.axis('off')
    plt.title(r'$\phi_{}$'.format(i + 1), fontdict={'fontsize': 20}, y=-0.15)
    plt.subplot(4, 5, i + 11)
    plt.imshow(y_pred, origin='lower')
    plt.colorbar(fraction=0.046)
    plt.axis('off')
    plt.title(r'$\hat\phi_{}$'.format(i + 1), fontdict={'fontsize': 20}, y=-0.18)
    plt.subplot(4, 5, i + 16)
    plt.imshow(error, origin='lower')
    plt.colorbar(fraction=0.046)
    plt.axis('off')
    plt.title(r'$\epsilon_{}$'.format(i + 1), fontdict={'fontsize': 20}, y=-0.15)
plt.show()