<a href="https://colab.research.google.com/github/paridhika/DDL/blob/main/DAG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import csv
import re

class DAGNode:
    """One task (vertex) of a job DAG, built from one or more CSV rows.

    Times are stored as given (the loader passes CSV strings) and the notebook
    cells later rewrite them as strings as well.
    """

    def __init__(self, task_name, job_name, start_time, end_time, status, cpu_avg, mem_avg, instance_num):
        self.task_name = task_name      # task number (the loader passes an int)
        self.job_name = job_name
        self.start_time = start_time
        self.end_time = end_time
        self.status = status
        self.dependencies = []          # parent task numbers, as digit strings
        self.children = []              # directly connected child DAGNode objects
        self.cpu_avg = cpu_avg
        self.mem_avg = mem_avg
        self.instance_num = instance_num
        self.indegree = 0
        self.outdegree = 0
        # Filled in later by the processing cells and DAGJob.compute_*_sum.
        # Start as None so readers that test `is None` work even when those
        # steps have not run (e.g. jobs that were never summed).
        self.duration = None
        self.cumulative_lower_duration = None
        self.cumulative_upper_duration = None

    def add_dependency(self, dependency_list):
        """Replace this node's parent list with *dependency_list*."""
        self.dependencies = dependency_list

    def add_child(self, child_node):
        """Record *child_node* as a direct successor."""
        self.children.append(child_node)

    def increment_indegree(self):
        """Count one more incoming edge."""
        self.indegree += 1

    def increment_outdegree(self):
        """Count one more outgoing edge."""
        self.outdegree += 1

    def set_cumulative_lower_duration(self, total_duration):
        """Store the cumulative duration of this node's descendants.

        Replaces the old `cumulative_lower_duration(...)` method, which
        overwrote itself with the attribute of the same name on first call.
        """
        self.cumulative_lower_duration = total_duration

    def set_cumulative_upper_duration(self, cumulative_duration):
        """Store the cumulative duration of this node's ancestors."""
        self.cumulative_upper_duration = cumulative_duration

