In [1]:
import torch
import torch.nn as nn
import time
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random

In [2]:
class FNN(nn.Module):
    '''Full-Connected Neural Network with Batch Normalization'''
    def __init__(self, input_dim: int, num_hiddens, output_dim: int, activate_fun, device=torch.device("cpu")):
        super(FNN, self).__init__()
        self.num_hiddens = num_hiddens
        self.activate_fun = activate_fun
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.dense_layers = nn.ModuleList()
        self.dense_layers.append(nn.Linear(input_dim, num_hiddens[0]))

        # 添加隐藏层
        for i in range(len(num_hiddens) - 1):
            self.dense_layers.append(nn.Linear(num_hiddens[i], num_hiddens[i+1]))
        
        self.dense_layers.append(nn.Linear(num_hiddens[-1], output_dim))

        self.to(device)

    def forward(self, x):
        for i in range(len(self.dense_layers) - 1):
            x = self.dense_layers[i](x)
            x = self.activate_fun(x)
        x = self.dense_layers[-1](x)
        return x

In [3]:
def sample_path(n_sample, d, n_time, h, lam, delta, mu, sig):
    sqrth = torch.sqrt(torch.tensor(h))
    
    dW_sample = torch.zeros(n_sample, d, n_time)
    Jumps_sample = torch.zeros(n_sample, d, n_time)
    X_sample = torch.zeros(n_sample, d, n_time + 1)
    
    for i in range(n_time):
        dW_sample[:, :, i] = torch.normal(mean=0.0, std=1.0, size=(n_sample, d)) * sqrth
        P_sample = torch.poisson(torch.full((n_sample * d,), lam * h))
        Jumps_sample[:, :, i] = torch.tensor(
            [torch.sum(torch.empty(int(k)).uniform_(-delta, delta)) for k in P_sample]
        ).reshape(n_sample, d)
        
        X_sample[:, :, i+1] = X_sample[:, :, i] + mu * h + \
            sig * dW_sample[:, :, i] + Jumps_sample[:, :, i]
    
    return dW_sample, X_sample, Jumps_sample

def f(t, X, Y, Z, U, delta = 1):
    # nonlinear term
    exp_term = torch.exp(torch.sin(X + t) + 2.0)
    term1 = (Y - 2.0) * torch.exp(Y) / (2.0 * exp_term)
    term2 = torch.sum(Z, dim=1, keepdim=True) * Y / exp_term
    result = term1 - term2 - U
    return result

def g(t, X):
    # terminal conditions
    output = torch.sin(X + t) + 2.0
    return output

def dg_dx(t,x):
    return torch.cos(x+t)

In [None]:
# set random seed
seed = 2024
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

# default start time is 0
T = 1 # total time
d = 1 # dimension
n_time = 20 # time interval number
mu = 0   # drift 
sig = 1.0 # diffusion coefficient
lam = 2    # jump activity
delta = 1    # jump size


h = T/n_time # time interval step
n_sample = 64
time_grid = np.linspace(0, T, n_time + 1)

# Generate the sample paths
dW_sample, X_sample, Jumps_sample = sample_path(n_sample, d, n_time, h, lam, delta, mu, sig)

plt.figure(figsize=(12, 6))
for i in range(n_sample):
    plt.plot(time_grid, X_sample[i, 0, :])

plt.title('Sample Paths of X(t)')
plt.xlabel('Time')
plt.ylabel('X(t)')
# plt.legend()
plt.grid(True)
plt.show()

In [None]:
modelList_Z = [FNN(input_dim=d, 
                   num_hiddens=[11,11], 
                   output_dim = 1, 
                   activate_fun=nn.ReLU()) 
               for i in range(n_time-1)]
modelList_U = [FNN(input_dim=d, 
                   num_hiddens=[11,11], 
                   output_dim = 1, 
                   activate_fun=nn.ReLU()) 
               for i in range(n_time-1)]

y_init = torch.nn.Parameter(torch.empty(1, 1).uniform_(1.75, 1.85))
z_init = torch.nn.Parameter(torch.empty(1, 1).uniform_(0.7, 0.8))
u_init = torch.nn.Parameter(torch.empty(1, 1).uniform_(-0.2, 0.2))


