# Hopf Bifurcation Study

In [None]:
from tqdm.notebook import tqdm
from os.path import isfile, splitext

import h5py
import numpy as np
import jax
from jax import numpy as jnp
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import dolfin as dfn

from femvf import load, statefile as sf, meshutils
from femvf.models.transient import solid as smd, fluid as fmd
from femvf.signals.solid import make_sig_glottal_width_sharp
from blocktensor import h5utils as bh5utils
from vfsig import modal
from vfsig.misc import resample_over_uniform_period

import libhopf
import main_hopf
import libsignal
from lib_main_transient import case_config
import h5utils
from postprocutils import postprocess_case_to_signal

In [None]:
## Global vars
# These three values are formatted into the output directory name (`OUT_DIR`).
ZETA = 1e-4
R_SEP = 1.0
Y_GAP = 1e-2

# Subglottal pressures of the transient cases (divided by 10 when labelled in
# Pa in the plots). Only one sweep can be active at a time, so the wider sweep
# is kept as a commented-out alternative (like `EBODY` below) instead of being
# assigned and then silently overwritten.
# PSUBS = np.concatenate([np.arange(200, 300, 10), np.arange(300, 1000, 100)])*10
PSUBS = np.arange(550, 650, 10) * 10

# Cover and body moduli passed to `case_config` to build the case names
ECOV = 5e3 * 10
# EBODY = 15e3 * 10
EBODY = 5e3 * 10

# Load the transient model
# Label of the initial state type; in this cell it only enters `OUT_DIR`
INIT_STATE_TYPE = 'static'

# Active configuration: `BernoulliMinimumSeparation` fluid with results stored
# in the '..._fixed_rsep' output directory
FluidType = fmd.BernoulliMinimumSeparation
OUT_DIR = f'out/zeta{ZETA:.2e}_rsep{R_SEP:.1f}_ygap{Y_GAP:.2e}_init{INIT_STATE_TYPE}_fixed_rsep'
# Alternative configuration (disabled):
# FluidType = fmd.Bernoulli
# OUT_DIR = f'out/zeta{ZETA:.2e}_rsep{R_SEP:.1f}_ygap{Y_GAP:.2e}_init{INIT_STATE_TYPE}_variable_rsep'

# The mesh name is also used to build the case names of the saved simulations
MESH_NAME = 'BC-dcov5.00e-02-cl1.00'
mesh_path = f'mesh/{MESH_NAME}.xml'

_tran_model_kwargs = dict(
    SolidType=smd.KelvinVoigt,
    FluidType=FluidType,
    coupling='explicit',
)
model_tran = load.load_transient_fsi_model(mesh_path, None, **_tran_model_kwargs)

# load the Hopf models
# `setup_models` returns the pair (res, dres) built from the same mesh
# (presumably a residual model and its linearization -- TODO confirm)
res, dres = main_hopf.setup_models(mesh_path)

_scalar_fspace = res.solid.forms['fspace.scalar']
_region_to_dofs = meshutils.process_celllabel_to_dofs_from_forms(
    res.solid.forms, _scalar_fspace)

# Build the properties from a copy of the model's current ones and apply the
# same properties to both models
_props = main_hopf.set_props(res.props.copy(), _region_to_dofs, res)
for _model in (res, dres):
    _model.set_props(_props)

res_hopf = libhopf.HopfModel(res, dres)

## Post-process nonlinear oscillations (transient simulations)

In [None]:
## Initialize functions to extract glottal width from saved simulations

# Glottal width function built from the transient model
proc_gw_tran = make_sig_glottal_width_sharp(model_tran)

# Number of points per period to use in processing
N_PER_PERIOD = 100

# Glottal width function built from the Hopf model, using `N_PER_PERIOD` points
proc_gw_hopf = libsignal.make_glottal_width(
    res_hopf,
    num_points=N_PER_PERIOD,
)
def time(f):
    """
    Return the times of `f` as a NumPy array

    `f` can be any object with a `get_times()` method; whatever that method
    returns is converted with `np.array`.
    """
    times = f.get_times()
    return np.array(times)

