In [None]:
"""総合演習2 マクロ系シミュレーション 第3回 例題3

Step 0: 1次元非定常拡散方程式のシミュレーション(有限差分法)を行い, 模擬的に測定データをつくる
"""

import sys
from typing import Final, Self
import random

import matplotlib.pyplot as plt
import numpy as np
from IPython.display import HTML
from matplotlib import animation

# ========== パラメータ ==========
LX: Final[float] = 10              # 部屋のx方向の長さ(m)
D: Final[float] = 10**(-2)         # 二酸化炭素の拡散係数(m^2/s)
C_OUT: Final[float] = 400          # 外気の二酸化炭素濃度(ppm)

NX: Final[int] = 31                # x方向の格子点数
DT: Final[float] = 3               # 時間刻み幅(s)
T_END: Final[float] = 1000         # シミュレーション時間(s)
DT_PLOT: Final[float] = T_END/50   # グラフを作成する時間間隔(s)

OBS_POINT: Final[list[int]] = [
    int((NX-1)/4), int((NX-1)/2), int(3*(NX-1)/4)
]                                  # 測定点の格子点番号
DT_OBS: Final[float] = 5*DT        # 測定する時間間隔(s)
OBS_NOISE: Final[float] = 200      # 測定データのノイズの大きさ(ppm)
# ==============================

# float型のNumPy配列の型エイリアス
FloatArray = np.typing.NDArray[np.float64]


def func_src(x: float, lx: float) -> float:
    """xにおける二酸化炭素の生成項を計算する

    Parameters
    ----------
    x : float
        x座標
    lx : float
        部屋のx方向の長さ

    Returns
    -------
    float
        xにおける二酸化炭素の生成項
    """

    return 1


class Variable:
    """格子点上の物理量を扱うクラス

    Attributes
    ----------
    value : FloatArray
        格子点上の物理量の値
    __name : str
        'grid_x', 'co2', 'src', 'lambda' のいずれか

    Notes
    -----
    インスタンスを生成する前に, set_class_variablesクラスメソッドを実行する必要あり
    Variableクラスの外で使用しない属性は, 変数名の最初にダンダー(__)をつけて隠蔽している
    """

    __lx: float
    __nx: int
    __dx: float

    # set_class_variablesクラスメソッドを実行したかどうかを判定する真偽値
    __flag: bool = False

    @classmethod
    def set_class_variables(cls, lx: float, nx: int) -> None:
        """Variableクラスのクラス変数を設定する

        Parameters
        ----------
        lx : float
            部屋のx方向の長さ
        nx : int
            x方向の格子点数
        """

        cls.__lx = lx
        cls.__nx = nx
        cls.__dx = lx / (nx-1)
        cls.__flag = True

    @classmethod
    def check_stable(cls, dt: float, d: float):
        """数値的安定性をチェックする
        
        数値的安定性の条件:
            時間刻み幅 < 拡散によって情報が格子点間を伝わるのにかかる時間

        Parameters
        ----------
        dt : float
            時間刻み幅
        d : float
            二酸化炭素の拡散係数
        """

        if not Variable.__flag:
            print('[ERROR] set_class_variablesクラスメソッドが実行されていません')
            sys.exit()

        dx: float = Variable.__dx
        dt_stable: float = (dx**2)/(2*d)
        if dt > dt_stable:
            print(f'[ERROR] 数値的安定性の条件(DT < {dt_stable})を満たしていません')
            sys.exit()

    def __init__(self, name: str) -> None:
        """Variableクラスのイニシャライザ

        Parameters
        ----------
        name : str
            'grid_x', 'co2', 'src', 'lambda' のいずれか
        """

        if not Variable.__flag:
            print('[ERROR] set_class_variablesクラスメソッドが実行されていません')
            sys.exit()

        if name not in ['grid_x', 'co2', 'src', 'lambda']:
            print('[ERROR] Variableクラスのイニシャライザの引数が不適切です')
            sys.exit()
        self.__name: str = name

        nx: int = Variable.__nx
        self.value: FloatArray = np.empty(nx, dtype=np.float64)

        lx: float = Variable.__lx
        dx: float = Variable.__dx
        x: float
        for ix in range(nx):
            x = dx * ix
            self.value[ix] = self.set_initial_condition(x, lx)


    def set_initial_condition(self, x: float, lx: float) -> float:
        """xにおける(初期条件の)値を設定する

        Parameters
        ----------
        x : float
            x座標
        lx : float
            部屋のx方向の長さ

        Returns
        -------
        float
            xにおける(初期条件の)値
        """

        if self.__name == 'grid_x':
            return x
        elif self.__name == 'co2':
            return 5000 * np.sin(np.pi * x / lx) + 400
        elif self.__name == 'src':
            return func_src(x, lx)
        elif self.__name == 'lambda':
            return 0

    def set_boundary_condition(self, d: float, dt: float, src: Self | None = None,  c_out: float | None = None) -> None:
        """境界条件を設定する

        Parameters
        ----------
        d : float
            二酸化炭素の拡散係数
        dt : float
            時間刻み幅
        src : Self | None, optional, default None
            Variableクラスのインスタンス(src)
        c_out : float | None, optional, default None
            外気の二酸化炭素濃度
        """

        dx: float = Variable.__dx

        if self.__name == 'co2':

            src_name: str = ''
            if isinstance(src, Variable):
                src_name = src.__name
            if (src_name != 'src'):
                print('[ERROR] co2.set_boundary_conditionメソッドの引数が不適切です')
                sys.exit()
            if c_out is None:
                print('[ERROR] co2.set_boundary_conditionメソッドの引数が不適切です')
                sys.exit()

            # 左端
            self.value[0] = c_out

            # 右端
            self.value[-1] += (
                2 * d * (self.value[-2] - self.value[-1]) / (dx**2) + src.value[-1]
            ) * dt

        if self.__name == 'lambda':

            # 左端
            self.value[0] = 0

            # 右端
            self.value[-1] += (
                2 * d * (self.value[-2] - self.value[-1]) / (dx**2)
            ) * dt

    def laplacian(self) -> FloatArray:
        """ラプラシアンを計算する

        Returns
        -------
        laplacian : FloatArray
            ラプラシアン
        """

        nx: int = Variable.__nx
        laplacian: FloatArray = np.zeros(nx, dtype=np.float64)

        dx: float = Variable.__dx
        for ix in range(1, nx-1):
            laplacian[ix] = (
                (self.value[ix+1] - 2 * self.value[ix] + self.value[ix-1]) / (dx**2)
            )
        return laplacian


