In [None]:
!nvidia-smi -L

In [None]:
!pip install pyDOE

In [None]:
from google.colab import drive

drive.mount('/content/gdrive/', force_remount=True)

In [None]:
import os
os.chdir('./gdrive/MyDrive/')

In [None]:
import time
import numpy as np
import torch
from pyDOE import lhs
from torch import nn
from torch.nn import functional as F
from torch import optim
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter 

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
def equi_distant(dims, count):
    if dims == 1:
        return np.linspace(0, 1, count + 2)[1:-1].reshape(-1, 1)
    if dims == 2:
        count = int(np.ceil(np.sqrt(count)))
        line = np.linspace(0, 1, count + 2)[1:-1]
        x, y = np.meshgrid(line, line)
        return np.stack([x.reshape(-1), y.reshape(-1)], 1)

In [None]:
# U(x, t) = sin(Ax)cos(Bt)
DIM_A = 1
B = 2
SEED = 1
torch.manual_seed(SEED)
A = torch.randn((1, DIM_A)).to(device)
BATCH_SIZE = 4096

MODEL_NAME = 'AL.CA-2'
N0 =100
N1 = 10000
N_TEST = 5000

MIN_X = 0
MAX_X = np.pi 

EPOCHS = 20000
LOG_EVERY_EPOCH = int(EPOCHS/100)
PLOT_EVERY_EPOCH = int(EPOCHS/5)

LAYERS = 5
NEURONS_PER_LAYER = 80

GRAD_CLIP_VALUE = 5
#ATTACK_TYPE = 'None'
ATTACK_TYPE = 'Linf_change_label_high_dim'
#ATTACK_TYPE = 'Gaussian'

ATTACK_STEPS = 8
ATTACK_EPS = 0.01
ATTACK_DELTA = (ATTACK_EPS / 8) * 1.5

EVAL_ATTACK_STEPS = 8
EVAL_ATTACK_EPS = 0.05
EVAL_ATTACK_DELTA = (ATTACK_EPS / 8) * 1.5

sampling_func = lhs

In [None]:
writer = SummaryWriter(f'./PINN/HD/logs/{MODEL_NAME}/')
hyper_writer = SummaryWriter('./PINN/HD/SUMMARY/')

In [None]:
D0 = []
Y0 = []

for i in range(DIM_A + 1):
    x = torch.tensor(sampling_func(DIM_A + 1, N0) * (MAX_X - MIN_X) + MIN_X).to(device)
    x[:, i] = 0
    y = torch.cos(B * x[:, -1]) * torch.sin((A * x[:, :-1]).sum(dim=1))
    D0.append(x)
    Y0.append(y)

for i in range(DIM_A):
    x = torch.tensor(sampling_func(DIM_A + 1, N0) * (MAX_X - MIN_X) + MIN_X).to(device)
    x[:, i] = MAX_X
    y = torch.cos(B * x[:, -1]) * torch.sin((A * x[:, :-1]).sum(dim=1))
    D0.append(x)  # list of 51 dim vectors [(X11, X12, ... X1M), .... (XN1, ...XNM)]
    Y0.append(y)


D0 = torch.cat(D0)  # matrix of shape (Num samples, 51)
# [(X11, X12, ... X1M), 
#   .... 
#  (XN1, ..., XNM)]
Y0 = torch.cat(Y0).reshape(-1, 1)
boundary_dataset = TensorDataset(D0, Y0)
boundary_loader = DataLoader(boundary_dataset, BATCH_SIZE, shuffle=True)

D_tr = torch.tensor(sampling_func(DIM_A + 1, N1) * (MAX_X - MIN_X) + MIN_X).to(device)
Y_tr = (torch.cos(B * D_tr[:, -1]) * torch.sin((A * D_tr[:, :-1]).sum(dim=1))).reshape(-1, 1)

f = -B * torch.sin(B * D_tr[:, -1]) * torch.sin((A * D_tr[:, :-1]).sum(1)) + \
    (A ** 2).sum() * torch.cos(B * D_tr[:, -1]) * torch.sin((A * D_tr[:, :-1]).sum(1)) - \
    (torch.cos(B * D_tr[:, -1]) * torch.sin((A * D_tr[:, :-1]).sum(1))) + (torch.cos(B * D_tr[:, -1]) * torch.sin((A * D_tr[:, :-1]).sum(1)))**3
f = f.reshape(-1)

