In [1]:
import glob
import pandas as pd
import json
import subprocess
import yaml
import numpy as np
import os
import argparse

In [2]:
results_path = "../benchmark-results/"

In [3]:
result_files = glob.glob(results_path + "*/*")

In [4]:
# One flattened DataFrame per benchmark result file; they are merged later.
results = []

for file in result_files:
    with open(file) as f:
        # Flatten the nested JSON (e.g. the "run_stats" object) into columns.
        results.append(pd.json_normalize(json.load(f)))

# Merging all results here

In [5]:
df = pd.concat(results, ignore_index=True)

In [6]:
del results

# Finding DAG file paths

In [7]:
def find_file(target_filename, search_path="."):
    """
    Recursively search for a file named target_filename starting from search_path.

    A file whose name, with or without its extension, equals target_filename
    is preferred. Only when no such file exists anywhere under search_path is
    the first file whose name merely contains target_filename returned. This
    keeps a name such as "net-125-10" from resolving to "net-125-100.yaml".

    Parameters:
      target_filename (str): File name (extension optional) to search for.
      search_path (str): The directory to start the search (absolute or relative).

    Returns:
      Full path of the matching file.

    Raises:
      FileNotFoundError: If no file under search_path matches target_filename.
    """
    substring_match = None

    for root, dirs, files in os.walk(search_path):
        # Sorting in place makes the traversal order, and therefore the
        # chosen file, deterministic across file systems.
        dirs.sort()
        for file in sorted(files):
            stem = os.path.splitext(file)[0]
            if file == target_filename or stem == target_filename:
                return os.path.join(root, file)
            # Remember the first loose match, but keep looking for an exact one.
            if substring_match is None and target_filename in file:
                substring_match = os.path.join(root, file)

    if substring_match is not None:
        return substring_match

    raise FileNotFoundError("Error: File not found {}".format(target_filename))


def file_finder_helper(unique_files, search_path):
    """
    Resolve every name in unique_files to a file path under search_path.

    Parameters:
      unique_files (iterable of str): File names to resolve via find_file.
      search_path (str): Directory under which find_file searches.

    Returns:
      dict mapping each file name to the path find_file returned for it.

    Raises:
      ValueError: If find_file returns a path that was already assigned to
        an earlier entry, since the name-to-path mapping would be ambiguous.
    """
    file_paths = {}
    # Paths handed out so far; a set keeps the duplicate check O(1).
    seen_paths = set()

    for file_name in unique_files:
        file_path = find_file(file_name, search_path=search_path)

        if file_path in seen_paths:
            raise ValueError(
                "Error: Path already in dictionary. This is a duplicate: " + file_path
            )

        seen_paths.add(file_path)
        file_paths[file_name] = file_path

    return file_paths

In [8]:
dag_paths = file_finder_helper(
    df["dag"].unique(), os.getcwd() + "/../dags"
)

In [9]:
df["dag_path"] = df["dag"].map(dag_paths)

In [10]:
# Make sure all dags were found
df[df["dag_path"] == False]

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,run_stats.max_memory_utilization,run_stats.cpu_utilization,run_stats.memory_utilization,run_stats.used_resource_count,run_stats.cpu_utilization_used,run_stats.memory_utilization_used,run_stats.cpu_utilization_active,run_stats.memory_utilization_active,run_stats.expected_makespan,dag_path


In [11]:
df[df["dag_path"] == ""]

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,run_stats.max_memory_utilization,run_stats.cpu_utilization,run_stats.memory_utilization,run_stats.used_resource_count,run_stats.cpu_utilization_used,run_stats.memory_utilization_used,run_stats.cpu_utilization_active,run_stats.memory_utilization_active,run_stats.expected_makespan,dag_path


In [12]:
df.head()["dag_path"][0]

'/home/hgsm/dslab-fork/dytas-benchmark/dytas-comparisons/analysis/../dags/daggen/150-373-0.1.yaml'

# Now finding system file paths

In [13]:
system_paths = file_finder_helper(df["system"].unique(), os.getcwd() + "/../systems")

In [14]:
df["system_path"] = df["system"].map(system_paths)

In [15]:
df[df["system_path"] == False]

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,run_stats.cpu_utilization,run_stats.memory_utilization,run_stats.used_resource_count,run_stats.cpu_utilization_used,run_stats.memory_utilization_used,run_stats.cpu_utilization_active,run_stats.memory_utilization_active,run_stats.expected_makespan,dag_path,system_path


