Skip to content

Commit

Permalink
8318986: Improve GenericWaitBarrier performance
Browse files Browse the repository at this point in the history
Reviewed-by: rehn, iwalulya, pchilanomate
  • Loading branch information
shipilev committed Nov 22, 2023
1 parent 407cdd4 commit 30462f9
Show file tree
Hide file tree
Showing 2 changed files with 274 additions and 61 deletions.
261 changes: 212 additions & 49 deletions src/hotspot/share/utilities/waitBarrier_generic.cpp
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -29,66 +30,228 @@
#include "utilities/waitBarrier_generic.hpp"
#include "utilities/spinYield.hpp"

// Implements the striped semaphore wait barrier.
//
// To guarantee progress and safety, we need to make sure that new barrier tag
// starts with the completely empty set of waiters and free semaphore. This
// requires either waiting for all threads to leave wait() for current barrier
// tag on disarm(), or waiting for all threads to leave the previous tag before
// reusing the semaphore in arm().
//
// When there are multiple threads, it is normal for some threads to take
// significant time to leave the barrier. Waiting for these threads introduces
// stalls on barrier reuse.
//
// If we wait on disarm(), this stall is nearly guaranteed to happen if some threads
// are de-scheduled by prior wait(). It would be especially bad if there are more
// waiting threads than CPUs: every thread would need to wake up and register itself
// as leaving, before we can unblock from disarm().
//
// If we wait on arm(), we can get lucky that most threads would be able to catch up,
// exit wait(), and so we arrive to arm() with semaphore ready for reuse. However,
// that is still insufficient in practice.
//
// Therefore, this implementation goes a step further and implements the _striped_
// semaphores. We maintain several semaphores in cells. The barrier tags are assigned
// to cells in some simple manner. Most of the current uses have sequential barrier
// tags, so simple modulo works well. We then operate on a cell like we would operate
// on a single semaphore: we wait at arm() for all threads to catch up before reusing
// the cell. For the cost of maintaining just a few cells, we have enough window for
// threads to catch up.
//
// The correctness is guaranteed by using a single atomic state variable per cell,
// with updates always done with CASes:
//
// [.......... barrier tag ..........][.......... waiters ..........]
// 63 31 0
//
// Cell starts with zero tag and zero waiters. Arming the cell swings barrier tag from
// zero to some tag, while checking that no waiters have appeared. Disarming swings
// the barrier tag back from tag to zero. Every waiter registers itself by incrementing
// the "waiters", while checking that barrier tag is still the same. Every completing waiter
// decrements the "waiters". When all waiters complete, a cell ends up in initial state,
// ready to be armed again. This allows accurate tracking of how many signals
// to issue and does not race with disarm.
//
// The implementation uses the strongest (default) barriers for extra safety, even
// when not strictly required to do so for correctness. Extra barrier overhead is
// dominated by the actual wait/notify latency anyway.
//

void GenericWaitBarrier::arm(int barrier_tag) {
assert(_barrier_tag == 0, "Already armed");
assert(_waiters == 0, "We left a thread hanging");
_barrier_tag = barrier_tag;
_waiters = 0;
assert(barrier_tag != 0, "Pre arm: Should be arming with armed value");
assert(Atomic::load(&_barrier_tag) == 0,
"Pre arm: Should not be already armed. Tag: %d",
Atomic::load(&_barrier_tag));
Atomic::release_store(&_barrier_tag, barrier_tag);

Cell &cell = tag_to_cell(barrier_tag);
cell.arm(barrier_tag);

// API specifies arm() must provide a trailing fence.
OrderAccess::fence();
}

