Implement fast streaming for intra-node migration #1

Open

raphaelsc wants to merge 54 commits into intranode-migration from intranodemigration-with-faster-streaming
Conversation

raphaelsc

With intra-node migration, all the movement is local, so we can make streaming faster by just cloning the sstable set of the leaving replica and loading it into the pending one.

This cloning is specific to the underlying storage, but s3 doesn't support snapshot() yet (the sstables::storage procedure which clone is built upon). It's only supported by the file system, with the help of hard links. A new generation is picked for each cloned sstable, and it lives in the same directory as the original.
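
To make the cloning idea concrete, here is a minimal, self-contained sketch of hard-link based cloning, assuming a simplified layout where every component file name starts with "<generation>-"; the function and layout are illustrative only, not the actual sstables::storage code:

```cpp
#include <filesystem>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// Clone all component files of the sstable with generation `old_gen` living in
// `dir`, giving the clone the new generation `new_gen`. Hard links make this
// O(number of components) regardless of data size, which is what makes
// intra-node streaming fast.
std::vector<fs::path> clone_sstable_via_hardlinks(const fs::path& dir,
                                                  const std::string& old_gen,
                                                  const std::string& new_gen) {
    std::vector<fs::path> cloned;
    for (const auto& entry : fs::directory_iterator(dir)) {
        const auto name = entry.path().filename().string();
        if (name.rfind(old_gen + "-", 0) == 0) {            // component of old_gen
            auto clone_path = dir / (new_gen + name.substr(old_gen.size()));
            fs::create_hard_link(entry.path(), clone_path);  // no data copy
            cloned.push_back(std::move(clone_path));
        }
    }
    return cloned;
}
```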

A challenge I bumped into was understanding why the table refused to load the sstables at the pending replica, as it considered them foreign. Later I realized that the sharder (for reads) at this point of the migration will point only to the leaving replica. It didn't fail with mutation-based streaming, because the sstable writer considers the shard that the sstable was written on as its owner, regardless of what the sharder says. That was fixed by mimicking this behavior during loading at the pending replica.

test:
./test.py --mode=dev intranode --repeat=100 passes.

balance_tablets() is invoked in a loop, so only the first call will
see a non-empty skiplist.

This bug starts to manifest after adding the intra-node migration plan,
causing failures of the test_load_balancing_with_skiplist test
case. The reason is that rebalancing will now require multiple passes
before convergence is reached, due to intra-node migrations, and later
calls will not see the skiplist and will try to balance skipped nodes,
violating the test's assertions.
We need a separate transition kind for intra-node migration so that we
don't have to recover this information from the replica set in an
expensive way. This information is needed in the hot path - in
effective_replication_map, to not return the pending tablet replica to
the coordinator. From its perspective, the replica set is not
transitional.

The transition will also be used to alter the behavior of the
sharder. When not in intra-node migration, the sharder should
advertise the shard which is either in the previous or next replica
set. During intra-node migration, that's not possible as there may be
two such shards. So it will return the shard according to the current
read selector.
The tablet sharder is adjusted to handle intra-node migration, where a
tablet can have two replicas on the same host. For reads, the sharder
uses the read selector to resolve the conflict. For writes, the write
selector is used.

The old shard_of() API is kept to represent the shard for reads, and a
new method is introduced to query the shards for writing:
shard_for_writes(). All writers should be switched to that API, which
is not yet done in this patch.
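
As a rough illustration of the API shape described above (the method names follow the description, but the signatures are simplified and are not the actual dht::sharder declarations):

```cpp
#include <cstdint>
#include <vector>

using token_value = int64_t;
using shard_id = unsigned;

// Reads resolve to exactly one shard (chosen by the read selector); writes may
// resolve to one or two shards on the same host during intra-node migration.
struct sharder_like {
    virtual shard_id shard_for_reads(token_value t) const = 0;
    virtual std::vector<shard_id> shard_for_writes(token_value t) const = 0;
    virtual ~sharder_like() = default;
};
```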

The request handler on the replica side acts as a second-level
coordinator, using the sharder to determine routing to shards. A given
sharder has the scope of a single topology version, a single
effective_replication_map_ptr, which should be kept alive during
writes.
tablet_map::get_shard() will go away as it is not prepared for
intra-node migration.
In preparation for intra-node migration for which get_shard() is not
prepared.
Its semantics do not fit well with intra-node migration, which allows
two owning shards. Replace uses with the new has_replica() API.
When the sharder says that the write should go to multiple shards,
we need to consider the write as applied only if it was applied
to all those shards.

This can happen during intra-node tablet migration. During such migration,
the request coordinator on the storage_proxy side coordinates to hosts
as if no migration was in progress. The replica-side coordinator coordinates
to shards based on the sharder's response.

One way to think about it is that
effective_replication_map::get_natural_endpoints()/get_pending_endpoints()
tells how to coordinate between nodes, and the sharder tells how to
coordinate between shards. Both work with some snapshot of tablet
metadata, which should be kept alive around the operation. The sharder is
associated with its own effective_replication_map, which marks the
topology version as used and allows barriers to synchronize with
replica-side operations.
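
Schematically, the replica-side rule could look like the plain C++ sketch below (apply_on_all_owning_shards() and apply_on_shard() are hypothetical placeholders for the real replica-side apply path):

```cpp
#include <vector>

using shard_id = unsigned;

// Placeholder for applying the write on one shard; the real code submits the
// mutation to that shard and reports whether it was applied.
static bool apply_on_shard(shard_id) {
    return true;
}

// The write counts as applied only if it was applied on every shard returned
// by shard_for_writes(). During intra-node migration this may be two shards
// on the same host: the leaving replica and the pending replica.
bool apply_on_all_owning_shards(const std::vector<shard_id>& write_shards) {
    bool applied = true;
    for (shard_id s : write_shards) {
        applied &= apply_on_shard(s);
    }
    return applied;
}
```
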
This writer is used by streaming, during tablet migration and
load-and-stream.

The caller of distribute_reader_and_consume_on_shards(), which provides
a sharder, is supposed to ensure that the effective_replication_map is
kept alive around it, in order for the topology coordinator to wait for
any writes which may be in flight to reach their shards before the
tablet replica starts another migration. This is already the case:

  1) repair and load-and-stream keep the erm alive around writing.

  2) tablet migration uses auto_refreshing_sharder, so it does not, but
     it keeps the topology_guard around the operation in the consumer,
     which serves the same purpose.
During streaming for intra-node migration we want to write only to the
new shard. To achieve that, allow altering the write selector in
sharder::shard_for_writes() and per instance of
auto_refreshing_sharder.
…unctions

Will be needed by member methods which generate migration plans.
target nodes

The node_load data structure was not updated to reflect migration
decisions on the target node. This is not needed for inter-node
migration because target nodes are not considered as sources. But we
want it to reflect migration decisions so that later inter-node
migration sees an accurate picture with earlier migrations reflected
in node_load.
Currently the load balancer only generates an inter-node plan, and
the algorithm is embedded in make_plan(). The method will become even
harder to follow once we add more kinds of plan-generating steps,
e.g. an intra-node plan. Extract the inter-node plan to make it easier
to add other plans and to see the overall flow.
Intra-node migrations are scheduled for each node independently with
the aim to equalize per-shard tablet count on each node.

This is needed to avoid severe imbalance between shards which can
happen when some table grows and is split. The inter-node balance can
be equal, so inter-node migration cannot fix the imbalance. Also, if
RF=N then there is not even a possibility of moving tablets around to
fix the imbalance.  The only way to bring the system to balance is to
move tablets within the nodes.

After scheduling inter-node migrations, the algorithm schedules
intra-node migrations. This means that across-node migrations can
proceed in parallel with intra-node migrations if there is free
capacity to carry them out, but across-node migrations have higher
priority.
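
The balancing objective within one node can be illustrated with a toy greedy sketch that equalizes per-shard tablet counts by moving tablets from the most loaded to the least loaded shard; the real planner works on richer data structures and respects migration capacity:

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

struct intra_node_move {
    std::size_t from_shard;
    std::size_t to_shard;
};

std::vector<intra_node_move> plan_intra_node_moves(std::vector<int> tablets_per_shard) {
    std::vector<intra_node_move> plan;
    if (tablets_per_shard.size() < 2) {
        return plan;
    }
    while (true) {
        auto max_it = std::max_element(tablets_per_shard.begin(), tablets_per_shard.end());
        auto min_it = std::min_element(tablets_per_shard.begin(), tablets_per_shard.end());
        if (*max_it - *min_it <= 1) {
            break;  // per-shard counts are as equal as they can get
        }
        // Move one tablet from the most loaded shard to the least loaded one.
        --*max_it;
        ++*min_it;
        plan.push_back({std::size_t(max_it - tablets_per_shard.begin()),
                        std::size_t(min_it - tablets_per_shard.begin())});
    }
    return plan;
}
```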

Fixes scylladb#16594
Existing tests are augmented with a check which verifies that
all nodes are internally balanced.
It would be a waste of effort to do so, since we migrate tablets away
anyway.
Require users to specify whether they want the shard for reads or for
writes by switching to the appropriate non-deprecated variant.

For example, shard_of() can be replaced with shard_for_reads() or
shard_for_writes().

The next_shard/token_for_next_shard APIs have only a for-reads variant,
and the act of switching attests that the code is valid for intra-node
migration.
Before the patch, dht::sharder could be instantiated and it would
behave like a static sharder. This is not safe with regard to
extensions of the API, because if a derived implementation forgets to
override some method, it would incorrectly default to the
implementation from the static sharder. Better to fail the compilation
in this case, so extract the static sharder logic into a
dht::static_sharder class and make all methods in dht::sharder pure
virtual.

This also allows algorithms to indicate that they only work with the
static sharder by accepting that type, giving compile-time safety for
this requirement.

schema::get_sharder() is changed to return the static_sharder&.
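
A reduced sketch of why the split helps (the classes below are simplified stand-ins for dht::sharder and dht::static_sharder, not the real declarations): with the base methods pure virtual, a derived sharder that forgets an override fails to compile instead of silently inheriting static-sharding behavior.

```cpp
using token_value = long;
using shard_id = unsigned;

struct sharder {  // stand-in for dht::sharder: interface only, all pure virtual
    virtual shard_id shard_for_reads(token_value t) const = 0;
    virtual ~sharder() = default;
};

struct static_sharder : sharder {  // stand-in for dht::static_sharder
    unsigned shard_count;
    explicit static_sharder(unsigned n) : shard_count(n) {}
    shard_id shard_for_reads(token_value t) const override {
        // Static, token-based mapping; a plain modulo stands in for the real
        // token-to-shard computation.
        return static_cast<unsigned long>(t) % shard_count;
    }
};

// A tablet-aware sharder deriving from `sharder` must now provide its own
// shard_for_reads(); omitting it makes instantiation a compile-time error,
// and algorithms that require static sharding can take static_sharder& to
// enforce that requirement at compile time.
```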
In preparation for tablet intra-node migration.

Existing uses are for reads, so it's safe to use shard_for_reads():
  - in multishard reader
  - in forward_service

The ring_position_range_vector_sharder is used when computing sstable
shards, which for intra-node migration should use the view for
reads. If we haven't completed streaming, sstables should be attached
to the old shard (used by reads). When in the write-both-read-new
stage, streaming is complete, reads are using the new shard, and we
should attach sstables to the new shard.

When not in intra-node migration, the view for reads on the pending
node will return the pending shard even if the read selector is "read
old". So if the pending node restarts during streaming, we will attach
sstables to the shard which is used by writes even though we're using
the selector for reads.
In preparation for intra-node tablet migration, to avoid
using deprecated sharder APIs.

This function is used for generating sstable sharding metadata.
For tablets, it is not invoked, so we can safely work with the
static sharder. The call site already passes static_sharder only.
All current uses are in the read path.
… owned by the shard

This check would lead to correctness issues with intra-node migration
because the shard may switch during a read, from "read old" to "read
new". If the coordinator used "read old" for shard routing, but the
table on the old shard is already using the "read new" erm, such a read
would observe an empty result, which is wrong.

Drop the optimization. In the scenario above, read will observe all
past writes because:

  1) writes are still using "write both"

  2) writes are switched to "write new" only after all requests which
  might be using "read old" are done

Replica-side coordinators should already route single-key requests to
the correct shard, so it's not important as an optimization.

This issue shows how assumptions about static sharding are embedded in
the current code base and how intra-node migration, by violating those
assumptions, can lead to correctness issues.
There is no tablet migration in unit tests, so shard_of() can be
safely replaced with shard_for_reads(), even if it's used for writes.
@tgrabiec
Owner

Please rebase

replica/table.cc Outdated
utils::chunked_vector<sstables::entry_descriptor> ret;
auto holder = async_gate().hold();

auto& sg = storage_groups()[tid.value()];
Owner


replica/table.cc:1004:32: error: no viable overloaded operator[] for type 'const storage_group_map' (aka 'const flat_hash_map<unsigned long, unique_ptr<storage_group>, Hash<unsigned long>>')
    auto& sg = storage_groups()[tid.value()];
               ~~~~~~~~~~~~~~~~^~~~~~~~~~~~
/usr/include/absl/container/internal/raw_hash_map.h:163:22: note: candidate function not viable: 'this' argument has type 'const storage_group_map' (aka 'const flat_hash_map<unsigned long, unique_ptr<storage_group>, Hash<unsigned long>>'), but method is not marked const
  MappedReference<P> operator[](key_arg<K>&& key) {
                     ^
/usr/include/absl/container/internal/raw_hash_map.h:168:22: note: candidate function not viable: 'this' argument has type 'const storage_group_map' (aka 'const flat_hash_map<unsigned long, unique_ptr<storage_group>, Hash<unsigned long>>'), but method is not marked const
  MappedReference<P> operator[](const key_arg<K>& key) {
                     ^
1 error generated.

Author

@tgrabiec Finished rebasing and retesting.

With intra-node migration, all the movement is local, so we can make
streaming faster by just cloning the sstable set of the leaving replica
and loading it into the pending one.

This cloning is specific to the underlying storage, but s3 doesn't
support snapshot() yet (the sstables::storage procedure which clone is
built upon). It's only supported by the file system, with the help of
hard links. A new generation is picked for each cloned sstable, and it
lives in the same directory as the original.

A challenge I bumped into was understanding why the table refused to
load the sstables at the pending replica, as it considered them foreign.
Later I realized that the sharder (for reads) at this stage of migration
will point only to the leaving replica. It didn't fail with mutation
based streaming, because the sstable writer considers the shard --
that the sstable was written on -- as its owner, regardless of what
the sharder says. That was fixed by mimicking this behavior during
loading at the pending replica.

test:
./test.py --mode=dev intranode --repeat=100 passes.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
@raphaelsc raphaelsc force-pushed the intranodemigration-with-faster-streaming branch from 9d04c18 to 4d1e500 on April 29, 2024 12:45
@tgrabiec tgrabiec force-pushed the intranode-migration branch 3 times, most recently from 7614ba6 to 366ae01 on May 6, 2024 12:40
@tgrabiec tgrabiec force-pushed the intranode-migration branch 2 times, most recently from f0e8eba to 6d32433 on May 9, 2024 11:19
tgrabiec pushed a commit that referenced this pull request May 14, 2024
Prevent stalls from "unpacking" of large
canonical mutations seen with test_add_many_nodes_under_load
when called from `group0_state_machine::transfer_snapshot`:

```
  ++[1#1/44 14%] addr=0x395b2f total=569 count=6 avg=95: ?? ??:0
  | ++[2#1/2 56%] addr=0x3991e3 total=321 count=4 avg=80: ?? ??:0
  | ++          - addr=0x1587159:
  | |             std::__new_allocator<seastar::basic_sstring<signed char, unsigned int, 31u, false> >::allocate at /usr/bin/../lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13/bits/new_allocator.h:147
  | |             (inlined by) std::allocator<seastar::basic_sstring<signed char, unsigned int, 31u, false> >::allocate at /usr/bin/../lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13/bits/allocator.h:198
  | |             (inlined by) std::allocator_traits<std::allocator<seastar::basic_sstring<signed char, unsigned int, 31u, false> > >::allocate at /usr/bin/../lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13/bits/alloc_traits.h:482
  | |             (inlined by) std::_Vector_base<seastar::basic_sstring<signed char, unsigned int, 31u, false>, std::allocator<seastar::basic_sstring<signed char, unsigned int, 31u, false> > >::_M_allocate at /usr/bin/../lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13/bits/stl_vector.h:378
  | |             (inlined by) std::vector<seastar::basic_sstring<signed char, unsigned int, 31u, false>, std::allocator<seastar::basic_sstring<signed char, unsigned int, 31u, false> > >::reserve at /usr/bin/../lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13/bits/vector.tcc:79
  | |             (inlined by) ser::idl::serializers::internal::vector_serializer<std::vector<seastar::basic_sstring<signed char, unsigned int, 31u, false>, std::allocator<seastar::basic_sstring<signed char, unsigned int, 31u, false> > > >::read<seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator> > at ././serializer_impl.hh:226
  | |             (inlined by) ser::deserialize<std::vector<seastar::basic_sstring<signed char, unsigned int, 31u, false>, std::allocator<seastar::basic_sstring<signed char, unsigned int, 31u, false> > >, seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator> > at ././serializer.hh:264
  | |             (inlined by) ser::serializer<clustering_key_prefix>::read<seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator> >(seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator>&)::{lambda(auto:1&)#1}::operator()<seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator> > at ./build/dev/gen/idl/keys.dist.impl.hh:31
  | ++          - addr=0x1587085:
  | |             seastar::with_serialized_stream<seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator>, ser::serializer<clustering_key_prefix>::read<seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator> >(seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator>&)::{lambda(auto:1&)#1}, void, void> at ././seastar/include/seastar/core/simple-stream.hh:646
  | |             (inlined by) ser::serializer<clustering_key_prefix>::read<seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator> > at ./build/dev/gen/idl/keys.dist.impl.hh:28
  | |             (inlined by) ser::deserialize<clustering_key_prefix, seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator> > at ././serializer.hh:264
  | |             (inlined by) ser::deletable_row_view::key() const::{lambda(auto:1&)#1}::operator()<seastar::fragmented_memory_input_stream<bytes_ostream::fragment_iterator> const> at ./build/dev/gen/idl/mutation.dist.impl.hh:1268
  | | ++[3#1/1 100%] addr=0x15865a3 total=577 count=7 avg=82:
  | | |              seastar::memory_input_stream<bytes_ostream::fragment_iterator>::with_stream<ser::deletable_row_view::key() const::{lambda(auto:1&)#1}> at ././seastar/include/seastar/core/simple-stream.hh:491
  | | |              (inlined by) seastar::with_serialized_stream<seastar::memory_input_stream<bytes_ostream::fragment_iterator> const, ser::deletable_row_view::key() const::{lambda(auto:1&)#1}, void> at ././seastar/include/seastar/core/simple-stream.hh:639
  | | |              (inlined by) ser::deletable_row_view::key at ./build/dev/gen/idl/mutation.dist.impl.hh:1264
  | |   ++[4#1/1 100%] addr=0x157cf27 total=643 count=8 avg=80:
  | |   |              mutation_partition_view::do_accept<partition_builder> at ./mutation/mutation_partition_view.cc:212
  | |   ++           - addr=0x1516cac:
  | |   |              mutation_partition::apply at ./mutation/mutation_partition.cc:497
  | |     ++[5#1/1 100%] addr=0x14e4433 total=1765 count=22 avg=80:
  | |     |              canonical_mutation::to_mutation at ./mutation/canonical_mutation.cc:60
  | |       ++[6#1/2 98%] addr=0x2452a60 total=1732 count=21 avg=82:
  | |       |             service::storage_service::merge_topology_snapshot at ./service/storage_service.cc:761
  | |       ++          - addr=0x2858782:
  | |       |             service::group0_state_machine::transfer_snapshot at ./service/raft/group0_state_machine.cc:303
```

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
tgrabiec pushed a commit that referenced this pull request May 14, 2024
Freezing large mutations synchronously may cause
reactor stalls, as seen in the test_add_many_nodes_under_load
dtest:
```
  ++[1#1/37 5%] addr=0x15b0bf total=99 count=2 avg=50: ?? ??:0
  | ++[2#1/2 67%] addr=0x15a331f total=66 count=1 avg=66:
  | |             bytes_ostream::write at ././bytes_ostream.hh:248
  | |             (inlined by) bytes_ostream::write at ././bytes_ostream.hh:263
  | |             (inlined by) ser::serialize_integral<unsigned int, bytes_ostream> at ././serializer.hh:203
  | |             (inlined by) ser::integral_serializer<unsigned int>::write<bytes_ostream> at ././serializer.hh:217
  | |             (inlined by) ser::serialize<unsigned int, bytes_ostream> at ././serializer.hh:254
  | |             (inlined by) ser::writer_of_column<bytes_ostream>::write_id at ./build/dev/gen/idl/mutation.dist.impl.hh:4680
  | | ++[3#1/1 100%] addr=0x159df71 total=132 count=2 avg=66:
  | | |              (anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}::operator() at ./mutation/mutation_partition_serializer.cc:99
  | | |              (inlined by) row::maybe_invoke_with_hash<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1} const, cell_and_hash const> at ./mutation/mutation_partition.hh:133
  | | |              (inlined by) row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}::operator() at ./mutation/mutation_partition.hh:152
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>::operator() at ././utils/compact-radix-tree.hh:1888
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::visit_slot<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>&> at ././utils/compact-radix-tree.hh:1560
  | | ++           - addr=0x159d84d:
  | | |              compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>&> at ././utils/compact-radix-tree.hh:1364
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::node_base<cell_and_hash, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::direct_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u> >::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>&, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::direct_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u> > at ././utils/compact-radix-tree.hh:799
  | | |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::node_base<cell_and_hash, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)1, 4u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)2, 8u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::indirect_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)3, 16u>, compact_radix_tree::tree<cell_and_hash, unsigned int>::direct_layout<cell_and_hash, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)6, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)0, 0u, (compact_radix_tree::tree<cell_and_hash, unsigned int>::layout)4, 32u> >::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true>&> at ././utils/compact-radix-tree.hh:807
  | |   ++[4#1/1 100%] addr=0x1596f4a total=329 count=5 avg=66:
  | |   |              compact_radix_tree::tree<cell_and_hash, unsigned int>::node_head::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true> > at ././utils/compact-radix-tree.hh:473
  | |   |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::visit<compact_radix_tree::tree<cell_and_hash, unsigned int>::walking_visitor<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}, true> > at ././utils/compact-radix-tree.hh:1626
  | |   |              (inlined by) compact_radix_tree::tree<cell_and_hash, unsigned int>::walk<row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}>(ser::deletable_row__cells<bytes_ostream>&&) const::{lambda(unsigned int, cell_and_hash const&)#1}> at ././utils/compact-radix-tree.hh:1909
  | |   |              (inlined by) row::for_each_cell<(anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> >(ser::deletable_row__cells<bytes_ostream>&&, row const&, schema const&, column_kind)::{lambda(unsigned int, atomic_cell_or_collection const&)#1}> at ./mutation/mutation_partition.hh:151
  | |   |              (inlined by) (anonymous namespace)::write_row_cells<ser::deletable_row__cells<bytes_ostream> > at ./mutation/mutation_partition_serializer.cc:97
  | |   |              (inlined by) write_row<ser::writer_of_deletable_row<bytes_ostream> > at ./mutation/mutation_partition_serializer.cc:168
  | |     ++[5#1/2 80%] addr=0x15a310c total=263 count=4 avg=66:
  | |     |             mutation_partition_serializer::write_serialized<ser::writer_of_mutation_partition<bytes_ostream> > at ./mutation/mutation_partition_serializer.cc:180
  | |     | ++[6#1/2 62%] addr=0x14eb60a total=428 count=7 avg=61:
  | |     | |             frozen_mutation::frozen_mutation(mutation const&)::$_0::operator()<ser::writer_of_mutation_partition<bytes_ostream> > at ./mutation/frozen_mutation.cc:85
  | |     | |             (inlined by) ser::after_mutation__key<bytes_ostream>::partition<frozen_mutation::frozen_mutation(mutation const&)::$_0> at ./build/dev/gen/idl/mutation.dist.impl.hh:7058
  | |     | |             (inlined by) frozen_mutation::frozen_mutation at ./mutation/frozen_mutation.cc:84
  | |     | | ++[7#1/1 100%] addr=0x14ed388 total=532 count=9 avg=59:
  | |     | | |              freeze at ./mutation/frozen_mutation.cc:143
  | |     | |   ++[8#1/2 74%] addr=0x252cf55 total=394 count=6 avg=66:
  | |     | |   |             service::storage_service::merge_topology_snapshot at ./service/storage_service.cc:763
```

This change uses freeze_gently to freeze
the cdc_generations_v2 mutations one at a time
to prevent the stalls reported above.
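
The pattern is roughly the one sketched below (the types and freeze_one() are simplified placeholders; the actual change calls freeze_gently on each cdc_generations_v2 mutation):

```cpp
#include <vector>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/coroutine/maybe_yield.hh>

struct mutation_stub {};   // placeholder for mutation
struct frozen_stub {};     // placeholder for the frozen form

static frozen_stub freeze_one(const mutation_stub&) {
    return {};             // stands in for the actual (potentially large) freeze
}

// Freeze the mutations one at a time, yielding to the reactor between items so
// that a long list of large mutations does not cause a reactor stall.
seastar::future<std::vector<frozen_stub>> freeze_all_gently(std::vector<mutation_stub> ms) {
    std::vector<frozen_stub> out;
    out.reserve(ms.size());
    for (const auto& m : ms) {
        out.push_back(freeze_one(m));
        co_await seastar::coroutine::maybe_yield();
    }
    co_return out;
}
```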

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
tgrabiec pushed a commit that referenced this pull request May 14, 2024
Prevent stalls coming from applying large
mutations in memory synchronously,
like the ones seen with the test_add_many_nodes_under_load
dtest:
```
  | | |   ++[5#2/2 44%] addr=0x1498efb total=256 count=3 avg=85:
  | | |   |             replica::memtable::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const> const&, db::rp_handle&&)::$_0::operator()() const::{lambda()#1}::operator() at ./replica/memtable.cc:804
  | | |   |             (inlined by) logalloc::allocating_section::with_reclaiming_disabled<replica::memtable::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const> const&, db::rp_handle&&)::$_0::operator()() const::{lambda()#1}&> at ././utils/logalloc.hh:500
  | | |   |             (inlined by) logalloc::allocating_section::operator()<replica::memtable::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const> const&, db::rp_handle&&)::$_0::operator()() const::{lambda()#1}>(logalloc::region&, replica::memtable::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const> const&, db::rp_handle&&)::$_0::operator()() const::{lambda()#1}&&)::{lambda()#1}::operator() at ././utils/logalloc.hh:527
  | | |   |             (inlined by) logalloc::allocating_section::with_reserve<logalloc::allocating_section::operator()<replica::memtable::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const> const&, db::rp_handle&&)::$_0::operator()() const::{lambda()#1}>(logalloc::region&, replica::memtable::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const> const&, db::rp_handle&&)::$_0::operator()() const::{lambda()#1}&&)::{lambda()#1}> at ././utils/logalloc.hh:471
  | | |   |             (inlined by) logalloc::allocating_section::operator()<replica::memtable::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const> const&, db::rp_handle&&)::$_0::operator()() const::{lambda()#1}> at ././utils/logalloc.hh:526
  | | |   |             (inlined by) replica::memtable::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const> const&, db::rp_handle&&)::$_0::operator() at ./replica/memtable.cc:800
  | | |   |             (inlined by) with_allocator<replica::memtable::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const> const&, db::rp_handle&&)::$_0> at ././utils/allocation_strategy.hh:318
  | | |   |             (inlined by) replica::memtable::apply at ./replica/memtable.cc:799
  | | |     ++[6#1/1 100%] addr=0x145047b total=1731 count=21 avg=82:
  | | |     |              replica::table::do_apply<frozen_mutation const&, seastar::lw_shared_ptr<schema const>&> at ./replica/table.cc:2896
  | | |       ++[7#1/1 100%] addr=0x13ddccb total=2852 count=32 avg=89:
  | | |       |              replica::table::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const>, db::rp_handle&&, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l> > >)::$_0::operator() at ./replica/table.cc:2924
  | | |       |              (inlined by) seastar::futurize<void>::invoke<replica::table::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const>, db::rp_handle&&, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l> > >)::$_0&> at ././seastar/include/seastar/core/future.hh:2032
  | | |       |              (inlined by) seastar::futurize_invoke<replica::table::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const>, db::rp_handle&&, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l> > >)::$_0&> at ././seastar/include/seastar/core/future.hh:2066
  | | |       |              (inlined by) replica::dirty_memory_manager_logalloc::region_group::run_when_memory_available<replica::table::apply(frozen_mutation const&, seastar::lw_shared_ptr<schema const>, db::rp_handle&&, std::chrono::time_point<seastar::lowres_clock, std::chrono::duration<long, std::ratio<1l, 1000000000l> > >)::$_0> at ./replica/dirty_memory_manager.hh:572
  | | |       |              (inlined by) replica::table::apply at ./replica/table.cc:2923
  | | |       ++           - addr=0x1330ba1:
  | | |       |              replica::database::apply_in_memory at ./replica/database.cc:1812
  | | |       ++           - addr=0x1360054:
  | | |       |              replica::database::do_apply at ./replica/database.cc:2032
```

This change has virtually no effect on small mutations
(up to 128KB in size).
build/release/scylla perf-simple-query --write --default-log-level=error --random-seed=1 -c 1
Before:
median 80092.06 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   53291 insns/op,        0 errors)
After:
median 78780.86 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   53311 insns/op,        0 errors)

To estimate the performance ramifications on
large mutations, I measured perf-simple-query --write
calling unfreeze_gently in all cases:
median 77411.26 tps ( 71.3 allocs/op,   8.0 logallocs/op,  14.3 tasks/op,   53280 insns/op,        0 errors)

This shows the allocations that moved out of logalloc
(in memtable::apply of frozen_mutation) into seastar
allocations (in unfreeze_gently), with <1% CPU overhead.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
@tgrabiec tgrabiec force-pushed the intranode-migration branch 2 times, most recently from d7cb4fe to 715ae68 on May 15, 2024 22:29
tgrabiec pushed a commit that referenced this pull request May 29, 2024
Seen the following unexplained assertion failure with
pytest -s -v --scylla-version=local_tarball --tablets repair_additional_test.py::TestRepairAdditional::test_repair_option_pr_multi_dc
```
INFO  2024-05-27 11:18:05,081 [shard 0:main] init - Shutting down repair service
INFO  2024-05-27 11:18:05,081 [shard 0:main] task_manager - Stopping module repair
INFO  2024-05-27 11:18:05,081 [shard 0:main] task_manager - Unregistered module repair
INFO  2024-05-27 11:18:05,081 [shard 1:main] task_manager - Stopping module repair
INFO  2024-05-27 11:18:05,081 [shard 1:main] task_manager - Unregistered module repair
scylla: repair/row_level.cc:3230: repair_service::~repair_service(): Assertion `_stopped' failed.
Aborting on shard 0.
Backtrace:
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libseastar.so+0x3f040c
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libseastar.so+0x41c7a1
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libc.so.6+0x3dbaf
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libc.so.6+0x8e883
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libc.so.6+0x3dafd
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libc.so.6+0x2687e
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libc.so.6+0x2679a
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libc.so.6+0x36186
  0x26f2428
  0x10fb373
  0x10fc8b8
  0x10fc809
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libseastar.so+0x456c6d
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libseastar.so+0x456bcf
  0x10fc65b
  0x10fc5bc
  0x10808d0
  0x1080800
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libseastar.so+0x3ff22f
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libseastar.so+0x4003b7
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libseastar.so+0x3ff888
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libseastar.so+0x36dea8
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libseastar.so+0x36d0e2
  0x101cefa
  0x105a390
  0x101bde7
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libc.so.6+0x27b89
  /home/bhalevy/.ccm/scylla-repository/local_tarball/libreloc/libc.so.6+0x27c4a
  0x101a764
```

Decoded:
```
~repair_service at ./repair/row_level.cc:3230
~shared_ptr_count_for at ././seastar/include/seastar/core/shared_ptr.hh:491
 (inlined by) ~shared_ptr_count_for at ././seastar/include/seastar/core/shared_ptr.hh:491
~shared_ptr at ././seastar/include/seastar/core/shared_ptr.hh:569
 (inlined by) seastar::shared_ptr<repair_service>::operator=(seastar::shared_ptr<repair_service>&&) at ././seastar/include/seastar/core/shared_ptr.hh:582
 (inlined by) seastar::shared_ptr<repair_service>::operator=(decltype(nullptr)) at ././seastar/include/seastar/core/shared_ptr.hh:588
 (inlined by) operator() at ././seastar/include/seastar/core/sharded.hh:727
 (inlined by) seastar::future<void> seastar::futurize<seastar::future<void> >::invoke<seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}::operator()(unsigned int) const::{lambda()#1}&>(seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}::operator()(unsigned int) const::{lambda()#1}&) at ././seastar/include/seastar/core/future.hh:2035
 (inlined by) seastar::futurize<std::invoke_result<seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}::operator()(unsigned int) const::{lambda()#1}>::type>::type seastar::smp::submit_to<seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}::operator()(unsigned int) const::{lambda()#1}>(unsigned int, seastar::smp_submit_to_options, seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}::operator()(unsigned int) const::{lambda()#1}&&) at ././seastar/include/seastar/core/smp.hh:367
seastar::futurize<std::invoke_result<seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}::operator()(unsigned int) const::{lambda()#1}>::type>::type seastar::smp::submit_to<seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}::operator()(unsigned int) const::{lambda()#1}>(unsigned int, seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}::operator()(unsigned int) const::{lambda()#1}&&) at ././seastar/include/seastar/core/smp.hh:394
 (inlined by) operator() at ././seastar/include/seastar/core/sharded.hh:725
 (inlined by) seastar::future<void> std::__invoke_impl<seastar::future<void>, seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}&, unsigned int>(std::__invoke_other, seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}&, unsigned int&&) at /usr/bin/../lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13/bits/invoke.h:61
 (inlined by) std::enable_if<is_invocable_r_v<seastar::future<void>, seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}&, unsigned int>, seastar::future<void> >::type std::__invoke_r<seastar::future<void>, seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}&, unsigned int>(seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}&, unsigned int&&) at /usr/bin/../lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13/bits/invoke.h:114
 (inlined by) std::_Function_handler<seastar::future<void> (unsigned int), seastar::sharded<repair_service>::stop()::{lambda(seastar::future<void>)#1}::operator()(seastar::future<void>) const::{lambda(unsigned int)#1}>::_M_invoke(std::_Any_data const&, unsigned int&&) at /usr/bin/../lib/gcc/x86_64-redhat-linux/13/../../../../include/c++/13/bits/std_function.h:290
```

FWIW, gdb crashed when opening the coredump.

This commit will help catch the issue earlier
when repair_service::stop() fails (and it must never fail).
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>