# Nextflow optimizer notebook

The current objectives of this notebook are to:
1. Load the execution trace of a Nextflow workflow.
2. Extract timing information for the different tasks executed.
3. (Optionally) Visualize the extracted information, similarly to what Nextflow reports do.
4. Generate a Nextflow config file that overrides each process time limit with the worst-case execution time observed.

In a future version, it might be useful to maintain a database of process runtimes, to better understand how a process's runtime evolves depending on its parameterization or on the node used to run it.

## 1. Notebook parameters

In [None]:
from pathlib import Path

from nf_trace_db_manager import NextflowTraceDBManager

# Base path of the Nextflow run to analyze: a directory plus the name prefix shared
# by every file exported for that run. Uncomment an alternative line to switch runs.
# nf_report_path = Path("./dat/250515_241226_CELEBI/","karol_241226_ult_2025-05-15_13_41_42")
nf_report_path = Path("./dat/250625_250313_CELEBI_wConfig/", "karol_250313_2025-06-25_15_08_44")
# nf_report_path = Path("C:/Users/kdesnos/Desktop/Sandbox/pipelines/","execution_2025-02-14_10-15-33")


def _sibling_of_report(suffix: str) -> Path:
    """Return the path next to ``nf_report_path`` named ``<run name><suffix>``."""
    return nf_report_path.with_name(nf_report_path.name + suffix)


# Files belonging to this run: HTML report and DAG read in sections 2 and 5,
# the generated config written in section 4, and the log used by the PiSDF export.
html_report = _sibling_of_report("_report.html")
dag_report = _sibling_of_report("_dag.html")
output_config_file = _sibling_of_report("_config.config")
log_report = _sibling_of_report("_log.log")

# Manager for the trace database stored in ./dat/nf_trace_db.sqlite.
db_manager = NextflowTraceDBManager("./dat/nf_trace_db.sqlite")

## 2. Load data

### 2.1 Load data from HTML

In [None]:
import extract_trace_from_html as parser

# Parse the process execution traces embedded in the Nextflow HTML report.
trace_df = parser.extract_trace_data(html_report)

# Every following cell indexes into trace_df; stop here with a clear message
# rather than letting a later cell fail on a None value.
if trace_df is None:
    raise ValueError(f"No execution trace could be extracted from {html_report}")

print(f"Extracted {trace_df.shape[0]} process execution traces.")

## 3. Display useful info

### 3.1 Process execution time box plot

In [None]:
# Box plot of process execution times taken from the trace.
import visualization as visualizer

name_filter = None # Optionally, a string can be given to the viewer to display only processes whose name contains it.
                   # Use None to disable filtering.

visualizer.plot_realtime_boxplot(trace_df, name_filter)

### 3.2 Icicle chart of processes

In [None]:
# Icicle chart of the processes found in the trace.
# include_names=True presumably labels each cell with its process name.
import visualization as visualizer

visualizer.plot_icicle_chart(trace_df, include_names=True)

### 3.3 Processing times

In [None]:
# Aggregate execution time over all traced task executions.
# Stored as `total_realtime` rather than `sum`: assigning to `sum` shadows the
# built-in sum() for the rest of the kernel session and breaks any later call to it.
total_realtime = trace_df['realtime'].sum()
# Each execution's realtime weighted by its `cpus` value, then summed.
sum_cpu = (trace_df['realtime'] * trace_df['cpus']).sum()

print(f'Sum of all process execution time: {total_realtime}')
print(f'Sum of all (process exec time)*(nb cpu): {sum_cpu}')

### 3.4 Average wait time

In [None]:
# Plot the wait times of the traced processes.
import visualization as visualizer

visualizer.plot_wait_times(trace_df)

### 3.5 Memory Utilization Ratio

In [None]:

# Memory utilization ratio: peak resident memory observed (peak_rss) divided by
# the memory allocated to the task (memory).
# Rows whose allocation is missing or zero are excluded so every division is defined.
has_memory_allocation = trace_df['memory'].notna() & (trace_df['memory'] > 0)
valid_memory_df = trace_df.loc[has_memory_allocation]

# One ratio per process execution, then the mean over all kept executions.
memory_ratio = valid_memory_df['peak_rss'].div(valid_memory_df['memory'])
avg_memory_ratio = memory_ratio.mean()

n_valid_memory, n_total_traces = len(valid_memory_df), len(trace_df)
print(f"Average memory utilization ratio (peak_rss / memory): {avg_memory_ratio:.2%}")
print(f"Number of processes with valid memory allocation: {n_valid_memory} out of {n_total_traces}")



### 3.6 Time Utilization Ratio

In [None]:
# Compare each task's runtime (realtime) with its allocated time limit (time).
from pandas import Timedelta

# Define the 60-second buffer as a Timedelta object
buffer_time = Timedelta(seconds=60)

# Filter out rows where time is NaN or realtime is non-positive
valid_time_df = trace_df[(~trace_df['time'].isna()) & (trace_df['realtime'] > Timedelta(0))]

# Calculate allocated time utilization ratio (realtime / time)
time_ratio = valid_time_df['realtime'] / valid_time_df['time']
avg_time_ratio = time_ratio.mean()

# Calculate predicted time ratio (realtime / (time - buffer)).
# This accounts for the 60-second buffer added to time limits.
# A limit of at most `buffer_time` yields a zero or negative denominator, i.e. inf or
# negative ratios that corrupt the mean and can make plt.hist fail on a non-finite
# range, so such rows are left out of this ratio.
predicted_time = valid_time_df['time'] - buffer_time
has_positive_predicted_time = predicted_time > Timedelta(0)
predicted_time_ratio = (
    valid_time_df.loc[has_positive_predicted_time, 'realtime']
    / predicted_time[has_positive_predicted_time]
)
avg_predicted_ratio = predicted_time_ratio.mean()