def create_random_array(nx: int, obs_noise: float) -> FloatArray:
    """測定ノイズを模擬した配列を作成する

    Parameters
    ----------
    nx : int
        格子点数
    obs_noise : float
        ノイズの大きさ

    Returns
    -------
    FloatArray
        測定ノイズを模擬した配列
    """

    return np.array([random.uniform(-obs_noise, obs_noise) for _ in range(nx)], dtype=np.float64)


def create_plot(frame: int, *fargs) -> None:
    """各タイムステップでのグラフを作成する
    
    animation.FuncAnimationに使用する関数
    
    Parameters
    ----------
    frame : int
        フレーム番号
    fargs : tuple
        格子点のx座標, 時刻, co2の結果, 測定点, 測定データを格納したタプル
    """

    grid_x: FloatArray
    obs_point: list[int]
    results_t: list[float]
    results_co2: list[FloatArray]
    grid_x, results_t, results_co2, obs_point, results_obs = fargs
    
    plt.cla()

    # co2のグラフ
    co2: FloatArray = results_co2[frame]
    obs: FloatArray = results_obs[frame]
    plt.scatter(grid_x, co2, c='red', label='CO2')
    plt.scatter(grid_x[obs_point], obs, c='blue', label='observation')
    plt.ylim(400, 7000)
    axes.set_xlabel('x')
    plt.title(f't = {results_t[frame]:.1f} sec')
    plt.legend()


