Skip to content

Commit

Permalink
Merge branch 'tickets/DM-32241'
Browse files Browse the repository at this point in the history
DM-32241: Cache config values repeated per label
  • Loading branch information
MichelleGower committed Nov 3, 2021
2 parents 373c287 + 3ab5e3d commit 55af10e
Show file tree
Hide file tree
Showing 7 changed files with 323 additions and 215 deletions.
2 changes: 2 additions & 0 deletions doc/changes/DM-32241.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
* Fixed a bug that occurred when using the execution butler without lazy commands.
* Fixed a bug in htcondor_service.py that overwrote the message in the bps report.
1 change: 1 addition & 0 deletions doc/changes/DM-32241.perf.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Cache config values per label to reduce the number of config lookups and speed up multiple submission stages.
18 changes: 11 additions & 7 deletions python/lsst/ctrl/bps/clustered_quantum_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def __init__(self, name, qgraph, qgraph_filename=None):
raise ValueError(f"name cannot have a / ({name})")
self._name = name
self._quantum_graph = qgraph
self._quantum_graph_filename = qgraph_filename
self._quantum_graph_filename = Path(qgraph_filename).resolve()
self._cluster_graph = DiGraph()

def __str__(self):
Expand Down Expand Up @@ -260,11 +260,10 @@ def get_cluster(self, name):
Raised if the ClusteredQuantumGraph does not contain
a cluster with given name.
"""
if name not in self._cluster_graph:
raise KeyError(f"{self.name} does not have a cluster named {name}")

_LOG.debug("get_cluster nodes = %s", list(self._cluster_graph.nodes))
attr = self._cluster_graph.nodes[name]
try:
attr = self._cluster_graph.nodes[name]
except KeyError as ex:
raise KeyError(f"{self.name} does not have a cluster named {name}") from ex
return attr['cluster']

def get_quantum_node(self, id_):
Expand Down Expand Up @@ -490,5 +489,10 @@ def load(cls, filename, format_=None):
cgraph = pickle.load(fh)

# The QuantumGraph was saved separately
cgraph._quantum_graph = QuantumGraph.loadUri(cgraph._quantum_graph_filename, dim_univ)
try:
cgraph._quantum_graph = QuantumGraph.loadUri(cgraph._quantum_graph_filename, dim_univ)
except FileNotFoundError: # Try same path as ClusteredQuantumGraph
new_filename = path.parent / Path(cgraph._quantum_graph_filename).name
cgraph._quantum_graph = QuantumGraph.loadUri(new_filename, dim_univ)

return cgraph
2 changes: 1 addition & 1 deletion python/lsst/ctrl/bps/etc/bps_defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ clusterAlgorithm: lsst.ctrl.bps.quantum_clustering_funcs.single_quantum_clusteri
submitPath: ${PWD}/submit/{outputRun}
qgraphFileTemplate: "{uniqProcName}.qgraph"
executionButlerTemplate: "{submitPath}/EXEC_REPO-{uniqProcName}"
subDirTemplate: "{label}/{tract}/{patch}/{visit.day_obs}/{exposure.day_obs}/{band}/{subfilter}/{physical_filter}/{visit}/{exposure}"
subDirTemplate: "{label}/{tract}/{patch}/{band}/{subfilter}/{physical_filter}/{visit}/{exposure}"
templateDataId: "{tract}_{patch}_{band}_{visit}_{exposure}_{detector}"

# Whether to output bps-specific intermediate files
Expand Down
35 changes: 20 additions & 15 deletions python/lsst/ctrl/bps/quantum_clustering_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,22 @@ def single_quantum_clustering(config, qgraph, name):
# multiple times.
number_to_name = {}

# Cache template per label for speed.
cached_template = {}

# Create cluster of single quantum.
for qnode in qgraph:
found, template = config.search("templateDataId",
opt={"curvals": {"curr_pipetask": qnode.taskDef.label},
"replaceVars": False})
if found:
template = "{node_number}_{label}_" + template
else:
template = "{node_number:08d}"
if qnode.taskDef.label not in cached_template:
found, template_data_id = config.search("templateDataId",
opt={"curvals": {"curr_pipetask": qnode.taskDef.label},
"replaceVars": False})
if found:
template = "{node_number}_{label}_" + template_data_id
else:
template = "{node_number:08d}"
cached_template[qnode.taskDef.label] = template

cluster = QuantaCluster.from_quantum_node(qnode, template)
cluster = QuantaCluster.from_quantum_node(qnode, cached_template[qnode.taskDef.label])

# Save mapping for use when creating dependencies.
number_to_name[qnode.nodeId] = cluster.name
Expand Down Expand Up @@ -139,6 +144,8 @@ def dimension_clustering(config, qgraph, name):
task_def = qgraph.findTaskDefByLabel(task_label)
quantum_nodes = qgraph.getNodesForTask(task_def)

equal_dims = cluster_config[cluster_label].get("equalDimensions", None)

# Determine cluster for each node
for qnode in quantum_nodes:
# Gather info for cluster name template into a dictionary.
Expand All @@ -149,8 +156,7 @@ def dimension_clustering(config, qgraph, name):
_LOG.debug("dim_name = %s", dim_name)
if dim_name in data_id_info:
info[dim_name] = data_id_info[dim_name]
if "equalDimensions" in cluster_config[cluster_label]:
equal_dims = cluster_config[cluster_label]["equalDimensions"]
if equal_dims:
for pair in [pt.strip() for pt in equal_dims.split(",")]:
dim1, dim2 = pair.strip().split(":")
if dim1 in cluster_dims and dim2 in data_id_info:
Expand Down Expand Up @@ -178,7 +184,6 @@ def dimension_clustering(config, qgraph, name):
# requires it for creating per-job QuantumGraphs.
if cluster_name in cqgraph:
cluster = cqgraph.get_cluster(cluster_name)
assert isinstance(cluster, QuantaCluster)
else:
cluster = QuantaCluster(cluster_name, cluster_label, info)
cqgraph.add_cluster(cluster)
Expand All @@ -188,11 +193,11 @@ def dimension_clustering(config, qgraph, name):
for task_def in qgraph.iterTaskGraph():
if task_def.label not in task_labels_seen:
_LOG.info("Creating 1-quantum clusters for task %s", task_def.label)
found, template = config.search("templateDataId",
opt={"curvals": {"curr_pipetask": task_def.label},
"replaceVars": False})
found, template_data_id = config.search("templateDataId",
opt={"curvals": {"curr_pipetask": task_def.label},
"replaceVars": False})
if found:
template = "{node_number}_{label}_" + template
template = "{node_number}_{label}_" + template_data_id
else:
template = "{node_number:08d}"

Expand Down

0 comments on commit 55af10e

Please sign in to comment.