Permalink
Browse files

clean up iqstream/oqstream semantics

- no more boost::optional
- explicit opened/closed state for both streams
- stream dtors handle closing/aborting
- clearer documentation of queue class
  • Loading branch information...
1 parent df419f3 commit 25f9e24a643a1622d3ad9d75a3f018c71d5dd12e @erikfrey erikfrey committed Aug 16, 2012
@@ -1,7 +1,7 @@
#ifndef __DARNER_QUEUE_IQSTREAM_H__
#define __DARNER_QUEUE_IQSTREAM_H__
-#include <boost/optional.hpp>
+#include <boost/shared_ptr.hpp>
#include "darner/queue/queue.h"
@@ -12,20 +12,25 @@ class iqstream
public:
/*
- * tries to open an item immediately. reads will fail if it couldn't open an item.
+ * destroying an open iqstream will close it with erase = false
*/
- iqstream(queue& _queue);
+ ~iqstream();
/*
- * on the first read, tries to fetch an item and returns true if one was available
+ * tries to open an item for reading. returns true if an item was available.
+ */
+ bool open(boost::shared_ptr<queue> queue);
+
+ /*
+ * reads a chunk
* can continue calling read until eof (until tell() == size()).
*/
- bool read(std::string& result);
+ void read(std::string& result);
/*
- * closes the iqstream. if remove, completes the pop of the item off the queue, otherwise returns it
+ * closes the iqstream. if erase, completes the pop of the item off the queue, otherwise returns it.
*/
- void close(bool remove);
+ void close(bool erase);
/*
* returns the position in the stream in bytes. only valid after first read()
@@ -35,14 +40,14 @@ class iqstream
/*
* returns the size of the item. only valid after first read()
*/
- queue::size_type size() const { return header_ ? header_->size : tell_; }
+ queue::size_type size() const { return header_.size; }
private:
- queue& queue_;
+ boost::shared_ptr<queue> queue_;
- boost::optional<queue::id_type> id_; // id of key in queue, only set after a succesful first read
- boost::optional<queue::header_type> header_; // only set if it's multi-chunk
+ queue::id_type id_; // id of key in queue, only valid if open() succeeded
+ queue::header_type header_; // only valid if it's multi-chunk
queue::size_type chunk_pos_;
queue::size_type tell_;
};
@@ -1,8 +1,7 @@
#ifndef __DARNER_QUEUE_OQSTREAM_H__
#define __DARNER_QUEUE_OQSTREAM_H__
-#include <boost/optional.hpp>
-#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
#include "darner/queue/queue.h"
@@ -12,32 +11,39 @@ class oqstream
{
public:
- oqstream(queue& _queue, queue::size_type chunks);
+ /*
+ * destroying an unfinished oqstream will cancel it
+ */
+ ~oqstream();
+
+ /*
+ * immediately opens an oqstream for writing. the stream will automatically close after chunks_count chunks
+ * have been written
+ */
+ void open(boost::shared_ptr<queue> queue, queue::size_type chunks_count);
/*
* writes a chunk of the item. fails if more chunks are written than originally reserved.
*/
- void write(const std::string& value);
+ void write(const std::string& chunk);
/*
- * cancels the oqstream write. only available to mutli-chunks that haven't written all their chunks yet.
+ * cancels the oqstream write. only available if the stream hasn't written chunks_count chunks yet
*/
void cancel();
/*
* returns the position in the stream in bytes.
*/
- queue::size_type tell() const { return tell_; }
+ queue::size_type tell() const { return header_.size; }
private:
- queue& queue_;
- queue::size_type chunks_;
+ boost::shared_ptr<queue> queue_;
- boost::optional<queue::id_type> id_; // id of key in queue, only set after all chunks are written
- boost::optional<queue::header_type> header_; // only set if it's multi-chunk
+ queue::id_type id_; // id of key in queue, only set after all chunks are written
+ queue::header_type header_; // only set if it's multi-chunk
queue::size_type chunk_pos_;
- queue::size_type tell_;
};
} // darner
@@ -8,7 +8,6 @@
#include <boost/array.hpp>
#include <boost/ptr_container/ptr_list.hpp>
#include <boost/scoped_ptr.hpp>
-#include <boost/optional.hpp>
#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
@@ -22,27 +21,28 @@ namespace darner {
* queue is a fifo queue that is O(log(queue size / cache size)) for pushing/popping. it boasts these features:
*
* - an evented wait semantic for queue poppers
- * - items are first checked out, then later deleted or returned back into the queue
+ * - popping is two-phase with a begin and an end. ending a pop can either erase it or return it back to the queue.
* - large items are streamed in a chunk at a time
*
- * queue will post events such as journal writes and waits to a provided boost::asio io_service. interrupting the
- * io_service with pending events is okay - queue is never in an inconsistent state between io events.
+ * all queue methods are synchronous except for wait(), which starts an async timer on the provided io_service.
*
* queue is not thread-safe, it assumes a single-thread calling and operating the provided io_service
*/
class queue
{
public:
+ // a queue can only ever have a backlog of 2^64 items. so at darner's current peak throughput you can only run the
+ // server for 23 million years :(
typedef boost::uint64_t id_type;
typedef boost::uint64_t size_type;
- typedef boost::function<void (const boost::system::error_code& error)> success_callback;
+ typedef boost::function<void (const boost::system::error_code& error)> wait_callback;
// open or create the queue at the path
queue(boost::asio::io_service& ios, const std::string& path);
// wait up to wait_ms milliseconds for an item to become available, then call cb with success or timeout
- void wait(size_type wait_ms, const success_callback& cb);
+ void wait(size_type wait_ms, const wait_callback& cb);
// returns the number of items in the queue
size_type count() const;
@@ -60,7 +60,7 @@ class queue
{
public:
- header_type() : beg(0), end(0), size(0) {}
+ header_type() : beg(0), end(1), size(0) {}
header_type(id_type _beg, id_type _end, size_type _size)
: beg(_beg), end(_end), size(_size) {}
header_type(const std::string& buf)
@@ -79,51 +79,58 @@ class queue
mutable std::string buf_;
};
- // queue methods:
+ // queue methods aren't meant to be used directly. instead create an iqstream or oqstream to use it
/*
- * pushes a value to to the queue. returns true for success, false if there was a problem writing to the journal
+ * pushes an item to to the queue.
*/
- void push(boost::optional<id_type>& result, const std::string& value);
+ void push(id_type& result, const std::string& item);
/*
- * pushes a header to to the queue. call this after inserting a range of data chunks.
+ * pushes a header to to the queue. a header points to a range of chunks in a multi-chunk item.
*/
- void push(boost::optional<id_type>& result, const header_type& value);
+ void push(id_type& result, const header_type& header);
/*
- * begins the popping of an item. if the item is a single chunk, pops the value, otherwise just pops the
- * header. will wait wait_ms milliseconds for an item to become available before either returning a succes
- * or timeout status to the callback cb
+ * begins popping an item. if no items are available, immediately returns false. once an item pop is begun,
+ * it is owned solely by the caller, and must eventually be pop_ended. pop_begin is constant time.
*/
- bool pop_open(boost::optional<id_type>& result_id, boost::optional<header_type>& result_header,
- std::string& result_value);
+ bool pop_begin(id_type& result);
/*
- * finishes the popping of an item. if remove = true, deletes the dang ol' item, otherwise returns it
- * back into the queue
+ * once has a pop has begun, call pop_read. if the item is just one chunk (end - beg < 2), result_item will be
+ * immediately populated, otherwise fetch the chunks in the header's range [beg, end).
*/
- void pop_close(bool remove, id_type id, const boost::optional<header_type>& header);
+ void pop_read(std::string& result_item, header_type& result_header, id_type id);
+
+ /*
+ * finishes the popping of an item. if erase = true, deletes the dang ol' item, otherwise returns it
+ * back to its position near the tail of the queue. closing an item with erase = true is constant time, but
+ * closing an item with erase = false could take logn time and linear memory for # returned items.
+ *
+ * the simplest way to address this is to limit the number of items that can be opened at once.
+ */
+ void pop_end(bool erase, id_type id, const header_type& header);
// chunk methods:
/*
- * returns a header with a range of reserved chunks
+ * returns to a header a range of reserved chunks
*/
- void reserve_chunks(boost::optional<header_type>& result, size_type chunks);
+ void reserve_chunks(header_type& result, size_type count);
/*
* writes a chunk
*/
- void write_chunk(const std::string& value, id_type chunk_key);
+ void write_chunk(const std::string& chunk, id_type chunk_key);
/*
* reads a chunk
*/
void read_chunk(std::string& result, id_type chunk_key);
/*
- * removes all chunks referred to by a header
+ * removes all chunks referred to by a header. use this when aborting a multi-chunk push.
*/
void erase_chunks(const header_type& header);
@@ -157,13 +164,13 @@ class queue
// ties a set of results to a deadline timer
struct waiter
{
- waiter(boost::asio::io_service& ios, size_type wait_ms, const success_callback& _cb)
+ waiter(boost::asio::io_service& ios, size_type wait_ms, const wait_callback& _cb)
: cb(_cb),
timer(ios, boost::posix_time::milliseconds(wait_ms))
{
}
- success_callback cb;
+ wait_callback cb;
boost::asio::deadline_timer timer;
};
@@ -183,9 +190,29 @@ class queue
// any operation that mutates the queue or the waiter state should run this to crank any pending events
void spin_waiters();
- // fires either if timer times out, or is canceled
+ // fires either if timer times out or is canceled
void waiter_timeout(const boost::system::error_code& e, boost::ptr_list<waiter>::iterator waiter_it);
+ // some leveldb sugar:
+
+ void put(const key_type& key, const std::string& value)
+ {
+ if (!journal_->Put(leveldb::WriteOptions(), key.slice(), value).ok())
+ throw boost::system::system_error(boost::system::errc::io_error, boost::system::system_category());
+ }
+
+ void get(const key_type& key, std::string& result)
+ {
+ if (!journal_->Get(leveldb::ReadOptions(), key.slice(), &result).ok())
+ throw boost::system::system_error(boost::system::errc::io_error, boost::system::system_category());
+ }
+
+ void write(leveldb::WriteBatch& batch)
+ {
+ if (!journal_->Write(leveldb::WriteOptions(), &batch).ok())
+ throw boost::system::system_error(boost::system::errc::io_error, boost::system::system_category());
+ }
+
boost::scoped_ptr<comparator> cmp_;
boost::scoped_ptr<leveldb::DB> journal_;
View
@@ -1,54 +1,56 @@
#include "darner/queue/iqstream.h"
#include <boost/asio.hpp>
-#include <boost/bind.hpp>
-
-#include <leveldb/write_batch.h>
using namespace std;
using namespace boost;
using namespace darner;
-iqstream::iqstream(queue& _queue)
-: queue_(_queue),
- chunk_pos_(0),
- tell_(0)
+iqstream::~iqstream()
{
+ if (queue_)
+ queue_->pop_end(false, id_, header_);
}
-bool iqstream::read(string& result)
+bool iqstream::open(shared_ptr<queue> queue)
{
- if (!id_) // should we try to fetch a queue item?
- {
- if (!queue_.pop_open(id_, header_, result))
- return false;
+ if (queue_)
+ throw system::system_error(asio::error::already_open); // can't open what's open
- if (!header_) // not multi-chunk? we're done!
- {
- tell_ += result.size();
- return true;
- }
+ if (!queue->pop_begin(id_))
+ return false;
- chunk_pos_ = header_->beg;
- }
+ queue_ = queue;
+ header_ = queue::header_type();
+ chunk_pos_ = header_.beg;
+
+ return true;
+}
- // if we have an id already, and are still requesting more reads, we must be multi-chunk
- // make sure that's the cast, and that we haven't read past the end
- if (!header_ || chunk_pos_ >= header_->end)
+void iqstream::read(string& result)
+{
+ // not open or already read past end
+ if (!queue_ || chunk_pos_ >= header_.end)
throw system::system_error(asio::error::eof);
- queue_.read_chunk(result, chunk_pos_);
+ if (!chunk_pos_) // first read? check if there's a header
+ {
+ queue_->pop_read(result, header_, id_);
+ chunk_pos_ = header_.beg;
+ }
+
+ if (header_.end > 1) // multi-chunk? get the next chunk!
+ queue_->read_chunk(result, chunk_pos_);
++chunk_pos_;
tell_ += result.size();
-
- return true;
}
-void iqstream::close(bool remove)
+void iqstream::close(bool erase)
{
- if (!id_)
- throw system::system_error(asio::error::not_found); // can't close something we haven't opened
+ if (!queue_)
+ throw system::system_error(asio::error::eof); // can't close something we haven't opened
- queue_.pop_close(remove, *id_, header_);
+ queue_->pop_end(erase, id_, header_);
+ queue_.reset();
}
Oops, something went wrong.

0 comments on commit 25f9e24

Please sign in to comment.