diff --git a/disdat/utility/aws_s3.py b/disdat/utility/aws_s3.py
index 5a01200..939636c 100644
--- a/disdat/utility/aws_s3.py
+++ b/disdat/utility/aws_s3.py
@@ -36,7 +36,9 @@
 
 S3_LS_USE_MP_THRESH = 4000  # the threshold after which we should use MP to look up bundles on s3
 
-# Use forkserver unless this environment variable is set (hopefully to fork, used by testing)
+# Set to available MP contexts, such as forkserver, spawn, or fork.
+# We also support "singleprocess" -- in which case Disdat will not use MP
+SINGLE_PROCESS_MP_TYPE = "singleprocess"
 MP_CONTEXT_TYPE = os.environ.get('MP_CONTEXT_TYPE', 'forkserver')
 MAX_TASKS_PER_CHILD = 100  # Force the pool to kill workers when they've done 100 tasks.
 
@@ -420,7 +422,7 @@ def ls_s3_url_keys(s3_url, is_object_directory=False):
 
     results = []
 
-    if is_object_directory:
+    if is_object_directory and MP_CONTEXT_TYPE != SINGLE_PROCESS_MP_TYPE:
         prefixes = [f'{c}{d}' for c in hexes for d in hexes]
         multiple_results = []
         mp_ctxt = get_context(MP_CONTEXT_TYPE)
@@ -509,18 +511,23 @@ def delete_s3_dir_many(s3_urls):
 
         number objects deleted (int)
 
     """
-    mp_ctxt = get_context(MP_CONTEXT_TYPE)  # Using forkserver here causes moto / pytest failures
-    with mp_ctxt.Pool(
-        processes=disdat_cpu_count(),
-        maxtasksperchild=MAX_TASKS_PER_CHILD,
-        initializer=get_s3_resource,
-    ) as pool:
-        multiple_results = []
+    if MP_CONTEXT_TYPE == SINGLE_PROCESS_MP_TYPE:
         results = []
         for s3_url in s3_urls:
-            multiple_results.append(pool.apply_async(delete_s3_dir, (s3_url,), callback=results.append))
-        pool.close()
-        pool.join()
+            results.append(delete_s3_dir(s3_url))
+    else:
+        mp_ctxt = get_context(MP_CONTEXT_TYPE)  # Using forkserver here causes moto / pytest failures
+        with mp_ctxt.Pool(
+            processes=disdat_cpu_count(),
+            maxtasksperchild=MAX_TASKS_PER_CHILD,
+            initializer=get_s3_resource,
+        ) as pool:
+            multiple_results = []
+            results = []
+            for s3_url in s3_urls:
+                multiple_results.append(pool.apply_async(delete_s3_dir, (s3_url,), callback=results.append))
+            pool.close()
+            pool.join()
 
     return sum([1 if r > 0 else 0 for r in results])
@@ -638,23 +645,28 @@ def put_s3_key_many(bucket_key_file_tuples):
         list: list of destination s3 paths
 
     """
-    mp_ctxt = get_context(MP_CONTEXT_TYPE)
-    est_cpu_count = disdat_cpu_count()
-    _logger.debug("put_s3_key_many using MP with cpu_count {}".format(est_cpu_count))
-    with mp_ctxt.Pool(
-        processes=est_cpu_count,
-        maxtasksperchild=MAX_TASKS_PER_CHILD,
-        initializer=get_s3_resource,
-    ) as pool:
-        multiple_results = []
+
+    if MP_CONTEXT_TYPE == SINGLE_PROCESS_MP_TYPE:
         results = []
         for local_object_path, s3_path in bucket_key_file_tuples:
-            multiple_results.append(pool.apply_async(cp_local_to_s3_file,
-                                                     (local_object_path, s3_path),
-                                                     callback=results.append))
-
-        pool.close()
-        pool.join()
+            results.append(cp_local_to_s3_file(local_object_path, s3_path))
+    else:
+        mp_ctxt = get_context(MP_CONTEXT_TYPE)
+        est_cpu_count = disdat_cpu_count()
+        _logger.debug("put_s3_key_many using MP with cpu_count {}".format(est_cpu_count))
+        with mp_ctxt.Pool(
+            processes=est_cpu_count,
+            maxtasksperchild=MAX_TASKS_PER_CHILD,
+            initializer=get_s3_resource,
+        ) as pool:
+            multiple_results = []
+            results = []
+            for local_object_path, s3_path in bucket_key_file_tuples:
+                multiple_results.append(pool.apply_async(cp_local_to_s3_file,
+                                                         (local_object_path, s3_path),
+                                                         callback=results.append))
+            pool.close()
+            pool.join()
 
     return results
@@ -714,23 +726,27 @@ def get_s3_key_many(bucket_key_file_tuples):
         filenames (list): list of filenames
 
     """
-    mp_ctxt = get_context(MP_CONTEXT_TYPE)  # Using forkserver here causes moto / pytest failures
-    est_cpu_count = disdat_cpu_count()
-    _logger.debug("get_s3_key_many using MP with cpu_count {}".format(est_cpu_count))
-    with mp_ctxt.Pool(
-        processes=est_cpu_count,
-        maxtasksperchild=MAX_TASKS_PER_CHILD,
-        initializer=get_s3_resource,
-    ) as pool:
-        multiple_results = []
+    if MP_CONTEXT_TYPE == SINGLE_PROCESS_MP_TYPE:
         results = []
         for s3_bucket, s3_key, local_object_path in bucket_key_file_tuples:
-            multiple_results.append(pool.apply_async(get_s3_key,
-                                                     (s3_bucket, s3_key, local_object_path),
-                                                     callback=results.append))
-
-        pool.close()
-        pool.join()
+            results.append(get_s3_key(s3_bucket, s3_key, local_object_path))
+    else:
+        mp_ctxt = get_context(MP_CONTEXT_TYPE)  # Using forkserver here causes moto / pytest failures
+        est_cpu_count = disdat_cpu_count()
+        _logger.debug("get_s3_key_many using MP with cpu_count {}".format(est_cpu_count))
+        with mp_ctxt.Pool(
+            processes=est_cpu_count,
+            maxtasksperchild=MAX_TASKS_PER_CHILD,
+            initializer=get_s3_resource,
+        ) as pool:
+            multiple_results = []
+            results = []
+            for s3_bucket, s3_key, local_object_path in bucket_key_file_tuples:
+                multiple_results.append(pool.apply_async(get_s3_key,
+                                                         (s3_bucket, s3_key, local_object_path),
+                                                         callback=results.append))
+            pool.close()
+            pool.join()
 
     return results
diff --git a/tests/functional/test_api_exit.py b/tests/functional/test_api_exit.py
index 0e1ad8b..f919a29 100644
--- a/tests/functional/test_api_exit.py
+++ b/tests/functional/test_api_exit.py
@@ -88,4 +88,6 @@ def pipe_run(self, **kwargs):
 
 
 if __name__ == "__main__":
+    import multiprocessing as mp
+    mp.set_start_method('fork')
     test()
diff --git a/tests/functional/test_external_bundle.py b/tests/functional/test_external_bundle.py
index 7b30437..61f88fe 100644
--- a/tests/functional/test_external_bundle.py
+++ b/tests/functional/test_external_bundle.py
@@ -30,8 +30,8 @@ def test(run_test):
 
     1.) Run DataMaker which runs PreMaker
     2.) Assert that those ran, and remove PreMaker
-    3.) run Root_1 which needs DataMaker (external dep) and PreMaker
-    4.) assert that premaker re-ran and root ran successfully (getting external dependency)
+    3.) run Root which needs DataMaker (external dep) and PreMaker
+    4.) assert that premaker re-ran and Root ran successfully (getting external dependency)
 
     """
 
@@ -44,13 +44,13 @@ def test(run_test):
     pm_uuid = b.uuid
     b.rm()
 
-    api.apply(TEST_CONTEXT, Root_1)
+    api.apply(TEST_CONTEXT, Root)
 
     b = api.get(TEST_CONTEXT, 'PreMaker')
     assert(b is not None)
     assert(b.uuid != pm_uuid)
 
-    b = api.get(TEST_CONTEXT, 'Root_1')
+    b = api.get(TEST_CONTEXT, 'Root')
     assert(b is not None)
 
     api.delete_context(TEST_CONTEXT)
@@ -86,7 +86,7 @@ def pipe_run(self):
         return pd.DataFrame({'fark': np.random.randint(100, size=10), 'bark': np.random.randint(10, size=10)})
 
 
-class Root_1(PipeTask):
+class Root(PipeTask):
     def pipe_requires(self):
         self.add_dependency('premaker', PreMaker, params={})
diff --git a/tox.ini b/tox.ini
index a3bd798..0c444ec 100644
--- a/tox.ini
+++ b/tox.ini
@@ -4,7 +4,7 @@
 # and then run "tox" from this directory.
 
 [tox]
-envlist = clean,py37,p38
+envlist = clean,py37,py38
 skip_missing_interpreters=true
 
 [testenv]
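
Reviewer note (not part of the patch): the same single-process-vs-pool dispatch now appears in delete_s3_dir_many, put_s3_key_many, and get_s3_key_many. Below is a minimal, self-contained sketch of that pattern; square and run_many are hypothetical stand-ins for the real S3 workers (delete_s3_dir, cp_local_to_s3_file, get_s3_key) and their *_many wrappers, while the constants mirror the diff.

import os
from multiprocessing import get_context

SINGLE_PROCESS_MP_TYPE = "singleprocess"  # sentinel value introduced by the diff
MP_CONTEXT_TYPE = os.environ.get('MP_CONTEXT_TYPE', 'forkserver')
MAX_TASKS_PER_CHILD = 100

def square(x):
    # Hypothetical worker standing in for delete_s3_dir / cp_local_to_s3_file / get_s3_key.
    return x * x

def run_many(items):
    if MP_CONTEXT_TYPE == SINGLE_PROCESS_MP_TYPE:
        # Single-process path: run the worker inline -- no pool, no pickling.
        return [square(i) for i in items]
    mp_ctxt = get_context(MP_CONTEXT_TYPE)
    with mp_ctxt.Pool(processes=2, maxtasksperchild=MAX_TASKS_PER_CHILD) as pool:
        results = []
        # Keep the async handles alive until join(), as the diff does with multiple_results.
        handles = [pool.apply_async(square, (i,), callback=results.append) for i in items]
        pool.close()
        pool.join()
    return results  # callback order is completion order, not submission order

if __name__ == '__main__':
    print(run_many(range(4)))  # e.g. [0, 1, 4, 9]; ordering may differ on the pool path

Running this sketch (or the test suite) with MP_CONTEXT_TYPE=singleprocess exercises the new inline path. Per the comments carried through the diff, that is the path to prefer under moto / pytest, where forkserver pool workers do not see the mocked AWS state.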