In [1]:
import subprocess
import shlex
import re
import json
import pyinotify

import numpy as np
import multiprocessing as mp

from pathlib import Path
from scipy import stats
from tqdm import tqdm
from IPython.display import display, Markdown
from tempfile import TemporaryDirectory

In [2]:
# Matches the "The simulation took:" timing summary that the simulate_* helpers
# search for in a run's stdout. The three groups capture, in order, the rest of
# the initialisation, execution and cleanup lines (e.g. a duration like "1.5s").
EXECUTION_PATTERN = re.compile(r"The simulation took:\n - initialisation: ([^\n]+)\n - execution: ([^\n]+)\n - cleanup: ([^\n]+)\n")

In [3]:
# A decimal magnitude followed directly by its (non-digit) unit suffix
TIME_PATTERN = re.compile(r"(\d+(?:\.\d+)?)([^\d]+)")

# Conversion factors from each supported unit suffix to seconds
TIME_UNITS = {
    "ns": 1e-9,
    "µs": 1e-6,
    "ms": 1e-3,
    "s": 1.0,
}


def parse_time(time_str):
    """Convert a duration string such as "12.5ms" into seconds.

    Only the start of `time_str` is examined: a number followed by the run of
    non-digit characters after it, which is looked up in TIME_UNITS.

    Returns the duration in seconds as a float, or None when `time_str` does
    not start with a number followed by at least one non-digit character.

    Raises KeyError when the captured unit is not a key of TIME_UNITS.
    """
    parsed = TIME_PATTERN.match(time_str)

    if parsed is not None:
        magnitude, unit = parsed.groups()
        return float(magnitude) * TIME_UNITS[unit]

    return None

In [4]:
# Ask cargo for the build output directory of the project in the current working
# directory; the binaries and the reporter plugin library are loaded from there.
# check=True makes a failing `cargo metadata` raise CalledProcessError right away
# instead of surfacing later as a JSON decoding error on its empty stdout.
target_directory = json.loads(subprocess.run(
    ["cargo", "metadata", "--format-version", "1"], check=True, capture_output=True
).stdout)["target_directory"]

In [5]:
class EventLogSink(pyinotify.ProcessEvent):
    """Tally the size of files written below a directory, deleting them as they are closed.

    The directory is watched through pyinotify for IN_CREATE and IN_CLOSE_WRITE
    events. Each IN_CLOSE_WRITE on an existing path adds that file's size to
    `log_size` and then removes the file, so the total written volume can be
    measured without keeping the data on disk.
    """
    def __init__(self, event_log):
        """Create the `event_log` directory if needed and start watching it.

        event_log: path of the directory to watch.
        """
        Path(event_log).mkdir(parents=True, exist_ok=True)
        
        # Running total, in bytes, of all files consumed so far
        self.log_size = 0
        
        self.wm = pyinotify.WatchManager()
        # `self` is handed to the notifier so that its process_IN_* methods
        # receive the events.
        # NOTE(review): presumably the notifier runs in its own thread - confirm
        # against the pyinotify documentation.
        self.notifier = pyinotify.ThreadedNotifier(self.wm, self)
        
        self.notifier.start()
        
        # rec=True: presumably also watches directories that already exist
        # below event_log - TODO confirm
        self.wm.add_watch(event_log, pyinotify.IN_CREATE | pyinotify.IN_CLOSE_WRITE, rec=True)
        
    def stop(self):
        """Stop the notifier and return the accumulated size in bytes."""
        self.notifier.stop()
        
        return self.log_size
    
    def process_IN_CREATE(self, event):
        """Add a (non-recursive) watch on whatever path was just created."""
        self.wm.add_watch(event.pathname, pyinotify.IN_CREATE | pyinotify.IN_CLOSE_WRITE)
    
    def process_IN_CLOSE_WRITE(self, event):
        """Add the closed file's size to the total and delete the file."""
        path = Path(event.pathname)
        
        # The path may already be gone by the time the event is handled
        if path.exists():
            self.log_size += path.stat().st_size
            
            path.unlink()

