In [None]:
#!python -m pip install -r requirements.txt
!python -m pip install pydmd~=0.3.3

In [None]:
from collections.abc import Callable
from enum import Enum

from numpy import float64, fromfunction, full, ndarray, vectorize, zeros
from scipy.linalg import solve_banded


class Method(Enum):
    """Splitting scheme used by ``splitting_method``.

    ALTERNATING_DIRECTION: each half step is implicit along one axis and adds
        an explicit second-difference term along the other axis.
    FRACTIONAL_STEP: each sub-step is implicit along one axis only, with no
        explicit cross-direction term.
    """
    # No trailing comma here: ``0,`` would make the member's value the
    # tuple ``(0,)`` and break ``Method(0)`` lookups.
    ALTERNATING_DIRECTION = 0
    FRACTIONAL_STEP = 1


def splitting_method(alpha: ndarray, f_x_y_t: Callable,
        l_upper: ndarray, n_upper: tuple, t_upper: float, k_upper: int,
        alpha1: float, beta1: float, alpha2: float, beta2: float,
        alpha3: float, beta3: float, alpha4: float, beta4: float,
        phi_1_x_t: Callable, phi_2_x_t: Callable,
        phi_3_y_t: Callable, phi_4_y_t: Callable,
        initial: Callable, method: Method, callback: Callable) -> ndarray:
    """Advance a 2-D grid function in time with a two-sweep splitting scheme.

    Every time step consists of an implicit sweep along the first axis
    (``__step_x``) followed by an implicit sweep along the second axis
    (``__step_y``); each sweep solves tridiagonal systems.
    Presumably this integrates du/dt = alpha[0]*u_xx + alpha[1]*u_yy + f --
    TODO confirm against the problem statement.

    Parameters:
        alpha: per-axis coefficients entering ``sigma = alpha * tau / h**2``.
        f_x_y_t: source term ``f(x, y, t)``.
        l_upper: domain extents per axis; ``h = l_upper / n_upper``.
        n_upper: number of grid intervals per axis (grid is ``n + 1`` nodes).
        t_upper: final time; ``tau = t_upper / k_upper``.
        k_upper: number of time steps.
        alpha1, beta1, alpha2, beta2: coefficients of the conditions on the
            first and last node of the second axis, discretised as
            ``alpha * (difference) / h + beta * u = phi``.
        alpha3, beta3, alpha4, beta4: same for the first and last node of
            the first axis.
        phi_1_x_t, phi_2_x_t: right-hand sides ``phi(x, t)`` of the conditions
            on the second axis' first / last node.
        phi_3_y_t, phi_4_y_t: right-hand sides ``phi(y, t)`` of the conditions
            on the first axis' first / last node.
        initial: ``initial(x, y)`` giving the layer at ``t = 0``.
        method: which ``Method`` variant to use.
        callback: called with the current layer after initialisation and after
            every step. The same array object is passed each time, so callers
            that store it must copy it.

    Returns:
        The layer after the last time step.
    """
    h = l_upper / n_upper
    tau = t_upper / k_upper
    sigma = alpha * tau / (h * h)

    # The two schemes differ only in how the implicit operator is weighted.
    if method == Method.ALTERNATING_DIRECTION:
        off_diagonal_divisor, diagonal_factor = 2.0, 1.0
    else:
        off_diagonal_divisor, diagonal_factor = 1.0, 2.0

    size_i, size_j = n_upper[0] + 1, n_upper[1] + 1

    # Tridiagonal coefficients for the sweep along the first axis.
    ai = full(size_i, sigma[0] / off_diagonal_divisor)
    ci = full(size_i, sigma[0] / off_diagonal_divisor)
    bi = full(size_i, -diagonal_factor * sigma[0] - 1.0)
    # Tridiagonal coefficients for the sweep along the second axis.
    aj = full(size_j, sigma[1] / off_diagonal_divisor)
    cj = full(size_j, sigma[1] / off_diagonal_divisor)
    bj = full(size_j, -diagonal_factor * sigma[1] - 1.0)
    # Right-hand-side scratch vectors, reused for every line solve.
    di, dj = zeros(size_i), zeros(size_j)

    # The first sub-diagonal and last super-diagonal entries do not exist.
    ai[0] = ci[-1] = aj[0] = cj[-1] = 0.0

    # First and last rows carry the discretised boundary conditions.
    bi[0] = beta3 - alpha3 / h[0]
    ci[0] = alpha3 / h[0]
    ai[-1] = -alpha4 / h[0]
    bi[-1] = beta4 + alpha4 / h[0]

    bj[0] = beta1 - alpha1 / h[1]
    cj[0] = alpha1 / h[1]
    aj[-1] = -alpha2 / h[1]
    bj[-1] = beta2 + alpha2 / h[1]

    boundary_functions = (phi_1_x_t, phi_2_x_t, phi_3_y_t, phi_4_y_t)

    intermediate = zeros((size_i, size_j))
    u = __initial_approximation(n_upper, h, initial)
    callback(u)
    for k in range(1, k_upper + 1):
        # First sweep: u -> intermediate layer.
        __step_x(f_x_y_t, n_upper, h, tau, k, sigma,
            alpha1, beta1, alpha2, beta2,
            *boundary_functions, ai, bi, ci, di,
            method, u, intermediate)
        # Second sweep: intermediate layer -> u (overwritten in place).
        __step_y(f_x_y_t, n_upper, h, tau, k, sigma,
            alpha3, beta3, alpha4, beta4,
            *boundary_functions, aj, bj, cj, dj,
            method, intermediate, u)
        callback(u)
    return u


