In [1]:
import os
from typing import Callable

import pandas as pd

from edgedroid.models.sampling import *
from edgedroid.models.timings import *

rtts = np.linspace(0, 5, 11)[1:]

runs_per_model = 60
task_steps = 120

timing_models: Dict[str, Callable[[], ExecutionTimeModel]] = {
    # "empirical-low": lambda: EmpiricalExecutionTimeModel.from_default_data(neuroticism=0.0),
    # "empirical-high": lambda: EmpiricalExecutionTimeModel.from_default_data(neuroticism=1.0),
    # "theoretical-low": lambda: TheoreticalExecutionTimeModel.from_default_data(neuroticism=0.0),
    # "theoretical-high": lambda: TheoreticalExecutionTimeModel.from_default_data(neuroticism=1.0),
    # "constant": lambda: ConstantExecutionTimeModel.from_default_data(),
    # "naive": lambda: NaiveExecutionTimeModel.from_default_data(),
    # "fitted-naive": lambda: FittedNaiveExecutionTimeModel.from_default_data(),
    "rolling-ttf-high": lambda: ExpKernelRollingTTFETModel(neuroticism=1.0)
}

In [2]:
import edgedroid.data as e_data

data, *_ = e_data.load_default_exec_time_data()
data

Unnamed: 0,run_id,ttf,exec_time,neuroticism
0,134146,0.597441,3.654797,0.375
1,134146,0.553513,4.438645,0.375
2,134146,0.561716,2.943222,0.375
3,134146,0.586512,5.405761,0.375
4,134146,0.558940,5.225161,0.375
...,...,...,...,...
6755,137353,0.557074,6.439071,0.625
6756,137353,0.534339,4.680858,0.625
6757,137353,0.560288,3.467878,0.625
6758,137353,0.579000,2.325759,0.625


In [3]:
exec_time_qs = np.round(data["exec_time"].describe(percentiles=[0.25, 0.5, 0.75])[["25%", "50%", "75%"]], decimals=1)
exec_time_qs

25%    3.9
50%    5.2
75%    7.0
Name: exec_time, dtype: float64

In [4]:
from edgedroid.models.sampling.adaptive import _aperiodic_instant_iterator
from typing import NamedTuple
import scipy.stats as stats


power_mw = {
    # "comm": 0.045,
    "comm": 0.045,
    "idle": 0.015
}  # Watts

class SamplingResult(NamedTuple):
    duration: float
    wait_time: float
    ttf: float
    num_samples: int


class ExpSampling(BaseFrameSamplingModel, abc.ABC):
    @abc.abstractmethod
    def constant_rtt_sampling(self, rtt: float, proc_time: float, prev_ttf: float,
                              target_exec_time: float) -> SamplingResult:
        pass


class GreedySampling(ZeroWaitFrameSamplingModel, ExpSampling):
    def constant_rtt_sampling(self, rtt: float, proc_time: float, prev_ttf: float,
                              target_exec_time: float) -> SamplingResult:
        num_samples = 1
        instant = 0.0

        while instant <= target_exec_time:
            instant += rtt
            num_samples += 1

        self.update_timings([rtt - proc_time] * num_samples, [proc_time] * num_samples)
        duration = instant + rtt
        return SamplingResult(duration=duration, wait_time=instant - target_exec_time, ttf=duration - target_exec_time,
                              num_samples=num_samples)


class IdealSampling(IdealFrameSamplingModel, ExpSampling):
    def constant_rtt_sampling(self, rtt: float, proc_time: float, prev_ttf: float,
                              target_exec_time: float) -> SamplingResult:
        self.update_timings([rtt - proc_time], [proc_time])
        return SamplingResult(duration=target_exec_time + rtt, wait_time=0.0, ttf=rtt, num_samples=1)


class PeriodicSampling(RegularFrameSamplingModel, ExpSampling):
    def constant_rtt_sampling(self, rtt: float, proc_time: float, prev_ttf: float,
                              target_exec_time: float) -> SamplingResult:
        num_samples = 1
        instant = self._interval

        interval = max(self._interval, rtt)

        while instant <= target_exec_time:
            instant += interval
            num_samples += 1

        self.update_timings([rtt - proc_time] * num_samples, [proc_time] * num_samples)
        duration = instant + rtt
        return SamplingResult(duration=duration, wait_time=instant - target_exec_time, ttf=duration - target_exec_time,
                              num_samples=num_samples)


