In [None]:
!pip install deepxde

In [None]:
import os
os.environ['DDE_BACKEND'] = 'pytorch'

In [None]:
import deepxde as dde
from deepxde.backend import torch
from pathlib import Path
import os
import numpy as np
import pandas as pd
from skimage import metrics
import matplotlib.pyplot as plt
import types
import gc
import cv2

In [None]:
dde.config.set_random_seed(0)
dde.config.real.set_float64()
l = 1e-6
eps = 1e-8

In [None]:
def prepare_experiment(out_path: Path) -> Path:
    out_path.mkdir(parents=True, exist_ok=True)
    dirs = [d for d in out_path.iterdir() if d.is_dir() and d.name.startswith('exp_')]
    experiment_id = max(int(d.name.split('_')[1]) for d in dirs) + 1 if dirs else 1
    exp_path = out_path / f'exp_{experiment_id}'
    exp_path.mkdir()
    return exp_path

In [None]:
gt_img = cv2.imread('/kaggle/input/test-images/yacht.bmp', 0) / 255 

height = 200
width = 200
top = 200
left = 100
gt_img = gt_img[top : top + height, left: left + width]

mean = 0
var = 0.0025
sigma = var ** 0.5
gaussian = np.random.normal(mean, sigma, gt_img.shape)
img = gt_img + gaussian
img = np.clip(img, a_min=0, a_max=1)

plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.imshow(gt_img, cmap='gray')
plt.subplot(1, 2, 2)
plt.imshow(img, cmap='gray')

gt_img = torch.from_numpy(gt_img)
img = torch.from_numpy(img)
if torch.cuda.is_available():
    img = img.to('cuda', copy=True)
    gt_img = gt_img.to('cuda', copy=True)

In [None]:
def pde(x, u): 
    norm = torch.tensor([height, width], device='cuda')
    ind = x * norm

    if torch.cuda.is_available():
        ind = ind.to('cuda')

    u_0 = img[ind[:, 0].long(), ind[:, 1].long()]
    u_0 = torch.reshape(u_0, (-1, 1))

    res = (u - u_0).abs()
    du_xx = dde.grad.hessian(u, x, i=0, j=0)
    du_xy = dde.grad.hessian(u, x, i=1, j=0)
    du_yy = dde.grad.hessian(u, x, i=1, j=1)
    du_yx = dde.grad.hessian(u, x, i=0, j=1)
    
    D2_u = torch.pow(du_xx ** 2 + 2 * du_xy ** 2  + du_yy ** 2 + eps, 0.5)
    
    du_xxxx = dde.grad.hessian(du_xx / D2_u, x, i=0, j=0)
    du_xyyx = dde.grad.hessian(du_xy / D2_u, x, i=0, j=1)
    du_yyyy = dde.grad.hessian(du_yy / D2_u, x, i=1, j=1)
    du_yxxy = dde.grad.hessian(du_xy / D2_u, x, i=1, j=0)
    
    res += (du_xxxx.abs() + du_xyyx.abs() + du_yxxy.abs() + du_yyyy.abs()) * l
    
    return res

In [None]:
import types
normalizers = np.array([height, width])

ix = np.indices(img.shape).reshape(2, -1).T / normalizers[np.newaxis, :]

# Всё нормировано к [0,1]x[0,1] 
boundary_mask = (np.isclose(ix, 0) | np.isclose(ix, 1)).any(axis=1)
boundary_ix = ix[boundary_mask]
boundary_normals = np.zeros_like(boundary_ix)
boundary_normals[np.isclose(boundary_ix[:, 0], 0), 0] = -1
boundary_normals[np.isclose(boundary_ix[:, 0], 1), 0] = 1
boundary_normals[np.isclose(boundary_ix[:, 1], 0), 1] = -1
boundary_normals[np.isclose(boundary_ix[:, 1], 1), 1] = 1
boundary_normals = boundary_normals / np.linalg.norm(boundary_normals, axis=1)[:, np.newaxis]

