Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
8231672: Simplify the reference processing parallelization framework
Reviewed-by: tschatzl, ayang
  • Loading branch information
lkorinth committed May 19, 2021
1 parent 392f962 commit 6ef46ce
Show file tree
Hide file tree
Showing 20 changed files with 380 additions and 724 deletions.
154 changes: 32 additions & 122 deletions src/hotspot/share/gc/g1/g1CollectedHeap.cpp
Expand Up @@ -3051,7 +3051,7 @@ bool G1ParEvacuateFollowersClosure::offer_termination() {
EventGCPhaseParallel event;
G1ParScanThreadState* const pss = par_scan_state();
start_term_time();
const bool res = terminator()->offer_termination();
const bool res = (terminator() == nullptr) ? true : terminator()->offer_termination();
end_term_time();
event.commit(GCId::current(), pss->worker_id(), G1GCPhaseTimes::phase_name(G1GCPhaseTimes::Termination));
return res;
Expand Down Expand Up @@ -3182,99 +3182,35 @@ class G1STWDrainQueueClosure: public VoidClosure {
}
};

// Parallel Reference Processing closures

// Implementation of AbstractRefProcTaskExecutor for parallel reference
// processing during G1 evacuation pauses.

class G1STWRefProcTaskExecutor: public AbstractRefProcTaskExecutor {
private:
G1CollectedHeap* _g1h;
G1ParScanThreadStateSet* _pss;
G1ScannerTasksQueueSet* _queues;
WorkGang* _workers;

public:
G1STWRefProcTaskExecutor(G1CollectedHeap* g1h,
G1ParScanThreadStateSet* per_thread_states,
WorkGang* workers,
G1ScannerTasksQueueSet *task_queues) :
_g1h(g1h),
_pss(per_thread_states),
_queues(task_queues),
_workers(workers)
{
g1h->ref_processor_stw()->set_active_mt_degree(workers->active_workers());
}

// Executes the given task using concurrent marking worker threads.
virtual void execute(ProcessTask& task, uint ergo_workers);
};

// Gang task for possibly parallel reference processing

