-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
row_level.cc
3400 lines (3203 loc) · 170 KB
/
row_level.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <exception>
#include <seastar/util/defer.hh>
#include "gms/endpoint_state.hh"
#include "repair/repair.hh"
#include "message/messaging_service.hh"
#include "repair/task_manager_module.hh"
#include "seastar/coroutine/exception.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "mutation/mutation_fragment.hh"
#include "mutation_writer/multishard_writer.hh"
#include "dht/i_partitioner.hh"
#include "dht/sharder.hh"
#include "utils/xx_hasher.hh"
#include "utils/UUID.hh"
#include "replica/database.hh"
#include <seastar/util/bool_class.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <list>
#include <vector>
#include <algorithm>
#include <random>
#include <optional>
#include <boost/range/adaptors.hpp>
#include <boost/intrusive/list.hpp>
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "gms/gossiper.hh"
#include "repair/row_level.hh"
#include "utils/stall_free.hh"
#include "service/migration_manager.hh"
#include "streaming/consumer.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/all.hh>
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
#include "service/raft/raft_address_map.hh"
#include "db/batchlog_manager.hh"
#include "idl/partition_checksum.dist.hh"
#include "readers/empty_v2.hh"
#include "readers/evictable.hh"
#include "readers/queue.hh"
#include "readers/filtering.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "repair/hash.hh"
#include "repair/decorated_key_with_hash.hh"
#include "repair/row.hh"
#include "repair/writer.hh"
#include "repair/reader.hh"
#include "compaction/compaction_manager.hh"
#include "utils/xx_hasher.hh"
// Shared logger for the repair subsystem; defined in another translation unit.
extern logging::logger rlogger;
// Error-injection flag for the rpc-stream code paths. Never written in this
// chunk; presumably toggled by test/error-injection hooks — confirm before
// relying on it.
static bool inject_rpc_stream_error = false;
// Resolve the shard a repair RPC should run on at the destination node:
// use the explicitly requested shard when one was supplied (and is not the
// "unspecified" sentinel), otherwise map the source shard onto the local
// shard count.
static shard_id get_dst_shard_id(uint32_t src_cpu_id, const rpc::optional<shard_id>& dst_cpu_id_opt) {
    if (dst_cpu_id_opt && *dst_cpu_id_opt != repair_unspecified_shard) {
        return *dst_cpu_id_opt;
    }
    return src_cpu_id % smp::count;
}
// Progress states of a node participating in row-level repair, recorded in
// repair_node_state::state for observability. The *_started/*_finished pairs
// bracket each verb of the repair RPC protocol (see the handlers below).
enum class repair_state : uint16_t {
    unknown,
    row_level_start_started,
    row_level_start_finished,
    get_estimated_partitions_started,
    get_estimated_partitions_finished,
    set_estimated_partitions_started,
    set_estimated_partitions_finished,
    get_sync_boundary_started,
    get_sync_boundary_finished,
    get_combined_row_hash_started,
    get_combined_row_hash_finished,
    get_row_diff_with_rpc_stream_started,
    get_row_diff_with_rpc_stream_finished,
    get_row_diff_and_update_peer_row_hash_sets_started,
    get_row_diff_and_update_peer_row_hash_sets_finished,
    get_full_row_hashes_with_rpc_stream_started,
    get_full_row_hashes_with_rpc_stream_finished,
    get_full_row_hashes_started,
    get_full_row_hashes_finished,
    get_row_diff_started,
    get_row_diff_finished,
    put_row_diff_with_rpc_stream_started,
    put_row_diff_with_rpc_stream_finished,
    put_row_diff_started,
    put_row_diff_finished,
    row_level_stop_started,
    row_level_stop_finished,
};
// Tracks, per participating node, where that node currently is in the
// repair protocol (see repair_state) and which shard its repair instance
// runs on.
struct repair_node_state {
    gms::inet_address node;
    // Last protocol step recorded for this node; starts as `unknown`.
    repair_state state = repair_state::unknown;
    // The shard that repair instance runs on
    shard_id shard;
    explicit repair_node_state(gms::inet_address n) : node(n) { }
    explicit repair_node_state(gms::inet_address n, shard_id s) : node(n), shard(s) { }
};
// Wraps sink and source objects for repair master or repair follower nodes.
// For repair master, it stores sink and source pair for each of the followers.
// For repair follower, it stores one sink and source pair for repair master.
template<class SinkType, class SourceType>
class sink_source_for_repair {
    uint32_t _repair_meta_id;
    // Factory that opens a new rpc sink/source pair to the given peer on
    // demand (invoked lazily from get_sink_source()).
    using get_sink_source_fn_type = std::function<future<std::tuple<rpc::sink<SinkType>, rpc::source<SourceType>>> (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id, netw::messaging_service::msg_addr addr)>;
    using sink_type  = std::reference_wrapper<rpc::sink<SinkType>>;
    using source_type = std::reference_wrapper<rpc::source<SourceType>>;
    // The vectors below store sink and source object for peer nodes.
    // Indexed by node_idx; entries are engaged once get_sink_source() has
    // been called for that peer.
    std::vector<std::optional<rpc::sink<SinkType>>> _sinks;
    std::vector<std::optional<rpc::source<SourceType>>> _sources;
    // Marks sources that were already drained to end-of-stream, so close()
    // does not try to drain them a second time.
    std::vector<bool> _sources_closed;
    get_sink_source_fn_type _fn;
public:
    sink_source_for_repair(uint32_t repair_meta_id, size_t nr_peer_nodes, get_sink_source_fn_type fn)
        : _repair_meta_id(repair_meta_id)
        , _sinks(nr_peer_nodes)
        , _sources(nr_peer_nodes)
        , _sources_closed(nr_peer_nodes, false)
        , _fn(std::move(fn)) {
    }
    void mark_source_closed(unsigned node_idx) {
        _sources_closed[node_idx] = true;
    }
    // Return the sink/source pair for the peer at node_idx, creating it via
    // _fn on first use. Having only one of the pair present is an internal
    // error and is reported as an exceptional future.
    future<std::tuple<sink_type, source_type>> get_sink_source(gms::inet_address remote_node, unsigned node_idx, std::optional<shard_id> dst_cpu_id) {
        using value_type = std::tuple<sink_type, source_type>;
        if (_sinks[node_idx] && _sources[node_idx]) {
            return make_ready_future<value_type>(value_type(_sinks[node_idx].value(), _sources[node_idx].value()));
        }
        if (_sinks[node_idx] || _sources[node_idx]) {
            return make_exception_future<value_type>(std::runtime_error(format("sink or source is missing for node {}", remote_node)));
        }
        return _fn(_repair_meta_id, dst_cpu_id, netw::messaging_service::msg_addr(remote_node)).then_unpack([this, node_idx] (rpc::sink<SinkType> sink, rpc::source<SourceType> source) mutable {
            _sinks[node_idx].emplace(std::move(sink));
            _sources[node_idx].emplace(std::move(source));
            return make_ready_future<value_type>(value_type(_sinks[node_idx].value(), _sources[node_idx].value()));
        });
    }
    // Close every sink, then drain every source that was not marked closed.
    // Draining reads until end-of-stream; read errors are swallowed because
    // at teardown we only care about releasing the stream, not its data.
    future<> close() {
        return parallel_for_each(boost::irange(unsigned(0), unsigned(_sources.size())), [this] (unsigned node_idx) mutable {
            std::optional<rpc::sink<SinkType>>& sink_opt = _sinks[node_idx];
            auto f = sink_opt ? sink_opt->close() : make_ready_future<>();
            return f.finally([this, node_idx] {
                std::optional<rpc::source<SourceType>>& source_opt = _sources[node_idx];
                if (source_opt && !_sources_closed[node_idx]) {
                    return repeat([&source_opt] () mutable {
                        // Keep reading source until end of stream
                        return (*source_opt)().then([] (std::optional<std::tuple<SourceType>> opt) mutable {
                            if (opt) {
                                return make_ready_future<stop_iteration>(stop_iteration::no);
                            } else {
                                return make_ready_future<stop_iteration>(stop_iteration::yes);
                            }
                        }).handle_exception([] (std::exception_ptr ep) {
                            // Treat a read error as end-of-stream during teardown.
                            return make_ready_future<stop_iteration>(stop_iteration::yes);
                        });
                    });
                }
                return make_ready_future<>();
            });
        });
    }
};
using sink_source_for_get_full_row_hashes = sink_source_for_repair<repair_stream_cmd, repair_hash_with_cmd>;
using sink_source_for_get_row_diff = sink_source_for_repair<repair_hash_with_cmd, repair_row_on_wire_with_cmd>;
using sink_source_for_put_row_diff = sink_source_for_repair<repair_row_on_wire_with_cmd, repair_stream_cmd>;
// Per-shard counters for row-level repair, exported under the "repair"
// metrics group. The raw uint64_t fields are bumped directly by the repair
// code below; registration happens once per shard via the thread_local
// instance that follows this struct.
struct row_level_repair_metrics {
    seastar::metrics::metric_groups _metrics;
    uint64_t tx_row_nr{0};          // rows sent to peers
    uint64_t rx_row_nr{0};          // rows received from peers
    uint64_t tx_row_bytes{0};       // bytes of rows sent
    uint64_t rx_row_bytes{0};       // bytes of rows received
    uint64_t row_from_disk_nr{0};   // rows read from local disk
    uint64_t row_from_disk_bytes{0};// bytes of rows read from local disk
    uint64_t tx_hashes_nr{0};       // row hashes sent
    uint64_t rx_hashes_nr{0};       // row hashes received
    row_level_repair_metrics() {
        namespace sm = seastar::metrics;
        _metrics.add_group("repair", {
            sm::make_counter("tx_row_nr", tx_row_nr,
                            sm::description("Total number of rows sent on this shard.")),
            sm::make_counter("rx_row_nr", rx_row_nr,
                            sm::description("Total number of rows received on this shard.")),
            sm::make_counter("tx_row_bytes", tx_row_bytes,
                            sm::description("Total bytes of rows sent on this shard.")),
            sm::make_counter("rx_row_bytes", rx_row_bytes,
                            sm::description("Total bytes of rows received on this shard.")),
            sm::make_counter("tx_hashes_nr", tx_hashes_nr,
                            sm::description("Total number of row hashes sent on this shard.")),
            sm::make_counter("rx_hashes_nr", rx_hashes_nr,
                            sm::description("Total number of row hashes received on this shard.")),
            sm::make_counter("row_from_disk_nr", row_from_disk_nr,
                            sm::description("Total number of rows read from disk on this shard.")),
            sm::make_counter("row_from_disk_bytes", row_from_disk_bytes,
                            sm::description("Total bytes of rows read from disk on this shard.")),
        });
    }
};
// One metrics instance per shard.
static thread_local row_level_repair_metrics _metrics;
static const std::vector<row_level_diff_detect_algorithm>& suportted_diff_detect_algorithms() {
static std::vector<row_level_diff_detect_algorithm> _algorithms = {
row_level_diff_detect_algorithm::send_full_set,
row_level_diff_detect_algorithm::send_full_set_rpc_stream,
};
return _algorithms;
};
// Negotiate the diff-detect algorithm to use with the given peers: query
// every peer in parallel for its supported algorithms, intersect them with
// the local set, and pick the highest common algorithm value.
// Must run in a seastar thread context (it calls .get() on a future).
// Throws std::runtime_error when no common algorithm exists.
static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(netw::messaging_service& ms, const inet_address_vector_replica_set& nodes) {
    std::vector<std::vector<row_level_diff_detect_algorithm>> nodes_algorithms(nodes.size());
    parallel_for_each(boost::irange(size_t(0), nodes.size()), [&ms, &nodes_algorithms, &nodes] (size_t idx) {
        return ms.send_repair_get_diff_algorithms(netw::messaging_service::msg_addr(nodes[idx])).then(
                [&nodes_algorithms, &nodes, idx] (std::vector<row_level_diff_detect_algorithm> algorithms) {
            // std::set_intersection below requires both input ranges sorted.
            std::sort(algorithms.begin(), algorithms.end());
            nodes_algorithms[idx] = std::move(algorithms);
            rlogger.trace("Got node_algorithms={}, from node={}", nodes_algorithms[idx], nodes[idx]);
        });
    }).get();
    auto common_algorithms = suportted_diff_detect_algorithms();
    // Sort once up front. std::set_intersection produces sorted output, so
    // the running intersection stays sorted across iterations; previously
    // this sort was redundantly repeated inside the loop.
    std::sort(common_algorithms.begin(), common_algorithms.end());
    for (auto& algorithms : nodes_algorithms) {
        std::vector<row_level_diff_detect_algorithm> results;
        std::set_intersection(algorithms.begin(), algorithms.end(),
                common_algorithms.begin(), common_algorithms.end(),
                std::back_inserter(results));
        common_algorithms = std::move(results);
    }
    rlogger.trace("peer_algorithms={}, local_algorithms={}, common_diff_detect_algorithms={}",
            nodes_algorithms, suportted_diff_detect_algorithms(), common_algorithms);
    if (common_algorithms.empty()) {
        throw std::runtime_error("Can not find row level repair diff detect algorithm");
    }
    // Highest enum value = preferred algorithm.
    return common_algorithms.back();
}
// Whether the negotiated algorithm exchanges data over rpc streams.
static bool is_rpc_stream_supported(row_level_diff_detect_algorithm algo) {
    // send_full_set is the only algorithm that does not support rpc stream
    switch (algo) {
    case row_level_diff_detect_algorithm::send_full_set:
        return false;
    default:
        return true;
    }
}
// Produce a random 64-bit hashing seed. The engine and distribution are
// created once per thread; the engine is seeded from the OS entropy source
// on first use.
static uint64_t get_random_seed() {
    static thread_local std::default_random_engine engine{std::random_device{}()};
    static thread_local std::uniform_int_distribution<uint64_t> dist;
    return dist(engine);
}
// Hash one mutation fragment, folding in the (pre-computed) hash of its
// partition key so identical fragments under different keys hash differently.
repair_hash repair_hasher::do_hash_for_mf(const decorated_key_with_hash& dk_with_hash, const mutation_fragment& mf) {
    xx_hasher h(_seed);
    feed_hash(h, mf, *_schema);
    feed_hash(h, dk_with_hash.hash.hash);
    return repair_hash(h.finalize_uint64());
}
// Build the underlying reader for this repair_reader according to the
// requested strategy:
//  - local: a manually-paused evictable reader over this shard's data only;
//    the pause handle is stored in _reader_handle (see pause()).
//  - multishard_split: a multishard streaming reader driven by _sharder,
//    which yields sub-ranges owned by the remote shard.
//  - multishard_filter: a multishard streaming reader over the whole range,
//    filtered down to keys the remote sharder maps to remote_shard.
flat_mutation_reader_v2 repair_reader::make_reader(
    seastar::sharded<replica::database>& db,
    replica::column_family& cf,
    read_strategy strategy,
    const dht::sharder& remote_sharder,
    unsigned remote_shard,
    gc_clock::time_point compaction_time) {
    switch (strategy) {
    case read_strategy::local: {
        auto ms = mutation_source([&cf, compaction_time] (
            schema_ptr s,
            reader_permit permit,
            const dht::partition_range& pr,
            const query::partition_slice& ps,
            tracing::trace_state_ptr,
            streamed_mutation::forwarding,
            mutation_reader::forwarding fwd_mr) {
            return cf.make_streaming_reader(std::move(s), std::move(permit), pr, ps, fwd_mr, compaction_time);
        });
        flat_mutation_reader_v2 rd(nullptr);
        std::tie(rd, _reader_handle) = make_manually_paused_evictable_reader_v2(
            std::move(ms),
            _schema,
            _permit,
            _range,
            _schema->full_slice(),
            {},
            mutation_reader::forwarding::no);
        return rd;
    }
    case read_strategy::multishard_split: {
        // We can't have two permits with count resource for 1 repair.
        // So we release the one on _permit so the only one is the one the
        // shard reader will obtain.
        _permit.release_base_resources();
        return make_multishard_streaming_reader(db, _schema, _permit, [this] {
            // Feed the reader one remote-shard-owned sub-range at a time.
            auto shard_range = _sharder.next();
            if (shard_range) {
                return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
            }
            return std::optional<dht::partition_range>();
        }, compaction_time);
    }
    case read_strategy::multishard_filter: {
        // We can't have two permits with count resource for 1 repair.
        // So we release the one on _permit so the only one is the one the
        // shard reader will obtain.
        _permit.release_base_resources();
        return make_filtering_reader(make_multishard_streaming_reader(db, _schema, _permit, _range, compaction_time),
            [&remote_sharder, remote_shard](const dht::decorated_key& k) {
                return remote_sharder.shard_of(k.token()) == remote_shard;
            });
    }
    default:
        on_internal_error(rlogger,
            format("make_reader: unexpected read_strategy {}", static_cast<int>(strategy)));
    }
}
// Construct a repair reader over `range`. _local_read_op pins the table's
// read gate only for the local strategy; _reader is built last since
// make_reader() depends on the members initialized before it.
repair_reader::repair_reader(
    seastar::sharded<replica::database>& db,
    replica::column_family& cf,
    schema_ptr s,
    reader_permit permit,
    dht::token_range range,
    const dht::sharder& remote_sharder,
    unsigned remote_shard,
    uint64_t seed,
    read_strategy strategy,
    gc_clock::time_point compaction_time)
    : _schema(s)
    , _permit(std::move(permit))
    , _range(dht::to_partition_range(range))
    , _sharder(remote_sharder, range, remote_shard)
    , _seed(seed)
    , _local_read_op(strategy == read_strategy::local ? std::optional(cf.read_in_progress()) : std::nullopt)
    , _reader(make_reader(db, cf, strategy, remote_sharder, remote_shard, compaction_time))
{ }
// Read the next mutation fragment, counting issued/finished reads and
// logging (then rethrowing) reader timeouts with enough context to identify
// the stuck keyspace/table/range.
future<mutation_fragment_opt>
repair_reader::read_mutation_fragment() {
    ++_reads_issued;
    // Use a very long timeout for the reader to break out any eventual
    // deadlock within the reader. Thirty minutes should be more than
    // enough to read a single mutation fragment.
    auto timeout = db::timeout_clock::now() + std::chrono::minutes(30);
    _reader.set_timeout(timeout); // reset to db::no_timeout in pause()
    return _reader().then_wrapped([this] (future<mutation_fragment_opt> f) {
        try {
            auto mfopt = f.get();
            ++_reads_finished;
            return mfopt;
        } catch (seastar::timed_out_error& e) {
            rlogger.warn("Failed to read a fragment from the reader, keyspace={}, table={}, range={}: {}",
                _schema->ks_name(), _schema->cf_name(), _range, e);
            throw;
        }
        // A redundant `catch (...) { throw; }` used to follow here; an
        // unmatched exception propagates out of the try block anyway, so
        // the handler was dead code and has been removed.
    });
}
// Called when the read side is fully drained: close the reader, release the
// permit's base resources, and swap in an empty stream so later calls are
// harmless. Also drops the pause handle of the evictable reader (if any).
future<> repair_reader::on_end_of_stream() noexcept {
    return _reader.close().then([this] {
        _permit.release_base_resources();
        _reader = mutation_fragment_v1_stream(make_empty_flat_reader_v2(_schema, _permit));
        _reader_handle.reset();
    });
}
// Tear down the reader early (without replacing it with an empty stream,
// unlike on_end_of_stream()), releasing permit resources and the pause
// handle.
future<> repair_reader::close() noexcept {
    return _reader.close().then([this] {
        _permit.release_base_resources();
        _reader_handle.reset();
    });
}
// Remember the partition key currently being read, paired with its hash
// (computed from _seed) for use by the row hashing code.
void repair_reader::set_current_dk(const dht::decorated_key& key) {
    _current_dk = make_lw_shared<const decorated_key_with_hash>(*_schema, key, _seed);
}
// Forget the current partition key (e.g. after the partition is consumed).
void repair_reader::clear_current_dk() {
    _current_dk = {};
}
// Sanity check: row processing requires a current partition key to have
// been set via set_current_dk(); throw if it has not.
void repair_reader::check_current_dk() {
    if (!_current_dk) {
        throw std::runtime_error("Current partition_key is unknown");
    }
}
// Pause reading between repair rounds: drop the read timeout installed by
// read_mutation_fragment() and pause the evictable reader (local strategy
// only — _reader_handle is unset for the multishard strategies).
void repair_reader::pause() {
    _reader.set_timeout(db::no_timeout);
    if (_reader_handle) {
        _reader_handle->pause();
    }
}
// Backend of repair_writer: owns the mutation fragment queue feeding a
// queue reader, which create_writer() later distributes across shards and
// consumes into sstables. _writer_done holds the background writer's future
// once the writer has been created.
class repair_writer_impl : public repair_writer::impl {
    schema_ptr _schema;
    reader_permit _permit;
    // Engaged after create_writer(); resolves when the multishard writer
    // finishes (see wait_for_writer_done()).
    std::optional<future<>> _writer_done;
    mutation_fragment_queue _mq;
    sharded<replica::database>& _db;
    sharded<db::system_distributed_keyspace>& _sys_dist_ks;
    sharded<db::view::view_update_generator>& _view_update_generator;
    streaming::stream_reason _reason;
    // Reader end of _mq; moved into the multishard writer by create_writer().
    flat_mutation_reader_v2 _queue_reader;
public:
    repair_writer_impl(
        schema_ptr schema,
        reader_permit permit,
        sharded<replica::database>& db,
        sharded<db::system_distributed_keyspace>& sys_dist_ks,
        sharded<db::view::view_update_generator>& view_update_generator,
        streaming::stream_reason reason,
        mutation_fragment_queue queue,
        flat_mutation_reader_v2 queue_reader)
        : _schema(std::move(schema))
        , _permit(std::move(permit))
        , _mq(std::move(queue))
        , _db(db)
        , _sys_dist_ks(sys_dist_ks)
        , _view_update_generator(view_update_generator)
        , _reason(reason)
        , _queue_reader(std::move(queue_reader))
    {}
    virtual void create_writer(lw_shared_ptr<repair_writer> writer) override;
    virtual mutation_fragment_queue& queue() override {
        return _mq;
    }
    virtual future<> wait_for_writer_done() override;
private:
    // Whether sstables written for this stream reason may be placed
    // off-strategy (compacted into the main set later).
    static sstables::offstrategy is_offstrategy_supported(streaming::stream_reason reason) {
        static const std::unordered_set<streaming::stream_reason> operations_supported = {
            streaming::stream_reason::bootstrap,
            streaming::stream_reason::replace,
            streaming::stream_reason::removenode,
            streaming::stream_reason::decommission,
            streaming::stream_reason::repair,
            streaming::stream_reason::rebuild,
        };
        return sstables::offstrategy(operations_supported.contains(reason));
    }
};
// Begin a new partition in the output queue and push the first fragment.
// If `mf` is itself a partition_start it is pushed directly; otherwise a
// synthetic partition_start (no tombstone) is pushed first, since rows on
// the wire do not carry partition_start fragments.
future<> repair_writer::write_start_and_mf(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
    _current_dk_written_to_sstable = dk;
    if (mf.is_partition_start()) {
        return _mq->push(std::move(mf)).then([this] {
            _partition_opened = true;
        });
    } else {
        auto start = mutation_fragment(*_schema, _permit, partition_start(dk->dk, tombstone()));
        return _mq->push(std::move(start)).then([this, mf = std::move(mf)] () mutable {
            _partition_opened = true;
            return _mq->push(std::move(mf));
        });
    }
};
// Adapts a queue_reader_handle_v2 to the mutation_fragment_queue::impl
// interface; each virtual simply forwards to the wrapped handle.
class queue_reader_handle_adapter : public mutation_fragment_queue::impl {
    queue_reader_handle_v2 _handle;
public:
    queue_reader_handle_adapter(queue_reader_handle_v2 handle)
        : _handle(std::move(handle))
    {}
    virtual future<> push(mutation_fragment_v2 mf) override {
        return _handle.push(std::move(mf));
    }
    virtual void abort(std::exception_ptr ep) override {
        _handle.abort(std::move(ep));
    }
    virtual void push_end_of_stream() override {
        _handle.push_end_of_stream();
    }
};
// Build a mutation_fragment_queue whose pushes feed the given queue-reader
// handle (via queue_reader_handle_adapter).
mutation_fragment_queue make_mutation_fragment_queue(schema_ptr s, reader_permit permit, queue_reader_handle_v2 handle) {
    auto sink = seastar::make_shared<queue_reader_handle_adapter>(std::move(handle));
    return mutation_fragment_queue(std::move(s), std::move(permit), std::move(sink));
}
// Lazily start the background multishard writer that drains _queue_reader
// into sstables; no-op if already started. The resulting future is stored
// in _writer_done. On failure the queue is aborted so producers see the
// error too.
void repair_writer_impl::create_writer(lw_shared_ptr<repair_writer> w) {
    if (_writer_done) {
        return;
    }
    replica::table& t = _db.local().find_column_family(_schema->id());
    rlogger.debug("repair_writer: keyspace={}, table={}, estimated_partitions={}", w->schema()->ks_name(), w->schema()->cf_name(), w->get_estimated_partitions());
    service::frozen_topology_guard topo_guard = service::null_topology_guard; // FIXME: propagate
    // The sharder is valid only when the erm is valid. Keep a reference of the erm to keep the sharder valid.
    auto erm = t.get_effective_replication_map();
    auto& sharder = erm->get_sharder(*(w->schema()));
    // Note: the continuations capture `erm` (keeps the sharder alive) and
    // `w` (keeps the writer alive) for the duration of the write.
    _writer_done = mutation_writer::distribute_reader_and_consume_on_shards(_schema, sharder, std::move(_queue_reader),
            streaming::make_streaming_consumer(sstables::repair_origin, _db, _sys_dist_ks, _view_update_generator, w->get_estimated_partitions(), _reason, is_offstrategy_supported(_reason), topo_guard),
            t.stream_in_progress()).then([w, erm] (uint64_t partitions) {
        rlogger.debug("repair_writer: keyspace={}, table={}, managed to write partitions={} to sstable",
            w->schema()->ks_name(), w->schema()->cf_name(), partitions);
    }).handle_exception([w, erm] (std::exception_ptr ep) {
        rlogger.warn("repair_writer: keyspace={}, table={}, multishard_writer failed: {}",
            w->schema()->ks_name(), w->schema()->cf_name(), ep);
        w->queue().abort(ep);
        return make_exception_future<>(std::move(ep));
    });
}
// Factory for repair_writer: wires up a queue reader pair (producer queue +
// consumer reader) and wraps them in a repair_writer_impl backend.
lw_shared_ptr<repair_writer> make_repair_writer(
        schema_ptr schema,
        reader_permit permit,
        streaming::stream_reason reason,
        sharded<replica::database>& db,
        sharded<db::system_distributed_keyspace>& sys_dist_ks,
        sharded<db::view::view_update_generator>& view_update_generator) {
    auto [queue_reader, queue_handle] = make_queue_reader_v2(schema, permit);
    auto queue = make_mutation_fragment_queue(schema, permit, std::move(queue_handle));
    auto i = std::make_unique<repair_writer_impl>(schema, permit, db, sys_dist_ks, view_update_generator, reason, std::move(queue), std::move(queue_reader));
    return make_lw_shared<repair_writer>(schema, permit, std::move(i));
}
// Close the currently open partition, if any, by pushing a partition_end
// fragment into the queue. No-op when no partition is open.
future<> repair_writer::write_partition_end() {
    if (!_partition_opened) {
        return make_ready_future<>();
    }
    return _mq->push(mutation_fragment(*_schema, _permit, partition_end())).then([this] {
        _partition_opened = false;
    });
}
// Write one mutation fragment belonging to partition `dk`. Partitions must
// arrive in non-decreasing ring order; a fragment for the partition that is
// already open is appended, a larger key closes the open partition and
// starts a new one, and a smaller key is an internal error.
future<> repair_writer::do_write(lw_shared_ptr<const decorated_key_with_hash> dk, mutation_fragment mf) {
    // Very first fragment: open the partition and write it.
    if (!_current_dk_written_to_sstable) {
        return write_start_and_mf(std::move(dk), std::move(mf));
    }
    const auto cmp_res = _current_dk_written_to_sstable->dk.tri_compare(*_schema, dk->dk);
    if (cmp_res > 0) {
        on_internal_error(rlogger, format("repair_writer::do_write(): received out-of-order partition, current: {}, next: {}", _current_dk_written_to_sstable->dk, dk->dk));
    }
    if (cmp_res == 0) {
        // Same partition as the open one: just append the fragment.
        return _mq->push(std::move(mf));
    }
    // A strictly larger key: close the open partition, then start the new one.
    return write_partition_end().then([this, dk = std::move(dk), mf = std::move(mf)] () mutable {
        return write_start_and_mf(std::move(dk), std::move(mf));
    });
}
// Finish the stream: close any open partition and push end-of-stream into
// the queue, under the writer semaphore. On failure the queue is aborted so
// the consumer sees the error. No-op if no writer was ever created.
future<> repair_writer::write_end_of_stream() {
    if (!_created_writer) {
        return make_ready_future<>();
    }
    return with_semaphore(_sem, 1, [this] {
        // Partition_end is never sent on wire, so we have to write one ourselves.
        return write_partition_end().then([this] () mutable {
            _mq->push_end_of_stream();
        }).handle_exception([this] (std::exception_ptr ep) {
            _mq->abort(ep);
            rlogger.warn("repair_writer: keyspace={}, table={}, write_end_of_stream failed: {}",
                    _schema->ks_name(), _schema->cf_name(), ep);
            return make_exception_future<>(std::move(ep));
        });
    });
}
// Hand back the background writer's completion future, or an immediately
// ready future when no writer was ever started.
future<> repair_writer_impl::wait_for_writer_done() {
    if (!_writer_done) {
        return make_ready_future<>();
    }
    return std::move(*_writer_done);
}
// Flush end-of-stream and wait for the background writer to drain, in
// parallel; logs and rethrows on failure of either.
future<> repair_writer::wait_for_writer_done() {
    return when_all_succeed(write_end_of_stream(), _impl->wait_for_writer_done()).discard_result().handle_exception(
            [this] (std::exception_ptr ep) {
        rlogger.warn("repair_writer: keyspace={}, table={}, wait_for_writer_done failed: {}",
            _schema->ks_name(), _schema->cf_name(), ep);
        return make_exception_future<>(std::move(ep));
    });
}
class repair_meta;
class repair_meta_tracker;
class row_level_repair;
static void add_to_repair_meta_for_masters(repair_meta& rm);
static void add_to_repair_meta_for_followers(repair_meta& rm);
// Convert a received repair_rows_on_wire into a list of repair_row:s.
// On the master, every fragment becomes its own repair_row (with its hash,
// marked dirty). On a follower, consecutive fragments at the same position
// are merged so the sstable writer never sees duplicate positions.
// Input containers are erased incrementally as they are consumed to limit
// memory pressure and avoid reactor stalls; on error both the remaining
// input and the partial output are cleared gently before rethrowing.
future<std::list<repair_row>> to_repair_rows_list(repair_rows_on_wire rows, schema_ptr s, uint64_t seed, repair_master is_master, reader_permit permit, repair_hasher hasher) {
    std::list<repair_row> row_list;
    std::exception_ptr ex;
    try {
        lw_shared_ptr<const decorated_key_with_hash> dk_ptr;
        lw_shared_ptr<mutation_fragment> last_mf;
        position_in_partition::tri_compare cmp(*s);
        // Consume the rows and mutation_fragment:s below incrementally
        // as it interleaves frees with allocation, reducing memory pressure,
        // and to prevent reactor stalls when freeing a large repair_rows_on_wire
        // object in one shot when the function returns.
        for (auto it = rows.begin(); it != rows.end(); it = rows.erase(it)) {
            auto x = std::move(*it);
            dht::decorated_key dk = dht::decorate_key(*s, x.get_key());
            // Reuse the decorated-key-with-hash across fragments of the
            // same partition.
            if (!(dk_ptr && dk_ptr->dk.equal(*s, dk))) {
                dk_ptr = make_lw_shared<const decorated_key_with_hash>(*s, dk, seed);
            }
            auto& mutation_fragments = x.get_mutation_fragments();
            if (is_master) {
                for (auto fmfit = mutation_fragments.begin(); fmfit != mutation_fragments.end(); fmfit = mutation_fragments.erase(fmfit)) {
                    auto fmf = std::move(*fmfit);
                    _metrics.rx_row_nr += 1;
                    _metrics.rx_row_bytes += fmf.representation().size();
                    // Keep the mutation_fragment in repair_row as an
                    // optimization to avoid unfreeze again when
                    // mutation_fragment is needed by _repair_writer.do_write()
                    // to apply the repair_row to disk
                    auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*s, permit));
                    auto hash = hasher.do_hash_for_mf(*dk_ptr, *mf);
                    position_in_partition pos(mf->position());
                    row_list.push_back(repair_row(std::move(fmf), std::move(pos), dk_ptr, std::move(hash), is_dirty_on_master::yes, std::move(mf)));
                    co_await coroutine::maybe_yield();
                }
            } else {
                last_mf = {};
                for (auto fmfit = mutation_fragments.begin(); fmfit != mutation_fragments.end(); fmfit = mutation_fragments.erase(fmfit)) {
                    auto fmf = std::move(*fmfit);
                    _metrics.rx_row_nr += 1;
                    _metrics.rx_row_bytes += fmf.representation().size();
                    auto mf = make_lw_shared<mutation_fragment>(fmf.unfreeze(*s, permit));
                    // If the mutation_fragment has the same position as
                    // the last mutation_fragment, it means they are the
                    // same row with different contents. We can not feed
                    // such rows into the sstable writer. Instead we apply
                    // the mutation_fragment into the previous one.
                    if (last_mf && cmp(last_mf->position(), mf->position()) == 0 && last_mf->mergeable_with(*mf)) {
                        last_mf->apply(*s, std::move(*mf));
                    } else {
                        last_mf = mf;
                        // On repair follower node, only decorated_key_with_hash and the mutation_fragment inside repair_row are used.
                        row_list.push_back(repair_row({}, {}, dk_ptr, {}, is_dirty_on_master::no, std::move(mf)));
                    }
                    co_await coroutine::maybe_yield();
                }
            }
            co_await coroutine::maybe_yield();
        }
    } catch (...) {
        ex = std::current_exception();
    }
    if (ex) {
        // Free what we accumulated without stalling the reactor, then
        // propagate the original error.
        co_await utils::clear_gently(rows);
        co_await utils::clear_gently(row_list);
        co_await coroutine::return_exception_ptr(std::move(ex));
    }
    co_return std::move(row_list);
}
// Flush master-dirty rows to the writer, merging consecutive fragments that
// share both partition key and position before writing. With the small-table
// optimization, rows whose token this node does not own are skipped.
// Runs in a seastar thread context (uses thread::maybe_yield() and .get()).
void flush_rows(schema_ptr s, std::list<repair_row>& rows, lw_shared_ptr<repair_writer>& writer, locator::effective_replication_map_ptr erm, bool small_table_optimization) {
    auto cmp = position_in_partition::tri_compare(*s);
    // Pending fragment/key pair: writing is deferred by one row so adjacent
    // mergeable fragments can be combined first.
    lw_shared_ptr<mutation_fragment> last_mf;
    lw_shared_ptr<const decorated_key_with_hash> last_dk;
    bool do_small_table_optimization = erm && small_table_optimization;
    auto* strat = do_small_table_optimization ? &erm->get_replication_strategy() : nullptr;
    auto* tm = do_small_table_optimization ? &erm->get_token_metadata() : nullptr;
    auto myip = do_small_table_optimization ? erm->get_topology().my_address() : gms::inet_address();
    for (auto& r : rows) {
        thread::maybe_yield();
        // Only rows the master found missing/different need writing.
        if (!r.dirty_on_master()) {
            continue;
        }
        const auto& dk = r.get_dk_with_hash()->dk;
        if (do_small_table_optimization) {
            // Check if the token is owned by the node
            auto eps = strat->calculate_natural_ips(dk.token(), *tm).get();
            if (!eps.contains(myip)) {
                rlogger.trace("master: ignore row, token={}", dk.token());
                continue;
            }
        }
        writer->create_writer();
        auto mf = r.get_mutation_fragment_ptr();
        if (last_mf && last_dk &&
                cmp(last_mf->position(), mf->position()) == 0 &&
                dk.tri_compare(*s, last_dk->dk) == 0 &&
                last_mf->mergeable_with(*mf)) {
            // Same row as the pending one: fold this fragment into it.
            last_mf->apply(*s, std::move(*mf));
        } else {
            if (last_mf && last_dk) {
                writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
            }
            last_mf = mf;
            last_dk = r.get_dk_with_hash();
        }
    }
    // Flush the final pending fragment, if any.
    if (last_mf && last_dk) {
        writer->do_write(std::move(last_dk), std::move(*last_mf)).get();
    }
}
class repair_meta {
friend repair_meta_tracker;
public:
using update_working_row_buf = bool_class<class update_working_row_buf_tag>;
using update_peer_row_hash_sets = bool_class<class update_peer_row_hash_sets_tag>;
using needs_all_rows_t = bool_class<class needs_all_rows_tag>;
using msg_addr = netw::messaging_service::msg_addr;
using tracker_link_type = boost::intrusive::list_member_hook<bi::link_mode<boost::intrusive::auto_unlink>>;
private:
repair_service& _rs;
seastar::sharded<replica::database>& _db;
netw::messaging_service& _messaging;
seastar::sharded<db::system_distributed_keyspace>& _sys_dist_ks;
seastar::sharded<db::view::view_update_generator>& _view_update_generator;
schema_ptr _schema;
reader_permit _permit;
dht::token_range _range;
repair_sync_boundary::tri_compare _cmp;
// The algorithm used to find the row difference
row_level_diff_detect_algorithm _algo;
// Max rows size can be stored in _row_buf
size_t _max_row_buf_size;
uint64_t _seed = 0;
repair_master _repair_master;
gms::inet_address _myip;
uint32_t _repair_meta_id;
streaming::stream_reason _reason;
// Repair master's sharding configuration
shard_config _master_node_shard_config;
// sharding info of repair master
dht::sharder _remote_sharder;
bool _same_sharding_config = false;
struct local_range_estimation {
size_t master_subranges_count;
size_t partitions_count;
};
std::optional<local_range_estimation> _local_range_estimation;
uint64_t _estimated_partitions = 0;
// For repair master nr peers is the number of repair followers, for repair
// follower nr peers is always one because repair master is the only peer.
size_t _nr_peer_nodes= 1;
repair_stats _stats;
std::optional<repair_reader> _repair_reader;
lw_shared_ptr<repair_writer> _repair_writer;
// Contains rows read from disk
std::list<repair_row> _row_buf;
// Contains rows we are working on to sync between peers
std::list<repair_row> _working_row_buf;
// Combines all the repair_hash in _working_row_buf
repair_hash _working_row_buf_combined_hash;
// Tracks the last sync boundary
std::optional<repair_sync_boundary> _last_sync_boundary;
// Tracks current sync boundary
std::optional<repair_sync_boundary> _current_sync_boundary;
// Contains the hashes of rows in the _working_row_buffor for all peer nodes
std::vector<repair_hash_set> _peer_row_hash_sets;
// Gate used to make sure pending operation of meta data is done
seastar::gate _gate;
sink_source_for_get_full_row_hashes _sink_source_for_get_full_row_hashes;
sink_source_for_get_row_diff _sink_source_for_get_row_diff;
sink_source_for_put_row_diff _sink_source_for_put_row_diff;
tracker_link_type _tracker_link;
row_level_repair* _row_level_repair_ptr;
std::vector<repair_node_state> _all_node_states;
is_dirty_on_master _dirty_on_master = is_dirty_on_master::no;
std::optional<shared_future<>> _stopped;
repair_hasher _repair_hasher;
gc_clock::time_point _compaction_time;
bool _is_tablet;
public:
// Per-node repair state for all participants. The first entry is always
// the local node, followed by the peer nodes (see the constructor).
std::vector<repair_node_state>& all_nodes() {
    return _all_node_states;
}
// Record the given repair state on every tracked entry whose address
// matches the given node. A no-op if no entry matches.
void set_repair_state(repair_state state, gms::inet_address node) {
    auto& nodes = all_nodes();
    for (auto it = nodes.begin(); it != nodes.end(); ++it) {
        if (it->node == node) {
            it->state = state;
        }
    }
}
// Record the given repair state on the local node, which is always the
// first entry in the node state list.
void set_repair_state_for_local_node(repair_state state) {
    auto& nodes = all_nodes();
    nodes.front().state = state;
}
// Mutable access to this repair session's statistics counters.
repair_stats& stats() {
    return _stats;
}
// Address of the local node, captured from the token metadata topology
// at construction time.
gms::inet_address myip() const {
    return _myip;
}
// Identifier of this repair_meta instance, assigned by the caller at
// construction time.
uint32_t repair_meta_id() const {
    return _repair_meta_id;
}
// The sync boundary currently being worked on; disengaged until one is set.
const std::optional<repair_sync_boundary>& current_sync_boundary() const {
    return _current_sync_boundary;
}
// The last sync boundary that was processed; disengaged until one is set.
// Fix: dropped the stray ';' after the closing brace — it formed a
// redundant empty member declaration.
const std::optional<repair_sync_boundary>& last_sync_boundary() const {
    return _last_sync_boundary;
}
// Combined hash of all rows currently in _working_row_buf.
const repair_hash& working_row_buf_combined_hash() const {
    return _working_row_buf_combined_hash;
}
// Whether the configured diff-detection algorithm communicates over
// RPC streams (as opposed to plain RPC verbs).
bool use_rpc_stream() const {
    return is_rpc_stream_supported(_algo);
}
public:
// master constructor
// Full constructor, used directly on the repair master and via the
// delegating follower constructor (which passes nr_peer_nodes = 1).
//
// Registers the new instance in the master or follower tracker, then
// populates _all_node_states with the local node first, followed by one
// entry per live peer (pairing each peer with its shard, when known).
//
// For repair master, all_live_peer_nodes lists the followers; for a
// follower the delegating constructor passes exactly one peer (the
// master). all_live_peer_shards must have the same size as
// all_live_peer_nodes; a disengaged entry means the peer's shard is
// unknown and repair_unspecified_shard is recorded instead.
repair_meta(
    repair_service& rs,
    replica::column_family& cf,
    schema_ptr s,
    reader_permit permit,
    dht::token_range range,
    row_level_diff_detect_algorithm algo,
    size_t max_row_buf_size,
    uint64_t seed,
    repair_master master,
    uint32_t repair_meta_id,
    streaming::stream_reason reason,
    shard_config master_node_shard_config,
    inet_address_vector_replica_set all_live_peer_nodes,
    size_t nr_peer_nodes,
    std::vector<std::optional<shard_id>> all_live_peer_shards,
    row_level_repair* row_level_repair_ptr,
    gc_clock::time_point compaction_time)
    : _rs(rs)
    , _db(rs.get_db())
    , _messaging(rs.get_messaging())
    , _sys_dist_ks(rs.get_sys_dist_ks())
    , _view_update_generator(rs.get_view_update_generator())
    , _schema(s)
    , _permit(std::move(permit))
    , _range(range)
    , _cmp(repair_sync_boundary::tri_compare(*_schema))
    , _algo(algo)
    , _max_row_buf_size(max_row_buf_size)
    , _seed(seed)
    , _repair_master(master)
    , _myip(_db.local().get_token_metadata().get_topology().my_address())
    , _repair_meta_id(repair_meta_id)
    , _reason(reason)
    , _master_node_shard_config(std::move(master_node_shard_config))
    , _remote_sharder(make_remote_sharder())
    , _same_sharding_config(is_same_sharding_config(cf))
    , _nr_peer_nodes(nr_peer_nodes)
    , _repair_writer(make_repair_writer(_schema, _permit, _reason, _db, _sys_dist_ks, _view_update_generator))
    // Factory for the per-peer RPC stream carrying the get_full_row_hashes
    // verb. An unknown destination shard maps to repair_unspecified_shard.
    , _sink_source_for_get_full_row_hashes(_repair_meta_id, _nr_peer_nodes,
            [&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, netw::messaging_service::msg_addr addr) {
                auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
                rlogger.debug("get_full_row_hashes: repair_meta_id={} dst_cpu_id={}", repair_meta_id, dst_cpu_id);
                return rs.get_messaging().make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(repair_meta_id, dst_cpu_id, addr);
            })
    // Factory for the per-peer RPC stream carrying the get_row_diff verb.
    , _sink_source_for_get_row_diff(_repair_meta_id, _nr_peer_nodes,
            [&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, netw::messaging_service::msg_addr addr) {
                auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
                rlogger.debug("get_row_diff: repair_meta_id={} dst_cpu_id={}", repair_meta_id, dst_cpu_id);
                return rs.get_messaging().make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(repair_meta_id, dst_cpu_id, addr);
            })
    // Factory for the per-peer RPC stream carrying the put_row_diff verb.
    , _sink_source_for_put_row_diff(_repair_meta_id, _nr_peer_nodes,
            [&rs] (uint32_t repair_meta_id, std::optional<shard_id> dst_cpu_id_opt, netw::messaging_service::msg_addr addr) {
                auto dst_cpu_id = dst_cpu_id_opt.value_or(repair_unspecified_shard);
                rlogger.debug("put_row_diff: repair_meta_id={} dst_cpu_id={}", repair_meta_id, dst_cpu_id);
                return rs.get_messaging().make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(repair_meta_id, dst_cpu_id, addr);
            })
    , _row_level_repair_ptr(row_level_repair_ptr)
    , _repair_hasher(_seed, _schema)
    , _compaction_time(compaction_time)
    , _is_tablet(cf.uses_tablets())
{
    // Track this instance so it can be looked up / cleaned up later.
    if (master) {
        add_to_repair_meta_for_masters(*this);
    } else {
        add_to_repair_meta_for_followers(*this);
    }
    assert(all_live_peer_shards.size() == all_live_peer_nodes.size());
    // The local node is always entry 0 in _all_node_states; peers follow.
    _all_node_states.push_back(repair_node_state(_myip, this_shard_id()));
    for (unsigned i = 0; i < all_live_peer_nodes.size(); i++) {
        _all_node_states.push_back(repair_node_state(all_live_peer_nodes[i], all_live_peer_shards[i].value_or(repair_unspecified_shard)));
    }
}
// follower constructor
// Follower-side constructor. Delegates to the full constructor with a
// single peer (the repair master is the follower's only peer), no peer
// shard information ({std::nullopt}) and no row_level_repair back-pointer.
repair_meta(
    repair_service& rs,
    replica::column_family& cf,
    schema_ptr s,
    reader_permit permit,
    dht::token_range range,
    row_level_diff_detect_algorithm algo,
    size_t max_row_buf_size,
    uint64_t seed,
    repair_master master,
    uint32_t repair_meta_id,
    streaming::stream_reason reason,
    shard_config master_node_shard_config,
    inet_address_vector_replica_set all_live_peer_nodes,
    gc_clock::time_point compaction_time)
    : repair_meta(rs, cf, std::move(s), std::move(permit), std::move(range), algo, max_row_buf_size, seed, master, repair_meta_id, reason,
            std::move(master_node_shard_config), std::move(all_live_peer_nodes), 1, {std::nullopt}, nullptr, compaction_time)
{
}
public:
// Shard of the peer node at peer_node_idx, or disengaged if unknown.
// Entry 0 of all_nodes() is the local node, so peers start at offset 1.
std::optional<shard_id> get_peer_node_dst_cpu_id(uint32_t peer_node_idx) {
    auto& nodes = all_nodes();
    const uint32_t idx = peer_node_idx + 1;
    assert(idx < nodes.size());
    return nodes[idx].shard;
}
public:
// Frees the peer hash sets and both row buffers via utils::clear_gently,
// so freeing large containers can be spread over multiple reactor tasks
// instead of stalling the shard.
future<> clear_gently() noexcept {
    co_await utils::clear_gently(_peer_row_hash_sets);
    co_await utils::clear_gently(_working_row_buf);
    co_await utils::clear_gently(_row_buf);
}
// Shuts this repair_meta down: closes the gate and the three RPC-stream
// sink/sources, waits for the writer to finish, then releases the row
// buffers. Safe to call more than once — all callers share the same
// shared_future via _stopped.
future<> stop() {
    // Handle deferred stop: a previous call already kicked off the
    // shutdown chain, so just wait on its shared future.
    if (_stopped) {
        return _stopped->get_future();
    }
    promise<> stopped;
    _stopped.emplace(stopped.get_future());
    auto gate_future = _gate.close();
    auto f1 = _sink_source_for_get_full_row_hashes.close();
    auto f2 = _sink_source_for_get_row_diff.close();
    auto f3 = _sink_source_for_put_row_diff.close();
    rlogger.debug("repair_meta::stop");
    // move to background. waited on via _stopped->get_future.
    // The cleanup steps (writer drain, close(), clear_gently()) run even
    // if closing the gate or the sinks failed, via the finally() chain.
    when_all_succeed(std::move(gate_future), std::move(f1), std::move(f2), std::move(f3)).discard_result().finally([this] {
        return _repair_writer->wait_for_writer_done().finally([this] {
            return close().then([this] {
                return clear_gently();
            });
        });
    }).forward_to(std::move(stopped));
    return _stopped->get_future();
}
// Empty every per-peer hash set. If the vector does not yet hold one
// set per peer, (re)size it instead — resizing produces empty sets.
void reset_peer_row_hash_sets() {
    if (_peer_row_hash_sets.size() == _nr_peer_nodes) {
        for (auto& hashes : _peer_row_hash_sets) {
            hashes.clear();
        }
    } else {
        _peer_row_hash_sets.resize(_nr_peer_nodes);
    }
}
// Row hashes received from the peer at node_idx for the current working
// row buffer. node_idx is not range-checked by this accessor.
repair_hash_set& peer_row_hash_sets(unsigned node_idx) {
    return _peer_row_hash_sets[node_idx];
}
// Collect the hash of every row currently in _working_row_buf into a
// repair_hash_set. do_with keeps the result set alive across the
// asynchronous iteration, and do_for_each visits the rows one
// continuation at a time.
future<repair_hash_set>
working_row_hashes() {
    return do_with(repair_hash_set(), [this] (repair_hash_set& hashes) {
        return do_for_each(_working_row_buf, [&hashes] (repair_row& r) mutable {
            hashes.emplace(r.hash());
        }).then([&hashes] () mutable {
            return std::move(hashes);
        });
    });
}
std::pair<std::optional<repair_sync_boundary>, bool>
get_common_sync_boundary(bool zero_rows,
std::vector<repair_sync_boundary>& sync_boundaries,
std::vector<repair_hash>& combined_hashes) {
if (sync_boundaries.empty()) {
throw std::runtime_error("sync_boundaries is empty");
}
if(combined_hashes.empty()) {
throw std::runtime_error("combined_hashes is empty");
}
// Get the smallest sync boundary in the list as the common sync boundary
std::sort(sync_boundaries.begin(), sync_boundaries.end(),
[this] (const auto& a, const auto& b) { return this->_cmp(a, b) < 0; });