forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
messaging_service.cc
1298 lines (1160 loc) · 62.2 KB
/
messaging_service.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/as_future.hh>
#include <seastar/coroutine/exception.hh>
#include "message/messaging_service.hh"
#include <seastar/core/distributed.hh>
#include "gms/failure_detector.hh"
#include "gms/gossiper.hh"
#include "streaming/prepare_message.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"
#include "gms/gossip_digest_ack2.hh"
#include "query-request.hh"
#include "query-result.hh"
#include <seastar/rpc/rpc.hh>
#include "canonical_mutation.hh"
#include "schema_mutations.hh"
#include "db/config.hh"
#include "db/view/view_update_backlog.hh"
#include "dht/i_partitioner.hh"
#include "range.hh"
#include "frozen_schema.hh"
#include "repair/repair.hh"
#include "digest_algorithm.hh"
#include "service/paxos/proposal.hh"
#include "service/paxos/prepare_response.hh"
#include "query-request.hh"
#include "mutation_query.hh"
#include "repair/repair.hh"
#include "digest_algorithm.hh"
#include "streaming/stream_reason.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "cache_temperature.hh"
#include "raft/raft.hh"
#include "service/raft/messaging.hh"
#include "service/raft/group0_fwd.hh"
#include "replica/exceptions.hh"
#include "serializer.hh"
#include "full_position.hh"
#include "db/per_partition_rate_limit_info.hh"
#include "idl/consistency_level.dist.hh"
#include "idl/tracing.dist.hh"
#include "idl/result.dist.hh"
#include "idl/reconcilable_result.dist.hh"
#include "idl/ring_position.dist.hh"
#include "idl/keys.dist.hh"
#include "idl/uuid.dist.hh"
#include "idl/frozen_mutation.dist.hh"
#include "idl/frozen_schema.dist.hh"
#include "idl/streaming.dist.hh"
#include "idl/token.dist.hh"
#include "idl/gossip_digest.dist.hh"
#include "idl/read_command.dist.hh"
#include "idl/range.dist.hh"
#include "idl/position_in_partition.dist.hh"
#include "idl/partition_checksum.dist.hh"
#include "idl/query.dist.hh"
#include "idl/cache_temperature.dist.hh"
#include "idl/view.dist.hh"
#include "idl/mutation.dist.hh"
#include "idl/messaging_service.dist.hh"
#include "idl/paxos.dist.hh"
#include "idl/raft_storage.dist.hh"
#include "idl/raft.dist.hh"
#include "idl/group0.dist.hh"
#include "idl/replica_exception.dist.hh"
#include "idl/per_partition_rate_limit_info.dist.hh"
#include "idl/storage_proxy.dist.hh"
#include "message/rpc_protocol_impl.hh"
#include "idl/consistency_level.dist.impl.hh"
#include "idl/tracing.dist.impl.hh"
#include "idl/result.dist.impl.hh"
#include "idl/reconcilable_result.dist.impl.hh"
#include "idl/ring_position.dist.impl.hh"
#include "idl/keys.dist.impl.hh"
#include "idl/uuid.dist.impl.hh"
#include "idl/frozen_mutation.dist.impl.hh"
#include "idl/frozen_schema.dist.impl.hh"
#include "idl/streaming.dist.impl.hh"
#include "idl/token.dist.impl.hh"
#include "idl/gossip_digest.dist.impl.hh"
#include "idl/read_command.dist.impl.hh"
#include "idl/range.dist.impl.hh"
#include "idl/position_in_partition.dist.impl.hh"
#include "idl/query.dist.impl.hh"
#include "idl/cache_temperature.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"
#include "idl/messaging_service.dist.impl.hh"
#include "idl/paxos.dist.impl.hh"
#include "idl/raft_storage.dist.impl.hh"
#include "idl/raft.dist.impl.hh"
#include "idl/group0.dist.impl.hh"
#include "idl/view.dist.impl.hh"
#include "idl/replica_exception.dist.impl.hh"
#include "idl/per_partition_rate_limit_info.dist.impl.hh"
#include "idl/storage_proxy.dist.impl.hh"
#include <seastar/rpc/lz4_compressor.hh>
#include <seastar/rpc/lz4_fragmented_compressor.hh>
#include <seastar/rpc/multi_algo_compressor_factory.hh>
#include "partition_range_compat.hh"
#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/indirected.hpp>
#include "frozen_mutation.hh"
#include "streaming/stream_manager.hh"
#include "streaming/stream_mutation_fragments_cmd.hh"
#include "idl/partition_checksum.dist.impl.hh"
#include "idl/forward_request.dist.hh"
#include "idl/forward_request.dist.impl.hh"
namespace netw {
// Compile-time checks on msg_addr: it must not be default-constructible, and
// both its copy and move constructors must be non-throwing.
static_assert(!std::is_default_constructible_v<msg_addr>);
static_assert(std::is_nothrow_copy_constructible_v<msg_addr>);
static_assert(std::is_nothrow_move_constructible_v<msg_addr>);
// Logger used by the messaging service code in this file.
static logging::logger mlogger("messaging_service");
// Logger handed to the RPC protocol object via _rpc->set_logger() in the
// messaging_service constructor.
static logging::logger rpc_logger("rpc");
// Shorthand aliases for types from the gms namespace.
using inet_address = gms::inet_address;
using gossip_digest_syn = gms::gossip_digest_syn;
using gossip_digest_ack = gms::gossip_digest_ack;
using gossip_digest_ack2 = gms::gossip_digest_ack2;
// Enables std::chrono duration literals (e.g. 60s).
using namespace std::chrono_literals;
// RPC compressor factories. compressor_factory combines the two LZ4 factories
// below; its address is installed into the server and client RPC options
// whenever compression is enabled for a connection.
// NOTE(review): the listing order presumably expresses negotiation preference
// (fragmented LZ4 first) - confirm against rpc::multi_algo_compressor_factory.
static rpc::lz4_fragmented_compressor::factory lz4_fragmented_compressor_factory;
static rpc::lz4_compressor::factory lz4_compressor_factory;
static rpc::multi_algo_compressor_factory compressor_factory {
&lz4_fragmented_compressor_factory,
&lz4_compressor_factory,
};
// Definition of the nested server wrapper type: an rpc_protocol::server that
// inherits all of the base class constructors and adds nothing else.
struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; };
// Out-of-class definition of the static constexpr data member current_version.
// Its value is not visible here - presumably it is given in the class declaration.
constexpr int32_t messaging_service::current_version;
// Count of connection types that are not associated with any tenant
// (initial_scheduling_info() creates these as "gossip" and "streaming").
const size_t PER_SHARD_CONNECTION_COUNT = 2;
// Count of connection types created per tenant
// (initial_scheduling_info() creates these as "statement", "statement-ack" and "forward").
const size_t PER_TENANT_CONNECTION_COUNT = 3;
// Equality for msg_addr: two values are equal when their IP addresses match.
bool operator==(const msg_addr& x, const msg_addr& y) noexcept {
    // The cpu id is deliberately left out of the comparison, since shard to
    // shard connections are not really supported for now.
    const bool same_ip = (x.addr == y.addr);
    return same_ip;
}
// Ordering for msg_addr, based on the IP address only.
bool operator<(const msg_addr& x, const msg_addr& y) noexcept {
    // The cpu id is deliberately left out of the comparison, since shard to
    // shard connections are not really supported for now.
    return x.addr < y.addr;
}
// Prints a msg_addr as "<address>:<cpu id>".
std::ostream& operator<<(std::ostream& os, const msg_addr& x) {
    os << x.addr;
    os << ":";
    os << x.cpu_id;
    return os;
}
// Hashes a msg_addr by the raw bytes of its IP address.
size_t msg_addr::hash::operator()(const msg_addr& id) const noexcept {
    // Ignore cpu id for now since we do not really support
    // shard to shard connections
    std::hash<bytes_view> hasher{};
    return hasher(id.addr.bytes());
}
// Takes ownership of the given RPC client and records whether topology
// information was ignored when the client was created.
messaging_service::shard_info::shard_info(shared_ptr<rpc_protocol_client_wrapper>&& client, bool topo_ignored)
    : rpc_client(std::move(client)), topology_ignored(topo_ignored) {}