# Count how many times realtime < (time - buffer)
count_under_predicted = (valid_time_df['realtime'] < predicted_time).sum()
percentage_under_predicted = count_under_predicted / len(valid_time_df) * 100 if len(valid_time_df) > 0 else 0

print(f"Average allocated time utilization ratio (realtime / time): {avg_time_ratio:.2%}")
print(f"Average predicted time ratio (realtime / (time - buffer)): {avg_predicted_ratio:.2%}")
print(f"Number of processes with realtime < (time - buffer): {count_under_predicted} out of {len(valid_time_df)} ({percentage_under_predicted:.2f}%)")
print(f"Number of processes with valid time allocation: {len(valid_time_df)} out of {len(trace_df)}")
print(f"Number of processes with time limit above the buffer (used for the predicted ratio): {len(predicted_time_ratio)} out of {len(valid_time_df)}")

# Visualization of time utilization
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 6))

# Create a boxplot to show distribution of time utilization ratios.
# Tick labels are set with xticks: boxplot's `labels=` keyword is deprecated since
# Matplotlib 3.9 (renamed `tick_labels`), while xticks works on every version.
plt.subplot(1, 2, 1)
plt.boxplot([time_ratio, predicted_time_ratio])
plt.xticks([1, 2], ['Allocated Ratio', 'Predicted Ratio'])
plt.ylabel('Ratio Value')
plt.title('Distribution of Time Utilization Ratios')
plt.grid(axis='y', alpha=0.3)

# Create a histogram of time utilization ratio
plt.subplot(1, 2, 2)
plt.hist(time_ratio, bins=20, alpha=0.7, color='skyblue', edgecolor='black', label='Allocated')
plt.hist(predicted_time_ratio, bins=20, alpha=0.5, color='orange', edgecolor='black', label='Predicted')
plt.axvline(avg_time_ratio, color='blue', linestyle='dashed', linewidth=2, label=f'Avg Allocated: {avg_time_ratio:.2%}')
plt.axvline(avg_predicted_ratio, color='red', linestyle='dashed', linewidth=2, label=f'Avg Predicted: {avg_predicted_ratio:.2%}')
plt.xlabel('Time Utilization Ratio')
plt.ylabel('Number of Processes')
plt.title('Distribution of Time Utilization Ratios')
plt.grid(True, alpha=0.3)
plt.legend()

plt.tight_layout()
plt.show()


## 4. Export Config File

In [None]:
# Write a Nextflow config file to `output_config_file` from the extracted trace.
# Per the notebook's stated goal, it overrides each process time limit with the
# worst-case execution time observed.
import config_file_generator as generator

generator.generate_nextflow_config_from_trace(trace_df, output_config_file)

## 5. Load DAG

### 5.1. Import from Mermaid DAG

In [None]:
import networkx as nx
import import_dag_from_mermaid_html as dag_importer

# Parse the Mermaid diagram embedded in the Nextflow DAG report into a graph.
dag = dag_importer.extract_mermaid_graph(dag_report)

# Reject graphs that contain a cycle.
is_acyclic = nx.is_directed_acyclic_graph(dag)
if not is_acyclic:
    raise ValueError("The extracted graph is not a DAG.")

# Show every node and edge with its attributes as a quick sanity check.
node_count = dag.number_of_nodes()
print(f"{node_count} Nodes:", dag.nodes(data=True))
edge_count = dag.number_of_edges()
print(f"{edge_count} Edges:", dag.edges(data=True))

### 5.2. Update with trace info.

In [None]:
# Annotate the DAG with execution counts derived from the trace.
# NOTE(review): the return value is ignored, so this presumably mutates `dag` in place — confirm.
dag_importer.add_execution_counts_to_graph(dag, trace_df)

# Print the nodes again so the added attributes are visible.
print(f"{dag.number_of_nodes()} Nodes:", dag.nodes(data=True))

## 6. Export DAG

### 6.1. Export as DOT

In [None]:
# Export `dag` in DOT format to "<run name>_dag.dot", next to the report files.
import export_dag_to_dot as dag_exporter

dag_exporter.export_to_dot(dag, nf_report_path.with_name(nf_report_path.name + "_dag.dot"))

### 6.2 Export as PiSDF

In [None]:
# Build a Preesm (PiSDF) project from this run's reports and trace database, and
# also export the graph built by the exporter as DOT.
from export_dag_to_pisdf import PreesmExporter
import export_dag_to_dot as dag_exporter

# Preesm project directory, created next to the run's report files.
preesm_project_dir = nf_report_path.parent / "nextflow_toy"
preesm_exporter = PreesmExporter(preesm_project_dir, db_manager, html_report, dag_report)

# Use a distinct file name: the DOT export of section 6.1 already writes
# "<run name>_dag.dot", and reusing that name here silently overwrote it with
# a different graph (the exporter's, not the trace-annotated DAG).
dag_exporter.export_to_dot(preesm_exporter.graph, nf_report_path.with_name(nf_report_path.name + "_preesm_dag.dot"))

# NOTE(review): the meaning of this int -> int mapping is defined by
# export_preesm_project (not visible here) — document it next to the literal.
preesm_exporter.export_preesm_project(log_report, {1: 4, 16: 2, 32: 4})