This is the PyTorch version of FEA-net rewritten from Houpu's TensorFlow code.

Note that only the elasticity problems are implemented (not implement yet).

## Imports

In [8]:
import os
import numpy as np
import scipy.io as sio
import matplotlib.pyplot as plt
import torch
import torchvision
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

%matplotlib inline

In [9]:
# Helper function for inline image display
def matplotlib_imshow(img):
    img = img.mean(dim=0)
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(npimg, cmap="Greys")

In [10]:
class ElasticDataset(Dataset):
    """ Currently only support import one loading case with noise precent """
    def __init__(self, datafile, percent=0.0):
        data = sio.loadmat(datafile)
        load = np.stack([-data['fx']/1e6, -data['fy']/1e6], 0).astype('float32')
        resp = np.stack([data['ux']*1e6, data['uy']*1e6], 0).astype('float32')
        
        # convert to PyTorch tensor
        self.load = torch.from_numpy(load)
        self.resp = torch.from_numpy(resp)
        
        # add some noises
        self.percent = percent
        noise = self.percent * np.random.normal(size=self.load.shape)
        self.loading_w_noise = torch.from_numpy((1+noise) * load)
        noise = self.percent * np.random.normal(size=self.load.shape)
        self.response_w_noise = torch.from_numpy((1+noise) * resp)

        # material property: E, mu
        self.rho = [212e3, 0.288] 

    def __len__(self):
        return self.load.shape[0]

    def __getitem__(self, idx):
        
        data = {'num_node': self.load.shape[1],
                'rho': self.rho,
                'train_load': self.loading_w_noise,
                'train_resp': self.response_w_noise,
                'test_load': self.load,
                'test_resp': self.resp,
                } 
        
        return data

In [11]:
data_set = ElasticDataset('../data/elasticity/center_crack_36x36_xy.mat', percent=0.001)
data_loader = DataLoader(data_set)

In [12]:
data = next(iter(data_loader))

Helper functions

In [13]:
from functools import reduce

def apply_n_times(f, n):
    """Returns a new function which is f folded n times: f(f(f(f(...f(f(n))...))))

    Usage
    -----

    apply_n_times(lambda x: x**2, 3)(2) == 256
    """

    def f_folded_n_times(x):
        return reduce(lambda fx, _: f(fx), range(n), x)
    return f_folded_n_times

In [None]:
class Jacobi_block():
    def __init__(self, data, cfg):
        self.batch_size = 1 # only use 1 training example
        self.num_node = data['num_node']
        self.E, self.mu, self.k, self.alpha = self.rho = data['rho']
        self.set_bc()

    def set_bc(self):
        ''' bc_mask: 1.0 for inner points, 0.0 elsewhere '''
        ''' bc_values: assign values for boundary points, 0.0 elsewhere '''
        self.bc_mask = np.ones((self.batch_size, 1, self.num_node, self.num_node))
        self.bc_mask[:, :, 0, :] = 0
        self.bc_mask[:, :, -1, :] = 0
        self.bc_mask[:, :, :, 0] = 0
        self.bc_mask[:, :, :, -1] = 0
        self.bc_values = np.zeros((self.batch_size, 1, self.num_node, self.num_node))
        
    def boundary_padding(self,x):
        ''' special symmetric boundary padding: mirror filling '''
        ''' x is a 2D pytorch tensor '''
        pad = torch.nn.ReflectionPad2d(padding=1)
        padded_x = pad(x)
        return padded_x

    def iter_step(self, u_in):
        self.FEA_conv = torch.nn.Conv2d(2, 4, 3, padding='valid')
        self.FEA_conv.weight = torch.nn.Parameter

        for param in self.FEA_conv.parameters():
            param.requires_grad = False
        
        u_in = self.FEA_conv(u_in) * self.bc_mask + self.bc_values

    def apply(self, u0, max_itr=10):
        ''' u0 is the initial displacement values '''
        if u0 is None:
            u = torch.zeros(self.batch_size, 1, self.num_node, self.num_node)
        else:
            u = u0
        
        padded_u = self.boundary_padding(u)
             

    def get_matrix_elast(self):
        ''' Shape of pytorch filters: (output channel, input channel, size, size) '''
        ''' Input channel: 2=Ux,Uy; Output channel: 4=Vxx,Vxy,Vyx,Vyy '''
        E, mu = self.E, self.mu
        cost_coef = E / 16. / (1 - mu ** 2)
        self.wxx = cost_coef * np.asarray([
            [-4 * (1 - mu / 3.), 16 * mu / 3., -4 * (1 - mu / 3.)],
            [-8 * (1 + mu / 3.), 32. * (1 - mu / 3.), -8 * (1 + mu / 3.)],
            [-4 * (1 - mu / 3.), 16 * mu / 3., -4 * (1 - mu / 3.)],
        ], dtype='float32').reshape(1,1,3,3)

        self.wxy = cost_coef * np.asarray([
            [2 * (mu + 1), 0, -2 * (mu + 1)],
            [0, 0, 0],
            [-2 * (mu + 1), 0, 2 * (mu + 1)],
        ], dtype='float32').reshape(1,1,3,3)

        self.wyx = cost_coef * np.asarray([
            [2 * (mu + 1), 0, -2 * (mu + 1)],
            [0, 0, 0],
            [-2 * (mu + 1), 0, 2 * (mu + 1)],
        ], dtype='float32').reshape(1,1,3,3)

        self.wyy = cost_coef * np.asarray([
            [-4 * (1 - mu / 3.), -8 * (1 + mu / 3.), -4 * (1 - mu / 3.)],
            [16 * mu / 3., 32. * (1 - mu / 3.), 16 * mu / 3.],
            [-4 * (1 - mu / 3.), -8 * (1 + mu / 3.), -4 * (1 - mu / 3.)],
        ], dtype='float32').reshape(1,1,3,3)

        self.d_matrix = np.stack([self.wxx[0,0,1,1], self.wyy[0,0,1,1], 1])  # diagonal matrix containing x, y components