int GenericWaitBarrier::wake_if_needed() {
assert(_barrier_tag == 0, "Not disarmed");
int w = _waiters;
if (w == 0) {
// Load of _barrier_threads in caller must not pass the load of _waiters.
OrderAccess::loadload();
return 0;
}
assert(w > 0, "Bad counting");
// We need an exact count which never goes below zero,
// otherwise the semaphore may be signalled too many times.
if (Atomic::cmpxchg(&_waiters, w, w - 1) == w) {
_sem_barrier.signal();
return w - 1;
}
return w;
void GenericWaitBarrier::disarm() {
int barrier_tag = Atomic::load_acquire(&_barrier_tag);
assert(barrier_tag != 0, "Pre disarm: Should be armed. Tag: %d", barrier_tag);
Atomic::release_store(&_barrier_tag, 0);

Cell &cell = tag_to_cell(barrier_tag);
cell.disarm(barrier_tag);

// API specifies disarm() must provide a trailing fence.
OrderAccess::fence();
}

void GenericWaitBarrier::disarm() {
assert(_barrier_tag != 0, "Not armed");
_barrier_tag = 0;
// Loads of _barrier_threads/_waiters must not float above disarm store and
// disarm store must not sink below.
void GenericWaitBarrier::wait(int barrier_tag) {
assert(barrier_tag != 0, "Pre wait: Should be waiting on armed value");

Cell &cell = tag_to_cell(barrier_tag);
cell.wait(barrier_tag);

// API specifies wait() must provide a trailing fence.
OrderAccess::fence();
int left;
}

void GenericWaitBarrier::Cell::arm(int32_t requested_tag) {
// Before we continue to arm, we need to make sure that all threads
// have left the previous cell.

int64_t state;

SpinYield sp;
do {
left = GenericWaitBarrier::wake_if_needed();
if (left == 0 && _barrier_threads > 0) {
// There is no thread to wake but we still have barrier threads.
while (true) {
state = Atomic::load_acquire(&_state);
assert(decode_tag(state) == 0,
"Pre arm: Should not be armed. "
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
decode_tag(state), decode_waiters(state));
if (decode_waiters(state) == 0) {
break;
}
sp.wait();
}

// Try to swing cell to armed. This should always succeed after the check above.
int64_t new_state = encode(requested_tag, 0);
int64_t prev_state = Atomic::cmpxchg(&_state, state, new_state);
if (prev_state != state) {
fatal("Cannot arm the wait barrier. "
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
decode_tag(prev_state), decode_waiters(prev_state));
}
}

int GenericWaitBarrier::Cell::signal_if_needed(int max) {
int signals = 0;
while (true) {
int cur = Atomic::load_acquire(&_outstanding_wakeups);
if (cur == 0) {
// All done, no more waiters.
return 0;
}
assert(cur > 0, "Sanity");

int prev = Atomic::cmpxchg(&_outstanding_wakeups, cur, cur - 1);
if (prev != cur) {
// Contention, return to caller for early return or backoff.
return prev;
}

// Signal!
_sem.signal();

if (++signals >= max) {
// Signalled requested number of times, break out.
return prev;
}
}
}

void GenericWaitBarrier::Cell::disarm(int32_t expected_tag) {
int32_t waiters;

while (true) {
int64_t state = Atomic::load_acquire(&_state);
int32_t tag = decode_tag(state);
waiters = decode_waiters(state);

assert((tag == expected_tag) && (waiters >= 0),
"Mid disarm: Should be armed with expected tag and have sane waiters. "
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
tag, waiters);

int64_t new_state = encode(0, waiters);
if (Atomic::cmpxchg(&_state, state, new_state) == state) {
// Successfully disarmed.
break;
}
}

// Wake up waiters, if we have at least one.
// Allow other threads to assist with wakeups, if possible.
if (waiters > 0) {
Atomic::release_store(&_outstanding_wakeups, waiters);
SpinYield sp;
while (signal_if_needed(INT_MAX) > 0) {
sp.wait();
}
// We must loop here until there are no waiters or potential waiters.
} while (left > 0 || _barrier_threads > 0);
// API specifies disarm() must provide a trailing fence.
OrderAccess::fence();
}
assert(Atomic::load(&_outstanding_wakeups) == 0, "Post disarm: Should not have outstanding wakeups");
}

void GenericWaitBarrier::wait(int barrier_tag) {
assert(barrier_tag != 0, "Trying to wait on disarmed value");
if (barrier_tag != _barrier_tag) {
// API specifies wait() must provide a trailing fence.
OrderAccess::fence();
return;
void GenericWaitBarrier::Cell::wait(int32_t expected_tag) {
// Try to register ourselves as pending waiter.
while (true) {
int64_t state = Atomic::load_acquire(&_state);
int32_t tag = decode_tag(state);
if (tag != expected_tag) {
// Cell tag had changed while waiting here. This means either the cell had
// been disarmed, or we are late and the cell was armed with a new tag.
// Exit without touching anything else.
return;
}
int32_t waiters = decode_waiters(state);

assert((tag == expected_tag) && (waiters >= 0 && waiters < INT32_MAX),
"Before wait: Should be armed with expected tag and waiters are in range. "
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
tag, waiters);

int64_t new_state = encode(tag, waiters + 1);
if (Atomic::cmpxchg(&_state, state, new_state) == state) {
// Success! Proceed to wait.
break;
}
}
Atomic::add(&_barrier_threads, 1);
if (barrier_tag != 0 && barrier_tag == _barrier_tag) {
Atomic::add(&_waiters, 1);
_sem_barrier.wait();
// We help out with posting, but we need to do so before we decrement the
// _barrier_threads otherwise we might wake threads up in next wait.
GenericWaitBarrier::wake_if_needed();

// Wait for notification.
_sem.wait();

// Unblocked! We help out with waking up two siblings. This allows to avalanche
// the wakeups for many threads, even if some threads are lagging behind.
// Note that we can only do this *before* reporting back as completed waiter,
// otherwise we might prematurely wake up threads for another barrier tag.
// Current arm() sequence protects us from this trouble by waiting until all waiters
// leave.
signal_if_needed(2);

// Register ourselves as completed waiter before leaving.
while (true) {
int64_t state = Atomic::load_acquire(&_state);
int32_t tag = decode_tag(state);
int32_t waiters = decode_waiters(state);

assert((tag == 0) && (waiters > 0),
"After wait: Should be not armed and have non-complete waiters. "
"Tag: " INT32_FORMAT "; Waiters: " INT32_FORMAT,
tag, waiters);

int64_t new_state = encode(tag, waiters - 1);
if (Atomic::cmpxchg(&_state, state, new_state) == state) {
// Success!
break;
}
}
Atomic::add(&_barrier_threads, -1);
}
74 changes: 62 additions & 12 deletions src/hotspot/share/utilities/waitBarrier_generic.hpp
Expand Up @@ -26,29 +26,79 @@
#define SHARE_UTILITIES_WAITBARRIER_GENERIC_HPP

#include "memory/allocation.hpp"
#include "memory/padded.hpp"
#include "runtime/semaphore.hpp"
#include "utilities/globalDefinitions.hpp"

// In addition to the barrier tag, it uses two counters to keep the semaphore
// count correct and not leave any late thread waiting.
class GenericWaitBarrier : public CHeapObj<mtInternal> {
private:
class Cell : public CHeapObj<mtInternal> {
private:
// Pad out the cells to avoid interference between the cells.
// This would insulate from stalls when adjacent cells have returning
// workers and contend over the cache line for current latency-critical
// cell.
DEFINE_PAD_MINUS_SIZE(0, DEFAULT_CACHE_LINE_SIZE, 0);

Semaphore _sem;

// Cell state, tracks the arming + waiters status
volatile int64_t _state;

// Wakeups to deliver for current waiters
volatile int _outstanding_wakeups;

int signal_if_needed(int max);

static int64_t encode(int32_t barrier_tag, int32_t waiters) {
int64_t val = (((int64_t) barrier_tag) << 32) |
(((int64_t) waiters) & 0xFFFFFFFF);
assert(decode_tag(val) == barrier_tag, "Encoding is reversible");
assert(decode_waiters(val) == waiters, "Encoding is reversible");
return val;
}

static int32_t decode_tag(int64_t value) {
return (int32_t)(value >> 32);
}

static int32_t decode_waiters(int64_t value) {
return (int32_t)(value & 0xFFFFFFFF);
}

public:
Cell() : _sem(0), _state(encode(0, 0)), _outstanding_wakeups(0) {}
NONCOPYABLE(Cell);

void arm(int32_t requested_tag);
void disarm(int32_t expected_tag);
void wait(int32_t expected_tag);
};

// Should be enough for most uses without exploding the footprint.
static constexpr int CELLS_COUNT = 16;

Cell _cells[CELLS_COUNT];

// Trailing padding to protect the last cell.
DEFINE_PAD_MINUS_SIZE(0, DEFAULT_CACHE_LINE_SIZE, 0);

volatile int _barrier_tag;
// The number of threads waiting on or about to wait on the semaphore.
volatile int _waiters;
// The number of threads in the wait path, before or after the tag check.
// These threads can become waiters.
volatile int _barrier_threads;
Semaphore _sem_barrier;

// Trailing padding to insulate the rest of the barrier from adjacent
// data structures. The leading padding is not needed, as cell padding
// handles this for us.
DEFINE_PAD_MINUS_SIZE(1, DEFAULT_CACHE_LINE_SIZE, 0);

NONCOPYABLE(GenericWaitBarrier);

int wake_if_needed();
Cell& tag_to_cell(int tag) { return _cells[tag & (CELLS_COUNT - 1)]; }

public:
GenericWaitBarrier() : _barrier_tag(0), _waiters(0), _barrier_threads(0), _sem_barrier(0) {}
public:
GenericWaitBarrier() : _cells(), _barrier_tag(0) {}
~GenericWaitBarrier() {}

const char* description() { return "semaphore"; }
const char* description() { return "striped semaphore"; }

void arm(int barrier_tag);
void disarm();
Expand Down

1 comment on commit 30462f9

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.