Skip to content

Commit

Permalink
refactor: move visualization code
Browse files Browse the repository at this point in the history
  • Loading branch information
sbrugman committed Nov 24, 2021
1 parent 6781459 commit 8efc072
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 72 deletions.
63 changes: 0 additions & 63 deletions popmon/base/pipeline.py
Expand Up @@ -17,10 +17,8 @@
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

import json
import logging
from abc import ABC
from pathlib import Path


class Pipeline(ABC):
Expand Down Expand Up @@ -73,64 +71,3 @@ def transform(self, datastore):
else:
datastore = module._transform(datastore)
return datastore

def visualize(self, versioned=True, funcs=None, dsets=None):
    """Return a nested, JSON-serializable description of this pipeline.

    Nested ``Pipeline`` instances are described recursively; every other
    module is described by its class name, a per-class ordinal, its
    description and its input/output dataset keys.

    Args:
        versioned: when truthy, each dataset key is suffixed with
            `` (v<N>)``. Reading a key that was not seen before registers
            it at version 1; every write bumps the version by one.
        funcs: registry ``{class name: {id(module): ordinal}}``, mutated in
            place and shared with nested pipelines. A fresh dict is used
            when ``None``.
        dsets: registry ``{dataset key: current version}``, mutated in
            place and shared with nested pipelines. A fresh dict is used
            when ``None``.

    Returns:
        dict with keys ``"type"`` (always ``"subgraph"``), ``"name"`` (this
        object's class name) and ``"modules"`` (list of child descriptions).
    """
    # Compare against None (not truthiness) so that an empty registry handed
    # down by a parent pipeline keeps being shared and mutated in place.
    dsets = {} if dsets is None else dsets
    funcs = {} if funcs is None else funcs

    entries = []
    for module in self.modules:
        name = module.__class__.__name__

        if isinstance(module, Pipeline):
            entries.append(module.visualize(versioned, funcs, dsets))
            continue

        in_keys = module.get_inputs()
        if versioned:
            tagged_inputs = {}
            for slot, key in in_keys.items():
                # Reading a dataset never bumps its version; an unseen
                # dataset is registered at version 1.
                version = dsets.setdefault(key, 1)
                tagged_inputs[slot] = key + f" (v{version})"
            in_keys = tagged_inputs

        out_keys = module.get_outputs()
        if versioned:
            tagged_outputs = {}
            for slot, key in out_keys.items():
                # Writing a dataset creates the next version of it.
                dsets[key] = dsets.get(key, 0) + 1
                tagged_outputs[slot] = key + f" (v{dsets[key]})"
            out_keys = tagged_outputs

        self.logger.debug(f"{name}(inputs={in_keys}, outputs={out_keys})")

        # Give every distinct instance of a class a stable 1-based ordinal.
        instance_ids = funcs.setdefault(name, {})
        ordinal = instance_ids.setdefault(id(module), len(instance_ids) + 1)

        entries.append(
            {
                "type": "module",
                "name": name,
                "i": str(ordinal),
                "desc": module.get_description(),
                "in": in_keys,
                "out": out_keys,
            }
        )

    return {"type": "subgraph", "name": self.__class__.__name__, "modules": entries}

def to_json(self, file_name, versioned=True):
    """Write the pipeline description to ``file_name`` as JSON.

    The structure returned by ``self.visualize(versioned=versioned)`` is
    dumped with a 4-space indent and sorted keys, then written as text.

    Args:
        file_name: destination path; anything accepted by ``pathlib.Path``.
        versioned: forwarded to ``visualize``.
    """
    serialized = json.dumps(
        self.visualize(versioned=versioned),
        indent=4,
        sort_keys=True,
    )
    Path(file_name).write_text(serialized)
81 changes: 72 additions & 9 deletions tools/pipeline_viz.py
Expand Up @@ -4,6 +4,70 @@

import pygraphviz as pgv

from popmon.base import Pipeline


def serialize_module(module, versioned, funcs, dsets):
    """Describe a single (non-pipeline) module as a JSON-serializable dict.

    Args:
        module: object exposing ``get_inputs()``, ``get_outputs()`` (both
            returning mappings of slot name to dataset key) and
            ``get_description()``.
        versioned: when truthy, each dataset key is suffixed with
            `` (v<N>)``. Reading a key that was not seen before registers
            it at version 1; every write bumps the version by one.
        funcs: registry ``{class name: {id(module): ordinal}}``; mutated in
            place.
        dsets: registry ``{dataset key: current version}``; mutated in place
            (only when ``versioned`` is truthy).

    Returns:
        dict with keys ``"type"`` (always ``"module"``), ``"name"`` (class
        name), ``"i"`` (1-based per-class ordinal, as a string), ``"desc"``,
        ``"in"`` and ``"out"``.
    """
    in_keys = module.get_inputs()
    class_name = module.__class__.__name__

    if versioned:
        tagged_inputs = {}
        for slot, key in in_keys.items():
            # Reading a dataset never bumps its version; an unseen dataset
            # is registered at version 1.
            version = dsets.setdefault(key, 1)
            tagged_inputs[slot] = key + f" (v{version})"
        in_keys = tagged_inputs

    out_keys = module.get_outputs()
    if versioned:
        tagged_outputs = {}
        for slot, key in out_keys.items():
            # Writing a dataset creates the next version of it.
            dsets[key] = dsets.get(key, 0) + 1
            tagged_outputs[slot] = key + f" (v{dsets[key]})"
        out_keys = tagged_outputs

    # Give every distinct instance of a class a stable 1-based ordinal.
    instance_ids = funcs.setdefault(class_name, {})
    ordinal = instance_ids.setdefault(id(module), len(instance_ids) + 1)

    return {
        "type": "module",
        "name": class_name,
        "i": str(ordinal),
        "desc": module.get_description(),
        "in": in_keys,
        "out": out_keys,
    }


