# Workflow Analysis - DDMD

This notebook provides a simplified interface to the workflow analysis using modular Python scripts.

## Overview
The analysis process includes:
1. Loading workflow data from datalife statistics
2. Estimating transfer rates using 4D interpolation from IOR benchmark data
3. Calculating Storage Performance Metrics (SPM) for different storage configurations
4. Generating visualizations and recommendations

In [None]:
# Import required libraries and modules
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
import os

# Import our workflow analysis modules
from modules.workflow_config import DEFAULT_WF, TEST_CONFIGS, STORAGE_LIST
from modules.workflow_data_utils import (
    load_workflow_data, calculate_io_time_breakdown
)
from modules.workflow_interpolation import (
    estimate_transfer_rates_for_workflow, calculate_aggregate_filesize_per_node
)
from modules.workflow_spm_calculator import (
    calculate_spm_for_workflow, filter_storage_options,
    display_top_sorted_averaged_rank, select_best_storage_and_parallelism
)
from modules.workflow_visualization import plot_all_visualizations
from modules.workflow_data_staging import insert_data_staging_rows

print("Modules imported successfully using modules package!")



## Configuration

Set the workflow to analyze and other parameters.

In [None]:
# Configuration
# Example workflow names: 1kg, ddmd_4n_l
WORKFLOW_NAME = "1kg_2"  # Change this to analyze a different workflow
IOR_DATA_PATH = "../perf_profiles/updated_master_ior_df.csv"

print(f"Analyzing workflow: {WORKFLOW_NAME}")
print(f"Available workflows: {list(TEST_CONFIGS.keys())}")
print(f"IOR data path: {IOR_DATA_PATH}")

ALLOWED_PARALLELISM = TEST_CONFIGS[WORKFLOW_NAME]["ALLOWED_PARALLELISM"]
print(f"Allowed parallelism: {ALLOWED_PARALLELISM}")
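
Indexing `TEST_CONFIGS[WORKFLOW_NAME]` raises a bare `KeyError` when the name is mistyped. The following is a minimal sketch of a friendlier lookup. `EXAMPLE_CONFIGS` and `get_workflow_config` are hypothetical names, and the values shown are invented for illustration; the real `TEST_CONFIGS` may look different.

```python
# Hypothetical stand-in for modules.workflow_config.TEST_CONFIGS.
# The values below are made up for illustration only.
EXAMPLE_CONFIGS = {
    "1kg_2": {"ALLOWED_PARALLELISM": [1, 2, 4], "NUM_NODES_LIST": [2]},
    "ddmd_4n_l": {"ALLOWED_PARALLELISM": [1, 4], "NUM_NODES_LIST": [4]},
}


def get_workflow_config(name, configs):
    """Return the config for `name`, or raise an error listing the valid names."""
    if name not in configs:
        valid = ", ".join(sorted(configs))
        raise KeyError(f"Unknown workflow '{name}'. Available workflows: {valid}")
    return configs[name]


example_config = get_workflow_config("1kg_2", EXAMPLE_CONFIGS)
print(example_config["ALLOWED_PARALLELISM"])
```

In the notebook itself, the same check could be applied to `TEST_CONFIGS` before reading `ALLOWED_PARALLELISM`.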

## Step 1: Load Workflow Data

Load and process the workflow data from datalife statistics.

In [None]:
# Load workflow data
print("Loading workflow data...")
wf_df, task_order_dict, all_wf_dict = load_workflow_data(WORKFLOW_NAME, debug=False)

print(f"\nWorkflow data loaded:")
print(f"- Total records: {len(wf_df)}")
print(f"- Task definitions: {len(task_order_dict)}")
print(f"- Unique tasks: {list(wf_df['taskName'].unique())}")
print(f"- Stages: {sorted(wf_df['stageOrder'].unique())}")



In [None]:
# Display first few rows
print("\nFirst few rows of workflow data:")
print(wf_df.head())
print(wf_df.columns)

## Step 2: Calculate I/O Time Breakdown

Calculate the I/O time breakdown for each task in the workflow.

In [None]:
# Get configuration for the workflow
config = TEST_CONFIGS[WORKFLOW_NAME]
num_nodes_list = config["NUM_NODES_LIST"]

# Create task name to parallelism mapping
task_name_to_parallelism = {task: info['parallelism'] for task, info in task_order_dict.items()}

# Calculate I/O time breakdown
print("Calculating I/O time breakdown...")
io_breakdown = calculate_io_time_breakdown(wf_df, task_name_to_parallelism, num_nodes_list)

print(f"\nI/O breakdown results:")
for key, value in io_breakdown.items():
    if isinstance(value, dict):
        print(f"{key}:")
        for sub_key, sub_value in value.items():
            print(f"  {sub_key}: {sub_value:.2f} seconds")
    else:
        print(f"{key}: {value:.2f} seconds")
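
The loop above assumes every value is a number or a one-level dictionary of numbers. If `calculate_io_time_breakdown` ever returns deeper nesting or non-numeric entries, the `:.2f` format would fail. The sketch below shows one way to print such a structure defensively; `format_breakdown` is a hypothetical helper and the sample data is invented.

```python
from numbers import Real


def format_breakdown(breakdown, indent=0):
    """Return printable lines for a (possibly nested) breakdown dictionary."""
    lines = []
    pad = "  " * indent
    for key, value in breakdown.items():
        if isinstance(value, dict):
            # Recurse into nested dictionaries with one more indent level.
            lines.append(f"{pad}{key}:")
            lines.extend(format_breakdown(value, indent + 1))
        elif isinstance(value, Real) and not isinstance(value, bool):
            lines.append(f"{pad}{key}: {value:.2f} seconds")
        else:
            # Fall back to repr() for lists, strings, None, and so on.
            lines.append(f"{pad}{key}: {value!r}")
    return lines


# Invented sample; the real io_breakdown keys may differ.
sample_breakdown = {"total": 1.5, "task_io_time_adjust": {"taskA": 2, "taskB": [1, 2]}}
print("\n".join(format_breakdown(sample_breakdown)))
```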

## Step 2.1: Calculate Aggregate File Size per Node

Calculate the aggregate file size per node for proper scaling.

In [None]:
# Calculate aggregate file size per node
print("Calculating aggregate file size per node...")
wf_df = calculate_aggregate_filesize_per_node(wf_df)

print("\nAggregate file size calculation completed.")
print(f"Updated columns: {[col for col in wf_df.columns if 'aggregateFilesizeMB' in col]}")
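
The notebook does not show how `calculate_aggregate_filesize_per_node` derives its values. As a rough illustration only, the sketch below assumes the aggregate is the per-task file size multiplied by the number of tasks that land on the busiest node. That definition, the function name, and the parameter names are assumptions, not the module's actual logic.

```python
import math


def aggregate_filesize_per_node_mb(file_size_mb, parallelism, num_nodes):
    """Assumed definition: MB handled by the busiest node.

    Tasks are assumed to be spread as evenly as possible across nodes,
    so the busiest node runs ceil(parallelism / num_nodes) tasks.
    """
    if parallelism < 1 or num_nodes < 1:
        raise ValueError("parallelism and num_nodes must be at least 1")
    tasks_per_node = math.ceil(parallelism / num_nodes)
    return file_size_mb * tasks_per_node


# 8 tasks of 100 MB each over 4 nodes: 2 tasks per node.
print(aggregate_filesize_per_node_mb(100.0, parallelism=8, num_nodes=4))
```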

In [7]:
# # Display rows whose taskName starts with "stage_"
# staged = insert_data_staging_rows(wf_df)
# print(staged[staged['taskName'].str.startswith('stage_')][['taskName', 'stageOrder', 'operation']])

## Step 2.2: Insert Data Movement Steps into the Workflow

In [None]:
# Step 2.2: Insert data staging (I/O) rows into the workflow DataFrame

# Set debug=True to see detailed output, or False for silent operation
wf_df = insert_data_staging_rows(wf_df, debug=False)

print("Data staging rows inserted. New DataFrame shape:", wf_df.shape)
display(wf_df.head(10))

# Save the updated wf_df to a CSV file (create the output directory first)
os.makedirs("./analysis_data", exist_ok=True)
wf_df.to_csv(f"./analysis_data/{WORKFLOW_NAME}_wf_df_with_staging.csv", index=False)

In [None]:
saved_df = pd.read_csv(f"./analysis_data/{WORKFLOW_NAME}_wf_df_with_staging.csv")
saved_none_rows = saved_df[saved_df['operation'] == 'none']
print(f"\nSaved file has {len(saved_none_rows)} rows with operation='none'")

In [None]:
# Display first few rows
print("\nFirst few rows of workflow data:")
print(wf_df.head())
print(wf_df.columns)
print(wf_df[wf_df['operation'] == 0]['taskName'].unique())

## Step 3: Load IOR Benchmark Data and Estimate Transfer Rates

Load the IOR benchmark data and estimate transfer rates for different storage configurations.

In [None]:
# Load IOR benchmark data
print("Loading IOR benchmark data...")
if os.path.exists(IOR_DATA_PATH):
    df_ior = pd.read_csv(IOR_DATA_PATH)
    print(f"Loaded {len(df_ior)} IOR benchmark records")

    # Estimate transfer rates
    print("\nEstimating transfer rates...")
    cp_scp_parallelism = set(wf_df.loc[wf_df['operation'].isin(['cp', 'scp']), 'parallelism'].unique())
    ALLOWED_PARALLELISM = sorted(set(ALLOWED_PARALLELISM).union(cp_scp_parallelism))

    # Then call the function:
    wf_df = estimate_transfer_rates_for_workflow(
        wf_df, df_ior, STORAGE_LIST, ALLOWED_PARALLELISM, multi_nodes=True, debug=False
    )
    # wf_df = estimate_transfer_rates_for_workflow(wf_df, df_ior, STORAGE_LIST, ALLOWED_PARALLELISM)
    print("Transfer rate estimation completed")
    
    # Show estimated transfer rate columns
    estimated_cols = [col for col in wf_df.columns if col.startswith('estimated_trMiB_')]
    print(f"\nEstimated transfer rate columns: {len(estimated_cols)}")
    print(f"Sample columns: {estimated_cols[:5]}")
else:
    print(f"Warning: IOR data file not found at {IOR_DATA_PATH}")
    print("Skipping transfer rate estimation...")
    df_ior = pd.DataFrame()

# Save the estimated transfer rates to a new CSV file
os.makedirs("./analysis_data", exist_ok=True)
wf_df.to_csv(f"./analysis_data/{WORKFLOW_NAME}_estimated_transfer_rates.csv", index=True)
print(f"Saved estimated transfer rates to: ./analysis_data/{WORKFLOW_NAME}_estimated_transfer_rates.csv")
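
The overview describes the estimation as a 4D interpolation over IOR benchmark data. To make the idea concrete, here is a deliberately simplified one-dimensional sketch: linear interpolation of a transfer rate over transfer size alone, with out-of-range queries clamped to the nearest measurement. `interpolate_rate` is a hypothetical helper, the sample points are invented, and none of this is the implementation in `modules.workflow_interpolation`.

```python
from bisect import bisect_left


def interpolate_rate(samples, size_mb):
    """Linearly interpolate a transfer rate (MiB/s) at `size_mb`.

    `samples` is a list of (size_mb, rate_mib_s) tuples sorted by size
    with distinct sizes. Queries outside the measured range are clamped
    to the nearest sample rather than extrapolated.
    """
    sizes = [size for size, _ in samples]
    rates = [rate for _, rate in samples]
    if size_mb <= sizes[0]:
        return rates[0]
    if size_mb >= sizes[-1]:
        return rates[-1]
    hi = bisect_left(sizes, size_mb)  # first index with sizes[hi] >= size_mb
    lo = hi - 1
    frac = (size_mb - sizes[lo]) / (sizes[hi] - sizes[lo])
    return rates[lo] + frac * (rates[hi] - rates[lo])


# Invented benchmark points: (transfer size in MB, rate in MiB/s).
example_samples = [(1.0, 100.0), (4.0, 400.0), (16.0, 800.0)]
print(interpolate_rate(example_samples, 2.5))
```

A multi-dimensional version would repeat the same weighting along each remaining axis. Clamping instead of extrapolating is a conservative choice here, since benchmark curves often flatten or drop beyond the measured range.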

In [12]:
# Show all rows
pd.set_option('display.max_rows', None)
# Show all columns
pd.set_option('display.max_columns', None)
# Show full width
pd.set_option('display.width', None)
# Don't truncate column content
pd.set_option('display.max_colwidth', None)


## Step 5: Calculate SPM Values

Calculate Storage Performance Metrics (SPM) for the workflow.

In [None]:
# Calculate SPM values
print("Calculating SPM values...")
# Set debug=True for verbose output, debug=False for minimal output
spm_results = calculate_spm_for_workflow(wf_df, debug=True)

print(f"\nSPM calculation completed:")
print(f"- Producer-consumer pairs: {len(spm_results)}")
for pair in spm_results.keys():
    print(f"  - {pair}")

In [None]:
# Print all SPM results for each producer:consumer pair

# Print weighted SPM values for debugging
for pair, data in spm_results.items():
    # # only print producer and consumer that are not stage_in and stage_out
    # if 'stage_in' in pair or 'stage_out' in pair:
    #     continue

    print(f"\nProducer-Consumer Pair: {pair}")
    print("SPM:")
    for storage_n, spm_values in data['SPM'].items():
        print(f"  {storage_n}: {spm_values[0:10]} ...")
    print("estT_prod:")
    for storage_n, estT_prod_values in data['estT_prod'].items():
        print(f"  {storage_n}: {estT_prod_values[0:10]} ...")
    print("estT_cons:")
    for storage_n, estT_cons_values in data['estT_cons'].items():
        print(f"  {storage_n}: {estT_cons_values[0:10]} ...")
    print("dsize_prod:")
    for storage_n, dsize_prod_values in data['dsize_prod'].items():
        print(f"  {storage_n}: {dsize_prod_values[0:10]} ...")
    print("dsize_cons:")
    for storage_n, dsize_cons_values in data['dsize_cons'].items():
        print(f"  {storage_n}: {dsize_cons_values[0:10]} ...")
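
Printing the first ten values of every list can produce a lot of output. As a lighter alternative, the sketch below reduces each SPM list to its mean. It assumes only what the loop above already relies on: each pair maps to a dictionary with an `'SPM'` entry keyed by storage configuration, holding a sequence of numbers. `summarise_spm` and the sample keys are hypothetical.

```python
from statistics import mean


def summarise_spm(spm_results):
    """Return {pair: {storage_key: mean SPM}}, skipping empty value lists."""
    summary = {}
    for pair, data in spm_results.items():
        per_storage = {}
        for storage_key, values in data.get("SPM", {}).items():
            values = list(values)
            if values:  # mean() raises on empty input, so skip those
                per_storage[storage_key] = mean(values)
        summary[pair] = per_storage
    return summary


# Invented sample with the same nesting as spm_results.
example_results = {
    "taskA:taskB": {"SPM": {"storageX_2n": [0.5, 1.5], "storageY_2n": []}},
}
print(summarise_spm(example_results))
```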

## Step 6: Filter Storage Options and Select Best Configuration

Filter storage options and select the best storage configuration for each producer-consumer pair.

In [None]:
# Filter storage options
print("Filtering storage options...")
filtered_spm_results = filter_storage_options(spm_results, WORKFLOW_NAME)

# Select best storage and parallelism
print("\nSelecting best storage and parallelism...")
best_results = select_best_storage_and_parallelism(spm_results, baseline=0)

print("\nBest storage configurations:")
for pair, config in best_results.items():
    print(f"{pair}:")
    print(f"  Best storage: {config['best_storage_type']}")
    print(f"  Best parallelism: {config['best_parallelism']}")
    print(f"  Best rank: {config['best_rank']:.2f}")
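
The selection logic lives inside `select_best_storage_and_parallelism` and is not shown here. The sketch below illustrates the general idea of picking one candidate per pair by rank. It assumes that a lower rank is better, which the notebook does not confirm, so the direction is exposed as a parameter. `pick_best` and the candidate layout are hypothetical; only the output key names come from the cell above.

```python
def pick_best(candidates, lower_is_better=True):
    """Pick the best candidate from a list of dicts.

    Each candidate is assumed to carry 'storage', 'parallelism', and 'rank'.
    Returns None when there are no candidates.
    """
    if not candidates:
        return None
    chooser = min if lower_is_better else max
    best = chooser(candidates, key=lambda c: c["rank"])
    return {
        "best_storage_type": best["storage"],
        "best_parallelism": best["parallelism"],
        "best_rank": best["rank"],
    }


# Invented candidates for a single producer-consumer pair.
example_candidates = [
    {"storage": "storageX", "parallelism": 4, "rank": 2.5},
    {"storage": "storageY", "parallelism": 2, "rank": 1.25},
]
print(pick_best(example_candidates))
```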

## Step 7: Display Top Results

Display the top storage configurations based on rank values.

In [None]:
# Display top results
print("Displaying top results...")
display_top_sorted_averaged_rank(spm_results, top_n=40)

## Step 8: Generate Visualizations

Generate comprehensive visualizations of the analysis results.

In [17]:
# # Generate visualizations
# print("Generating visualizations...")
# plot_all_visualizations(wf_df, best_results, io_breakdown['task_io_time_adjust'])
# print("Visualizations completed!")

## Step 9: Save Results

Save the analysis results to files for future reference.

In [None]:
#FIXME Step 9: Debug and save filtered SPM results
import os
import pandas as pd
import numpy as np
from modules.workflow_results_exporter import extract_producer_consumer_results, print_storage_analysis

# Debug: Check what's in filtered_spm_results
print("=== Debugging filtered_spm_results ===")
print(f"Type: {type(filtered_spm_results)}")
print(f"Length: {len(filtered_spm_results) if filtered_spm_results else 'None/Empty'}")

if filtered_spm_results:
    print(f"Keys: {list(filtered_spm_results.keys())[:5]}...")  # First 5 keys
    
    # Examine first item structure
    first_key = list(filtered_spm_results.keys())[0]
    first_value = filtered_spm_results[first_key]
    print(f"\nFirst item - Key: '{first_key}'")
    print(f"  Type: {type(first_value)}")
    if isinstance(first_value, dict):
        print(f"  Keys: {list(first_value.keys())}")
        for k, v in first_value.items():
            print(f"    {k}: {type(v)} = {v}")

# Try to extract results
print("\n=== Attempting to Extract Results ===")
try:
    results_df = extract_producer_consumer_results(filtered_spm_results, wf_df)
    print(f"Extracted {len(results_df)} rows")
    
    if not results_df.empty:
        print("Sample data:")
        print(results_df.head())
        
        # Save to CSV
        output_dir = "workflow_spm_results"
        os.makedirs(output_dir, exist_ok=True)
        workflow_name = WORKFLOW_NAME  # use the workflow selected in the Configuration cell
        csv_filename = f"{workflow_name}_filtered_spm_results.csv"
        csv_path = os.path.join(output_dir, csv_filename)
        
        results_df.to_csv(csv_path, index=False)
        print(f"✓ Saved to: {csv_path}")
        
        # Print storage analysis
        print_storage_analysis(results_df)
        
    else:
        print("Error: Extracted DataFrame is empty - trying alternative method...")
        
        # Alternative extraction method
        results_data = []
        for pair, data in filtered_spm_results.items():
            if isinstance(data, dict):
                # Try to find storage and SPM information
                storage_info = None
                spm_value = None
                
                # Look for storage-related keys
                for key in data.keys():
                    if 'storage' in key.lower() or 'type' in key.lower():
                        storage_info = data[key]
                    if 'spm' in key.lower() or 'rank' in key.lower():
                        spm_value = data[key]
                
                if storage_info:
                    # Split on the first ':' only so extra colons cannot break the unpacking
                    producer, consumer = pair.split(':', 1) if ':' in pair else ('unknown', 'unknown')
                    results_data.append({
                        'producer': producer,
                        'producerStage': -1,
                        'consumer': consumer,
                        'consumerStage': -1,
                        'prodParallelism': np.nan,
                        'consParallelism': np.nan,
                        'p-c-Storage': storage_info,
                        'p-c-SPM': spm_value if spm_value is not None else np.nan  # keep a legitimate 0
                    })
        
        if results_data:
            alt_df = pd.DataFrame(results_data)
            
            # Fill stage information
            task_stage_mapping = {}
            for _, row in wf_df.iterrows():
                task_name = row['taskName']
                stage_order = row['stageOrder']
                if task_name not in task_stage_mapping:
                    task_stage_mapping[task_name] = stage_order
            
            for i, row in alt_df.iterrows():
                if row['producer'] in task_stage_mapping:
                    alt_df.at[i, 'producerStage'] = task_stage_mapping[row['producer']]
                if row['consumer'] in task_stage_mapping:
                    alt_df.at[i, 'consumerStage'] = task_stage_mapping[row['consumer']]
            
            # Save alternative results
            output_dir = "workflow_spm_results"
            os.makedirs(output_dir, exist_ok=True)
            workflow_name = WORKFLOW_NAME  # use the workflow selected in the Configuration cell
            csv_filename = f"{workflow_name}_filtered_spm_results_alt.csv"
            csv_path = os.path.join(output_dir, csv_filename)
            
            alt_df.to_csv(csv_path, index=False)
            print(f"✓ Saved alternative results to: {csv_path}")
            print(f"Alternative DataFrame shape: {alt_df.shape}")
            print(alt_df.head())
        else:
            print("Error: No data could be extracted with alternative method")
            
except Exception as e:
    print(f"Error: {e}")
    import traceback
    traceback.print_exc()
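
The fallback path above derives producer and consumer names from keys of the form `producer:consumer`. A small helper built on `str.partition` can make that parsing explicit and tolerant of malformed keys. `split_pair` is a hypothetical name, and the `'unknown'` placeholders mirror the ones used in the cell.

```python
def split_pair(pair, sep=":"):
    """Split 'producer:consumer' on the first separator.

    Returns ('unknown', 'unknown') when the separator is missing or
    either side is empty.
    """
    producer, found, consumer = pair.partition(sep)
    if not found or not producer or not consumer:
        return ("unknown", "unknown")
    return (producer, consumer)


print(split_pair("taskA:taskB"))
print(split_pair("no_separator_here"))
```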

## Summary

The workflow analysis has been completed successfully. The results include:

1. **Workflow Data**: Processed datalife statistics organized in a DataFrame
2. **I/O Breakdown**: Time analysis for each task in the workflow
3. **Transfer Rate Estimates**: Estimated transfer rates for different storage configurations
4. **SPM Values**: Storage Performance Metrics for producer-consumer pairs
5. **Best Configurations**: Recommended storage and parallelism settings
6. **Visualizations**: Comprehensive plots and charts (available once the Step 8 cell is uncommented)
7. **Saved Results**: CSV files for future reference

### Key Findings

The analysis provides insights into:
- Which storage types perform best for each workflow stage
- Optimal parallelism levels for different storage configurations
- I/O bottlenecks and performance characteristics
- Recommendations for storage selection

### Next Steps

You can:
- Analyze different workflows by changing the `WORKFLOW_NAME` variable
- Modify the analysis parameters in the configuration
- Use the saved results for further analysis or comparison
- Run the analysis programmatically using the `workflow_analysis_main.py` script