Skip to content

Commit

Permalink
Merge branch 'add_proper_aggregation_for_paged_indexing_2' of git://g…
Browse files Browse the repository at this point in the history
…ithub.com/psarna/scylla into next

Piotr Sarna says:

Fixes #4540
This series adds proper handling of aggregation for paged indexed queries.
Before this series returned results were presented to the user in per-page
partial manner, while they should have been returned as a single aggregated
value.

Tests: unit(dev)

Piotr Sarna (8):
  cql3: split execute_base_query implementation
  cql3: enable explicit copying of query_options
  cql3: add a query options constructor with explicit page size
  cql3: add proper aggregation to paged indexing
  cql3: make DEFAULT_COUNT_PAGE_SIZE constant public
  tests: add query_options to cquery_nofail
  tests: add indexing + paging + aggregation test case
  tests: add indexing+paging test case for clustering keys
  • Loading branch information
nyh committed Jun 24, 2019
2 parents 82c2fba + b668ee2 commit 7be3d5d
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 20 deletions.
12 changes: 12 additions & 0 deletions cql3/query_options.cc
Expand Up @@ -130,6 +130,18 @@ query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<ser

}

query_options::query_options(std::unique_ptr<query_options> qo, ::shared_ptr<service::pager::paging_state> paging_state, int32_t page_size)
: query_options(qo->_consistency,
qo->get_timeout_config(),
std::move(qo->_names),
std::move(qo->_values),
std::move(qo->_value_views),
qo->_skip_metadata,
std::move(query_options::specific_options{page_size, paging_state, qo->_options.serial_consistency, qo->_options.timestamp}),
qo->_cql_serialization_format) {

}

query_options::query_options(std::vector<cql3::raw_value> values)
: query_options(
db::consistency_level::ONE, infinite_timeout_config, std::move(values))
Expand Down
3 changes: 2 additions & 1 deletion cql3/query_options.hh
Expand Up @@ -102,7 +102,7 @@ private:

public:
query_options(query_options&&) = default;
query_options(const query_options&) = delete;
explicit query_options(const query_options&) = default;

explicit query_options(db::consistency_level consistency,
const timeout_config& timeouts,
Expand Down Expand Up @@ -155,6 +155,7 @@ public:
explicit query_options(db::consistency_level, const timeout_config& timeouts,
std::vector<cql3::raw_value> values, specific_options options = specific_options::DEFAULT);
explicit query_options(std::unique_ptr<query_options>, ::shared_ptr<service::pager::paging_state> paging_state);
explicit query_options(std::unique_ptr<query_options>, ::shared_ptr<service::pager::paging_state> paging_state, int32_t page_size);

const timeout_config& get_timeout_config() const { return _timeout_config; }

Expand Down
103 changes: 88 additions & 15 deletions cql3/statements/select_statement.cc
Expand Up @@ -443,8 +443,8 @@ indexed_table_select_statement::prepare_command_for_base_query(const query_optio
return cmd;
}

future<shared_ptr<cql_transport::messages::result_message>>
indexed_table_select_statement::execute_base_query(
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
indexed_table_select_statement::do_execute_base_query(
service::storage_proxy& proxy,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
Expand Down Expand Up @@ -495,22 +495,27 @@ indexed_table_select_statement::execute_base_query(
}).then([&merger]() {
return merger.get();
});
}).then([this, &proxy, &state, &options, now, cmd, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return this->process_base_query_results(std::move(result), cmd, proxy, state, options, now, std::move(paging_state));
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>(std::move(result), std::move(cmd));
});
}

