Skip to content
This repository has been archived by the owner on Apr 24, 2020. It is now read-only.

Commit

Permalink
queue count and test
Browse files Browse the repository at this point in the history
  • Loading branch information
erikfrey committed Jul 30, 2012
1 parent dac5f6b commit 92c7888
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 33 deletions.
2 changes: 1 addition & 1 deletion bench/queue_flood.cpp
Expand Up @@ -45,7 +45,7 @@ class event_loop
ios_.post(boost::bind(&queue::push, &q_, boost::cref(value_), push_cb_));
}

void pop_cb(const boost::system::error_code& error, queue::key_t key, const std::string& value)
void pop_cb(const boost::system::error_code& error, queue::key_type key, const std::string& value)
{
if (!error)
q_.pop_end(key, true, pop_end_cb_);
Expand Down
68 changes: 38 additions & 30 deletions include/darner/queue.hpp
Expand Up @@ -5,6 +5,7 @@
#include <set>

#include <boost/ptr_container/ptr_list.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/function.hpp>
Expand Down Expand Up @@ -32,9 +33,10 @@ class queue
{
public:

typedef boost::uint64_t key_t;
typedef boost::uint64_t key_type;
typedef boost::uint64_t size_type;
typedef boost::function<void (const boost::system::error_code& error)> push_callback;
typedef boost::function<void (const boost::system::error_code& error, key_t key, const std::string& value)> pop_callback;
typedef boost::function<void (const boost::system::error_code& error, key_type key, const std::string& value)> pop_callback;
typedef boost::function<void (const boost::system::error_code& error)> pop_end_callback;

queue(boost::asio::io_service& ios,
Expand All @@ -50,13 +52,15 @@ class queue
options.comparator = cmp_ = new comparator();
if (!leveldb::DB::Open(options, journal_path, &journal_).ok())
throw std::runtime_error("can't open journal: " + journal_path);
leveldb::Iterator* it = journal_->NewIterator(leveldb::ReadOptions());
it->SeekToFirst();
if (it->Valid())
head_ = *reinterpret_cast<const key_t *>(it->key().data());
it->SeekToLast();
if (it->Valid())
tail_ = *reinterpret_cast<const key_t *>(it->key().data());
// get head and tail
boost::scoped_ptr<leveldb::Iterator> it(journal_->NewIterator(leveldb::ReadOptions()));
it->SeekToFirst();
if (it->Valid())
{
head_ = *reinterpret_cast<const key_type *>(it->key().data());
it->SeekToLast();
tail_ = *reinterpret_cast<const key_type *>(it->key().data());
}
}

~queue()
Expand All @@ -74,7 +78,7 @@ class queue
*/
void push(const std::string& value, const push_callback& cb)
{
leveldb::Slice skey(reinterpret_cast<const char *>(&tail_), sizeof(key_t));
leveldb::Slice skey(reinterpret_cast<const char *>(&tail_), sizeof(key_type));
if (!journal_->Put(leveldb::WriteOptions(), skey, value).ok())
{
cb(boost::system::error_code(boost::system::errc::io_error, boost::system::system_category()));
Expand All @@ -96,7 +100,7 @@ class queue
{
spin_waiters(); // first let's drive out any current waiters

key_t key;
key_type key;
if (!next_key(key)) // do we have an item right away?
{
if (wait_ms > 0) // okay, no item. can we fire up a timer and wait?
Expand All @@ -105,7 +109,7 @@ class queue
it->timer.async_wait(boost::bind(&queue::waiter_timeout, this, boost::asio::placeholders::error, it));
}
else
cb(boost::asio::error::not_found, key_t(), ""); // nothing? okay, return back no item
cb(boost::asio::error::not_found, key_type(), ""); // nothing? okay, return back no item
return;
}
get_value(key, cb);
Expand All @@ -115,11 +119,11 @@ class queue
* finish the pop, either by deleting the item or returning back to the queue. calls cb after the pop_end finishes
* with a success code. on failure, sets error as io_error if there was a problem with the underlying journal.
*/
void pop_end(key_t key, bool remove, const pop_end_callback& cb)
void pop_end(key_type key, bool remove, const pop_end_callback& cb)
{
if (remove)
{
leveldb::Slice skey(reinterpret_cast<const char *>(&key), sizeof(key_t));
leveldb::Slice skey(reinterpret_cast<const char *>(&key), sizeof(key_type));
if (!journal_->Delete(leveldb::WriteOptions(), skey).ok())
cb(boost::system::error_code(boost::system::errc::io_error, boost::system::system_category()));
else
Expand All @@ -135,6 +139,14 @@ class queue
}
}

// returns the number of items in the queue
size_type count()
{
return (tail_ - head_) + returned_.size();
}

// TODO: consider also reporting a queue size

private:

// any operation that mutates the queue or the waiter state should run this to crank any pending events
Expand All @@ -144,7 +156,7 @@ class queue
{
if (waiters_.empty())
break;
key_t key;
key_type key;
if (!next_key(key))
break;
boost::ptr_list<waiter>::auto_type waiter = waiters_.release(waiters_.begin());
Expand All @@ -154,7 +166,7 @@ class queue
}

// fetch the next key and return true if there is one
bool next_key(key_t & key)
bool next_key(key_type & key)
{
if (!returned_.empty())
{
Expand All @@ -169,9 +181,9 @@ class queue
}

// fetch the value in the journal at key and pass it to cb
void get_value(key_t key, const pop_callback& cb)
void get_value(key_type key, const pop_callback& cb)
{
leveldb::Slice skey(reinterpret_cast<const char *>(&key), sizeof(key_t));
leveldb::Slice skey(reinterpret_cast<const char *>(&key), sizeof(key_type));
std::string value;
boost::system::error_code e;
if (!journal_->Get(leveldb::ReadOptions(), skey, &value).ok())
Expand Down Expand Up @@ -200,9 +212,9 @@ class queue
boost::ptr_list<waiter>::auto_type waiter = waiters_.release(waiter_it);

if (error) // weird unspecified error, better pass it up just in case
waiter->cb(error, key_t(), "");
waiter->cb(error, key_type(), "");
else
waiter->cb(boost::asio::error::timed_out, key_t(), "");
waiter->cb(boost::asio::error::timed_out, key_type(), "");
}

// compare keys as native uint64's instead of lexically
Expand All @@ -211,13 +223,9 @@ class queue
public:
int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const
{
boost::uint64_t uia = *reinterpret_cast<const boost::uint64_t*>(a.data());
boost::uint64_t uib = *reinterpret_cast<const boost::uint64_t*>(b.data());
if (uia < uib)
return -1;
if (uia > uib)
return 1;
return 0;
key_type uia = *reinterpret_cast<const key_type*>(a.data());
key_type uib = *reinterpret_cast<const key_type*>(b.data());
return (uia < uib ? -1 : (uia > uib ? 1 : 0));
}
const char* Name() const { return "queue::comparator"; }
void FindShortestSeparator(std::string*, const leveldb::Slice&) const { }
Expand All @@ -233,9 +241,9 @@ class queue
// reserved items are held by a connection and not finished yet
// returned items were released by a connection but not deleted

boost::uint64_t head_;
boost::uint64_t tail_;
std::set<key_t> returned_; // items < TAIL that were reserved but later returned (not popped)
key_type head_;
key_type tail_;
std::set<key_type> returned_; // items < TAIL that were reserved but later returned (not popped)

boost::ptr_list<waiter> waiters_;

Expand Down
12 changes: 10 additions & 2 deletions tests/fixtures/basic.hpp
Expand Up @@ -18,6 +18,7 @@ class basic
basic()
: push_cb_(boost::bind(&basic::push_cb, this, _1)),
pop_cb_(boost::bind(&basic::pop_cb, this, _1, _2, _3)),
pop_end_cb_(boost::bind(&basic::pop_end_cb, this, _1)),
tmp_(boost::filesystem::temp_directory_path() / boost::filesystem::unique_path())
{
boost::filesystem::create_directories(tmp_);
Expand All @@ -42,20 +43,27 @@ class basic
push_error_ = error;
}

void pop_cb(const boost::system::error_code& error, key_t key, const std::string& value)
void pop_cb(const boost::system::error_code& error, darner::queue::key_type key, const std::string& value)
{
pop_error_ = error;
pop_key_ = key;
pop_value_ = value;
}

void pop_end_cb(const boost::system::error_code& error)
{
pop_end_error_ = error;
}

boost::system::error_code push_error_;
boost::system::error_code pop_error_;
key_t pop_key_;
boost::system::error_code pop_end_error_;
darner::queue::key_type pop_key_;
std::string pop_value_;

darner::queue::push_callback push_cb_;
darner::queue::pop_callback pop_cb_;
darner::queue::pop_end_callback pop_end_cb_;
boost::asio::io_service ios_;
boost::shared_ptr<darner::queue> queue_;
boost::filesystem::path tmp_;
Expand Down
14 changes: 14 additions & 0 deletions tests/queue.cpp
Expand Up @@ -59,4 +59,18 @@ BOOST_FIXTURE_TEST_CASE( test_pop_wait_timeout, fixtures::basic )
BOOST_REQUIRE(pop_error_);
}

// test we maintain size correctly
BOOST_FIXTURE_TEST_CASE( test_queue_size, fixtures::basic )
{
string value = "NO ALCOHOL BEFORE TATTOOS";
queue_->push(value, push_cb_);
BOOST_REQUIRE_EQUAL(queue_->count(), 1);
// even beginning a pop lowers the count...
queue_->pop(0, pop_cb_);
BOOST_REQUIRE_EQUAL(queue_->count(), 0);
// but returning it raises it back up
queue_->pop_end(0, false, pop_end_cb_);
BOOST_REQUIRE_EQUAL(queue_->count(), 1);
}

BOOST_AUTO_TEST_SUITE_END()

0 comments on commit 92c7888

Please sign in to comment.