In [1]:
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

This notebook implements and tests a numerical solution to the shallow water equations on a cartesian geometry, using 2nd order finite differences and a 4th-order Runge-Kutta solver for stepping forward in time.  Boundary conditions are implemented in a flexible way, allowing for periodicity in either dimension, flux-free boundary conditions, or numerically-specified boundary conditions via use of ghost cells.

The equation set solved follows Holton, Ch 3.1 equations SW.1--4:

$$\begin{eqnarray}
\frac{D\mathbf{u}}{dt} + \mathbf{f} \times \mathbf{u} & = & -g \nabla \eta \\
\frac{Dh}{dt} + h \nabla \cdot \mathbf{u} & = & 0 \\
h(x,y,t) & = & \eta(x,y,t) - \eta_b(x,y) \\
\frac{D}{Dt} & \equiv & \frac{\partial}{\partial t} + u \frac{\partial}{\partial x} + v \frac{\partial}{\partial y} \\
\end{eqnarray}$$


## Advection and boundary conditions

This section implements and tests advection, the boundary conditions, and the RK4 solver.

In [2]:

# define helper tuples for shifting data in the left, right, up, and down directions
# this is used for centered finite differences
c = 2*(slice(1,-1,None),)
l = (slice(1,-1,None), slice(None, -2, None))
r = (slice(1,-1,None), slice(2, None, None))
u = (slice(None, -2, None), slice(1, -1, None))
d = (slice(2, None, None), slice(1, -1, None))

def periodic_boundary(array, axis = -1):
    """ Implements a periodic boundary condition via ghost cells for the specified dimension """
    ndims = len(array.shape)
    slice_accessor_lhs = ndims*[slice(None,None,None)]
    slice_accessor_rhs = ndims*[slice(None,None,None)]
    
    # mirror the end of the array to the beginning
    slice_accessor_lhs[axis] = 0
    slice_accessor_rhs[axis] = -2 
    array[tuple(slice_accessor_lhs)] = array[tuple(slice_accessor_rhs)]
    
    # mirror the beginning of the array to the end
    slice_accessor_lhs[axis] = -1 
    slice_accessor_rhs[axis] = 1 
    array[tuple(slice_accessor_lhs)] = array[tuple(slice_accessor_rhs)]
    
    return

def advection_test_UV_tendency(state, tendency_array, U_index = 1, V_index = 2):
    """ Returns zeros for the U/V tendencies; used for testing of advection """
    tendency_array[U_index,...] = 0.0
    tendency_array[V_index,...] = 0.0
    
    return
    
def cfd_h_sw1d_tendency(state, tendency_array, dx, dy, H_index = 0, U_index = 1, V_index = 2):
    """ sets the tendency for the height variable in the shallow-water system, calculated using 2nd-order finite differences. """
    # get pointers to the u,v, h and h tendency arrays
    U = state[U_index,:,:]
    V = state[V_index,:,:]
    H = state[H_index,:,:]
    dHdt = tendency_array[H_index,:,:]
    
    # it is assumed that boundary conditions are already loaded in the single layer of
    # ghost cells surrounding the state array
    
    # dhdt = -u * grad(h) - h * grad(u)
    dHdt[c] = \
        -U[c] * (H[r] - H[l])/(2*dx) \
        -V[c] * (H[u] - H[d])/(2*dy) \
        -H[c] * (U[r] - U[l])/(2*dx) \
        -H[c] * (V[u] - V[d])/(2*dy)
    
    # note that the above modifies a view of tendency_array, so no data need to be returned; the modification is in-place
    return

def cfd_uv_sw1d_tendency(state, tendency_array, f, g, eta_b, dx, dy, H_index = 0, U_index = 1, V_index = 2):
    """ sets the tendency for the u and v variables in the shallow-water system, calculated using 2nd-order finite differences. """
    # get pointers to the u,v, h and h tendency arrays
    U = state[U_index,:,:]
    V = state[V_index,:,:]
    H = state[H_index,:,:]
    dUdt = tendency_array[U_index,:,:]
    dVdt = tendency_array[V_index,:,:]
    
    # it is assumed that boundary conditions are already loaded in the single layer of
    # ghost cells surrounding the state array
    
    # dudt = -u * grad(u) - f * v - g * grad(eta)
    
    # note that the above modifies a view of tendency_array, so no data need to be returned; the modification is in-place
    return
    
        