class DAGJob:
    """All tasks of one job plus the matrices derived from their dependencies.

    Matrix indices are ``task number - 1``. The notebook keeps only jobs whose
    smallest task number is 1 and whose largest does not exceed the node count,
    which is what makes that mapping valid.
    """

    def __init__(self, job_name):
        self.job_name = job_name
        self.nodes = []
        # dependency_matrix[p][c] == 1 when task p+1 is a parent of task c+1.
        self.dependency_matrix = None  # populated by generate_dependency_matrix
        # updated_weight_matrix[p][c] = child start_time - parent end_time.
        self.updated_weight_matrix = None
        self.job_start_time = 0   # earliest task start_time (set by the CSV loader)
        self.start_task = 1       # smallest task number (set by the CSV loader)

    def add_node(self, node):
        """Append *node* to this job."""
        self.nodes.append(node)

    def _nodes_by_number(self):
        """Map integer task number -> node, keeping the first node for duplicates."""
        lookup = {}
        for node in self.nodes:
            lookup.setdefault(int(node.task_name), node)
        return lookup

    def generate_dependency_matrix(self):
        """Build the 0/1 adjacency matrix from each node's ``dependencies``.

        Also resets ``updated_weight_matrix`` to an all-zero matrix of the same size.

        Raises:
            IndexError: if a task or dependency number lies outside 1..len(nodes).
        """
        num_nodes = len(self.nodes)
        self.dependency_matrix = [[0] * num_nodes for _ in range(num_nodes)]
        self.updated_weight_matrix = [[0.0] * num_nodes for _ in range(num_nodes)]

        for node in self.nodes:
            if node.dependencies is None:
                continue
            for dependency_task in node.dependencies:
                parent = int(dependency_task)
                child = int(node.task_name)
                # Out-of-range numbers would raise anyway; numbers below 1 would
                # silently wrap to the end of the matrix, so reject both explicitly.
                if not (1 <= parent <= num_nodes and 1 <= child <= num_nodes):
                    raise IndexError(
                        f"job {self.job_name}: edge {parent} -> {child} "
                        f"is outside 1..{num_nodes}"
                    )
                self.dependency_matrix[parent - 1][child - 1] = 1

    def display_nodes(self):
        """Print one summary line per task."""
        for node in self.nodes:
            print(f"Task: {node.task_name}, Job: {node.job_name}, Start Time: {node.start_time}, End Time: {node.end_time}, Status: {node.status}")

    def display_dependency_matrix(self):
        """Print the dependency matrix row by row."""
        for row in self.dependency_matrix:
            print(row)

    def has_all_nodes_connected(self):
        """Return True when no task is isolated (lacks both in- and out-edges).

        This does not check for a single connected component: two disjoint
        chains still return True. Returns False before the dependency matrix
        has been generated.
        """
        matrix = self.dependency_matrix
        if matrix is None:
            return False
        size = len(matrix)
        for i, row in enumerate(matrix):
            has_outgoing = any(row)
            has_incoming = any(matrix[j][i] for j in range(size))
            if not has_outgoing and not has_incoming:
                return False
        return True

    def update_start_times_based_on_dependencies(self):
        """Move each task's start_time forward to its latest parent end_time.

        Only digit-string times participate. Parents whose end_time is not a
        digit string are ignored. A node is left unchanged when its own
        start_time is not a digit string or no parent has a usable end_time.
        Only start_time is modified, so the result does not depend on node order.
        """
        for node in self.nodes:
            dependency_numbers = {int(dep) for dep in node.dependencies}
            parent_end_times = [
                int(task.end_time)
                for task in self.nodes
                if int(task.task_name) in dependency_numbers and task.end_time.isdigit()
            ]
            # Guard against max() of an empty sequence (no numeric parent end time).
            if not parent_end_times or not node.start_time.isdigit():
                continue
            latest_parent_end = max(parent_end_times)
            if int(node.start_time) < latest_parent_end:
                node.start_time = str(latest_parent_end)

    def generate_directly_connected_children(self):
        """Register every node as a child of each of its parents."""
        by_number = self._nodes_by_number()
        for node in self.nodes:
            for dependency_task in node.dependencies:
                parent_node = by_number.get(int(dependency_task))
                if parent_node is not None:
                    parent_node.add_child(node)

    def generate_updated_weight_matrix(self):
        """Fill ``updated_weight_matrix`` with child start minus parent end per edge."""
        num_nodes = len(self.nodes)
        self.updated_weight_matrix = [[0.0] * num_nodes for _ in range(num_nodes)]
        by_number = self._nodes_by_number()

        for node in self.nodes:
            for dependency_task in node.dependencies:
                parent_task = by_number.get(int(dependency_task))
                if parent_task is None:
                    continue
                parent_index = int(parent_task.task_name) - 1
                node_index = int(node.task_name) - 1
                self.updated_weight_matrix[parent_index][node_index] = (
                    int(node.start_time) - int(parent_task.end_time)
                )

    def calculate_indegree_outdegree(self):
        """Increment in/out-degree counters once per resolvable dependency edge."""
        by_number = self._nodes_by_number()
        for node in self.nodes:
            for dependency_task in node.dependencies:
                parent_task = by_number.get(int(dependency_task))
                if parent_task is not None:
                    parent_task.increment_outdegree()
                    node.increment_indegree()

    def compute_lower_sum(self):
        """Set ``cumulative_lower_duration`` on every node via memoised DFS.

        A node's value is the sum over its children of
        ``child.duration + child.cumulative_lower_duration``, so a descendant
        reachable along several paths is counted once per path. Requires
        ``duration`` on every node and an acyclic graph (on a cycle, a node
        still in progress would be read before its value is set).
        """
        visited = set()

        def dfs_lower(current_node):
            visited.add(current_node)
            lower_sum = 0
            for child_node in current_node.children:
                if child_node not in visited:
                    lower_sum += dfs_lower(child_node) + int(child_node.duration)
                else:
                    lower_sum += child_node.cumulative_lower_duration + int(child_node.duration)
            current_node.cumulative_lower_duration = lower_sum
            return lower_sum

        # Visit nodes in decreasing in-degree order (stable for ties); any node
        # not reached from an earlier start gets its own DFS.
        for node in sorted(self.nodes, key=lambda n: n.indegree, reverse=True):
            if node not in visited:
                dfs_lower(node)

    def compute_upper_sum(self):
        """Set ``cumulative_upper_duration`` on every node via memoised DFS.

        A node's value is the sum over its parents of
        ``parent.duration + parent.cumulative_upper_duration`` (an ancestor
        reachable along several paths is counted once per path). Same
        preconditions as compute_lower_sum.
        """
        visited = set()
        by_number = self._nodes_by_number()

        def dfs_upper(current_node):
            visited.add(current_node)
            upper_sum = 0
            for parent_task in current_node.dependencies:
                parent_node = by_number.get(int(parent_task))
                if parent_node is None:
                    continue
                if parent_node not in visited:
                    upper_sum += dfs_upper(parent_node) + int(parent_node.duration)
                else:
                    upper_sum += parent_node.cumulative_upper_duration + int(parent_node.duration)
            current_node.cumulative_upper_duration = upper_sum
            return upper_sum

        # Same visiting order as compute_lower_sum: decreasing in-degree.
        for node in sorted(self.nodes, key=lambda n: n.indegree, reverse=True):
            if node not in visited:
                dfs_upper(node)