if __name__ == '__main__':

    Variable.set_class_variables(LX, NX)
    Variable.check_stable(DT, D)

    # Variableクラスのインスタンスとして変数を準備
    grid_x: Variable = Variable('grid_x')
    co2: Variable = Variable('co2')
    src: Variable = Variable('src')

    t: float = 0
    it: int = 0

    # t=0 における模擬測定データを作成
    obs_data: FloatArray = np.array(co2.value + create_random_array(NX, OBS_NOISE), dtype=np.float64)[OBS_POINT]

    # アニメーションの準備
    figure, axes = plt.subplots(1, 1, figsize=(5, 5))
    results_t: list[float] = [t]
    results_co2: list[FloatArray] = [co2.value.copy()]
    results_obs: list[FloatArray] = [obs_data.copy()]

    # 測定データ保存の準備
    results_data_t: list[float] = [t]
    results_data_obs: list[FloatArray] = [obs_data.copy()]

    while t < T_END:

        # 方程式にしたがって時間発展(Euler法)
        co2.value += (D * co2.laplacian() + src.value) * DT
        co2.set_boundary_condition(D, DT, src, C_OUT)
        t += DT

        # 模擬測定データを作成
        obs_data = np.array(co2.value + create_random_array(NX, OBS_NOISE), dtype=np.float64)[OBS_POINT]

        # アニメーション用に結果を保存
        if it % int(DT_PLOT / DT) == 0:
            results_t.append(t)
            results_co2.append(co2.value.copy())
            results_obs.append(obs_data.copy())

        # 測定データ保存を保存
        if it % int(DT_OBS / DT) == 0:
            results_data_t.append(t)
            results_data_obs.append(obs_data.copy())

        it += 1

    # アニメーションを作成
    anim = animation.FuncAnimation(figure, create_plot, range(len(results_t)),
                                   fargs=(grid_x.value, results_t, results_co2, OBS_POINT, results_obs))
    display(HTML(anim.to_jshtml()))
    plt.close()

In [None]:
"""Step 1: CO2の初期分布の(初期)推定値を初期条件として, 1次元非定常拡散方程式のシミュレーションをする(有限差分法)"""


