In [1]:
import os
import random
from typing import Callable
from edgedroid.models.sampling import *
from edgedroid.models.timings import *

rtts = np.linspace(0, 5, 10)[1:]

runs_per_model = 30
task_steps = 100

timing_models: Dict[str, Callable[[], ExecutionTimeModel]] = {
    "empirical-low": lambda: EmpiricalExecutionTimeModel.from_default_data(neuroticism=0.0),
    "empirical-high": lambda: EmpiricalExecutionTimeModel.from_default_data(neuroticism=1.0),
    "theoretical-low": lambda: TheoreticalExecutionTimeModel.from_default_data(neuroticism=0.0),
    "theoretical-high": lambda: TheoreticalExecutionTimeModel.from_default_data(neuroticism=0.0),
    # "constant": lambda: ConstantExecutionTimeModel.from_default_data(),
    # "naive": lambda: NaiveExecutionTimeModel.from_default_data(),
    "fitted-naive": lambda: FittedNaiveExecutionTimeModel.from_default_data(),
}

In [2]:
from edgedroid.models.sampling.adaptive import _aperiodic_instant_iterator
from typing import NamedTuple

class SamplingResult(NamedTuple):
    final_sampling_instant: float
    num_samples: int

class ExpSampling(BaseFrameSamplingModel, abc.ABC):
    @abc.abstractmethod
    def constant_rtt_sampling(self, rtt: float, prev_ttf: float, target_exec_time: float) -> SamplingResult:
        pass


class GreedySampling(ZeroWaitFrameSamplingModel, ExpSampling):
    def constant_rtt_sampling(self, rtt: float, prev_ttf: float, target_exec_time: float) -> SamplingResult:
        num_samples = 1
        instant = 0.0

        while instant <= target_exec_time:
            instant += rtt
            num_samples += 1

        return SamplingResult(instant, num_samples)

class IdealSampling(IdealFrameSamplingModel, ExpSampling):
    def constant_rtt_sampling(self, rtt: float, prev_ttf: float, target_exec_time: float) -> SamplingResult:
        return SamplingResult(target_exec_time, 1)

class PeriodicSampling(RegularFrameSamplingModel, ExpSampling):
    def constant_rtt_sampling(self, rtt: float, prev_ttf: float, target_exec_time: float) -> SamplingResult:
        num_samples = 1
        instant = self._interval

        interval = max(self._interval, rtt)

        while instant <= target_exec_time:
            instant += interval
            num_samples += 1

        return SamplingResult(instant, num_samples)

class HoldSampling(HoldFrameSamplingModel, ExpSampling):
    def constant_rtt_sampling(self, rtt: float, prev_ttf: float, target_exec_time: float) -> SamplingResult:
        num_samples = 1
        instant = self._hold_time

        while instant <= target_exec_time:
            instant += rtt
            num_samples += 1

        return SamplingResult(instant, num_samples)

class AdaptiveSampling(AperiodicFrameSamplingModel, ExpSampling):
    def constant_rtt_sampling(self, rtt: float, prev_ttf: float, target_exec_time: float) -> SamplingResult:
        self._timing_model.advance(prev_ttf)
        alpha = float(np.mean(self._delay_costs))

        num_samples = 0
        prev_instant = 0.0
        for instant in _aperiodic_instant_iterator(
                mu=self._timing_model.get_expected_execution_time(),
                alpha=alpha,
                beta=self._beta,
        ):
            num_samples += 1
            instant = max(instant, prev_instant + rtt)

            if instant > target_exec_time:
                self._delay_costs.append(
                    max(rtt - self._processing_time, 0.0) / num_samples
                )
                return SamplingResult(instant, num_samples)
            else:
                prev_instant = instant

sampling_schemes: Dict[str, Callable[[], ExpSampling]] = {
    "greedy": lambda : GreedySampling.from_default_data(),
    "ideal": lambda : IdealSampling.from_default_data(),
    "adaptive-empirical": lambda : AdaptiveSampling.from_default_data(EmpiricalExecutionTimeModel.from_default_data(neuroticism=None)),
    "adaptive-theoretical": lambda : AdaptiveSampling.from_default_data(TheoreticalExecutionTimeModel.from_default_data(neuroticism=None)),
    "adaptive-fitted-naive": lambda : AdaptiveSampling.from_default_data(FittedNaiveExecutionTimeModel.from_default_data())
}

