In [1]:
# Run the `docker-image` make target (presumably builds the container image the
# later cells run in — confirm in the Makefile). stdout and stderr are discarded,
# so a failure here is silent.
!make docker-image > /dev/null 2>&1

In [2]:
!CONTAINER_CMD="bash -lc 'make install-ycsb" make docker > /dev/null 2>&1

In [3]:
from pathlib import Path
import pexpect
import os
import time

""" Collector class has helper methods to interact with kermit"""
class Collector:
    """Helper for driving a kermit data collection started through `make docker`.

    The child process is launched with ``INTERACTIVE`` and ``CONTAINER_CMD`` set so
    that the container runs ``make collect-data`` with ``KERNMLOPS_CONFIG_FILE``
    pointing at the chosen config file.
    """

    def __init__(self, config: Path):
        """Prepare the environment for a collection; nothing is started yet.

        Args:
            config: Path of the config file, interpolated into the container command.
        """
        self.env = os.environ.copy()
        self.env["INTERACTIVE"] = "it"
        self.env["CONTAINER_CMD"] = f"bash -lc 'KERNMLOPS_CONFIG_FILE={config} make collect-data'"
        self.collect_process : pexpect.spawn | None = None

    def start_collection(self, logfile=None):
        """Spawn `make docker` and block until it prints "Started benchmark".

        Args:
            logfile: Optional binary file object handed to pexpect as its log file.

        Raises:
            pexpect.EOF: If the child exits before printing the marker
                (for example when the benchmark fails to start).
        """
        self.collect_process = pexpect.spawn("make docker", env=self.env, timeout=None, logfile=logfile)
        self.collect_process.expect_exact(["Started benchmark"])

    @staticmethod
    def _after_run_generate_file_data() -> dict[str, list[Path]]:
        """Group the parquet files of the most recently changed collection directory.

        Directories three levels below ``./data`` are candidates; the one with the
        greatest ctime is used. Its ``*.*.parquet`` files are grouped by the part
        of the file name before the first ".".

        Returns:
            Mapping from file-name prefix to the files carrying that prefix. Empty
            when no candidate directory exists.
        """
        start_path = Path("./data")
        # Only directories can hold parquet files; ignore stray files at this depth.
        collect_id_dirs = [p for p in start_path.glob("*/*/*") if p.is_dir()]
        if not collect_id_dirs:
            return {}
        latest_collect_id = max(collect_id_dirs, key=os.path.getctime)
        output: dict[str, list[Path]] = {}
        for f in latest_collect_id.glob("*.*.parquet"):
            output.setdefault(f.name.split(".")[0], []).append(f)
        return output

    def wait(self) -> "dict[str, list[Path]] | None":
        """Block until the collection process ends, then return its output files.

        Returns:
            The mapping produced by `_after_run_generate_file_data`, or None when
            no collection was started.
        """
        if self.collect_process is None:
            return None
        self.collect_process.expect([pexpect.EOF])
        self.collect_process.wait()
        return self._after_run_generate_file_data()

    def stop_collection(self):
        """Send "END" to the collection process and wait for it to finish.

        Returns:
            Same as `wait`: the output file mapping, or None when no collection
            was started.
        """
        if self.collect_process is None:
            return None
        self.collect_process.sendline("END")
        return self.wait()

There are two ways to run kermit:
- With the raw config
- With a pre-programmed benchmark config

In [7]:
# Test tcp_v4_rcv hook with iperf3 benchmark
import polars as pl
import matplotlib.pyplot as plt

# Divisor applied to `uptime_timestamp` deltas to express them in seconds.
# NOTE(review): this assumes the timestamps are in microseconds — confirm.
TIMESTAMP_UNITS_PER_SECOND = 1_000_000
# Stream count used to average new connections per stream.
# NOTE(review): presumably mirrors the setting in ./config/iperf.yaml — confirm.
PARALLEL_STREAMS = 4

# Create collector with iperf configuration
collect = Collector("./config/iperf.yaml")

# Start collection and run benchmark
print("Starting iperf3 TCP benchmark...")
print("This will generate high-volume TCP traffic for 30 seconds")
collect.start_collection(None)

# Wait for benchmark to complete
data = collect.wait()

