Skip to content

Commit

Permalink
basic dask-based runner for // computation on cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
ghislainp committed Aug 28, 2020
1 parent e3054e1 commit 019b3be
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions smrt/core/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ def run(self, sensor, snowpack, atmosphere=None, snowpack_dimension=None, progre
else:
runner = SequentialRunner(progressbar=progressbar)

simulations = list(simulations)
print("nsim=", len(simulations))
# run all the simulations (with atmosphere as long as it is not depreciated), the results is a flat list of results
results = runner(self.run_single_simulation, ((simul, atmosphere) for simul in simulations))

Expand Down Expand Up @@ -336,3 +338,22 @@ def __call__(self, function, argument_list):
runner = Parallel(n_jobs=self.n_jobs, backend=self.backend) # Parallel Runner

return runner(delayed(function)(*args) for args in argument_list)


class DaskParallelRunner(object):

def __init__(self):
pass

def __call__(self, function, argument_list):

from dask import delayed, compute

def function_with_single_numerical_threads(*args):
lib.set_max_numerical_threads(1)
return function(*args)

lazy_results = [delayed(function_with_single_numerical_threads)(*args) for args in argument_list]

results = compute(*lazy_results)
return results

0 comments on commit 019b3be

Please sign in to comment.