def __initial_approximation(n_upper, h, initial) -> ndarray:
    """Sample ``initial(x, y)`` on the grid and return it as a float64 array.

    Parameters:
        n_upper: number of grid intervals per axis; the result has shape
            ``(n_upper[0] + 1, n_upper[1] + 1)``.
        h: grid spacing per axis; node ``(i, j)`` is at ``(i*h[0], j*h[1])``.
        initial: callable ``initial(x, y)`` returning the value at a node.

    Returns:
        float64 array with ``result[i, j] == initial(i*h[0], j*h[1])``.
    """
    # Pin the output dtype. Without ``otypes`` numpy.vectorize infers the
    # dtype from the first returned value, so an ``initial`` that returns a
    # Python int would produce an integer grid and every later assignment of
    # solver results into it would be silently truncated.
    function = vectorize(
        lambda i, j: initial(i * h[0], j * h[1]), otypes=[float64])
    return fromfunction(function, (n_upper[0] + 1, n_upper[1] + 1))


def __step_x(f_x_y_t: Callable,
        n_upper: tuple, h: tuple, tau: float, k: int, sigma: ndarray,
        alpha1: float, beta1: float, alpha2: float, beta2: float,
        phi_1_x_t, phi_2_x_t, phi_3_y_t, phi_4_y_t,
        ai: ndarray, bi: ndarray, ci: ndarray, di: ndarray,
        method: Method, u_k_minus_1: ndarray, u_k_minus_1_divides_2: ndarray):
    """First sweep of a time step: implicit solves along the first axis.

    Reads the previous layer ``u_k_minus_1`` and fills the intermediate layer
    ``u_k_minus_1_divides_2`` in place, evaluated at ``t = (k - 0.5) * tau``.
    For every interior index of the second axis one tridiagonal system
    ``(ai, bi, ci)`` is solved with ``thomas``; ``di`` is scratch storage for
    its right-hand side. Afterwards the first and last nodes of the second
    axis are filled from the ``alpha1/beta1`` and ``alpha2/beta2`` conditions.
    Returns nothing.
    """
    t_half: float = (k - 0.5) * tau
    with_explicit_term = method == Method.ALTERNATING_DIRECTION
    previous, intermediate = u_k_minus_1, u_k_minus_1_divides_2
    step_i, step_j = h[0], h[1]
    half_tau = tau / 2.0

    for col in range(1, n_upper[1]):
        y = col * step_j
        # End rows of the system hold the conditions on the first axis.
        di[0], di[-1] = phi_3_y_t(y, t_half), phi_4_y_t(y, t_half)
        for row in range(1, n_upper[0]):
            di[row] = (-previous[row, col]
                - half_tau * f_x_y_t(row * step_i, y, t_half))
            if with_explicit_term:
                # Explicit second difference along the second axis.
                di[row] += (sigma[1] * previous[row, col]
                    - sigma[1] / 2.0 * previous[row, col - 1]
                    - sigma[1] / 2.0 * previous[row, col + 1])
        intermediate[:, col] = thomas(ai, bi, ci, di)

    # Close the layer: first and last node along the second axis.
    for row in range(0, n_upper[0] + 1):
        x = row * step_i
        intermediate[row, 0] = (
            (-alpha1 * intermediate[row, 1] + step_j * phi_1_x_t(x, t_half))
            / (beta1 * step_j - alpha1))
        intermediate[row, -1] = (
            (alpha2 * intermediate[row, -2] + step_j * phi_2_x_t(x, t_half))
            / (beta2 * step_j + alpha2))


