## Gerchberg Saxton Neural Network

In [1]:
import torch
import torchvision
from torchvision import transforms, datasets
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import random
import time
from scipy import linalg as la
import plotly.graph_objects as go

# Gerchberg Saxton Algorithm: Initial Phase Portion

In [2]:
# Load the precomputed NumPy arrays from disk.
# Further down, np.abs(f[i]) and np.abs(F[i]) are passed to the GS algorithm as
# the two magnitude inputs, and f_phases is used as the ground-truth phases.
# NOTE(review): assigning to `F` rebinds the name that the import cell bound to
# torch.nn.functional; a later cell re-imports that module under the same name.
f = np.load('f.npy')
F = np.load('FF.npy')
f_phases = np.load('f_phases.npy')

In [3]:
# Seed NumPy's global RNG so the random starting phases are the same on every run.
np.random.seed(2)
# omg is the random initial phase vector needed to initialize the GS Algorithm:
# one value per sample (the length is taken from row f[1]), drawn uniformly
# from [-pi, pi). The same omg is passed to every GSA call further down.
omg = 2 * (np.pi * np.random.random(size=(len(f[1])))) - np.pi

In [4]:
def GSA(N, omg, f_mag, F_mag, maxiter=256, err_tol=10 ** -10):
    """Gerchberg-Saxton phase retrieval, used as the initial phase estimate for the ANN.

    Alternates between the image plane and the diffraction plane, each time
    keeping the current phase estimate and replacing the magnitude with the
    known one.

    Parameters
    ----------
    N : int
        Length of the signal; used to scale the forward/inverse FFT.
    omg : array_like
        Random initial phases (same length as the signal).
    f_mag : array_like
        Known magnitude in the image plane.
    F_mag : array_like
        Known magnitude in the diffraction plane.
    maxiter : int, optional
        Iteration cap. The loop stops once the iteration counter exceeds it.
    err_tol : float, optional
        Tolerance used both for "error is small enough" and for "error has
        stopped changing".

    Returns
    -------
    e_phases : ndarray
        Estimated image-plane phases, with values exactly equal to +pi or -pi
        replaced by 0.
    error : list of float
        Error history. Two entries are appended per iteration: first the
        diffraction-plane error, then the image-plane error.
    iterations : int
        Number of iterations performed minus one.
    """
    # Two entries per iteration: diffraction-plane error, then image-plane error.
    error = []

    # First estimate in the image plane: known magnitude with the random phase.
    x = f_mag * np.exp(1j * omg)
    k = 0

    while True:
        # Step 1 (Equation 5): estimate in the diffraction plane.
        X = np.fft.fft(x) / N

        # Diffraction-plane error: estimated magnitude vs. known magnitude.
        # The sqrt(N) factor puts it on the same scale as the image-plane error.
        error.append(np.sqrt(N) * la.norm((np.abs(X) - F_mag), 2))

        # Step 2 (Equation 6): keep the phase, impose the known magnitude.
        Y = F_mag * np.exp(1j * np.angle(X))

        # Step 3 (Equation 7): inverse DFT back to the image plane.
        y = N * np.fft.ifft(Y)

        # Image-plane error.
        error.append(la.norm((np.abs(y) - f_mag), 2))

        # Step 4 (Equation 4): keep the phase, impose the known magnitude.
        x = f_mag * np.exp(1j * np.angle(y))

        k += 1

        # Stopping test on the two most recent errors. Because two values are
        # appended per iteration, indexing the history with k would look at
        # entries from roughly half-way back instead of the newest ones.
        stalled = np.abs(error[-2] - error[-1]) <= err_tol
        converged = np.abs(error[-1]) <= err_tol
        if stalled or converged or k > maxiter:
            break

    # Estimated phases.
    e_phases = np.angle(x)

    # Manually correct estimated phases to 0 where they are exactly +/- pi.
    e_phases[e_phases == np.pi] = 0
    e_phases[e_phases == -np.pi] = 0

    return e_phases, error, k - 1

In [5]:
# Run the GS Algorithm on every signal and store the recovered phases as the
# rows of Initial_Phase (one row per signal, 256 samples per row).
start = time.time()

# Collect the per-signal phase vectors in a list and build the matrix once.
# Growing an array with np.append inside the loop copies it on every iteration
# and needs a dummy first row that has to be deleted afterwards.
phase_rows = []
for i in range(len(f)):
    phase_rows.append(GSA(256, omg, np.abs(f[i]), np.abs(F[i]))[0])

# Stack the phase vectors into an (n_signals, 256) matrix.
Initial_Phase = np.array(phase_rows, dtype=float).reshape((-1, 256))
end = time.time()
print(Initial_Phase.shape)
print('Time to compute the GS Algorithm for 261 different functions is ', end-start, "seconds")

