Skip to content

Commit

Permalink
Merge pull request #1137 from celprov/fix/networkx
Browse files Browse the repository at this point in the history
FIX: Deprecated networkx API
  • Loading branch information
oesteban committed Sep 21, 2023
2 parents f360d1e + ce40b97 commit 83a66e0
Showing 1 changed file with 15 additions and 22 deletions.
37 changes: 15 additions & 22 deletions mriqc/engine/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,7 @@ def run(self, graph, config, updatehash=False):
else:
if result:
if result["traceback"]:
notrun.append(
self._clean_queue(jobid, graph, result=result)
)
notrun.append(self._clean_queue(jobid, graph, result=result))
errors.append("".join(result["traceback"]))
else:
self._task_finished_cb(jobid)
Expand Down Expand Up @@ -289,12 +287,8 @@ def _submit_mapnode(self, jobid):
"lil",
)
self.depidx[-numnodes:, jobid] = 1
self.proc_done = np.concatenate(
(self.proc_done, np.zeros(numnodes, dtype=bool))
)
self.proc_pending = np.concatenate(
(self.proc_pending, np.zeros(numnodes, dtype=bool))
)
self.proc_done = np.concatenate((self.proc_done, np.zeros(numnodes, dtype=bool)))
self.proc_pending = np.concatenate((self.proc_pending, np.zeros(numnodes, dtype=bool)))
return False

def _local_hash_check(self, jobid, graph):
Expand All @@ -311,11 +305,7 @@ def _local_hash_check(self, jobid, graph):
overwrite = self.procs[jobid].overwrite
always_run = self.procs[jobid].interface.always_run

if (
cached
and updated
and (overwrite is False or overwrite is None and not always_run)
):
if cached and updated and (overwrite is False or overwrite is None and not always_run):
try:
self._task_finished_cb(jobid, cached=True)
self._remove_node_dirs()
Expand All @@ -339,18 +329,23 @@ def _task_finished_cb(self, jobid, cached=False):
rowview = self.depidx.getrowview(jobid)
rowview[rowview.nonzero()] = 0
if jobid not in self.mapnodesubids:
self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0
try:
self.refidx[self.refidx[:, jobid].nonzero()[0], jobid] = 0
except NotImplementedError:
self.refidx[self.refidx[:, [jobid]].nonzero()[0], jobid] = 0

def _generate_dependency_list(self, graph):
"""Generate a dependency list for a list of graphs."""
import numpy as np
import networkx as nx
from nipype.pipeline.engine.utils import topological_sort

try:
from networkx import to_scipy_sparse_array
except ImportError: # NetworkX < 2.7
from networkx import to_scipy_sparse_matrix as to_scipy_sparse_array

self.procs, _ = topological_sort(graph)
self.depidx = nx.to_scipy_sparse_matrix(
graph, nodelist=self.procs, format="lil"
)
self.depidx = to_scipy_sparse_array(graph, nodelist=self.procs, format="lil")
self.refidx = self.depidx.astype(int)
self.proc_done = np.zeros(len(self.procs), dtype=bool)
self.proc_pending = np.zeros(len(self.procs), dtype=bool)
Expand Down Expand Up @@ -505,9 +500,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
# Check to see if a job is available (jobs with all dependencies run)
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
# See also https://github.com/nipy/nipype/issues/2372
jobids = np.flatnonzero(
~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__()
)
jobids = np.flatnonzero(~self.proc_done & (self.depidx.sum(axis=0) == 0).__array__())

# Check available resources by summing all threads and memory used
free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
Expand Down

0 comments on commit 83a66e0

Please sign in to comment.