# Analyze TCP traffic
print("\nAnalyzing TCP traffic from iperf3 benchmark:")
tcp_df = pl.read_parquet(data["tcp_v4_rcv"][0])

# Basic statistics
print(f"\nTotal TCP events captured: {len(tcp_df)}")

# Analyze by process
# NOTE(review): `comm` values printed elsewhere in this notebook look like bare
# process names (e.g. "redis-cli"), so a pattern expecting the "-s" flag may
# never match — confirm the server/client split is meaningful.
is_iperf = pl.col("comm").str.contains("iperf3")
iperf_server = tcp_df.filter(pl.col("comm").str.contains("iperf3.*-s"))
iperf_client = tcp_df.filter(is_iperf & ~pl.col("comm").str.contains("-s"))

print(f"\nProcess breakdown:")
print(f"- iperf3 server: {len(iperf_server)} events")
print(f"- iperf3 client: {len(iperf_client)} events")

# Analyze port 5201 traffic (iperf3 default port)
port_5201 = tcp_df.filter((pl.col("dport") == 5201) | (pl.col("sport") == 5201))
print(f"- Port 5201 traffic: {len(port_5201)} events")

# Branch distribution
print("\nTCP state distribution:")
# pl.len() replaces the deprecated pl.count(); the "count" alias keeps the output schema.
branch_dist = tcp_df.group_by("branch_name").agg([
    pl.len().alias("count"),
    (pl.len() / len(tcp_df) * 100).alias("percentage")
]).sort("count", descending=True)

for row in branch_dist.head(10).iter_rows():
    print(f"- {row[0]}: {row[1]:,} events ({row[2]:.1f}%)")

# Connection analysis
new_connections = tcp_df.filter(pl.col("branch_name") == "new_syn_recv")
print(f"\nNew TCP connections: {len(new_connections)}")
print(f"Connections per stream: ~{len(new_connections) / PARALLEL_STREAMS:.0f}")

# Timeline visualization
plt.figure(figsize=(14, 8))

# Every timeline below is measured from the first captured event, so compute it once.
first_ts = tcp_df['uptime_timestamp'].min()


def seconds_since_start(frame: pl.DataFrame) -> pl.Series:
    """Return `frame`'s timestamps as seconds elapsed since the first captured event."""
    return (frame['uptime_timestamp'] - first_ts) / TIMESTAMP_UNITS_PER_SECOND


tcp_times = seconds_since_start(tcp_df)

# Plot 1: Overall TCP events timeline
plt.subplot(3, 1, 1)
plt.hist(tcp_times, bins=100, alpha=0.7, color='blue')
plt.title('TCP Events Timeline - All Traffic')
plt.xlabel('Time (seconds)')
plt.ylabel('Event Count')

# Plot 2: Server vs Client events
plt.subplot(3, 1, 2)
server_times = seconds_since_start(iperf_server)
client_times = seconds_since_start(iperf_client)

plt.hist(server_times, bins=50, alpha=0.5, color='green', label='Server')
plt.hist(client_times, bins=50, alpha=0.5, color='red', label='Client')
plt.title('TCP Events - Server vs Client')
plt.xlabel('Time (seconds)')
plt.ylabel('Event Count')
plt.legend()

# Plot 3: TCP states over time
plt.subplot(3, 1, 3)
# Focus on main states
main_states = ['entry', 'time_wait', 'socket_busy', 'new_syn_recv']
colors = ['blue', 'green', 'red', 'orange']

for state, color in zip(main_states, colors):
    state_df = tcp_df.filter(pl.col("branch_name") == state)
    if len(state_df) > 0:
        state_times = seconds_since_start(state_df)
        plt.hist(state_times, bins=30, alpha=0.5, label=state, color=color)

plt.title('TCP States Over Time')
plt.xlabel('Time (seconds)')
plt.ylabel('Event Count')
plt.legend()

plt.tight_layout()
plt.savefig('iperf_tcp_analysis.png')
plt.show()

# Event-rate estimate (rough): events captured per second of capture window
total_duration = (tcp_df['uptime_timestamp'].max() - first_ts) / TIMESTAMP_UNITS_PER_SECOND
# A capture whose events all share one timestamp has zero duration; report NaN
# instead of raising ZeroDivisionError.
events_per_second = len(tcp_df) / total_duration if total_duration else float("nan")