In [16]:
df[df["system_path"] == ""]

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,run_stats.cpu_utilization,run_stats.memory_utilization,run_stats.used_resource_count,run_stats.cpu_utilization_used,run_stats.memory_utilization_used,run_stats.cpu_utilization_active,run_stats.memory_utilization_active,run_stats.expected_makespan,dag_path,system_path


In [17]:
df.head()["system_path"][0]

'/home/hgsm/dslab-fork/dytas-benchmark/dytas-comparisons/analysis/../systems/16-128/het/cluster-het-16-128-ConstantBandwidth-125-10.yaml'

# Loading each DAG's properties as it would be parsed by DSLab

In [18]:
path_to_dag_export = "/home/hgsm/dslab-fork/target/release/dag-export"

def get_tasks(dag_path):
    """
    Run the dag-export executable on dag_path and return its parsed output.

    Parameters:
      dag_path (str): Path passed to the executable's --dag-path option.

    Returns:
      The object decoded from the JSON the executable prints on stdout.

    Raises:
      RuntimeError: If the executable exits with a non-zero return code.
    """
    command = [path_to_dag_export, '--dag-path', dag_path]
    completed = subprocess.run(command, capture_output=True, text=True)

    if completed.returncode != 0:
        raise RuntimeError(f"Rust executable failed: {completed.stderr}")

    # stdout carries the JSON document; drop surrounding whitespace first.
    return json.loads(completed.stdout.strip())


def load_dags(dag_paths):
    """
    Load the tasks of every DAG in dag_paths.

    Parameters:
      dag_paths (iterable of str): DAG file paths, each passed to get_tasks.

    Returns:
      dict mapping each path to the value get_tasks returned for it.
    """
    return {dag_path: get_tasks(dag_path) for dag_path in dag_paths}

In [19]:
cached_dags = load_dags(df.dag_path.unique())

# Measuring the DAGs properties
#### Properties: number of nodes and edges; graph density.


$$
DEN_{Undirected|Acyclic}=\frac{2|E|}{|V|*(|V|-1)}
$$

In [20]:
def calculate_dag_properties(dag_tasks):
    """
    Return the (node count, edge count, density) triple of a DAG.

    Every entry of dag_tasks is one node, and the length of its "parents"
    list is the number of edges coming into that node.

    Parameters:
      dag_tasks (list of dict): Tasks of the DAG; each has a "parents" list.

    Returns:
      Tuple (num_nodes, num_edges, density). All three are zero for an empty
      task list; edges and density are zero for a single-node graph.
    """
    # An empty task list describes an empty graph.
    if not dag_tasks:
        return 0, 0, 0

    node_count = len(dag_tasks)

    # Density is undefined below two nodes, so zero is reported instead.
    if node_count <= 1:
        return node_count, 0, 0.0

    # Every parent reference is one incoming edge.
    edge_count = 0
    for task in dag_tasks:
        edge_count += len(task["parents"])

    # Actual edges relative to the maximum a DAG of this size can hold.
    ordered_pairs = node_count * (node_count - 1)
    return (node_count, edge_count, (2 * edge_count) / ordered_pairs)

In [21]:
unique_dags = df["dag_path"].unique()

In [22]:
# (nodes, edges, density) for every distinct DAG, keyed by its file path.
dag_properties = {
    path: calculate_dag_properties(cached_dags[path])
    for path in unique_dags
}

In [23]:
properties_df = df["dag_path"].map(dag_properties)

In [24]:
df["dag_nodes"] = properties_df.apply(lambda x: x[0]) # node count

In [25]:
df["dag_edges"] = properties_df.apply(lambda x: x[1]) # edge count

In [26]:
df["dag_density"] = properties_df.apply(lambda x: x[2]) # density

In [27]:
df.head()

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,run_stats.cpu_utilization_used,run_stats.memory_utilization_used,run_stats.cpu_utilization_active,run_stats.memory_utilization_active,run_stats.expected_makespan,dag_path,system_path,dag_nodes,dag_edges,dag_density
0,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.953493,0.025704,0.006658,372855.07982,43908.119552,351.266126,...,0.04675,0.02374,0.069933,0.034966,,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,150,152,0.013602
1,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.956103,0.025953,0.004778,372855.07982,43908.119552,351.276656,...,0.04675,0.02374,0.069933,0.034966,,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,150,152,0.013602
2,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-1000,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.982203,0.025807,0.00475,372855.07982,43908.119552,351.381956,...,0.04675,0.02374,0.069933,0.034966,,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,150,152,0.013602
3,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.159104,0.025793,0.004743,372855.07982,43908.119552,35.127666,...,0.046781,0.023756,0.069971,0.034985,,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,150,152,0.013602
4,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.161714,0.034407,0.0064,372855.07982,43908.119552,35.138196,...,0.046781,0.023756,0.069971,0.034985,,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,/home/hgsm/dslab-fork/dytas-benchmark/dytas-co...,150,152,0.013602


