In [83]:
!make docker-image > /dev/null 2>&1

In [84]:
!CONTAINER_CMD="bash -lc 'make install-ycsb'" make docker > /dev/null 2>&1

In [85]:
from pathlib import Path
import pexpect
import os
import time

""" Collector class has helper methods to interact with kermit"""
class Collector: 
    def __init__(self, config: Path):
        self.env = os.environ.copy()
        self.env["INTERACTIVE"] = "it"
        self.env["CONTAINER_CMD"] = f"bash -lc 'KERNMLOPS_CONFIG_FILE={config} make collect-data'"
        self.collect_process : pexpect.spawn | None = None

    def start_collection(self, logfile=None):
        self.collect_process = pexpect.spawn("make docker", env=self.env, timeout=None, logfile=logfile)
        self.collect_process.expect_exact(["Started benchmark"])

    def _after_run_generate_file_data() -> dict[str, list[Path]]:
        start_path : Path = Path("./data")
        list_of_collect_id_dirs = start_path.glob("*/*/*")
        latest_collect_id = max(list_of_collect_id_dirs, key=os.path.getctime)
        list_of_files = latest_collect_id.glob("*.*.parquet")
        output = {}
        for f in list_of_files:
            index = str(f).removeprefix(str(f.parent) + "/").split(".")[0]
            if index not in output.keys():
                output[index] = []
            output[index].append(f)
        return output
        
    def wait(self) -> int:
        if self.collect_process is None:
            return
        self.collect_process.expect([pexpect.EOF])
        self.collect_process.wait()
        return Collector._after_run_generate_file_data()
        
    def stop_collection(self):
        if self.collect_process is None:
            return
        self.collect_process.sendline("END")
        return self.wait()

In [86]:
def normalize_function_hits(df, function_col='function', output_scale='percentage'):
    """
    Processes raw trace hits to normalized function frequencies.
    
    Args:
        df: DataFrame where each row is a single trace hit
        function_col: Column containing function names (default: 'function')
        output_scale: 'percentage' (0-100%) or 'range' ([0,1]) (default: 'percentage')
    
    Returns:
        DataFrame with columns: [function, count, normalized]
    """
    # Count hits per function
    func_counts = df[function_col].value_counts().reset_index()
    func_counts.columns = ['function', 'count']
    
    # Normalize
    if output_scale == 'percentage':
        func_counts['normalized'] = (func_counts['count'] / func_counts['count'].sum()) * 100
    elif output_scale == 'range':
        func_counts['normalized'] = (func_counts['count'] - func_counts['count'].min()) / \
                                   (func_counts['count'].max() - func_counts['count'].min())
    else:
        raise ValueError("output_scale must be 'percentage' or 'range'")
    
    return func_counts.sort_values('normalized', ascending=False)

In [13]:
import subprocess
import sys
collect = Collector("./config/compound_overrides.yaml")
# This creates a raw collector, I suggest looking into this file to learn more

w = open("hello.txt", "bw")
collect.start_collection(logfile=w)
print("Collection has started")
# Start collection

f = open("blah.txt", "w")
bench_test = subprocess.Popen(["cat", "defaults.yaml"], stdout=f)
bench_test.wait()
# Run benchmark application

print("Exit application")
raw_coll_info = collect.stop_collection()
print(raw_coll_info)
# Stop the Collector

Collection has started
Exit application
{'system_info': [PosixPath('data/curated/faux/eca141ba-776c-4e97-88f8-9391d7ca171f/system_info.end.parquet')], 'compound': [PosixPath('data/curated/faux/eca141ba-776c-4e97-88f8-9391d7ca171f/compound.end.parquet')]}


In [14]:
import pandas as pd
raw_df = pd.read_parquet(raw_coll_info["compound"])
raw_df.head()

Unnamed: 0,timestamp,function,stack_hash,collection_id
0,105999045136,pick_next_task,584,eca141ba-776c-4e97-88f8-9391d7ca171f
1,105999045248,pick_next_task,584,eca141ba-776c-4e97-88f8-9391d7ca171f
2,105999161070,pick_next_task,724,eca141ba-776c-4e97-88f8-9391d7ca171f
3,105999161104,pick_next_task,606,eca141ba-776c-4e97-88f8-9391d7ca171f
4,105999537313,pick_next_task,983,eca141ba-776c-4e97-88f8-9391d7ca171f


In [17]:
normalized_raw = normalize_function_hits(raw_df)
normalized_raw

