Skip to content

Commit

Permalink
Fix: Update code to reflect networkx updates
Browse files Browse the repository at this point in the history
  • Loading branch information
dileep-kishore committed Jan 26, 2021
1 parent 0b86f65 commit 9b584c8
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 18 deletions.
26 changes: 13 additions & 13 deletions micone/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def _create_processes(self, settings: dict) -> None:
root_dir = str(default_process_data.root_dir) + suffix_flag
else:
root_dir = str(default_process_data.root_dir)
tree.node[node_name]["process"] = Process(
tree.nodes[node_name]["process"] = Process(
default_process_data,
self.profile,
str(self.output_location),
Expand All @@ -235,12 +235,12 @@ def __len__(self) -> int:

def __getitem__(self, key: str) -> Process:
for process in self:
if process.name == key:
if process == key:
return process
raise KeyError(f"{key} is not a process of this pipeline")

def __repr__(self) -> str:
processes = [process.name for process in self]
processes = [process for process in self]
return (
f"<Pipeline title={self.title} output_location={self.output_location} "
f"processes={processes}>"
Expand Down Expand Up @@ -268,28 +268,28 @@ def run(self, max_procs: int = 4) -> Iterator[Process]:
# Get the process for the root node and update locations
tree = self.process_tree
root_node = next(nx.topological_sort(tree))
root_node_process = tree.node[root_node]["process"]
root_node_process = tree.nodes[root_node]["process"]
root_node_process.update_location(str(self.base_dir), "input")
root_path = self.output_location / root_node_process.params.root_dir
root_node_process.update_location(str(root_path), "output")
tree.node[root_node]["location"] = str(root_path)
tree.nodes[root_node]["location"] = str(root_path)
# Attach outputs of parent node to inputs of child node
for curr_process_name in nx.bfs_tree(tree, root_node):
curr_process = tree.node[curr_process_name]["process"]
curr_process = tree.nodes[curr_process_name]["process"]
curr_process.update_location(str(self.base_dir), "input")
root_path = self.output_location / curr_process.params.root_dir
curr_process.update_location(str(root_path), "output")
tree.node[curr_process_name]["location"] = str(root_path)
tree.nodes[curr_process_name]["location"] = str(root_path)
predecessors: List[str] = list(tree.predecessors(curr_process_name))
while predecessors:
prev_process_name = predecessors[0]
prev_process = tree.node[prev_process_name]["process"]
prev_process = tree.nodes[prev_process_name]["process"]
curr_process.attach_to(prev_process)
predecessors = list(tree.predecessors(prev_process_name))
self.draw_process_tree(self.output_location)
self.process_queue = collections.deque([], max_procs)
for process_name in nx.bfs_tree(tree, root_node):
process = self.process_tree.node[process_name]["process"]
process = self.process_tree.nodes[process_name]["process"]
loc = pathlib.Path(self.output_location) # / process.params.output_location
if self.resume and process.io_exist:
yield process
Expand Down Expand Up @@ -319,7 +319,7 @@ def draw_process_tree(self, fpath: str) -> None:
gml = pathlib.Path(fpath) / "DAG.gml"
nodes = list(self.process_tree.nodes)
labels = {n: n.split(".", 2)[-1] for n in nodes}
pos = hierarchy_pos(tree, 0, width=2 * math.pi, xcenter=0)
pos = hierarchy_pos(tree, width=2 * math.pi)
# If you want a radial graph
# new_pos = {u:(r*math.cos(theta),r*math.sin(theta)) for u, (theta, r) in pos.items()}
nx.draw_networkx_nodes(tree, pos, node_size=500, alpha=0.8)
Expand Down Expand Up @@ -347,7 +347,7 @@ def status(self) -> Dict[str, str]:
root_node = next(nx.topological_sort(tree))
status_dict: Dict[str, str] = {}
for process_name in nx.bfs_tree(tree, root_node):
process = tree.node[process_name]["process"]
process = tree.nodes[process_name]["process"]
status_dict[process_name] = process.status
return status_dict

Expand Down Expand Up @@ -439,7 +439,7 @@ def clean(self, files: List[str] = ["logs"]) -> None:
cmd.log()
if "configs" in files:
for node in self:
process = self.process_tree.node[node]["process"]
process = self.process_tree.nodes[node]["process"]
script_file = f"{process.id}.nf"
config_file = f"{process.id}.config"
cmd = Command(f"rm {script_file} {config_file}", "local")
Expand All @@ -453,7 +453,7 @@ def clean(self, files: List[str] = ["logs"]) -> None:
cmd.log()
if "results" in files:
for node in self:
process = self.process_tree.node[node]["process"]
process = self.process_tree.nodes[node]["process"]
root_path = process.output_location / process.params.root_dir
cmd = Command(f"rm -rf {root_path}", "local")
cmd.run()
Expand Down
3 changes: 1 addition & 2 deletions micone/utils/hierarchical_layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,7 @@ def _hierarchy_pos(
leafpos[root] = (leftmost, vert_loc)
# pos[root] = (leftmost + (leaf_count-1)*dx/2., vert_loc)
# print(leaf_count)

return rootpos, leafpos, leaf_count
return rootpos, leafpos, leaf_count

xcenter = width / 2.0
if isinstance(G, nx.DiGraph):
Expand Down
6 changes: 3 additions & 3 deletions tests/pipelines/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ def test_pipeline_init(self, example_pipeline_files):
def test_pipeline_len_getitem(self, example_pipeline_files):
user_settings = example_pipeline_files["grouptaxa_sparcc_json"]
pipeline = Pipeline(user_settings, profile="local")
assert len(pipeline) == 4
process = pipeline["network_inference.network.make_network"]
assert process.name == "network_inference.network.make_network"
assert len(pipeline) == 6
process = pipeline["network_inference.network.make_network.1"]
assert process == "network_inference.network.make_network.1"

@pytest.mark.filterwarnings("ignore::UserWarning")
def test_pipeline_run(self, example_pipeline_files, tmpdir):
Expand Down

0 comments on commit 9b584c8

Please sign in to comment.