class SW1DSolver:
    
    def __init__(
        self,
        nx = 32,
        ny = 32,
        dx = 50e3,
        dy = 50e3,
        c_cfl = 30,
        cfl = 0.1,
        g = -9.806,
        f = 0, 
        eta_b = 0,
        x_is_periodic = True,
        y_is_periodic = True,
        array_type = np.float64,
        advection_testing = False,
        state0 = None,
        current_t = 0,
    ):
        """ Initialize the solver class; allocate variables, etc. """
        # store inputs
        self.nx = nx
        self.ny = ny
        self.dx = dx
        self.dy = dy
        self.c_cfl = c_cfl
        self.cfl = cfl
        self.g = g
        self.f = f
        self.eta_b = eta_b,
        self.x_is_periodic = x_is_periodic
        self.y_is_periodic = y_is_periodic
        self.array_type = array_type
        self.advection_testing = advection_testing
        self.current_t = current_t
        
        # initialize the state variables
        self.state_indices = dict(u = 0, v = 1, h = 2)
        self.num_state = len(self.state_indices)
        
        # determine the timestep
        self.dt = int(cfl * max([dx,dy]) / (c_cfl))
        
        
        # allocate the state variable
        # (index 0 is the state variable, index 1 is y, index 2 is x)
        # Note that the dimension size is increased by 2 to account for ghost cells
        self._state_ = np.empty([self.num_state, ny+2, nx+2], dtype = array_type)
        if state0 is not None:
            # set the initial condition from what was passed in
            self._state_[:, 1:-1, 1:-1] = state0
            
        # a temporary array for assisting in the RK4 solver
        self._solver_tmp_ = np.zeros([5,self.num_state, ny+2, nx+2], dtype = array_type)
        
        # set the boundary condition callback function
        if x_is_periodic:
            self.x_boundary_function = lambda x: periodic_boundary(x, axis = -1)
        if y_is_periodic:
            self.y_boundary_function = lambda x: periodic_boundary(x, axis = -2)
            
        if advection_testing:
            # set the RHS for the u and v quantities such that they aren't updated
            self.uv_rhs_function = lambda state, dstate_dt : \
                advection_test_UV_tendency(
                    state,
                    dstate_dt,
                    self.state_indices['u'],
                    self.state_indices['v'],
                )
        else:
            # set the RHS for u and v
            self.uv_rhs_function = lambda state, dstate_dt : \
                cfd_uv_sw1d_tendency(
                    state,
                    dstate_dt,
                    self.f,
                    self.g,
                    self.eta_b,
                    self.dx,
                    self.dy,
                    H_index = self.state_indices['h'],
                    U_index = self.state_indices['u'],
                    V_index = self.state_indices['v'],
                )
            
        self.h_rhs_function = lambda state, dstate_dt : \
            cfd_h_sw1d_tendency(
                state,
                dstate_dt,
                self.dx,
                self.dy,
                H_index = self.state_indices['h'],
                U_index = self.state_indices['u'],
                V_index = self.state_indices['v'],
            )
        
        return
    
    def RHS(self, state, dstate_dt):
        """ Updates dstate_dt with the right-hand side values of the 1D SW system. """
        
        # enforce the boundary conditions
        self.x_boundary_function(state)
        self.y_boundary_function(state)
        
        # update the U and V tendency variables;
        # this directly updates self._solver_tmp_[0,...]
        self.uv_rhs_function(state, dstate_dt)
        
        # update the h tendency
        self.h_rhs_function(state, dstate_dt)
        
     
    def step_forward(
        self,
        ndays = 5,
        state0 = None,
    ):
        """Steps the model forward a specified number of days; 
        uses the model's current state as the initial condition if one isn't given"""
        
        if state0 is not None:
            self._state_[:,1:-1,1:-1] = np.copy(np.array(state0))
            
            
        delta_t = ndays * 86400 # s
        tmax = self.current_t + delta_t
        dt = self.dt
        
        # pre-define some helper variables/views for the RK4 integration
        k1 = self._solver_tmp_[1,...]
        k2 = self._solver_tmp_[2,...]
        k3 = self._solver_tmp_[3,...]
        k4 = self._solver_tmp_[4,...]
        
        while self.current_t < tmax:
            # stage 1
            self.RHS(self._state_, k1)
            
            # stage 2
            self._solver_tmp_[0,...] = self._state_ + k1 * dt/2
            self.RHS(self._solver_tmp_[0,...], k2)
            
            # stage 3
            self._solver_tmp_[0,...] = self._state_ + k2 * dt/2
            self.RHS(self._solver_tmp_[0,...], k3)
            
            # stage 4
            #print(self._state_.max(),k1.max(),k2.max(),k3.max(),k4.max())
            self._solver_tmp_[0,...] = self._state_ + k3 * dt
            self.RHS(self._solver_tmp_[0,...], k4)
            
            # combine the tendencies into the final value
            self._state_[:] = \
                self._state_ + (1/6)*dt*(k1 + 2*k2 + 2*k3 + k4)
            
            # calculate the new time
            self.current_t += dt
            
    

