Skip to content

Commit

Permalink
repair: Use the updated estimated_partitions to create writer
Browse files Browse the repository at this point in the history
The estimated_partitions is estimated after the repair_meta is created.

Currently, the default estimated_partitions was used to create the
write which is not correct.

To fix, use the updated estimated_partitions.

Reported by Petr Gusev

Closes #14179
  • Loading branch information
asias authored and denesb committed Jun 8, 2023
1 parent b4c21cf commit 4592bbe
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
12 changes: 5 additions & 7 deletions repair/row_level.cc
Expand Up @@ -405,7 +405,6 @@ using is_local_reader = bool_class<class is_local_reader_tag>;
class repair_writer_impl : public repair_writer::impl {
schema_ptr _schema;
reader_permit _permit;
uint64_t _estimated_partitions;
std::optional<future<>> _writer_done;
mutation_fragment_queue _mq;
sharded<replica::database>& _db;
Expand All @@ -417,7 +416,6 @@ class repair_writer_impl : public repair_writer::impl {
repair_writer_impl(
schema_ptr schema,
reader_permit permit,
uint64_t estimated_partitions,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator,
Expand All @@ -426,7 +424,6 @@ class repair_writer_impl : public repair_writer::impl {
flat_mutation_reader_v2 queue_reader)
: _schema(std::move(schema))
, _permit(std::move(permit))
, _estimated_partitions(estimated_partitions)
, _mq(std::move(queue))
, _db(db)
, _sys_dist_ks(sys_dist_ks)
Expand Down Expand Up @@ -501,8 +498,9 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
return;
}
replica::table& t = _db.local().find_column_family(_schema->id());
rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
_writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(_queue_reader),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, _estimated_partitions, _reason, is_offstrategy_supported(_reason)),
streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason)),
t.stream_in_progress()).then([w] (uint64_t partitions) {
rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
w->schema()->ks_name(), w->schema()->cf_name(), partitions);
Expand All @@ -517,14 +515,13 @@ void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
lw_shared_ptr<repair_writer> make_repair_writer(
schema_ptr schema,
reader_permit permit,
uint64_t estimated_partitions,
streaming::stream_reason reason,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator) {
auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit);
auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle));
auto i = std::make_unique<repair_writer_impl>(schema, permit, estimated_partitions, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader));
auto i = std::make_unique<repair_writer_impl>(schema, permit, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader));
return make_lw_shared<repair_writer>(schema, permit, std::move(i));
}

Expand Down Expand Up @@ -826,7 +823,7 @@ class repair_meta {
_seed,
repair_reader::is_local_reader(_repair_master || _same_sharding_config)
)
, _repair_writer(make_repair_writer(_schema, _permit, _estimated_partitions, _reason, _db, _sys_dist_ks, _view_update_generator))
, _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, _sys_dist_ks, _view_update_generator))
, _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
[&rs] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) {
return rs.get_messaging().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr);
Expand Down Expand Up @@ -981,6 +978,7 @@ class repair_meta {
future<> set_estimated_partitions(uint64_t estimated_partitions) {
return with_gate(_gate, [this, estimated_partitions] {
_estimated_partitions = estimated_partitions;
_repair_writer->set_estimated_partitions(_estimated_partitions);
});
}

Expand Down
11 changes: 10 additions & 1 deletion repair/writer.hh
Expand Up @@ -89,6 +89,7 @@ class repair_writer : public enable_lw_shared_from_this<repair_writer> {
bool _partition_opened;
named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}};
bool _created_writer = false;
uint64_t _estimated_partitions = 0;
public:
class impl {
public:
Expand All @@ -111,6 +112,15 @@ public:
, _mq(&_impl->queue())
{}


void set_estimated_partitions(uint64_t estimated_partitions) {
_estimated_partitions = estimated_partitions;
}

uint64_t get_estimated_partitions() {
return _estimated_partitions;
}

void create_writer() {
_impl->create_writer(shared_from_this());
_created_writer = true;
Expand Down Expand Up @@ -141,7 +151,6 @@ private:
lw_shared_ptr<repair_writer> make_repair_writer(
schema_ptr schema,
reader_permit permit,
uint64_t estimated_partitions,
streaming::stream_reason reason,
sharded<replica::database>& db,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
Expand Down

0 comments on commit 4592bbe

Please sign in to comment.