-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
storage_service.cc
3888 lines (3519 loc) · 194 KB
/
storage_service.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#include "storage_service.hh"
#include "dht/boot_strapper.hh"
#include <seastar/core/distributed.hh>
#include <seastar/util/defer.hh>
#include "locator/snitch_base.hh"
#include "locator/production_snitch_base.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/consistency_level.hh"
#include <seastar/core/smp.hh>
#include "utils/UUID.hh"
#include "gms/inet_address.hh"
#include "log.hh"
#include "service/migration_manager.hh"
#include "service/raft/raft_group0.hh"
#include "to_string.hh"
#include "gms/gossiper.hh"
#include "gms/failure_detector.hh"
#include "gms/feature_service.hh"
#include <seastar/core/thread.hh>
#include <sstream>
#include <algorithm>
#include "locator/local_strategy.hh"
#include "version.hh"
#include "unimplemented.hh"
#include "streaming/stream_plan.hh"
#include "streaming/stream_state.hh"
#include "dht/range_streamer.hh"
#include <boost/range/adaptors.hpp>
#include <boost/range/algorithm.hpp>
#include "service/load_broadcaster.hh"
#include "transport/server.hh"
#include <seastar/core/rwlock.hh>
#include "db/batchlog_manager.hh"
#include "db/commitlog/commitlog.hh"
#include "db/hints/manager.hh"
#include "utils/exceptions.hh"
#include "message/messaging_service.hh"
#include "supervisor.hh"
#include "compaction/compaction_manager.hh"
#include "sstables/sstables.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "replica/database.hh"
#include <seastar/core/metrics.hh>
#include "cdc/generation.hh"
#include "cdc/generation_service.hh"
#include "repair/repair.hh"
#include "repair/row_level.hh"
#include "service/priority_manager.hh"
#include "utils/generation-number.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include "utils/stall_free.hh"
#include "utils/error_injection.hh"
#include "locator/util.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/trim_all.hpp>
using token = dht::token;
using UUID = utils::UUID;
using inet_address = gms::inet_address;
extern logging::logger cdc_log;
namespace service {
static logging::logger slogger("storage_service");
// Constructs the storage service. The service coordinates node lifecycle
// (startup, bootstrap/replace, join, drain) and therefore holds references
// to most of the node's subsystems; it owns none of them — all are passed
// in by reference or sharded handle. The constructor also wires disk and
// commitlog error signals to node self-isolation, and registers for snitch
// reconfiguration notifications when the snitch is already initialized.
storage_service::storage_service(abort_source& abort_source,
    distributed<replica::database>& db, gms::gossiper& gossiper,
    sharded<db::system_keyspace>& sys_ks,
    gms::feature_service& feature_service,
    sharded<service::migration_manager>& mm,
    locator::shared_token_metadata& stm,
    locator::effective_replication_map_factory& erm_factory,
    sharded<netw::messaging_service>& ms,
    sharded<repair_service>& repair,
    sharded<streaming::stream_manager>& stream_manager,
    endpoint_lifecycle_notifier& elc_notif,
    sharded<db::batchlog_manager>& bm,
    sharded<locator::snitch_ptr>& snitch)
        : _abort_source(abort_source)
        , _feature_service(feature_service)
        , _db(db)
        , _gossiper(gossiper)
        , _messaging(ms)
        , _migration_manager(mm)
        , _repair(repair)
        , _stream_manager(stream_manager)
        , _snitch(snitch)
        , _node_ops_abort_thread(node_ops_abort_thread())
        , _shared_token_metadata(stm)
        , _erm_factory(erm_factory)
        , _lifecycle_notifier(elc_notif)
        , _batchlog_manager(bm)
        , _sys_ks(sys_ks)
        // Snitch reconfiguration is always dispatched to shard 0.
        , _snitch_reconfigure([this] {
            return container().invoke_on(0, [] (auto& ss) {
                return ss.snitch_reconfigured();
            });
        })
{
    register_metrics();
    // Self-isolate this node on unrecoverable disk errors: sstable
    // read/write and general disk errors use the `regular` policy,
    // commitlog errors use the `commit` policy.
    _listeners.emplace_back(make_lw_shared(bs2::scoped_connection(sstable_read_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
    _listeners.emplace_back(make_lw_shared(bs2::scoped_connection(sstable_write_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
    _listeners.emplace_back(make_lw_shared(bs2::scoped_connection(general_disk_error.connect([this] { do_isolate_on_error(disk_error::regular); }))));
    _listeners.emplace_back(make_lw_shared(bs2::scoped_connection(commit_error.connect([this] { do_isolate_on_error(disk_error::commit); }))));
    if (_snitch.local_is_initialized()) {
        _listeners.emplace_back(make_lw_shared(_snitch.local()->when_reconfigured(_snitch_reconfigure)));
    }
}
// Externally visible node status, exported through the "node" metrics group
// (see register_metrics()). The numeric values are part of the public metric
// contract documented in the gauge description and must not be changed.
enum class node_external_status {
    UNKNOWN = 0,
    STARTING = 1,
    JOINING = 2,
    NORMAL = 3,
    LEAVING = 4,
    DECOMMISSIONED = 5,
    DRAINING = 6,
    DRAINED = 7,
    MOVING = 8 //deprecated
};
// Translates the internal operation mode into the externally visible status
// code reported by the "node" metrics group. Any mode value outside the
// enumerators (should not happen) maps to UNKNOWN.
static node_external_status map_operation_mode(storage_service::mode m) {
    using mode = storage_service::mode;
    switch (m) {
    case mode::NONE:
    case mode::STARTING:
        return node_external_status::STARTING;
    case mode::BOOTSTRAP:
    case mode::JOINING:
        return node_external_status::JOINING;
    case mode::NORMAL:
        return node_external_status::NORMAL;
    case mode::LEAVING:
        return node_external_status::LEAVING;
    case mode::DECOMMISSIONED:
        return node_external_status::DECOMMISSIONED;
    case mode::DRAINING:
        return node_external_status::DRAINING;
    case mode::DRAINED:
        return node_external_status::DRAINED;
    case mode::MOVING:
        // MOVING is deprecated but still reported for compatibility.
        return node_external_status::MOVING;
    }
    return node_external_status::UNKNOWN;
}
// Registers the per-node metrics group. The reported data is node-global
// (not per-shard), so registration happens on shard 0 only; other shards
// return immediately.
void storage_service::register_metrics() {
    if (this_shard_id() != 0) {
        // the relevant data is distributed between the shards,
        // We only need to register it once.
        return;
    }
    namespace sm = seastar::metrics;
    _metrics.add_group("node", {
        sm::make_gauge("operation_mode", sm::description("The operation mode of the current node. UNKNOWN = 0, STARTING = 1, JOINING = 2, NORMAL = 3, "
                "LEAVING = 4, DECOMMISSIONED = 5, DRAINING = 6, DRAINED = 7, MOVING = 8"), [this] {
            // Convert the internal mode to the stable external status code
            // declared in the description above.
            return static_cast<std::underlying_type_t<node_external_status>>(map_operation_mode(_operation_mode));
        }),
    });
}
// Returns true when this startup is a node-replacement operation, i.e. one
// of the replace_* configuration options is in effect. The *_first_boot
// variants are ignored (with an informational log) once this node has
// already completed bootstrap.
bool storage_service::is_replacing() {
    const auto& cfg = _db.local().get_config();
    if (!cfg.replace_node_first_boot().empty()) {
        if (!_sys_ks.local().bootstrap_complete()) {
            return true;
        }
        slogger.info("Replace node on first boot requested; this node is already bootstrapped");
        return false;
    }
    if (!cfg.replace_address_first_boot().empty()) {
        if (!_sys_ks.local().bootstrap_complete()) {
            return true;
        }
        slogger.info("Replace address on first boot requested; this node is already bootstrapped");
        return false;
    }
    // Returning true if cfg.replace_address is provided
    // will trigger an exception down the road if bootstrap_complete(),
    // as it is an error to use this option post bootstrap.
    // That said, we should just stop supporting it and force users
    // to move to the new, replace_node_first_boot config option.
    return !cfg.replace_address().empty();
}
// Returns true when this node is the designated very first node of a fresh
// cluster: the seed with the numerically smallest IP address. The first node
// is the only one that skips bootstrap; every other node bootstraps.
// A replacing node is never the first node.
bool storage_service::is_first_node() {
    if (is_replacing()) {
        return false;
    }
    const auto seeds = _gossiper.get_seeds();
    if (seeds.empty()) {
        return false;
    }
    // The smallest seed IP elects the first node; no need to sort the whole
    // seed list just to find it.
    const auto smallest_seed = *std::min_element(seeds.begin(), seeds.end());
    if (smallest_seed == get_broadcast_address()) {
        slogger.info("I am the first node in the cluster. Skip bootstrap. Node={}", get_broadcast_address());
        return true;
    }
    return false;
}
// A node bootstraps unless it already completed bootstrap in a previous run
// or it is the designated first node of a fresh cluster.
bool storage_service::should_bootstrap() {
    const bool already_bootstrapped = _sys_ks.local().bootstrap_complete();
    return !(already_bootstrapped || is_first_node());
}
/* Broadcasts the chosen tokens through gossip,
 * together with a CDC generation timestamp and STATUS=NORMAL.
 *
 * Assumes that no other functions modify CDC_GENERATION_ID, TOKENS or STATUS
 * in the gossiper's local application state while this function runs.
 *
 * @param g          the gossiper whose local application state is updated.
 * @param tokens     this node's tokens; must be non-empty.
 * @param cdc_gen_id the CDC generation id to announce; may be nullopt when
 *                   no generation was proposed (e.g. non-CDC upgrade path).
 */
static future<> set_gossip_tokens(gms::gossiper& g,
        const std::unordered_set<dht::token>& tokens, std::optional<cdc::generation_id> cdc_gen_id) {
    assert(!tokens.empty());
    // Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
    return g.add_local_application_state({
        { gms::application_state::TOKENS, gms::versioned_value::tokens(tokens) },
        { gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(cdc_gen_id) },
        { gms::application_state::STATUS, gms::versioned_value::normal(tokens) }
    });
}
/*
 * The helper waits for two things
 * 1) for schema agreement
 * 2) there's no pending node operations
 * before proceeding with the bootstrap or replace.
 *
 * @param delay  upper bound on the initial sleep used to make sure at least
 *               one other live node becomes visible before checking.
 * @throws std::runtime_error when consistent_rangemovement is enabled and
 *         other bootstrapping/leaving nodes are still present after 60s.
 */
future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds delay) {
    // first sleep the delay to make sure we see *at least* one other node
    for (int i = 0; i < delay.count() && _gossiper.get_live_members().size() < 2; i += 1000) {
        co_await sleep_abortable(std::chrono::seconds(1), _abort_source);
    }
    auto t = gms::gossiper::clk::now();
    while (true) {
        // Block until the cluster agrees on the schema version.
        while (!_migration_manager.local().have_schema_agreement()) {
            slogger.info("waiting for schema information to complete");
            co_await sleep_abortable(std::chrono::seconds(1), _abort_source);
        }
        co_await update_pending_ranges("joining");
        auto tmptr = get_token_metadata_ptr();
        // Done when consistent range movement is disabled, or when no other
        // node is currently bootstrapping or leaving.
        if (!_db.local().get_config().consistent_rangemovement() ||
                (tmptr->get_bootstrap_tokens().empty() && tmptr->get_leaving_endpoints().empty())) {
            break;
        }
        auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(gms::gossiper::clk::now() - t).count();
        slogger.info("Checking bootstrapping/leaving nodes: tokens {}, leaving {}, sleep 1 second and check again ({} seconds elapsed)",
                tmptr->get_bootstrap_tokens().size(),
                tmptr->get_leaving_endpoints().size(),
                elapsed);
        // Give up after 60 seconds of waiting for concurrent operations.
        if (gms::gossiper::clk::now() > t + std::chrono::seconds(60)) {
            throw std::runtime_error("Other bootstrapping/leaving nodes detected, cannot bootstrap while consistent_rangemovement is true");
        }
        co_await sleep_abortable(std::chrono::seconds(1), _abort_source);
    }
    slogger.info("Checking bootstrapping/leaving nodes: ok");
}
// Joins this node to the token ring. This is the main startup procedure:
// it validates the decommission/replace state, performs gossip shadow-round
// checks, publishes the initial application state and starts gossip, sets
// up Raft group 0, bootstraps (streams data) when needed, records the
// chosen tokens and CDC generation, and finally announces STATUS=NORMAL.
// Runs once at startup; statement order here is load-bearing throughout.
future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_service,
        sharded<db::system_distributed_keyspace>& sys_dist_ks,
        sharded<service::storage_proxy>& proxy,
        std::unordered_set<gms::inet_address> initial_contact_nodes,
        std::unordered_set<gms::inet_address> loaded_endpoints,
        std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
        std::chrono::milliseconds delay) {
    std::unordered_set<token> bootstrap_tokens;
    std::map<gms::application_state, gms::versioned_value> app_states;
    /* The timestamp of the CDC streams generation that this node has proposed when joining.
     * This value is nullopt only when:
     * 1. this node is being upgraded from a non-CDC version,
     * 2. this node is starting for the first time or restarting with CDC previously disabled,
     *    in which case the value should become populated before we leave the join_token_ring procedure.
     *
     * Important: this variable is used only during the startup procedure. It is moved out from
     * at the end of `join_token_ring`; the responsibility of handling CDC generations is passed
     * to cdc::generation_service.
     *
     * DO NOT use this variable after `join_token_ring` (i.e. after we call `generation_service::after_join`
     * and pass it the ownership of the timestamp.
     */
    std::optional<cdc::generation_id> cdc_gen_id;
    // Refuse to rejoin after a decommission unless the operator explicitly overrides.
    if (_sys_ks.local().was_decommissioned()) {
        if (_db.local().get_config().override_decommission()) {
            slogger.warn("This node was decommissioned, but overriding by operator request.");
            co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED);
        } else {
            auto msg = sstring("This node was decommissioned and will not rejoin the ring unless override_decommission=true has been set,"
                    "or all existing data is removed and the node is bootstrapped again");
            slogger.error("{}", msg);
            throw std::runtime_error(msg);
        }
    }
    bool replacing_a_node_with_same_ip = false;
    bool replacing_a_node_with_diff_ip = false;
    std::optional<replacement_info> ri;
    std::optional<gms::inet_address> replace_address;
    std::optional<locator::host_id> replaced_host_id;
    std::optional<raft_group0::replace_info> raft_replace_info;
    auto tmlock = std::make_unique<token_metadata_lock>(co_await get_token_metadata_lock());
    auto tmptr = co_await get_mutable_token_metadata_ptr();
    if (is_replacing()) {
        if (_sys_ks.local().bootstrap_complete()) {
            throw std::runtime_error("Cannot replace address with a node that is already bootstrapped");
        }
        // Learn the replaced node's tokens, host id and DC/rack before joining.
        ri = co_await prepare_replacement_info(initial_contact_nodes, loaded_peer_features);
        bootstrap_tokens = std::move(ri->tokens);
        replace_address = ri->address;
        replacing_a_node_with_same_ip = *replace_address == get_broadcast_address();
        replacing_a_node_with_diff_ip = *replace_address != get_broadcast_address();
        slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
                get_broadcast_address() == *replace_address ? "the same" : "a different",
                get_broadcast_address(), *replace_address);
        tmptr->update_topology(*replace_address, std::move(ri->dc_rack));
        co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address);
        replaced_host_id = ri->host_id;
        raft_replace_info = raft_group0::replace_info {
            .ip_addr = *replace_address,
            .raft_id = raft::server_id{ri->host_id.uuid()},
        };
    } else if (should_bootstrap()) {
        co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features);
    } else {
        // Restarting node or first node: verify compatibility with the
        // cluster through a gossip shadow round before going any further.
        auto local_features = _feature_service.supported_feature_set();
        slogger.info("Checking remote features with gossip, initial_contact_nodes={}", initial_contact_nodes);
        co_await _gossiper.do_shadow_round(initial_contact_nodes);
        _gossiper.check_knows_remote_features(local_features, loaded_peer_features);
        _gossiper.check_snitch_name_matches(_snitch.local()->get_name());
        // Check if the node is already removed from the cluster
        auto local_host_id = _db.local().get_config().host_id;
        auto my_ip = get_broadcast_address();
        if (!_gossiper.is_safe_for_restart(my_ip, local_host_id)) {
            throw std::runtime_error(fmt::format("The node {} with host_id {} is removed from the cluster. Can not restart the removed node to join the cluster again!",
                    my_ip, local_host_id));
        }
        co_await _gossiper.reset_endpoint_state_map();
        for (auto ep : loaded_endpoints) {
            co_await _gossiper.add_saved_endpoint(ep);
        }
    }
    auto features = _feature_service.supported_feature_set();
    slogger.info("Save advertised features list in the 'system.{}' table", db::system_keyspace::LOCAL);
    // Save the advertised feature set to system.local table after
    // all remote feature checks are complete and after gossip shadow rounds are done.
    // At this point, the final feature set is already determined before the node joins the ring.
    co_await db::system_keyspace::save_local_supported_features(features);
    // If this is a restarting node, we should update tokens before gossip starts
    auto my_tokens = co_await _sys_ks.local().get_saved_tokens();
    bool restarting_normal_node = _sys_ks.local().bootstrap_complete() && !is_replacing() && !my_tokens.empty();
    if (restarting_normal_node) {
        slogger.info("Restarting a node in NORMAL status");
        // This node must know about its chosen tokens before other nodes do
        // since they may start sending writes to this node after it gossips status = NORMAL.
        // Therefore we update _token_metadata now, before gossip starts.
        tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack());
        co_await tmptr->update_normal_tokens(my_tokens, get_broadcast_address());
        cdc_gen_id = co_await _sys_ks.local().get_cdc_generation_id();
        if (!cdc_gen_id) {
            // We could not have completed joining if we didn't generate and persist a CDC streams timestamp,
            // unless we are restarting after upgrading from non-CDC supported version.
            // In that case we won't begin a CDC generation: it should be done by one of the nodes
            // after it learns that everyone supports the CDC feature.
            cdc_log.warn(
                    "Restarting node in NORMAL status with CDC enabled, but no streams timestamp was proposed"
                    " by this node according to its local tables. Are we upgrading from a non-CDC supported version?");
        }
    }
    // have to start the gossip service before we can see any info on other nodes. this is necessary
    // for bootstrap to get the load info it needs.
    // (we won't be part of the storage ring though until we add a counterId to our state, below.)
    // Seed the host ID-to-endpoint map with our own ID.
    auto local_host_id = _db.local().get_config().host_id;
    if (!replacing_a_node_with_diff_ip) {
        auto endpoint = get_broadcast_address();
        auto eps = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
        if (eps) {
            auto replace_host_id = _gossiper.get_host_id(get_broadcast_address());
            slogger.info("Host {}/{} is replacing {}/{} using the same address", local_host_id, endpoint, replace_host_id, endpoint);
        }
        tmptr->update_host_id(local_host_id, get_broadcast_address());
    }
    // Replicate the tokens early because once gossip runs other nodes
    // might send reads/writes to this node. Replicate it early to make
    // sure the tokens are valid on all the shards.
    co_await replicate_to_all_cores(std::move(tmptr));
    tmlock.reset();
    auto broadcast_rpc_address = utils::fb_utilities::get_broadcast_rpc_address();
    // Ensure we know our own actual Schema UUID in preparation for updates
    co_await db::schema_tables::recalculate_schema_version(_sys_ks, proxy, _feature_service);
    // Assemble the initial gossip application state for this node.
    app_states.emplace(gms::application_state::NET_VERSION, versioned_value::network_version());
    app_states.emplace(gms::application_state::HOST_ID, versioned_value::host_id(local_host_id));
    app_states.emplace(gms::application_state::RPC_ADDRESS, versioned_value::rpcaddress(broadcast_rpc_address));
    app_states.emplace(gms::application_state::RELEASE_VERSION, versioned_value::release_version());
    app_states.emplace(gms::application_state::SUPPORTED_FEATURES, versioned_value::supported_features(features));
    app_states.emplace(gms::application_state::CACHE_HITRATES, versioned_value::cache_hitrates(""));
    app_states.emplace(gms::application_state::SCHEMA_TABLES_VERSION, versioned_value(db::schema_tables::version));
    app_states.emplace(gms::application_state::RPC_READY, versioned_value::cql_ready(false));
    app_states.emplace(gms::application_state::VIEW_BACKLOG, versioned_value(""));
    app_states.emplace(gms::application_state::SCHEMA, versioned_value::schema(_db.local().get_version()));
    if (restarting_normal_node) {
        // Order is important: both the CDC streams timestamp and tokens must be known when a node handles our status.
        // Exception: there might be no CDC streams timestamp proposed by us if we're upgrading from a non-CDC version.
        app_states.emplace(gms::application_state::TOKENS, versioned_value::tokens(my_tokens));
        app_states.emplace(gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(cdc_gen_id));
        app_states.emplace(gms::application_state::STATUS, versioned_value::normal(my_tokens));
    }
    if (replacing_a_node_with_same_ip || replacing_a_node_with_diff_ip) {
        app_states.emplace(gms::application_state::TOKENS, versioned_value::tokens(bootstrap_tokens));
    }
    app_states.emplace(gms::application_state::SNITCH_NAME, versioned_value::snitch_name(_snitch.local()->get_name()));
    app_states.emplace(gms::application_state::SHARD_COUNT, versioned_value::shard_count(smp::count));
    app_states.emplace(gms::application_state::IGNORE_MSB_BITS, versioned_value::ignore_msb_bits(_db.local().get_config().murmur3_partitioner_ignore_msb_bits()));
    for (auto&& s : _snitch.local()->get_app_states()) {
        app_states.emplace(s.first, std::move(s.second));
    }
    slogger.info("Starting up server gossip");
    auto generation_number = co_await _sys_ks.local().increment_and_get_generation();
    // When replacing a node with the same IP, don't advertise ourselves
    // until the replace operation completes.
    auto advertise = gms::advertise_myself(!replacing_a_node_with_same_ip);
    co_await _gossiper.start_gossiping(generation_number, app_states, advertise);
    assert(_group0);
    co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info);
    // Forward local schema version changes to the migration manager so they
    // get passively announced through gossip.
    auto schema_change_announce = _db.local().observable_schema_version().observe([this] (table_schema_version schema_version) mutable {
        _migration_manager.local().passive_announce(std::move(schema_version));
    });
    _listeners.emplace_back(make_lw_shared(std::move(schema_change_announce)));
    co_await _gossiper.wait_for_gossip_to_settle();
    set_mode(mode::JOINING);
    // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
    // If we are a seed, or if the user manually sets auto_bootstrap to false,
    // we'll skip streaming data from other nodes and jump directly into the ring.
    //
    // The seed check allows us to skip the RING_DELAY sleep for the single-node cluster case,
    // which is useful for both new users and testing.
    //
    // We attempted to replace this with a schema-presence check, but you need a meaningful sleep
    // to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
    if (should_bootstrap()) {
        bool resume_bootstrap = _sys_ks.local().bootstrap_in_progress();
        if (resume_bootstrap) {
            slogger.warn("Detected previous bootstrap failure; retrying");
        } else {
            co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::IN_PROGRESS);
        }
        slogger.info("waiting for ring information");
        // if our schema hasn't matched yet, keep sleeping until it does
        // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
        co_await wait_for_ring_to_settle(delay);
        if (!replace_address) {
            // Plain bootstrap: pick fresh tokens (or reuse saved ones on retry).
            auto tmptr = get_token_metadata_ptr();
            if (tmptr->is_normal_token_owner(get_broadcast_address())) {
                throw std::runtime_error("This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)");
            }
            slogger.info("getting bootstrap token");
            if (resume_bootstrap) {
                bootstrap_tokens = co_await _sys_ks.local().get_saved_tokens();
                if (!bootstrap_tokens.empty()) {
                    slogger.info("Using previously saved tokens = {}", bootstrap_tokens);
                } else {
                    bootstrap_tokens = boot_strapper::get_bootstrap_tokens(tmptr, _db.local().get_config(), dht::check_token_endpoint::yes);
                }
            } else {
                bootstrap_tokens = boot_strapper::get_bootstrap_tokens(tmptr, _db.local().get_config(), dht::check_token_endpoint::yes);
            }
        } else {
            if (*replace_address != get_broadcast_address()) {
                // Sleep additionally to make sure that the server actually is not alive
                // and giving it more time to gossip if alive.
                slogger.info("Sleeping before replacing {}...", *replace_address);
                co_await sleep_abortable(2 * get_ring_delay(), _abort_source);
                // check for operator errors...
                const auto tmptr = get_token_metadata_ptr();
                for (auto token : bootstrap_tokens) {
                    auto existing = tmptr->get_endpoint(token);
                    if (existing) {
                        // A recently-updated endpoint state means the node
                        // being replaced may still be alive — abort.
                        auto* eps = _gossiper.get_endpoint_state_for_endpoint_ptr(*existing);
                        if (eps && eps->get_update_timestamp() > gms::gossiper::clk::now() - delay) {
                            throw std::runtime_error("Cannot replace a live node...");
                        }
                    } else {
                        throw std::runtime_error(format("Cannot replace token {} which does not exist!", token));
                    }
                }
            } else {
                slogger.info("Sleeping before replacing {}...", *replace_address);
                co_await sleep_abortable(get_ring_delay(), _abort_source);
            }
            slogger.info("Replacing a node with token(s): {}", bootstrap_tokens);
            // bootstrap_tokens was previously set using tokens gossiped by the replaced node
        }
        co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
        co_await mark_existing_views_as_built(sys_dist_ks);
        co_await _sys_ks.local().update_tokens(bootstrap_tokens);
        co_await bootstrap(cdc_gen_service, bootstrap_tokens, cdc_gen_id, ri);
    } else {
        // Not bootstrapping: reuse saved tokens or generate new ones.
        supervisor::notify("starting system distributed keyspace");
        co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
        bootstrap_tokens = co_await _sys_ks.local().get_saved_tokens();
        if (bootstrap_tokens.empty()) {
            bootstrap_tokens = boot_strapper::get_bootstrap_tokens(get_token_metadata_ptr(), _db.local().get_config(), dht::check_token_endpoint::no);
            co_await _sys_ks.local().update_tokens(bootstrap_tokens);
        } else {
            size_t num_tokens = _db.local().get_config().num_tokens();
            if (bootstrap_tokens.size() != num_tokens) {
                throw std::runtime_error(format("Cannot change the number of tokens from {:d} to {:d}", bootstrap_tokens.size(), num_tokens));
            } else {
                slogger.info("Using saved tokens {}", bootstrap_tokens);
            }
        }
    }
    slogger.debug("Setting tokens to {}", bootstrap_tokens);
    co_await mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) {
        // This node must know about its chosen tokens before other nodes do
        // since they may start sending writes to this node after it gossips status = NORMAL.
        // Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now.
        tmptr->update_topology(get_broadcast_address(), _sys_ks.local().local_dc_rack());
        return tmptr->update_normal_tokens(bootstrap_tokens, get_broadcast_address());
    });
    if (!_sys_ks.local().bootstrap_complete()) {
        // If we're not bootstrapping then we shouldn't have chosen a CDC streams timestamp yet.
        assert(should_bootstrap() || !cdc_gen_id);
        // Don't try rewriting CDC stream description tables.
        // See cdc.md design notes, `Streams description table V1 and rewriting` section, for explanation.
        co_await _sys_ks.local().cdc_set_rewritten(std::nullopt);
    }
    if (!cdc_gen_id) {
        // If we didn't observe any CDC generation at this point, then either
        // 1. we're replacing a node,
        // 2. we've already bootstrapped, but are upgrading from a non-CDC version,
        // 3. we're the first node, starting a fresh cluster.
        // In the replacing case we won't create any CDC generation: we're not introducing any new tokens,
        // so the current generation used by the cluster is fine.
        // In the case of an upgrading cluster, one of the nodes is responsible for creating
        // the first CDC generation. We'll check if it's us.
        // Finally, if we're the first node, we'll create the first generation.
        if (!is_replacing()
                && (!_sys_ks.local().bootstrap_complete()
                    || cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
            try {
                cdc_gen_id = co_await cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node());
            } catch (...) {
                // Best-effort: a failed CDC generation is logged but does not
                // prevent the node from joining the ring.
                cdc_log.warn(
                        "Could not create a new CDC generation: {}. This may make it impossible to use CDC or cause performance problems."
                        " Use nodetool checkAndRepairCdcStreams to fix CDC.", std::current_exception());
            }
        }
    }
    // Persist the CDC streams timestamp before we persist bootstrap_state = COMPLETED.
    if (cdc_gen_id) {
        co_await _sys_ks.local().update_cdc_generation_id(*cdc_gen_id);
    }
    // If we crash now, we will choose a new CDC streams timestamp anyway (because we will also choose a new set of tokens).
    // But if we crash after setting bootstrap_state = COMPLETED, we will keep using the persisted CDC streams timestamp after restarting.
    co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED);
    // At this point our local tokens and CDC streams timestamp are chosen (bootstrap_tokens, cdc_gen_id) and will not be changed.
    // start participating in the ring.
    co_await set_gossip_tokens(_gossiper, bootstrap_tokens, cdc_gen_id);
    set_mode(mode::NORMAL);
    if (get_token_metadata().sorted_tokens().empty()) {
        auto err = format("join_token_ring: Sorted token in token_metadata is empty");
        slogger.error("{}", err);
        throw std::runtime_error(err);
    }
    assert(_group0);
    co_await _group0->finish_setup_after_join();
    // Hand off CDC generation handling (and cdc_gen_id ownership) to the
    // generation service; cdc_gen_id must not be used past this point.
    co_await cdc_gen_service.after_join(std::move(cdc_gen_id));
}
// Records every locally-known materialized view as built, both in the local
// system tables and in the cluster-wide distributed keyspace. Must run on
// shard 0; views are processed concurrently.
future<> storage_service::mark_existing_views_as_built(sharded<db::system_distributed_keyspace>& sys_dist_ks) {
    assert(this_shard_id() == 0);
    auto local_views = _db.local().get_views();
    co_await coroutine::parallel_for_each(local_views, [this, &sys_dist_ks] (view_ptr& v) -> future<> {
        co_await _sys_ks.local().mark_view_as_built(v->ks_name(), v->cf_name());
        co_await sys_dist_ks.local().finish_view_build(v->ks_name(), v->cf_name());
    });
}
// Parses the --ignore-dead-nodes-for-replace config option: a comma-separated
// list of node identifiers (optionally quoted), resolved to endpoints via the
// token metadata. Empty entries are skipped; any parse failure aborts with a
// runtime_error naming the offending entry.
std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(const token_metadata& tm) {
    std::vector<sstring> ignore_nodes_strs;
    boost::split(ignore_nodes_strs, _db.local().get_config().ignore_dead_nodes_for_replace(), boost::is_any_of(","));
    std::list<gms::inet_address> ignore_nodes;
    for (std::string node_str : ignore_nodes_strs) {
        try {
            // Strip single/double quotes and surrounding whitespace
            // before attempting to parse the entry.
            std::replace(node_str.begin(), node_str.end(), '\"', ' ');
            std::replace(node_str.begin(), node_str.end(), '\'', ' ');
            boost::trim_all(node_str);
            if (node_str.empty()) {
                continue;
            }
            ignore_nodes.push_back(tm.parse_host_id_and_endpoint(node_str).endpoint);
        } catch (...) {
            throw std::runtime_error(format("Failed to parse --ignore-dead-nodes-for-replace parameter: ignore_nodes={}, node={}: {}", ignore_nodes_strs, node_str, std::current_exception()));
        }
    }
    return ignore_nodes;
}
// Runs inside seastar::async context
//
// Perform the data-acquisition phase of joining the ring: either a regular
// bootstrap (streaming/repairing ranges from existing owners) or a replace
// of a dead node (when `replacement_info` is set). On the regular path this
// also creates and starts gossiping the node's first CDC generation, storing
// its id into `cdc_gen_id` for the caller. `bootstrap_tokens` are the tokens
// this node has already chosen.
future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id, const std::optional<replacement_info>& replacement_info) {
    return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &cdc_gen_service, &replacement_info] {
        auto bootstrap_rbno = is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap);

        set_mode(mode::BOOTSTRAP);
        slogger.debug("bootstrap: rbno={} replacing={}", bootstrap_rbno, is_replacing());

        // Wait until we know tokens of existing node before announcing replacing status.
        slogger.info("Wait until local node knows tokens of peer nodes");
        _gossiper.wait_for_range_setup().get();

        // Let each table prepare for the incoming data (e.g. compaction tweaks).
        _db.invoke_on_all([] (replica::database& db) {
            for (auto& cf : db.get_non_system_column_families()) {
                cf->notify_bootstrap_or_replace_start();
            }
        }).get();

        if (!replacement_info) {
            // Regular bootstrap path (not replacing a dead node).
            // Poll (up to 500 * 10ms) until at least one normal token owner is visible.
            int retry = 0;
            while (get_token_metadata_ptr()->count_normal_token_owners() == 0) {
                if (retry++ < 500) {
                    sleep_abortable(std::chrono::milliseconds(10), _abort_source).get();
                    continue;
                }
                // We're joining an existing cluster, so there are normal nodes in the cluster.
                // We've waited for tokens to arrive.
                // But we didn't see any normal token owners. Something's wrong, we cannot proceed.

                throw std::runtime_error{
                    "Failed to learn about other nodes' tokens during bootstrap. Make sure that:\n"
                    " - the node can contact other nodes in the cluster,\n"
                    " - the `ring_delay` parameter is large enough (the 30s default should be enough for small-to-middle-sized clusters),\n"
                    " - a node with this IP didn't recently leave the cluster. If it did, wait for some time first (the IP is quarantined),\n"
                    "and retry the bootstrap."};
            }

            // Even if we reached this point before but crashed, we will make a new CDC generation.
            // It doesn't hurt: other nodes will (potentially) just do more generation switches.
            // We do this because with this new attempt at bootstrapping we picked a different set of tokens.

            // Update pending ranges now, so we correctly count ourselves as a pending replica
            // when inserting the new CDC generation.
            if (!bootstrap_rbno) {
                // When is_repair_based_node_ops_enabled is true, the bootstrap node
                // will use node_ops_cmd to bootstrap, node_ops_cmd will update the pending ranges.
                slogger.debug("bootstrap: update pending ranges: endpoint={} bootstrap_tokens={}", get_broadcast_address(), bootstrap_tokens);
                mutate_token_metadata([this, &bootstrap_tokens] (mutable_token_metadata_ptr tmptr) {
                    auto endpoint = get_broadcast_address();
                    tmptr->update_topology(endpoint, _sys_ks.local().local_dc_rack());
                    tmptr->add_bootstrap_tokens(bootstrap_tokens, endpoint);
                    return update_pending_ranges(std::move(tmptr), format("bootstrapping node {}", endpoint));
                }).get();
            }

            // After we pick a generation timestamp, we start gossiping it, and we stick with it.
            // We don't do any other generation switches (unless we crash before complecting bootstrap).
            assert(!cdc_gen_id);
            cdc_gen_id = cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node()).get0();

            if (!bootstrap_rbno) {
                // When is_repair_based_node_ops_enabled is true, the bootstrap node
                // will use node_ops_cmd to bootstrap, bootstrapping gossip status is not needed for bootstrap.
                // Announce tokens, CDC generation and BOOTSTRAPPING status together,
                // then wait ring_delay for peers to set up pending ranges before streaming.
                _gossiper.add_local_application_state({
                    { gms::application_state::TOKENS, versioned_value::tokens(bootstrap_tokens) },
                    { gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(cdc_gen_id) },
                    { gms::application_state::STATUS, versioned_value::bootstrapping(bootstrap_tokens) },
                }).get();

                slogger.info("sleeping {} ms for pending range setup", get_ring_delay().count());
                _gossiper.wait_for_range_setup().get();
                dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), _sys_ks.local().local_dc_rack(), bootstrap_tokens, get_token_metadata_ptr());
                slogger.info("Starting to bootstrap...");
                bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper).get();
            } else {
                // Even with RBNO bootstrap we need to announce the new CDC generation immediately after it's created.
                _gossiper.add_local_application_state({
                    { gms::application_state::CDC_GENERATION_ID, versioned_value::cdc_generation_id(cdc_gen_id) },
                }).get();
                slogger.info("Starting to bootstrap...");
                // Repair-based bootstrap: acquire data via node_ops_cmd/repair.
                run_bootstrap_ops(bootstrap_tokens);
            }
        } else {
            // Replace path: take over the tokens of a dead node.
            auto replace_addr = replacement_info->address;
            auto replaced_host_id = replacement_info->host_id;

            slogger.debug("Removing replaced endpoint {} from system.peers", replace_addr);
            _sys_ks.local().remove_endpoint(replace_addr).get();

            // Remove the replaced node from group 0 (Raft) before taking its place.
            assert(replaced_host_id);
            auto raft_id = raft::server_id{replaced_host_id.uuid()};
            assert(_group0);
            bool raft_available = _group0->wait_for_raft().get();
            if (raft_available) {
                slogger.info("Replace: removing {}/{} from group 0...", replace_addr, raft_id);
                _group0->remove_from_group0(raft_id).get();
            }

            slogger.info("Starting to bootstrap...");
            run_replace_ops(bootstrap_tokens, *replacement_info);
        }

        // Tell tables the bootstrap/replace data movement has finished.
        _db.invoke_on_all([] (replica::database& db) {
            for (auto& cf : db.get_non_system_column_families()) {
                cf->notify_bootstrap_or_replace_end();
            }
        }).get();

        slogger.info("Bootstrap completed! for the tokens {}", bootstrap_tokens);
    });
}
sstring
storage_service::get_rpc_address(const inet_address& endpoint) const {
    // Return the RPC (client-facing) address a peer gossips. For the local
    // node, or when the peer publishes no RPC_ADDRESS, fall back to rendering
    // the endpoint address itself as text.
    if (endpoint == get_broadcast_address()) {
        return boost::lexical_cast<std::string>(endpoint);
    }
    if (const auto* state = _gossiper.get_application_state_ptr(endpoint, gms::application_state::RPC_ADDRESS)) {
        return state->value;
    }
    return boost::lexical_cast<std::string>(endpoint);
}
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::get_range_to_address_map(const sstring& keyspace) const {
    // Resolve the keyspace's current effective replication map and delegate.
    auto erm = _db.local().find_keyspace(keyspace).get_effective_replication_map();
    return get_range_to_address_map(std::move(erm));
}
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm) const {
    // Use all tokens known to the token_metadata snapshot held by this
    // replication map; `erm` keeps that snapshot (and the vector) alive.
    const auto& all_sorted_tokens = erm->get_token_metadata_ptr()->sorted_tokens();
    return get_range_to_address_map(erm, all_sorted_tokens);
}
// Caller is responsible to hold token_metadata valid until the returned future is resolved
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm,
        const std::vector<token>& sorted_tokens) const {
    // First enumerate every token range, then map each range to its replicas.
    auto all_ranges = co_await get_all_ranges(sorted_tokens);
    co_return co_await construct_range_to_endpoint_map(erm, std::move(all_ranges));
}
// Recompute pending ranges after a node announced it is replacing another.
// Best-effort: first give the replacing node up to 5 seconds to become alive
// on all shards; if that times out or fails we only log a warning and proceed
// with the pending-range update anyway.
future<> storage_service::handle_state_replacing_update_pending_ranges(mutable_token_metadata_ptr tmptr, inet_address replacing_node) {
    try {
        slogger.info("handle_state_replacing: Waiting for replacing node {} to be alive on all shards", replacing_node);
        co_await _gossiper.wait_alive({replacing_node}, std::chrono::milliseconds(5 * 1000));
        slogger.info("handle_state_replacing: Replacing node {} is now alive on all shards", replacing_node);
    } catch (...) {
        // Deliberately swallowed: liveness is a nicety here, not a requirement.
        slogger.warn("handle_state_replacing: Failed to wait for replacing node {} to be alive on all shards: {}",
                replacing_node, std::current_exception());
    }
    slogger.info("handle_state_replacing: Update pending ranges for replacing node {}", replacing_node);
    co_await update_pending_ranges(tmptr, format("handle_state_replacing {}", replacing_node));
}
// Gossip handler: `endpoint` announced STATUS=BOOTSTRAPPING. Register its
// tokens as bootstrap (pending) tokens in token_metadata, recompute pending
// ranges, and replicate the updated metadata to all shards.
future<> storage_service::handle_state_bootstrap(inet_address endpoint) {
    slogger.debug("endpoint={} handle_state_bootstrap", endpoint);
    // explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified
    auto tokens = get_tokens_for(endpoint);

    slogger.debug("Node {} state bootstrapping, token {}", endpoint, tokens);

    // if this node is present in token metadata, either we have missed intermediate states
    // or the node had crashed. Print warning if needed, clear obsolete stuff and
    // continue.
    auto tmlock = co_await get_token_metadata_lock();
    auto tmptr = co_await get_mutable_token_metadata_ptr();
    if (tmptr->is_normal_token_owner(endpoint)) {
        // If isLeaving is false, we have missed both LEAVING and LEFT. However, if
        // isLeaving is true, we have only missed LEFT. Waiting time between completing
        // leave operation and rebootstrapping is relatively short, so the latter is quite
        // common (not enough time for gossip to spread). Therefore we report only the
        // former in the log.
        if (!tmptr->is_leaving(endpoint)) {
            slogger.info("Node {} state jump to bootstrap", endpoint);
        }
        // Drop the stale normal-token registration before re-adding as bootstrap.
        tmptr->remove_endpoint(endpoint);
    }
    tmptr->update_topology(endpoint, get_dc_rack_for(endpoint));
    tmptr->add_bootstrap_tokens(tokens, endpoint);
    if (_gossiper.uses_host_id(endpoint)) {
        tmptr->update_host_id(_gossiper.get_host_id(endpoint), endpoint);
    }
    co_await update_pending_ranges(tmptr, format("handle_state_bootstrap {}", endpoint));
    // Publish the mutated token_metadata to every shard.
    co_await replicate_to_all_cores(std::move(tmptr));
}
// Gossip handler: `endpoint` announced STATUS=NORMAL. Merge its advertised
// tokens into token_metadata, resolving host-id and token collisions (ties
// broken by gossip startup time), remove endpoints left with no tokens,
// persist the peer's tokens, and emit a "joined" notification when the node
// was not previously a normal member.
future<> storage_service::handle_state_normal(inet_address endpoint) {
    slogger.debug("endpoint={} handle_state_normal", endpoint);
    auto tokens = get_tokens_for(endpoint);

    slogger.debug("Node {} state normal, token {}", endpoint, tokens);

    // Held in a unique_ptr so the lock can be dropped (tmlock.reset()) before
    // the trailing steps that must not run under the token_metadata lock.
    auto tmlock = std::make_unique<token_metadata_lock>(co_await get_token_metadata_lock());
    auto tmptr = co_await get_mutable_token_metadata_ptr();
    if (tmptr->is_normal_token_owner(endpoint)) {
        slogger.info("Node {} state jump to normal", endpoint);
    }
    std::unordered_set<inet_address> endpoints_to_remove;

    // Queue `node` for removal from both token_metadata and (later) peers.
    auto do_remove_node = [&] (gms::inet_address node) {
        tmptr->remove_endpoint(node);
        endpoints_to_remove.insert(node);
    };
    // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
    if (_gossiper.uses_host_id(endpoint)) {
        auto host_id = _gossiper.get_host_id(endpoint);
        auto existing = tmptr->get_endpoint_for_host_id(host_id);
        if (existing && *existing != endpoint) {
            // Another endpoint already claims this host_id: resolve the collision.
            if (*existing == get_broadcast_address()) {
                slogger.warn("Not updating host ID {} for {} because it's mine", host_id, endpoint);
                do_remove_node(endpoint);
            } else if (_gossiper.compare_endpoint_startup(endpoint, *existing) > 0) {
                // The newer endpoint (by gossip startup time) wins the host_id.
                slogger.warn("Host ID collision for {} between {} and {}; {} is the new owner", host_id, *existing, endpoint, endpoint);
                do_remove_node(*existing);
                slogger.info("Set host_id={} to be owned by node={}, existing={}", host_id, endpoint, *existing);
                tmptr->update_host_id(host_id, endpoint);
            } else {
                slogger.warn("Host ID collision for {} between {} and {}; ignored {}", host_id, *existing, endpoint, endpoint);
                do_remove_node(endpoint);
            }
        } else if (existing && *existing == endpoint) {
            // The endpoint finished replacing itself; clear its replacing marker.
            tmptr->del_replacing_endpoint(endpoint);
        } else {
            // No current owner of this host_id. If any node that used to own it
            // has LEFT the cluster, treat this announcement as stale and bail out.
            auto nodes = _gossiper.get_nodes_with_host_id(host_id);
            bool left = std::any_of(nodes.begin(), nodes.end(), [this] (const gms::inet_address& node) { return _gossiper.is_left(node); });
            if (left) {
                slogger.info("Skip to set host_id={} to be owned by node={}, because the node is removed from the cluster, nodes {} used to own the host_id", host_id, endpoint, nodes);
                co_return;
            }
            slogger.info("Set host_id={} to be owned by node={}", host_id, endpoint);
            tmptr->update_host_id(host_id, endpoint);
        }
    }

    // Tokens owned by the handled endpoint.
    // The endpoint broadcasts its set of chosen tokens. If a token was also chosen by another endpoint,
    // the collision is resolved by assigning the token to the endpoint which started later.
    std::unordered_set<token> owned_tokens;

    // token_to_endpoint_map is used to track the current token owners for the purpose of removing replaced endpoints.
    // when any token is replaced by a new owner, we track the existing owner in `candidates_for_removal`
    // and eventually, if any candidate for removal ends up owning no tokens, it is removed from token_metadata.
    std::unordered_map<token, inet_address> token_to_endpoint_map = get_token_metadata().get_token_to_endpoint();
    std::unordered_set<inet_address> candidates_for_removal;

    for (auto t : tokens) {
        // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
        auto current = token_to_endpoint_map.find(t);
        if (current == token_to_endpoint_map.end()) {
            // Unclaimed token: endpoint takes it outright.
            slogger.debug("handle_state_normal: New node {} at token {}", endpoint, t);
            owned_tokens.insert(t);
            continue;
        }
        auto current_owner = current->second;
        if (endpoint == current_owner) {
            slogger.debug("handle_state_normal: endpoint={} == current_owner={} token {}", endpoint, current_owner, t);
            // set state back to normal, since the node may have tried to leave, but failed and is now back up
            owned_tokens.insert(t);
        } else if (_gossiper.compare_endpoint_startup(endpoint, current_owner) > 0) {
            slogger.debug("handle_state_normal: endpoint={} > current_owner={}, token {}", endpoint, current_owner, t);
            owned_tokens.insert(t);
            slogger.info("handle_state_normal: remove endpoint={} token={}", current_owner, t);
            // currentOwner is no longer current, endpoint is. Keep track of these moves, because when
            // a host no longer has any tokens, we'll want to remove it.
            token_to_endpoint_map.erase(current);
            candidates_for_removal.insert(current_owner);
            slogger.info("handle_state_normal: Nodes {} and {} have the same token {}. {} is the new owner", endpoint, current_owner, t, endpoint);
        } else {
            // current owner of this token is kept and endpoint attempt to own it is rejected.
            // Keep track of these moves, because when a host no longer has any tokens, we'll want to remove it.
            token_to_endpoint_map.erase(current);
            candidates_for_removal.insert(endpoint);
            slogger.info("handle_state_normal: Nodes {} and {} have the same token {}. Ignoring {}", endpoint, current_owner, t, endpoint);
        }
    }

    // After we replace all tokens owned by current_owner
    // We check for each candidate for removal if it still owns any tokens,
    // and remove it if it doesn't anymore.
    if (!candidates_for_removal.empty()) {
        for (const auto& [t, ep] : token_to_endpoint_map) {
            if (candidates_for_removal.contains(ep)) {
                slogger.debug("handle_state_normal: endpoint={} still owns tokens, will not be removed", ep);
                candidates_for_removal.erase(ep);
                if (candidates_for_removal.empty()) {
                    break;
                }
            }
        }
    }

    for (const auto& ep : candidates_for_removal) {
        slogger.info("handle_state_normal: endpoints_to_remove endpoint={}", ep);
        endpoints_to_remove.insert(ep);
    }

    // Remember membership before the update so we can detect a fresh join.
    bool is_normal_token_owner = tmptr->is_normal_token_owner(endpoint);
    bool do_notify_joined = false;

    if (endpoints_to_remove.contains(endpoint)) [[unlikely]] {
        // A node slated for removal must not have won any tokens above.
        if (!owned_tokens.empty()) {
            on_fatal_internal_error(slogger, format("endpoint={} is marked for removal but still owns {} tokens", endpoint, owned_tokens.size()));
        }
    } else {
        if (owned_tokens.empty()) {
            on_internal_error_noexcept(slogger, format("endpoint={} is not marked for removal but owns no tokens", endpoint));
        }

        if (!is_normal_token_owner) {
            do_notify_joined = true;
        }

        tmptr->update_topology(endpoint, get_dc_rack_for(endpoint));
        co_await tmptr->update_normal_tokens(owned_tokens, endpoint);
    }

    co_await update_pending_ranges(tmptr, format("handle_state_normal {}", endpoint));
    co_await replicate_to_all_cores(std::move(tmptr));
    // Release the token_metadata lock before performing removals/persistence.
    tmlock.reset();

    for (auto ep : endpoints_to_remove) {
        co_await remove_endpoint(ep);
    }
    slogger.debug("handle_state_normal: endpoint={} is_normal_token_owner={} endpoint_to_remove={} owned_tokens={}", endpoint, is_normal_token_owner, endpoints_to_remove.contains(endpoint), owned_tokens);
    if (!owned_tokens.empty() && !endpoints_to_remove.count(endpoint)) {
        // Persist the peer's info and tokens; token persistence is best-effort.
        co_await update_peer_info(endpoint);
        try {
            co_await _sys_ks.local().update_tokens(endpoint, owned_tokens);
        } catch (...) {
            slogger.error("handle_state_normal: fail to update tokens for {}: {}", endpoint, std::current_exception());
        }
    }

    // Send joined notification only when this node was not a member prior to this
    if (do_notify_joined) {
        co_await notify_joined(endpoint);
    }

    if (slogger.is_enabled(logging::log_level::debug)) {
        const auto& tm = get_token_metadata();
        auto ver = tm.get_ring_version();
        for (auto& x : tm.get_token_to_endpoint()) {
            slogger.debug("handle_state_normal: token_metadata.ring_version={}, token={} -> endpoint={}", ver, x.first, x.second);
        }
    }
}
future<> storage_service::handle_state_leaving(inet_address endpoint) {
slogger.debug("endpoint={} handle_state_leaving", endpoint);
auto tokens = get_tokens_for(endpoint);