![CoSAppLogo](images/cosapp.svg) **CoSApp** tutorials: Time simulations

# Time simulations

CoSApp allows one to simulate the behaviour of systems with time-dependent variables, through the use of time drivers.

A new keyword `add_transient` has been introduced to declare transient variables, defined by their time derivative.

The syntax to define a variable $x$ such that $\frac{dx}{dt} = v$ is:
```python
self.add_transient('x', der='v')
```

## Free fall of a point mass with friction

In [None]:
from cosapp.base import System
import numpy as np

class PointMass(System):
    """Free fall of a point mass, with friction"""
    def setup(self):
        self.add_inward('mass', 1.2, desc='Mass')
        self.add_inward('k', 0.1, desc='Friction coefficient')
        self.add_inward('g', np.r_[0, 0, -9.81], desc='Uniform acceleration field')

        self.add_outward('a', np.zeros(3))
        
        self.add_transient('v', der='a')
        self.add_transient('x', der='v')
        
    def compute(self):
        self.a = self.g - (self.k / self.mass) * self.v

In [None]:
from cosapp.drivers import RungeKutta
from cosapp.recorders import DataFrameRecorder
from time_solutions import PointMassSolution

point = PointMass('point')
driver = point.add_driver(RungeKutta(order=3))

driver.time_interval = (0, 2)
driver.dt = 0.05

# Add a recorder to capture time evolution in a dataframe
driver.add_recorder(DataFrameRecorder(includes=['x', 'v', 'a']), period=0.1)

# Initial conditions
x0 = [0, 0, 10]
v0 = [8, 0, 9.5]

# Define a simulation scenario
driver.set_scenario(
    init = {'x': np.array(x0), 'v': np.array(v0)},
    values = {'mass': 1.5, 'k': 0.5},
)

point.run_drivers()

solution = PointMassSolution(point, v0, x0)

# Retrieve recorded data
data = driver.recorder.export_data()
time = np.asarray(data['time'])
traj = {
    'exact': np.array([solution.x(t) for t in time]),
    'num': np.asarray(data['x'].tolist()),
}

if point.k > 0:
    fls = PointMass('frictionless')
    fls.k = 0
    frictionless = PointMassSolution(fls, v0, x0)
    traj['k = 0'] = np.array([frictionless.x(t) for t in time])

with np.errstate(invalid='ignore', divide='ignore'):
    error = np.where(
        traj['exact'] == 0, 
        np.abs(traj['num']),
        np.abs(traj['num'] / traj['exact'] - 1),
    )

print(
    f"order = {driver.order}; dt = {driver.dt}",
    f"Max error on trajectory = {error.max():.2e}",
    f"End point: {traj['num'][-1].round(2)}",
    sep="\n",
)

In [None]:
# Plot results
import plotly.graph_objs as go

options = {
    'num': dict(mode='markers', name='numerical', line=dict(color='red')),
    'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
    'k = 0': dict(
        mode='lines', name='frictionless',
        line=dict(dash='dot', color='black'),
    ),
}

traces = [
    go.Scatter(
        x = data[:, 0], 
        y = data[:, 2],
        **options[name]
    ) for name, data in traj.items()
]

layout = go.Layout(
    title = "Trajectory", 
    xaxis = dict(title="x"),
    yaxis = dict(
        title = "z",
        scaleanchor = "x",
        scaleratio = 1,
    ),
    height = 450,
    hovermode = "x",
)

go.Figure(data=traces, layout=layout)

## Equilibration of two fluid tanks hydraulically connected

In this example, we consider the equilibration of two liquid filled tanks, hydraulically connected by a pipe.
For simplicity, we consider the quasi-steady, viscosity driven limit, neglecting inertial effects.

The liquid height in each tank is only known implicitly, through its instantaneous rate-of-change.
Note that the time derivative of `height` is a context dependent expression, evaluated at runtime.

In [None]:
from cosapp.base import System, Port


class FloatPort(Port):
    def setup(self):
        self.add_variable('value', 0.0)


class Tank(System):
    def setup(self):
        self.add_inward('area', 1.0, desc='Cross-section area')
        self.add_inward('rho', 1e3, desc='Fluid density')

        self.add_input(FloatPort, 'flowrate')
        self.add_output(FloatPort, 'p_bottom')

        self.add_transient('height', der='flowrate.value / area')

    def compute(self):
        g = 9.81
        self.p_bottom.value = self.rho * g * self.height


class Pipe(System):
    """Laminar Poiseuille flow in a cylindrical pipe"""
    def setup(self):
        self.add_inward('D', 0.1, desc="Diameter")
        self.add_inward('L', 2.0, desc="Length")
        self.add_inward('mu', 1e-3, desc="Fluid dynamic viscosity")

        self.add_input(FloatPort, 'p1')
        self.add_input(FloatPort, 'p2')

        self.add_output(FloatPort, 'Q1')
        self.add_output(FloatPort, 'Q2')

        self.add_outward('k', desc='Pressure loss coefficient')

    def compute(self):
        """Computes the volumetric flowrate from the pressure drop"""
        self.k = np.pi * self.D**4 / (256 * self.mu * self.L)
        self.Q1.value = self.k * (self.p2.value - self.p1.value)
        self.Q2.value = -self.Q1.value


class CoupledTanks(System):
    """System describing two tanks connected by a pipe (viscous limit)"""
    def setup(self):
        self.add_child(Tank('tank1'), pulling='rho')
        self.add_child(Tank('tank2'), pulling='rho')
        self.add_child(Pipe('pipe'), pulling='mu')

        self.connect(self.tank1.p_bottom, self.pipe.p1)
        self.connect(self.tank2.p_bottom, self.pipe.p2)
        self.connect(self.tank1.flowrate, self.pipe.Q1)
        self.connect(self.tank2.flowrate, self.pipe.Q2)


