Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cancellable lock-free synchronous channel #413

Merged
merged 1 commit into from Jan 26, 2023

Conversation

talex5
Copy link
Collaborator

@talex5 talex5 commented Jan 24, 2023

This is used when you ask for an Eio.Stream with capacity 0. It's slightly slower in the single-domain case, but much faster with multiple domains (running bench_stream.ml):

sync

I also tried adding a bit of work to the benchmark (for _ = 1 to 1000 do () done; in run_recv). Then multiple domains are actually faster than one domain, but the benefits are still present:

sync-work

I originally planned to write a stream replacement that worked for all capacities, but making a single data structure work for both zero and non-zero capacities turned out to be difficult, especially with non-blocking take.

For a non-blocking take on a non-zero capacity stream you have the advantage that you can reserve one of the items already in the stream, or fail if there isn't one. But with a zero capacity stream a non-blocking take always involves both parties. But zero capacity streams also offer simplifications.

The algorithm is described in detail in the comment at the top of sync.ml. lib_eio/tests/sync.md contains a walk-through of some (non-racing) cases, showing the internal state of the stream at each step. lib_eio/tests/dscheck/test_sync.ml tests all possible interleavings of atomic operations.

It should be fairly easy to extend this to provide a take operation that takes from exactly one of several streams. This is hard to do with non-0-capacity streams because while one consumer is considering whether to accept an item, all other consumers have to wait, so that the items are processed in order.

I also simplified the old locking version in stream.ml as it no longer needs special cases for capacity=0.

@polytypic
Copy link
Contributor

BTW, without looking in detail, this reminds me of Parallel Concurrent ML.

@@ -19,6 +19,7 @@ test_luv:
EIO_BACKEND=luv dune runtest

dscheck:
dune exec -- ./lib_eio/tests/dscheck/test_sync.exe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I run make dschek on my mac, it only seems to run the first of these tests:

➜  eio git:(sync) ✗ make dscheck      
dune exec -- ./lib_eio/tests/dscheck/test_sync.exe
Done: 40% (86/215, 129 left) (jobs: 0)run: 1000
run: 2000
... snip lots of similar lines ...
run: 28880000
rumake: *** [dscheck] Killed: 9
➜  eio git:(sync) ✗ 

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the same note, running make bench fails:

dune exec -- ./lib_eio_linux/tests/bench_noop.exe
Error: Program "./lib_eio_linux/tests/bench_noop.exe" not found!
make: *** [bench] Error 1         

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the same also goes for dune build:

➜  eio git:(sync) ✗ dune build
File "lib_eio_linux/tests/dune", line 30, characters 21-30:
30 |  (libraries alcotest eio_linux))
                          ^^^^^^^^^
Error: Library "eio_linux" in _build/default/lib_eio_linux is hidden
(unsatisfied 'enabled_if').
-> required by _build/default/lib_eio_linux/tests/test.exe
-> required by alias lib_eio_linux/tests/all
-> required by alias default
Error: The package eio_linux does not have any user defined stanzas attached
to it. If this is intentional, add (allow_empty) to the package definition in
the dune-project file
-> required by _build/default/eio_linux.install
-> required by alias all
-> required by alias default

It would be nice that the build wouldn't appear to fail on mac. I don't know how to fix this immediately, but I can take a look after reviewing this PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make dscheck currently requires a patched version of dscheck, as the official version is too slow to be useful: ocaml-multicore/dscheck#3. However, that version has a bug which means it may also miss some cases.

Ideally, we'd replace the explicit commands in make bench and make dscheck with e.g. dune build @dscheck. Then dune could be smarter about filtering things out on mac. But then you have to stop it running things in parallel, disable buffered output, and force it to let you run the benchmarks multiple times without changes to the code, which is a bit of a pain.

Fixing the main build on mac is important, though! I don't know why dune is trying to build it though. We already have:

(test
 (name test)
 (package eio_linux)
 (enabled_if (= %{system} "linux"))
 (modules test)
 (libraries alcotest eio_linux))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A related issue in dune: The documentation/value of %{system} is not consistent.

I also tried to work around this by using the (* -*- tuareg -*- *) magic, but that didn't work either as that hits a limitation in dune: (dirs ...) not recognised in dune2.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reported it at ocaml/dune#6938

| In_transition, _ ->
if Atomic.compare_and_set cell In_transition value then ()
else add_to_cell queue value cell
| (Slot _ | Item _), _ -> assert false
Copy link
Contributor