train_dataset = TensorDataset(D_tr, f)
train_loader = DataLoader(train_dataset, BATCH_SIZE, shuffle=True)

train_function_dataset = TensorDataset(D_tr, Y_tr)
train_function_loader = DataLoader(train_function_dataset, BATCH_SIZE, shuffle=True)

D_tst = torch.tensor(sampling_func(DIM_A + 1, N1) * (MAX_X - MIN_X) + MIN_X).to(device)
Y_tst = (torch.cos(B * D_tst[:, -1]) * torch.sin((A * D_tst[:, :-1]).sum(dim=1))).reshape(-1, 1)
test_dataset = TensorDataset(D_tst, Y_tst)
test_loader = DataLoader(test_dataset, BATCH_SIZE, shuffle=True)


In [None]:
class Model(nn.Module):
    def __init__(self, input_shape, output_shape, layers, neurons_per_layer):
        super().__init__()
        self.layers = layers
        self.fc1 = nn.Linear(input_shape, neurons_per_layer)
        for i in range(layers - 2):
            layer = nn.Linear(neurons_per_layer, neurons_per_layer)
            setattr(self, f'fc{i + 2}', layer)
        layer = nn.Linear(neurons_per_layer, output_shape)
        setattr(self, f'fc{layers}', layer)

        self.activation =  nn.Tanh()

    def forward(self, x):
        for i in range(self.layers - 1):
            layer = getattr(self, f'fc{i + 1}')
            x = layer(x)
            x = self.activation(x)
        layer = getattr(self, f'fc{self.layers}')
        x = layer(x)
        return x

In [None]:
def derivative(u_, x_, on_dim=1, order=1):
    ones_ = torch.ones_like(u_)

    drv = torch.autograd.grad(u_, x_, create_graph=True, grad_outputs=ones_)[0]
    for i in range(1, order):
        ones_ = torch.zeros_like(drv)
        ones_[:, on_dim] = 1
        drv = torch.autograd.grad(drv, x_, create_graph=True, grad_outputs=ones_)[0]
    return drv


def l2_pgd(x, target, loss_function, steps, eps, delta):
    noise = torch.zeros_like(x).requires_grad_(True)
    for i in range(steps):
      loss = loss_function(x + noise, target)
      loss.backward()
      with torch.no_grad():
        grad = noise.grad
        grad /= torch.norm(grad, p=2, dim=1, keepdim=True)  # p=float('inf')
        noise += grad * delta
        norm2 = torch.norm(noise, p=2, dim=1, keepdim=True)   # p=float('inf')
        norm2 = torch.maximum(norm2, torch.ones_like(norm2) * eps)
        noise *= eps / norm2
        noise.grad.zero_()
    return noise
  

def gaussian_noise(x, target, loss_function, steps, eps, delta):
    noise = torch.randn(x.size()).requires_grad_(True) * eps / np.sqrt(x.size(1))
    return noise


def zero_noise(x, *args, **kwargs):
    return torch.zeros_like(x).requires_grad_(True)

In [None]:
eps_final= 0.2

def Linf_change_label_high_dim(x, target, loss_function, steps, eps = (MAX_X-MIN_X)*eps_final, type_points='D0'):

    if steps>0:
      delta = (eps/steps)*1.5
    noise = torch.zeros_like(x).requires_grad_(True)

    for i in range(steps):
      
      #print(i, (x + noise).shape, target.shape)
      loss = loss_function(x + noise, target)
      loss.backward()
      with torch.no_grad():
        grad = noise.grad
        grad = grad.sign()

        noise += grad * delta

        noise[noise>eps] = eps
        noise[noise<-eps] = -eps
        
        noise.grad.zero_()

      x_noise = x+noise
      if type_points=='D0':
          target = []
          for i in range(1):
              y = torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(dim=1))
              target.append(y)
          #print(torch.tensor(target).shape)
          target = torch.cat(target).reshape(-1, 1)
          #print(target.shape)

      elif type_points=='D_tr' or type_points=='D_tst':        
        #target = (torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(dim=1))).reshape(-1, 1)
        target = -B * torch.sin(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1)) + \
            (A ** 2).sum() * torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1)) - \
            (torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1))) + (torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1)))**3
        target = target.reshape(-1)

    return noise.detach(), target.detach()