In [28]:
df.dag_density.max()

0.4329004329004329

# Getting DAG stats

In [29]:
path_to_dag_stats = "/home/hgsm/dslab-fork/target/release/dag-stats-export"

def get_dag_stats(dag_path):
    """
    Run the dag-stats-export executable on dag_path and return its parsed output.

    Parameters:
      dag_path (str): Path passed to the executable's --dag-path option.

    Returns:
      The object decoded from the JSON the executable prints on stdout.

    Raises:
      RuntimeError: If the executable exits with a non-zero return code.
    """
    command = [path_to_dag_stats, '--dag-path', dag_path]
    completed = subprocess.run(command, capture_output=True, text=True)

    if completed.returncode != 0:
        raise RuntimeError(f"Rust executable failed: {completed.stderr}")

    # stdout carries the JSON document; drop surrounding whitespace first.
    return json.loads(completed.stdout.strip())


def load_dag_stats(dag_paths):
    """
    Load the statistics of every DAG in dag_paths.

    Parameters:
      dag_paths (iterable of str): DAG file paths, each passed to get_dag_stats.

    Returns:
      dict mapping each path to the value get_dag_stats returned for it.
    """
    return {dag_path: get_dag_stats(dag_path) for dag_path in dag_paths}

In [30]:
dag_stats_cached = load_dag_stats(df.dag_path.unique())

In [31]:
dag_stats_cached['/home/hgsm/dslab-fork/dytas-benchmark/dytas-comparisons/analysis/../dags/daggen/150-373-0.1.yaml']

