Skip to content

Commit

Permalink
Rewrote the entire front buffering scheme
Browse files (browse the repository at this point in the history)
  • Loading branch information
ogiroux committed Sep 7, 2019
1 parent a8ea4b4 commit 4d3bafc
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 51 deletions.
4 changes: 4 additions & 0 deletions include/latch
Expand Up @@ -24,6 +24,8 @@ THE SOFTWARE.

#include "atomic_wait"

namespace std {

class latch {
public:
constexpr explicit latch(ptrdiff_t expected) : counter(expected) { }
Expand Down Expand Up @@ -63,3 +65,5 @@ public:
private:
std::atomic<ptrdiff_t> counter;
};

}
105 changes: 57 additions & 48 deletions include/semaphore
Expand Up @@ -147,6 +147,8 @@ THE SOFTWARE.

#endif

namespace std {

class __atomic_semaphore_base {

__semaphore_no_inline inline bool __fetch_sub_if_slow(ptrdiff_t old) {
Expand Down Expand Up @@ -243,44 +245,65 @@ public:

class __semaphore_base {

// NOTE(review): this text is a rendered commit diff without +/- markers; the two
// signatures and the two copies of the body below appear to be the removed
// (void, no argument) and added (bool, takes `success`) versions of one function,
// so the span does not compile as shown.
//
// __backfill: unless __NO_SEM_BACK is defined, subtracts 2 from __backbuffer,
// calls __semaphore_sem_post(__semaphore, 1) once if the previous value was > 0
// and again if it was > 1 (the posts are asserted to succeed), then adds back
// the part that was not posted (2 when nothing was posted, 1 when only one
// post was made).
// The bool variant does this only when `success` is true, and always returns
// the `success` argument it was given.
inline void __backfill() {
inline bool __backfill(bool success) {
#ifndef __NO_SEM_BACK
auto const back_amount = __backbuffer.fetch_sub(2, std::memory_order_acquire);
bool const post_one = back_amount > 0;
bool const post_two = back_amount > 1;
auto const success = (!post_one || __semaphore_sem_post(__semaphore, 1)) &&
(!post_two || __semaphore_sem_post(__semaphore, 1));
assert(success);
if(!post_one || !post_two)
__backbuffer.fetch_add(!post_one ? 2 : 1, std::memory_order_relaxed);
if(success) {
auto const back_amount = __backbuffer.fetch_sub(2, std::memory_order_acquire);
bool const post_one = back_amount > 0;
bool const post_two = back_amount > 1;
// NOTE(review): this local shadows the `success` parameter; the assert below
// checks the result of the posts, while the final `return success` uses the
// parameter.
auto const success = (!post_one || __semaphore_sem_post(__semaphore, 1)) &&
(!post_two || __semaphore_sem_post(__semaphore, 1));
assert(success);
if(!post_one || !post_two)
__backbuffer.fetch_add(!post_one ? 2 : 1, std::memory_order_relaxed);
}
#endif
return success;
}
// NOTE(review): rendered commit diff without +/- markers — lines from what
// appear to be the removed and added bodies are interleaved below, so this span
// does not compile as shown.
//
// __try_acquire_fast: tries to take one unit without blocking on __semaphore.
// In the variant that tests `old >> 32`, __frontbuffer is treated as two
// fields: the part at bit 32 and above is the available count (a unit is taken
// by subtracting 1ll << 32), and adding 1ll to the low part records a waiter.
// Returns true when a unit was taken; returns false when the caller has to
// wait (in that variant, after it has been recorded as a waiter).
inline bool __try_acquire_fast() {
#ifndef __NO_SEM_FRONT
#ifndef __NO_SEM_POLL
if(__builtin_expect(__frontbuffer.load(std::memory_order_relaxed) <= 0,0)) {
ptrdiff_t old = __frontbuffer.load(std::memory_order_relaxed);
if(!(old >> 32)) {
using __clock = std::conditional<std::chrono::high_resolution_clock::is_steady,
std::chrono::high_resolution_clock,
std::chrono::steady_clock>::type;
auto const start = __clock::now();
while (__frontbuffer.load(std::memory_order_relaxed) <= 0) {
// NOTE(review): in the `old >> 32` variant, `old` is loaded here, before the
// loop, and is not reloaded inside it, so once entered the loop can only end
// through the elapsed-time break — confirm against the actual file.
old = __frontbuffer.load(std::memory_order_relaxed);
while(!(old >> 32)) {
// Short poll with back-off: give up after 5 microseconds, otherwise sleep for
// a quarter of (elapsed + 100ns) before checking again.
auto const elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(__clock::now() - start);
if(elapsed > std::chrono::microseconds(5))
break;
std::this_thread::sleep_for((elapsed + std::chrono::nanoseconds(100)) / 4);
}
}
#endif
auto const old = __frontbuffer.fetch_sub(1, std::memory_order_acq_rel);
return old > 0; // after: non-negative
#else
return false;
// boldly assume the semaphore is free with a count of 1, just because
ptrdiff_t old = 1ll << 32;
#endif
// always steal if you can
// (compare_exchange_weak refreshes `old` on failure, so the loop ends once the
// observed count is zero)
while(old >> 32)
if(__frontbuffer.compare_exchange_weak(old, old - (1ll << 32), std::memory_order_acquire))
return true;
// record we're waiting
old = __frontbuffer.fetch_add(1ll, std::memory_order_release);
// ALWAYS steal if you can!
while(old >> 32)
if(__frontbuffer.compare_exchange_weak(old, old - (1ll << 32), std::memory_order_acquire))
break;
// not going to wait after all
// (`old` still has a non-zero count only if the exchange above succeeded;
// __try_done then removes the waiter record added by the fetch_add)
if(old >> 32)
return __try_done(true);
#endif
// the wait has begun...
return false;
}
// NOTE(review): rendered commit diff without +/- markers — `__try_failed` (which
// adds 1 to __frontbuffer) appears to be the removed function and `__try_done`
// its replacement; the two are interleaved here.
//
// __try_done: unless __NO_SEM_FRONT is defined, subtracts 1 from __frontbuffer
// to drop the caller's waiter record, then returns __backfill(success).
inline void __try_failed() {
inline bool __try_done(bool success) {
#ifndef __NO_SEM_FRONT
__frontbuffer.fetch_add(1, std::memory_order_relaxed);
// record we're NOT waiting
__frontbuffer.fetch_sub(1ll, std::memory_order_release);
#endif
return __backfill(success);
}
__semaphore_no_inline inline void __release_slow(ptrdiff_t post_amount) {
#ifdef __SEM_POST_ONE
Expand All @@ -303,12 +326,6 @@ class __semaphore_base {
assert(success);
#endif
}
// __try_acquire_for_slow: waits on __semaphore through
// __semaphore_sem_wait_timed with the given timeout, calls __backfill() when
// the wait succeeded, and returns the wait result.
// NOTE(review): it calls the no-argument __backfill(), so this appears to be
// code removed by the commit shown in this diff — confirm.
__semaphore_no_inline inline bool __try_acquire_for_slow(std::chrono::nanoseconds const& rel_time) {
auto const result = __semaphore_sem_wait_timed(__semaphore, rel_time);
if(result)
__backfill();
return result;
}
__semaphore_sem_t __semaphore;
#ifndef __NO_SEM_FRONT
std::atomic<ptrdiff_t> __frontbuffer;
Expand All @@ -324,7 +341,7 @@ public:

__semaphore_base(ptrdiff_t count = 0) : __semaphore()
#ifndef __NO_SEM_FRONT
, __frontbuffer(count)
, __frontbuffer(count << 32)
#endif
#ifndef __NO_SEM_BACK
, __backbuffer(0)
Expand All @@ -341,7 +358,7 @@ public:
}
~__semaphore_base() {
#ifndef __NO_SEM_FRONT
assert(__frontbuffer.load(std::memory_order_relaxed) >= 0);
assert(0 == (__frontbuffer.load(std::memory_order_relaxed) & ~0u));
#endif
auto const success = __semaphore_sem_destroy(__semaphore);
assert(success);
Expand All @@ -352,42 +369,32 @@ public:

// NOTE(review): rendered commit diff without +/- markers — the
// fetch_add/std::min lines and the compare_exchange loop appear to be the
// removed and added bodies respectively, and the preprocessor directives do
// not balance as shown.
//
// release: returns `update` units (default 1) to the semaphore.
// In the compare_exchange variant it starts from a guessed value of 0 and tries
// to add `update << 32` to __frontbuffer while the masked value is zero,
// returning on success; otherwise it falls through to __release_slow(update).
inline void release(ptrdiff_t update = 1) {
#ifndef __NO_SEM_FRONT
auto const old = __frontbuffer.fetch_add(update, std::memory_order_acq_rel);
if(__builtin_expect(old >= 0,1)) // before: non-negative
return;
__release_slow(std::min(update, -old));
#else
// boldly assume the semaphore is taken but uncontended
ptrdiff_t old = 0;
// try to fast-release as long as it's uncontended
// NOTE(review): `~0ul` is all-ones across the full width of unsigned long, so
// where unsigned long is 64-bit this tests the whole word rather than only the
// low 32 bits that the destructor's `& ~0u` check tests — presumably `~0u`
// (no waiters recorded) was intended; confirm.
while(0 == (old & ~0ul))
if(__frontbuffer.compare_exchange_weak(old, old + (update << 32), std::memory_order_acq_rel))
return;
#endif
// slow-release it is
__release_slow(update);
#endif
}
// NOTE(review): rendered commit diff without +/- markers — the first five body
// lines appear to be the removed implementation and the last two its
// replacement.
//
// acquire: takes one unit. Tries __try_acquire_fast() first; when that returns
// false it waits in __semaphore_sem_wait(__semaphore).
inline void acquire() {
if(__builtin_expect(__try_acquire_fast(),1))
return;
auto const success = __semaphore_sem_wait(__semaphore);
assert(success);
__backfill();
// NOTE(review): unlike the assert above, this form discards the result of the
// wait (and of __try_done) — confirm a failed wait cannot happen here.
if(!__try_acquire_fast())
__try_done(__semaphore_sem_wait(__semaphore));
}
// try_acquire: a single acquire attempt with a zero timeout; forwards to
// try_acquire_for and returns its result.
inline bool try_acquire() noexcept {
    std::chrono::nanoseconds const no_wait(0);
    return try_acquire_for(no_wait);
}
// NOTE(review): rendered commit diff without +/- markers — the
// __try_acquire_fast/__try_acquire_for_slow/__try_failed lines appear to be the
// removed body and the final `return try_acquire_for(...)` its replacement.
//
// try_acquire_until: converts the deadline `abs_time` into a nanosecond
// duration relative to Clock::now() and attempts a timed acquire with it.
// Returns the result of that attempt.
template <class Clock, class Duration>
bool try_acquire_until(std::chrono::time_point<Clock, Duration> const& abs_time) {
if(__builtin_expect(__try_acquire_fast(),1))
return true;
// NOTE(review): std::max makes `current` >= abs_time, so `abs_time - current`
// below is never positive (zero before the deadline, negative after it);
// std::min looks like what was intended — confirm.
auto const current = std::max(Clock::now(), abs_time);
auto const result = __try_acquire_for_slow(std::chrono::duration_cast<std::chrono::nanoseconds>(abs_time - current));
if(!result)
__try_failed();
return result;
return try_acquire_for(std::chrono::duration_cast<std::chrono::nanoseconds>(abs_time - current));
}
// NOTE(review): rendered commit diff without +/- markers — the
// __try_acquire_for_slow/__try_failed lines appear to be the removed body and
// the final two-line return its replacement.
//
// try_acquire_for: returns true if __try_acquire_fast() takes a unit; otherwise
// waits on __semaphore for up to `rel_time` and returns the outcome of that
// wait as passed through __try_done.
template <class Rep, class Period>
bool try_acquire_for(std::chrono::duration<Rep, Period> const& rel_time) {
if(__builtin_expect(__try_acquire_fast(),1))
return true;
auto const result = __try_acquire_for_slow(std::chrono::duration_cast<std::chrono::nanoseconds>(rel_time));
if(!result)
__try_failed();
return result;
// NOTE(review): `rel_time` is handed to __semaphore_sem_wait_timed as-is here,
// whereas the other variant converts it to std::chrono::nanoseconds first —
// confirm the helper accepts arbitrary duration types.
return __try_acquire_fast() ||
__try_done(__semaphore_sem_wait_timed(__semaphore, rel_time));
}
};

Expand Down Expand Up @@ -492,3 +499,5 @@ public:
#endif // __NO_SEM

using binary_semaphore = counting_semaphore<1>;

}
4 changes: 2 additions & 2 deletions sample.cpp
Expand Up @@ -181,13 +181,13 @@ int main() {

int const max = std::thread::hardware_concurrency();
std::cout << "System has " << max << " hardware threads." << std::endl;
/*

#ifndef __NO_MUTEX
test_mutex<sem_mutex>("Semlock");
test_mutex<mutex>("Spinlock");
test_mutex<ticket_mutex>("Ticket");
#endif
*/

#ifndef __NO_BARRIER
test_barrier<barrier<>>("Barrier");
#endif
Expand Down
2 changes: 1 addition & 1 deletion sample.hpp
Expand Up @@ -75,5 +75,5 @@ struct sem_mutex {
// Unlocks the mutex by calling release() on the semaphore member `c`.
void unlock() noexcept {
c.release();
}
binary_semaphore c = 1;
std::binary_semaphore c = 1;
};

0 comments on commit 4d3bafc

Please sign in to comment.