Permalink
Browse files

update handler to use new semantics

  • Loading branch information...
1 parent 8dd62c9 commit 4ba43b99858c3eed93b01da4aba5f3bd9885cc34 @erikfrey erikfrey committed Aug 11, 2012
Showing with 91 additions and 86 deletions.
  1. +2 −6 include/darner/net/handler.h
  2. +89 −80 src/net/handler.cpp
@@ -62,13 +62,11 @@ class handler : public boost::enable_shared_from_this<handler>
void set_on_read_chunk(const boost::system::error_code& e, size_t bytes_transferred);
- void set_on_write_chunk(const boost::system::error_code& e);
-
// get loop:
- void get_on_pop_close_pre(const boost::system::error_code& e);
+ void get_on_queue_return(const boost::system::error_code& e);
- void get_on_read_first_chunk(const boost::system::error_code& e);
+ void write_first_chunk();
void get_on_read_next_chunk(const boost::system::error_code& e);
@@ -83,8 +81,6 @@ class handler : public boost::enable_shared_from_this<handler>
void finalize(const boost::system::error_code& e, size_t bytes_transferred);
- void do_nothing(const boost::system::error_code& e);
-
const queue::size_type chunk_size_;
socket_type socket_;
View
@@ -141,23 +141,20 @@ void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transf
in_.consume(chunk_size_);
}
- queues_.get_io_service().post(bind(
- &oqstream::write, &*push_stream_, cref(buf_), socket_.get_io_service().wrap(bind(
- &handler::set_on_write_chunk, shared_from_this(), _1))));
-}
-
-void handler::set_on_write_chunk(const boost::system::error_code& e)
-{
- if (e)
+ try
{
- log::ERROR("handler<%1%>::set_on_write_chunk: %2%", shared_from_this(), e.message());
- return done(false, "SERVER_ERROR " + e.message() + "\r\n");
+ push_stream_->write(buf_);
+ }
+ catch (const system::system_error& ex)
+ {
+ log::ERROR("handler<%1%>::set_on_write_chunk: %2%", shared_from_this(), ex.code().message());
+ return done(false, "SERVER_ERROR " + ex.code().message() + "\r\n");
}
- if (push_stream_->tell() == req_.num_bytes) // all done!
+ if (push_stream_->tell() == req_.num_bytes) // are we all done?
return done(true, "STORED\r\n");
- // second verse, same as the first
+ // otherwise, second verse, same as the first
queue::size_type remaining = req_.num_bytes - push_stream_->tell();
queue::size_type required = remaining > chunk_size_ ? chunk_size_ : remaining + 2;
async_read(
@@ -170,44 +167,47 @@ void handler::get()
if (req_.get_abort && (req_.get_open || req_.get_close))
return done(false, "CLIENT_ERROR abort must be by itself\r\n");
- if (pop_stream_ && !req_.get_close && !req_.get_abort)
- return done(false, "CLIENT_ERROR close current item first\r\n");
-
- if (!pop_stream_)
+ if (pop_stream_) // before getting the next item, close this one
{
- pop_stream_ = in_place(ref(queues_[req_.queue]), req_.wait_ms);
- queues_.get_io_service().post(
- bind(&iqstream::read, &*pop_stream_, ref(buf_),
- socket_.get_io_service().wrap(bind(&handler::get_on_read_first_chunk, shared_from_this(), _1))));
- }
- else // before getting the next item, close or abort this one out first
- queues_.get_io_service().post(
- bind(&iqstream::close, &*pop_stream_, req_.get_close,
- socket_.get_io_service().wrap(bind(&handler::get_on_pop_close_pre, shared_from_this(), _1))));
-}
+ if (!req_.get_close && !req_.get_abort) // needs to be a close or an abort
+ return done(false, "CLIENT_ERROR close current item first\r\n");
+
+ try
+ {
+ pop_stream_->close(req_.get_close);
+ }
+ catch (const system::system_error& ex)
+ {
+ log::ERROR("handler<%1%>::get_on_pop_close_pre: %2%", shared_from_this(), ex.code().message());
+ return done(false, "SERVER_ERROR " + ex.code().message() + "\r\n");
+ }
-void handler::get_on_pop_close_pre(const boost::system::error_code& e)
-{
- if (e)
- {
- log::ERROR("handler<%1%>::get_on_pop_close_pre: %2%", shared_from_this(), e.message());
- return done(false, "SERVER_ERROR " + e.message() + "\r\n");
+ pop_stream_.reset();
}
- pop_stream_.reset();
-
if (req_.get_abort)
return done(true, "END\r\n"); // aborts go no further
- pop_stream_ = in_place(ref(queues_[req_.queue]), req_.wait_ms);
- queues_.get_io_service().post(
- bind(&iqstream::read, &*pop_stream_, ref(buf_),
- socket_.get_io_service().wrap(bind(&handler::get_on_read_first_chunk, shared_from_this(), _1))));
+ pop_stream_ = in_place(ref(queues_[req_.queue]));
+
+ if (!pop_stream_->read(buf_))
+ {
+ // couldn't read... can we at least wait?
+ if (req_.wait_ms)
+ queues_[req_.queue].wait(req_.wait_ms, bind(&handler::get_on_queue_return, shared_from_this(), _1));
+ else
+ {
+ pop_stream_.reset();
+ return done(true, "END\r\n");
+ }
+ }
+ else
+ write_first_chunk();
}
-void handler::get_on_read_first_chunk(const boost::system::error_code& e)
+void handler::get_on_queue_return(const boost::system::error_code& e)
{
- if (e == asio::error::timed_out || e == asio::error::not_found)
+ if (e == asio::error::timed_out)
{
pop_stream_.reset();
return done(true, "END\r\n");
@@ -217,7 +217,12 @@ void handler::get_on_read_first_chunk(const boost::system::error_code& e)
log::ERROR("handler<%1%>::get_on_read_first_chunk: %2%", shared_from_this(), e.message());
return done(false, "SERVER_ERROR " + e.message() + "\r\n");
}
+ else
+ write_first_chunk();
+}
+void handler::write_first_chunk()
+{
ostringstream oss;
oss << "VALUE " << req_.queue << " 0 " << pop_stream_->size() << "\r\n";
header_buf_ = oss.str();
@@ -226,17 +231,6 @@ void handler::get_on_read_first_chunk(const boost::system::error_code& e)
async_write(socket_, bufs, bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2));
}
-void handler::get_on_read_next_chunk(const boost::system::error_code& e)
-{
- if (e)
- {
- log::ERROR("handler<%1%>::get_on_read_next_chunk: %2%", shared_from_this(), e.message());
- return done(false);
- }
-
- async_write(socket_, buffer(buf_), bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2));
-}
-
void handler::get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred)
{
if (e)
@@ -247,30 +241,33 @@ void handler::get_on_write_chunk(const boost::system::error_code& e, size_t byte
if (pop_stream_->tell() == pop_stream_->size())
{
- if (req_.get_open)
- done(true, "\r\nEND\r\n");
- else
- queues_.get_io_service().post(
- bind(&iqstream::close, &*pop_stream_, true,
- socket_.get_io_service().wrap(bind(&handler::get_on_pop_close_post, shared_from_this(), _1))));
- }
- else
- queues_.get_io_service().post(
- bind(&iqstream::read, &*pop_stream_, ref(buf_),
- socket_.get_io_service().wrap(bind(&handler::get_on_read_next_chunk, shared_from_this(), _1))));
-}
-
-void handler::get_on_pop_close_post(const boost::system::error_code& e)
-{
- if (e)
- {
- log::ERROR("handler<%1%>::get_on_pop_close_post: %2%", shared_from_this(), e.message());
- return done(false);
+ if (!req_.get_open)
+ {
+ try
+ {
+ pop_stream_->close(true);
+ }
+ catch (const system::system_error& ex)
+ {
+ log::ERROR("handler<%1%>::get_on_write_chunk: %2%", shared_from_this(), ex.code().message());
+ return done(false);
+ }
+ pop_stream_.reset();
+ }
+ return done(true, "\r\nEND\r\n");
}
else
{
- pop_stream_.reset();
- done(true, "\r\nEND\r\n");
+ try
+ {
+ pop_stream_->read(buf_);
+ }
+ catch (const system::system_error& ex)
+ {
+ log::ERROR("handler<%1%>::get_on_write_chunk: %2%", shared_from_this(), ex.code().message());
+ return done(false);
+ }
+ async_write(socket_, buffer(buf_), bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2));
}
}
@@ -296,14 +293,26 @@ void handler::done(bool success, const std::string& msg /* = "" */)
void handler::finalize(const system::error_code& e, size_t bytes_transferred)
{
if (pop_stream_)
- queues_.get_io_service().post(
- bind(&iqstream::close, &*pop_stream_, req_.get_close,
- socket_.get_io_service().wrap(bind(&handler::do_nothing, shared_from_this(), _1))));
+ {
+ try
+ {
+ pop_stream_->close(false);
+ }
+ catch (const system::system_error& ex)
+ {
+ log::ERROR("handler<%1%>::finalize: %2%", shared_from_this(), ex.code().message());
+ }
+ }
if (push_stream_)
- queues_.get_io_service().post(bind(
- &oqstream::cancel, &*push_stream_, oqstream::success_callback(bind(
- &handler::do_nothing, shared_from_this(), _1))));
+ {
+ try
+ {
+ //push_stream_->cancel();
+ }
+ catch (const system::system_error& ex)
+ {
+ log::ERROR("handler<%1%>::finalize: %2%", shared_from_this(), ex.code().message());
+ }
+ }
}
-
-void handler::do_nothing(const system::error_code& e) {}

0 comments on commit 4ba43b9

Please sign in to comment.