Fix lost wakeup bug in AutoResetEvent

Thanks to Tobias Brüll for pointing it out.
See tests/lostwakeup/README.md for more information.
1 parent 3347437 commit 3ff5ec464d5335342e667268e8537c3e1a785b79 @preshing committed Apr 18, 2015
@@ -6,7 +6,7 @@ Code is released under the [zlib license](http://en.wikipedia.org/wiki/Zlib_Lice
## How to Build the Tests
-First, you must generate the projects using [CMake](http://www.cmake.org/). Open a command prompt in the `tests` folder and do the following.
+First, you must generate the projects using [CMake](http://www.cmake.org/). Open a command prompt in the `tests/basetests` folder and do the following.
mkdir build
cd build
@@ -35,9 +35,7 @@ class AutoResetEvent
for (;;) // Increment m_status atomically via CAS loop.
{
assert(oldStatus <= 1);
- if (oldStatus == 1)
- return; // Event object is already signaled.
- int newStatus = oldStatus + 1;
+ int newStatus = oldStatus < 1 ? oldStatus + 1 : 1;
if (m_status.compare_exchange_weak(oldStatus, newStatus, std::memory_order_release, std::memory_order_relaxed))
break;
// The compare-exchange failed, likely because another thread changed m_status.
@@ -1,12 +1,12 @@
cmake_minimum_required(VERSION 2.8.6)
set(CMAKE_CONFIGURATION_TYPES "Debug;Release" CACHE INTERNAL "limited configs")
-project(Tests)
+project(BaseTests)
set(MACOSX_BUNDLE_GUI_IDENTIFIER "com.mycompany.\${PRODUCT_NAME:identifier}")
file(GLOB FILES *.cpp *.h)
-add_executable(Tests MACOSX_BUNDLE ${FILES})
-include(../cmake/BuildSettings.cmake)
+add_executable(${PROJECT_NAME} MACOSX_BUNDLE ${FILES})
+include(../../cmake/BuildSettings.cmake)
-add_subdirectory(../common common)
-include_directories(../common)
+add_subdirectory(../../common common)
+include_directories(../../common)
target_link_libraries(${PROJECT_NAME} Common)
File renamed without changes.
File renamed without changes.
@@ -0,0 +1,12 @@
+cmake_minimum_required(VERSION 2.8.6)
+set(CMAKE_CONFIGURATION_TYPES "Debug;Release" CACHE INTERNAL "limited configs")
+project(LostWakeup)
+
+set(MACOSX_BUNDLE_GUI_IDENTIFIER "com.mycompany.\${PRODUCT_NAME:identifier}")
+file(GLOB FILES *.cpp *.h)
+add_executable(${PROJECT_NAME} MACOSX_BUNDLE ${FILES})
+include(../../cmake/BuildSettings.cmake)
+
+add_subdirectory(../../common common)
+include_directories(../../common)
+target_link_libraries(${PROJECT_NAME} Common)
@@ -0,0 +1,56 @@
+The original implementation of `AutoResetEvent::signal()`, [as of commit 3347437](https://github.com/preshing/cpp11-on-multicore/tree/3347437911bfd3a45ddc07cb1cbb4a9d3e55583c), looked like this:
+
+    void signal()
+    {
+        int oldStatus = m_status.load(std::memory_order_relaxed);
+        for (;;) // Increment m_status atomically via CAS loop.
+        {
+            assert(oldStatus <= 1);
+            if (oldStatus == 1)
+                return; // Event object is already signaled.
+            int newStatus = oldStatus + 1;
+            if (m_status.compare_exchange_weak(oldStatus, newStatus, std::memory_order_release, std::memory_order_relaxed))
+                break;
+            // The compare-exchange failed, likely because another thread changed m_status.
+            // oldStatus has been updated. Retry the CAS loop.
+        }
+        if (oldStatus < 0)
+            m_sema.signal(); // Release one waiting thread.
+    }
+
+
+Commenter [Tobias Brüll correctly pointed out](http://preshing.com/20150316/semaphores-are-surprisingly-versatile) that this implementation was flawed, due to the early return in the middle of the CAS loop:
+
+> Isn't there the possibility of a lost wakeup in AutoResetEvent? Imagine two threads T1, T2 pushing requests, one thread W popping requests, an object `are` of type `AutoResetEvent` and the following course of actions:
+>
+> 1.) T1 pushes an item, calls `are.signal()`, and runs to termination. We now have `are.m_status == 1`.
+>
+> 2.) T2 reorders the `m_status.load(std::memory_order_relaxed)` before `requests.push(r)` to obtain 1.
+>
+> 3.) W runs. First it calls `are.wait()` and sets `are.m_status == 0`. W then empties the `requests`-queue; processing only one item. In the subsequent call to `are.wait()` it blocks on the semaphore.
+>
+> 4.) T2 continues. The effects of `requests.push(r)` become visible. However, since oldStatus == 1 in are.signal(), nothing happens in T2's call to are.signal().
+>
+> 5.) W continues to block on the semaphore, although there is work to do.
+
+Essentially, AutoResetEvent::signal() should always perform a release operation. That's the only way it can [_synchronize-with_](http://preshing.com/20130823/the-synchronizes-with-relation) the waiting thread. The bug exists because when the signaling thread takes that early return, it doesn't perform any release operation.
+
+The fix is to eliminate that early return. When the event is already signaled, `signal()` now falls through to the CAS and effectively performs `compare_exchange_weak(1, 1, std::memory_order_release)`:
+
+    void signal()
+    {
+        int oldStatus = m_status.load(std::memory_order_relaxed);
+        for (;;) // Increment m_status atomically via CAS loop.
+        {
+            assert(oldStatus <= 1);
+            int newStatus = oldStatus < 1 ? oldStatus + 1 : 1;
+            if (m_status.compare_exchange_weak(oldStatus, newStatus, std::memory_order_release, std::memory_order_relaxed))
+                break;
+            // The compare-exchange failed, likely because another thread changed m_status.
+            // oldStatus has been updated. Retry the CAS loop.
+        }
+        if (oldStatus < 0)
+            m_sema.signal(); // Release one waiting thread.
+    }
+
+The test in this folder can both reproduce the original bug and validate the fix. Build and run it using the same steps as `tests/basetests`, as described in the [root README file](https://github.com/preshing/cpp11-on-multicore/blob/master/README.md).
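
The interleaving quoted in the README above can also be replayed deterministically in a single thread, which makes the effect of the early return easier to follow. The sketch below is an illustration only and is not part of this commit. `EventModel`, `cas`, `waitWouldBlock` and `wakeupDelivered` are hypothetical names, the semaphore is reduced to a counter, `wait()` is assumed to decrement the status and block when the old value was below 1, and T2's reordered load is modeled by passing the stale value into `signal()` explicitly. It says nothing about real hardware reordering; it only traces the status values.

```cpp
#include <cassert>

// Single-threaded model; every name here is hypothetical and not part of the repository.
struct EventModel
{
    int status = 0;     // Stands in for AutoResetEvent::m_status.
    int semaCount = 0;  // Number of wakeups posted to the semaphore.

    // Models compare_exchange_weak without spurious failures:
    // on a mismatch, `expected` is refreshed with the current value.
    bool cas(int& expected, int desired)
    {
        if (status != expected)
        {
            expected = status;
            return false;
        }
        status = desired;
        return true;
    }

    // signal() as of commit 3347437, resumed after its initial load returned oldStatus.
    void signalOriginal(int oldStatus)
    {
        for (;;)
        {
            if (oldStatus == 1)
                return; // Early return: no CAS and no semaphore signal.
            if (cas(oldStatus, oldStatus + 1))
                break;
        }
        if (oldStatus < 0)
            semaCount++;
    }

    // signal() after the fix, resumed the same way.
    void signalFixed(int oldStatus)
    {
        for (;;)
        {
            int newStatus = oldStatus < 1 ? oldStatus + 1 : 1;
            if (cas(oldStatus, newStatus))
                break;
        }
        if (oldStatus < 0)
            semaCount++;
    }

    // Assumed behavior of wait(): decrement, and block if the old value was below 1.
    // Returns true if the caller would block on the semaphore.
    bool waitWouldBlock()
    {
        int oldStatus = status--;
        return oldStatus < 1;
    }
};

// Replays steps 1 to 4 and reports whether W has a wakeup pending afterwards.
bool wakeupDelivered(bool useFixedSignal)
{
    EventModel m;
    auto signal = [&](int observed) {
        if (useFixedSignal)
            m.signalFixed(observed);
        else
            m.signalOriginal(observed);
    };

    signal(m.status);                       // Step 1: T1 signals, status becomes 1.
    int stale = m.status;                   // Step 2: T2's early load observes 1.
    bool firstBlocks = m.waitWouldBlock();  // Step 3: W's first wait(), 1 -> 0.
    bool secondBlocks = m.waitWouldBlock(); //         W's second wait(), 0 -> -1, blocks.
    signal(stale);                          // Step 4: T2 finally runs the rest of signal().
    return !firstBlocks && secondBlocks && m.semaCount == 1;
}
```

In this model, `wakeupDelivered(false)` returns `false` because the original code returns before touching the status, while `wakeupDelivered(true)` returns `true`: the first CAS fails against the stale value 1, `oldStatus` is refreshed to -1, and the retry posts the wakeup.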
@@ -0,0 +1,168 @@
+//---------------------------------------------------------
+// For conditions of distribution and use, see
+// https://github.com/preshing/cpp11-on-multicore/blob/master/LICENSE
+//---------------------------------------------------------
+
+#include <iostream>
+#include <sstream>
+#include <iomanip>
+#include <thread>
+#include <random>
+#include <chrono>
+#include <memory>
+#include <atomic>
+#include "autoresetevent.h"
+
+
+std::string makeTimeString(const std::chrono::time_point<std::chrono::system_clock>& point)
+{
+ std::time_t time = std::chrono::system_clock::to_time_t(point);
+ std::stringstream str;
+ str << std::put_time(std::localtime(&time), "%c");
+ return str.str();
+}
+
+
+//---------------------------------------------------------
+// LostWakeupTester
+//---------------------------------------------------------
+class LostWakeupTester
+{
+private:
+ struct Wrapper
+ {
+ std::atomic<int> value;
+ Wrapper() : value(0) {}
+ };
+
+ struct ThreadData
+ {
+ std::atomic<bool> canStart;
+ std::atomic<bool> finished;
+ ThreadData() : canStart(false), finished(false) {}
+ };
+
+ AutoResetEvent m_event;
+ static const int kWorkAreaSize = 10000000;
+ std::unique_ptr<Wrapper[]> m_workArea;
+ int m_workIndex;
+ ThreadData m_threadData[3];
+
+public:
+ LostWakeupTester()
+ : m_workArea(new Wrapper[kWorkAreaSize])
+ , m_workIndex(0)
+ {
+ }
+
+ void threadFunc(int threadNum)
+ {
+ ThreadData& td = m_threadData[threadNum];
+ for (;;)
+ {
+ // Spin-wait for kick signal
+ while (!td.canStart)
+ std::atomic_signal_fence(std::memory_order_seq_cst);
+ td.canStart = false;
+
+ // Do this thread's job
+ int workIndex = m_workIndex;
+ if (threadNum == 0)
+ {
+ // Thread #0 "consumes work items" until signaled to stop
+ for (;;)
+ {
+ m_event.wait();
+ int previous = m_workArea[workIndex].value.exchange(0, std::memory_order_relaxed);
+ if (previous == -1)
+ break;
+ }
+ }
+ else
+ {
+ // Thread #1 and #2 each "publish a work item"
+ m_workArea[workIndex].value.store(1, std::memory_order_relaxed);
+ m_event.signal();
+ }
+
+ // Notify main thread that we've finished
+ td.finished = true;
+ }
+ }
+
+ bool test()
+ {
+ std::random_device rd;
+ std::mt19937 randomEngine(rd());
+ auto start = std::chrono::system_clock::now();
+ std::cout << "[" << makeTimeString(start) << "] start \n";
+ uint64_t failures = 0;
+ uint64_t trials = 0;
+ static const double kLogInterval = 1.0;
+ static const double kTimeout = 0.25;
+ double nextLogTime = kLogInterval;
+
+ // Spawn threads
+ std::thread t0(&LostWakeupTester::threadFunc, this, 0);
+ std::thread t1(&LostWakeupTester::threadFunc, this, 1);
+ std::thread t2(&LostWakeupTester::threadFunc, this, 2);
+
+ for (;;)
+ {
+ trials++;
+
+ // Initialize experiment
+ m_workIndex = std::uniform_int_distribution<>(0, kWorkAreaSize - 1)(randomEngine);
+ m_workArea[m_workIndex].value = 0;
+ for (ThreadData& td : m_threadData)
+ td.finished = false;
+
+ // Kick threads
+ for (ThreadData& td : m_threadData)
+ td.canStart = true;
+
+ // Wait for t1 + t2
+ while (!m_threadData[1].finished)
+ std::atomic_signal_fence(std::memory_order_seq_cst);
+ while (!m_threadData[2].finished)
+ std::atomic_signal_fence(std::memory_order_seq_cst);
+
+ // t0 should have consumed all "work items" within a reasonable time frame
+ auto startOfTimeout = std::chrono::high_resolution_clock::now();
+ while (m_workArea[m_workIndex].value != 0)
+ {
+ auto elapsed = std::chrono::high_resolution_clock::now() - startOfTimeout;
+ if (std::chrono::duration_cast<std::chrono::duration<double>>(elapsed).count() >= kTimeout)
+ {
+ failures++;
+ break;
+ }
+ }
+
+ // Stop t0
+ m_workArea[m_workIndex].value.store(-1, std::memory_order_relaxed);
+ while (!m_threadData[0].finished)
+ {
+ m_event.signal();
+ std::atomic_signal_fence(std::memory_order_seq_cst);
+ }
+
+ // Log the rate
+ auto now = std::chrono::system_clock::now();
+ double elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(now - start).count();
+ if (elapsed >= nextLogTime)
+ {
+ std::cout << "[" << makeTimeString(std::chrono::system_clock::now()) << "] "
+ << failures << " failures out of " << trials << ", "
+ << (trials / elapsed) << " trials/sec\n";
+ nextLogTime = elapsed + kLogInterval;
+ }
+ }
+ }
+};
+
+int main()
+{
+ LostWakeupTester tester;
+ return tester.test() ? 0 : 1;
+}
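
The timeout check that `test()` uses to detect a lost wakeup could be factored into a small helper. The following is a minimal sketch and not part of this commit. The name `waitForZero` is hypothetical, and it swaps `std::chrono::high_resolution_clock` for `std::chrono::steady_clock`, which is guaranteed to be monotonic.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>

// Hypothetical helper: spins until `value` reads 0 or `timeoutSeconds` have elapsed.
// Returns true if the value reached 0 in time, false on timeout.
bool waitForZero(const std::atomic<int>& value, double timeoutSeconds)
{
    auto start = std::chrono::steady_clock::now();
    while (value.load() != 0)
    {
        std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
        if (elapsed.count() >= timeoutSeconds)
            return false;
    }
    return true;
}
```

With such a helper, the polling loop in `test()` would reduce to counting a failure whenever `waitForZero(m_workArea[m_workIndex].value, kTimeout)` returns `false`.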
