In [None]:
import math
import time
import numpy as np
from matplotlib import pyplot as plt
from scipy.stats import norm

import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset,RandomSampler

In [None]:
device = torch.device("cpu")

#n = 100 
K = 1

r = 0.05;gamma = 0.05
sigma = 0.3;T = 1

x_start = -np.exp(4*sigma*np.sqrt(T));x_end = np.exp(4*sigma*np.sqrt(T))
t_start = 0.;t_end = 1.

x = np.linspace(x_start,x_end,100 )
t = np.linspace(t_start,t_end,100 )
x, t = np.meshgrid(x, t)
x = np.reshape(x, (np.size(x[:]),1))
t = np.reshape(t, (np.size(t[:]),1))

x_left = np.linspace(x_start,x_start, 100 )
t_left = np.linspace(t_start,t_end, 100 ) 
v_left = np.zeros(100)
x_left = x_left.reshape(-1, 1)
t_left = t_left.reshape(-1, 1)
v_left = v_left.reshape(-1, 1)

x_up = np.linspace(x_start,x_end, 100 )
t_up = np.linspace(t_end,t_end, 100 ) 
v_up = (x_up > 0)*x_up
x_up = x_up.reshape(-1, 1)
t_up = t_up.reshape(-1, 1)
v_up = v_up.reshape(-1, 1)

x_right = np.linspace(x_end,x_end, 100 )
t_right = np.linspace(t_start,t_end, 100 ) 
v_right = np.exp(-r*(T-t_right))
x_right = x_right.reshape(-1, 1)
t_right = t_right.reshape(-1, 1)
v_right = v_right.reshape(-1, 1)


x_bound = [x_left,x_up,x_right]
t_bound = [t_left,t_up,t_right]
v_bound = [v_left,v_up,v_right]


batchsize = 128 
batch_flag = True

d = -np.log(1 + np.abs(x))/(sigma*np.sqrt(T-t)) + (sigma/2)*np.sqrt(T-t) 

for i in range(9900,10000):
    d[i] = -100000

actual_soln = np.exp(-gamma* (T - t))*((x > 0)*(x) + (d*sigma*np.sqrt(T - t) + 1)* norm.cdf(d)/2 - (np.abs(x) + 1)* norm.cdf(d - np.sqrt(T-t))/2  + (sigma*np.sqrt(T - t))* norm.pdf(d))  

In [None]:
def plot_graph(soln,soln_name):
    x = np.linspace(x_start,x_end,100);t = np.linspace(t_start,t_end,100)
    x,t = np.meshgrid(x,t)
    ax = plt.axes(projection='3d')
    ax.plot_surface(x,t,soln.reshape(100,100))
    plt.title(soln_name)
    plt.show()

def legendre_coeffs(num):
		lis = [np.array([1]),np.array([0,1])]
		for n in range(1,num-1):
			xl1 = np.concatenate((np.array([0]),lis[n]),axis = 0)
			l0 = np.concatenate((lis[n-1],np.array([0,0])),axis = 0)
			l = ((2*n + 1)*xl1 - n*l0)/(n + 1)
			lis.append(l)
		for n in range(num):
			lis[n] = np.concatenate((lis[n],np.array([0]*(num-1-n))),axis = 0)
		return lis

def poly(coeffs,x):
	sum = torch.zeros_like(x)
	for i,coeff in enumerate(coeffs):
		sum = sum + coeff*(x**i)
	return sum
	
class Legendre_PINN(nn.Module):
	def __init__(self,basis_num):
		super(Legendre_PINN,self).__init__()
		self.tanh = nn.Tanh()
		self.basis_num = basis_num
		self.main = nn.Sequential(
			#nn.Linear(2*basis_num,2*basis_num),
			nn.Linear(2*basis_num,1))		
	def forward(self,input):
		coeffs = legendre_coeffs(self.basis_num)
		x = input[:,0]
		t = input[:,1]
		netin = torch.Tensor([])
		for i in range(self.basis_num):
			netin = torch.cat((netin,poly(coeffs[i],x.view(-1,1))),1)
		for i in range(self.basis_num):
			netin = torch.cat((netin,poly(coeffs[i],t.view(-1,1))),1)
		netout = self.main(netin)
		return netout


