In [None]:
pip install pandas matplotlib scipy numpy seaborn scikit-learn

In [None]:
import copy
import pandas
import warnings
import requests
import scipy
from scipy import stats
import subprocess
import numpy as np
from matplotlib import pyplot as plt
import json
import pprint
import seaborn
import os
import sys

sys.path.insert(0, "../src")
from perf_tools.analysis import make_differential_frame, get_data, get_summary_statistics
from perf_tools.analysis import check_are_close, make_latency_plot, plot_latency_stats

In [None]:
class ContinuousRWWorkload:
    """Lazy metric loader and plotting helpers for one execution of a ContinuousRW task.

    Metric files are read from
    ``<workdir>/<patch_id>/<variant>/<task_name>/<execution>/<metric>.json`` through
    ``get_data`` and cached on the instance after the first access. The returned data
    objects are used here through their ``diff_data``, ``fixed_data`` and ``raw_data``
    attributes.
    """

    def __init__(self, workdir, patch_id, variant, execution, task_name):
        self.workdir = workdir
        self.patch_id = patch_id
        self.variant = variant
        self.execution = execution
        self.task_name = task_name
        # Per-metric caches, filled on first access by the get_* methods.
        self.insert_data = None
        self.find_data = None
        self.update_data = None
        self.crud_data = None
        self.overall_throughput_data = None

    def json_path(self, metric):
        """Return the path of ``<metric>.json`` for this patch/variant/task/execution."""
        return os.path.join(self.workdir, self.patch_id, self.variant,
            self.task_name, str(self.execution), metric + ".json")

    def _load_cached(self, attr, metric):
        """Return ``self.<attr>``, loading it from ``<metric>.json`` the first time."""
        if getattr(self, attr) is None:
            setattr(self, attr, get_data(self.json_path(metric)))
        return getattr(self, attr)

    def get_insert_data(self):
        """Return the cached ``ContinuousRW.insert`` metrics."""
        return self._load_cached("insert_data", "ContinuousRW.insert")

    def get_find_data(self):
        """Return the cached ``ContinuousRW.find`` metrics."""
        return self._load_cached("find_data", "ContinuousRW.find")

    def get_update_data(self):
        """Return the cached ``ContinuousRW.update`` metrics."""
        return self._load_cached("update_data", "ContinuousRW.update")

    def get_crud_data(self):
        """Return the cached ``ContinuousRW.Crud`` metrics."""
        return self._load_cached("crud_data", "ContinuousRW.Crud")

    def get_overall_throughput_data(self):
        """Return (and cache) the combined running throughput of inserts, updates and finds.

        The ``ts``/``d(ops)`` samples of the three operations are merged and ordered by
        ``ts``. Added columns:
          * ``duration``: seconds elapsed since the earliest sample,
          * ``total_ops``: running sum of ``d(ops)``,
          * ``throughput``: ``total_ops / duration``; NaN where no time has elapsed yet
            (rows at the earliest timestamp) instead of inf.
        """
        if self.overall_throughput_data is None:
            cols = ["ts", "d(ops)"]
            frames = [
                self.get_insert_data().diff_data[cols],
                self.get_update_data().diff_data[cols],
                self.get_find_data().diff_data[cols],
            ]
            # Stable sort so samples sharing a timestamp keep a deterministic order.
            all_ops = (pandas.concat(frames, ignore_index=True)
                       .sort_values("ts", kind="stable", ignore_index=True))
            elapsed = all_ops["ts"] - all_ops["ts"].iloc[0]
            all_ops["duration"] = elapsed.dt.total_seconds()
            all_ops["total_ops"] = all_ops["d(ops)"].cumsum()
            # Zero elapsed time would divide by zero; mask it so throughput is NaN there.
            positive_duration = all_ops["duration"].where(all_ops["duration"] > 0)
            all_ops["throughput"] = all_ops["total_ops"] / positive_duration
            self.overall_throughput_data = all_ops
        return self.overall_throughput_data

    def _plot_line_or_scatter(self, df, x, y, line=False, start=None, end=None, **kwargs):
        """Plot rows ``start:end`` of ``df`` as a line (``line=True``) or a scatter plot."""
        if line:
            return df[start:end].plot(x=x, y=y, figsize=(20,20), **kwargs)
        return df[start:end].plot.scatter(x=x, y=y, figsize=(20,20), **kwargs)

    def plot_insert_data(self, x, y, line=False, start=None, end=None, **kwargs):
        """Plot column ``y`` against ``x`` for the insert metrics."""
        title=f"{self.variant}-{self.task_name} inserts {y}"
        return self._plot_line_or_scatter(self.get_insert_data().diff_data, x, y, line, start, end, title=title, **kwargs)

    def plot_find_data(self, x, y, line=False, start=None, end=None, **kwargs):
        """Plot column ``y`` against ``x`` for the find metrics."""
        title=f"{self.variant}-{self.task_name} finds {y}"
        return self._plot_line_or_scatter(self.get_find_data().diff_data, x, y, line, start, end, title=title, **kwargs)

    def plot_update_data(self, x, y, line=False, start=None, end=None, **kwargs):
        """Plot column ``y`` against ``x`` for the update metrics."""
        title=f"{self.variant}-{self.task_name} updates {y}"
        return self._plot_line_or_scatter(self.get_update_data().diff_data, x, y, line, start, end, title=title, **kwargs)

    def plot_crud_data(self, x, y, line=False, start=None, end=None, **kwargs):
        """Plot column ``y`` against ``x`` for the Crud metrics."""
        title=f"{self.variant}-{self.task_name} crud {y}"
        return self._plot_line_or_scatter(self.get_crud_data().diff_data, x, y, line, start, end, title=title, **kwargs)

    def plot_rw_data(self, x, y, start=None, end=None, noupdate=False):
        """Overlay ``y`` vs ``x`` for inserts, finds and (unless ``noupdate``) updates."""
        insert_df = self.get_insert_data().diff_data[start:end]
        find_df = self.get_find_data().diff_data[start:end]
        plt.figure(figsize=(20,20))
        plt.ylabel(y)
        plt.xlabel(x)
        # The title only names the operations that are actually drawn.
        ops_label = "inserts, finds" if noupdate else "inserts, updates, finds"
        plt.title(f"{self.variant}-{self.task_name} {ops_label} {y}")
        if not noupdate:
            update_df = self.get_update_data().diff_data[start:end]
            plt.plot(update_df[x], update_df[y], alpha=0.8, label=f"update {y}")
        plt.plot(find_df[x], find_df[y], alpha=0.8, label=f"find {y}")
        plt.plot(insert_df[x], insert_df[y], alpha=0.8, label=f"insert {y}")
        plt.legend()
        plt.show()

    def plot_throughput_data(self, x, start=None, end=None):
        """Line-plot the combined ``throughput`` column against ``x``."""
        tpdf = self.get_overall_throughput_data()
        title=f"{self.variant}-{self.task_name} overall throughput"
        return tpdf[start:end].plot(x=x, y="throughput", ylabel="ops per sec", title=title, figsize=(20,20))

    @staticmethod
    def _summary_statistics(data):
        """Forward the diff/fixed/raw frames of a loaded metric to ``get_summary_statistics``."""
        return get_summary_statistics(data.diff_data, data.fixed_data, data.raw_data)

    def get_insert_summary_statistics(self):
        return self._summary_statistics(self.get_insert_data())

    def get_find_summary_statistics(self):
        return self._summary_statistics(self.get_find_data())

    def get_update_summary_statistics(self):
        return self._summary_statistics(self.get_update_data())

    def get_crud_summary_statistics(self):
        return self._summary_statistics(self.get_crud_data())

    def print_all_summary_statistics(self, noupdate=False):
        """Pretty-print insert, find and (unless ``noupdate``) update summary statistics."""
        pp = pprint.PrettyPrinter()
        print("INSERT SUMMARY STATS:")
        pp.pprint(self.get_insert_summary_statistics())
        print("FIND SUMMARY STATS:")
        pp.pprint(self.get_find_summary_statistics())
        if not noupdate:
            print("UPDATE SUMMARY STATS:")
            pp.pprint(self.get_update_summary_statistics())

