<a href="https://colab.research.google.com/github/isosafrasaurus/3D-1D/blob/main/3D1D_docker_variant_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Install required libraries
%%capture
import os, re

def replace_in_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    # Replace 'ufl' with 'ufl_legacy'
    content = re.sub(r'\bufl\b', 'ufl_legacy', content)

    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(content)

def process_directory(directory):
    for root, _, files in os.walk(directory):
        for file in files:
            if file.endswith('.py'):
                file_path = os.path.join(root, file)
                replace_in_file(file_path)

# ipywidgets
try:
    import ipywidgets
except ImportError:
    !pip install ipywidgets

# dolfin
try:
    import dolfin
except ImportError:
    !wget "https://fem-on-colab.github.io/releases/fenics-install-real.sh" -O "/tmp/fenics-install.sh" && bash "/tmp/fenics-install.sh"

# block
try:
    import block
except ImportError:
    !git clone https://bitbucket.org/fenics-apps/cbc.block/src/master/
    !pip install master/

# fenics_ii
try:
    import xii
except ImportError:
    !git clone https://github.com/MiroK/fenics_ii
    process_directory("fenics_ii/")
    !pip install fenics_ii/

# vtk
try:
    import vtk
except ImportError:
    !pip install vtk

# graphnics
try:
    import graphnics
except ImportError:
    !git clone https://github.com/IngeborgGjerde/graphnics
    !pip install graphnics/

In [126]:
from dolfin import *
from vtk.util.numpy_support import vtk_to_numpy
from xii import *
from graphnics import *

import scipy
import copy
import vtk
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from mpl_toolkits.mplot3d import Axes3D

import sys
sys.path.append('/workspace/modules')
from visualizer import visualize, visualize_scatter, visualize_contour

WD_PATH = "/workspace"

In [139]:
# @title Define G = serena data graph, mesh1d = G.mesh
%%capture

G = FenicsGraph()
ind = 0
branch_points = {}

for n in range(29):
    file_name = WD_PATH + '/data/pv_json1/Centerline_'+str(n)+'.mrk.json'
    f = open(file_name)
    data = json.load(f)
    f.close()

    # get coords + radius at each point
    points = data['markups'][0]['controlPoints']
    radius = data['markups'][0]['measurements'][3]['controlPointValues']
    G.add_nodes_from(range(ind - n, ind + len(points) - n))

    # check if first coord is branch point from previous centerlines
    v1 = 0
    for key, val in branch_points.items():
        if points[0]['position'] == val:
            v1 = key
            break

    # add coords and radius to nodes fenics graph
    v2 = ind - n + 1
    pos_v1 = points[0]['position']
    pos_v2 = points[1]['position']
    G.nodes[v1]["pos"] = pos_v1
    G.nodes[v2]["pos"] = pos_v2
    G.nodes[v1]["Radius"] = radius[0]
    G.nodes[v2]["Radius"] = radius[1]
    # add edge to fenics graph
    G.add_edge(v1, v2)

    for i in range(len(points)-2):
        v1 = ind - n + 1 + i
        v2 = v1 + 1
        # Convert coordinates from mm to meters (divide by 1000)
        pos_v1 = [coord / 1000 for coord in points[0]['position']]
        pos_v2 = [coord / 1000 for coord in points[1]['position']]
        pos_v1 = points[i + 1]['position']
        pos_v2 = points[i + 2]['position']
        G.nodes[v1]["pos"] = pos_v1
        G.nodes[v2]["pos"] = pos_v2
        G.nodes[v1]["Radius"] = radius[i + 1]
        G.nodes[v2]["Radius"] = radius[i + 2]
        G.add_edge(v1, v2)


    # store last point as a branch point
    ind += len(points)
    branch_points.update({ind-n-1: pos_v2})

# create 1d mesh
G.make_mesh()
mesh1d = G.mesh