// Function for fetching the selected columns from a list of clustering rows.
// It is currently used only in our Secondary Index implementation - ordinary
// CQL SELECT statements do not have the syntax to request a list of rows.
// FIXME: The current implementation is very inefficient - it requests each
// row separately (and, incrementally, in parallel). Even multiple rows from a single
// partition are requested separately. This last case can be easily improved,
// but to implement the general case (multiple rows from multiple partitions)
// efficiently, we will need more support from other layers.
// Keys are ordered in token order (see #3423)
future<shared_ptr<cql_transport::messages::result_message>>
indexed_table_select_statement::execute_base_query(
service::storage_proxy& proxy,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state) {
return do_execute_base_query(proxy, std::move(partition_ranges), state, options, now, paging_state).then(
[this, &proxy, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
return process_base_query_results(std::move(result), std::move(cmd), proxy, state, options, now, std::move(paging_state));
});
}

future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
indexed_table_select_statement::do_execute_base_query(
service::storage_proxy& proxy,
std::vector<primary_key>&& primary_keys,
service::query_state& state,
Expand Down Expand Up @@ -565,9 +570,23 @@ indexed_table_select_statement::execute_base_query(
});
}).then([&merger] () {
return merger.get();
}).then([cmd] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>(std::move(result), std::move(cmd));
});
}).then([this, &proxy, &state, &options, now, cmd, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result) mutable {
return this->process_base_query_results(std::move(result), cmd, proxy, state, options, now, std::move(paging_state));
});
}

future<shared_ptr<cql_transport::messages::result_message>>
indexed_table_select_statement::execute_base_query(
service::storage_proxy& proxy,
std::vector<primary_key>&& primary_keys,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state) {
return do_execute_base_query(proxy, std::move(primary_keys), state, options, now, paging_state).then(
[this, &proxy, &state, &options, now, paging_state = std::move(paging_state)] (foreign_ptr<lw_shared_ptr<query::result>> result, lw_shared_ptr<query::read_command> cmd) {
return process_base_query_results(std::move(result), std::move(cmd), proxy, state, options, now, std::move(paging_state));
});
}

Expand Down Expand Up @@ -875,6 +894,60 @@ indexed_table_select_statement::do_execute(service::storage_proxy& proxy,
}
}

// Aggregated and paged filtering needs to aggregate the results from all pages
// in order to avoid returning partial per-page results (issue #4540).
// It's a little bit more complicated than regular aggregation, because each paging state
// needs to be translated between the base table and the underlying view.
// The routine below keeps fetching pages from the underlying view, which are then
// used to fetch base rows, which go straight to the result set builder.
// A local, internal copy of query_options is kept in order to keep updating
// the paging state between requesting data from replicas.
const bool aggregate = _selection->is_aggregate() || has_group_by();
if (aggregate) {
const bool restrictions_need_filtering = _restrictions->need_filtering();
return do_with(cql3::selection::result_set_builder(*_selection, now, options.get_cql_serialization_format()), std::make_unique<cql3::query_options>(cql3::query_options(options)),
[this, &options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] (cql3::selection::result_set_builder& builder, std::unique_ptr<cql3::query_options>& internal_options) {
// page size is set to the internal count page size, regardless of the user-provided value
internal_options.reset(new cql3::query_options(std::move(internal_options), options.get_paging_state(), DEFAULT_COUNT_PAGE_SIZE));
return repeat([this, &builder, &options, &internal_options, &proxy, &state, now, whole_partitions, partition_slices, restrictions_need_filtering] () {
auto consume_results = [this, &builder, &options, &internal_options, restrictions_need_filtering] (foreign_ptr<lw_shared_ptr<query::result>> results, lw_shared_ptr<query::read_command> cmd) {
if (restrictions_need_filtering) {
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection,
cql3::selection::result_set_builder::restrictions_filter(_restrictions, options, cmd->row_limit, _schema, cmd->slice.partition_row_limit())));
} else {
query::result_view::consume(*results, cmd->slice, cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection));
}
};

if (whole_partitions || partition_slices) {
return find_index_partition_ranges(proxy, state, *internal_options).then(
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (dht::partition_range_vector partition_ranges, ::shared_ptr<const service::pager::paging_state> paging_state) {
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
return do_execute_base_query(proxy, std::move(partition_ranges), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
return stop_iteration(!has_more_pages);
});
});
} else {
return find_index_clustering_rows(proxy, state, *internal_options).then(
[this, now, &state, &internal_options, &proxy, consume_results = std::move(consume_results)] (std::vector<primary_key> primary_keys, ::shared_ptr<const service::pager::paging_state> paging_state) {
bool has_more_pages = paging_state && paging_state->get_remaining() > 0;
internal_options.reset(new cql3::query_options(std::move(internal_options), paging_state ? ::make_shared<service::pager::paging_state>(*paging_state) : nullptr));
return this->do_execute_base_query(proxy, std::move(primary_keys), state, *internal_options, now, std::move(paging_state)).then(consume_results).then([has_more_pages] {
return stop_iteration(!has_more_pages);
});
});
}
}).then([this, &builder, restrictions_need_filtering] () {
auto rs = builder.build();
update_stats_rows_read(rs->size());
_stats.filtered_rows_matched_total += restrictions_need_filtering ? rs->size() : 0;
auto msg = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(rs)));
return make_ready_future<shared_ptr<cql_transport::messages::result_message>>(std::move(msg));
});
});
}

