node.rs
use std::{env, net::Ipv4Addr, str::FromStr, sync::Arc, time::Duration};
use etcd_client::{Client, EventType, GetOptions, LockOptions, PutOptions, WatchOptions};
use parking_lot::RwLock;
use tokio::{select, time::interval};
use tracing::{debug, error, info};
use crate::{group_add_lock, group_membership_key_gen, id_key_lock};
use super::{config::Config, error::ClusterNodeError, registry::Registry, NodeType, ServiceNode};
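/// Selects which pair of per-group coordination keys to fetch from the
/// `Config`: the election keys or the leader keys.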
enum KeyType {
Election,
Leader,
}
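/// A cluster node backed by etcd. Each node registers itself under its
/// group's key prefix, keeps a lease alive, and either follows the group
/// leaders or campaigns to become one.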
#[derive(Clone)]
pub struct Node {
/// Client for the etcd cluster.
client: Client,
/// The node name and IP address of this node.
svc_node: ServiceNode,
/// The node ID of this node.
node_id: Option<usize>,
/// The key prefix under which the members of this node's group register.
group_key: Option<String>,
/// The cluster configuration.
config: Config,
/// This node's etcd cluster lease ID.
lease: i64,
/// Time between lease refreshes.
refresh_interval: u64,
/// The role of this node: leader or plain member.
nodetype: NodeType,
/// The registry of group endpoints; populated only when this node is a leader.
registry: Arc<RwLock<Option<Registry>>>,
}
impl Node {
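/// Construct a node from environment configuration.
///
/// A minimal sketch of the expected environment (the values below are
/// illustrative assumptions, not defaults of this crate):
///
/// ```sh
/// export LEASE_TTL=15        # lease TTL in seconds; must exceed REFRESH_INTERVAL
/// export REFRESH_INTERVAL=5  # seconds between lease keepalives
/// export ETCD=127.0.0.1:2379 # etcd endpoint
/// export NODE=node-a         # this node's name
/// export ADDRESS=10.0.0.1    # this node's IPv4 address
/// ```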
pub async fn new() -> Self {
let lease_ttl = env::var("LEASE_TTL")
.expect("Lease TTL undefined")
.parse::<i64>()
.expect("Invalid lease ttl");
let refresh_interval = env::var("REFRESH_INTERVAL")
.expect("Refresh interval undefined")
.parse::<u64>()
.expect("Invalid refresh interval");
if lease_ttl <= refresh_interval as i64 {
error!(
"Lease refresh interval ({refresh_interval}s) must be shorter than the lease TTL ({lease_ttl}s)"
);
}
let etcd = env::var("ETCD").expect("etcd address undefined");
let mut client = Client::connect([etcd], None)
.await
.expect("failed to create client");
let node = env::var("NODE").expect("Node name undefined");
let address = Ipv4Addr::from_str(
env::var("ADDRESS")
.expect("Node address undefined")
.as_str(),
)
.expect("Invalid IPv4 address");
let svc_node = ServiceNode { node, address };
let lease = client
.lease_grant(lease_ttl, None)
.await
.expect("Lease grant failed")
.id();
let config = Config::new();
Self {
client,
svc_node,
node_id: None,
group_key: None,
config,
lease,
refresh_interval,
nodetype: NodeType::Member,
registry: Arc::new(RwLock::new(None)),
}
}
pub fn is_leader(&self) -> bool {
self.nodetype == NodeType::Leader
}
/// Add a new service node to the group.
pub(crate) fn add_endpoint(&self, endpoint: String) -> Result<(), ClusterNodeError> {
if self.nodetype == NodeType::Member {
return Err(ClusterNodeError::InvalidFunctionAttempt(
"add_endpoint".to_owned(),
));
}
let mut reg = self.registry.as_ref().write();
match reg.as_mut() {
Some(reg) => {
reg.add_endpoint(endpoint)?;
Ok(())
}
None => Err(ClusterNodeError::InvalidState(
"Leader doesn't have registry initialized".to_owned(),
)),
}
}
/// Replace the entire set of registered endpoints for the group.
pub(crate) fn update_registry(&self, endpoints: Vec<String>) -> Result<(), ClusterNodeError> {
if self.nodetype == NodeType::Member {
return Err(ClusterNodeError::InvalidFunctionAttempt(
"update_registry".to_owned(),
));
}
let mut reg = self.registry.as_ref().write();
match reg.as_mut() {
Some(reg) => {
reg.update_registry(endpoints)?;
Ok(())
}
None => Err(ClusterNodeError::InvalidState(
"Leader doesn't have registry initialized".to_owned(),
)),
}
}
fn group_key(&self) -> Result<String, ClusterNodeError> {
match &self.group_key {
Some(g) => Ok(g.clone()),
None => Err(ClusterNodeError::InvalidState(
"Group ID is not known".to_owned(),
)),
}
}
fn flip_nodetype(&mut self) {
match self.nodetype {
NodeType::Member => self.nodetype = NodeType::Leader,
NodeType::Leader => self.nodetype = NodeType::Member,
}
}
async fn register(&mut self, group_id: usize) -> Result<(), ClusterNodeError> {
// Attach the member key to this node's long-lived lease (kept alive by
// `keepalive`) so the registration survives for the node's lifetime.
let putoptions = PutOptions::new().with_lease(self.lease);
let svc_node = serde_json::to_string(&self.svc_node)?;
// NOTE: `{:#?}` renders the serialized JSON through `Debug`, so each member
// key ends in the quoted, escaped form of its JSON value. Lookups rely only
// on the shared group prefix, so this is harmless.
let group_membership_key = group_membership_key_gen!(self.config.dbname(), group_id);
let grp_key = format!("{}-{:#?}", group_membership_key, svc_node);
let resp = self
.client
.put(grp_key, svc_node, Some(putoptions))
.await?;
// Remember the group-wide prefix, not this node's full member key, so that
// `get_peers` can list every member registered under it.
self.group_key = Some(group_membership_key);
info!("Registration successful: {resp:#?}");
Ok(())
}
async fn node_id(&mut self) -> Result<usize, ClusterNodeError> {
if let Some(id) = self.node_id {
return Ok(id);
}
let (id_key, failover_key) = self.config.id_keys();
// First check if any of the leaders are reporting failed group members.
let getoptions = GetOptions::new().with_prefix();
debug!("Lock Options");
let resp = self
.client
.get(failover_key, Some(getoptions))
.await
.expect("Failed to get node ID");
debug!("Locked");
let kvs = resp.kvs();
if !kvs.is_empty() {
for kv in kvs {
let key = kv.key();
let group = kv
.value_str()
.expect("Failed to get node ID")
.parse::<usize>()
.expect("Failed to parse node ID");
// Attempt to join group.
if self.join_group(key, group).await {
return Ok(group);
};
}
}
debug!("Getting new ID");
// At this point we have not been reassigned any old ID from a failed node.
// So we will ask for a new ID.
self.new_id(id_key).await
}
// Lock the group joining key and add the node to the group.
async fn join_group(&mut self, key: &[u8], group_id: usize) -> bool {
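// Protocol: take the per-group add-lock, register under the group's prefix,
// delete the failover marker so no other node claims the same slot, then
// release the lock. On failure the marker stays in place for another node to
// retry, and the 10-second lease bounds how long the lock can be held.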
let group_lock_key = group_add_lock!(group_id);
// We expect to finish the op in 10 seconds.
let lease = match self.client.lease_grant(10, None).await {
Ok(lease_response) => lease_response,
Err(e) => {
error!("Failed to get lease to join group: {e}");
return false;
}
};
let lock_options = LockOptions::new().with_lease(lease.id());
let lock_resp = match self.client.lock(group_lock_key, Some(lock_options)).await {
Ok(resp) => resp,
Err(e) => {
error!("Error locking group add key: {}", e);
return false;
}
};
if let Err(e) = self.register(group_id).await {
error!("Error registering node to {group_id}: {e}");
return false;
}
// On success, delete the failover marker so that other nodes
// do not try to join this group.
let _resp = match self.client.delete(key, None).await {
Ok(resp) => resp,
Err(e) => {
error!("Error deleting lock key: {}", e);
return false;
}
};
// Unlock the distributed mutex
match self.client.unlock(lock_resp.key()).await {
Ok(resp) => debug!("Join group key unlocked: {resp:#?}"),
Err(e) => error!("Join group unlock failed: {e}"),
};
true
}
// Get a new ID from the etcd cluster.
async fn new_id(&mut self, id_key: String) -> Result<usize, ClusterNodeError> {
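// Allocation protocol: lock the shared counter key, read the current value,
// write back value + 1, unlock. The value read becomes this node's ID.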
// The node expects to get the ID within 10 seconds.
let lease = self.client.lease_grant(10, None).await?;
let lock_options = LockOptions::new().with_lease(lease.id());
debug!("New ID lock");
let lock_resp = self
.client
.lock(id_key_lock!(), Some(lock_options))
.await
.expect("Failed to get node ID");
debug!("ID key: {}", id_key.clone());
let resp = self
.client
.get(id_key.clone(), None)
.await
.expect("Failed to get node ID");
debug!("New ID get response: {resp:#?}");
let kv = resp.kvs();
debug!("New id response");
debug!("New ID kv: {kv:#?}");
let mut val = None;
if !kv.is_empty() {
let inner_val = kv[0]
.value_str()
.expect("Failed to get the latest ID")
.parse::<u64>()
.expect("Failed to parse ID");
// Increment the ID here.
let _resp = self
.client
.put(id_key, format!("{}", inner_val + 1), None)
.await?;
val = Some(inner_val);
} else {
error!("New ID kv=0 error");
}
// Unlock id key.
match self.client.unlock(lock_resp.key()).await {
Ok(resp) => debug!("New id key unlocked: {resp:#?}"),
Err(e) => error!("New id unlock failed: {e}"),
}
// Return the value or an error.
match val {
Some(val) => {
self.node_id = Some(val as usize);
Ok(val as usize)
}
None => Err(ClusterNodeError::ServerCreationError(
"Failed to read node ID".to_owned(),
)),
}
}
/// Keep the etcd lease for this node alive.
/// The lease backs both the group-leader watch (when this node is a member)
/// and the peer watch.
async fn keepalive(&mut self) -> Result<(), ClusterNodeError> {
let (mut lease_keeper, mut lease_keepalive_stream) = self
.client
.lease_keep_alive(self.lease)
.await
.expect("failed to start keep alive channels");
lease_keeper.keep_alive().await?;
if let Some(msg) = lease_keepalive_stream.message().await? {
debug!("lease {:?} keep alive, new ttl {:?}", msg.id(), msg.ttl());
}
Ok(())
}
/// List the service nodes registered in this node's group.
async fn get_peers(&self) -> Result<Vec<String>, ClusterNodeError> {
if self.node_id.is_none() {
return Err(ClusterNodeError::InvalidState(
"Node ID is not known".to_owned(),
));
}
let getoptions = GetOptions::new().with_prefix();
let mut client = self.client.clone();
let group_key = self.group_key()?;
let get_resp = client
.get(group_key.clone(), Some(getoptions))
.await
.expect("Failed to get service");
let kvs = get_resp.kvs();
match kvs.len() {
0 => Err(ClusterNodeError::InvalidState(
"No members registered under the group key".to_owned(),
)),
_ => {
let mut endpoints = Vec::new();
for kv in kvs {
let ep = kv.value_str().expect("failed to get value");
info!("Key: {}, Service Node: {ep}", group_key);
endpoints.push(ep.to_owned());
}
Ok(endpoints)
}
}
}
/// Watch the peers.
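/// Seeds the registry with the current peers, then waits on watch events:
/// a Put adds the new member to the registry, while a Delete rebuilds the
/// registry from a fresh listing. A leader whose group falls below
/// `Config::reads()` members publishes a failover marker so that a spare
/// node can take the empty slot (see `node_id`).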
async fn watch_peers(&self) -> Result<(), ClusterNodeError> {
let mut group_missing_node = Vec::new();
let peers = self.get_peers().await?;
for peer in peers {
self.add_endpoint(peer)?;
}
// A leader has to count peers in multiple groups.
// A member only checks its own group.
let leader = serde_json::to_string(&self.svc_node)?;
let groups = match self.config.group_ids(leader) {
Some(groups) => groups,
None => {
let node_id = self.known_node_id()?;
let group_id = self.config.group_id(node_id).ok_or_else(|| {
ClusterNodeError::InvalidState("Group ID undefined".to_owned())
})?;
vec![group_id]
}
};
let mut client = self.client.clone();
for group in groups {
// Watch the whole group prefix so that registrations and expiries of
// every member are observed, not only this node's own key.
let group_membership_key = group_membership_key_gen!(self.config.dbname(), group);
let watchoptions = WatchOptions::new().with_prefix();
let (mut peer_watcher, mut peer_watchstream) = client
.watch(group_membership_key, Some(watchoptions))
.await
.expect("Peer Watch failed");
info!("Created peer watcher");
peer_watcher
.request_progress()
.await
.expect("Peer watcher request progress failed");
if let Some(msg) = peer_watchstream
.message()
.await
.expect("Failed to watch peers")
{
for event in msg.events() {
match event.event_type() {
EventType::Put => {
if let Some(kv) = event.kv() {
let svc_node = kv.value_str()?;
info!("Added node: {svc_node}");
self.add_endpoint(svc_node.to_owned())?;
}
}
EventType::Delete => {
info!("One member has died");
let peers = self.get_peers().await?;
self.update_registry(peers)?;
}
}
}
}
// Only the leader reports failed nodes.
if self.nodetype == NodeType::Leader {
if let Some(reg) = self.registry.as_ref().read().as_ref() {
if reg.member_count() < self.config.reads() {
let (_, failover_key) = self.config.id_keys();
let svc_node = serde_json::to_string(&self.svc_node)?;
let key = format!("{}-{}", failover_key, svc_node);
group_missing_node.push((key, format!("{group}")));
}
}
}
}
if !group_missing_node.is_empty() {
for (key, grp_str) in group_missing_node {
let put_resp = client.put(key, grp_str, None).await?;
info!("{put_resp:#?}");
}
}
Ok(())
}
fn known_node_id(&self) -> Result<usize, ClusterNodeError> {
match self.node_id {
Some(nid) => Ok(nid),
None => Err(ClusterNodeError::InvalidState(
"Node ID undefined".to_owned(),
)),
}
}
fn fetch_keys(&self, typ: KeyType) -> Result<(String, String), ClusterNodeError> {
let group_id = match self.config.group_id(self.known_node_id()?) {
Some(gid) => gid,
None => {
return Err(ClusterNodeError::InvalidState(
"Group ID undefined".to_owned(),
))
}
};
match typ {
KeyType::Leader => match self.config.leader_key(group_id) {
Some(keys) => Ok(keys),
None => Err(ClusterNodeError::InvalidState(
"Leader keys not found".to_owned(),
)),
},
KeyType::Election => match self.config.election_keys(group_id) {
Some(keys) => Ok(keys),
None => Err(ClusterNodeError::InvalidState(
"Election keys not found".to_owned(),
)),
},
}
}
fn leader_keys(&self) -> Result<(String, String), ClusterNodeError> {
self.fetch_keys(KeyType::Leader)
}
fn election_keys(&self) -> Result<(String, String), ClusterNodeError> {
self.fetch_keys(KeyType::Election)
}
/// Get the leaders in the system.
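/// This method is synchronous and blocks on the runtime handle, so it must
/// not be called from within an async task on the same runtime
/// (`Handle::block_on` panics in an async context).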
pub fn get_leaders(&self) -> Result<Vec<ServiceNode>, ClusterNodeError> {
let mut client = self.client.clone();
let getoptions = GetOptions::new().with_prefix();
let leader_keys = self.leader_keys()?;
let mut leaders = Vec::new();
let handle = tokio::runtime::Handle::current();
for leader_key in [leader_keys.0, leader_keys.1] {
let resp = handle.block_on(client.get(leader_key, Some(getoptions.clone())))?;
let kvs = resp.kvs();
if !kvs.is_empty() {
let leader = kvs[0].value_str()?.to_owned();
let service_node: ServiceNode = serde_json::from_str(&leader)?;
leaders.push(service_node);
}
}
match leaders.is_empty() {
true => Err(ClusterNodeError::InvalidState("No Leader found".to_owned())),
false => Ok(leaders),
}
}
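/// Campaign to lead this node's group: take the election lock under a short
/// lease, publish this node under the matching leader key, then release the
/// lock.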
async fn campaign(&mut self, election_key: String) -> Result<(), ClusterNodeError> {
let leader_keys = self.leader_keys()?;
let election_keys = self.election_keys()?;
// Take the election lock under a short lease so a crashed candidate
// cannot hold it forever.
let lease = self.client.lease_grant(10, None).await?;
let lock_options = LockOptions::new().with_lease(lease.id());
let lock_resp = self
.client
.lock(election_key.clone(), Some(lock_options))
.await?;
info!("Locked election key: {:#?}", lock_resp.key());
// Publish this node under the matching leader key while the lock is held.
let svc_node = serde_json::to_string(&self.svc_node)?;
let leader_key = if election_keys.0 == election_key {
leader_keys.0
} else {
leader_keys.1
};
let put_resp = self.client.put(leader_key, svc_node, None).await?;
info!("Put response: {:#?}", put_resp);
// Release the lock only after the leader key has been written.
match self.client.unlock(lock_resp.key()).await {
Ok(resp) => debug!("Election key unlocked: {resp:#?}"),
Err(e) => error!("Election key unlock failed: {e}"),
}
Ok(())
}
/// Watch the group leaders continuously.
/// If one of them fails, return the election key the caller should campaign on.
async fn watch_group_leaders(&self) -> Result<Option<String>, ClusterNodeError> {
let leader_keys = self.leader_keys()?;
let election_keys = self.election_keys()?;
// Check if the keys already exist.
let getoptions = GetOptions::new().with_prefix();
let mut client = self.client.clone();
for election_key in [election_keys.0.clone(), election_keys.1.clone()] {
// Does the key already exist?
let resp = client
.get(election_key.clone(), Some(getoptions.clone()))
.await
.expect("Failed to query queue");
let kvs = resp.kvs();
if kvs.is_empty() {
return Ok(Some(election_key));
}
}
let watchoptions = WatchOptions::new().with_prefix();
// Add the first key to the watcher.
let (mut election_watcher, mut election_watchstream) = client
.watch(election_keys.0.clone(), Some(watchoptions.clone()))
.await
.expect("Election watch failed");
info!("Created election watcher");
election_watcher
.request_progress()
.await
.expect("Election watcher request progress failed");
election_watcher
.watch(election_keys.1.clone(), Some(watchoptions.clone()))
.await
.expect("Election watch failed");
info!("Added the second election key");
if let Some(msg) = election_watchstream
.message()
.await
.expect("failed to watch the stream")
{
for event in msg.events() {
if event.event_type() == EventType::Delete {
// Only the two leader keys can be deleted here.
if let Some(kv) = event.kv() {
let key = kv.key_str().expect("failed to get the key");
if key.contains(leader_keys.0.as_str()) {
return Ok(Some(election_keys.0.clone()));
}
return Ok(Some(election_keys.1.clone()));
}
}
}
}
// No delete event was observed, and no error occurred either.
Ok(None)
}
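/// Run the node: obtain a node ID, register with a group, then loop forever
/// keeping the lease alive, watching peers, and (as a member) campaigning
/// when a leader slot opens.
///
/// A minimal sketch of driving a node (assumes the environment described on
/// [`Node::new`] is set; the `tokio::main` wrapper is illustrative):
///
/// ```ignore
/// #[tokio::main]
/// async fn main() -> Result<(), ClusterNodeError> {
///     let mut node = Node::new().await;
///     node.run_cluster_node().await
/// }
/// ```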
pub async fn run_cluster_node(&mut self) -> Result<(), ClusterNodeError> {
// Register the node
let node_id = self.node_id().await?;
info!("Node ID: {node_id}");
let group_id = match self.config.group_id(node_id) {
Some(group_id) => group_id,
None => {
return Err(ClusterNodeError::InvalidState(
"Group ID invalid".to_owned(),
))
}
};
info!("Group ID: {group_id}");
self.register(group_id).await?;
info!("Registered");
// Start operations.
let mut interval = interval(Duration::from_secs(self.refresh_interval));
loop {
match self.nodetype {
// If this member is not a leader then:
// Keep the lease alive.
// Watch for new peers.
// Watch the election. Campaign to become the leader.
NodeType::Member => select! {
_ = interval.tick() => self.keepalive().await?,
Ok(Some(key)) = self.watch_group_leaders() => {
match self.campaign(key).await {
Ok(_) => self.flip_nodetype(),
Err(e) => match e {
ClusterNodeError::EtcdError(e) => match e {
etcd_client::Error::GRpcStatus(e) => info!("Did not become leader: {}", e),
_ => {
error!("System error while campaigning: {}", e);
return Err(ClusterNodeError::EtcdError(e));
},
},
_ => {
error!("Error while campaigning: {}", e);
return Err(e);
},
}
}
},
_ = self.watch_peers() => {},
},
// If this node is the leader,
// Keep the lease alive.
// Keep track of the peers.
NodeType::Leader => select! {
_ = interval.tick() => self.keepalive().await?,
_ = self.watch_peers() => {},
},
}
}
}
}