# Tutorial 02: Navier-Stokes problem

In this tutorial we compare the formulation of a Navier-Stokes problem using standard assembly with mixed function spaces and block assembly. This tutorial serves as a reminder on how to solve a nonlinear problem in `dolfinx` interfacing to `SNES` (part of `PETSc`), and shows a further usage of `multiphenicsx.fem.petsc.BlockVecSubVectorWrapper`.

In [None]:
import typing

In [None]:
import basix.ufl
import dolfinx.fem
import dolfinx.fem.petsc
import dolfinx.io
import gmsh
import mpi4py.MPI
import numpy as np
import numpy.typing as npt
import petsc4py.PETSc
import ufl
import viskex

In [None]:
import multiphenicsx.fem
import multiphenicsx.fem.petsc

### Constitutive parameters

In [None]:
nu = 0.01


def u_in_eval(x: npt.NDArray[np.float64]) -> npt.NDArray[  # type: ignore[no-any-unimported]
        petsc4py.PETSc.ScalarType]:
    """Return the flat velocity profile at the inlet."""
    values = np.zeros((2, x.shape[1]))
    values[0, :] = 1.0
    return values


def u_wall_eval(x: npt.NDArray[np.float64]) -> npt.NDArray[  # type: ignore[no-any-unimported]
        petsc4py.PETSc.ScalarType]:
    """Return the zero velocity at the wall."""
    return np.zeros((2, x.shape[1]))

### Geometrical parameters

In [None]:
pre_step_length = 4.
after_step_length = 14.
pre_step_height = 3.
after_step_height = 5.
mesh_size = 1. / 5.

### Mesh

In [None]:
gmsh.initialize()
gmsh.model.add("mesh")

In [None]:
p0 = gmsh.model.geo.addPoint(0.0, after_step_height - pre_step_height, 0.0, mesh_size)
p1 = gmsh.model.geo.addPoint(pre_step_length, after_step_height - pre_step_height, 0.0, mesh_size)
p2 = gmsh.model.geo.addPoint(pre_step_length, 0.0, 0.0, mesh_size)
p3 = gmsh.model.geo.addPoint(pre_step_length + after_step_length, 0.0, 0.0, mesh_size)
p4 = gmsh.model.geo.addPoint(pre_step_length + after_step_length, after_step_height, 0.0, mesh_size)
p5 = gmsh.model.geo.addPoint(0.0, after_step_height, 0.0, mesh_size)
l0 = gmsh.model.geo.addLine(p0, p1)
l1 = gmsh.model.geo.addLine(p1, p2)
l2 = gmsh.model.geo.addLine(p2, p3)
l3 = gmsh.model.geo.addLine(p3, p4)
l4 = gmsh.model.geo.addLine(p4, p5)
l5 = gmsh.model.geo.addLine(p5, p0)
line_loop = gmsh.model.geo.addCurveLoop([l0, l1, l2, l3, l4, l5])
domain = gmsh.model.geo.addPlaneSurface([line_loop])

In [None]:
gmsh.model.geo.synchronize()
gmsh.model.addPhysicalGroup(1, [l5], 1)
gmsh.model.addPhysicalGroup(1, [l0, l1, l2, l4], 2)
gmsh.model.addPhysicalGroup(2, [domain], 0)
gmsh.model.mesh.generate(2)

In [None]:
mesh, subdomains, boundaries, *_ = dolfinx.io.gmshio.model_to_mesh(
    gmsh.model, comm=mpi4py.MPI.COMM_WORLD, rank=0, gdim=2)
gmsh.finalize()
assert subdomains is not None
assert boundaries is not None

In [None]:
# Create connectivities required by the rest of the code
mesh.topology.create_connectivity(mesh.topology.dim - 1, mesh.topology.dim)

In [None]:
boundaries_1 = boundaries.indices[boundaries.values == 1]
boundaries_2 = boundaries.indices[boundaries.values == 2]