@polytypic polytypic Jan 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought: You could probably rule this case out by using either GADTs or polymorphic variants and make the two queues to be of different type. I'm not sure it is worth the effort, but it could be possible.

Copy link
Collaborator Author

@talex5 talex5 Jan 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't work - you can get either type on either queue (it just depends who writes the cell first). e.g. on the consumers queue usually the consumer gets there first and writes a Slot, sometimes the producer gets there first and writes an Item instead.

And there's no way that add_to_cell can require its caller to call it only once.

if cur > 0 then (
if Atomic.compare_and_set t.balance cur (cur - 1) then true
else decr_balance_if_positive t
) else false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also use a combination of && and || here and avoid having your coolness factor drop. 😄

Seriously speaking this is probably clearer with an if then else.

lib_eio/sync.ml Outdated
in aux ()
)

(* We tried to [put] and no value was immediately available.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should say "no valueslot was immediately available".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would be clearer. Note that I am using "value" to mean "item or slot" in some places (e.g. in add_to_cell).

@polytypic
Copy link
Contributor

I haven't found a smoking gun yet and the basic approach seems like it should work. I still feel I want to stare at the code a little bit more to understand exactly how things work, but I need to take a break now. I'll try to finish my review by tomorrow morning.

This is used when you ask for an Eio.Stream with capacity 0. It's
slightly slower in the single-domain case, but much faster with multiple
domains.

I originally planned to write a stream replacement that worked for all
capacities, but making a single data structure work for both zero and
non-zero capacities turned out to be difficult, especially with
non-blocking take.

For a non-blocking take on a non-zero capacity stream you have the
advantage that you can reserve one of the items already in the stream,
or fail if there isn't one. But with a zero capacity stream a
non-blocking take always involves both parties. But zero capacity
streams also offer simplifications.

The algorithm is described in detail in the comment at the top of
sync.ml. `lib_eio/tests/sync.md` contains a walk-through of some
(non-racing) cases, showing the internal state of the stream at each
step. `lib_eio/tests/dscheck/test_sync.ml` tests all possible
interleavings of atomic operations.

I simplified the old locking version in `stream.ml` as it no longer
needs special cases for `capacity=0`.
@polytypic
Copy link
Contributor

Alright. I believe I'm now reasonably satisfied that this should work. So, I'm accepting this.

However, I'm getting a strong feeling now that there should be a more modular way to handle cancellation. In the semaphore implementation cancellation didn't require that much logic. In this channel implementation there is quite a bit of logic related to cancellation, which worries me. Also, I don't believe the cancellation logic is strong enough to support selective operations such as trying to acquire at most one of two different semaphores or more generally, trying to do at most one of two different cancellable operations. Being able to do such things is what languages like Concurrent ML, Go, and some others allow you to do via selective communication. It is something I've had in mind to explore (in my par-ml project: "TODO: Composable synchronization primitives"). The Parallel Concurrent ML paper I linked to presents an approach to that and I've implemented it previously in F#. However, I feel that it is not entirely satisfactory as it is not completely lock-free and I believe it is possible to better — perhaps the Cells abstraction is a key to that.

Copy link
Contributor

@polytypic polytypic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 🌔

@talex5
Copy link
Collaborator Author

talex5 commented Jan 26, 2023

Yes, I want to support that kind of thing in future (thanks for the PCML paper link!). There's also the Reagents work that e.g. @bartoszmodelski and @kayceesrk are working on.

I think the only change required here is to provide a version of Sync.take that lets the user provide kc (the consumer callback, which decides whether to accept a value or reject it).

On its own, that should allow us to provide a Sync.take_choose operation, to remove an item from exactly one of several streams atomically, without any overhead with the other operations. The kc function would CAS an atomic (shared between all the streams being selected over) before accepting the value, to ensure it accepted only one.

For the full CML API (allowing e.g. pushing to one of multiple streams), I think it could be built on top of that, by defining e.g.

type 'a t = 'a state Atomic.t Sync.t

So the provider provides its own atomic value and the consumer marks that and its own atomic before accepting. But that will come with a little overhead (not sure how much) so we might not want to do it in all cases. Not having used CML, it's not clear to me how useful the full system is vs just having take_choose.

Whether we provide CML-like features everywhere or as an additional layer, I think that sync.ml itself won't need to change much.

@talex5 talex5 merged commit f707dce into ocaml-multicore:main Jan 26, 2023
@talex5 talex5 deleted the sync branch January 26, 2023 11:23
talex5 added a commit to talex5/opam-repository that referenced this pull request Feb 1, 2023
CHANGES:

New features:

- Add `Eio.Net.run_server` (@bikallem @talex5 ocaml-multicore/eio#408).
  Runs an accept loop in one or more domains, with cancellation and graceful shutdown,
  and an optional maximum number of concurrent connections.

- Add `Buf_read.BE` and `LE` parsers (@Cjen1 ocaml-multicore/eio#399).
  Parse numbers in various binary formats.

- Add `Eio.Buf_read.uint8` (@talex5 ocaml-multicore/eio#418).

Performance:

- Make `Eio.Condition` lock-free (@talex5 ocaml-multicore/eio#397 ocaml-multicore/eio#381).
  In addition to being faster, this allows using conditions in signal handlers.

- Make `Eio.Semaphore` lock-free (@talex5 @polytypic ocaml-multicore/eio#398).

- Make `Eio.Stream` lock-free when the capacity is zero (@talex5 ocaml-multicore/eio#413 ocaml-multicore/eio#411).

- Make `Eio.Promise` lock-free (@talex5 ocaml-multicore/eio#401).

Bug fixes:

- eio_linux: call `Uring.submit` as needed (@talex5 @bikallem ocaml-multicore/eio#428).
  Previously, we could fail to submit a job promptly because the SQE queue was full.

- Fix luv signals (@haesbaert ocaml-multicore/eio#412).
  `libuv` automatically retries polling if it gets `EINTR`, without giving OCaml signal handlers a chance to run.

- eio_luv: fix some resource leaks (@talex5 @patricoferris ocaml-multicore/eio#421).

- eio_luv: fix "unavailable signal" error on Windows (@talex5 ocaml-multicore/eio#420, reported by @nojb).

- Fix `Buf_write.BE.uint48` and `LE.uint48` (@adatario ocaml-multicore/eio#418).

Documentation:

- Add example programs (@talex5 ocaml-multicore/eio#389).

- Update network examples to use `run_server` (@talex5 ocaml-multicore/eio#417).

- Add a warning to the tutorial about `Fiber.first` (@talex5 ocaml-multicore/eio#394).

- Clarify the epoch used for `Eio.Time.now` (@bikallem ocaml-multicore/eio#395).

- Describe `secure_random` as an infinite source (@patricoferris ocaml-multicore/eio#426).

- Update README for OCaml 5 release (@talex5 ocaml-multicore/eio#384 ocaml-multicore/eio#391 ocaml-multicore/eio#393).

Other changes:

- Delay setting `SIGPIPE` handler until the `run` function is called (@talex5 ocaml-multicore/eio#420).

- Remove debug-level logging (@talex5 ocaml-multicore/eio#403).

- eio-luv: improve `process.md` test (@smondet ocaml-multicore/eio#414).

- Update to Dune 3 (@talex5 ocaml-multicore/eio#410).

- Remove test dependency on Astring (@talex5 ocaml-multicore/eio#402 ocaml-multicore/eio#404).

- Simplify cancellation logic (@talex5 ocaml-multicore/eio#396).

- time: `Mtime.Spand.to_s` has been deprecated in mtime 2.0.0 (@bikallem ocaml-multicore/eio#385).
talex5 added a commit to talex5/opam-repository that referenced this pull request Feb 1, 2023
CHANGES:

New features:

- Add `Eio.Net.run_server` (@bikallem @talex5 ocaml-multicore/eio#408).
  Runs an accept loop in one or more domains, with cancellation and graceful shutdown,
  and an optional maximum number of concurrent connections.

- Add `Buf_read.BE` and `LE` parsers (@Cjen1 ocaml-multicore/eio#399).
  Parse numbers in various binary formats.

- Add `Eio.Buf_read.uint8` (@talex5 ocaml-multicore/eio#418).

Performance:

- Make `Eio.Condition` lock-free (@talex5 ocaml-multicore/eio#397 ocaml-multicore/eio#381).
  In addition to being faster, this allows using conditions in signal handlers.

- Make `Eio.Semaphore` lock-free (@talex5 @polytypic ocaml-multicore/eio#398).

- Make `Eio.Stream` lock-free when the capacity is zero (@talex5 ocaml-multicore/eio#413 ocaml-multicore/eio#411).

- Make `Eio.Promise` lock-free (@talex5 ocaml-multicore/eio#401).

Bug fixes:

- eio_linux: call `Uring.submit` as needed (@talex5 @bikallem ocaml-multicore/eio#428).
  Previously, we could fail to submit a job promptly because the SQE queue was full.

- Fix luv signals (@haesbaert ocaml-multicore/eio#412).
  `libuv` automatically retries polling if it gets `EINTR`, without giving OCaml signal handlers a chance to run.

- eio_luv: fix some resource leaks (@talex5 @patricoferris ocaml-multicore/eio#421).

- eio_luv: fix "unavailable signal" error on Windows (@talex5 ocaml-multicore/eio#420, reported by @nojb).

- Fix `Buf_write.BE.uint48` and `LE.uint48` (@adatario ocaml-multicore/eio#418).

Documentation:

- Add example programs (@talex5 ocaml-multicore/eio#389).

- Update network examples to use `run_server` (@talex5 ocaml-multicore/eio#417).

- Add a warning to the tutorial about `Fiber.first` (@talex5 ocaml-multicore/eio#394).

- Clarify the epoch used for `Eio.Time.now` (@bikallem ocaml-multicore/eio#395).

- Describe `secure_random` as an infinite source (@patricoferris ocaml-multicore/eio#426).

- Update README for OCaml 5 release (@talex5 ocaml-multicore/eio#384 ocaml-multicore/eio#391 ocaml-multicore/eio#393).

Other changes:

- Delay setting `SIGPIPE` handler until the `run` function is called (@talex5 ocaml-multicore/eio#420).

- Remove debug-level logging (@talex5 ocaml-multicore/eio#403).

- eio-luv: improve `process.md` test (@smondet ocaml-multicore/eio#414).

- Update to Dune 3 (@talex5 ocaml-multicore/eio#410).

- Remove test dependency on Astring (@talex5 ocaml-multicore/eio#402 ocaml-multicore/eio#404).

- Simplify cancellation logic (@talex5 ocaml-multicore/eio#396).

- time: `Mtime.Spand.to_s` has been deprecated in mtime 2.0.0 (@bikallem ocaml-multicore/eio#385).
talex5 added a commit to talex5/opam-repository that referenced this pull request Feb 1, 2023
CHANGES:

New features:

- Add `Eio.Net.run_server` (@bikallem @talex5 ocaml-multicore/eio#408).
  Runs an accept loop in one or more domains, with cancellation and graceful shutdown,
  and an optional maximum number of concurrent connections.

- Add `Buf_read.BE` and `LE` parsers (@Cjen1 ocaml-multicore/eio#399).
  Parse numbers in various binary formats.

- Add `Eio.Buf_read.uint8` (@talex5 ocaml-multicore/eio#418).

Performance:

- Make `Eio.Condition` lock-free (@talex5 ocaml-multicore/eio#397 ocaml-multicore/eio#381).
  In addition to being faster, this allows using conditions in signal handlers.

- Make `Eio.Semaphore` lock-free (@talex5 @polytypic ocaml-multicore/eio#398).

- Make `Eio.Stream` lock-free when the capacity is zero (@talex5 ocaml-multicore/eio#413 ocaml-multicore/eio#411).

- Make `Eio.Promise` lock-free (@talex5 ocaml-multicore/eio#401).

Bug fixes:

- eio_linux: call `Uring.submit` as needed (@talex5 @bikallem ocaml-multicore/eio#428).
  Previously, we could fail to submit a job promptly because the SQE queue was full.

- Fix luv signals (@haesbaert ocaml-multicore/eio#412).
  `libuv` automatically retries polling if it gets `EINTR`, without giving OCaml signal handlers a chance to run.

- eio_luv: fix some resource leaks (@talex5 @patricoferris ocaml-multicore/eio#421).

- eio_luv: fix "unavailable signal" error on Windows (@talex5 ocaml-multicore/eio#420, reported by @nojb).

- Fix `Buf_write.BE.uint48` and `LE.uint48` (@adatario ocaml-multicore/eio#418).

Documentation:

- Add example programs (@talex5 ocaml-multicore/eio#389).

- Update network examples to use `run_server` (@talex5 ocaml-multicore/eio#417).

- Add a warning to the tutorial about `Fiber.first` (@talex5 ocaml-multicore/eio#394).

- Clarify the epoch used for `Eio.Time.now` (@bikallem ocaml-multicore/eio#395).

- Describe `secure_random` as an infinite source (@patricoferris ocaml-multicore/eio#426).

- Update README for OCaml 5 release (@talex5 ocaml-multicore/eio#384 ocaml-multicore/eio#391 ocaml-multicore/eio#393).

Other changes:

- Delay setting `SIGPIPE` handler until the `run` function is called (@talex5 ocaml-multicore/eio#420).

- Remove debug-level logging (@talex5 ocaml-multicore/eio#403).

- eio-luv: improve `process.md` test (@smondet ocaml-multicore/eio#414).

- Update to Dune 3 (@talex5 ocaml-multicore/eio#410).

- Remove test dependency on Astring (@talex5 ocaml-multicore/eio#402 ocaml-multicore/eio#404).

- Simplify cancellation logic (@talex5 ocaml-multicore/eio#396).

- time: `Mtime.Spand.to_s` has been deprecated in mtime 2.0.0 (@bikallem ocaml-multicore/eio#385).
talex5 added a commit to talex5/opam-repository that referenced this pull request Feb 1, 2023
CHANGES:

New features:

- Add `Eio.Net.run_server` (@bikallem @talex5 ocaml-multicore/eio#408).
  Runs an accept loop in one or more domains, with cancellation and graceful shutdown,
  and an optional maximum number of concurrent connections.

- Add `Buf_read.BE` and `LE` parsers (@Cjen1 ocaml-multicore/eio#399).
  Parse numbers in various binary formats.

- Add `Eio.Buf_read.uint8` (@talex5 ocaml-multicore/eio#418).

Performance:

- Make `Eio.Condition` lock-free (@talex5 ocaml-multicore/eio#397 ocaml-multicore/eio#381).
  In addition to being faster, this allows using conditions in signal handlers.

- Make `Eio.Semaphore` lock-free (@talex5 @polytypic ocaml-multicore/eio#398).

- Make `Eio.Stream` lock-free when the capacity is zero (@talex5 ocaml-multicore/eio#413 ocaml-multicore/eio#411).

- Make `Eio.Promise` lock-free (@talex5 ocaml-multicore/eio#401).

Bug fixes:

- eio_linux: call `Uring.submit` as needed (@talex5 @bikallem ocaml-multicore/eio#428).
  Previously, we could fail to submit a job promptly because the SQE queue was full.

- Fix luv signals (@haesbaert ocaml-multicore/eio#412).
  `libuv` automatically retries polling if it gets `EINTR`, without giving OCaml signal handlers a chance to run.

- eio_luv: fix some resource leaks (@talex5 @patricoferris ocaml-multicore/eio#421).

- eio_luv: fix "unavailable signal" error on Windows (@talex5 ocaml-multicore/eio#420, reported by @nojb).

- Fix `Buf_write.BE.uint48` and `LE.uint48` (@adatario ocaml-multicore/eio#418).

Documentation:

- Add example programs (@talex5 ocaml-multicore/eio#389).

- Update network examples to use `run_server` (@talex5 ocaml-multicore/eio#417).

- Add a warning to the tutorial about `Fiber.first` (@talex5 ocaml-multicore/eio#394).

- Clarify the epoch used for `Eio.Time.now` (@bikallem ocaml-multicore/eio#395).

- Describe `secure_random` as an infinite source (@patricoferris ocaml-multicore/eio#426).

- Update README for OCaml 5 release (@talex5 ocaml-multicore/eio#384 ocaml-multicore/eio#391 ocaml-multicore/eio#393).

Other changes:

- Delay setting `SIGPIPE` handler until the `run` function is called (@talex5 ocaml-multicore/eio#420).

- Remove debug-level logging (@talex5 ocaml-multicore/eio#403).

- eio-luv: improve `process.md` test (@smondet ocaml-multicore/eio#414).

- Update to Dune 3 (@talex5 ocaml-multicore/eio#410).

- Remove test dependency on Astring (@talex5 ocaml-multicore/eio#402 ocaml-multicore/eio#404).

- Simplify cancellation logic (@talex5 ocaml-multicore/eio#396).

- time: `Mtime.Spand.to_s` has been deprecated in mtime 2.0.0 (@bikallem ocaml-multicore/eio#385).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants