In [None]:
import collections
import io
import itertools
import json
import logging
import math
import os
import warnings

import matplotlib.cm
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
from matplotlib.colors import LogNorm
from mpl_toolkits.axes_grid1.inset_locator import mark_inset
from mpl_toolkits.axes_grid1.inset_locator import zoomed_inset_axes
from sklearn import linear_model

# Raise the threshold of matplotlib's logger so only CRITICAL records get through.
logging.getLogger("matplotlib").setLevel(logging.CRITICAL)

In [None]:
# Input root (one sub-directory per measurement run) and output root for the figures.
DATA_PATH, PLOT_PATH = "data_path_here", "plot_path_here"

In [None]:
# Values of each experiment dimension. A run directory name joins one value of
# each dimension, in this order, with "-" (see get_available_measurements).
frameworks = [
    "tf_sync",
    "horovod",
    "kungfu_ssgd",
    "kungfu_pairavg",
]
models = [
    "mobilenetv2",
    "densenet201",
    "resnet50",
    "resnet101",
]
optimizers = ["adam", "rmsprop"]
batch_sizes = ["64", "128", "512"]
backends = [
    "grpc",
    "mpi",
    "grpc_nccl",
    "gloo",
    "gloo_nccl",
    "mpi_nccl",
    "kungfu",
]
# "normal" = no injected loss; "loss_<p>" presumably means <p> percent packet loss.
delays = [
    "normal",
    "loss_0.01",
    "loss_0.05",
    "loss_0.1",
    "loss_0.2",
    "loss_0.5",
    "loss_1",
    "loss_2",
]
topologies = [
    "ring",
    "hierarchical",
    "BINARY_TREE_STAR",
    "CLIQUE",
    "STAR",
    "TREE",
]

In [None]:
def get_available_measurements(frameworks=frameworks, models=models, optimizers=optimizers, batch_sizes=batch_sizes, 
                               backends=backends, delays=delays, topologies=topologies, path=DATA_PATH):
    """Return the run directories under ``path`` whose name matches every filter.

    Run directories are named
    ``<framework>-<model>-<optimizer>-<batch size>-<backend>-<delay>-<topology>``.
    Each filter is a collection of accepted values for the corresponding field.
    A bare string is treated as a single accepted value and matched exactly.

    Directories whose names do not split into exactly seven ``-``-separated
    fields are ignored. The result follows ``os.listdir`` order, which is
    filesystem dependent.
    """
    def _accepted(values):
        # With a bare string, `field in values` would be a substring test
        # (e.g. "grpc" in "grpc_nccl" is True) and would select extra runs.
        return {values} if isinstance(values, str) else set(values)

    filters = [_accepted(v) for v in (frameworks, models, optimizers, batch_sizes, backends, delays, topologies)]

    measurements = list()
    for directory in os.listdir(path):
        if not os.path.isdir(os.path.join(path, directory)):
            continue
        fields = directory.split("-")
        if len(fields) != len(filters):
            # Not a run directory (unexpected naming); skip instead of crashing.
            continue
        if all(field in accepted for field, accepted in zip(fields, filters)):
            measurements.append(directory)
    return measurements

## Communication Patterns (Fig. 4)

In [None]:
# IP address -> worker label used in the figures.
WORKER_NAMES = {"192.168.17.50": "W1", "192.168.17.51": "W2", "192.168.17.52": "W3", "192.168.17.53": "W4"}
os.makedirs(f"{PLOT_PATH}/comm_pattern", exist_ok=True)

comm_configs = list(itertools.product(["kungfu_ssgd", "kungfu_pairavg"], ["BINARY_TREE_STAR", "CLIQUE", "STAR", "TREE"])) + [("horovod", "ring")]
for fw, topo in comm_configs:
    measurements = get_available_measurements(frameworks=[fw],
                                              models=models,
                                              optimizers=["adam"],
                                              batch_sizes=["64"],
                                              backends=backends,
                                              delays=["normal"],
                                              topologies=[topo])
    if not measurements:
        warnings.warn(f"No measurements for {fw} / {topo}; skipping its heatmap.")
        continue

    # Source x destination traffic matrix summed over runs. add(..., fill_value=0)
    # treats a pair missing from one run as 0 instead of turning the sum into NaN.
    # NOTE(review): pivot_table uses its default aggfunc (mean) per (src, dst);
    # presumably the JSON already holds one aggregated row per pair - confirm.
    piv = None
    for directory in measurements:
        packets = pd.read_json(path_or_buf=os.path.join(DATA_PATH, directory, "parsed-files", "joint_sent_received.json"))
        run_matrix = pd.pivot_table(packets, values="frame.len", index=["ip.src"], columns=["ip.dst"], fill_value=0)
        piv = run_matrix if piv is None else piv.add(run_matrix, fill_value=0)

    # Mean over runs, scaled by 1e9 (GB), with IPs replaced by worker labels.
    piv = piv.rename(index=WORKER_NAMES, columns=WORKER_NAMES) / (len(measurements) * 1e9)

    fig, ax = plt.subplots()
    sns.heatmap(piv, ax=ax, square=True, cmap='Blues', cbar_kws={'label': 'Tx. Volume [GB]'}, norm=LogNorm(vmin=1, vmax=500))
    ax.set_xlabel("Destination")
    ax.set_ylabel("Source")
    plt.setp(ax.xaxis.get_majorticklabels(), rotation=0)
    plt.setp(ax.yaxis.get_majorticklabels(), rotation=0)
    fig.savefig(f"{PLOT_PATH}/comm_pattern/{fw}_{topo}.pdf")

