Skip to content

Commit

Permalink
Fix label order bugs found by rescue workflows.
Browse files Browse the repository at this point in the history
  • Loading branch information
MichelleGower committed Apr 4, 2023
1 parent 80bbc1b commit 0ebaf80
Show file tree
Hide file tree
Showing 10 changed files with 666 additions and 244 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-38377.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improvements to label order to fix bugs highlighted by rescue workflows.
15 changes: 12 additions & 3 deletions python/lsst/ctrl/bps/clustered_quantum_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

from lsst.pipe.base import NodeId, QuantumGraph
from lsst.utils.iteration import ensure_iterable
from networkx import DiGraph
from networkx import DiGraph, is_isomorphic, topological_sort

from .bps_draw import draw_networkx_dot

Expand Down Expand Up @@ -216,6 +216,15 @@ def __len__(self):
"""Return the number of clusters."""
return len(self._cluster_graph)

def __eq__(self, other):
if not isinstance(other, ClusteredQuantumGraph):
return False

Check warning on line 221 in python/lsst/ctrl/bps/clustered_quantum_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/clustered_quantum_graph.py#L221

Added line #L221 was not covered by tests
if len(self) != len(other):
return False

Check warning on line 223 in python/lsst/ctrl/bps/clustered_quantum_graph.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/clustered_quantum_graph.py#L223

Added line #L223 was not covered by tests
return self._quantum_graph == other._quantum_graph and is_isomorphic(
self._cluster_graph, other._cluster_graph
)

@property
def name(self):
"""The name of the ClusteredQuantumGraph."""
Expand Down Expand Up @@ -311,9 +320,9 @@ def clusters(self):
Returns
-------
clusters : `Iterator` [`lsst.ctrl.bps.QuantaCluster`]
Iterator over clusters.
Iterator over clusters in topological order.
"""
return map(self.get_cluster, self._cluster_graph.nodes())
return map(self.get_cluster, topological_sort(self._cluster_graph))

def successors(self, name):
"""Return clusters that are successors of the cluster
Expand Down
163 changes: 139 additions & 24 deletions python/lsst/ctrl/bps/generic_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ class GenericWorkflowJob:

# As of python 3.7.8, can't use __slots__ if give default values, so
# writing own __init__.
def __init__(self, name: str):
def __init__(self, name, label="UNK"):
self.name = name
self.label = None
self.label = label
self.quanta_counts = Counter()
self.tags = {}
self.executable = None
Expand Down Expand Up @@ -366,11 +366,11 @@ def __init__(self, name, incoming_graph_data=None, **attr):
super().__init__(incoming_graph_data, **attr)
self._name = name
self.run_attrs = {}
self._job_labels = GenericWorkflowLabels()
self._files = {}
self._executables = {}
self._inputs = {} # mapping job.names to list of GenericWorkflowFile
self._outputs = {} # mapping job.names to list of GenericWorkflowFile
self._labels = defaultdict(list) # mapping job label to list of GenericWorkflowJob
self.run_id = None
self._final = None

Expand All @@ -397,20 +397,24 @@ def quanta_counts(self):

@property
def labels(self):
"""List of job labels (`list` [`str`], read-only)"""
return list(self._labels.keys())
"""Job labels (`list` [`str`], read-only)"""
return self._job_labels.labels

def regenerate_labels(self):
"""Regenerate the list of job labels."""
self._labels = defaultdict(list)
self._job_labels = GenericWorkflowLabels()
for job_name in self:
job = self.get_job(job_name)
self._labels[job.label].append(job)
self._job_labels.add_job(
job,
[self.get_job(p).label for p in self.predecessors(job.name)],
[self.get_job(p).label for p in self.successors(job.name)],
)

@property
def job_counts(self):
"""Count of jobs per job label (`collections.Counter`)."""
jcounts = Counter({label: len(jobs) for label, jobs in self._labels.items()})
jcounts = self._job_labels.job_counts

