In [1]:
import torch
from torch import nn
import numpy as np
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
from scipy import io

from models import DeepONet, BelNet, StochasticFeatures
from utils import get_data, train_model, validate_model


In [2]:
def test_loaded_model(path, model):
    criterion = nn.MSELoss()  
    model.load_state_dict(torch.load(path))
    model.to(device)
    '''
        test on data same as training time
    '''
    np.random.seed(42) # fixed seed
    train_data, test_data = get_data(1000,200)
    train_data= [torch.from_numpy(d).float().to(device).reshape((1000,128,1)) for d in train_data]
    test_data = [torch.from_numpy(d).float().to(device).reshape((200,128,1)) for d in test_data]
    (x_train, a_train, y_train, u_train) = train_data
    (x_test, a_test, y_test, u_test)     = test_data
    print("train shape:", x_train.shape, a_train.shape, y_train.shape, u_train.shape)
    print("test shape:", x_test.shape, a_test.shape, y_test.shape, u_test.shape)
        
    validate_model(model, test_data, criterion, device=device)
    
    '''
        test on interpolated data
    '''
    def get_data_subsample_input(ntrain, ntest):
        N = ntrain + ntest
        data = io.loadmat("./Burgers/burgers_data_R10.mat")
    
        def get_fixed_indices_input(N):
            indices_1 = np.arange(0, 2**13, 2**6)
            index_array = np.zeros((N, 2**7), dtype=int) 
            for n in range(N):
                index_array[n] = indices_1
            index_array = index_array[:,2::4]
            print(index_array.shape)
            return index_array
    
        def get_fixed_indices_output(N):
            indices_1 = np.arange(0, 2**13, 2**6)
            index_array = np.zeros((N, 2**7), dtype=int) 
            for n in range(N):
                index_array[n] = indices_1
            return index_array
    
        input_func_data  = data["a"][:N, :].astype(np.float32)
        output_func_data = data["u"][:N, :].astype(np.float32)

        from scipy.interpolate import interp1d
        input_index_array = get_fixed_indices_input(N)
        raw_a_data = input_func_data[np.arange(N)[:, None], input_index_array]
        a_data = np.zeros((N, 128))
        for i in range(N):
            f = interp1d(input_index_array[i,:], raw_a_data[i,:], kind='linear', fill_value="extrapolate")
            a_data[i,:] = f(np.arange(0, 2**13, 2**6))
                
        output_index_array = get_fixed_indices_output(N)
        y_data = output_index_array * (1.0 / 2**13)
        u_data = output_func_data[np.arange(N)[:, None], output_index_array]
    
        train_data = (np.zeros((1000,1)), a_data[:ntrain], y_data[:ntrain], u_data[:ntrain]) 
        test_data  = (np.zeros((200,1)), a_data[ntrain:], y_data[ntrain:], u_data[ntrain:])
        return train_data, test_data
    
    np.random.seed(42) # fixed seed
    train_data, test_data = get_data_subsample_input(1000,200)
    train_data= [torch.from_numpy(d).float().to(device).unsqueeze(2) for d in train_data]
    test_data = [torch.from_numpy(d).float().to(device).unsqueeze(2) for d in test_data]
    (x_train, a_train, y_train, u_train) = train_data
    (x_test, a_test, y_test, u_test)     = test_data
    print("train shape:", x_train.shape, a_train.shape, y_train.shape, u_train.shape)
    print("test shape:", x_test.shape, a_test.shape, y_test.shape, u_test.shape)
    validate_model(model, test_data, criterion, device=device)

paths = [
    './saved_models/model_ex0.pth',
]
models = [
    DeepONet(),
]

for model_idx, (path, model) in enumerate(zip(paths, models)):
    print(f"testing model {model_idx}")
    test_loaded_model(path, model)

testing model 0
train shape: torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1])
test shape: torch.Size([200, 128, 1]) torch.Size([200, 128, 1]) torch.Size([200, 128, 1]) torch.Size([200, 128, 1])
Validation Loss: 0.00060
mean Relative L2 Loss: 0.02260
(1200, 32)
train shape: torch.Size([1000, 1, 1]) torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1])
test shape: torch.Size([200, 1, 1]) torch.Size([200, 128, 1]) torch.Size([200, 128, 1]) torch.Size([200, 128, 1])
Validation Loss: 0.00058
mean Relative L2 Loss: 0.02317