def Linf_change_label_gussaian_high_dim(x, target, loss_function, steps, eps = (MAX_X-MIN_X)*eps_final, type_points='D0'):

    if steps>0:
      noise = torch.randn(x.size(), device=device).requires_grad_(True)*eps

      noise[noise>(MAX_X-x)] = MAX_X-x[noise>(MAX_X-x)]
      noise[noise<(MIN_X-x)] = MIN_X-x[noise<(MIN_X-x)] 

      x_noise = x+noise
      if type_points=='D0':
          target = []
          for i in range(1):
              y = torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(dim=1))
              target.append(y)
          #print(torch.tensor(target).shape)
          target = torch.cat(target).reshape(-1, 1)
          #print(target.shape)

      elif type_points=='D_tr' or type_points=='D_tst':        
        #target = (torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(dim=1))).reshape(-1, 1)
        target = -B * torch.sin(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1)) + \
            (A ** 2).sum() * torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1)) - \
            (torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1))) + (torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1)))**3
        target = target.reshape(-1)
    
    else:
        noise = torch.zeros(x.size(), device=device).requires_grad_(True)
      
    return noise, target.detach()


def Linf_without_change_label_gussaian_high_dim(x, target, loss_function, steps, eps = (MAX_X-MIN_X)*eps_final, type_points='D0'):

    if steps>0:
      noise = torch.randn(x.size(), device=device).requires_grad_(True)*eps

      noise[noise>(MAX_X-x)] = MAX_X-x[noise>(MAX_X-x)]
      noise[noise<(MIN_X-x)] = MIN_X-x[noise<(MIN_X-x)] 

      x_noise = x
      if type_points=='D0':
          target = []
          for i in range(1):
              y = torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(dim=1))
              target.append(y)
          #print(torch.tensor(target).shape)
          target = torch.cat(target).reshape(-1, 1)
          #print(target.shape)

      elif type_points=='D_tr' or type_points=='D_tst':        
        #target = (torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(dim=1))).reshape(-1, 1)
        target = -B * torch.sin(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1)) + \
            (A ** 2).sum() * torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1)) - \
            (torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1))) + (torch.cos(B * x_noise[:, -1]) * torch.sin((A * x_noise[:, :-1]).sum(1)))**3
        target = target.reshape(-1)
    
    else:
        noise = torch.zeros(x.size(), device=device).requires_grad_(True)
      
    return noise, target.detach()

In [None]:
if ATTACK_TYPE == 'Gaussian':
    attack_function = gaussian_noise
elif ATTACK_TYPE == 'L2PGD':
    attack_function = l2_pgd
elif ATTACK_TYPE == 'Linf_change_label_high_dim':
    attack_function = Linf_change_label_high_dim
elif ATTACK_TYPE == 'Linf_change_label_gussaian_high_dim':
    attack_function = Linf_change_label_gussaian_high_dim
elif ATTACK_TYPE == 'Linf_without_change_label_gussaian_high_dim':
    attack_function = Linf_without_change_label_gussaian_high_dim
else:
    attack_function = zero_noise
print(attack_function)

In [None]:
# %matplotlib inline
# import matplotlib.pyplot as plt
# from mpl_toolkits import mplot3d

# fig = plt.figure(figsize=plt.figaspect(0.5) * 1.5)
# ax = fig.add_subplot(1, 2, 1, projection="3d")
# X = np.linspace(MIN_X, MAX_X, 100)
# T = np.linspace(MIN_X, MAX_X, 100)
# X, T = np.meshgrid(X, T)
# a = A.cpu().numpy()[0]
# u = torch.tensor(np.stack([X.reshape(-1), T.reshape(-1)], 1)).to(device)
# Z = model(u).cpu().detach().numpy()
# U = np.sin(a * X) * np.cos(B * T)
# Z = Z.reshape(X.shape)
# ax.plot_surface (X, T, Z
#                 , rstride=1 # default value is one
#                 , cstride=1 # default value is one
#                 , cmap='winter'
#                 , edgecolor='none'
#                 )
# ax.scatter3D(D_tr[:, 0].cpu().detach().numpy(),
#               D_tr[:, 1].cpu().detach().numpy(),
#               Y_tr[:, 0].cpu().detach().numpy(),
#               marker='x')
# ax.scatter3D(D0[:, 0].cpu().detach().numpy(),
#               D0[:, 1].cpu().detach().numpy(),
#               Y0[:, 0].cpu().detach().numpy(),
#               marker='x')
# ax.set_xlabel('X')
# ax.set_ylabel('T')
# ax.set_zlabel('Z')