print(f"\nThroughput metrics:")
print(f"- Total duration: {total_duration:.1f} seconds")
print(f"- TCP events/second: {events_per_second:.0f}")

# Drop analysis
drops = tcp_df.filter(pl.col("drop_reason") > 0)
if len(drops) > 0:
    print(f"\nDropped packets: {len(drops)}")
    drop_dist = drops.group_by("drop_reason_name").len()
    for row in drop_dist.iter_rows():
        print(f"- {row[0]}: {row[1]} drops")
else:
    print("\nNo dropped packets detected - excellent!")

# Summary
print(f"\nSummary:")
print(f"- iperf3 successfully generated high-volume TCP traffic")
print(f"- Captured detailed TCP state transitions")
print(f"- Parallel streams created multiple concurrent connections")
print(f"- Data suitable for TCP performance analysis")

Starting iperf3 TCP benchmark...
This will generate high-volume TCP traffic for 30 seconds


EOF: End Of File (EOF). Exception style platform.
<pexpect.pty_spawn.spawn object at 0x7291e548c800>
command: /usr/bin/make
args: ['/usr/bin/make', 'docker']
buffer (last 100 chars): b''
before (last 100 chars): b'ors.BenchmarkError: iperf3 server not responding\r\n\r\nmake: *** [Makefile:107: collect-data] Error 1\r\n'
after: <class 'pexpect.exceptions.EOF'>
match: None
match_index: None
exitstatus: None
flag_eof: True
pid: 50962
child_fd: 58
closed: False
timeout: None
delimiter: <class 'pexpect.exceptions.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1
searcher: searcher_string:
    0: b'Started benchmark'

In [5]:
import subprocess
import sys


# New TCP Collector
collect = Collector("./config/raw_overrides.yaml")
collect.start_collection()

# Generate some TCP traffic
!curl https://www.google.com > /dev/null
!nc -l 8080 &  # Listen on port 8080
!echo "test" | nc localhost 8080 

data = collect.stop_collection()

# Analyze TCP branches
import polars as pl
tcp_df = pl.read_parquet(data["tcp_v4_rcv"][0])

# Show branch distribution
print(tcp_df.group_by("branch_name").count().sort("count", descending=True))

# Show drop reasons
drops = tcp_df.filter(pl.col("drop_reason") > 0)
print(drops.group_by("drop_reason_name").count())

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 17669    0 17669    0     0  90050      0 --:--:-- --:--:-- --:--:-- 90147
shape: (5, 2)
┌──────────────┬───────┐
│ branch_name  ┆ count │
│ ---          ┆ ---   │
│ str          ┆ u32   │
╞══════════════╪═══════╡
│ entry        ┆ 114   │
│ new_syn_recv ┆ 113   │
│ time_wait    ┆ 113   │
│ socket_busy  ┆ 7     │
│ no_socket    ┆ 1     │
└──────────────┴───────┘
shape: (1, 2)
┌──────────────────┬───────┐
│ drop_reason_name ┆ count │
│ ---              ┆ ---   │
│ str              ┆ u32   │
╞══════════════════╪═══════╡
│ no_socket        ┆ 1     │
└──────────────────┴───────┘


  print(tcp_df.group_by("branch_name").count().sort("count", descending=True))
  print(drops.group_by("drop_reason_name").count())


In [6]:
import polars as pl

collect = Collector("./config/redis_never.yaml")
collect.start_collection(None)
data = collect.stop_collection()

# Analyze TCP branches
tcp_df = pl.read_parquet(data["tcp_v4_rcv"][0])

# Show branch distribution
# agg(pl.len()) replaces the deprecated GroupBy.count(); the alias keeps the
# "count" column name that the sort relies on.
print(tcp_df.group_by("branch_name").agg(pl.len().alias("count")).sort("count", descending=True))

# Show drop reasons
drops = tcp_df.filter(pl.col("drop_reason") > 0)
print(drops.group_by("drop_reason_name").agg(pl.len().alias("count")))

