Fix ProxyStore serialization issue with Ray #62

Merged
merged 5 commits into from
May 31, 2024

Conversation

gpauloski
Contributor

Description

I couldn't confirm that proxies were being resolved multiple times in TaPS, but I added tests that verify proxies are resolved only when we expect them to be.

I also found that the Ray serialization issues with ProxyStore were limited to Proxy[bytes] instances. Ray skips serializing bytes instances, and because a Proxy[bytes] passes as a bytes instance, Ray did not serialize the proxy and then crashed because the proxy is not actually a bytestring. Since Proxy[bytes] is only used in the synthetic app, I changed the synthetic app to add a level of data indirection and added a note to the RayExecutor docstring.
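A minimal, standard-library-only sketch of the failure mode and the workaround described above. `LazyProxy` is a hypothetical stand-in that only mimics how a transparent proxy reports its target's class; it is not ProxyStore's `Proxy`. `looks_like_raw_bytes` is an illustrative check, not Ray's actual code path, and the `raw` field name on `Data` is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Callable


class LazyProxy:
    """Hypothetical, stripped-down stand-in for a transparent lazy proxy."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory
        self._target: Any = None
        self._resolved = False

    def _resolve(self) -> Any:
        # Call the factory at most once and cache the result.
        if not self._resolved:
            self._target = self._factory()
            self._resolved = True
        return self._target

    @property
    def __class__(self):
        # Report the target's class so isinstance() sees through the proxy.
        return type(self._resolve())

    def __getattr__(self, name: str) -> Any:
        # Forward any attribute not found on the proxy to the target.
        return getattr(self._resolve(), name)


@dataclass
class Data:
    """Wrapper adding one level of indirection around a bytestring."""

    raw: bytes  # assumed field name, for illustration only


def looks_like_raw_bytes(obj: Any) -> bool:
    # Illustrative check only: a serializer with a bytes fast path
    # might make a decision like this before pickling an object.
    return isinstance(obj, bytes)


payload = b"x" * 16
proxy_of_bytes = LazyProxy(lambda: payload)
proxy_of_data = LazyProxy(lambda: Data(payload))

# The bytes proxy passes the isinstance check although its real type is LazyProxy.
print(looks_like_raw_bytes(proxy_of_bytes), type(proxy_of_bytes).__name__)
# The wrapped variant is no longer mistaken for a bytestring.
print(looks_like_raw_bytes(proxy_of_data), proxy_of_data.raw == payload)
```

With the wrapper, the object handed to the executor no longer looks like a bytestring, so it would take the regular serialization path instead of a bytes-specific one.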

Fixes

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Refactoring (internal implementation changes)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update (no changes to the code)
  • CI change (changes to CI workflows, packages, templates, etc.)
  • Version changes (changes to the package or dependency versions)

Testing

Updated the unit tests and tested the proxy transformer with Ray.

I also tested ProxyStore performance with Dask and got the following results.

Baseline

(taps) cc@escience-taps:~/taps$ python -m taps.run synthetic --log-level APP --executor dask --dask-workers 32 --structure bag --task-count 320 --task-data-bytes 10000000 --task-sleep 0 --bag-max-running 32 --filter-type object-size --filter-min-size 10000 --transformer null --ps-type file --ps-file-dir /tmp/proxy-cache
[2024-05-31 21:41:32.487] RUN   (taps.run) :: Starting app (name=synthetic)
[2024-05-31 21:41:32.487] RUN   (taps.run) :: name='synthetic' timestamp=datetime.datetime(2024, 5, 31, 21, 41, 32, 486998) executor_name='dask' app=SyntheticConfig(structure='bag', task_count=320, task_data_bytes=10000000, task_sleep=0.0, bag_max_running=32, warmup_task=True) executor=DaskDistributedConfig(dask_scheduler_address=None, dask_use_threads=False, dask_workers=32, dask_daemon_workers=True) filter=FilterConfig(filter_type='object-size', filter_min_size=10000, filter_max_size=inf) run=RunConfig(log_file_level='INFO', log_file_name='log.txt', log_level='APP', task_record_file_name='tasks.jsonl', run_dir_format='runs/{name}-{executor}-{timestamp}') transformer=NullTransformerConfig()
[2024-05-31 21:41:32.487] RUN   (taps.run) :: Runtime directory: /home/cc/taps/runs/synthetic-dask-2024-05-31-21-41-32
[2024-05-31 21:41:32.980] APP   (taps.apps.synthetic) :: Submitting warmup task
[2024-05-31 21:41:33.095] APP   (taps.apps.synthetic) :: Warmup task completed
[2024-05-31 21:41:33.095] APP   (taps.apps.synthetic) :: Starting bag workflow
/home/cc/miniconda3/envs/taps/lib/python3.11/site-packages/distributed/client.py:3161: UserWarning: Sending large graph of size 9.54 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
[2024-05-31 21:41:34.782] APP   (taps.apps.synthetic) :: Submitted 32 initial tasks
[2024-05-31 21:41:46.015] APP   (taps.apps.synthetic) :: Completed 128/320 tasks (rate: 9.91 tasks/s, running tasks: 32)
[2024-05-31 21:42:01.462] APP   (taps.apps.synthetic) :: Completed 324/320 (rate: 11.42 tasks/s)
[2024-05-31 21:42:08.757] RUN   (taps.run) :: Finished app (name=synthetic, runtime=36.27s, tasks=325)

ProxyStore

(taps) cc@escience-taps:~/taps$ python -m taps.run synthetic --log-level APP --executor dask --dask-workers 32 --structure bag --task-count 320 --task-data-bytes 10000000 --task-sleep 0 --bag-max-running 32 --filter-type object-size --filter-min-size 10000 --transformer proxy --ps-type file --ps-file-dir /tmp/proxy-cache
[2024-05-31 21:42:26.750] RUN   (taps.run) :: Starting app (name=synthetic)
[2024-05-31 21:42:26.750] RUN   (taps.run) :: name='synthetic' timestamp=datetime.datetime(2024, 5, 31, 21, 42, 26, 750157) executor_name='dask' app=SyntheticConfig(structure='bag', task_count=320, task_data_bytes=10000000, task_sleep=0.0, bag_max_running=32, warmup_task=True) executor=DaskDistributedConfig(dask_scheduler_address=None, dask_use_threads=False, dask_workers=32, dask_daemon_workers=True) filter=FilterConfig(filter_type='object-size', filter_min_size=10000, filter_max_size=inf) run=RunConfig(log_file_level='INFO', log_file_name='log.txt', log_level='APP', task_record_file_name='tasks.jsonl', run_dir_format='runs/{name}-{executor}-{timestamp}') transformer=ProxyFileTransformerConfig(ps_type='file', ps_file_dir='/tmp/proxy-cache', ps_redis_addr=None, ps_extract_target=False)
[2024-05-31 21:42:26.750] RUN   (taps.run) :: Runtime directory: /home/cc/taps/runs/synthetic-dask-2024-05-31-21-42-26
[2024-05-31 21:42:27.265] APP   (taps.apps.synthetic) :: Submitting warmup task
[2024-05-31 21:42:27.380] APP   (taps.apps.synthetic) :: Warmup task completed
[2024-05-31 21:42:27.380] APP   (taps.apps.synthetic) :: Starting bag workflow
[2024-05-31 21:42:28.491] APP   (taps.apps.synthetic) :: Submitted 32 initial tasks
[2024-05-31 21:42:41.724] APP   (taps.apps.synthetic) :: Completed 342/320 (rate: 23.84 tasks/s)
[2024-05-31 21:42:42.647] RUN   (taps.run) :: Finished app (name=synthetic, runtime=15.90s, tasks=343)

As expected, using ProxyStore is much faster: the run finished in 15.90 s versus 36.27 s for the baseline (about 2.3x), which is reasonably in line with what we found in the paper.
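The speedup can be checked directly from the two logs. The snippet below is plain arithmetic on the reported runtimes and final task rates; the variable names are illustrative.

```python
# Values copied from the "Finished app" and final "Completed" log lines.
baseline_runtime_s = 36.27
proxystore_runtime_s = 15.90
baseline_rate = 11.42    # tasks/s
proxystore_rate = 23.84  # tasks/s

runtime_speedup = baseline_runtime_s / proxystore_runtime_s
rate_speedup = proxystore_rate / baseline_rate

print(f"runtime speedup: {runtime_speedup:.2f}x")      # 2.28x
print(f"task-rate speedup: {rate_speedup:.2f}x")       # 2.09x
```

Both measures come out above 2x for this single run, so the figures are an indication rather than a benchmark.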

Pull Request Checklist

Please confirm the PR meets the following requirements.

  • Relevant tags are added (breaking, bug, dependencies, documentation, enhancement, refactor).
  • Code changes pass pre-commit (e.g., ruff, mypy, etc.).
  • Tests have been added to show the fix is effective or that the new feature works.
  • New and existing unit tests pass locally with the changes.
  • Docs have been updated and reviewed if relevant.

ProxyStore and Ray do not play nicely together with Proxy[bytes] types.
This is because Ray skips serializing bytes types, but then crashes
because the Proxy isn't actually an instance of bytes. To get around
this, we just wrap bytestring data in the synthetic app in another class
so that the Proxy[Data] gets correctly serialized.
@gpauloski gpauloski added the bug Something isn't working label May 31, 2024
@gpauloski gpauloski merged commit 2ad9807 into main May 31, 2024
7 checks passed
@gpauloski gpauloski deleted the issue-55 branch May 31, 2024 21:50
Development

Successfully merging this pull request may close these issues.

Debug ProxyStore serialization errors
1 participant