In [34]:
import glob
import os
import pandas as pd

In [35]:
scorep_result_dir = './scorep-results'

# get all files in the directory
scorep_result_dir = './scorep-results'
scorep_result_dir = glob.glob(os.path.join(scorep_result_dir, '*'))

In [36]:
otf2_trace = "./scorep-results/bt.C.4.mpi_io_full" + '/traces.otf2'

In [37]:
import otf2.reader
from otf2.events import Enter
def extract_unique_functions(trace_file_path: str) -> set:
    """
    Extract unique function names from an OTF2 trace file.
    
    Args:
        trace_file_path (str): Path to the OTF2 trace file
            Example: './path/to/traces.otf2'
    
    Returns:
        set[str]: Set of unique function names found in the trace
            Example: {'MPI_Init', 'MPI_Finalize', 'MPI_Barrier'}
    
    Raises:
        ValueError: If file path doesn't end with .otf2
        FileNotFoundError: If trace file doesn't exist
    """
    # Validate input file
    if not trace_file_path.endswith('.otf2'):
        raise ValueError("Input file must be an .otf2 trace file")
    
    if not os.path.exists(trace_file_path):
        raise FileNotFoundError(f"Trace file not found: {trace_file_path}")

    unique_functions = set()
    
    try:
        with otf2.reader.open(trace_file_path) as trace:
            for _, event in trace.events:
                if isinstance(event, Enter):
                    unique_functions.add(event.region.name)
    except Exception as e:
        raise RuntimeError(f"Error reading trace file: {str(e)}")
            
    return unique_functions

In [38]:
extract_unique_functions(otf2_trace)

{'MPI_Allreduce',
 'MPI_Barrier',
 'MPI_Bcast',
 'MPI_Comm_dup',
 'MPI_Comm_rank',
 'MPI_Comm_size',
 'MPI_File_close',
 'MPI_File_delete',
 'MPI_File_open',
 'MPI_File_read_at_all',
 'MPI_File_set_view',
 'MPI_File_write_at_all',
 'MPI_Finalize',
 'MPI_Init',
 'MPI_Irecv',
 'MPI_Isend',
 'MPI_Reduce',
 'MPI_Wait',
 'MPI_Waitall'}

# Extract Metrics by Function Calls

In [39]:
from collections import defaultdict
from otf2.events import Enter, Leave

def accumulated_function_time(otf2_trace: str) -> dict:
    """
    Calculate timing metrics from an OTF2 trace file.
    
    Returns a dictionary with keys:
    - Function: Name of the function
    - Total Time (s): Total time spent in function
    - Call Count: Number of invocations
    - Average Time (s): Mean time per call
    
    Usage:
    >>> otf2_trace = "./scorep-results/bt.C.4.mpi_io_full/traces.otf2"
    >>> metrics_dict = calculate_accumulated_function_time(otf2_trace)
    >>> print(metrics_dict)
    
    Results:
    >>> list(metrics_dict.items())[:5]
        [('MPI_Init', {'Total Time (s)': 0.123456, 'Call Count': 10, 'Average Time (s)': 0.012346}),
         ('MPI_Finalize', {'Total Time (s)': 0.234567, 'Call Count': 20, 'Average Time (s)': 0.011728}),
         ('MPI_Barrier', {'Total Time (s)': 0.345678, 'Call Count': 30, 'Average Time (s)': 0.011523}),
         ('MPI_Bcast', {'Total Time (s)': 0.456789, 'Call Count': 40, 'Average Time (s)': 0.011420}),
         ('MPI_Reduce', {'Total Time (s)': 0.567890, 'Call Count': 50, 'Average Time (s)': 0.011358})]
    """
    # Input validation
    if not otf2_trace.endswith('.otf2'):
        raise ValueError("Input file must be an .otf2 trace file")
    
    # Initialize tracking dictionaries
    metrics = {
        'times': defaultdict(float), 
        'counts': defaultdict(int)    
    }
    call_stacks = defaultdict(list)
    
    # Process trace events
    with otf2.reader.open(otf2_trace) as trace:
        resolution = trace.timer_resolution
        
        for location, event in trace.events:
            if isinstance(event, Enter):
                call_stacks[location].append((event.region, event.time))
                metrics['counts'][event.region] += 1
                
            elif isinstance(event, Leave):
                if not call_stacks[location]:
                    raise RuntimeError(f"Unmatched Leave event in {location.name}")
                    
                region, start_time = call_stacks[location].pop()
                if region != event.region:
                    raise RuntimeError(f"Mismatched Enter/Leave in {location.name}")
                
                duration = (event.time - start_time) / resolution
                metrics['times'][region] += duration
    
   # Constructing the result dictionary 
   
    result_dict = {}
    for fn in metrics['times']:
        result_dict[fn.name] = {
            "Total Time (s)": metrics['times'][fn],
            "Call Count": metrics['counts'][fn],
            "Average Time (s)": metrics['times'][fn] / metrics['counts'][fn]
        }
    
    return result_dict