# get positions of 1d mesh
pos = nx.get_node_attributes(G, "pos")
node_coords = np.asarray(list(pos.values()))
xmin, ymin, zmin = np.min(node_coords, axis = 0)

# recenter 1d coordinates to all be >= 0
d = mesh1d.coordinates()
d[:, :] += [-xmin, -ymin, -zmin]

mesh3d = UnitCubeMesh(32, 32, 32)

# fit 3D mesh around 1D mesh
c = mesh3d.coordinates()
xl, yl, zl = (np.max(node_coords, axis=0) - np.min(node_coords, axis=0))  # graph length scales
c[:, :] *= [xl + 3, yl + 3, zl]  # Rescale lengths

In [131]:
# @title Define G = sorted domain, mesh1d = G.mesh, mesh3d wrap
%%capture

# Define function to read .vtk file and extract required information
def read_vtk_file(file_path):
    reader = vtk.vtkPolyDataReader()
    reader.SetFileName(file_path)
    reader.Update()

    output = reader.GetOutput()
    points = vtk_to_numpy(output.GetPoints().GetData())

    connectivity = []
    for i in range(output.GetNumberOfCells()):
        cell = output.GetCell(i)
        connectivity.append(tuple(cell.GetPointId(j) for j in range(cell.GetNumberOfPoints())))

    damage = vtk_to_numpy(output.GetPointData().GetArray("Damage"))
    radius = vtk_to_numpy(output.GetPointData().GetArray("Radius"))

    # Create a pandas DataFrame
    data = pd.DataFrame(points, columns=['x', 'y', 'z'])
    data['Damage'] = damage
    data['Radius'] = radius

    return data, connectivity

# Example usage
file_path = WD_PATH + '/data/vtk/sortedDomain.vtk'
data, connectivity = read_vtk_file(file_path)

G = FenicsGraph()

# Add nodes to the graph G with position, radius, and damage attributes
for i, row in data.iterrows():
    G.add_node(i, pos=(row['x'], row['y'], row['z']), Radius=row['Radius'], Damage=row['Damage'])

# Add edges to the graph G based on connectivity (just in case tuple length > 2, we use range length instead of constant 2)
for conn in connectivity:
    for j in range(len(conn) - 1):
        G.add_edge(conn[j], conn[j + 1])

# Create 1D mesh from the graph
G.make_mesh()
mesh1d = G.mesh

# Get positions and recenter 1D coordinates to all be >= 0
pos = nx.get_node_attributes(G, "pos")
node_coords = np.asarray(list(pos.values()))
xmin, ymin, zmin = np.min(node_coords, axis=0)

# Recenter coordinates
d = mesh1d.coordinates()
d[:, :] -= [xmin, ymin, zmin]

mesh3d = UnitCubeMesh(32, 32, 32)

# fit 3D mesh around 1D mesh
c = mesh3d.coordinates()
xl, yl, zl = (np.max(node_coords, axis=0) - np.min(node_coords, axis=0))  # graph length scales
c[:, :] *= [xl + 3, yl + 3, zl]  # Rescale lengths

In [134]:
G = FenicsGraph()

# "data" should be a Pandas dataframe with columns named x, y, z, Radius, Damage
# each row of "data" represents a separate node
# "connectivity" should be a list of tuples containing the connections between indices of rows in "data"
# Generate evenly spaced coordinates
x_coords = np.repeat(40, 200)
y_coords = np.linspace(0, 100, 200)
z_coords = np.repeat(40, 200)
radii = np.repeat(1.0, 200)  # Example radii; adjust as needed
damage = np.repeat(0.0, 200)  # Example damage; adjust as needed

# Create DataFrame
data = pd.DataFrame({'x': x_coords, 'y': y_coords, 'z': z_coords, 'Radius': radii, 'Damage': damage})

# Generate connectivity (each node connects to the next)
connectivity = [(i, i + 1) for i in range(199)]