class ContinuousRWWorkloadWithCompact(ContinuousRWWorkload):
    """ContinuousRW workload whose task also runs a compactor.

    On top of the base workload this lazily loads the ``Compactor.compact`` metrics and
    the find/update metrics of the ``ContinuousRWCompactInProgress`` actor (presumably
    CRUD operations issued while a compaction is running — confirm against the
    workload definition), and can overlay those (in orange) on the regular plots.
    """

    def __init__(self, workdir, patch_id, variant, execution, task_name):
        super().__init__(workdir, patch_id, variant, execution, task_name)
        # Caches populated on first access by the matching getters.
        self.compact_data = None
        self.compacting_find_data = None
        self.compacting_update_data = None

    def _lazy_metric(self, attr, metric):
        """Load ``<metric>.json`` into ``self.<attr>`` once and return the cached value."""
        cached = getattr(self, attr)
        if cached is None:
            cached = get_data(self.json_path(metric))
            setattr(self, attr, cached)
        return cached

    def get_compact_data(self):
        """Return the cached ``Compactor.compact`` metrics."""
        return self._lazy_metric("compact_data", "Compactor.compact")

    def get_compacting_find_data(self):
        """Return the cached ``ContinuousRWCompactInProgress.find`` metrics."""
        return self._lazy_metric("compacting_find_data", "ContinuousRWCompactInProgress.find")

    def get_compacting_update_data(self):
        """Return the cached ``ContinuousRWCompactInProgress.update`` metrics."""
        return self._lazy_metric("compacting_update_data", "ContinuousRWCompactInProgress.update")

    def plot_compact_data(self, x, y, line=False, start=None, end=None, **kwargs):
        """Plot column ``y`` against ``x`` for the compaction metrics."""
        return self._plot_line_or_scatter(
            self.get_compact_data().diff_data, x, y, line, start, end,
            title=f"{self.variant}-{self.task_name} compacts {y}", **kwargs)

    def _plot_with_overlay(self, base_plot, overlay_loader, x, y, line, start, end, **kwargs):
        """Draw ``base_plot`` first, then the overlay metrics in orange on the same axes."""
        base_ax = base_plot(x, y, line, start, end, **kwargs)
        overlay_df = overlay_loader().diff_data
        return self._plot_line_or_scatter(overlay_df, x, y, line, start, end, ax=base_ax, color="orange", **kwargs)

    def plot_combined_find_data(self, x, y, line=False, start=None, end=None, **kwargs):
        """Regular find metrics with the compaction-phase find metrics overlaid."""
        return self._plot_with_overlay(self.plot_find_data, self.get_compacting_find_data,
                                       x, y, line, start, end, **kwargs)

    def plot_combined_update_data(self, x, y, line=False, start=None, end=None, **kwargs):
        """Regular update metrics with the compaction-phase update metrics overlaid."""
        return self._plot_with_overlay(self.plot_update_data, self.get_compacting_update_data,
                                       x, y, line, start, end, **kwargs)