## Batch Size Effect (Fig. 9)

In [None]:
msr = get_available_measurements(frameworks="horovod", optimizers="adam", models="mobilenetv2", backends="mpi", batch_sizes=["64", "128", "512"])

# Column names assigned to utilization.json, one per directed worker pair.
LINK_COLUMNS = ["Worker0-Worker1", "Worker0-Worker2", "Worker0-Worker3", "Worker1-Worker0", "Worker1-Worker2", "Worker1-Worker3",
                "Worker2-Worker0", "Worker2-Worker1", "Worker2-Worker3", "Worker3-Worker0", "Worker3-Worker1", "Worker3-Worker2"]

util = dict()
for run in msr:
    frame = pd.read_json(path_or_buf=os.path.join(DATA_PATH, run, "parsed-files", "utilization.json"))
    frame = frame * 8 / 1e6  # presumably bytes -> Mbit
    frame.columns = LINK_COLUMNS
    # Re-index by sample number with 1 ms spacing (sample i -> i ms).
    frame.index = pd.to_timedelta(np.arange(len(frame)), unit="ms")
    # Sum into 10 ms bins and divide by 10 (mean per 1 ms sample; plotted as Gbps).
    # "10ms" is used because the "S" alias is deprecated in pandas >= 2.2.
    util[run] = (frame.resample("10ms").sum() / 10)[["Worker0-Worker1"]]

In [None]:
fig, ax = plt.subplots(3, 1)

plot_params = {
    'markevery': 5,
    'markersize': 4,
    'legend': False
}

# One stacked panel per batch size, top to bottom: (run key, colour, marker).
panels = [
    ('horovod-mobilenetv2-adam-64-mpi-normal-ring', "midnightblue", 'o'),
    ('horovod-mobilenetv2-adam-128-mpi-normal-ring', "cadetblue", 'x'),
    ('horovod-mobilenetv2-adam-512-mpi-normal-ring', "darkgreen", '*'),
]
for panel_ax, (run_key, colour, marker) in zip(ax, panels):
    # The same 25-bin slice of each run's Worker0-Worker1 series.
    util[run_key][9495:9520].plot(ax=panel_ax, zorder=3, color=colour, marker=marker, **plot_params)
    panel_ax.set_yticklabels([])

# The first line of every panel becomes that panel's legend entry.
handles = [panel_ax.get_legend_handles_labels()[0][0] for panel_ax in ax]
ax[0].legend(handles, ["64", "128", "512"],
             bbox_to_anchor=(0.3, 1), loc='lower left', ncol=3,
             frameon=False, columnspacing=0.4, handlelength=1.0, handletextpad=0.2, fontsize=11)
for panel_ax in ax:
    panel_ax.grid(color='gray', linestyle='--', linewidth=0.5, zorder=0)

for panel_ax in ax[:2]:
    panel_ax.set_xticklabels([])
ax[2].set_xticklabels([0, 50, 100, 150, 200])
ax[2].set_xlabel('Time (ms)')
ax[1].set_ylabel('Throughput [Gbps]')

fig.subplots_adjust(left=.18, bottom=.28, right=.99, top=.82)
plt.savefig(f"{PLOT_PATH}/batchsize_effect.pdf")

## Packet Loss Analysis (Fig. 10)

In [None]:
# Lossy tf_sync / MobileNetV2 runs. The loss-free baseline is appended explicitly
# because "normal" is not among the delay filters. Every filter is a list so each
# value is matched exactly; a bare string may be substring-matched, and
# "grpc" in "grpc_nccl" is True, which would pull in the plain gRPC runs as well.
msr = get_available_measurements(frameworks=["tf_sync"],
                                 models=["mobilenetv2"],
                                 optimizers=["adam"],
                                 backends=["grpc_nccl"],
                                 batch_sizes=["64"],
                                 delays=["loss_0.01", "loss_0.1", "loss_1", "loss_2"],
                                 topologies=["ring"]) + ['tf_sync-mobilenetv2-adam-64-grpc_nccl-normal-ring']

In [None]:
# Training duration of each run in minutes: last minus first logged Timestamp, / 60.
mydict = dict()
for run in msr:
    history = pd.read_csv(os.path.join(DATA_PATH, run, "train_history.log"))
    stamps = history["Timestamp"]
    mydict[run] = (stamps.max() - stamps.min()) / 60

