Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Flush and delete #22

Merged
merged 7 commits into from

2 participants

@erikfrey

Alright @nova77 - here's how I'd do flush and delete.

Delete drops the queue from the queue_map, and tells it that within the dtor it should delete all its data. But it uses the shared_ptr semantics of other clients that might still be using the queue to keep it alive, and keep the journal available, until all clients have gone away.

The flush implementation is much simpler.

@nova77

I like it: it's much simpler. However, I am not sure to understand what happened to all the logic you wanted to be in place to prevent a flush while a get/set operation is going on.

@erikfrey erikfrey merged commit 492d154 into master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
2  README.md
@@ -70,4 +70,4 @@ Voila! By default, Darner listens on port 22133.
Darner follows the same protocol as [Kestrel](/robey/kestrel/blob/master/docs/guide.md#memcache), which is the memcache
protocol.
-Currently missing from the Darner implementation but TODO: `FLUSH`, `FLUSH_ALL`, `DELETE`, and some stats.
+Currently missing from the Darner implementation but TODO: some stats.
View
2  include/darner/net/handler.h
@@ -52,6 +52,8 @@ class handler : public boost::enable_shared_from_this<handler>
void write_version();
+ void destroy(); // really "delete", but that's a reserved word
+
void flush();
void flush_all();
View
11 include/darner/net/request.h
@@ -15,10 +15,11 @@ struct request
{
RT_STATS = 1,
RT_VERSION = 2,
- RT_FLUSH = 3,
- RT_FLUSH_ALL = 4,
- RT_SET = 5,
- RT_GET = 6
+ RT_DESTROY = 3,
+ RT_FLUSH = 4,
+ RT_FLUSH_ALL = 5,
+ RT_SET = 6,
+ RT_GET = 7
};
request_type type;
@@ -37,7 +38,7 @@ struct request_grammar : boost::spirit::qi::grammar<std::string::const_iterator>
request_grammar();
request req;
boost::spirit::qi::rule<std::string::const_iterator, std::string()> key_name;
- boost::spirit::qi::rule<std::string::const_iterator> stats, version, flush, flush_all, set_option, set, get_option, get, start;
+ boost::spirit::qi::rule<std::string::const_iterator> stats, version, destroy, flush, flush_all, set_option, set, get_option, get, start;
};
// grammar are expensive to construct. to be thread-safe, let's make one grammar per thread.
View
13 include/darner/queue/queue.h
@@ -11,6 +11,7 @@
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <leveldb/db.h>
#include <leveldb/comparator.h>
@@ -28,7 +29,7 @@ namespace darner {
*
* queue is not thread-safe, it assumes a single-thread calling and operating the provided io_service
*/
-class queue
+class queue : public boost::enable_shared_from_this<queue>
{
public:
@@ -41,9 +42,15 @@ class queue
// open or create the queue at the path
queue(boost::asio::io_service& ios, const std::string& path);
+ // destruct the queue, and delete the journal if destroy() was called
+ ~queue();
+
// wait up to wait_ms milliseconds for an item to become available, then call cb with success or timeout
void wait(size_type wait_ms, const wait_callback& cb);
+ // delete the journal upon destruction
+ void destroy();
+
// returns the number of items in the queue
size_type count() const;
@@ -246,11 +253,13 @@ class queue
std::set<id_type> returned_; // items < TAIL that were reserved but later returned (not popped)
+ bool destroy_; // if true, we will delete the journal upon destruction
+
boost::ptr_list<waiter> waiters_;
boost::ptr_list<waiter>::iterator wake_up_it_;
boost::asio::io_service& ios_;
- const std::string path_;
+ std::string path_;
};
} // darner
View
26 include/darner/util/queue_map.hpp
@@ -5,7 +5,7 @@
#include <map>
#include <boost/asio.hpp>
-#include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
#include <boost/filesystem/operations.hpp>
#include "darner/queue/queue.h"
@@ -32,8 +32,7 @@ class queue_map
{
std::string queue_name =
boost::filesystem::path(it->path().filename()).string(); // useless recast for boost backwards compat
- boost::shared_ptr<queue> p(new queue(ios_, (data_path_ / queue_name).string()));
- queues_.insert(container_type::value_type(queue_name, p));
+ queues_[queue_name] = boost::make_shared<queue>(boost::ref(ios_), (data_path_ / queue_name).string());
}
}
@@ -42,14 +41,27 @@ class queue_map
iterator it = queues_.find(queue_name);
if (it == queues_.end())
- {
- boost::shared_ptr<queue> p(new queue(ios_, (data_path_ / queue_name).string()));
- it = queues_.insert(container_type::value_type(queue_name, p)).first;
- }
+ it = queues_.insert(container_type::value_type(queue_name,
+ boost::make_shared<queue>(boost::ref(ios_), (data_path_ / queue_name).string()))).first;
return it->second;
}
+ void erase(const std::string& queue_name, bool recreate = false)
+ {
+ iterator it = queues_.find(queue_name);
+
+ if (it == queues_.end())
+ return;
+
+ it->second->destroy();
+
+ queues_.erase(it);
+
+ if (recreate)
+ queues_[queue_name] = boost::make_shared<queue>(boost::ref(ios_), (data_path_ / queue_name).string());
+ }
+
iterator begin() { return queues_.begin(); }
iterator end() { return queues_.end(); }
const_iterator begin() const { return queues_.begin(); }
View
16 src/net/handler.cpp
@@ -63,6 +63,7 @@ void handler::parse_request(const system::error_code& e, size_t bytes_transferre
{
case request::RT_STATS: write_stats(); break;
case request::RT_VERSION: write_version(); break;
+ case request::RT_DESTROY: destroy(); break;
case request::RT_FLUSH: flush(); break;
case request::RT_FLUSH_ALL: flush_all(); break;
case request::RT_SET: ++stats_.cmd_sets; set(); break;
@@ -90,14 +91,25 @@ void handler::write_version()
async_write(socket_, buffer(buf_), bind(&handler::read_request, shared_from_this(), _1, _2));
}
+void handler::destroy()
+{
+ queues_.erase(req_.queue, false);
+ return end("DELETED\r\n");
+}
+
void handler::flush()
{
- // TODO: implement
+ // TODO: flush should guarantee that an item that's halfway pushed should still appear after
+ // the flush. right now, item will only appear to a client that was waiting to pop before the flush
+ queues_.erase(req_.queue, true);
+ return end();
}
void handler::flush_all()
{
- // TODO: implement
+ for (queue_map::iterator it = queues_.begin(); it != queues_.end(); ++it)
+ queues_.erase(it->first, true);
+ return end("Flushed all queues.\r\n");
}
void handler::set()
View
6 src/net/request.cpp
@@ -22,6 +22,10 @@ request_grammar::request_grammar()
version =
lit("version") [phoenix::ref(req.type) = request::RT_VERSION];
+ destroy =
+ lit("delete ") [phoenix::ref(req.type) = request::RT_DESTROY]
+ >> key_name [phoenix::ref(req.queue) = _1];
+
flush =
lit("flush ") [phoenix::ref(req.type) = request::RT_FLUSH]
>> key_name [phoenix::ref(req.queue) = _1];
@@ -61,5 +65,5 @@ request_grammar::request_grammar()
>> *get_option
>> -lit(' '); // be permissive to clients inserting spaces
- start = (stats | version | flush | flush_all | set | get) >> eol;
+ start = (stats | version | destroy | flush | flush_all | set | get) >> eol;
}
View
41 src/queue/queue.cpp
@@ -1,6 +1,8 @@
#include "darner/queue/queue.h"
#include <boost/bind.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/filesystem/operations.hpp>
#include <leveldb/iterator.h>
#include <leveldb/write_batch.h>
@@ -18,6 +20,7 @@ queue::queue(asio::io_service& ios, const string& path)
chunks_head_(key_type::KT_CHUNK, 0),
items_open_(0),
bytes_evicted_(0),
+ destroy_(false),
wake_up_it_(waiters_.begin()),
ios_(ios),
path_(path)
@@ -51,12 +54,44 @@ queue::queue(asio::io_service& ios, const string& path)
}
}
+queue::~queue()
+{
+ journal_.reset();
+ // TODO: most non-crap filesystems should be able to drop large files quickly, but this will block painfully on ext3.
+ // one ugly solution is a separate delete thread. or we can wait out everyone upgrading to ext4 :)
+ if (destroy_)
+ boost::filesystem::remove_all(path_);
+}
+
void queue::wait(size_type wait_ms, const wait_callback& cb)
{
ptr_list<waiter>::iterator it = waiters_.insert(waiters_.end(), new waiter(ios_, wait_ms, cb));
if (wake_up_it_ == waiters_.end())
wake_up_it_ = it;
- it->timer.async_wait(bind(&queue::waiter_wakeup, this, asio::placeholders::error, it));
+ it->timer.async_wait(bind(&queue::waiter_wakeup, shared_from_this(), asio::placeholders::error, it));
+}
+
+void queue::destroy()
+{
+ if (destroy_)
+ return; // already going to delete on dtor!
+
+ // rename the journal dir in case the user creates a new queue with the same name before this one is destroyed
+ string new_path = path_ + ".0";
+ for (size_t i = 0; boost::filesystem::exists(new_path); ++i)
+ new_path = path_ + "." + lexical_cast<string>(i);
+ journal_.reset();
+ boost::filesystem::rename(path_, new_path);
+
+ leveldb::DB* pdb;
+ leveldb::Options options;
+ options.comparator = cmp_.get();
+ if (!leveldb::DB::Open(options, new_path, &pdb).ok())
+ throw runtime_error("can't open journal: " + path_); // should never happen, but fatal if it does
+
+ journal_.reset(pdb);
+ path_ = new_path;
+ destroy_ = true;
}
queue::size_type queue::count() const
@@ -111,6 +146,8 @@ bool queue::pop_begin(id_type& result)
else
return false;
+ ++items_open_;
+
return true;
}
@@ -131,8 +168,6 @@ void queue::pop_read(std::string& result_item, header_type& result_header, id_ty
throw system::system_error(system::errc::io_error,
boost::asio::error::get_system_category()); // anything else is bad data
}
-
- ++items_open_;
}
void queue::pop_end(bool erase, id_type id, const header_type& header)
View
13 tests/queue.cpp
@@ -197,4 +197,17 @@ BOOST_FIXTURE_TEST_CASE( test_push_zero, fixtures::basic_queue )
BOOST_REQUIRE_EQUAL(value, pop_value_);
}
+// test that we can delete a queue when we are done with it
+BOOST_FIXTURE_TEST_CASE( test_delete_queue, fixtures::basic_queue )
+{
+ queue_->destroy();
+ BOOST_REQUIRE(!filesystem::exists(tmp_ / "queue"));
+ BOOST_REQUIRE(filesystem::exists(tmp_ / "queue.0")); // first delete gets .0
+ darner::queue queue2(ios_, (tmp_ / "queue").string());
+ queue2.destroy();
+ BOOST_REQUIRE(filesystem::exists(tmp_ / "queue.1")); // second delete gets .1
+ queue_.reset();
+ BOOST_REQUIRE(!filesystem::exists(tmp_ / "queue.0")); // finally, destroying the queue deletes the journal
+}
+
BOOST_AUTO_TEST_SUITE_END()
Something went wrong with that request. Please try again.