In [40]:
import pandas as pd
acc_fun_time_df = pd.DataFrame(accumulated_function_time(otf2_trace)).T
acc_fun_time_df

Unnamed: 0,Total Time (s),Call Count,Average Time (s)
MPI_Init,3.571143,4.0,0.8927858
MPI_Comm_size,7e-06,4.0,1.679048e-06
MPI_Comm_rank,2e-06,4.0,4.985714e-07
MPI_Comm_dup,0.002375,8.0,0.0002969209
MPI_Bcast,0.003023,32.0,9.445419e-05
MPI_File_delete,0.035998,1.0,0.03599807
MPI_Barrier,0.112276,12.0,0.009356348
MPI_File_open,0.233338,8.0,0.02916728
MPI_File_set_view,0.002926,8.0,0.0003657445
MPI_Irecv,0.013122,9672.0,1.356701e-06


In [41]:
import pandas as pd

def beautify_df(df: pd.DataFrame, top_n: int = 10, sort_by: str | int = 0) -> pd.DataFrame:
    """
    Beautify the DataFrame by sorting and aggregating smaller values.
    
    Parameters:
    df (pd.DataFrame): The input DataFrame.
    top_n (int): The number of top rows to retain before aggregating the rest.
    sort_by (str | int): The column name or index to sort by.

    Returns:
    pd.DataFrame: A beautified DataFrame with the top_n rows and an aggregated 'others' row.
    """
    # Determine the sorting column
    if isinstance(sort_by, int):
        if sort_by < 0 or sort_by >= len(df.columns):
            raise ValueError("Invalid column index for sorting.")
        sort_by = df.columns[sort_by]
    elif sort_by not in df.columns:
        raise ValueError("Invalid column name for sorting.")
    
    sorted_df = df.sort_values(by=sort_by, ascending=False)
    
    # Aggregate rows beyond the top_n into a single row labeled 'others'
    if len(sorted_df) > top_n:
        remaining_df = sorted_df.iloc[top_n:].sum(numeric_only=True).to_frame().T
        remaining_df.index = ['others']
    else:
        return sorted_df
    
    # Concatenate top rows with the aggregated 'others' row
    result_df = pd.concat([sorted_df.head(top_n), remaining_df])
    
    return result_df.round(4)

In [42]:
beautify_df(acc_fun_time_df, top_n=5, sort_by=2)

Unnamed: 0,Total Time (s),Call Count,Average Time (s)
MPI_Init,3.5711,4.0,0.8928
MPI_File_write_at_all,20.8652,160.0,0.1304
MPI_File_read_at_all,12.7506,160.0,0.0797
MPI_File_delete,0.036,1.0,0.036
MPI_File_open,0.2333,8.0,0.0292
others,11.7054,30052.0,0.0128


In [43]:
def list_all_events(trace_file_path: str):
    """
    List all events from an OTF2 trace file.
    
    Args:
        trace_file_path (str): Path to the OTF2 trace file
    
    Returns:
        list: A list of all events found in the trace file
    """
    # Validate input file
    if not trace_file_path.endswith('.otf2'):
        raise ValueError("Input file must be an .otf2 trace file")
    
    if not os.path.exists(trace_file_path):
        raise FileNotFoundError(f"Trace file not found: {trace_file_path}")
    
    event_list = []
    with otf2.reader.open(trace_file_path) as trace:
        for _, event in trace.events:
            event_list.append(event)
    
    return event_list

# Example usage
all_events = list_all_events(otf2_trace)
print(len(all_events))


100611


In [44]:
from enum import Enum

class Otf2Paradigm(Enum):
    """
    Enumeration for different I/O paradigms with their identifications in OTF2 trace files.
    
    Attributes:
        MPIIO (str): Represents the MPI-IO paradigm.
        POSIX (str): Represents the POSIX paradigm.
    """
    MPIIO = 'MPI-IO'
    POSIX = 'POSIX'

# Extract Bandwidth Information by I/O Operation Types