def __step_y(f_x_y_t: Callable,
        n_upper: tuple, h: tuple, tau: float, k: int, sigma: ndarray,
        alpha3: float, beta3: float, alpha4: float, beta4: float,
        phi_1_x_t, phi_2_x_t, phi_3_y_t, phi_4_y_t,
        aj: ndarray, bj: ndarray, cj: ndarray, dj: ndarray,
        method: Method, u_k_minus_1_divides_2: ndarray, u_k: ndarray):
    """Second sweep of a time step: implicit solves along the second axis.

    Reads the intermediate layer ``u_k_minus_1_divides_2`` and fills the new
    layer ``u_k`` in place, evaluated at ``t = k * tau``. For every interior
    index of the first axis one tridiagonal system ``(aj, bj, cj)`` is solved
    with ``thomas``; ``dj`` is scratch storage for its right-hand side.
    Afterwards the first and last nodes of the first axis are filled from the
    ``alpha3/beta3`` and ``alpha4/beta4`` conditions. Returns nothing.
    """
    t_full: float = k * tau
    with_explicit_term = method == Method.ALTERNATING_DIRECTION
    intermediate, result = u_k_minus_1_divides_2, u_k
    step_i, step_j = h[0], h[1]
    half_tau = tau / 2.0

    for row in range(1, n_upper[0]):
        x = row * step_i
        # End rows of the system hold the conditions on the second axis.
        dj[0], dj[-1] = phi_1_x_t(x, t_full), phi_2_x_t(x, t_full)
        for col in range(1, n_upper[1]):
            dj[col] = (-intermediate[row, col]
                - half_tau * f_x_y_t(x, col * step_j, t_full))
            if with_explicit_term:
                # Explicit second difference along the first axis.
                dj[col] += (sigma[0] * intermediate[row, col]
                    - sigma[0] / 2.0 * intermediate[row - 1, col]
                    - sigma[0] / 2.0 * intermediate[row + 1, col])
        result[row, :] = thomas(aj, bj, cj, dj)

    # Close the layer: first and last node along the first axis.
    for col in range(0, n_upper[1] + 1):
        y = col * step_j
        result[0, col] = (
            (-alpha3 * result[1, col] + step_i * phi_3_y_t(y, t_full))
            / (beta3 * step_i - alpha3))
        result[-1, col] = (
            (alpha4 * result[-2, col] + step_i * phi_4_y_t(y, t_full))
            / (beta4 * step_i + alpha4))

In [None]:
from typing import Optional
from numpy import array, ndarray, result_type, zeros_like


def thomas(
        a: ndarray, b: ndarray, c: ndarray, d: ndarray,
        out: Optional[ndarray] = None, overwrite_d: bool = False) -> ndarray:
    """Solve a tridiagonal linear system with the Thomas (sweep) algorithm.

    The system is ``a[i]*x[i-1] + b[i]*x[i] + c[i]*x[i+1] = d[i]``.
    ``a[0]`` and ``c[-1]`` do not influence the result.

    Parameters:
        a: sub-diagonal coefficients.
        b: main-diagonal coefficients.
        c: super-diagonal coefficients.
        d: right-hand side.
        out: optional array that receives the solution; a new array is
            allocated when omitted.
        overwrite_d: allow ``d`` to be reused as scratch storage for the sweep
            coefficients. ``d`` is only reused when its dtype can hold them;
            otherwise a temporary is allocated and ``d`` is left untouched.

    Returns:
        The solution vector (``out`` when it was supplied).

    Raises:
        TypeError: if an argument has the wrong type.
        ValueError: if the four coefficient arrays differ in shape.
    """
    if not (isinstance(a, ndarray) and isinstance(b, ndarray)
            and isinstance(c, ndarray) and isinstance(d, ndarray)
            and (out is None or isinstance(out, ndarray))
            and isinstance(overwrite_d, bool)):
        raise TypeError(
            "a, b, c, d must be ndarrays, out an ndarray or None, "
            "overwrite_d a bool")
    if not (a.shape == b.shape == c.shape == d.shape):
        raise ValueError("a, b, c and d must have identical shapes")
    n = len(d)
    dtype = result_type(a.dtype, b.dtype, c.dtype, d.dtype, float64)
    # Reusing d for the sweep coefficients is only safe when it can store
    # them without truncation (e.g. not for an integer right-hand side).
    reuse_d = overwrite_d and d.dtype == dtype
    p = d if reuse_d else zeros_like(d, dtype=dtype)
    x = zeros_like(d, dtype=dtype) if out is None else out
    if n == 0:
        return x

    # The right-hand sides are evaluated before either target is written.
    p[0], x[0] = -c[0] / b[0], d[0] / b[0]
    for i in range(1, n):
        denominator = b[i] + a[i] * p[i - 1]
        # x[i] must be computed before p[i] is stored: when p aliases d,
        # storing p[i] destroys d[i].
        x[i] = (d[i] - a[i] * x[i - 1]) / denominator
        p[i] = -c[i] / denominator

    # Back substitution: x[i] currently holds the free term of
    # x[i] = p[i] * x[i + 1] + q[i].
    for i in range(n - 2, -1, -1):
        x[i] += p[i] * x[i + 1]

    return x

