diff --git a/repair/row_level.cc b/repair/row_level.cc index 4097445752a8..7b7050524005 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -405,7 +405,6 @@ using is_local_reader = bool_class; class repair_writer_impl : public repair_writer::impl { schema_ptr _schema; reader_permit _permit; - uint64_t _estimated_partitions; std::optional> _writer_done; mutation_fragment_queue _mq; sharded& _db; @@ -417,7 +416,6 @@ class repair_writer_impl : public repair_writer::impl { repair_writer_impl( schema_ptr schema, reader_permit permit, - uint64_t estimated_partitions, sharded& db, sharded& sys_dist_ks, sharded& view_update_generator, @@ -426,7 +424,6 @@ class repair_writer_impl : public repair_writer::impl { flat_mutation_reader_v2 queue_reader) : _schema(std::move(schema)) , _permit(std::move(permit)) - , _estimated_partitions(estimated_partitions) , _mq(std::move(queue)) , _db(db) , _sys_dist_ks(sys_dist_ks) @@ -501,8 +498,9 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { return; } replica::table& t = _db.local().find_column_family(_schema->id()); + rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions()); _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, std::move(_queue_reader), - streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, _estimated_partitions, _reason, is_offstrategy_supported(_reason)), + streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason)), t.stream_in_progress()).then([w] (uint64_t partitions) { rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable", w->schema()->ks_name(), w->schema()->cf_name(), partitions); @@ -517,14 +515,13 @@ void repair_writer_impl::create_writer(lw_shared_ptr w) { lw_shared_ptr make_repair_writer( schema_ptr schema, reader_permit permit, - uint64_t estimated_partitions, streaming::stream_reason reason, sharded& db, sharded& sys_dist_ks, sharded& view_update_generator) { auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit); auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle)); - auto i = std::make_unique(schema, permit, estimated_partitions, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader)); + auto i = std::make_unique(schema, permit, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader)); return make_lw_shared(schema, permit, std::move(i)); } @@ -826,7 +823,7 @@ class repair_meta { _seed, repair_reader::is_local_reader(_repair_master || _same_sharding_config) ) - , _repair_writer(make_repair_writer(_schema, _permit, _estimated_partitions, _reason, _db, _sys_dist_ks, _view_update_generator)) + , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, _sys_dist_ks, _view_update_generator)) , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes, [&rs] (uint32_t repair_meta_id, netw::messaging_service::msg_addr addr) { return rs.get_messaging().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, addr); @@ -981,6 +978,7 @@ class repair_meta { future<> set_estimated_partitions(uint64_t estimated_partitions) { return with_gate(_gate, [this, estimated_partitions] { _estimated_partitions = estimated_partitions; + _repair_writer->set_estimated_partitions(_estimated_partitions); }); } diff --git a/repair/writer.hh b/repair/writer.hh index 62e23f6c5f0d..881cfe97a203 100644 --- a/repair/writer.hh +++ b/repair/writer.hh @@ -89,6 +89,7 @@ class repair_writer : public enable_lw_shared_from_this { bool _partition_opened; named_semaphore _sem{1, named_semaphore_exception_factory{"repair_writer"}}; bool _created_writer = false; + uint64_t _estimated_partitions = 0; public: class impl { public: @@ -111,6 +112,15 @@ public: , _mq(&_impl->queue()) {} + + void set_estimated_partitions(uint64_t estimated_partitions) { + _estimated_partitions = estimated_partitions; + } + + uint64_t get_estimated_partitions() { + return _estimated_partitions; + } + void create_writer() { _impl->create_writer(shared_from_this()); _created_writer = true; @@ -141,7 +151,6 @@ private: lw_shared_ptr make_repair_writer( schema_ptr schema, reader_permit permit, - uint64_t estimated_partitions, streaming::stream_reason reason, sharded& db, sharded& sys_dist_ks,