Skip to content

Commit

Permalink
cql3: add proper aggregation to paged indexing
Browse files Browse the repository at this point in the history
Aggregated and paged filtering needs to aggregate the results
from all pages in order to avoid returning partial per-page
results. It's a little bit more complicated than regular aggregation,
because each paging state needs to be translated between the base
table and the underlying view. The routine keeps fetching pages
from the underlying view, which are then used to fetch base rows,
which go straight to the result set builder.

Fixes scylladb#4540
  • Loading branch information
psarna committed Nov 14, 2019
1 parent 7c991a2 commit 37ed603
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions cql3/statements/select_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,60 @@ indexed_table_select_statement::do_execute(service::storage_proxy& proxy,
}
}

// Aggregated and paged filtering needs to aggregate the results from all pages
// in order to avoid returning partial per-page results (issue #4540).
// It's a little bit more complicated than regular aggregation, because each paging state
// needs to be translated between the base table and the underlying view.
// The routine below keeps fetching pages from the underlying view, which are then
// used to fetch base rows, which go straight to the result set builder.
// A local, internal copy of query_options is kept in order to keep updating
// the paging state between requesting data from replicas.
const bool aggregate = _selection->is_aggregate();
if (aggregate) {
const bool restrictions_need_filtering = _restrictions->need_filtering();
return do_with(cql3::selection::result_set_builder(*_selection, now, options.get_cql_serialization_format()), std::make_unique<cql3::query_options>(cql3::query_options(options)),
[this, &options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] (cql3::selection::result_set_builder& builder, std::unique_ptr<cql3::query_options>& internal_options) {
// page size is set to the internal count page size, regardless of the user-provided value
internal_options.reset(new cql3::query_options(std::move(internal_options), options.get_paging_state(), DEFAULT_COUNT_PAGE_SIZE));
return repeat([this, &builder, &options, &internal_options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] () {
auto consume_results = [this, &builder, &options, &internal_options, restrictions_need_filtering] (foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd) {
if (restrictions_need_filtering) {
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection,
cql3::selection::result_set_builder::restrictions_filter(_restrictions, options, cmd->row_limit)));
} else {
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection));
}
};

if (whole_partitions || partition_slices) {
return find_index_partition_ranges(proxy, state, *internal_options).then(
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (dht::partition_range_vector partition_ranges, ::shared_ptr<const service::pager::paging_state> paging_state) {
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
return do_execute_base_query(proxy, std::move(partition_ranges), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
return stop_iteration(!has_more_pages);
});
});
} else {
return find_index_clustering_rows(proxy, state, *internal_options).then(
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (std::vector<primary_key> primary_keys, ::shared_ptr<const service::pager::paging_state> paging_state) {
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
return this->do_execute_base_query(proxy, std::move(primary_keys), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
return stop_iteration(!has_more_pages);
});
});
}
}).then([this, &builder, restrictions_need_filtering] () {
auto rs = builder.build();
update_stats_rows_read(rs->size());
_stats.filtered_rows_matched_total += restrictions_need_filtering ? rs->size() : 0;
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
});
});
}

if (whole_partitions || partition_slices) {
// In this case, can use our normal query machinery, which retrieves
// entire partitions or the same slice for many partitions.
Expand Down

0 comments on commit 37ed603

Please sign in to comment.