In [None]:
from numpy import array, where
# Domain extents per axis (passed to splitting_method as l_upper).
L: ndarray = array([10.0, 10.0])

# Boundary-condition coefficients for the four sides. With alpha = 0 and
# beta = 1 the discretised condition reduces to u = phi on that side.
ALPHA1, BETA1 = 0.0, 1.0
ALPHA2, BETA2 = 0.0, 1.0
ALPHA3, BETA3 = 0.0, 1.0
ALPHA4, BETA4 = 0.0, 1.0

# Grid intervals per axis, number of time steps, per-axis coefficients.
N, K, ALPHA = (64, 64), 256, array([1.0, 1.0])

# T is passed to the solver as the final time and is also the value returned
# by the boundary and initial functions in this notebook. It is a float
# literal so that grids built from it are floating point; the earlier dead
# assignment of 0.125 was removed because it was immediately overwritten.
T: float = 10.0

# Point sources: row i of s_xy is the (x, y) position of source i and
# s_a[i] is the value f_x_y_t returns there.
s_xy: ndarray = array([[1.25, 2.125], [9.65625, 8.5]])
s_a: ndarray = array([60, 100])

def f_x_y_t(x: float, y: float, t: float) -> float:
    """Source term: ``s_a[i]`` at the position of source ``i``, else 0.

    Positions are compared with exact floating-point equality, so a source is
    only active when ``(x, y)`` coincides bit-for-bit with a row of ``s_xy``.
    If several rows match, the last one wins. ``t`` is not used.
    """
    idx = None
    for i in range(len(s_xy)):
        if (s_xy[i] == array([x, y])).all():
            idx = i
    # Compare against None explicitly: index 0 is a valid match but falsy,
    # so a plain truthiness test would ignore the first source.
    if idx is not None:
        print(array([x, y]), flush=True)
        return s_a[idx]
    return 0


def phi_1_x_t(x: float, t: float) -> float:
    """Right-hand side of the condition on the first node of the second axis.

    Returns the module constant ``T`` for every ``x`` and ``t``.
    """
    return T


def phi_2_x_t(x: float, t: float) -> float:
    """Right-hand side of the condition on the last node of the second axis.

    Returns the module constant ``T`` for every ``x`` and ``t``.
    """
    return T


def phi_3_y_t(y: float, t: float) -> float:
    """Right-hand side of the condition on the first node of the first axis.

    Returns the module constant ``T`` for every ``y`` and ``t``.
    """
    return T


def phi_4_y_t(y: float, t: float) -> float:
    """Right-hand side of the condition on the last node of the first axis.

    Returns the module constant ``T`` for every ``y`` and ``t``.
    """
    return T

                
def psi_x_y(x: float, y: float) -> float:
    """Initial value at ``(x, y)``; passed to the solver as ``initial``.

    Returns the module constant ``T`` for every ``x`` and ``y``.
    """
    return T

In [None]:
# Collect every time layer. The solver hands the same array object to the
# callback on each call, so a copy is stored.
u = []
splitting_method(
    alpha=ALPHA, f_x_y_t=f_x_y_t,
    l_upper=L, n_upper=N, t_upper=T, k_upper=K,
    alpha1=ALPHA1, beta1=BETA1, alpha2=ALPHA2, beta2=BETA2,
    alpha3=ALPHA3, beta3=BETA3, alpha4=ALPHA4, beta4=BETA4,
    phi_1_x_t=phi_1_x_t, phi_2_x_t=phi_2_x_t,
    phi_3_y_t=phi_3_y_t, phi_4_y_t=phi_4_y_t,
    initial=psi_x_y,
    method=Method.ALTERNATING_DIRECTION,
    callback=lambda layer: u.append(layer.copy()))
# Stack the layers into one array indexed as [time step, i, j].
u = array(u)

In [None]:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter

# One figure/axes pair shared by animate() and the FuncAnimation export.
figure, axes = plt.subplots(figsize=(16, 9), dpi=300)
# Colour limits taken over all snapshots so every frame uses the same scale.
vmin, vmax = u.min(), u.max()