// Returns the statistics reported by the underlying RPC client.
rpc::stats messaging_service::shard_info::get_stats() const {
    auto& client = *rpc_client;
    return client.get_stats();
}
// Invokes f(address, shard_info) for every RPC client, walking the per-index
// client maps in index order.
void messaging_service::foreach_client(std::function<void(const msg_addr& id, const shard_info& info)> f) const {
    for (const auto& clients_for_idx : _clients) {
        for (const auto& [peer, info] : clients_for_idx) {
            f(peer, info);
        }
    }
}
// Invokes f(client_info, stats) for every connection of every started server
// in _server.
//
// NOTE(review): only the servers in _server are visited; connections accepted by
// the servers in _server_tls are not reported here - confirm this is intended.
void messaging_service::foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const {
    for (const auto& server : _server) {
        if (!server) {
            // This slot was never started.
            continue;
        }
        // foreach_connection() runs the callback synchronously, so `f` can be
        // captured by reference instead of copying the std::function per server.
        server->foreach_connection([&f] (const rpc_protocol::server::connection& c) {
            f(c.info(), c.get_stats());
        });
    }
}
// Bumps the dropped-message counter of the given verb by one.
void messaging_service::increment_dropped_messages(messaging_verb verb) {
    const auto slot = static_cast<int32_t>(verb);
    _dropped_messages[slot]++;
}
// Returns the dropped-message counter of the given verb.
uint64_t messaging_service::get_dropped_messages(messaging_verb verb) const {
    const auto slot = static_cast<int32_t>(verb);
    return _dropped_messages[slot];
}
// Returns a pointer to the first element of the dropped-message counters,
// which are indexed by the numeric value of the verb.
const uint64_t* messaging_service::get_dropped_messages() const {
    return &_dropped_messages[0];
}
// Returns the messaging protocol version to use with `endpoint`.
// The endpoint is currently ignored and current_version is always returned.
int32_t messaging_service::get_raw_version(const gms::inet_address& endpoint) const {
    // FIXME: messaging service versioning
    return current_version;
}
// Reports whether the protocol version of `endpoint` is known.
// The endpoint is currently ignored and the answer is always true.
bool messaging_service::knows_version(const gms::inet_address& endpoint) const {
    // FIXME: messaging service versioning
    return true;
}
// Unregisters the RPC handler of `verb`; forwards to the RPC protocol object
// and returns its future.
future<> messaging_service::unregister_handler(messaging_verb verb) {
    auto& protocol = *_rpc;
    return protocol.unregister_handler(verb);
}
// Convenience constructor: delegates to the (config, scheduling_config, credentials)
// constructor with a config built from `ip` and `port`, a scheduling_config holding a
// single entry named "$default" with otherwise default-constructed members, and no
// TLS credentials builder (nullptr).
// NOTE(review): the brace nesting presumably initializes statement_tenants first and
// the two remaining scheduling groups afterwards - confirm against the declaration
// of scheduling_config.
messaging_service::messaging_service(gms::inet_address ip, uint16_t port)
: messaging_service(config{std::move(ip), port}, scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr)
{}
// Builds the RPC resource limits used by the listening servers:
// a bloat factor of 3, a basic request size of 1000 and the given memory cap.
static rpc::resource_limits rpc_resource_limits(size_t memory_limit) {
    rpc::resource_limits result;
    result.max_memory = memory_limit;
    result.basic_request_size = 1000;
    result.bloat_factor = 3;
    return result;
}
// Remembers the shared token metadata and starts the listening servers.
//
// When a credentials builder is configured but no server credentials have been
// built yet, reloadable credentials are built first and the servers are started
// once they are available. Otherwise the servers are started right away.
future<> messaging_service::start_listen(locator::shared_token_metadata& stm) {
    _token_metadata = &stm;

    const bool must_build_credentials = _credentials_builder && !_credentials;
    if (!must_build_credentials) {
        do_start_listen();
        return make_ready_future<>();
    }

    // Invoked by the credentials machinery with the affected files and,
    // on failure, the exception.
    auto on_reload = [] (const std::unordered_set<sstring>& files, std::exception_ptr ep) {
        if (!ep) {
            mlogger.info("Reloaded {}", files);
            return;
        }
        mlogger.warn("Exception loading {}: {}", files, ep);
    };

    return _credentials_builder->build_reloadable_server_credentials(std::move(on_reload)).then(
            [this] (shared_ptr<seastar::tls::server_credentials> built) {
        _credentials = std::move(built);
        do_start_listen();
    });
}
// Tells whether `addr` lives in the same datacenter as this node.
bool messaging_service::is_same_dc(inet_address addr) const {
    // Safety check: the token metadata pointer is nullptr before the service
    // is start_listen()-ed and after it has been shutdown()-ed. No new clients
    // should appear in those periods, but if they do they still have to be
    // classified somehow. Claiming that every endpoint lives in a different
    // DC/RACK makes messaging apply the most restrictive compression/encryption
    // rules, which can be sub-optimal but is not harmful.
    if (!_token_metadata) {
        return false;
    }

    const auto& topology = _token_metadata->get()->get_topology();
    const bool same = (topology.get_datacenter(addr) == topology.get_datacenter());
    return same;
}
// Tells whether `addr` reports the same rack as this node.
bool messaging_service::is_same_rack(inet_address addr) const {
    // Without token metadata (before start_listen() or after shutdown()) treat
    // every endpoint as remote; see the matching check in is_same_dc().
    if (!_token_metadata) {
        return false;
    }

    const auto& topology = _token_metadata->get()->get_topology();
    const bool same = (topology.get_rack(addr) == topology.get_rack());
    return same;
}
// Creates the RPC listening servers of this shard: plain servers in _server[] and
// TLS servers in _server_tls[]. Slot 0 listens on _cfg.ip; slot 1 is additionally
// created on the broadcast address when listen_on_broadcast_address is set and the
// broadcast address differs from _cfg.ip. A group whose slot 0 already exists is
// left untouched.
//
// Throws std::invalid_argument when a TLS server is required but no credentials
// are available.
void messaging_service::do_start_listen() {
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != utils::fb_utilities::get_broadcast_address();
// Server options shared by every server created below; the listen lambdas
// capture `so` by reference and adjust it before each server is constructed.
rpc::server_options so;
if (_cfg.compress != compress_what::none) {
so.compressor_factory = &compressor_factory;
}
so.load_balancing_algorithm = server_socket::load_balancing_algorithm::port;
// FIXME: we don't set so.tcp_nodelay, because we can't tell at this point whether the connection will come from a
// local or remote datacenter, and whether or not the connection will be used for gossip. We can fix
// the first by wrapping its server_socket, but not the second.
auto limits = rpc_resource_limits(_cfg.rpc_memory_limit);
// Map the isolation cookie sent by a connecting client to a scheduling group.
limits.isolate_connection = [this] (sstring isolation_cookie) {
rpc::isolation_config cfg;
cfg.sched_group = scheduling_group_for_isolation_cookie(isolation_cookie);
return cfg;
};
// Plain (non-TLS) servers: skipped when everything must be encrypted or when
// no port is configured.
if (!_server[0] && _cfg.encrypt != encrypt_what::all && _cfg.port) {
auto listen = [&] (const gms::inet_address& a, rpc::streaming_domain_type sdomain) {
so.streaming_domain = sdomain;
so.filter_connection = {};
// With dc/rack encryption the filter below only returns true for peers in the
// same DC (and, for rack, the same rack).
// NOTE(review): presumably the RPC server rejects connections for which the
// filter returns false - confirm against rpc::server_options.
switch (_cfg.encrypt) {
default:
case encrypt_what::none:
break;
case encrypt_what::dc:
so.filter_connection = [this](const seastar::socket_address& caddr) {
auto addr = get_public_endpoint_for(caddr);
return is_same_dc(addr);
};
break;
case encrypt_what::rack:
so.filter_connection = [this](const seastar::socket_address& caddr) {
auto addr = get_public_endpoint_for(caddr);
return is_same_dc(addr) && is_same_rack(addr);
};
break;
}
auto addr = socket_address{a, _cfg.port};
return std::unique_ptr<rpc_protocol_server_wrapper>(new rpc_protocol_server_wrapper(_rpc->protocol(),
so, addr, limits));
};
_server[0] = listen(_cfg.ip, rpc::streaming_domain_type(0x55AA));
if (listen_to_bc) {
_server[1] = listen(utils::fb_utilities::get_broadcast_address(), rpc::streaming_domain_type(0x66BB));
}
}
// TLS servers: only attempted when an SSL port is configured.
if (!_server_tls[0] && _cfg.ssl_port) {
auto listen = [&] (const gms::inet_address& a, rpc::streaming_domain_type sdomain) {
so.filter_connection = {};
so.streaming_domain = sdomain;
// The inner lambda is invoked immediately. It yields nullptr when encryption is
// disabled and throws when no credentials are available.
return std::unique_ptr<rpc_protocol_server_wrapper>(
[this, &so, &a, limits] () -> std::unique_ptr<rpc_protocol_server_wrapper>{
if (_cfg.encrypt == encrypt_what::none) {
return nullptr;
}
if (!_credentials) {
throw std::invalid_argument("No certificates specified for encrypted service");
}
listen_options lo;
lo.reuse_address = true;
lo.lba = server_socket::load_balancing_algorithm::port;
auto addr = socket_address{a, _cfg.ssl_port};
return std::make_unique<rpc_protocol_server_wrapper>(_rpc->protocol(),
so, seastar::tls::listen(_credentials, addr, lo), limits);
}());
};
_server_tls[0] = listen(_cfg.ip, rpc::streaming_domain_type(0x77CC));
if (listen_to_bc) {
_server_tls[1] = listen(utils::fb_utilities::get_broadcast_address(), rpc::streaming_domain_type(0x88DD));
}
}
// Do this on just cpu 0, to avoid duplicate logs.
if (this_shard_id() == 0) {
if (_server_tls[0]) {
mlogger.info("Starting Encrypted Messaging Service on SSL port {}", _cfg.ssl_port);
}
if (_server_tls[1]) {
mlogger.info("Starting Encrypted Messaging Service on SSL broadcast address {} port {}", utils::fb_utilities::get_broadcast_address(), _cfg.ssl_port);
}
if (_server[0]) {
mlogger.info("Starting Messaging Service on port {}", _cfg.port);
}
if (_server[1]) {
mlogger.info("Starting Messaging Service on broadcast address {} port {}", utils::fb_utilities::get_broadcast_address(), _cfg.port);
}
}
}
// Main constructor.
//
// Stores the configuration, creates the RPC protocol object, copies the TLS
// credentials builder (if one was given), sizes the per-connection-type client
// maps, records the tenant -> connection index mapping and registers the
// CLIENT_ID handler.
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials)
    : _cfg(std::move(cfg))
    , _rpc(new rpc_protocol_wrapper(serializer { }))
    , _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
    , _clients(PER_SHARD_CONNECTION_COUNT + scfg.statement_tenants.size() * PER_TENANT_CONNECTION_COUNT)
    , _scheduling_config(scfg)
    , _scheduling_info_for_connection_index(initial_scheduling_info())
{
    _rpc->set_logger(&rpc_logger);

    // This initialization must happen before any handler registration, because
    // register_handler calls scheduling_group_for_verb, which in turn relies on
    // _connection_index_for_tenant being initialized.
    const auto& tenants = _scheduling_config.statement_tenants;
    _connection_index_for_tenant.reserve(tenants.size());
    unsigned tenant_no = 0;
    for (const auto& tenant : tenants) {
        _connection_index_for_tenant.push_back({tenant.sched_group, tenant_no});
        ++tenant_no;
    }

    // CLIENT_ID handler: stash what the peer told us about itself on the
    // connection's client_info; no reply is sent.
    register_handler(this, messaging_verb::CLIENT_ID, [] (rpc::client_info& info, gms::inet_address peer_broadcast_address, uint32_t peer_shard, rpc::optional<uint64_t> peer_max_result_size) {
        info.attach_auxiliary("baddr", peer_broadcast_address);
        info.attach_auxiliary("src_cpu_id", peer_shard);
        info.attach_auxiliary("max_result_size", peer_max_result_size.value_or(query::result_memory_limiter::maximum_result_size));
        return rpc::no_wait;
    });
}
// Reconstructs the sender's msg_addr from the auxiliary data ("baddr" and
// "src_cpu_id") that the CLIENT_ID handler attached to the connection.
msg_addr messaging_service::get_source(const rpc::client_info& cinfo) {
    auto sender_address = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
    auto sender_shard = cinfo.retrieve_auxiliary<uint32_t>("src_cpu_id");
    return msg_addr{sender_address, sender_shard};
}
// Defaulted destructor, defined out of line in this translation unit.
messaging_service::~messaging_service() = default;
uint16_t messaging_service::port() {
return _cfg.port;
}
// Returns the configured listen address.
gms::inet_address messaging_service::listen_address() {
    const auto& cfg = _cfg;
    return cfg.ip;
}
// Stops, in parallel, every server in `servers` whose slot is non-null.
static future<> stop_servers(std::array<std::unique_ptr<messaging_service::rpc_protocol_server_wrapper>, 2>& servers) {
    const auto is_started = [] (auto& ptr) { return bool(ptr); };
    // View over the started servers, dereferenced to the server objects themselves.
    auto started = servers | boost::adaptors::filtered(is_started) | boost::adaptors::indirected;
    return parallel_for_each(started,
            std::mem_fn(&messaging_service::rpc_protocol_server_wrapper::stop));
}
// Stops the TLS servers, logging before and after.
future<> messaging_service::stop_tls_server() {
    mlogger.info("Stopping tls server");
    auto log_done = [] {
        mlogger.info("Stopping tls server - Done");
    };
    return stop_servers(_server_tls).then(std::move(log_done));
}
// Stops the non-TLS servers, logging before and after.
future<> messaging_service::stop_nontls_server() {
    mlogger.info("Stopping nontls server");
    auto log_done = [] {
        mlogger.info("Stopping nontls server - Done");
    };
    return stop_servers(_server).then(std::move(log_done));
}
// Stops every RPC client in every per-index client map, in parallel,
// logging before and after each individual stop.
future<> messaging_service::stop_client() {
    return parallel_for_each(_clients, [] (auto& clients_for_idx) {
        return parallel_for_each(clients_for_idx, [] (std::pair<const msg_addr, shard_info>& entry) {
            mlogger.info("Stopping client for address: {}", entry.first);
            // Copy the address into the continuation for the completion log.
            auto log_done = [addr = entry.first] {
                mlogger.info("Stopping client for address: {} - Done", addr);
            };
            return entry.second.rpc_client->stop().then(std::move(log_done));
        });
    });
}
// Shuts the service down: marks it as shutting down, stops the non-TLS servers,
// the TLS servers and all clients concurrently, and finally drops the token
// metadata pointer.
future<> messaging_service::shutdown() {
    _shutting_down = true;
    auto everything_stopped = when_all(stop_nontls_server(), stop_tls_server(), stop_client());
    co_await std::move(everything_stopped).discard_result();
    // From here on is_same_dc()/is_same_rack() treat every endpoint as remote.
    _token_metadata = nullptr;
}
// Final stop: unregisters the CLIENT_ID handler and then verifies that no other
// handler is still registered. Leftover handlers are logged by numeric verb
// value and the process is aborted.
future<> messaging_service::stop() {
    return unregister_handler(messaging_verb::CLIENT_ID).then([this] {
        if (_rpc->has_handlers()) {
            mlogger.error("RPC server still has handlers registered");
            const int first_verb = int(messaging_verb::MUTATION);
            const int end_verb = int(messaging_verb::LAST);
            for (int verb_no = first_verb; verb_no < end_verb; ++verb_no) {
                if (_rpc->has_handler(messaging_verb(verb_no))) {
                    mlogger.error(" - {}", verb_no);
                }
            }
            std::abort();
        }
        return make_ready_future<>();
    });
}
// Returns the rpc::no_wait tag value.
rpc::no_wait_type messaging_service::no_wait() {
    return rpc::no_wait;
}
// The verbs using this RPC client use the following connection settings,
// regardless of whether the peer is in the same DC/Rack or not:
// - tcp_nodelay
// - encryption (unless completely disabled in config)
// - compression (unless completely disabled in config)
//
// The reason for having a topology-independent setting for encryption is to ensure
// that gossiper verbs can reach the peer, even though the peer may not know our topology yet.
// See #11992 for detailed explanations.
//
// We also always want `tcp_nodelay` for gossiper verbs so they have low latency
// (and there's no advantage from batching verbs in this group anyway).
//
// And since we fixed a topology-independent setting for encryption and tcp_nodelay,
// to keep things simple, we also fix a setting for compression. This allows this RPC client
// to be established without checking the topology (which may not be known anyway
// when we first start gossiping).
//
// The value must stay in sync with the index returned for the gossip verb group
// in do_get_rpc_client_idx(), which static_asserts that it is 0.
static constexpr unsigned TOPOLOGY_INDEPENDENT_IDX = 0;
// Maps a verb to the base index of the connection type used to send it.
// The indexes follow the order in which initial_scheduling_info() creates the
// connection types: 0 = gossip (topology-independent), 1 = streaming,
// 2 = statement, 3 = statement-ack, 4 = forward. For indexes >= PER_SHARD_CONNECTION_COUNT
// a per-tenant offset is added later by messaging_service::get_rpc_client_idx().
static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
// The *_CONNECTION_COUNT constants need to be updated after allocating a new index.
switch (verb) {
case messaging_verb::GOSSIP_DIGEST_SYN:
case messaging_verb::GOSSIP_DIGEST_ACK:
case messaging_verb::GOSSIP_DIGEST_ACK2:
case messaging_verb::GOSSIP_SHUTDOWN:
case messaging_verb::GOSSIP_ECHO:
case messaging_verb::GOSSIP_GET_ENDPOINT_STATES:
// GET_SCHEMA_VERSION is sent from read/mutate verbs so should be
// sent on a different connection to avoid potential deadlocks
// as well as reduce latency as there are potentially many requests
// blocked on schema version request.
case messaging_verb::GET_SCHEMA_VERSION:
// Raft peer exchange is mainly running at boot, but still
// should not be blocked by any data requests.
case messaging_verb::GROUP0_PEER_EXCHANGE:
case messaging_verb::GROUP0_MODIFY_CONFIG:
case messaging_verb::GET_GROUP0_UPGRADE_STATE:
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
// only verbs which are 'rare' and 'cheap'.
// DO NOT move GOSSIP_ verbs outside this group.
static_assert(TOPOLOGY_INDEPENDENT_IDX == 0);
return 0;
// Streaming, repair, node operations and hints share the streaming connection.
case messaging_verb::PREPARE_MESSAGE:
case messaging_verb::PREPARE_DONE_MESSAGE:
case messaging_verb::UNUSED__STREAM_MUTATION:
case messaging_verb::STREAM_MUTATION_DONE:
case messaging_verb::COMPLETE_MESSAGE:
case messaging_verb::REPLICATION_FINISHED:
case messaging_verb::UNUSED__REPAIR_CHECKSUM_RANGE:
case messaging_verb::STREAM_MUTATION_FRAGMENTS:
case messaging_verb::REPAIR_ROW_LEVEL_START:
case messaging_verb::REPAIR_ROW_LEVEL_STOP:
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES:
case messaging_verb::REPAIR_GET_COMBINED_ROW_HASH:
case messaging_verb::REPAIR_GET_SYNC_BOUNDARY:
case messaging_verb::REPAIR_GET_ROW_DIFF:
case messaging_verb::REPAIR_PUT_ROW_DIFF:
case messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS:
case messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS:
case messaging_verb::REPAIR_GET_DIFF_ALGORITHMS:
case messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM:
case messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM:
case messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM:
case messaging_verb::REPAIR_UPDATE_SYSTEM_TABLE:
case messaging_verb::REPAIR_FLUSH_HINTS_BATCHLOG:
case messaging_verb::NODE_OPS_CMD:
case messaging_verb::HINT_MUTATION:
return 1;
// Statement verbs (per-tenant connection type "statement").
case messaging_verb::CLIENT_ID:
case messaging_verb::MUTATION:
case messaging_verb::READ_DATA:
case messaging_verb::READ_MUTATION_DATA:
case messaging_verb::READ_DIGEST:
case messaging_verb::DEFINITIONS_UPDATE:
case messaging_verb::TRUNCATE:
case messaging_verb::MIGRATION_REQUEST:
case messaging_verb::SCHEMA_CHECK:
case messaging_verb::COUNTER_MUTATION:
// Use the same RPC client for light weight transaction
// protocol steps as for standard mutations and read requests.
case messaging_verb::PAXOS_PREPARE:
case messaging_verb::PAXOS_ACCEPT:
case messaging_verb::PAXOS_LEARN:
case messaging_verb::PAXOS_PRUNE:
case messaging_verb::RAFT_SEND_SNAPSHOT:
case messaging_verb::RAFT_APPEND_ENTRIES:
case messaging_verb::RAFT_APPEND_ENTRIES_REPLY:
case messaging_verb::RAFT_VOTE_REQUEST:
case messaging_verb::RAFT_VOTE_REPLY:
case messaging_verb::RAFT_TIMEOUT_NOW:
case messaging_verb::RAFT_READ_QUORUM:
case messaging_verb::RAFT_READ_QUORUM_REPLY:
case messaging_verb::RAFT_EXECUTE_READ_BARRIER_ON_LEADER:
case messaging_verb::RAFT_ADD_ENTRY:
case messaging_verb::RAFT_MODIFY_CONFIG:
return 2;
// Statement acknowledgements (per-tenant connection type "statement-ack").
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:
return 3;
// Forwarded requests (per-tenant connection type "forward").
case messaging_verb::FORWARD_REQUEST:
return 4;
case messaging_verb::LAST:
// -1 converts to the largest unsigned value; LAST is a sentinel, not a real verb.
return -1; // should never happen
}
}
// Builds the verb -> base connection index lookup table by evaluating
// do_get_rpc_client_idx() for every verb value below messaging_verb::LAST.
static constexpr std::array<uint8_t, static_cast<size_t>(messaging_verb::LAST)> make_rpc_client_idx_table() {
    std::array<uint8_t, static_cast<size_t>(messaging_verb::LAST)> table{};
    size_t verb_no = 0;
    for (auto& slot : table) {
        slot = do_get_rpc_client_idx(messaging_verb(verb_no));
        // This assert guards against adding new connection types without
        // updating the *_CONNECTION_COUNT constants.
        assert(slot < PER_TENANT_CONNECTION_COUNT + PER_SHARD_CONNECTION_COUNT);
        ++verb_no;
    }
    return table;
}
// Lookup table from a verb's numeric value to its base connection index, filled once
// by make_rpc_client_idx_table(). Read by messaging_service::get_rpc_client_idx()
// and messaging_service::scheduling_group_for_verb().
static std::array<uint8_t, static_cast<size_t>(messaging_verb::LAST)> s_rpc_client_idx_table = make_rpc_client_idx_table();
// Returns the index into _clients of the RPC client to use for sending `verb`
// from the current scheduling group.
//
// Verbs whose base index is below PER_SHARD_CONNECTION_COUNT are not associated
// with any tenant and use their base index directly. For the remaining verbs the
// base index is shifted by PER_TENANT_CONNECTION_COUNT for each tenant preceding
// the one whose scheduling group is currently running. If the current scheduling
// group matches no tenant, the base index (i.e. the first tenant's client) is used.
unsigned
messaging_service::get_rpc_client_idx(messaging_verb verb) const {
    // Widen to unsigned right away: the table stores uint8_t, and doing the
    // per-tenant arithmetic in that type would wrap around modulo 256 when
    // many tenants are configured.
    const unsigned base_idx = s_rpc_client_idx_table[static_cast<size_t>(verb)];
    if (base_idx < PER_SHARD_CONNECTION_COUNT) {
        return base_idx;
    }

    // A statement or statement-ack verb
    const auto curr_sched_group = current_scheduling_group();
    for (unsigned i = 0; i < _connection_index_for_tenant.size(); ++i) {
        if (_connection_index_for_tenant[i].sched_group == curr_sched_group) {
            // i == 0: the default tenant maps to the default client indexes belonging to the interval
            // [PER_SHARD_CONNECTION_COUNT, PER_SHARD_CONNECTION_COUNT + PER_TENANT_CONNECTION_COUNT).
            return static_cast<unsigned>(base_idx + i * PER_TENANT_CONNECTION_COUNT);
        }
    }
    return base_idx;
}
std::vector<messaging_service::scheduling_info_for_connection_index>
messaging_service::initial_scheduling_info() const {
if (_scheduling_config.statement_tenants.empty()) {
throw std::runtime_error("messaging_service::initial_scheduling_info(): must have at least one tenant configured");
}
auto sched_infos = std::vector<scheduling_info_for_connection_index>({
{ _scheduling_config.gossip, "gossip" },
{ _scheduling_config.streaming, "streaming", },
});
sched_infos.reserve(sched_infos.size() +
_scheduling_config.statement_tenants.size() * PER_TENANT_CONNECTION_COUNT);
for (const auto& tenant : _scheduling_config.statement_tenants) {
sched_infos.push_back({ tenant.sched_group, "statement:" + tenant.name });
sched_infos.push_back({ tenant.sched_group, "statement-ack:" + tenant.name });
sched_infos.push_back({ tenant.sched_group, "forward:" + tenant.name });
}
assert(sched_infos.size() == PER_SHARD_CONNECTION_COUNT +
_scheduling_config.statement_tenants.size() * PER_TENANT_CONNECTION_COUNT);
return sched_infos;
};
// Returns the scheduling group associated with the base connection index of `verb`.
scheduling_group
messaging_service::scheduling_group_for_verb(messaging_verb verb) const {
    // get_rpc_client_idx() is deliberately not used here: it picks the client
    // index based on the current scheduling group, which matters when choosing
    // the client for sending a message but is irrelevant when registering handlers.
    const auto base_idx = s_rpc_client_idx_table[static_cast<size_t>(verb)];
    const auto& info = _scheduling_info_for_connection_index[base_idx];
    return info.sched_group;
}
// Returns the scheduling group registered for `isolation_cookie`, or the default
// scheduling group when the cookie is not known.
scheduling_group
messaging_service::scheduling_group_for_isolation_cookie(const sstring& isolation_cookie) const {
    // Runs once per connection, so a linear scan is fine.
    const auto& infos = _scheduling_info_for_connection_index;
    for (size_t i = 0; i < infos.size(); ++i) {
        const auto& candidate = infos[i];
        if (candidate.isolation_cookie == isolation_cookie) {
            return candidate.sched_group;
        }
    }
    // The client is using a new connection class that this server doesn't
    // recognize yet. Assume it's important; after the server is upgraded it
    // will be recognized.
    return default_scheduling_group();
}
/**
 * Get an IP for a given endpoint to connect to
 *
 * @param ep endpoint to check
 *
 * @return the cached preferred (local) IP of the endpoint, provided one is
 *         cached and the endpoint resides in the same data center as the
 *         current node. Otherwise 'ep' itself is returned.
 */