Unnamed: 0,function,count,normalized
0,schedule,184105,22.112988
1,pick_next_task_fair,180306,21.656687
2,vfs_read,161526,19.401008
3,pick_next_task,154907,18.605995
4,__alloc_pages,126669,15.214308
5,filemap_read,8440,1.013735
6,ext4_file_read_iter,8435,1.013134
7,mempool_alloc,3324,0.399248
8,enqueue_task_fair,1332,0.159988
9,check_preempt_wakeup,1180,0.141731


In [97]:
collect = Collector("./config/start_comp_overrides.yaml")
# This is a simple redis benchmark config

w = open("hello.txt", "bw")
collect.start_collection(logfile=w)
# Start collection

start_coll_info = collect.wait()
#Wait for collector to finish

In [98]:
redis_df = pd.read_parquet(start_coll_info["compound"])
redis_df.head()

Unnamed: 0,timestamp,function,stack_hash,collection_id
0,188039917025,pick_next_task,843,bd79ce5a-7e9a-45b1-95fe-2a058a3e4757
1,188039917053,pick_next_task,724,bd79ce5a-7e9a-45b1-95fe-2a058a3e4757
2,188039993033,pick_next_task,843,bd79ce5a-7e9a-45b1-95fe-2a058a3e4757
3,188039993046,pick_next_task,606,bd79ce5a-7e9a-45b1-95fe-2a058a3e4757
4,188039993060,pick_next_task,724,bd79ce5a-7e9a-45b1-95fe-2a058a3e4757


In [91]:
normalized_redis = normalize_function_hits(redis_df)
normalized_redis

Unnamed: 0,function,count,normalized
0,schedule,621167,17.901562
1,pick_next_task_fair,621123,17.900293
2,pick_next_task,611036,17.609594
3,vfs_read,542807,15.643286
4,__alloc_pages,402163,11.590032
5,check_preempt_wakeup,256045,7.379023
6,enqueue_task_fair,244954,7.059388
7,filemap_read,47831,1.378453
8,ext4_file_read_iter,47828,1.378367
9,mempool_alloc,32323,0.931524


In [99]:
collect = Collector("./config/start_comp_overrides_copy.yaml")
# This is a simple redis benchmark config

w = open("hello.txt", "bw")
collect.start_collection(logfile=w)
# Start collection

start_coll_info = collect.wait()
#Wait for collector to finish

In [100]:
redis_always_df = pd.read_parquet(start_coll_info["compound"])
redis_always_df.head()

Unnamed: 0,timestamp,function,stack_hash,collection_id
0,188300902033,pick_next_task,467,4ada244d-cb5c-4e0b-9922-fa77bfa8ae0e
1,188300902058,pick_next_task,724,4ada244d-cb5c-4e0b-9922-fa77bfa8ae0e
2,188300955876,pick_next_task,843,4ada244d-cb5c-4e0b-9922-fa77bfa8ae0e
3,188300955890,pick_next_task,606,4ada244d-cb5c-4e0b-9922-fa77bfa8ae0e
4,188301049032,pick_next_task,843,4ada244d-cb5c-4e0b-9922-fa77bfa8ae0e


In [101]:
normalized_redis_always_df = normalize_function_hits(redis_always_df)

In [102]:
normalized_redis_always_df

Unnamed: 0,function,count,normalized
0,pick_next_task_fair,641039,18.145712
1,schedule,639982,18.115791
2,pick_next_task,623318,17.644088
3,vfs_read,549174,15.545315
4,__alloc_pages,403210,11.413553
5,check_preempt_wakeup,262375,7.426976
6,enqueue_task_fair,237493,6.722648
7,filemap_read,50879,1.440218
8,ext4_file_read_iter,50877,1.440161
9,mempool_alloc,32157,0.910259


In [103]:
collect = Collector("./config/comp_mongodb.yaml")
# This is a simple redis benchmark config

w = open("hello.txt", "bw")
collect.start_collection(logfile=w)
# Start collection

start_coll_info = collect.wait()
#Wait for collector to finish

OSError: [Errno 28] No space left on device

In [None]:
mongodb_df = pd.read_parquet(start_coll_info["compound"])
mongodb_df.head()

In [None]:
normalizedmongodb_df = normalize_function_hits(mongodb_df)
normalizedmongodb_df

In [58]:
def get_top_call_chains(workload_df, n=5):
    top_chains = (
        workload_df.groupby(['stack_hash', 'stack_sequence'])
        .size()
        .reset_index(name='count')
        .sort_values('count', ascending=False)
        .head(n)
    )
    return top_chains

In [49]:
top_cpu_chains = get_top_call_chains(redis_df)
top_cpu_chains

