dask: support numWorker input in percentage with improved stability
+ cluster.DaskCluster:
   - run(): set the num of scaled workers to the num of boxes, to avoid unnecessary resource initiation (see the sketch after this list)
   - format_num_worker(): revert "divide num core by num of threads per core" (#691) as it's not right.
   - format_num_worker(): support num of workers in percentage, e.g. 80% to use 80% of all cores

+ ifgram_inversion: use ceil instead of rint to prevent the occurrence of extra small patches.

+ cluster.split_box2sub_boxes:
   - add a min step size constraint
   - trim the final number of boxes, to avoid negative dimension
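
For illustration only, a minimal sketch of the run() change in the first bullet. The box value and requested worker count are hypothetical, and a plain dask LocalCluster stands in for MintPy's DaskCluster wrapper; split_box2sub_boxes is the real function from mintpy/objects/cluster.py. The point: the box is split first, and the cluster is scaled to the number of resulting sub boxes, so no more workers are started than there are sub boxes to process.

    from dask.distributed import Client, LocalCluster
    from mintpy.objects.cluster import split_box2sub_boxes

    box = (0, 0, 50, 1000)     # (x0, y0, x1, y1): a narrow patch, only 50 columns wide
    num_worker = 8             # requested number of workers

    # split first: the 10-pixel minimum step size caps the useful worker count
    sub_boxes = split_box2sub_boxes(box, num_split=num_worker, dimension='x', print_msg=False)
    num_worker = len(sub_boxes)          # -> 5 for this box

    # scale only to the number of sub boxes, instead of the requested number
    cluster = LocalCluster(n_workers=0)
    cluster.scale(num_worker)
    client = Client(cluster)
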
yunjunz committed Nov 13, 2021
1 parent e14a53e commit d2727e0
Showing 4 changed files with 42 additions and 32 deletions.
docs/api/module_hierarchy.md (2 changes: 1 addition & 1 deletion)
@@ -7,6 +7,7 @@ Hierarchy of sub-modules within MintPy. Level _N_ modules depends on level _N-1_
auto_path
template
/objects
cluster
colors
giant
ramp
@@ -23,7 +24,6 @@ Hierarchy of sub-modules within MintPy. Level _N_ modules depends on level _N-1_
fractal
------------------ level 1 --------------------
/objects
cluster (utils/utils0)
conncomp (objects/ramp)
stack (utils/ptime)
/utils
mintpy/defaults/smallbaselineApp.cfg (5 changes: 3 additions & 2 deletions)
@@ -5,10 +5,11 @@ mintpy.compute.maxMemory = auto #[float > 0.0], auto for 4, max memory to alloca
## parallel processing with dask
## currently apply to steps: invert_network, correct_topography
## cluster = none to turn off the parallel computing
## numWorker = all to use all locally available cores (for cluster = local only)
## numWorker = all to use all of locally available cores (for cluster = local only)
## numWorker = 80% to use 80% of locally available cores (for cluster = local only)
## config = none to rollback to the default name (same as the cluster type; for cluster != local)
mintpy.compute.cluster = auto #[local / slurm / pbs / lsf / none], auto for none, cluster type
mintpy.compute.numWorker = auto #[int > 1 / all], auto for 4 (local) or 40 (slurm / pbs / lsf), num of workers
mintpy.compute.numWorker = auto #[int > 1 / all / num%], auto for 4 (local) or 40 (slurm / pbs / lsf), num of workers
mintpy.compute.config = auto #[none / slurm / pbs / lsf ], auto for none (same as cluster), config name


mintpy/ifgram_inversion.py (2 changes: 1 addition & 1 deletion)
@@ -575,7 +575,7 @@ def split2boxes(ifgram_file, max_memory=4, print_msg=True):
ds_size = (ifg_obj.numIfgram * 2 + ifg_obj.numDate + 5) * length * width * 4

num_box = int(np.ceil(ds_size * 1.5 / (max_memory * 1024**3)))
y_step = int(np.rint((length / num_box) / 10) * 10)
y_step = int(np.ceil((length / num_box) / 10) * 10)
num_box = int(np.ceil(length / y_step))
if print_msg and num_box > 1:
print('maximum memory size: %.1E GB' % max_memory)
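
A small worked example of the change above, with hypothetical numbers: with np.rint, a 450-line stack split into 4 target boxes gets y_step = 110, which leaves a fifth, 10-line leftover patch; np.ceil rounds the step up instead, so the split stays at 4 boxes.

    import numpy as np

    length, num_box = 450, 4   # hypothetical stack length / target number of boxes

    y_step_old = int(np.rint((length / num_box) / 10) * 10)   # 110
    y_step_new = int(np.ceil((length / num_box) / 10) * 10)   # 120

    print(int(np.ceil(length / y_step_old)))   # 5 -> the 5th box covers only 10 lines
    print(int(np.ceil(length / y_step_new)))   # 4 -> no extra small patch
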
mintpy/objects/cluster.py (65 changes: 37 additions & 28 deletions)
@@ -12,9 +12,7 @@
import time
import glob
import shutil
import subprocess
import numpy as np
from mintpy.utils import utils0 as ut0


# supported / tested clusters
@@ -53,6 +51,8 @@ def split_box2sub_boxes(box, num_split, dimension='x', print_msg=False):
else:
dim_size = width
step = int(np.ceil(dim_size / num_split))
step = max(step, 10) # constrain the min step size
num_split = int(np.ceil(dim_size / step)) # trim the final number of boxes

# get list of boxes
sub_boxes = []
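
A minimal standalone sketch of the two new constraints above (plan_splits is a hypothetical helper, not MintPy code): the step size never drops below 10, and the number of splits is trimmed accordingly, so requesting more splits than the dimension can support no longer produces empty or negative-sized boxes.

    import numpy as np

    def plan_splits(dim_size, num_split, min_step=10):
        """Return (step, num_split) after applying the minimum step size and trimming."""
        step = int(np.ceil(dim_size / num_split))
        step = max(step, min_step)                   # constrain the min step size
        num_split = int(np.ceil(dim_size / step))    # trim the final number of boxes
        return step, num_split

    # e.g. asking for 30 splits of a 95-pixel-wide box:
    print(plan_splits(95, 30))   # (10, 10): 10 sub boxes of width 10 (the last one is 5)
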
@@ -174,7 +174,7 @@ def __init__(self, cluster_type, num_worker, config_name=None, **kwargs):


def open(self):
"""Initiate and scale the cluster"""
"""Initiate the cluster"""

# initiate the cluster object
# Look at the ~/.config/dask/mintpy.yaml file for changing the Dask configuration defaults
@@ -213,12 +213,6 @@ def open(self):
with open('dask_command_run_from_python.txt', 'w') as f:
f.write(self.cluster.job_script() + '\n')

# This line submits num_worker jobs to the cluster to start a bunch of workers
# In tests on Pegasus `general` queue in Jan 2019, no more than 40 workers could RUN
# at once (other user's jobs gained higher priority in the general at that point)
print('scale the cluster to {} workers'.format(self.num_worker))
self.cluster.scale(self.num_worker)


def run(self, func, func_data, results):
"""Wrapper function encapsulating submit_workers and compile_workers.
@@ -235,21 +229,27 @@ def run(self, func, func_data, results):
"""
from dask.distributed import Client

# This line needs to be in a function or in a `if __name__ == "__main__":` block. If it is in no function
# or "main" block, each worker will try to create its own client (which is bad) when loading the module
# split the primary box into sub boxes for workers AND
# update the number of workers based on split result
box = func_data["box"]
sub_boxes = split_box2sub_boxes(box, num_split=self.num_worker, dimension='x', print_msg=False)
self.num_worker = len(sub_boxes)
print('split patch into {} sub boxes in x direction for workers to process'.format(self.num_worker))

# start a bunch of workers from the cluster
print('scale Dask cluster to {} workers'.format(self.num_worker))
self.cluster.scale(self.num_worker)

print('initiate Dask client')
self.client = Client(self.cluster)

# split the primary box into sub boxes for each worker
box = func_data["box"]
sub_boxes = split_box2sub_boxes(box, num_split=self.num_worker, dimension='x')
print('split patch into {} sub boxes in x direction for workers to process'.format(len(sub_boxes)))

# submit job for each worker
futures, submission_time = self.submit_job(func, func_data, sub_boxes)

# assemble results from all workers
return self.collect_result(futures, results, box, submission_time)
results = self.collect_result(futures, results, box, submission_time)

return results


def submit_job(self, func, func_data, sub_boxes):
@@ -358,21 +358,30 @@ def format_num_worker(cluster_type, num_worker):
if cluster_type == 'local':
num_core = os.cpu_count()

# all --> num_core
# all / percentage --> num_core
msg = f'numWorker = {num_worker}'
if num_worker == 'all':
# divide by the number of threads per core [for Linux only]
# as it works better on HPC, check https://github.com/insarlab/MintPy/issues/518
if ut0.which('lscpu') is not None:
# get the number of threads per core
# link: https://stackoverflow.com/questions/62652951
ps = subprocess.run(['lscpu'], capture_output=True, text=True).stdout.split('\n')
ns = [p.split(':')[1].strip() for p in ps if p.startswith('Thread(s) per core:')]
if len(ns) > 0:
num_thread = int(ns[0])
num_core = int(num_core / num_thread)
## divide by the number of threads per core [for Linux only]
#import subprocess
#from mintpy.utils import utils0 as ut0
#if ut0.which('lscpu') is not None:
# # get the number of threads per core
# # link: https://stackoverflow.com/questions/62652951
# ps = subprocess.run(['lscpu'], capture_output=True, text=True).stdout.split('\n')
# ns = [p.split(':')[1].strip() for p in ps if p.startswith('Thread(s) per core:')]
# if len(ns) > 0:
# num_thread = int(ns[0])
# num_core = int(num_core / num_thread)

# set num_worker to the number of cores
num_worker = str(num_core)
print('translate {} to {}'.format(msg, num_worker))

elif num_worker.endswith('%'):
num_worker = int(num_core * float(num_worker[:-1]) / 100)
print('translate {} to {}'.format(msg, num_worker))
if num_worker < 1 or num_worker >= num_core:
raise ValueError('Invalid numWorker percentage!')

# str --> int
num_worker = int(num_worker)
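
A hypothetical usage illustration of the percentage option added above, assuming a local machine where os.cpu_count() returns 8 (the exact values depend on the machine; format_num_worker is the real function from mintpy/objects/cluster.py):

    from mintpy.objects.cluster import format_num_worker

    # assuming os.cpu_count() == 8 and cluster type 'local':
    format_num_worker('local', 'all')    # -> 8, all available cores
    format_num_worker('local', '80%')    # -> int(8 * 0.80) = 6
    format_num_worker('local', '4')      # -> 4, plain integers still work
    format_num_worker('local', '5%')     # -> ValueError: translates to < 1 worker

In smallbaselineApp.cfg this corresponds to setting mintpy.compute.numWorker = 80% together with mintpy.compute.cluster = local, as documented in the config change above.
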
