In [None]:
import pandas as pd
import glob
import json
import dotted # https://pypi.org/project/dotted-notation/
import re
import matplotlib.pyplot as plt

from pathlib import Path
import seaborn as sns
import lib.datasciencetoolbelt as dstools
from lib.resultstorage import ResultStorage

In [None]:
dstools.setup({
    "seaborn_context": "talk",
    "savefig": {
        "enable": True,
        "dir": Path("./postprocess_results"),
    }
})
result_storage = ResultStorage(Path("./results"))

#%matplotlib qt
%matplotlib inline


In [None]:
result_storage_prefix = "itxg_bypass_v6"

# Each entry: (dotted path into a result JSON document, short column name,
#              converter applied to the raw value).
id_vars__dottedpath_and_shortname_and_type = [
    ("zfs_setup.module_args.zfs.zfs_zil_itxg_bypass", "itxg_bypass", str),
    # zvol_request_sync is technically not in the v6 set, but that's just because
    # we limited the scope of the benchmark for time reasons
    ("zfs_setup.module_args.zfs.zvol_request_sync", "zvol_request_sync", str),
    ("zfs_setup.module_args.zfs.zfs_zil_pmem_prb_ncommitters", "ncommitters", int),
    ("fio_config.fsync_every", "fsync_every", int),
    ("fio_config.numjobs", "numjobs", int),
]
# The short names, in this order, are used as the index levels of the result DataFrames.
id_vars = [shortname for (_path, shortname, _convert) in id_vars__dottedpath_and_shortname_and_type]

def extract_id_var_values(output_json):
    """Extract the identifying variables of one benchmark result document.

    For every ``(dotted_path, shortname, convert)`` entry in
    ``id_vars__dottedpath_and_shortname_and_type``, the value at ``dotted_path``
    is looked up in ``output_json`` via ``dotted.get`` and passed through ``convert``.

    Returns:
        dict mapping each shortname to its converted value.

    Raises:
        Exception: if a dotted path is missing, a shortname occurs twice, or a
            value cannot be converted.
    """
    id_var_values = {}
    for dotted_path, shortname, convert in id_vars__dottedpath_and_shortname_and_type:
        value = dotted.get(output_json, dotted_path)
        # Test for None explicitly (dotted.get's default for a missing path):
        # values such as 0 or "" are present but falsy and must not be
        # reported as "not found".
        if value is None:
            # Use .get so that a document without a "file" key still produces
            # this error instead of a KeyError that hides it.
            source = output_json.get("file", "<unknown file>")
            raise Exception(f"{source}: dotted path {dotted_path} not found")
        if shortname in id_var_values:
            raise Exception(f"duplicate shortname {shortname}")
        try:
            id_var_values[shortname] = convert(value)
        except (TypeError, ValueError) as e:
            # e.g. int() of a list raises TypeError rather than ValueError
            raise Exception(f"cannot parse v={value!r}") from e
    return id_var_values

def get_fio_write_metrics(output_json):
    """Return the ``write`` section of the single fio job in ``output_json``.

    The jobs are read from ``fio_jsonplus.jobs``; exactly one job entry is expected.

    Raises:
        AssertionError: if the job list is missing or does not contain exactly one job.
    """
    jobs = dotted.get(output_json, "fio_jsonplus.jobs")
    assert jobs is not None and len(jobs) == 1, (
        f"expected exactly one fio job in {output_json.get('file', '<unknown file>')}, "
        f"got {None if jobs is None else len(jobs)}"
    )
    return jobs[0]["write"]

def to_fio_results_dict(output_json):
    """Build one fio result row: the id vars plus write IOPS and ``lat_ns`` mean/stddev."""
    write = get_fio_write_metrics(output_json)
    row = extract_id_var_values(output_json)
    row["w_iops_mean"] = write["iops_mean"]
    row["w_iops_stddev"] = write["iops_stddev"]
    row["w_lat_mean"] = dotted.get(write, "lat_ns.mean")
    row["w_lat_stddev"] = dotted.get(write, "lat_ns.stddev")
    return row