In [None]:
def _loss_label(run_name):
    """Return the LaTeX-escaped loss-rate label of a run (e.g. "0.1" followed by an escaped percent)."""
    delay = run_name.split("-")[5]
    rate = "0" if delay == "normal" else delay[len("loss_"):]
    return rf"{rate}\%"


# Label each run by the loss rate parsed from its own name. This does not rely on
# the order of `msr`, which follows os.listdir and so depends on the filesystem.
durations = pd.Series({_loss_label(run): minutes for run, minutes in mydict.items()})
assert len(durations) == len(mydict), "two runs map to the same loss-rate label"
x = durations.to_frame(name="min")
x = x.reindex([r"0\%", r"0.01\%", r"0.1\%", r"1\%", r"2\%"])
# Normalize by the fastest run.
x = x / x.min()

In [None]:
fig, ax = plt.subplots()
x.plot.barh(legend=False, ax=ax, color="midnightblue")
ax.set_xticks(ticks=list(range(1, 11)))
ax.grid(color='gray', linestyle='--', linewidth=0.5, zorder=0)
# Alternate shaded bands: [2, 3], [4, 5], [6, 7], [8, 9].
for left in range(2, 10, 2):
    ax.axvspan(left, left + 1, facecolor='gray', alpha=0.2)
ax.set_xlabel("Normalized Training Duration")
ax.set_ylabel("Loss")

plt.savefig(f"{PLOT_PATH}/loss_simtime.pdf")

## Accelerator Comparison (Table II)

In [None]:
msr = [
    f"{DATA_PATH}/{run}" for run in [
        "tf_sync-resnet50-adam-64-grpc-normal-ring", 
        "tf_sync-resnet50-adam-64-grpc_nccl-normal-ring", 
        "horovod-resnet50-adam-64-mpi-normal-ring", 
        "horovod-resnet50-adam-64-mpi_nccl-normal-ring",
        "horovod-resnet50-adam-64-gloo-normal-ring", 
        "horovod-resnet50-adam-64-gloo_nccl-normal-ring", 
        "kungfu_ssgd-resnet50-adam-64-kungfu-normal-BINARY_TREE_STAR",
        "kungfu_pairavg-resnet50-adam-64-kungfu-normal-BINARY_TREE_STAR"
    ]
]

# Two possible utilization.json layouts: all 12 directed worker pairs, or 6 pairs.
FULL_LINKS = ["Worker0-Worker1", "Worker0-Worker2", "Worker0-Worker3", "Worker1-Worker0", "Worker1-Worker2", "Worker1-Worker3",
              "Worker2-Worker0", "Worker2-Worker1", "Worker2-Worker3", "Worker3-Worker0", "Worker3-Worker1", "Worker3-Worker2"]
SPARSE_LINKS = ["Worker0-Worker1", "Worker0-Worker2", "Worker1-Worker0", "Worker1-Worker3", "Worker2-Worker0", "Worker3-Worker1"]

util = dict()
for run in msr:
    frame = pd.read_json(path_or_buf=os.path.join(run, "parsed-files", "utilization.json")) * 8 / 1e6
    if frame.shape[1] == len(FULL_LINKS):
        frame.columns = FULL_LINKS
    elif frame.shape[1] == len(SPARSE_LINKS):
        frame.columns = SPARSE_LINKS
    else:
        raise ValueError(f"{run}: unexpected number of link columns ({frame.shape[1]})")
    # Sample i -> i ms; 10 ms bins, divided by 10 (mean per 1 ms sample).
    # "10ms" is used because the "S" alias is deprecated in pandas >= 2.2.
    frame.index = pd.to_timedelta(np.arange(len(frame)), unit="ms")
    util[run] = frame.resample("10ms").sum() / 10

In [None]:
# 95 % confidence interval of the mean Worker0-Worker1 throughput. It is estimated
# from N_WINDOWS consecutive windows of WINDOW_LEN 10-ms bins, starting at bin WINDOW_START.
WINDOW_START, WINDOW_LEN, N_WINDOWS = 9000, 100, 30
Z_95 = 1.96

y = dict()
for run in msr:
    # A Series (not a one-column DataFrame) so each window mean is a plain scalar.
    link = util[run]["Worker0-Worker1"]
    window_means = [
        link.iloc[WINDOW_START + WINDOW_LEN * k: WINDOW_START + WINDOW_LEN * (k + 1)].mean()
        for k in range(N_WINDOWS)
    ]
    m = float(np.mean(window_means))
    # Standard error of the mean: std of the window means / sqrt(number of window means).
    half_width = Z_95 * math.sqrt(np.var(window_means)) / math.sqrt(len(window_means))
    y[run] = [round(m - half_width, 2), round(m, 2), round(m + half_width, 2)]
y

## Temporal Flow Plot (Fig. 6)

