
Commit

Merge pull request #194 from kyocum/feature/sp-pull
Allow single process pull/pushes
kyocum committed Aug 13, 2021
2 parents c3ef7e8 + cfb08ff commit b4fa281
Showing 4 changed files with 66 additions and 48 deletions.
100 changes: 58 additions & 42 deletions disdat/utility/aws_s3.py
@@ -36,7 +36,9 @@
 
 S3_LS_USE_MP_THRESH = 4000  # the threshold after which we should use MP to look up bundles on s3
 
-# Use forkserver unless this environment variable is set (hopefully to fork, used by testing)
+# Set to available MP contexts, such as forkserver, spawn, or fork.
+# We also support "singleprocess" -- in which case Disdat will not use MP
+SINGLE_PROCESS_MP_TYPE = "singleprocess"
 MP_CONTEXT_TYPE = os.environ.get('MP_CONTEXT_TYPE', 'forkserver')
 MAX_TASKS_PER_CHILD = 100  # Force the pool to kill workers when they've done 100 tasks.
 
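Note on the new switch: per the diff, MP_CONTEXT_TYPE is read once, at import time, from the environment, so single-process mode can be selected without touching code. A minimal sketch of that usage (the call sequence is illustrative, not part of this commit):

    import os

    # Must be set before disdat.utility.aws_s3 is imported, since the module
    # reads MP_CONTEXT_TYPE at import time via os.environ.get().
    os.environ['MP_CONTEXT_TYPE'] = 'singleprocess'

    from disdat.utility import aws_s3
    assert aws_s3.MP_CONTEXT_TYPE == aws_s3.SINGLE_PROCESS_MP_TYPE
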
@@ -420,7 +422,7 @@ def ls_s3_url_keys(s3_url, is_object_directory=False):
 
     results = []
 
-    if is_object_directory:
+    if is_object_directory and MP_CONTEXT_TYPE != SINGLE_PROCESS_MP_TYPE:
         prefixes = [f'{c}{d}' for c in hexes for d in hexes]
         multiple_results = []
         mp_ctxt = get_context(MP_CONTEXT_TYPE)
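The prefixes fan-out above is what the worker pool parallelizes over: one listing task per two-character hex prefix, 256 in all. In single-process mode the new guard skips that fan-out and the URL is listed directly. A small illustration (the definition of hexes is an assumption about earlier module code, not shown in this diff):

    hexes = '0123456789abcdef'  # assumed definition from earlier in the module
    prefixes = [f'{c}{d}' for c in hexes for d in hexes]
    assert len(prefixes) == 16 * 16  # '00', '01', ..., 'fe', 'ff'
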
@@ -509,18 +511,23 @@ def delete_s3_dir_many(s3_urls):
         number objects deleted (int)
     """
-    mp_ctxt = get_context(MP_CONTEXT_TYPE)  # Using forkserver here causes moto / pytest failures
-    with mp_ctxt.Pool(
-        processes=disdat_cpu_count(),
-        maxtasksperchild=MAX_TASKS_PER_CHILD,
-        initializer=get_s3_resource,
-    ) as pool:
-        multiple_results = []
+    if MP_CONTEXT_TYPE == SINGLE_PROCESS_MP_TYPE:
         results = []
         for s3_url in s3_urls:
-            multiple_results.append(pool.apply_async(delete_s3_dir, (s3_url,), callback=results.append))
-        pool.close()
-        pool.join()
+            results.append(delete_s3_dir(s3_url))
+    else:
+        mp_ctxt = get_context(MP_CONTEXT_TYPE)  # Using forkserver here causes moto / pytest failures
+        with mp_ctxt.Pool(
+            processes=disdat_cpu_count(),
+            maxtasksperchild=MAX_TASKS_PER_CHILD,
+            initializer=get_s3_resource,
+        ) as pool:
+            multiple_results = []
+            results = []
+            for s3_url in s3_urls:
+                multiple_results.append(pool.apply_async(delete_s3_dir, (s3_url,), callback=results.append))
+            pool.close()
+            pool.join()
 
     return sum([1 if r > 0 else 0 for r in results])
 
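The same two-branch shape repeats in the put/get helpers below: run the work function inline when MP_CONTEXT_TYPE is "singleprocess", otherwise fan it out through a pool. A condensed sketch of the pattern, with work_fn standing in for delete_s3_dir, cp_local_to_s3_file, or get_s3_key (run_maybe_parallel itself is illustrative and not a function in this commit; the other names are the module's):

    def run_maybe_parallel(work_fn, args_list):
        # Single-process branch: call the work function directly, in order.
        if MP_CONTEXT_TYPE == SINGLE_PROCESS_MP_TYPE:
            return [work_fn(*args) for args in args_list]
        # MP branch: results arrive via callback in completion order,
        # matching the behavior of the functions in this diff.
        results = []
        mp_ctxt = get_context(MP_CONTEXT_TYPE)
        with mp_ctxt.Pool(
            processes=disdat_cpu_count(),
            maxtasksperchild=MAX_TASKS_PER_CHILD,
            initializer=get_s3_resource,  # each worker builds its own boto3 resource
        ) as pool:
            for args in args_list:
                pool.apply_async(work_fn, args, callback=results.append)
            pool.close()
            pool.join()
        return results
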
@@ -638,23 +645,28 @@ def put_s3_key_many(bucket_key_file_tuples):
         list: list of destination s3 paths
     """
-    mp_ctxt = get_context(MP_CONTEXT_TYPE)
-    est_cpu_count = disdat_cpu_count()
-    _logger.debug("put_s3_key_many using MP with cpu_count {}".format(est_cpu_count))
-    with mp_ctxt.Pool(
-        processes=est_cpu_count,
-        maxtasksperchild=MAX_TASKS_PER_CHILD,
-        initializer=get_s3_resource,
-    ) as pool:
-        multiple_results = []
+
+    if MP_CONTEXT_TYPE == SINGLE_PROCESS_MP_TYPE:
         results = []
         for local_object_path, s3_path in bucket_key_file_tuples:
-            multiple_results.append(pool.apply_async(cp_local_to_s3_file,
-                                                     (local_object_path, s3_path),
-                                                     callback=results.append))
-
-        pool.close()
-        pool.join()
+            results.append(cp_local_to_s3_file(local_object_path, s3_path))
+    else:
+        mp_ctxt = get_context(MP_CONTEXT_TYPE)
+        est_cpu_count = disdat_cpu_count()
+        _logger.debug("put_s3_key_many using MP with cpu_count {}".format(est_cpu_count))
+        with mp_ctxt.Pool(
+            processes=est_cpu_count,
+            maxtasksperchild=MAX_TASKS_PER_CHILD,
+            initializer=get_s3_resource,
+        ) as pool:
+            multiple_results = []
+            results = []
+            for local_object_path, s3_path in bucket_key_file_tuples:
+                multiple_results.append(pool.apply_async(cp_local_to_s3_file,
+                                                         (local_object_path, s3_path),
+                                                         callback=results.append))
+            pool.close()
+            pool.join()
     return results
 
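For reference, put_s3_key_many consumes (local path, destination s3 path) pairs and returns the destination paths. A call looks roughly like this (bucket and file names are placeholders):

    uploaded = put_s3_key_many([
        ('/tmp/a.parquet', 's3://some-bucket/data/a.parquet'),
        ('/tmp/b.parquet', 's3://some-bucket/data/b.parquet'),
    ])
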
@@ -714,23 +726,27 @@ def get_s3_key_many(bucket_key_file_tuples):
         filenames (list): list of filenames
     """
-    mp_ctxt = get_context(MP_CONTEXT_TYPE)  # Using forkserver here causes moto / pytest failures
-    est_cpu_count = disdat_cpu_count()
-    _logger.debug("get_s3_key_many using MP with cpu_count {}".format(est_cpu_count))
-    with mp_ctxt.Pool(
-        processes=est_cpu_count,
-        maxtasksperchild=MAX_TASKS_PER_CHILD,
-        initializer=get_s3_resource,
-    ) as pool:
-        multiple_results = []
+    if MP_CONTEXT_TYPE == SINGLE_PROCESS_MP_TYPE:
         results = []
         for s3_bucket, s3_key, local_object_path in bucket_key_file_tuples:
-            multiple_results.append(pool.apply_async(get_s3_key,
-                                                     (s3_bucket, s3_key, local_object_path),
-                                                     callback=results.append))
-
-        pool.close()
-        pool.join()
+            results.append(get_s3_key(s3_bucket, s3_key, local_object_path))
+    else:
+        mp_ctxt = get_context(MP_CONTEXT_TYPE)  # Using forkserver here causes moto / pytest failures
+        est_cpu_count = disdat_cpu_count()
+        _logger.debug("get_s3_key_many using MP with cpu_count {}".format(est_cpu_count))
+        with mp_ctxt.Pool(
+            processes=est_cpu_count,
+            maxtasksperchild=MAX_TASKS_PER_CHILD,
+            initializer=get_s3_resource,
+        ) as pool:
+            multiple_results = []
+            results = []
+            for s3_bucket, s3_key, local_object_path in bucket_key_file_tuples:
+                multiple_results.append(pool.apply_async(get_s3_key,
+                                                         (s3_bucket, s3_key, local_object_path),
+                                                         callback=results.append))
+            pool.close()
+            pool.join()
 
     return results
 
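get_s3_key_many takes (bucket, key, local destination) triples rather than the pair form above. A call looks roughly like this (names are placeholders):

    fetched = get_s3_key_many([
        ('some-bucket', 'data/a.parquet', '/tmp/a.parquet'),
        ('some-bucket', 'data/b.parquet', '/tmp/b.parquet'),
    ])
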
2 changes: 2 additions & 0 deletions tests/functional/test_api_exit.py
@@ -88,4 +88,6 @@ def pipe_run(self, **kwargs):
 
 
 if __name__ == "__main__":
+    import multiprocessing as mp
+    mp.set_start_method('fork')
     test()
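Pinning the start method here lines up with the inline comment in aws_s3.py ("Using forkserver here causes moto / pytest failures"): the test's __main__ entry point forces 'fork'. If a start method might already have been set by the harness, the force flag would avoid the RuntimeError from a second call (a hedged variant, not what the commit does):

    mp.set_start_method('fork', force=True)
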
10 changes: 5 additions & 5 deletions tests/functional/test_external_bundle.py
@@ -30,8 +30,8 @@ def test(run_test):
     1.) Run DataMaker which runs PreMaker
     2.) Assert that those ran, and remove PreMaker
-    3.) run Root_1 which needs DataMaker (external dep) and PreMaker
-    4.) assert that premaker re-ran and root ran successfully (getting external dependency)
+    3.) run Root which needs DataMaker (external dep) and PreMaker
+    4.) assert that premaker re-ran and Root ran successfully (getting external dependency)
     """
 
@@ -44,13 +44,13 @@ def test(run_test):
     pm_uuid = b.uuid
     b.rm()
 
-    api.apply(TEST_CONTEXT, Root_1)
+    api.apply(TEST_CONTEXT, Root)
 
     b = api.get(TEST_CONTEXT, 'PreMaker')
     assert(b is not None)
     assert(b.uuid != pm_uuid)
 
-    b = api.get(TEST_CONTEXT, 'Root_1')
+    b = api.get(TEST_CONTEXT, 'Root')
     assert(b is not None)
 
     api.delete_context(TEST_CONTEXT)
@@ -86,7 +86,7 @@ def pipe_run(self):
         return pd.DataFrame({'fark': np.random.randint(100, size=10), 'bark': np.random.randint(10, size=10)})
 
 
-class Root_1(PipeTask):
+class Root(PipeTask):
 
     def pipe_requires(self):
         self.add_dependency('premaker', PreMaker, params={})
2 changes: 1 addition & 1 deletion tox.ini
@@ -4,7 +4,7 @@
 # and then run "tox" from this directory.
 
 [tox]
-envlist = clean,py37,p38
+envlist = clean,py37,py38
 skip_missing_interpreters=true
 
 [testenv]
