# Duration


In RA, ``duration`` is a general term to indicate a measure of the time spent by an instance of an entity (local analyses) or a set of instances of an entity (global analyses) between two timestamps. Examples include the staging, scheduling, pre-execute and execute time of one or more compute tasks; the description, submission and execution time of one or more pipelines or stages; and the runtime of one or more pilots.

We show two sets of default durations for RADICAL-Pilot and how to define arbitrary durations, depending on the specifics of a given analysis. We then see how to plot the most common durations.

## Prologue

Load all the Python modules needed to profile and plot a RADICAL-EnsembleToolkit (EnTK) session.

In [None]:
import tarfile

import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker

import radical.utils as ru
import radical.pilot as rp
import radical.analytics as ra

from radical.pilot import states as rps

Load the RADICAL Matplotlib style to obtain visually consistent and publishable-quality plots.

In [None]:
plt.style.use(ra.get_mplstyle('radical_mpl'))

Usually, it is useful to record the stack used for the analysis. 

<div class="alert alert-info">
    
__Note:__ The stack used for the analysis might be different from the stack used to create the session to analyze. Usually, the two stacks must have the same major release number in order to be compatible.

</div>
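
The compatibility rule in the note can be checked with a few lines of Python. This is only a minimal sketch: it assumes version strings of the form `major.minor.patch`, and the function names and the version strings used below are hypothetical, not taken from an actual stack.

```python
def major(version):
    """Return the major release number of a 'major.minor.patch' string."""
    return int(version.split('.')[0])


def same_major(analysis_version, session_version):
    """True if the two versions share the same major release number."""
    return major(analysis_version) == major(session_version)


# Hypothetical version strings, for illustration only.
compatible   = same_major('1.20.1', '1.6.7')
incompatible = same_major('1.20.1', '0.90.3')
print(compatible, incompatible)
```

In practice, the versions to compare would be those printed by `radical-stack` and those of the stack that produced the session.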

In [None]:
! radical-stack

## Default Durations

Currently, we offer a set of default durations for the entity types Pilot and Task of RADICAL-Pilot.

In [None]:
pd.DataFrame(ra.utils.tabulate_durations(rp.utils.PILOT_DURATIONS_DEBUG))

In [None]:
pd.DataFrame(ra.utils.tabulate_durations(rp.utils.TASK_DURATIONS_DEBUG))

Most of those durations are meant for __debugging__ as they are as granular as possible and (almost completely) contiguous. Nonetheless, some are commonly used in experiment analyses. For example:

- __p_agent_runtime__: the amount of time for which one or more pilots (i.e., RP Agent) were active.
- __p_pmngr_scheduling__: the amount of time one or more pilots waited in the queue of the HPC batch system.
- __u_agent_stage_in__: the amount of time taken to stage the input data of one or more tasks.
- __u_agent_schedule__: the amount of time taken to schedule one or more tasks.
- __u_agent_t_pre_execute__: the amount of time taken to execute the ``pre_exec`` of one or more tasks.
- __u_agent_t_execute__: the amount of time taken to execute the executable of one or more tasks.
- __u_agent_t_stage_out__: the amount of time taken to stage the output data of one or more tasks.

## Arbitrary Durations

RA enables the __arbitrary__ definition of durations. Which duration you need depends on why you need a certain measure. For example, given an experiment to characterize the performance of one of RP's executors, it might be useful to measure the amount of time spent by each task in the Executor component.

<div class="alert alert-warning">
    
__Warning:__ Correctly defining a duration requires a __detailed__ understanding of both [RP architecture](https://github.com/radical-cybertools/radical.pilot/wiki/Architecture) and [event model](https://github.com/radical-cybertools/radical.pilot/blob/devel/docs/source/events.md). 

</div>


Once we have acquired that understanding, we can define our duration as the sum of the time spent by the task in the Executor component before and after the execution of the task's executable.

In [None]:
t_executor_before = [{ru.EVENT: 'state'          , ru.STATE: rps.AGENT_EXECUTING},
                     {ru.EVENT: 'task_exec_start', ru.STATE: None               } ]

t_executor_after  = [{ru.EVENT: 'task_exec_stop' , ru.STATE: None               }, 
                     {ru.EVENT: 'task_stop'      , ru.STATE: None               } ]
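
To make the semantics of such a pair of event descriptors concrete, here is a minimal, self-contained toy model that does not use RA. It assumes that a task trace is a list of `(timestamp, event, state)` tuples and that a duration is the time between the first record matching the start descriptor and the last record matching the end descriptor. The helper names, the plain-string keys `'event'` and `'state'` (standing in for `ru.EVENT` and `ru.STATE`), the state string and the timestamps are all hypothetical; RA's own matching logic may differ.

```python
# Toy model of an event-pair duration; not RA's implementation.
# The keys 'event' and 'state' stand in for ru.EVENT and ru.STATE (assumption).

def matches(record, descriptor):
    """Return True if a (time, event, state) record matches a descriptor."""
    _, event, state = record
    if event != descriptor['event']:
        return False
    # A descriptor state of None is treated as "any state".
    return descriptor['state'] is None or state == descriptor['state']


def pair_duration(trace, pair):
    """Time between the first start match and the last end match."""
    start_desc, end_desc = pair
    starts = [r[0] for r in trace if matches(r, start_desc)]
    ends   = [r[0] for r in trace if matches(r, end_desc)]
    if not starts or not ends:
        raise ValueError('trace does not contain both events')
    return max(ends) - min(starts)


# Hypothetical trace of a single task (timestamps in seconds).
trace = [(10.0, 'state',           'AGENT_EXECUTING'),
         (10.5, 'task_exec_start', None),
         (70.5, 'task_exec_stop',  None),
         (71.0, 'task_stop',       None)]

before = [{'event': 'state',           'state': 'AGENT_EXECUTING'},
          {'event': 'task_exec_start', 'state': None}]
after  = [{'event': 'task_exec_stop',  'state': None},
          {'event': 'task_stop',       'state': None}]

t_before = pair_duration(trace, before)
t_after  = pair_duration(trace, after)

# Time spent in the executor, excluding the executable itself.
print(t_before, t_after, t_before + t_after)
```

With this trace, the executable runs for 60 seconds while the executor accounts for half a second before and half a second after it, which is the quantity the two descriptors are meant to isolate.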

## Duration Analysis

Every analysis with RA requires loading the traces produced by RADICAL-Pilot (RP) or RADICAL-EnsembleToolkit (EnTK) into a session object. Both RP and EnTK write traces (i.e., timestamped sequences of events) to a directory called ``client sandbox``. This directory is created inside the directory from which you executed your application. The name of the client sandbox is a session ID, e.g., ``rp.session.hostname.username.018443.0002`` or ``re.session.hostname.username.018443.0002``.

### Session

Name and location of the session we profile.

In [None]:
sid = 're.session.login1.lei.018775.0005'
sdir = 'sessions/'

Unbzip and untar sessions.

In [None]:
sp = sdir+sid+'.tar.bz2'
tar = tarfile.open(sp, mode='r:bz2')
tar.extractall(path=sdir)
tar.close()

Create a ``ra.Session`` object for the session. We are not going to use EnTK-specific traces so we are going to load only the RP traces contained in the EnTK session. Thus, we pass the ``'radical.pilot'`` session type to ``ra.Session``.

<div class="alert alert-warning">
    
__Warning:__ We already know we will want to derive information about pilot(s) and tasks. Thus, we save in memory session objects filtered for those two entity types. This might be too expensive with large sessions, depending on the amount of memory available.

</div>
    
<div class="alert alert-info">

__Note:__ We save the output of ``ra.Session`` in ``capt`` to avoid polluting the notebook.

</div>

In [None]:
%%capture capt

sp = sdir+sid

session = ra.Session(sp, 'radical.pilot')
pilots  = session.filter(etype='pilot', inplace=False)
tasks   = session.filter(etype='task' , inplace=False)

As seen above, durations measure the time spent by an instance of an entity (local analyses) or a set of instances of an entity (global analyses) between two timestamps. Examples include the staging, scheduling, pre-execute and execute time of one or more compute tasks; the description, submission and execution time of one or more pipelines or stages; and the runtime of one or more pilots.

We start with a global analysis to measure for how long all the pilots of our run have been active. Looking at the __[event model](https://github.com/radical-cybertools/radical.pilot/blob/devel/docs/source/events.md#bootstrap_0sh)__ of the entity of type ``pilot`` and at ``rp.utils.PILOT_DURATIONS_DEBUG``, we know that a pilot is active between the event ``TMGR_STAGING_OUTPUT`` and one of the final events ``DONE``, ``CANCELED`` or ``FAILED``. We also know that we have a default duration with those events: ``p_agent_runtime``.

To measure that duration, first, we filter the session object so as to keep only the entities of type Pilot; and, second, we get the __cumulative__ amount of time for which all the pilots were active:

In [None]:
p_runtime = pilots.duration(event=rp.utils.PILOT_DURATIONS_DEBUG['p_agent_runtime'])
p_runtime

<div class="alert alert-info">
    
__Note:__ This works for a set of pilots, including the case in which we have a single pilot. If we have a single pilot, the cumulative active time of all the pilots is equal to the active time of the only available pilot.

</div>

If we have more than one pilot and we want to measure the active time of one of them, then we need to perform a local analysis. A rapid way to get a list of all the pilot entities in the session and, for example, see their unique identifiers (uid) is:

In [None]:
puids = [p.uid for p in pilots.get()]
puids

Once we know the ID of the pilot we want to analyze, first we filter the session object so as to keep only that pilot; and, second, we get the amount of time for which that specific pilot was active:

In [None]:
p0000 = pilots.filter(uid='pilot.0000')
p0000_runtime = p0000.duration(event=rp.utils.PILOT_DURATIONS_DEBUG['p_agent_runtime'])
p0000_runtime

The same approach, with both global and local analyses, can be applied to every type of entity supported by RA (currently, Pilot, Task, Pipeline and Stage).

Total task execution time (TTX) and RADICAL-Pilot overheads (OVH) are among the most common metrics used to describe the global behavior of RCT. TTX considers the time taken by __ALL__ the tasks to execute, accounting for the overlap among tasks. This means that if ``task_a`` and ``task_b`` both start at the same time and ``task_a`` finishes after 10 minutes and ``task_b`` after 15, TTX will be 15 minutes. Conversely, if ``task_b`` starts to execute 5 minutes after ``task_a``, TTX will be 20 minutes. Finally, if ``task_b`` starts to execute 10 minutes after ``task_a`` finished, TTX will be 25 minutes, as the gap between the two tasks will not be considered.
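
The three scenarios above amount to measuring the length of the union of the tasks' execution intervals. The following sketch reproduces the arithmetic in plain Python, assuming ``task_a`` always runs for 10 minutes and ``task_b`` for 15. The function `union_length` is a hypothetical helper written for this illustration, not part of RA.

```python
def union_length(intervals):
    """Total time covered by at least one (start, stop) interval."""
    total = 0
    cur_start = cur_stop = None
    for start, stop in sorted(intervals):
        if cur_stop is None or start > cur_stop:
            # Disjoint from the current run: close it and open a new one.
            if cur_stop is not None:
                total += cur_stop - cur_start
            cur_start, cur_stop = start, stop
        else:
            # Overlapping the current run: extend it if needed.
            cur_stop = max(cur_stop, stop)
    if cur_stop is not None:
        total += cur_stop - cur_start
    return total


# (start, stop) of task_a and task_b, in minutes.
same_start = union_length([(0, 10), (0, 15)])   # both start together
overlap    = union_length([(0, 10), (5, 20)])   # task_b starts 5 min later
gap        = union_length([(0, 10), (20, 35)])  # task_b starts 10 min after task_a ends

print(same_start, overlap, gap)
```

Overlapping time is counted once and idle gaps are not counted at all, which is why the three results are 15, 20 and 25 minutes.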

In [None]:
ttx = tasks.duration(event=rp.utils.TASK_DURATIONS_DEBUG['u_agent_lm_execute'])
ovh = p_runtime - ttx

print('TTX: %f\nOVH: %f' % (ttx, ovh))

### Plotting

We plot TTX and OVH for 4 sessions of an experiment. We create suitable data structures to support the plotting and we produce a figure with 4 subplots.

In [None]:
sids = ['re.session.login1.lei.018775.0008',
        're.session.login1.lei.018775.0007',
        're.session.login1.lei.018775.0004',
        're.session.login1.lei.018775.0005']
sdir = 'sessions/'
sessions = [sdir+s for s in sids]

Unbzip and untar sessions.

In [None]:
for sid in sids:
    sp = sdir+sid+'.tar.bz2'
    tar = tarfile.open(sp, mode='r:bz2')
    tar.extractall(path=sdir)
    tar.close()

Create session objects for each session of the experiment.

In [None]:
%%capture capt

ss = {}
for sid in sids:
    sp = sdir+sid
    ss[sid] = {'s': ra.Session(sp, 'radical.pilot')}
    ss[sid].update({'p': ss[sid]['s'].filter(etype='pilot'   , inplace=False),
                    't': ss[sid]['s'].filter(etype='task'    , inplace=False)})

Derive information we will want to use in our plots from the sessions via ``ra.Session``.

In [None]:
for sid in sids:
    
    ss[sid].update({'cores_node': ss[sid]['s'].get(etype='pilot')[0].cfg['resource_details']['rm_info']['cores_per_node'],
                    'pid'       : ss[sid]['p'].list('uid'),
                    'ntask'     : len(ss[sid]['t'].get())
    })
    
    ss[sid].update({'ncores'    : ss[sid]['p'].get(uid=ss[sid]['pid'])[0].description['cores'],
                    'ngpus'     : ss[sid]['p'].get(uid=ss[sid]['pid'])[0].description['gpus']
    })
    
    ss[sid].update({'nnodes'    : int(ss[sid]['ncores']/ss[sid]['cores_node'])})

Use the default global durations to calculate TTX and OVH for each session of the experiment.

In [None]:
for sid in sids:
    t  = ss[sid]['t']
    p  = ss[sid]['p']

    ss[sid].update({
      'ttx': t.duration( event=rp.utils.TASK_DURATIONS_DEBUG['u_agent_lm_execute']),
      'p_runtime': p.duration(event=rp.utils.PILOT_DURATIONS_DEBUG['p_agent_runtime'])
    })
    
    ss[sid].update({'ovh': ss[sid]['p_runtime'] - ss[sid]['ttx']})

Plot TTX and OVH for each session, add information about each run and a letter for each subplot for easy referencing in a paper.

In [None]:
nsids = len(sids)

fwidth, fhight = ra.get_plotsize(516,subplots=(1, nsids))
fig, axarr = plt.subplots(1, nsids, sharey=True, figsize=(fwidth, fhight))

i = 0
j = 'a'
for sid in sids:

    if len(sids) > 1:
        ax = axarr[i]
    else:
        ax = axarr
    
    ax.title.set_text('%s tasks; %s nodes' % (ss[sid]['ntask'], int(ss[sid]['nnodes'])))

    ax.bar(x = 'OVH', height = ss[sid]['ovh'])
    ax.bar(x = 'TTX', height = ss[sid]['ttx'])

    ax.set_xlabel('(%s)' % j, labelpad=10)

    i = i+1
    j = chr(ord(j) + 1)

fig.text(  0.05,  0.5 , 'Time (s)', va='center', rotation='vertical')
fig.text(  0.5 , -0.2, 'Metric'  , ha='center')
fig.legend(['RADICAL Cybertools overhead (OVH)',
            'Total task execution time (TTX)'],
           loc='upper center', 
           bbox_to_anchor=(0.5, 1.5), 
           ncol=1)

## Danger of Duration Analysis

<div class="alert alert-warning">
    
__Warning:__ Most of the time, the durations of __global analyses__ are __NOT__ additive. 

</div>
    
For example, the sum of the total time taken by RP Agent to manage all the compute tasks and the total amount of time taken to execute all those compute tasks is __greater__ than the time taken to execute all the workload. This is because RP is a distributed system that performs multiple operations at the same time on multiple resources. Thus, while RP Agent manages a compute task, it might be executing another compute task.

Consider three durations:

1. __t_agent_t_load__: the time from when RP Agent receives a compute task to the time in which the compute task's executable is launched.
2. __t_agent_t_execute__: default duration for the time taken by a compute task's executable to execute.
3. __t_agent_t_unload__: the time from when a compute task's executable finishes executing to when RP Agent marks the compute task with a final state (DONE, CANCELED or FAILED).

For a single compute task, ``t_agent_t_load``, ``t_agent_t_execute`` and ``t_agent_t_unload`` are contiguous and therefore additive. A single compute task cannot be loaded by RP Agent while it is also executed. For multiple compute tasks, this does not apply: one compute task might be loaded by RP Agent while another compute task is being executed.
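
A small numeric example makes the difference visible. The sketch below uses two hypothetical tasks whose three phases (before, during and after the execution of the executable) are given as integer `(start, stop)` seconds. The phase keys `load`, `execute` and `unload`, the timestamps and the helper `covered` are assumptions made for this illustration.

```python
def covered(intervals):
    """Seconds covered by at least one (start, stop) interval (integer times)."""
    seconds = set()
    for start, stop in intervals:
        seconds.update(range(start, stop))
    return len(seconds)


# Hypothetical per-task phases in seconds: (start, stop).
tasks = {
    'task_a': {'load': (0, 2), 'execute': (2, 12), 'unload': (12, 13)},
    'task_b': {'load': (1, 3), 'execute': (3, 13), 'unload': (13, 14)},
}
phases = ['load', 'execute', 'unload']

# Local analysis: for one task the phases are contiguous, so they add up.
a = tasks['task_a']
local_sum  = sum(covered([a[p]]) for p in phases)
local_span = covered([a[p] for p in phases])

# Global analysis: each phase is measured across all the tasks.
global_by_phase = {p: covered([t[p] for t in tasks.values()]) for p in phases}
global_sum  = sum(global_by_phase.values())
global_span = covered([t[p] for t in tasks.values() for p in phases])

print(local_sum, local_span)
print(global_by_phase, global_sum, global_span)
```

For ``task_a`` alone, the three phases sum to the 13 seconds the task spends in the agent. Across both tasks, the global phase durations sum to 16 seconds although the whole workload completes in 14, because ``task_b`` is being loaded while ``task_a`` is already executing.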

## Distribution of Durations

We want to calculate the statistical distribution of default and arbitrary durations. Variance and outliers characterize the runtime behavior of both tasks and RADICAL-Cybertools.

Global durations like TTX and OVH are aggregated across all entities: TTX aggregates the duration of each task, while OVH aggregates that of all the pilot components active when no tasks are executed. For a distribution, we need instead the measure for each entity and component. For example, to calculate the distribution of task execution time, we have to measure the execution time of each task.

We use RA to cycle through all the task entities and then the `get` and `duration` methods to return the wanted duration for each task. We use both the default duration for task runtime and the two arbitrary durations we defined above for the time taken by the RP executor to manage the execution of the task.
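
Before turning to RA, the following sketch shows in plain Python what "one measure per entity" means and why it is needed for a distribution. The task IDs and timestamps are hypothetical, and the summary statistics come from the standard `statistics` module rather than from RA.

```python
import statistics

# Hypothetical (start, stop) execution timestamps per task, in seconds.
exec_times = {'task.000000': (0.0, 10.0),
              'task.000001': (0.0, 12.0),
              'task.000002': (1.0, 11.0),
              'task.000003': (2.0, 30.0)}

# One duration per task: this list is what a distribution is built from.
per_task = [stop - start for start, stop in exec_times.values()]

median  = statistics.median(per_task)
mean    = statistics.mean(per_task)
longest = max(per_task)

print(per_task)
print(median, mean, longest)
```

A single aggregate value would hide the fact that one task runs almost three times longer than the others. The per-task list exposes it: the mean (15 s) is pulled away from the median (11 s) by the 28-second task.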

In [None]:
t_duration = {}
events = {'tx': rp.utils.TASK_DURATIONS_DEBUG['u_agent_lm_execute'], 
          't_executor_before': t_executor_before, 
          't_executor_after': t_executor_after}

for sid in sids:
    t_duration[sid] = {}
    for name, event in events.items():
        t_duration[sid].update({name: []})    
        for tid in ss[sid]['t'].list('uid'):
            task = ss[sid]['t'].get(etype='task', uid=tid)[0]
            duration = task.duration(event=event)
            t_duration[sid][name].append(duration)

We can now plot the distribution of task runtime as a boxplot for each session:

In [None]:
fwidth, fhight = ra.get_plotsize(212)
fig, ax = plt.subplots(figsize=(fwidth, fhight))

data   = [t_duration[sid]['tx'] for sid in sids]
labels = ['%s;%s' % (ss[sid]['ntask'], int(ss[sid]['nnodes'])) for sid in sids]

ax.boxplot(data, labels=labels, patch_artist=True)

ax.set_xlabel('Task;Nodes')
ax.set_ylabel('Task Runtime (s)')

We can do the same for the arbitrarily defined durations `t_executor_before` and `t_executor_after`:

In [None]:
fwidth, fhight = ra.get_plotsize(212, subplots=(2, 1))
fig, axarr = plt.subplots(2, 1, figsize=(fwidth, fhight))
plt.subplots_adjust(hspace=0.6)

i = 0
for dname in ['t_executor_before', 't_executor_after']:
    ax = axarr[i]

    data   = [t_duration[sid][dname] for sid in sids]
    labels = ['%s;%s' % (ss[sid]['ntask'], int(ss[sid]['nnodes'])) for sid in sids]

    ax.boxplot(data, labels=labels, patch_artist=True)

    ax.set_title('Distribution of duration: %s' % ra.to_latex(dname))
    ax.set_xlabel('Task;Nodes')
    ax.set_ylabel('Task Runtime (s)')
    
    i += 1
    