In [45]:
with otf2.reader.open(otf2_trace) as trace:
    op_types = set()
    for _, event in trace.events:     
        if isinstance(event, otf2.events.IoOperationBegin):
            op_types.add(event.mode)
        else:
            continue
    print(op_types)

{IoOperationMode.READ, IoOperationMode.WRITE}


In [46]:
def io_bandwidth_by_operation_type(trace_file_path: str) -> dict:
    """
    Calculate I/O bandwidth metrics grouped by operation mode (READ/WRITE) from an OTF2 trace file.

    Args:
        trace_file_path (str): Path to the OTF2 trace file

    Returns:
        dict: Dictionary containing bandwidth metrics per operation mode:
            {
                operation_mode: {
                    'count': int,              # Number of operations
                    'total_bytes': int,        # Total bytes transferred
                    'total_duration': float,   # Total duration in seconds
                    'avg_bandwidth': float,    # Average bandwidth in bytes/second
                    'min_bandwidth': float,    # Minimum bandwidth in bytes/second
                    'max_bandwidth': float     # Maximum bandwidth in bytes/second
                }
            }

    Example:
        >>> metrics = io_bandwidth_by_operation_type("./traces.otf2")
        >>> print(metrics[otf2.IoOperationMode.READ])
        {
            'count': 100,
            'total_bytes': 1048576,
            'total_duration': 0.5,
            'avg_bandwidth': 2097152.0,
            'min_bandwidth': 1048576.0,
            'max_bandwidth': 4194304.0
        }
    """
    # Input validation
    if not otf2_trace.endswith('.otf2'):
        raise ValueError("Input file must be an .otf2 trace file")

    # Initialize storage for I/O operations by operation type
    io_ops = defaultdict(list)
    
    with otf2.reader.open(trace_file_path) as trace:
        time_resolution = trace.timer_resolution
        # Track ongoing operations by matching_id
        ongoing_ops = dict()
        
        for location, event in trace.events:
            # Handle direct I/O operations
            if isinstance(event, otf2.events.IoOperationBegin):
                ongoing_ops[location, event.matching_id] = {
                    'start_time': event.time,
                    'bytes_requested': event.bytes_request,
                    'op_mode': event.mode
                }
                
            elif isinstance(event, otf2.events.IoOperationComplete):
                if (location, event.matching_id) in ongoing_ops:
                    start_info = ongoing_ops[(location, event.matching_id)]
                    duration = (event.time - start_info['start_time']) / time_resolution

                    assert event.bytes_result == start_info['bytes_requested']
                    bytes_transferred = event.bytes_result
                    
                    if duration > 0:
                        bandwidth = bytes_transferred / duration
                        op_mode = start_info['op_mode']
                        io_ops[op_mode].append({
                            'bytes': bytes_transferred,
                            'duration': duration,
                            'bandwidth': bandwidth
                        })
                    
                    del ongoing_ops[(location, event.matching_id)]
            else:
                pass
            


    # Calculate metrics per operation type
    metrics = {}
    
    for op_mode, operations in io_ops.items():
        if operations:  # Only include operations that actually occurred
            metrics[op_mode] = {
                'count': len(operations),
                'total_bytes': sum(op['bytes'] for op in operations),
                'total_duration': sum(op['duration'] for op in operations),
                'min_bandwidth': min((op['bandwidth'] for op in operations), default=0),
                'max_bandwidth': max((op['bandwidth'] for op in operations), default=0)
            }
            
            # Calculate average bandwidth
            if metrics[op_mode]['total_duration'] > 0:
                metrics[op_mode]['avg_bandwidth'] = \
                    (metrics[op_mode]['total_bytes']) / \
                    (metrics[op_mode]['total_duration'])
            else:
                metrics[op_mode]['avg_bandwidth'] = 0
    return metrics

In [47]:
io_bandwidth_by_operation_type(otf2_trace)

{IoOperationMode.WRITE: {'count': 160,
  'total_bytes': 6802444800,
  'total_duration': 20.864085853457667,
  'min_bandwidth': 141650272.97728464,
  'max_bandwidth': 357241816.45105994,
  'avg_bandwidth': 326036081.70413446},
 IoOperationMode.READ: {'count': 160,
  'total_bytes': 6802444800,
  'total_duration': 12.750237656057056,
  'min_bandwidth': 166246445.67537937,
  'max_bandwidth': 657328608.4881408,
  'avg_bandwidth': 533515137.7957625}}