(261, 256)
Time to compute the GS Algorithm for 261 different functions is  7.567270278930664 seconds


In [57]:
# Graph all the Initial Phases from the GS Algorithm (one trace per signal).
fig = go.Figure()
x = np.linspace(-1,1,256)
# Iterate over the rows (signals), not the columns: len(Initial_Phase[0]) is
# the number of samples per signal, not the number of signals.
for i in range(len(Initial_Phase)):
    fig.add_trace(go.Scatter(x=x, y=Initial_Phase[i], mode='markers'))


fig.update_layout(title='Initial Phases from GS Algorithm', xaxis_title='t', yaxis_title='Initial Phases')
fig

In [7]:
def label(Initial_Phase):
    """Return a copy of a 2-D array with a row-number column inserted at the front.

    Column 0 of the result numbers the rows 1, 2, ..., n (n = number of rows);
    the original columns follow unchanged.
    """
    row_count = len(Initial_Phase)
    row_numbers = np.arange(1, row_count + 1)
    return np.insert(Initial_Phase, 0, row_numbers, axis=1)

# Neural Network Portion

In [10]:
# define the necessary processing functions
# random_state controls the shuffling applied to the data before splitting it
def train_test_val(X, Y, test_size=0.6, random_state=22):
    """Split X and Y into train/test parts and convert each part to a torch tensor.

    The split is delegated to train_test_split with the given test_size and
    random_state; each of the four resulting pieces is wrapped with torch.Tensor.

    Returns (X_train, Y_train, X_test, Y_test).
    """
    pieces = train_test_split(X, Y, test_size=test_size, random_state=random_state)
    x_train, x_test, y_train, y_test = pieces

    # Note the return order: both training tensors first, then both test tensors.
    return (
        torch.Tensor(x_train),
        torch.Tensor(y_train),
        torch.Tensor(x_test),
        torch.Tensor(y_test),
    )

def create_loader(X,Y,batch_size=32,shuffle=True):
    """Wrap the tensors X and Y in a TensorDataset and return a DataLoader over it.

    batch_size and shuffle are forwarded to the DataLoader unchanged.
    """
    return torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(X, Y),
        batch_size=batch_size,
        shuffle=shuffle,
    )

In [11]:
# obs is the initial phases from GS, with a 1-based row label in column 0
obs = label(Initial_Phase)
# truth is the actual real phases, labelled the same way
truth = label(f_phases)
# test_percentage is the fraction of the data placed in the testing set
test_percentage = 0.6
# create train/test splits; the label column is split along with its row
X_train, y_train, X_test, y_test = train_test_val(obs, truth, test_size = test_percentage, random_state=22)

In [12]:
# use DataLoader to load data
# bs is the mini-batch size
bs = 32
# Column 0 holds the row label added by label(), so it is sliced off ([:,1:])
# before the data is handed to the loaders.
phase_train = create_loader(X_train[:,1:], y_train[:,1:], batch_size=bs)
phase_test = create_loader(X_test[:,1:], y_test[:,1:], batch_size=bs)

In [13]:
# now we will define a model
# Subclassing nn.Module lets us use its features (parameter registration, etc.)
class Net(nn.Module):
    """Three bias-free 256 -> 256 fully connected layers, each followed by ReLU.

    This model applies ReLU in the output layer as well, so every output value
    is non-negative.

    The activations are called as torch.relu rather than through the alias `F`
    because the data-loading cell reassigns `F` to a NumPy array; using
    torch.relu keeps forward() independent of that name.
    """

    def __init__(self):
        # inherit the class attributes of nn.Module
        super().__init__()
        # fully connected layer 1
        self.fc1 = nn.Linear(256, 256, bias=False)
        # fully connected layer 2
        self.fc2 = nn.Linear(256, 256, bias=False)
        # output layer
        self.o = nn.Linear(256, 256, bias=False)

    def forward(self, x):
        """Run the data x through the three layers and return the network output."""
        # output of layer 1
        x = torch.relu(self.fc1(x))
        # output of layer 2
        x = torch.relu(self.fc2(x))
        # output of final layer (ReLU is applied here too)
        x = torch.relu(self.o(x))
        return x

In [14]:
# Instantiate the network; the bare `model` expression displays its layer summary.
# NOTE(review): no torch seed is set in the cells shown here, so the initial
# weights presumably differ from run to run — confirm.
model = Net()
model

Net(
  (fc1): Linear(in_features=256, out_features=256, bias=False)
  (fc2): Linear(in_features=256, out_features=256, bias=False)
  (o): Linear(in_features=256, out_features=256, bias=False)
)

