# Network Maximum Parallelism

Experiment modelling maximum parallelism of executors given a DAG of tasks.

This question often comes up in computer science and data engineering.

A DAG can represent a heap-like structure of tasks: a pool of workers pulls the next most important task whose dependencies are complete, so lineage requirements are respected while independent tasks still run in parallel.

The idea is:

> ***How big of a pool do I need given the shape of the DAG?***

This also shows up in project planning:

> ***Given a Work Breakdown Structure, how many consultants would I need?***

For simplicity, we assume each task takes 1 unit of time and that all workers are interchangeable, each completing any task in exactly 1 unit of time.
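The assumptions above can be sketched as a small simulation. This is a minimal, dependency-free sketch rather than the notebook's own method: `simulate_pool` and the example `edges` are hypothetical names, and the scheduler simply hands ready tasks to workers in sorted order.

```python
from collections import defaultdict


def simulate_pool(edges, pool_size):
    """Return how many time steps `pool_size` workers need to finish
    every task, assuming each task takes exactly 1 unit of time."""
    if pool_size < 1:
        raise ValueError("pool_size must be at least 1")

    # Collect every task and count its unfinished prerequisites.
    tasks = {t for edge in edges for t in edge}
    successors = defaultdict(list)
    pending = {t: 0 for t in tasks}
    for u, v in edges:
        successors[u].append(v)
        pending[v] += 1

    ready = sorted(t for t in tasks if pending[t] == 0)
    steps = 0
    while ready:
        # Each worker takes one ready task for this time step.
        running, ready = ready[:pool_size], ready[pool_size:]
        steps += 1
        for task in running:
            for nxt in successors[task]:
                pending[nxt] -= 1
                if pending[nxt] == 0:
                    ready.append(nxt)
    return steps


# Hypothetical DAG: "a" fans out to "b", "c", "d", which all feed "e".
edges = [("a", "b"), ("a", "c"), ("a", "d"), ("b", "e"), ("c", "e"), ("d", "e")]
for size in (1, 2, 3, 4):
    print(size, simulate_pool(edges, size))
# prints:
# 1 5
# 2 4
# 3 3
# 4 3
```

In this toy DAG a fourth worker gains nothing: three workers already finish in 3 steps, which is the length of the longest chain of dependent tasks.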

If we want to look at the [*Maximum Flow*](https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.flow.maximum_flow.html#networkx.algorithms.flow.maximum_flow) of the DAG as well, we will assume all edges have equal capacity of 1.

# Example

![](./assets/dag.png)

![](./assets/topological.png)

The widest layer in this example contains `3` tasks.

## Install dependencies

In [1]:
!pip install -U pip -qq

In [2]:
!pip install networkx ipycytoscape -qq

In [3]:
# Need to use `master` instead of PyPI version since 'fcose' algorithm only available there and not published yet.
# pip install https://github.com/plotly/dash-cytoscape/archive/master.zip

## Google Colab Specific Code

In [4]:
try:
    from google.colab import output

    output.enable_custom_widget_manager()
except ModuleNotFoundError:
    pass

## Generate a Random DAG and visualise it

In [5]:
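import networkx as nx

# Random directed graph on 15 nodes; each possible edge appears with probability 0.5.
G = nx.gnp_random_graph(15, 0.5, directed=True)
# Keeping only edges that point from a lower to a higher node id rules out cycles,
# so the result is a DAG. Every task edge gets unit capacity and weight.
DAG = nx.DiGraph(
    [(u, v, {"capacity": 1, "weight": 1}) for (u, v) in G.edges() if u < v]
)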

### [Optional] Source and Sink Nodes for looking at Maximum Flow
This next section finds all the nodes with no predecessors (*roots*) and all the nodes with no successors (*leaves*). It then adds two extra nodes, connecting one to every root and one from every leaf, so the DAG has a single *source* and a single *sink*.

In [6]:
# Leaves have no successors; roots have no predecessors.
leaves = [n for n in DAG.nodes if DAG.out_degree(n) == 0]
roots = [n for n in DAG.nodes if DAG.in_degree(n) == 0]
DAG.add_nodes_from(["source", "sink"])
# These edges carry no "capacity" attribute, so networkx treats them as unbounded.
DAG.add_edges_from([("source", v, {"tooltip": 10_000}) for v in roots])
DAG.add_edges_from([(u, "sink", {"tooltip": 10_000}) for u in leaves])

for node in DAG.nodes():
    DAG.nodes[node]["label"] = node


from networkx.algorithms.flow import shortest_augmenting_path

flow_value, flow_dict = nx.maximum_flow(
    DAG.copy(), "source", "sink", flow_func=shortest_augmenting_path
)
[(k, v) for k, v in flow_dict.items() if len(v) > 0], flow_value

([(0, {1: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 13: 1, 14: 1}),
  (1, {2: 1, 9: 0, 12: 0}),
  (4, {7: 0, 9: 0, 10: 0, 13: 1}),
  (5, {9: 0, 10: 1, 12: 0}),
  (6, {7: 0, 8: 0, 9: 0, 11: 0, 12: 0, 13: 1}),
  (7, {8: 0, 10: 0, 11: 0, 13: 1}),
  (8, {9: 0, 10: 0, 13: 1}),
  (13, {'sink': 7}),
  (14, {'sink': 1}),
  (2, {3: 0, 10: 0, 11: 0, 13: 1}),
  (9, {10: 0, 11: 0, 12: 0}),
  (12, {13: 0}),
  (3, {4: 0, 5: 0, 7: 0, 9: 0, 11: 0}),
  (10, {13: 1}),
  (11, {12: 0}),
  ('source', {0: 8})],
 8)
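To make the flow calculation easier to check by hand, here is a minimal, dependency-free sketch on a four-task diamond. It uses Edmonds-Karp (breadth-first augmenting paths), not networkx's `shortest_augmenting_path` implementation, and `max_flow`, `INF`, and the example `capacity` mapping are hypothetical names. Task edges have capacity 1 and the source and sink edges are uncapped. The sketch assumes every source-to-sink path crosses at least one finite-capacity edge.

```python
from collections import deque

INF = float("inf")


def max_flow(capacity, source, sink):
    """Edmonds-Karp maximum flow.
    `capacity` maps (u, v) to the capacity of the directed edge u -> v."""
    # Residual capacities, including zero-capacity reverse edges.
    residual = dict(capacity)
    neighbours = {}
    for u, v in capacity:
        residual.setdefault((v, u), 0)
        neighbours.setdefault(u, set()).add(v)
        neighbours.setdefault(v, set()).add(u)

    total = 0
    while True:
        # Breadth-first search for a path that still has spare capacity.
        parent = {source: None}
        queue = deque([source])
        while queue and sink not in parent:
            u = queue.popleft()
            for v in neighbours.get(u, ()):
                if v not in parent and residual[(u, v)] > 0:
                    parent[v] = u
                    queue.append(v)
        if sink not in parent:
            return total

        # Walk back from the sink to recover the path and its bottleneck.
        path = []
        v = sink
        while parent[v] is not None:
            path.append((parent[v], v))
            v = parent[v]
        bottleneck = min(residual[edge] for edge in path)
        for u, v in path:
            residual[(u, v)] -= bottleneck
            residual[(v, u)] += bottleneck
        total += bottleneck


# Hypothetical diamond: a -> b -> d and a -> c -> d, unit capacity on task edges.
capacity = {
    ("source", "a"): INF,
    ("a", "b"): 1,
    ("a", "c"): 1,
    ("b", "d"): 1,
    ("c", "d"): 1,
    ("d", "sink"): INF,
}
print(max_flow(capacity, "source", "sink"))  # prints 2
```

The result is 2 because only two unit-capacity edges leave the single root `a`.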

In [7]:
import ipycytoscape as cy

cyDAG = cy.CytoscapeWidget()
cyDAG.graph.add_graph_from_networkx(DAG, directed=True)
cyDAG.set_layout(name="dagre")
cyDAG


## Topological Sort

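Group the nodes into topological generations: each generation holds the nodes whose predecessors all sit in earlier generations, much like the levels of a breadth-first traversal.

Then find the widest generation. With unit-time tasks, it is the largest number of tasks running at the same time when every task starts as soon as its dependencies finish.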

In [8]:
# One sorted list of nodes per topological generation.
gens = [sorted(generation) for generation in nx.topological_generations(DAG)]
for gen in gens:
    print(gen)

['source']
[0]
[1, 6, 14]
[2]
[3]
[4, 5]
[7]
[8]
[9]
[10, 11]
[12]
[13]
['sink']


In [9]:
max(len(g) for g in gens)

3
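The layering used above can also be reproduced without networkx. The following is a minimal sketch of a layer-by-layer Kahn's algorithm, the same idea behind topological generations: `topological_layers` and the example `edges` are hypothetical, and the input is assumed to be acyclic.

```python
def topological_layers(edges):
    """Group nodes into layers so that every node appears in the first
    layer after all of its predecessors. Assumes the graph is acyclic."""
    nodes = {n for edge in edges for n in edge}
    successors = {n: [] for n in nodes}
    indegree = {n: 0 for n in nodes}
    for u, v in edges:
        successors[u].append(v)
        indegree[v] += 1

    # The first layer is every node with no predecessors.
    layer = sorted(n for n in nodes if indegree[n] == 0)
    layers = []
    while layer:
        layers.append(layer)
        next_layer = []
        for u in layer:
            for v in successors[u]:
                # A node is released once its last predecessor is placed.
                indegree[v] -= 1
                if indegree[v] == 0:
                    next_layer.append(v)
        layer = sorted(next_layer)
    return layers


# Hypothetical DAG with a three-way fan-out from node 0.
edges = [(0, 1), (0, 2), (0, 3), (1, 4), (2, 4), (3, 5), (4, 5)]
layers = topological_layers(edges)
print(layers)                               # prints [[0], [1, 2, 3], [4], [5]]
print(max(len(layer) for layer in layers))  # prints 3
```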