def to_kstat_results_dict(output_json):
    """Flatten one result document into a kstat row keyed by the id vars.

    All entries of the ``zvol_stats``, ``itxg_bypass_stats``, ``zil_pmem_stats`` and
    ``zil_pmem_ringbuf_stats`` sections are merged in, in that order. Later sections
    win on key collisions. A few short aliases for frequently used counters are
    added last.
    """
    row = dict(extract_id_var_values(output_json))
    for section in ("zvol_stats", "itxg_bypass_stats", "zil_pmem_stats", "zil_pmem_ringbuf_stats"):
        row.update(output_json[section])

    # Indexed directly rather than via dotted.get, presumably because of the
    # parentheses in the key.
    row["bio_total"] = output_json["zvol_stats"]["submit_bio__zvol_write(with_taskq_if_enabled)"]

    aliases = {
        "taskq_delay": 'zvol_stats.zvol_write__taskq_qdelay',
        "assign_aquire": 'itxg_bypass_stats.assign__aquisition_total',
        "assign_vtable": 'itxg_bypass_stats.assign__vtable',
        "assign_total": 'itxg_bypass_stats.assign__total',
        "commit_total": 'itxg_bypass_stats.commit__total',
        "commit_aquire": 'itxg_bypass_stats.commit__aquire',
    }
    for alias, path in aliases.items():
        row[alias] = dotted.get(output_json, path)
    return row

def to_cpu_dict(output_json):
    """Build one CPU-time row: the id vars plus every ``cpu_time.allcpu`` entry, prefixed with ``cpu_``."""
    row = extract_id_var_values(output_json)
    allcpu = dotted.get(output_json, "cpu_time.allcpu")
    for component, value in allcpu.items():
        row[f"cpu_{component}"] = value
    return row

In [None]:
# compute `df_kstats`: one row per benchmark result, indexed and sorted by the id vars
rows = list(map(to_kstat_results_dict, result_storage.iter_results(result_storage_prefix)))
df_kstats = pd.DataFrame.from_dict(rows).set_index(id_vars).sort_index()

In [None]:
# compute `df_cpu`: a Series indexed by (*id_vars, "metric") holding the CPU-time components.
# Built as a single chain so this cell neither overwrites nor `del`s the global `df`,
# which holds the fio results once the `df` cell below has run (re-running this cell
# used to delete them).
rows = [to_cpu_dict(j) for j in result_storage.iter_results(result_storage_prefix)]
df_cpu = (
    pd.DataFrame.from_dict(rows)
    .set_index(id_vars)
    .sort_index()
    .rename_axis("metric", axis=1)
    .stack()
)
df_cpu

In [None]:
## derive the `cpu_not_idle` metric in `df_cpu`: sum of all CPU components minus `cpu_idle`
cpu_wide = df_cpu.unstack("metric")
# Drop a previously derived `cpu_not_idle` before summing; otherwise re-running this
# cell would add the old derived value into the new sum.
cpu_components = cpu_wide.drop(columns="cpu_not_idle", errors="ignore")
cpu_wide["cpu_not_idle"] = cpu_components.sum(axis=1) - cpu_components["cpu_idle"]
df_cpu = cpu_wide.stack()

In [None]:
# compute `df`: the fio write metrics as a Series indexed by (*id_vars, "metric")
rows = [to_fio_results_dict(result) for result in result_storage.iter_results(result_storage_prefix)]
df = (
    pd.DataFrame.from_dict(rows)
    .set_index(id_vars)
    .sort_index()
    .rename_axis("metric", axis=1)
    .stack()
)
df

In [None]:
# a quick peek at the actual data in `df`: one column per fio metric
df.unstack(level="metric")

In [None]:
# define df_zfssetup: fold the three ZFS tunables into a single human-readable
# "zfs_setup" level. zvol_request_sync '1' is labelled zvol_taskq=no, '0' zvol_taskq=yes.
# Built without the dead mid-cell `data` expression and without binding/deleting a
# generic global `data`.
zvol_taskq_label = {'1': 'no', '0': 'yes'}
zfs_wide = df.unstack(["itxg_bypass", "zvol_request_sync", "ncommitters"])
zfs_wide.columns = zfs_wide.columns.map(
    lambda x: f"zil-pmem bypass={x[0]} zvol_taskq={zvol_taskq_label[x[1]]} ncommitters={x[2]}"
)
df_zfssetup = zfs_wide.rename_axis("zfs_setup", axis=1).stack()

In [None]:
# show the id var short names (the index levels of df_kstats, df_cpu and df)
id_vars

# Latency Breakdown

In [None]:
import itertools

def filter_by_index_value(df, level, filter):
    """Keep only the rows whose index value at `level` satisfies the predicate `filter`.

    Works on MultiIndex-ed DataFrames and Series alike; returns a new object.
    """
    keep = [filter(level_value) for level_value in df.index.get_level_values(level)]
    return df[keep]

