Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #22 from wavii/flush_delete

Flush and delete
  • Loading branch information...
commit 492d15490b20d6767f684049808dbd3121ea8dcd 2 parents 3f83b7c + 1ffda47
@erikfrey erikfrey authored
View
2  README.md
@@ -70,4 +70,4 @@ Voila! By default, Darner listens on port 22133.
Darner follows the same protocol as [Kestrel](/robey/kestrel/blob/master/docs/guide.md#memcache), which is the memcache
protocol.
-Currently missing from the Darner implementation but TODO: `FLUSH`, `FLUSH_ALL`, `DELETE`, and some stats.
+Currently missing from the Darner implementation but TODO: some stats.
View
2  include/darner/net/handler.h
@@ -52,6 +52,8 @@ class handler : public boost::enable_shared_from_this<handler>
void write_version();
+ void destroy(); // really "delete", but that's a reserved word
+
void flush();
void flush_all();
View
11 include/darner/net/request.h
@@ -15,10 +15,11 @@ struct request
{
RT_STATS = 1,
RT_VERSION = 2,
- RT_FLUSH = 3,
- RT_FLUSH_ALL = 4,
- RT_SET = 5,
- RT_GET = 6
+ RT_DESTROY = 3,
+ RT_FLUSH = 4,
+ RT_FLUSH_ALL = 5,
+ RT_SET = 6,
+ RT_GET = 7
};
request_type type;
@@ -37,7 +38,7 @@ struct request_grammar : boost::spirit::qi::grammar<std::string::const_iterator>
request_grammar();
request req;
boost::spirit::qi::rule<std::string::const_iterator, std::string()> key_name;
- boost::spirit::qi::rule<std::string::const_iterator> stats, version, flush, flush_all, set_option, set, get_option, get, start;
+ boost::spirit::qi::rule<std::string::const_iterator> stats, version, destroy, flush, flush_all, set_option, set, get_option, get, start;
};
// grammar are expensive to construct. to be thread-safe, let's make one grammar per thread.
View
13 include/darner/queue/queue.h
@@ -11,6 +11,7 @@
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <leveldb/db.h>
#include <leveldb/comparator.h>
@@ -28,7 +29,7 @@ namespace darner {
*
* queue is not thread-safe, it assumes a single-thread calling and operating the provided io_service
*/
-class queue
+class queue : public boost::enable_shared_from_this<queue>
{
public:
@@ -41,9 +42,15 @@ class queue
// open or create the queue at the path
queue(boost::asio::io_service& ios, const std::string& path);
+ // destruct the queue, and delete the journal if destroy() was called
+ ~queue();
+
// wait up to wait_ms milliseconds for an item to become available, then call cb with success or timeout
void wait(size_type wait_ms, const wait_callback& cb);
+ // delete the journal upon destruction
+ void destroy();
+
// returns the number of items in the queue
size_type count() const;
@@ -246,11 +253,13 @@ class queue
std::set<id_type> returned_; // items < TAIL that were reserved but later returned (not popped)
+ bool destroy_; // if true, we will delete the journal upon destruction
+
boost::ptr_list<waiter> waiters_;
boost::ptr_list<waiter>::iterator wake_up_it_;
boost::asio::io_service& ios_;
- const std::string path_;
+ std::string path_;
};
} // darner
View
26 include/darner/util/queue_map.hpp
@@ -5,7 +5,7 @@
#include <map>
#include <boost/asio.hpp>
-#include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
#include <boost/filesystem/operations.hpp>
#include "darner/queue/queue.h"
@@ -32,8 +32,7 @@ class queue_map
{
std::string queue_name =
boost::filesystem::path(it->path().filename()).string(); // useless recast for boost backwards compat
- boost::shared_ptr<queue> p(new queue(ios_, (data_path_ / queue_name).string()));
- queues_.insert(container_type::value_type(queue_name, p));
+ queues_[queue_name] = boost::make_shared<queue>(boost::ref(ios_), (data_path_ / queue_name).string());
}
}
@@ -42,14 +41,27 @@ class queue_map
iterator it = queues_.find(queue_name);
if (it == queues_.end())
- {
- boost::shared_ptr<queue> p(new queue(ios_, (data_path_ / queue_name).string()));
- it = queues_.insert(container_type::value_type(queue_name, p)).first;
- }
+ it = queues_.insert(container_type::value_type(queue_name,
+ boost::make_shared<queue>(boost::ref(ios_), (data_path_ / queue_name).string()))).first;
return it->second;
}
+ void erase(const std::string& queue_name, bool recreate = false)
+ {
+ iterator it = queues_.find(queue_name);
+
+ if (it == queues_.end())
+ return;
+
+ it->second->destroy();
+
+ queues_.erase(it);
+
+ if (recreate)
+ queues_[queue_name] = boost::make_shared<queue>(boost::ref(ios_), (data_path_ / queue_name).string());
+ }
+
iterator begin() { return queues_.begin(); }
iterator end() { return queues_.end(); }
const_iterator begin() const { return queues_.begin(); }
View
16 src/net/handler.cpp
@@ -63,6 +63,7 @@ void handler::parse_request(const system::error_code& e, size_t bytes_transferre
{
case request::RT_STATS: write_stats(); break;
case request::RT_VERSION: write_version(); break;
+ case request::RT_DESTROY: destroy(); break;
case request::RT_FLUSH: flush(); break;
case request::RT_FLUSH_ALL: flush_all(); break;
case request::RT_SET: ++stats_.cmd_sets; set(); break;
@@ -90,14 +91,25 @@ void handler::write_version()
async_write(socket_, buffer(buf_), bind(&handler::read_request, shared_from_this(), _1, _2));
}
+void handler::destroy()
+{
+ queues_.erase(req_.queue, false);
+ return end("DELETED\r\n");
+}
+
void handler::flush()
{
- // TODO: implement
+ // TODO: flush should guarantee that an item that's halfway pushed should still appear after
+ // the flush. right now, item will only appear to a client that was waiting to pop before the flush
+ queues_.erase(req_.queue, true);
+ return end();
}
void handler::flush_all()
{
- // TODO: implement
+ for (queue_map::iterator it = queues_.begin(); it != queues_.end(); ++it)
+ queues_.erase(it->first, true);
+ return end("Flushed all queues.\r\n");
}
void handler::set()
View
6 src/net/request.cpp
@@ -22,6 +22,10 @@ request_grammar::request_grammar()
version =
lit("version") [phoenix::ref(req.type) = request::RT_VERSION];
+ destroy =
+ lit("delete ") [phoenix::ref(req.type) = request::RT_DESTROY]
+ >> key_name [phoenix::ref(req.queue) = _1];
+
flush =
lit("flush ") [phoenix::ref(req.type) = request::RT_FLUSH]
>> key_name [phoenix::ref(req.queue) = _1];
@@ -61,5 +65,5 @@ request_grammar::request_grammar()
>> *get_option
>> -lit(' '); // be permissive to clients inserting spaces
- start = (stats | version | flush | flush_all | set | get) >> eol;
+ start = (stats | version | destroy | flush | flush_all | set | get) >> eol;
}
View
41 src/queue/queue.cpp
@@ -1,6 +1,8 @@
#include "darner/queue/queue.h"
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/filesystem/operations.hpp>
#include <leveldb/iterator.h>
#include <leveldb/write_batch.h>
@@ -18,6 +20,7 @@ queue::queue(asio::io_service& ios, const string& path)
chunks_head_(key_type::KT_CHUNK, 0),
items_open_(0),
bytes_evicted_(0),
+ destroy_(false),
wake_up_it_(waiters_.begin()),
ios_(ios),
path_(path)
@@ -51,12 +54,44 @@ queue::queue(asio::io_service& ios, const string& path)
}
}
+queue::~queue()
+{
+ journal_.reset();
+ // TODO: most non-crap filesystems should be able to drop large files quickly, but this will block painfully on ext3.
+ // one ugly solution is a separate delete thread. or we can wait out everyone upgrading to ext4 :)
+ if (destroy_)
+ boost::filesystem::remove_all(path_);
+}
+
void queue::wait(size_type wait_ms, const wait_callback& cb)
{
ptr_list<waiter>::iterator it = waiters_.insert(waiters_.end(), new waiter(ios_, wait_ms, cb));
if (wake_up_it_ == waiters_.end())
wake_up_it_ = it;
- it->timer.async_wait(bind(&queue::waiter_wakeup, this, asio::placeholders::error, it));
+ it->timer.async_wait(bind(&queue::waiter_wakeup, shared_from_this(), asio::placeholders::error, it));
+}
+
+void queue::destroy()
+{
+ if (destroy_)
+ return; // already going to delete on dtor!
+
+ // rename the journal dir in case the user creates a new queue with the same name before this one is destroyed
+ string new_path = path_ + ".0";
+ for (size_t i = 0; boost::filesystem::exists(new_path); ++i)
+ new_path = path_ + "." + lexical_cast<string>(i);
+ journal_.reset();
+ boost::filesystem::rename(path_, new_path);
+
+ leveldb::DB* pdb;
+ leveldb::Options options;
+ options.comparator = cmp_.get();
+ if (!leveldb::DB::Open(options, new_path, &pdb).ok())
+ throw runtime_error("can't open journal: " + path_); // should never happen, but fatal if it does
+
+ journal_.reset(pdb);
+ path_ = new_path;
+ destroy_ = true;
}
queue::size_type queue::count() const
@@ -111,6 +146,8 @@ bool queue::pop_begin(id_type& result)
else
return false;
+ ++items_open_;
+
return true;
}
@@ -131,8 +168,6 @@ void queue::pop_read(std::string& result_item, header_type& result_header, id_ty
throw system::system_error(system::errc::io_error,
boost::asio::error::get_system_category()); // anything else is bad data
}
-
- ++items_open_;
}
void queue::pop_end(bool erase, id_type id, const header_type& header)
View
13 tests/queue.cpp
@@ -197,4 +197,17 @@ BOOST_FIXTURE_TEST_CASE( test_push_zero, fixtures::basic_queue )
BOOST_REQUIRE_EQUAL(value, pop_value_);
}
+// test that we can delete a queue when we are done with it
+BOOST_FIXTURE_TEST_CASE( test_delete_queue, fixtures::basic_queue )
+{
+ queue_->destroy();
+ BOOST_REQUIRE(!filesystem::exists(tmp_ / "queue"));
+ BOOST_REQUIRE(filesystem::exists(tmp_ / "queue.0")); // first delete gets .0
+ darner::queue queue2(ios_, (tmp_ / "queue").string());
+ queue2.destroy();
+ BOOST_REQUIRE(filesystem::exists(tmp_ / "queue.1")); // second delete gets .1
+ queue_.reset();
+ BOOST_REQUIRE(!filesystem::exists(tmp_ / "queue.0")); // finally, destroying the queue deletes the journal
+}
+
BOOST_AUTO_TEST_SUITE_END()
Please sign in to comment.
Something went wrong with that request. Please try again.