In [None]:
def get_flow_plot(mydict, fig, ax):
    """Draw a temporal flow plot: one horizontal lane per flow, one bar per time slot.

    Each bar is shaded with ``plt.cm.Blues`` according to its ``frame.len``
    relative to the largest ``frame.len`` over all flows.

    Parameters
    ----------
    mydict : dict
        Maps each flow key to something ``pd.read_json`` accepts. Literal JSON
        text is wrapped in ``io.StringIO``; anything else is passed through.
        Each decoded frame needs a ``frame.len`` column. Its index gives the
        start of every slot.
    fig : matplotlib.figure.Figure
        Not used; kept so existing calls keep working.
    ax : matplotlib.axes.Axes
        Axes to draw on.

    Returns
    -------
    matplotlib.axes.Axes
        ``ax``, after drawing.
    """
    df = dict()
    for flow, payload in mydict.items():
        # Passing literal JSON text to read_json is deprecated (pandas >= 2.1); wrap it.
        if isinstance(payload, str) and payload.lstrip().startswith(("{", "[")):
            payload = io.StringIO(payload)
        df[flow] = pd.read_json(payload).sort_index()

    cmap = plt.cm.Blues
    # Largest frame.len over all (non-empty) flows; colours are normalised against it.
    v_max = max((int(frame["frame.len"].max()) for frame in df.values() if len(frame)), default=0)

    for lane, frame in enumerate(df.values()):
        if not len(frame):
            continue
        loads = frame["frame.len"].to_numpy(dtype=float)
        shades = loads / v_max if v_max > 0 else np.zeros_like(loads)
        # One call per flow: bars are 10 units wide; lane i spans y in [5(i+1), 5(i+1)+4].
        ax.broken_barh([(start, 10) for start in frame.index], (5 * (lane + 1), 4),
                       facecolors=[cmap(shade) for shade in shades])

    n_flows = len(mydict)
    ax.set_ylim(0, 5 * n_flows + 10)
    slot_starts = [frame.index.max() for frame in df.values() if len(frame)]
    if slot_starts:
        # Cover the latest slot start of any flow, not only of the last one.
        ax.set_xlim(0, max(slot_starts) * 1.05)
    ax.set_yticks([5 * (i + 1) + 2.5 for i in range(n_flows)])
    ax.set_yticklabels([])
    ax.set_ylabel('Flows')
    ax.set_xlabel('Time (Sec)')

    return ax

In [None]:
runs = [
    f"{DATA_PATH}/tf_sync-resnet50-adam-64-grpc-normal-ring/parsed-files/temporal_flow.json",
    f"{DATA_PATH}/horovod-resnet50-adam-64-mpi-normal-ring/parsed-files/temporal_flow.json",
    f"{DATA_PATH}/kungfu_ssgd-resnet50-adam-64-kungfu-normal-BINARY_TREE_STAR/parsed-files/temporal_flow.json",
    f"{DATA_PATH}/kungfu_pairavg-resnet50-adam-64-kungfu-normal-BINARY_TREE_STAR/parsed-files/temporal_flow.json"
]
fws = ["tf_sync", "horovod", "kungfu_ssgd", "kungfu_pairavg"]
os.makedirs(f"{PLOT_PATH}/temporal_flow", exist_ok=True)
for (run, fw) in zip(runs, fws):
    # `with` closes each JSON file instead of leaking one open handle per run.
    with open(run) as f:
        mydict = dict(json.load(f))

    fig, ax = plt.subplots()
    get_flow_plot(mydict, fig, ax)
    if fw == "kungfu_pairavg":
        # Only this figure gets the colour bar. The mappable is deliberately not
        # named `sm`: that name is `statsmodels.api`, used by the regression cells.
        mappable = matplotlib.cm.ScalarMappable(cmap=plt.cm.Blues)
        mappable.set_array([])
        fig.colorbar(mappable, ax=ax, label="Normalized Load")
    fig.savefig(f"{PLOT_PATH}/temporal_flow/{fw}.pdf")

## Number of Flows (Fig. 5)

In [None]:
# Display names of the models, and their column order in the figure.
MODEL_LABELS = {"mobilenetv2": "MobileNetv2", "densenet201": "DenseNet201", "resnet50": "ResNet50", "resnet101": "ResNet101"}
MODEL_ORDER = ["MobileNetv2", "DenseNet201", "ResNet50", "ResNet101"]

# Keep the full table: both the TREE/ring and the CLIQUE selections below index into it.
no_flows_all = pd.read_json(path_or_buf=os.path.join(DATA_PATH, "number_flows.json"))


def _flow_counts(runs, label):
    """Return the flow counts of ``runs`` as a Series named ``label``, indexed by model name in MODEL_ORDER."""
    counts = no_flows_all.loc[runs, "flows"]
    # Label rows by the model field of each run name rather than by position.
    counts.index = [MODEL_LABELS[run.split("-")[1]] for run in counts.index]
    if counts.empty or counts.index.has_duplicates:
        raise ValueError(f"{label}: expected exactly one run per model, got {list(runs)}")
    return counts.reindex(MODEL_ORDER).rename(label)


