Skip to content

Commit

Permalink
Merge 'Yield while building large results in Alternator - rjson::print, executor::batch_get_item' from Marcin Maliszkiewicz
Browse files Browse the repository at this point in the history

Adds preemption points that Alternator uses when:
 - sending a large JSON response
 - building results for BatchGetItem

I tested this manually by inserting code similar to the following into the preemptible sections (e.g. before `os.write`):

    auto start  = std::chrono::steady_clock::now();
    do { } while ((std::chrono::steady_clock::now() - start) < 100ms);

and observing the reported reactor stall times. Before the patch the stall times
kept building up because there was no preemption; after the patch they no longer increased.

Refs #7926
Fixes #13689

Closes #12351

* github.com:scylladb/scylladb:
  alternator: remove redundant flush call in make_streamed
  utils: yield when streaming json in print()
  alternator: yield during BatchGetItem operation
  • Loading branch information
nyh committed Jun 4, 2023
2 parents 8a1334c + 0b56550 commit d2e0897
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 73 deletions.
23 changes: 11 additions & 12 deletions alternator/executor.cc
Expand Up @@ -111,7 +111,6 @@ json::json_return_type make_streamed(rjson::value&& value) {
auto lrs = std::move(rs);
try {
co_await rjson::print(*lrs, los);
co_await los.flush();
co_await los.close();
} catch (...) {
// at this point, we cannot really do anything. HTTP headers and return code are
Expand Down Expand Up @@ -2380,21 +2379,22 @@ std::optional<rjson::value> executor::describe_single_item(schema_ptr schema,
return item;
}

std::vector<rjson::value> executor::describe_multi_item(schema_ptr schema,
const query::partition_slice& slice,
const cql3::selection::selection& selection,
const query::result& query_result,
const std::optional<attrs_to_get>& attrs_to_get) {
cql3::selection::result_set_builder builder(selection, gc_clock::now());
query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection));
future<std::vector<rjson::value>> executor::describe_multi_item(schema_ptr schema,
const query::partition_slice&& slice,
shared_ptr<cql3::selection::selection> selection,
foreign_ptr<lw_shared_ptr<query::result>> query_result,
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get) {
cql3::selection::result_set_builder builder(*selection, gc_clock::now());
query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection));
auto result_set = builder.build();
std::vector<rjson::value> ret;
for (auto& result_row : result_set->rows()) {
rjson::value item = rjson::empty_object();
describe_single_item(selection, result_row, attrs_to_get, item);
describe_single_item(*selection, result_row, *attrs_to_get, item);
ret.push_back(std::move(item));
co_await coroutine::maybe_yield();
}
return ret;
co_return ret;
}

static bool check_needs_read_before_write(const parsed::value& v) {
Expand Down Expand Up @@ -3268,8 +3268,7 @@ future<executor::request_return_type> executor::batch_get_item(client_state& cli
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable {
utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); });
std::vector<rjson::value> jsons = describe_multi_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get);
return make_ready_future<std::vector<rjson::value>>(std::move(jsons));
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get));
});
response_futures.push_back(std::move(f));
}
Expand Down
10 changes: 5 additions & 5 deletions alternator/executor.hh
Expand Up @@ -235,11 +235,11 @@ public:
const query::result&,
const std::optional<attrs_to_get>&);

static std::vector<rjson::value> describe_multi_item(schema_ptr schema,
const query::partition_slice& slice,
const cql3::selection::selection& selection,
const query::result& query_result,
const std::optional<attrs_to_get>& attrs_to_get);
static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
const query::partition_slice&& slice,
shared_ptr<cql3::selection::selection> selection,
foreign_ptr<lw_shared_ptr<query::result>> query_result,
shared_ptr<const std::optional<attrs_to_get>> attrs_to_get);