In [6]:
def simulate_reporting_dynamic(
    algorithm, speciation=0.001, seed=42, sample=1.0, radius=564, sigma=10.0, report_speciation=False, report_dispersal=False, log=False
):
    """Run one `rustcoalescence simulate` with dynamically loaded reporter plugins.

    Parameters:
        algorithm: algorithm specification inserted verbatim into the
            configuration, e.g. "Classical()".
        speciation, seed, sample: inserted verbatim into the configuration.
        radius, sigma: parameters of the AlmostInfinite scenario.
        report_speciation: also load the Biodiversity() reporter.
        report_dispersal: load the Biodiversity() and Counter() reporters
            (takes precedence over `report_speciation`).
        log: when true, the configuration points the simulation's `log` at a
            temporary directory; otherwise `log` is set to None.

    Returns:
        (initialisation, execution, cleanup, event_log_size): the three
        durations as returned by parse_time for the printed timing summary,
        and the number of bytes the EventLogSink counted in the temporary
        directory.

    Raises:
        subprocess.CalledProcessError: the simulation exited with a non-zero status.
        RuntimeError: the simulation's stdout contains no timing summary.
    """
    if report_dispersal:
        reporters = ["Execution()", "Biodiversity()", "Counter()"]
    elif report_speciation:
        reporters = ["Execution()", "Biodiversity()"]
    else:
        reporters = ["Execution()"]
        
    with TemporaryDirectory() as log_path:
        # All whitespace is stripped from the configuration and the trailing
        # commas before closing brackets are removed
        config = "".join(f"""
        (
            speciation: {speciation},
            sample: {sample},
            seed: {seed},

            algorithm: {algorithm},

            scenario: AlmostInfinite(
                radius: {radius},
                sigma: {sigma},
            ),
            
            log: {f'"{log_path}"' if log else 'None'},

            reporters: [
                Plugin(
                    library: "{target_directory}/release/deps/libnecsim_plugins_common.so",
                    reporters: [{', '.join(reporters)}]
                )
            ],
        )
        """.split()).replace(",)", ")").replace(",]", "]")

        # The sink is started even when `log` is false; it then simply counts 0 bytes
        event_log = EventLogSink(log_path)

        try:
            # Run the simulation; the configuration is passed as a single argv
            # entry so that no shell-style quoting of its contents is required
            result = subprocess.run(
                [f"{target_directory}/release/rustcoalescence", "simulate", config],
                check=True, capture_output=True, text=True,
            )
        finally:
            # Always stop the notifier, also when the simulation failed, and
            # do so before the temporary directory is removed
            event_log_size = event_log.stop()
    
    match = EXECUTION_PATTERN.search(result.stdout)
    if match is None:
        # Fail with the captured output instead of dereferencing None below
        raise RuntimeError(
            "rustcoalescence did not print a timing summary\n"
            f"--- stdout ---\n{result.stdout}\n--- stderr ---\n{result.stderr}"
        )

    initialisation = parse_time(match.group(1))
    execution = parse_time(match.group(2))
    cleanup = parse_time(match.group(3))
        
    return initialisation, execution, cleanup, event_log_size

In [7]:
def simulate_reporting_compiled(speciation=0.001, seed=42, sample=1.0, radius=564, sigma=10.0, report_speciation=False, report_dispersal=False):
    """Run the `analysis-performance-reporting` binary once and return its timings.

    Parameters:
        speciation, seed, sample, radius, sigma: forwarded as the command line
            options of the same name.
        report_speciation: select the "progress-speciation" reporting mode.
        report_dispersal: select the "progress-speciation-dispersal" reporting
            mode (takes precedence over `report_speciation`).
        With neither flag set, "progress-only" is used.

    Returns:
        (initialisation, execution, cleanup) as returned by parse_time for the
        printed timing summary.

    Raises:
        subprocess.CalledProcessError: the binary exited with a non-zero status.
        RuntimeError: the binary's stdout contains no timing summary.
    """
    if report_dispersal:
        reporting = "progress-speciation-dispersal"
    elif report_speciation:
        reporting = "progress-speciation"
    else:
        reporting = "progress-only"
        
    # Run the simulation; an explicit argv list avoids re-tokenising the path
    result = subprocess.run([
        f"{target_directory}/release/analysis-performance-reporting",
        "--radius", f"{radius}",
        "--sample", f"{sample}",
        "--seed", f"{seed}",
        "--sigma", f"{sigma}",
        "--speciation", f"{speciation}",
        reporting,
    ], check=True, capture_output=True, text=True)
    
    match = EXECUTION_PATTERN.search(result.stdout)
    if match is None:
        # Fail with the captured output instead of dereferencing None below
        raise RuntimeError(
            "analysis-performance-reporting did not print a timing summary\n"
            f"--- stdout ---\n{result.stdout}\n--- stderr ---\n{result.stderr}"
        )

    initialisation = parse_time(match.group(1))
    execution = parse_time(match.group(2))
    cleanup = parse_time(match.group(3))
        
    return initialisation, execution, cleanup