In [None]:
viskex.dolfinx.plot_mesh(mesh)

In [None]:
viskex.dolfinx.plot_mesh_tags(mesh, boundaries, "boundaries")

### Function spaces

In [None]:
V_element = basix.ufl.element("Lagrange", mesh.basix_cell(), 2, shape=(mesh.geometry.dim, ))
Q_element = basix.ufl.element("Lagrange", mesh.basix_cell(), 1)

### Standard formulation using a mixed function space

In [None]:
def run_monolithic() -> dolfinx.fem.Function:
    """Run standard formulation using a mixed function space."""
    # Function spaces
    W_element = basix.ufl.mixed_element([V_element, Q_element])
    W = dolfinx.fem.functionspace(mesh, W_element)

    # Test and trial functions: monolithic
    vq = ufl.TestFunction(W)
    (v, q) = ufl.split(vq)
    dup = ufl.TrialFunction(W)
    up = dolfinx.fem.Function(W)
    (u, p) = ufl.split(up)

    # Variational forms
    F = (nu * ufl.inner(ufl.grad(u), ufl.grad(v)) * ufl.dx
         + ufl.inner(ufl.grad(u) * u, v) * ufl.dx
         - ufl.inner(p, ufl.div(v)) * ufl.dx
         + ufl.inner(ufl.div(u), q) * ufl.dx)
    J = ufl.derivative(F, up, dup)

    # Boundary conditions
    W0 = W.sub(0)
    V, _ = W0.collapse()
    u_in = dolfinx.fem.Function(V)
    u_in.interpolate(u_in_eval)
    u_wall = dolfinx.fem.Function(V)
    u_wall.interpolate(u_wall_eval)
    bdofs_V_1 = dolfinx.fem.locate_dofs_topological((W0, V), mesh.topology.dim - 1, boundaries_1)
    bdofs_V_2 = dolfinx.fem.locate_dofs_topological((W0, V), mesh.topology.dim - 1, boundaries_2)
    inlet_bc = dolfinx.fem.dirichletbc(u_in, bdofs_V_1, W0)
    wall_bc = dolfinx.fem.dirichletbc(u_wall, bdofs_V_2, W0)
    bc = [inlet_bc, wall_bc]

    # Class for interfacing with SNES
    class NavierStokesProblem:
        """Define a nonlinear problem, interfacing with SNES."""

        def __init__(  # type: ignore[no-any-unimported]
            self, F: ufl.Form, J: ufl.Form, solution: dolfinx.fem.Function,
            bcs: list[dolfinx.fem.DirichletBC], P: typing.Optional[ufl.Form] = None
        ) -> None:
            self._F = dolfinx.fem.form(F)
            self._J = dolfinx.fem.form(J)
            self._obj_vec = dolfinx.fem.petsc.create_vector(self._F)
            self._solution = solution
            self._bcs = bcs
            self._P = P

        def create_snes_solution(self) -> petsc4py.PETSc.Vec:  # type: ignore[no-any-unimported]
            """
            Create a petsc4py.PETSc.Vec to be passed to petsc4py.PETSc.SNES.solve.

            The returned vector will be initialized with the initial guess provided in `self._solution`.
            """
            x = self._solution.x.petsc_vec.copy()
            with x.localForm() as _x, self._solution.x.petsc_vec.localForm() as _solution:
                _x[:] = _solution
            return x

        def update_solution(self, x: petsc4py.PETSc.Vec) -> None:  # type: ignore[no-any-unimported]
            """Update `self._solution` with data in `x`."""
            x.ghostUpdate(addv=petsc4py.PETSc.InsertMode.INSERT, mode=petsc4py.PETSc.ScatterMode.FORWARD)
            with x.localForm() as _x, self._solution.x.petsc_vec.localForm() as _solution:
                _solution[:] = _x

        def obj(  # type: ignore[no-any-unimported]
            self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec
        ) -> np.float64:
            """Compute the norm of the residual."""
            self.F(snes, x, self._obj_vec)
            return self._obj_vec.norm()  # type: ignore[no-any-return]

        def F(  # type: ignore[no-any-unimported]
            self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, F_vec: petsc4py.PETSc.Vec
        ) -> None:
            """Assemble the residual."""
            self.update_solution(x)
            with F_vec.localForm() as F_vec_local:
                F_vec_local.set(0.0)
            dolfinx.fem.petsc.assemble_vector(F_vec, self._F)
            dolfinx.fem.petsc.apply_lifting(F_vec, [self._J], [self._bcs], x0=[x], alpha=-1.0)
            F_vec.ghostUpdate(addv=petsc4py.PETSc.InsertMode.ADD, mode=petsc4py.PETSc.ScatterMode.REVERSE)
            dolfinx.fem.petsc.set_bc(F_vec, self._bcs, x, -1.0)

        def J(  # type: ignore[no-any-unimported]
            self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, J_mat: petsc4py.PETSc.Mat,
            P_mat: petsc4py.PETSc.Mat
        ) -> None:
            """Assemble the jacobian."""
            J_mat.zeroEntries()
            dolfinx.fem.petsc.assemble_matrix(  # type: ignore[misc]
                J_mat, self._J, self._bcs, diagonal=1.0)  # type: ignore[arg-type]
            J_mat.assemble()
            if self._P is not None:
                P_mat.zeroEntries()
                dolfinx.fem.petsc.assemble_matrix(  # type: ignore[misc]
                    P_mat, self._P, self._bcs, diagonal=1.0)  # type: ignore[arg-type]
                P_mat.assemble()

    # Create problem
    problem = NavierStokesProblem(F, J, up, bc)
    F_vec = dolfinx.fem.petsc.create_vector(problem._F)
    J_mat = dolfinx.fem.petsc.create_matrix(problem._J)

    # Solve
    snes = petsc4py.PETSc.SNES().create(mesh.comm)
    snes.setTolerances(max_it=20)
    snes.getKSP().setType("preonly")
    snes.getKSP().getPC().setType("lu")
    snes.getKSP().getPC().setFactorSolverType("mumps")
    snes.setObjective(problem.obj)
    snes.setFunction(problem.F, F_vec)
    snes.setJacobian(problem.J, J=J_mat, P=None)
    snes.setMonitor(lambda _, it, residual: print(it, residual))
    up_copy = problem.create_snes_solution()
    snes.solve(None, up_copy)
    problem.update_solution(up_copy)  # TODO can this be safely removed?
    up_copy.destroy()
    snes.destroy()
    return up