{'task_count': 150,
 'max_cores_per_task': 1,
 'total_comp_size': 1670712.9927359996,
 'total_data_size': 55979.32646399999,
 'total_transfers_size': 55979.32646399999,
 'input_data_size': 6000.0,
 'output_data_size': -0.0,
 'max_input_size': 500.0,
 'max_output_size': 0.0,
 'min_max_input_size': 500.0,
 'min_max_output_size': 0.0,
 'comp_transfers_ratio': 29.84517853765937,
 'critical_path_size': 226141.394072,
 'parallelism_degree': 7.3879132106352445,
 'depth': 14,
 'width': 13,
 'max_parallelism': 16,
 'level_profiles': [{'level': 0,
   'task_count': 12,
   'task_comp_size': {'min': 812.663112,
    'max': 31141.552292,
    'sum': 133008.906045,
    'avg': 11084.07550375,
    'std': 8650.957170498905},
   'task_input_size': {'min': 500.0,
    'max': 500.0,
    'sum': 6000.0,
    'avg': 500.0,
    'std': 0.0},
   'task_output_size': {'min': 33.554432,
    'max': 838.8608,
    'sum': 3607.1014400000004,
    'avg': 360.710144,
    'std': 290.1536929528163},
   'predecessor_count': 0,
 

In [32]:
def get_main_stats(x):
    """
    Pick the headline statistics of the DAG stored under key x in dag_stats_cached.

    Parameters:
      x (str): Key into dag_stats_cached (a DAG path).

    Returns:
      list with, in this order: total_comp_size, total_transfers_size,
      depth, width, parallelism_degree, critical_path_size.
    """
    stats = dag_stats_cached[x]

    # The order matters: callers unpack the result by position.
    wanted_keys = (
        "total_comp_size",
        "total_transfers_size",
        "depth",
        "width",
        "parallelism_degree",
        "critical_path_size",
    )

    return [stats[key] for key in wanted_keys]

dag_stats = pd.Series(df["dag_path"].map(get_main_stats))

In [33]:
dag_stats

0          [1670712.9927359996, 55979.32646399999, 14, 13...
1          [1670712.9927359996, 55979.32646399999, 14, 13...
2          [1670712.9927359996, 55979.32646399999, 14, 13...
3          [1670712.9927359996, 55979.32646399999, 14, 13...
4          [1670712.9927359996, 55979.32646399999, 14, 13...
                                 ...                        
2130619    [150805.00284000006, 79284.90643199999, 4, 51,...
2130620    [150805.00284000006, 79284.90643199999, 4, 51,...
2130621    [150805.00284000006, 79284.90643199999, 4, 51,...
2130622    [150805.00284000006, 79284.90643199999, 4, 51,...
2130623    [150805.00284000006, 79284.90643199999, 4, 51,...
Name: dag_path, Length: 2130624, dtype: object

In [34]:
# Unpack the per-row stats lists into one column each. The positions used
# below follow the order of the list returned by get_main_stats.
df["dag_total_comp_size"] = dag_stats.apply(lambda x: x[0])
df["dag_total_transfers_size"] = dag_stats.apply(lambda x: x[1])
df["dag_depth"] = dag_stats.apply(lambda x: x[2])
df["dag_width"] = dag_stats.apply(lambda x: x[3])
df["dag_parallelism_degree"] = dag_stats.apply(lambda x: x[4])
df["dag_critical_path_size"] = dag_stats.apply(lambda x: x[5])

# Calculating the Communication-to-Computation Ratio (CCR)

$$
\text{CCR} = \frac{\frac{\sum_{e \in E} c(e)}{|{E}|}}{\frac{\sum_{v \in V} w(v)}{|V|}}
$$

Where the numerator is the mean data transfer cost per edge (in this case in GBytes) and the denominator is the mean task computation cost per node (here in Gflops), for the workflow or DAG.

In [35]:
# This old method is after the DAG has been scheduled, which means it isn't
# an inherent DAG property
# df["CCR"] = df["run_stats.total_network_time"] / df["run_stats.total_task_time"]

# This way of measuring it assures it is a property inherent of each DAG
# df["CCR"] = df["dag_total_transfers_size"] / df["dag_total_comp_size"]

# Taking into consideration mean transfer size and mean computational size instead
# NOTE(review): rows whose dag_edges is 0 would divide by zero here and end up
# as inf/NaN -- confirm no edge-less DAGs are present in the benchmark set.
df["CCR"] = (df.dag_total_transfers_size / df.dag_edges) / (df.dag_total_comp_size / df.dag_nodes)

# Calculating Speedup
Ratio of the sequential execution time to the parallel execution time (the makespan).

$$
\text{Speedup} = 
  \frac{\displaystyle \min_{p_j \in Q}\!\Bigl\{\sum_{n_{i} \in V} \omega_{i,j}\Bigr\}}
       {\text{makespan}}
$$

In [36]:

def calculate_min_task_execution_times(dag_tasks, system):
    """
    Compute, for every task, its shortest possible execution time on the system.

    Each task is evaluated on every entry of system['resources'] and the
    smallest resulting time is kept.

    Parameters:
      dag_tasks (list of dict): Tasks with "id", "flops" and "max_cores".
      system (dict): System description whose "resources" entries have
        "cores" and "speed".

    Returns:
      dict mapping each task "id" to its minimum execution time. The value
      is infinity when the system has no resources.
    """
    fastest_by_task = {}

    for task in dag_tasks:
        # Speed is per core, so a task runs faster the more cores it can use:
        # it gets as many as it asks for, capped by what the node offers.
        candidate_times = [
            task["flops"] / (resource["speed"] * min(resource["cores"], task["max_cores"]))
            for resource in system['resources']
        ]

        # Seeding with infinity keeps the result defined for an empty system.
        fastest_by_task[task["id"]] = min([np.inf, *candidate_times])

    return fastest_by_task
    

In [37]:
def load_systems(system_paths):
    """
    Parse every YAML system description listed in system_paths.

    Parameters:
      system_paths (iterable of str): Paths of the YAML files to read.

    Returns:
      dict mapping each path to the object yaml.safe_load produced for it.
    """
    def _read_yaml(path):
        # The file is closed again before the next one is opened.
        with open(path) as handle:
            return yaml.safe_load(handle)

    return {path: _read_yaml(path) for path in system_paths}

In [38]:
cached_systems = load_systems(df.system_path.unique())

In [39]:
# Caching each system-dag pair minimum task execution times
sys_dag_min_task_exec_times = {}

for dag_path, dag_tasks in cached_dags.items():
    for system_path, system in cached_systems.items():
        # Key layout "<system_path>_<dag_path>"; the look-ups in later cells
        # build their keys the same way.
        system_dag_pair = system_path + "_" + dag_path

        # Compute only pairs that are not cached yet. An explicit membership
        # test replaces the former bare `except:`, which would also have
        # swallowed unrelated errors such as KeyboardInterrupt.
        if system_dag_pair not in sys_dag_min_task_exec_times:
            sys_dag_min_task_exec_times[
                system_dag_pair
            ] = calculate_min_task_execution_times(dag_tasks, system)

In [40]:
df.head()

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,dag_nodes,dag_edges,dag_density,dag_total_comp_size,dag_total_transfers_size,dag_depth,dag_width,dag_parallelism_degree,dag_critical_path_size,CCR
0,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.953493,0.025704,0.006658,372855.07982,43908.119552,351.266126,...,150,152,0.013602,1670713.0,55979.326464,14,13,7.387913,226141.394072,0.033065
1,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.956103,0.025953,0.004778,372855.07982,43908.119552,351.276656,...,150,152,0.013602,1670713.0,55979.326464,14,13,7.387913,226141.394072,0.033065
2,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-1000,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.982203,0.025807,0.00475,372855.07982,43908.119552,351.381956,...,150,152,0.013602,1670713.0,55979.326464,14,13,7.387913,226141.394072,0.033065
3,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.159104,0.025793,0.004743,372855.07982,43908.119552,35.127666,...,150,152,0.013602,1670713.0,55979.326464,14,13,7.387913,226141.394072,0.033065
4,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.161714,0.034407,0.0064,372855.07982,43908.119552,35.138196,...,150,152,0.013602,1670713.0,55979.326464,14,13,7.387913,226141.394072,0.033065


In [41]:
# Speedup per row: the sum over all tasks of each task's minimum execution
# time on the row's system, divided by the row's measured makespan.
# NOTE(review): the numerator is a sum of per-task minima, whereas the formula
# above takes the minimum over processors of the per-processor total. The two
# agree only when a single processor is the fastest for every task -- confirm
# which definition is intended.
df["speedup"] = df.apply(
    lambda row:
    sum(sys_dag_min_task_exec_times[
        row["system_path"] + "_" + row["dag_path"]
    ].values()) / row["makespan"],
    axis=1
)

# Calculating Efficiency

$$
\text{Efficiency} = \frac{\text{Speedup}}{\text{No. Processors in the System}}
$$

In [42]:
df["processor_count"] = df["system_path"].apply(
    lambda x: len(cached_systems[x]["resources"])
)

In [43]:
df["processor_count"].head()

0    16
1    16
2    16
3    16
4    16
Name: processor_count, dtype: int64

In [44]:
df["core_count"] = df["system_path"].apply(lambda x: sum([node["cores"] for node in cached_systems[x]["resources"]]))

In [45]:
df["core_count"].head()

0    128
1    128
2    128
3    128
4    128
Name: core_count, dtype: int64

In [46]:
df["efficiency_processors"] = df.speedup / df.processor_count

In [47]:
df["efficiency_cores"] = df.speedup / df.core_count

In [48]:
df.head()

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,dag_depth,dag_width,dag_parallelism_degree,dag_critical_path_size,CCR,speedup,processor_count,core_count,efficiency_processors,efficiency_cores
0,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.953493,0.025704,0.006658,372855.07982,43908.119552,351.266126,...,14,13,7.387913,226141.394072,0.033065,2.269356,16,128,0.141835,0.017729
1,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.956103,0.025953,0.004778,372855.07982,43908.119552,351.276656,...,14,13,7.387913,226141.394072,0.033065,2.269356,16,128,0.141835,0.017729
2,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-1000,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.982203,0.025807,0.00475,372855.07982,43908.119552,351.381956,...,14,13,7.387913,226141.394072,0.033065,2.269356,16,128,0.141835,0.017729
3,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.159104,0.025793,0.004743,372855.07982,43908.119552,35.127666,...,14,13,7.387913,226141.394072,0.033065,2.270888,16,128,0.141931,0.017741
4,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.161714,0.034407,0.0064,372855.07982,43908.119552,35.138196,...,14,13,7.387913,226141.394072,0.033065,2.270888,16,128,0.141931,0.017741


# Obtaining system network properties

In [49]:
network_cols = [
    "network_bandwidth", 
    "network_latency", 
    "network_model"
]

df[network_cols] = df["system_path"].apply(
    lambda x: pd.Series([
        cached_systems[x]["network"]["bandwidth"],
        cached_systems[x]["network"]["latency"],
        cached_systems[x]["network"]["model"]
    ]
    )
)

In [50]:
df[network_cols].head()

Unnamed: 0,network_bandwidth,network_latency,network_model
0,125,10,ConstantBandwidth
1,125,100,ConstantBandwidth
2,125,1000,ConstantBandwidth
3,1250,10,ConstantBandwidth
4,1250,100,ConstantBandwidth


# Obtaining System Processor Speeds
Will only extract the minimum and the maximum processor speeds

In [51]:
def find_min_proc_speed(x):
    """
    Return the lowest "speed" among the resources of the cached system x.

    Parameters:
      x (str): Key into cached_systems (a system path).

    Returns:
      The smallest "speed" value, or infinity if the system has no resources.
    """
    speeds = [processor["speed"] for processor in cached_systems[x]["resources"]]

    # Seeding with infinity keeps the result defined for an empty resource list.
    return min([np.inf, *speeds])

df["min_processor_speed"] = df["system_path"].apply(find_min_proc_speed)

In [52]:
df.head()

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,CCR,speedup,processor_count,core_count,efficiency_processors,efficiency_cores,network_bandwidth,network_latency,network_model,min_processor_speed
0,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.953493,0.025704,0.006658,372855.07982,43908.119552,351.266126,...,0.033065,2.269356,16,128,0.141835,0.017729,125,10,ConstantBandwidth,2.0
1,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.956103,0.025953,0.004778,372855.07982,43908.119552,351.276656,...,0.033065,2.269356,16,128,0.141835,0.017729,125,100,ConstantBandwidth,2.0
2,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-1000,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.982203,0.025807,0.00475,372855.07982,43908.119552,351.381956,...,0.033065,2.269356,16,128,0.141835,0.017729,125,1000,ConstantBandwidth,2.0
3,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.159104,0.025793,0.004743,372855.07982,43908.119552,35.127666,...,0.033065,2.270888,16,128,0.141931,0.017741,1250,10,ConstantBandwidth,2.0
4,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.161714,0.034407,0.0064,372855.07982,43908.119552,35.138196,...,0.033065,2.270888,16,128,0.141931,0.017741,1250,100,ConstantBandwidth,2.0


In [53]:
def find_max_proc_speed(x):
    """
    Return the highest "speed" among the resources of the cached system x.

    Parameters:
      x (str): Key into cached_systems (a system path).

    Returns:
      The largest "speed" value, or negative infinity if the system has no
      resources.
    """
    speeds = [processor["speed"] for processor in cached_systems[x]["resources"]]

    # Seeding with -infinity keeps the result defined for an empty resource list.
    return max([-np.inf, *speeds])

df["max_processor_speed"] = df["system_path"].apply(find_max_proc_speed)

In [54]:
df.head()

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,speedup,processor_count,core_count,efficiency_processors,efficiency_cores,network_bandwidth,network_latency,network_model,min_processor_speed,max_processor_speed
0,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.953493,0.025704,0.006658,372855.07982,43908.119552,351.266126,...,2.269356,16,128,0.141835,0.017729,125,10,ConstantBandwidth,2.0,6
1,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.956103,0.025953,0.004778,372855.07982,43908.119552,351.276656,...,2.269356,16,128,0.141835,0.017729,125,100,ConstantBandwidth,2.0,6
2,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-1000,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.982203,0.025807,0.00475,372855.07982,43908.119552,351.381956,...,2.269356,16,128,0.141835,0.017729,125,1000,ConstantBandwidth,2.0,6
3,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.159104,0.025793,0.004743,372855.07982,43908.119552,35.127666,...,2.270888,16,128,0.141931,0.017741,1250,10,ConstantBandwidth,2.0,6
4,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.161714,0.034407,0.0064,372855.07982,43908.119552,35.138196,...,2.270888,16,128,0.141931,0.017741,1250,100,ConstantBandwidth,2.0,6


In [55]:
# cached_systems

# Now computing the SLR
#### Schedule Length Ratio (SLR): A normalized makespan

$$
\mathrm{SLR} = \frac{\mathrm{makespan}}{\sum_{n_i \in CP_{MIN}} \min_{p_j \in Q} \{\omega_{i,j}\}}.
$$

In [56]:
from collections import deque

def find_critical_path(dag_tasks):
    """
    Find the longest (critical) path through the DAG by execution time.

    Tasks are addressed by their position in dag_tasks: the entries of each
    task's "children" list are used as indices into dag_tasks. Only the
    tasks' "min_exec_time" values contribute to the path length.

    Parameters:
      dag_tasks (list of dict): Tasks with "parents", "children" and
        "min_exec_time".

    Returns:
      Tuple (critical_path, max_time): the task indices along the critical
      path from its start task to its end task, and the accumulated time.
    """
    # How many parents of each task still have to be processed.
    remaining_parents = {
        idx: len(task["parents"]) for idx, task in enumerate(dag_tasks)
    }

    # longest[idx] = (largest accumulated time ending at idx, predecessor
    # on that path, or None for a start task).
    longest = {}
    ready = deque()
    for idx, task in enumerate(dag_tasks):
        if remaining_parents[idx] == 0:
            longest[idx] = (task['min_exec_time'], None)
            ready.append(idx)

    # Process tasks in topological order: a task becomes ready once all of
    # its parents have been handled.
    while ready:
        node = ready.popleft()
        elapsed = longest[node][0]
        for succ in dag_tasks[node]['children']:
            candidate = elapsed + dag_tasks[succ]["min_exec_time"]

            # Keep only the slowest way of reaching this child.
            known = longest.get(succ)
            if known is None or candidate > known[0]:
                longest[succ] = (candidate, node)

            remaining_parents[succ] -= 1
            if remaining_parents[succ] == 0:
                ready.append(succ)

    # The critical path ends at the task with the largest accumulated time.
    end_task = max(longest, key=lambda idx: longest[idx][0])
    total_time = longest[end_task][0]

    # Follow the predecessor links back to a start task.
    backwards = []
    cursor = end_task
    while cursor is not None:
        backwards.append(cursor)
        cursor = longest[cursor][1]

    return (backwards[::-1], total_time)

In [57]:
# Caching the critical path for each system dag pair
# (each DAG's critical path will be different
# according to the system it is being executed in)
sys_dag_critical_path = {}

for dag_path, dag_tasks in cached_dags.items():
    for system_path, system in cached_systems.items():
        system_dag_pair = system_path + "_" + dag_path

        # Skip pairs that were already computed, before doing any work.
        if system_dag_pair in sys_dag_critical_path:
            continue

        min_task_execution_times = sys_dag_min_task_exec_times[system_dag_pair]

        # Give every task its own dict for this pair. A plain list.copy() is
        # shallow, so writing "min_exec_time" into it would modify the task
        # dicts held in cached_dags. The time is looked up by the task's
        # "id", the key calculate_min_task_execution_times stores it under,
        # rather than by assuming the id equals the list position.
        timed_dag_tasks = [
            dict(task, min_exec_time=min_task_execution_times[task["id"]])
            for task in dag_tasks
        ]

        sys_dag_critical_path[system_dag_pair] = find_critical_path(timed_dag_tasks)

In [58]:
# sys_dag_critical_path

In [59]:
df["SLR"] = df.apply(
    lambda row: 
        row["makespan"] / sys_dag_critical_path[row["system_path"] + "_" + row["dag_path"]][1],
        axis=1
)

In [60]:
df.head()

Unnamed: 0,dag,system,scheduler,completed,makespan,exec_time,run_stats.scheduling_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,...,processor_count,core_count,efficiency_processors,efficiency_cores,network_bandwidth,network_latency,network_model,min_processor_speed,max_processor_speed,SLR
0,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.953493,0.025704,0.006658,372855.07982,43908.119552,351.266126,...,16,128,0.141835,0.017729,125,10,ConstantBandwidth,2.0,6,3.255511
1,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.956103,0.025953,0.004778,372855.07982,43908.119552,351.276656,...,16,128,0.141835,0.017729,125,100,ConstantBandwidth,2.0,6,3.255511
2,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-1000,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122700.982203,0.025807,0.00475,372855.07982,43908.119552,351.381956,...,16,128,0.141835,0.017729,125,1000,ConstantBandwidth,2.0,6,3.255511
3,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.159104,0.025793,0.004743,372855.07982,43908.119552,35.127666,...,16,128,0.141931,0.017741,1250,10,ConstantBandwidth,2.0,6,3.253314
4,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",True,122618.161714,0.034407,0.0064,372855.07982,43908.119552,35.138196,...,16,128,0.141931,0.017741,1250,100,ConstantBandwidth,2.0,6,3.253314


# Applying workflow names

In [61]:
workflows = [
    "blast", "bwa", "1000genome",
    "cycles", "epigenomics", "montage",
    "seismology", "soykb", "srasearch"
]

In [62]:
for workflow in workflows:
    df.loc[df.dag.str.contains(workflow), "workflow"] = workflow

In [63]:
# Assigning "random" to the ones generated
# with daggen
df.loc[df.workflow.isna(), "workflow"] = "random"

In [64]:
df.workflow.head()

0    random
1    random
2    random
3    random
4    random
Name: workflow, dtype: object

# Creating a test id

Each (system, dag) pair will be a test instance.

In [65]:
df["test_id"] = df["system"].astype(str) + "_" + df["dag"].astype(str)

# Homogeneity and Heterogeneity of system

In [66]:
df.loc[df.system.str.contains("het"), "system_type"]  = "heterogeneous"
df.loc[df.system.str.contains("hom"), "system_type"]  = "homogeneous"

# Dropping unnecessary columns

In [67]:
df.columns

Index(['dag', 'system', 'scheduler', 'completed', 'makespan', 'exec_time',
       'run_stats.scheduling_time', 'run_stats.total_task_time',
       'run_stats.total_network_traffic', 'run_stats.total_network_time',
       'run_stats.max_used_cores', 'run_stats.max_used_memory',
       'run_stats.max_cpu_utilization', 'run_stats.max_memory_utilization',
       'run_stats.cpu_utilization', 'run_stats.memory_utilization',
       'run_stats.used_resource_count', 'run_stats.cpu_utilization_used',
       'run_stats.memory_utilization_used', 'run_stats.cpu_utilization_active',
       'run_stats.memory_utilization_active', 'run_stats.expected_makespan',
       'dag_path', 'system_path', 'dag_nodes', 'dag_edges', 'dag_density',
       'dag_total_comp_size', 'dag_total_transfers_size', 'dag_depth',
       'dag_width', 'dag_parallelism_degree', 'dag_critical_path_size', 'CCR',
       'speedup', 'processor_count', 'core_count', 'efficiency_processors',
       'efficiency_cores', 'network_bandwidth'

In [68]:
df_dropped = df.drop(["dag_path", "system_path", "completed", "run_stats.scheduling_time",
        "run_stats.max_used_memory", "run_stats.max_memory_utilization",
        "run_stats.memory_utilization_used","run_stats.memory_utilization_active",
        "run_stats.expected_makespan"], axis=1)

In [69]:
df_dropped.head()

Unnamed: 0,dag,system,scheduler,makespan,exec_time,run_stats.total_task_time,run_stats.total_network_traffic,run_stats.total_network_time,run_stats.max_used_cores,run_stats.max_cpu_utilization,...,efficiency_cores,network_bandwidth,network_latency,network_model,min_processor_speed,max_processor_speed,SLR,workflow,test_id,system_type
0,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",122700.953493,0.025704,372855.07982,43908.119552,351.266126,8,0.062016,...,0.017729,125,10,ConstantBandwidth,2.0,6,3.255511,random,cluster-het-16-128-ConstantBandwidth-125-10_15...,heterogeneous
1,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",122700.956103,0.025953,372855.07982,43908.119552,351.276656,8,0.062016,...,0.017729,125,100,ConstantBandwidth,2.0,6,3.255511,random,cluster-het-16-128-ConstantBandwidth-125-100_1...,heterogeneous
2,150-373-0.1,cluster-het-16-128-ConstantBandwidth-125-1000,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",122700.982203,0.025807,372855.07982,43908.119552,351.381956,8,0.062016,...,0.017729,125,1000,ConstantBandwidth,2.0,6,3.255511,random,cluster-het-16-128-ConstantBandwidth-125-1000_...,heterogeneous
3,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-10,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",122618.159104,0.025793,372855.07982,43908.119552,35.127666,8,0.062016,...,0.017741,1250,10,ConstantBandwidth,2.0,6,3.253314,random,cluster-het-16-128-ConstantBandwidth-1250-10_1...,heterogeneous
4,150-373-0.1,cluster-het-16-128-ConstantBandwidth-1250-100,"DYTAS[navigation=Front,sorting=DFS,multicore=S...",122618.161714,0.034407,372855.07982,43908.119552,35.138196,8,0.062016,...,0.017741,1250,100,ConstantBandwidth,2.0,6,3.253314,random,cluster-het-16-128-ConstantBandwidth-1250-100_...,heterogeneous


In [70]:
base_filename = "pre-processed-metrics-dataframe"

# Make sure the output directory exists so the writes below do not fail on a
# fresh checkout (e.g. after Restart Kernel -> Run All).
os.makedirs("./data", exist_ok=True)

# The same frame is exported in three formats: Parquet, pickle and CSV.
df_dropped.to_parquet(
    "./data/{}.parquet".format(base_filename),
    engine="pyarrow",
    compression="gzip"
)

df_dropped.to_pickle("./data/{}.pkl".format(base_filename))

df_dropped.to_csv("./data/{}.csv".format(base_filename), index=False)