# Add nodes to the graph G with position, radius, and damage attributes
for i, row in data.iterrows():
    G.add_node(i, pos=(row['x'], row['y'], row['z']), Radius=row['Radius'], Damage=row['Damage'])

# Add edges to the graph G based on connectivity (just in case tuple length > 2, we use range length instead of constant 2)
for conn in connectivity:
    for j in range(len(conn) - 1):
        G.add_edge(conn[j], conn[j + 1])

# Create 1D mesh from the graph
G.make_mesh()
mesh1d = G.mesh

# Get positions and recenter 1D coordinates to all be >= 0
pos = nx.get_node_attributes(G, "pos")
node_coords = np.asarray(list(pos.values()))
xmin, ymin, zmin = np.min(node_coords, axis=0)

# Recenter coordinates
d = mesh1d.coordinates()
d[:, :] -= [xmin, ymin, zmin]

mesh3d = UnitCubeMesh(32, 32, 32)

# fit 3D mesh around 1D mesh
c = mesh3d.coordinates()
xl, yl, zl = (np.max(node_coords, axis=0) - np.min(node_coords, axis=0))  # graph length scales
c[:, :] *= [xl + 3, yl + 3, zl]  # Rescale lengths

In [140]:
# @title Robin coupling with 3D

Alpha1 = Constant(9.6e-2)
alpha1 = Constant(1.45e4)
beta = Constant(3.09e-5)
gamma = Constant(1.0)  # Adjust gamma
p_infty = Constant(1.3e3)  # Far-field pressure, medical literature suggests PV pressure is 5-10 mmHg

# set boundary conditions for simulation
bc_3d = Constant(3)

# define function that returns lateral faces of 3D boundary
def boundary_3d(x, on_boundary):
    return on_boundary and not near(x[2], 0) and not near(x[2], zl)

# pressure space on global mesh
V3 = FunctionSpace(mesh3d, "CG", 1)
V1 = FunctionSpace(mesh1d, "CG", 1)
W = [V3, V1]

u3, u1 = list(map(TrialFunction, W))
v3, v1 = list(map(TestFunction, W))

# create a radius function for the averaging surface
class RadiusFunction(UserExpression):
    def __init__(self, G, **kwargs):
        self.G = G
        _, self.mf = self.G.get_mesh()
        super().__init__(**kwargs)

    def eval(self, value, x):
        p = Point(x[0], x[1], x[2])
        tree = BoundingBoxTree()
        tree.build(mesh1d)
        cell = tree.compute_first_entity_collision(p)

        edge_ix = self.mf[cell]
        edge = list(G.edges())[edge_ix]
        value[0] = self.G.nodes()[edge[0]]['Radius']

    def value_shape(self):
        return ()

radius_function = RadiusFunction(G)
cylinder = Circle(radius=radius_function, degree=10)

Pi_u = Average(u3, mesh1d, cylinder)
Pi_v = Average(v3, mesh1d, cylinder)

# Dirac measure
dxGamma = Measure("dx", domain=mesh1d)

# blocks
a00 = Alpha1 * inner(grad(u3), grad(v3)) * dx + beta * inner(Pi_u, Pi_v) * dxGamma
a01 = -beta * inner(u1, Pi_v) * dxGamma
a10 = -beta * inner(Pi_u, v1) * dxGamma
a11 = alpha1 * inner(grad(u1), grad(v1)) * dx + beta * inner(u1, v1) * dx - gamma * inner(u1, v1) * ds

# right-hand side
L0 = inner(Constant(0), Pi_v) * dxGamma
L1 = -gamma * inner(p_infty, v1) * ds

a = [[a00, a01], [a10, a11]]
L = [L0, L1]

W_bcs = [[DirichletBC(V3, bc_3d, boundary_3d)], []]

A, b = map(ii_assemble, (a, L))
A, b = apply_bc(A, b, W_bcs)
A, b = map(ii_convert, (A, b))

wh = ii_Function(W)
solver = LUSolver(A, "mumps")
solver.solve(wh.vector(), b)