# Analyze page faults
df = pl.read_parquet(data["page_fault"])
print(f"Total faults: {len(df)}")
print(f"Major faults: {df.filter(pl.col('is_major')).height}")
# Check fault patterns: `is_major` is already boolean, so it is used directly
# as a filter predicate rather than compared with True.
df_filtered = df.filter(
    (pl.col('comm') == 'page_fault') &
    pl.col('is_major')
)
print(f"Major faults for page_fault app: {len(df_filtered)}")

major_summary = df.filter(pl.col('is_major')).group_by('comm').len()
print(major_summary)

shape: (7, 2)
┌──────────────┬───────┐
│ branch_name  ┆ count │
│ ---          ┆ ---   │
│ str          ┆ u32   │
╞══════════════╪═══════╡
│ entry        ┆ 13727 │
│ time_wait    ┆ 13711 │
│ new_syn_recv ┆ 13706 │
│ socket_busy  ┆ 1020  │
│ no_socket    ┆ 16    │
│ listen_state ┆ 10    │
│ not_for_host ┆ 5     │
└──────────────┴───────┘
shape: (2, 2)
┌──────────────────┬───────┐
│ drop_reason_name ┆ count │
│ ---              ┆ ---   │
│ str              ┆ u32   │
╞══════════════════╪═══════╡
│ no_socket        ┆ 16    │
│ not_specified    ┆ 5     │
└──────────────────┴───────┘
Total faults: 422246
Major faults: 9293
Major faults for page_fault app: 0
shape: (18, 2)
┌─────────────────┬─────┐
│ comm            ┆ len │
│ ---             ┆ --- │
│ str             ┆ u32 │
╞═════════════════╪═════╡
│ python          ┆ 44  │
│ dirname         ┆ 13  │
│ ycsb            ┆ 46  │
│ redis-cli       ┆ 107 │
│ C1 CompilerThre ┆ 1   │
│ …               ┆ …   │
│ systemd-journal ┆ 2   │
│ C2 Compiler

  print(tcp_df.group_by("branch_name").count().sort("count", descending=True))
  print(drops.group_by("drop_reason_name").count())


In [7]:
# Collector driven by the XSBench configuration
collect = Collector("./config/xsbench.yaml")

# Launch the collection; this returns once the benchmark has started
print("Starting collection with XSBench workload...")
collect.start_collection(None)

# Send END to the collector and block until it has exited
print("Running XSBench benchmark...")
data = collect.stop_collection()

# Load the TCP events recorded during the run
print("\nAnalyzing TCP traffic from XSBench:")
tcp_df = pl.read_parquet(data["tcp_v4_rcv"][0])

# Events per branch, most frequent first
print("\nTCP branch distribution:")
branch_counts = tcp_df.group_by("branch_name").len()
print(branch_counts.sort("len", descending=True))

# Drop reasons, if any event carries one
drops = tcp_df.filter(pl.col("drop_reason") > 0)
if drops.height == 0:
    print("\nNo dropped packets detected")
else:
    print("\nDropped packets:")
    print(drops.group_by("drop_reason_name").len())

# Ten processes with the most TCP events
print("\nTCP activity by process:")
per_process_counts = tcp_df.group_by("comm").len()
process_tcp = per_process_counts.sort("len", descending=True).head(10)
print(process_tcp)

# Events attributed to a process whose name contains "XSBench"
xsbench_traffic = tcp_df.filter(pl.col("comm").str.contains("XSBench"))
if xsbench_traffic.height == 0:
    print("\nNo direct TCP traffic from XSBench process detected")
else:
    print(f"\nXSBench generated {len(xsbench_traffic)} TCP events")


Starting collection with XSBench workload...
Running XSBench benchmark...

Analyzing TCP traffic from XSBench:

TCP branch distribution:
shape: (4, 2)
┌──────────────┬─────┐
│ branch_name  ┆ len │
│ ---          ┆ --- │
│ str          ┆ u32 │
╞══════════════╪═════╡
│ time_wait    ┆ 19  │
│ new_syn_recv ┆ 19  │
│ entry        ┆ 19  │
│ socket_busy  ┆ 6   │
└──────────────┴─────┘

No dropped packets detected

TCP activity by process:
shape: (5, 2)
┌─────────────────┬─────┐
│ comm            ┆ len │
│ ---             ┆ --- │
│ str             ┆ u32 │
╞═════════════════╪═════╡
│ code-2901c5ac6d ┆ 21  │
│ swapper/14      ┆ 15  │
│ ZMQbg/IO/0      ┆ 14  │
│ jupyter-noteboo ┆ 7   │
│ sshd            ┆ 6   │
└─────────────────┴─────┘

