# Plots

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator, FuncFormatter
import numpy as np

In [None]:
# Single-node runs: total disk usage and sampled resource metrics per system.
nats_disk, nats_metrics = (
    pd.read_csv(path)
    for path in ("../nats_disk_usage.csv", "../nats_metrics.csv")
)
kafka_disk, kafka_metrics = (
    pd.read_csv(path)
    for path in ("../kafka_disk_usage.csv", "../kafka_metrics.csv")
)

# Cluster runs: the same pair of files per system.
nats_cluster_disk, nats_cluster_metrics = (
    pd.read_csv(path)
    for path in ("../nats_cluster_disk_usage.csv", "../nats_cluster_metrics.csv")
)
kafka_cluster_disk, kafka_cluster_metrics = (
    pd.read_csv(path)
    for path in ("../kafka_cluster_disk_usage.csv", "../kafka_cluster_metrics.csv")
)

def convert(s, to_unit='mb'):
    """Convert a size string such as ``'512MiB'`` to a number in *to_unit*.

    Args:
        s: Size written as a number followed by a unit suffix (``B``,
            ``kB``, ``KiB``, ``MB``, ``MiB``, ``GB``, ``GiB``, ``TB`` or
            ``TiB``). A string without a recognised suffix is parsed as a
            plain number. A non-string value is passed through ``float`` so
            that already-converted values can be converted again safely.
        to_unit: Target decimal unit: ``'kb'``, ``'mb'`` (default) or
            ``'gb'``.

    Returns:
        The size as a float expressed in *to_unit*. Input without a unit
        suffix is returned unchanged as a float.

    Raises:
        ValueError: If the numeric part cannot be parsed, or if *s* carries
            a unit suffix and *to_unit* is not a supported target.
    """
    if not isinstance(s, str):
        return float(s)

    units_in_bytes = {
        'kB': 1_000,
        'KiB': 1024,
        'MB': 1_000_000,
        'MiB': 1024**2,
        'GB': 1_000_000_000,
        'GiB': 1024**3,
        'TB': 1_000_000_000_000,
        'TiB': 1024**4,
        # 'B' must stay last: every other suffix also ends with it.
        'B': 1,
    }
    target_in_bytes = {
        'kb': 1_000,
        'mb': 1_000_000,
        'gb': 1_000_000_000,
    }

    text = s.strip()
    for unit, bytes_val in units_in_bytes.items():
        if text.endswith(unit):
            if to_unit not in target_in_bytes:
                raise ValueError(f"unsupported target unit: {to_unit!r}")
            num = float(text[:-len(unit)])
            return num * bytes_val / target_in_bytes[to_unit]
    return float(text)

# Normalise every metrics frame in place: size strings become numbers in the
# unit used on the plots, and an elapsed-seconds column is derived from the
# timestamps.
target_units = {'mem': 'mb', 'disk_read': 'kb', 'disk_write': 'gb'}

for df in (kafka_metrics, nats_metrics, kafka_cluster_metrics, nats_cluster_metrics):
    for column, unit in target_units.items():
        df[column] = df[column].apply(convert, to_unit=unit)
    # Seconds are measured from the first row of the frame.
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    first_sample = df['timestamp'].iloc[0]
    df['seconds'] = (df['timestamp'] - first_sample).dt.total_seconds()

In [None]:
# One colour per system; line styles are cycled per node within a system.
nats_color = '#1f77b4'
kafka_color = '#d62728'
line_styles = ['-', '--', '-.']

# Y-axis label for every recorded metric column.
y_axis_labels = {
    'cpu': 'CPU (%)',
    'mem': 'Memory (MB)',
    'disk_read': 'Disk Read (kB)',
    'disk_write': 'Disk Write (GB)',
}

# Columns that get a figure; 'disk_read' is left out.
metrics = [name for name in y_axis_labels if name != 'disk_read']

## Single Node and Cluster Results

### Metrics

In [None]:
def plot_metrics(nats_data, kafka_data, cluster=False):
    """Show one figure per entry in ``metrics`` comparing NATS and Kafka.

    Each figure draws one line per value of the ``node`` column: NATS nodes
    in ``nats_color`` and Kafka nodes in ``kafka_color``, with line styles
    cycled per node.

    Args:
        nats_data: DataFrame with a ``node`` column and one column per
            plotted metric. A ``seconds`` column, when present, supplies the
            x values.
        kafka_data: DataFrame with the same layout for Kafka.
        cluster: When true, the legend uses three columns and is anchored
            higher above the axes.
    """
    legend_y_anchor = 1.28 if cluster else 1.15
    legend_ncols = 3 if cluster else 2
    # NATS is drawn first so line and legend order stay as before.
    systems = ((nats_data, nats_color), (kafka_data, kafka_color))

    for m in metrics:
        plt.figure(figsize=(8, 5))

        for data, color in systems:
            for i, (node, group) in enumerate(data.groupby("node")):
                # Plot against elapsed seconds so the axis matches its
                # "Time (s)" label; fall back to the row index when the
                # column has not been computed.
                x_values = group['seconds'] if 'seconds' in group.columns else group.index
                plt.plot(x_values, group[m], label=node, color=color, linestyle=line_styles[i % len(line_styles)])

        plt.xlabel("Time (s)", fontsize=14, labelpad=15)
        plt.ylabel(y_axis_labels.get(m), fontsize=14, labelpad=15)
        plt.legend(loc='upper center', fontsize=14, bbox_to_anchor=(0.5, legend_y_anchor), ncols=legend_ncols)
        plt.tick_params(labelsize=14)
        plt.grid(False)
        y_axis = plt.gca().yaxis
        y_axis.set_major_locator(MaxNLocator(integer=True))
        y_axis.set_major_formatter(FuncFormatter(lambda value, _pos: f'{value:,.0f}'))
        plt.tight_layout()
        plt.show()