column_runs = {
    "TensorFlow": get_available_measurements(frameworks=["tf_sync"], models=models, optimizers=optimizers, batch_sizes="64",
                                             backends=["grpc"], delays=["normal"], topologies=["ring"]),
    "Horovod": get_available_measurements(frameworks=["horovod"], models=models, optimizers=optimizers, batch_sizes="64",
                                          backends=["mpi"], delays=["normal"], topologies=["ring"]),
    "KungFu \n(S-SGD)": get_available_measurements(frameworks=["kungfu_ssgd"], topologies="TREE", batch_sizes="64"),
    "KungFu \n(PairAvg)": get_available_measurements(frameworks=["kungfu_pairavg"], topologies="TREE", batch_sizes="64"),
    "KungFu \n(S-SGD) \n[Clique]": get_available_measurements(frameworks=["kungfu_ssgd"], topologies="CLIQUE", batch_sizes="64"),
    "KungFu \n(PairAvg) \n[Clique]": get_available_measurements(frameworks=["kungfu_pairavg"], topologies="CLIQUE", batch_sizes="64"),
}

# Rows: framework/topology variant; columns: model.
out = pd.concat([_flow_counts(runs, label) for label, runs in column_runs.items()], axis=1)
out = out.transpose()

In [None]:
fig, ax = plt.subplots()

out.plot(kind="bar", ax=ax, rot=0, color=["midnightblue", "cadetblue", "skyblue", "dodgerblue"], zorder=3)
ax.set_xlabel('Frameworks')
ax.set_ylabel('No. Flows')
ax.legend(bbox_to_anchor=(0.5, 1.2), loc='upper center', ncol=4, frameon=False, columnspacing=0.8, handlelength=1)
ax.grid(color='gray', linestyle='--', linewidth=0.5, zorder=0)
# Shade every other bar group: [0.5, 1.5], [2.5, 3.5], [4.5, 5.5].
for left in (0.5, 2.5, 4.5):
    ax.axvspan(left, left + 1, facecolor='gray', alpha=0.2)
plt.savefig(f"{PLOT_PATH}/no_flows.pdf")

## Data Transferred (Fig. 3)

In [None]:
# Run-name suffix (optimizer-bsize-backend-delay-topology) of each framework's reference runs.
RUN_SUFFIX = {
    "tf_sync": "adam-64-grpc-normal-ring",
    "horovod": "adam-64-mpi-normal-ring",
    "kungfu_ssgd": "adam-64-kungfu-normal-TREE",
    "kungfu_pairavg": "adam-64-kungfu-normal-TREE",
}
MODEL_LABELS = {"resnet50": 'ResNet50', "resnet101": 'ResNet101', "mobilenetv2": 'MobileNetv2', "densenet201": 'DenseNet201'}

bytes_sent = pd.read_json(path_or_buf=os.path.join(DATA_PATH, "bytes_sent.json"))
data = dict()
for framework in frameworks:
    # One explicit suffix lookup per framework selects that framework's runs.
    bytes_sent_sync = bytes_sent.filter(like=framework, axis=1).filter(like=RUN_SUFFIX[framework], axis=1) / 1e9
    model_ids = [column.split('-')[1] for column in bytes_sent_sync.columns]
    # Unknown models keep a "<framework>-<model>" label (e.g. vgg16, dropped before plotting).
    bytes_sent_sync.columns = [MODEL_LABELS.get(m, f"{framework}-{m}") for m in model_ids]
    data[framework] = bytes_sent_sync.sum()

In [None]:
fig, ax = plt.subplots()

MODEL_ORDER = ["MobileNetv2", "DenseNet201", "ResNet50", "ResNet101"]
# Model sizes in MB, in MODEL_ORDER (drawn on the secondary axis).
MODEL_SIZE_MB = [14, 80, 98, 171]

x = pd.DataFrame(data)
x = x[~x.index.str.contains("vgg16")]
x = x.reindex(MODEL_ORDER)

x.plot(kind="bar", rot=15, ax=ax, color=["midnightblue", "cadetblue", "skyblue", "dodgerblue"], zorder=3)
ax.legend(["TensorFlow", "Horovod", "KungFu (S-SGD)", "KungFu (PairAvg)"], loc='upper left', frameon=False)
ax.grid(color='gray', linestyle='--', linewidth=0.5, zorder=0)
ax.axvspan(0.5, 1.5, facecolor='gray', alpha=0.2)
ax.axvspan(2.5, 3.5, facecolor='gray', alpha=0.2)
ax.set_xlabel("Model")
ax.set_ylabel("Total Transmitted Volume [GB]")

