In [None]:
# default_exp pde

In [None]:
#hide
%load_ext autoreload
%autoreload 2
import numpy as np
from nangs.bocos import *

# PDE

> This module contains the PDE class with the basic functionality to solve PDEs with NNs.

In [None]:
#export

from nangs.utils import *
from nangs.solution import *
import torch
from torch.utils.data import Dataset, DataLoader
from fastprogress import master_bar, progress_bar
import numpy as np

class PDEDataset(Dataset):
    "Receives a dict of arrays and returns every possible combination of the elements in the arrays"
    def __init__(self, inputs):
        # convert dict to array
        self.inputs = np.array([inputs[k] for k in inputs])
        # length of the dataset (all possible combinations)
        self.len = 1
        for input in self.inputs:
            self.len *= len(input)
        # modules
        self.mods = []
        for i, _ in enumerate(self.inputs):
            mod = 1
            for j, input in enumerate(self.inputs):
                if j < i:
                    mod *= len(input)
            self.mods.append(mod)  
        
    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        return torch.FloatTensor([inp[(idx // self.mods[i]) % len(inp)] for i, inp in enumerate(self.inputs)])

class PDE:
    "PDE class with basic functionality to solve PDEs with NNs"
    def __init__(self, inputs, outputs, params=None):
        
        # check lists of unique strings, non-repeated
        checkIsListOfStr(inputs)
        checkUnique(inputs)
        checkIsListOfStr(outputs)
        checkUnique(outputs)
        checkNoRepeated(inputs, outputs)
        if params:
            checkIsListOfStr(params)
            checkUnique(params)
            checkNoRepeated(inputs, params)
            checkNoRepeated(params, outputs)
        
        # save keys
        self.input_keys = inputs
        self.output_keys = outputs
        self.param_keys = params            
        
        # initialize values
        self.train_inputs = {k: [] for k in self.input_keys}
        self.test_inputs = {k: [] for k in self.input_keys}
        self.outputs = {k: [] for k in self.output_keys}
        self.params = {}
        if self.param_keys:
            self.params = {k: [] for k in self.param_keys}
        
        # bocos
        self.bocos = []
        
        self.init = False

    def summary(self):
        "Print a summary of the PDE inputs, outputs, params and bocos."
        print('inputs (train): ', self.train_inputs)
        print('inputs (test): ', self.test_inputs)
        print('outputs: ', self.outputs)
        if self.params:
            print('params: ', self.params)
        print('bocos: ', [boco.type for boco in self.bocos])
        print('')
        
    def setValues(self, values, train=True):
        "Set values for inputs and params"
        checkValidDict(values)
        for key in values:
            value = values[key]
            if key in self.input_keys: 
                if train: 
                    self.train_inputs[key] = value
                else: 
                    self.test_inputs[key] = value
            elif key in self.param_keys: 
                if train:    
                    self.params[key] = value
                else: 
                    raise Exception('You cannot set params in test data !')
            elif key in self.output_keys:
                raise Exception('You cannot set values to outputs !')
            else:
                raise Exception('Key '+ key +' not found !')
                
    def addBoco(self, boco):
        "Add a boco to the list of bocos"
        boco.addBoco(self.input_keys, self.output_keys)
        self.bocos += [boco]
        
    def bocoSummary(self):
        "Print summary of each boco"
        for boco in self.bocos: 
            boco.summary(self.input_keys, self.output_keys, self.param_keys) 
            
    def buildSolution(self, topo):
        "Build an MLP to be the solution to the PDE"
        n_inputs, n_outputs = len(self.input_keys), len(self.output_keys)
        self.solution = Solution(n_inputs, n_outputs, topo['layers'], topo['neurons'], topo['activations'])

    def compile(self, lr=0.001, epochs=100, batch_size=32):
        "Set the required parameters for training"
        self.optimizer = torch.optim.Adam(self.solution.parameters(), lr=lr)
        self.epochs = epochs
        self.bs = batch_size
        
    def solve(self, device, path):
        "Find a solution to the PDE"
        # initialize dataladers
        if not self.init: 
            self.initialize()
        # convert params to tensors
        params = {k: torch.FloatTensor(self.params[k]).to(device) for k in self.params}
        # train loop
        self.solution.to(device)
        best_loss = 1e8
        hist = {'train_loss': [], 'val_loss': [], 'bocos': {boco.name: [] for boco in self.bocos}}
        mb = master_bar(range(1, self.epochs+1))
        for epoch in mb:
            #train
            self.solution.train()
            pde_loss = []
            bocos_loss = {boco.name: [] for boco in self.bocos}
            for inputs in progress_bar(self.dataloader['train'], parent=mb):
                # compute pde solution
                inputs = inputs.to(device)
                inputs.requires_grad = True
                #print(inputs.require_grad)
                outputs = self.solution(inputs)
                # compute gradients of outputs w.r.t. inputs
                grads, _inputs = self.computeGrads(inputs, outputs)
                # compute loss
                loss = self.computePDELoss(grads, _inputs, params).pow(2).mean()
                pde_loss.append(loss.item())
                # compute bocos loss
                # compute loss for all boco points at once (1 batch each b.c, optimize all together)
                for boco in self.bocos:
                    boco_loss = boco.computeLoss(self.solution, device)
                    bocos_loss[boco.name].append(boco_loss.item())
                    loss += boco_loss
                # update weights
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                mb.child.comment = f'train loss {np.mean(pde_loss):.5f}'
            
            #evaluate
            self.solution.eval()
            val_loss = []
            for inputs in progress_bar(self.dataloader['val'], parent=mb):
                # compute pde solution
                inputs = inputs.to(device)
                inputs.requires_grad = True
                outputs = self.solution(inputs)
                # compute gradients of outputs w.r.t. inputs
                grads, _inputs = self.computeGrads(inputs, outputs)
                # compute loss
                loss = self.computePDELoss(grads, _inputs, params).pow(2).mean()
                val_loss.append(loss.item())
                mb.child.comment = f'val loss {np.mean(val_loss):.5f}'

            pde_total_loss = np.mean(pde_loss)
            val_total_loss = np.mean(val_loss)
            bocos_total_loss = 0
            for boco in self.bocos:
                bocos_loss[boco.name] = np.mean(bocos_loss[boco.name])
                hist['bocos'][boco.name].append(bocos_loss[boco.name])
                bocos_total_loss += bocos_loss[boco.name]
            total_loss = pde_total_loss + bocos_total_loss
            
            if total_loss < best_loss:
                best_loss = total_loss
                torch.save(self.solution.state_dict(), path)
            
            hist['train_loss'].append(total_loss)
            hist['val_loss'].append(val_total_loss)
            
            info = f'Epoch {epoch}/{self.epochs} Losses {total_loss:.5f} \n PDE  {pde_total_loss:.5f}'
            for boco in self.bocos:
                info += f'\n {boco.name} {bocos_loss[boco.name]:.5f}'
            info += f'\n Val {val_total_loss:.5f} '
            mb.write(info)          
            #mb.first_bar.comment = f'best acc {best_acc:.5f} at epoch {best_e}'
            
        return hist
                
    def initialize(self):
        self.dataset = {
            'train': PDEDataset(self.train_inputs),
            'val': PDEDataset(self.test_inputs)
        } 
        self.dataloader = {
            'train': DataLoader(self.dataset['train'], batch_size=self.bs, shuffle=True, num_workers=4),
            'val': DataLoader(self.dataset['val'], batch_size=self.bs, shuffle=False, num_workers=4)
        }
        for boco in self.bocos:
            boco.initialize()
        self.init = True
                        
    def computeGrads(self, inputs, outputs):
        # compute gradients
        _grads, = torch.autograd.grad(outputs, inputs, 
                    grad_outputs=outputs.data.new(outputs.shape).fill_(1),
                    create_graph=True, only_inputs=True)
        # assign keys to gradients
        grads = {output: {
            inp: _grads[:,j] for j, inp in enumerate(self.input_keys)
        } for output in self.output_keys}
        # assign keys to inputs
        _inputs = {inp: inputs[:,i] for i, inp in enumerate(self.input_keys)}
        return grads, _inputs
                        
    def computePDELoss(self, inputs, outputs, params=None):
        print('This function has to be overloaded by a child class!')
        
    def load_state_dict(self, path):
        self.solution.load_state_dict(torch.load(path))
        
    def eval(self, inputs, device):
        "Evaluate solution"
        checkValidDict(inputs)
        checkDictArray(inputs, self.input_keys)
        # set values of inpenedent vars 
        for key in self.input_keys: 
            if key in inputs: 
                self.test_inputs[key] = inputs[key] 
            else: 
                raise Exception(key + ' is not an input')
        # build dataset
        dataset = PDEDataset(self.test_inputs)
        outputs = []
        self.solution.eval()
        for i in range(len(dataset)):
            input = dataset[i].to(device)
            outputs.append(self.solution(input).cpu().detach().numpy())
        return np.array(outputs)

## Instantiate a PDE

In [None]:
pde = PDE(inputs=['x', 't'], outputs=['p'], params=['u'])

*inputs*, *outputs* and *params* must be lists of strings with non-repeated elements or you will get an error.

In [None]:
try:
    pde = PDE(inputs=['x', 't'], outputs=['x'], params=['u'])
except Exception as e:
    assert str(e) == "Repeated item x", "assertion failed"

In [None]:
#hide

pde = PDE(['a'], ['b'])
pde = PDE(inputs=['a'], outputs=['b'])
pde = PDE(inputs=['a', 'b', 'c'], outputs=['d'])
pde = PDE(inputs=['a'], outputs=['b', 'c', 'd'])
pde = PDE(inputs=['a'], outputs=['b'], params=['c'])
pde = PDE(inputs=['a', 'b'], outputs=['c'], params=['d', 'e', 'f'])

try:
    pde = PDE(inputs=['a'])
except Exception as e:
    assert str(e) == "__init__() missing 1 required positional argument: 'outputs'", "assertion failed"
    
try:
    pde = PDE(outputs=['a'])
except Exception as e:
    assert str(e) == "__init__() missing 1 required positional argument: 'inputs'", "assertion failed"
    
try:
    pde = PDE(inputs=['a'], outputs=42)
except Exception as e:
    assert str(e) == "42 must be a list of strings", "assertion failed"
    
try:
    pde = PDE(inputs=None, outputs=42)
except Exception as e:
    assert str(e) == "None must be a list of strings", "assertion failed"
    
try:
    pde = PDE(inputs=['a', 42], outputs=['b'])
except Exception as e:
    assert str(e) == "42 must be a string", "assertion failed"
    
try:
    pde = PDE(inputs=['a', 'b'], outputs=['b'])
except Exception as e:
    assert str(e) == "Repeated item b", "assertion failed"
    
try:
    pde = PDE(inputs=['a', 'b'], outputs=['c'], params=['a'])
except Exception as e:
    assert str(e) == "Repeated item a", "assertion failed"

## Print a summary

Get a summary of the PDE inputs, outputs, params and bocos.

In [None]:
pde = PDE(inputs=['x', 't'], outputs=['p'], params=['u'])

pde.summary()

inputs (train):  {'x': [], 't': []}
inputs (test):  {'x': [], 't': []}
outputs:  {'p': []}
params:  {'u': []}
bocos:  []



## Setting values

To solve a PDE you must set some input values, and optionally free-parameters. You cannot set output values (this will be given by the neural network).

In [None]:
pde = PDE(inputs=['a', 'b'], outputs=['c'], params=['d'])

a = np.array([0, 0.5, 1])
b = np.array([0, 0.5, 1])
d = np.array([1.0])
pde.setValues({'a': a, 'b': b, 'd': d})

try:
    pde.setValues({'a': a, 'b': b, 'c': d})
except Exception as e:
    assert str(e) == "You cannot set values to outputs !", "assertion failed"
    
try:
    pde.setValues({'a': a, 'b': b, 'e': d})
except Exception as e:
    assert str(e) == "Key e not found !", "assertion failed"

By default, values are set for training but you can specify values for testing (in this case only for inputs).

In [None]:
pde = PDE(inputs=['a', 'b'], outputs=['c'], params=['d'])

a = np.array([0, 0.5, 1])
b = np.array([0, 0.5, 1])
pde.setValues({'a': a, 'b': b}, train=False)

try:
    pde.setValues({'a': a, 'b': b, 'd': d}, train=False)
except Exception as e:
    assert str(e) == "You cannot set params in test data !", "assertion failed"

In [None]:
#hide

pde = PDE(inputs=['a', 'b'], outputs=['c'], params=['d'])

a = np.array([0, 0.5, 1])
b = np.array([0, 0.5, 1])
d = np.array([1.0])

pde.setValues({'a': a})
pde.setValues({'a': a, 'b': b, 'd': d})


try:
    pde.setValues({'a': a, 'b': b, 'c': d})
except Exception as e:
    assert str(e) == "You cannot set values to outputs !", "assertion failed"
    
try:
    pde.setValues({'a': a, 'b': b, 'e': d})
except Exception as e:
    assert str(e) == "Key e not found !", "assertion failed"
    
pde.setValues({'a': a, 'b': b}, train=False)

try:
    pde.setValues({'a': a, 'b': b, 'd': d}, train=False)
except Exception as e:
    assert str(e) == "You cannot set params in test data !", "assertion failed"
    
try:
    pde.setValues({'a': a, 'b': b, 'c': d}, train=False)
except Exception as e:
    assert str(e) == "You cannot set values to outputs !", "assertion failed"
    
try:
    pde.setValues({'a': a, 'b': b, 'e': d}, train=False)
except Exception as e:
    assert str(e) == "Key e not found !", "assertion failed"

## Adding Boundary Conditions

To add a boundary condition to the system, first define one and then add it with the *addBoco* method.

In [None]:
pde = PDE(inputs=['a', 'b'], outputs=['c'])

a1, a2 = np.array([0]), np.array([1])
b = np.array([1, 2, 3])

boco = PeriodicBoco('boco_name', {'a': a1, 'b': b}, {'a': a2, 'b': b})
pde.addBoco(boco)

pde.bocoSummary()

boco_name summary:
Type: periodic
Input 1:  {'a': 'a', 'b': 'b'}
Input 2:  {'a': 'a', 'b': 'b'}



Learn more about Boundary Conditions at `bocos`.

## Building the solution

In order to solve the PDE we need to build the solution. You can build a Multilayer Perceptron (MLP) to approximate the solution to the PDE as follows

In [None]:
pde = PDE(inputs=['a', 'b'], outputs=['c'])

# add values and bocos

mlp = {'layers': 3, 'neurons': 100, 'activations': 'relu'}
pde.buildSolution(mlp)

You should pass a dictionary with the values for *layers, *neurons* and *activations* to define the MLP. Once the solution is defined we can set the training parameters with the `compile` method.

In [None]:
pde.compile(lr=0.01, epochs=20, batch_size=50)

## Solving the PDE

Once the PDE is defined and compiled, we can solve it with the `solve` method. During the training the function `computePDELoss` will be called and it is expected to return the correct loss function in order to solve the PDE. To that end, a custom PDE child class has to be defined to overload this particular function.

In [None]:
# define custom PDE
class MyPDE(PDE):
    "Custom PDE to solve: dp/da + dp/db = 0"
    def __init__(self, inputs, outputs, params=None):
        super().__init__(inputs, outputs, params)
    def computePDELoss(self, grads, inputs, params): 
        dpda, dpda = grads['p']['a'], grads['p']['b']
        return dpda + dpda

# instanciate your new custo PDE class
pde = MyPDE(inputs=['a', 'b'], outputs=['p'])

# set values
a = np.array([1, 2, 3])
b = np.array([4, 5, 6])
pde.setValues({'a': a, 'b': b})

# find solution
mlp = {'layers': 3, 'neurons': 10, 'activations': 'relu'}
pde.buildSolution(mlp)
pde.compile(lr=0.01, epochs=3, batch_size=2)
pde.solve(device="cpu", path='best_solution.pth')

During training, the dataset class returns every possible combination of inputs, while the dataloader returns these in batches.

In [None]:
for i in pde.dataloader['train']:
    print(i)

tensor([[1., 4.],
        [3., 5.]])
tensor([[2., 5.],
        [3., 4.]])
tensor([[2., 6.],
        [1., 5.]])
tensor([[3., 6.],
        [2., 4.]])
tensor([[1., 6.]])