In [None]:
def dump_dataframe(df, outfile="./temp_dataframe.out", cols=None):
    """Write selected columns of ``df`` to ``outfile`` as a plain-text table.

    Args:
        df: DataFrame containing every column in ``cols``.
        outfile: destination path (overwritten). Defaults to ``./temp_dataframe.out``.
        cols: columns to write, in order. Defaults to the ContinuousRW latency and
            throughput columns listed below.

    Raises:
        KeyError: if ``df`` lacks one of ``cols``. The column selection happens before
            the file is opened, so an existing ``outfile`` is left untouched on error.
    """
    if cols is None:
        cols = ["total_ops", "ts", "actor_id", "throughput", "duration", "pure_latency(ms)", "overhead_latency(ms)", "total_latency(ms)"]
    table = df[cols].to_string()
    with open(outfile, "wt", encoding="utf-8") as ostream:
        ostream.write(table)

In [None]:

# Evergreen build variants, keyed by the short name used throughout the notebook.
VARIANTS = {"replset": "linux-3-node-replSet-qebench", "sharded": "linux-shard-lite-qebench"}
NOCOMPACT_WORKDIR = "../datasets/genny/continuousrw_nocompact"
COMPACT_WORKDIR = "../datasets/genny/continuousrw_compact"

# Baseline: the ContinuousRW task without compaction, execution 0 on each variant.
nocompact_patchid = "634405351e2d171a12b2a0a8"
nocompact_taskname = "genny_qebench_continuousrw_nocompact"
nocompact_replset_wld, nocompact_sharded_wld = (
    ContinuousRWWorkload(NOCOMPACT_WORKDIR, nocompact_patchid, VARIANTS[variant_key], 0, nocompact_taskname)
    for variant_key in ("replset", "sharded")
)