No direct TCP traffic from XSBench process detected


In [None]:
# New Page Fault Collector
collect = Collector("./config/raw_overrides.yaml")
log = open("page_fault_log.txt", "bw")
    
collect.start_collection(logfile=log)

# Run a program that causes exactly 1 page fault
!sudo ./page_fault

data = collect.stop_collection()
log.close()

# Check what was collected
# print("Available keys:", data.keys())
# print(data)

# Read the log to see if there were errors
# with open("page_fault_log.txt", "r") as f:
#    print("Log contents:", f.read())

# Analyze results
import polars as pl
df = pl.read_parquet(data["page_fault"])
# print(df.head())
print(f"Total faults: {len(df)}")
print(f"Major faults: {df.filter(pl.col('is_major')).height}")
# Check fault patterns
df_filtered = df.filter(
    (pl.col('comm') == 'page_fault') & 
    (pl.col('is_major') == True)
)
print(f"Major faults for page_fault app: {len(df_filtered)}")

major_summary = df.filter(pl.col('is_major')).group_by('comm').len()
print(major_summary)

In [None]:
collect = Collector("./config/raw_overrides.yaml")
# This creates a raw collector, I suggest looking into this file to learn more

w = open("hello.txt", "bw")
collect.start_collection(logfile=w)
print("Collection has started")
# Start collection

f = open("blah.txt", "w")
bench_test = subprocess.Popen(["cat", "defaults.yaml"], stdout=f)
bench_test.wait()
# Run benchmark application

# Run a program that causes page faults
!python -c "import numpy as np; a = np.zeros((1000, 1000, 100))" & echo $!
!ps -a

print("Exit application")
raw_coll_info = collect.stop_collection()
print(raw_coll_info)
# Stop the Collector

In [None]:
# Summarize the page faults recorded by the raw collector run
import polars as pl
df = pl.read_parquet(raw_coll_info["page_fault"])
print(df.head())
# Row count of the whole frame
print(f"Total faults: {df.height}")
# Rows whose `is_major` flag is set
major_only = df.filter(pl.col('is_major'))
print(f"Major faults: {len(major_only)}")

In [None]:
# This is a simple redis benchmark config
collect = Collector("./config/start_overrides.yaml")

# The collector log is closed once the run has finished, even if it raises.
with open("hello.txt", "bw") as w:
    # Start collection
    collect.start_collection(logfile=w)

    # Wait for collector to finish
    start_coll_info = collect.wait()

In [None]:
# List the keys of the two result mappings. Both names are bound by earlier
# cells: `raw_coll_info` by the raw collector run, `start_coll_info` by the
# start_overrides run.
print(raw_coll_info.keys())
print(start_coll_info.keys())

Now let's try to examine some of the system information from this.
I use polars, but any dataframe library that can read parquet files will work.

In [None]:
import polars as pl
# Load the process trace recorded by the raw collector run and list its columns.
df = pl.read_parquet(raw_coll_info["process_trace"])
print(df.columns)

In [None]:
import polars as pl
# Reload the raw run's process trace into the notebook-level `df`. The function
# defined below does not read this variable; it takes its frame as a parameter.
df = pl.read_parquet(raw_coll_info["process_trace"])

