From f2788f5391fb74c4e81adbc54e3fbfbbcdff45f2 Mon Sep 17 00:00:00 2001 From: Marcin Maliszkiewicz Date: Mon, 19 Dec 2022 10:43:12 +0100 Subject: [PATCH 1/3] alternator: yield during BatchGetItem operation --- alternator/executor.cc | 22 +++++++++++----------- alternator/executor.hh | 10 +++++----- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index 85a138232c1c..3205bc06ff6a 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -2369,21 +2369,22 @@ std::optional executor::describe_single_item(schema_ptr schema, return item; } -std::vector executor::describe_multi_item(schema_ptr schema, - const query::partition_slice& slice, - const cql3::selection::selection& selection, - const query::result& query_result, - const std::optional& attrs_to_get) { - cql3::selection::result_set_builder builder(selection, gc_clock::now()); - query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection)); +future> executor::describe_multi_item(schema_ptr schema, + const query::partition_slice&& slice, + shared_ptr selection, + foreign_ptr> query_result, + shared_ptr> attrs_to_get) { + cql3::selection::result_set_builder builder(*selection, gc_clock::now()); + query::result_view::consume(*query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, *selection)); auto result_set = builder.build(); std::vector ret; for (auto& result_row : result_set->rows()) { rjson::value item = rjson::empty_object(); - describe_single_item(selection, result_row, attrs_to_get, item); + describe_single_item(*selection, result_row, *attrs_to_get, item); ret.push_back(std::move(item)); + co_await coroutine::maybe_yield(); } - return ret; + co_return ret; } static bool check_needs_read_before_write(const parsed::value& v) { @@ -3255,8 +3256,7 @@ future executor::batch_get_item(client_state& cli service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then( [schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get] (service::storage_proxy::coordinator_query_result qr) mutable { utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); }); - std::vector jsons = describe_multi_item(schema, partition_slice, *selection, *qr.query_result, *attrs_to_get); - return make_ready_future>(std::move(jsons)); + return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get)); }); response_futures.push_back(std::move(f)); } diff --git a/alternator/executor.hh b/alternator/executor.hh index b4c588ba0d1e..28b5c63b71d3 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -222,11 +222,11 @@ public: const query::result&, const std::optional&); - static std::vector describe_multi_item(schema_ptr schema, - const query::partition_slice& slice, - const cql3::selection::selection& selection, - const query::result& query_result, - const std::optional& attrs_to_get); + static future> describe_multi_item(schema_ptr schema, + const query::partition_slice&& slice, + shared_ptr selection, + foreign_ptr> query_result, + shared_ptr> attrs_to_get); static void describe_single_item(const cql3::selection::selection&, const std::vector&, From f96ed4dba5971308529eb6ae4294661b725a3ee9 Mon Sep 17 00:00:00 2001 From: Marcin Maliszkiewicz Date: Thu, 19 Jan 2023 15:22:18 +0100 Subject: [PATCH 2/3] utils: yield when streaming json in print() - removed buffer reuse to simplify the code - added co_await suspention point on each send() making it yield --- utils/rjson.cc | 106 +++++++++++++++++++++++-------------------------- 1 file changed, 50 insertions(+), 56 deletions(-) diff --git a/utils/rjson.cc b/utils/rjson.cc index 995eadbde735..2da72aea8fe0 100644 --- a/utils/rjson.cc +++ b/utils/rjson.cc @@ -227,68 +227,62 @@ std::string print(const rjson::value& value, size_t max_nested_level) { return std::string(buffer.GetString()); } -future<> print(const rjson::value& value, seastar::output_stream& os, size_t max_nested_level) { - struct os_buffer { - seastar::output_stream& _os; - temporary_buffer _buf; - size_t _pos = 0; - future<> _f = make_ready_future<>(); - - #pragma GCC diagnostic push - #pragma GCC diagnostic ignored "-Wunused-local-typedefs" - using Ch = char; - #pragma GCC diagnostic pop - - void send(bool try_reuse = true) { - if (_f.failed()) { - _f.get0(); - } - if (!_buf.empty() && _pos > 0) { - _buf.trim(_pos); - _pos = 0; - // Note: we're assuming we're writing to a buffered output_stream (hello http server). - // If we were not, or if (http) output_stream supported mixed buffered/packed content - // it might be a good idea to instead send our buffer as a packet directly. If so, the - // buffer size should probably increase (at least after first send()). - _f = _f.then([this, buf = std::move(_buf), &os = _os, try_reuse]() mutable -> future<> { - return os.write(buf.get(), buf.size()).then([this, buf = std::move(buf), try_reuse]() mutable { - // Chances are high we just copied this to output_stream buffer, and got here - // immediately. If so, reuse the buffer. - if (try_reuse && _buf.empty() && _pos == 0) { - _buf = std::move(buf); - } - }); - }); - } +// This class implements RapidJSON Handler and batches Put() calls into output_stream writes. +class output_stream_buffer { + static constexpr size_t _buf_size = 512; + seastar::output_stream& _os; + temporary_buffer _buf = temporary_buffer(_buf_size); + size_t _pos = 0; + + future<> send(temporary_buffer b) { + co_return co_await _os.write(b.get(), b.size()); + } +public: + output_stream_buffer(seastar::output_stream& os) : _os(os) {} + using Ch = char; // Used by rjson internally + future<> f = make_ready_future<>(); + + void Flush() { + if (f.failed()) { + f.get0(); } - void Put(char c) { - if (_pos == _buf.size()) { - send(); - if (_buf.empty()) { - _buf = temporary_buffer(512); - } - } - // Second note: Should consider writing directly to the buffer in output_stream - // instead of double buffering. But output_stream for a single char has higher - // overhead than the above check + once we hit a non-completed future, we'd have - // to revert to this method anyway... - *(_buf.get_write() + _pos) = c; - ++_pos; + if (_pos == 0) { + return; } - void Flush() { - send(); + if (_pos < _buf_size) { + _buf.trim(_pos); // Last flush may be shorter } - future<> finish()&& { - send(false); - return std::move(_f); + // Either we call futures right away (if they are ready) or we start growing continuations + // chain as we don't have the ability to wait here because Flush() signature is set by rjson. + f = f.then([this, b = std::move(_buf)] () mutable { + return send(std::move(b)); + }); + _pos = 0; + _buf = temporary_buffer(_buf_size); + } + + void Put(Ch c) { + if (_pos == _buf_size) { + Flush(); } - }; + // Note: Should consider writing directly to the buffer in output_stream + // instead of double buffering. But output_stream for a single char has higher + // overhead than the above check + once we hit a non-completed future, we'd have + // to revert to this method anyway... + *(_buf.get_write() + _pos) = c; + ++_pos; + } +}; - os_buffer osb{ os }; - using streamer = rapidjson::Writer; - guarded_yieldable_json_handler writer(osb, max_nested_level); +future<> print(const rjson::value& value, seastar::output_stream& os, size_t max_nested_level) { + output_stream_buffer buf{ os }; + using streamer = rapidjson::Writer; + guarded_yieldable_json_handler writer(buf, max_nested_level); value.Accept(writer); - co_return co_await std::move(osb).finish(); + buf.Flush(); + // This function has to be a coroutine otherwise buf gets destroyed before all its + // continuations from buf.f finish leading to use-after-free. + co_return co_await std::move(buf.f); } rjson::malformed_value::malformed_value(std::string_view name, const rjson::value& value) From 0b5655021a71263fce81d0f5766b6016d4d43b9f Mon Sep 17 00:00:00 2001 From: Marcin Maliszkiewicz Date: Fri, 20 Jan 2023 13:59:54 +0100 Subject: [PATCH 3/3] alternator: remove redundant flush call in make_streamed As output_stream close() is doing flush anyway. --- alternator/executor.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index 3205bc06ff6a..81e7073034ca 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -90,7 +90,6 @@ json::json_return_type make_streamed(rjson::value&& value) { auto lrs = std::move(rs); try { co_await rjson::print(*lrs, los); - co_await los.flush(); co_await los.close(); } catch (...) { // at this point, we cannot really do anything. HTTP headers and return code are