
Merge pull request #6 from wavii/clean_streams

Clean streams
commit 9b7be61bf05cac16bfca35d2c5aa8c93959c66f7 (2 parents: df419f3 + d627460)
authored by Erik Frey (erikfrey)
43 include/darner/net/handler.h
@@ -1,9 +1,10 @@
#ifndef __DARNER_HANDLER_HPP__
#define __DARNER_HANDLER_HPP__
+#include <sstream>
+
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
-#include <boost/optional.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
@@ -76,10 +77,40 @@ class handler : public boost::enable_shared_from_this<handler>
// utils
- // call done after a successful call or a failure. on fail, ensures the handler cleans up
- void done(bool success, const std::string& msg = "");
+ void end(const char* msg = "END\r\n")
+ {
+ boost::asio::async_write(
+ socket_, boost::asio::buffer(msg), boost::bind(&handler::read_request, shared_from_this(), _1, _2));
+ }
+
+ void error(const char* msg, const char* error_type = "ERROR")
+ {
+ std::ostringstream oss;
+ oss << error_type << ' ' << msg << "\r\n";
+ buf_ = oss.str();
+
+ boost::asio::async_write(
+ socket_, boost::asio::buffer(buf_), boost::bind(&handler::hang_up, shared_from_this(), _1, _2));
+ }
+
+ void error(const char* location, const boost::system::error_code& e)
+ {
+ log::ERROR("handler<%1%>::%2%: %3%", shared_from_this(), location, e.message());
+ }
+
+ void error(const char* location, const boost::system::system_error& ex, bool echo = true)
+ {
+ log::ERROR("handler<%1%>::%2%: %3%", shared_from_this(), location, ex.code().message());
+
+ if (echo)
+ {
+ buf_ = "SERVER_ERROR " + ex.code().message() + "\r\n";
+ boost::asio::async_write(
+ socket_, boost::asio::buffer(buf_), boost::bind(&handler::hang_up, shared_from_this(), _1, _2));
+ }
+ }
- void finalize(const boost::system::error_code& e, size_t bytes_transferred);
+ void hang_up(const boost::system::error_code& e, size_t bytes_transferred) {}
const queue::size_type chunk_size_;
@@ -92,8 +123,8 @@ class handler : public boost::enable_shared_from_this<handler>
std::string buf_;
request req_;
- boost::optional<iqstream> pop_stream_;
- boost::optional<oqstream> push_stream_;
+ iqstream pop_stream_;
+ oqstream push_stream_;
};
} // darner
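
A side note on the new helpers above: error() formats the message into the member buf_ before calling async_write because asio does not copy the buffer it is given; the storage has to stay alive until the completion handler runs. A minimal sketch of that pattern outside darner (echo_session, reply and on_write are hypothetical names, not part of this commit):

#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>

// sketch only: the buffer handed to async_write must outlive the operation,
// so the reply text lives in a member instead of on the stack
class echo_session : public boost::enable_shared_from_this<echo_session>
{
public:
   explicit echo_session(boost::asio::io_service& ios) : socket_(ios) {}

   void reply(const std::string& msg)
   {
      buf_ = msg; // a local string would be destroyed before the write completes
      boost::asio::async_write(
         socket_, boost::asio::buffer(buf_),
         boost::bind(&echo_session::on_write, shared_from_this(), _1, _2));
   }

private:
   void on_write(const boost::system::error_code&, std::size_t) {}

   boost::asio::ip::tcp::socket socket_;
   std::string buf_;
};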
32 include/darner/queue/iqstream.h
@@ -1,7 +1,7 @@
#ifndef __DARNER_QUEUE_IQSTREAM_H__
#define __DARNER_QUEUE_IQSTREAM_H__
-#include <boost/optional.hpp>
+#include <boost/shared_ptr.hpp>
#include "darner/queue/queue.h"
@@ -12,20 +12,25 @@ class iqstream
public:
/*
- * tries to open an item immediately. reads will fail if it couldn't open an item.
+ * destroying an open iqstream will close it with erase = false
*/
- iqstream(queue& _queue);
+ ~iqstream();
/*
- * on the first read, tries to fetch an item and returns true if one was available
+ * tries to open an item for reading. returns true if an item was available.
+ */
+ bool open(boost::shared_ptr<queue> queue);
+
+ /*
+ * reads a chunk
* can continue calling read until eof (until tell() == size()).
*/
- bool read(std::string& result);
+ void read(std::string& result);
/*
- * closes the iqstream. if remove, completes the pop of the item off the queue, otherwise returns it
+ * closes the iqstream. if erase, completes the pop of the item off the queue, otherwise returns it.
*/
- void close(bool remove);
+ void close(bool erase);
/*
* returns the position in the stream in bytes. only valid after first read()
@@ -35,14 +40,19 @@ class iqstream
/*
* returns the size of the item. only valid after first read()
*/
- queue::size_type size() const { return header_ ? header_->size : tell_; }
+ queue::size_type size() const { return header_.size; }
+
+ /*
+ * returns true if open
+ */
+ operator bool() const { return queue_; }
private:
- queue& queue_;
+ boost::shared_ptr<queue> queue_;
- boost::optional<queue::id_type> id_; // id of key in queue, only set after a succesful first read
- boost::optional<queue::header_type> header_; // only set if it's multi-chunk
+ queue::id_type id_; // id of key in queue, only valid if open() succeeded
+ queue::header_type header_; // only valid if it's multi-chunk
queue::size_type chunk_pos_;
queue::size_type tell_;
};
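
For context, here is how a consumer drives the new iqstream interface end to end (a sketch under the declarations above, not part of this commit; pop_one and the chunk accumulation are illustrative, the real handler streams each chunk straight to the socket):

#include <string>
#include <boost/shared_ptr.hpp>
#include "darner/queue/iqstream.h"

// sketch only: drain a single item, erasing it from the queue on success
void pop_one(boost::shared_ptr<darner::queue> q, std::string& item)
{
   darner::iqstream in;
   if (!in.open(q))            // false means nothing is available right now
      return;

   std::string chunk;
   item.clear();
   do
   {
      in.read(chunk);          // throws boost::system::system_error on journal errors
      item += chunk;
   } while (in.tell() < in.size());

   in.close(true);             // erase = true finishes the pop; false would return the item
}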
28 include/darner/queue/oqstream.h
@@ -1,8 +1,7 @@
#ifndef __DARNER_QUEUE_OQSTREAM_H__
#define __DARNER_QUEUE_OQSTREAM_H__
-#include <boost/optional.hpp>
-#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
#include "darner/queue/queue.h"
@@ -12,32 +11,39 @@ class oqstream
{
public:
- oqstream(queue& _queue, queue::size_type chunks);
+ /*
+ * destroying an unfinished oqstream will cancel it
+ */
+ ~oqstream();
+
+ /*
+ * immediately opens an oqstream for writing. the stream will automatically close after chunks_count chunks
+ * have been written
+ */
+ void open(boost::shared_ptr<queue> queue, queue::size_type chunks_count);
/*
* writes a chunk of the item. fails if more chunks are written than originally reserved.
*/
- void write(const std::string& value);
+ void write(const std::string& chunk);
/*
- * cancels the oqstream write. only available to mutli-chunks that haven't written all their chunks yet.
+ * cancels the oqstream write. only available if the stream hasn't written chunks_count chunks yet
*/
void cancel();
/*
* returns the position in the stream in bytes.
*/
- queue::size_type tell() const { return tell_; }
+ queue::size_type tell() const { return header_.size; }
private:
- queue& queue_;
- queue::size_type chunks_;
+ boost::shared_ptr<queue> queue_;
- boost::optional<queue::id_type> id_; // id of key in queue, only set after all chunks are written
- boost::optional<queue::header_type> header_; // only set if it's multi-chunk
+ queue::id_type id_; // id of key in queue, only set after all chunks are written
+ queue::header_type header_; // only set if it's multi-chunk
queue::size_type chunk_pos_;
- queue::size_type tell_;
};
} // darner
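
The producer side mirrors that: open() reserves the chunk range up front and the stream finalizes itself once chunks_count chunks have been written (again a sketch, not part of this commit; push_chunked is a hypothetical helper and assumes a non-empty item):

#include <cstddef>
#include <string>
#include <boost/shared_ptr.hpp>
#include "darner/queue/oqstream.h"

// sketch only: push one item split into fixed-size chunks
void push_chunked(boost::shared_ptr<darner::queue> q, const std::string& item, std::size_t chunk_size)
{
   darner::queue::size_type chunks = (item.size() + chunk_size - 1) / chunk_size; // round up
   darner::oqstream out;
   out.open(q, chunks);

   for (std::size_t pos = 0; pos < item.size(); pos += chunk_size)
      out.write(item.substr(pos, chunk_size)); // pushes the header and closes on the last chunk

   // if this function threw partway through, ~oqstream() would cancel the unfinished push
}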
81 include/darner/queue/queue.h
@@ -8,7 +8,6 @@
#include <boost/array.hpp>
#include <boost/ptr_container/ptr_list.hpp>
#include <boost/scoped_ptr.hpp>
-#include <boost/optional.hpp>
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
@@ -22,11 +21,10 @@ namespace darner {
* queue is a fifo queue that is O(log(queue size / cache size)) for pushing/popping. it boasts these features:
*
* - an evented wait semantic for queue poppers
- * - items are first checked out, then later deleted or returned back into the queue
+ * - popping is two-phase with a begin and an end. ending a pop can either erase it or return it back to the queue.
* - large items are streamed in a chunk at a time
*
- * queue will post events such as journal writes and waits to a provided boost::asio io_service. interrupting the
- * io_service with pending events is okay - queue is never in an inconsistent state between io events.
+ * all queue methods are synchronous except for wait(), which starts an async timer on the provided io_service.
*
* queue is not thread-safe, it assumes a single-thread calling and operating the provided io_service
*/
@@ -34,15 +32,17 @@ class queue
{
public:
+ // a queue can only ever have a backlog of 2^64 items. so at darner's current peak throughput you can only run the
+ // server for 23 million years :(
typedef boost::uint64_t id_type;
typedef boost::uint64_t size_type;
- typedef boost::function<void (const boost::system::error_code& error)> success_callback;
+ typedef boost::function<void (const boost::system::error_code& error)> wait_callback;
// open or create the queue at the path
queue(boost::asio::io_service& ios, const std::string& path);
// wait up to wait_ms milliseconds for an item to become available, then call cb with success or timeout
- void wait(size_type wait_ms, const success_callback& cb);
+ void wait(size_type wait_ms, const wait_callback& cb);
// returns the number of items in the queue
size_type count() const;
@@ -60,7 +60,7 @@ class queue
{
public:
- header_type() : beg(0), end(0), size(0) {}
+ header_type() : beg(0), end(1), size(0) {}
header_type(id_type _beg, id_type _end, size_type _size)
: beg(_beg), end(_end), size(_size) {}
header_type(const std::string& buf)
@@ -79,43 +79,50 @@ class queue
mutable std::string buf_;
};
- // queue methods:
+ // queue methods aren't meant to be used directly. instead create an iqstream or oqstream to use it
/*
- * pushes a value to to the queue. returns true for success, false if there was a problem writing to the journal
+ * pushes an item to the queue.
*/
- void push(boost::optional<id_type>& result, const std::string& value);
+ void push(id_type& result, const std::string& item);
/*
- * pushes a header to to the queue. call this after inserting a range of data chunks.
+ * pushes a header to the queue. a header points to a range of chunks in a multi-chunk item.
*/
- void push(boost::optional<id_type>& result, const header_type& value);
+ void push(id_type& result, const header_type& header);
/*
- * begins the popping of an item. if the item is a single chunk, pops the value, otherwise just pops the
- * header. will wait wait_ms milliseconds for an item to become available before either returning a succes
- * or timeout status to the callback cb
+ * begins popping an item. if no items are available, immediately returns false. once an item pop is begun,
+ * it is owned solely by the caller, and must eventually be pop_ended. pop_begin is constant time.
*/
- bool pop_open(boost::optional<id_type>& result_id, boost::optional<header_type>& result_header,
- std::string& result_value);
+ bool pop_begin(id_type& result);
/*
- * finishes the popping of an item. if remove = true, deletes the dang ol' item, otherwise returns it
- * back into the queue
+ * once a pop has begun, call pop_read. if the item is just one chunk (end - beg < 2), result_item will be
+ * immediately populated, otherwise fetch the chunks in the header's range [beg, end).
*/
- void pop_close(bool remove, id_type id, const boost::optional<header_type>& header);
+ void pop_read(std::string& result_item, header_type& result_header, id_type id);
+
+ /*
+ * finishes the popping of an item. if erase = true, deletes the dang ol' item, otherwise returns it
+ * back to its position near the tail of the queue. closing an item with erase = true is constant time, but
+ * closing an item with erase = false could take logn time and linear memory for # returned items.
+ *
+ * the simplest way to address this is to limit the number of items that can be opened at once.
+ */
+ void pop_end(bool erase, id_type id, const header_type& header);
// chunk methods:
/*
- * returns a header with a range of reserved chunks
+ * reserves a range of chunks and returns it in a header
*/
- void reserve_chunks(boost::optional<header_type>& result, size_type chunks);
+ void reserve_chunks(header_type& result, size_type count);
/*
* writes a chunk
*/
- void write_chunk(const std::string& value, id_type chunk_key);
+ void write_chunk(const std::string& chunk, id_type chunk_key);
/*
* reads a chunk
@@ -123,7 +130,7 @@ class queue
void read_chunk(std::string& result, id_type chunk_key);
/*
- * removes all chunks referred to by a header
+ * removes all chunks referred to by a header. use this when aborting a multi-chunk push.
*/
void erase_chunks(const header_type& header);
@@ -157,13 +164,13 @@ class queue
// ties a set of results to a deadline timer
struct waiter
{
- waiter(boost::asio::io_service& ios, size_type wait_ms, const success_callback& _cb)
+ waiter(boost::asio::io_service& ios, size_type wait_ms, const wait_callback& _cb)
: cb(_cb),
timer(ios, boost::posix_time::milliseconds(wait_ms))
{
}
- success_callback cb;
+ wait_callback cb;
boost::asio::deadline_timer timer;
};
@@ -183,9 +190,29 @@ class queue
// any operation that mutates the queue or the waiter state should run this to crank any pending events
void spin_waiters();
- // fires either if timer times out, or is canceled
+ // fires either if timer times out or is canceled
void waiter_timeout(const boost::system::error_code& e, boost::ptr_list<waiter>::iterator waiter_it);
+ // some leveldb sugar:
+
+ void put(const key_type& key, const std::string& value)
+ {
+ if (!journal_->Put(leveldb::WriteOptions(), key.slice(), value).ok())
+ throw boost::system::system_error(boost::system::errc::io_error, boost::system::system_category());
+ }
+
+ void get(const key_type& key, std::string& result)
+ {
+ if (!journal_->Get(leveldb::ReadOptions(), key.slice(), &result).ok())
+ throw boost::system::system_error(boost::system::errc::io_error, boost::system::system_category());
+ }
+
+ void write(leveldb::WriteBatch& batch)
+ {
+ if (!journal_->Write(leveldb::WriteOptions(), &batch).ok())
+ throw boost::system::system_error(boost::system::errc::io_error, boost::system::system_category());
+ }
+
boost::scoped_ptr<comparator> cmp_;
boost::scoped_ptr<leveldb::DB> journal_;
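
For reference, the call sequence iqstream now performs on top of the renamed two-phase pop (illustrative only; queue methods are not meant to be called directly, and pop_once is a hypothetical name):

#include <string>
#include "darner/queue/queue.h"

// sketch only: claim an item, read it, then erase it
void pop_once(darner::queue& q)
{
   darner::queue::id_type id;
   if (!q.pop_begin(id))                    // phase 1: claim an item id (constant time)
      return;                               // nothing available

   std::string item;
   darner::queue::header_type header;
   q.pop_read(item, header, id);            // single-chunk items arrive in item directly

   if (header.end - header.beg >= 2)        // multi-chunk: the payload lives in the chunk range
   {
      std::string chunk, whole;
      for (darner::queue::id_type c = header.beg; c != header.end; ++c)
      {
         q.read_chunk(chunk, c);
         whole += chunk;
      }
      item.swap(whole);                     // replace the raw header bytes with the assembled item
   }

   q.pop_end(true, id, header);             // phase 2: erase it; erase = false returns it to the queue
}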
28 include/darner/util/queue_map.hpp
@@ -2,9 +2,10 @@
#define __DARNER_QUEUE_MAP_HPP__
#include <string>
+#include <map>
#include <boost/asio.hpp>
-#include <boost/ptr_container/ptr_map.hpp>
+#include <boost/shared_ptr.hpp>
#include <boost/filesystem/operations.hpp>
#include "darner/queue/queue.h"
@@ -14,10 +15,14 @@ namespace darner {
// maps a queue name to a queue instance, reloads queues
class queue_map
{
+private:
+
+ typedef std::map<std::string, boost::shared_ptr<queue> > container_type;
+
public:
- typedef boost::ptr_map<std::string, queue>::iterator iterator;
- typedef boost::ptr_map<std::string, queue>::const_iterator const_iterator;
+ typedef container_type::iterator iterator;
+ typedef container_type::const_iterator const_iterator;
queue_map(boost::asio::io_service& ios, const std::string& data_path)
: data_path_(data_path), ios_(ios)
@@ -26,19 +31,22 @@ class queue_map
for (boost::filesystem::directory_iterator it(data_path_); it != end_it; ++it)
{
std::string queue_name = it->path().leaf().string();
- queues_.insert(queue_name, new queue(ios_, it->path().string()));
+ boost::shared_ptr<queue> p(new queue(ios_, (data_path_ / queue_name).string()));
+ queues_.insert(container_type::value_type(queue_name, p));
}
}
- queue& operator[] (const std::string& queue_name)
+ boost::shared_ptr<queue> operator[] (const std::string& queue_name)
{
- boost::ptr_map<std::string, queue>::iterator it = queues_.find(queue_name);
+ iterator it = queues_.find(queue_name);
+
if (it == queues_.end())
{
- std::string q(queue_name); // some strange ptr_map limitation, needs non-const key
- it = queues_.insert(q, new queue(ios_, (data_path_ / queue_name).string())).first;
+ boost::shared_ptr<queue> p(new queue(ios_, (data_path_ / queue_name).string()));
+ it = queues_.insert(container_type::value_type(queue_name, p)).first;
}
- return *it->second;
+
+ return it->second;
}
iterator begin() { return queues_.begin(); }
@@ -48,7 +56,7 @@ class queue_map
private:
- boost::ptr_map<std::string, queue> queues_;
+ std::map<std::string, boost::shared_ptr<queue> > queues_;
boost::filesystem::path data_path_;
boost::asio::io_service& ios_;
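
The switch from ptr_map references to shared_ptr means a handler can hold onto a queue handle independently of queue_map. A small usage sketch (not part of this commit; the data path is hypothetical and assumed to exist):

#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include "darner/util/queue_map.hpp"

void example()
{
   boost::asio::io_service ios;
   darner::queue_map queues(ios, "/tmp/darner-data");       // reloads any queues found on disk

   boost::shared_ptr<darner::queue> q = queues["jobs"];     // created lazily on first use
   boost::shared_ptr<darner::queue> same = queues["jobs"];  // same underlying queue instance
   // q stays valid for iqstream/oqstream use regardless of queue_map's own lifetime
}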
194 src/net/handler.cpp
@@ -1,6 +1,5 @@
#include "darner/net/handler.h"
-#include <sstream>
#include <boost/array.hpp>
using namespace std;
@@ -38,31 +37,24 @@ void handler::start()
void handler::read_request(const system::error_code& e, size_t bytes_transferred)
{
if (e)
- {
- log::ERROR("handler<%1%>::read_request: %2%", shared_from_this(), e.message());
- return done(false);
- }
+ return error("read_request", e);
- async_read_until(
- socket_, in_, '\n',
- bind(&handler::parse_request, shared_from_this(), _1, _2));
+ async_read_until(socket_, in_, '\n', bind(&handler::parse_request, shared_from_this(), _1, _2));
}
void handler::parse_request(const system::error_code& e, size_t bytes_transferred)
{
- if (e)
- {
- if (e != error::eof || in_.size()) // clean close by client?
- log::ERROR("handler<%1%>::handle_read_request: %2%", shared_from_this(), e.message());
- return done(false);
- }
+ if (e == error::eof && !in_.size()) // clean close by client?
+ return;
+ else if (e)
+ return error("parse_request", e);
// TODO: it would have been nice to pass in an buffers_iterator directly to spirit, but
// something constness thing about the iterator_traits::value_type is borking up being able to use it
asio::streambuf::const_buffers_type bufs = in_.data();
buf_.assign(buffers_begin(bufs), buffers_begin(bufs) + bytes_transferred);
if (!parser_.parse(req_, buf_))
- return done(false, "ERROR\r\n");
+ return error("");
in_.consume(bytes_transferred);
switch (req_.type)
@@ -71,8 +63,8 @@ void handler::parse_request(const system::error_code& e, size_t bytes_transferre
case request::RT_VERSION: write_version(); break;
case request::RT_FLUSH: flush(); break;
case request::RT_FLUSH_ALL: flush_all(); break;
- case request::RT_SET: set(); break;
- case request::RT_GET: get(); break;
+ case request::RT_SET: ++stats_.cmd_sets; set(); break;
+ case request::RT_GET: ++stats_.cmd_gets; get(); break;
}
}
@@ -92,7 +84,8 @@ void handler::write_stats()
void handler::write_version()
{
- done(true, "VERSION " + string(DARNER_VERSION) + "\r\n");
+ buf_ = "VERSION " DARNER_VERSION "\r\n";
+ async_write(socket_, buffer(buf_), bind(&handler::read_request, shared_from_this(), _1, _2));
}
void handler::flush()
@@ -107,11 +100,9 @@ void handler::flush_all()
void handler::set()
{
- ++stats_.cmd_sets;
-
// round up the number of chunks we need, and fetch \r\n if it's just one chunk
- push_stream_ = in_place(ref(queues_[req_.queue]), (req_.num_bytes + chunk_size_ - 1) / chunk_size_);
- queue::size_type remaining = req_.num_bytes - push_stream_->tell();
+ push_stream_.open(queues_[req_.queue], (req_.num_bytes + chunk_size_ - 1) / chunk_size_);
+ queue::size_type remaining = req_.num_bytes - push_stream_.tell();
queue::size_type required = remaining > chunk_size_ ? chunk_size_ : remaining + 2;
async_read(
@@ -122,19 +113,16 @@ void handler::set()
void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transferred)
{
if (e)
- {
- log::ERROR("handler<%1%>::set_on_read_chunk: %2%", shared_from_this(), e.message());
- return done(false);
- }
+ return error("set_on_read_chunk", e);
asio::streambuf::const_buffers_type bufs = in_.data();
- queue::size_type bytes_remaining = req_.num_bytes - push_stream_->tell();
+ queue::size_type bytes_remaining = req_.num_bytes - push_stream_.tell();
if (bytes_remaining <= chunk_size_) // last chunk! make sure it ends with \r\n
{
buf_.assign(buffers_begin(bufs) + bytes_remaining, buffers_begin(bufs) + bytes_remaining + 2);
if (buf_ != "\r\n")
- return done(false, "CLIENT_ERROR bad data chunk\r\n");
+ return error("bad data chunk", "CLIENT_ERROR");
buf_.assign(buffers_begin(bufs), buffers_begin(bufs) + bytes_remaining);
in_.consume(bytes_remaining + 2);
}
@@ -146,23 +134,21 @@ void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transf
try
{
- push_stream_->write(buf_);
+ push_stream_.write(buf_);
}
catch (const system::system_error& ex)
{
- log::ERROR("handler<%1%>::set_on_write_chunk: %2%", shared_from_this(), ex.code().message());
- return done(false, "SERVER_ERROR " + ex.code().message() + "\r\n");
+ return error("set_on_read_chunk", ex);
}
- if (push_stream_->tell() == req_.num_bytes) // are we all done?
+ if (push_stream_.tell() == req_.num_bytes) // are we all done?
{
- push_stream_.reset();
++stats_.items_enqueued;
- return done(true, "STORED\r\n");
+ return end("STORED\r\n");
}
// otherwise, second verse, same as the first
- queue::size_type remaining = req_.num_bytes - push_stream_->tell();
+ queue::size_type remaining = req_.num_bytes - push_stream_.tell();
queue::size_type required = remaining > chunk_size_ ? chunk_size_ : remaining + 2;
async_read(
socket_, in_, transfer_at_least(required > in_.size() ? required - in_.size() : 0),
@@ -171,80 +157,60 @@ void handler::set_on_read_chunk(const system::error_code& e, size_t bytes_transf
void handler::get()
{
- ++stats_.cmd_gets;
-
if (req_.get_abort && (req_.get_open || req_.get_close))
- return done(false, "CLIENT_ERROR abort must be by itself\r\n");
+ return error("abort must be by itself", "CLIENT_ERROR");
- if (pop_stream_) // before getting the next item, close this one
+ if (req_.get_abort || req_.get_close)
{
- if (!req_.get_close && !req_.get_abort) // needs to be a close or an abort
- return done(false, "CLIENT_ERROR close current item first\r\n");
-
try
{
- pop_stream_->close(req_.get_close);
+ pop_stream_.close(req_.get_close);
}
catch (const system::system_error& ex)
{
- log::ERROR("handler<%1%>::get: %2%", shared_from_this(), ex.code().message());
- return done(false, "SERVER_ERROR " + ex.code().message() + "\r\n");
+ return error("get", ex);
}
-
- pop_stream_.reset();
}
+ else if (pop_stream_)
+ return error("close current item first", "CLIENT_ERROR");
if (req_.get_abort)
- return done(true, "END\r\n"); // aborts go no further
+ return end(); // aborts go no further
- pop_stream_ = in_place(ref(queues_[req_.queue]));
-
- if (!pop_stream_->read(buf_))
+ if (!pop_stream_.open(queues_[req_.queue]))
{
- // couldn't read... can we at least wait?
- if (req_.wait_ms)
- queues_[req_.queue].wait(req_.wait_ms, bind(&handler::get_on_queue_return, shared_from_this(), _1));
+ if (req_.wait_ms) // couldn't read... can we at least wait?
+ queues_[req_.queue]->wait(req_.wait_ms, bind(&handler::get_on_queue_return, shared_from_this(), _1));
else
- {
- pop_stream_.reset();
- return done(true, "END\r\n");
- }
+ return end();
}
- else
- write_first_chunk();
+
+ try
+ {
+ pop_stream_.read(buf_);
+ }
+ catch (const system::system_error& ex)
+ {
+ return error("get", ex);
+ }
+
+ write_first_chunk();
}
void handler::get_on_queue_return(const boost::system::error_code& e)
{
if (e == asio::error::timed_out)
- {
- pop_stream_.reset();
- return done(true, "END\r\n");
- }
+ return end();
else if (e)
- {
- pop_stream_.reset();
- log::ERROR("handler<%1%>::get_on_queue_return: %2%", shared_from_this(), e.message());
- return done(false, "SERVER_ERROR " + e.message() + "\r\n");
- }
+ return error("get_on_queue_return", e);
else
- {
- if (!pop_stream_->read(buf_))
- {
- // well this is very unusual. the queue woke us up but nothing is available.
- pop_stream_.reset();
- log::ERROR("handler<%1%>::get_on_queue_return: %2%", shared_from_this(), "bad queue_return");
- return done(false, "SERVER_ERROR bad queue return\r\n");
- }
-
- write_first_chunk();
- }
+ get();
}
void handler::write_first_chunk()
{
ostringstream oss;
- oss << "VALUE " << req_.queue << " 0 " << pop_stream_->size() << "\r\n";
+ oss << "VALUE " << req_.queue << " 0 " << pop_stream_.size() << "\r\n";
header_buf_ = oss.str();
array<const_buffer, 2> bufs = {{ buffer(header_buf_), buffer(buf_) }};
@@ -254,86 +220,36 @@ void handler::write_first_chunk()
void handler::get_on_write_chunk(const boost::system::error_code& e, size_t bytes_transferred)
{
if (e)
- {
- log::ERROR("handler<%1%>::get_on_write_chunk: %2%", shared_from_this(), e.message());
- return done(false);
- }
+ return error("get_on_write_chunk", e, false);
- if (pop_stream_->tell() == pop_stream_->size())
+ if (pop_stream_.tell() == pop_stream_.size())
{
if (!req_.get_open)
{
try
{
- pop_stream_->close(true);
+ pop_stream_.close(true);
}
catch (const system::system_error& ex)
{
- log::ERROR("handler<%1%>::get_on_write_chunk: %2%", shared_from_this(), ex.code().message());
- return done(false);
+ return error("get_on_write_chunk", ex, false);
}
- pop_stream_.reset();
}
++stats_.items_dequeued;
- return done(true, "\r\nEND\r\n");
- }
- else
- {
- try
- {
- pop_stream_->read(buf_);
- }
- catch (const system::system_error& ex)
- {
- log::ERROR("handler<%1%>::get_on_write_chunk: %2%", shared_from_this(), ex.code().message());
- return done(false);
- }
- async_write(socket_, buffer(buf_), bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2));
- }
-}
-void handler::done(bool success, const std::string& msg /* = "" */)
-{
- if (!msg.empty())
- {
- buf_ = msg;
- if (success)
- async_write(socket_, buffer(buf_), bind(&handler::read_request, shared_from_this(), _1, _2));
- else
- async_write(socket_, buffer(buf_), bind(&handler::finalize, shared_from_this(), _1, _2));
+ return end("\r\nEND\r\n");
}
else
{
- if (success)
- read_request(system::error_code(), 0);
- else
- finalize(system::error_code(), 0);
- }
-}
-
-void handler::finalize(const system::error_code& e, size_t bytes_transferred)
-{
- if (pop_stream_)
- {
try
{
- pop_stream_->close(false);
+ pop_stream_.read(buf_);
}
catch (const system::system_error& ex)
{
- log::ERROR("handler<%1%>::finalize: %2%", shared_from_this(), ex.code().message());
+ return error("get_on_write_chunk", ex, false);
}
- }
- if (push_stream_)
- {
- try
- {
- push_stream_->cancel();
- }
- catch (const system::system_error& ex)
- {
- log::ERROR("handler<%1%>::finalize: %2%", shared_from_this(), ex.code().message());
- }
+ async_write(socket_, buffer(buf_), bind(&handler::get_on_write_chunk, shared_from_this(), _1, _2));
}
}
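
One detail worth calling out in set()/set_on_read_chunk(): each pass asks asio for at most chunk_size_ bytes, except the final pass, which also has to pull in the trailing "\r\n" terminator. A tiny illustration of that sizing rule (not part of this commit; required_bytes is a hypothetical helper):

#include <cstddef>

// sketch only: the minimum read size set() computes on each pass
std::size_t required_bytes(std::size_t remaining, std::size_t chunk_size)
{
   return remaining > chunk_size ? chunk_size : remaining + 2; // + 2 covers the trailing "\r\n"
}

// e.g. num_bytes = 10, chunk_size_ = 4: passes need >= 4, >= 4, then >= 4 (2 data bytes plus "\r\n")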
64 src/queue/iqstream.cpp
@@ -1,54 +1,60 @@
#include "darner/queue/iqstream.h"
#include <boost/asio.hpp>
-#include <boost/bind.hpp>
-
-#include <leveldb/write_batch.h>
using namespace std;
using namespace boost;
using namespace darner;
-iqstream::iqstream(queue& _queue)
-: queue_(_queue),
- chunk_pos_(0),
- tell_(0)
+iqstream::~iqstream()
{
+ if (queue_)
+ queue_->pop_end(false, id_, header_);
}
-bool iqstream::read(string& result)
+bool iqstream::open(shared_ptr<queue> queue)
{
- if (!id_) // should we try to fetch a queue item?
- {
- if (!queue_.pop_open(id_, header_, result))
- return false;
+ if (queue_)
+ throw system::system_error(asio::error::already_open); // can't open what's open
- if (!header_) // not multi-chunk? we're done!
- {
- tell_ += result.size();
- return true;
- }
+ if (!queue->pop_begin(id_))
+ return false;
- chunk_pos_ = header_->beg;
- }
+ queue_ = queue;
+ header_ = queue::header_type();
+ chunk_pos_ = header_.beg;
+ tell_ = 0;
+
+ return true;
+}
- // if we have an id already, and are still requesting more reads, we must be multi-chunk
- // make sure that's the cast, and that we haven't read past the end
- if (!header_ || chunk_pos_ >= header_->end)
+void iqstream::read(string& result)
+{
+ // not open or already read past end
+ if (!queue_ || chunk_pos_ >= header_.end)
throw system::system_error(asio::error::eof);
- queue_.read_chunk(result, chunk_pos_);
+ if (!chunk_pos_) // first read? check if there's a header
+ {
+ queue_->pop_read(result, header_, id_);
+ if (header_.end > 1)
+ chunk_pos_ = header_.beg;
+ else
+ header_.size = result.size();
+ }
+
+ if (header_.end > 1) // multi-chunk? get the next chunk!
+ queue_->read_chunk(result, chunk_pos_);
++chunk_pos_;
tell_ += result.size();
-
- return true;
}
-void iqstream::close(bool remove)
+void iqstream::close(bool erase)
{
- if (!id_)
- throw system::system_error(asio::error::not_found); // can't close something we haven't opened
+ if (!queue_)
+ return; // it's not an error to close more than once
- queue_.pop_close(remove, *id_, header_);
+ queue_->pop_end(erase, id_, header_);
+ queue_.reset();
}
82 src/queue/oqstream.cpp
@@ -1,60 +1,74 @@
#include "darner/queue/oqstream.h"
+#include <stdexcept>
+
#include <boost/asio.hpp>
-#include <boost/bind.hpp>
-#include <leveldb/write_batch.h>
+#include "darner/util/log.h"
using namespace std;
using namespace boost;
using namespace darner;
-oqstream::oqstream(queue& _queue, queue::size_type chunks)
-: queue_(_queue),
- chunks_(chunks),
- chunk_pos_(0),
- tell_(0)
+oqstream::~oqstream()
{
+ if (queue_)
+ {
+ try
+ {
+ cancel();
+ }
+ catch (const std::exception& e) // we're in the dtor! we have to swallow everything
+ {
+ log::ERROR("oqstream::~oqstream(): %1%", e.what());
+ }
+ catch (...)
+ {
+ log::ERROR("oqstream::~oqstream(): unknown error");
+ }
+ }
}
-void oqstream::write(const std::string& value)
+void oqstream::open(boost::shared_ptr<queue> queue, queue::size_type chunks_count)
{
- if (id_) // have an id already? that's a paddlin'
- throw system::system_error(asio::error::eof);
+ if (queue_) // already open? that's a paddlin'
+ throw system::system_error(asio::error::already_open);
- if (chunks_ == 1) // just one chunk? push it on
- {
- queue_.push(id_, value);
- tell_ += value.size();
- return;
- }
+ queue_ = queue;
+ header_ = queue::header_type();
+ if (chunks_count > 1)
+ queue_->reserve_chunks(header_, chunks_count);
+ chunk_pos_ = header_.beg;
+}
- if (!header_) // reserve a swath of chunks if we haven't already
- {
- queue_.reserve_chunks(header_, chunks_);
- chunk_pos_ = header_->beg;
- }
+void oqstream::write(const std::string& chunk)
+{
+ if (!queue_ || chunk_pos_ == header_.end)
+ throw system::system_error(asio::error::eof);
- queue_.write_chunk(value, chunk_pos_);
+ if (header_.end <= 1) // just one chunk? push it on
+ queue_->push(id_, chunk);
+ else
+ queue_->write_chunk(chunk, chunk_pos_);
- tell_ += value.size();
- header_->size += value.size();
+ header_.size += chunk.size();
- if (++chunk_pos_ == header_->end) // time to push the item?
- queue_.push(id_, *header_);
+ if (++chunk_pos_ == header_.end) // time to close up shop?
+ {
+ if (header_.end > 1) // multi-chunk? push the header
+ queue_->push(id_, header_);
+ queue_.reset();
+ }
}
void oqstream::cancel()
{
- if (id_) // can't cancel if we already pushed all the chunks
- throw system::system_error(asio::error::invalid_argument);
-
- if (!header_) // nothing wrong with canceling nothing
- return;
+ if (!queue_)
+ throw system::system_error(asio::error::eof); // not gon' do it
- queue_.erase_chunks(*header_);
+ if (header_.end > 1) // multi-chunk? erase them
+ queue_->erase_chunks(header_);
- header_.reset();
- chunk_pos_ = tell_ = 0;
+ queue_.reset();
}
90 src/queue/queue.cpp
@@ -46,7 +46,7 @@ queue::queue(asio::io_service& ios, const string& path)
}
}
-void queue::wait(size_type wait_ms, const success_callback& cb)
+void queue::wait(size_type wait_ms, const wait_callback& cb)
{
ptr_list<waiter>::iterator it = waiters_.insert(waiters_.end(), new waiter(ios_, wait_ms, cb));
it->timer.async_wait(bind(&queue::waiter_timeout, this, asio::placeholders::error, it));
@@ -66,82 +66,77 @@ void queue::write_stats(const string& name, ostringstream& out) const
// protected:
-void queue::push(optional<id_type>& result, const string& value)
+void queue::push(id_type& result, const string& item)
{
- // values that end in 0 are escaped to (0, 0), so we can distinguish them from headers (which end in (1, 0))
- if (value[value.size() - 1] == '\0')
- {
- string new_value = value + '\0';
- if (!journal_->Put(leveldb::WriteOptions(), queue_head_.slice(), new_value).ok())
- throw system::system_error(system::errc::io_error, system::system_category());
- }
- else if (!journal_->Put(leveldb::WriteOptions(), queue_head_.slice(), value).ok())
- throw system::system_error(system::errc::io_error, system::system_category());
+ // items that end in 0 are escaped to (0, 0), so we can distinguish them from headers (which end in (1, 0))
+ if (item[item.size() - 1] == '\0')
+ put(queue_head_, item + '\0');
+ else
+ put(queue_head_, item);
result = queue_head_.id++;
spin_waiters(); // in case there's a waiter waiting for this new item
}
-void queue::push(optional<id_type>& result, const header_type& value)
+void queue::push(id_type& result, const header_type& header)
{
- if (!journal_->Put(leveldb::WriteOptions(), queue_head_.slice(), value.str()).ok())
- throw system::system_error(system::errc::io_error, system::system_category());
+ put(queue_head_.slice(), header.str());
result = queue_head_.id++;
spin_waiters(); // in case there's a waiter waiting for this new item
}
-bool queue::pop_open(optional<id_type>& result_id, optional<header_type>& result_header, string& result_value)
+bool queue::pop_begin(id_type& result)
{
if (!returned_.empty())
{
- result_id = *returned_.begin();
+ result = *returned_.begin();
returned_.erase(returned_.begin());
}
else if (queue_tail_.id != queue_head_.id)
- result_id = queue_tail_.id++;
+ result = queue_tail_.id++;
else
return false;
- if (!journal_->Get(leveldb::ReadOptions(), key_type(key_type::KT_QUEUE, *result_id).slice(), &result_value).ok())
- throw system::system_error(system::errc::io_error, system::system_category());
+ return true;
+}
+
+void queue::pop_read(std::string& result_item, header_type& result_header, id_type id)
+{
+ get(key_type(key_type::KT_QUEUE, id), result_item);
+
+ result_header = header_type();
// check the escapes
- if (result_value.size() > 2 && result_value[result_value.size() - 1] == '\0')
+ if (result_item.size() > 2 && result_item[result_item.size() - 1] == '\0')
{
- if (result_value[result_value.size() - 2] == '\1') // \1 \0 means header
- {
- result_header = header_type(result_value);
- if (!journal_->Get(leveldb::ReadOptions(), key_type(key_type::KT_CHUNK, result_header->beg).slice(),
- &result_value).ok())
- throw system::system_error(system::errc::io_error, system::system_category());
- }
- else if (result_value[result_value.size() - 2] == '\0') // \0 \0 means escaped \0
- result_value.resize(result_value.size() - 1);
+ if (result_item[result_item.size() - 2] == '\1') // \1 \0 means header
+ result_header = header_type(result_item);
+ else if (result_item[result_item.size() - 2] == '\0') // \0 \0 means escaped \0
+ result_item.resize(result_item.size() - 1);
else
throw system::system_error(system::errc::io_error, system::system_category()); // anything else is bad data
}
++items_open_;
-
- return true;
}
-void queue::pop_close(bool remove, id_type id, const optional<header_type>& header)
+void queue::pop_end(bool erase, id_type id, const header_type& header)
{
- if (remove)
+ if (erase)
{
leveldb::WriteBatch batch;
batch.Delete(key_type(key_type::KT_QUEUE, id).slice());
- if (header)
- for (key_type k(key_type::KT_CHUNK, header->beg); k.id != header->end; ++k.id)
+ if (header.end > 1) // multi-chunk?
+ {
+ for (key_type k(key_type::KT_CHUNK, header.beg); k.id != header.end; ++k.id)
batch.Delete(k.slice());
+ }
- if (!journal_->Write(leveldb::WriteOptions(), &batch).ok())
- throw system::system_error(system::errc::io_error, system::system_category());
+ write(batch);
// leveldb is conservative about reclaiming deleted keys, of which there will be many when a queue grows and
// later shrinks. let's explicitly force it to compact every 1024 deletes
@@ -159,22 +154,20 @@ void queue::pop_close(bool remove, id_type id, const optional<header_type>& head
--items_open_;
}
-void queue::reserve_chunks(optional<header_type>& result, size_type chunks)
+void queue::reserve_chunks(header_type& result, size_type count)
{
- result = header_type(chunks_head_.id, chunks_head_.id + chunks, 0);
- chunks_head_.id += chunks;
+ result = header_type(chunks_head_.id, chunks_head_.id + count, 0);
+ chunks_head_.id += count;
}
-void queue::write_chunk(const string& value, id_type chunk_key)
+void queue::write_chunk(const string& chunk, id_type chunk_key)
{
- if (!journal_->Put(leveldb::WriteOptions(), key_type(key_type::KT_CHUNK, chunk_key).slice(), value).ok())
- throw system::system_error(system::errc::io_error, system::system_category());
+ put(key_type(key_type::KT_CHUNK, chunk_key), chunk);
}
void queue::read_chunk(string& result, id_type chunk_key)
{
- if (!journal_->Get(leveldb::ReadOptions(), key_type(key_type::KT_CHUNK, chunk_key).slice(), &result).ok())
- throw system::system_error(system::errc::io_error, system::system_category());
+ get(key_type(key_type::KT_CHUNK, chunk_key), result);
}
void queue::erase_chunks(const header_type& header)
@@ -184,18 +177,15 @@ void queue::erase_chunks(const header_type& header)
for (key_type k(key_type::KT_CHUNK, header.beg); k.id != header.end; ++k.id)
batch.Delete(k.slice());
- if (!journal_->Write(leveldb::WriteOptions(), &batch).ok())
- throw system::system_error(system::errc::io_error, system::system_category());
+ write(batch);
}
// private:
void queue::spin_waiters()
{
- while (true)
+ while (!waiters_.empty() && (!returned_.empty() || queue_tail_.id < queue_head_.id))
{
- if (waiters_.empty() || (returned_.empty() && queue_tail_.id == queue_head_.id))
- break;
ptr_list<waiter>::auto_type waiter = waiters_.release(waiters_.begin());
waiter->timer.cancel();
waiter->cb(system::error_code());
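
The escape rules in push()/pop_read() above are easy to miss: a journal value that ends in '\0' gets one more '\0' appended on push, so the reader can tell a real item apart from a serialized header, which always ends in "\1\0". A standalone sketch of the tag check pop_read() performs (not part of this commit; classify and value_kind are hypothetical names):

#include <string>

enum value_kind { PLAIN_ITEM, ESCAPED_ITEM, MULTI_CHUNK_HEADER, CORRUPT };

// sketch only: mirrors the trailing-byte check in queue::pop_read
value_kind classify(const std::string& v)
{
   if (v.size() <= 2 || v[v.size() - 1] != '\0')
      return PLAIN_ITEM;           // common case: the value is the item verbatim
   if (v[v.size() - 2] == '\1')
      return MULTI_CHUNK_HEADER;   // "...\1\0": parse it as a header_type
   if (v[v.size() - 2] == '\0')
      return ESCAPED_ITEM;         // "...\0\0": drop one trailing '\0' to recover the item
   return CORRUPT;                 // anything else is bad data
}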
12 tests/fixtures/basic_queue.hpp
@@ -18,7 +18,7 @@ class basic_queue
public:
basic_queue()
- : success_cb_(boost::bind(&basic_queue::success_cb, this, _1)),
+ : wait_cb_(boost::bind(&basic_queue::wait_cb, this, _1)),
tmp_(boost::filesystem::temp_directory_path() / boost::filesystem::unique_path())
{
boost::filesystem::create_directories(tmp_);
@@ -33,12 +33,14 @@ class basic_queue
void delayed_push(std::string& value, const boost::system::error_code& error)
{
- darner::oqstream(*queue_, 1).write(value);
+ darner::oqstream oqs;
+ oqs.open(queue_, 1);
+ oqs.write(value);
}
protected:
- void success_cb(const boost::system::error_code& error)
+ void wait_cb(const boost::system::error_code& error)
{
error_ = error;
}
@@ -46,10 +48,12 @@ class basic_queue
std::string pop_value_;
boost::system::error_code error_;
- darner::queue::success_callback success_cb_;
+ darner::queue::wait_callback wait_cb_;
boost::asio::io_service ios_;
boost::shared_ptr<darner::queue> queue_;
+ darner::oqstream oqs_;
+ darner::iqstream iqs_;
boost::filesystem::path tmp_;
};
80 tests/queue.cpp
@@ -18,16 +18,19 @@ BOOST_FIXTURE_TEST_CASE( test_push_pop, fixtures::basic_queue )
{
string value = "I hate when I'm on a flight and I wake up with a water bottle next to me like oh great now I gotta "
"be responsible for this water bottle";
- oqstream(*queue_, 1).write(value);
- iqstream(*queue_).read(pop_value_);
+ oqs_.open(queue_, 1);
+ oqs_.write(value);
+ BOOST_REQUIRE(iqs_.open(queue_));
+
+ iqs_.read(pop_value_);
BOOST_REQUIRE_EQUAL(value, pop_value_);
}
// test having nothing to pop
BOOST_FIXTURE_TEST_CASE( test_pop_empty, fixtures::basic_queue )
{
- BOOST_REQUIRE(!iqstream(*queue_).read(pop_value_));
+ BOOST_REQUIRE(!iqs_.open(queue_));
}
// test a pop wait
@@ -37,7 +40,7 @@ BOOST_FIXTURE_TEST_CASE( test_pop_wait, fixtures::basic_queue )
"elevator sometimes, my 7 floor sanctuary";
deadline_timer timer(ios_, posix_time::milliseconds(10));
timer.async_wait(bind(&fixtures::basic_queue::delayed_push, this, ref(value), _1));
- queue_->wait(100, success_cb_);
+ queue_->wait(100, wait_cb_);
ios_.run();
BOOST_REQUIRE(!error_);
@@ -49,7 +52,7 @@ BOOST_FIXTURE_TEST_CASE( test_pop_wait_timeout, fixtures::basic_queue )
string value = "Classical music is tight yo";
deadline_timer timer(ios_, posix_time::milliseconds(50));
timer.async_wait(bind(&fixtures::basic_queue::delayed_push, this, ref(value), _1));
- queue_->wait(10, success_cb_);
+ queue_->wait(10, wait_cb_);
ios_.run();
BOOST_REQUIRE(error_);
@@ -61,11 +64,13 @@ BOOST_FIXTURE_TEST_CASE( test_queue_close_reopen, fixtures::basic_queue )
string value = "Do you know where to find marble conference tables? I’m looking to have a conference…not until I "
"get the table though";
- oqstream(*queue_, 1).write(value);
+ oqs_.open(queue_, 1);
+ oqs_.write(value);
queue_.reset(new darner::queue(ios_, (tmp_ / "queue").string()));
BOOST_REQUIRE_EQUAL(queue_->count(), 1);
- iqstream(*queue_).read(pop_value_);
+ iqs_.open(queue_);
+ iqs_.read(pop_value_);
BOOST_REQUIRE_EQUAL(pop_value_, value);
}
@@ -74,16 +79,16 @@ BOOST_FIXTURE_TEST_CASE( test_queue_count, fixtures::basic_queue )
{
string value = "NO ALCOHOL BEFORE TATTOOS";
- oqstream(*queue_, 1).write(value);
+ oqs_.open(queue_, 1);
+ oqs_.write(value);
BOOST_REQUIRE_EQUAL(queue_->count(), 1);
// even beginning a pop lowers the count...
- iqstream is(*queue_);
- is.read(pop_value_);
+ iqs_.open(queue_);
BOOST_REQUIRE_EQUAL(queue_->count(), 0);
// but returning it raises it back up
- is.close(false);
+ iqs_.close(false);
BOOST_REQUIRE_EQUAL(queue_->count(), 1);
}
@@ -92,9 +97,9 @@ BOOST_FIXTURE_TEST_CASE( test_oqstream_overflow, fixtures::basic_queue )
{
string value = "I would like to thank Julius Caesar for originating my hairstyle";
- oqstream os(*queue_, 1);
- os.write(value);
- BOOST_REQUIRE_THROW(os.write(value), system::system_error);
+ oqs_.open(queue_, 1);
+ oqs_.write(value);
+ BOOST_REQUIRE_THROW(oqs_.write(value), system::system_error);
}
// test that we can push/pop multi-chunked
@@ -102,54 +107,57 @@ BOOST_FIXTURE_TEST_CASE( test_multi_chunked, fixtures::basic_queue )
{
string value1 = "I don’t ever watch dramas on a plane... I don’t be wanting to reflect";
string value2 = "I make awesome decisions in bike stores!!!";
- oqstream os(*queue_, 2);
- os.write(value1);
+ oqs_.open(queue_, 2);
+ oqs_.write(value1);
BOOST_REQUIRE_EQUAL(queue_->count(), 0); // not ready yet...
- BOOST_REQUIRE_EQUAL(os.tell(), value1.size());
+ BOOST_REQUIRE_EQUAL(oqs_.tell(), value1.size());
- os.write(value2);
+ oqs_.write(value2);
BOOST_REQUIRE_EQUAL(queue_->count(), 1); // okay it's done
- BOOST_REQUIRE_EQUAL(os.tell(), value1.size() + value2.size());
+ BOOST_REQUIRE_EQUAL(oqs_.tell(), value1.size() + value2.size());
- // even beginning a pop lowers the count...
- iqstream is(*queue_);
- is.read(pop_value_);
- BOOST_REQUIRE_EQUAL(queue_->count(), 0);
- BOOST_REQUIRE_EQUAL(is.tell(), value1.size());
- BOOST_REQUIRE_EQUAL(is.size(), value1.size() + value2.size());
+ iqs_.open(queue_);
+ iqs_.read(pop_value_);
+ BOOST_REQUIRE_EQUAL(queue_->count(), 0); // even beginning a pop lowers the count...
+ BOOST_REQUIRE_EQUAL(iqs_.tell(), value1.size());
+ BOOST_REQUIRE_EQUAL(iqs_.size(), value1.size() + value2.size());
BOOST_REQUIRE_EQUAL(pop_value_, value1);
- is.read(pop_value_);
- BOOST_REQUIRE_EQUAL(is.tell(), value1.size() + value2.size());
+ iqs_.read(pop_value_);
+ BOOST_REQUIRE_EQUAL(iqs_.tell(), value1.size() + value2.size());
BOOST_REQUIRE_EQUAL(pop_value_, value2);
// finally, the delete
- is.close(true);
+ iqs_.close(true);
// should be nothing left to pop
- BOOST_REQUIRE(!iqstream(*queue_).read(pop_value_));
+ BOOST_REQUIRE(!iqs_.open(queue_));
}
// test that we can cancel a multi-chunked push
BOOST_FIXTURE_TEST_CASE( test_push_cancel, fixtures::basic_queue )
{
string value = "I ordered the salmon medium instead of medium well I didn’t want to ruin the magic";
- oqstream os(*queue_, 2);
- os.write(value);
+ oqs_.open(queue_, 2);
+ oqs_.write(value);
+
+ BOOST_REQUIRE_EQUAL(oqs_.tell(), value.size());
- BOOST_REQUIRE_EQUAL(os.tell(), value.size());
+ oqs_.cancel();
- os.cancel();
- BOOST_REQUIRE_EQUAL(os.tell(), 0);
+ // shouldn't be able to finish the push now
+ BOOST_REQUIRE_THROW(oqs_.write(value), system::system_error);
}
// test that we can push a value that ends in a null-zero
BOOST_FIXTURE_TEST_CASE( test_push_zero, fixtures::basic_queue )
{
string value = string("I'm sorry Taylor.") + '\0';
- oqstream(*queue_, 1).write(value);
- iqstream(*queue_).read(pop_value_);
+ oqs_.open(queue_, 1);
+ oqs_.write(value);
+ iqs_.open(queue_);
+ iqs_.read(pop_value_);
BOOST_REQUIRE_EQUAL(value, pop_value_);
}