RFC 2: Add Fiber::ExecutionContext::MultiThreaded #15517
Conversation
Introduces the second EC scheduler, which runs in multiple threads and uses the thread-safe queues (Runnables, GlobalQueue). Unlike the ST scheduler, the MT scheduler needs to actively park threads in addition to waiting on the event loop, because only one thread is allowed to run the event loop.
# Starts a context with a maximum number of threads. Threads aren't started
# right away, but will be started as needed to increase parallelism up to
# the configured maximum.
def self.new(name : String, size : Range(Nil, Int32)) : self
  new(name, 0..size.end, hijack: false)
end
thought: This overload feels odd because it's semantically equivalent to the next one (or rather a subset of it), the only difference being that the minimum number `0` is represented as `nil`.
So technically this shouldn't need a separate overload, just a value conversion (this would be easy with clamping from #15106). Merging both would be a bit rough on the type restriction of `size`, though.
Also, the current overloads don't account for a nilable lower bound (`Range(Int32?, Int32)`), so I guess it's all a bit of a mess. (This would be easy with bounded free variables such as `size : Range(T, Int32) forall T <= Int32?`, see #3803.)
Perhaps we could simplify to a single method with a `size : Range` type restriction and manually coalesce `nil` to `0` on the range's begin?
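A minimal sketch of what that single method could look like, assuming the inner constructor keeps its current shape (nothing below is from the PR itself, and exclusive ends are ignored here):

# Sketch only: accept any Range and coalesce a nil lower bound to 0 before
# delegating to the existing constructor. Exclusive ends are not handled.
def self.new(name : String, size : Range) : self
  new(name, (size.begin || 0)..size.end, hijack: false)
end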
# Starts a context with a maximum number of threads. Threads aren't started
# right away, but will be started as needed to increase parallelism up to
# the configured maximum.
def self.new(name : String, size : Int32) : self
suggestion: Rename the parameter to make clear it's the maximum size, not a fixed size.
- def self.new(name : String, size : Int32) : self
+ def self.new(name : String, max_size : Int32) : self
protected def self.default(size : Int32) : self
  new("DEFAULT", 1..size, hijack: true)
suggestion: ditto
- protected def self.default(size : Int32) : self
-   new("DEFAULT", 1..size, hijack: true)
+ protected def self.default(max_size : Int32) : self
+   new("DEFAULT", 1..max_size, hijack: true)
# right away, but will be started as needed to increase parallelism up to
# the configured maximum.
def self.new(name : String, size : Int32) : self
  new(name, 0..size, hijack: false)
issue: The range coalesce isn't entirely correct: it doesn't account for exclusive ranges.
A safe implementation should be trivial with #15106.
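Not the #15106 approach the comment has in mind, but for illustration, one manual way to account for an exclusive end when rebuilding the range (a sketch only, applied to the `Range(Nil, Int32)` overload quoted earlier):

# Sketch: normalize an exclusive end before building the internal inclusive
# range, so a range that excludes its end yields one fewer maximum thread.
def self.new(name : String, size : Range(Nil, Int32)) : self
  max = size.excludes_end? ? size.end - 1 : size.end
  new(name, 0..max, hijack: false)
end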
private def start_schedulers(hijack)
  @size.end.times do |index|
    @schedulers << Scheduler.new(self, "#{@name}-#{index}")
  end
end
suggestion: The hijack parameter doesn't seem to have any purpose in this method. Should we drop it?
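For reference, dropping it would leave the body of the hunk above untouched; only the parameter goes away:

private def start_schedulers
  @size.end.times do |index|
    @schedulers << Scheduler.new(self, "#{@name}-#{index}")
  end
end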
@size.begin.times do |index|
  scheduler = @schedulers[index]

  if hijack && index == 0
    @threads << hijack_current_thread(scheduler, index)
  else
    @threads << start_thread(scheduler, index)
  end
end
suggestion: Simplify loop logic.
- @size.begin.times do |index|
-   scheduler = @schedulers[index]
-   if hijack && index == 0
-     @threads << hijack_current_thread(scheduler, index)
-   else
-     @threads << start_thread(scheduler, index)
-   end
- end
+ offset = 0
+ if hijack
+   @threads << hijack_current_thread(@schedulers[0], 0)
+   offset = 1
+ end
+ offset.upto(@size.begin - 1) do |index|
+   @threads << start_thread(@schedulers[index], index)
+ end
private def start_schedulers(hijack)
  @size.end.times do |index|
    @schedulers << Scheduler.new(self, "#{@name}-#{index}")
  end
end
question: Why do we initialize all schedulers from the start? We only start the minimal number of threads, and we might never actually need all schedulers if the number of threads never reaches the max.
Could we initialize them lazily?
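For illustration only, lazy creation could look roughly like the following, assuming a mutex guards the array against concurrent growth (the `@mutex` ivar and the helper name are hypothetical, not part of the PR):

# Hypothetical sketch: create schedulers on demand instead of eagerly
# creating @size.end of them up front.
private def scheduler_at(index : Int32) : Scheduler
  @mutex.synchronize do
    while @schedulers.size <= index
      @schedulers << Scheduler.new(self, "#{@name}-#{@schedulers.size}")
    end
    @schedulers[index]
  end
end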
# Starts a new `Thread` and attaches *scheduler*. Runs the scheduler loop
# directly in the thread's main `Fiber`.
private def start_thread(scheduler, index) : Thread
suggestion: `scheduler` is `@schedulers[index]` in all calls. Perhaps we could simplify that, passing only the index and accessing `@schedulers` here.
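Something along these lines, sketched with a placeholder body since the rest of the method isn't shown in this hunk (`run` is a hypothetical name, not the PR's actual API):

# Sketch: only the signature changes; the scheduler is looked up from the
# index instead of being passed in by every caller.
private def start_thread(index : Int32) : Thread
  scheduler = @schedulers[index]
  Thread.new do
    scheduler.run # hypothetical call standing in for the existing thread body
  end
end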
# Picks a scheduler at random then iterates all schedulers to try to steal
# fibers from.
protected def steal(& : Scheduler ->) : Nil
  return if size == 1

  i = @rng.next_int
  n = @schedulers.size

  n.times do |j|
    if scheduler = @schedulers[(i &+ j) % n]?
      yield scheduler
    end
  end
end
note: This would also yield the current scheduler. It makes no sense to try to steal from yourself. I noticed this case is handled at the call site, where the current scheduler is directly accessible.
But this makes me wonder whether this method should be an internal helper method in `Scheduler` instead. It only deals with schedulers and it's only called from `Scheduler`, so I don't think there's a compelling reason why it should be on `ExecutionContext`. We can access the `ExecutionContext`'s instance variables from `Scheduler`.
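A rough sketch of that shape from `Scheduler`'s side, mirroring the hunk above (the method name, the `@execution_context` ivar and the location of `@rng` are assumptions, not the PR's actual API):

# Hypothetical sketch in Scheduler: iterate the context's schedulers from a
# random offset, skipping the current scheduler.
private def each_victim(& : Scheduler ->) : Nil
  schedulers = @execution_context.@schedulers
  n = schedulers.size
  return if n == 1

  i = @rng.next_int
  n.times do |j|
    if scheduler = schedulers[(i &+ j) % n]?
      yield scheduler unless scheduler.same?(self)
    end
  end
end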
Introduces the LAST EC scheduler, which runs in multiple threads with work stealing, so any thread can resume any runnable fiber in the context (no more starving threads).
Unlike the ST scheduler, the MT scheduler needs to actively park threads since only one thread in the context can run the event loop (no parallel runs).
Having a single event loop for the whole context, instead of one per thread, avoids situations where fibers wait in an event loop but aren't processed because that particular thread happens to be busy, causing delays. With a single event loop, as soon as a thread is starving it can check the event loop and enqueue the runnable fibers, which can be resumed (and stolen) immediately.
NOTE: we can start running the specs in this context, though they sometimes segfault. Maybe because of issues in spec helpers that used to expect fibers not to switch, or maybe because of issues in the stdlib for the same reason (for example libxml).
Kept in draft until #15511 and #15513 are merged.
refs #15342