Unnamed: 0,stack_hash,function,count
209,51,vfs_read,474220
2695,691,__alloc_pages,254649
3610,925,schedule,148985
2823,723,pick_next_task_fair,141568
2828,724,pick_next_task,134345


In [66]:
import pandas as pd

def reconstruct_stacks(df):
    """Rebuild call stacks from individual hook hits."""
    # Group by stack_hash, sort by timestamp within each group
    grouped = df.groupby('stack_hash', group_keys=False).apply(
        lambda x: x.sort_values('timestamp')
    )
    
    # Join functions in order to form the full stack trace
    stack_sequences = (
        grouped.groupby('stack_hash')['function']
        .apply(lambda x: ';'.join(x))
        .reset_index(name='stack_sequence')
    )
    
    # Count occurrences of each stack
    stack_counts = (
        df['stack_hash'].value_counts()
        .reset_index(name='count')
    )
    stack_counts.columns = ['stack_hash', 'count']
    
    # Merge sequences with counts
    result = pd.merge(stack_sequences, stack_counts, on='stack_hash')
    return result.sort_values('count', ascending=False)

# Example usage:
reconstructed_stacks = reconstruct_stacks(redis_df)
reconstructed_stacks

  grouped = df.groupby('stack_hash', group_keys=False).apply(


Unnamed: 0,stack_hash,stack_sequence,count
51,51,vfs_read;vfs_read;vfs_read;vfs_read;vfs_read;v...,476656
691,691,__alloc_pages;__alloc_pages;__alloc_pages;__al...,254686
983,983,pick_next_task;pick_next_task;pick_next_task;p...,157788
925,925,schedule;schedule;schedule;schedule;schedule;s...,149068
723,723,pick_next_task_fair;pick_next_task_fair;pick_n...,141614
...,...,...,...
541,541,enqueue_task_fair;check_preempt_wakeup;enqueue...,4
289,289,blk_stat_add;check_preempt_wakeup;check_preemp...,4
1011,1011,__alloc_pages;check_preempt_wakeup;enqueue_tas...,3
130,130,check_preempt_wakeup;check_preempt_wakeup;chec...,3


In [77]:
top_stack_chains = get_top_call_chains(clean_df)
top_stack_chains

TypeError: Series.sort_values() takes 1 positional argument but 2 positional arguments (and 1 keyword-only argument) were given

In [74]:
def clean_stack_sequences(df):
    """Remove consecutive duplicate functions from stacks."""
    df['cleaned_stack'] = df['stack_sequence'].apply(
        lambda s: ';'.join([func for i, func in enumerate(s.split(';')) if i == 0 or func != s.split(';')[i-1]])
    )
    return df

# Apply cleaning
df_cleaned = clean_stack_sequences(reconstructed_stacks)

KeyboardInterrupt: 

In [75]:

def reconstruct_clean_stacks(df):
    """
    Reconstruct call stacks from raw hook data while:
    1. Preserving original call order
    2. Removing consecutive duplicates
    3. Maintaining accurate hit counts
    """
    # Helper function to clean sequences
    def remove_consecutive_dupes(sequence):
        return ';'.join([v for i, v in enumerate(sequence) if i == 0 or v != sequence[i-1]])
    
    # Process each stack_hash group
    result = (
        df.sort_values('timestamp')
        .groupby('stack_hash', group_keys=False)
        .apply(lambda group: pd.Series({
            'stack_sequence': remove_consecutive_dupes(group['function'].tolist()),
            'count': len(group)
        }))
        .reset_index()
    )
    
    # Merge with original counts if needed
    return result.sort_values('count', ascending=False)

In [76]:
clean_df = reconstruct_clean_stacks(redis_df)
clean_df

  .apply(lambda group: pd.Series({


Unnamed: 0,stack_hash,stack_sequence,count
51,51,vfs_read;__alloc_pages;vfs_read;__alloc_pages;...,476656
691,691,__alloc_pages;check_preempt_wakeup;__alloc_pag...,254686
983,983,pick_next_task;check_preempt_wakeup;pick_next_...,157788
925,925,schedule;filemap_fault;check_preempt_wakeup;en...,149068
723,723,pick_next_task_fair;blk_stat_add;pick_next_tas...,141614
...,...,...,...
541,541,enqueue_task_fair;check_preempt_wakeup;enqueue...,4
289,289,blk_stat_add;check_preempt_wakeup;enqueue_task...,4
1011,1011,__alloc_pages;check_preempt_wakeup;enqueue_tas...,3
130,130,check_preempt_wakeup,3