In [None]:
up_m = run_monolithic()
(u_m, p_m) = (up_m.sub(0).collapse(), up_m.sub(1).collapse())

In [None]:
viskex.dolfinx.plot_vector_field(u_m, "u")

In [None]:
viskex.dolfinx.plot_vector_field(u_m, "u", glyph_factor=1.0)

In [None]:
viskex.dolfinx.plot_scalar_field(p_m, "p")

### Block formulation using two independent function spaces

In [None]:
def run_block() -> tuple[dolfinx.fem.Function, dolfinx.fem.Function]:
    """Run block formulation using two independent function spaces."""
    # Function spaces
    V = dolfinx.fem.functionspace(mesh, V_element)
    Q = dolfinx.fem.functionspace(mesh, Q_element)

    # Test and trial functions
    (v, q) = (ufl.TestFunction(V), ufl.TestFunction(Q))
    (du, dp) = (ufl.TrialFunction(V), ufl.TrialFunction(Q))
    (u, p) = (dolfinx.fem.Function(V), dolfinx.fem.Function(Q))

    # Variational forms
    F = [(nu * ufl.inner(ufl.grad(u), ufl.grad(v)) * ufl.dx + ufl.inner(ufl.grad(u) * u, v) * ufl.dx
          - ufl.inner(p, ufl.div(v)) * ufl.dx),
         ufl.inner(ufl.div(u), q) * ufl.dx]
    J = [[ufl.derivative(F[0], u, du), ufl.derivative(F[0], p, dp)],
         [ufl.derivative(F[1], u, du), ufl.derivative(F[1], p, dp)]]

    # Boundary conditions
    u_in = dolfinx.fem.Function(V)
    u_in.interpolate(u_in_eval)
    u_wall = dolfinx.fem.Function(V)
    u_wall.interpolate(u_wall_eval)
    bdofs_V_1 = dolfinx.fem.locate_dofs_topological(V, mesh.topology.dim - 1, boundaries_1)
    bdofs_V_2 = dolfinx.fem.locate_dofs_topological(V, mesh.topology.dim - 1, boundaries_2)
    inlet_bc = dolfinx.fem.dirichletbc(u_in, bdofs_V_1)
    wall_bc = dolfinx.fem.dirichletbc(u_wall, bdofs_V_2)
    bc = [inlet_bc, wall_bc]

    # Class for interfacing with SNES
    class NavierStokesProblem:
        """Define a nonlinear problem, interfacing with SNES."""

        def __init__(  # type: ignore[no-any-unimported]
            self, F: list[ufl.Form], J: list[list[ufl.Form]],
            solutions: tuple[dolfinx.fem.Function, dolfinx.fem.Function],
            bcs: list[dolfinx.fem.DirichletBC],
            P: typing.Optional[list[list[ufl.Form]]] = None
        ) -> None:
            self._F = dolfinx.fem.form(F)
            self._J = dolfinx.fem.form(J)
            self._obj_vec = dolfinx.fem.petsc.create_vector_block(self._F)
            self._solutions = solutions
            self._bcs = bcs
            self._P = P

        def create_snes_solution(self) -> petsc4py.PETSc.Vec:  # type: ignore[no-any-unimported]
            """
            Create a petsc4py.PETSc.Vec to be passed to petsc4py.PETSc.SNES.solve.

            The returned vector will be initialized with the initial guesses provided in `self._solutions`,
            properly stacked together in a single block vector.
            """
            x = dolfinx.fem.petsc.create_vector_block(self._F)
            with multiphenicsx.fem.petsc.BlockVecSubVectorWrapper(x, [V.dofmap, Q.dofmap]) as x_wrapper:
                for x_wrapper_local, sub_solution in zip(x_wrapper, self._solutions):
                    with sub_solution.x.petsc_vec.localForm() as sub_solution_local:
                        x_wrapper_local[:] = sub_solution_local
            return x

        def update_solutions(self, x: petsc4py.PETSc.Vec) -> None:  # type: ignore[no-any-unimported]
            """Update `self._solutions` with data in `x`."""
            x.ghostUpdate(addv=petsc4py.PETSc.InsertMode.INSERT, mode=petsc4py.PETSc.ScatterMode.FORWARD)
            with multiphenicsx.fem.petsc.BlockVecSubVectorWrapper(x, [V.dofmap, Q.dofmap]) as x_wrapper:
                for x_wrapper_local, sub_solution in zip(x_wrapper, self._solutions):
                    with sub_solution.x.petsc_vec.localForm() as sub_solution_local:
                        sub_solution_local[:] = x_wrapper_local

        def obj(  # type: ignore[no-any-unimported]
            self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec
        ) -> np.float64:
            """Compute the norm of the residual."""
            self.F(snes, x, self._obj_vec)
            return self._obj_vec.norm()  # type: ignore[no-any-return]

        def F(  # type: ignore[no-any-unimported]
            self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, F_vec: petsc4py.PETSc.Vec
        ) -> None:
            """Assemble the residual."""
            self.update_solutions(x)
            with F_vec.localForm() as F_vec_local:
                F_vec_local.set(0.0)
            dolfinx.fem.petsc.assemble_vector_block(  # type: ignore[misc]
                F_vec, self._F, self._J, self._bcs, x0=x, alpha=-1.0)

        def J(  # type: ignore[no-any-unimported]
            self, snes: petsc4py.PETSc.SNES, x: petsc4py.PETSc.Vec, J_mat: petsc4py.PETSc.Mat,
            P_mat: petsc4py.PETSc.Mat
        ) -> None:
            """Assemble the jacobian."""
            J_mat.zeroEntries()
            dolfinx.fem.petsc.assemble_matrix_block(  # type: ignore[misc]
                J_mat, self._J, self._bcs, diagonal=1.0)  # type: ignore[arg-type]
            J_mat.assemble()
            if self._P is not None:
                P_mat.zeroEntries()
                dolfinx.fem.petsc.assemble_matrix_block(  # type: ignore[misc]
                    P_mat, self._P, self._bcs, diagonal=1.0)  # type: ignore[arg-type]
                P_mat.assemble()

    # Create problem
    problem = NavierStokesProblem(F, J, (u, p), bc)
    F_vec = dolfinx.fem.petsc.create_vector_block(problem._F)
    J_mat = dolfinx.fem.petsc.create_matrix_block(problem._J)

    # Solve
    snes = petsc4py.PETSc.SNES().create(mesh.comm)
    snes.setTolerances(max_it=20)
    snes.getKSP().setType("preonly")
    snes.getKSP().getPC().setType("lu")
    snes.getKSP().getPC().setFactorSolverType("mumps")
    snes.setObjective(problem.obj)
    snes.setFunction(problem.F, F_vec)
    snes.setJacobian(problem.J, J=J_mat, P=None)
    snes.setMonitor(lambda _, it, residual: print(it, residual))
    solution = problem.create_snes_solution()
    snes.solve(None, solution)
    problem.update_solutions(solution)  # TODO can this be safely removed?
    solution.destroy()
    snes.destroy()
    return (u, p)