# Compaction tasks: task name -> (patch id, executions). Only the first listed
# execution is loaded.
compact_replset_executions = {
    "genny_qebench_continuousrw_compact_1024" : ("634451da57e85a772174286f", [0]),
    "genny_qebench_continuousrw_compact_128" : ("634451da57e85a772174286f", [0]),
    "genny_qebench_continuousrw_compact_256" : ("634451da57e85a772174286f", [0]),
    "genny_qebench_continuousrw_compact_512" : ("634451da57e85a772174286f", [0]),
}
compact_replset_wlds = {
    task: ContinuousRWWorkloadWithCompact(COMPACT_WORKDIR, patch_id, VARIANTS["replset"], executions[0], task)
    for task, (patch_id, executions) in compact_replset_executions.items()
}
compact_sharded_executions = {
    "genny_qebench_continuousrw_compact_1024" : ("6347058fe3c331787727d7c3", [0]),
    "genny_qebench_continuousrw_compact_128" : ("6347058fe3c331787727d7c3", [1]),
    "genny_qebench_continuousrw_compact_256" : ("6347058fe3c331787727d7c3", [0]),
    "genny_qebench_continuousrw_compact_512" : ("6347058fe3c331787727d7c3", [0]),
}
compact_sharded_wlds = {
    task: ContinuousRWWorkloadWithCompact(COMPACT_WORKDIR, patch_id, VARIANTS["sharded"], executions[0], task)
    for task, (patch_id, executions) in compact_sharded_executions.items()
}


In [None]:
# Settings shared by the analysis cells below.
row = "total_ops"                 # default x-axis column
col_latency = "pure_latency(ms)"  # latency column that is plotted and averaged
pp = pprint.PrettyPrinter()
threads = 32                      # multiplied by the compaction interval to get ops per CRUD phase; presumably the actor thread count — TODO confirm

In [None]:
# No-compaction replica-set run: combined and per-operation latency,
# overall throughput, then summary statistics.
for plot_latency in (nocompact_replset_wld.plot_rw_data,
                     nocompact_replset_wld.plot_insert_data,
                     nocompact_replset_wld.plot_find_data,
                     nocompact_replset_wld.plot_update_data):
    plot_latency(row, col_latency)
nocompact_replset_wld.plot_throughput_data(row)
nocompact_replset_wld.print_all_summary_statistics()

In [None]:
# Latency statistics per operation for the no-compaction sharded run;
# inserts use a log regression, finds and updates a linear one.
wld = nocompact_sharded_wld
for opname, load_metrics, regr in (("insert", wld.get_insert_data, "log"),
                                   ("find", wld.get_find_data, "line"),
                                   ("update", wld.get_update_data, "line")):
    df = load_metrics().diff_data
    title = f"{wld.variant}-{wld.task_name} {opname} latency stats"
    plot_latency_stats(df, row, title=title, regr=regr)

In [None]:
# No-compaction sharded run: combined and per-operation latency,
# overall throughput, then summary statistics.
for plot_latency in (nocompact_sharded_wld.plot_rw_data,
                     nocompact_sharded_wld.plot_insert_data,
                     nocompact_sharded_wld.plot_find_data,
                     nocompact_sharded_wld.plot_update_data):
    plot_latency(row, col_latency)
nocompact_sharded_wld.plot_throughput_data(row)
nocompact_sharded_wld.print_all_summary_statistics()

In [None]:
# Latency plots and summary statistics for each replica-set compaction interval.
filter=[128, 256, 512, 1024]
for task, wld in compact_replset_wlds.items():
    # The compaction interval is the numeric suffix of the task name.
    if int(task.rsplit('_', 1)[-1]) in filter:
        wld.plot_rw_data(row, col_latency)
        wld.plot_insert_data(row, col_latency)
        # Find/update (with compaction-phase overlay) as lines over wall-clock time.
        wld.plot_combined_find_data("ts", col_latency, line=True)
        wld.plot_combined_update_data("ts", col_latency, line=True)
        wld.plot_compact_data(row, col_latency, line=False)
        wld.print_all_summary_statistics()