In [None]:
model = Model(DIM_A + 1, 1, LAYERS, NEURONS_PER_LAYER).to(device).double()

optimizer = optim.Adam(model.parameters(),0.001)
#optimizer = optim.SGD(model.parameters(), 0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, int(EPOCHS/10), 0.5)

In [None]:
def mse_output_target(x, y):
    u = model(x)
    return F.mse_loss(u, y)
#u_t=u_xx+u-u^3+f
#u(x,0)=f1(x)
#u(0,t)=g(t)
#u(1,t)=h(t)
# let y=sin(x+t)
#y=exact solution 
#f=(y)_t-(y_xx+y-(y)^3)
def mse_derivatives(x, f):
    u = model(x)

    u_d = derivative(u, x)
    u_t = u_d[:, -1]

    #u_t - lap u
    for i in range(DIM_A):
        u_dd_i = derivative(u, x, i, 2)
        u_xx_i = u_dd_i[:, i]
        u_t -= u_xx_i 
  #   u_t - lap u - u + u3
    u_t = u_t - u + u**3

    #f=u_t-u_xx
    '''
    f = -B * torch.sin(B * x[:, -1]) * torch.sin((A * x[:, :-1]).sum(1)) + \
        (A ** 2).sum() * torch.cos(B * x[:, -1]) * torch.sin((A * x[:, :-1]).sum(1)) - \
        (torch.cos(B * x[:, -1]) * torch.sin((A * x[:, :-1]).sum(1))) + (torch.cos(B * x[:, -1]) * torch.sin((A * x[:, :-1]).sum(1)))**3
    f = f.reshape(-1)
    '''
    return F.mse_loss(u_t, f)


In [None]:
%reload_ext tensorboard

In [None]:
!ps | grep tensorbaord

In [None]:
%tensorboard --logdir ./PINN/HD/

In [None]:
epoch_start = time.time()
previous_loss = None
for epoch in range(EPOCHS):
    model.train()
    train_boundary_loss = 0
    train_collocation_loss = 0

    #if epoch <= int(EPOCHS/2):
    #    eps=(MAX_X-MIN_X)*eps_final*epoch/int(EPOCHS/2)
    #else:
    #    eps=(MAX_X-MIN_X)*eps_final
    eps=(MAX_X-MIN_X)*eps_final

    for i, (x, y) in enumerate(boundary_loader):
        optimizer.zero_grad()

        #noise_u0t = attack_function(x, y, mse_output_target, ATTACK_STEPS, ATTACK_EPS, ATTACK_DELTA)
        #noise_u0t, y = attack_function(x, y, mse_output_target, ATTACK_STEPS, ATTACK_EPS, 'D0')
        noise_u0t, y = Linf_change_label_high_dim(x, y, mse_output_target, ATTACK_STEPS, eps, type_points='D0')
        loss_0 = mse_output_target(x + noise_u0t, y)
        train_boundary_loss += loss_0.item() * x.size(0)

        loss_0.backward()
        nn.utils.clip_grad_value_(model.parameters(), GRAD_CLIP_VALUE)
        optimizer.step()

    for i, (x, y) in enumerate(train_loader):
        optimizer.zero_grad()

        #noise_u = attack_function(x, None, mse_derivatives, ATTACK_STEPS, ATTACK_EPS, ATTACK_DELTA)
      #  noise_u, y = attack_function(x, y, mse_derivatives, ATTACK_STEPS, eps, 'D_tr')
        noise_u, y = Linf_change_label_high_dim(x, y, mse_derivatives, ATTACK_STEPS, eps, type_points='D_tr')
        adv_input = x + noise_u
        adv_input.requires_grad = True

        loss_1 = mse_derivatives(adv_input, y)
        train_collocation_loss += loss_1 * x.size(0)

        loss_1.backward()
        nn.utils.clip_grad_value_(model.parameters(), GRAD_CLIP_VALUE)
        optimizer.step()

    train_boundary_loss /= len(boundary_loader.dataset)
    train_collocation_loss /= len(train_loader.dataset)
    total_train_loss = train_boundary_loss + train_collocation_loss

    writer.add_scalars('loss', {
        'Total': total_train_loss,
        'Boundary': train_boundary_loss,
        'Collocation': train_collocation_loss
    }, epoch)

    scheduler.step()

    if epoch % LOG_EVERY_EPOCH == 0:
        print('Epoch', epoch)
        model.eval()

        function_loss = 0
        for i, (x, y) in enumerate(train_function_loader):
            u = model(x)
            function_loss += F.mse_loss(u, y).item() * x.size(0)
        function_loss /= len(train_loader.dataset)
        print(f'Training loss {total_train_loss}, function loss {function_loss}')


        clean_test_function_loss = 0
        adv_test_function_loss = 0
        for i, (x, y) in enumerate(test_loader):
            #noise_test = attack_function(x, None, mse_derivatives, ATTACK_STEPS, ATTACK_EPS, ATTACK_DELTA)
            #noise_test, y = attack_function(x, y, mse_derivatives, ATTACK_STEPS, ATTACK_EPS, 'D_tst')
            u_test = model(x)
            clean_test_function_loss += F.mse_loss(u_test, y).item() * x.size(0)
            #u_test_adv = model(x + noise_test)
            #adv_test_function_loss += F.mse_loss(u_test_adv, y).item() * x.size(0)
        clean_test_function_loss /= len(test_loader.dataset)
        #adv_test_function_loss /= len(test_loader.dataset)
        print(f'Test function loss: Clean {clean_test_function_loss}, '
              #f'Adv {adv_test_function_loss}'
              )
        print('time', time.time() - epoch_start)
        epoch_start = time.time()
        writer.add_scalars('Function Loss', {
            'Train': function_loss,
            'Test': clean_test_function_loss,
            #'Test Adv': adv_test_function_loss,
        }, epoch)
        writer.flush()
    # if previous_loss is None or loss.item() < LAMBDA_PLOT * previous_loss or \
    #             epoch % PLOT_EVERY_EPOCH == 0:
    #     plot_model(f'Epoch {epoch}', epoch)
    #     previous_loss = loss.item()