gms::inet_address messaging_service::get_preferred_ip(gms::inet_address ep) {
    const auto cached = _preferred_ip_cache.find(ep);
    if (cached == _preferred_ip_cache.end()) {
        // Nothing cached for this endpoint - use the endpoint itself.
        return ep;
    }
    if (!is_same_dc(ep)) {
        return ep;
    }
    return cached->second;
}
// Replaces the preferred IP cache with `ips_cache` and rebuilds the reverse
// (preferred IP -> endpoint) map from it.
void messaging_service::init_local_preferred_ip_cache(const std::unordered_map<gms::inet_address, gms::inet_address>& ips_cache) {
    _preferred_ip_cache = ips_cache;
    _preferred_to_endpoint.clear();

    // Reset the connections to the endpoints that have entries in
    // _preferred_ip_cache so that they reopen with the preferred IPs
    // that have just been read.
    for (auto& [endpoint, preferred] : _preferred_ip_cache) {
        this->remove_rpc_client(msg_addr(endpoint));
        _preferred_to_endpoint[preferred] = endpoint;
    }
}
// Records `ip` as the preferred IP of endpoint `ep`, updating both the forward
// (endpoint -> preferred IP) and the reverse (preferred IP -> endpoint) maps.
void messaging_service::cache_preferred_ip(gms::inet_address ep, gms::inet_address ip) {
    auto& forward_slot = _preferred_ip_cache[ep];
    forward_slot = ip;
    auto& reverse_slot = _preferred_to_endpoint[ip];
    reverse_slot = ep;
}
// Translates a preferred IP back to the endpoint it was cached for.
// Returns `ip` itself when no such mapping exists.
gms::inet_address messaging_service::get_public_endpoint_for(const gms::inet_address& ip) const {
    const auto found = _preferred_to_endpoint.find(ip);
    if (found == _preferred_to_endpoint.end()) {
        return ip;
    }
    return found->second;
}
shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::get_rpc_client(messaging_verb verb, msg_addr id) {
assert(!_shutting_down);
auto idx = get_rpc_client_idx(verb);
auto it = _clients[idx].find(id);
if (it != _clients[idx].end()) {
auto c = it->second.rpc_client;
if (!c->error()) {
return c;
}
// The 'dead_only' it should be true, because we're interested in
// dropping the errored socket, but since it's errored anyway (the
// above if) it's false to save unneeded second c->error() call
find_and_remove_client(_clients[idx], id, [] (const auto&) { return true; });
}
auto broadcast_address = utils::fb_utilities::get_broadcast_address();
bool listen_to_bc = _cfg.listen_on_broadcast_address && _cfg.ip != broadcast_address;
auto laddr = socket_address(listen_to_bc ? broadcast_address : _cfg.ip, 0);
std::optional<bool> topology_status;
auto has_topology = [&] {
if (!topology_status.has_value()) {
topology_status = _token_metadata ? _token_metadata->get()->get_topology().has_endpoint(id.addr) : false;
}
return *topology_status;
};
auto must_encrypt = [&] {
if (_cfg.encrypt == encrypt_what::none) {
return false;
}
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
if (_cfg.encrypt == encrypt_what::all || idx == TOPOLOGY_INDEPENDENT_IDX) {
return true;
}
// either rack/dc need to be in same dc to use non-tls
if (!has_topology() || !is_same_dc(id.addr)) {
return true;
}
// #9653 - if our idea of dc for bind address differs from our official endpoint address,
// we cannot trust downgrading. We need to ensure either (local) bind address is same as
// broadcast or that the dc info we get for it is the same.
if (broadcast_address != laddr && !is_same_dc(laddr)) {
return true;
}
// if cross-rack tls, check rack.
if (_cfg.encrypt == encrypt_what::dc) {
return false;
}
if (!is_same_rack(id.addr)) {
return true;
}
// See above: We need to ensure either (local) bind address is same as
// broadcast or that the rack info we get for it is the same.
return broadcast_address != laddr && !is_same_rack(laddr);
}();
auto must_compress = [&] {
if (_cfg.compress == compress_what::none) {
return false;
}
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
if (_cfg.compress == compress_what::all || idx == TOPOLOGY_INDEPENDENT_IDX) {
return true;
}
return !has_topology() || !is_same_dc(id.addr);
}();
auto must_tcp_nodelay = [&] {
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
if (_cfg.tcp_nodelay == tcp_nodelay_what::all || idx == TOPOLOGY_INDEPENDENT_IDX) {
return true;
}
return !has_topology() || is_same_dc(id.addr);
}();
auto addr = get_preferred_ip(id.addr);
auto remote_addr = socket_address(addr, must_encrypt ? _cfg.ssl_port : _cfg.port);
rpc::client_options opts;
// send keepalive messages each minute if connection is idle, drop connection after 10 failures
opts.keepalive = std::optional<net::tcp_keepalive_params>({60s, 60s, 10});
if (must_compress) {
opts.compressor_factory = &compressor_factory;
}
opts.tcp_nodelay = must_tcp_nodelay;
opts.reuseaddr = true;
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
auto client = must_encrypt ?
::make_shared<rpc_protocol_client_wrapper>(_rpc->protocol(), std::move(opts),
remote_addr, laddr, _credentials) :
::make_shared<rpc_protocol_client_wrapper>(_rpc->protocol(), std::move(opts),
remote_addr, laddr);
// Remember if we had the peer's topology information when creating the client;
// if not, we shall later drop the client and create a new one after we learn the peer's
// topology (so we can use optimal encryption settings and so on for intra-dc/rack messages).
// But we don't want to apply this logic for TOPOLOGY_INDEPENDENT_IDX client - its settings
// are independent of topology, so there's no point in dropping it later after we learn
// the topology (so we always set `topology_ignored` to `false` in that case).
bool topology_ignored = idx != TOPOLOGY_INDEPENDENT_IDX && topology_status.has_value() && *topology_status == false;
auto res = _clients[idx].emplace(id, shard_info(std::move(client), topology_ignored));
assert(res.second);
it = res.first;
uint32_t src_cpu_id = this_shard_id();
// No reply is received, nothing to wait for.
(void)_rpc->make_client<rpc::no_wait_type(gms::inet_address, uint32_t, uint64_t)>(messaging_verb::CLIENT_ID)(*it->second.rpc_client, utils::fb_utilities::get_broadcast_address(), src_cpu_id,
query::result_memory_limiter::maximum_result_size).handle_exception([ms = shared_from_this(), remote_addr, verb] (std::exception_ptr ep) {
mlogger.debug("Failed to send client id to {} for verb {}: {}", remote_addr, std::underlying_type_t<messaging_verb>(verb), ep);
});
return it->second.rpc_client;
}
// Looks up `id` in `clients` and, if an entry exists and `filter` accepts it,
// erases the entry and stops its rpc client in the background.
//
// Does nothing when the messaging service is shutting down, when there is no
// entry for `id`, or when `filter` rejects the entry.
template <typename Fn>
requires std::is_invocable_r_v<bool, Fn, const messaging_service::shard_info&>
void messaging_service::find_and_remove_client(clients_map& clients, msg_addr id, Fn&& filter) {
    // If the messaging service is in the process of being stopped there is
    // no need to stop and remove connections here: they are being stopped
    // already and we would just interfere.
    if (_shutting_down) {
        return;
    }

    auto pos = clients.find(id);
    if (pos == clients.end()) {
        return;
    }
    if (!filter(pos->second)) {
        return;
    }

    // Take the client out of the map entry before erasing the entry itself.
    auto dropped = std::move(pos->second.rpc_client);
    clients.erase(pos);

    // Explicitly call rpc_protocol_client_wrapper::stop() for the erased
    // item and hold the messaging_service shared pointer until it's over.
    // This makes sure messaging_service::stop() blocks until the client's
    // stop() has completed. The client itself is captured as well so that it
    // stays alive for the duration of stop().
    (void)dropped->stop().finally([id, dropped, ms = shared_from_this()] {
        mlogger.debug("dropped connection to {}", id.addr);
    }).discard_result();

    // Called right away, without waiting for stop() to finish.
    _connection_dropped(id.addr);
}
// Drops the client used for `verb` towards `id`, but only if that client
// reports an error state.
void messaging_service::remove_error_rpc_client(messaging_verb verb, msg_addr id) {
    auto& clients_for_verb = _clients[get_rpc_client_idx(verb)];
    auto is_errored = [] (const auto& info) { return info.rpc_client->error(); };
    find_and_remove_client(clients_for_verb, id, is_errored);
}
// Unconditionally drops the client for `id` from every client map held in
// `_clients`.
void messaging_service::remove_rpc_client(msg_addr id) {
    const auto accept_any = [] (const auto&) { return true; };
    for (auto& client_map : _clients) {
        find_and_remove_client(client_map, id, accept_any);
    }
}
// Drops, from every client map held in `_clients`, the client for `id` whose
// entry is flagged with `topology_ignored`.
void messaging_service::remove_rpc_client_with_ignored_topology(msg_addr id) {
    const auto ignored_topology = [] (const auto& info) { return info.topology_ignored; };
    for (auto& client_map : _clients) {
        find_and_remove_client(client_map, id, ignored_topology);
    }
}
// Accessor returning a reference to the owned rpc protocol wrapper pointer.
std::unique_ptr<messaging_service::rpc_protocol_wrapper>& messaging_service::rpc() {
    auto& protocol = _rpc;
    return protocol;
}
// Creates the int32_t sink paired with the given STREAM_MUTATION_FRAGMENTS
// source, serialized with netw::serializer.
rpc::sink<int32_t> messaging_service::make_sink_for_stream_mutation_fragments(rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>>& source) {
    auto status_sink = source.make_sink<netw::serializer, int32_t>();
    return status_sink;
}
// Opens a STREAM_MUTATION_FRAGMENTS stream towards `id`.
//
// Resolves to a (sink, source) tuple: the sink carries mutation fragments to
// the peer and the source yields the int32_t values returned by the peer's
// handler. Returns a failed future holding rpc::closed_error when the
// messaging service is shutting down. If the verb call itself fails, the
// sink is closed first and the failure is then propagated to the caller.
future<std::tuple<rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>, rpc::source<int32_t>>>
messaging_service::make_sink_and_source_for_stream_mutation_fragments(table_schema_version schema_id, streaming::plan_id plan_id, table_id cf_id, uint64_t estimated_partitions, streaming::stream_reason reason, msg_addr id) {
    using sink_type = rpc::sink<frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>;
    using source_type = rpc::source<int32_t>;
    using value_type = std::tuple<sink_type, source_type>;

    if (is_shutting_down()) {
        return make_exception_future<value_type>(rpc::closed_error());
    }

    auto rpc_client = get_rpc_client(messaging_verb::STREAM_MUTATION_FRAGMENTS, id);
    return rpc_client->make_stream_sink<netw::serializer, frozen_mutation_fragment, streaming::stream_mutation_fragments_cmd>().then(
            [this, plan_id, schema_id, cf_id, estimated_partitions, reason, rpc_client] (sink_type sink) mutable {
        auto send_verb = rpc()->make_client<source_type (streaming::plan_id, table_schema_version, table_id, uint64_t, streaming::stream_reason, sink_type)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
        // `rpc_client` is captured by the continuation so the client outlives the call.
        return send_verb(*rpc_client, plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then_wrapped(
                [sink, rpc_client] (future<source_type> reply) mutable {
            // On failure close the sink before reporting the error.
            auto sink_closed = reply.failed() ? sink.close() : make_ready_future<>();
            return sink_closed.then([sink = std::move(sink), reply = std::move(reply)] () mutable {
                // get0() rethrows the stored exception when the verb call failed.
                return make_ready_future<value_type>(value_type(std::move(sink), reply.get0()));
            });
        });
    });
}
// Installs `func` as the handler for the STREAM_MUTATION_FRAGMENTS verb.
void messaging_service::register_stream_mutation_fragments(std::function<future<rpc::sink<int32_t>> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional<streaming::stream_reason>, rpc::source<frozen_mutation_fragment, rpc::optional<streaming::stream_mutation_fragments_cmd>> source)>&& func) {
    const auto verb = messaging_verb::STREAM_MUTATION_FRAGMENTS;
    register_handler(this, verb, std::move(func));
}
// Removes the handler for the STREAM_MUTATION_FRAGMENTS verb; the returned
// future is the one produced by unregister_handler().
future<> messaging_service::unregister_stream_mutation_fragments() {
    const auto verb = messaging_verb::STREAM_MUTATION_FRAGMENTS;
    return unregister_handler(verb);
}
// Shared implementation of the repair "make sink and source" wrappers.
//
// Creates a stream sink on `rpc_client`, then invokes `verb` on the peer with
// `repair_meta_id` and that sink. On success resolves to the (sink, source)
// tuple. On failure the sink is closed and the original error is propagated;
// if closing the sink fails as well, std::throw_with_nested is used so that
// the close failure (the exception being handled at that point) is recorded
// as the nested exception of what gets thrown.
template<class SinkType, class SourceType>
future<std::tuple<rpc::sink<SinkType>, rpc::source<SourceType>>>
do_make_sink_source(messaging_verb verb, uint32_t repair_meta_id, shared_ptr<messaging_service::rpc_protocol_client_wrapper> rpc_client, std::unique_ptr<messaging_service::rpc_protocol_wrapper>& rpc) {
    using result_type = std::tuple<rpc::sink<SinkType>, rpc::source<SourceType>>;

    auto sink = co_await rpc_client->make_stream_sink<netw::serializer, SinkType>();
    auto call_verb = rpc->make_client<rpc::source<SourceType> (uint32_t, rpc::sink<SinkType>)>(verb);
    // Await as a future so that a failure can be handled without it being
    // thrown out of the co_await.
    auto reply = co_await coroutine::as_future(call_verb(*rpc_client, repair_meta_id, sink));

    if (!reply.failed()) {
        co_return result_type(std::move(sink), std::move(reply.get0()));
    }

    // The verb call failed: release the sink before reporting the error.
    auto ex = reply.get_exception();
    try {
        co_await sink.close();
    } catch (...) {
        std::throw_with_nested(std::move(ex));
    }
    co_return coroutine::exception(std::move(ex));
}
// Wrapper for REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM
// Opens a REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM stream towards `id` for the
// given `repair_meta_id`. Returns a failed future holding rpc::closed_error
// when the messaging service is shutting down.
future<std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>>
messaging_service::make_sink_and_source_for_repair_get_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
    using value_type = std::tuple<rpc::sink<repair_hash_with_cmd>, rpc::source<repair_row_on_wire_with_cmd>>;

    if (is_shutting_down()) {
        return make_exception_future<value_type>(rpc::closed_error());
    }

    const auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM;
    auto client = get_rpc_client(verb, id);
    return do_make_sink_source<repair_hash_with_cmd, repair_row_on_wire_with_cmd>(verb, repair_meta_id, std::move(client), rpc());
}
// Creates the repair_row_on_wire_with_cmd sink paired with the given
// repair_hash_with_cmd source, serialized with netw::serializer.
rpc::sink<repair_row_on_wire_with_cmd> messaging_service::make_sink_for_repair_get_row_diff_with_rpc_stream(rpc::source<repair_hash_with_cmd>& source) {
    auto row_sink = source.make_sink<netw::serializer, repair_row_on_wire_with_cmd>();
    return row_sink;
}
// Installs `func` as the handler for the REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM verb.
void messaging_service::register_repair_get_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_row_on_wire_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_hash_with_cmd> source)>&& func) {
    const auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM;
    register_handler(this, verb, std::move(func));
}
// Removes the handler for the REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM verb; the
// returned future is the one produced by unregister_handler().
future<> messaging_service::unregister_repair_get_row_diff_with_rpc_stream() {
    const auto verb = messaging_verb::REPAIR_GET_ROW_DIFF_WITH_RPC_STREAM;
    return unregister_handler(verb);
}
// Wrapper for REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM
// Opens a REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM stream towards `id` for the
// given `repair_meta_id`. Returns a failed future holding rpc::closed_error
// when the messaging service is shutting down.
future<std::tuple<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>>
messaging_service::make_sink_and_source_for_repair_put_row_diff_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
    using value_type = std::tuple<rpc::sink<repair_row_on_wire_with_cmd>, rpc::source<repair_stream_cmd>>;

    if (is_shutting_down()) {
        return make_exception_future<value_type>(rpc::closed_error());
    }

    const auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM;
    auto client = get_rpc_client(verb, id);
    return do_make_sink_source<repair_row_on_wire_with_cmd, repair_stream_cmd>(verb, repair_meta_id, std::move(client), rpc());
}
// Creates the repair_stream_cmd sink paired with the given
// repair_row_on_wire_with_cmd source, serialized with netw::serializer.
rpc::sink<repair_stream_cmd> messaging_service::make_sink_for_repair_put_row_diff_with_rpc_stream(rpc::source<repair_row_on_wire_with_cmd>& source) {
    auto cmd_sink = source.make_sink<netw::serializer, repair_stream_cmd>();
    return cmd_sink;
}
// Installs `func` as the handler for the REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM verb.
void messaging_service::register_repair_put_row_diff_with_rpc_stream(std::function<future<rpc::sink<repair_stream_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_row_on_wire_with_cmd> source)>&& func) {
    const auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM;
    register_handler(this, verb, std::move(func));
}
// Removes the handler for the REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM verb; the
// returned future is the one produced by unregister_handler().
future<> messaging_service::unregister_repair_put_row_diff_with_rpc_stream() {
    const auto verb = messaging_verb::REPAIR_PUT_ROW_DIFF_WITH_RPC_STREAM;
    return unregister_handler(verb);
}
// Wrapper for REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM
// Opens a REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM stream towards `id` for
// the given `repair_meta_id`. Returns a failed future holding
// rpc::closed_error when the messaging service is shutting down.
future<std::tuple<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>>
messaging_service::make_sink_and_source_for_repair_get_full_row_hashes_with_rpc_stream(uint32_t repair_meta_id, msg_addr id) {
    using value_type = std::tuple<rpc::sink<repair_stream_cmd>, rpc::source<repair_hash_with_cmd>>;

    if (is_shutting_down()) {
        return make_exception_future<value_type>(rpc::closed_error());
    }

    const auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM;
    auto client = get_rpc_client(verb, id);
    return do_make_sink_source<repair_stream_cmd, repair_hash_with_cmd>(verb, repair_meta_id, std::move(client), rpc());
}
// Creates the repair_hash_with_cmd sink paired with the given
// repair_stream_cmd source, serialized with netw::serializer.
rpc::sink<repair_hash_with_cmd> messaging_service::make_sink_for_repair_get_full_row_hashes_with_rpc_stream(rpc::source<repair_stream_cmd>& source) {
    auto hash_sink = source.make_sink<netw::serializer, repair_hash_with_cmd>();
    return hash_sink;
}
// Installs `func` as the handler for the REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM verb.
void messaging_service::register_repair_get_full_row_hashes_with_rpc_stream(std::function<future<rpc::sink<repair_hash_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_stream_cmd> source)>&& func) {
    const auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM;
    register_handler(this, verb, std::move(func));
}
// Removes the handler for the REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM
// verb; the returned future is the one produced by unregister_handler().
future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_stream() {
    const auto verb = messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM;
    return unregister_handler(verb);
}
// Wrappers for verbs
// PREPARE_MESSAGE
void messaging_service::register_prepare_message(std::function<future<streaming::prepare_message> (const rpc::client_info& cinfo,
streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional<streaming::stream_reason> reason)>&& func) {
register_handler(this, messaging_verb::PREPARE_MESSAGE, std::move(func));