In [3]:
import numpy as np
import torch

class LatticeBoltzmannSimulator:
    def __init__(self, grid, tau, ic=None, bc=None):
        self.nx, self.ny,_ = grid.shape
        self.tau = tau
        self.grid = grid
        self.ic = ic
        self.bc = bc
        # Initialize velocity vectors for D2Q9
        self.e = torch.tensor([[0, 0], [1, 0], [0, 1], [-1, 0], [0, -1], 
                               [1, 1], [-1, 1], [-1, -1], [1, -1]], dtype=torch.float32)
        self.w = torch.tensor([4/9] + [1/9]*4 + [1/36]*4, dtype=torch.float32)
        self.initialize_grid()

    def initialize_grid(self):
        # Initialize f at equilibrium with a given macroscopic density rho and velocity u
        rho = torch.ones((self.nx, self.ny), dtype=torch.float32)  # Example density
        if self.ic==None:
            print('no initial cond')
            u = torch.zeros((self.nx, self.ny, 2), dtype=torch.float32)  # Example velocity field (u, v) for each point
        else: 
            u=self.ic
        self.f = self.equilibrium_distribution(rho, u)
        

    def equilibrium_distribution(self, rho, u):
        # Compute the equilibrium distribution function
        feq = torch.zeros((self.nx, self.ny, 9), dtype=torch.float32)
        usqr = u.pow(2).sum(dim=2, keepdim=True)
        eu = torch.einsum('ij,klj->kli', self.e, u)  # Notice the changed indices and order to match dimensions
        # Compute feq for all directions using broadcasting
        feq = rho.unsqueeze(-1) * self.w * (1 + 3*eu + 9/2*eu.pow(2) - 3/2*usqr)
        return feq
        

    def collision_step(self):
        omega = 1.0 / self.tau
        rho, u = self.update_properties()
        feq = self.equilibrium_distribution(rho, u)
        self.f = (1 - omega) * self.f + omega * feq
        #print(np.linalg.norm(self.f-feq))

    def streaming_step(self):
        f_streamed = torch.zeros_like(self.f)
        for i, (ex, ey) in enumerate(self.e):
            f_streamed[:, :, i] = torch.roll(self.f[:, :, i], shifts=(int(ex), int(ey)), dims=(0, 1))
        self.f = f_streamed

    def apply_boundary_conditions(self):
        # Bounce-back boundary condition for no-slip walls at top and bottom
        for i in range(self.nx):
            # Top wall: reverse the distribution functions for the relevant directions
            self.f[i, 0, [2, 5, 6]] = self.f[i, 0, [4, 7, 8]]
            # Bottom wall: reverse the distribution functions for the relevant directions
            self.f[i, -1, [4, 7, 8]] = self.f[i, -1, [2, 5, 6]]

        # Periodic boundary conditions for left and right
        self.f[:, 0, :] = self.f[:, -2, :]  # Shift all f's from second-to-last column to the first
        self.f[:, -1, :] = self.f[:, 1, :]  # Shift all f's from second column to the last
        
    def update_properties(self):
        rho = torch.sum(self.f, dim=2)
        u = torch.einsum('ijk,kl->ijl', self.f, self.e)
        
        # Normalize the velocity by density
        u = u / rho.unsqueeze(2)
        return rho, u

    def run_simulation(self, num_iterations):
        # Initialize a list to store the state at each iteration
        saved_states = []
    
        for iteration in range(num_iterations):
            self.collision_step()
            self.streaming_step()
            self.apply_boundary_conditions()
            rho, u = self.update_properties()
            #print(u.shape,rho.shape)
            
            # Save the current state (rho and velocity field u)
            saved_states.append((rho.clone(), u.clone()))
    
            # Insert additional code for output, visualization, etc., here
            
            # For example, you might print out progress or save visualizations intermittently:
            if iteration % 100 == 0:  # adjust the frequency to your preference
                print(f"Iteration {iteration}: rho and u fields saved.")
                # Visualization or file output code goes here
    
        # Return the saved states
        return saved_states