if (whole_partitions || partition_slices) {
// In this case, can use our normal query machinery, which retrieves
// entire partitions or the same slice for many partitions.
Expand Down
27 changes: 26 additions & 1 deletion cql3/statements/select_statement.hh
Expand Up @@ -68,8 +68,8 @@ class select_statement : public cql_statement {
public:
using parameters = raw::select_statement::parameters;
using ordering_comparator_type = raw::select_statement::ordering_comparator_type;
protected:
static constexpr int DEFAULT_COUNT_PAGE_SIZE = 10000;
protected:
static thread_local const ::shared_ptr<parameters> _default_parameters;
schema_ptr _schema;
uint32_t _bound_terms;
Expand Down Expand Up @@ -236,6 +236,14 @@ private:
lw_shared_ptr<query::read_command>
prepare_command_for_base_query(const query_options& options, service::query_state& state, gc_clock::time_point now, bool use_paging);

future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
do_execute_base_query(
service::storage_proxy& proxy,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state);
future<shared_ptr<cql_transport::messages::result_message>>
execute_base_query(
service::storage_proxy& proxy,
Expand All @@ -245,6 +253,23 @@ private:
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state);

// Function for fetching the selected columns from a list of clustering rows.
// It is currently used only in our Secondary Index implementation - ordinary
// CQL SELECT statements do not have the syntax to request a list of rows.
// FIXME: The current implementation is very inefficient - it requests each
// row separately (and, incrementally, in parallel). Even multiple rows from a single
// partition are requested separately. This last case can be easily improved,
// but to implement the general case (multiple rows from multiple partitions)
// efficiently, we will need more support from other layers.
// Keys are ordered in token order (see #3423)
future<foreign_ptr<lw_shared_ptr<query::result>>, lw_shared_ptr<query::read_command>>
do_execute_base_query(
service::storage_proxy& proxy,
std::vector<primary_key>&& primary_keys,
service::query_state& state,
const query_options& options,
gc_clock::time_point now,
::shared_ptr<const service::pager::paging_state> paging_state);
future<shared_ptr<cql_transport::messages::result_message>>
execute_base_query(
service::storage_proxy& proxy,
Expand Down
8 changes: 6 additions & 2 deletions tests/cql_assertions.cc
Expand Up @@ -180,9 +180,13 @@ rows_assertions rows_assertions::with_serialized_columns_count(size_t columns_co
}

shared_ptr<cql_transport::messages::result_message> cquery_nofail(
cql_test_env& env, const char* query, const std::experimental::source_location& loc) {
cql_test_env& env, const char* query, std::unique_ptr<cql3::query_options>&& qo, const std::experimental::source_location& loc) {
try {
return env.execute_cql(query).get0();
if (qo) {
return env.execute_cql(query, std::move(qo)).get0();
} else {
return env.execute_cql(query).get0();
}
} catch (...) {
BOOST_FAIL(format("query '{}' failed: {}\n{}:{}: originally from here",
query, std::current_exception(), loc.file_name(), loc.line()));
Expand Down
1 change: 1 addition & 0 deletions tests/cql_assertions.hh
Expand Up @@ -86,4 +86,5 @@ void assert_that_failed(future<T...>&& f)
shared_ptr<cql_transport::messages::result_message> cquery_nofail(
cql_test_env& env,
const char* query,
std::unique_ptr<cql3::query_options>&& qo = nullptr,
const std::experimental::source_location& loc = std::experimental::source_location::current());
2 changes: 1 addition & 1 deletion tests/cql_query_test.cc
Expand Up @@ -3703,7 +3703,7 @@ void require_rows(cql_test_env& e,
const std::vector<std::vector<bytes_opt>>& expected,
const source_location& loc = source_location::current()) {
try {
assert_that(cquery_nofail(e, qstr, loc)).is_rows().with_rows_ignore_order(expected);
assert_that(cquery_nofail(e, qstr, nullptr, loc)).is_rows().with_rows_ignore_order(expected);
}
catch (const std::exception& e) {
BOOST_FAIL(format("query '{}' failed: {}\n{}:{}: originally from here",
Expand Down
58 changes: 58 additions & 0 deletions tests/secondary_index_test.cc
Expand Up @@ -28,6 +28,7 @@
#include "types/list.hh"
#include "types/set.hh"
#include "exception_utils.hh"
#include "cql3/statements/select_statement.hh"


SEASTAR_TEST_CASE(test_secondary_index_regular_column_query) {
Expand Down Expand Up @@ -1132,3 +1133,60 @@ SEASTAR_TEST_CASE(test_secondary_index_on_partition_key_with_filtering) {
});
});
}

SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) {
static constexpr int row_count = 2 * cql3::statements::select_statement::DEFAULT_COUNT_PAGE_SIZE + 120;

return do_with_cql_env_thread([] (cql_test_env& e) {
cquery_nofail(e, "CREATE TABLE fpa (id int primary key, v int)");
cquery_nofail(e, "CREATE INDEX ON fpa(v)");
for (int i = 0; i < row_count; ++i) {
cquery_nofail(e, format("INSERT INTO fpa (id, v) VALUES ({}, {})", i + 1, i % 2).c_str());
}

auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo));
// Even though we set up paging, we still expect a single result from an aggregation function.
// Also, instead of the user-provided page size, internal DEFAULT_COUNT_PAGE_SIZE is expected to be used.
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count * row_count / 4)},
});

// Even if paging is not explicitly used, the query will be internally paged to avoid OOM.
msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 1;");
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count * row_count / 4 + row_count / 2)},
});

qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo));
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count / 2 + 1)},
});

// Similar, but this time a non-prefix clustering key part is indexed (wrt. issue 3405, after which we have
// a special code path for indexing composite non-prefix clustering keys).
cquery_nofail(e, "CREATE TABLE fpa2 (id int, c1 int, c2 int, primary key (id, c1, c2))");
cquery_nofail(e, "CREATE INDEX ON fpa2(c2)");
for (int i = 0; i < row_count; ++i) {
cquery_nofail(e, format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str());
}

qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()});
msg = cquery_nofail(e, "SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo));
// Even though we set up paging, we still expect a single result from an aggregation function
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count * row_count / 4)},
});

qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()});
msg = cquery_nofail(e, "SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo));
assert_that(msg).is_rows().with_rows({
{ int32_type->decompose(row_count / 2 + 1)},
});
});
}

0 comments on commit 7be3d5d

Please sign in to comment.