def filter_process_trace(process_trace_df: pl.DataFrame) -> pl.DataFrame :
    """Build a table pairing each process's "start" row with its "end" row.

    Only rows where ``tgid == pid`` are kept. Every row is labelled with a
    ``full_name``: the most recent non-empty name seen for its pid. Start and
    end rows are then joined on ``pid``.

    Note: the name lookup reads rows by position (index 0 as the pid, index 3 as
    the name) after ``collection_id`` is dropped, so it depends on the parquet
    column order — TODO confirm against the collector's schema.
    """
    # Keep only rows where tgid equals pid
    processes = process_trace_df.filter(pl.col("tgid") == pl.col("pid")).drop("collection_id")

    # Walk the rows newest-first and remember the first non-empty name per pid
    newest_first = processes.sort(pl.col("ts_ns"), descending = True)
    last_name_by_pid = {}
    for row in newest_first.iter_rows():
        pid, comm = row[0], row[3]
        if comm != "" and pid not in last_name_by_pid:
            last_name_by_pid[pid] = comm

    def lookup_name(pid):
        return last_name_by_pid.get(pid, "")

    # Attach the resolved name, then split the rows by capture type
    named = newest_first.with_columns(
        pl.col("pid").map_elements(lookup_name, return_dtype=str).alias("full_name")
    )
    named = named.drop(["tgid", "name"])
    starts = named.filter(pl.col("cap_type") == "start").rename({"ts_ns": "start_ns"}).drop("cap_type")
    ends = named.filter(pl.col("cap_type") == "end").rename({"ts_ns": "end_ns"}).drop(["cap_type", "full_name"])

    # One row per (start, end) pairing that shares a pid
    return starts.join(ends, "pid")



In [None]:
# Display the start/end process table for the raw collector run.
filter_process_trace(pl.read_parquet(raw_coll_info["process_trace"]))

In [None]:
# Display the start/end process table for the start_overrides run.
filter_process_trace(pl.read_parquet(start_coll_info["process_trace"]))

In [None]:
import polars as pl

def filter_process_trace(process_trace_df: pl.DataFrame) -> pl.DataFrame :
    """Pair each process's "start" row with its "end" row and add a duration.

    Only rows where ``tgid == pid`` are kept. Each row receives ``full_name``,
    the most recent non-empty name recorded for its pid. Start rows are joined
    to end rows on ``pid`` and ``duration`` is ``end_ns - start_ns``.

    Note: names are read from each row by position (index 0 as the pid, index 3
    as the name) once ``collection_id`` has been dropped, so the result depends
    on the parquet column order — TODO confirm against the collector's schema.
    """
    same_tgid_and_pid = pl.col("tgid") == pl.col("pid")
    newest_first = (
        process_trace_df
        .filter(same_tgid_and_pid)
        .drop("collection_id")
        .sort(pl.col("ts_ns"), descending = True)
    )

    # Replay the rows oldest-first so that newer non-empty names overwrite older
    # ones; the surviving entry per pid is the newest non-empty name.
    latest_names = {}
    for row in reversed(list(newest_first.iter_rows())):
        if row[3] != "":
            latest_names[row[0]] = row[3]

    name_expr = pl.col("pid").map_elements(lambda x : latest_names.get(x, ""), return_dtype=str)
    labelled = newest_first.with_columns(name_expr.alias("full_name")).drop(["tgid", "name"])

    is_start = pl.col("cap_type") == "start"
    is_end = pl.col("cap_type") == "end"
    started = labelled.filter(is_start).rename({"ts_ns": "start_ns"}).drop("cap_type")
    ended = labelled.filter(is_end).rename({"ts_ns": "end_ns"}).drop(["cap_type", "full_name"])

    # Pair starts with ends on pid, then derive how long each pairing spans
    paired = started.join(ended, "pid")
    elapsed = pl.col("end_ns") - pl.col("start_ns")
    return paired.with_columns(elapsed.alias("duration"))

def process_trace_start_end_ts(process_trace_df: pl.DataFrame, proc_name: str, index: int) -> tuple[int, int, int]:
    """Look up the pid and start/end timestamps of one occurrence of a process.

    The process table is sorted by ``start_ns`` and filtered to rows whose
    ``full_name`` equals ``proc_name``; ``index`` picks one of those rows.

    Args:
        process_trace_df: Raw process trace, as accepted by `filter_process_trace`.
        proc_name: Value to match against ``full_name``.
        index: Position among the matching rows (negative values count from the end).

    Returns:
        ``(pid, start_ns, end_ns)`` of the selected row.

    Raises:
        IndexError: If ``index`` does not address a matching row, e.g. when no
            process named ``proc_name`` was traced.
    """
    trace_df = filter_process_trace(process_trace_df).sort(pl.col("start_ns"))
    matches = trace_df.filter(pl.col("full_name") == proc_name)
    # Shows every candidate row so the caller can check which one `index` selects.
    print(matches)
    if not -matches.height <= index < matches.height:
        raise IndexError(
            f"no process named {proc_name!r} at index {index}: "
            f"{matches.height} matching row(s) in the process trace"
        )
    selected = matches[index]
    return selected["pid"][0], selected["start_ns"][0], selected["end_ns"][0]