In [None]:
## Post processing transient simulations
# One saved simulation file per subglottal pressure in `PSUBS`
files = [
    f'{OUT_DIR}/{case_config(MESH_NAME, psub, ECOV, EBODY)}.h5'
    for psub in PSUBS
]

# Signal name -> function used to compute that signal
signal_to_proc = dict(glottal_width=proc_gw_tran, time=time)

# The result is indexed below with keys of the form '<case>/<signal name>'
postprocess_fname = f'{OUT_DIR}/data.h5'
case_to_signal = postprocess_case_to_signal(
    postprocess_fname, files, model_tran, signal_to_proc
)

# with h5py.File(postprocess_fname, mode='r') as f:
#     case_to_signal = h5utils.h5_to_dict(f, {})

In [None]:
## Resample the transient simulations so that they have a fixed number of points per period
# Keys of `case_to_signal` have the form '<case>/<signal name>'; collect the
# unique case names. Sorting makes the processing order reproducible.
cases = sorted({key.split('/')[0] for key in case_to_signal})

# Number of leading samples dropped from every signal before processing
NOFFSET = 3000
NPERIOD = 5

# Period of each case in samples, rounded to an integer. The first value
# returned by `estimate_fundamental_mode` is inverted, ie. it is interpreted
# here as a frequency in cycles per sample.
case_to_period = {
    case: int(round(
        modal.estimate_fundamental_mode(
            case_to_signal[f'{case}/glottal_width'][NOFFSET:])[0]**-1
    ))
    for case in cases}

case_to_rsignal = {}
for case in cases:
    # Original samples (after the offset) and their sample indices
    fp = case_to_signal[f'{case}/glottal_width'][NOFFSET:]
    xp = np.arange(fp.size)

    # Number of whole periods contained in the signal
    samples_per_period = case_to_period[case]
    num_period = fp.size // samples_per_period

    # Sample positions that place exactly `N_PER_PERIOD` points in each period
    x = np.linspace(0, num_period, num_period*N_PER_PERIOD + 1) * samples_per_period

    case_to_rsignal[f'{case}/glottal_width'] = np.interp(x, xp, fp)

# Time axis of the resampled signals, counted in periods
case_to_rsignal_update = {
    f'{case}/time':
    np.arange(case_to_rsignal[f'{case}/glottal_width'].size)/N_PER_PERIOD
    for case in cases}

case_to_rsignal.update(case_to_rsignal_update)

In [None]:
## Post processing Hopf simulation (load the Hopf bifurcation state)
hopf_sim_fpath = "out/hopf_state.h5"

with h5py.File(hopf_sim_fpath, mode='r') as f:
    hopf_state = bh5utils.read_block_vector_from_group(f)

# Decompose the Hopf state vector to common components:
# blocks 0-3, blocks 4-7 (real part of the mode) and blocks 8-11 (imaginary part)
xfp_hopf = hopf_state[:4]
mode_hopf_real = hopf_state[4:8]
mode_hopf_imag = hopf_state[8:12]
# mode_hopf = mode_hopf_real + 1j*mode_hopf_imag

# Scalar entries are read as the first element of their blocks
psub_hopf = hopf_state['psub'][0]
omega_hopf = hopf_state['omega'][0]

unit_mode_real, unit_mode_imag = libhopf.normalize_eigenvector_amplitude(
    mode_hopf_real, mode_hopf_imag)

## Plots

### Compare nonlinear and small amplitude oscillations

In [None]:
## Plot transient simulations
fig, ax = plt.subplots(1, 1, figsize=(6, 6))