In [None]:
learning_rate = 0.009
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# build a jacobi network

# inverse generative
#   forward_pass(resp_pl)
#   tf.reduce_mean()
#   pred_err
#   get_optimizer()
#       AdamOptimizer(lr = 0.1, beta1 = 0.5)
#   train epoh

# forward solving
#   jacobi.apply()
#       max_itr = 10
#   get_loss()
#   get_optimizer()

In [None]:
class FEA_Net_h():
    # NOTICE: right now for homogeneous anisotropic material only!!
    def __init__(self, data, cfg):
        # set learning rate
        self.lr = cfg['lr']
        self.num_epoch = cfg['epoch']

        # data related
        self.num_node = data['num_node']
        self.E, self.mu, self.k, self.alpha = self.rho = data['rho'] 

        # 3 dimensional in and out, defined on the nodes
        self.load_pl = tf.placeholder(tf.float32, shape=(None, data['num_node'], data['num_node'], 3), name='load_pl')
        self.resp_pl = tf.placeholder(tf.float32, shape=(None, data['num_node'], data['num_node'], 3), name='resp_pl')

        # get filters
        self.get_w_matrix()
        self.load_pred = self.u2v_map()


    def get_w_matrix(self):
        self.get_w_matrix_elast()
        self.get_w_matrix_thermal()
        self.get_w_matrix_coupling()
        self.apply_physics_constrain()

    def apply_physics_constrain(self):
        # known physics
        self.wxx_tf = tf.constant(self.wxx_ref)
        self.wyy_tf = tf.constant(self.wyy_ref)
        self.wxy_tf = tf.constant(self.wxy_ref)
        self.wyx_tf = tf.constant(self.wyx_ref)
        self.wtt_tf = tf.constant(self.wtt_ref)
        self.wtx_tf = tf.constant(self.wtx_ref)
        self.wty_tf = tf.constant(self.wty_ref)

        # unknown physics
        self.wxt_np = np.zeros_like(self.wxt_ref)  # * 1.9
        self.wyt_np = np.zeros_like(self.wyt_ref)  # * 1.9

        # TF variable vector
        self.trainable_var_np = np.concatenate([self.wxt_np.flatten(),
                                                self.wyt_np.flatten()], 0)
        self.trainable_var_ref = np.concatenate([self.wxt_ref.flatten(),
                                                self.wyt_ref.flatten()], 0)
        self.trainable_var_pl = tf.placeholder(tf.float32, shape=(9 * 2,), name='filter_vector')

        wxt_tf, wyt_tf = tf.split(self.trainable_var_pl, 2)
        self.wxt_tf = tf.reshape(wxt_tf, (3, 3, 1, 1))
        self.wyt_tf = tf.reshape(wyt_tf, (3, 3, 1, 1))

        # add constrains
        self.singula_penalty = (tf.reduce_sum(self.wxt_tf)
                              + tf.reduce_sum(self.wyt_tf)
                                )**2
        # self.E = tf.clip_by_value(self.E, 0, 1)
        # self.mu = tf.clip_by_value(self.mu, 0, 0.5)

        # tf.nn.conv2d filter shape: [filter_height, filter_width, in_channels, out_channels]
        self.w_filter = tf.concat([tf.concat([self.wxx_tf, self.wxy_tf, self.wxt_tf], 2),
                                   tf.concat([self.wyx_tf, self.wyy_tf, self.wyt_tf], 2),
                                   tf.concat([self.wtx_tf, self.wty_tf, self.wtt_tf], 2)],
                                  3)

        self.w_filter_ref = np.concatenate([np.concatenate([self.wxx_ref, self.wxy_ref, self.wxt_ref], 2),
                                   np.concatenate([self.wyx_ref, self.wyy_ref, self.wyt_ref], 2),
                                   np.concatenate([self.wtx_ref, self.wty_ref, self.wtt_ref], 2)],
                                  3)

    def get_w_matrix_coupling(self):
        E, v = self.E, self.mu
        alpha = self.alpha
        self.wtx_ref = np.zeros((3,3,1,1), dtype='float32')
        self.wty_ref = np.zeros((3,3,1,1), dtype='float32')
        coef = E * alpha / (6*(v-1)) / 400 *1e6
        self.wxt_ref = coef * np.asarray([[1, 0, -1],
                                      [4, 0, -4],
                                      [1, 0, -1]]
                                     , dtype='float32').reshape(3,3,1,1)

        self.wyt_ref = coef * np.asarray([[-1, -4, -1],
                                      [0, 0, 0],
                                      [1, 4, 1]]
                                     , dtype='float32').reshape(3,3,1,1)

    def get_w_matrix_thermal(self):
        w = -1/3. * self.k * np.asarray([[1., 1., 1.], [1., -8., 1.], [1., 1., 1.]])
        w = np.asarray(w, dtype='float32')
        self.wtt_ref = w.reshape(3,3,1,1)

    def get_w_matrix_elast(self):
        E, mu = self.E, self.mu
        cost_coef = E / 16. / (1 - mu ** 2)
        wxx = cost_coef * np.asarray([
            [-4 * (1 - mu / 3.), 16 * mu / 3., -4 * (1 - mu / 3.)],
            [-8 * (1 + mu / 3.), 32. * (1 - mu / 3.), -8 * (1 + mu / 3.)],
            [-4 * (1 - mu / 3.), 16 * mu / 3., -4 * (1 - mu / 3.)],
        ], dtype='float32')

        wxy = wyx = cost_coef * np.asarray([
            [2 * (mu + 1), 0, -2 * (mu + 1)],
            [0, 0, 0],
            [-2 * (mu + 1), 0, 2 * (mu + 1)],
        ], dtype='float32')

        wyy = cost_coef * np.asarray([
            [-4 * (1 - mu / 3.), -8 * (1 + mu / 3.), -4 * (1 - mu / 3.)],
            [16 * mu / 3., 32. * (1 - mu / 3.), 16 * mu / 3.],
            [-4 * (1 - mu / 3.), -8 * (1 + mu / 3.), -4 * (1 - mu / 3.)],
        ], dtype='float32')

        self.wxx_ref = wxx.reshape(3,3,1,1)
        self.wxy_ref = wxy.reshape(3,3,1,1)
        self.wyx_ref = wyx.reshape(3,3,1,1)
        self.wyy_ref = wyy.reshape(3,3,1,1)

    def boundary_padding(self,x):
        ''' special symmetric boundary padding '''
        left = x[:, :, 1:2, :]
        right = x[:, :, -2:-1, :]
        upper = tf.concat([x[:, 1:2, 1:2, :], x[:, 1:2, :, :], x[:, 1:2, -2:-1, :]], 2)
        down = tf.concat([x[:, -2:-1, 1:2, :], x[:, -2:-1, :, :], x[:, -2:-1, -2:-1, :]], 2)
        padded_x = tf.concat([left, x, right], 2)
        padded_x = tf.concat([upper, padded_x, down], 1)
        return padded_x

    def u2v_map(self):
        padded_resp = self.boundary_padding(self.resp_pl)  # for boundary consideration
        wx = tf.nn.conv2d(input=padded_resp, filter=self.w_filter, strides=[1, 1, 1, 1], padding='VALID')
        return wx

    def get_loss(self):
        self.diff = self.load_pred - self.load_pl
        diff_not_on_bc = self.apply_bc(self.diff)
        self.l1_error = tf.reduce_mean(diff_not_on_bc**2)
        # self.l1_error = tf.reduce_mean((self.diff_not_on_bc*self.resp_pl[:,1:-1,1:-1,:])**2)
        self.loss = self.l1_error #+ self.singula_penalty
        return self.loss

    def get_grad(self):
        self.rho_grads = tf.gradients(self.loss, self.trainable_var_pl)
        return self.rho_grads

    def get_hessian(self):
        self.rho_hessian = tf.hessians(self.loss, self.trainable_var_pl)
        return self.rho_hessian

    # V2U mapping functions
    def apply_bc(self, x):
        x_bc = tf.pad(x[:, 1:-1, 1:-1, :], ((0,0), (1, 1),(1, 1), (0, 0)), "constant")  # for boundary consideration
        return x_bc

    def FEA_conv(self, w, x):
        padded_input = self.boundary_padding(x)  # for boundary consideration
        wx = tf.nn.conv2d(input=padded_input, filter=w, strides=[1, 1, 1, 1], padding='VALID')
        wx_bc = wx * self.bc_mask # boundary_corrrect
        return wx_bc

    def v2u_layer(self, w, x):
        wx = self.FEA_conv(w, x)
        wx_bc = self.apply_bc(wx)
        return wx_bc

    def get_dmat(self):
        d_matrix = tf.stack([self.wxx_tf[1,1,0,0], self.wyy_tf[1,1,0,0], self.wtt_tf[1,1,0,0]])  # x, y, and t components
        return tf.reshape(d_matrix,(1,1,1,3))

    def get_bc_mask(self):
        bc_mask = np.ones_like(self.new_load)
        bc_mask[:, 0, :, :] /= 2
        bc_mask[:, -1, :, :] /= 2
        bc_mask[:, :, 0, :] /= 2
        bc_mask[:, :, -1, :] /= 2
        return bc_mask

    def init_solve(self, load, omega):
        self.omega = omega
        self.new_load = load
        self.d_matrix = self.get_dmat()
        self.bc_mask = self.get_bc_mask()
        self.u_in = tf.placeholder(tf.float32, load.shape, name='u_in')
        self.u_out = self.apply(self.u_in)

    def apply(self, u_in):
        wx = self.v2u_layer(self.w_filter, u_in)
        u_out = self.omega * (self.new_load - wx) / self.d_matrix +  u_in
        return u_out