def clean_rss_pid(rss_df: pl.DataFrame, pid: int) -> pl.DataFrame:
    """Reduce the RSS trace of one tgid to a single running ``count`` per sample.

    Rows for ``tgid == pid`` are ordered by ``ts_ns``. The ``MM_FILEPAGES``,
    ``MM_ANONPAGES`` and ``MM_SWAPENTS`` members each get their own column
    (``file``, ``anon``, ``swap``) holding the last reported value at every row.
    The returned ``count`` is the sum of those three columns.

    Args:
        rss_df: RSS trace with at least ``pid``, ``collection_id``, ``tgid``,
            ``ts_ns``, ``member`` and ``count`` columns.
        pid: The tgid whose samples are kept.
    """
    member_to_column = (
        ("MM_FILEPAGES", "file"),
        ("MM_ANONPAGES", "anon"),
        ("MM_SWAPENTS", "swap"),
    )

    samples = rss_df.drop(["pid", "collection_id"]).sort(pl.col("ts_ns"))
    samples = samples.filter(pl.col("tgid") == pid)

    # One column per member: its value where that member was reported, carried
    # forward (then backward) across rows that reported a different member.
    for member, column in member_to_column:
        member_value = pl.when(pl.col("member") == member).then(pl.col("count")).otherwise(None)
        carried = member_value.fill_null(strategy="forward").fill_null(strategy="backward")
        samples = samples.with_columns(carried.alias(column))
    samples = samples.drop(["member", "count"])

    # A zero row at ts_ns -1 sorts first and gives the forward fill a starting
    # value of 0 for any column that is still null; it is filtered out again below.
    seed_row = pl.DataFrame({"tgid": pid, "ts_ns" : -1, "file" : 0, "anon": 0, "swap": 0})
    seeded = pl.concat([samples, seed_row]).sort("ts_ns").fill_null(strategy="forward")
    real_samples = seeded.filter(pl.col("ts_ns") >= 0)

    total = pl.col("file") + pl.col("anon") + pl.col("swap")
    return real_samples.with_columns(total.alias("count")).drop(["file", "anon", "swap"])

def filter_rss_with_ts(rss_trace_df: pl.DataFrame, start: int, end: int):
    """Clip an RSS trace to ``[start, end]``, with a sample at both boundaries.

    Two rows are added at ``ts_ns == start`` and ``ts_ns == end`` with every
    other column null. After sorting by ``ts_ns`` the nulls are filled forward,
    then backward, so the boundary rows take the value of the neighbouring
    sample. Only rows inside the closed interval are returned.

    Args:
        rss_trace_df: Trace with a ``ts_ns`` column.
        start: Lower bound for ``ts_ns`` (inclusive).
        end: Upper bound for ``ts_ns`` (inclusive).
    """
    print(start, end)
    boundary_data = {column_name: [None, None] for column_name in rss_trace_df.columns}
    boundary_data["ts_ns"] = [start, end]
    # Build the boundary rows with the trace's own schema so the stack below does
    # not depend on how polars infers dtypes from None and Python ints.
    boundary_rows = pl.DataFrame(boundary_data, schema=rss_trace_df.schema)
    df = rss_trace_df.vstack(boundary_rows)
    df = df.sort(pl.col("ts_ns")).fill_null(strategy="forward").fill_null(strategy="backward")
    return df.filter(pl.col("ts_ns").is_between(start, end, closed='both'))