# Final is separate
final = self.get_final()
Expand Down Expand Up @@ -467,6 +471,9 @@ def add_job(self, job, parent_names=None, child_names=None):
child_names : `list` [`str`], optional
Names of jobs that are children of given job
"""
_LOG.debug("job: %s (%s)", job.name, job.label)
_LOG.debug("parent_names: %s", parent_names)
_LOG.debug("child_names: %s", child_names)
if not isinstance(job, GenericWorkflowJob):
raise RuntimeError(f"Invalid type for job to be added to GenericWorkflowGraph ({type(job)}).")
if self.has_node(job.name):
Expand All @@ -475,7 +482,11 @@ def add_job(self, job, parent_names=None, child_names=None):
self.add_job_relationships(parent_names, job.name)
self.add_job_relationships(job.name, child_names)
self.add_executable(job.executable)
self._labels[job.label].append(job)
self._job_labels.add_job(
job,
[self.get_job(p).label for p in self.predecessors(job.name)],
[self.get_job(p).label for p in self.successors(job.name)],
)

def add_node(self, node_for_adding, **attr):
"""Override networkx function to call more specific add_job function.
Expand All @@ -502,6 +513,10 @@ def add_job_relationships(self, parents, children):
"""
if parents is not None and children is not None:
self.add_edges_from(itertools.product(ensure_iterable(parents), ensure_iterable(children)))
self._job_labels.add_job_relationships(
[self.get_job(n).label for n in ensure_iterable(parents)],
[self.get_job(n).label for n in ensure_iterable(children)],
)

def add_edges_from(self, ebunch_to_add, **attr):
"""Add several edges between jobs in the generic workflow.
Expand Down Expand Up @@ -559,10 +574,9 @@ def del_job(self, job_name: str):
Name of job to delete from workflow.
"""
job = self.get_job(job_name)
self._labels[job.label].remove(job)
# Don't leave keys around if removed last job
if not self._labels[job.label]:
del self._labels[job.label]

# Remove from job labels
self._job_labels.del_job(job)

# Connect all parent jobs to all children jobs.
parents = self.predecessors(job_name)
Expand Down Expand Up @@ -786,21 +800,22 @@ def add_workflow_source(self, workflow):
for sink in new_sinks:
self.add_edge(sink, source)

# Files are stored separately so copy them.
# Add separately stored info
for job_name in workflow:
job = self.get_job(job_name)
# Add job labels
self._job_labels.add_job(
job,
[self.get_job(p).label for p in self.predecessors(job.name)],
[self.get_job(p).label for p in self.successors(job.name)],
)
# Files are stored separately so copy them.
self.add_job_inputs(job_name, workflow.get_job_inputs(job_name, data=True))
self.add_job_outputs(job_name, workflow.get_job_outputs(job_name, data=True))
# Executables are stored separately so copy them.
self.add_job_inputs(job_name, workflow.get_job_inputs(job_name, data=True))
self.add_executable(workflow.get_job(job_name).executable)

# Note: label ordering inferred from dict order
# so adding given source workflow first
labels = defaultdict(list)
for label in workflow._labels:
labels[label] = workflow._labels[label]
for label in self._labels:
labels[label] = self._labels[label]
self._labels = labels

def add_final(self, final):
"""Add special final job/workflow to the generic workflow.
Expand Down Expand Up @@ -885,4 +900,104 @@ def get_jobs_by_label(self, label: str):
jobs : list[`lsst.ctrl.bps.GenericWorkflowJob`]
Jobs having given label.
"""
return self._labels[label]
return self._job_labels.get_jobs_by_label(label)


class GenericWorkflowLabels:
"""A generic representation of a workflow used to submit to specific
workflow management systems.
"""

def __init__(self):
self._label_graph = DiGraph() # Dependency graph of job labels
self._label_to_jobs = defaultdict(list) # mapping job label to list of GenericWorkflowJob

@property
def labels(self):
"""List of job labels (`list` [`str`], read-only)"""
return list(topological_sort(self._label_graph))

@property
def job_counts(self):
"""Count of jobs per job label (`collections.Counter`)."""
jcounts = Counter({label: len(jobs) for label, jobs in self._label_to_jobs.items()})
return jcounts

def get_jobs_by_label(self, label: str):
"""Retrieve jobs by label from workflow.
Parameters
----------
label : `str`
Label of jobs to retrieve.
Returns
-------
jobs : list[`lsst.ctrl.bps.GenericWorkflowJob`]
Jobs having given label.
"""
return self._label_to_jobs[label]

def add_job(self, job, parent_labels, child_labels):
"""Add job's label to labels.
Parameters
----------
job : `lsst.ctrl.bps.GenericWorkflowJob`
The job to delete from the job labels.
parent_labels : `list` [`str`]
Parent job labels.
children_labels : `list` [`str`]
Children job labels.
"""
_LOG.debug("job: %s (%s)", job.name, job.label)
_LOG.debug("parent_labels: %s", parent_labels)
_LOG.debug("child_labels: %s", child_labels)
self._label_to_jobs[job.label].append(job)
self._label_graph.add_node(job.label)
for parent in parent_labels:
self._label_graph.add_edge(parent, job.label)
for child in child_labels:
self._label_graph.add_edge(job.label, child)

def add_job_relationships(self, parent_labels, children_labels):
"""Add dependencies between parent and child job labels.
All parents will be connected to all children.
Parameters
----------
parent_labels : `list` [`str`]
Parent job labels.
children_labels : `list` [`str`]
Children job labels.
"""
if parent_labels is not None and children_labels is not None:
# Since labels, must ensure not adding edge from label to itself.
edges = [
e
for e in itertools.product(ensure_iterable(parent_labels), ensure_iterable(children_labels))
if e[0] != e[1]
]

self._label_graph.add_edges_from(edges)

def del_job(self, job):
"""Delete job and its label from job labels.
Parameters
----------
job : `lsst.ctrl.bps.GenericWorkflowJob`
The job to delete from the job labels.
"""
self._label_to_jobs[job.label].remove(job)
# Don't leave keys around if removed last job
if not self._label_to_jobs[job.label]:
del self._label_to_jobs[job.label]

Check warning on line 995 in python/lsst/ctrl/bps/generic_workflow.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/generic_workflow.py#L995

Added line #L995 was not covered by tests

# remove from graph
parents = self._label_graph.predecessors(job.label)
children = self._label_graph.successors(job.label)
self._label_graph.remove_node(job.label)
self._label_graph.add_edges_from(

Check warning on line 1001 in python/lsst/ctrl/bps/generic_workflow.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/ctrl/bps/generic_workflow.py#L998-L1001

Added lines #L998 - L1001 were not covered by tests
itertools.product(ensure_iterable(parents), ensure_iterable(children))
)

0 comments on commit 0ebaf80

Please sign in to comment.