def remove_index_dimension(df, level, value):
    """Select the rows whose index has `value` at `level`, then drop that (now constant) level.

    `df` is expected to be MultiIndex-ed (DataFrame or Series). The result has one
    index level fewer than `df`, so the data's dimensionality is reduced by one.
    `df` itself is not modified.

    Raises AssertionError if no row has `value` at `level`.
    """
    matching = df[df.index.get_level_values(level) == value]
    # Sanity check: the selection must be non-empty and constant on `level`.
    assert set(matching.index.get_level_values(level)) == {value}
    return matching.droplevel(level)

def _test_remove_index_dimension():
    """Self-check for `remove_index_dimension` on a small 2x2 MultiIndex-ed frame.

    Displays the frames as before and also asserts the expected results, so a
    regression fails loudly instead of only changing the rendered output.
    """
    data = [
        {"favnum": num, "favletter": letter, "id": row_id}
        for row_id, (num, letter) in enumerate(itertools.product([23, 42], ["a", "b"]))
    ]
    d = pd.DataFrame(data).set_index(["favnum", "favletter"])
    display(d)

    by_num = remove_index_dimension(d, "favnum", 23)
    display(by_num)
    assert list(by_num.index) == ["a", "b"]
    assert list(by_num["id"]) == [0, 1]

    by_letter = remove_index_dimension(d, "favletter", "b")
    display(by_letter)
    assert list(by_letter.index) == [23, 42]
    assert list(by_letter["id"]) == [1, 3]

_test_remove_index_dimension()

In [None]:
def level_values_sorted_unique(df, level):
    """Return the distinct values of the index level `level`, in ascending order, as a list."""
    distinct = set(df.index.get_level_values(level))
    return sorted(distinct)