class HoldSampling(HoldFrameSamplingModel, ExpSampling):
    def constant_rtt_sampling(self, rtt: float, proc_time: float, prev_ttf: float,
                              target_exec_time: float) -> SamplingResult:
        num_samples = 1
        instant = self._hold_time

        while instant <= target_exec_time:
            instant += rtt
            num_samples += 1

        self.update_timings([rtt - proc_time] * num_samples, [proc_time] * num_samples)
        duration = instant + rtt
        return SamplingResult(duration=duration, wait_time=instant - target_exec_time, ttf=duration - target_exec_time,
                              num_samples=num_samples)


class AdaptiveSamplingMixin(BaseAperiodicFrameSamplingModel, ExpSampling, abc.ABC):
    def constant_rtt_sampling(self, rtt: float, proc_time: float, prev_ttf: float,
                              target_exec_time: float) -> SamplingResult:
        self._timing_model.advance(prev_ttf)
        alpha = self.get_alpha()
        beta = self.get_beta()

        instant_iter = _aperiodic_instant_iterator(
            mu=self._timing_model.get_expected_execution_time(),
            alpha=alpha,
            beta=beta,
        )
        # first instant doesn't depend on RTT
        instant = next(instant_iter)
        num_samples = 1

        while instant <= target_exec_time:
            instant = max(next(instant_iter), instant + rtt)
            num_samples += 1

        self.update_timings([rtt - proc_time] * num_samples, [proc_time] * num_samples)
        duration = instant + rtt
        return SamplingResult(duration=duration, wait_time=instant - target_exec_time, ttf=duration - target_exec_time,
                              num_samples=num_samples)


class AdaptiveSampling(AperiodicFrameSamplingModel, AdaptiveSamplingMixin):
    pass


class AdaptivePowerSampling(AperiodicPowerFrameSamplingModel, AdaptiveSamplingMixin):
    pass


class AdaptiveHoldSampling(BaseAdaptiveFrameSamplingModel, ExpSampling):
    def step_iterator(self, target_time: float, ttf: float) -> Generator[FrameSample, FrameTimings, None]:
        pass

    def constant_rtt_sampling(self, rtt: float, proc_time: float, prev_ttf: float,
                              target_exec_time: float) -> SamplingResult:
        hold_time = self._timing_model.advance(prev_ttf).get_expected_execution_time()
        num_samples = 1
        instant = hold_time

        while instant <= target_exec_time:
            instant += rtt
            num_samples += 1

        self.update_timings([rtt - proc_time] * num_samples, [proc_time] * num_samples)
        duration = instant + rtt
        return SamplingResult(duration=duration, wait_time=instant - target_exec_time, ttf=duration - target_exec_time,
                              num_samples=num_samples)


sampling_schemes: Dict[str, Callable[[], ExpSampling]] = {
    "greedy": lambda: GreedySampling.from_default_data(),
    "ideal" : lambda: IdealSampling.from_default_data(),
}

