In [None]:
from fenics import *
from multiphenics import *
import numpy as np
from braininversion.IOHandling import (read_mesh_from_h5, write_to_xdmf, 
                                       xdmf_to_unstructuredGrid, read_xdmf_timeseries)
from braininversion.PlottingHelper import (plot_pressures_and_forces_timeslice, 
                                           plot_pressures_and_forces_cross_section,
                                           extract_cross_section, style_dict)
import matplotlib.pyplot as plt
import yaml
import pyvista as pv
from pathlib import Path


In [None]:
mesh_name = "ideal_brain_3D_N80"
T = 2.0
num_steps = 40
# subdomain ids
fluid_id = 2
porous_id = 1

# boundary ids
interface_id = 1
rigid_skull_id = 2
spinal_outlet_id = 3
fixed_stem_id = 4


mmHg2Pa = 132.32

v_order = 2
sim_name = f"{mesh_name}_{T}_{num_steps}"
config_file_path = f"../results/{sim_name}/{sim_name}_config.yml"
sim_file = f"../results/{sim_name}/{sim_name}_checkp.xdmf"
sim_file_fluid = f"../results/{sim_name}/{sim_name}_fluid.xdmf"
sim_file_porous = f"../results/{sim_name}/{sim_name}_por.xdmf"

mesh_file = f"../meshes/{mesh_name}/{mesh_name}.xdmf"
fluid_restriction_file = f"../meshes/{mesh_name}/{mesh_name}_fluid.rtc.xdmf"
porous_restriction_file = f"../meshes/{mesh_name}/{mesh_name}_porous.rtc.xdmf"

boundary_file = f"../meshes/{mesh_name}/{mesh_name}_boundaries.xdmf"


with open(config_file_path) as conf_file:
    config = yaml.load(conf_file, Loader=yaml.FullLoader)

mesh_dir = config["mesh_dir_name"]
mesh_config_path = f"../meshes/{mesh_dir}/{mesh_dir}_config.yml"
with open(mesh_config_path) as conf_file:
    mesh_config = yaml.load(conf_file, Loader=yaml.FullLoader)

In [None]:
ventricle_probe = [Point(mesh_config["ventricle_probe"])]
sas_probe = [Point(mesh_config["sas_probe"])]
parenchyma_probe = [Point(mesh_config["parenchyma_probe"])]


dt = T/num_steps
times = np.linspace(0, T, num_steps + 1)
infile_mesh = XDMFFile(mesh_file)
mesh = Mesh()
infile_mesh.read(mesh)
gdim = mesh.geometric_dimension()
subdomain_marker = MeshFunction("size_t", mesh, gdim)
infile_mesh.read(subdomain_marker, "subdomains")

boundary_marker = MeshFunction("size_t", mesh, gdim - 1, 0)
boundary_infile = XDMFFile(boundary_file)
boundary_infile.read(boundary_marker)
boundary_infile.close()
fluid_submesh = SubMesh(mesh, subdomain_marker, fluid_id)
por_submesh = SubMesh(mesh, subdomain_marker, porous_id)

In [None]:
class subdomainFilter(UserExpression):
    def __init__(self, subdomain_marker, subdomain_id, **kwargs):
        self.marker = subdomain_marker
        self.domain_id = subdomain_id
        super().__init__(**kwargs)


    def eval_cell(self, values, x, cell):
            if self.marker[cell.index] == self.domain_id:
                values[0] = 1
            else:
                values[0] = np.nan

fluid_filter = subdomainFilter(subdomain_marker, fluid_id, degree=0)
por_filter = subdomainFilter(subdomain_marker, porous_id, degree=0)

DG = FunctionSpace(mesh, "DG", 0)
fluid_filter = interpolate(fluid_filter, DG)
fluid_filter.set_allow_extrapolation(True)
por_filter = interpolate(por_filter, DG)
por_filter.set_allow_extrapolation(True)

In [None]:
V = VectorFunctionSpace(mesh, "CG", v_order)
V_por = VectorFunctionSpace(por_submesh, "CG", v_order)
V_fluid = VectorFunctionSpace(fluid_submesh, "CG", v_order)

W = FunctionSpace(mesh, "CG", 1)
W_por = FunctionSpace(por_submesh, "CG", 1)
W_fluid = FunctionSpace(fluid_submesh, "CG", 1)

names = {"pF":W, "pP":W, "phi":W,"d":V,"u":V}

#names = {"pF":W_fluid, "pP":W_por, "phi":W_por,"d":V_por,"u":V_fluid}
domains = {"pF":"fluid", "pP":"porous", "phi":"porous",
         "d":"porous", "u":"fluid"}

infile = XDMFFile(sim_file)

In [None]:

infile_por = XDMFFile(sim_file_porous)
infile_fluid = XDMFFile(sim_file_fluid)