In [3]:
""" Test a: periodic boundaries """

nx = ny = 10
a = np.zeros([ny+2,nx+2])
a[c] = 1

# apply periodic boundaries in both the x- and y- directions
periodic_boundary(a, axis = -1)
periodic_boundary(a, axis = -2)

# expected result: array elements are 1 at the boundaries
assert np.all(a == 1)

In [4]:
""" Test 0: Instantiation """

test0 = SW1DSolver()

# expected result: no errors raised

In [5]:
""" Test 1: Run 1 step w/o breaking """
test1 = SW1DSolver(state0 = 0)
test1.RHS(test1._state_, test1._solver_tmp_[0,...])

# expected result: no errors raised

In [6]:
""" Test 2: Run 1 step advection-only w/o breaking """
test2 = SW1DSolver(advection_testing=True, state0 = 1)
test2.RHS(test1._state_, test1._solver_tmp_[0,...])

# expected result: no errors raised

In [7]:
""" Test 3: Run 1 day advection-only on uniform field """
state0 = 1 # initialize with all 1's
test3 = SW1DSolver(advection_testing=True, state0 = state0)

test3.step_forward(ndays = 1)

# expected result: initial field is identical to final field
assert np.all(test3._state_[:,1:-1,1:-1] == state0)

In [8]:
""" Test 4: Run 1 step advection-only on a non-uniform field """
# set the initial condition to be a bell
nx = ny = 101
dx = dy = 50e3
s_b = 5
a_b = 10
cx = (nx-1)/2
cy = (ny-1)/2
x = np.arange(nx)
y = np.arange(ny)
x, y = np.meshgrid(x,y)
test4 = SW1DSolver(
    nx = nx, ny = ny, dx = dx, dy = dy,
    advection_testing=True, state0 = 0)

# set the wind speed
U0 = 10 # m/s

state0 = np.array(test4._state_[:,1:-1, 1:-1], copy = True)
state0[test4.state_indices['h'],...] = \
    a_b * np.exp(-((x - cx)**2 + (y - cy)**2)/(4*(s_b**2)))
state0[test4.state_indices['u'],...] = U0 

test4.step_forward(ndays = test4.dt/86400, state0 = state0)

# expected result: final field differs from initial field
assert np.logical_not(np.all(test4._state_[:,1:-1,1:-1] == state0))

In [11]:
""" Test 5: Advect a full cycle with a horizontal periodic boundary condition """
# set the initial condition to be a bell
nx = ny = 101
dx = dy = 50e3
s_b = 10 
a_b = 10
cx = (nx-1)/2
cy = (ny-1)/2
x = np.arange(nx)
y = np.arange(ny)
x, y = np.meshgrid(x,y)
test5 = SW1DSolver(
    nx = nx, ny = ny, dx = dx, dy = dy,
    advection_testing=True, state0 = 0)

# set the wind speed
U0 = 10 # m/s

state0 = np.array(test5._state_[:,1:-1, 1:-1], copy = True)
state0[test5.state_indices['h'],...] = \
    a_b * np.exp(-((x - cx)**2 + (y - cy)**2)/(4*(s_b**2)))
# advect diagonally at a total speed of U0
state0[test5.state_indices['u'],...] = U0 *np.sqrt(2)/2
state0[test5.state_indices['v'],...] = U0 *np.sqrt(2)/2

# calculate the time required for one full transit
x_domain = np.sqrt((nx * dx)**2 + (ny * dy)**2)
t_transit = x_domain / U0

# adjust dt so we can do exactly one full transit
n_transit = int(t_transit/test5.dt)
test5.dt = t_transit / n_transit

test5.step_forward(ndays = t_transit/86400, state0 = state0)

# calculate the mean squared error
MSE = np.sqrt(np.mean((state0[2,...]-test5._state_[2,...][c])**2))

# expected result: mean squared error is small
assert MSE < 0.05