uh3d, uh1d = wh
File(WD_PATH + '/plots/pv_epsilongaptest/pressure1d.pvd') << uh1d
File(WD_PATH + '/plots/pv_epsilongaptest/pressure3d.pvd') << uh3d

Averaging over 792 cells: 100%|██████████| 792/792 [00:00<00:00, 1045.92it/s]


In [141]:
# visualize(mesh1d, uh1d, mesh3d=mesh3d, uh3d=uh3d, z_level=50, elev=15, azim=120)
visualize_scatter(mesh1d, uh1d, z_level=80)
visualize_contour(mesh3d, uh3d, z_level=80)

# **ARCHIVE**

In [None]:
# @title pv_1 robin coupling in 3D with lagrange

# Create 3D cube mesh
mesh3d = UnitCubeMesh(32, 32, 32)

# Fit 3D mesh around 1D mesh, guaranteeing minimum size and positive coordinates
c = mesh3d.coordinates()
xl, yl, zl = (np.max(node_coords, axis=0) - np.min(node_coords, axis=0))  # Graph length scales

# Calculate scaling factors, ensuring none go below 10
scaling_factors = np.maximum([xl, yl, zl], 10) + 10 # Add 10 for extra space

# Scale and position the cube mesh
c[:, :] *= scaling_factors
c[:, :] += offset  # Align with the adjusted 1D mesh coordinates

# set constants for simulation
Alpha1 = Constant(1)
alpha1 = Constant(1)
beta = Constant(1.0e3)
gamma = Constant(1.0)  # Adjust gamma
p_infty = Constant(1.3e3)  # Far-field pressure, medical literature suggests PV pressure is 5-10 mmHg

# set boundary conditions for simulation
bc_3d = Constant(3)

# define function that returns lateral faces of 3D boundary
def boundary_3d(x, on_boundary):
    return on_boundary and not near(x[2], 0) and not near(x[2], zl)

class FirstPointBoundary(SubDomain):
    def inside(self, x, on_boundary):
        return near(x[0], 0.0)

boundary_markers = MeshFunction("size_t", mesh1d, mesh1d.topology().dim())
boundary_markers.set_all(0)
FirstPointBoundary().mark(boundary_markers, 1)

# pressure space on global mesh
V3 = FunctionSpace(mesh3d, "CG", 1)
V1 = FunctionSpace(mesh1d, "CG", 1)
W = [V3, V1]

u3, u1 = list(map(TrialFunction, W))
v3, v1 = list(map(TestFunction, W))

# create a radius function for the averaging surface
class RadiusFunction(UserExpression):
    def __init__(self, radii_map, pos_map, **kwargs):
        self.radii_map = radii_map
        self.pos_map = pos_map
        super().__init__(**kwargs)

    def eval(self, value, x):
        min_dist = float('inf')
        closest_radius = 0
        for node, position in self.pos_map.items():
            posi = np.array(list(position.values()))
            dist = np.linalg.norm(x - posi)
            if dist < min_dist:
                min_dist = dist
                closest_radius = self.radii_map[node]
        value[0] = closest_radius

    def value_shape(self):
        return ()

# Prepare radius and position maps
radii = df_points['Radius'].to_dict()
pos = df_points[['x', 'y', 'z']].to_dict(orient='index')

radius_function = RadiusFunction(radii, pos)
cylinder = Circle(radius=radius_function, degree=10)

Pi_u = Average(u3, mesh1d, cylinder)
Pi_v = Average(v3, mesh1d, cylinder)

dxGamma = Measure("dx", domain=mesh1d)
ds = Measure("ds", domain=mesh1d, subdomain_data=boundary_markers)