sampling_schemes.update({
    # "adaptive-empirical": lambda : AdaptiveSampling.from_default_data(EmpiricalExecutionTimeModel.from_default_data(neuroticism=None)),
    # "adaptive-empirical-low": lambda : AdaptiveSampling.from_default_data(EmpiricalExecutionTimeModel.from_default_data(neuroticism=0.0)),
    # "adaptive-empirical-high": lambda : AdaptiveSampling.from_default_data(EmpiricalExecutionTimeModel.from_default_data(neuroticism=1.0)),
    # "adaptive-theoretical-exgaussian": lambda : AdaptiveSampling.from_default_data(
    #     TheoreticalExecutionTimeModel.from_default_data(neuroticism=None, distribution=stats.exponnorm)
    # ),
    # "adaptive-theoretical-rayleigh": lambda : AdaptiveSampling.from_default_data(
    #     TheoreticalExecutionTimeModel.from_default_data(neuroticism=None, distribution=stats.rayleigh)
    # ),
    # "adaptive-theoretical-exgaussian-low": lambda : AdaptiveSampling.from_default_data(
    #     TheoreticalExecutionTimeModel.from_default_data(neuroticism=0.0, distribution=stats.exponnorm)
    # ),
    # "adaptive-theoretical-rayleigh-low": lambda : AdaptiveSampling.from_default_data(
    #     TheoreticalExecutionTimeModel.from_default_data(neuroticism=0.0, distribution=stats.rayleigh)
    # ),
    # "adaptive-theoretical-exgaussian-high": lambda : AdaptiveSampling.from_default_data(
    #     TheoreticalExecutionTimeModel.from_default_data(neuroticism=1.0, distribution=stats.exponnorm)
    # ),
    # "adaptive-theoretical-rayleigh-high": lambda : AdaptiveSampling.from_default_data(
    #     TheoreticalExecutionTimeModel.from_default_data(neuroticism=1.0, distribution=stats.rayleigh)
    # ),
    # "adaptive-fitted-naive-exgaussian": lambda : AdaptiveSampling.from_default_data(FittedNaiveExecutionTimeModel.from_default_data(dist=stats.exponnorm)),
    # "adaptive-fitted-naive-rayleigh": lambda : AdaptiveSampling.from_default_data(FittedNaiveExecutionTimeModel.from_default_data(dist=stats.rayleigh))
})

# sampling_schemes.update({
#     f"adaptive-constant-Q{i + 1}-{t:0.1f}s":
#         lambda : AdaptiveSampling.from_default_data(ConstantExecutionTimeModel(float(t))) for i, t in enumerate(exec_time_qs)
# })

# sampling_schemes.update({
#     "adaptive-power-empirical"                  : lambda: AdaptivePowerSampling.from_default_data(
#         EmpiricalExecutionTimeModel.from_default_data(neuroticism=None), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]),
#     "adaptive-power-empirical-low"              : lambda: AdaptivePowerSampling.from_default_data(
#         EmpiricalExecutionTimeModel.from_default_data(neuroticism=0.0), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]),
#     "adaptive-power-empirical-high"             : lambda: AdaptivePowerSampling.from_default_data(
#         EmpiricalExecutionTimeModel.from_default_data(neuroticism=1.0), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]),
#     "adaptive-power-theoretical-exgaussian"     : lambda: AdaptivePowerSampling.from_default_data(
#         TheoreticalExecutionTimeModel.from_default_data(neuroticism=None, distribution=stats.exponnorm), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]
#     ),
#     # "adaptive-power-theoretical-rayleigh": lambda : AdaptivePowerSampling.from_default_data(
#     #     TheoreticalExecutionTimeModel.from_default_data(neuroticism=None, distribution=stats.rayleigh)
#     # ),
#     "adaptive-power-theoretical-exgaussian-low" : lambda: AdaptivePowerSampling.from_default_data(
#         TheoreticalExecutionTimeModel.from_default_data(neuroticism=0.0, distribution=stats.exponnorm), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]
#     ),
#     # "adaptive-power-theoretical-rayleigh-low": lambda : AdaptivePowerSampling.from_default_data(
#     #     TheoreticalExecutionTimeModel.from_default_data(neuroticism=0.0, distribution=stats.rayleigh)
#     # ),
#     "adaptive-power-theoretical-exgaussian-high": lambda: AdaptivePowerSampling.from_default_data(
#         TheoreticalExecutionTimeModel.from_default_data(neuroticism=1.0, distribution=stats.exponnorm), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]
#     ),
#     # "adaptive-power-theoretical-rayleigh-high": lambda : AdaptivePowerSampling.from_default_data(
#     #     TheoreticalExecutionTimeModel.from_default_data(neuroticism=1.0, distribution=stats.rayleigh)
#     # ),
#     "adaptive-power-fitted-naive-exgaussian"    : lambda: AdaptivePowerSampling.from_default_data(
#         FittedNaiveExecutionTimeModel.from_default_data(dist=stats.exponnorm), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]
#     ),
#     # "adaptive-power-fitted-naive-rayleigh": lambda : AdaptivePowerSampling.from_default_data(FittedNaiveExecutionTimeModel.from_default_data(dist=stats.rayleigh))
# })

