From d2727e06b8fea2d74ff870e5c989bbd9485515eb Mon Sep 17 00:00:00 2001
From: Zhang Yunjun
Date: Sat, 13 Nov 2021 14:01:56 -0800
Subject: [PATCH] dask: support numWorker input in percentage with improved stability

+ cluster.DaskCluster:
   - run(): set the num of scaled workers to the num of boxes, to avoid unnecessary resource initiation
   - format_num_worker(): revert "divide num core by num of threads per core" (#691) as it's not right.
   - format_num_worker(): support num of workers in percentage, e.g. 80% to use 80% of all cores

+ ifgram_inversion: use ceil instead of rint to avoid generating extra small patches.

+ cluster.split_box2sub_boxes:
   - add a min step size constraint
   - trim the final number of boxes, to avoid boxes with negative dimensions
---
 docs/api/module_hierarchy.md         |  2 +-
 mintpy/defaults/smallbaselineApp.cfg |  5 ++-
 mintpy/ifgram_inversion.py           |  2 +-
 mintpy/objects/cluster.py            | 65 ++++++++++++++++------------
 4 files changed, 42 insertions(+), 32 deletions(-)

diff --git a/docs/api/module_hierarchy.md b/docs/api/module_hierarchy.md
index da293b359..f097a6296 100644
--- a/docs/api/module_hierarchy.md
+++ b/docs/api/module_hierarchy.md
@@ -7,6 +7,7 @@ Hierarchy of sub-modules within MintPy. Level _N_ modules depends on level _N-1_
     auto_path
     template
 /objects
+    cluster
     colors
     giant
     ramp
@@ -23,7 +24,6 @@ Hierarchy of sub-modules within MintPy. Level _N_ modules depends on level _N-1_
     fractal
 ------------------ level 1 --------------------
 /objects
-    cluster      (utils/utils0)
     conncomp     (objects/ramp)
     stack        (utils/ptime)
 /utils
diff --git a/mintpy/defaults/smallbaselineApp.cfg b/mintpy/defaults/smallbaselineApp.cfg
index b6ce5db90..608b13f69 100644
--- a/mintpy/defaults/smallbaselineApp.cfg
+++ b/mintpy/defaults/smallbaselineApp.cfg
@@ -5,10 +5,11 @@ mintpy.compute.maxMemory = auto #[float > 0.0], auto for 4, max memory to alloca
 ## parallel processing with dask
 ## currently apply to steps: invert_network, correct_topography
 ## cluster = none to turn off the parallel computing
-## numWorker = all to use all locally available cores (for cluster = local only)
+## numWorker = all to use all of the locally available cores (for cluster = local only)
+## numWorker = 80% to use 80% of the locally available cores (for cluster = local only)
 ## config = none to rollback to the default name (same as the cluster type; for cluster != local)
 mintpy.compute.cluster = auto #[local / slurm / pbs / lsf / none], auto for none, cluster type
-mintpy.compute.numWorker = auto #[int > 1 / all], auto for 4 (local) or 40 (slurm / pbs / lsf), num of workers
+mintpy.compute.numWorker = auto #[int > 1 / all / num%], auto for 4 (local) or 40 (slurm / pbs / lsf), num of workers
 mintpy.compute.config = auto #[none / slurm / pbs / lsf ], auto for none (same as cluster), config name
diff --git a/mintpy/ifgram_inversion.py b/mintpy/ifgram_inversion.py
index fe3290ffd..2cc5d14a8 100755
--- a/mintpy/ifgram_inversion.py
+++ b/mintpy/ifgram_inversion.py
@@ -575,7 +575,7 @@ def split2boxes(ifgram_file, max_memory=4, print_msg=True):
     ds_size = (ifg_obj.numIfgram * 2 + ifg_obj.numDate + 5) * length * width * 4
 
     num_box = int(np.ceil(ds_size * 1.5 / (max_memory * 1024**3)))
-    y_step = int(np.rint((length / num_box) / 10) * 10)
+    y_step = int(np.ceil((length / num_box) / 10) * 10)
     num_box = int(np.ceil(length / y_step))
     if print_msg and num_box > 1:
         print('maximum memory size: %.1E GB' % max_memory)
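
The ceil-vs-rint change above matters when length/num_box falls just below a multiple of 10: rint rounds the row step down, so the split ends with an extra leftover box only a few lines tall. A minimal standalone sketch of the arithmetic, with illustrative numbers (a 4000-line stack split into 3 boxes) that are not taken from the patch:

    import numpy as np

    length, num_box = 4000, 3   # hypothetical stack length and initial box count

    # old behavior: rint rounds the step down to 1330, so ceil(4000/1330) = 4 boxes,
    # and the 4th box covers only the last 10 lines
    y_step_old = int(np.rint((length / num_box) / 10) * 10)
    num_box_old = int(np.ceil(length / y_step_old))

    # new behavior: ceil rounds the step up to 1340, so ceil(4000/1340) = 3 boxes,
    # and no extra small patch is created
    y_step_new = int(np.ceil((length / num_box) / 10) * 10)
    num_box_new = int(np.ceil(length / y_step_new))
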
diff --git a/mintpy/objects/cluster.py b/mintpy/objects/cluster.py
index 1ba8c3d03..14127fc9d 100644
--- a/mintpy/objects/cluster.py
+++ b/mintpy/objects/cluster.py
@@ -12,9 +12,7 @@
 import time
 import glob
 import shutil
-import subprocess
 import numpy as np
-from mintpy.utils import utils0 as ut0
 
 
 # supported / tested clusters
@@ -53,6 +51,8 @@ def split_box2sub_boxes(box, num_split, dimension='x', print_msg=False):
     else:
         dim_size = width
     step = int(np.ceil(dim_size / num_split))
+    step = max(step, 10)   # constrain the min step size
+    num_split = int(np.ceil(dim_size / step))   # trim the final number of boxes
 
     # get list of boxes
     sub_boxes = []
@@ -174,7 +174,7 @@ def __init__(self, cluster_type, num_worker, config_name=None, **kwargs):
 
 
     def open(self):
-        """Initiate and scale the cluster"""
+        """Initiate the cluster"""
 
         # initiate the cluster object
         # Look at the ~/.config/dask/mintpy.yaml file for changing the Dask configuration defaults
@@ -213,12 +213,6 @@ def open(self):
             with open('dask_command_run_from_python.txt', 'w') as f:
                 f.write(self.cluster.job_script() + '\n')
 
-        # This line submits num_worker jobs to the cluster to start a bunch of workers
-        # In tests on Pegasus `general` queue in Jan 2019, no more than 40 workers could RUN
-        # at once (other user's jobs gained higher priority in the general at that point)
-        print('scale the cluster to {} workers'.format(self.num_worker))
-        self.cluster.scale(self.num_worker)
-
 
     def run(self, func, func_data, results):
         """Wrapper function encapsulating submit_workers and compile_workers.
@@ -235,21 +229,27 @@ def run(self, func, func_data, results):
         """
         from dask.distributed import Client
 
-        # This line needs to be in a function or in a `if __name__ == "__main__":` block. If it is in no function
-        # or "main" block, each worker will try to create its own client (which is bad) when loading the module
+        # split the primary box into sub boxes for workers AND
+        # update the number of workers based on split result
+        box = func_data["box"]
+        sub_boxes = split_box2sub_boxes(box, num_split=self.num_worker, dimension='x', print_msg=False)
+        self.num_worker = len(sub_boxes)
+        print('split patch into {} sub boxes in x direction for workers to process'.format(self.num_worker))
+
+        # start a bunch of workers from the cluster
+        print('scale Dask cluster to {} workers'.format(self.num_worker))
+        self.cluster.scale(self.num_worker)
+
         print('initiate Dask client')
         self.client = Client(self.cluster)
 
-        # split the primary box into sub boxes for each worker
-        box = func_data["box"]
-        sub_boxes = split_box2sub_boxes(box, num_split=self.num_worker, dimension='x')
-        print('split patch into {} sub boxes in x direction for workers to process'.format(len(sub_boxes)))
-
         # submit job for each worker
         futures, submission_time = self.submit_job(func, func_data, sub_boxes)
 
         # assemble results from all workers
-        return self.collect_result(futures, results, box, submission_time)
+        results = self.collect_result(futures, results, box, submission_time)
+
+        return results
 
 
     def submit_job(self, func, func_data, sub_boxes):
@@ -358,21 +358,30 @@ def format_num_worker(cluster_type, num_worker):
     if cluster_type == 'local':
         num_core = os.cpu_count()
 
-        # all --> num_core
+        # all / percentage --> num_core
+        msg = f'numWorker = {num_worker}'
         if num_worker == 'all':
-            # divide by the number of threads per core [for Linux only]
-            # as it works better on HPC, check https://github.com/insarlab/MintPy/issues/518
-            if ut0.which('lscpu') is not None:
-                # get the number of threads per core
-                # link: https://stackoverflow.com/questions/62652951
-                ps = subprocess.run(['lscpu'], capture_output=True, text=True).stdout.split('\n')
-                ns = 
[p.split(':')[1].strip() for p in ps if p.startswith('Thread(s) per core:')] - if len(ns) > 0: - num_thread = int(ns[0]) - num_core = int(num_core / num_thread) + ## divide by the number of threads per core [for Linux only] + #import subprocess + #from mintpy.utils import utils0 as ut0 + #if ut0.which('lscpu') is not None: + # # get the number of threads per core + # # link: https://stackoverflow.com/questions/62652951 + # ps = subprocess.run(['lscpu'], capture_output=True, text=True).stdout.split('\n') + # ns = [p.split(':')[1].strip() for p in ps if p.startswith('Thread(s) per core:')] + # if len(ns) > 0: + # num_thread = int(ns[0]) + # num_core = int(num_core / num_thread) # set num_worker to the number of cores num_worker = str(num_core) + print('translate {} to {}'.format(msg, num_worker)) + + elif num_worker.endswith('%'): + num_worker = int(num_core * float(num_worker[:-1]) / 100) + print('translate {} to {}'.format(msg, num_worker)) + if num_worker < 1 or num_worker >= num_core: + raise ValueError('Invalid numWorker percentage!') # str --> int num_worker = int(num_worker)
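
The new numWorker handling above can be exercised on its own. A minimal sketch, assuming a hypothetical 8-core machine; the helper name translate_num_worker() is made up for illustration and only mirrors the 'local' branch of format_num_worker():

    import os

    def translate_num_worker(num_worker, num_core=None):
        """Mirror the 'all' / percentage / integer handling added above (sketch only)."""
        num_core = num_core or os.cpu_count()
        if num_worker == 'all':
            num_worker = str(num_core)                              # 'all' --> all cores
        elif num_worker.endswith('%'):
            num_worker = int(num_core * float(num_worker[:-1]) / 100)
            if num_worker < 1 or num_worker >= num_core:            # e.g. '5%' of 8 cores --> 0 workers
                raise ValueError('Invalid numWorker percentage!')
        return int(num_worker)

    print(translate_num_worker('all', num_core=8))    # 8
    print(translate_num_worker('80%', num_core=8))    # int(8 * 0.8) = 6
    print(translate_num_worker('4',   num_core=8))    # 4

Together with the run() change, the requested worker count is then capped at the number of sub boxes returned by split_box2sub_boxes(); with the new 10-pixel minimum step, that is at most ceil(width / 10), so idle workers are no longer started for narrow patches.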