class AttrDict(dict):
    """A dict whose keys are also readable/writable as attributes (``d.key`` is ``d["key"]``)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Alias the instance namespace to the dict itself, so attribute access and
        # item access use the same storage.
        self.__dict__ = self


class FactorizedDataFrameItem(AttrDict):
    """One (row value, column value) cell of a factorized DataFrame.

    Expected keys: ``fdf`` (the owning FactorizedDataFrame), ``d`` (data of this cell),
    ``ri``/``rv`` (row position/value) and ``ci``/``cv`` (column position/value).
    """

    @property
    def title(self):
        """Subplot title of the form ``"<row level>=<row value>|<col level>=<col value>"``."""
        fdf = self.fdf
        return "{}={}|{}={}".format(fdf.row, self.rv, fdf.col, self.cv)
        
        
class FactorizedDataFrame:
    """Split a MultiIndex-ed DataFrame into a grid of sub-frames along two index levels.

    Args:
        data: DataFrame/Series whose index has (at least) the levels ``row`` and ``col``.
        row: index level whose distinct values form the grid rows.
        col: index level whose distinct values form the grid columns.
    """

    def __init__(self, data, row, col):
        self.data = data
        self.col = col
        self.row = row

        self.col_values = level_values_sorted_unique(self.data, self.col)
        self.row_values = level_values_sorted_unique(self.data, self.row)

    def iter_factorized(self):
        """Yield one FactorizedDataFrameItem per (column value, row value), columns in the outer loop.

        Each item's ``d`` holds the rows of ``data`` with that column and row value,
        with both levels dropped from the index. An AssertionError (raised by
        ``remove_index_dimension``) signals a (row, col) combination without data.
        """
        for ci, c in enumerate(self.col_values):
            # Filter by the column value once per column instead of once per cell.
            # No defensive copy is needed: remove_index_dimension returns a new,
            # filtered object and does not modify its input.
            per_col = remove_index_dimension(self.data, self.col, c)
            for ri, r in enumerate(self.row_values):
                d = remove_index_dimension(per_col, self.row, r)
                yield FactorizedDataFrameItem({
                    "fdf": self,
                    "d": d,
                    "ri": ri,
                    "rv": r,
                    "ci": ci,
                    "cv": c,
                })
                

def factorplot(data=None, row=None, col=None, plot=None, subplots_kw=None):
    """Factorize MultiIndex-ed `data` along `row` x `col` and draw one subplot per cell.

    Args:
        data: MultiIndex-ed DataFrame/Series with index levels `row` and `col`.
        row: index level mapped to subplot rows.
        col: index level mapped to subplot columns.
        plot: callable ``plot(item, ax, legend)`` invoked for every
            FactorizedDataFrameItem; ``legend`` is True only for the bottom-right subplot.
        subplots_kw: extra keyword arguments for ``plt.subplots``. ``gridspec_kw``
            defaults to ``{'hspace': 1}``; ``squeeze`` is always forced to False.
    """
    fdf = FactorizedDataFrame(data, row, col)

    subplots_kw = {
        "gridspec_kw": {'hspace': 1},
        **(subplots_kw or {}),
        "squeeze": False,  # axes should always be two-dimensional
    }

    fig, axes = plt.subplots(len(fdf.row_values), len(fdf.col_values), **subplots_kw)

    last_ri = len(fdf.row_values) - 1
    last_ci = len(fdf.col_values) - 1
    for f in fdf.iter_factorized():
        ax = axes[f.ri, f.ci]
        ax.set_title(f.title)
        legend = f.ri == last_ri and f.ci == last_ci
        plot(f, ax, legend)
        if legend:
            # Put the legend to the right of the bottom-right subplot. Call it on this
            # subplot's Axes explicitly rather than via plt.legend(), which targets
            # whichever Axes pyplot currently considers "current".
            ax.legend(loc='lower left', bbox_to_anchor=(1, 0.5))

In [None]:
# list the kstat columns available to compute_latency_components below
df_kstats.columns

In [None]:
   
def compute_latency_components(normalize=None):
    """Break the per-bio write time in `df_kstats` down into latency components.

    Derives the ``t_*`` component columns from the raw kstat columns and adds
    ``t_other`` (the part of ``bio_total`` not covered by the other components).
    It then applies ``normalize`` to the full wide frame, so ``normalize`` may use
    helper columns such as ``bio_total`` or the ``numjobs`` index level. Finally it
    projects onto the components.

    Args:
        normalize: callable taking and returning the wide DataFrame. ``None`` means
            no normalization (identity).

    Returns:
        DataFrame with a single column ``"t"``, indexed by the id vars plus a
        ``"component"`` level.
    """
    if normalize is None:
        normalize = lambda d: d  # no normalization

    # compute latency breakdown variables
    data = df_kstats.copy()
    data['t_taskq'] = data.zvol_write__taskq_qdelay
    data['t_bypass_commit'] = data.commit_total
    data['t_bypass_assign_aquisition'] = data.assign__aquisition_total
    data['t_bypass_assign_exit'] = data.assign__exit
    # write_entry_time minus the PMEM write part, which is accounted separately as t_pmem
    data['prb_total'] = data.write_entry_time - data.prb_write__pmem
    data['t_prb_sem'] = data.prb_write__get_committer_slot + data.prb_write__put_committer_slot
    data['t_prb_dt_aq'] = data.prb_write__dt_sl_aquisition
    data['t_prb_other'] = data.prb_total - (data.t_prb_sem + data.t_prb_dt_aq)
    data['t_pmem'] = data.prb_write__pmem
    data['t_get_data'] = data.get_data_time
    data['t_zil'] = data.assign__vtable - data.write_entry_time
    data['t_zvol_write'] = data.bio_total - (
        data.zvol_write__taskq_qdelay
        + data.zvol_write__1zil_commit
        + data.zvol_write__zvol_log_write_finish
        + data.zvol_write__2zil_commit
    )

    components = ['t_taskq', 't_bypass_commit', 't_bypass_assign_aquisition',
                  't_bypass_assign_exit', 't_prb_sem', 't_prb_dt_aq', 't_prb_other',
                  't_pmem', 't_get_data', 't_zil', 't_zvol_write']

    # t_other is what we didn't account for yet
    data['t_other'] = data.bio_total - data[components].sum(axis=1)
    components += ['t_other']

    data = normalize(data)

    # project to components and go long: one row per (id vars..., component)
    return (
        data[components]
        .rename_axis('component', axis=1)
        .stack()
        .to_frame("t")
    )
    

def normalize_by_numjobs(data):
    """Divide every column by the row's ``numjobs`` index value.

    The ``numjobs`` index level keeps its original values but ends up as the last
    index level.
    """
    flat = data.reset_index(level='numjobs')
    per_job = flat.div(flat['numjobs'], axis=0).assign(numjobs=flat['numjobs'])
    return per_job.set_index('numjobs', append=True)

def normalize_by_numjobs_except_t_pmem(data):
    """Divide every column by the row's ``numjobs`` index value, except ``t_pmem``.

    ``t_pmem`` is returned exactly as given; the rationale is printed in the latency
    breakdown section. As in ``normalize_by_numjobs``, the ``numjobs`` index level
    keeps its values but ends up as the last index level.
    """
    flat = data.reset_index(level='numjobs')
    numjobs = flat['numjobs']
    per_job = flat.div(numjobs, axis=0)
    # Restore the untouched original column instead of multiplying the divided
    # value back: (x / n) * n is not always exactly x in floating point
    # (e.g. 1 / 49 * 49 == 0.9999999999999999).
    per_job['t_pmem'] = flat['t_pmem']
    per_job['numjobs'] = numjobs
    return per_job.set_index('numjobs', append=True)

def normalize_by_bio_total(data):
    """Express every column as a fraction of the row's ``bio_total``."""
    bio_total = data["bio_total"]
    return data.truediv(bio_total, axis="index")