sampling_schemes.update({
    "adaptive-power-rolling-ttf"                  : lambda: AdaptivePowerSampling.from_default_data(
        ExpKernelRollingTTFETModel(neuroticism=None), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]),
    "adaptive-power-rolling-ttf-low"              : lambda: AdaptivePowerSampling.from_default_data(
        ExpKernelRollingTTFETModel(neuroticism=0.0), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]),
    "adaptive-power-rolling-ttf-high"              : lambda: AdaptivePowerSampling.from_default_data(
        ExpKernelRollingTTFETModel(neuroticism=1.0), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]),
    "adaptive-power-fitted-rolling-ttf-exgaussian"     : lambda: AdaptivePowerSampling.from_default_data(
        DistExpKernelRollingTTFETModel(neuroticism=None, dist=stats.exponnorm), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]
    ),
    "adaptive-power-fitted-rolling-ttf-exgaussian-low"     : lambda: AdaptivePowerSampling.from_default_data(
        DistExpKernelRollingTTFETModel(neuroticism=0.0, dist=stats.exponnorm), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]
    ),
    "adaptive-power-fitted-rolling-ttf-exgaussian-high"     : lambda: AdaptivePowerSampling.from_default_data(
        DistExpKernelRollingTTFETModel(neuroticism=1.0, dist=stats.exponnorm), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]
    ),
    "adaptive-power-fitted-naive-exgaussian"    : lambda: AdaptivePowerSampling.from_default_data(
        FittedNaiveExecutionTimeModel(dist=stats.exponnorm), comm_power_w=power_mw["comm"], idle_power_w=power_mw["idle"]
    ),
})

sampling_schemes.update({
    "adaptive-hold-rolling-ttf-high": lambda : AdaptiveHoldSampling.from_default_data(ExpKernelRollingTTFETModel(neuroticism=1.0)),
    "adaptive-hold-rolling-ttf-low": lambda : AdaptiveHoldSampling.from_default_data(ExpKernelRollingTTFETModel(neuroticism=0.0)),
    "adaptive-hold-fitted-rolling-ttf-exgaussian-high": lambda : AdaptiveHoldSampling.from_default_data(DistExpKernelRollingTTFETModel(neuroticism=1.0, dist=stats.exponnorm)),
    "adaptive-hold-fitted-rolling-ttf-exgaussian-low": lambda : AdaptiveHoldSampling.from_default_data(DistExpKernelRollingTTFETModel(neuroticism=0.0, dist=stats.exponnorm)),
    "adaptive-hold-fitted-naive-exgaussian": lambda : AdaptiveHoldSampling.from_default_data(FittedNaiveExecutionTimeModel(dist=stats.exponnorm)),
})


# sampling_schemes.update({
#     f"adaptive-power-constant-Q{i + 1}-{t:0.1f}s":
#         lambda : AdaptivePowerSampling.from_default_data(ConstantExecutionTimeModel(float(t))) for i, t in enumerate(exec_time_qs)
# })

# sampling_schemes.update({
#     f"periodic-{t:0.1f}s": lambda : PeriodicSampling.from_default_data(sampling_interval_seconds=float(t)) for t in (0.125, 0.25, 0.5, 1)
# })
# sampling_schemes.update({
#     f"hold-{t:0.1f}s": lambda : HoldSampling.from_default_data(hold_time_seconds=float(t)) for t in (3, 5, 7)
# })

list(sampling_schemes.keys())

['greedy',
 'ideal',
 'adaptive-power-rolling-ttf',
 'adaptive-power-rolling-ttf-low',
 'adaptive-power-rolling-ttf-high',
 'adaptive-power-fitted-rolling-ttf-exgaussian',
 'adaptive-power-fitted-rolling-ttf-exgaussian-low',
 'adaptive-power-fitted-rolling-ttf-exgaussian-high',
 'adaptive-power-fitted-naive-exgaussian',
 'adaptive-hold-rolling-ttf-high',
 'adaptive-hold-rolling-ttf-low',
 'adaptive-hold-fitted-rolling-ttf-exgaussian-high',
 'adaptive-hold-fitted-rolling-ttf-exgaussian-low',
 'adaptive-hold-fitted-naive-exgaussian']