In [None]:
(u_b, p_b) = run_block()

In [None]:
viskex.dolfinx.plot_vector_field(u_b, "u", glyph_factor=1)

In [None]:
viskex.dolfinx.plot_scalar_field(p_b, "p")

### Error computation between mixed and block cases

In [None]:
def run_error(
    u_m: dolfinx.fem.Function, p_m: dolfinx.fem.Function, u_b: dolfinx.fem.Function, p_b: dolfinx.fem.Function
) -> None:
    """Compute errors between the mixed and block cases."""
    u_m_norm = np.sqrt(mesh.comm.allreduce(
        dolfinx.fem.assemble_scalar(dolfinx.fem.form(ufl.inner(ufl.grad(u_m), ufl.grad(u_m)) * ufl.dx)),
        op=mpi4py.MPI.SUM))
    err_u_norm = np.sqrt(mesh.comm.allreduce(
        dolfinx.fem.assemble_scalar(
            dolfinx.fem.form(ufl.inner(ufl.grad(u_b - u_m), ufl.grad(u_b - u_m)) * ufl.dx)),
        op=mpi4py.MPI.SUM))
    p_m_norm = np.sqrt(mesh.comm.allreduce(
        dolfinx.fem.assemble_scalar(dolfinx.fem.form(ufl.inner(p_m, p_m) * ufl.dx)), op=mpi4py.MPI.SUM))
    err_p_norm = np.sqrt(mesh.comm.allreduce(
        dolfinx.fem.assemble_scalar(dolfinx.fem.form(ufl.inner(p_b - p_m, p_b - p_m) * ufl.dx)),
        op=mpi4py.MPI.SUM))
    print("Relative error for velocity component is equal to", err_u_norm / u_m_norm)
    print("Relative error for pressure component is equal to", err_p_norm / p_m_norm)
    assert np.isclose(err_u_norm / u_m_norm, 0., atol=1.e-10)
    assert np.isclose(err_p_norm / p_m_norm, 0., atol=1.e-10)

In [None]:
run_error(u_m, p_m, u_b, p_b)