def boundary_normal(self, x):
    # Вычисление нормали в точке (для PointCloud)
    res = np.zeros_like(x)
    mask = self.on_boundary(x)
    res[np.isclose(x[mask, 0], 0), 0] = -1
    res[np.isclose(x[mask, 0], 1), 0] = 1
    res[np.isclose(x[mask, 1], 0), 1] = -1
    res[np.isclose(x[mask, 1], 1), 1] = 1
    res[mask] = res[mask] / np.linalg.norm(res[mask], axis=1)[:, np.newaxis]
    return res

geom = dde.geometry.PointCloud(points=ix, boundary_points=boundary_ix, boundary_normals=boundary_normals)
geom.boundary_normal = types.MethodType(boundary_normal, geom)

num_dense_nodes = 50
num_dense_layers = 10
layer_size = [2] + [num_dense_nodes] * num_dense_layers + [1] 
net = dde.nn.FNN(layer_size, activation='silu', kernel_initializer='Glorot uniform')
net.to('cuda')

net.num_trainable_parameters()

In [None]:
def boundary_b(x, on_boundary):
    #bottom
    return on_boundary and np.isclose(x[0], 1) 

def boundary_t(x, on_boundary):
    # top
    return on_boundary and np.isclose(x[0], 0)

def boundary_l(x, on_boundary):
    # left
    return on_boundary and np.isclose(x[1], 0) 

def boundary_r(x, on_boundary):
    # right
    return on_boundary and np.isclose(x[1], 1)

def bc_func2_lr(inputs, outputs, X):
    du_xx = dde.grad.hessian(outputs, inputs, i=0, j=0)
    du_yx = dde.grad.hessian(outputs, inputs, i=0, j=1)
    return du_xx.abs() + du_yx.abs()
  
def bc_func3_lr(inputs, outputs, X):
    du_xx = dde.grad.hessian(outputs, inputs, i=0, j=0)
    du_xy = dde.grad.hessian(outputs, inputs, i=1, j=0)
    du_yx = dde.grad.hessian(outputs, inputs, i=0, j=1)
    du_yy = dde.grad.hessian(outputs, inputs, i=1, j=1)
    D2_u = torch.pow(du_xx * du_xx + du_xy * du_xy + du_yx * du_yx + du_yy * du_yy + eps, 0.5)
    du_xxx = dde.grad.jacobian(du_xx / D2_u, inputs, i=0, j=0)
    du_xyy = dde.grad.jacobian(du_xy / D2_u, inputs, i=0, j=1)
    return du_xxx.abs() + du_xyy.abs()

def bc_func2_bt(inputs, outputs, X):
    du_xy = dde.grad.hessian(outputs, inputs, i=1, j=0)
    du_yy = dde.grad.hessian(outputs, inputs, i=1, j=1)
    return du_xy.abs() + du_yy.abs()

def bc_func3_bt(inputs, outputs, X):
    du_xx = dde.grad.hessian(outputs, inputs, i=0, j=0)
    du_xy = dde.grad.hessian(outputs, inputs, i=1, j=0)
    du_yy = dde.grad.hessian(outputs, inputs, i=1, j=1)
    du_yx = dde.grad.hessian(outputs, inputs, i=0, j=1)
    D2_u = torch.pow(du_xx * du_xx + du_xy * du_xy + du_yx * du_yx + du_yy * du_yy + eps, 0.5)
    du_yyy = dde.grad.jacobian(du_yy / D2_u, inputs, i=0, j=1)
    du_yxx = dde.grad.jacobian(du_yx / D2_u, inputs, i=0, j=0)
    return du_yyy.abs() + du_yxx.abs()

bc1 = dde.icbc.OperatorBC(geom, bc_func2_lr, boundary_l)
bc2 = dde.icbc.OperatorBC(geom, bc_func3_lr, boundary_l)
bc3 = dde.icbc.OperatorBC(geom, bc_func2_bt, boundary_b)
bc4 = dde.icbc.OperatorBC(geom, bc_func3_bt, boundary_b)
bc5 = dde.icbc.OperatorBC(geom, bc_func2_lr, boundary_r)
bc6 = dde.icbc.OperatorBC(geom, bc_func3_lr, boundary_r)
bc7 = dde.icbc.OperatorBC(geom, bc_func2_bt, boundary_t)
bc8 = dde.icbc.OperatorBC(geom, bc_func3_bt, boundary_t)

bc = [bc1, bc2, bc3, bc4, bc5, bc6, bc7, bc8]

