Suggested API change #237
Comments
I will think about this, but just as a short term idea, the mitigation can be simplified like this:

    def process_jobs(jobs, parent: Async::Task.current)
      jobs
        # Name the job explicitly: inside the inner block, _1 would refer to
        # that block's own argument (the task), not to the job.
        .map { |job| parent.async { job.process! } }
        .map(&:wait)
    end

This has almost identical behaviour but is the correct way to establish "Run in a parent context". https://github.com/socketry/async/blob/main/lib/async/task.rb#L236-L238 Maybe we can adjust the error message to be a bit more meaningful. Additionally, maybe we can document this more clearly. This is a good idea because it enables you to control the scope of the child tasks (e.g. using a semaphore, or some other control structure).

Regarding your proposal, how does this work:

    Async.run do
      Async.run do # (1) Is this an error?
      end
      Async do # (2) This isn't an error?
      end
    end
    Async do # (3) This is an error?
    end
    Sync do # (4) What about this?
    end
|
Haha, I was going to ask you the same thing! Is it supported today to start a reactor nested inside another one? Whatever the answer, we can support it. The nice thing about not overloading is that we can be precise in either allowing it or disallowing it. |
Yes, that is the error we want to catch (after Phase 3). It represents code that wants to be concurrent--and assumes that its caller has spun up a surrounding reactor--but the caller forgot to, or didn't understand that they needed to. This is an easy mistake to make, especially in tests. |
I'm a bit fuzzy on |
    Sync do
      # It creates a reactor if required, otherwise it just executes the block.
    end

The point of |
Apologies because I think I'm misunderstanding the issue as initially described, but would using a top level |
I think a top level If they don't know, then the problem is, some internal I think that as I outlined on the linked PR, I don't think it should be an error - if the user calls the top level function, probably it should have a top level i.e. your proposal is, "Is this acceptable?":

    def process_jobs(jobs)
      Sync do
        jobs
          # Name the job explicitly: inside the inner block, _1 would refer to
          # that block's own argument (the task), not to the job.
          .map { |job| Async { job.process! } }
          .map(&:wait)
      end
    end

And I wonder if it is or not. |
I just read up on
These tell me that So my proposal logically extends here. The quoted sentences above would simplify to:
|
BTW I spotted a mistake in my initial message for the exception. I had omitted the
|
To test my understanding: I find the "don't need concurrency" point confusing. Both offer concurrency inside their block, right? Concurrency with sibling blocks depends on what is enclosing this block and sibling blocks. Which gets us back to the importance, IMO, of using distinct APIs for these cases: spinning up a reactor vs. writing concurrent code inside a reactor. |
@ioquatix Any thoughts on the above? ^^^ |
@trevorturk Now that I understand The confusion arises from Async/Sync { ... } each being overloaded:
When writing concurrent code (case 2), it is necessary that it be surrounded by a reactor (case 1). If that is missing, case 2 will be misconstrued as case 1, and it won't be concurrent at all. This is an easy mistake to make, especially when writing tests. To address this risk, we like to follow Design by Contract, which in this case means that concurrent code (case 2) should ensure that there is a surrounding reactor before it proceeds. Or, here's another way to restate the problem at a high level:
|
Thank you for the detailed explanation! First, I think this helps clarify Anyway, putting aside

    Async { puts "starting 1"; sleep 1; puts "1" }
    Async { puts "starting 2"; sleep 2; puts "2" }

...produces:

...whereas:

    Async do
      Async { puts "starting 1"; sleep 1; puts "1" }
      Async { puts "starting 2"; sleep 2; puts "2" }
    end

...produces:

So, the issue is that, in the first example, you're not running the tasks concurrently and there is no warning/error, so it's an easy mistake to make. If I'm understanding this correctly, I wonder if logging a warning might be enough... or perhaps having a global "strict" setting which would turn those warnings into errors? I see you're discussing more in-depth and perhaps better solutions here, but maybe there's room for a less controversial incremental change? (I don't have a strong opinion here, I'm just trying to follow along, and hopefully this might be helpful in your discussion, and/or for future people stumbling across this issue!) |
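To make the warning-versus-strict idea above concrete, here is a minimal sketch in plain Ruby. Everything in it is hypothetical: the `ReactorCheck` module, its `strict` flag, and the `verify` method do not exist in the async gem, and the `reactor_running` argument stands in for whatever check the library would actually perform.

```ruby
# Hypothetical sketch: none of these names exist in the async gem.
module ReactorCheck
  class ReactorNotRunning < StandardError; end

  MESSAGE = "Async { ... } called without a surrounding reactor; " \
            "the block will run to completion before the next statement."

  class << self
    # When true, a missing reactor raises instead of warning.
    attr_accessor :strict

    # `reactor_running` is an assumption: it stands in for the library's own
    # check. Returns :ok, or :warned after emitting a warning; raises in
    # strict mode.
    def verify(reactor_running)
      return :ok if reactor_running
      raise ReactorNotRunning, MESSAGE if strict

      warn MESSAGE
      :warned
    end
  end

  self.strict = false
end
```

With a design like this, an application or spec helper could opt in by setting `ReactorCheck.strict = true` once, for example in an initializer, while existing code keeps running with only a warning.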
Thanks @trevorturk! That's a perfect illustration. It's exactly the trap that everyone on my team has stepped into at different times. It's especially easy to have happen with specs. Let's suppose your code were in a method:

    def sleep_1_and_2
      Async { puts "starting 1"; sleep 1; puts "1" }
      Async { puts "starting 2"; sleep 2; puts "2" }
    end

The full program might correctly start the reactor around it:

    Async do
      ...
      sleep_1_and_2
      ...
    end

But the specs are often testing the pieces as independently as possible:

    RSpec.describe 'sleep_1_and_2' do
      it 'calls sleep twice' do
        expect(Kernel).to receive(:sleep).with(1)
        expect(Kernel).to receive(:sleep).with(2)
        sleep_1_and_2
      end
    end

The above spec will pass, but the code is not actually running concurrently as designed! In my Proposal, the above mistake would cause a helpful exception to be raised:

    Async::ReactorNotRunning: You must start a reactor first with `Async.run { ... }` surrounding this code.

and that would explain to us exactly what we needed to change to honor the contract:

    RSpec.describe 'sleep_1_and_2' do
      it 'calls sleep twice' do
        expect(Kernel).to receive(:sleep).with(1)
        expect(Kernel).to receive(:sleep).with(2)
        Async.run do
          sleep_1_and_2
        end
      end
    end
|
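As a side note, one way a test could notice that work ran serially rather than concurrently is to compare wall-clock time. The sketch below is only an illustration in plain Ruby: the helper names `elapsed_seconds`, `run_serially`, and `run_overlapped` are made up for this example, and threads stand in for Async tasks so that it runs without the gem.

```ruby
# Returns the wall-clock seconds the block took, using a monotonic clock.
def elapsed_seconds
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

# Sleeps for each duration one after the other.
def run_serially(durations)
  durations.each { |seconds| sleep seconds }
end

# Sleeps for all durations at once (threads stand in for tasks here).
def run_overlapped(durations)
  durations.map { |seconds| Thread.new { sleep seconds } }.each(&:join)
end

serial = elapsed_seconds { run_serially([0.1, 0.1]) }
overlapped = elapsed_seconds { run_overlapped([0.1, 0.1]) }
puts format("serial: %.2fs, overlapped: %.2fs", serial, overlapped)
```

Timing assertions tend to be fragile under load, so this is better suited to a smoke check than to a precise spec.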
Or, given that the contract of

    Async.run_reactor do
      sleep_1_and_2
    end

or

    Async.with_reactor do
      sleep_1_and_2
    end

or other variation. |
Yes, that's where I am too. I don't need a shorthand for Also, I think it is best practice for any gem |
I realized while discussing with my team that this documentation for Currently it says
To be more accurate, it could say
The difference in the scope of the reactor is why it's so easy to write concurrent code then accidentally run it non-concurrently. The fix I'm proposing would simplify the semantics to
Async::ReactorNotRunning: You must start a reactor first with `Async.run { ... }` surrounding this code. which will then lead the developer to decide exactly the reactor boundary they want for concurrency. |
I'm happy to hear I'm understanding the issue correctly now, and I hope that example helps clarify for others! I think we're discussing a few issues (which might be best converted into GitHub Discussions?) but I'll try to summarize anyway:
FWIW I'd be in favor of this, just because As an aside, I believe I read at some point that Samuel was invited to put the
To my mind, if we're considering adding At least for today, I think I understand what you're trying to accomplish, and I think it's a noble effort and a good idea. I'm not sure I think it's worth breaking the API and making things more verbose, though, and I still wonder if an optional warning/exception might serve the purpose, if we could set aside the "overloading" issue. (I think the "overloading" issue is valid and worth discussing, I just don't know how big of an issue it is.) I don't think this will be appealing, but FWIW I wanted to point out that the Alba gem (which I'm a big fan of) uses global configs that you might put in an initializer, for example here: https://github.com/okuramasafumi/alba/#encoder-configuration -- this is the kind of thing I was imagining when thinking of an opt-in config, like Sorry for the long post, and I hope I'm not derailing things too badly. I don't have any strong feelings about this, honestly, I just wanted to air out some ideas and figured why not! 😄 |
Regarding this example:
I agree, the way it's explained in this issue, this could be considered confusing. However, there is no expectation that two async blocks run concurrently together.

    t1 = Async { puts "starting 1"; sleep 1; puts "1" }
    t2 = Async { puts "starting 2"; sleep 2; puts "2" }
    t1.wait
    t2.wait

This code has the same result no matter whether it's run in an event loop or not. However, those tasks do have side effects and that's what's showing up differently. If what you want is for those tasks to run asynchronously you should always mark the outer part, e.g.

    def sleep_1_and_2
      Sync do
        Async { puts "starting 1"; sleep 1; puts "1" }
        Async { puts "starting 2"; sleep 2; puts "2" }
      end
    end

or

    def sleep_1_and_2(parent: Async::Task.current)
      parent.async { puts "starting 1"; sleep 1; puts "1" }
      parent.async { puts "starting 2"; sleep 2; puts "2" }
    end

Regarding the proposed implementation:

    Async.run do
      process_jobs(jobs)
    end

To me this is a bit ugly - why should the user care about whether

    def process_jobs(jobs)
      jobs.map do |job|
        Thread.new{job.process}
      end.map(&:value)
    end
    values = process_jobs(jobs)

That way, the

    def process_jobs(jobs)
      Sync do |task|
        jobs.map do |job|
          task.async{job.process}
        end.map(&:wait)
      end
    end
    values = process_jobs(jobs)

Later on, if someone wants to process several batches of jobs:

    Sync do |task|
      batches.map do |jobs|
        task.async do
          values = process_jobs(jobs)
          puts "Finished batch: #{values}"
        end
      end
    end

Your proposal makes this kind of retroactive application of

    def process_jobs(jobs)
      Async::Barrier do |barrier|
        jobs.map do |job|
          barrier.async{job.process}
        end.map(&:wait)
      end # implicit stop / kill of jobs that leak here.
    end

The fiber scheduler

    def fetch_urls(urls)
      unless Fiber.scheduler
        Fiber.set_scheduler(Scheduler.new)
        begin
          # Re-enter with a scheduler installed, then return so the loop
          # below does not run a second time without one.
          return fetch_urls(urls)
        ensure
          Fiber.set_scheduler(nil)
        end
      end
      urls.each do |url|
        Fiber.schedule{Net::HTTP.get(url)}
      end
    end

Note that https://github.com/socketry/async/blob/v1.10.0/lib/async.rb#L25-L30

    Async.run do |task|
      task.async # the only way to create child tasks
    end

The initial design did not have I personally found Bare |
I'd rather not say it "has the same result" if it runs serially rather than concurrently. The
Async.run do
process_jobs(jobs)
end
This is a contract question.
Let me stop there and see if I have (3) right. You show this example:

    def process_jobs(jobs)
      Sync do |task|
        jobs.map do |job|
          task.async{job.process}
        end.map(&:wait)
      end
    end

In the event that no Reactor is run around |
You can't avoid having at least one task, whether it's in this method or outside. However, if it's already in an event loop, it won't create a task.
The

    # Only one task is created, the root one, along with the event loop:
    Sync{Sync{Sync{sleep 1}}}
    # The same, only one root task created:
    Async{Sync{Sync{sleep 1}}}
Fair enough, but in my mind, concurrency is never guaranteed, some event loop design may run everything sequentially and it should, in theory, still work correctly, even if the actual wall clock performance is bad. There are a multitude of different design decisions in schedulers, like optimistic (immediately start tasks) vs pessimistic (schedule tasks to start on the next iteration), throughput vs latency scheduling (prefer a single task running for a long time, or prioritise switching between tasks), etc.

Over time, more and more blocking operations will be supported by the fiber scheduler, e.g. In fact, some degenerate testing systems can deliberately limit fiber scheduling to detect race conditions and deadlocks by deliberately invoking every possible task order and interleaving of every non-blocking operation.

I believe the For example,

    def process_jobs(jobs)
      Async::Task.current? or raise ArgumentError, "must start up reactor first with surrounding 'Async { ... }'"
      jobs
        # Name the job explicitly: inside the inner block, _1 would refer to
        # that block's own argument (the task), not to the job.
        .map { |job| Async { job.process! } }
        .map(&:wait)
    end
    # Process hung for 1 hour, now there are 1 million jobs:
    process_jobs(job_server.all_pending_jobs)

It can be very tricky to write correct, general code that uses a semaphore without causing deadlocks, especially in systems with constrained resources. I suppose my point is, while the goal of Async is to provide concurrency, it won't always be possible and you might encounter system limitations or resource limitations.
Then I don't see why this should be the responsibility of the caller, because if that's true, invariably someone will eventually forget and the performance will be bad. In my proposed We can make this really easily, i.e. in the

    class MyJobProcessor
      include Async::Await
      sync def process_jobs(jobs)
        # ...
      end
    end

The purpose of the |
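To illustrate the point about bounding fan-out when a very large batch of jobs arrives, here is a rough sketch in plain Ruby. It makes several assumptions: threads stand in for Async tasks, a `SizedQueue` is used as a counting semaphore, jobs are anything that responds to `call`, and the `limit:` keyword is hypothetical rather than part of any API discussed in this thread.

```ruby
# Sketch with plain threads standing in for Async tasks; `limit` is a
# hypothetical parameter, not part of any API discussed in this thread.
def process_jobs(jobs, limit: 4)
  slots = SizedQueue.new(limit) # used here as a counting semaphore

  jobs.map do |job|
    slots.push(true) # blocks while `limit` jobs are already in flight
    Thread.new do
      job.call
    ensure
      slots.pop # release the slot even if the job raises
    end
  end.map(&:value) # collect results in the original job order
end
```

Because the slot is acquired before each worker starts, the caller is throttled rather than creating one worker per pending job up front. A job that itself calls `process_jobs` while holding a slot is the kind of nesting where the deadlock risk mentioned above would need more care.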
If there was always a reactor,

    Async { puts "starting 1"; sleep 1; puts "1" }
    Async { puts "starting 2"; sleep 2; puts "2" }

    Async do
      Async { puts "starting 1"; sleep 1; puts "1" }
      Async { puts "starting 2"; sleep 2; puts "2" }
    end

would both output:

I agree that having to use something like Could there always be a reactor, whenever someone does |
In short, unfortunately this is impossible. However, we could introduce something like
Such a design is feasible, however I'm not sure of its value. I do use the following alias which is similar:
The |
Hmm. Can Ruby be updated to make it possible...? Easy for me to ask of course :-) A separate |
Essentially what you are asking is for the Ruby interpreter's user code to be run inside an async block. It could change too much behaviour. Rather than changing Ruby, it's better to write code that works correctly regardless of whether it's used in an event loop (fiber scheduler). IMHO, this is best for everyone, because existing application servers like Sidekiq, Unicorn and Puma can continue to work correctly, while your application gains concurrency. My goal has always been for asynchronous execution to be transparent to existing sequential code. It's true this can't be achieved for the top level Ruby script without some kind of

    # async/default_scheduler.rb
    Fiber.set_scheduler(Async::Scheduler.new)

    # user_code.rb
    require 'async/default_scheduler'
    Fiber.schedule do
      # ... etc
    end

Something like that is possible, but you lose a lot of the nice scope that is provided by |
Good points, thank you for taking the time to reply. |
Problem Statement
Recently I've worked with a team of ~6 developers converting two services from EventMachine + Synchrony to Async. That has gone well, but one detail of the Async API has tripped us all up at first and in some cases, more than once.

The confusion arises from Async { ... } being overloaded:
1. Async { ... } starts up the reactor that must surround the concurrent code.
2. Async { ... } creates a concurrent Async::Task within the reactor.

When writing concurrent code (case 2), it is necessary that it be surrounded by a reactor (case 1). If that is missing, case 2 will be misconstrued as case 1, and it won't be concurrent at all. This is an easy mistake to make, especially when writing tests.
Mitigation
To address this risk, we like to follow Design by Contract, which in this case means that concurrent code (case 2) should ensure that there is a surrounding reactor before it proceeds. Here's how we've done that, crudely:
The wrapper code that creates the reactor looks like:
Proposal
An ideal solution would separate the overloaded cases so that the contract can be implicitly enforced by the Async gem without clients needing to write manual code each time.

Since case 2 appears much more frequently in code than case 1, let's leave case 2 as-is, but change case 1 to Async.run { ... }. So the wrapper for the above code would read:

With this contract, case 2 could automatically raise an exception on this line if it is run without a surrounding reactor:
Migration
We obviously would not want to introduce this proposal as a breaking change. Instead we could phase it in:
Phase 1
Introduce Async.run as a preferred but optional way to create the reactor at the outer level. Update all documentation to show it as the preferred way to create the reactor.
Phase 2
Deprecate Async { ... } being called without a surrounding reactor. In the deprecation warning, indicate that it should be written as Async.run { ... } if it is the outer level that is creating the reactor.
Phase 3
Complete the deprecation: If Async { ... } is called without a surrounding reactor, raise the Async::ReactorNotRunning exception as shown above.
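
For readers who want to see the proposed contract in isolation, here is a toy model in plain Ruby. It is only a sketch: `ToyAsync` is a hypothetical stand-in that does no real scheduling, `ToyAsync.run` plays the role of case 1 and `ToyAsync.task` the role of case 2, and allowing nested `run` calls is an assumption made here, since the thread leaves that question open.

```ruby
# Toy model only: ToyAsync is hypothetical and does no real scheduling.
module ToyAsync
  class ReactorNotRunning < StandardError; end

  KEY = :toy_async_reactor_depth

  # Case 1: establish the reactor boundary for the duration of the block.
  def self.run
    Thread.current[KEY] = depth + 1
    yield
  ensure
    Thread.current[KEY] = depth - 1
  end

  # Case 2: only valid inside a reactor started by ToyAsync.run.
  def self.task
    unless depth.positive?
      raise ReactorNotRunning,
            "You must start a reactor first with `ToyAsync.run { ... }` surrounding this code."
    end
    yield
  end

  # Number of enclosing ToyAsync.run blocks (0 when none).
  def self.depth
    Thread.current[KEY] || 0
  end
end

# Code written for case 2: it assumes its caller set up the reactor.
def sleep_1_and_2
  [ToyAsync.task { 1 }, ToyAsync.task { 2 }]
end
```

Calling `sleep_1_and_2` on its own raises `ToyAsync::ReactorNotRunning`, while `ToyAsync.run { sleep_1_and_2 }` succeeds, which is the behaviour Phase 3 describes for the real API.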