In [15]:
# next, we need a loss function and an optimizer
import torch.optim as optim

# mean squared error between the network output and the target phases
loss_fun = nn.MSELoss()
# stochastic gradient descent over all model parameters, learning rate 0.01
optimizer = optim.SGD(model.parameters(), lr = 0.01)

In [59]:
# Rebind `F` to torch.nn.functional. An earlier cell assigns F = np.load('FF.npy'),
# so this restores the module for any code that calls F.* from here on.
import torch.nn.functional as F

start = time.time()
# Let's train the model
# NOTE: X, Y, out and loss stay defined after the loop (holding the values from
# the last mini-batch of the last epoch) and are read by later plotting cells.
epoch_number=3000
for epoch in range(epoch_number):
    for batch in phase_train:
        # X is the GS initial-phase input, Y is the true phase target
        X, Y = batch
        #print(X.shape)
        # zero gradients
        model.zero_grad()
        # input the data
        # out is what the NN is outputting
        out = model(X.view(-1, 256))
        # calculate the loss 
        loss = loss_fun(out, Y)
        # calculate gradients
        loss.backward()
        # update parameters
        optimizer.step()
    # every 500 epochs, print the loss of the last mini-batch of that epoch
    # (not an average over the epoch)
    if epoch % 500 == 0:
        print(loss)
end = time.time()
print("Time it takes ANN to train: ", end-start)

tensor(0.0411, grad_fn=<MseLossBackward>)
tensor(0.0737, grad_fn=<MseLossBackward>)
tensor(0.1131, grad_fn=<MseLossBackward>)
tensor(0.0493, grad_fn=<MseLossBackward>)
tensor(0.0481, grad_fn=<MseLossBackward>)
tensor(0.0581, grad_fn=<MseLossBackward>)
Time it takes ANN to train:  45.86624836921692


In [61]:
# Output of the model on the whole test set; column 0 (the row label) is sliced off first.
out_test = model(X_test[:,1:])

# The loss function is the MSE.
# This is the loss on the test set; the values printed by the training cell
# are losses of individual training mini-batches.
loss_fun(out_test, y_test[:,1:])

tensor(0.0550, grad_fn=<MseLossBackward>)

In [62]:
# outputs of the train function
out_train = model(X_train[:,1:])
#loss function is the MSE of the train functions
loss_train = loss_fun(out_train, y_train[:,1:])
loss_train

tensor(0.0669, grad_fn=<MseLossBackward>)

In [19]:
# Row positions (0-based) in the unshuffled data of the samples that ended up in
# the test set, so they can be compared with the truth values, initial phases, etc.
# Column 0 of y_test carries the 1-based label written by label(); subtract 1
# so the values can be used directly to index rows of arrays such as Initial_Phase.
test_indices = [int(row_label) - 1 for row_label in y_test[:,0].numpy()]
#print(test_indices)

In [20]:
# Row positions (0-based) in the unshuffled data of the samples that ended up in
# the train set, so they can be compared with the truth values, initial phases, etc.
# Column 0 of y_train carries the 1-based label written by label(); subtract 1
# so the values can be used directly to index rows of arrays such as Initial_Phase.
train_indices = [int(row_label) - 1 for row_label in y_train[:,0].numpy()]
#print(train_indices)

In [21]:
# Label out_test / out_train the same way X_test / X_train are labelled:
# insert the row-label column (column 0 of X_test / X_train) in front of the
# network outputs.
label_out_test = np.insert(out_test.detach().numpy(), 0, X_test[:,0], axis=1)

label_out_train = np.insert(out_train.detach().numpy(), 0, X_train[:,0], axis=1)

In [23]:
# Graph one training sample: network output, truth, and GS initial phase.
fig_train = go.Figure()
x2  = np.linspace(-1,1,256)

# choose an r between 0-103
r = 46
# out_train is the raw network output and has no label column, so the whole row
# is plotted. Slicing it with [r,1:] would drop the first sample and shift the
# trace relative to x2.
fig_train.add_trace(go.Scatter(x=x2, y=out_train.detach().numpy()[r], mode='markers', name='Train'))
# y_train does carry the label in column 0, so that column is skipped here
fig_train.add_trace(go.Scatter(x=x2, y=y_train.numpy()[r,1:], mode='markers', name='Truth'))
# row of Initial_Phase selected through train_indices
fig_train.add_trace(go.Scatter(x=x2, y=Initial_Phase[train_indices[r]], mode='markers', name='Initial Phase'))

fig_train.update_layout(title='Trained Data', xaxis_title='t', yaxis_title='Phases')
fig_train