# Secondary y-axis with the model sizes, drawn in red.
ax2 = ax.twinx()
ax2.plot(MODEL_ORDER, MODEL_SIZE_MB, 'r-o')
ax2.set_ylabel("Model Size [MB]")
ax2.yaxis.label.set_color('r')
ax2.spines['right'].set_color('r')
ax2.tick_params(axis='y', colors='r')
plt.savefig(f"{PLOT_PATH}/data_transferred.pdf")

## Link Utilization (Fig. 7)

In [None]:
measurements = get_available_measurements(frameworks=["tf_sync"], models="mobilenetv2", optimizers=optimizers, batch_sizes="64", 
                               backends=["grpc"], delays=["normal"], topologies=["ring"])
measurements += get_available_measurements(frameworks=["horovod"], models="mobilenetv2", optimizers=optimizers, batch_sizes="64", 
                               backends=["mpi"], delays=["normal"], topologies=["ring"])
measurements += get_available_measurements(frameworks=["kungfu_ssgd"], models="mobilenetv2", topologies="TREE", batch_sizes="64")
measurements += get_available_measurements(frameworks=["kungfu_pairavg"], models="mobilenetv2", topologies="TREE", batch_sizes="64")

# Two possible utilization.json layouts: all 12 directed worker links, or 6 links.
FULL_LINKS = ["W1-W2", "W1-W3", "W1-W4", "W2-W1", "W2-W3", "W2-W4", "W3-W1", "W3-W2", "W3-W4", "W4-W1", "W4-W2", "W4-W3"]
SPARSE_LINKS = ["W1-W2", "W1-W3", "W2-W1", "W2-W4", "W3-W1", "W4-W2"]

util = dict()

for run in measurements:
    frame = pd.read_json(path_or_buf=os.path.join(DATA_PATH, run, "parsed-files", "utilization.json")) * 8 / 1e6
    if frame.shape[1] == len(FULL_LINKS):
        frame.columns = FULL_LINKS
    elif frame.shape[1] == len(SPARSE_LINKS):
        frame.columns = SPARSE_LINKS
    else:
        raise ValueError(f"{run}: unexpected number of link columns ({frame.shape[1]})")
    # Sample i -> i ms; 10 ms bins, divided by 10 (mean per 1 ms sample).
    # "10ms" is used because the "S" alias is deprecated in pandas >= 2.2.
    frame.index = pd.to_timedelta(np.arange(len(frame)), unit="ms")
    util[run] = frame.resample("10ms").sum() / 10

In [None]:
fig, axs = plt.subplots(1, 2)

# All 12 directed worker links, in the column order used by the loading cell.
ALL_LINKS = ["W1-W2", "W1-W3", "W1-W4", "W2-W1", "W2-W3", "W2-W4", "W3-W1", "W3-W2", "W3-W4", "W4-W1", "W4-W2", "W4-W3"]
# Framework -> (run plotted, link columns shown).
LINK_PLOT_RUNS = {
    "horovod": ("horovod-mobilenetv2-adam-64-mpi-normal-ring", ALL_LINKS),
    "kungfu_ssgd": ("kungfu_ssgd-mobilenetv2-adam-64-kungfu-normal-TREE", ["W1-W2", "W1-W3", "W2-W1", "W2-W4", "W3-W1", "W4-W2"]),
    "kungfu_pairavg": ("kungfu_pairavg-mobilenetv2-adam-64-kungfu-normal-TREE", ALL_LINKS),
}

fworks = ["horovod", "kungfu_pairavg"]
for ax, fw in zip(axs, fworks):
    run, cols = LINK_PLOT_RUNS[fw]
    # Select columns without writing back into `util`, so the cell can be re-run safely.
    util[run][cols].iloc[9000:9025].plot(ax=ax, legend=False, zorder=3)

    ax.set_xlabel('Time (ms)')
    ax.set_ylabel('Throughput [Gbps]')
    ax.set_xticklabels([0, 50, 100, 150, 200])
    ax.set_ylim([0, 8])
    ax.grid(color='gray', linestyle='--', linewidth=0.5, zorder=0)
    # Two shaded bands, each 5/24 of the visible x-range wide.
    w = (ax.get_xlim()[1] - ax.get_xlim()[0])*5/24
    ax.axvspan(ax.get_xlim()[0] + w, ax.get_xlim()[0] + 2*w, facecolor='gray', alpha=0.2)
    ax.axvspan(ax.get_xlim()[0] + 3*w, ax.get_xlim()[0] + 4*w, facecolor='gray', alpha=0.2)

# Legend entries are the actual line artists of the first panel. Both panels plot
# the same 12 links in the same order, so they share colours.
handles, _ = axs[0].get_legend_handles_labels()
fig.legend(handles=handles, labels=ALL_LINKS, bbox_to_anchor=(0.52, 1.12), loc='upper center', ncol=6, frameon=False, columnspacing=1.5, handlelength=1.5)
fig.text(0.3, -0.012, "(a) Synchronous.", ha='center')
fig.text(0.78, -0.012, "(b) Asynchronous.", ha='center')
plt.savefig(f"{PLOT_PATH}/linkutil_v2.pdf")

