Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added support for sync writes through "/sync" option to "set" command

  • Loading branch information...
commit 676a865e6e661a53d608aa701f36d6458d6d4f6d 1 parent 0132f68
@anight anight authored
View
3  include/darner/net/request.h
@@ -28,6 +28,7 @@ struct request
bool get_peek;
bool get_close;
bool get_abort;
+ bool set_sync;
size_t wait_ms;
};
@@ -36,7 +37,7 @@ struct request_grammar : boost::spirit::qi::grammar<std::string::const_iterator>
request_grammar();
request req;
boost::spirit::qi::rule<std::string::const_iterator, std::string()> key_name;
- boost::spirit::qi::rule<std::string::const_iterator> stats, version, flush, flush_all, set, get_option, get, start;
+ boost::spirit::qi::rule<std::string::const_iterator> stats, version, flush, flush_all, set_option, set, get_option, get, start;
};
// grammar are expensive to construct. to be thread-safe, let's make one grammar per thread.
View
3  include/darner/queue/oqstream.h
@@ -20,7 +20,7 @@ class oqstream
* immediately opens an oqstream for writing. the stream will automatically close after chunks_count chunks
* have been written
*/
- void open(boost::shared_ptr<queue> queue, queue::size_type chunks_count);
+ void open(boost::shared_ptr<queue> queue, queue::size_type chunks_count, bool sync = false);
/*
* writes a chunk of the item. fails if more chunks are written than originally reserved.
@@ -44,6 +44,7 @@ class oqstream
queue::id_type id_; // id of key in queue, only set after all chunks are written
queue::header_type header_; // only set if it's multi-chunk
queue::size_type chunk_pos_;
+ bool sync_;
};
} // darner
View
10 include/darner/queue/queue.h
@@ -80,12 +80,12 @@ class queue
/*
* pushes an item to to the queue.
*/
- void push(id_type& result, const std::string& item);
+ void push(id_type& result, const std::string& item, bool sync);
/*
* pushes a header to to the queue. a header points to a range of chunks in a multi-chunk item.
*/
- void push(id_type& result, const header_type& header);
+ void push(id_type& result, const header_type& header, bool sync);
/*
* begins popping an item. if no items are available, immediately returns false. once an item pop is begun,
@@ -205,9 +205,11 @@ class queue
// some leveldb sugar:
- void put(const key_type& key, const std::string& value)
+ void put(const key_type& key, const std::string& value, bool sync = false)
{
- if (!journal_->Put(leveldb::WriteOptions(), key.slice(), value).ok())
+ leveldb::WriteOptions write_options;
+ write_options.sync = sync;
+ if (!journal_->Put(write_options, key.slice(), value).ok())
throw boost::system::system_error(boost::system::errc::io_error, boost::asio::error::get_system_category());
}
View
2  src/net/handler.cpp
@@ -103,7 +103,7 @@ void handler::flush_all()
void handler::set()
{
// round up the number of chunks we need, and fetch \r\n if it's just one chunk
- push_stream_.open(queues_[req_.queue], (req_.num_bytes + chunk_size_ - 1) / chunk_size_);
+ push_stream_.open(queues_[req_.queue], (req_.num_bytes + chunk_size_ - 1) / chunk_size_, req_.set_sync);
queue::size_type remaining = req_.num_bytes - push_stream_.tell();
queue::size_type required = remaining > chunk_size_ ? chunk_size_ : remaining + 2;
View
4 src/net/request.cpp
@@ -29,9 +29,13 @@ request_grammar::request_grammar()
flush_all =
lit("flush_all") [phoenix::ref(req.type) = request::RT_FLUSH_ALL];
+ set_option =
+ lit("/sync") [phoenix::ref(req.set_sync) = true];
+
set =
lit("set ") [phoenix::ref(req.type) = request::RT_SET]
>> key_name [phoenix::ref(req.queue) = _1]
+ >> *set_option
>> ' '
>> uint_ // flags (ignored)
>> ' '
View
7 src/queue/oqstream.cpp
@@ -29,11 +29,12 @@ oqstream::~oqstream()
}
}
-void oqstream::open(boost::shared_ptr<queue> queue, queue::size_type chunks_count)
+void oqstream::open(boost::shared_ptr<queue> queue, queue::size_type chunks_count, bool sync)
{
if (queue_) // already open? that's a paddlin'
throw system::system_error(asio::error::already_open);
+ sync_ = sync;
queue_ = queue;
header_ = queue::header_type();
if (chunks_count > 1)
@@ -47,7 +48,7 @@ void oqstream::write(const std::string& chunk)
throw system::system_error(asio::error::eof);
if (header_.end <= 1) // just one chunk? push it on
- queue_->push(id_, chunk);
+ queue_->push(id_, chunk, sync_);
else
queue_->write_chunk(chunk, chunk_pos_);
@@ -56,7 +57,7 @@ void oqstream::write(const std::string& chunk)
if (++chunk_pos_ == header_.end) // time to close up shop?
{
if (header_.end > 1) // multi-chunk? push the header
- queue_->push(id_, header_);
+ queue_->push(id_, header_, sync_);
queue_.reset();
}
}
View
10 src/queue/queue.cpp
@@ -73,26 +73,26 @@ void queue::write_stats(const string& name, ostringstream& out) const
// protected:
-void queue::push(id_type& result, const string& item)
+void queue::push(id_type& result, const string& item, bool sync)
{
// items that end in 0 are escaped to (0, 0), so we can distinguish them from headers (which end in (1, 0))
if (item[item.size() - 1] == '\0')
- put(queue_head_, item + '\0');
+ put(queue_head_, item + '\0', sync);
else
- put(queue_head_, item);
+ put(queue_head_, item, sync);
result = queue_head_.id++;
wake_up(); // in case there's a waiter waiting for this new item
}
-void queue::push(id_type& result, const header_type& header)
+void queue::push(id_type& result, const header_type& header, bool sync)
{
std::string buf;
header.str(buf);
- put(queue_head_.slice(), buf);
+ put(queue_head_.slice(), buf, sync);
result = queue_head_.id++;
Please sign in to comment.
Something went wrong with that request. Please try again.