Environment

In [None]:
import os
import sys
sys.path.append("/home/zanardi/Codes/ML/RONEK/ronek/")

from ronek import env
env.set(
  device="cpu",
  device_idx=0,
  nb_threads=8,
  floatx="float64"
)

Import libraries

In [None]:
import copy
import numpy as np
import scipy as sp
import pandas as pd
import ronek.postproc.plotting as pltt

from ronek import const
from ronek import utils
from ronek.systems import TASystem
from ronek.roms import CoarseGrainingM1
from silx.io.dictdump import h5todict

Define inputs
> System

In [None]:
T = 1e4
ic = {
  "cold": [5e2, 1e3, 5e-2],
  "hot":  [3e3, 1e3, 5e-2]
}
tgrid = {
  "lim": [1e-9, 1e-3],
  "num": 500
}
teval = {
  "cold": [1e-8, 1e-7, 7e-7, 3e-6, 1e-4],
  "hot":  [1e-7, 1e-6, 4e-6, 2e-5, 1e-3]
}

> Paths

In [None]:
path_to_dtb = "/home/zanardi/Codes/ML/RONEK/run/RVC_O3/database/"
path_to_data = "/home/zanardi/Codes/ML/RONEK/run/RVC_O3/test/data/"

Initialize isothermal master equation model

In [None]:
system = TASystem(
  T=T,
  rates=path_to_dtb + "/kinetics.hdf5",
  species={
    k: path_to_dtb + f"/species/{k}.json" for k in ("atom", "molecule")
  },
  use_einsum=False,
  use_factorial=False
)

In [None]:
cg = CoarseGrainingM1(
  molecule=path_to_dtb + "/species/molecule.json"
)
e = cg.molecule.lev["e"]

In [None]:
testcase = "/home/zanardi/Codes/ML/RONEK/ronek/examples/RVC_O3/database/testcases/RVE5/cold.csv"
mapping = "/home/zanardi/Codes/ML/RONEK/ronek/examples/RVC_O3/database/testcases/RVE5/mapping.csv"

In [None]:
mapping = np.loadtxt(mapping, delimiter=",")[:,1].astype(np.int16)
cg.build(mapping=mapping)

In [None]:
x_fom = cg.decode(x=cg.read_sol(testcase))
x_fom

In [None]:
x_fom.T

In [None]:
# Thermo
# ===================================
def build_interp(species, T, phi):
  if isinstance(phi, np.ndarray):
    phi = [phi]
  e = np.stack([species.lev["e"] @ phij for phij in phi], axis=0).T
  # Temperature to internal energy
  species.T_to_e = []
  for eg in e:
    species.T_to_e.append(
      (T.min(), T.max(), sp.interpolate.interp1d(T, eg, kind="linear", axis=0))
    )
  # Internal energy to temperature
  species.e_to_T = []
  for eg in e:
    species.e_to_T.append(
      (eg.min(), eg.max(), sp.interpolate.interp1d(eg, T, kind="linear", axis=0))
    )

def interpolate(species, T=None, e=None):
  if (T is not None):
    return species._interpolate(T, species.T_to_e)
  else:
    return species._interpolate(e, species.e_to_T)

def _interpolate(species, x, funs):
  y = [fun[2](np.clip(x[i], fun[0], fun[1]) ) for (i, fun) in enumerate(funs)]
  return np.array(y).reshape(-1)


In [None]:
def build_probmat(nb_bins):
  mapping = cg.get_mapping(nb_bins)
  return cg.construct_probmat(mapping)

def build_phi(T, P):
  cg.molecule.update(T)
  phi = P * cg.molecule.q.reshape(-1,1)
  Q = P.T @ cg.molecule.q
  phi /= Q.reshape(1,-1)
  return phi

def build_psi(P):
  return P, P*e.reshape(-1,1)

In [None]:
nb_bins = 5
Tint = np.geomspace(3e2, 5e5, 500)