## Accuracy (Fig. 8)

In [None]:
def get_data(model, path, tz_offset_s=7200):
    """Load the accuracy log and the captured traffic volume of one run.

    Parameters
    ----------
    model : str
        Run directory name (e.g. ``"horovod-resnet50-adam-64-mpi-normal-ring"``),
        despite the parameter name.
    path : str
        Directory containing the run directories.
    tz_offset_s : int, optional
        Seconds added to the training-log timestamps before converting them to
        datetimes. Default 7200 (2 h), presumably to align the log clock with
        the packet-capture clock (a timezone difference) - confirm.

    Returns
    -------
    mydf : pandas.DataFrame
        ``Accuracy`` of every non-"Epoch" log entry, indexed by datetime.
    x : pandas.DataFrame
        Captured ``frame.len`` summed into bins as wide as the mean interval
        between consecutive non-"Epoch" log entries, plus ``cum``: the running
        total divided by 1e9.
    name : str
        ``model``, unchanged.

    Raises
    ------
    ValueError
        If the log has fewer than two non-"Epoch" entries (no interval to bin by).
    """
    history = pd.read_csv(os.path.join(path, model, "train_history.log"))
    # Explicit copy: the new column below must not be written into a view of `history`.
    steps = history[history.Type != "Epoch"].copy()
    if len(steps) < 2:
        raise ValueError(f"{model}: need at least two non-epoch log entries, got {len(steps)}")
    steps["date"] = pd.to_datetime(steps["Timestamp"] + tz_offset_s, unit='s')
    mydf = steps[["date", "Accuracy"]].set_index("date")

    capture = pd.read_json(path_or_buf=os.path.join(path, model, "parsed-files", "bytes_per_sec.json"))
    capture['date'] = pd.to_datetime(capture['frame.time'], unit='ms')
    bin_width = steps["date"].diff().mean()
    x = capture[["date", "frame.len"]].resample(bin_width, on='date').sum()
    x["cum"] = x["frame.len"].cumsum() / 1e9
    return mydf, x, model

In [None]:
mappings = {"tf_sync": "TensorFlow", "horovod": "Horovod", "kungfu_ssgd": "KungFu (S-SGD)", "kungfu_pairavg": "KungFu (PairAvg)"}
color_mappings = {"tf_sync": "midnightblue", "horovod": "cadetblue", "kungfu_ssgd": "skyblue", "kungfu_pairavg": "dodgerblue"}
# Models shown left to right; must match the panel captions added below.
accuracy_models = ["mobilenetv2", "densenet201", "resnet50", "resnet101"]
# x tick spacing per model: ticks at 0..4 steps, with the 2nd and 4th intervals shaded.
x_tick_step = {"mobilenetv2": 50, "densenet201": 500, "resnet50": 500, "resnet101": 1000}

fig, axs = plt.subplots(1, 4)
legend_handles = dict()  # framework -> first line drawn for it

for ax, model in zip(axs, accuracy_models):
    runs = [f'tf_sync-{model}-adam-64-grpc_nccl-normal-ring', f'horovod-{model}-adam-64-mpi-normal-ring', 
            f'kungfu_ssgd-{model}-adam-64-kungfu-normal-TREE', f'kungfu_pairavg-{model}-adam-64-kungfu-normal-TREE']
    for run in runs:
        accuracy, traffic, _ = get_data(run, DATA_PATH)
        fwork = run.split("-")[0]
        cum = traffic["cum"]
        # Align the two series by dropping leading traffic bins until both have equal length.
        offset = len(cum) - len(accuracy)
        if offset < 0:
            warnings.warn(f"{run}: {len(cum)} traffic bins but {len(accuracy)} accuracy samples; not plotted.")
            continue
        (line,) = ax.plot(cum.iloc[offset:], accuracy.Accuracy, color=color_mappings[fwork], zorder=3)
        legend_handles.setdefault(fwork, line)

    ax.set_xlabel("Data Transmitted [GB]")
    ax.set_ylabel("Accuracy")
    ax.axhline(0.6, ls='--', color="black", alpha=0.9)
    ax.axhline(0.8, ls='--', color="black", alpha=0.9)
    ax.set_ylim([0, 1])
    ax.set_yticks(ticks=[0.2, 0.4, 0.6, 0.8])
    step = x_tick_step.get(model)
    if step is not None:
        ax.set_xticks(ticks=[k * step for k in range(5)])
        ax.grid(color='gray', linestyle='--', linewidth=0.5, zorder=0)
        ax.axvspan(step, 2 * step, facecolor='gray', alpha=0.2)
        ax.axvspan(3 * step, 4 * step, facecolor='gray', alpha=0.2)