In [None]:
def get_moving_avg(df, k, x, y):
    """Return a frame holding column ``x`` of ``df`` and a ``sma`` column.

    ``sma`` is the simple moving average of ``df[y]`` over the last ``k`` rows; the
    first ``k - 1`` entries are NaN because the window is not yet full.
    """
    rolling_mean = df[y].rolling(k).mean()
    return pandas.DataFrame({x: df[x], "sma": rolling_mean})

def print_compaction_latency_csv(workload):
    """Print the compaction latencies of ``workload`` as CSV.

    Columns: ``timestamp`` (renamed from ``ts``), ``total_ops``, ``pure_latency(ms)``
    and the derived ``pure_latency(mins)``; the row index is labelled ``execution``.
    The workload's cached metrics frame is not modified.
    """
    # Select into an explicit copy: adding a column to a column-subset slice triggers
    # SettingWithCopyWarning, and so does in-place renaming of that slice.
    compact_df = workload.get_compact_data().diff_data[["ts", "total_ops", "pure_latency(ms)"]].copy()
    compact_df["pure_latency(mins)"] = compact_df["pure_latency(ms)"] / (60*1000)
    compact_df = compact_df.rename(columns={"ts": "timestamp"})
    print(compact_df.to_csv(index_label="execution"))

In [None]:
# One CSV table of compaction latencies per replica-set task, headed by the task name.
for task, wld in compact_replset_wlds.items():
    print(task)  # header identifying which compaction interval follows
    print_compaction_latency_csv(wld)

In [None]:
# Calculate how much the moving average latency improves after compaction.
# For each CRUD phase, compare its first full-window moving average with the
# peak moving average of the previous phase.
filter=[128, 256, 512, 1024]

# Moving average window size (ie. last k data points to average)
sma_k = 1024

for task, wld in compact_replset_wlds.items():
    iterations = int(task.split('_')[-1])
    if iterations not in filter:
        continue

    find_df = wld.get_find_data().diff_data
    update_df = wld.get_update_data().diff_data
    insert_df = wld.get_insert_data().diff_data

    # Every CRUD phase performs (iterations * threads) insert+find+update cycles
    cycles_per_phase = iterations * threads
    num_phases = find_df.shape[0] // cycles_per_phase

    for opname, df in {"find": find_df, "update": update_df, "insert": insert_df}.items():
        rows = []
        prev_max = None

        for i in range(num_phases):
            start = i * cycles_per_phase
            # The last phase also absorbs any trailing partial phase.
            end = ((i+1) * cycles_per_phase) if i < (num_phases-1) else None
            sma_df = get_moving_avg(df[start:end], sma_k, "ts", col_latency)

            # The first full window ends at position sma_k - 1. A phase shorter than the
            # window has no full window: record NaN instead of raising IndexError.
            first_sma = sma_df["sma"].iloc[sma_k - 1] if len(sma_df) >= sma_k else np.nan
            max_sma = sma_df["sma"].max()
            rows.append([first_sma, max_sma, prev_max])
            prev_max = max_sma

        # Build the frame once; dtype=float turns the first phase's missing
        # predecessor (None) into NaN, so the arithmetic below stays numeric.
        diff_df = pandas.DataFrame(rows, columns=["first sma", "max sma", "prev max sma"], dtype=float)
        diff_df["change"] = diff_df["first sma"] - diff_df["prev max sma"]
        diff_df["percent drop"] = (diff_df["change"] * 100) / diff_df["prev max sma"]

        print(f"Change in {opname} moving mean latency after compaction every {iterations} CRUD iterations:")
        pp.pprint(diff_df)

