In [1]:
import numpy as np
import scipy.sparse.linalg as splinalg
from scipy import interpolate
import matplotlib.pyplot as plt

Tryng to do fluid flow animation followed the code from:

https://medium.com/@zakariatemouch/exploring-fluid-dynamics-using-python-a-numerical-approach-with-navier-stokes-equations-0a00ddae6822

https://www.youtube.com/watch?v=wbYe58NGJJI

Basically it solves incompressible navier stokes equation, by breaking down into certain vector calc steps (you know more than me)


In [2]:
import cmasher as cmr
from tqdm import tqdm

In [5]:
DOMAIN_SIZE = 1.0
N_POINTS = 41
N_TIME_STEPS = 100
TIME_STEP_LENGTH = 0.1
MAX_VISCOSITY = 0.0001
MAX_ITER_CG = None

def forcing_function(time, point):
    time_decay = np.maximum(3.0-0.5*t,
                           0.0,)
    forced_value = (
        time_decay
        *
        np.where(
            (point[0]>0.4)
            &
            (point[0]<0.6)
            &
            (point[1]>0.1)
            &
            (point[1]<0.3)
        ),
        np.array([0.0,1.0]),
        np.array([0.0,0.0])
    )
    return forced_value
        


In [14]:
def main():
    element_length = DOMAIN_SIZE/(N_POINTS - 1)
    scalar_shape = (N_POINTS, N_POINTS)
    scalar_dof = N_POINTS**2
    vector_shape = (N_POINTS, N_POINTS, 2)
    vector_dof = N_POINTS**2*2
    x = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)
    y = np.linspace(0.0, DOMAIN_SIZE, N_POINTS)
    
    X, Y = np.meshgrid(x,y, indexing="ij")
    coordinates = np.concatenate(
        (
            X[..., np.newaxis],
            Y[..., np.newaxis],
        ), axis=-1,
    )
    
    forcing_function_vectorized = np.vectorized(pyfunc=forcing_function, signature="(),(d)->(d)")

    def partial_derivative_x(field):
        diff = np.zeros_like(field)
        diff[1:-1, 1:-1]=(
            field[2:, 1:-1]
            -
            field[0:-2, 1:-1]
        )/(
            2 * element_length
        )
        return diff

    def partial_derivative_y(field):
        diff = np.zeros_like(field)
        diff[1:-1, 1:-1]=(
            field[1:-1, 2:]
            -
            field[1:-1, 0:-2]
        )/(
            2 * element_length
        )
        return diff
        

    def laplace(field):
        diff = np.zeros_like(field)
        diff[1:-1,1:-1] = (
            field[0:-2, 1:-1]
            +
            field[1:-1 , 0: -2]
            -
            4
            *
            field[1:-1, 1:-1]
            +
            field[2:, 1:-1]
            +
            field[1:-1, 2:] /
            (
                element_length**2
            )
        )
        return diff

    def divergence(vector_field):
        divergence_applied= (
            partail_derivative_x(vector_field[..., 0])
            +
            partail_derivative_y(vector_field[..., 1])
        )
        return divergence_applied

    def gradient(field):
        gradient_applied=np.concatenate(
            (
                partial_derivative_x(field)[..., np.newaxis],
                partial_derivative_y(field)[..., np.newaxis],
            ),
            axis=-1,
        )
        return gradient_applied

    def advect(field, vector_field):
        backtraced_positions = np.clip(
            (coordinates
            -
            TIME_STEP_LENGTH
            *
            vector_field),
            0.0,
            DOMAIN_SIZE,
        ),
        advected_field = interpolate.interpn(
            points = (x,y),
            values = field,
            xi = backraced_positions,
        )
        return advected_field

    def diffusion_operator(vector_field_flattened):
        vector_field = vector_field_flattened.reshape(vector_shape)

    diffusion_applied = (
        vector_field
        -
        KINEMATIC_VISCOSITY
        /
        TIME_STEP_LENGTH
        *
        laplace(vector_field)
    )
    return diffusion_applied.flatten()

    def poisson_operator(field_flattened):
        field = field_flattened.reshape(scalar_shape)
        poisson_applied = laplace(field)
        return poisson_applied.flatten()

    plt.style.use("dark background")
    plt.figure(figsize=(5,5), dpi=160)
    
    velocities_prev = np.zeros(vector_shape)
    time_current = 0.0
    for i in tqdm(range(B_TIME_STEPS)):
        time_current += TIME_STEP_LENGTH
        forces = forcing_function_vectorized(
            time_current,
            coordinates,
        )

        #APPLY forces
        velocities_forces_applied =(
            velocities_prev
        +
            TIME_STEP_LENGTH
            *
            forces
        )
        # Non linear convection (self advect)
        
        velocities_advected = advect(
            field=velocities_forces_applied,
            vector_field=velocities_forces_applied,
        )
        #diffusion
        
        velocities_diffused = splinalg.cg(
            A=splinalg.LinearOperator(
                shape=(vector_dof, vector_dof),
                matvec=diffusion_operator,
            ),
            b=velocities_advected.flatten(),
            maxiter=MAX_ITER_CG,
        )[0].rehsape(vector_shape)
        
        #4.1 computing vector correction, poisson equation (you know more than me)
        
        pressure=splinalg.cg(
           A=splinalg.LinearOperator(
               shape=(scalar_dof, scalar_dof),
               matvec=poisson_operator,
           ),
            b=divergence(velocities_diffued).flatten(),
            maxiter=MAX_ITER_CG
        )[0].reshape(scalar_shape)
        
        #corrected velocity should be incompressible

        velocities_projected=(
            velocities_diffused
            +
            gradient(pressure)
        )

        #advance to next tiem step
        velocities_prev = velocities_projected
        #plot
        plt.quiver(
            X,
            Y,
            velocities_projected[...,0],
            velocities_projected[...,1],
        )
        plt.draw()
        plt.pause(0.0001)
        plt.clf

In [7]:
if __name__ == "__name__":
    main()