In [4]:
import os
import pickle

import numpy as np
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

from Constant import C
from loss_function import gradient_norm_loss

In [5]:
class Model(nn.Module):
    """Fully connected tanh network mapping a (t, x) pair to one scalar.

    The network has eight hidden ``nn.Linear`` layers of width ``num_node``
    (each followed by ``tanh``) and a linear output layer of width 1.
    Layers are exposed as ``fc1`` ... ``fc9`` so saved ``state_dict`` keys
    stay stable.

    Args:
        num_node: Width of every hidden layer.
    """

    def __init__(self, num_node):
        super(Model, self).__init__()
        self.fc1 = nn.Linear(2, num_node)
        self.fc2 = nn.Linear(num_node, num_node)
        # Each layer is created exactly once; a repeated assignment would
        # build and discard a layer and shift the random initialisation of
        # every layer created after it.
        self.fc3 = nn.Linear(num_node, num_node)
        self.fc4 = nn.Linear(num_node, num_node)
        self.fc5 = nn.Linear(num_node, num_node)
        self.fc6 = nn.Linear(num_node, num_node)
        self.fc7 = nn.Linear(num_node, num_node)
        self.fc8 = nn.Linear(num_node, num_node)
        self.fc9 = nn.Linear(num_node, 1)

    def forward(self, x):
        """Run the network.

        Args:
            x: Tensor whose last dimension has size 2, holding (t, x).

        Returns:
            Tensor with the same leading dimensions and a last dimension of 1.
        """
        hidden_layers = (self.fc1, self.fc2, self.fc3, self.fc4,
                         self.fc5, self.fc6, self.fc7, self.fc8)
        for layer in hidden_layers:
            x = torch.tanh(layer(x))
        # The output layer is linear (no activation).
        return self.fc9(x)