In [8]:
def batch_simulation_many_seeds(simulate, seeds, args=tuple(), kwargs=dict(), silent=False, processes=mp.cpu_count()):
    """Run `simulate` once per seed on a process pool and collect the results.

    Parameters:
        simulate: callable invoked as simulate(*args, **kwargs, seed=seed).
        seeds: sized iterable of seeds; one task is submitted per entry.
        args, kwargs: extra positional / keyword arguments for every call.
        silent: disable the progress bar.
        processes: number of worker processes in the pool.

    Returns:
        A list with the return value of every successful call, in the order
        in which the success callbacks fired. Calls that raised are printed
        and contribute neither a result nor a progress update.
    """
    collected = []

    with tqdm(total=len(seeds), disable=silent) as bar:
        def on_success(outcome):
            collected.append(outcome)
            bar.update()

        def on_failure(error):
            print(error)

        with mp.Pool(processes) as workers:
            for seed in seeds:
                workers.apply_async(
                    simulate, args, dict(kwargs, seed=seed),
                    callback=on_success, error_callback=on_failure,
                )

            # Wait until every submitted task has finished
            workers.close()
            workers.join()

    return collected

In [9]:
def format_bytes(b):
    """Format a byte count using decimal (power of 1000) units.

    Values below 1000 are truncated to an integer and suffixed with "B";
    larger values are scaled to kB, MB, GB or TB and rounded to two decimals.
    Anything that is not below 1e12 is reported in TB.
    """
    if b < 1e3:
        return f"{int(b)}B"

    # (exclusive upper bound, divisor, unit suffix)
    for ceiling, scale, suffix in ((1e6, 1e3, "kB"), (1e9, 1e6, "MB"), (1e12, 1e9, "GB")):
        if b < ceiling:
            return f"{np.round(b / scale, 2)}{suffix}"

    return f"{np.round(b / 1e12, 2)}TB"

In [10]:
# Build rustcoalescence in release mode with the monolithic, independent and
# CUDA algorithm features and the MPI partitioning feature enabled. The manifest
# is located relative to cargo's target directory. check=True aborts the
# notebook if the build fails.
subprocess.run(shlex.split(
    f"cargo build --manifest-path {target_directory}/../rustcoalescence/Cargo.toml --release "
    + "--features rustcoalescence-algorithms-monolithic,rustcoalescence-algorithms-independent,"
    + "rustcoalescence-algorithms-cuda,necsim-partitioning-mpi"
), check=True, capture_output=True, text=True);

# Release build of the project in the current working directory.
# NOTE(review): presumably this is what produces the analysis-performance-reporting
# binary and the reporter plugin library used by the benchmarks - confirm.
subprocess.run(shlex.split(
    "cargo build --release"
), check=True, capture_output=True, text=True);

CompletedProcess(args=['cargo', 'build', '--release'], returncode=0, stdout='', stderr='    Finished release [optimized] target(s) in 0.12s\n')

In [11]:
display(Markdown("# Reporting Performance:"))

# 160 random uint64 seeds, reused for every benchmark variant below
seeds = np.random.randint(0, np.iinfo("uint64").max, dtype="uint64", size=160)

# (report_speciation, report_dispersal, section title) for each reporting level
reporting_levels = [
    (False, False, "progress only"),
    (True, False, "progress + speciation"),
    (True, True, "progress + speciation + dispersal"),
]

