Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
fix timeout/cancel race condition
Browse files Browse the repository at this point in the history
deadline_timer will issue a timeout even if the timer has a cancel in the event loop, so let's cooperate with that.
  • Loading branch information
erikfrey committed Aug 16, 2012
1 parent bdec87f commit 439a4a8
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 19 deletions.
7 changes: 4 additions & 3 deletions include/darner/queue/queue.h
Expand Up @@ -187,11 +187,11 @@ class queue
void FindShortSuccessor(std::string*) const {}
};

// any operation that mutates the queue or the waiter state should run this to crank any pending events
void spin_waiters();
// any operation that adds to the queue should crank a wakeup
void wake_up();

// fires either if timer times out or is canceled
void waiter_timeout(const boost::system::error_code& e, boost::ptr_list<waiter>::iterator waiter_it);
void waiter_wakeup(const boost::system::error_code& e, boost::ptr_list<waiter>::iterator waiter_it);

// some leveldb sugar:

Expand Down Expand Up @@ -234,6 +234,7 @@ class queue
std::set<id_type> returned_; // items < TAIL that were reserved but later returned (not popped)

boost::ptr_list<waiter> waiters_;
boost::ptr_list<waiter>::iterator wake_up_it_;

boost::asio::io_service& ios_;
};
Expand Down
28 changes: 12 additions & 16 deletions src/queue/queue.cpp
Expand Up @@ -15,6 +15,7 @@ queue::queue(asio::io_service& ios, const string& path)
queue_tail_(key_type::KT_QUEUE, 0),
chunks_head_(key_type::KT_CHUNK, 0),
items_open_(0),
wake_up_it_(waiters_.begin()),
ios_(ios)
{
leveldb::Options options;
Expand Down Expand Up @@ -49,7 +50,7 @@ queue::queue(asio::io_service& ios, const string& path)
void queue::wait(size_type wait_ms, const wait_callback& cb)
{
ptr_list<waiter>::iterator it = waiters_.insert(waiters_.end(), new waiter(ios_, wait_ms, cb));
it->timer.async_wait(bind(&queue::waiter_timeout, this, asio::placeholders::error, it));
it->timer.async_wait(bind(&queue::waiter_wakeup, this, asio::placeholders::error, it));
}

queue::size_type queue::count() const
Expand All @@ -76,7 +77,7 @@ void queue::push(id_type& result, const string& item)

result = queue_head_.id++;

spin_waiters(); // in case there's a waiter waiting for this new item
wake_up(); // in case there's a waiter waiting for this new item
}

void queue::push(id_type& result, const header_type& header)
Expand All @@ -85,7 +86,7 @@ void queue::push(id_type& result, const header_type& header)

result = queue_head_.id++;

spin_waiters(); // in case there's a waiter waiting for this new item
wake_up(); // in case there's a waiter waiting for this new item
}

bool queue::pop_begin(id_type& result)
Expand Down Expand Up @@ -147,8 +148,7 @@ void queue::pop_end(bool erase, id_type id, const header_type& header)
{
returned_.insert(id);

// in case there's a waiter waiting for this returned key
spin_waiters();
wake_up(); // in case there's a waiter waiting for this returned key
}

--items_open_;
Expand Down Expand Up @@ -182,25 +182,21 @@ void queue::erase_chunks(const header_type& header)

// private:

void queue::spin_waiters()
void queue::wake_up()
{
while (!waiters_.empty() && (!returned_.empty() || queue_tail_.id < queue_head_.id))
if (wake_up_it_ != waiters_.end())
{
ptr_list<waiter>::auto_type waiter = waiters_.release(waiters_.begin());
waiter->timer.cancel();
waiter->cb(system::error_code());
wake_up_it_->timer.cancel();
++wake_up_it_;
}
}

void queue::waiter_timeout(const system::error_code& e, ptr_list<queue::waiter>::iterator waiter_it)
void queue::waiter_wakeup(const system::error_code& e, ptr_list<queue::waiter>::iterator waiter_it)
{
if (e == asio::error::operation_aborted) // can be error if timer was canceled
return;

ptr_list<waiter>::auto_type waiter = waiters_.release(waiter_it);

if (e) // weird unspecified error, better pass it up just in case
waiter->cb(e);
if (!returned_.empty() || queue_tail_.id < queue_head_.id)
waiter->cb(system::error_code());
else
waiter->cb(asio::error::timed_out);
}
Expand Down
12 changes: 12 additions & 0 deletions tests/queue.cpp
Expand Up @@ -46,6 +46,18 @@ BOOST_FIXTURE_TEST_CASE( test_pop_wait, fixtures::basic_queue )
BOOST_REQUIRE(!error_);
}

// test the race condition where a wait callback does work that crosses it over the wait timeout
BOOST_FIXTURE_TEST_CASE( test_pop_wait_race, fixtures::basic_queue )
{
string value = "Fur pillows are hard to actually sleep on";
deadline_timer timer(ios_, posix_time::milliseconds(100));
timer.async_wait(bind(&fixtures::basic_queue::delayed_push, this, ref(value), _1));
queue_->wait(100, wait_cb_);
ios_.run();

BOOST_REQUIRE(!error_);
}

// now the opposite, test a pop wait timeout
BOOST_FIXTURE_TEST_CASE( test_pop_wait_timeout, fixtures::basic_queue )
{
Expand Down

0 comments on commit 439a4a8

Please sign in to comment.