# Automated Model Testing

This notebook contains a set of automated tests for the CCU model. Each test either passes or fails, so no interpretation is needed. A summary of test results is provided at the end of the notebook.

**Additional testing:** Beyond the original test suite defined in our methodology, we've included extra tests to enhance validation and coverage, based on peer review feedback.

## Imports

In [1]:
import html
from IPython.display import HTML, display
import os
import sys
import pytest
import ipytest
from streamlit.testing.v1 import AppTest

ipytest.autoconfig()

## Model Code Imports

In [2]:
from ccu_formatted_code_stage2 import *

# For formatting the coverage report
module_path = os.path.abspath(os.path.join('..', '..', '..'))
if module_path not in sys.path:
    sys.path.insert(0, module_path)
from helper.format_coverage import display_coverage

## Tests

### Constants and utility functions

In [3]:
# The extreme value: a very large number used in the extreme value tests
M = 10_000_000
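
To give a feel for why a value as large as `M` effectively switches off an arrival source, the sketch below computes the chance of seeing at least one arrival during a run. It assumes exponentially distributed inter-arrival times with mean `M` and a hypothetical run length of 8,760 hours; neither assumption is taken from the model code.

```python
import math

M = 10_000_000      # extreme mean inter-arrival time, as in the cell above
RUN_LENGTH = 8_760  # hypothetical run length in hours (an assumption)


def prob_any_arrival(mean_iat, run_length):
    """Probability of at least one arrival in [0, run_length], assuming
    exponentially distributed inter-arrival times with mean `mean_iat`."""
    return 1.0 - math.exp(-run_length / mean_iat)


p = prob_any_arrival(M, RUN_LENGTH)
print(f"Chance of any arrival from one source: {p:.6f}")
```

Under these assumptions the probability is well below 0.1% per source, so an arrival is very unlikely but not strictly impossible.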

In [4]:
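def run_test(experiment):
    '''
    Run the CCU model for the given experiment.

    Returns the model and its results. The results are None if the
    run raises a ZeroDivisionError.
    '''
    # Create a SimPy environment
    env = simpy.Environment()
    ccu_model = CCU(env, experiment)
    try:
        results = ccu_model.run()
    except ZeroDivisionError:
        # No results could be computed for this run
        results = None

    return ccu_model, results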

### Extreme value test: no elective arrivals

In [5]:
def test_extreme_value_no_electives(extreme_value=M):
    '''
    Extreme value test 1: 
    
    Wards, Em Surgery, other hospitals, x-ray, electives have their inter-arrival time
    set to $M$ a very large number.
    
    Expected result: 
    ----------------
    Quantitative: There are no elective cancellations (as no electives arrive)

    See also manual version:
    The only type of patient to arrive to the model is "Accident and Emergency."
    '''
    experiment = Experiment(accident_emergency_arrival_rate=extreme_value,
                            wards_arrival_rate=extreme_value,
                            emergency_surgery_arrival_rate=extreme_value,
                            other_hospitals_arrival_rate=extreme_value,
                            xray_department_arrival_rate=extreme_value,
                            trace=True)
    model, results = run_test(experiment)

    # The count of cancelled elective operations should be 0
    assert results['Total Cancelled Elective Operations'].iloc[0] == 0

### Extreme value test: 1 critical care bed available

In [6]:
@pytest.mark.parametrize('random_number_set', [
                          (42),(101),(1),(2),(1234)
])
def test_extreme_value_1_bed(random_number_set):
    '''
    Extreme value test: Critical care beds set to 1 
    
    Expected result: 
    ---------------
    bed occupancy to be equal to bed utilization as only 1 bed is available
    
    Cancellations also begin after 1st arrival.
 
    Note: 
    ----
    [1] When critical_care_beds=1 queues form after first arrival.  This is
    assessed manually by viewing the trace in the manual trace notebook.

    [2] Simpy constraints force us to use 1 bed instead of 0.
    
    '''
    experiment = Experiment(num_critical_care_beds=1, 
                            random_number_set=random_number_set)
    model, results = run_test(experiment)
    assert results['Bed Utilization'].iloc[0] == results['Bed Occupancy'].iloc[0]

### Extreme value test: infinite critical care capacity.

In [7]:
@pytest.mark.parametrize('random_number_set', [
                          (42),(101),(1),(2),(1234)
])
def test_extreme_value_infinite_capacity(random_number_set):
    '''
    Extreme value test 3: 
    
    Critical care beds set to M a very large number.
    
    Expected result: 
    ---------------
    No cancelled electives and no unplanned patients have to wait.    
    '''
    experiment = Experiment(num_critical_care_beds=M, 
                            random_number_set=random_number_set)
    model, results = run_test(experiment)
    assert results['Total Cancelled Elective Operations'].iloc[0] == 0
    assert results['Mean Unplanned Admission Waiting Time (hours)'].iloc[0] == 0

### Test warm-up period reset

In [8]:
@pytest.mark.parametrize('warm_up', [
                          (0),(1),(100),(570),(1000)
])
def test_warmup_period(warm_up):
    '''
    Vary the warm-up period while holding the 
    results collection period constant.

    Expected result
    ---------------
    Run length = warm_up + results collection period
    '''
    experiment = Experiment(warm_up_period=warm_up)
    model, results = run_test(experiment)
    assert model.env.now == (experiment.results_collection_period \
                             + experiment.warm_up_period)

In [9]:
@pytest.mark.parametrize('warm_up', [
                          (0),(1),(100),(570),(1000)
])
def test_warmup_reset(warm_up):
    '''
    Vary the warm-up period while holding the 
    results collection period constant.

    Expected result
    ---------------
    The following model variables are reset to 0 during the
    warm_up_complete event:

    total_treatment_time
    cancelled_operations
    total_unplanned_waiting_time
    total_unplanned_admissions
    patient_id_counter

    Notes:
    ------
    [1] We must set the results collection period to a small
    number. This allows for the warm_up_complete event
    to take place before simpy terminates the run.
    
    '''
    # allow for very small results collection period so
    # that warmup reset event occurs.
    experiment = Experiment(warm_up_period=warm_up,
                            results_collection_period=0.005)
    
    model, results = run_test(experiment)

    # Separate asserts so a failure reports which variable was not reset
    assert model.total_treatment_time == 0
    assert model.cancelled_operations == 0
    assert model.total_unplanned_waiting_time == 0
    assert model.total_unplanned_admissions == 0
    assert model.patient_id_counter == 0
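
The warm-up reset idea can be illustrated without SimPy. The sketch below is a minimal, hypothetical stand-in (`ToyModel` is not the CCU model class): it counts arrivals, discards everything observed before the warm-up period ends, and stops at warm-up plus results collection period.

```python
class ToyModel:
    """Hypothetical stand-in for a simulation model (not the CCU class)."""

    def __init__(self, warm_up_period, results_collection_period):
        self.warm_up_period = warm_up_period
        self.results_collection_period = results_collection_period
        self.now = 0.0
        self.patient_count = 0

    def run(self, arrival_times):
        """Process sorted arrival times, resetting the counter at warm-up."""
        run_length = self.warm_up_period + self.results_collection_period
        warmed_up = False
        for t in arrival_times:
            if t > run_length:
                break
            if not warmed_up and t >= self.warm_up_period:
                self.patient_count = 0  # discard warm-up observations
                warmed_up = True
            self.patient_count += 1
        if not warmed_up:
            # No arrival occurred after warm-up, so reset at the end
            self.patient_count = 0
        self.now = run_length
        return self.patient_count


model = ToyModel(warm_up_period=10, results_collection_period=20)
count = model.run([1, 2, 3, 12, 15, 40])
print(count, model.now)
```

With a very small results collection period, as in the test above, almost nothing can happen after the reset, which is why the counters are expected to be 0 at the end of the run.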

### Test repeatable runs

In [10]:
@pytest.mark.parametrize('n_reps', [
                          (5),(10),(27)
])
def test_repeatable_reps(n_reps):
    """
    Test that random number streams are controlled and 
    multiple replications produce the same results each
    time they are run.

    Params:
    -------
    n_reps: int
        The number of replications to run.

    Expected results:
    -----------------
    The difference of two repeated runs is 0.
    """
    experiment = Experiment()
    replications = multiple_replications(experiment, n_reps)
    rs1 =  results_summary(replications)

    experiment = Experiment()
    replications = multiple_replications(experiment, n_reps)
    rs2 =  results_summary(replications)

    # sum absolute differences across all performance measures so that
    # positive and negative differences cannot cancel -> if no diff then = 0
    diff = (rs1 - rs2).abs().sum(axis=1).sum()

    assert diff == 0.0
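
One caveat when comparing two sets of results by summing their differences is that positive and negative deviations can cancel. The sketch below uses made-up numbers (not model output) to show that a signed sum can be 0 for two different runs, while a sum of absolute differences cannot.

```python
def signed_sum_diff(a, b):
    """Sum of signed element-wise differences."""
    return sum(x - y for x, y in zip(a, b))


def abs_sum_diff(a, b):
    """Sum of absolute element-wise differences."""
    return sum(abs(x - y) for x, y in zip(a, b))


# Made-up results for two runs that clearly differ
run_a = [10.0, 20.0]
run_b = [20.0, 10.0]

print(signed_sum_diff(run_a, run_b))  # deviations cancel
print(abs_sum_diff(run_a, run_b))     # deviations accumulate
```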

### Test tracing functionality

In [11]:
def test_trace(capsys):
    """
    Verify trace messages are printed when trace=True.
    """
    experiment = Experiment(trace=True)
    model, results = run_test(experiment)

    # Capture printed output
    captured = capsys.readouterr()
    
    # Check for some of the expected messages based on actual code
    assert "arrived from Accident and Emergency" in captured.out
    assert "arrived from the X-Ray Department" in captured.out
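
pytest's `capsys` fixture captures what the code under test writes to standard output. Outside pytest, a similar check can be sketched with the standard library; the `trace` function below is a hypothetical helper, not the model's own tracing code.

```python
import contextlib
import io


def trace(message, enabled=True):
    """Hypothetical trace helper: print the message only when enabled."""
    if enabled:
        print(message)


def capture_output(func, *args, **kwargs):
    """Run func and return everything it printed to standard output."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        func(*args, **kwargs)
    return buffer.getvalue()


printed = capture_output(trace, "Patient 1 arrived from Accident and Emergency")
silent = capture_output(trace, "Patient 2 arrived", enabled=False)
```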

### Test batch experiment functionality

In [12]:
def test_get_experiments():
    """
    Check that the expected experiments are generated by get_experiments().
    """
    experiments = get_experiments()
    assert len(experiments) == 6
    assert all(f"Experiment with {i} beds" in experiments
               for i in range(23, 29))

In [13]:
def test_run_all_experiments():
    """
    Test running multiple experiments with run_all_experiments().
    """
    # Run two experiments
    mini_experiments = {
        "test_exp1": Experiment(num_critical_care_beds=25),
        "test_exp2": Experiment(num_critical_care_beds=30)
    }
    summaries = run_all_experiments(mini_experiments, num_replications=2)

    # Check that results for both are output
    assert "test_exp1" in summaries.keys()
    assert "test_exp2" in summaries.keys()
    assert len(summaries) == 2

    # Check that both results are DataFrames with correct shape (5 rows, 2 columns)
    assert isinstance(summaries["test_exp1"], pd.DataFrame)
    assert isinstance(summaries["test_exp2"], pd.DataFrame)
    assert summaries["test_exp1"].shape == (5, 2)
    assert summaries["test_exp2"].shape == (5, 2)

In [14]:
def test_summary_of_experiments():
    """
    Test that summary_of_experiments concatenates experiment summaries.
    """
    # Create two blank DataFrames with expected dimensions (5 rows, 2 columns)
    summary1 = pd.DataFrame(index=range(5), columns=['mean', 'std'])
    summary2 = pd.DataFrame(index=range(5), columns=['mean', 'std'])
    experiment_summaries = {
        "Experiment with 23 beds": summary1,
        "Experiment with 24 beds": summary2
    }
    
    # Run the function
    result = summary_of_experiments(experiment_summaries)
    
    # Verify the result
    assert isinstance(result, pd.DataFrame)
    assert result.shape == (5, 4)

### Test random number streams

In [15]:
def test_random_numbers():
    """
    Test that random number streams are properly initialised and reproducible.
    """
    # Check that the expected RNG attributes are created
    experiment = Experiment(random_number_set=12345)
    rng_attributes = [
        'rng_accident_emergency', 'rng_wards', 'rng_emergency_surgery',
        'rng_other_hospitals', 'rng_xray_department', 'rng_elective_surgery',
        'rng_unplanned_treatment', 'rng_elective_treatment'
    ]
    for attr in rng_attributes:
        assert hasattr(experiment, attr)
        assert hasattr(getattr(experiment, attr), 'random')

    # Test reproducibility: same seed should produce same results
    experiment1 = Experiment(random_number_set=12345)
    experiment2 = Experiment(random_number_set=12345)
    values1 = [experiment1.rng_accident_emergency.random() for _ in range(5)]
    values2 = [experiment2.rng_accident_emergency.random() for _ in range(5)]
    assert values1 == values2

    # Test that different seeds produce different results
    experiment3 = Experiment(random_number_set=54321)
    values3 = [experiment3.rng_accident_emergency.random() for _ in range(5)]
    assert values1 != values3
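
For readers unfamiliar with the idea behind this test, the sketch below shows one way a single random number set could seed several independent streams. It uses the standard library's `random.Random`; the stream names and the `make_streams` helper are assumptions for illustration and do not reflect how `Experiment` creates its generators.

```python
import random

# Hypothetical subset of stream names, for illustration only
STREAM_NAMES = ["accident_emergency", "wards", "elective_surgery"]


def make_streams(random_number_set):
    """Create one generator per stream, all derived from a single seed."""
    master = random.Random(random_number_set)
    return {
        name: random.Random(master.getrandbits(64))
        for name in STREAM_NAMES
    }


streams_a = make_streams(12345)
streams_b = make_streams(12345)
streams_c = make_streams(54321)

values_a = [streams_a["wards"].random() for _ in range(5)]
values_b = [streams_b["wards"].random() for _ in range(5)]
values_c = [streams_c["wards"].random() for _ in range(5)]
```

The same seed reproduces the same values for each stream, while a different seed gives a different sequence.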

### Test streamlit app

In [16]:
def test_streamlit_app_loads():
    """
    Verify that the streamlit app loads without errors.
    """
    # Run the app
    at = AppTest.from_file("ccu_formatted_code_stage2.py")
    at.run()
    
    # Expected title
    exp_title = "A simulation model of bed-occupancy in a critical care unit"

    # Check that main elements are present
    assert not at.exception
    assert exp_title in str(at.title.values)
    assert len(at.slider) == 1  # Number of beds slider
    assert len(at.number_input) == 2  # Cleaning duration and replications
    assert len(at.checkbox) == 1  # Trace checkbox
    assert len(at.button) == 1  # Simulate button

In [17]:
def test_streamlit_simulate_button():
    """
    Check that clicking the simulate button runs the simulation.
    """
    # Run the app
    at = AppTest.from_file("ccu_formatted_code_stage2.py")
    at.run()

    # Click simulate button
    at.button[0].click().run()

    # Check that no exception occurred
    assert not at.exception
    
    # Check that dataframe results are displayed
    assert len(at.dataframe) > 0
    
    # Get the results dataframe and check it contains expected rows
    results_df = at.dataframe[0].value
    expected_rows = [
        'Total Cancelled Elective Operations', 
        'Mean Unplanned Admission Waiting Time (hours)',
        'Bed Utilization', 
        'Bed Occupancy', 
        'Patient Count'
    ]
    for row in expected_rows:
        assert row in results_df.index

## Run tests

In [18]:
ipytest.run(
    "-vv", "--no-header", "--cov=ccu_formatted_code_stage2",
    "--cov-report=term", "--cov-report=annotate"
)

collecting ... collected 31 items

t_3d1bd27415144168954a94b1b41644b5.py::test_extreme_value_no_electives PASSED                [  3%]
t_3d1bd27415144168954a94b1b41644b5.py::test_extreme_value_1_bed[42] PASSED                   [  6%]
t_3d1bd27415144168954a94b1b41644b5.py::test_extreme_value_1_bed[101] PASSED                  [  9%]
t_3d1bd27415144168954a94b1b41644b5.py::test_extreme_value_1_bed[1] PASSED                    [ 12%]
t_3d1bd27415144168954a94b1b41644b5.py::test_extreme_value_1_bed[2] PASSED                    [ 16%]
t_3d1bd27415144168954a94b1b41644b5.py::test_extreme_value_1_bed[1234] PASSED                 [ 19%]
t_3d1bd27415144168954a94b1b41644b5.py::test_extreme_value_infinite_capacity[42] PASSED       [ 22%]
t_3d1bd27415144168954a94b1b41644b5.py::test_extreme_value_infinite_capacity[101] PASSED      [ 25%]
t_3d1bd274151

<ExitCode.OK: 0>

## Test coverage report

The percentage coverage is reported in the cell above. Below is the annotated report for the model. Grey lines (" ") are excluded, green lines (">") are covered, and red lines ("!") are not covered.

In [19]:
display_coverage('ccu_formatted_code_stage2.py,cover')
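
The markers described above can also be tallied directly. The sketch below is a rough, hypothetical helper (it is not the `display_coverage` implementation): it counts annotated lines by their first character and treats any line that does not start with ">" or "!" as excluded. The percentage is a simple line count, so it may differ from the statement coverage reported by pytest.

```python
def summarise_annotated(text):
    """Count covered (">"), missed ("!") and excluded lines in annotated source."""
    counts = {"covered": 0, "missed": 0, "excluded": 0}
    for line in text.splitlines():
        marker = line[:1]
        if marker == ">":
            counts["covered"] += 1
        elif marker == "!":
            counts["missed"] += 1
        else:
            counts["excluded"] += 1
    measured = counts["covered"] + counts["missed"]
    counts["percent"] = 100 * counts["covered"] / measured if measured else 0.0
    return counts


# Made-up annotated source for illustration
sample = "\n".join([
    "> import math",
    "",
    "> def f(x):",
    ">     if x > 0:",
    ">         return 1",
    "!     return -1",
])
summary = summarise_annotated(sample)
print(summary)
```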