### Novel observations
* Controller reports are available as well, showing that a lot of time is spent in the `act` phase, which sends the commands to the hosts sequentially. I realize now that `act` is blocking, so the transmits are effectively serialized at the controller!
* Redundant local transports mostly vanished. There are times when the planner decides in a single step that a dataset is needed at two workers on the same host, so it issues two transmit commands. We could thus replace the redundant send with idle time, to save network etc. It happens only 3 times out of 55 in the 2,2 (2-host 2-worker) scenario.
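
A minimal sketch of how such redundant commands could be detected and fused within a single planning step. Everything here is hypothetical (the `planned` list, the `fuse_transmits` helper, the `remote`/`wait` labels); it only assumes the `host:worker` naming produced by `fixWorker` and is not the planner's actual logic.

```python
from collections import defaultdict

# Hypothetical transmit commands planned in one step: (dataset, target worker).
planned = [
    ("d1", "h0:worker0"),
    ("d1", "h0:worker1"),  # same dataset, same host: a redundant network transfer
    ("d2", "h1:worker0"),
]

def fuse_transmits(commands):
    """Group commands by (dataset, host); only the first worker gets a remote transmit."""
    groups = defaultdict(list)
    for dataset, worker in commands:
        host = worker.split(":", 1)[0]
        groups[(dataset, host)].append(worker)
    # The remaining workers on that host would idle until the dataset has arrived.
    return {
        key: {"remote": workers[0], "wait": workers[1:]}
        for key, workers in groups.items()
    }

fused = fuse_transmits(planned)
redundant = sum(len(entry["wait"]) for entry in fused.values())
```

Here `redundant` counts the commands that would not need to touch the network; on the `lA_M_2_2` logs the analogous count is the 3 `redundant` rows reported by `analyzeTransmits`.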

The `lA` logs here represent the code/measurements _before_ transmits were reworked to happen async, the `lB` the _after_.
The `act` phase duration has shortened considerably, but the overall duration has increased, possibly due to increased contention, the introduction of locks, etc. The overall number of transmits has stayed roughly the same (it even dropped a bit: 55 to 48 for `M_2_2`, 110 to 98 for `M_4_1`). In particular, the duration of the longest transmit has increased roughly fourfold in the 2-host 2-worker scenario, **from about 1.1 seconds to about 4.6 seconds**. During that time, both sides of the transmit were doing other things as well (transmitting other datasets, computing tasks, etc).
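
The slowdown can be checked by hand from the `analyzeTransmits` outputs further down. The snippet below copies the printed figures for the `M_2_2` scenario; treating them as nanoseconds is an assumption, consistent with the seconds quoted above.

```python
# Figures copied from the analyzeTransmits outputs for lA_M_2_2 and lB_M_2_2.
# Assumption: timestamps are in nanoseconds.
max_before_ns = 1.076e9    # max transmit delay, blocking transmits (lA)
max_after_ns = 4.635e9     # max transmit delay, async transmits (lB)
total_before_ns = 4.251e9  # total transmit duration (lA)
total_after_ns = 1.387e10  # total transmit duration (lB)

max_ratio = max_after_ns / max_before_ns
total_ratio = total_after_ns / total_before_ns
print(f"longest transmit: x{max_ratio:.1f}, total transmit duration: x{total_ratio:.1f}")
```

Both ratios land in the same range, which suggests the longest transmit is not an isolated outlier.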

### Next steps
* Rework the client to send asynchronously
* Fuse the multi-transmit
* When picking a transmit, prefer local ones
* Create tooling for worker timeline visualisation/exploration, to understand the contention
  * Possibly parametrize the contention: how many concurrent transmits to allow, whether to allow transmits concurrently with tasks, whether to pick the least busy worker for transmits...
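
As a first step towards the timeline tooling, contention could be quantified as the peak number of overlapping intervals per worker. The sketch below is a plain sweep-line count; `max_concurrency` and the sample intervals are hypothetical and not part of the existing analysis code.

```python
def max_concurrency(intervals):
    """Return the largest number of (start, end) intervals open at the same instant."""
    events = []
    for start, end in intervals:
        events.append((start, 1))   # interval opens
        events.append((end, -1))    # interval closes
    # Sort by time; at equal timestamps closes (-1) sort before opens (+1),
    # so back-to-back intervals are not counted as overlapping.
    events.sort()
    current = peak = 0
    for _, delta in events:
        current += delta
        peak = max(peak, current)
    return peak

# Hypothetical (Started, Finished) timestamps of transmits touching one worker.
sample = [(0, 10), (5, 15), (10, 20), (30, 40)]
peak = max_concurrency(sample)
```

In the notebook, the intervals could presumably be taken from the `Started` and `Finished` columns of `transmitDurations(...)` and `taskDurations(...)`, grouped by `worker`. A cap on concurrent transmits would then show up as an upper bound on this peak.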


In [1]:
import pandas as pd
import numpy as np

In [2]:
def fixWorker(df):
    # Prefix worker ids with their host (in place), so they are unique across hosts.
    rows = df['host'] != 'controller'
    df.loc[rows, 'worker'] = df.loc[rows, 'host'] + ':' + df.loc[rows, 'worker']

def readAll(base):
    c = pd.read_json(f"{base}.controller.jsonl", lines=True)
    t = pd.read_json(f"{base}.tasks.jsonl", lines=True)
    d = pd.read_json(f"{base}.datasets.jsonl", lines=True)
    if 'M' in base:
        fixWorker(t)
        if d.shape[0] > 0:
            fixWorker(d)
    return c, t, d

In [3]:
f1c, f1t, f1d = readAll('lA_F_1')
f4c, f4t, f4d = readAll('lA_F_4')
m14c, m14t, m14d = readAll('lA_M_1_4')
m41c, m41t, m41d = readAll('lA_M_4_1')
m22c, m22t, m22d = readAll('lA_M_2_2')
# after making the transmit non-blocking
n14c, n14t, n14d = readAll('lB_M_1_4')
n41c, n41t, n41d = readAll('lB_M_4_1')
n22c, n22t, n22d = readAll('lB_M_2_2')

In [4]:
def fixMode(df):
    rows = ~df.dataset.isna()
    proj = df[rows & ~df['mode'].isna()].set_index(['dataset', 'worker'])['mode']
    lookup = proj[~proj.index.duplicated(keep='last')]
    return df.set_index(['dataset', 'worker']).drop(columns='mode').join(lookup).reset_index()

def fmn(n): # TODO set some central
    return f"{n:.3e}"

def ensureColumns(df, columns):
    for column in columns:
        if column not in df.columns:
            df = df.assign(**{column: np.nan})
    return df

def analyzeController(df):
    print(f"phases: {df.shape[0]}")
    print(f"total waits duration: {fmn(df.waitDuration.sum())}")
    print(f"total act duration: {fmn(df.actDuration.sum())}")
    print(f"transmits issued: {df.actionsTransmit.sum()}, transmits received: {df.eventsTransmited.sum()}")
    print(f"busy-during-wait: {fmn((df.busyWorkers * df.waitDuration).sum())}")
    
def transmitDurations(df):
    datasets = fixMode(df)
    durations = datasets.pivot(index=['dataset', 'worker', 'mode'], columns=['action'], values=['at'])
    durations.columns = [name[1][len('transmit'):] for name in durations.columns]
    durations = durations.reset_index()
    localFix = durations['mode'] == 'local'
    durations.loc[localFix, 'Started'] = durations.loc[localFix, 'Finished']
    durations.loc[localFix, 'Loaded'] = durations.loc[localFix, 'Finished']
    durations = durations.assign(total=durations.Finished - durations.Planned)
    durations = durations.assign(commDelay=durations.Started-durations.Planned)
    durations = durations.assign(loadDelay=durations.Loaded-durations.Started)
    durations = durations.assign(transmitDelay=durations.Finished-durations.Loaded)
    return durations

def taskDurations(df):
    tasks = df[~df.task.isna()]
    durations = tasks.pivot(index=['task', 'worker'], columns=['action'], values=['at'])
    durations.columns = [name[1][len('task'):] for name in durations.columns]
    durations = durations.reset_index()
    durations = durations.assign(total=durations.Finished - durations.Planned)
    durations = durations.assign(commDelay = durations.Enqueued - durations.Planned)
    durations = durations.assign(queueDelay = durations.Started - durations.Enqueued)
    durations = durations.assign(loadDelay = durations.Loaded - durations.Started)
    durations = durations.assign(runtimes = durations.Finished - durations.Loaded)
    durations = durations.assign(onWorker = durations.Finished - durations.Enqueued)
    return durations

def analyzeTransmits(df):
    durations = transmitDurations(df)
    print(f"total transmit duration: {fmn(durations.total.sum())}")
    print(" *** ")
    print(f"mode counts: {durations['mode'].value_counts()}")
    print(f"per-mode transmit duration: {durations[['mode', 'total']].groupby('mode').sum()}")
    print(" *** ")
    print(f"total comm delay: {fmn(durations.commDelay.sum())}")
    print(f"mean comm delay: {fmn(durations.commDelay.mean())}")
    print(f"max comm delay: {fmn(durations.commDelay.max())}")
    print(" *** ")
    remotes = durations.query("mode == 'remote'")
    print(f"total load delay: {fmn(remotes.loadDelay.sum())}")
    print(f"mean load delay: {fmn(remotes.loadDelay.mean())}")
    print(f"max load delay: {fmn(remotes.loadDelay.max())}")
    print(" *** ")
    print(f"total transmit delay: {fmn(remotes.transmitDelay.sum())}")
    print(f"mean transmit delay: {fmn(remotes.transmitDelay.mean())}")
    print(f"max transmit delay: {fmn(remotes.transmitDelay.max())}")
    print(" *** ")
    
def analyzeTasks(df):
    durations = taskDurations(df)
    print(f"total task duration: {fmn(durations.total.sum())}")
    print(" *** ")
    print(f"total task duration per worker: {durations.groupby('worker').onWorker.agg(['mean', 'sum'])}")
    print(" *** ")
    print(f"total comm delay: {fmn(durations.commDelay.sum())}")
    print(f"mean comm delay: {fmn(durations.commDelay.mean())}")
    print(f"max comm delay: {fmn(durations.commDelay.max())}")
    print(" *** ")
    print(f"total queue delay: {fmn(durations.queueDelay.sum())}")
    print(f"mean queue delay: {fmn(durations.queueDelay.mean())}")
    print(f"max queue delay: {fmn(durations.queueDelay.max())}")
    print(" *** ")
    print(f"total runtime delay: {fmn(durations.runtimes.sum())}")

In [5]:
analyzeController(f1c)

phases: 135
total waits duration: 1.794e+10
total act duration: 1.003e+09
transmits issued: 0, transmits received: 0
busy-during-wait: 1.794e+10


In [6]:
analyzeController(f4c)

phases: 133
total waits duration: 6.106e+09
total act duration: 1.430e+09
transmits issued: 0, transmits received: 0
busy-during-wait: 2.183e+10


In [7]:
analyzeController(m14c)

phases: 70
total waits duration: 2.782e+08
total act duration: 1.060e+10
transmits issued: 0, transmits received: 0
busy-during-wait: 9.496e+08


In [8]:
analyzeController(n14c)

phases: 133
total waits duration: 6.953e+08
total act duration: 9.345e+09
transmits issued: 0, transmits received: 0
busy-during-wait: 1.516e+09


In [9]:
analyzeController(m22c)

phases: 66
total waits duration: 5.873e+09
total act duration: 1.335e+10
transmits issued: 55, transmits received: 55
busy-during-wait: 2.015e+10


In [10]:
analyzeController(n22c)

phases: 105
total waits duration: 9.587e+09
total act duration: 1.091e+10
transmits issued: 48, transmits received: 48
busy-during-wait: 3.682e+10


In [11]:
analyzeController(m41c)

phases: 67
total waits duration: 3.930e+09
total act duration: 1.822e+10
transmits issued: 110, transmits received: 110
busy-during-wait: 1.551e+10


In [12]:
analyzeController(n41c)

phases: 169
total waits duration: 2.160e+10
total act duration: 1.054e+10
transmits issued: 98, transmits received: 98
busy-during-wait: 8.135e+10


In [13]:
analyzeTransmits(m22d)

total transmit duration: 4.251e+09
 *** 
mode counts: mode
remote       52
redundant     3
Name: count, dtype: int64
per-mode transmit duration:                 total
mode                 
redundant   881508877
remote     3369545192
 *** 
total comm delay: 2.846e+08
mean comm delay: 5.175e+06
max comm delay: 6.485e+07
 *** 
total load delay: 2.938e+07
mean load delay: 5.650e+05
max load delay: 1.443e+06
 *** 
total transmit delay: 3.059e+09
mean transmit delay: 5.883e+07
max transmit delay: 1.076e+09
 *** 


In [14]:
analyzeTransmits(n22d)

total transmit duration: 1.387e+10
 *** 
mode counts: mode
remote    48
Name: count, dtype: int64
per-mode transmit duration:               total
mode               
remote  13865203703
 *** 
total comm delay: 4.271e+08
mean comm delay: 8.898e+06
max comm delay: 6.223e+07
 *** 
total load delay: 3.556e+07
mean load delay: 7.408e+05
max load delay: 3.964e+06
 *** 
total transmit delay: 1.340e+10
mean transmit delay: 2.792e+08
max transmit delay: 4.635e+09
 *** 


In [15]:
analyzeTransmits(m41d)

total transmit duration: 9.800e+09
 *** 
mode counts: mode
remote    110
Name: count, dtype: int64
per-mode transmit duration:              total
mode              
remote  9800394444
 *** 
total comm delay: 1.011e+09
mean comm delay: 9.195e+06
max comm delay: 6.566e+08
 *** 
total load delay: 5.603e+07
mean load delay: 5.094e+05
max load delay: 1.274e+06
 *** 
total transmit delay: 8.733e+09
mean transmit delay: 7.939e+07
max transmit delay: 1.274e+09
 *** 


In [16]:
analyzeTransmits(n41d)

total transmit duration: 5.664e+10
 *** 
mode counts: mode
remote    98
Name: count, dtype: int64
per-mode transmit duration:               total
mode               
remote  56640390536
 *** 
total comm delay: 8.381e+09
mean comm delay: 8.552e+07
max comm delay: 3.718e+09
 *** 
total load delay: 7.004e+07
mean load delay: 7.147e+05
max load delay: 5.043e+06
 *** 
total transmit delay: 4.819e+10
mean transmit delay: 4.917e+08
max transmit delay: 1.105e+10
 *** 


In [20]:
Dn22d = transmitDurations(n22d)
Dm22d = transmitDurations(m22d)

In [24]:
Dn22d.sort_values(by="transmitDelay", ascending=False)[:5]

Unnamed: 0,dataset,worker,mode,Finished,Loaded,Planned,Started,total,commDelay,loadDelay,transmitDelay
37,"retrieve(1,):84fbe9f7d17f79669173bbfa74fae8183...",h1:worker1,remote,16848567183789,16843932338035,16843930704797,16843931825121,4636478992,1120324,512914,4634845754
39,retrieve:228aecd8e8e69130f9f6fe68973639b52a6e3...,h0:worker1,remote,16843866592089,16839263215223,16839261719502,16839262669693,4604872587,950191,545530,4603376866
1,concat:f26f97554dd3e073b8d9fe163f390f9f880154e...,h0:worker1,remote,16845863610503,16844567894151,16844564661826,16844566434189,1298948677,1772363,1459962,1295716352
0,concat:29989362f37d65adb916f9c7dde48777a53a1a3...,h1:worker0,remote,16840278704073,16839260368282,16839258046338,16839259490726,1020657735,1444388,877556,1018335791
8,"retrieve(0, 10):af27ba6b04ec0b75b39afeeae1c202...",h1:worker0,remote,16836338136031,16836248731209,16836234352423,16836244767646,103783608,10415223,3963563,89404822


In [25]:
Dm22d.sort_values(by="transmitDelay", ascending=False)[:5]

Unnamed: 0,dataset,worker,mode,Finished,Loaded,Planned,Started,total,commDelay,loadDelay,transmitDelay
41,"retrieve(1,):84fbe9f7d17f79669173bbfa74fae8183...",h0:worker0,remote,16283523838005,16282447563740,16282445865752,16282446872239,1077972253,1006487,691501,1076274265
44,retrieve:228aecd8e8e69130f9f6fe68973639b52a6e3...,h1:worker1,remote,16282397954780,16281352527444,16281351635008,16281352233132,1046319772,598124,294312,1045427336
42,"retrieve(1,):84fbe9f7d17f79669173bbfa74fae8183...",h0:worker1,redundant,16284441236583,16283594831758,16283593587321,16283594337489,847649262,750168,494269,846404825
26,"retrieve(1, 11):b14562177f37fe6d669d26a982313f...",h0:worker0,remote,16278335131026,16278260551467,16278258561750,16278259936861,76569276,1375111,614606,74579559
18,"retrieve(0, 5):3f288e6a71aba29f67ed9d96fd04774...",h0:worker0,remote,16277935936481,16277863909422,16277861800695,16277863250956,74135786,1450261,658466,72027059