In [None]:
from tqdm.notebook import trange, tqdm
def _train_sgd(self, iterations, display_every):
    # Модификация метода модели для добавления прогресс-бара из tqdm
    t = trange(iterations)
    for i in t:           # +: tqdm
        self.callbacks.on_epoch_begin()
        self.callbacks.on_batch_begin()

        self.train_state.set_data_train(
            *self.data.train_next_batch(self.batch_size)
        )
        self._train_step(
            self.train_state.X_train,
            self.train_state.y_train,
            self.train_state.train_aux_vars,
        )

        self.train_state.epoch += 1
        self.train_state.step += 1
        if self.train_state.step % display_every == 0 or i + 1 == iterations:
            self._test()

        self.callbacks.on_batch_end()
        self.callbacks.on_epoch_end()
        if self.train_state.epoch % 5 == 0:
            t.set_postfix(loss=self._outputs_losses(True, 
                                                    self.train_state.X_train, 
                                                    self.train_state.y_train, 
                                                    self.train_state.train_aux_vars)[1], refresh=True)     # +
        if self.stop_training:
            break

class ReduceLROnPlateau(dde.callbacks.EarlyStopping):
    # Callback для отслеживания лосса и уменьшения темпа обучения
    def __init__(self, model, min_delta=0, patience=0, factor=0.2, baseline=None, monitor="loss_train"):
        super().__init__(min_delta, patience, baseline, monitor)

        self.model = model
        self.factor = factor

    def on_epoch_end(self):
        current = self.get_monitor_value()
        if self.monitor_op(current - self.min_delta, self.best):
            self.best = current
            self.wait = 0
        else:
            self.wait += 1
            if self.wait >= self.patience:
                print('Reduce LR on plateau triggered.')
                self.wait = 0
                for g in self.model.opt.param_groups:
                    g['lr'] = g['lr'] * self.factor
                    
    def on_train_end(self):
        pass


In [None]:
num_domain = 6000
num_boundary = 0
lr = 0.002

data = dde.data.PDE(
    geom,
    pde,
    [],
    num_domain=num_domain,  # внутренние точки
    num_boundary=num_boundary,  # граничные точки
    num_test=1000,
    train_distribution='pseudo'
)

model = dde.Model(data, net)
model._train_sgd = types.MethodType(_train_sgd, model)
loss_weights = [1]
model.compile("adam", lr=lr, loss_weights=loss_weights)
output_path = prepare_experiment(Path('/kaggle/working'))

In [None]:
losshistory, train_state = model.train(iterations=10000,
                                       display_every=500,
                                       callbacks=[
#                                                   dde.callbacks.EarlyStopping(min_delta=0, patience=3000, baseline=None, monitor='loss_train'),
                                                  dde.callbacks.ModelCheckpoint(str(output_path) + "/model", verbose=1, save_better_only= True, period=1000),
                                                  dde.callbacks.PDEPointResampler(period=100, pde_points=True, bc_points=False)
                                                  ])
dde.saveplot(losshistory, train_state, issave=True, isplot=True, output_dir=output_path)

X = np.indices(img.shape).reshape(2, -1).T / normalizers[np.newaxis, :]
u_pred = model.predict(X) * 255
u_pred = u_pred.reshape(img.shape)
plt.imshow(u_pred, cmap='gray')
cv2.imwrite(str(output_path) + '/img_predicted_' + str(model.train_state.step) + '.png', u_pred)

In [None]:
col = ['l', 'img size', 'var', 'num domain', 'num boundary', 'learning rate', 'iterations', 'best_loss_train']
data = [l, (height, width), var, num_domain, num_boundary, lr, model.train_state.step, model.train_state.best_loss_train]
df_pinn = pd.DataFrame([data], columns=col, index=[0])
df_pinn.to_csv (str(output_path) + '/df_pinn.csv')
cv2.imwrite(str(output_path) + '/img_predicted.png', u_pred)
df_pinn

In [None]:
%cd /kaggle/working/
!zip -r "exp_1.zip" "exp_1"
from IPython.display import FileLink
FileLink(r'exp_1.zip')

In [None]:
# !rm -r /kaggle/working/exp_3

In [None]:
# del model
# gc.collect()
# torch.cuda.empty_cache()