In [None]:
def train(device,x,t,x_bound,t_bound,v_bound,learning_rate,basis_num,epochs,batch_flag,batch_size):

	xnet = torch.Tensor(x).to(device)
	tnet = torch.Tensor(t).to(device) 
	x_left,x_up,x_right = x_bound
	t_left,t_up,t_right = t_bound
	V_left,V_up,V_right = v_bound

	x_left = torch.Tensor(x_left).to(device) 	
	t_left = torch.Tensor(t_left).to(device) 	
	V_left = torch.Tensor(V_left).to(device)
	x_up = torch.Tensor(x_up).to(device) 	
	t_up = torch.Tensor(t_up).to(device) 	
	V_up = torch.Tensor(V_up).to(device)
	x_right = torch.Tensor(x_right).to(device) 	
	t_right = torch.Tensor(t_right).to(device) 	
	V_right = torch.Tensor(V_right).to(device)

	if(batch_flag):
		dataset = TensorDataset(xnet,tnet)
		dataloader = DataLoader(dataset, batch_size=batch_size,shuffle=True,num_workers = 0,drop_last = True )
		print(len(dataloader))


	net = Legendre_PINN(basis_num).to(device)
	
	def init_normal(m):
		if type(m) == nn.Linear:
			nn.init.kaiming_normal_(m.weight)

	net.apply(init_normal)

	optimizer = optim.Adam(net.parameters(), lr=learning_rate, betas = (0.9,0.99),eps = 10**-15)

	def Loss_criterion(xnet,tnet):
		xnet.requires_grad = True
		tnet.requires_grad = True
		points = torch.cat((xnet,tnet),1) 
		V = net(points)
		V = V.view(len(V),-1)
		V_x = torch.autograd.grad(V,xnet,grad_outputs=torch.ones_like(xnet),create_graph = True,only_inputs=True)[0]
		V_xx = torch.autograd.grad(V_x,xnet,grad_outputs=torch.ones_like(xnet),create_graph = True,only_inputs=True)[0]
		V_t = torch.autograd.grad(V,tnet,grad_outputs=torch.ones_like(tnet),create_graph = True,only_inputs=True)[0]
		u = ((r - gamma)*V_x - xnet*(sigma**2)*V_xx > 0).long()
		loss = V_t + (u - xnet)*(r - gamma)*V_x + ((u - xnet)**2)*(sigma**2)*V_xx/2 - gamma * V
		return nn.MSELoss()(loss,torch.zeros_like(loss)) 

	def Loss_BC(x_left,t_left,V_left,x_up,t_up,V_up,x_right,t_right,V_right):
		x_right.requires_grad = True
		left = torch.cat((x_left, t_left), 1)
		out = net(left)
		V_left_pred = out.view(len(out), -1)
		loss_f = nn.MSELoss()
		loss_left = loss_f(V_left_pred,V_left)

		up = torch.cat((x_up, t_up), 1)
		out = net(up)
		V_up_pred = out.view(len(out), -1)
		loss_f = nn.MSELoss()
		loss_up = loss_f(V_up_pred,V_up)
		
		right = torch.cat((x_right, t_right), 1)
		out = net(right)
		V_right_pred = out.view(len(out), -1)
		Vx_right_pred = torch.autograd.grad(V_right_pred,x_right,grad_outputs=torch.ones_like(x_right),create_graph = True,only_inputs=True)[0]
		loss_f = nn.MSELoss()
		loss_right = loss_f(Vx_right_pred,V_right)

		loss_bc = loss_right + loss_left + loss_up
		return loss_bc,loss_right,loss_left,loss_up


	losses = []
	tic = time.time()

	if(batch_flag):
		for epoch in range(epochs):
			if epoch == 250:
				learning_rate = 0.00001
				new_optimizer = optim.Adam(net.parameters(), lr=learning_rate, betas = (0.9,0.99),eps = 10**-15)
				optimizer = new_optimizer
			for batch_idx, (x_in,t_in) in enumerate(dataloader):

				net.zero_grad()
				loss_eqn = Loss_criterion(x_in,t_in)
				loss_bc,loss_right,loss_left,loss_up = Loss_BC(x_left,t_left,V_left,x_up,t_up,V_up,x_right,t_right,V_right)
				loss = loss_eqn + loss_bc
				loss.backward()

				optimizer.step() 
				if batch_idx % 20 ==0:
					print('\nTrain Epoch: {} \tLoss: {:.10f}\tEquation Loss: {:.10f} \t BC Loss {:.6f}\nRight_boundary loss {:.10f}\tLeft_boundary loss {:.10f}\tUp_boundary loss {:.10f}\n'.format(epoch, loss.item(),loss_eqn.item(),loss_bc.item(),loss_right.item(),loss_left.item(),loss_up.item()))

					
			points = torch.cat((xnet,tnet),1)
			U = net(points)
			z = U.detach().numpy()
			actual_loss = np.square(actual_soln - z).mean()
			print('\nAfter Epoch {}, \t Actual solution loss: {:.10f}\n'.format(
				epoch, actual_loss))

			if epoch % 1 == 0:
				plot_graph(z,'Predicted solution')
			#if epoch % 50 == 0:
			#	print('Train Epoch: {} \tLoss: {:.10f}\tEquation Loss: {:.10f} \t BC Loss {:.6f}'.format(epoch, loss.item(),loss_eqn.item(),loss_bc.item()))
			#losses.append(loss.item())
					
	else:
		for epoch in range(epochs):
			if epoch == 2000:
				learning_rate = 0.00001
				new_optimizer = optim.Adam(net.parameters(), lr=learning_rate, betas = (0.9,0.99),eps = 10**-15)
				optimizer = new_optimizer
			net.zero_grad()
			loss_eqn = Loss_criterion(xnet,tnet)
			loss_bc,loss_right,loss_left,loss_up = Loss_BC(x_left,t_left,V_left,x_up,t_up,V_up,x_right,t_right,V_right)
			loss = loss_eqn + loss_bc
			loss.backward()
			
			optimizer.step() 
			points = torch.cat((xnet,tnet),1)
			U = net(points)
			z = U.detach().numpy()

			if epoch % 10 == 0:
				print('\nTrain Epoch: {} \tLoss: {:.10f}\tEquation Loss: {:.10f} \t BC Loss {:.6f}\nRight_boundary loss {:.10f}\tLeft_boundary loss {:.10f}\tUp_boundary loss {:.10f}\n'.format(epoch, loss.item(),loss_eqn.item(),loss_bc.item(),loss_right.item(),loss_left.item(),loss_up.item()))
	
			if epoch % 50 == 0:
				plot_graph(z,'Predicted solution')
			
			losses.append(loss.item())

	toc = time.time()
	elapseTime = toc - tic
	print ("elapse time in parallel = ", elapseTime)

	net_in = torch.cat((xnet,tnet),1)
	output = net(net_in)  #evaluate model
	
	pred = net(torch.cat((x_up, t_up), 1))
	V_up_pred = pred.view(len(pred), -1)

	return output,V_up_pred


In [None]:
basis_num = 5
epochs = 250
learning_rate = 1e-3  

batchsize = 128 
batch_flag = True

In [None]:
output = train(device,x,t,x_bound,t_bound,v_bound,learning_rate,basis_num,epochs,batch_flag,batchsize)

In [None]:
arr = output[1].detach().numpy()
plt.plot(arr)
plt.plot(v_up)

In [None]:
z = output[0].detach().numpy()
plot_graph(z,"Predicted Solution")

plot_graph(actual_soln,"Actual Solution")

In [None]:
plot_graph(np.abs(actual_soln - z),"Predicted Solution")