## Coding exercises

In [None]:
from typing import Generator, Literal, cast

import matplotlib.pyplot as plt
import numpy as np
import simpy
from matplotlib.axes import Axes
from matplotlib.figure import Figure

# Module-level random generator with a fixed seed; the OpenNet class below
# draws all of its inter-arrival and service times from this single generator.
RNG = np.random.default_rng(seed=0)

For all exercises below, consider the following open network, along with its default arguments/parameters:

In [None]:
class OpenNet:
    """Open queueing network in which every job visits a CPU and then a disk.

    Jobs arrive with exponentially distributed inter-arrival times, wait for
    and hold the single-server CPU for an exponentially distributed service
    time, do the same on the single-server disk, and then leave the system.
    All random durations are drawn from the module-level ``RNG``.

    Queue-length and utilization statistics are sampled once per arriving
    job, at its arrival instant and before the job itself is counted.

    Args:
        lambda_i: Arrival rate of jobs (mean inter-arrival time is
            ``1 / lambda_i``).
        lambda_cpu: Rate of the CPU service time distribution (mean service
            time is ``1 / lambda_cpu``).
        lambda_disk: Rate of the disk service time distribution (mean service
            time is ``1 / lambda_disk``).
    """

    def __init__(self, lambda_i=0.1, lambda_cpu=1.0, lambda_disk=0.2) -> None:
        self.env = simpy.Environment()
        # numpy's exponential sampler is parameterized by the mean (scale),
        # so the rates are stored as their reciprocals.
        self.beta_i = 1. / lambda_i
        self.beta_cpu = 1. / lambda_cpu
        self.beta_disk = 1. / lambda_disk
        self.cpu = simpy.Resource(self.env, capacity=1)
        self.disk = simpy.Resource(self.env, capacity=1)

        # Guards against registering the arrival process more than once.
        self._arrivals_started = False

        self.current_jobs_in_system = 0
        # Number of jobs already in the system as seen by each arriving job.
        self.log_jobs_in_system: list[int] = []

        # Sum of response times and count of jobs that left the system.
        self.e_t_acc = 0.
        self.n_served_jobs = 0
        # Sums of the busy fractions observed at arrival instants.
        self.rho_cpu_acc = 0.
        self.rho_disk_acc = 0.
        self.rho_sys_acc = 0.

    def system_generator(self) -> Generator:
        """Simulate one job: sample statistics, then use the CPU and the disk."""
        job_start_time = self.env.now

        # Fraction of each resource's servers that are busy right now.
        rho_cpu = self.cpu.count / self.cpu.capacity
        rho_disk = self.disk.count / self.disk.capacity
        self.rho_cpu_acc += rho_cpu
        self.rho_disk_acc += rho_disk
        # With capacity 1 both values are 0.0 or 1.0, so ``or`` yields 1.0
        # exactly when at least one of the two resources is busy.
        self.rho_sys_acc += rho_cpu or rho_disk

        # Log the population before counting the arriving job itself.
        self.log_jobs_in_system.append(self.current_jobs_in_system)
        self.current_jobs_in_system += 1

        with self.cpu.request() as request:
            yield request
            yield self.env.timeout(RNG.exponential(scale=self.beta_cpu))

        with self.disk.request() as request:
            yield request
            yield self.env.timeout(RNG.exponential(scale=self.beta_disk))

        self.current_jobs_in_system -= 1

        self.e_t_acc += self.env.now - job_start_time
        self.n_served_jobs += 1

    def job_generator(self) -> Generator:
        """Spawn a new job process after every exponential inter-arrival time."""
        while True:
            yield self.env.timeout(RNG.exponential(self.beta_i))
            self.env.process(self.system_generator())

    def run(self, t: float) -> "OpenNet":
        """Advance the simulation until time ``t`` and return ``self``.

        The arrival process is registered only on the first call, so calling
        ``run`` again with a later ``t`` continues the same simulation instead
        of adding a second, parallel stream of arrivals.
        """
        if not self._arrivals_started:
            self.env.process(self.job_generator())
            self._arrivals_started = True
        self.env.run(until=t)

        return self

    def expected_response_time(self) -> float:
        """Return the mean time in system of the jobs that completed.

        Returns NaN when no job has completed yet.
        """
        if not self.n_served_jobs:
            return float("nan")

        return self.e_t_acc / self.n_served_jobs

    def expected_jobs_in_system(self) -> float:
        """Return the mean number of jobs in the system seen by arriving jobs.

        Returns NaN when no job has arrived yet.
        """
        if not self.log_jobs_in_system:
            return float("nan")

        return sum(self.log_jobs_in_system) / len(self.log_jobs_in_system)

    def utilization(self, component: Literal["cpu", "disk", "sys"]) -> float:
        """Return the mean busy fraction of ``component`` over all arrivals.

        ``"sys"`` counts an arrival as busy when the CPU or the disk is busy.
        Returns NaN when no job has arrived yet.

        Raises:
            KeyError: If ``component`` is not ``"cpu"``, ``"disk"`` or ``"sys"``.
        """
        rho_acc = {
            "cpu": self.rho_cpu_acc,
            "disk": self.rho_disk_acc,
            "sys": self.rho_sys_acc
        }[component]

        if not self.log_jobs_in_system:
            return float("nan")

        return rho_acc / len(self.log_jobs_in_system)

    def system_throughput(self) -> float:
        """Return completed jobs per unit of simulated time.

        Returns NaN when no simulated time has elapsed.
        """
        if not self.env.now:
            return float("nan")

        return self.n_served_jobs / self.env.now

### 1. Transient removal: Batch means

Use the batch means method to analyze the transient period of the given network.

The function template below takes as input arguments:

- `sim_length` - the number of time units for which to run the simulation
- `max_bs` - the maximum batch size to consider

Fill in the contents to return a list with the variance of the batch means of the number of jobs in the system, one entry per batch size, starting with batch size 2 at index 0 and going up to `max_bs` (inclusive).

In [None]:
def transient_batch_means(sim_length=100000, max_bs=400) -> list[float]:
    """Return the variance of the batch means for batch sizes 2..``max_bs``.

    Runs a default ``OpenNet`` for ``sim_length`` time units and takes the
    number of jobs seen by each arriving job as the observation sequence.
    For every batch size ``n`` the sequence is cut into ``m = len // n``
    consecutive batches of ``n`` observations (the incomplete tail is
    dropped) and the sample variance of the ``m`` batch means is recorded.

    Args:
        sim_length: Number of time units to simulate.
        max_bs: Largest batch size to evaluate (inclusive).

    Returns:
        One variance per batch size, with batch size 2 at index 0. An entry
        is NaN when fewer than two complete batches fit into the
        observations, because the sample variance is undefined there.
    """
    # Convert once; every batch size reuses the same array.
    observations = np.asarray(
        OpenNet().run(sim_length).log_jobs_in_system, dtype=float)
    variances: list[float] = []

    for n in range(2, max_bs + 1):
        m = observations.size // n
        if m < 2:
            variances.append(float("nan"))
            continue

        # Row i of the (m, n) view holds the observations of batch i.
        batch_means = observations[:n * m].reshape(m, n).mean(axis=1)
        variances.append(float(batch_means.var(ddof=1)))

    return variances

The code below uses the `plot_batch_means` function to visualize the relation between batch size and variance.

Inspect the output figure to check for a suitable transient period cutoff.

In [None]:
def plot_batch_means(variances_n: list[float]) -> Figure:
    """Plot variance against batch size and return the figure.

    ``variances_n[0]`` is taken to belong to batch size 2, so the x axis
    runs from 2 to ``len(variances_n) + 1``.
    """
    batch_sizes = np.arange(2, len(variances_n) + 2)

    figure, axes = plt.subplots()
    axes.plot(batch_sizes, variances_n)
    axes.set_xlabel("Batch size n")
    axes.set_ylabel("Variance # jobs in system")

    # Close the figure in pyplot before handing it back; presumably this
    # avoids a second, automatic rendering next to the returned object.
    plt.close(figure)
    return figure

# Simulate with the default arguments, then plot variance against batch size.
# The figure is the last expression of the cell so that it gets displayed.
variances_n = transient_batch_means()
plot_batch_means(variances_n)

### 2. Simulation stopping: Batch means

Use the same batch means method to analyze the stopping period of the given network.

The function template below takes as input arguments:

- `sim_length` - the number of time units for which to run the simulation
- `transient_length` - the number of jobs to consider as part of the transient period
- `max_bs_exp` - maximum exponent to consider for the batch size.

Consider only the batch sizes $2^0, 2^1, ..., 2^{max\_bs\_exp}$.

Remember to discard the first `transient_length` jobs before calculating any statistics.

The autocovariance formula, if considering that the first observation has index $i=1$, is:

$$\frac{1}{m-2} \sum_{i=1}^{m-1} (\bar{x}_i - \bar{\bar{x}}) (\bar{x}_{i+1} - \bar{\bar{x}})$$

where:

- $\bar{x}_i$ - mean of batch $i$
- $\bar{\bar{x}}$ - mean of all batches ($\bar{x}_1$, $\bar{x}_2$, ..., $\bar{x}_m$)

Fill in the contents to return a list with tuples, one for each outlined batch size, containing the autocovariance and variance of the number of jobs in the system.

In [None]:
def stopping_batch_means(
        sim_length=400000, transient_length=50, max_bs_exp=10) -> list[tuple[float, float]]:
    """Return (autocovariance, variance) of the batch means per batch size.

    Runs a default ``OpenNet`` for ``sim_length`` time units, discards the
    first ``transient_length`` observations of the number of jobs in the
    system, and evaluates the batch sizes ``2**0, 2**1, ..., 2**max_bs_exp``.
    For batch size ``n`` the remaining observations are cut into
    ``m = len // n`` consecutive batches (the incomplete tail is dropped).

    Args:
        sim_length: Number of time units to simulate.
        transient_length: Number of leading observations to discard.
        max_bs_exp: Largest exponent of two used as batch size (inclusive).

    Returns:
        One ``(autocovariance, variance)`` tuple per batch size, in order of
        increasing batch size. The variance is NaN when fewer than two
        batches are available, and the lag-1 autocovariance is NaN when
        fewer than three are available (its divisor ``m - 2`` would not be
        positive).
    """
    nan = float("nan")
    # Convert once; every batch size reuses the same array.
    observations = np.asarray(
        OpenNet().run(sim_length).log_jobs_in_system[transient_length:],
        dtype=float)
    autocovariances_variances: list[tuple[float, float]] = []

    for n_pow in range(max_bs_exp + 1):
        n = 2 ** n_pow
        m = observations.size // n
        if m < 2:
            autocovariances_variances.append((nan, nan))
            continue

        # Row i of the (m, n) view holds the observations of batch i.
        batch_means = observations[:n * m].reshape(m, n).mean(axis=1)
        var = float(batch_means.var(ddof=1))

        if m < 3:
            autocovar = nan
        else:
            deviations = batch_means - batch_means.mean()
            # Products of each batch's deviation with its successor's.
            autocovar = float(
                (deviations[:-1] * deviations[1:]).sum() / (m - 2))

        autocovariances_variances.append((autocovar, var))

    return autocovariances_variances

The code below uses the `stopping_batch_means` function to print the relation between batch size and (autoco)variance.

Inspect the output text to check for a suitable stopping period.

In [None]:
# NOTE: despite the variable's name, each tuple is (autocovariance, variance).
autocovariances_covariances_n_exp = stopping_batch_means()

# Entry i belongs to batch size 2**i; print one tab-separated row per size.
for i, (autocovar, var) in enumerate(autocovariances_covariances_n_exp):
    print(
        f"Batch size {2 ** i}",
        f"Autocovariance {autocovar:.4f}",
        f"Variance {var:.4f}", sep="\t")

### 3. Operational laws

Reuse the statistics from the network simulation below for all the laws that follow.

Given the relatively short warmup period and the lengthy simulation for a relatively simple network, you can leave in the transient for the following exercises.

In [None]:
# One long run with the default rates; the operational-law cells read from it.
net = OpenNet().run(1000000)

#### Little's law

Check that the empirical estimate for the expected number of jobs in the system $E[N]$ closely matches the one given by Little's law for open systems ($E[N] = \lambda E[T]$).

Use the empirical estimate for a job's time in the system as $E[T]$.

In [None]:
# Empirical averages measured by the simulation.
e_n_sim = net.expected_jobs_in_system()
e_t_sim = net.expected_response_time()
# Little's law: E[N] = lambda * E[T], with arrival rate lambda = 1 / beta_i.
e_n_law = (1 / net.beta_i) * e_t_sim

print(f"Sim: {e_n_sim:.4f}")
print(f"Law: {e_n_law:.4f}")

#### Utilization law

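Check that the empirical estimate for CPU utilization $\rho_{CPU}$ closely matches the one given by the utilization law ($\rho_{CPU}=\frac{\lambda_{CPU}}{\mu_{CPU}}$), where $\lambda_{CPU} = \lambda_i$ is the arrival rate at the CPU and $\mu_{CPU}$ is the CPU service rate (the `lambda_cpu` argument of `OpenNet`).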

In [None]:
rho_cpu_sim = net.utilization("cpu")
# Utilization law: arrival rate (1 / beta_i) times mean CPU service time (beta_cpu).
rho_cpu_law = (1 / net.beta_i) * net.beta_cpu

print(f"Sim: {rho_cpu_sim:.4f}")
print(f"Law: {rho_cpu_law:.4f}")

#### Bottleneck law

Check the empirical estimation for the utilization of the whole system $\rho$ and the corresponding throughput $X$.

Compare the change in the two metrics when rerunning the simulation with a slower CPU ($\lambda_{CPU} = 0.25$) or a slower disk ($\lambda_{disk} = 0.05$).

In [None]:
# Baseline figures from the default-rate run stored in `net`.
rho_sys = net.utilization("sys")
x_sys = net.system_throughput()

print(f"Initial system utilization: {rho_sys:.4f}")
print(f"Initial system throughput: {x_sys:.4f}")

##### Slower CPU

In [None]:
# Same run length as the baseline, with the CPU rate lowered from the default 1.0 to 0.25.
net_slower_cpu = OpenNet(lambda_cpu=0.25).run(1000000)
rho_sys_slower_cpu = net_slower_cpu.utilization("sys")
x_slower_cpu = net_slower_cpu.system_throughput()

print(f"System utilization with a slower CPU: {rho_sys_slower_cpu:.4f}")
print(f"System throughput with a slower CPU: {x_slower_cpu:.4f}")

##### Slower Disk

In [None]:
# Same run length as the baseline, with the disk rate lowered from the default
# 0.2 to 0.05, the value given in the exercise text. This is below the default
# arrival rate of 0.1, so the disk serves jobs more slowly than they arrive.
net_slower_disk = OpenNet(lambda_disk=0.05).run(1000000)
rho_sys_slower_disk = net_slower_disk.utilization("sys")
x_slower_disk = net_slower_disk.system_throughput()

print(f"System utilization with a slower disk: {rho_sys_slower_disk:.4f}")
print(f"System throughput with a slower disk: {x_slower_disk:.4f}")

Create a single plot with the utilization and throughput for the default $\lambda_{CPU}$, and the following $\lambda_{disk}$ values: $0.0125, 0.025, 0.05, 0.1, 0.2, 0.4$.

You may run shorter simulations to reduce the total computation time required.

Check how the rate of change for the two quantities behaves over the additional data points.

In [None]:
# Disk rates to sweep; each value doubles the previous one.
lambda_disk_list = [0.0125, 0.025, 0.05, 0.1, 0.2, 0.4]
sys_utilization_list: list[float] = []
sys_throughput_list: list[float] = []

# One shorter run (100000 time units) per disk rate, all other rates at their defaults.
for lambda_disk in lambda_disk_list:
    net_ = OpenNet(lambda_disk=lambda_disk).run(100000)
    sys_utilization_list.append(net_.utilization("sys"))
    sys_throughput_list.append(net_.system_throughput())

# Utilization goes on the left y axis in red.
ax: Axes
fig, ax = plt.subplots()
ax.set_xlabel("Lambda disk")
color = "red"
ax.set_ylabel("Utilization", color=color)
ax.plot(lambda_disk_list, sys_utilization_list, color=color)
ax.tick_params(axis='y', labelcolor=color)
# Throughput goes on a twin axis sharing the same x axis, in blue.
ax_twin = cast(Axes, ax.twinx())
color = "blue"
ax_twin.set_ylabel("throughput", color=color)
ax_twin.plot(lambda_disk_list, sys_throughput_list, color=color)
ax_twin.tick_params(axis='y', labelcolor=color)