In [5]:
proc_time = 0.3  # 300 ms
# warmup_steps = 20

def run_combination(timing: str, sampling: str, rtt: float, repetition: int) -> pd.DataFrame:
    timing_model = timing_models[timing]()
    sampling_model = sampling_schemes[sampling]()
    prev_ttf = rtt
    cumulative_duration = 0.0
    cumulative_samples = 0

    # for power calculations:
    comm_time_per_sample = rtt - proc_time
    cumulative_energy = 0.0

    rows = deque()
    sampling_model.update_timings([comm_time_per_sample], [proc_time])

    for step in range(1, task_steps + 1):
        exec_time = timing_model.advance(prev_ttf).get_execution_time()
        duration, wait_time, ttf, num_samples = sampling_model.constant_rtt_sampling(rtt=rtt, proc_time=proc_time, prev_ttf=prev_ttf, target_exec_time=exec_time)

        cumulative_duration += duration
        cumulative_samples += num_samples

        # calculate power
        comm_time = comm_time_per_sample * num_samples
        idle_time = duration - comm_time
        comm_energy = comm_time * power_mw["comm"]
        idle_energy = idle_time * power_mw["idle"]

        total_energy = comm_energy + idle_energy
        cumulative_energy += total_energy

        rows.append(
            {
                "timing_model"       : timing,
                "sampling_scheme"    : sampling,
                "rtt"                : rtt,
                "step"               : step,
                "previous_ttf"       : prev_ttf,
                "execution_time"     : exec_time,
                "step_duration"      : duration,
                "ttf"                : ttf,
                "wait_time"          : wait_time,
                "samples"            : num_samples,
                "cumulative_duration": cumulative_duration,
                "cumulative_samples" : cumulative_samples,
                "repetition"         : repetition,
                "energy"             : total_energy,
                "cumulative_energy"  : cumulative_energy,
                "comm_time"          : comm_time,
                "idle_time"          : idle_time,
                "comm_energy"        : comm_energy,
                "idle_energy"        : idle_energy,
            }
        )
        prev_ttf = ttf

    return pd.DataFrame(rows)

In [6]:

from tqdm.notebook import tqdm
import multiprocess as mp
import shutil

result_path = "./sampling_scaling_rtt.gzip"
combs = set(itertools.product(timing_models.keys(), sampling_schemes.keys(), rtts, range(1, runs_per_model + 1)))

# only calculate missing results
results = deque()

try:
    old_results = pd.read_parquet(result_path)
    existing_combinations = set(
        old_results[["timing_model", "sampling_scheme", "rtt", "repetition"]].itertuples(index=False))
    shutil.rmtree(result_path)
    results.append(old_results)
except FileNotFoundError:
    existing_combinations = set()

combs.difference_update(existing_combinations)
if len(combs) == 0:
    print("No missing combinations.")
else:
    # noinspection PyUnresolvedReferences
    with tqdm(
        total=len(combs),
        desc="Running timing model/sampling scheme combinations...",
        bar_format="{l_bar}{bar}{n_fmt}/{total_fmt} [Time: {elapsed}]"
    ) as bar, mp.Pool(
        processes=os.cpu_count() - 1,
        maxtasksperchild=10
    ) as pool:

        def _callback(result: pd.DataFrame):
            bar.update()
            results.append(result)

        for c in combs:
            pool.apply_async(run_combination, args=c, callback=_callback)

        pool.close()
        pool.join()  # wait for workers

results = pd.concat(results, ignore_index=True)
results["timing_model"] = results["timing_model"].astype(
    pd.CategoricalDtype(timing_models.keys(), ordered=False)
)
results["sampling_scheme"] = results["sampling_scheme"].astype(
    pd.CategoricalDtype(sampling_schemes.keys(), ordered=False)
)
results.to_parquet(result_path, partition_cols=["timing_model", "sampling_scheme"], compression="gzip")
results

Running timing model/sampling scheme combinations...:   0%|          0/3000 [Time: 00:00]

TypeError: NDFrame.astype() missing 1 required positional argument: 'dtype'