Skip to content

Commit

Permalink
output_stream: add batching to zero copy interface
Browse files Browse the repository at this point in the history
Current zero copy interface is very simplistic - it immediately pushes
provided data to the data_sink. This patch changes output_stream to
accumulate data up to _size or until flush and provides a capability to
batch flushes in order to push data in bigger chunks to data_sink.

Fixes #213

Message-Id: <20161218132137.GD1950@scylladb.com>
  • Loading branch information
Gleb Natapov authored and avikivity committed Dec 18, 2016
1 parent b56e1f7 commit 68b3548
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 23 deletions.
83 changes: 60 additions & 23 deletions core/iostream-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,64 @@ future<> output_stream<CharType>::write(scattered_message<CharType> msg) {
return write(std::move(msg).release());
}

template<typename CharType>
future<>
output_stream<CharType>::zero_copy_put(net::packet p) {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
// flush in progress, wait for it to end before continuing
return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
return _fd.put(std::move(p));
});
} else {
return _fd.put(std::move(p));
}
}

// Writes @p in chunks of _size length. The last chunk is buffered if smaller.
template <typename CharType>
future<>
output_stream<CharType>::zero_copy_split_and_put(net::packet p) {
return repeat([this, p = std::move(p)] () mutable {
if (p.len() < _size) {
if (p.len()) {
_zc_bufs = std::move(p);
} else {
_zc_bufs = net::packet::make_null_packet();
}
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
auto chunk = p.share(0, _size);
p.trim_front(_size);
return zero_copy_put(std::move(chunk)).then([] {
return stop_iteration::no;
});
});
}

template<typename CharType>
future<> output_stream<CharType>::write(net::packet p) {
static_assert(std::is_same<CharType, char>::value, "packet works on char");

if (p.len() == 0) {
return make_ready_future<>();
}
if (p.len() != 0) {
assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");

assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
if (_zc_bufs) {
_zc_bufs.append(std::move(p));
} else {
_zc_bufs = std::move(p);
}

if (!_trim_to_size || p.len() <= _size) {
// TODO: aggregate buffers for later coalescing. Currently we flush right
// after appending the message anyway, so it doesn't matter.
return _fd.put(std::move(p));
if (_zc_bufs.len() >= _size) {
if (_trim_to_size) {
return zero_copy_split_and_put(std::move(_zc_bufs));
} else {
return zero_copy_put(std::move(_zc_bufs));
}
}
}

auto head = p.share(0, _size);
p.trim_front(_size);
return _fd.put(std::move(head)).then([this, p = std::move(p)] () mutable {
return write(std::move(p));
});
return make_ready_future<>();
}

template<typename CharType>
Expand All @@ -96,15 +133,8 @@ future<> output_stream<CharType>::write(temporary_buffer<CharType> p) {
return make_ready_future<>();
}
assert(!_end && "Mixing buffered writes and zero-copy writes not supported yet");
if (!_trim_to_size || p.size() <= _size) {
// TODO: aggregate buffers for later coalescing.
return _fd.put(std::move(p));
}
auto head = p.share(0, _size);
p.trim_front(_size);
return _fd.put(std::move(head)).then([this, p = std::move(p)] () mutable {
return write(std::move(p));
});

return write(net::packet(std::move(p)));
}

template <typename CharType>
Expand Down Expand Up @@ -287,6 +317,7 @@ output_stream<CharType>::split_and_put(temporary_buffer<CharType> buf) {
template <typename CharType>
future<>
output_stream<CharType>::write(const char_type* buf, size_t n) {
assert(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
auto bulk_threshold = _end ? (2 * _size - _end) : _size;
if (n >= bulk_threshold) {
if (_end) {
Expand Down Expand Up @@ -343,6 +374,10 @@ output_stream<CharType>::flush() {
return put(std::move(_buf)).then([this] {
return _fd.flush();
});
} else if (_zc_bufs) {
return zero_copy_put(std::move(_zc_bufs)).then([this] {
return _fd.flush();
});
}
} else {
if (_ex) {
Expand Down Expand Up @@ -396,6 +431,8 @@ output_stream<CharType>::poll_flush() {
_buf.trim(_end);
_end = 0;
f = _fd.put(std::move(_buf));
} else if(_zc_bufs) {
f = _fd.put(std::move(_zc_bufs));
}

f.then([this] {
Expand Down
3 changes: 3 additions & 0 deletions core/iostream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class output_stream final {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_sink _fd;
temporary_buffer<CharType> _buf;
net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
size_t _size = 0;
size_t _begin = 0;
size_t _end = 0;
Expand All @@ -197,6 +198,8 @@ private:
future<> split_and_put(temporary_buffer<CharType> buf);
future<> put(temporary_buffer<CharType> buf);
void poll_flush();
future<> zero_copy_put(net::packet p);
future<> zero_copy_split_and_put(net::packet p);
public:
using char_type = CharType;
output_stream() = default;
Expand Down
12 changes: 12 additions & 0 deletions net/packet.hh
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ public:
packet(packet&& x, fragment frag, deleter d);
// append temporary_buffer (zero-copy)
packet(packet&& x, temporary_buffer<char> buf);
// create from temporary_buffer (zero-copy)
packet(temporary_buffer<char> buf);
// append deleter
packet(packet&& x, deleter d);

Expand Down Expand Up @@ -297,6 +299,12 @@ public:
});
return ret;
}
explicit operator bool() {
return bool(_impl);
}
static packet make_null_packet() {
return net::packet(nullptr);
}
private:
void linearize(size_t at_frag, size_t desired_size);
bool allocate_headroom(size_t size);
Expand Down Expand Up @@ -466,6 +474,10 @@ packet::packet(packet&& x, temporary_buffer<char> buf)
: packet(std::move(x), fragment{buf.get_write(), buf.size()}, buf.release()) {
}

inline
packet::packet(temporary_buffer<char> buf)
: packet(fragment{buf.get_write(), buf.size()}, buf.release()) {}

inline
void packet::append(packet&& p) {
if (!_impl->_len) {
Expand Down

0 comments on commit 68b3548

Please sign in to comment.