ops = {
  "ed": [[],[]],
  "r": []
}
P = build_probmat(nb_bins)
psis = build_psi(P)
phis = []
for Ti in Tint:
  phi = build_phi(Ti, P)
  phis.append(phi)
  for j, psi in enumerate(psis):
    system.update_rom_ops(phi, psi, biortho=False)
    ops["ed"][j].append(system.rom_ops["ed"])
    if (len(ops["r"]) < 2):
      ops["r"].append(system.rom_ops["r"])
    if ("m_ratio" not in ops):
      ops["m_ratio"] = system.rom_ops["m_ratio"]
ops["ed"] = [np.stack(op, axis=0) for op in ops["ed"]]

utils.map_nested_dict(ops, np.shape)

In [None]:
def build_interp_ops(Tint, ops):
  funs = copy.deepcopy(ops)
  for m in range(2):
    funs_m = []
    for op in np.transpose(ops["ed"][m], axes=(2,0,1)):
      funs_m.append(
        sp.interpolate.interp1d(Tint, op, kind="linear", axis=0)
      )
    funs["ed"][m] = funs_m
  return funs

def apply_interp_ops(T, funs):
  ops = copy.deepcopy(funs)
  for m in range(2):
    ops_m = []
    for i, fun in enumerate(funs["ed"][m]):
      ops_m.append(fun(T[i]))
    ops["ed"][m] = np.stack(ops_m, axis=-1)
  return ops

In [None]:
ops_interp = build_interp_ops(Tint, ops)
cg.molecule.build_interp(Tint, phis)

In [None]:
def fun(t, x, interp):
  # Extract variables
  x = x * const.UNA
  n_a, (n_m, e_m) = x[:1], np.split(x[1:], 2)
  # Interpolate temperatures
  T_m = cg.molecule.interpolate(e=e_m/n_m)
  print(T_m)
  # Interpolate operators
  ops = apply_interp_ops(T_m, interp)
  # Compose source terms
  # > Molecule mass
  w_n_m = ops["ed"][0] @ n_m * n_a \
        + ops["r"][0] * n_a**3
  # > Molecule energy
  w_e_m = ops["ed"][1] @ e_m * n_a \
        + ops["r"][1] * n_a**3
  # > Atom
  w_n_a = - ops["m_ratio"] @ w_n_m
  # Compose full source term
  f = np.concatenate([w_n_a, w_n_m, w_e_m], axis=0)
  return f / const.UNA

In [None]:
def encode(n, psis):
  return np.concatenate([n @ psi for psi in psis])

# def decode(z, P):
#   if (z.shape[1] != 2*nb_bins):
#     z = z.T
#   n_m, e_m = np.split(z, 2, axis=-1)
#   T_m = np.vstack([cg.molecule.interpolate(e=e_m[i]/n_m[i]) for i in range(len(z))])
#   for i, Ti in enumerate(T_m.T):
#     Ti = Ti.reshape(-1,1)
    
    
#     T_m @ P.T


#   return np.concatenate([n @ psi for psi in psis])

In [None]:
filename = path_to_data + "/case_hot.p"
icase = utils.load_case(filename=filename)
n_fom, t, n0 = [icase[k] for k in ("n", "t", "n0")]

In [None]:
z_m = encode(n0[1:], psis)
n_m, e_m = np.split(z_m, 2)
y0 = np.concatenate([n0[:1], n_m, e_m])
y0

In [None]:
y = system.solve(
  t=system.get_tgrid(1e-9,1e-3,500),
  y0=y0,
  fun=fun,
  jac=None,
  ops=ops_interp,
  rtol=1e-10
)

In [None]:
np.sort(np.random.choice(200,5,replace=False))

In [None]:
i = np.random.choice(200)
Ti = np.full(5, Tint[i])
opsp = apply_interp_ops(Ti, ops_interp)

true = ops["ed"][1][i]
pred = opsp["ed"][1]
err = 100*np.abs(true - pred) / np.abs(true)
err.max()

In [None]:
egp = np.exp(sp.interpolate.interp1d(Tint, np.log(e_g), kind="cubic", axis=0)(Tint))
err = 100*np.abs(egp - e_g) / np.abs(e_g)
err.max()

In [None]:
eg_int = sp.interpolate.interp1d(Tint, np.log(e_g), kind="cubic", axis=0)
Tint_p = np.random.normal(1,0.2,size=nb_bins)*5000
egp = np.exp(eg_int(Tint_p))
egp

