Permalink
Browse files

make sure no set/get/stats is going on while flushing

  • Loading branch information...
1 parent f0e3671 commit 22bdbf34d6058675f3844c856e06220d10ac4519 @nova77 nova77 committed Mar 14, 2013
Showing with 89 additions and 12 deletions.
  1. +21 −2 include/darner/net/handler.h
  2. +68 −9 src/net/handler.cpp
  3. +0 −1 src/queue/queue.cpp
@@ -40,6 +40,22 @@ class handler : public boost::enable_shared_from_this<handler>
private:
+ // keep a tally on the operations that touch the queues currently running
+ // (with the exception of flushes)
+ class scoped_op
+ {
+ public:
+ scoped_op(int& op_counter)
+ : op_counter_(op_counter)
+ { ++op_counter_; }
+
+ ~scoped_op()
+ { --op_counter_; }
+
+ private:
+ int& op_counter_;
+ };
+
// read from the socket up until a newline (request delimter)
void read_request(const boost::system::error_code& e, size_t bytes_transferred);
@@ -62,15 +78,15 @@ class handler : public boost::enable_shared_from_this<handler>
// set loop:
- void set_on_read_chunk(const boost::system::error_code& e, size_t bytes_transferred);
+ void set_on_read_chunk(const boost::system::error_code& e, size_t bytes_transferred, boost::shared_ptr<scoped_op> pOp);
// get loop:
void get_on_queue_return(const boost::system::error_code& e);
void get_on_read_next_chunk(const boost::system::error_code& e);
- void get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred);
+ void get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred, boost::shared_ptr<scoped_op> pOp);
void get_on_pop_close_post(const boost::system::error_code& e);
@@ -117,6 +133,9 @@ class handler : public boost::enable_shared_from_this<handler>
request_parser& parser_;
queue_map& queues_;
stats& stats_;
+ bool flushing_;
+ int ops_open_;
+
boost::asio::streambuf in_;
std::vector<char> header_buf_;
std::string buf_;
View
@@ -3,12 +3,30 @@
#include <cstdio>
#include <boost/array.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
using namespace std;
using namespace boost;
using namespace boost::asio;
using namespace darner;
+// simple RAII to make sure we're never gonna get out of a flush
+// operation with a flushing_ variable set to true
+class scoped_flush
+{
+public:
+ scoped_flush(bool& flush)
+ : flush_(flush)
+ { flush_ = true; }
+
+ ~scoped_flush()
+ { flush_ = false; }
+
+private:
+ bool& flush_;
+};
+
+
handler::handler(io_service& ios,
request_parser& parser,
queue_map& queues,
@@ -19,6 +37,8 @@ handler::handler(io_service& ios,
parser_(parser),
queues_(queues),
stats_(_stats),
+ flushing_(false),
+ ops_open_(0),
in_(chunk_size + 2) // make room for \r\n
{
}
@@ -75,6 +95,8 @@ void handler::write_stats()
ostringstream oss;
stats_.write(oss);
+ scoped_op op(ops_open_);
+
for (queue_map::const_iterator it = queues_.begin(); it != queues_.end(); ++it)
it->second->write_stats(it->first, oss);
@@ -92,37 +114,64 @@ void handler::write_version()
void handler::flush()
{
- queues_.flush_queue(req_.queue);
+ {
+ scoped_flush sf(flushing_);
+
+ if (ops_open_ > 0)
+ {
+ // we still have some ops running, let's wait 10ms
+ deadline_timer t(socket_.get_io_service(), boost::posix_time::milliseconds(10));
+ t.async_wait(bind(&handler::flush, shared_from_this()));
+ }
+
+ queues_.flush_queue(req_.queue);
+ } // flush set to false
buf_ = "OK\r\n";
async_write(socket_, buffer(buf_), bind(&handler::read_request, shared_from_this(), _1, _2));
}
void handler::flush_all()
{
- for (queue_map::const_iterator it = queues_.begin(); it != queues_.end(); ++it)
- queues_.flush_queue(it->first);
+ {
+ scoped_flush sf(flushing_);
+
+ if (ops_open_ > 0)
+ {
+ // we still have some ops running, let's wait 10ms
+ deadline_timer t(socket_.get_io_service(), boost::posix_time::milliseconds(10));
+ t.async_wait(bind(&handler::flush_all, shared_from_this()));
+ }
+
+ for (queue_map::const_iterator it = queues_.begin(); it != queues_.end(); ++it)
+ queues_.flush_queue(it->first);
+ } // flush set to false
buf_ = "OK\r\n";
async_write(socket_, buffer(buf_), bind(&handler::read_request, shared_from_this(), _1, _2));
}
void handler::set()
{
+ shared_ptr<scoped_op> pOp(new scoped_op(ops_open_));
+
// round up the number of chunks we need, and fetch \r\n if it's just one chunk
push_stream_.open(queues_[req_.queue], (req_.num_bytes + chunk_size_ - 1) / chunk_size_);
queue::size_type remaining = req_.num_bytes - push_stream_.tell();
queue::size_type required = remaining > chunk_size_ ? chunk_size_ : remaining + 2;
async_read(
socket_, in_, transfer_at_least(required > in_.size() ? required - in_.size() : 0),
- bind(&handler::set_on_read_chunk, shared_from_this(), _1, _2));
+ bind(&handler::set_on_read_chunk, shared_from_this(), _1, _2, pOp));
}
-void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transferred)
+void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transferred, shared_ptr<scoped_op> pOp)
{
if (e)
+ {
+ --ops_open_;
return error("set_on_read_chunk", e);
+ }
asio::streambuf::const_buffers_type bufs = in_.data();
queue::size_type bytes_remaining = req_.num_bytes - push_stream_.tell();
@@ -161,11 +210,18 @@ void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transf
queue::size_type required = remaining > chunk_size_ ? chunk_size_ : remaining + 2;
async_read(
socket_, in_, transfer_at_least(required > in_.size() ? required - in_.size() : 0),
- bind(&handler::set_on_read_chunk, shared_from_this(), _1, _2));
+ bind(&handler::set_on_read_chunk, shared_from_this(), _1, _2, pOp));
}
void handler::get()
{
+ if (flushing_)
+ {
+ buf_ = "OK\r\n";
+ async_write(socket_, buffer(buf_), bind(&handler::read_request, shared_from_this(), _1, _2));
+ return;
+ }
+
if (req_.get_abort && (req_.get_open || req_.get_close || req_.get_peek))
return error("abort must be by itself", "CLIENT_ERROR");
@@ -197,6 +253,9 @@ void handler::get()
return end();
}
+ // we're starting the operation on the queue here
+ shared_ptr<scoped_op> pOp(new scoped_op(ops_open_));
+
try
{
pop_stream_.read(buf_);
@@ -229,7 +288,7 @@ void handler::get()
else
{
array<const_buffer, 2> bufs = {{ buffer(header_buf_), buffer(buf_) }};
- async_write(socket_, bufs, bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2));
+ async_write(socket_, bufs, bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2, pOp));
}
}
@@ -243,7 +302,7 @@ void handler::get_on_queue_return(const boost::system::error_code& e)
get();
}
-void handler::get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred)
+void handler::get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred, boost::shared_ptr<scoped_op> pOp)
{
if (e)
return error("get_on_write_chunk", e, false);
@@ -276,6 +335,6 @@ void handler::get_on_write_chunk(const boost::system::error_code& e, size_t byte
return error("get_on_write_chunk", ex, false);
}
- async_write(socket_, buffer(buf_), bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2));
+ async_write(socket_, buffer(buf_), bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2, pOp));
}
}
View
@@ -217,7 +217,6 @@ void queue::flush_db()
wake_up(); // in case there's a waiter waiting
- waiters_.clear();
journal_.reset(); // close level_db
boost::filesystem::remove_all(path_);

0 comments on commit 22bdbf3

Please sign in to comment.