## How The Latency Breakdown Evolves For Different `itxg_bypass` x `ncommitters`

In [None]:
# Grid layout for this section: subplot rows by itxg_bypass, columns by ncommitters.
row_var, row_var_values = 'itxg_bypass', ["1", "2"]
col_var, col_var_values = 'ncommitters', [1, 2, 4, 6, 8, 12, 16, 24]
# x axis: numjobs, with a tick every 2 jobs across the whole range
xlim = (0, 16)
xticks = range(xlim[0], xlim[1] + 1, 2)

def filter_idvars(data):
    """Restrict `data` to the slice plotted in this section.

    Fixes zvol_request_sync="1" and fsync_every=32 (dropping those index levels), keeps
    only the module-level `col_var_values`/`row_var_values` of `col_var`/`row_var`
    (read at call time), and limits numjobs to <= 16.
    """
    for level, value in (('zvol_request_sync', "1"), ('fsync_every', 32)):
        data = remove_index_dimension(data, level, value)
    for var, allowed in ((col_var, col_var_values), (row_var, row_var_values)):
        data = filter_by_index_value(data, var, lambda v, allowed=allowed: v in allowed)
    return data.query('numjobs <= 16')

def make_data(normalize=None):
    """Latency components from `compute_latency_components`, restricted by `filter_idvars`."""
    return filter_idvars(compute_latency_components(normalize=normalize))

print("relative latency breakdown")

def plot_rel_latency_breakdown(f, ax, legend):
    """Stacked area plot of the latency components as fractions of bio_total."""
    f.d.plot.area(ax=ax, legend=legend, xlim=xlim, xticks=xticks, ylim=(-0.1, 1.1))

data_lb = make_data(normalize=normalize_by_bio_total).unstack("component")
factorplot(
    data=data_lb,
    col=col_var,
    row=row_var,
    plot=plot_rel_latency_breakdown,
    subplots_kw={"figsize": (30, 10), "gridspec_kw": {"hspace": 0.2}},
)

plt.show()

# print("absolute latency stacked")

# def plot_abs_latency_breakdown(f, ax, legend):
#     f.d.plot.area(ax=ax, legend=legend, xlim=xlim, xticks=xticks, ylim=(0, 1.1*f.fdf.data.sum(axis=1).max()))
# data_lb = make_data(normalize=lambda d: d).unstack("component")
# # display(data_lb.loc[("1",1)])
# factorplot(data=data_lb, col=col_var, row=row_var, plot=plot_abs_latency_breakdown,
#           subplots_kw={
#             "figsize": (30, 10),
#             "gridspec_kw": {
#                 "hspace": 0.2,
#             },
#         })

# plt.show()

# Explains why the following latency curves leave t_pmem undivided by numjobs.
T_PMEM_SCALING_NOTE = """
Absolute latencies divided by numjobs, EXCEPT t_pmem.

Rationale: This section of the evaluation is about evaluating the PMEM write overhead limiter.
Reminder: Writing to PMEM above its write bandwidth limit wastes CPU time because the write
          instructions stall waiting for PMEM.
This means that an ideal implementation would allow t_pmem to reach the value where we achieve
peak performance (IOPS in our case), then keep the value at that level by queuing additional
writers on the CPU (we do it using a semaphore in t_prb).
Thus, an ideal t_pmem should look like so:

   _____________
  /
 /
/

If we divided t_pmem by `numjobs` like all the other `t_` metrics, that straight line would
need to turn into a declining line, proportional to `numjobs`.
That's quite hard to spot, so we felt it's better to not scale t_pmem at all.
"""
print(T_PMEM_SCALING_NOTE)