results = {n:[] for n in names}
for n, space in names.items():
    for i in range(num_steps + 1):
        f = Function(space)
        if domains[n] == "fluid":
            infile.read_checkpoint(f, n, i)
            #infile_fluid.read_checkpoint(f, n, i)
        elif domains[n] == "porous":
            infile.read_checkpoint(f, n, i)
            #infile_por.read_checkpoint(f, n, i)
        else:
            print("error!")
        results[n].append(f)

infile_por.close()
infile_fluid.close()

In [None]:
# plot pressure evolution at different probe points
pP_series = results["pP"]
pF_series = results["pF"]
phi_series = results["phi"]

pF_sas = extract_cross_section(pF_series, sas_probe).flatten()/mmHg2Pa
pF_ventricle = extract_cross_section(pF_series, ventricle_probe).flatten()/mmHg2Pa
pP_parenchyma = extract_cross_section(pP_series, parenchyma_probe).flatten()/mmHg2Pa
phi_parenchyma = extract_cross_section(phi_series, parenchyma_probe).flatten()/mmHg2Pa

plt.figure(figsize=(10,8))
plt.plot(times, pF_ventricle, label="ventricle")
plt.plot(times, pP_parenchyma, label="parenchyma fluid")
plt.plot(times, phi_parenchyma, label="parenchyma tot")
plt.plot(times, pF_sas, label="SAS")

plt.legend()
plt.grid()
plt.xlabel("t [s]")
plt.ylabel("p in mmHg")

In [None]:
# compute pressure gradient
dist = np.array(mesh_config["ventricle_probe"]) - np.array(mesh_config["sas_probe"])
dist = np.linalg.norm(dist)
diff = pF_ventricle - pF_sas
grad = diff/dist
plt.figure(figsize=(10,8))
plt.plot(times, grad , label="ventricle")

plt.legend()
plt.grid()
plt.xlabel("t [s]")
plt.ylabel("pressure grad in mmHg/m")

In [None]:
# plot cross section through the domain
x_coords = np.linspace(0.0, 0.12, 1000)
if gdim==2:
    cross_points = [Point(x, 0) for x in x_coords]
elif gdim==3:
    cross_points = [Point(x, 0, 0) for x in x_coords]

pF_cross = extract_cross_section(pF_series, cross_points, filter_function=fluid_filter)/mmHg2Pa
pP_cross = extract_cross_section(pP_series, cross_points, filter_function=por_filter)/mmHg2Pa
phi_cross = extract_cross_section(phi_series, cross_points, filter_function=por_filter)/mmHg2Pa


for i in np.arange(1, num_steps, 5):
    plt.figure(figsize=(10,8))
    plt.plot(x_coords, pF_cross[i,:], ".-",label="fluid pressure")
    plt.plot(x_coords, pP_cross[i,:], ".-", label="porous fluid pressure")
    plt.plot(x_coords, phi_cross[i,:], ".-", label="total pressure")

    plt.legend()
    plt.grid()
    plt.title(f"t = {(i)*dt:.3f}")
    plt.xlabel("x in m")
    plt.ylabel("p in mmHg")

In [None]:
# compute outflow into spinal coord 
ds_outflow = Measure("ds", domain=mesh, subdomain_data=boundary_marker, subdomain_id=spinal_outlet_id)
n = FacetNormal(mesh)

m3tomL = 1e6
outflow = np.array([assemble(dot(u,n)*ds_outflow) for u in results["u"]])
plt.figure(figsize=(10,8))
plt.plot(times, outflow*m3tomL, label="outflow into spinal coord")
plt.legend()
plt.grid()
plt.xlabel("time in s")
plt.ylabel("flowrate in mL/ s")

In [None]:
cum_outflow = np.cumsum(outflow)*dt
plt.figure(figsize=(10,8))
plt.plot(times, cum_outflow*m3tomL, label="cumulative outflow into spinal coord")
plt.legend()
plt.grid()
plt.xlabel("time in s")
plt.ylabel("V in mL")

In [None]:
# compute transitional flow in and out of parenchyma
[u.set_allow_extrapolation(True) for u in results["u"]]
ds_interf = Measure("dS", domain=mesh, subdomain_data=boundary_marker, subdomain_id=interface_id)
dx = Measure("dx", domain=mesh, subdomain_data=subdomain_marker)
transitional_flow = np.array([assemble(dot(u("-"), n("-"))*ds_interf + Constant(0.0)*dx) for u in results["u"]])
plt.figure(figsize=(10,8))
plt.plot(times, transitional_flow*m3tomL, label="outflow into spinal coord")
plt.legend()
plt.grid()
plt.xlabel("time in s")
plt.ylabel("flowrate in mL/ s")