for psub in PSUBS:
    case = case_config(MESH_NAME, psub, ECOV, EBODY)
    t = case_to_rsignal[f'{case}/time']
    gw = case_to_rsignal[f'{case}/glottal_width']

    # `psub` is divided by 10 for display in Pa
    ax.plot(t, gw, label=f"$p_{{sub}}$: {psub/10:.1f} Pa", lw=1.0)

# Restrict the view to the interval between periods 2 and 4
ax.set_xlim(2, 4)

# The resampled time axis is (sample index)/N_PER_PERIOD, so it counts periods
# of each case rather than seconds
ax.set_xlabel("Time [period]")
ax.set_ylabel("Glottal width [cm]")
ax.legend(loc='lower left', bbox_to_anchor=(0, 1))

fig.tight_layout()
fig.savefig('fig/transient_glottal_width.png')

In [None]:
## Plot Hopf simulations
fig, ax = plt.subplots(1, 1)

# The state and unit mode do not depend on the amplitude: convert them once
_xfp = xfp_hopf.to_ndarray()
_mode_real = unit_mode_real.to_ndarray()
_mode_imag = unit_mode_imag.to_ndarray()

for ampl in np.linspace(0, 10000.0, 5):
    gw = proc_gw_hopf(
        _xfp, _mode_real, _mode_imag,
        psub_hopf, omega_hopf, ampl, 0.0)

    # Plot against time in periods so that the axis matches its label (the
    # original plotted against the raw sample index).
    # NOTE(review): assumes `proc_gw_hopf` returns `N_PER_PERIOD` uniformly
    # spaced samples over one period -- confirm against `libsignal`
    t = np.arange(len(gw)) / N_PER_PERIOD
    ax.plot(t, gw, label=f"Amplitude {ampl:.2e}")

ax.set_xlabel("Time [period]")
ax.set_ylabel("Glottal width [cm]")
ax.legend()

fig.tight_layout()
fig.savefig("fig/onset_gw_vs_amplitude.png", dpi=250)

In [None]:
## Plot Transient vs Hopf simulations
# Scale factors for (amplitude, phase): the optimizer works with the scaled
# variables `ampl_phase/SCALE`
SCALE = np.array([1e4, 1])

def sqr_err(sc_ampl_phase, gw_ref):
    """
    Return the summed squared difference between onset and reference glottal widths

    Parameters
    ----------
    sc_ampl_phase :
        Scaled (amplitude, phase) pair; it is multiplied by `SCALE` before use
    gw_ref :
        Reference glottal width that the onset glottal width is compared to

    Returns
    -------
    The sum over all samples of the squared difference
    """
    unscaled = sc_ampl_phase*SCALE
    ampl = unscaled[0]
    phase = unscaled[1]

    # NOTE(review): 0.0 is passed in the argument position where the other
    # calls in this file pass `psub_hopf` -- confirm that this is intended
    gw_hopf = proc_gw_hopf(
        xfp_hopf.to_ndarray(),
        unit_mode_real.to_ndarray(),
        unit_mode_imag.to_ndarray(),
        0.0, omega_hopf,
        ampl, phase)

    residual = gw_hopf-gw_ref
    return jnp.sum(residual**2)

from matplotlib.lines import Line2D
from scipy import optimize

fig, ax = plt.subplots(1, 1)

colors = plt.rcParams['axes.prop_cycle'].by_key()['color']
# NOTE(review): every pressure listed here needs resampled transient data in
# `case_to_rsignal`; most of these values are outside the active `PSUBS`
# sweep -- confirm that the post-processed data covers them
psubs = np.array([600, 700, 800, 900])*10
# psubs = PSUBS[0::2]

cases = [case_config(MESH_NAME, psub, ECOV, EBODY) for psub in psubs]

# Quantities that are identical for every case are computed once, outside the loop
_xfp = xfp_hopf.to_ndarray()
_mode_real = unit_mode_real.to_ndarray()
_mode_imag = unit_mode_imag.to_ndarray()
# Gradient of the squared error with respect to the scaled (amplitude, phase)
sqr_err_grad = jax.grad(sqr_err, 0)