def plot_latency_curves(f, ax, legend):
    """Line plot of each latency component over numjobs; the y-limit is shared by all subplots."""
    shared_ymax = f.fdf.data.max().max()
    f.d.plot.line(ax=ax, legend=legend, xlim=xlim, xticks=xticks, ylim=(0, 1.1*shared_ymax))

data_abs = make_data(normalize=normalize_by_numjobs_except_t_pmem).unstack("component")
factorplot(
    data=data_abs,
    col=col_var,
    row=row_var,
    plot=plot_latency_curves,
    subplots_kw={"figsize": (30, 10), "gridspec_kw": {"hspace": 0.2}},
)

plt.show()

print("performance metrics")

def plot_perf_metric(metric):
    """Plot fio metric `metric` over numjobs on the row_var x col_var grid (y-limit shared)."""
    perf_wide = filter_idvars(df.unstack("metric"))[[metric]]

    def plot(f, ax, legend):
        ymax = f.fdf.data.max().max()
        f.d.plot.line(ax=ax, legend=legend, xlim=xlim, xticks=xticks, ylim=(0, 1.1*ymax))

    factorplot(
        data=perf_wide,
        col=col_var,
        row=row_var,
        plot=plot,
        subplots_kw={"figsize": (30, 5), "gridspec_kw": {"hspace": 1}},
    )
    plt.show()

print("IOPS mean")
plot_perf_metric("w_iops_mean")
print("IOPS std")
plot_perf_metric("w_iops_stddev")

print("t_pmem / IOPS (how much time we spend writing PMEM per IOP; shouldn't grow after peak performance)")
# Divide on the full, unfiltered id_vars index so both operands align 1:1, then filter
# once. Dividing already-filtered latency data by unfiltered IOPS relied on pandas
# joining MultiIndexes on their overlapping levels.
t_pmem = compute_latency_components(normalize=lambda d: d).unstack("component")[("t", "t_pmem")]
iops = df.unstack("metric")["w_iops_mean"]
data = (t_pmem / iops).rename("t_pmem/IOPS").to_frame()
data = filter_idvars(data)

def plot(f, ax, legend):
    """Line plot of t_pmem/IOPS over numjobs; y-limit shared, with 10% headroom like the other plots."""
    f.d.plot.line(ax=ax, legend=legend, xlim=xlim, xticks=xticks, ylim=(0, 1.1*f.fdf.data.max().max()))

factorplot(
    data=data,
    col=col_var,
    row=row_var,
    plot=plot,
    subplots_kw={"figsize": (30, 5), "gridspec_kw": {"hspace": 1}},
)
plt.show()

## How Some Latency Components are affected by different `itxg_bypass`

In [None]:
# Grid layout for this section: subplot columns by ncommitters, rows by variable
# (one perf metric followed by selected latency components).
col_var, col_var_values = 'ncommitters', [1, 2, 3, 4, 8]
row_var = 'variable'
row_var_values = [
    "w_iops_mean",
    "t_pmem",
    "t_prb_sem",
    "t_bypass_assign_aquisition",
    "t_zvol_write",
]
# x axis: numjobs, with a tick every 2 jobs
xlim = (0, 16)
xticks = range(xlim[0], xlim[1] + 1, 2)

def filter_idvars(data):
    """Restrict `data` to the slice plotted in this section.

    Fixes zvol_request_sync="1" and fsync_every=32 (dropping those index levels). It
    then keeps only the module-level `col_var_values`/`row_var_values` of
    `col_var`/`row_var`, which are read at call time, so here it filters on
    `ncommitters` and `variable`. Finally it limits numjobs to <= 16.
    """
    data = remove_index_dimension(data, 'zvol_request_sync', "1")
    data = remove_index_dimension(data, 'fsync_every', 32)
    for level, allowed in ((col_var, col_var_values), (row_var, row_var_values)):
        data = filter_by_index_value(data, level, lambda v, allowed=allowed: v in allowed)
    return data.query('numjobs <= 16')

