Rename Workbucket's active flag to open, add enable flag#1373
Rename Workbucket's active flag to open, add enable flag#1373qinsoon merged 7 commits intommtk:masterfrom
Conversation
e49253d to
c1489fe
Compare
wks
left a comment
There was a problem hiding this comment.
The algorithm should be OK because we haven't actually added concurrent GC, and I think this PR is compatible with existing plans.
One problem is that the newly introduced terms "enabled/disabled" is somewhat confusing when used together with "active/inactive" or "activated/deactivated". The doc comment of WorkBucket::deactivate even says "disable the bucket". I suggest renaming "active" to "open". Then "open/closed" means if the bucket is open during a GC as a result of, for example, draining previous buckets. Meanwhile, "enabled/disabled" means whether we hide a bucket from mutators as if the bucket doesn't exist.
| !self.is_concurrent() | ||
| } | ||
|
|
||
| /// Is this stage concurrent (which may be executedd during mutator time)? |
There was a problem hiding this comment.
| /// Is this stage concurrent (which may be executedd during mutator time)? | |
| /// Is this stage concurrent (which may be executed during mutator time)? |
| self.is_disabled() || (self.is_activated() && self.is_empty()) | ||
| } | ||
|
|
||
| /// Disable the bucket |
There was a problem hiding this comment.
This comment says "disable the bucket"
| let mut items = Vec::new(); | ||
|
|
||
| { | ||
| // Drain queue by stealing until empty | ||
| loop { | ||
| match self.queue.steal() { | ||
| crossbeam::deque::Steal::Success(work) => { | ||
| items.push(work); | ||
| } | ||
| crossbeam::deque::Steal::Retry => continue, | ||
| crossbeam::deque::Steal::Empty => break, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Format collected items (just type names or Debug, depending on GCWork) | ||
| let debug_items: Vec<String> = items | ||
| .iter() | ||
| .map(|i| i.get_type_name().to_string()) // placeholder since GCWork isn’t Debug | ||
| .collect(); | ||
|
|
||
| // Push items back into the queue | ||
| { | ||
| for work in items { | ||
| self.queue.push(work); | ||
| } | ||
| } | ||
|
|
||
| f.debug_struct("BucketQueue") | ||
| .field("queue", &debug_items) | ||
| .finish() | ||
| } | ||
| } |
There was a problem hiding this comment.
This part drains the bucket to print the packets, and re-fill the packets. If this happens while some GC workers are fetching packets, the other worker may erroneously think the bucket is already drained.
If we really want to debug work buckets in such a detail, we should add a dedicated method, such as BucketQueue::debug_dump_packets. The Debug trait should be side-effect-free.
| } | ||
| if let Some(id) = error_example { | ||
| panic!("Some active buckets (such as {:?}) are not empty.", id); | ||
| panic!("Some open buckets (such as {:?}) are not empty.", id); |
| continue; | ||
| } | ||
| debug!("Checking if {:?} can be opened...", id); | ||
| let bucket_opened = bucket.update(self); |
There was a problem hiding this comment.
This line also says "opened".
|
|
||
| pub struct WorkBucket<VM: VMBinding> { | ||
| /// Whether this bucket has been opened. Work from an open bucket can be fetched by workers. | ||
| active: AtomicBool, |
| pub const fn is_always_activated(&self) -> bool { | ||
| matches!(self, WorkBucketStage::Unconstrained) | ||
| } |
There was a problem hiding this comment.
| pub const fn is_always_activated(&self) -> bool { | |
| matches!(self, WorkBucketStage::Unconstrained) | |
| } | |
| pub const fn is_always_open(&self) -> bool { | |
| matches!(self, WorkBucketStage::Unconstrained) | |
| } |
| } | ||
|
|
||
| /// Is this stage activated by default? | ||
| pub const fn is_activated_by_default(&self) -> bool { |
There was a problem hiding this comment.
| pub const fn is_activated_by_default(&self) -> bool { | |
| pub const fn is_open_by_default(&self) -> bool { |
| pub const fn is_disabled_by_default(&self) -> bool { | ||
| matches!(self, WorkBucketStage::Concurrent) | ||
| } |
There was a problem hiding this comment.
| pub const fn is_disabled_by_default(&self) -> bool { | |
| matches!(self, WorkBucketStage::Concurrent) | |
| } | |
| pub const fn is_enabled_by_default(&self) -> bool { | |
| !matches!(self, WorkBucketStage::Concurrent) | |
| } |
| pub(crate) fn new(stage: WorkBucketStage, active: bool, monitor: Arc<WorkerMonitor>) -> Self { | ||
| Self { | ||
| active: AtomicBool::new(active), |
There was a problem hiding this comment.
If a stage knows if the bucket at that stage is open by default, we can just initialize it here.
| pub(crate) fn new(stage: WorkBucketStage, active: bool, monitor: Arc<WorkerMonitor>) -> Self { | |
| Self { | |
| active: AtomicBool::new(active), | |
| pub(crate) fn new(stage: WorkBucketStage, monitor: Arc<WorkerMonitor>) -> Self { | |
| Self { | |
| active: AtomicBool::new(stage.is_open_by_default()), |
1a3e5ea to
0f64b73
Compare
|
@wks I have made changes as suggested. Can you review again? |
There was a problem hiding this comment.
There are still other places in the comments that mention "activated buckets". We should grep the word "activate" and change it to "open" or "opened", depending on the context.
We should update the pull request description #1373 (comment), too, as it will become the commit log.
Done. |
wks
left a comment
There was a problem hiding this comment.
LGTM.
I also pushed some pedantic changes and synchronized with master. You can merge this PR when you think it is appropriate.
This PR refactors work bucket to allow disabling a bucket and to support concurrent GC.
WorkBucket::enabled: if a bucket is disabled, it behaves the same as if the bucket/stage does not exist for scheduling, except that users can add work to a disabled bucket. There are mostly two use cases for this: 1. a plan can disable buckets that the plan does not use, and 2. a concurrent plan may push concurrent work to the disabled concurrent bucket during STW, and enable the bucket at the end of a STW.WorkBucketStage::Concurrent: this is unused in this PR.scheduler.rs, and move them towork_bucket.rs.BasePlan.