# Initial guess for (amplitude, phase) and the onset glottal width it produces
ampl_phase0 = np.array([5e3, 0.0])
gw_hopf0 = proc_gw_hopf(
    _xfp, _mode_real, _mode_imag,
    psub_hopf, omega_hopf, ampl_phase0[0], ampl_phase0[1])

for color, case in zip(colors, cases):
    # For each case, load the last `N_PER_PERIOD` samples of the resampled
    # transient glottal width
    t = case_to_rsignal[f'{case}/time'][-N_PER_PERIOD:]
    gw_tran = case_to_rsignal[f'{case}/glottal_width'][-N_PER_PERIOD:]

    # Calculate an optimal phase and amplitude for the onset glottal width
    # that minimizes the difference from the transient glottal width
    opt_res = optimize.minimize(
        sqr_err, ampl_phase0/SCALE, args=(gw_tran,),
        jac=sqr_err_grad, method='BFGS')
    ampl_phase = opt_res['x'] * SCALE
    # print(opt_res)

    # Compute the onset glottal width with the optimal phase and amplitude
    gw_hopf = proc_gw_hopf(
        _xfp, _mode_real, _mode_imag,
        psub_hopf, omega_hopf, ampl_phase[0], ampl_phase[1])

    ax.plot(t, gw_tran, color=color, ls='-')
    ax.plot(t, gw_hopf, color=color, ls='-.')
    ax.plot(t, gw_hopf0, color=color, ls=':')

ax.set_xlabel("t/T")
ax.set_ylabel("Glottal width [cm]")

# Fundamental frequency of each transient case from its period in samples and
# the time step between the first two saved times
f0s = []
for case in cases:
    _t = case_to_signal[f'{case}/time']
    dt = _t[1]-_t[0]
    f0s.append((case_to_period[case] * dt)**-1)

# Make a legend describing the relationship
# ls -> model type and psub -> color
# (the dotted initial-guess curves are included so every plotted style is explained)
_model_type_ls = ['-', '-.', ':']
_model_type_lines = [Line2D([0], [0], color='k', ls=ls) for ls in _model_type_ls]
_model_type_labels = [
    "Transient",
    f"Onset {omega_hopf/2/np.pi:.1f} Hz {psub_hopf/10:.1f} Pa",
    "Onset (initial guess)"]

_psub_colors = colors[:len(psubs)]
_psub_lines = [Line2D([0], [0], color=color) for color in _psub_colors]
_psub_labels = [f"{psub/10:.0f} Pa, {f0:.1f} Hz" for psub, f0 in zip(psubs, f0s)]
ax.legend(
    _model_type_lines+_psub_lines,
    _model_type_labels+_psub_labels)

fig.savefig('fig/transient_vs_onset_gw.png', dpi=200)

In [None]:
## Plot the onset mode shape
import dolfin as dfn
XREF = res.solid.XREF.vector()

# Phases covering one cycle, excluding the repeated end point at 2*pi
NPHASE = 5
phases = 2*np.pi*np.linspace(0, 1, NPHASE+1)[:-1]

# Maps used to reorder a vector function from DOF order to vertex order
VERT_TO_VECDOF = dfn.vertex_to_dof_map(res.solid.forms['fspace.vector'])
CELLS = res.solid.forms['mesh.mesh'].cells()

# Phase-independent parts of the plotted configuration: reference coordinates
# plus the 'u' block of the Hopf state, and the complex 'u' mode
_x_hopf = XREF[:] + hopf_state['u']
_u_mode = hopf_state['u_mode_real'] + 1j*hopf_state['u_mode_imag']