def loss_fun(dW_sample, X_sample, Jumps_sample):
    Y = torch.ones((n_sample,1)) *y_init
    Z = torch.ones((n_sample,1)) *z_init
    U = torch.ones((n_sample,1)) *u_init


    for i in range(n_time):
        Y = Y - f(time_grid[i], X_sample[:, :, i], Y, Z, U, delta) * h
        Y = Y + Z * dW_sample[:, :, i] + U*h

        if i < n_time - 1:
            Z = modelList_Z[i](X_sample[:, :, i + 1])
            U = modelList_U[i](Z * Jumps_sample[:, :, i])

    term_delta = Y - g(time_grid[i+1], X_sample[:,:,i+1]) 
    clamp_term_delta = torch.clamp(term_delta, -50, 50)
    loss = torch.mean(clamp_term_delta**2)
    
    return loss
   
def error_dgdx(X_sample):
    with torch.no_grad():
        dgdx_error = []
        for i in range(n_time-1):
            nn_result = modelList_Z[i](X_sample[:, :, i + 1])
            fun_result = dg_dx(time_grid[i+1],X_sample[:, :, i + 1])
            error = torch.mean(torch.abs(nn_result - fun_result))
            dgdx_error.append(error.item())
    return np.mean(dgdx_error)


params = [y_init, z_init, u_init] 

for model in modelList_Z:
    params += list(model.parameters())

for model in modelList_U:
    params += list(model.parameters())

optimizer = torch.optim.Adam(params, lr=0.0005)

start_time = time.time()
train_his = []

for i in range(4000):
    dW_sample, X_sample, Jumps_sample = sample_path(n_sample, d, n_time, h, lam, delta, mu, sig)  

    loss = loss_fun(dW_sample, X_sample, Jumps_sample)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    if (i+1) % 100 == 0 or i == 0:
        dgdx_error = error_dgdx(X_sample)

        print(f'iter:{i+1}, loss:{loss.item():.4e}, time:{time.time()-start_time:.2f}, y0:{torch.mean(y_init).item():.4f}, z_error:{dgdx_error:.4f}')
        train_his.append({'iter':i+1,'loss': loss.item(), 'y0': torch.mean(y_init).item(), 'time': time.time()-start_time, 'z_error':dgdx_error})
        
        
    

In [None]:
plt.plot([i['iter'] for i in train_his], [i['loss'] for i in train_his])
plt.yscale('log')
plt.show()

In [None]:
# 绘制损失、y0
plt.plot([i['iter'] for i in train_his], [i['z_error'] for i in train_his])
plt.yscale('log')
plt.show()

In [9]:
df = pd.DataFrame(train_his)
df.to_csv(f'd1_train_his_{seed}.csv', index=False)

In [None]:
plt.plot([i['iter'] for i in train_his], [i['z_error'] for i in train_his])
plt.xlabel('Epoch')
plt.ylabel('Mean Absolute Error of $\\nabla u$')
plt.savefig('d1-z_error.pdf',dpi=300)
plt.show()

In [None]:
window_size = 2
df['loss_smooth'] = df['loss'].rolling(window=window_size).mean()
df.loc[df['loss_smooth'].isna(), 'loss_smooth'] = df['loss'].iloc[0]
df.loc[0, 'iter'] = 0

plt.plot(df['iter'],df['loss_smooth'])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.savefig('d1-loss.pdf',dpi=300)
plt.show()


In [None]:
df['relativeL1error'] = abs(df['y0']-2)/df['y0']
window_size = 2
df['relativeL1error_smooth'] = df['relativeL1error'].rolling(window=window_size).mean()
df.loc[df['relativeL1error_smooth'].isna(), 'relativeL1error_smooth'] = df['relativeL1error'].iloc[0]

plt.plot(df['iter'],df['relativeL1error_smooth'])
plt.xlabel('Epoch')
plt.ylabel('Error')
plt.savefig('d1-error.pdf',dpi=300)
plt.show()