In [3]:
def test_loaded_model(path, model):
    criterion = nn.MSELoss()  
    model.load_state_dict(torch.load(path))
    model.to(device)
    '''
        test on data same as training time
    '''
    np.random.seed(42) # fixed seed
    train_data, test_data = get_data(1000,200)
    train_data= [torch.from_numpy(d).float().to(device).reshape((1000,128,1)) for d in train_data]
    test_data = [torch.from_numpy(d).float().to(device).reshape((200,128,1)) for d in test_data]
    (x_train, a_train, y_train, u_train) = train_data
    (x_test, a_test, y_test, u_test)     = test_data
    print("train shape:", x_train.shape, a_train.shape, y_train.shape, u_train.shape)
    print("test shape:", x_test.shape, a_test.shape, y_test.shape, u_test.shape)
        
    validate_model(model, test_data, criterion, device=device)

    '''
        test on one-fourth of input data
    '''
    def get_data_subsample_input(ntrain, ntest):
        N = ntrain + ntest
        data = io.loadmat("./Burgers/burgers_data_R10.mat")
    
        def get_fixed_indices_input(N, num_samples):
            indices_1 = np.arange(0, 2**13, 2**6)
            index_array = np.zeros((N, 2**7), dtype=int) 
            for n in range(N):
                index_array[n] = indices_1
            index_array = index_array[:,2::4]
            print(index_array.shape)
            return index_array
    
        def get_fixed_indices_output(N):
            indices_1 = np.arange(0, 2**13, 2**6)
            index_array = np.zeros((N, 2**7), dtype=int) 
            for n in range(N):
                index_array[n] = indices_1
            return index_array
    
        input_func_data  = data["a"][:N, :].astype(np.float32)
        output_func_data = data["u"][:N, :].astype(np.float32)

        input_index_array = get_fixed_indices_input(N, num_samples = 32)
        x_data = input_index_array * (1.0 / 2**13)
        a_data = input_func_data[np.arange(N)[:, None], input_index_array]

        def interpolate_inputs(arr, multi):
            N,m = arr.shape
            new_arr = np.concatenate([arr, arr[:,-1:]], axis=1)
            a = new_arr[:, :-1]  
            b = new_arr[:, 1:]   
            result = np.empty((arr.shape[0], multi*m), dtype=arr.dtype)
            result[:, 0::multi] = arr
            for i in range(1,multi):
                result[:, i::multi] = (1-i/multi) * a + (i/multi) * b
        
            return result
        
        output_index_array = get_fixed_indices_output(N)
        y_data = output_index_array * (1.0 / 2**13)
        u_data = output_func_data[np.arange(N)[:, None], output_index_array]
    
        train_data = (x_data[:ntrain], a_data[:ntrain], y_data[:ntrain], u_data[:ntrain]) 
        test_data  = (x_data[ntrain:], a_data[ntrain:], y_data[ntrain:], u_data[ntrain:])
        return train_data, test_data
    
    np.random.seed(42) # fixed seed
    train_data, test_data = get_data_subsample_input(1000,200)
    train_data= [torch.from_numpy(d).float().to(device).unsqueeze(2) for d in train_data]
    test_data = [torch.from_numpy(d).float().to(device).unsqueeze(2) for d in test_data]
    (x_train, a_train, y_train, u_train) = train_data
    (x_test, a_test, y_test, u_test)     = test_data
    print("train shape:", x_train.shape, a_train.shape, y_train.shape, u_train.shape)
    print("test shape:", x_test.shape, a_test.shape, y_test.shape, u_test.shape)
    validate_model(model, test_data, criterion, device=device)

paths = [
    './saved_models/model_ex1.pth',
    './saved_models/model_ex2.pth',
    './saved_models/model_ex3.pth',
    './saved_models/model_ex4.pth',
]
models = [
    BelNet(num_learned_basis = 50, num_fixed_basis = 0),
    BelNet(),
    BelNet(),
    BelNet(),
]

for model_idx, (path, model) in enumerate(zip(paths, models)):
    print(f"testing model {model_idx}")
    test_loaded_model(path, model)

testing model 0




train shape: torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1])
test shape: torch.Size([200, 128, 1]) torch.Size([200, 128, 1]) torch.Size([200, 128, 1]) torch.Size([200, 128, 1])
Validation Loss: 0.00027
mean Relative L2 Loss: 0.01553
(1200, 32)
train shape: torch.Size([1000, 32, 1]) torch.Size([1000, 32, 1]) torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1])
test shape: torch.Size([200, 32, 1]) torch.Size([200, 32, 1]) torch.Size([200, 128, 1]) torch.Size([200, 128, 1])
Validation Loss: 0.00029
mean Relative L2 Loss: 0.01706
testing model 1
train shape: torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1]) torch.Size([1000, 128, 1])
test shape: torch.Size([200, 128, 1]) torch.Size([200, 128, 1]) torch.Size([200, 128, 1]) torch.Size([200, 128, 1])
Validation Loss: 0.00029
mean Relative L2 Loss: 0.01582
(1200, 32)
train shape: torch.Size([1000, 32, 1]) torch.Size([1000, 32, 1]) torch.Size([1000, 128,