def serialize_pipeline(pipeline, versioned=True, funcs=None, dsets=None):
    """Describe a pipeline and all of its children as a nested dict.

    Children that are ``Pipeline`` instances are serialized recursively;
    all other children are handed to ``serialize_module``.

    Args:
        pipeline: object whose ``modules`` attribute is iterated.
        versioned: forwarded unchanged to every child serialization.
        funcs: registry shared across the whole traversal and forwarded to
            every child; a fresh dict is used when ``None``.
        dsets: registry shared across the whole traversal and forwarded to
            every child; a fresh dict is used when ``None``.

    Returns:
        dict with keys ``"type"`` (always ``"pipeline"``), ``"name"`` (the
        pipeline's class name) and ``"modules"`` (list of child dicts).
    """
    # Compare against None (not truthiness) so that an empty registry handed
    # down by a parent call keeps being shared and mutated in place.
    dsets = {} if dsets is None else dsets
    funcs = {} if funcs is None else funcs

    def _serialize_child(child):
        if isinstance(child, Pipeline):
            return serialize_pipeline(child, versioned, funcs, dsets)
        return serialize_module(child, versioned, funcs, dsets)

    children = [_serialize_child(child) for child in pipeline.modules]
    return {
        "type": "pipeline",
        "name": pipeline.__class__.__name__,
        "modules": children,
    }


def pipeline_to_json(pipeline, file_name, versioned=True):
    """Serialize ``pipeline`` and write the result to ``file_name`` as JSON.

    The structure produced by ``serialize_pipeline`` is dumped with a
    4-space indent and sorted keys, then written as text.

    Args:
        pipeline: pipeline object passed to ``serialize_pipeline``.
        file_name: destination path; anything accepted by ``pathlib.Path``.
        versioned: forwarded to ``serialize_pipeline``.
    """
    serialized = json.dumps(
        serialize_pipeline(pipeline, versioned=versioned),
        indent=4,
        sort_keys=True,
    )
    Path(file_name).write_text(serialized)


def generate_pipeline_visualisation(
input_file,
Expand Down Expand Up @@ -38,10 +102,10 @@ def generate_pipeline_visualisation(
]

colors = [f"#{r:02x}{g:02x}{b:02x}" for r, g, b in tableau20]
subgraph_colors = cycle(colors)
pipeline_colors = cycle(colors)
pipeline_style = {}
module_style = {"shape": "rectangle", "fillcolor": "chartreuse", "style": "filled"}
dataset_style = {"shape": "oval", "fillcolor": "orange", "style": "filled"}
subgraph_style = {}
edge_style = {"fontcolor": "gray50"}

def get_module_label(module):
Expand All @@ -53,13 +117,13 @@ def get_module_label(module):
return label

def process(data, G):
if data["type"] == "subgraph":
if data["type"] == "pipeline":
if include_subgraphs:
c = G.add_subgraph(
name=f'cluster_{data["name"]}',
label=data["name"],
color=next(subgraph_colors),
**subgraph_style,
color=next(pipeline_colors),
**pipeline_style,
)
else:
c = G
Expand All @@ -80,12 +144,11 @@ def process(data, G):
kwargs["taillabel"] = k
G.add_edge(name, v, **edge_style, **kwargs)
else:
raise ValueError("type should be 'subgraph' or 'module'")
raise ValueError("type should be 'pipeline' or 'module'")

g = pgv.AGraph(name="popmon-pipeline", directed=True)
g.node_attr.update(**dataset_style)
process(data, g)

g.layout("dot")
g.draw(output_file)

Expand Down Expand Up @@ -118,14 +181,14 @@ def process(data, G):
name = pipeline.__class__.__name__.lower()

input_file = data_path / f"pipeline_{name}_unversioned.json"
pipeline.to_json(input_file, versioned=False)
pipeline_to_json(pipeline, input_file, versioned=False)
output_file = f"pipeline_{name}_subgraphs_unversioned.pdf"
generate_pipeline_visualisation(input_file, output_file, include_subgraphs=True)
output_file = f"pipeline_{name}_unversioned.pdf"
generate_pipeline_visualisation(input_file, output_file, include_subgraphs=False)

input_file = data_path / f"pipeline_{name}_versioned.json"
pipeline.to_json(input_file, versioned=True)
pipeline_to_json(pipeline, input_file, versioned=True)
output_file = f"pipeline_{name}_subgraphs_versioned.pdf"
generate_pipeline_visualisation(input_file, output_file, include_subgraphs=True)
output_file = f"pipeline_{name}_versioned.pdf"
Expand Down

0 comments on commit 8efc072

Please sign in to comment.