In [None]:
egp = np.exp(sp.interpolate.interp1d(Tint, np.log(e_g), kind="cubic", axis=0)(Tint))
err = 100*np.abs(egp - e_g) / np.abs(e_g)
err.max()

In [None]:
k_edp = np.sign(k_ed) * np.exp(sp.interpolate.interp1d(Tint, np.log(np.abs(k_ed)), kind="cubic", axis=0)(Tint))
err = 100*np.abs(k_edp - k_ed) / np.abs(k_ed)
err.max()

In [None]:


  # Isothermal master equation model
  # -----------------------------------
  path_to_dtb = inputs["paths"]["dtb"]
  system = utils.get_class(
    modules=[sys_mod],
    name=inputs["system"]["name"]
  )(
    rates=path_to_dtb + "/kinetics.hdf5",
    species={
      k: path_to_dtb + f"/species/{k}.json" for k in ("atom", "molecule")
    },
    **inputs["system"]["kwargs"]
  )

  # Testing
  # -----------------------------------
  # Initialization
  # ---------------
  # Path to saving
  path_to_saving = inputs["paths"]["saving"] + "/error/" + inputs["eval_err_on"]
  os.makedirs(path_to_saving, exist_ok=True)
  # Time grid
  t = utils.load_case(path=inputs["data"]["path"], index=0, key="t")

  # ROM models
  bt_bases = h5todict(inputs["paths"]["bases"])
  bt_bases = [bt_bases[k] for k in ("phi", "psi")]
  cg_model = CoarseGraining(
    T=system.T,
    molecule=path_to_dtb+"/species/molecule.json"
  )

> Molecule's levels

In [None]:
gi = model.species["molecule"].lev["g"]
ei = model.species["molecule"].lev['e'] / const.eV_to_J

Balanced POD

In [None]:
path = paths["data"]+"/figs/bases/"
os.makedirs(path, exist_ok=True)

In [None]:
bases = h5todict(paths["data"]+"/bases.hdf5")
s, phi, psi = [bases[k] for k in ("s", "phi", "psi")]

In [None]:
# Cumulative energy
cs = 1.0 / np.sum(s**2)
cs *= np.cumsum(s**2)
# Number of principal components
eps = 1e-6
rom_dim = np.where(cs > 1-eps)[0][0]+1
rom_dim

In [None]:
pltt.cum_energy(
  cs[:rom_dims[-1]],
  figname=path + "/cum_en",
  save=True,
  show=False
)

In [None]:
for i in range(rom_dims[-1]):
  nb = str(i+1)
  for (name, basis) in (("phi", phi), ("psi", psi)):
    pltt.dist_2d(
      x=ei,
      y=basis[:,i],
      labels=[r"$\epsilon_i$ [eV]", r"$\%s_{%s}$" % (name, nb)],
      scales=["linear", "linear"],
      markersize=1,
      figname=path + f"/{name}_{nb.zfill(2)}",
      save=True,
      show=False
    )

FOM and ROM solutions

In [None]:
t = get_tgrid(tgrid["lim"], tgrid["num"])
for (name, (T, p, Xa)) in ic.items():
  y0 = get_y0(model, T, p, Xa)
  # Solving
  print(f"> Solving FOM for '{name}' test case ...")
  yfom = solve_fom(model, t, y0)
  for r in rom_dims:
    print(f"  > Solving ROM for '{name}' test case with {r} dimensions ...")
    yrom = solve_rom(model, t, y0, phi, psi, r, abs=False)
    # Postprocessing
    print(f"  > Postprocessing FOM and ROM solutions ...")
    path = paths["data"] + f"/figs/sol/{name}_r{r}/"
    os.makedirs(path, exist_ok=True)
    # > Moments
    plot_moments(path, t, yfom[1], yrom[1], ei.reshape(-1,1), max_mom=2)
    # > Distribution static
    plot_dist(path, teval[name], t, yfom[1], yrom[1], ei, gi, markersize=1)
    # > Distribution dynamic
    animate_dist(path, t, yfom[1], yrom[1], ei, gi, markersize=1)