def step1(initial_co2: FloatArray, bool_anim: bool) -> tuple[list[FloatArray], float, int]:
    """Step 1を行う

    Parameters
    ----------
    initial_co2 : FloatArray
        co2の初期分布の推定値
    bool_anim : bool
        アニメーションを作成するかどうかの真偽値

    Returns
    -------
    results_error : list[FloatArray]
        測定データとの誤差の結果
    t_end_step1 : float
        Step 1でのシミュレーション終了時刻
    it_end_step1 : int
        Step 1でのシミュレーション終了時のインデックス
    """

    co2.value = initial_co2.copy()

    t: float = 0
    it: int = 0

    # アニメーションの準備
    if bool_anim:
        figure, axes = plt.subplots(1, 1, figsize=(5, 5))
    results_t: list[float] = [t]
    results_co2: list[FloatArray] = [co2.value.copy()]

    # 測定データとの誤差の計算の準備
    data: FloatArray = np.empty(len(OBS_POINT), dtype=np.float64)
    error: FloatArray = np.empty(len(OBS_POINT), dtype=np.float64)
    results_error: list[FloatArray] = []

    while t < T_END:

        # 方程式にしたがって時間発展(Euler法)
        co2.value += (D * co2.laplacian() + src.value) * DT
        co2.set_boundary_condition(D, DT, src, C_OUT)
        t += DT

        # アニメーション用に結果を保存
        if it % int(DT_PLOT / DT) == 0:
            results_t.append(t)
            results_co2.append(co2.value.copy())

        # 測定データとの誤差を計算して保存
        if it % int(DT_OBS / DT) == 0:
            data = results_data_obs[it // int(DT_OBS / DT)]
            error = data - co2.value[OBS_POINT]
            results_error.append(error)

        it += 1

    # アニメーションを作成
    if bool_anim:
        anim = animation.FuncAnimation(figure, create_plot, range(len(results_t)),
                                       fargs=(grid_x.value, results_t, results_co2, OBS_POINT, results_obs))
        display(HTML(anim.to_jshtml()))
        plt.close()

    return results_error, t, it


if __name__ == '__main__':

    # co2の初期分布の初期推定値を設定
    initial_guess: FloatArray = np.array([4500 for _ in range(NX)], dtype=np.float64)
    
    results_error: list[FloatArray]
    t_end_step1: float
    it_end_step1: int
    results_error, t_end_step1, it_end_step1 = step1(initial_guess, True)

In [None]:
"""Step 2: 時間を逆向きに, 1次元非定常拡散方程式の随伴方程式をシミュレーションをする(有限差分法)"""


def create_plot_lambda(frame: int, *fargs) -> None:
    """lambdaの各タイムステップでのグラフを作成する
    
    animation.FuncAnimationに使用する関数
    
    Parameters
    ----------
    frame : int
        フレーム番号
    fargs : tuple
        格子点のx座標, 時刻, lambdaの結果を格納したタプル
    """

    grid_x: FloatArray
    results_t: list[float]
    results_lambda: list[FloatArray]
    grid_x, results_t, results_lambda = fargs
    
    plt.cla()

    # lambdaのグラフ
    lambda_: FloatArray = results_lambda[frame]
    plt.scatter(grid_x, lambda_, c='green', label='lambda')
    plt.ylim(-5000, 5000)
    axes.set_xlabel('x')
    plt.title(f't = {results_t[frame]:.1f} sec')
    plt.legend()


def step2(results_error: list[FloatArray], t_end_step1: float, it_end_step1, bool_anim: bool) -> FloatArray:
    """Step 2を行う

    Parameters
    ----------
    results_error : list[FloatArray]
        step1で得られた測定データとの誤差の結果
    t_end_step1 : float
        Step 1でのシミュレーション終了時刻
    it_end_step1 : int
        Step 1でのシミュレーション終了時のインデックス
    bool_anim : bool
        アニメーションを作成するかどうかの真偽値
    
    Returns
    -------
    grad_j : FloatArray
        目的関数の勾配
    """

    # Variableクラスのインスタンスとして変数を準備
    lambda_: Variable = Variable('lambda')

    t: float = t_end_step1
    it: int = it_end_step1

    # アニメーションの準備
    if bool_anim:
        figure, axes = plt.subplots(1, 1, figsize=(5, 5))
    results_t: list[float] = [t]
    results_lambda: list[FloatArray] = [lambda_.value.copy()]

    # 測定データとの誤差の計算の準備
    data_t: list[float] = results_data_t.copy()

    while t > 0:

        # 方程式にしたがって時間発展(Euler法)
        lambda_.value += D * lambda_.laplacian() * DT
        if abs(t - data_t[-1]) < DT/2:
            lambda_.value[OBS_POINT] += results_error[it // int(DT_OBS / DT)]
            data_t.pop()
        lambda_.set_boundary_condition(D, DT)
        t -= DT

        # アニメーション用に結果を保存
        if it % int(DT_PLOT / DT) == 0:
            results_t.append(t)
            results_lambda.append(lambda_.value.copy())
        it -= 1

    # アニメーションを作成
    if bool_anim:
        anim = animation.FuncAnimation(figure, create_plot_lambda, range(len(results_t)),
                                       fargs=(grid_x.value, results_t, results_lambda))
        display(HTML(anim.to_jshtml()))
        plt.close()

    grad_j: FloatArray = -lambda_.value.copy()

    return grad_j


if __name__ == '__main__':
    
    grad_j: FloatArray
    grad_j = step2(results_error, t_end_step1, it_end_step1, True)

In [None]:
"""最急勾下法が収束するまで Step 1とStep 2を繰り返す"""

MAX_ITER: Final[int] = 100
DELTA_GUESS: Final[float] = 0.1


def create_plot_initial(frame: int, *fargs) -> None:
    """各反復でのグラフを作成する
    
    animation.FuncAnimationに使用する関数
    
    Parameters
    ----------
    frame : int
        フレーム番号
    fargs : tuple
        格子点のx座標, 正しい初期分布, 反復回数, 初期分布の推定値の結果を格納したタプル
    """

    grid_x: FloatArray
    ans: FloatArray
    results_iter: list[float]
    results_initial: list[FloatArray]
    grid_x, ans, results_iter, results_initial = fargs
    
    plt.cla()

    # co2の初期のグラフ
    initial: FloatArray = results_initial[frame]
    plt.scatter(grid_x, ans, c='red', label='true CO2 initial condition')
    plt.scatter(grid_x, initial, c='orange', label='guess')
    plt.ylim(400, 7000)
    axes.set_xlabel('x')
    plt.title(f'Iteration: {results_iter[frame]}')
    plt.legend()


if __name__ == '__main__':

    # co2の初期条件の初期推定値を設定
    initial_guess = np.full(NX, 4000, dtype=np.float64)

    # アニメーションの準備
    figure, axes = plt.subplots(1, 1, figsize=(5, 5))
    results_iter: list[float] = []
    results_initial: list[FloatArray] = []
    ans: FloatArray = Variable('co2').value

    for iter in range(MAX_ITER):

        # Step 1
        results_error, t_end_step1, it_end_step1 = step1(initial_guess, False)

        # Step 2
        grad_j = step2(results_error, t_end_step1, it_end_step1, False)

        # 最急勾下法で初期条件の推定値を更新
        initial_guess -= DELTA_GUESS * grad_j

        results_iter.append(iter+1)
        results_initial.append(initial_guess.copy())

    anim = animation.FuncAnimation(figure, create_plot_initial, range(len(results_iter)),
                                       fargs=(grid_x.value, ans, results_iter, results_initial))
    display(HTML(anim.to_jshtml()))
    plt.close()