_TASK_NUMBER_RE = re.compile(r'\d+')


def _end_time_sort_key(value):
    """Sort key for end times: integers by value, anything else before every integer."""
    try:
        return (1, int(value))
    except (TypeError, ValueError):
        return (0, 0)


def create_dags_from_csv(file_path):
    """Build one DAGJob per ``job_name`` from a task-trace CSV.

    Reads the columns ``job_name``, ``task_name``, ``start_time``, ``end_time``,
    ``status``, ``plan_cpu``, ``plan_mem`` and ``instance_num``. Rows are skipped
    when the task name starts with ``task_`` or contains no digits at all.

    The first run of digits in ``task_name`` is the task number. The purely
    numeric ``_``-separated parts are its parent task numbers, e.g.
    ``"M3_1_2"`` -> task 3 with dependencies ``["1", "2"]``.

    Repeated task numbers within a job are merged into the first node. The
    merged node takes the earliest start_time and latest end_time, compared
    numerically, and keeps the first row's other fields and dependencies.

    Args:
        file_path: path to the CSV file (a UTF-8 BOM is tolerated).

    Returns:
        dict mapping job_name -> DAGJob.
    """
    dag_jobs = {}

    with open(file_path, 'r', encoding='utf-8-sig') as csvfile:
        for row in csv.DictReader(csvfile):
            job_name = row['job_name']
            task_name = row['task_name']
            if task_name.startswith("task_"):
                continue
            start_time = row['start_time']
            end_time = row['end_time']
            status = row['status']
            cpu_avg = row['plan_cpu']
            mem_avg = row['plan_mem']
            instance_num = row['instance_num']

            match = _TASK_NUMBER_RE.search(task_name)
            if match is None:
                # No task number: the row cannot be placed in the DAG. Skip it
                # rather than reuse the number left over from the previous row.
                continue
            first_number = int(match.group())

            node = DAGNode(first_number, job_name, start_time, end_time, status, cpu_avg, mem_avg, instance_num)
            node.add_dependency([part for part in task_name.split('_') if part.isdigit()])

            job = dag_jobs.get(job_name)
            if job is None:
                job = DAGJob(job_name)
                job.job_start_time = start_time
                job.start_task = first_number
                dag_jobs[job_name] = job

            if int(start_time) < int(job.job_start_time):
                job.job_start_time = start_time
            if first_number < int(job.start_task):
                job.start_task = first_number

            existing_node = next((n for n in job.nodes if n.task_name == node.task_name), None)
            if existing_node is not None:
                print("existing")
                # Compare as numbers: min()/max() on the raw strings is
                # lexicographic and would rank "100" before "99".
                existing_node.start_time = min(existing_node.start_time, node.start_time, key=int)
                existing_node.end_time = max(existing_node.end_time, node.end_time, key=_end_time_sort_key)
            else:
                job.add_node(node)

    return dag_jobs


# Example usage: build one DAGJob per job from the task trace CSV.
csv_file_path = "tasks_3_to_20.csv"
dag_jobs = create_dags_from_csv(csv_file_path)



In [2]:
# Remove jobs that do not have a usable DAG structure.
# 1) The lowest task number must be 1.
jobs_to_remove = []
for job_name, dag_job in dag_jobs.items():
    if int(dag_job.start_task) != 1:
        jobs_to_remove.append(job_name)
for job_name in jobs_to_remove:
    del dag_jobs[job_name]

# 2) No task number may exceed the number of tasks, so task numbers can be
#    used directly as matrix indices (task n -> row/column n-1).
jobs_to_remove = [
    job_name
    for job_name, dag_job in dag_jobs.items()
    if max(int(task.task_name) for task in dag_job.nodes) > len(dag_job.nodes)
]
for job_name in jobs_to_remove:
    del dag_jobs[job_name]

In [3]:
# Derive the graph structures for every job. Order matters: start times are
# adjusted before the edge weights are computed from them.
for current_job in dag_jobs.values():
    current_job.generate_directly_connected_children()
    current_job.generate_dependency_matrix()
    current_job.update_start_times_based_on_dependencies()
    current_job.generate_updated_weight_matrix()
    current_job.calculate_indegree_outdegree()

