Merge 'database, storage_proxy: Reconcile pages with dead rows and partitions incrementally' from Botond Dénes

Currently, a mutation query on the replica side will not return a result unless it contains at least one live row. This causes problems when many dead rows or partitions precede the first live row, because the resulting reconcilable_result becomes large:

1. Large allocations. Serializing the reconcilable_result causes large allocations to store result rows in a std::deque.
2. Reactor stalls. Serializing the reconcilable_result on both the replica side and the coordinator side causes reactor stalls, which affect not only the query at hand. For 1M dead rows, freezing takes 130ms and unfreezing takes 500ms; the coordinator performs multiple freezes and unfreezes, so the reactor stall on the coordinator side exceeds 5s.
3. Too-large repair mutations. If reconciliation works on large pages, repair may fail because the mutation size is too large; 1M dead rows is already too much. Refs #9111.

This patch fixes all of the above by making mutation reads respect the memory accounter's limit for the page size, even for dead rows.

This patch also addresses the problem of client-side timeouts during paging: reconciling queries that have to process long runs of tombstones will now page through the tombstones properly, like regular queries do.
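
To make the new behavior concrete, here is a minimal, self-contained model of the page-termination rule in plain Python rather than the actual C++ (the `MemoryAccounter` and `should_stop_page` names are invented for illustration; the real change is in the mutation_partition.cc hunk further down):

```python
class MemoryAccounter:
    """Tracks the bytes consumed by the page being built."""
    def __init__(self, limit: int):
        self.limit = limit
        self.used = 0

    def update_and_check(self, size: int) -> bool:
        """Account `size` bytes; return True once the page limit is reached."""
        self.used += size
        return self.used >= self.limit

def should_stop_page(row_is_alive: bool, row_bytes: int,
                     accounter: MemoryAccounter,
                     allow_dead_row_pages: bool) -> bool:
    over_limit = accounter.update_and_check(row_bytes)
    # Old behavior: only a live row may close a page, so a long run of tombstones
    # is accumulated in full. New behavior: when the coordinator sets the
    # allow_mutation_read_page_without_live_row option, a dead row may close the
    # page too, as soon as the memory limit is hit.
    if row_is_alive or allow_dead_row_pages:
        return over_limit
    return False

# Example: 1M dead rows of ~100 bytes each against a 1 MiB page limit now cut the
# page after ~10k rows instead of buffering all of them.
acc = MemoryAccounter(limit=1 << 20)
rows_consumed = next(i for i in range(1_000_000)
                     if should_stop_page(False, 100, acc, allow_dead_row_pages=True)) + 1
print(rows_consumed)  # 10486
```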

My testing shows that this solution even improves efficiency. I tested with a cluster of 2 nodes and a table with RF=2. The data layout was as follows (1 partition):
* Node1: 1 live row, 1M dead rows
* Node2: 1M dead rows, 1 live row

This was designed to trigger reconciliation right from the very start of the query.
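
For reference, a hedged sketch (not the actual benchmark script) of how such data can be generated with the Python cassandra-driver follows. A dead row is just a row-level tombstone, produced by a plain DELETE; the keyspace/table names are placeholders, and the per-node asymmetry (each node holding a different live row) additionally requires writing each live row while the other replica is down, which is elided here:

```python
from cassandra.cluster import Cluster  # Python cassandra-driver

session = Cluster(["127.0.0.1"]).connect()
session.execute("CREATE KEYSPACE IF NOT EXISTS ks WITH replication = "
                "{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}")
session.execute("CREATE TABLE IF NOT EXISTS ks.t (pk int, ck int, v int, PRIMARY KEY (pk, ck))")

insert = session.prepare("INSERT INTO ks.t (pk, ck, v) VALUES (?, ?, ?)")
delete = session.prepare("DELETE FROM ks.t WHERE pk = ? AND ck = ?")

session.execute(insert, (0, 0, 0))           # live row at the head of the partition
for ck in range(1, 1_000_001):               # 1M dead rows: a DELETE alone leaves a row tombstone
    session.execute(delete, (0, ck))
session.execute(insert, (0, 3_000_000, 0))   # live row at the tail of the partition
```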

Before:
```
Running query (node2, CL=ONE, cold cache)
Query done, duration: 140.0633503ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)]
Running query (node2, CL=ONE, hot cache)
Query done, duration: 66.7195275ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)]
Running query (all-nodes, CL=ALL, reconcile, cold-cache)
Query done, duration: 873.5400742ms, pages: 2, result: [Row(pk=0, ck=0, v=0), Row(pk=0, ck=3000000, v=0)]
```

After:
```
Running query (node2, CL=ONE, cold cache)
Query done, duration: 136.9035122ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)]
Running query (node2, CL=ONE, hot cache)
Query done, duration: 69.5286021ms, pages: 101, result: [Row(pk=0, ck=3000000, v=0)]
Running query (all-nodes, CL=ALL, reconcile, cold-cache)
Query done, duration: 162.6239498ms, pages: 100, result: [Row(pk=0, ck=0, v=0), Row(pk=0, ck=3000000, v=0)]
```

Non-reconciling queries have almost identical durations (a few ms of variation can be observed between runs). Note how in the after case the reconciling read also produces 100 pages, versus just 2 pages in the before case, leading to a much lower duration (less than 1/4 of the before case).

Refs #7929
Refs #3672
Refs #7933
Fixes #9111

Closes #15414

* github.com:scylladb/scylladb:
  test/topology_custom: add test_read_repair.py
  replica/mutation_dump: detect end-of-page in range-scans
  tools/scylla-sstable: write: abort parser thread if writing fails
  test/pylib: add REST methods to get node exe and workdir paths
  test/pylib/rest_client: add load_new_sstables, keyspace_{flush,compaction}
  service/storage_proxy: add trace points for the actual read executor type
  service/storage_proxy: add trace points for read-repair
  storage_proxy: Add more trace-level logging to read-repair
  database: Fix accounting of small partitions in mutation query
  database, storage_proxy: Reconcile pages with no live rows incrementally
avikivity committed Oct 5, 2023
2 parents 197b759 + 4acde0f commit 854188a
Showing 11 changed files with 445 additions and 9 deletions.
14 changes: 13 additions & 1 deletion mutation/mutation_partition.cc
@@ -2200,6 +2200,7 @@ void reconcilable_result_builder::consume_new_partition(const dht::decorated_key
_static_row_is_alive = false;
_live_rows = 0;
_mutation_consumer.emplace(streamed_mutation_freezer(_schema, dk.key(), _reversed));
_used_at_entry = _memory_accounter.used_memory();
}

void reconcilable_result_builder::consume(tombstone t) {
@@ -2220,7 +2221,7 @@ stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tom
}
_live_rows += is_alive;
auto stop = _memory_accounter.update_and_check(cr.memory_usage(_schema));
if (is_alive) {
if (is_alive || _slice.options.contains<query::partition_slice::option::allow_mutation_read_page_without_live_row>()) {
// We are considering finishing current read only after consuming a
// live clustering row. While sending a single live row is enough to
// guarantee progress, not ending the result on a live row would
@@ -2261,6 +2262,17 @@ stop_iteration reconcilable_result_builder::consume_end_of_partition() {
}
_total_live_rows += _live_rows;
_result.emplace_back(partition { _live_rows, _mutation_consumer->consume_end_of_stream() });

auto accounted = _memory_accounter.used_memory() - _used_at_entry;
auto actually_used = sizeof(partition) + _result.back().mut().representation().size();
if (actually_used > accounted) {
_memory_accounter.update(actually_used - accounted);
}

if (_slice.options.contains<query::partition_slice::option::allow_mutation_read_page_without_live_row>()) {
_stop = _stop || _memory_accounter.check();
}

return _stop;
}
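
The last hunk above implements the "Fix accounting of small partitions in mutation query" change: the memory accounted row by row can undercount what the frozen partition actually occupies in the result (for tiny partitions the per-partition overhead dominates), so the accounter is topped up with the difference before the end-of-page check. A minimal Python model of that correction, with invented names, looks like this:

```python
def close_partition(used: int, used_at_entry: int,
                    frozen_partition_bytes: int, partition_overhead: int = 48) -> int:
    """Return the accounter's new `used` value after closing a partition.

    `used` plays the role of memory_accounter.used_memory(); the overhead constant
    is an illustrative stand-in for sizeof(partition).
    """
    accounted = used - used_at_entry                              # accounted row by row
    actually_used = partition_overhead + frozen_partition_bytes   # what the result really stores
    if actually_used > accounted:
        used += actually_used - accounted                         # top up, never subtract
    return used
```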

1 change: 1 addition & 0 deletions mutation_query.hh
@@ -133,6 +133,7 @@ class reconcilable_result_builder {
uint64_t _live_rows{};
// make this the last member so it is destroyed first. #7240
utils::chunked_vector<partition> _result;
size_t _used_at_entry;

private:
stop_iteration consume(range_tombstone&& rt);
8 changes: 7 additions & 1 deletion query-request.hh
@@ -177,6 +177,11 @@ public:
// directly, bypassing the intermediate reconcilable_result format used
// in pre 4.5 range scans.
range_scan_data_variant,
// When set, mutation query can end a page even if there is no live row in the
// final reconcilable_result. This prevents exchanging large pages when there
// is a lot of dead rows. This flag is needed during rolling upgrades to support
// old coordinators which do not tolerate pages with no live rows.
allow_mutation_read_page_without_live_row,
};
using option_set = enum_set<super_enum<option,
option::send_clustering_key,
@@ -191,7 +196,8 @@ public:
option::with_digest,
option::bypass_cache,
option::always_return_static_content,
option::range_scan_data_variant>>;
option::range_scan_data_variant,
option::allow_mutation_read_page_without_live_row>>;
clustering_row_ranges _row_ranges;
public:
column_id_vector static_columns; // TODO: consider using bitmap
6 changes: 5 additions & 1 deletion replica/mutation_dump.cc
@@ -604,7 +604,11 @@ future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
std::rethrow_exception(std::move(ex));
}

dk_opt = co_await partition_key_generator();
if (compaction_state->are_limits_reached() || qs.builder.is_short_read()) {
dk_opt = {};
} else {
dk_opt = co_await partition_key_generator();
}
}

co_return make_lw_shared<query::result>(qs.builder.build(compaction_state->current_full_position()));
21 changes: 18 additions & 3 deletions service/storage_proxy.cc
@@ -4980,6 +4980,8 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
data_resolver_ptr data_resolver = ::make_shared<data_read_resolver>(_schema, cl, _targets.size(), timeout);
auto exec = shared_from_this();

cmd->slice.options.set<query::partition_slice::option::allow_mutation_read_page_without_live_row>();

// Waited on indirectly.
make_mutation_data_requests(cmd, data_resolver, _targets.begin(), _targets.end(), timeout);

@@ -5003,13 +5005,18 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec

// We generate a retry if at least one node reply with count live columns but after merge we have less
// than the total number of column we are interested in (which may be < count on a retry).
// So in particular, if no host returned count live columns, we know it's not a short read.
bool can_send_short_read = rr_opt && rr_opt->is_short_read() && rr_opt->row_count() > 0;
if (rr_opt && (can_send_short_read || data_resolver->all_reached_end() || rr_opt->row_count() >= original_row_limit()
// So in particular, if no host returned count live columns, we know it's not a short read due to
// row or partition limits being exhausted and retry is not needed.
if (rr_opt && (rr_opt->is_short_read()
|| data_resolver->all_reached_end()
|| rr_opt->row_count() >= original_row_limit()
|| data_resolver->live_partition_count() >= original_partition_limit())
&& !data_resolver->any_partition_short_read()) {
tracing::trace(_trace_state, "Read stage is done for read-repair");
mlogger.trace("reconciled: {}", rr_opt->pretty_printer(_schema));
auto result = ::make_foreign(::make_lw_shared<query::result>(
co_await to_data_query_result(std::move(*rr_opt), _schema, _cmd->slice, _cmd->get_row_limit(), cmd->partition_limit)));
qlogger.trace("reconciled: {}", result->pretty_printer(_schema, _cmd->slice));
// wait for write to complete before returning result to prevent multiple concurrent read requests to
// trigger repair multiple times and to prevent quorum read to return an old value, even after a quorum
// another read had returned a newer value (but the newer value had not yet been sent to the other replicas)
@@ -5032,6 +5039,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
on_read_resolved();
});
} else {
tracing::trace(_trace_state, "Not enough data, need a retry for read-repair");
_proxy->get_stats().read_retries++;
_retry_cmd = make_lw_shared<query::read_command>(*cmd);
// We asked t (= cmd->get_row_limit()) live columns and got l (=data_resolver->total_live_count) ones.
@@ -5138,6 +5146,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
exec->_targets.erase(i, exec->_targets.end());
}
}
tracing::trace(exec->_trace_state, "digest mismatch, starting read repair");
exec->reconcile(exec->_cl, timeout);
exec->_proxy->get_stats().read_repair_repaired_blocking++;
}
@@ -5343,13 +5352,15 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
// Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
if (retry_type == speculative_retry::type::NONE || block_for == all_replicas.size()
|| (repair_decision == db::read_repair_decision::DC_LOCAL && is_datacenter_local(cl) && block_for == target_replicas.size())) {
tracing::trace(trace_state, "Creating never_speculating_read_executor - speculative retry is disabled or there are no extra replicas to speculate with");
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
}

if (target_replicas.size() == all_replicas.size()) {
// CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
// We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
// (same amount of requests in total, but we turn 1 digest request into a full blown data request).
tracing::trace(trace_state, "always_speculating_read_executor (all targets)");
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
}

@@ -5358,16 +5369,20 @@
auto local_dc_filter = erm->get_topology().get_local_dc_filter();
if (!extra_replica || (is_datacenter_local(cl) && !local_dc_filter(*extra_replica))) {
slogger.trace("read executor no extra target to speculate");
tracing::trace(trace_state, "Creating never_speculating_read_executor - there are no extra replicas to speculate with");
return ::make_shared<never_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
} else {
target_replicas.push_back(*extra_replica);
slogger.trace("creating read executor with extra target {}", *extra_replica);
tracing::trace(trace_state, "Added extra target {} for speculative read", *extra_replica);
}
}

if (retry_type == speculative_retry::type::ALWAYS) {
tracing::trace(trace_state, "Creating always_speculating_read_executor");
return ::make_shared<always_speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
} else {// PERCENTILE or CUSTOM.
tracing::trace(trace_state, "Creating speculating_read_executor");
return ::make_shared<speculating_read_executor>(schema, cf, p, std::move(erm), cmd, std::move(pr), cl, block_for, std::move(target_replicas), std::move(trace_state), std::move(permit), rate_limit_info);
}
}
27 changes: 27 additions & 0 deletions test/cql-pytest/test_select_from_mutation_fragments.py
@@ -418,3 +418,30 @@ def test_ck_in_query(cql, test_table):
for col_name, expected_value in zip(columns, expected_row):
assert hasattr(row, col_name)
assert getattr(row, col_name) == expected_value


def test_many_partitions(cql, test_keyspace, scylla_only):
num_partitions = 5000
with util.new_test_table(cql, test_keyspace, 'pk int PRIMARY KEY, v int') as table:
delete_id = cql.prepare(f"DELETE FROM {table} WHERE pk = ?")
for pk in range(num_partitions):
cql.execute(delete_id, (pk,))

res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({table})"))
pks = set()
partition_starts = 0
partition_ends = 0
for row in res:
assert row.pk >= 0 and row.pk < num_partitions
if row.mutation_fragment_kind == "partition start":
partition_starts += 1
pks.add(row.pk)
elif row.mutation_fragment_kind == "partition end":
partition_ends += 1
assert row.pk in pks
else:
pytest.fail(f"Unexpected mutation fragment kind: {row.mutation_fragment_kind}")

assert partition_starts == num_partitions
assert partition_ends == num_partitions
assert len(pks) == num_partitions
6 changes: 6 additions & 0 deletions test/pylib/manager_client.py
@@ -310,3 +310,9 @@ async def server_open_log(self, server_id: ServerNum) -> ScyllaLogFile:
logger.debug("ManagerClient getting log filename for %s", server_id)
log_filename = await self.client.get_text(f"/cluster/server/{server_id}/get_log_filename")
return ScyllaLogFile(self.thread_pool, log_filename)

async def server_get_workdir(self, server_id: ServerNum) -> str:
return await self.client.get_text(f"/cluster/server/{server_id}/workdir")

async def server_get_exe(self, server_id: ServerNum) -> str:
return await self.client.get_text(f"/cluster/server/{server_id}/exe")
18 changes: 18 additions & 0 deletions test/pylib/rest_client.py
@@ -234,6 +234,24 @@ async def flush_keyspace(self, node_ip: str, ks: str) -> None:
"""Flush keyspace"""
await self.client.post(f"/storage_service/keyspace_flush/{ks}", host=node_ip)

async def load_new_sstables(self, node_ip: str, keyspace: str, table: str) -> None:
"""Load sstables from upload directory"""
await self.client.post(f"/storage_service/sstables/{keyspace}?cf={table}", host=node_ip)

async def keyspace_flush(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
"""Flush the specified or all tables in the keyspace"""
url = f"/storage_service/keyspace_flush/{keyspace}"
if table is not None:
url += "?cf={table}"
await self.client.post(url, host=node_ip)

async def keyspace_compaction(self, node_ip: str, keyspace: str, table: Optional[str] = None) -> None:
"""Compact the specified or all tables in the keyspace"""
url = f"/storage_service/keyspace_compaction/{keyspace}"
if table is not None:
url += "?cf={table}"
await self.client.post(url, host=node_ip)


class ScyllaMetrics:
def __init__(self, lines: list[str]):
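
A hedged usage sketch of the new helpers from a topology test: load sstables that were previously copied into a table's upload directory on one node, then flush and major-compact that table. The `manager.api` attribute and the keyspace/table names are assumptions for illustration:

```python
async def refresh_flush_compact(manager, node_ip: str, ks: str = "ks", table: str = "t") -> None:
    # Pick up sstables placed under the table's upload/ directory on that node.
    await manager.api.load_new_sstables(node_ip, ks, table)
    # Flush the table's memtables, then trigger a major compaction.
    await manager.api.keyspace_flush(node_ip, ks, table)
    await manager.api.keyspace_compaction(node_ip, ks, table)
```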
20 changes: 18 additions & 2 deletions test/pylib/scylla_cluster.py
@@ -1069,6 +1069,8 @@ def add_put(route: str, handler: Callable):
add_put('/cluster/server/{server_id}/update_config', self._server_update_config)
add_put('/cluster/server/{server_id}/change_ip', self._server_change_ip)
add_get('/cluster/server/{server_id}/get_log_filename', self._server_get_log_filename)
add_get('/cluster/server/{server_id}/workdir', self._server_get_workdir)
add_get('/cluster/server/{server_id}/exe', self._server_get_exe)

async def _manager_up(self, _request) -> aiohttp.web.Response:
return aiohttp.web.Response(text=f"{self.is_running}")
@@ -1279,13 +1281,27 @@ async def _server_change_ip(self, request: aiohttp.web.Request) -> aiohttp.web.R
ip_addr = await self.cluster.change_ip(server_id)
return aiohttp.web.json_response({"ip_addr": ip_addr})

async def _server_get_log_filename(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
async def _server_get_attribute(self, request: aiohttp.web.Request, attribute: str) -> aiohttp.web.Response:
"""Generic request handler which gets a particular attribute of a ScyllaServer instance
To be used to implement concrete handlers, not for direct use.
"""
assert self.cluster
server_id = ServerNum(int(request.match_info["server_id"]))
server = self.cluster.servers[server_id]
if not server:
return aiohttp.web.Response(status=404, text=f"Server {server_id} unknown")
return aiohttp.web.Response(text=f"{server.log_filename}")
return aiohttp.web.Response(text=f"{getattr(server, attribute)}")

async def _server_get_log_filename(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
return await self._server_get_attribute(request, "log_filename")

async def _server_get_workdir(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
return await self._server_get_attribute(request, "workdir")

async def _server_get_exe(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
return await self._server_get_attribute(request, "exe")



@asynccontextmanager