def get_proper_rss(proc_path: Path, rss_path: Path, rss_name: str, rss_ind: int, runner_name: str, runner_ind: int, tag:str):
    """Load one run's RSS trace for a process, clipped to another process's lifetime.

    Args:
        proc_path: Parquet file(s) holding the process trace.
        rss_path: Parquet file(s) holding the RSS trace.
        rss_name: Name of the process whose RSS is reported.
        rss_ind: Which occurrence of ``rss_name`` to use.
        runner_name: Name of the process whose start/end bound the time window.
        runner_ind: Which occurrence of ``runner_name`` to use.
        tag: Label stored in the ``policy`` column of every returned row.

    Returns:
        The clipped RSS frame with ``norm_ts_ns`` (``ts_ns`` minus the smallest
        ``ts_ns`` in the frame) and ``policy`` columns added.
    """
    proc_trace_df = pl.read_parquet(proc_path)
    rss_df = pl.read_parquet(rss_path)

    # The runner lookup comes first, then the RSS process lookup.
    runner_span = process_trace_start_end_ts(proc_trace_df, runner_name, runner_ind)
    window_start, window_end = runner_span[1], runner_span[2]
    target_pid = process_trace_start_end_ts(proc_trace_df, rss_name, rss_ind)[0]

    pid_rss = clean_rss_pid(rss_df, target_pid)
    windowed = filter_rss_with_ts(pid_rss, window_start, window_end)

    since_first_sample = pl.col("ts_ns") - pl.min("ts_ns")
    with_offsets = windowed.with_columns(since_first_sample.alias("norm_ts_ns"))
    return with_offsets.with_columns(pl.lit(tag).alias('policy'))

from pathlib import Path
from plotnine import ggplot, aes, geom_line, geom_point, labs

def create_graph(inputs: "list[tuple[str, dict[str, Path]]]", proc_tag: str, proc_ind: int, time_proc_tag: str, time_proc_index: int, title: str) -> "ggplot":
    """Plot the RSS page count over time for several runs on one chart.

    Args:
        inputs: ``(tag, files)`` pairs. ``tag`` labels the run in the legend;
            ``files`` must provide the "process_trace" and "mm_rss_stat" entries.
        proc_tag: Name of the process whose RSS is plotted.
        proc_ind: Which occurrence of ``proc_tag`` to use.
        time_proc_tag: Name of the process whose lifetime bounds the time axis.
        time_proc_index: Which occurrence of ``time_proc_tag`` to use.
        title: Plot title.

    Returns:
        The plotnine plot object; nothing is rendered or saved here.

    Raises:
        ValueError: If ``inputs`` is empty.
    """
    # Collect one frame per run, then concatenate once instead of growing a
    # frame inside the loop.
    run_frames = [
        get_proper_rss(filedict["process_trace"],
                       filedict["mm_rss_stat"],
                       proc_tag, proc_ind,
                       time_proc_tag,
                       time_proc_index,
                       tag).drop(["tgid", "ts_ns"])
        for (tag, filedict) in inputs
    ]
    if not run_frames:
        raise ValueError("create_graph needs at least one (tag, files) input")
    df = pl.concat(run_frames)
    # Nanoseconds -> minutes for the x axis
    df = df.with_columns((pl.col("norm_ts_ns") / (10**9)/ 60).alias("norm_ts_mins"))
    plt0 = (ggplot(df)
            + aes("norm_ts_mins", y="count", colour="policy")
            + geom_point()
            + geom_line()
            + labs(x="Time (mins)",
                   y="4kB Pages",
                   title=title)
           )
    return plt0

In [None]:
def run_collection_to_completion(config_path: str):
    """Run one collection with the given config and return its output file mapping.

    Starts the collector, blocks until the collection process exits, and returns
    whatever `Collector.wait` returns.
    """
    collector = Collector(config_path)
    collector.start_collection(None)
    return collector.wait()


# Each result is bound as soon as its run finishes, so a failure in a later run
# does not discard the earlier results.
redis_never_info = run_collection_to_completion("./config/redis_never.yaml")
redis_madvise_info = run_collection_to_completion("./config/redis_madvise.yaml")
redis_always_info = run_collection_to_completion("./config/redis_always.yaml")

In [None]:
from IPython.display import Image, display

# Bound to `rss_plot` rather than `plt` so that the matplotlib.pyplot alias
# imported earlier in the notebook is not replaced by a plotnine object.
rss_plot = create_graph([("4k", redis_never_info),
                         ("madvise", redis_madvise_info),
                         ("thp", redis_always_info)],
                        "redis-server", 0,
                        "redis-server", 0,
                        "Redis driven by YCSB with Insertions and Deletes using Jemalloc")
rss_plot.save("deletes-redis.png")

In [None]:
# Display the PNG written by the previous cell. `Image` is imported there too.
Image(filename="./deletes-redis.png")