## Setting

In [None]:
import os
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # or any {'0', '1', '2'}

In [None]:
class Evaluator(object):
    def __init__(self, model, data):
        self.model = model

        self.data = data
        self.init_w = np.zeros((3,3,1,1))

        self.loss_value = None
        self.grads_value = None

        self.loss_tf = self.model.get_loss()
        self.hessian_tf = self.model.get_hessian()
        self.grad_tf = self.model.get_grad()
        self.initial_graph()

    def initial_graph(self):
        # initialize
        FLAGS = tf.app.flags.FLAGS
        tfconfig = tf.ConfigProto(
            allow_soft_placement=True,
            log_device_placement=True,
        )
        tfconfig.gpu_options.allow_growth = True
        self.sess = tf.Session(config=tfconfig)
        init = tf.global_variables_initializer()
        self.sess.run(init)

    def get_loss(self, w):
        self.feed_dict = {self.model.load_pl: data['train_load'],
                          self.model.resp_pl: data['train_resp'],
                          self.model.trainable_var_pl: w}
        self.loss_value = self.sess.run(self.loss_tf, self.feed_dict).astype('float64')
        return self.loss_value

    def get_grads(self, w):
        self.feed_dict = {self.model.load_pl: data['train_load'],
                          self.model.resp_pl: data['train_resp'],
                          self.model.trainable_var_pl: w}
        self.grads_value = self.sess.run(self.grad_tf, self.feed_dict)[0].flatten().astype('float64')
        return self.grads_value

    def get_hessian(self, w):
        self.feed_dict = {self.model.load_pl: data['train_load'],
                          self.model.resp_pl: data['train_resp'],
                          self.model.trainable_var_pl: w}
        self.hessian_value = self.sess.run(self.hessian_tf, self.feed_dict)[0].astype('float64')
        return self.hessian_value

    def get_pred(self,w):
        feed_dict = {self.model.load_pl: data['train_load'],
                      self.model.resp_pl: data['train_resp'],
                      self.model.trainable_var_pl: w.astype('float32')}
        pred_value = self.sess.run(self.model.load_pred, feed_dict)
        return pred_value

    def run_newton(self):
        from scipy.optimize import minimize
        self.result = minimize(self.get_loss, self.model.trainable_var_np, method='Newton-CG',
                          jac=self.get_grads, hess=self.get_hessian,
                          options={'xtol': 1e-8, 'disp': True})
        return self.result.x

    def visualize(self, w):
        pred_value = self.get_pred(w)
        plt.figure(figsize=(6, 6))
        idx = 0  # which data to visualize
        for i in range(3):
            plt.subplot(4, 3, i + 1)
            plt.imshow(self.data['test_resp'][idx, 1:-1, 1:-1, i])
            plt.colorbar()
            plt.subplot(4, 3, 3 + i + 1)
            plt.imshow(self.data['test_load'][idx, 1:-1, 1:-1, i])
            plt.colorbar()
            plt.subplot(4, 3, 6 + i + 1)
            plt.imshow(pred_value[idx, 1:-1, 1:-1, i])
            plt.colorbar()
            plt.subplot(4, 3, 9 + i + 1)
            plt.imshow(self.data['test_load'][idx, 1:-1, 1:-1, i] - pred_value[idx, 1:-1, 1:-1, i])
            plt.colorbar()
        plt.show()

    def init_solve(self, load, omega=2/3.):
        self.model.init_solve(load, omega)
        self.solution = {'itr':[], 'loss': [], 'pred':[]}

    def run_forward(self, filter, pred_i, resp_ref=None, max_itr=100):

        st = 0 if self.solution['itr'] == [] else self.solution['itr'][-1]+10
        for itr in tqdm(range(st, st+max_itr, 1)):
            feed_dict = {self.model.u_in: pred_i, self.model.trainable_var_pl:filter}
            pred_i = self.sess.run(self.model.u_out, feed_dict)
            if itr%1 == 0:
                self.solution['itr'] += [itr]
                self.solution['pred'] += [pred_i]
                if resp_ref is not None:
                    pred_err_i = np.sqrt(np.sum((resp_ref - pred_i) ** 2)) / np.sqrt(  np.sum((resp_ref) ** 2))
                    print("iter:{}  pred_err: {}".format(itr, np.mean(pred_err_i)))
                    self.solution['loss'] += [np.mean(pred_err_i)]

        return pred_i