# Define the variational problem
a00 = Alpha1 * inner(grad(u3), grad(v3)) * dx + beta * inner(Pi_u, Pi_v) * dxGamma
a01 = -beta * inner(u1, Pi_v) * dxGamma
a10 = -beta * inner(Pi_u, v1) * dxGamma
a11 = alpha1 * inner(grad(u1), grad(v1)) * dx + beta * inner(u1, v1) * dx - gamma * inner(u1, v1) * ds(1)

# Right-hand side
L0 = inner(Constant(0), Pi_v) * dxGamma
L1 = -gamma * inner(p_infty, v1) * ds(1)

# Assemble system
a = [[a00, a01], [a10, a11]]
L = [L0, L1]

W_bcs = [[DirichletBC(V3, bc_3d, boundary_3d)], []]

A, b = map(ii_assemble, (a, L))
A, b = apply_bc(A, b, W_bcs)
A, b = map(ii_convert, (A, b))

wh = ii_Function(W)
solver = LUSolver(A, "mumps")
solver.solve(wh.vector(), b)

uh3d, uh1d = wh
File(WD_PATH + '/plots/pv_lagrangetest/pressure1d.pvd') << uh1d
File(WD_PATH + '/plots/pv_lagrangetest/pressure3d.pvd') << uh3d
visualize_scatter(mesh1d, uh1d, z_level=20)


In [None]:
# @title Solve 1D PDE by finite difference method
import numpy as np
import matplotlib.pyplot as plt

L = 1.0         # Domain length
beta = 1.0       # Coefficient
p_bar_3D = 1.0   # Mean 3D pressure
p_inf = 0.5     # Far-field pressure
gamma = 1.0      # Boundary condition coefficient
f_1D = lambda s: 0.0  # Source term

N = 100         # Number of grid points
ds = L / (N - 1)

s = np.linspace(0, L, N)

# pressure and coefficient matrix
p_1D = np.zeros(N)
A = np.zeros((N, N))

# Interior points
for i in range(1, N - 1):
    A[i, i - 1] = 1
    A[i, i] = -2 - beta * ds**2
    A[i, i + 1] = 1

# Apply Robin boundary at s = 0
A[0, 0] = gamma * ds - 1
A[0, 1] = 1

# Apply Robin boundary at s = L
A[-1, -1] = 1 - gamma * ds
A[-1, -2] = -1

# right-hand side vector
b = np.zeros(N)
for i in range(1, N - 1):
    b[i] = -beta * ds**2 * p_bar_3D - ds**2 * f_1D(s[i])

# boundary conditions in right-hand side vector
b[0] = gamma * ds * p_inf
b[-1] = -gamma * ds * p_inf

p_1D = np.linalg.solve(A, b)

plt.figure()
plt.plot(s, p_1D, marker='o', linestyle='-')
plt.xlabel('s')
plt.ylabel('p_1D')
plt.title('1D numpy solution')
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# @title Solve 1D PDE by FEniCS
%%capture

L = 1.0         # Domain length
beta = 1.0      # Coefficient
p_bar_3D = 1.0  # Mean 3D pressure
p_inf = 0.5     # Far-field pressure
gamma = 1.0     # Boundary condition coefficient

# mesh and function space
mesh = IntervalMesh(50, 0, L)
V = FunctionSpace(mesh, 'P', 1)

# variational problem
p = TrialFunction(V)
v = TestFunction(V)

a = dot(grad(p), grad(v))*dx + beta*p*v*dx
L = beta*p_bar_3D*v*dx

# Robin boundary conditions
a += gamma*p*v*ds
L += gamma*p_inf*v*ds

p_1D = Function(V)
solve(a == L, p_1D)

plot(p_1D)
plt.xlabel('s')
plt.ylabel('p_{1D}')
plt.title('1D FEniCS solution')
plt.grid(True)
plt.show()

In [None]:
# @title 1D domain analysis only with Dirichlet boundaries
# function space
V1 = FunctionSpace(mesh1d, "CG", 1)
u1 = TrialFunction(V1)
v1 = TestFunction(V1)

# constants for simulation
alpha1 = Constant(1.45e4)
beta = Constant(3.09e-5)