# One legend entry per framework that was actually drawn, in `mappings` order.
drawn = [fw for fw in mappings if fw in legend_handles]
fig.legend(handles=[legend_handles[fw] for fw in drawn], labels=[mappings[fw] for fw in drawn],
           bbox_to_anchor=(0.5, 1.08), loc='upper center', ncol=4, frameon=False)
fig.text(0.15, -0.014, "(a) MobileNetv2.", ha='center')
fig.text(0.40, -0.014, "(b) DenseNet201.", ha='center')
fig.text(0.64, -0.014, "(c) ResNet50.", ha='center')
fig.text(0.88, -0.014, "(d) ResNet101.", ha='center')
plt.tight_layout()
plt.savefig(f"{PLOT_PATH}/accuracy.pdf")

## Throughput Table (Table I)

In [None]:
msr = get_available_measurements(frameworks="kungfu_pairavg", delays="normal", batch_sizes="64", backends="kungfu")
out = dict()
tput = dict()
for run in msr:
    frame = pd.read_json(path_or_buf=os.path.join(DATA_PATH, run, "parsed-files", "bytes_per_sec.json"))
    frame = frame * 8 / 1e6
    # Index values are scaled by 1e6 and read as nanoseconds, i.e. treated as milliseconds.
    frame.index = pd.TimedeltaIndex(frame.index * 1e6)
    # 1 s bins ("s": the upper-case "S" alias is deprecated in pandas >= 2.2), then / 1000.
    frame = frame.resample('s').sum() / 1000
    tput[run] = frame.reset_index()[["frame.len"]]

    # 95 % CI of the mean over bins 200-299. A Series keeps mean/var scalar
    # (float() on a one-element Series is deprecated).
    window = tput[run]["frame.len"].iloc[200:300]
    if len(window) < 2:
        warnings.warn(f"{run}: only {len(window)} samples in the analysis window; skipped.")
        continue
    m = float(window.mean())
    half_width = 1.96 * math.sqrt(window.var()) / math.sqrt(len(window))
    out[run] = [round(m - half_width, 2), round(m, 2), round(m + half_width, 2)]
out

## Data Transmitted vs Step (Sec. VI - Prediction Model)

In [None]:
# Every loss-free ("normal") Adam run, across all frameworks, models, batch sizes,
# backends and topologies.
all_models = get_available_measurements(
    frameworks=frameworks,
    models=models,
    optimizers="adam",
    batch_sizes=batch_sizes,
    backends=backends,
    delays="normal",
    topologies=topologies,
    path=DATA_PATH,
)

In [None]:
# Mean captured volume per bin (frame.len / 1e6) for every run. get_data bins the
# capture by the mean interval between consecutive non-epoch log entries, so this is
# roughly the volume exchanged per logged training step.
data = dict()
for run in all_models:
    _, traffic, _ = get_data(run, DATA_PATH)
    # Store a plain float rather than a one-element Series.
    data[run] = float(traffic["frame.len"].mean() / 1e6)

In [None]:
# One row per run: the seven fields of its name followed by its mean transmitted volume.
parameters = [
    run_name.split("-") + [float(data[run_name])]
    for run_name in all_models
]

In [None]:
# Merge framework and backend into one feature and keep the remaining fields:
# [framework_backend, model, optimizer, bsize, delay, topology, data]
parameters = [
    [f"{fw}_{backend}", mdl, opt, bsize, delay, topo, volume]
    for fw, mdl, opt, bsize, backend, delay, topo, volume in parameters
]

In [None]:
df = pd.DataFrame(parameters, columns=['framework_backend', 'model', 'optimizer', 'bsize', 'delay', 'topology', 'data'])

# Per-model constants: size in MB (the values used for the "Model Size [MB]" axis of
# the data-transferred figure) and parameter count (presumably in millions - confirm).
mapping = {"mobilenetv2": 14, "densenet201": 80, "resnet50": 98, "resnet101": 171}
mapping2 = {"mobilenetv2": 3.5, "densenet201": 20, "resnet50": 26, "resnet101": 45}
df['model_size'] = [mapping[name] for name in df['model']]
df['no_param'] = [mapping2[name] for name in df['model']]

In [None]:
X = df[['model_size', 'framework_backend', 'bsize', 'topology']]
y = df['data']
# dtype=float: pandas >= 2 makes bool dummy columns by default. Mixed with the integer
# model_size column, np.asarray(X) then becomes an object array that statsmodels'
# OLS rejects ("Pandas data cast to numpy dtype of object").
X = pd.get_dummies(data=X, drop_first=True, dtype=float)

In [None]:
# Scikit-learn linear regression of per-step volume on the encoded features
# (fit returns the estimator itself, shown as the cell output).
model = linear_model.LinearRegression().fit(X, y)
model

In [None]:
# Same regression with statsmodels, with an explicit intercept column, for the full OLS summary table.
X_const = sm.add_constant(X)
ols_result = sm.OLS(y, X_const).fit()
print(ols_result.summary())