# Load data

In [None]:
data = load_data(percent=0.1)#snr=100

# Build the network

In [None]:
cfg = {'lr': 0.001,
        'epoch': 1,
        }
model = FEA_Net_h(data,cfg)

# Train the network

In [None]:
evaluator = Evaluator(model, data)
result = evaluator.run_newton()#run_trust_ncg

# Visualize training result

In [None]:
evaluator.visualize(result)
for i in range(2):
    mat = result[9*i:9*(i+1)]
    print(mat.reshape(3,3))
    print(np.sum(mat))
print(model.wxt_ref.reshape(3,3))
print(np.sum(model.wxt_ref))
print(model.wyt_ref.reshape(3,3))
print(np.sum(model.wyt_ref))

In [None]:
evaluator.init_solve(load=data['test_load'], omega=2/3.)
pred_i = np.zeros_like(data['test_resp'])  # data['test_resp']#
resp_ref = data['test_resp']
pred_i = evaluator.run_forward(model.trainable_var_ref, pred_i, resp_ref, max_itr=4000)
s0 = evaluator.solution

# Test the model

In [None]:
evaluator.init_solve(load=data['test_load'], omega=2/3.)
pred_i = np.zeros_like(data['test_resp'])  # data['test_resp']#
pred_i = evaluator.run_forward(result, pred_i, resp_ref, max_itr=4000)
s1 = evaluator.solution

plt.figure()
plt.semilogy(s0['itr'], s0['loss'], label='ref')
plt.semilogy(s1['itr'], s1['loss'], label='pred')
plt.legend()
plt.show()