Permalink
Browse files

queue flush implementation and handler

  • Loading branch information...
1 parent 5ed5798 commit eb4041b241556566cd02b370326bd67b92f70bf1 @erikfrey erikfrey committed Mar 15, 2013
Showing with 30 additions and 8 deletions.
  1. +5 −2 include/darner/queue/queue.h
  2. +5 −2 src/net/handler.cpp
  3. +10 −4 src/queue/queue.cpp
  4. +10 −0 tests/queue.cpp
View
7 include/darner/queue/queue.h
@@ -42,7 +42,7 @@ class queue : public boost::enable_shared_from_this<queue>
// open or create the queue at the path
queue(boost::asio::io_service& ios, const std::string& path);
- // destroy the queue, and delete the journal if delete_on_destroy() was called
+ // destruct the queue, and delete the journal if destroy() was called
~queue();
// wait up to wait_ms milliseconds for an item to become available, then call cb with success or timeout
@@ -51,6 +51,9 @@ class queue : public boost::enable_shared_from_this<queue>
// delete the journal upon destruction
void destroy();
+ // flushes all items from the queue
+ void flush();
+
// returns the number of items in the queue
size_type count() const;
@@ -253,7 +256,7 @@ class queue : public boost::enable_shared_from_this<queue>
std::set<id_type> returned_; // items < TAIL that were reserved but later returned (not popped)
- bool delete_; // if true, we will delete the journal upon destruction
+ bool destroy_; // if true, we will delete the journal upon destruction
boost::ptr_list<waiter> waiters_;
boost::ptr_list<waiter>::iterator wake_up_it_;
View
7 src/net/handler.cpp
@@ -99,12 +99,15 @@ void handler::destroy()
void handler::flush()
{
- // TODO: implement
+ queues_[req_.queue]->flush();
+ return end();
}
void handler::flush_all()
{
- // TODO: implement
+ for (queue_map::iterator it = queues_.begin(); it != queues_.end(); ++it)
+ it->second->flush();
+ return end("Flushed all queues.");
}
void handler::set()
View
14 src/queue/queue.cpp
@@ -20,7 +20,7 @@ queue::queue(asio::io_service& ios, const string& path)
chunks_head_(key_type::KT_CHUNK, 0),
items_open_(0),
bytes_evicted_(0),
- delete_(false),
+ destroy_(false),
wake_up_it_(waiters_.begin()),
ios_(ios),
path_(path)
@@ -57,7 +57,7 @@ queue::queue(asio::io_service& ios, const string& path)
queue::~queue()
{
journal_.reset();
- if (delete_)
+ if (destroy_)
boost::filesystem::remove_all(path_);
}
@@ -71,7 +71,7 @@ void queue::wait(size_type wait_ms, const wait_callback& cb)
void queue::destroy()
{
- if (delete_)
+ if (destroy_)
return; // already going to delete on dtor!
// rename the journal dir in case the user creates a new queue with the same name before this one is destroyed
@@ -89,7 +89,13 @@ void queue::destroy()
journal_.reset(pdb);
path_ = new_path;
- delete_ = true;
+ destroy_ = true;
+}
+
+void queue::flush()
+{
+ queue_tail_ = queue_head_;
+ returned_.clear();
}
queue::size_type queue::count() const
View
10 tests/queue.cpp
@@ -210,5 +210,15 @@ BOOST_FIXTURE_TEST_CASE( test_delete_queue, fixtures::basic_queue )
BOOST_REQUIRE(!filesystem::exists(tmp_ / "queue.0")); // finally, destroying the queue deletes the journal
}
+// test that we can flush a queue
+BOOST_FIXTURE_TEST_CASE( test_flush, fixtures::basic_queue )
+{
+ string value = "Sometimes I get emotional over fonts";
+ oqs_.open(queue_, 1);
+ oqs_.write(value);
+ queue_->flush();
+ BOOST_REQUIRE_EQUAL(queue_->count(), 0);
+ BOOST_REQUIRE(!iqs_.open(queue_));
+}
BOOST_AUTO_TEST_SUITE_END()

0 comments on commit eb4041b

Please sign in to comment.