# Single-node figures first, then the cluster figures with the wider legend.
plot_metrics(nats_metrics, kafka_metrics, cluster=False)
plot_metrics(nats_cluster_metrics, kafka_cluster_metrics, cluster=True)

### Total disk usage (single node)

In [None]:
# Stack the NATS and Kafka single-node disk reports and add the growth
# (after minus before) as its own column.
disk = pd.concat((nats_disk, kafka_disk)).assign(
    increase_mb=lambda frame: frame['disk_after_mb'] - frame['disk_before_mb']
)

disk

### Total disk usage (cluster)

In [None]:
# Stack the NATS and Kafka cluster disk reports and add the growth
# (after minus before) as its own column.
disk_cluster = pd.concat((nats_cluster_disk, kafka_cluster_disk)).assign(
    increase_mb=lambda frame: frame['disk_after_mb'] - frame['disk_before_mb']
)

disk_cluster

## Plot latencies and throughput

In [None]:
def plot_latency(kafka_ms, nats_ms):
    """Show a grouped bar chart of Kafka and NATS latencies in seconds.

    Args:
        kafka_ms: Kafka latencies in milliseconds, one value per entry of
            ``latency_metrics`` (median, 95th and 99th percentile).
        nats_ms: NATS latencies in milliseconds, in the same order.
    """
    latency_metrics = ['Median', '95th %', '99th %']
    # Inputs are milliseconds; the chart is labelled in seconds.
    kafka = [value / 1000 for value in kafka_ms]
    nats = [value / 1000 for value in nats_ms]

    x = np.arange(len(latency_metrics))
    width = 0.35

    _, ax = plt.subplots(figsize=(8, 5))

    bars1 = ax.bar(x - width/2, kafka, width, label='Kafka', color=kafka_color)
    bars2 = ax.bar(x + width/2, nats, width, label='NATS', color=nats_color)

    ax.set_ylabel('Latency (s)', fontsize=14, labelpad=15)
    ax.set_xticks(x)
    ax.set_xticklabels(latency_metrics, fontsize=14)
    ax.tick_params(labelsize=14)
    ax.legend(fontsize=14, loc='upper left')

    # Write each bar's value just above its top edge.
    for bars in [bars1, bars2]:
        for bar in bars:
            height = bar.get_height()
            ax.annotate(f'{height:.2f}', xy=(bar.get_x() + bar.get_width() / 2, height), xytext=(0, 3),textcoords="offset points",ha='center', fontsize=14)

    # Leave headroom above the tallest bar for its value label.
    ax.set_ylim(top=max(kafka + nats) * 1.15)
    # The formatter truncates tick values to integers, so keep ticks on
    # integer positions to avoid duplicate or misleading labels.
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    ax.yaxis.set_major_formatter(FuncFormatter(lambda value, _pos: f'{int(value)}'))
    plt.tight_layout()
    plt.show()

# One chart per (Kafka, NATS) pair; each list holds latencies in ms in the
# order median, 95th percentile, 99th percentile.
for kafka_latencies, nats_latencies in (
    ([19193, 26709, 28723], [338, 654, 742]),
    ([35165, 48855, 54492], [487, 935, 981]),
):
    plot_latency(kafka_latencies, nats_latencies)

In [None]:
# single node, three node
# Throughput in messages/sec, listed as [single node, three node].
kafka = [107213, 75706]
nats = [71169, 32819]

x = range(2)
width = 0.35

plt.figure(figsize=(8, 5))

# Kafka bars sit left of each tick position and NATS bars right of it.
bars1 = plt.bar([pos - width/2 for pos in x], kafka, width=width, label='Kafka', color=kafka_color)
bars2 = plt.bar([pos + width/2 for pos in x], nats, width=width, label='NATS', color=nats_color)

plt.ylabel("Throughput (messages/sec)", fontsize=14, labelpad=15)
plt.xticks(x, ["Single Node", "Three Node"], fontsize=14)
plt.yticks(fontsize=14)

y_axis = plt.gca().yaxis
y_axis.set_major_locator(MaxNLocator(integer=True))
y_axis.set_major_formatter(FuncFormatter(lambda value, _pos: f'{int(value)}'))
# Leave headroom above the tallest bar for its value label.
plt.ylim(top=max(kafka + nats) * 1.15)
plt.legend(fontsize=14)

# Write each bar's value slightly above its top edge.
for bar in (*bars1, *bars2):
    bar_top = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2, bar_top + 1500, f'{int(bar_top)}', ha='center', fontsize=14)

plt.tight_layout()
plt.show()