Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
simplify queue, enforce single thread since we'll probably just have …
Browse files Browse the repository at this point in the history
…one disk i/o thread anyway
  • Loading branch information
erikfrey committed Jul 30, 2012
1 parent 1784a90 commit 934c4d2
Showing 1 changed file with 26 additions and 45 deletions.
71 changes: 26 additions & 45 deletions include/darner/queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace darner {
* queue will post events such as journal writes to a provided boost::asio io_service. interrupting the
* io_service with pending events is okay - queue is never in an inconsistent state between io events.
*
* queue is thread-safe, it synchronizes naively on a single asio strand
* queue is not thread-safe, it assumes a single-thread calling and operating the provided io_service
*/
class queue
{
Expand All @@ -43,7 +43,7 @@ class queue
cmp_(NULL),
head_(0),
tail_(0),
strand_(ios)
ios_(ios)
{
leveldb::Options options;
options.create_if_missing = true;
Expand Down Expand Up @@ -74,7 +74,17 @@ class queue
*/
void push(const std::string& value, const push_callback& cb)
{
strand_.post(boost::bind(&queue::push_, this, boost::cref(value), cb));
leveldb::Slice skey(reinterpret_cast<const char *>(&tail_), sizeof(key_t));
if (!journal_->Put(leveldb::WriteOptions(), skey, value).ok())
{
cb(boost::system::error_code(boost::system::errc::io_error, boost::system::system_category()));
return;
}

++tail_; // post-increment tail key in case leveldb write fails
cb(boost::system::error_code());

spin_waiters(); // in case there's a waiter waiting for this
}

/*
Expand All @@ -84,57 +94,28 @@ class queue
*/
void pop(size_t wait_ms, const pop_callback& cb)
{
strand_.post(boost::bind(&queue::pop_, this, wait_ms, cb));
}

/*
* finish the pop, either by deleting the item or returning back to the queue. calls cb after the pop_end finishes
* with a success code. on failure, sets error as io_error if there was a problem with the underlying journal.
*/
void pop_end(key_t key, bool remove, const pop_end_callback& cb)
{
strand_.post(boost::bind(&queue::pop_end_, this, key, remove, cb));
}

private:

void push_(const std::string& value, const push_callback& cb)
{
leveldb::Slice skey(reinterpret_cast<const char *>(&tail_), sizeof(key_t));
if (!journal_->Put(leveldb::WriteOptions(), skey, value).ok())
{
cb(boost::system::error_code(boost::system::errc::io_error, boost::system::system_category()));
return;
}

++tail_; // post-increment tail key in case leveldb write fails
cb(boost::system::error_code());

spin_waiters(); // in case there's a waiter waiting for this
}

void pop_(size_t wait_ms, const pop_callback& cb)
{
spin_waiters(); // first let's drive out any current waiters

key_t key;
if (!next_key(key)) // do we have an item right away?
{
if (wait_ms > 0) // okay, no item. can we fire up a timer and wait?
{
boost::ptr_list<waiter>::iterator it = waiters_.insert(waiters_.end(), new waiter(strand_, wait_ms, cb));
it->timer.async_wait(strand_.wrap(
boost::bind(&queue::waiter_timeout, this, boost::asio::placeholders::error, it)
));
boost::ptr_list<waiter>::iterator it = waiters_.insert(waiters_.end(), new waiter(ios_, wait_ms, cb));
it->timer.async_wait(boost::bind(&queue::waiter_timeout, this, boost::asio::placeholders::error, it));
}
else
cb(boost::asio::error::not_found, key_t(), ""); // nothing? okay, return back no item
return;
}
get_value(key, cb);
}
}

void pop_end_(key_t key, bool remove, const pop_end_callback& cb)
/*
* finish the pop, either by deleting the item or returning back to the queue. calls cb after the pop_end finishes
* with a success code. on failure, sets error as io_error if there was a problem with the underlying journal.
*/
void pop_end(key_t key, bool remove, const pop_end_callback& cb)
{
if (remove)
{
Expand All @@ -154,6 +135,8 @@ class queue
}
}

private:

// any operation that mutates the queue or the waiter state should run this to crank any pending events
void spin_waiters()
{
Expand Down Expand Up @@ -199,9 +182,9 @@ class queue
// a waiter is a cheap struct that ties a callback to a deadline timer
struct waiter
{
waiter(boost::asio::strand& strand_, size_t wait_ms, const pop_callback& _cb)
waiter(boost::asio::io_service& ios, size_t wait_ms, const pop_callback& _cb)
: cb(_cb),
timer(strand_.get_io_service(), boost::posix_time::milliseconds(wait_ms))
timer(ios, boost::posix_time::milliseconds(wait_ms))
{
}

Expand Down Expand Up @@ -256,9 +239,7 @@ class queue

boost::ptr_list<waiter> waiters_;

// all state mutation is synchronized on a single strand
// no updates to the journal or to waiters can happen simultaenously
boost::asio::strand strand_;
boost::asio::io_service& ios_;
};

} // darner
Expand Down

0 comments on commit 934c4d2

Please sign in to comment.