In [None]:
def main(data_path, sample_batch_size, boundary_batch_size, model, learning_rate, max_step,
         sample_loss_weight=1.0, gradient_loss_weight=0.1):
    """Train ``model`` with SGD on boundary/initial data plus a residual term.

    Each step combines three losses:
      * MSE between the model output and the labels at sampled
        boundary/initial points,
      * MSE between the residual returned by ``f`` and zero at sampled
        interior points,
      * ``gradient_norm_loss()`` applied to the gradients returned by ``f``.

    Every 50 steps the model is evaluated and a checkpoint is written to
    ``checkpoint/step_<step>.pth``. Scalars are logged to ``./logs``.

    Args:
        data_path: Path handed to ``data_load``.
        sample_batch_size: Number of interior points drawn per step.
        boundary_batch_size: Number of boundary/initial points drawn per step.
        model: The ``nn.Module`` to train (updated in place).
        learning_rate: SGD learning rate.
        max_step: Number of optimisation steps.
        sample_loss_weight: Weight of the residual loss (default 1.0, the
            previously hard-coded value).
        gradient_loss_weight: Weight of the gradient-norm loss (default 0.1,
            the previously hard-coded value).

    Returns:
        The total loss of the last step as a float, or ``None`` when
        ``max_step`` is 0.
    """
    dataset = data_load(data_path)

    optimizer = optim.SGD(model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()
    grad_loss = gradient_norm_loss()

    params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print('params: ', params)

    # torch.save does not create missing directories.
    os.makedirs('checkpoint', exist_ok=True)

    writer = SummaryWriter(log_dir='./logs')
    final_loss = None
    try:
        bc_and_init = define_bc_and_init(dataset)

        for step in range(max_step):
            # Boundary and initial-condition loss.
            boundary_input, boundary_label = get_boudary_point(bc_and_init, dataset, boundary_batch_size)
            u = model(boundary_input)
            loss_1 = criterion(u, boundary_label)

            # Residual and gradient-norm losses at interior sample points.
            sample_input, _ = get_sample_point(dataset, sample_batch_size)
            f_list, grad = f(sample_input, model)
            fnc = torch.cat(f_list, dim=0).unsqueeze(1)
            loss_2 = criterion(fnc, torch.zeros_like(fnc))
            loss_3 = grad_loss(grad)

            loss = loss_1 + sample_loss_weight * loss_2 + gradient_loss_weight * loss_3
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Log a plain float rather than the tensor repr.
            final_loss = loss.item()
            print('step {} : loss {}'.format(step, final_loss))
            writer.add_scalar('Loss/step', final_loss, step)
            if step % 50 == 0:
                evaluate(model, dataset, step, writer)
                torch.save(model.state_dict(), 'checkpoint/step_{}.pth'.format(step))
    finally:
        # Flush and release the event file even if training fails midway.
        writer.close()

    return final_loss


def evaluate(model, dataset, step, writer):
    """Log the summed squared error of ``model`` over the whole (t, x) grid.

    For every pair of entries from ``dataset['t']`` and ``dataset['x']`` the
    model is evaluated on their concatenation and compared with
    ``dataset['u'][x_index][0][t_index]``. The sum of squared differences is
    written to ``writer`` under ``'pre_L2loss/step'`` and printed.

    The model is switched to eval mode for the duration of the call and its
    previous train/eval mode is restored afterwards, even on error.

    Args:
        model: Network called with a float32 tensor built from one ``t``
            entry and one ``x`` entry.
        dataset: Mapping with keys ``'t'``, ``'x'`` and ``'u'``.
        step: Global step used as the x-axis value for the logged scalar.
        writer: Object exposing ``add_scalar(tag, value, step)``.
    """
    was_training = model.training
    model.eval()
    # Accumulate in a plain float so the result is loggable regardless of the
    # element type stored in dataset['u'] and even when the grid is empty.
    squared_error = 0.0
    try:
        with torch.no_grad():
            for t_index, t in enumerate(dataset['t']):
                for x_index, x in enumerate(dataset['x']):
                    input_array = torch.tensor(np.concatenate([t, x]), dtype=torch.float32)
                    pred_u = model(input_array)
                    target = float(dataset['u'][x_index][0][t_index])
                    squared_error += (pred_u.item() - target) ** 2
    finally:
        model.train(was_training)
    writer.add_scalar('pre_L2loss/step', squared_error, step)
    print('pre_L2loss: {}'.format(squared_error))

def define_bc_and_init(dataset):
    """Enumerate the (time index, position index) pairs of the boundary and initial data.

    The returned array stacks, in this order:
      * the left boundary: position index 0 for time indices 1 .. T-1,
      * the right boundary: position index X-1 for time indices 1 .. T-1,
      * the initial condition: time index 0 for position indices 0 .. X-1,
    where T = ``dataset['t'].shape[0]`` and X = ``dataset['x'].shape[0]``.

    Returns:
        Integer array of shape (2 * (T - 1) + X, 2); column 0 is the time
        index and column 1 the position index.
    """
    num_times = dataset['t'].shape[0]
    num_points = dataset['x'].shape[0]

    def _pairs(time_idx, pos_idx):
        # Put two 1-D index vectors side by side as columns.
        return np.concatenate([time_idx.reshape(-1, 1), pos_idx.reshape(-1, 1)], axis=1)

    later_times = np.arange(1, num_times, dtype=np.int32)

    left_edge = _pairs(later_times, np.full(num_times - 1, 0))
    right_edge = _pairs(later_times, np.full(num_times - 1, num_points - 1))
    initial_row = _pairs(np.full(num_points, 0), np.arange(num_points, dtype=np.int32))

    return np.concatenate([left_edge, right_edge, initial_row], axis=0)

def f(sample_input, model):
    """Evaluate the residual ``du/dt + C * du/dx`` at each sample point.

    Each row of ``sample_input`` is split into ``t`` and ``x``, the model is
    evaluated on the pair, and the first derivatives of the output with
    respect to ``t`` and ``x`` are obtained with ``torch.autograd.grad``
    (``create_graph=True`` so the results can themselves be differentiated).

    Args:
        sample_input: Iterable of 2-element tensors (t, x) that require grad.
        model: Callable network taking a 2-element tensor and returning a
            single-element tensor.

    Returns:
        A tuple ``(f_list, grads)``:
          * ``f_list``: list with one residual tensor per sample.
          * ``grads``: 1-D float32 tensor ``[du/dt_0, du/dx_0, du/dt_1, ...]``.
            It stays attached to the autograd graph so a loss built on it
            can back-propagate into the model; building it with
            ``torch.tensor`` would copy the values and detach them.
    """
    f_list = []
    grad_list = []
    for one_sample in sample_input:
        t = one_sample[0].unsqueeze(0)
        x = one_sample[1].unsqueeze(0)
        u = model(torch.cat([t, x], dim=0))
        du_dt, du_dx = torch.autograd.grad(u, (t, x), retain_graph=True, create_graph=True)
        residual = du_dt + C * du_dx
        f_list.append(residual)
        grad_list.extend([du_dt, du_dx])

    if not grad_list:
        # torch.cat rejects an empty list; keep the empty-input result shape.
        return f_list, torch.empty(0, dtype=torch.float32)
    return f_list, torch.cat(grad_list).to(torch.float32)
    
def data_load(data_path):
    """Return the object unpickled from the file at ``data_path``.

    NOTE: unpickling can execute arbitrary code, so only load files from a
    trusted source.
    """
    with open(data_path, mode='rb') as handle:
        return pickle.load(handle)

def get_label_list(label, batch_point):
    '''
    Look up one label per (time, point) index pair.

    label        indexed as label[time, point]
    batch_point  iterable of index pairs; entry 0 is the time index and
                 entry 1 the point index

    Returns a list holding, for each pair, the selected label with a new
    leading dimension added.
    '''
    return [
        torch.unsqueeze(label[point[0], point[1]], 0)
        for point in batch_point
    ]

def get_sample_point(dataset, sample_batch_size):
    """Draw a random batch of (t, x) points and their ``u`` values.

    Time indices are drawn from ``[1, T)`` and position indices from
    ``[1, X - 2)``, with T = ``dataset['t'].shape[0]`` and
    X = ``dataset['x'].shape[0]``.
    NOTE(review): the upper bound of ``np.random.randint`` is exclusive, so
    position index ``X - 2`` is never drawn -- confirm this is intended.

    Returns:
        ``(inputs, labels)``: ``inputs`` is a float32 tensor of shape
        (sample_batch_size, 2) with ``requires_grad=True`` holding (t, x);
        ``labels`` is a float32 tensor of shape (sample_batch_size, 1)
        holding the matching ``dataset['u']`` values.
    """
    num_times = dataset['t'].shape[0]
    num_points = dataset['x'].shape[0]

    # Time indices are drawn before position indices; keep this order so a
    # seeded global RNG produces the same batch.
    time_idx = np.random.randint(1, num_times, (sample_batch_size, 1))
    point_idx = np.random.randint(1, num_points - 2, (sample_batch_size, 1))
    pair_batch = np.concatenate([time_idx, point_idx], axis=1)

    # One row per pair: [t value, x value, u value].
    rows = [
        np.concatenate([dataset['t'][pair[0]],
                        dataset['x'][pair[1]],
                        np.array([dataset['u'][pair[1]][0][pair[0]]])])
        for pair in pair_batch
    ]
    numerical_data = np.stack(rows, axis=0)

    inputs = torch.tensor(numerical_data[:, 0:2], dtype=torch.float32, requires_grad=True)
    labels = torch.tensor(numerical_data[:, 2], dtype=torch.float32).view(-1, 1)
    return inputs, labels

def get_boudary_point(bc_and_init, dataset, boundary_batch_size):
    """Draw a random batch of boundary/initial points and their ``u`` values.

    Rows of ``bc_and_init`` are chosen with replacement using
    ``np.random.choice``; each chosen row supplies a time index (entry 0)
    and a position index (entry 1) into ``dataset``.

    Returns:
        ``(inputs, labels)``: ``inputs`` is a float32 tensor of shape
        (boundary_batch_size, 2) with ``requires_grad=True`` holding (t, x);
        ``labels`` is a float32 tensor of shape (boundary_batch_size, 1)
        holding the matching ``dataset['u']`` values.
    """
    chosen_rows = np.random.choice(range(bc_and_init.shape[0]), boundary_batch_size)
    pair_batch = bc_and_init[chosen_rows]

    def _record(time_index, point_index):
        # One row: [t value, x value, u value].
        return np.concatenate([dataset['t'][time_index],
                               dataset['x'][point_index],
                               np.array([dataset['u'][point_index][0][time_index]])])

    records = [_record(pair[0], pair[1]) for pair in pair_batch]
    numerical_data = np.stack(records, axis=0)

    inputs = torch.tensor(numerical_data[:, 0:2], dtype=torch.float32, requires_grad=True)
    labels = torch.tensor(numerical_data[:, 2], dtype=torch.float32).view(-1, 1)
    return inputs, labels

In [None]:
def objective(trial):
    """Optuna objective: sample hyper-parameters and build a model for the trial.

    Sampled values:
      * ``num_nodes``: hidden width, integer in [20, 100]
      * ``lr``: learning rate, uniform in [1e-4, 1e-1]
      * ``sample_batch_size``: integer in [500, 8000]
      * ``boundary_batch_size``: integer in [20, 300]
      * ``gradient_loss_weight``: uniform in [1e-2, 10]
      * ``sample_loss_weight``: uniform in [1e-2, 10]

    TODO: this function is unfinished -- it does not train the model and
    implicitly returns ``None``, so it gives Optuna no value to optimise.
    Wire in the training call and return the metric to optimise.
    """
    num_node = trial.suggest_int('num_nodes', 20, 100)
    learning_rate = trial.suggest_uniform('lr', 1e-4, 1e-1)
    sample_batch_size = trial.suggest_int('sample_batch_size', 500, 8000)
    boundary_batch_size = trial.suggest_int('boundary_batch_size', 20, 300)
    gradient_loss_weight = trial.suggest_uniform('gradient_loss_weight', 1e-2, 10)
    sample_loss_weight = trial.suggest_uniform('sample_loss_weight', 1e-2, 10)

    # Instantiate the network with the sampled width; binding the bare class
    # (``model = Model``) would not create a trainable model.
    model = Model(num_node)
    

In [None]:
# Create an Optuna study with the library's default settings and run
# `objective` for 100 trials.
study = optuna.create_study()
study.optimize(objective, n_trials = 100)