# Dirichlet boundary condition
bc_1d = Expression("0.02*x[2]+6", degree=0)

# measure for 1D mesh
dxGamma = Measure("dx", domain=mesh1d)

# blocks
a11 = alpha1 * inner(grad(u1), grad(v1)) * dx + beta * inner(u1, v1) * dxGamma

# right-hand side
L1 = inner(Constant(0), v1) * dxGamma

# assemble system
A, b = assemble_system(a11, L1, DirichletBC(V1, bc_1d, "on_boundary"))

# solve
uh1d = Function(V1)
solve(A, uh1d.vector(), b)

# save solution
File(WD_PATH + 'plots/pv1_1dlim/pressure1d.pvd') << uh1d

visualize(mesh1d, uh1d)

In [None]:
# @title deprecated method of G = sorted domain definition
%%capture

# Define function to read .vtk file and extract required information
def read_vtk_file(file_path):
    reader = vtk.vtkPolyDataReader()
    reader.SetFileName(file_path)
    reader.Update()

    output = reader.GetOutput()
    points = vtk_to_numpy(output.GetPoints().GetData())

    connectivity = []
    for i in range(output.GetNumberOfCells()):
        cell = output.GetCell(i)
        connectivity.append([cell.GetPointId(j) for j in range(cell.GetNumberOfPoints())])

    damage = vtk_to_numpy(output.GetPointData().GetArray("Damage"))
    radius = vtk_to_numpy(output.GetPointData().GetArray("Radius"))

    df_points = pd.DataFrame(points, columns=['x', 'y', 'z'])
    df_points['Damage'] = damage
    df_points['Radius'] = radius

    df_connectivity = pd.DataFrame(connectivity, columns=['point1', 'point2'])
    return df_points, df_connectivity

file_path = WD_PATH + '/data/vtk/sortedDomain.vtk'
df_points, df_connectivity = read_vtk_file(file_path)

# Create the graph G using graphnics
G = FenicsGraph()

# Add nodes to the graph G with position, radius, and damage attributes
for index, row in df_points.iterrows():
    G.add_node(index, pos=(row['x'], row['y'], row['z']), radius=row['Radius'], damage=row['Damage'])

# Add edges to the graph G based on connectivity
for _, row in df_connectivity.iterrows():
    G.add_edge(row['point1'], row['point2'])

# Create 1D mesh from the graph
G.make_mesh()
mesh1d = G.mesh

# Get positions and recenter 1D coordinates to all be >= 0
pos = nx.get_node_attributes(G, "pos")
node_coords = np.asarray(list(pos.values()))
xmin, ymin, zmin = np.min(node_coords, axis=0)

# Recenter coordinates
d = mesh1d.coordinates()
d[:, :] -= [xmin, ymin, zmin]

In [None]:
# @title 1D domain analysis only with Robin boundaries

# Function space
V1 = FunctionSpace(mesh1d, "CG", 1)
u1 = TrialFunction(V1)
v1 = TestFunction(V1)

# Constants for simulation
alpha1 = Constant(1.45e4)
beta = Constant(3.09e-5)
p_bar_3D = Constant(5.0) # Average p_{3D}
gamma = Constant(0.2)  # Adjust gamma
p_infty = Constant(1.0)  # Far-field pressure

debug_x_list = []

# Weak form
a11 = alpha1 * inner(grad(u1), grad(v1)) * dx + beta * inner(u1, v1) * dx - gamma * inner(u1, v1) * ds
L1 = beta * inner(p_bar_3D, v1) * dx - gamma * inner(p_infty, v1) * ds

# Assemble system
A, b = assemble_system(a11, L1)

# Solve system
uh1d = Function(V1)
solve(A, uh1d.vector(), b)

# Save solution
File(WD_PATH + 'plots/pv1_mod/pressure1d.pvd') << uh1d

# visualize(mesh1d, uh1d)