In [4]:
# Step 2: Initialize your grid, for example, with a uniform flow in the x-direction
u0 = 0.1          # Example initial flow velocity in the x direction
rho0 = 1.0        # Example initial density
nx=1000
ny=500
tau=0.6
# Create initial condition (velocity) tensor
ic = torch.zeros((nx, ny, 2), dtype=torch.float32)  # Placeholder for initial condition (velocity)
ic[:, :, 0] = u0  # Set the x component of velocity

# Instantiate the simulator
simulator = LatticeBoltzmannSimulator(grid=torch.zeros((nx, ny, 9)), tau=tau, ic=ic)
# Run the simulation
saved_states = simulator.run_simulation(num_iterations=100)

Iteration 0: rho and u fields saved.


In [5]:
# Import necessary libraries
%matplotlib notebook

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

# Set up the figure, the axis, and the plot element
fig, ax = plt.subplots()

# Define the plot for the initial state
cax = ax.imshow(saved_states[0][0], interpolation='nearest', cmap='viridis')

# Initialization function: plot the background of each frame
def init():
    cax.set_data(saved_states[0][0])
    return cax,

# Animation function; this is called sequentially
def animate(i):
    rho, u = saved_states[i]  # Access the saved state
    
    cax.set_data(u[:,:,0])  # Update the data.
    return cax

from matplotlib.animation import FuncAnimation
from IPython.display import HTML
fig.colorbar(cax)  
# Create the animation object
anim = FuncAnimation(fig, animate, init_func=init, frames=len(saved_states), interval=20, blit=True)



# Instead of plt.show(), use HTML to embed the animation in the notebook
HTML(anim.to_html5_video())



<IPython.core.display.Javascript object>

In [6]:
rho,u=saved_states[2641]
print(u[2,55])

IndexError: list index out of range

In [10]:
perturbed_density = torch.full((nx, ny), fill_value=1.2, dtype=torch.float32)  # Base density
perturbed_density[5, :] = 2.0  # Increased density for the middle row to see a clear change

# Reapply the non-uniform velocity perturbation
perturbed_velocity = torch.zeros((nx, ny, 2), dtype=torch.float32)
perturbed_velocity[:, :, 0] = 0.1  # Base x-velocity
perturbed_velocity[5, :, 0] = 0.5  # Increased x-velocity for the middle row

class LatticeBoltzmannSimulatorFull(LatticeBoltzmannSimulator):
    # Inherits everything from LatticeBoltzmannSimulator including collision_step, streaming_step, etc.
    def run_simulation(self, num_iterations):
        # Initialize a list to store the state at each iteration
        saved_states = []
    
        for iteration in range(num_iterations):
            self.collision_step()
            self.streaming_step()
            self.apply_boundary_conditions()
            rho, u = self.update_properties()
    
            # Save the current state (rho and velocity field u)
            saved_states.append((rho.clone(), u.clone()))
    
            # For example, you might print out progress or save visualizations intermittently:
            if iteration % (num_iterations // 10) == 0:  # adjust the frequency to your preference
                print(f"Iteration {iteration}: rho and u fields saved.")
    
        # Return the saved states
        return saved_states

# Create a new instance of the simulator with the full class definition
lbs_full = LatticeBoltzmannSimulatorFull(grid=torch.zeros((nx, ny, 9)), tau=0.6, ic=perturbed_velocity)

# Now run the full simulation
simulation_states_full = lbs_full.run_simulation(num_iterations_test=1000)

# For verification purposes, let's check the first and last saved states to see the evolution
first_state_rho_full, first_state_u_full = simulation_states_full[0]
last_state_rho_full, last_state_u_full = simulation_states_full[-1]

# Summarize the results for the first and last iteration for cell (5,5)
evolution_summary_full = {
    'first_state_rho_5_5': first_state_rho_full[5, 5].item(),
    'first_state_u_5_5': first_state_u_full[5, 5].numpy(),
    'last_state_rho_5_5': last_state_rho_full[5, 5].item(),
    'last_state_u_5_5': last_state_u_full[5, 5].numpy(),
}

evolution_summary_full

NameError: name 'num_iterations_test' is not defined