Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

queues support delete on destroy

  • Loading branch information...
commit 8f2d543b1eba9d7c09b832d540be7765f0636a2c 1 parent 3f83b7c
@erikfrey erikfrey authored
View
13 include/darner/queue/queue.h
@@ -11,6 +11,7 @@
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <leveldb/db.h>
#include <leveldb/comparator.h>
@@ -28,7 +29,7 @@ namespace darner {
*
* queue is not thread-safe, it assumes a single-thread calling and operating the provided io_service
*/
-class queue
+class queue : public boost::enable_shared_from_this<queue>
{
public:
@@ -41,9 +42,15 @@ class queue
// open or create the queue at the path
queue(boost::asio::io_service& ios, const std::string& path);
+ // destroy the queue, and delete the journal if delete_on_destroy() was called
+ ~queue();
+
// wait up to wait_ms milliseconds for an item to become available, then call cb with success or timeout
void wait(size_type wait_ms, const wait_callback& cb);
+ // delete the journal upon destruction
+ void destroy();
+
// returns the number of items in the queue
size_type count() const;
@@ -246,11 +253,13 @@ class queue
std::set<id_type> returned_; // items < TAIL that were reserved but later returned (not popped)
+ bool delete_; // if true, we will delete the journal upon destruction
+
boost::ptr_list<waiter> waiters_;
boost::ptr_list<waiter>::iterator wake_up_it_;
boost::asio::io_service& ios_;
- const std::string path_;
+ std::string path_;
};
} // darner
View
35 src/queue/queue.cpp
@@ -1,6 +1,8 @@
#include "darner/queue/queue.h"
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/filesystem/operations.hpp>
#include <leveldb/iterator.h>
#include <leveldb/write_batch.h>
@@ -18,6 +20,7 @@ queue::queue(asio::io_service& ios, const string& path)
chunks_head_(key_type::KT_CHUNK, 0),
items_open_(0),
bytes_evicted_(0),
+ delete_(false),
wake_up_it_(waiters_.begin()),
ios_(ios),
path_(path)
@@ -51,12 +54,42 @@ queue::queue(asio::io_service& ios, const string& path)
}
}
+queue::~queue()
+{
+ journal_.reset();
+ if (delete_)
+ boost::filesystem::remove_all(path_);
+}
+
void queue::wait(size_type wait_ms, const wait_callback& cb)
{
ptr_list<waiter>::iterator it = waiters_.insert(waiters_.end(), new waiter(ios_, wait_ms, cb));
if (wake_up_it_ == waiters_.end())
wake_up_it_ = it;
- it->timer.async_wait(bind(&queue::waiter_wakeup, this, asio::placeholders::error, it));
+ it->timer.async_wait(bind(&queue::waiter_wakeup, shared_from_this(), asio::placeholders::error, it));
+}
+
+void queue::destroy()
+{
+ if (delete_)
+ return; // already going to delete on dtor!
+
+ // rename the journal dir in case the user creates a new queue with the same name before this one is destroyed
+ string new_path = path_ + ".0";
+ for (size_t i = 0; boost::filesystem::exists(new_path); ++i)
+ new_path = path_ + "." + lexical_cast<string>(i);
+ journal_.reset();
+ boost::filesystem::rename(path_, new_path);
+
+ leveldb::DB* pdb;
+ leveldb::Options options;
+ options.comparator = cmp_.get();
+ if (!leveldb::DB::Open(options, new_path, &pdb).ok())
+ throw runtime_error("can't open journal: " + path_); // should never happen, but fatal if it does
+
+ journal_.reset(pdb);
+ path_ = new_path;
+ delete_ = true;
}
queue::size_type queue::count() const
View
14 tests/queue.cpp
@@ -197,4 +197,18 @@ BOOST_FIXTURE_TEST_CASE( test_push_zero, fixtures::basic_queue )
BOOST_REQUIRE_EQUAL(value, pop_value_);
}
+// test that we can delete a queue when we are done with it
+BOOST_FIXTURE_TEST_CASE( test_delete_queue, fixtures::basic_queue )
+{
+ queue_->destroy();
+ BOOST_REQUIRE(!filesystem::exists(tmp_ / "queue"));
+ BOOST_REQUIRE(filesystem::exists(tmp_ / "queue.0")); // first delete gets .0
+ darner::queue queue2(ios_, (tmp_ / "queue").string());
+ queue2.destroy();
+ BOOST_REQUIRE(filesystem::exists(tmp_ / "queue.1")); // second delete gets .1
+ queue_.reset();
+ BOOST_REQUIRE(!filesystem::exists(tmp_ / "queue.0")); // finally, destroying the queue deletes the journal
+}
+
+
BOOST_AUTO_TEST_SUITE_END()
Please sign in to comment.
Something went wrong with that request. Please try again.