for report_speciation, report_dispersal, title in reporting_levels:
    display(Markdown(f"## {title}:"))

    # Keyword arguments shared by all four variants of this reporting level
    scenario = dict(
        speciation=0.001, sample=1.0, radius=178, sigma=0.0,
        report_speciation=report_speciation, report_dispersal=report_dispersal,
    )

    # Variant 1: dynamically loaded reporter plugins, Classical algorithm
    timings = batch_simulation_many_seeds(
        simulate_reporting_dynamic, seeds, args=("Classical()",), kwargs=scenario, silent=False
    )
    initialisations, executions, cleanups, event_log_sizes = zip(*timings)
    display(Markdown(f"* Dynamic Plugins (CPU): {np.round(np.mean(executions), 2)}s ± {np.round(np.std(executions), 2)}s"))

    # Variant 2: reporting compiled into the dedicated analysis binary
    timings = batch_simulation_many_seeds(
        simulate_reporting_compiled, seeds, args=(), kwargs=scenario, silent=False
    )
    initialisations, executions, cleanups = zip(*timings)
    display(Markdown(f"* Compiled Analysis (CPU): {np.round(np.mean(executions), 2)}s ± {np.round(np.std(executions), 2)}s"))

    # Variant 3: like variant 1, but with the event log enabled and its size measured
    timings = batch_simulation_many_seeds(
        simulate_reporting_dynamic, seeds, args=("Classical()",), kwargs=dict(scenario, log=True), silent=False
    )
    initialisations, executions, cleanups, event_log_sizes = zip(*timings)
    display(Markdown(f"* Dynamic Plugins + Log (CPU): {np.round(np.mean(executions), 2)}s ± {np.round(np.std(executions), 2)}s [{format_bytes(np.mean(event_log_sizes))} ± {format_bytes(np.std(event_log_sizes))}]"))

    # Variant 4: dynamically loaded plugins with the CUDA algorithm, one run at a time
    timings = batch_simulation_many_seeds(
        simulate_reporting_dynamic, seeds,
        args=("CUDA(ptx_jit:true, parallelism_mode:Monolithic(event_slice:10000000))",),
        kwargs=scenario, silent=False, processes=1
    )
    initialisations, executions, cleanups, event_log_sizes = zip(*timings)
    display(Markdown(f"* Dynamic Plugins (GPU): {np.round(np.mean(executions), 2)}s ± {np.round(np.std(executions), 2)}s"))

# Reporting Performance:

## progress only:

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [04:49<00:00,  1.81s/it]


* Dynamic Plugins (CPU): 70.73s ± 0.69s

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [04:35<00:00,  1.72s/it]


* Compiled Analysis (CPU): 68.19s ± 0.67s

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [04:49<00:00,  1.81s/it]


* Dynamic Plugins + Log (CPU): 70.93s ± 0.61s [0B ± 0B]

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [10:18<00:00,  3.87s/it]


* Dynamic Plugins (GPU): 3.46s ± 0.02s

## progress + speciation:

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [04:48<00:00,  1.80s/it]


* Dynamic Plugins (CPU): 70.78s ± 0.56s

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [04:36<00:00,  1.73s/it]


* Compiled Analysis (CPU): 68.18s ± 0.99s

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [04:51<00:00,  1.82s/it]


* Dynamic Plugins + Log (CPU): 70.52s ± 0.76s [3.98MB ± 0B]

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [10:24<00:00,  3.90s/it]


* Dynamic Plugins (GPU): 3.49s ± 0.02s

## progress + speciation + dispersal:

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [05:08<00:00,  1.93s/it]


* Dynamic Plugins (CPU): 74.86s ± 1.19s

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [04:43<00:00,  1.77s/it]


* Compiled Analysis (CPU): 70.38s ± 0.53s

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [07:35<00:00,  2.85s/it]


* Dynamic Plugins + Log (CPU): 112.61s ± 1.19s [5.57GB ± 17.07MB]

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160/160 [1:35:09<00:00, 35.69s/it]


* Dynamic Plugins (GPU): 35.27s ± 0.47s