class G1STWRefProcTaskProxy: public AbstractGangTask {
typedef AbstractRefProcTaskExecutor::ProcessTask ProcessTask;
ProcessTask& _proc_task;
G1CollectedHeap* _g1h;
G1ParScanThreadStateSet* _pss;
G1ScannerTasksQueueSet* _task_queues;
TaskTerminator* _terminator;
class G1STWRefProcProxyTask : public RefProcProxyTask {
G1CollectedHeap& _g1h;
G1ParScanThreadStateSet& _pss;
TaskTerminator _terminator;
G1ScannerTasksQueueSet& _task_queues;

public:
G1STWRefProcTaskProxy(ProcessTask& proc_task,
G1CollectedHeap* g1h,
G1ParScanThreadStateSet* per_thread_states,
G1ScannerTasksQueueSet *task_queues,
TaskTerminator* terminator) :
AbstractGangTask("Process reference objects in parallel"),
_proc_task(proc_task),
_g1h(g1h),
_pss(per_thread_states),
_task_queues(task_queues),
_terminator(terminator)
{}

virtual void work(uint worker_id) {
// The reference processing task executed by a single worker.
ResourceMark rm;

G1STWIsAliveClosure is_alive(_g1h);

G1ParScanThreadState* pss = _pss->state_for_worker(worker_id);
pss->set_ref_discoverer(NULL);

// Keep alive closure.
G1CopyingKeepAliveClosure keep_alive(_g1h, pss);

// Complete GC closure
G1ParEvacuateFollowersClosure drain_queue(_g1h, pss, _task_queues, _terminator, G1GCPhaseTimes::ObjCopy);
G1STWRefProcProxyTask(uint max_workers, G1CollectedHeap& g1h, G1ParScanThreadStateSet& pss, G1ScannerTasksQueueSet& task_queues)
: RefProcProxyTask("G1STWRefProcProxyTask", max_workers),
_g1h(g1h),
_pss(pss),
_terminator(max_workers, &task_queues),
_task_queues(task_queues) {}

// Call the reference processing task's work routine.
_proc_task.work(worker_id, is_alive, keep_alive, drain_queue);
void work(uint worker_id) override {
assert(worker_id < _max_workers, "sanity");
uint index = (_tm == RefProcThreadModel::Single) ? 0 : worker_id;
_pss.state_for_worker(index)->set_ref_discoverer(nullptr);
G1STWIsAliveClosure is_alive(&_g1h);
G1CopyingKeepAliveClosure keep_alive(&_g1h, _pss.state_for_worker(index));
G1ParEvacuateFollowersClosure complete_gc(&_g1h, _pss.state_for_worker(index), &_task_queues, _tm == RefProcThreadModel::Single ? nullptr : &_terminator, G1GCPhaseTimes::ObjCopy);
_rp_task->rp_work(worker_id, &is_alive, &keep_alive, &complete_gc);
}

// Note we cannot assert that the refs array is empty here as not all
// of the processing tasks (specifically phase2 - pp2_work) execute
// the complete_gc closure (which ordinarily would drain the queue) so
// the queue may not be empty.
void prepare_run_task_hook() override {
_terminator.reset_for_reuse(_queue_count);
}
};

// Driver routine for parallel reference processing.
// Creates an instance of the ref processing gang
// task and has the worker threads execute it.
void G1STWRefProcTaskExecutor::execute(ProcessTask& proc_task, uint ergo_workers) {
assert(_workers != NULL, "Need parallel worker threads.");

assert(_workers->active_workers() >= ergo_workers,
"Ergonomically chosen workers (%u) should be less than or equal to active workers (%u)",
ergo_workers, _workers->active_workers());
TaskTerminator terminator(ergo_workers, _queues);
G1STWRefProcTaskProxy proc_task_proxy(proc_task, _g1h, _pss, _queues, &terminator);

_workers->run_task(&proc_task_proxy, ergo_workers);
}

// End of weak reference support closures

void G1CollectedHeap::process_discovered_references(G1ParScanThreadStateSet* per_thread_states) {
Expand All @@ -3283,53 +3219,27 @@ void G1CollectedHeap::process_discovered_references(G1ParScanThreadStateSet* per
ReferenceProcessor* rp = _ref_processor_stw;
assert(rp->discovery_enabled(), "should have been enabled");

// Closure to test whether a referent is alive.
G1STWIsAliveClosure is_alive(this);

// Even when parallel reference processing is enabled, the processing
// of JNI refs is serial and performed serially by the current thread
// rather than by a worker. The following PSS will be used for processing
// JNI refs.

// Use only a single queue for this PSS.
G1ParScanThreadState* pss = per_thread_states->state_for_worker(0);
pss->set_ref_discoverer(NULL);
assert(pss->queue_is_empty(), "pre-condition");

// Keep alive closure.
G1CopyingKeepAliveClosure keep_alive(this, pss);

// Serial Complete GC closure
G1STWDrainQueueClosure drain_queue(this, pss);

// Setup the soft refs policy...
rp->setup_policy(false);

ReferenceProcessorPhaseTimes* pt = phase_times()->ref_phase_times();
ReferenceProcessorPhaseTimes& pt = *phase_times()->ref_phase_times();

ReferenceProcessorStats stats;
if (!rp->processing_is_mt()) {
// Serial reference processing...
stats = rp->process_discovered_references(&is_alive,
&keep_alive,
&drain_queue,
NULL,
pt);
} else {
uint no_of_gc_workers = workers()->active_workers();
uint no_of_gc_workers = workers()->active_workers();

// Parallel reference processing
assert(no_of_gc_workers <= rp->max_num_queues(),
"Mismatch between the number of GC workers %u and the maximum number of Reference process queues %u",
no_of_gc_workers, rp->max_num_queues());
// Parallel reference processing
assert(no_of_gc_workers <= rp->max_num_queues(),
"Mismatch between the number of GC workers %u and the maximum number of Reference process queues %u",
no_of_gc_workers, rp->max_num_queues());

G1STWRefProcTaskExecutor par_task_executor(this, per_thread_states, workers(), _task_queues);
stats = rp->process_discovered_references(&is_alive,
&keep_alive,
&drain_queue,
&par_task_executor,
pt);
}
rp->set_active_mt_degree(no_of_gc_workers);
G1STWRefProcProxyTask task(rp->max_num_queues(), *this, *per_thread_states, *_task_queues);
stats = rp->process_discovered_references(task, pt);

_gc_tracer_stw->report_gc_reference_stats(stats);

Expand Down
106 changes: 22 additions & 84 deletions src/hotspot/share/gc/g1/g1ConcurrentMark.cpp
Expand Up @@ -1458,71 +1458,34 @@ class G1CMDrainMarkingStackClosure : public VoidClosure {
}
};

// Implementation of AbstractRefProcTaskExecutor for parallel
// reference processing at the end of G1 concurrent marking

class G1CMRefProcTaskExecutor : public AbstractRefProcTaskExecutor {
private:
G1CollectedHeap* _g1h;
G1ConcurrentMark* _cm;
WorkGang* _workers;
uint _active_workers;
class G1CMRefProcProxyTask : public RefProcProxyTask {
G1CollectedHeap& _g1h;
G1ConcurrentMark& _cm;

public:
G1CMRefProcTaskExecutor(G1CollectedHeap* g1h,
G1ConcurrentMark* cm,
WorkGang* workers,
uint n_workers) :
_g1h(g1h), _cm(cm),
_workers(workers), _active_workers(n_workers) { }

virtual void execute(ProcessTask& task, uint ergo_workers);
};

class G1CMRefProcTaskProxy : public AbstractGangTask {
typedef AbstractRefProcTaskExecutor::ProcessTask ProcessTask;
ProcessTask& _proc_task;
G1CollectedHeap* _g1h;
G1ConcurrentMark* _cm;
G1CMRefProcProxyTask(uint max_workers, G1CollectedHeap& g1h, G1ConcurrentMark &cm)
: RefProcProxyTask("G1CMRefProcProxyTask", max_workers),
_g1h(g1h),
_cm(cm) {}

public:
G1CMRefProcTaskProxy(ProcessTask& proc_task,
G1CollectedHeap* g1h,
G1ConcurrentMark* cm) :
AbstractGangTask("Process reference objects in parallel"),
_proc_task(proc_task), _g1h(g1h), _cm(cm) {
ReferenceProcessor* rp = _g1h->ref_processor_cm();
assert(rp->processing_is_mt(), "shouldn't be here otherwise");
void work(uint worker_id) override {
assert(worker_id < _max_workers, "sanity");
G1CMIsAliveClosure is_alive(&_g1h);
uint index = (_tm == RefProcThreadModel::Single) ? 0 : worker_id;
G1CMKeepAliveAndDrainClosure keep_alive(&_cm, _cm.task(index), _tm == RefProcThreadModel::Single);
G1CMDrainMarkingStackClosure complete_gc(&_cm, _cm.task(index), _tm == RefProcThreadModel::Single);
_rp_task->rp_work(worker_id, &is_alive, &keep_alive, &complete_gc);
}

virtual void work(uint worker_id) {
ResourceMark rm;
G1CMTask* task = _cm->task(worker_id);
G1CMIsAliveClosure g1_is_alive(_g1h);
G1CMKeepAliveAndDrainClosure g1_par_keep_alive(_cm, task, false /* is_serial */);
G1CMDrainMarkingStackClosure g1_par_drain(_cm, task, false /* is_serial */);

_proc_task.work(worker_id, g1_is_alive, g1_par_keep_alive, g1_par_drain);
void prepare_run_task_hook() override {
// We need to reset the concurrency level before each
// proxy task execution, so that the termination protocol
// and overflow handling in G1CMTask::do_marking_step() knows
// how many workers to wait for.
_cm.set_concurrency(_queue_count);
}
};

void G1CMRefProcTaskExecutor::execute(ProcessTask& proc_task, uint ergo_workers) {
assert(_workers != NULL, "Need parallel worker threads.");
assert(_g1h->ref_processor_cm()->processing_is_mt(), "processing is not MT");
assert(_workers->active_workers() >= ergo_workers,
"Ergonomically chosen workers(%u) should be less than or equal to active workers(%u)",
ergo_workers, _workers->active_workers());

G1CMRefProcTaskProxy proc_task_proxy(proc_task, _g1h, _cm);

// We need to reset the concurrency level before each
// proxy task execution, so that the termination protocol
// and overflow handling in G1CMTask::do_marking_step() knows
// how many workers to wait for.
_cm->set_concurrency(ergo_workers);
_workers->run_task(&proc_task_proxy, ergo_workers);
}

void G1ConcurrentMark::weak_refs_work(bool clear_all_soft_refs) {
ResourceMark rm;

Expand All @@ -1541,23 +1504,6 @@ void G1ConcurrentMark::weak_refs_work(bool clear_all_soft_refs) {
rp->setup_policy(clear_all_soft_refs);
assert(_global_mark_stack.is_empty(), "mark stack should be empty");

// Instances of the 'Keep Alive' and 'Complete GC' closures used
// in serial reference processing. Note these closures are also
// used for serially processing (by the the current thread) the
// JNI references during parallel reference processing.
//
// These closures do not need to synchronize with the worker
// threads involved in parallel reference processing as these
// instances are executed serially by the current thread (e.g.
// reference processing is not multi-threaded and is thus
// performed by the current thread instead of a gang worker).
//
// The gang tasks involved in parallel reference processing create
// their own instances of these closures, which do their own
// synchronization among themselves.
G1CMKeepAliveAndDrainClosure g1_keep_alive(this, task(0), true /* is_serial */);
G1CMDrainMarkingStackClosure g1_drain_mark_stack(this, task(0), true /* is_serial */);

// We need at least one active thread. If reference processing
// is not multi-threaded we use the current (VMThread) thread,
// otherwise we use the work gang from the G1CollectedHeap and
Expand All @@ -1576,19 +1522,11 @@ void G1ConcurrentMark::weak_refs_work(bool clear_all_soft_refs) {
rp->set_active_mt_degree(active_workers);

// Parallel processing task executor.
G1CMRefProcTaskExecutor par_task_executor(_g1h, this,
_g1h->workers(), active_workers);
AbstractRefProcTaskExecutor* executor = (rp->processing_is_mt() ? &par_task_executor : NULL);

G1CMRefProcProxyTask task(rp->max_num_queues(), *_g1h, *this);
ReferenceProcessorPhaseTimes pt(_gc_timer_cm, rp->max_num_queues());

// Process the weak references.
const ReferenceProcessorStats& stats =
rp->process_discovered_references(&g1_is_alive,
&g1_keep_alive,
&g1_drain_mark_stack,
executor,
&pt);
const ReferenceProcessorStats& stats = rp->process_discovered_references(task, pt);
_gc_tracer_cm->report_gc_reference_stats(stats);
pt.print_all_references();

Expand Down
9 changes: 4 additions & 5 deletions src/hotspot/share/gc/g1/g1ConcurrentMark.hpp
Expand Up @@ -278,15 +278,14 @@ class G1CMRootMemRegions {
// This class manages data structures and methods for doing liveness analysis in
// G1's concurrent cycle.
class G1ConcurrentMark : public CHeapObj<mtGC> {
friend class G1ConcurrentMarkThread;
friend class G1CMRefProcTaskProxy;
friend class G1CMRefProcTaskExecutor;
friend class G1CMKeepAliveAndDrainClosure;
friend class G1CMDrainMarkingStackClosure;
friend class G1CMBitMapClosure;
friend class G1CMConcurrentMarkingTask;
friend class G1CMDrainMarkingStackClosure;
friend class G1CMKeepAliveAndDrainClosure;
friend class G1CMRefProcProxyTask;
friend class G1CMRemarkTask;
friend class G1CMTask;
friend class G1ConcurrentMarkThread;

G1ConcurrentMarkThread* _cm_thread; // The thread doing the work
G1CollectedHeap* _g1h; // The heap
Expand Down

1 comment on commit 6ef46ce

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.