sampling_schemes.update({
    f"periodic-{t:0.2f}": lambda : PeriodicSampling.from_default_data(sampling_interval_seconds=t) for t in (1.0, 2.0, 3.0)
})
sampling_schemes.update({
    f"hold-{t:0.2f}": lambda : HoldSampling.from_default_data(hold_time_seconds=t) for t in (1.0, 2.0, 3.0)
})

In [3]:
def run_combination(timing: str, sampling: str, rtt: float, repetition: int) -> pd.DataFrame:
    timing_model = timing_models[timing]()
    sampling_model = sampling_schemes[sampling]()
    prev_ttf = rtt
    cumulative_duration = 0.0
    cumulative_samples = 0

    rows = deque()

    for step in range(1, task_steps + 1):
        exec_time = timing_model.advance(prev_ttf).get_execution_time()
        final_sample, num_samples = sampling_model.constant_rtt_sampling(rtt, prev_ttf, exec_time)

        duration = final_sample + rtt
        cumulative_duration += duration
        cumulative_samples += num_samples
        ttf = duration - exec_time
        wait_time = ttf - rtt

        rows.append(
            {
                "timing_model": timing,
                "sampling_scheme": sampling,
                "rtt": rtt,
                "step": step,
                "previous_ttf": prev_ttf,
                "execution_time": exec_time,
                "step_duration": duration,
                "ttf": ttf,
                "wait_time": wait_time,
                "samples": num_samples,
                "cumulative_duration": cumulative_duration,
                "cumulative_samples": cumulative_samples,
                "repetition": repetition,
            }
        )
        prev_ttf = ttf

    return pd.DataFrame(rows)

In [4]:
from typing import Tuple
from tqdm.notebook import tqdm
import multiprocess as mp

combs = list(itertools.product(timing_models.keys(), sampling_schemes.keys(), rtts, range(1, runs_per_model + 1)))
random.shuffle(combs)

# combs = (("empirical-high", "greedy", 0),)

with tqdm(total=len(combs), bar_format="{l_bar}{bar}{n_fmt}/{total_fmt} [Time: {elapsed}]") as bar, mp.Pool(os.cpu_count() - 2) as pool:
    bar.set_description("Running timing model/sampling scheme combinations...")

    def make_callback(cmb: Tuple[str, str, float]) -> Callable[[...], None]:
        def _cb(*args, **kwargs) -> None:
            bar.update()
            bar.set_description(f"Running timing model/sampling scheme combinations...")

        return _cb

    procs = [
        pool.apply_async(run_combination, args=c, callback=make_callback(c)) for c in combs
    ]

    results = [p.get() for p in procs]

results = pd.concat(results, ignore_index=True)
results.to_parquet("./sampling_results.parquet")
results

  0%|          0/14850 [Time: 00:00]

Unnamed: 0,timing_model,sampling_scheme,rtt,step,previous_ttf,execution_time,step_duration,ttf,wait_time,samples,cumulative_duration,cumulative_samples,repetition
0,empirical-high,periodic-2.00,5.000000,1,5.000000,5.052651,13.000000,7.947349,2.947349,2,13.000000,2,5
1,empirical-high,periodic-2.00,5.000000,2,7.947349,2.867882,8.000000,5.132118,0.132118,1,21.000000,3,5
2,empirical-high,periodic-2.00,5.000000,3,5.132118,5.103839,13.000000,7.896161,2.896161,2,34.000000,5,5
3,empirical-high,periodic-2.00,5.000000,4,7.896161,4.376181,13.000000,8.623819,3.623819,2,47.000000,7,5
4,empirical-high,periodic-2.00,5.000000,5,8.623819,7.625668,13.000000,5.374332,0.374332,2,60.000000,9,5
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1484995,theoretical-low,hold-2.00,0.555556,96,0.978123,5.593978,6.333333,0.739355,0.183800,6,555.777778,482,28
1484996,theoretical-low,hold-2.00,0.555556,97,0.739355,2.989867,3.555556,0.565688,0.010133,1,559.333333,483,28
1484997,theoretical-low,hold-2.00,0.555556,98,0.565688,3.093104,4.111111,1.018007,0.462452,2,563.444444,485,28
1484998,theoretical-low,hold-2.00,0.555556,99,1.018007,2.768672,3.555556,0.786884,0.231328,1,567.000000,486,28