In [5]:
# Record each task's duration, then shift its times to be relative to the
# start of its job. Values are stored back as strings.
for current_job in dag_jobs.values():
    job_origin = int(current_job.job_start_time)
    for task in current_job.nodes:
        task_end = int(task.end_time)
        task_start = int(task.start_time)
        task.duration = str(task_end - task_start)
        task.start_time = str(task_start - job_origin)
        task.end_time = str(task_end - job_origin)

In [6]:
# Split jobs by whether every task has at least one incoming or outgoing edge.
connected_jobs = {}
disconnected_jobs = {}
for job_name, dag_job in dag_jobs.items():
    bucket = connected_jobs if dag_job.has_all_nodes_connected() else disconnected_jobs
    bucket[job_name] = dag_job


In [15]:
# Cumulative descendant (lower) and ancestor (upper) durations; only jobs
# without isolated tasks are summed.
for current_job in connected_jobs.values():
    current_job.compute_lower_sum()
    current_job.compute_upper_sum()


In [None]:
print(len(connected_jobs))
import os

# NOTE(review): matrix_to_dot is defined in a later cell; run that cell first,
# otherwise a top-to-bottom run raises NameError here.
dot_output_directory = "./connected_dot"
os.makedirs(dot_output_directory, exist_ok=True)

for job_key, job in connected_jobs.items():
    task_labels = [task.task_name for task in job.nodes]
    dot_content = matrix_to_dot(
        job.dependency_matrix, task_labels, job.updated_weight_matrix, job.nodes
    )
    # One <job>.dot file per job.
    dot_file_path = os.path.join(dot_output_directory, job_key + ".dot")
    with open(dot_file_path, "w") as dot_file:
        dot_file.write(dot_content)

In [None]:
import os
from graphviz import Source

# Render every .dot file in ./connected_dot into ./connected_3-20 as PNG,
# deleting each .dot file once it has been rendered.
dot_directory = "./connected_dot"
png_output_directory = "./connected_3-20"
os.makedirs(png_output_directory, exist_ok=True)

dot_filenames = [entry for entry in os.listdir(dot_directory) if entry.endswith(".dot")]
for dot_filename in dot_filenames:
    stem = os.path.splitext(dot_filename)[0]
    dot_file_path = os.path.join(dot_directory, dot_filename)
    png_output_path = os.path.join(png_output_directory, stem)

    graph_source = Source.from_file(dot_file_path, format="png")
    graph_source.render(png_output_path, format="png", cleanup=True)

    os.remove(dot_file_path)



In [None]:
# !pip install graphviz
import os
from graphviz import Digraph

def create_dot_file(dag, output_file_path, view=True):
    """Render *dag* with graphviz ``Digraph`` (PNG format) via ``render``.

    Node ids are task numbers. Each node label lists start time, duration,
    cumulative upper/lower duration, CPU and instance count. Fields that are
    None, missing, or never computed are omitted. Edges come from
    ``dag.dependency_matrix`` and are labelled with ``dag.updated_weight_matrix``.

    Args:
        dag: a DAGJob whose matrices have been generated.
        output_file_path: path passed to ``Digraph.render``.
        view: forwarded to ``Digraph.render``. Defaults to True, as before;
            pass False to avoid opening a viewer for every job in a loop.
    """
    def field(node, attr, caption):
        value = getattr(node, attr, None)
        # Missing, None, or still resolving to a method => never computed.
        if value is None or callable(value):
            return ""
        return f'\n{caption}: {value}'

    dot = Digraph(comment='Directed Acyclic Graph', format='png')

    # Add nodes
    for node in dag.nodes:
        label = (
            field(node, 'start_time', 'Start Time')
            + field(node, 'duration', 'Duration')
            + field(node, 'cumulative_upper_duration', 'Upper')
            + field(node, 'cumulative_lower_duration', 'Lower')
            + field(node, 'cpu_avg', 'CPU')
            + field(node, 'instance_num', '#instances')
        )
        dot.node(str(node.task_name), label=label)

    # Add edges; matrix index i corresponds to task number i + 1.
    num_nodes = len(dag.nodes)
    for row in range(num_nodes):
        for column in range(num_nodes):
            if dag.dependency_matrix[row][column] == 0:
                continue
            weight = "" if dag.updated_weight_matrix is None else f'{dag.updated_weight_matrix[row][column]}'
            dot.edge(str(row + 1), str(column + 1), label=weight)

    dot.render(output_file_path, view=view)

# Render every connected job through create_dot_file into ./connected_dot/.
dot_output_directory = "./connected_dot/"
os.makedirs(dot_output_directory, exist_ok=True)