static void describe_single_item(const cql3::selection::selection&,
const std::vector<managed_bytes_opt>&,
Expand Down
106 changes: 50 additions & 56 deletions utils/rjson.cc
Expand Up @@ -227,68 +227,62 @@ std::string print(const rjson::value& value, size_t max_nested_level) {
return std::string(buffer.GetString());
}

future<> print(const rjson::value& value, seastar::output_stream<char>& os, size_t max_nested_level) {
struct os_buffer {
seastar::output_stream<char>& _os;
temporary_buffer<char> _buf;
size_t _pos = 0;
future<> _f = make_ready_future<>();

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-local-typedefs"
using Ch = char;
#pragma GCC diagnostic pop

void send(bool try_reuse = true) {
if (_f.failed()) {
_f.get0();
}
if (!_buf.empty() && _pos > 0) {
_buf.trim(_pos);
_pos = 0;
// Note: we're assuming we're writing to a buffered output_stream (hello http server).
// If we were not, or if (http) output_stream supported mixed buffered/packed content
// it might be a good idea to instead send our buffer as a packet directly. If so, the
// buffer size should probably increase (at least after first send()).
_f = _f.then([this, buf = std::move(_buf), &os = _os, try_reuse]() mutable -> future<> {
return os.write(buf.get(), buf.size()).then([this, buf = std::move(buf), try_reuse]() mutable {
// Chances are high we just copied this to output_stream buffer, and got here
// immediately. If so, reuse the buffer.
if (try_reuse && _buf.empty() && _pos == 0) {
_buf = std::move(buf);
}
});
});
}
// This class implements RapidJSON Handler and batches Put() calls into output_stream writes.
class output_stream_buffer {
static constexpr size_t _buf_size = 512;
seastar::output_stream<char>& _os;
temporary_buffer<char> _buf = temporary_buffer<char>(_buf_size);
size_t _pos = 0;

future<> send(temporary_buffer<char> b) {
co_return co_await _os.write(b.get(), b.size());
}
public:
output_stream_buffer(seastar::output_stream<char>& os) : _os(os) {}
using Ch = char; // Used by rjson internally
future<> f = make_ready_future<>();

void Flush() {
if (f.failed()) {
f.get0();
}
void Put(char c) {
if (_pos == _buf.size()) {
send();
if (_buf.empty()) {
_buf = temporary_buffer<char>(512);
}
}
// Second note: Should consider writing directly to the buffer in output_stream
// instead of double buffering. But output_stream for a single char has higher
// overhead than the above check + once we hit a non-completed future, we'd have
// to revert to this method anyway...
*(_buf.get_write() + _pos) = c;
++_pos;
if (_pos == 0) {
return;
}
void Flush() {
send();
if (_pos < _buf_size) {
_buf.trim(_pos); // Last flush may be shorter
}
future<> finish()&& {
send(false);
return std::move(_f);
// Either we call futures right away (if they are ready) or we start growing continuations
// chain as we don't have the ability to wait here because Flush() signature is set by rjson.
f = f.then([this, b = std::move(_buf)] () mutable {
return send(std::move(b));
});
_pos = 0;
_buf = temporary_buffer<char>(_buf_size);
}

void Put(Ch c) {
if (_pos == _buf_size) {
Flush();
}
};
// Note: Should consider writing directly to the buffer in output_stream
// instead of double buffering. But output_stream for a single char has higher
// overhead than the above check + once we hit a non-completed future, we'd have
// to revert to this method anyway...
*(_buf.get_write() + _pos) = c;
++_pos;
}
};

os_buffer osb{ os };
using streamer = rapidjson::Writer<os_buffer, encoding, encoding, allocator>;
guarded_yieldable_json_handler<streamer, false, os_buffer> writer(osb, max_nested_level);
future<> print(const rjson::value& value, seastar::output_stream<char>& os, size_t max_nested_level) {
output_stream_buffer buf{ os };
using streamer = rapidjson::Writer<output_stream_buffer, encoding, encoding, allocator>;
guarded_yieldable_json_handler<streamer, false, output_stream_buffer> writer(buf, max_nested_level);
value.Accept(writer);
co_return co_await std::move(osb).finish();
buf.Flush();
// This function has to be a coroutine otherwise buf gets destroyed before all its
// continuations from buf.f finish leading to use-after-free.
co_return co_await std::move(buf.f);
}

rjson::malformed_value::malformed_value(std::string_view name, const rjson::value& value)
Expand Down

0 comments on commit d2e0897

Please sign in to comment.