In [54]:
# Graph one test sample: network output, truth, and GS initial phase.
fig_test = go.Figure()
x2  = np.linspace(-1,1,256)

#choose an r between 0-156
r=27
# out_test is the raw network output and has no label column, so the whole row
# is plotted. Slicing it with [r,1:] would drop the first sample and shift the
# trace relative to x2.
fig_test.add_trace(go.Scatter(x=x2, y=out_test.detach().numpy()[r], mode='markers', name='Test'))
# y_test does carry the label in column 0, so that column is skipped here
fig_test.add_trace(go.Scatter(x=x2, y=y_test.numpy()[r,1:], mode='markers', name='Truth'))
# row of Initial_Phase selected through test_indices
fig_test.add_trace(go.Scatter(x=x2, y=Initial_Phase[test_indices[r]], mode='markers', name='Initial Phase'))

fig_test.update_layout(title='Test Data', xaxis_title='t', yaxis_title='Phases')
fig_test

In [26]:
# These are the true phases.
# Graphs the Y values left over from the training loop, i.e. the "Truth" (real
# phases) of the last mini-batch that was processed.
fig_Y = go.Figure()
x2  = np.linspace(-1,1,256)
# Use the actual size of the batch rather than a fixed count, so the cell keeps
# working if the split or the batch size changes.
for i in range(len(Y)):
    fig_Y.add_trace(go.Scatter(x=x2, y=Y.detach().numpy()[i], mode='markers'))


fig_Y.update_layout(title='Truth Values', xaxis_title='t', yaxis_title='Phases')
fig_Y

In [27]:
# This is from the training data, not the test data.
# Graphs `out`, the network output left over from the training loop: the
# estimated phases for the last mini-batch processed (computed just before the
# final optimizer step).
fig_out = go.Figure()
x2 = np.linspace(-1,1,256)
for i in range(len(out)):
    # one trace per sample in that last batch
    fig_out.add_trace(go.Scatter(x=x2, y=out.detach().numpy()[i], mode='markers', name='trained NN Output'))

fig_out.update_layout(title='Trained Output Values', xaxis_title='t', yaxis_title='Phases') 
fig_out

In [63]:
# Plot the truth and the network output for the same training samples in one figure.
# Only covers the samples of the last mini-batch from the training loop
# (Y and out are the leftover loop variables); presumably 8 samples given the
# split and batch size used above — confirm.
train_output = go.Figure()
x2 = np.linspace(-1,1,256)
for i in range(len(out)):
    # truth first, then the prediction for the same sample
    train_output.add_trace(go.Scatter(x=x2, y=Y.detach().numpy()[i], mode='markers', name='Truth'))
    train_output.add_trace(go.Scatter(x=x2, y=out.detach().numpy()[i], mode='markers', name='GS/ANN Prediction'))
    
train_output.update_layout(title='Trained Output Values vs. Truth Values (ReLu in output layer)', xaxis_title='t', yaxis_title='Phases') 
train_output

In [64]:
### This is from the test data, i.e. samples the network was not trained on.
### Compares, for test row r, the network prediction, the GS estimate that was
### fed to the network, and the true phase.
# NOTE: this reassigns fig_test, replacing the figure built in the earlier test-plot cell.
fig_test = go.Figure()
x2  = np.linspace(-1,1,256)
r = 27
# NN prediction (out_test has no label column, so the full row is used)
fig_test.add_trace(go.Scatter(x=x2, y=out_test.detach().numpy()[r], mode='markers'
                              ,name='GS/ANN Test Prediction'))
# Initial phase input from GS (column 0 of X_test is the row label, skipped)
fig_test.add_trace(go.Scatter(x=x2, y=X_test.detach().numpy()[r,1:], mode='markers'
                              , name='GS Estimated Phase')) 

# truth (column 0 of y_test is the row label, skipped)
fig_test.add_trace(go.Scatter(x=x2, y=y_test.detach().numpy()[r,1:], mode='markers'
                              , name='Truth Value')) 

fig_test.update_layout(title='GS/ANN Test Values', xaxis_title='t', yaxis_title='Phases')
fig_test

In [72]:
# Find the MSE between the GS Initial Phases and the True Phases for test row r.
# r is not set in this cell; it is whatever value an earlier cell left behind.
# Column 0 (the row label) is skipped in both arrays.
actual = y_test.detach().numpy()[r,1:]
prediction = X_test.detach().numpy()[r,1:]

def mse(actual, pred):
    """Mean squared error: the average of the squared element-wise differences."""
    diff = np.array(actual) - np.array(pred)
    return np.mean(diff * diff)

In [73]:
# MSE between the true phase (actual) and the GS initial phase estimate
# (prediction) for the single test row selected above
mse(actual, prediction)

2.4110403