def make_data(normalize=None):
    """Combine latency components and fio perf metrics into one long-format frame.

    Returns a DataFrame indexed by the id vars plus a ``variable`` level, with
    columns ``value`` and ``kind``. ``kind`` is ``"latency_breakdown"`` for the
    latency components, whose values are divided by 1e9 (scaled to seconds), and
    ``"perf"`` for the fio metrics. The result is restricted by `filter_idvars`.
    """
    latency = compute_latency_components(normalize=normalize)
    # Rename the index level without inplace=True: the original
    # `index.rename('variable', 'component', inplace=True)` passes `level`
    # positionally, which pandas >= 2.0 rejects, and in-place Index mutation can
    # leak into other objects sharing that Index.
    latency = latency.rename_axis(index={'component': 'variable'})
    latency.columns = ['value']
    latency['kind'] = 'latency_breakdown'
    latency['value'] = latency['value'] / 1e9  # scale to seconds

    # Build from the global `df` without mutating it (or its index).
    perf = df.rename_axis(index={'metric': 'variable'}).rename('value').to_frame()
    perf['kind'] = 'perf'

    combined = pd.concat([latency, perf], axis=0)
    return filter_idvars(combined)


def plot_rel_latency_breakdown(f, ax, legend):
    """Plot one variable (grid row) for one ncommitters value (grid column), one line per itxg_bypass.

    Despite the name (kept for compatibility), this plots absolute values, not a
    relative breakdown. The y-limit is computed per row from all data of that
    variable, which emulates ``sharey=False`` across rows while sharing it within a row.
    """
    all_data = f.fdf.data
    # Perf metric names (w_*) and latency component names (t_*) are disjoint, so
    # selecting on the variable alone finds the right rows for either kind. Unlike a
    # hard-coded check for "w_iops_mean", this also works for other perf metrics.
    ymax = all_data.loc[all_data.index.get_level_values('variable') == f.rv, 'value'].max()
    f.d['value'].unstack("itxg_bypass").plot.line(
        ax=ax, legend=legend, xlim=xlim, xticks=xticks, ylim=(0, 1.1 * ymax)
    )

data_lb = make_data(normalize=lambda d: d)
factorplot(
    data=data_lb,
    col=col_var,
    row=row_var,
    plot=plot_rel_latency_breakdown,
    subplots_kw={"figsize": (60, 20), "gridspec_kw": {"hspace": 0.5}},
)

plt.show()

# Conclusion

This reaffirms our observations in the previous section:
- `itxg_bypass=1` spends most of its time in `t_bypass_assign_aquisition` (acquiring the rwlock) for `numjobs >= 6`
  - The rwlock is used to add a serialization point on zil_commit(), which is required for correctness.
    (A more scalable technique than an rwlock might exist, but that's out of scope for now.)
- `itxg_bypass=2` does not use the rwlock
  - Thus it's incorrect from a ZIL perspective.
  - But allows us to stress the PMEM Overload Protection functionality and observe how it scales.
- => Comparison
  - `ncommitters=1`:
    - This `ncommitters` value does not achieve peak performance
  - `ncommitters=2`:
    - This `ncommitters` value achieves peak performance at `numjobs=6`
    - `itxg_bypass=1` has a significant decline in peak performance for `numjobs > 9`
    - `itxg_bypass=2` also shows a decline, but less so.
    - (Keep in mind that we only have 8 cores (16 SMT threads))
  - For `ncommitters>=3` we observe that IOPS do not improve beyond what we observed for `ncommitters=3`
  - But the stability of the achieved IOPS over rising `numjobs` is different among the two `itxg_bypass` variants
    - `itxg_bypass=1` IOPS stabilizes, albeit at the cost of wasting time on PMEM
    - `itxg_bypass=2` IOPS drop at `(ncommitters,numjobs) = [(3,11),(4,13)]`.
      - That drop in IOPS is explained by the combined cost of rwlock acquisition and prb semaphore acquisition time
        - "proof": the drop goes away for `ncommitters=8`

Conclusion:
- PMEM overload protection works as intended: `t_pmem` does not rise after peak performance is reached if `ncommitters` is configured correctly (`1` or `2` for this setup)
- `itxg_bypass=1` has the rwlock overhead, which is significant even though only every 32nd operation (`fsync_every=32`) is serializing. That overhead shifts wait time from `t_prb_sem` to `t_bypass_assign_aquisition`.

Critique:
- `itxg_bypass=2`'s drop in IOPS at `ncommitters=2` for `numjobs > 6` is suboptimal. Explanations?
  - ???
  - Maybe the optimal value is `ncommitters=2.X`?