fig, axs = plt.subplots(1, NPHASE, sharey=True, sharex=True, figsize=(10, 5))
for ax, phase in zip(axs, phases):
    # Complex amplitude: a fixed magnitude (chosen for plotting) rotated by `phase`
    ampl = 5e4*np.exp(1j*phase)
    mode_hopf_dof_order = _x_hopf + np.real(ampl * _u_mode)

    # Reorder to vertex order and split into one row of 2 components per vertex
    mode_hopf_vert_order = mode_hopf_dof_order[VERT_TO_VECDOF].reshape(-1, 2)
    # print(mode_hopf_vert_order.shape)
    ax.triplot(*mode_hopf_vert_order.T, triangles=CELLS)
    # Raw f-string: '\p' is not a valid escape sequence in a normal string
    ax.set_title(rf"${phase/np.pi:.2f}\pi$ rad")

for ax in axs:
    ax.set_aspect('equal')
    ax.set_xlabel("x [cm]")
axs[0].set_ylabel("y [cm]")

fig.tight_layout()
fig.savefig('fig/hopf_mode_shape.png', dpi=200)

### Optimization results

In [None]:
# Load mesh and dof map information for plotting data on the mesh
_mesh = res.solid.forms['mesh.mesh']
CELLS = _mesh.cells()

# Vertex coordinates, split into their first (x) and second (y) columns
TRI = _mesh.coordinates()
X = TRI[:, 0]
Y = TRI[:, 1]

# These indices select DOFs from a scalar function in vertex-order
IDX_VERT = dfn.vertex_to_dof_map(res.solid.forms['fspace.scalar'])

In [None]:
# Load optimization history
with h5py.File('out/opt_hist.h5', mode='r') as f:
    # Objective history divided by 10 (it is plotted as onset pressure in Pa)
    objs = f['objective'][:]/10
    _n_iter = len(objs)

    # One gradient and one parameter block vector per recorded iteration
    grads = [
        bh5utils.read_block_vector_from_group(f['grad'], nn)
        for nn in range(_n_iter)
    ]
    params = [
        bh5utils.read_block_vector_from_group(f['parameters'], nn)
        for nn in range(_n_iter)
    ]

its = np.arange(_n_iter)

In [None]:
# Indicate a specific iteration to plot parameters for
N = -5
param = params[N]
grad = grads[N]

def _plot_mesh_field(ax, values, cbar_label):
    """
    Draw vertex-ordered `values` on the mesh in `ax` with a labelled colorbar

    Returns the `tripcolor` mappable and the colorbar.
    """
    mappable = ax.tripcolor(X, Y, CELLS, values)
    ax.set_xlabel("x [cm]")
    ax.set_ylabel("y [cm]")

    cbar = fig.colorbar(mappable, ax=ax)
    cbar.ax.set_ylabel(cbar_label)
    ax.set_aspect(1)
    return mappable, cbar

# Layout: objective history across the top row, two mesh plots in the bottom row
fig = plt.figure(figsize=(10, 6))
gs = gridspec.GridSpec(2, 2, figure=fig)

# Plot the objective function history
ax_obj = fig.add_subplot(gs[0, :])

# The last 3 recorded objective values are left out of the history curve
ax_obj.plot(objs[:-3])
# Mark the iteration selected by `N`
ax_obj.plot([its[N]], [objs[N]], marker='o', color='k')

ax_obj.set_xlabel("Iteration")
ax_obj.set_ylabel("Onset pressure [Pa]")

# Plot the current parameter guess
ax_mod = fig.add_subplot(gs[1, 0])
mappable, cbar = _plot_mesh_field(
    ax_mod, param['emod'][IDX_VERT]/10/1e3, "$E$ [kPa]")

# Plot the current gradient
# NOTE(review): the gradient is scaled by the same 1/10/1e3 factor as the
# modulus while the colorbar is labelled [Pa/kPa] -- confirm the unit conversion
ax_grad = fig.add_subplot(gs[1, 1])
mappable, cbar = _plot_mesh_field(
    ax_grad, grad['emod'][IDX_VERT]/10/1e3, "$dE$ [Pa/kPa]")

fig.tight_layout()