In [None]:
def evaluate(loader):
    function_loss = 0
    for x, y in loader:
       u = model(x)
       function_loss += F.mse_loss(u, y).item()
    function_loss /= len(loader.dataset)
    return function_loss

In [None]:
'''
model.eval()
train_function_loss = evaluate(train_loader)
print(f'Training loss {total_train_loss.item()}, function loss {train_function_loss}')


clean_test_function_loss = 0
adv_test_function_loss = 0
for i, (x, y) in enumerate(test_loader):
    noise_test = l2_pgd(x, None, mse_derivatives, EVAL_ATTACK_STEPS, EVAL_ATTACK_EPS, 
                    EVAL_ATTACK_DELTA)
    u_test = model(x)
    clean_test_function_loss += F.mse_loss(u_test, y).item() * x.size(0)
    u_test_adv = model(x + noise_test)
    adv_test_function_loss += F.mse_loss(u_test_adv, y).item() * x.size(0)
clean_test_function_loss /= len(test_loader.dataset)
adv_test_function_loss /= len(test_loader.dataset)

print(f'Test function loss: Clean {clean_test_function_loss}, '
      f'Adv {adv_test_function_loss}')
'''

In [None]:
hyper_writer.add_hparams(
    {
        'N0': N0,
        'N1': N1,
        'N_TEST': N_TEST,
        'DIM_A': DIM_A,
        'B': B,
        'SEED': SEED,
        'EPOCHS': EPOCHS,
        'BATCH_SIZE': BATCH_SIZE,
        'LAYERS': LAYERS,
        'NEURONS_PER_LAYER': NEURONS_PER_LAYER,
        'ATTACK_TYPE': ATTACK_TYPE,
        'ATTACK_STEPS': ATTACK_STEPS,
        'ATTACK_DELTA': ATTACK_DELTA,
        'ATTACK_EPS': ATTACK_EPS,
        'EVAL_ATTACK_STEPS': EVAL_ATTACK_STEPS, 
        'EVAL_ATTACK_EPS': EVAL_ATTACK_EPS,
        'EVAL_ATTACK_DELTA': EVAL_ATTACK_DELTA,
        'MIN_X': MIN_X,
        'MAX_X': MAX_X,
        'SAMPLING_FUNCTION': sampling_func.__name__,
    },
    {
        'training_function_loss': train_function_loss,
        'test_clean_function_loss': clean_test_function_loss,
        'test_adv_function_loss': adv_test_function_loss,
    },
    run_name=MODEL_NAME,
)

In [None]:
writer.close()
hyper_writer.close()