In [None]:
# Calculate how much the throughput improves after compaction.
# Only the compaction intervals listed in `filter` are analysed
# (runs exist for 128, 256, 512 and 1024).
filter=[512]
for task, wld in compact_sharded_wlds.items():
    iterations = int(task.split('_')[-1])
    if iterations not in filter:
        continue

    crud_phase_dfs = {
        "find": wld.get_find_data().diff_data,
        "update": wld.get_update_data().diff_data,
        "insert": wld.get_insert_data().diff_data
    }
    # Metrics recorded by the compaction-phase CRUD actor; it has no insert metric.
    compact_phase_crud_dfs = {
        "find": wld.get_compacting_find_data().diff_data,
        "update": wld.get_compacting_update_data().diff_data,
        "insert": None
    }
    compact_df = wld.get_compact_data().diff_data

    # Every CRUD cycle performs (iterations * threads) insert, find, & update operations
    ops_per_cycle = iterations * threads
    num_crud_cycles = wld.get_crud_data().diff_data.shape[0] // ops_per_cycle
    # One compaction runs between each pair of consecutive CRUD cycles.
    num_compact_cycles = num_crud_cycles - 1

    # throughput sample window size (ie. throughput of last N seconds)
    duration_secs = 60

    for opname, crud_phase_df in crud_phase_dfs.items():
        crud_phase_tp_df = pandas.DataFrame(columns=["start ts", "end ts", "tail opcount",
            "head opcount", "tail throughput", "head throughput", "prev head opcount",
            "prev head throughput"])
        compact_phase_tp_df = pandas.DataFrame(columns=["throughput", "compact latency(ms)"])
        prev_head_tp = None
        prev_head_opct = None

        # The CRUD phase data frame (crud_phase_df) contains metrics for all CRUD cycles.
        # To calculate the head and tail throughput for each CRUD cycle, it must
        # be segmented into per-cycle data frames (crud_cycle_df).
        for i in range(num_crud_cycles):
            start = i * ops_per_cycle
            # The last cycle also absorbs any trailing partial cycle.
            end = ((i+1) * ops_per_cycle) if i < (num_crud_cycles-1) else crud_phase_df.shape[0]

            crud_cycle_df = crud_phase_df[start:end]

            start_ts = crud_cycle_df["ts"].iloc[0]
            end_ts = crud_cycle_df["ts"].iloc[-1]

            # calculate the tail throughput, or the throughput at the first duration_secs of the cycle
            cutoff_ts = start_ts + pandas.DateOffset(seconds=duration_secs)
            tail_opct = crud_cycle_df[crud_cycle_df["ts"] <= cutoff_ts].shape[0]
            tail_tp = tail_opct / duration_secs

            # calculate the head throughput, or the throughput at the last duration_secs of the cycle
            cutoff_ts = end_ts - pandas.DateOffset(seconds=duration_secs)
            head_opct = crud_cycle_df[crud_cycle_df["ts"] >= cutoff_ts].shape[0]
            head_tp = head_opct / duration_secs

            # add calc'd values as a row in the throughput dataframe
            crud_phase_tp_df.loc[i] = [start_ts, end_ts, tail_opct, head_opct, tail_tp, head_tp,
                prev_head_opct, prev_head_tp]
            prev_head_opct = head_opct
            prev_head_tp = head_tp

        # The Compact phase CRUD data frame (compact_phase_crud_df) contains metrics for
        # all CRUD cycles happening while compaction is in progress.
        # To calculate the throughput, it must first be segmented into per-cycle data frames.
        # The boundary of each segment is determined by the last and first timestamps of the
        # CRUD phase cycles that preceded and followed the current Compact phase cycle.
        compact_phase_crud_df = compact_phase_crud_dfs[opname]
        for i in range(num_compact_cycles):
            tp = None
            if compact_phase_crud_df is not None:
                # The end timestamp of the CRUD phase cycle that immediately preceded
                # this Compact phase cycle is the lower bound of this segment.
                start_ts = crud_phase_tp_df.iloc[i]["end ts"]

                # The start timestamp of the CRUD phase cycle that immediately followed
                # this Compact phase cycle is the upper bound of this segment.
                end_ts = crud_phase_tp_df.iloc[i+1]["start ts"]

                crud_cycle_df = compact_phase_crud_df.loc[(compact_phase_crud_df["ts"] >= start_ts) & (compact_phase_crud_df["ts"] <= end_ts)]
                window_secs = (end_ts - start_ts).total_seconds()
                # A zero-length window would raise ZeroDivisionError; leave throughput undefined.
                tp = crud_cycle_df.shape[0] / window_secs if window_secs > 0 else None

            compact_phase_tp_df.loc[i] = [tp, compact_df.iloc[i]["pure_latency(ms)"]]

        crud_phase_tp_df["change"] = crud_phase_tp_df["tail throughput"] - crud_phase_tp_df["prev head throughput"]
        crud_phase_tp_df["percent change"] = (crud_phase_tp_df["change"] * 100) / crud_phase_tp_df["prev head throughput"]

        # Compaction k ran between CRUD cycles k and k+1. Dropping CRUD cycle 0 (no
        # compaction precedes it) and renumbering from 0 aligns CRUD row k with
        # compaction k. drop=True avoids adding a stray "index" column.
        crud_phase_tp_df = crud_phase_tp_df[1:].reset_index(drop=True)

        agg_tp_df = pandas.DataFrame()
        agg_tp_df["compact latency(ms)"] = compact_phase_tp_df["compact latency(ms)"]
        agg_tp_df["latency change"] = agg_tp_df["compact latency(ms)"].diff()
        agg_tp_df["opcount before"] = crud_phase_tp_df["prev head opcount"]
        agg_tp_df["opcount after"] = crud_phase_tp_df["tail opcount"]
        agg_tp_df["throughput window(secs)"] = duration_secs
        agg_tp_df["throughput before"] = crud_phase_tp_df["prev head throughput"]
        agg_tp_df["throughput during"] = compact_phase_tp_df["throughput"]
        agg_tp_df["throughput after"] = crud_phase_tp_df["tail throughput"]
        agg_tp_df["throughput change"] = crud_phase_tp_df["change"]
        agg_tp_df["throughput percent change"] = crud_phase_tp_df["percent change"]

        title = f"Change in {opname} throughput vs compaction latency: N={iterations}"
        print(f"{title}:")

        # subplots=True returns one Axes per plotted column.
        axes = agg_tp_df[["throughput change", "throughput percent change", "compact latency(ms)"]].plot(
            xlabel="nth compaction", xticks=agg_tp_df.index, sharex=False, figsize=(10,10), title=title, subplots=True)
        axes[0].set_ylabel("ops per second")
        axes[0].set_xticks(agg_tp_df.index)
        axes[1].set_ylabel("percent")
        axes[2].set_ylabel("milliseconds")

        agg_tp_df[["throughput before", "throughput during", "throughput after"]].plot(
            xticks=agg_tp_df.index, sharex=False, figsize=(10,10), title=title)

        print(agg_tp_df.to_csv(index_label="compact execution"))


