In [None]:
try:
    import google.colab  # noqa: F401
except ImportError:
    import dolfin
else:
    try:
        import dolfin
    except ImportError:
        !wget "https://fem-on-colab.github.io/releases/fenics-install.sh" -O "/tmp/fenics-install.sh" && bash "/tmp/fenics-install.sh"
        import dolfin
import numpy as np
import mshr
import fenics as fe
from fenics import PETScKrylovSolver, PETScOptions

In [None]:
def get_domain_dimensions(mesh):

  x_coords = mesh.coordinates()[:, 0]
  y_coords = mesh.coordinates()[:, 1]
  length = x_coords.max() - x_coords.min()

  threshold = x_coords.min() + length * 0.1
  filtered_y_coords = y_coords[x_coords <= threshold]

  height = filtered_y_coords.max() - filtered_y_coords.min()

  return length, height

In [None]:
def define_dirichlet_bc(V, Q, boundaries, inflow_profile):

    bcu_inflow = fe.DirichletBC(V, fe.Expression(inflow_profile, degree=2), boundaries, 1)
    bcu_walls = fe.DirichletBC(V, fe.Constant((0, 0)), boundaries, 2)
    bcu = [bcu_inflow, bcu_walls]

    bcp_outflow = fe.DirichletBC(Q, fe.Constant(0.), boundaries, 3)
    bcp = [bcp_outflow]

    return bcu, bcp

In [None]:
def define_trial_test_functions(V, Q):

  u = fe.TrialFunction(V)
  v = fe.TestFunction(V)
  p = fe.TrialFunction(Q)
  q = fe.TestFunction(Q)

  return u, v, p, q

In [None]:
def define_solution_functions(V, Q):

  u_n = fe.Function(V) #previous time-step
  u_  = fe.Function(V)
  p_n = fe.Function(Q) #previous time-step
  p_  = fe.Function(Q)

  return u_n, u_, p_n, p_

In [None]:
def define_expressions(mesh, scalars):

  dt, mu, rho = scalars
  f  = fe.Constant((0, 0))
  n  = fe.FacetNormal(mesh) # normal vector
  k  = fe.Constant(dt)
  mu = fe.Constant(mu)
  rho = fe.Constant(rho)

  return f, n, k, mu, rho

In [None]:
def define_variational_problem(u, u_, u_n, v, p, p_, p_n, q, f, n, k, mu, rho):

  def epsilon(u):
    return fe.sym(fe.nabla_grad(u))

  def sigma(u, p):
    return 2*mu*epsilon(u) - p*fe.Identity(len(u))

  U = 0.5*(u_n + u)

  F1 = rho*fe.dot((u - u_n) / k, v)*fe.dx \
    + rho*fe.dot(fe.dot(u_n, fe.nabla_grad(u_n)), v)*fe.dx \
    + fe.inner(sigma(U, p_n), epsilon(v))*fe.dx \
    + fe.dot(p_n*n, v)*fe.ds - fe.dot(mu*fe.nabla_grad(U)*n, v)*fe.ds \
    - fe.dot(f, v)*fe.dx
  a1 = fe.lhs(F1)
  L1 = fe.rhs(F1)

  a2 = fe.dot(fe.nabla_grad(p), fe.nabla_grad(q))*fe.dx
  L2 = fe.dot(fe.nabla_grad(p_n), fe.nabla_grad(q))*fe.dx - (1/k)*fe.div(u_)*q*fe.dx

  a3 = fe.dot(u, v)*fe.dx
  L3 = fe.dot(u_, v)*fe.dx - k*fe.dot(fe.nabla_grad(p_ - p_n), v)*fe.dx

  A1 = fe.assemble(a1)
  A2 = fe.assemble(a2)
  A3 = fe.assemble(a3)

  return A1, L1, A2, L2, A3, L3

In [None]:
def range_norm(u):

  if u.value_rank() == 0:  # Scalar field
    min_val = np.min(u.vector().get_local())
    max_val = np.max(u.vector().get_local())
  else:                    # Vector field
    norms = np.linalg.norm(u.vector().get_local().reshape(-1, 2), axis=1)
    min_val = np.min(norms)
    max_val = np.max(norms)
  return min_val, max_val

In [None]:
def get_norm(u, v):
  u_array = u.vector().get_local()
  v_array = v.vector().get_local()
  difference = u_array - v_array
  norm = np.sqrt(np.sum(difference**2))
  return norm

In [None]:
def solve_variational_problem(u_, u_n, p_, p_n, bcu, bcp, A1, L1, A2, L2, A3, L3, scalars, num_steps, atol, solver):

  dt, mu_scalar, rho_scalar = scalars

  [bc.apply(A1) for bc in bcu]
  [bc.apply(A2) for bc in bcp]

  t = 0
  diverged = False
  for i in range(num_steps):

    b1 = fe.assemble(L1)
    [bc.apply(b1) for bc in bcu]

    match solver:
      case 'lu':
        fe.solve(A1, u_.vector(), b1, 'lu')
      case 'gmres':
        fe.solve(A1, u_.vector(), b1, 'gmres', 'default')
      case 'bicgstab':
        fe.solve(A1, u_.vector(), b1, 'bicgstab', 'hypre_amg')

    b2 = fe.assemble(L2)
    [bc.apply(b2) for bc in bcp]

    match solver:
      case 'lu':
        fe.solve(A2, p_.vector(), b2, 'lu')
      case 'gmres':
        fe.solve(A2, p_.vector(), b2, 'gmres', 'default')
      case 'bicgstab':
        fe.solve(A2, p_.vector(), b2, 'bicgstab', 'hypre_amg')

    b3 = fe.assemble(L3)
    fe.solve(A3, u_.vector(), b3, 'cg', 'sor')

    min_pressure = p_.vector().min()
    if min_pressure < 0:
        offset = abs(min_pressure)
        p_.vector()[:] += offset

    u_delta = get_norm(u_, u_n)
    p_delta = get_norm(p_, p_n)

    if u_delta < atol:
        print(f"Converged at step {i}")
        break

    u_n.assign(u_)
    p_n.assign(p_)

    t += dt

    u_max = np.max(np.linalg.norm(u_.vector().get_local().reshape(-1, 2), axis=1))
    p_max = np.max(p_.vector().get_local())

    if u_max * p_max > 10:

      print(f'Potential divergence error at step {i}')
      diverged = True
      break

    # if (i % 100 == 0):
    #   min_velocity, max_velocity = range_norm(u_)
    #   min_pressure, max_pressure = range_norm(p_)
    #   print(f"Time step n: {i}, Velocity: [{min_velocity:.5f}, {max_velocity:.5f}], Pressure: [{min_pressure:.5f}, {max_pressure:.5f}]")
    #   print(f"u delta: {u_delta}, \t p delta: {p_delta}")

  return u_, p_, diverged

In [None]:
def solve_navier_stokes(V, Q, mesh, boundaries, inflow_profile, scalars, num_steps, atol, solver):

  bcu, bcp = define_dirichlet_bc(V, Q, boundaries, inflow_profile)
  u, v, p, q = define_trial_test_functions(V, Q)
  u_n, u_, p_n, p_ = define_solution_functions(V, Q)
  f, n, k, mu, rho = define_expressions(mesh, scalars)
  A1, L1, A2, L2, A3, L3 = define_variational_problem(u, u_, u_n, v, p, p_, p_n, q, f, n, k, mu, rho)
  u_, p_, diverged = solve_variational_problem(u_, u_n, p_, p_n, bcu, bcp, A1, L1, A2, L2, A3, L3, scalars, num_steps, atol, solver)

  return u_, p_, diverged