diff --git a/include/darner/net/handler.h b/include/darner/net/handler.h
index aebedf6..52192d5 100644
--- a/include/darner/net/handler.h
+++ b/include/darner/net/handler.h
@@ -40,6 +40,22 @@ class handler : public boost::enable_shared_from_this<handler>
 private:
+   // keep a tally of the currently running operations that touch the queues
+   // (with the exception of flushes)
+   class scoped_op
+   {
+   public:
+      scoped_op(int& op_counter)
+      : op_counter_(op_counter)
+      { ++op_counter_; }
+
+      ~scoped_op()
+      { --op_counter_; }
+
+   private:
+      int& op_counter_;
+   };
+
    // read from the socket up until a newline (request delimter)
    void read_request(const boost::system::error_code& e, size_t bytes_transferred);
@@ -62,7 +78,7 @@ class handler : public boost::enable_shared_from_this<handler>
 
    // set loop:
 
-   void set_on_read_chunk(const boost::system::error_code& e, size_t bytes_transferred);
+   void set_on_read_chunk(const boost::system::error_code& e, size_t bytes_transferred, boost::shared_ptr<scoped_op> pOp);
 
    // get loop:
 
@@ -70,7 +86,7 @@ class handler : public boost::enable_shared_from_this<handler>
    void get_on_read_next_chunk(const boost::system::error_code& e);
 
-   void get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred);
+   void get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred, boost::shared_ptr<scoped_op> pOp);
 
    void get_on_pop_close_post(const boost::system::error_code& e);
@@ -117,6 +133,9 @@ class handler : public boost::enable_shared_from_this<handler>
    request_parser& parser_;
    queue_map& queues_;
    stats& stats_;
+   bool flushing_;
+   int ops_open_;
+
    boost::asio::streambuf in_;
    std::vector<char> header_buf_;
    std::string buf_;
diff --git a/include/darner/queue/queue.h b/include/darner/queue/queue.h
index b7dfe24..9206aba 100644
--- a/include/darner/queue/queue.h
+++ b/include/darner/queue/queue.h
@@ -50,6 +50,9 @@ class queue
    // writes out stats (stuff like queue count) to a stream
    void write_stats(const std::string& name, std::ostringstream& out) const;
 
+   // flush the db by deleting it and creating a new one
+   void flush_db();
+
 protected:
 
    friend class iqstream;
@@ -194,6 +197,9 @@ class queue
       void FindShortSuccessor(std::string*) const {}
    };
 
+   // initialize leveldb
+   void init_db();
+
    // any operation that adds to the queue should crank a wakeup
    void wake_up();
@@ -241,6 +247,7 @@ class queue
    size_type items_open_;    // an open item is < TAIL but not in returned_
    size_type bytes_evicted_; // after we've evicted 32MB from the journal, compress that evicted range
+   size_type total_flushes_; // total number of times this queue has been flushed
 
    std::set<id_type> returned_; // items < TAIL that were reserved but later returned (not popped)
diff --git a/include/darner/util/queue_map.hpp b/include/darner/util/queue_map.hpp
index a82404c..64a1501 100644
--- a/include/darner/util/queue_map.hpp
+++ b/include/darner/util/queue_map.hpp
@@ -50,6 +50,15 @@ class queue_map
       return it->second;
    }
 
+   void flush_queue(const std::string& queue_name)
+   {
+      iterator it = queues_.find(queue_name);
+      if (it == queues_.end())
+         return; // nothing to do
+
+      it->second->flush_db();
+   }
+
    iterator begin() { return queues_.begin(); }
    iterator end() { return queues_.end(); }
    const_iterator begin() const { return queues_.begin(); }
diff --git a/src/net/handler.cpp b/src/net/handler.cpp
index a7f601f..6b91d58 100644
--- a/src/net/handler.cpp
+++ b/src/net/handler.cpp
@@ -3,12 +3,30 @@
 #include
 #include
+#include <boost/asio/deadline_timer.hpp>
 
 using namespace std;
 using namespace boost;
 using namespace boost::asio;
 using namespace darner;
 
+// simple RAII guard to make sure we never leave a flush operation
+// with the flushing_ flag still set to true
+class scoped_flush
+{
+public:
+   scoped_flush(bool& flush)
+   : flush_(flush)
+   { flush_ = true; }
+
+   ~scoped_flush()
+   { flush_ = false; }
+
+private:
+   bool& flush_;
+};
+
+
 handler::handler(io_service& ios,
                  request_parser& parser,
                  queue_map& queues,
@@ -19,6 +37,8 @@ handler::handler(io_service& ios,
    parser_(parser),
    queues_(queues),
    stats_(_stats),
+   flushing_(false),
+   ops_open_(0),
    in_(chunk_size + 2) // make room for \r\n
 {
 }
@@ -75,6 +95,8 @@ void handler::write_stats()
    ostringstream oss;
    stats_.write(oss);
 
+   scoped_op op(ops_open_);
+
    for (queue_map::const_iterator it = queues_.begin(); it != queues_.end(); ++it)
      it->second->write_stats(it->first, oss);
 
@@ -92,16 +114,48 @@ void handler::write_version()
 
 void handler::flush()
 {
-   // TODO: implement
+   {
+      scoped_flush sf(flushing_);
+      if (ops_open_ > 0)
+      {
+         // some ops are still in flight; let them finish, then retry the flush
+         deadline_timer t(socket_.get_io_service(), boost::posix_time::milliseconds(10));
+         t.async_wait(bind(&handler::flush, shared_from_this()));
+         return;
+      }
+
+      queues_.flush_queue(req_.queue);
+   } // flushing_ set back to false
+
+   return end("OK\r\n");
 }
 
 void handler::flush_all()
 {
-   // TODO: implement
+   {
+      scoped_flush sf(flushing_);
+      if (ops_open_ > 0)
+      {
+         // some ops are still in flight; let them finish, then retry the flush
+         deadline_timer t(socket_.get_io_service(), boost::posix_time::milliseconds(10));
+         t.async_wait(bind(&handler::flush_all, shared_from_this()));
+         return;
+      }
+
+      for (queue_map::const_iterator it = queues_.begin(); it != queues_.end(); ++it)
+         queues_.flush_queue(it->first);
+   } // flushing_ set back to false
+
+   return end("OK\r\n");
 }
 
 void handler::set()
 {
+   if (flushing_)
+      return end("NOT STORED\r\n");
+
+   shared_ptr<scoped_op> pOp(new scoped_op(ops_open_));
+
    // round up the number of chunks we need, and fetch \r\n if it's just one chunk
    push_stream_.open(queues_[req_.queue], (req_.num_bytes + chunk_size_ - 1) / chunk_size_);
    queue::size_type remaining = req_.num_bytes - push_stream_.tell();
@@ -109,13 +163,16 @@
    async_read(
       socket_, in_, transfer_at_least(required > in_.size() ? required - in_.size() : 0),
-      bind(&handler::set_on_read_chunk, shared_from_this(), _1, _2));
+      bind(&handler::set_on_read_chunk, shared_from_this(), _1, _2, pOp));
 }
 
-void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transferred)
+void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transferred, shared_ptr<scoped_op> pOp)
 {
    if (e)
+   {
+      // pOp's scoped_op decrements ops_open_ once the last handler copy goes away
       return error("set_on_read_chunk", e);
+   }
 
    asio::streambuf::const_buffers_type bufs = in_.data();
    queue::size_type bytes_remaining = req_.num_bytes - push_stream_.tell();
@@ -154,11 +211,14 @@ void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transf
    queue::size_type required = remaining > chunk_size_ ? chunk_size_ : remaining + 2;
    async_read(
       socket_, in_, transfer_at_least(required > in_.size() ? required - in_.size() : 0),
-      bind(&handler::set_on_read_chunk, shared_from_this(), _1, _2));
+      bind(&handler::set_on_read_chunk, shared_from_this(), _1, _2, pOp));
 }
 
 void handler::get()
 {
+   if (flushing_)
+      return end("OK\r\n"); // nothing to return
+
    if (req_.get_abort && (req_.get_open || req_.get_close || req_.get_peek))
       return error("abort must be by itself", "CLIENT_ERROR");
 
@@ -190,6 +250,9 @@ void handler::get()
 
       return end();
    }
 
+   // we're starting an operation on the queue here
+   shared_ptr<scoped_op> pOp(new scoped_op(ops_open_));
+
    try
    {
      pop_stream_.read(buf_);
@@ -222,7 +285,7 @@ void handler::get()
    else
    {
      array<const_buffer, 2> bufs = {{ buffer(header_buf_), buffer(buf_) }};
-      async_write(socket_, bufs, bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2));
+      async_write(socket_, bufs, bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2, pOp));
    }
 }
 
@@ -236,7 +299,7 @@ void handler::get_on_queue_return(const boost::system::error_code& e)
 
    get();
 }
 
-void handler::get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred)
+void handler::get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred, boost::shared_ptr<scoped_op> pOp)
 {
    if (e)
      return error("get_on_write_chunk", e, false);
@@ -269,6 +332,6 @@ void handler::get_on_write_chunk(const boost::system::error_code& e, size_t byte
      return error("get_on_write_chunk", ex, false);
    }
 
-   async_write(socket_, buffer(buf_), bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2));
+   async_write(socket_, buffer(buf_), bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2, pOp));
 }
 }
diff --git a/src/queue/queue.cpp b/src/queue/queue.cpp
index c8fe4c8..8fc156d 100644
--- a/src/queue/queue.cpp
+++ b/src/queue/queue.cpp
@@ -1,6 +1,7 @@
 #include "darner/queue/queue.h"
 
 #include
+#include <boost/filesystem/operations.hpp>
 #include
 #include
@@ -18,16 +19,28 @@ queue::queue(asio::io_service& ios, const string& path)
    chunks_head_(key_type::KT_CHUNK, 0),
    items_open_(0),
    bytes_evicted_(0),
+   total_flushes_(0),
    wake_up_it_(waiters_.begin()),
    ios_(ios),
    path_(path)
 {
+   init_db();
+}
+
+void queue::init_db()
+{
+   queue_head_.id = 0;
+   queue_tail_.id = 0;
+   chunks_head_.id = 0;
+   //wake_up_it_ = waiters_.begin();
+
    leveldb::Options options;
    options.create_if_missing = true;
    options.comparator = cmp_.get();
+
    leveldb::DB* pdb;
-   if (!leveldb::DB::Open(options, path, &pdb).ok())
-      throw runtime_error("can't open journal: " + path);
+   if (!leveldb::DB::Open(options, path_, &pdb).ok())
+      throw runtime_error("can't open journal: " + path_);
    journal_.reset(pdb);
    // get head and tail of queue
    scoped_ptr<leveldb::Iterator> it(journal_->NewIterator(leveldb::ReadOptions()));
@@ -69,6 +82,7 @@ void queue::write_stats(const string& name, ostringstream& out) const
    out << "STAT queue_" << name << "_items " << count() << "\r\n";
    out << "STAT queue_" << name << "_waiters " << waiters_.size() << "\r\n";
    out << "STAT queue_" << name << "_open_transactions " << items_open_ << "\r\n";
+   out << "STAT queue_" << name << "_total_flushes " << total_flushes_ << "\r\n";
 }
 
 // protected:
@@ -197,6 +211,22 @@ void queue::erase_chunks(const header_type& header)
    write(batch);
 }
 
+void queue::flush_db()
+{
+   log::INFO("queue <%1%>: flushing db", path_);
+
+   wake_up(); // in case there's a waiter waiting
+
+   journal_.reset(); // close leveldb
+   boost::filesystem::remove_all(path_);
+
+   items_open_ = 0;
+   bytes_evicted_ = 0;
+
+   init_db(); // now create a new one
+   ++total_flushes_;
+}
+
 // private:
 
 void queue::wake_up()
diff --git a/tests/queue.cpp b/tests/queue.cpp
index abd3186..67c20a2 100644
--- a/tests/queue.cpp
+++ b/tests/queue.cpp
@@ -127,6 +127,21 @@ BOOST_FIXTURE_TEST_CASE( test_queue_count, fixtures::basic_queue )
    BOOST_REQUIRE_EQUAL(queue_->count(), 1);
 }
 
+// test that we can delete and reopen queue
+BOOST_FIXTURE_TEST_CASE( test_queue_delete_reopen, fixtures::basic_queue )
+{
+   string value = "More than once, I've wished my real life had a delete key";
+
+   oqs_.open(queue_, 1);
+   oqs_.write(value);
+   queue_.reset(new darner::queue(ios_, (tmp_ / "queue").string()));
+   BOOST_REQUIRE_EQUAL(queue_->count(), 1);
+
+   // delete and start fresh
+   queue_->flush_db();
+   BOOST_REQUIRE_EQUAL(queue_->count(), 0);
+}
+
 // test overruning an oqstream raises
 BOOST_FIXTURE_TEST_CASE( test_oqstream_overflow, fixtures::basic_queue )
 {
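
The scoped_op and scoped_flush helpers introduced by this patch are plain RAII guards: the constructor records that an operation (or a flush) has started, and the destructor undoes that on every exit path of the scope. The following stand-alone sketch is illustrative only and not part of the patch; the names scoped_counter and ops_open are made up for the example. It shows the scope-bound counter idea in isolation:

#include <cassert>

// illustrative sketch only (not from the patch): a scope-bound counter,
// the same idea as handler::scoped_op above. the counter is incremented
// on construction and decremented on destruction, so every exit path of
// the enclosing scope (early returns, exceptions) restores it.
class scoped_counter
{
public:
   explicit scoped_counter(int& counter)
   : counter_(counter)
   { ++counter_; }

   ~scoped_counter()
   { --counter_; }

private:
   scoped_counter(const scoped_counter&);            // non-copyable
   scoped_counter& operator=(const scoped_counter&); // non-assignable
   int& counter_;
};

int main()
{
   int ops_open = 0;
   {
      scoped_counter op(ops_open); // an "operation" begins
      assert(ops_open == 1);
   }                               // ...and ends when the scope closes
   assert(ops_open == 0);
   return 0;
}

In the patch itself the guard is held through a boost::shared_ptr<scoped_op> that is bound into each asynchronous completion handler, so ops_open_ is only decremented after the last handler holding the pointer has run.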