Skip to content

Commit

Permalink
untyped_result_set: Do not copy data from input store (retain fragmen…
Browse files Browse the repository at this point in the history
…ted views)

Refs #7961
Fixes #8014

Instead of doing a deep copy of input, we keep assume ownership and build
rows of the views therein, potentially retaining fragmented data as-is
avoiding premature linearization.

Note that this is not all sugar and flowers though. Any data access will
by nature be more expensive, and the view collections we create are
potentially just as expensive as copying for small cells.

Otoh, it allows writing code using this that avoids data copying,
depending on destination.

v2:
* Fixed wrong collection reserved in visitor
* Changed row index from shared ptr to ref
* Moved typedef
* Removed non-existing constructors
* Added const ref to index build
* Fixed raft usage after rebase

v3:
* Changed shared_ptr to unique
  • Loading branch information
Calle Wilund committed Mar 3, 2021
1 parent 353730d commit e4d6c89
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 60 deletions.
16 changes: 5 additions & 11 deletions cdc/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -813,13 +813,13 @@ static bytes_opt get_preimage_col_value(const column_definition& cdef, const cql
auto v = pirow->get_view(cdef.name_as_text());
auto f = cql_serialization_format::internal();
auto n = read_collection_size(v, f);
std::vector<bytes_view> tmp;
std::vector<bytes> tmp;
tmp.reserve(n);
while (n--) {
tmp.emplace_back(read_collection_value(v, f)); // key
tmp.emplace_back(read_collection_value(v, f).linearize()); // key
read_collection_value(v, f); // value. ignore.
}
return set_type_impl::serialize_partially_deserialized_form(tmp, f);
return set_type_impl::serialize_partially_deserialized_form({tmp.begin(), tmp.end()}, f);
},
[&] (const abstract_type& o) -> bytes {
return pirow->get_blob(cdef.name_as_text());
Expand Down Expand Up @@ -1646,13 +1646,7 @@ class transformer final : public change_processor {
try {
return _ctx._proxy.query(_schema, std::move(command), std::move(partition_ranges), select_cl, service::storage_proxy::coordinator_query_options(default_timeout(), empty_service_permit(), client_state)).then(
[s = _schema, partition_slice = std::move(partition_slice), selection = std::move(selection)] (service::storage_proxy::coordinator_query_result qr) -> lw_shared_ptr<cql3::untyped_result_set> {
cql3::selection::result_set_builder builder(*selection, gc_clock::now(), cql_serialization_format::latest());
query::result_view::consume(*qr.query_result, partition_slice, cql3::selection::result_set_builder::visitor(builder, *s, *selection));
auto result_set = builder.build();
if (!result_set || result_set->empty()) {
return {};
}
return make_lw_shared<cql3::untyped_result_set>(*result_set);
return make_lw_shared<cql3::untyped_result_set>(*s, std::move(qr.query_result), *selection, partition_slice);
});
} catch (exceptions::unavailable_exception& e) {
// `query` can throw `unavailable_exception`, which is seen by clients as ~ "NoHostAvailable".
Expand Down Expand Up @@ -1696,7 +1690,7 @@ class transformer final : public change_processor {
// as there will be no clustering row data to load into the state.
return;
}
ck_parts.emplace_back(*v);
ck_parts.emplace_back(v->linearize());
}
auto ck = clustering_key::from_exploded(std::move(ck_parts));

Expand Down
2 changes: 2 additions & 0 deletions cql3/result_generator.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "stats.hh"

namespace cql3 {
class untyped_result_set;

class result_generator {
schema_ptr _schema;
Expand All @@ -33,6 +34,7 @@ class result_generator {
shared_ptr<const selection::selection> _selection;
cql_stats* _stats;
private:
friend class untyped_result_set;
template<typename Visitor>
class query_result_visitor {
const schema& _schema;
Expand Down
124 changes: 92 additions & 32 deletions cql3/untyped_result_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,58 +47,118 @@
#include "result_set.hh"
#include "transport/messages/result_message.hh"

cql3::untyped_result_set_row::untyped_result_set_row(const map_t& data)
: _data(data)
cql3::untyped_result_set_row::untyped_result_set_row(const index_map& index, const cql3::metadata& metadata, data_views data)
: _name_to_index(index)
, _metadata(metadata)
, _data(std::move(data))
{}

cql3::untyped_result_set_row::untyped_result_set_row(const std::vector<lw_shared_ptr<column_specification>>& columns, std::vector<bytes_opt> data)
: _columns(columns)
, _data([&columns, data = std::move(data)] () mutable {
map_t tmp;
std::transform(columns.begin(), columns.end(), data.begin(), std::inserter(tmp, tmp.end()), [](lw_shared_ptr<column_specification> c, bytes_opt& d) {
return std::make_pair<sstring, bytes_opt>(c->name->to_string(), std::move(d));
});
return tmp;
}())
{}
size_t cql3::untyped_result_set_row::index(const std::string_view& name) const {
auto i = _name_to_index.find(name);
return i != _name_to_index.end() ? i->second : std::numeric_limits<size_t>::max();
}

bool cql3::untyped_result_set_row::has(std::string_view name) const {
auto i = _data.find(name);
return i != _data.end() && i->second;
auto i = index(name);
if (i < _data.size()) {
return !std::holds_alternative<std::monostate>(_data.at(i));
}
return false;
}

cql3::untyped_result_set_row::view_type cql3::untyped_result_set_row::get_view(std::string_view name) const {
return std::visit(make_visitor(
[](std::monostate) -> view_type { throw std::bad_variant_access(); },
[](const view_type& v) -> view_type { return v; },
[](const bytes& b) -> view_type { return view_type(b); }
), _data.at(index(name)));
}

const std::vector<lw_shared_ptr<cql3::column_specification>>& cql3::untyped_result_set_row::get_columns() const {
return _metadata.get_names();
}

using cql_transport::messages::result_message;

cql3::untyped_result_set::untyped_result_set(const cql3::result_set& rs) {
auto& cn = rs.get_metadata().get_names();
for (auto& r : rs.rows()) {
// r is const ref. TODO: make this more efficient by either wrapping result set
// or adding modifying accessors to it.
_rows.emplace_back(cn, r);
}
cql3::untyped_result_set::index_map_ptr cql3::untyped_result_set::make_index(const cql3::metadata& metadata) {
auto map = std::make_unique<untyped_result_set_row::index_map>();
auto& names = metadata.get_names();
size_t i = 0;
std::transform(names.begin(), names.end(), std::inserter(*map, map->end()), [&](const lw_shared_ptr<column_specification>& c) mutable {
return std::make_pair<std::string_view, size_t>(c->name->text(), i++);
});
return map;
}

struct cql3::untyped_result_set::visitor {
rows_type& rows;
const cql3::metadata& meta;
const untyped_result_set_row::index_map& index;
untyped_result_set_row::data_views tmp;

visitor(rows_type& r, const cql3::metadata& m, const untyped_result_set_row::index_map& i)
: rows(r)
, meta(m)
, index(i)
{}

void start_row() {
tmp.reserve(index.size());
}
void accept_value(std::optional<query::result_bytes_view>&& v) {
if (v) {
tmp.emplace_back(std::move(*v));
} else {
tmp.emplace_back(std::monostate{});
}
}
// somewhat weird dispatch, but when visiting directly via
// result_generator, pk:s will be temporary - and sent
// as views, not opt_views. So we can dispatch on this and
// simply copy the temporaries.
void accept_value(const query::result_bytes_view& v) {
tmp.emplace_back(v.linearize());
}
void end_row() {
rows.emplace_back(untyped_result_set_row(index, meta, std::exchange(tmp, {})));
}
};

cql3::untyped_result_set::untyped_result_set(::shared_ptr<result_message> msg)
: _rows([msg]{
class visitor : public result_message::visitor_base {
: _storage(msg)
{
class msg_visitor : public result_message::visitor_base {
public:
std::optional<untyped_result_set> res;
const cql3::result* res = nullptr;
void visit(const result_message::rows& rmrs) override {
const auto& rs = rmrs.rs();
const auto& set = rs.result_set();
res.emplace(set); // construct untyped_result_set by const ref.
res = &rmrs.rs();
}
};
visitor v;
msg_visitor v;
if (msg != nullptr) {
msg->accept(v);
}
if (v.res) {
return std::move(v.res->_rows);
auto& metadata = v.res->get_metadata();
_index = make_index(metadata);
v.res->visit(visitor{_rows, metadata, *_index});
}
return rows_type{};
}())
{}
}

cql3::untyped_result_set::untyped_result_set(const schema& s, foreign_ptr<lw_shared_ptr<query::result>> qr, const cql3::selection::selection& selection, const query::partition_slice& slice)
: _storage(std::make_tuple(std::move(qr), selection.get_result_metadata()))
{
auto& qt = std::get<qr_tuple>(_storage);
auto& qres = std::get<0>(qt);
auto& metadata = *std::get<1>(qt);

_index = make_index(metadata);
visitor v{_rows, metadata, *_index};
result_generator::query_result_visitor<visitor> vv(s, v, selection);
query::result_view::consume(*qres, slice, vv);
}

cql3::untyped_result_set::~untyped_result_set() = default;

const cql3::untyped_result_set_row& cql3::untyped_result_set::one() const {
if (_rows.size() != 1) {
Expand Down
68 changes: 53 additions & 15 deletions cql3/untyped_result_set.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,36 +41,60 @@
*/
#include <unordered_map>
#include <optional>
#include <seastar/core/sharded.hh>
#include "bytes.hh"
#include "types.hh"
#include "types/map.hh"
#include "types/list.hh"
#include "types/set.hh"
#include "serializer.hh"
#include "bytes_ostream.hh"
#include "transport/messages/result_message_base.hh"
#include "column_specification.hh"
#include "absl-flat_hash_map.hh"
#include "query-result.hh"

#pragma once

namespace query {
class result;
class partition_slice;
// duplicate template def. But avoids a huge include chain
using result_bytes_view = ser::buffer_view<bytes_ostream::fragment_iterator>;
}

namespace cql3 {
namespace selection {
class selection;
}

class untyped_result_set;
class result;
class metadata;

class untyped_result_set_row {
public:
using view_type = query::result_bytes_view;
using opt_view_type = std::optional<view_type>;
using view_holder = std::variant<std::monostate, view_type, bytes>;
private:
const std::vector<lw_shared_ptr<column_specification>> _columns;
using map_t = flat_hash_map<sstring, bytes_opt>;
const map_t _data;
friend class untyped_result_set;
using index_map = std::unordered_map<std::string_view, size_t>;
using data_views = std::vector<view_holder>;

const index_map& _name_to_index;
const cql3::metadata& _metadata;
data_views _data;

untyped_result_set_row(const index_map&, const cql3::metadata&, data_views);
size_t index(const std::string_view&) const;
public:
untyped_result_set_row(const map_t&);
untyped_result_set_row(const std::vector<lw_shared_ptr<column_specification>>&, std::vector<bytes_opt>);
untyped_result_set_row(untyped_result_set_row&&) = default;
untyped_result_set_row(const untyped_result_set_row&) = delete;

bool has(std::string_view) const;
bytes_view get_view(std::string_view name) const {
return _data.at(name).value();
}
view_type get_view(std::string_view name) const;
bytes get_blob(std::string_view name) const {
return bytes(get_view(name));
return get_view(name).linearize();
}
template<typename T>
T get_as(std::string_view name) const {
Expand All @@ -80,7 +104,7 @@ public:
std::optional<T> get_opt(std::string_view name) const {
return has(name) ? get_as<T>(name) : std::optional<T>{};
}
bytes_view_opt get_view_opt(const sstring& name) const {
opt_view_type get_view_opt(const sstring& name) const {
if (has(name)) {
return get_view(name);
}
Expand Down Expand Up @@ -147,23 +171,25 @@ public:
get_set_data<V>(name, std::inserter(res, res.end()), valtype);
return res;
}
const std::vector<lw_shared_ptr<column_specification>>& get_columns() const {
return _columns;
const cql3::metadata& get_metadata() const {
return _metadata;
}
const std::vector<lw_shared_ptr<column_specification>>& get_columns() const;
};

class result_set;

class untyped_result_set {
public:
using row = untyped_result_set_row;
typedef std::vector<row> rows_type;
using rows_type = std::vector<row>;
using const_iterator = rows_type::const_iterator;
using iterator = rows_type::const_iterator;

untyped_result_set(::shared_ptr<cql_transport::messages::result_message>);
untyped_result_set(const cql3::result_set&);
untyped_result_set(const schema&, foreign_ptr<lw_shared_ptr<query::result>>, const cql3::selection::selection&, const query::partition_slice&);
untyped_result_set(untyped_result_set&&) = default;
~untyped_result_set();

const_iterator begin() const {
return _rows.begin();
Expand All @@ -188,8 +214,20 @@ public:
return _rows.back();
}
private:
using index_map_ptr = std::unique_ptr<untyped_result_set_row::index_map>;
using qr_tuple = std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, shared_ptr<const cql3::metadata>>;
using storage = std::variant<std::monostate
, ::shared_ptr<cql_transport::messages::result_message>
, qr_tuple
>;
struct visitor;

storage _storage;
index_map_ptr _index;
rows_type _rows;

untyped_result_set() = default;
static index_map_ptr make_index(const cql3::metadata&);
public:
static untyped_result_set make_empty() {
return untyped_result_set();
Expand Down
4 changes: 2 additions & 2 deletions service/raft/raft_sys_table_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ future<raft::log_entries> raft_sys_table_storage::load_log() {
for (const cql3::untyped_result_set_row& row : *rs) {
raft::term_t term = raft::term_t(row.get_as<int64_t>("term"));
raft::index_t idx = raft::index_t(row.get_as<int64_t>("index"));
bytes_view raw_data = row.get_view("data");
auto raw_data = row.get_blob("data");
auto in = ser::as_input_stream(raw_data);
using data_variant_type = decltype(raft::log_entry::data);
data_variant_type data = ser::deserialize(in, boost::type<data_variant_type>());
Expand All @@ -98,7 +98,7 @@ future<raft::snapshot> raft_sys_table_storage::load_snapshot() {
db::system_keyspace::RAFT_SNAPSHOTS);
::shared_ptr<cql3::untyped_result_set> snp_rs = co_await _qp.execute_internal(load_snp_info_cql, {int64_t(_group_id), snapshot_id});
const auto& snp_row = snp_rs->one(); // should be only one matching row for a given snapshot id
bytes_view snp_cfg = snp_row.get_view("config");
auto snp_cfg = snp_row.get_blob("config");
auto in = ser::as_input_stream(snp_cfg);

raft::snapshot s{
Expand Down

0 comments on commit e4d6c89

Please sign in to comment.