def animate(k: int):
    """Draw snapshot ``u[k]`` on the shared ``axes`` and return the axes.

    Parameters:
        k: index of the time layer in ``u`` to draw.

    Prints a progress marker, clears the axes and redraws the colour mesh
    with the global ``vmin``/``vmax`` limits.
    """
    print(F'animate({k:03})', end='\r')
    axes.cla()
    # u[k] is indexed [i, j] = [x node, y node], while pcolormesh draws rows
    # along the vertical axis; transpose so the plot matches the axis labels.
    axes.pcolormesh(u[k].T, cmap=plt.cm.jet, vmin=vmin, vmax=vmax)
    # u holds the initial layer plus one layer per time step, so the last
    # index len(u) - 1 corresponds to time T.
    axes.set_title(F"t = {T / max(len(u) - 1, 1) * k}")
    axes.set_xlabel('x')
    axes.set_ylabel('y')
    return axes
# Draw a single frame (index 70) on the shared axes as a static preview.
animate(70)

import sys
import numpy
# Raise the print threshold to the maximum so numpy prints arrays in full
# instead of summarising them.
numpy.set_printoptions(threshold=sys.maxsize)
# u[20]

In [None]:
from pydmd import DMD
# Fit a rank-3 DMD to the solution. Each time layer of u is flattened to a
# vector and the matrix is transposed so that one column is one layer.
# NOTE(review): presumably pydmd expects snapshots as columns -- confirm
# against the pinned pydmd version.
dmd = DMD(svd_rank=3, exact=False).fit(u.reshape(len(u), -1).T)

In [None]:
# Frame delay in milliseconds, chosen so that all frames together play for
# roughly T seconds.
interval = T * 1000.0 / len(u)
animation = FuncAnimation(
    figure,
    animate,
    frames=len(u),
    interval=interval,
    repeat=False,
)
# Render every frame with the current animate() and write the GIF.
animation.save("animation.gif", writer='pillow')

In [None]:
from numpy import abs

In [None]:
from numpy import abs
# Element-wise magnitude of the DMD reconstruction, transposed and reshaped
# back to the [time step, i, j] layout of u.
# NOTE(review): presumably reconstructed_data is complex-valued, which is why
# abs() is applied -- confirm whether the real part was intended instead.
u_prime = abs(dmd.reconstructed_data).T.reshape(u.shape)
# Point-wise absolute difference between reconstruction and solver output.
delta = abs(u_prime - u)
# NOTE(review): looks like leftover exploration; as the last expression it is
# what the cell displays.
dir(dmd)

In [None]:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation, PillowWriter

# Fresh figure/axes pair for the animation of the DMD reconstruction.
figure, axes = plt.subplots(figsize=(16, 9), dpi=300)
# Colour limits are taken from the solver output u (not from u_prime), so the
# reconstruction is drawn on the same scale as the original animation.
vmin, vmax = u.min(), u.max()

def animate(k: int):
    """Draw reconstructed snapshot ``u_prime[k]`` on the shared ``axes``.

    Parameters:
        k: index of the time layer in ``u_prime`` to draw.

    Prints a progress marker, clears the axes, redraws the colour mesh with
    the global ``vmin``/``vmax`` limits and returns the axes.
    """
    print(F'animate({k:03})', end='\r')
    axes.cla()
    # u_prime[k] is indexed [i, j] = [x node, y node], while pcolormesh draws
    # rows along the vertical axis; transpose so the plot matches the labels.
    axes.pcolormesh(u_prime[k].T, cmap=plt.cm.jet, vmin=vmin, vmax=vmax)
    # u holds the initial layer plus one layer per time step, so the last
    # index len(u) - 1 corresponds to time T.
    axes.set_title(F"t = {T / max(len(u) - 1, 1) * k}")
    axes.set_xlabel('x')
    axes.set_ylabel('y')
    return axes
# Draw a single reconstructed frame (index 37) as a static preview.
animate(37)

In [None]:
# Frame delay in milliseconds, chosen so that all frames together play for
# roughly T seconds.
interval = 1000.0 * T / len(u)
animation = FuncAnimation(figure, animate, interval=interval, frames=len(u), repeat=False)
# Use a separate file name for the DMD reconstruction: an earlier cell already
# writes the solver animation to "animation.gif", and saving here under the
# same name silently overwrote it.
animation.save("animation_dmd.gif", writer='pillow')

In [None]:
# Display the amplitudes attribute of the fitted DMD object as cell output.
dmd.amplitudes