# Concurrent Dependency Graph

![DAG1](dag1.png)
![DAG2](dag2.png)

## Details

* Nodes are tasks to be executed
* Edges are dependencies between tasks.


## Requirements

* Tasks must execute in dependency order 
     * A must complete before B begins and B
     * C and D must complete before E begins


* Tasks should execute concurrently when possible 
     * After 1 completes, 2 and 3 can execute concurrently
     * After 2 completes, 4 and 5 can execute concurrently (even if 3 hasn’t completed)

## Problem

Implement the following class:

In [None]:
from typing import Set, Dict, Any

class TaskGraph(object):
    def __init__(self, dependencies: Dict[Any, Set[Any]]):
        """Initializes the dependency graph from a dict of each task to its set of dependencies."""
        pass
        
    def executable_tasks(self) -> Set[Any]:
        """Returns the set of all tasks that can execute immediately because they have no dependencies."""
        pass
        
    def task_executed(self, task: Any) -> Set[Any]:
        """Marks a task as executed and returns any _new_ executable tasks in a set."""
        pass
        
    def all_tasks_executed(self) -> bool:
        """Returns true when all tasks have executed."""
        pass

In [None]:
def make_graph():
    return TaskGraph({
        "A": set(),
        "B": {"A"},
        "C": {"B"},
        "D": {"B", "G"},
        "E": {"B", "C", "D"},
        "F": {"E"},
        "G": set(),
    })

In [None]:
dag = make_graph()
assert dag.executable_tasks() == {"A", "G"}

assert dag.task_executed("A") == {"B"}
assert not dag.all_tasks_executed()
assert dag.executable_tasks() == {"B", "G"}

assert dag.task_executed("B") == {"C"}
assert not dag.all_tasks_executed()
assert dag.executable_tasks() == {"C", "G"}

assert dag.task_executed("G") == {"D"}
assert not dag.all_tasks_executed()
assert dag.executable_tasks() == {"C", "D"}

assert dag.task_executed("C") == set()
assert not dag.all_tasks_executed()
assert dag.executable_tasks() == {"D"}

assert dag.task_executed("D") == {"E"}
assert not dag.all_tasks_executed()
assert dag.executable_tasks() == {"E"}

assert dag.task_executed("E") == {"F"}
assert not dag.all_tasks_executed()
assert dag.executable_tasks() == {"F"}

assert dag.task_executed("F") == set()
assert dag.all_tasks_executed()
assert dag.executable_tasks() == set()

## Bonus (time permitting)

In [None]:
import time
import random
import concurrent.futures
import threading

def execute_task(task):
    seconds = random.randint(1, 5)
    print(f"{threading.current_thread().name} {task}: Begin. Waiting {seconds} seconds.")
    time.sleep(seconds)
    print(f"{threading.current_thread().name} {task}: End")
    return task


def execute_dag(graph: TaskGraph, max_workers: int = 2):
    """
    Uses a concurrent.futures.ThreadPoolExecutor to call execute_task for each task in the dependency graph with as much 
    concurrency as possible while still obeying the dependencies.
    """
    pass