In [None]:
from cosapp.drivers import NonLinearSolver, RunSingleCase, RungeKutta
from cosapp.recorders import DataFrameRecorder
from time_solutions import CoupledTanksSolution
import numpy as np

system = CoupledTanks('coupledTanks')
driver = system.add_driver(RungeKutta(time_interval=[0, 5], dt=0.1, order=3))

solver = driver.add_child(NonLinearSolver('solver', factor=1.0))

init = (3, 1)

driver.set_scenario(
    init = {
        # initial conditions:
        'tank1.height': init[0],
        'tank2.height': init[1],
    },
    values = {
        # boundary conditions:
        'rho': 921,
        'mu': 1e-3,
        'pipe.D': 0.07,
        'pipe.L': 2.5,
        'tank1.area': 2,
        'tank2.area': 0.8,
    }
)

driver.add_recorder(DataFrameRecorder(includes='tank?.height'), period=0.1)

system.run_drivers()

# Retrieve recorded data
data = driver.recorder.export_data()
time = np.asarray(data['time'])
solution = CoupledTanksSolution(system, init)

heights = {
    'num': np.asarray([data['tank1.height'], data['tank2.height']]),
    'exact': np.transpose([solution(t) for t in time])
}
error = np.abs(heights['num'] - heights['exact'])

print(
    f"dt = {driver.dt}; order = {driver.order}",
    f"Max error = {error.max():2e}",  # try different RK orders!
    sep="\n",
)

In [None]:
color = dict(tank1='blue', tank2='orange')
exact = heights['exact']

go.Figure(
    data = [
        go.Scatter(
            x = time,
            y = data['tank1.height'],
            mode = 'markers',
            marker_color = color['tank1'],
            name = 'tank1.height',
        ),
        go.Scatter(
            x = time,
            y = data['tank2.height'],
            mode = 'markers',
            marker_color = color['tank2'],
            name = 'tank2.height',
        ),
        go.Scatter(
            x = time,
            y = exact[0],
            line = dict(dash='solid', width=0.5, color=color['tank1']),
            name = 'Exact tank1',
        ),
        go.Scatter(
            x = time,
            y = exact[1],
            line = dict(dash='solid', width=0.5, color=color['tank2']),
            name = 'Exact tank2',
        ),
    ],
    layout = go.Layout(
        title = "Liquid height in tanks",
        xaxis = {'title': 'time'},
        yaxis = {'title': 'height'},
        hovermode = 'x',
    )
)

## Harmonic oscillator
This example illustrates the use of option `max_time_step` in `add_transient()`, indicating a maximum admissible time step for certain transient variables.

In [None]:
from cosapp.base import System
import numpy as np

class DampedMassSpring(System):
    def setup(self):
        self.add_inward('mass', 0.25)
        self.add_inward('length', 0.5, desc='Unloaded spring length')
        self.add_inward('c', 0., desc='Friction coefficient')
        self.add_inward('K', 0.1, desc='Spring stiffness')
        self.add_inward('x', 0.0, desc='Linear position')

        self.add_outward('F', 0.0, desc='Force')
        self.add_outward('a', 0.0, desc='Linear acceleration')

        self.add_transient('v', der='a')
        self.add_transient('x', der='v', max_time_step='0.05 * natural_period')

    @property
    def natural_period(self) -> float:
        """float: Natural oscillating period of the system"""
        return 2 * np.pi * np.sqrt(self.mass / self.K)

    def compute(self):
        self.F = self.K * (self.length - self.x) - self.c * self.v
        self.a = self.F / self.mass


In [None]:
from cosapp.drivers import RungeKutta
from cosapp.recorders import DataFrameRecorder
from time_solutions import HarmonicOscillatorSolution as ExactSolution
import numpy as np

spring = DampedMassSpring('spring')
driver = spring.add_driver(RungeKutta(time_interval=(0, 8), order=3))
# Time step does not need to be defined, it will be taken from the max time step
# defined in the DampedMassSpring system. But you can overwrite it.
driver.dt = 0.01

x0, v0 = 0.26, 0.0

driver.set_scenario(
    init = {'x': x0, 'v': v0},
    values = {
        'length': 0.2,
        'mass': 0.6,
        'c': 0.47,
        'K': 20,
    },
)

driver.add_recorder(
    DataFrameRecorder(includes=['x', 'v', 'a']),
    period=0.05,
)

spring.run_drivers()

solution = ExactSolution(spring, (x0, v0))

# Retrieve recorded data
data = driver.recorder.export_data()
time = np.asarray(data['time'])
x = {
    'num': np.asarray(data['x']),
    'exact': np.array([solution.x(t) for t in time]),
}

error = np.abs(x['num'] - x['exact'])

print(
    f"dt = {driver.dt}; order = {driver.order}",
    f"Natural period = {spring.natural_period:.3}",
    f"Damping factor = {solution.damping:.3}",
    f"Max error = {error.max():.2e}",
    sep="\n",
)
if solution.over_damped:
    print("Over-damped spring")

# Plot solution
import plotly.graph_objs as go

options = {
    'num': dict(mode='markers', name='numerical', line=dict(color='red')),
    'exact': dict(mode='lines', name='analytical', line=dict(color='blue')),
}

traces = [
    go.Scatter(x=time, y=position, **options[name])
    for name, position in x.items()
]
layout = go.Layout(
    xaxis = dict(title="Time"), 
    yaxis = dict(title="Position"),
    hovermode = "x",
)

go.Figure(data=traces, layout=layout)