# <font color="#000099">llm-d-benchmarking Sweep Analysis</font>

This notebook imports benchmark report data produced by configuration sweeps of [llm-d-benchmark](https://github.com/llm-d/llm-d-benchmark), and creates Pareto plots to compare configurations for a particular model and workload.

The first cell contains function and class definitions to support basic functionality, while the second cell imports data from user-provided directories into [Pandas](https://pandas.pydata.org/) [DataFrames](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html).

The following cells look at the different scenarios (a particular selection of model, GPU, and workload input/output size) and create tables and Pareto plots for the different configurations under these scenarios.

While this basic functionality may be sufficient for many purposes, this notebook should be considered a starting point for more detailed analysis and customization by the user.

## Package imports and definitions (run once)

In [None]:
################################################################################
# Package imports
################################################################################

import os
import sys
from pathlib import Path

import matplotlib
import matplotlib.pyplot as plt
import pandas

#sys.path.insert(0, '../workload/report/')
import convert
import schema


class Text:
    """ANSI SGR escape sequences used to format terminal text."""

    # Reset everything (attributes and colors)
    DEFAULT = "\x1b[0m"

    # Attribute toggles
    BOLD = "\x1b[1m"
    BOLD_OFF = "\x1b[22m"
    UNDERLINE = "\x1b[4m"
    UNDERLINE_OFF = "\x1b[24m"

    # Foreground colors, in SGR code order
    BLACK = "\x1b[30m"
    RED = "\x1b[31m"
    GREEN = "\x1b[32m"
    YELLOW = "\x1b[33m"
    BLUE = "\x1b[34m"
    MAGENTA = "\x1b[35m"
    CYAN = "\x1b[36m"
    WHITE = "\x1b[37m"
    DEFAULT_COLOR = "\x1b[39m"

    # Background colors, in SGR code order
    BG_BLACK = "\x1b[40m"
    BG_RED = "\x1b[41m"
    BG_GREEN = "\x1b[42m"
    BG_YELLOW = "\x1b[43m"
    BG_BLUE = "\x1b[44m"
    BG_MAGENTA = "\x1b[45m"
    BG_CYAN = "\x1b[46m"
    BG_WHITE = "\x1b[47m"
    DEFAULT_BG_COLOR = "\x1b[49m"


def info(mesg: str) -> None:
    """Write an informational message to stderr in green.

    Args:
        mesg (str): Text of the message.
    """
    colored = f'{Text.GREEN}{mesg}\n{Text.DEFAULT}'
    sys.stderr.write(colored)


def warn(mesg: str) -> None:
    """Write a warning message to stderr in yellow.

    Args:
        mesg (str): Text of the warning.
    """
    colored = f'{Text.YELLOW}{mesg}\n{Text.DEFAULT}'
    sys.stderr.write(colored)


def error(mesg: str, err_code: int = 1) -> None:
    """Write an error message to stderr in red, then exit.

    Args:
        mesg (str): Text of the error.
        err_code (int): Exit status passed to sys.exit().
    """
    colored = f'{Text.RED}{mesg}\n{Text.DEFAULT}'
    sys.stderr.write(colored)
    # sys.exit raises SystemExit, so this function does not return normally
    sys.exit(err_code)


def check_dir(dir: str) -> None:
    """Report an error (via error()) when a directory does not exist.

    Args:
        dir (str): Path expected to be an existing directory.
    """
    if os.path.isdir(dir):
        return
    error(f'Invalid path: {dir}')


def check_file(file: str) -> None:
    """Report an error (via error()) when a file does not exist.

    Args:
        file (str): Path expected to be an existing regular file.
    """
    if os.path.isfile(file):
        return
    error(f'Invalid file: {file}')


def get_benchmark_report_files(source_dir: str) -> list[str]:
    """Recursively collect benchmark report files below a directory.

    Args:
        source_dir (str): Directory to search (including subdirectories)
            for benchmark report files.

    Returns:
        list: Paths, as strings, of every file matching the benchmark
            report naming pattern.
    """
    # Validate the directory before walking it
    check_dir(source_dir)
    matches = Path(source_dir).rglob("benchmark_report,_*.yaml")
    return [str(match) for match in matches]


def make_benchmark_runs_df() -> pandas.core.frame.DataFrame:
    """Build the empty table that benchmark runs are appended to.

    Returns:
        DataFrame: DataFrame with the benchmark run columns and no rows.
    """
    # Identification of the run
    identity = ('Name', 'Directory', 'Model', 'GPU')
    # Parallelism and replica counts for standalone deployments
    standalone = ('DP', 'TP', 'PP', 'EP', 'Replicas')
    # Parallelism and replica counts for the prefill side of a P/D deployment
    prefill = ('P_DP', 'P_TP', 'P_PP', 'P_EP', 'P_Replicas')
    # Parallelism and replica counts for the decode side of a P/D deployment
    decode = ('D_DP', 'D_TP', 'D_PP', 'D_EP', 'D_Replicas')
    # Workload description
    workload = ('Concurrency', 'ISL', 'OSL', 'Backend', 'Duration', 'Completed')
    # Measured throughput
    throughput = (
        'Request_Throughput',
        'Output_Token_Throughput',
        'Total_Token_Throughput',
    )
    # Measured latency
    latency = ('Mean_TTFT_ms', 'Mean_TPOT_ms', 'Mean_ITL_ms', 'Mean_E2EL_ms')

    column_names = list(
        identity + standalone + prefill + decode + workload + throughput + latency
    )
    return pandas.DataFrame(columns=column_names)


def _get_replicas_and_parallelism(report: schema.BenchmarkReport) -> dict[str, int | None]:
    """Extract replica counts and parallelism settings from a report.

    Args:
        report (BenchmarkReport): Benchmark run to evaluate.

    Returns:
        dict[str, int | None]: Replica counts and parallelism values for a
            standalone or prefill/decode configuration. Fields that do not
            apply to the configuration are None.
    """
    host = report.scenario.host

    def _count_or_none(host_type):
        # A count of zero means this host type is absent from the run
        count = host.type.count(host_type)
        return None if count == 0 else count

    rp = {
        'replicas': _count_or_none(schema.HostType.REPLICA),
        'p_replicas': _count_or_none(schema.HostType.PREFILL),
        'd_replicas': _count_or_none(schema.HostType.DECODE),
        'tp': None,
        'dp': None,
        'pp': None,
        'ep': None,
        'p_tp': None,
        'p_dp': None,
        'p_pp': None,
        'p_ep': None,
        'd_tp': None,
        'd_dp': None,
        'd_pp': None,
        'd_ep': None,
    }

    if rp['replicas']:
        # Standalone setup: parallelism is read from the first accelerator
        par = host.accelerator[0].parallelism
        rp.update(tp=par.tp, dp=par.dp, pp=par.pp, ep=par.ep)
        return rp

    # P/D setup: use the first prefill and the first decode entry found
    for pos, accel in enumerate(host.accelerator):
        role = host.type[pos]
        if role is schema.HostType.PREFILL and not rp['p_tp']:
            par = accel.parallelism
            rp.update(p_tp=par.tp, p_dp=par.dp, p_pp=par.pp, p_ep=par.ep)
        if role is schema.HostType.DECODE and not rp['d_tp']:
            par = accel.parallelism
            rp.update(d_tp=par.tp, d_dp=par.dp, d_pp=par.pp, d_ep=par.ep)
        if rp['p_tp'] and rp['d_tp']:
            # Both sides are filled in, nothing left to look for
            break
    return rp


def _make_name(report: schema.BenchmarkReport) -> str:
    """Create a name based on the benchmark run's configuration.

    Args:
        report (BenchmarkReport): Benchmark report to create a name for.

    Returns:
        str: Name of benchmark run, providing replica and parallelism details.
            Standalone runs are named '<replicas>R TP<tp>'; P/D runs are
            named '<p_replicas>P TP<p_tp>, <d_replicas>D TP<d_tp>'.
    """
    rp = _get_replicas_and_parallelism(report)
    # The f-strings use double quotes outside and single quotes inside:
    # reusing the enclosing quote within a replacement field is only valid
    # syntax from Python 3.12 onwards.
    if rp['replicas']:
        # We have a standalone setup
        return f"{rp['replicas']}R TP{rp['tp']}"
    # We have a P/D setup
    # TODO we currently assume the only type of parallelism is TP
    return f"{rp['p_replicas']}P TP{rp['p_tp']}, {rp['d_replicas']}D TP{rp['d_tp']}"


def add_benchmark_report_to_df(
    runs_df: pandas.core.frame.DataFrame,
    br_file: str) -> None:
    """Load a results file and add it to the DataFrame of benchmark runs.

    The report is appended as a new row. The calculated columns ('Is_PD',
    'Num_GPUs', 'Thpt_per_GPU' and 'Thpt_per_User') are then recomputed for
    every row, each row using the formula matching its own deployment type,
    so standalone and P/D runs can be mixed in one DataFrame.

    Args:
        runs_df (DataFrame): DataFrame to add a row to for the provided run.
            Modified in place.
        br_file (str): Benchmark report file to import.
    """
    report = convert.import_benchmark_report(br_file)
    rp = _get_replicas_and_parallelism(report)
    load_args = report.scenario.load.args
    # TODO getting concurrency is specific to each harness, will need
    # a way to capture this universally in the report so we don't have to do
    # extractions like this
    if load_args and 'max_concurrency' in load_args:
        # vLLM Benchmark
        concurrency = load_args['max_concurrency']
    elif load_args and 'profile' in load_args \
    and 'measured_concurrencies' in load_args['profile']:
        # GuideLLM
        concurrency = load_args['profile']['measured_concurrencies'][0]
    else:
        warn('"Concurrency" is not defined, setting to 1, "Thpt_per_User" and Pareto plots will also be invalid.')
        concurrency = 1

    # Columns missing from this dict (e.g. 'Backend' and the calculated
    # columns) are left empty for the new row.
    runs_df.loc[len(runs_df)] = {
        'Name': _make_name(report),
        # We want the base directory for the sweep, which is two levels up
        'Directory': os.path.abspath(br_file).rsplit(os.sep, 2)[0],
        'Model': report.scenario.model.name,
        # Assume the GPU model is homogeneous across P and D, so the first
        # accelerator entry is representative
        'GPU': report.scenario.host.accelerator[0].model,
        'DP': rp['dp'],
        'TP': rp['tp'],
        'PP': rp['pp'],
        'EP': rp['ep'],
        'Replicas': rp['replicas'],
        'P_DP': rp['p_dp'],
        'P_TP': rp['p_tp'],
        'P_PP': rp['p_pp'],
        'P_EP': rp['p_ep'],
        'P_Replicas': rp['p_replicas'],
        'D_DP': rp['d_dp'],
        'D_TP': rp['d_tp'],
        'D_PP': rp['d_pp'],
        'D_EP': rp['d_ep'],
        'D_Replicas': rp['d_replicas'],
        'Concurrency': concurrency,
        # TODO this may need to be configurable...
        # We need to group by ISL/OSL exactly, so round and convert to int.
        # Round ISL to nearest 10's
        'ISL': int(round(report.metrics.requests.input_length.mean, -1)),
        'OSL': int(round(report.metrics.requests.output_length.mean)),
        'Duration': report.metrics.time.duration,
        'Completed': report.metrics.requests.total,
        'Request_Throughput': report.metrics.throughput.requests_per_sec,
        'Output_Token_Throughput': report.metrics.throughput.output_tokens_per_sec,
        'Total_Token_Throughput': report.metrics.throughput.total_tokens_per_sec,
        # NOTE(review): the three columns below all store the whole
        # `report.metrics.latency` object, and TTFT stores a sub-object
        # rather than an obvious scalar. These presumably should be the mean
        # of the matching latency statistic in milliseconds -- confirm the
        # field names against the report schema before relying on them.
        'Mean_TTFT_ms': report.metrics.latency.time_to_first_token,
        'Mean_TPOT_ms': report.metrics.latency,
        'Mean_ITL_ms': report.metrics.latency,
        'Mean_E2EL_ms': report.metrics.latency,
    }

    # Add calculated columns. These are evaluated per row: deciding the
    # deployment type from the report just imported and applying it to the
    # whole column would mislabel every earlier row of the other type.
    def _numeric(column: str) -> pandas.Series:
        # Unset (None) entries become NaN so the arithmetic below is defined
        return pandas.to_numeric(runs_df[column])

    # Standalone rows carry a TP value; P/D rows leave it unset
    is_pd = runs_df['TP'].isna()
    standalone_gpus = _numeric('TP') * _numeric('Replicas')
    pd_gpus = (_numeric('P_TP') * _numeric('P_Replicas')
               + _numeric('D_TP') * _numeric('D_Replicas'))
    runs_df['Is_PD'] = is_pd
    runs_df['Num_GPUs'] = standalone_gpus.where(~is_pd, pd_gpus)
    runs_df['Thpt_per_GPU'] = runs_df['Output_Token_Throughput']/runs_df['Num_GPUs']
    runs_df['Thpt_per_User'] = runs_df['Output_Token_Throughput']/runs_df['Concurrency']


def get_scenarios(runs_df: pandas.core.frame.DataFrame) -> list[tuple[str]]:
    """Get a list of available scenarios from runs DataFrame, where
    configurations and concurrency will be swept.

    The list is sorted so that a scenario keeps the same position (and
    therefore the same selection index) every time the same data is loaded;
    the iteration order of a plain set is not stable between sessions.

    Args:
        runs_df (DataFrame): Benchmark runs to find the scenarios for.

    Returns:
        list[tuple[str]]: Sorted list of scenarios, consisting of unique
            groups of model, GPU type, ISL, and OSL.
    """
    columns = ['Model', 'GPU', 'ISL', 'OSL']
    unique_scenarios = set(runs_df.set_index(columns).index)

    def _sort_key(scenario):
        # Missing values sort last and are never compared against real
        # values, which would raise a TypeError for mixed types.
        return tuple(
            (True, 0) if pandas.isna(value) else (False, value)
            for value in scenario
        )

    return sorted(unique_scenarios, key=_sort_key)


def print_scenarios(scenarios: list[tuple[str]]) -> None:
    """Print the scenarios as an indexed, column-aligned table.

    Args:
        scenarios (list[tuple[str]]): Scenario groups to print.
    """
    columns = ['Model', 'GPU', 'ISL', 'OSL']

    # Width of each column: the widest of its header and all of its values
    spans = [len(col) for col in columns]
    for scenario in scenarios:
        for pos, item in enumerate(scenario):
            spans[pos] = max(spans[pos], len(str(item)))

    # Header line: index column first, then each padded column name
    header_cells = [col.ljust(spans[pos] + 2) for pos, col in enumerate(columns)]
    print(
        f'{Text.BOLD}{Text.BLUE}IDX  {Text.DEFAULT}{Text.BOLD}'
        + ''.join(header_cells)
        + f'{Text.DEFAULT}'
    )

    # One line per scenario, prefixed by its index
    for row_no, scenario in enumerate(scenarios):
        index_cell = f'{Text.BLUE}{row_no}{Text.DEFAULT}' + " " * (5 - len(str(row_no)))
        value_cells = [
            str(val).ljust(spans[pos] + 2) for pos, val in enumerate(scenario)
        ]
        print(index_cell + ''.join(value_cells))

## Import datasets

In [None]:
################################################################################
# User inputs
################################################################################

# Directories holding benchmark sweeps; each one is searched recursively.
search_dirs = ['/files/']

################################################################################
# Standard code
################################################################################

# Start from an empty table of benchmark runs
runs = make_benchmark_runs_df()

# Import every benchmark report found below each search directory
for sdir in search_dirs:
    info(f'Searching for benchmark report files within {sdir}')
    report_files = get_benchmark_report_files(sdir)
    for br_file in report_files:
        info(f'Importing {br_file}')
        # Each report becomes one row of the runs DataFrame
        add_benchmark_report_to_df(runs, br_file)

## Scenarios available

In [None]:
# Scenarios available, where model, GPU type, ISL and OSL are constant.
# Configurations (replicas and parallelism) are swept within a scenario.

scenarios = get_scenarios(runs)
print_scenarios(scenarios)

## Pareto plotting and tables

In [None]:
################################################################################
# User inputs
################################################################################

# Select scenario
idx = 0

# Show P/D disaggregated scenarios
show_pd = True
# Show standalone scenarios
show_sa = True

# Segregate traces by directory (directories with identical scenarios, such as
# repeated runs, will not be joined together in a single trace)
seg_by_dir = True

################################################################################
# Standard code
################################################################################

# Get parameters of selected scenario
model, gpu, isl, osl = scenarios[idx]

# Rows of the runs DataFrame that belong to the selected scenario
in_scenario = (
    (runs['Model'] == model) &
    (runs['GPU'] == gpu) &
    (runs['ISL'] == isl) &
    (runs['OSL'] == osl)
)
# Each comparison is fully parenthesized: `&` binds tighter than `==`, so an
# unparenthesized `... & runs['Is_PD'] == False` would compare the whole
# combined mask against False instead of just the Is_PD column.
is_pd_run = (runs['Is_PD'] == True)   # noqa: E712 (element-wise comparison)
is_sa_run = (runs['Is_PD'] == False)  # noqa: E712 (element-wise comparison)

# Columns shown in the tables, common to both deployment types
metric_cols = [
    'Concurrency',
    'Output_Token_Throughput',
    'Thpt_per_GPU',
    'Thpt_per_User',
    'Directory',
]

# P/D runs of the scenario
pd_runs_selected = runs.loc[
    in_scenario & is_pd_run,
    ['P_TP', 'P_Replicas', 'D_TP', 'D_Replicas'] + metric_cols]

# Standalone runs of the scenario
sa_runs_selected = runs.loc[
    in_scenario & is_sa_run,
    ['TP', 'Replicas'] + metric_cols]

if not seg_by_dir:
    # Runs from different directories are merged into a single trace
    pd_runs_selected = pd_runs_selected.drop('Directory', axis=1)
    sa_runs_selected = sa_runs_selected.drop('Directory', axis=1)

# Plot performance results
colors = ['#FF0000', '#FFAA00', '#DDDD00', '#00DD00', '#00FFFF', '#0000FF',
          '#FF00FF', '#666666', '#000000']

# Unique configurations of replicas and TP (and directory, when segregating
# by directory), each described as a dict. Fields that do not apply to a
# configuration are None.
dir_cols = ['Directory'] if seg_by_dir else []
configs = []
if show_pd:
    pd_keys = ['P_Replicas', 'P_TP', 'D_Replicas', 'D_TP'] + dir_cols
    for conf in set(pd_runs_selected.set_index(pd_keys).index):
        configs.append({
            'rep': None,
            'tp': None,
            'p_rep': conf[0],
            'p_tp': conf[1],
            'd_rep': conf[2],
            'd_tp': conf[3],
            'dir': conf[4] if seg_by_dir else None,
            'is_pd': True,
        })
if show_sa:
    sa_keys = ['Replicas', 'TP'] + dir_cols
    for conf in set(sa_runs_selected.set_index(sa_keys).index):
        configs.append({
            'rep': conf[0],
            'tp': conf[1],
            'p_rep': None,
            'p_tp': None,
            'd_rep': None,
            'd_tp': None,
            'dir': conf[2] if seg_by_dir else None,
            'is_pd': False,
        })

# Sort so printouts/plots are organized. The key pairs every value with an
# "is None" flag so that None is never compared against a number or string
# (which raises TypeError); configurations with a value sort before those
# without, so standalone configurations come first.
sort_fields = ('rep', 'tp', 'p_rep', 'p_tp', 'd_rep', 'd_tp', 'dir', 'is_pd')
configs.sort(key=lambda c: tuple((c[f] is None, c[f]) for f in sort_fields))

# Sweep through configurations
for ii, conf in enumerate(configs):
    color = colors[ii % len(colors)]

    # Pick the runs, row filter and legend label for this configuration
    if conf['is_pd']:
        # This configuration is PD
        selected = pd_runs_selected
        match = (
            (selected['P_Replicas'] == conf['p_rep']) &
            (selected['P_TP'] == conf['p_tp']) &
            (selected['D_Replicas'] == conf['d_rep']) &
            (selected['D_TP'] == conf['d_tp'])
        )
        label = f"{conf['p_rep']}P-TP{conf['p_tp']} {conf['d_rep']}D-TP{conf['d_tp']}"
    else:
        # This configuration is standalone
        selected = sa_runs_selected
        match = (
            (selected['Replicas'] == conf['rep']) &
            (selected['TP'] == conf['tp'])
        )
        label = f"Replicas: {conf['rep']}  TP{conf['tp']}"
    if seg_by_dir:
        match &= (selected['Directory'] == conf['dir'])

    # Make a DataFrame for specific configuration
    conf_df = selected[match].sort_values(by='Concurrency')

    # Print table
    display(conf_df)

    # Plot throughputs for configuration
    plt.plot(conf_df.Thpt_per_User, conf_df.Thpt_per_GPU,
             label=label,
             marker='o', markersize=4,
             color=color
            )
    # Annotate each point with its concurrency, raised slightly above the
    # marker (2% of the largest per-GPU throughput of this deployment type)
    label_offset = selected['Thpt_per_GPU'].max()*0.02
    for x_val, y_val, conc in zip(conf_df.Thpt_per_User,
                                  conf_df.Thpt_per_GPU,
                                  conf_df.Concurrency):
        plt.text(x_val, y_val + label_offset,
                 str(conc), ha='center', color=color)

plt.title(f'GPU: {gpu}\nModel: {model}\nISL: {isl}  OSL: {osl}')
plt.xlabel('Tok/s/User', fontsize='16')
plt.ylabel('Tok/s/GPU', fontsize='16')
plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
plt.grid(True, linewidth=1, ls='--', color='gray')
plt.axis([0, None, 0, None])
plt.show()