for job_key, job_dag in connected_jobs.items():
    gv_path = os.path.join(dot_output_directory, job_key + ".gv")
    create_dot_file(job_dag, gv_path)

def matrix_to_dot(matrix, node_labels=None, edge_weights=None, task_node=None):
    """
    Convert an adjacency matrix with weights to Graphviz DOT format.

    Parameters:
    - matrix: square list of lists; matrix[r][c] == 1 adds edge r+1 -> c+1.
    - node_labels: optional list of node ids (also used as label text), one per
      row. Defaults to 1..n.
    - edge_weights: optional list of lists; edge_weights[r][c] labels edge r+1 -> c+1.
    - task_node: optional list of task objects, parallel to the rows. Their
      start_time, duration, cumulative_upper_duration, cumulative_lower_duration
      and cpu_avg are appended to the node label. Values that are missing, None,
      or still resolve to a method (never computed) are skipped.

    Returns:
    - dot_content: String containing the DOT representation.
    """
    def detail(task, attr, caption):
        if task is None:
            return ""
        value = getattr(task, attr, None)
        # Jobs that never ran compute_*_sum have no cumulative values yet.
        if value is None or callable(value):
            return ""
        return f'\n{caption}: {value}'

    num_nodes = len(matrix)
    parts = ["digraph MyDAG {\n"]

    # Add nodes
    for index in range(num_nodes):
        node_id = str(index + 1) if node_labels is None else str(node_labels[index])
        task = None if task_node is None else task_node[index]
        details = (
            detail(task, 'start_time', 'Start Time')
            + detail(task, 'duration', 'Duration')
            + detail(task, 'cumulative_upper_duration', 'Upper')
            + detail(task, 'cumulative_lower_duration', 'Lower')
            + detail(task, 'cpu_avg', 'CPU')
        )
        parts.append(f'  {node_id} [label="{node_id}{details}"];\n')

    # Add edges with weights; endpoints are 1-based matrix positions.
    for row in range(num_nodes):
        for column in range(num_nodes):
            if matrix[row][column] != 1:
                continue
            weight = "" if edge_weights is None else f' [label="{edge_weights[row][column]}"]'
            parts.append(f'  {row + 1} -> {column + 1}{weight};\n')

    parts.append("}")
    return "".join(parts)


In [None]:
print(len(disconnected_jobs))

# One <job>.dot file per disconnected job. These jobs never ran
# compute_lower_sum / compute_upper_sum, so their cumulative fields may be unset.
dot_output_directory = "./disconnected_dot"
os.makedirs(dot_output_directory, exist_ok=True)
for job_key, job in disconnected_jobs.items():
    task_labels = [task.task_name for task in job.nodes]
    dot_content = matrix_to_dot(
        job.dependency_matrix, task_labels, job.updated_weight_matrix, job.nodes
    )
    dot_file_path = os.path.join(dot_output_directory, job_key + ".dot")
    with open(dot_file_path, "w") as dot_file:
        dot_file.write(dot_content)

In [None]:
# Render every .dot file in ./disconnected_dot into ./disconnected_3-20 as
# PNG, deleting each .dot file once it has been rendered.
dot_directory = "./disconnected_dot"
png_output_directory = "./disconnected_3-20"
os.makedirs(png_output_directory, exist_ok=True)

dot_filenames = [entry for entry in os.listdir(dot_directory) if entry.endswith(".dot")]
for dot_filename in dot_filenames:
    stem = os.path.splitext(dot_filename)[0]
    dot_file_path = os.path.join(dot_directory, dot_filename)
    png_output_path = os.path.join(png_output_directory, stem)

    graph_source = Source.from_file(dot_file_path, format="png")
    graph_source.render(png_output_path, format="png", cleanup=True)

    os.remove(dot_file_path)

In [None]:
import shutil

def zip_folder(folder_path, zip_path):
    """Archive *folder_path* as ``<zip_path>.zip``, then delete the folder.

    ``shutil.make_archive`` appends the ``.zip`` suffix itself, so *zip_path*
    is passed without an extension. The folder is removed only after the
    archive call returns; if archiving raises, the folder is left in place.
    """
    shutil.make_archive(base_name=zip_path, format='zip', root_dir=folder_path)
    shutil.rmtree(folder_path)

# Archive both PNG output folders (Colab /content paths) and remove the
# originals. Adjust the paths if running elsewhere.
for folder_path, zip_path in (
    ('/content/connected_3-20', '/content/output_connected_3-20'),
    ('/content/disconnected_3-20', '/content/output_disconnected_3-20'),
):
    zip_folder(folder_path, zip_path)