Distribute select count(*) queries #9209
Conversation
I've never seen a COUNT(*) query that would fail this detector. So this part will work just fine for production.
@@ -0,0 +1,21 @@
namespace query {
struct forward_request {
"forwarding" as a term refers to when the request is fully processed by the new node, which is not the case here, as the caller does more processing. We'll need to find a better term.
delegate?
That's more or less the same as forwarding. We're subcontracting some of the work here, but not all of it.
Yes, but the request we forward/delegate also makes sense as a standalone request, and I'd say that it's an implementation detail that we create multiple requests, forward/delegate them and then merge the results. In fact, our first iteration of this project implemented full forwarding - the whole request was simply forwarded from one coordinator to another. Perhaps that's why the name sounds reasonable to me.
The nodes we are "forwarding" to: are they expected in turn to read from more nodes (replicas), or only from themselves? In the latter case this is not forwarding, but query processing pushdown to replicas.
They are "forwarding" to other coordinators. This node's working title is a "supercoordinator"
message/messaging_service.cc
Outdated
@@ -573,6 +575,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::HINT_MUTATION:
case messaging_verb::HINT_SYNC_POINT_CREATE:
case messaging_verb::HINT_SYNC_POINT_CHECK:
case messaging_verb::FORWARD_REQUEST:
return 1;
I think we need to allocate a new connection for this. Otherwise we run the risk of deadlocks.
Consider two nodes, A and B. Both are flooded with COUNT(*) queries. They respond by sending FORWARD requests to each other. This continues until the RPC servers on both nodes hit their capacity. The RPC servers process the FORWARD requests by issuing READ_DATA requests to each other. However, these READ_DATA requests are waiting for server resources. But the server resources will only be released when FORWARD processing is complete. So we deadlock. The deadlock might be resolved by timeouts, but that's not a good way out.
Allocating a new connection ensures the rpc server has a separate resource pool for FORWARD and READ_DATA requests, so deadlock cannot occur. This is why MUTATION_DONE and MUTATION_FAILED requests have a separate connection too.
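A toy model of the resource accounting above (names hypothetical, not Scylla code): with one shared pool, FORWARD requests can saturate every server slot, so the nested READ_DATA acquires they depend on can never succeed; a dedicated pool per verb class makes the nested acquires independent of FORWARD traffic.

```cpp
#include <cassert>

// Model: each in-flight FORWARD_REQUEST holds one server slot and must
// acquire one more slot for the nested READ_DATA it issues. With a
// shared pool, once FORWARD requests occupy every slot, the nested
// acquires can never succeed -> deadlock. With a dedicated pool per
// verb class, nested reads draw from a different pool.
bool nested_read_can_proceed(int forwards_in_flight,
                             int shared_capacity,
                             bool separate_pools) {
    if (separate_pools) {
        // READ_DATA has its own capacity, untouched by FORWARD traffic.
        return true;
    }
    // Shared pool: the nested READ_DATA needs a free slot in the same
    // pool that FORWARD requests are holding.
    return forwards_in_flight < shared_capacity;
}
```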
service/storage_proxy.cc
Outdated
#include "cql3/query_options.hh"
#include "cql_serialization_format.hh"
#include "service/pager/query_pagers.hh"
#include "cql3/result_set.hh"
I think this should be a new service. As written, we create a circular dependency between storage_proxy and cql3. We're also running the risk of a deadlock if both super-coordinator and coordinator requests take resources from the same pool.
service/storage_proxy.cc
Outdated
if (i < ranges.size()) {
    return {ranges[i++]};
} else {
    ranges = generator(1024);
Why not call it with a parameter of 1?
When calling it with a parameter of 1, each partition range that `generator` creates will be returned in a separate vector. I wanted to decrease the number of vector creations, and wrote the `next_vnode` lambda to do so. `next_vnode` acts as an iterator: it returns a single partition range from its internal cache of partition ranges. The cache is refilled when needed, using `generator` with an argument > 1 (I chose 1024 because that's `generator`'s internal limit).
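The batching scheme described here can be sketched as follows (simplified stand-in types; `make_next_vnode` and the `int` ranges are illustrative, not the actual Scylla code): the lambda hands out one element at a time and refills its internal cache from the generator in chunks of 1024.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <vector>

using range = int;  // stand-in for dht::partition_range

// Wrap a batch-producing generator in an iterator-like callable that
// returns one element at a time, refilling its cache in bulk to
// amortize vector allocations.
auto make_next_vnode(std::function<std::vector<range>(std::size_t)> generator) {
    return [generator = std::move(generator),
            cache = std::vector<range>{},
            i = std::size_t{0}]() mutable -> std::optional<range> {
        if (i == cache.size()) {
            cache = generator(1024);  // refill in bulk
            i = 0;
            if (cache.empty()) {
                return std::nullopt;  // generator exhausted
            }
        }
        return cache[i++];
    };
}
```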
A vnode represents a huge amount of work, gigabytes to terabytes. An extra allocation here is invisible.
service/storage_proxy.cc
Outdated
std::optional<query::forward_result> result;
return do_with(std::move(vnodes_per_addr), std::move(req), std::move(result),
[this] (auto& vnodes_per_addr, auto& req, auto& result) {
// forward request to each endpoint and merge results
Please avoid auto parameters; they make the lambda a template and everything becomes more complicated. In any case, there's no need to use do_with - just make it a coroutine.
service/storage_proxy.cc
Outdated
return _messaging
.send_forward_request(addr, req_with_modified_pr)
.then([&req, &result](auto res) {
if (result) {
Also here, coroutines are cleaner and safer.
service/storage_proxy.cc
Outdated
std::move(req.pr), nullptr);

// execute query
return do_with(
coroutine
Looks good. Please add metrics so we can count how many requests are sent and received per node in this new layer.
|
void forward_result::merge(const forward_result& other, const forward_request::reduction_type& type) {
switch (type) {
case forward_request::reduction_type::count: {
This won't scale well, especially if we want to support UDAs. I think we should use @avikivity's distributed map-reduce infrastructure instead, and define all built-in aggregates that support map-reduce as a pair of `map` and `reduce` functions. This will be forward compatible with arbitrary user-defined map-reduce functions (via Lua/Wasm).
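A minimal sketch of the suggested map/reduce formulation (the `aggregate_def` struct and helper names are hypothetical, not an existing Scylla API): count(*) maps every row to 1 and reduces with addition, so adding another aggregate means supplying a different pair of functions rather than extending a switch.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// An aggregate expressed as a {map, reduce} pair plus a reduce identity.
struct aggregate_def {
    std::function<int64_t(int64_t)> map;              // per-row transform
    std::function<int64_t(int64_t, int64_t)> reduce;  // combine partials
    int64_t identity;                                 // reduce identity
};

// count(*): every row maps to 1, partials combine with addition.
aggregate_def count_star() {
    return {
        [](int64_t) -> int64_t { return 1; },
        [](int64_t a, int64_t b) { return a + b; },
        0,
    };
}

// Generic driver: no per-aggregate switch needed.
int64_t run_aggregate(const aggregate_def& agg, const std::vector<int64_t>& rows) {
    int64_t acc = agg.identity;
    for (auto v : rows) {
        acc = agg.reduce(acc, agg.map(v));
    }
    return acc;
}
```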
I'm not sure what infrastructure you are talking about. Could you provide some links/pointers to it?
I think I wanted to refer to this: https://docs.google.com/document/d/1McmaG--HGtjokIUtL4AnTE-hABMJ71OTOlzx9wmtAvk/edit#
I don't think we ever got beyond an idea phase. In any case, we can merge the code as is and generalize later. Verbs are difficult to change/remove though.
The general form is `cql3::expr::expression`. We don't have a way to serialize it now.
I'm okay with adding a new verb if/when we can serialize expressions.
It is not clear to me whether the coordinator receiving such a sub-query is expected to contact other nodes, or strictly execute it itself. In the latter case we are talking about query pushdown, not forwarding, and I don't think we need a brand new mechanism for it (forwarding); we can extend the existing read verbs to optionally accept a
I think it is supposed to be the first case. Data from multiple replicas must be merged before the aggregate can be applied. This task can be divided by the "super-coordinator" into sub-ranges, each sub-range delegated to other nodes/shards - "coordinators" - and finally the results can be merged by the "super-coordinator". This way we can utilize multiple CPU cores.
I think we can do better. One of the reasons aggregate queries are so slow is that all data in the table is sent to the coordinator, which then aggregates it. We should aim at eliminating this data movement altogether by splitting the range into sub-ranges such that each sub-range is owned by a single primary replica. This replica would then become the coordinator for this sub-range. It would read the range locally, requesting only digests from other replicas (to satisfy CL). In case of digest mismatch it would fall back to the slow data merging method. This can be done as a follow-up.
In this RFC, the queried range is divided with a per-vnode granularity. Each sub-range is sent to the first live replica of the corresponding vnode. See the
service/storage_proxy.cc
Outdated
std::map<netw::messaging_service::msg_addr, dht::partition_range_vector> vnodes_per_addr;
while (std::optional<dht::partition_range> vnode = next_vnode()) {
inet_address_vector_replica_set live_endpoints = get_live_endpoints(ks, end_token(*vnode));
auto endpoint_addr = netw::messaging_service::msg_addr{*live_endpoints.begin(), 0};
Here, we should check that `live_endpoints` is not empty, and throw an appropriate exception if it is.
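The point about the empty-endpoint check can be illustrated with a simplified sketch (stand-in types; `group_ranges_by_live_endpoint` is hypothetical): grouping each vnode's range under its first live replica must reject an empty replica set before dereferencing `begin()`.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using range = std::pair<int, int>;  // stand-in for dht::partition_range
using address = std::string;        // stand-in for msg_addr

// Group vnode sub-ranges by their first live replica, rejecting vnodes
// with no live endpoints explicitly instead of invoking UB.
std::map<address, std::vector<range>>
group_ranges_by_live_endpoint(
        const std::vector<std::pair<range, std::vector<address>>>& vnodes) {
    std::map<address, std::vector<range>> per_addr;
    for (const auto& [r, live_endpoints] : vnodes) {
        if (live_endpoints.empty()) {
            // Without this check, *live_endpoints.begin() below is UB.
            throw std::runtime_error("no live endpoints for vnode range");
        }
        per_addr[*live_endpoints.begin()].push_back(r);
    }
    return per_addr;
}
```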
Right, so the first part of my suggestion is implemented. The second part (digest reads to remote replicas) can be a follow-up.
service/storage_proxy.cc
Outdated
cql3::query_options::specific_options::DEFAULT,
cql_serialization_format::latest());
auto p = service::pager::query_pagers::pager(schema, selection, *query_state, *query_options, command,
std::move(req.pr), nullptr);
To increase the parallelism even further and reduce the need for moving data between shards, here you could probably split the query range again - this time with a per-shard granularity.
I think that would result in an explosion of ranges. Not sure it is worth it - we already have quite efficient shard-merging in `query_data_from_all_shards()`.
Yes.
Actually, since you are using a query pager to execute the query on the delegate coordinator, this might already be the case.
d9c462e to 5378658

v2:
cql3/statements/select_statement.cc
Outdated
@@ -379,6 +382,29 @@ select_statement::do_execute(service::storage_proxy& proxy,
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto timeout_duration = get_timeout(state.get_client_state(), options);
auto timeout = db::timeout_clock::now() + timeout_duration;

if (aggregate && _selection->is_count()) {
query::forward_request req = {
I'd like to detect this at prepare() time and create a different select_statement subclass instead, similar to how we have indexed_table_select_statement. This won't have a huge impact, but that's the direction the code should be moving in - making more decisions at prepare time and fewer during runtime.
return false;
}
return p->name().name == "countRows";
}
As also mentioned in patch 6, I'd like to see this detected during prepare time instead of runtime.
I did not see a response to this.
Is it possible to fool the check using `SELECT x AS "countRows"`?
I'm only judging by the code, but I think it won't be fooled - `AS X` is an alias, and the dynamic cast above specifically looks for an abstract_function instance. Unless it's possible to override the built-in "countRows" function with a non-aggregate, or with an aggregate that doesn't actually count rows, we should be safe.
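A simplified model of why the alias cannot fool the detector (the classes here are illustrative stand-ins for Scylla's selector hierarchy, not real code): the check consults the selector's dynamic type and the underlying function name, not the display name produced by `AS`.

```cpp
#include <cassert>
#include <string>

// Stand-in selector hierarchy: polymorphic base plus a column selector
// and a function-call selector.
struct selector {
    virtual ~selector() = default;
};
struct column_selector : selector {
    std::string column;
    explicit column_selector(std::string c) : column(std::move(c)) {}
};
struct function_selector : selector {
    std::string name;
    explicit function_selector(std::string n) : name(std::move(n)) {}
};

// The detector looks at the dynamic type and the real function name;
// an alias never changes either, so `SELECT x AS "countRows"` still
// presents a column_selector underneath and is rejected.
bool is_count_rows(const selector& s) {
    auto* f = dynamic_cast<const function_selector*>(&s);
    return f != nullptr && f->name == "countRows";
}
```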
idl/forward_request.idl.hh
Outdated
@@ -0,0 +1,21 @@
namespace query {
struct forward_request {
enum class reduction_type:uint8_t {
coding style: spaces around :
message/messaging_service.cc
Outdated
future<query::forward_result> messaging_service::send_forward_request(msg_addr id, query::forward_request req) {
return send_message<future<query::forward_result>>(this, messaging_verb::FORWARD_REQUEST, std::move(id), std::move(req));
}
Since 0ea7955 we can autogenerate this boilerplate, please use it instead.
query-request.hh
Outdated
count,
};
reduction_type type;
std::vector<sstring> selected_columns;
Why string? We should use column_id (and pass the schema id along to make sure we interpret it correctly).
I think this is all in read_command already.
query-request.hh
Outdated
db::consistency_level cl;
lowres_clock::time_point timeout;
int32_t page_size;
I think page_size is not meaningful since we're asking for exactly one row.
Looks good. One additional request: please add a docs entry, e.g. as
@@ -496,6 +503,8 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:
return 3;
case messaging_verb::FORWARD_REQUEST:
return 4;
Why do we need a new socket for this?
To avoid the risk of deadlocks; details in Avi's comment above.
And why not reuse idx `3`?
@avikivity is Gleb's suggestion ok for you?
I think it works.
The two types of verbs are very different. MUTATION_DONE/MUTATION_FAILED are nowait messages. FORWARD_REQUEST is a round-trip request with high latency. Maybe we shouldn't mix them, causing write acknowledgements to be delayed behind these expensive requests.
@avikivity Each socket takes resources. Adding one more index here increases the number of sockets in the system in proportion to the number of shards. Latency of one request does not affect the latency of others on the same connection unless the connection buffers are full, which is unlikely to be the case for aggregate requests that do most of the work on the nodes and send back only a single value. There is no problem with mixing nowait and round-trip requests on the same socket either, and we do it all the time.
The added document describes the design of a mechanism that parallelizes execution of aggregation queries.
v7:
@@ -64,6 +64,7 @@ constexpr std::string_view features::SEPARATE_PAGE_SIZE_AND_SAFETY_LIMIT = "SEPA
constexpr std::string_view features::SUPPORTS_RAFT_CLUSTER_MANAGEMENT = "SUPPORTS_RAFT_CLUSTER_MANAGEMENT";
constexpr std::string_view features::USES_RAFT_CLUSTER_MANAGEMENT = "USES_RAFT_CLUSTER_MANAGEMENT";
constexpr std::string_view features::TOMBSTONE_GC_OPTIONS = "TOMBSTONE_GC_OPTIONS";
constexpr std::string_view features::PARALLELIZED_AGGREGATION = "PARALLELIZED_AGGREGATION";
We'll have to call the next flag PARALLELIZED_AGGREGATION_FOR_REAL_NOT_JUST_COUNT_STAR, but okay.
&& !restrictions->need_filtering() // No filtering
&& group_by_cell_indices->empty() // No GROUP BY
// All potential intermediate coordinators must support forwarding
&& db.features().cluster_supports_parallelized_aggregation();
A small problem is that a prepared statement won't be re-prepared once the cluster is upgraded, and will keep using the old implementation. I think it's tolerable.
This pull request speeds up execution of `count(*)` queries. It does so by splitting a given query into sub-queries and distributing them across a group of nodes for parallel execution.

A new level of coordination was added. A node called the super-coordinator splits an aggregation query into sub-queries and distributes them across a group of coordinators. The super-coordinator is also responsible for merging the results.

To develop a mechanism for speeding up `count(*)` queries, there was a need to detect which queries have a `count(*)` selector. Since this pull request is a proof of concept, the detection is rather rudimentary: it only catches the simplest `count(*)` queries (with only one selector and no column name specified).

After detecting that a query is a `count(*)`, it has to be split into sub-queries and sent to other coordinators. The splitting part wasn't that difficult; it was achieved by limiting the original query's partition ranges. Sending the modified query to another node was much harder. The easiest scenario would be to send the whole `cql3::statements::select_statement`. Unfortunately, `cql3::statements::select_statement` can't be [de]serialized, so sending it was out of the question. Even more unfortunately, some non-[de]serializable members of `cql3::statements::select_statement` are required to start the execution of the statement. Finally, I decided to send a `query::read_command` paired with the required [de]serializable members. Objects that cannot be [de]serialized (such as the query's selector) are mocked on the receiving end.

When a super-coordinator receives a `count(*)` query, it splits it into sub-queries. It does so by splitting the original query's partition ranges into a list of vnodes, grouping them by their owner, and creating sub-queries whose partition ranges are set to successive results of this grouping. After creation, each sub-query is sent to the owner of its partition ranges.

The owner dispatches the received sub-query to all of its shards. The shards slice the partition ranges of the received sub-query so that each only queries data it owns. Each shard becomes a coordinator and executes the sub-query prepared this way.

Benchmark: 3 node cluster set up on powerful desktops located in the office (3x32 cores). Filled the cluster with ~2 * 10^8 rows using scylla-bench and ran:

```
time cqlsh <ip> <port> --request-timeout=3600 -e "select count(*) from scylla_bench.test using timeout 1h;"
```

* master: 68s
* this branch: 2s

Tracing example on a 3 node cluster (each node had 2 shards, `murmur3_ignore_msb_bits` was set to 1, `num_tokens` was set to 3):

```
> cqlsh -e 'tracing on; select count(*) from ks.t;'
Now Tracing is enabled

 count
-------
  1000

(1 rows)

Tracing session: e5852020-7fc3-11ec-8600-4c4c210dd657

 activity                                                 | timestamp                  | source    | source_elapsed | client
----------------------------------------------------------+----------------------------+-----------+----------------+-----------
 Execute CQL3 query                                       | 2022-01-27 22:53:08.770000 | 127.0.0.1 |              0 | 127.0.0.1
 Parsing a statement [shard 1]                            | 2022-01-27 22:53:08.770451 | 127.0.0.1 |             -- | 127.0.0.1
 Processing a statement [shard 1]                         | 2022-01-27 22:53:08.770487 | 127.0.0.1 |             36 | 127.0.0.1
 Dispatching forward_request to 3 endpoints [shard 1]     | 2022-01-27 22:53:08.770509 | 127.0.0.1 |             58 | 127.0.0.1
 Sending forward_request to 127.0.0.1:0 [shard 1]         | 2022-01-27 22:53:08.770516 | 127.0.0.1 |             64 | 127.0.0.1
 Sending forward_request to 127.0.0.2:0 [shard 1]         | 2022-01-27 22:53:08.770600 | 127.0.0.1 |            149 | 127.0.0.1
 Sending forward_request to 127.0.0.3:0 [shard 1]         | 2022-01-27 22:53:08.770608 | 127.0.0.1 |            157 | 127.0.0.1
 [... per-shard "Executing forward_request" / "read_data: querying locally" /
  "Start querying token range" / "Scanning cache" / "Page stats" /
  "On shard execution result" rows on all three nodes elided ...]
 Received forward_result=[131] from 127.0.0.1:0 [shard 1] | 2022-01-27 22:53:08.771454 | 127.0.0.1 |           1003 | 127.0.0.1
 Received forward_result=[435] from 127.0.0.3:0 [shard 1] | 2022-01-27 22:53:08.772933 | 127.0.0.1 |           2482 | 127.0.0.1
 Received forward_result=[434] from 127.0.0.2:0 [shard 1] | 2022-01-27 22:53:08.773110 | 127.0.0.1 |           2658 | 127.0.0.1
 Merged result is [1000] [shard 1]                        | 2022-01-27 22:53:08.773111 | 127.0.0.1 |           2660 | 127.0.0.1
 Done processing - preparing a result [shard 1]           | 2022-01-27 22:53:08.773114 | 127.0.0.1 |           2663 | 127.0.0.1
 Request complete                                         | 2022-01-27 22:53:08.772666 | 127.0.0.1 |           2666 | 127.0.0.1
```

Fixes #1385
Closes #9209

* github.com:scylladb/scylla:
  * docs: add parallel aggregations design doc
  * db: config: add a flag to disable new parallelized aggregation algorithm
  * test: add parallelized select count test
  * forward_service: add metrics
  * forward_service: parallelize execution across shards
  * forward_service: add tracing
  * cql3: statements: introduce parallelized_select_statement
  * cql3: query_processor: add forward_service reference to query_processor
  * gms: add PARALLELIZED_AGGREGATION feature
  * service: introduce forward_service
  * storage_proxy: extract query_ranges_to_vnodes_generator to a separate file
  * messaging_service: add verb for count(*) request forwarding
  * cql3: selection: detect if a selection represents count(*)
Enables parallelization of UDA and native aggregates. The way the query is parallelized is the same as in scylladb#9209. A separate reduction type for `COUNT(*)` is left for compatibility reasons.
This PR extends #9209. It consists of 2 main points:

- To enable parallelization of user-defined aggregates, a reduction function was added to the UDA definition. The reduction function is optional, and it has to be a scalar function that takes 2 arguments of the UDA's state type and returns the UDA's state.
- All currently implemented native aggregates got their reducible counterparts, which return their state as the final result, so it can be reduced with other results. Hence all native aggregates can now be distributed.

Testing: local 3-node cluster made with current master, with `node1` updated to this branch. Accessing nodes with `ccm <node-name> cqlsh`, I've tested the following from both an old and a new node:
- creating a UDA with a reduce function - not allowed
- selecting count(*) - distributed
- selecting another aggregate function - not distributed

Fixes: #10224
Closes #10295

* github.com:scylladb/scylla:
  - test: add tests for parallelized aggregates
  - test: cql3: Add UDA REDUCEFUNC test
  - forward_service: enable multiple selection
  - forward_service: support UDA and native aggregate parallelization
  - cql3:functions: Add cql3::functions::functions::mock_get()
  - cql3: selection: detect parallelize reduction type
  - db,cql3: Move part of cql3's function into db
  - selection: detect if selectors factory contains only simple selectors
  - cql3: reducible aggregates
  - DB: Add `scylla_aggregates` system table
  - db,gms: Add SCYLLA_AGGREGATES schema features
  - CQL3: Add reduce function to UDA
  - gms: add UDA_NATIVE_PARALLELIZED_AGGREGATION feature
Enables parallelization of UDA and native aggregates. The way the query is parallelized is the same as in scylladb#9209. Separate reduction type for `COUNT(*)` is left for compatibility reason.
This pull request speeds up execution of `count(*)` queries. It does so by splitting a given query into sub-queries and distributing them across a group of nodes for parallel execution.

Design

A new level of coordination was added. A node called the super-coordinator splits an aggregation query into sub-queries and distributes them across a group of coordinators. The super-coordinator is also responsible for merging the results.
Detection
To develop a mechanism for speeding up `count(*)` queries, there was a need to detect which queries have a `count(*)` selector. Due to this pull request being a proof of concept, detection was realized rather crudely: it only allows catching the simplest cases of `count(*)` queries (with only one selector and no column name specified).

Delegation
After detecting that a query is a `count(*)`, it should be split into sub-queries and sent to other coordinators. The splitting part wasn't that difficult; it has been achieved by limiting the original query's partition ranges. Sending the modified query to another node was much harder. The easiest scenario would be to send the whole `cql3::statements::select_statement`. Unfortunately, `cql3::statements::select_statement` can't be [de]serialized, so sending it was out of the question. Even more unfortunately, some non-[de]serializable members of `cql3::statements::select_statement` are required to start the execution process of this statement. Finally, I have decided to send a `query::read_command` paired with the required [de]serializable members. Objects that cannot be [de]serialized (such as the query's selector) are mocked on the receiving end.

Distributing
When a super-coordinator receives a `count(*)` query, it splits it into sub-queries. It does so by splitting the original query's partition ranges into a list of vnodes, grouping them by their owner, and creating sub-queries with partition ranges set to successive results of such grouping. After creation, each sub-query is sent to the owner of its partition ranges. The owner dispatches the received sub-query to all of its shards. Shards slice the partition ranges of the received sub-query so that they only query data owned by them. Each shard becomes a coordinator and executes the sub-query prepared this way.

Benchmark
Method
A 3-node cluster set up on powerful desktops located in the office (3x32 cores).
Filled the cluster with ~2 * 10^8 rows using scylla-bench and ran:
Results
Extra
Trace output
A 3-node cluster (each node had 2 shards, `murmur3_ignore_msb_bits` was set to 1, `num_tokens` was set to 3).

Issues
Fixes #1385