In [None]:
# Total throughput once 100k CRUD cycles have completed, for every compaction
# interval, next to the baseline run that never compacted.

base_df = nocompact_replset_wld.get_crud_data().diff_data
base_df = base_df.loc[base_df["total_ops"] == 100000, ["duration", "total_ops", "throughput"]]
print("=== Throughput of 100k CRUD cycles without compaction:")
pp.pprint(base_df)

filter=[128, 256, 512, 1024]
for task, wld in compact_replset_wlds.items():
    iterations = int(task.split('_')[-1])
    if iterations in filter:
        cycles_per_phase = iterations * threads

        measure_df = wld.get_crud_data().diff_data
        measure_df = measure_df.loc[measure_df["total_ops"] == 100000, ["duration", "total_ops", "throughput"]]
        print(f"=== Throughput of 100k CRUD cycles with compaction every {cycles_per_phase} cycles:")
        pp.pprint(measure_df)


In [None]:
# Latency plots and summary statistics for the selected sharded compaction intervals.
filter=[128, 256]
for task, wld in compact_sharded_wlds.items():
    # The compaction interval is the numeric suffix of the task name.
    if int(task.rsplit('_', 1)[-1]) in filter:
        wld.plot_rw_data(row, col_latency)
        wld.plot_insert_data(row, col_latency)
        # Find/update (with compaction-phase overlay) as lines over wall-clock time.
        wld.plot_combined_find_data("ts", col_latency, line=True)
        wld.plot_combined_update_data("ts", col_latency, line=True)
        wld.plot_compact_data(row, col_latency, line=False)
        wld.print_all_summary_statistics()