-
Notifications
You must be signed in to change notification settings - Fork 16
/
raft.h
1660 lines (1443 loc) · 53.2 KB
/
raft.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Copyright (c) 2013, Willem-Hendrik Thiart
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*
* @file
* @author Willem Thiart himself@willemthiart.com
*/
#ifndef RAFT_H_
#define RAFT_H_
#include <stddef.h>
#include "raft_types.h"
/** Error codes used by the library.
 *
 * Every code is a negative integer; callbacks declared in this header
 * report success as 0.
 */
typedef enum {
    RAFT_ERR_NOT_LEADER                  = -2,
    RAFT_ERR_ONE_VOTING_CHANGE_ONLY      = -3,
    /** The server should shut down. */
    RAFT_ERR_SHUTDOWN                    = -4,
    /** Memory allocation failure. */
    RAFT_ERR_NOMEM                       = -5,
    RAFT_ERR_SNAPSHOT_IN_PROGRESS        = -6,
    RAFT_ERR_SNAPSHOT_ALREADY_LOADED     = -7,
    RAFT_ERR_INVALID_NODEID              = -8,
    RAFT_ERR_LEADER_TRANSFER_IN_PROGRESS = -9,
    /** Returned by raft_get_snapshot_chunk_f to apply backpressure. */
    RAFT_ERR_DONE                        = -10,
    RAFT_ERR_NOTFOUND                    = -11,
    RAFT_ERR_MISUSE                      = -12,
    RAFT_ERR_TRYAGAIN                    = -13,
} raft_error_e;
/** Kind of membership change passed to raft_membership_event_f. */
typedef enum {
    RAFT_MEMBERSHIP_ADD    = 0,
    RAFT_MEMBERSHIP_REMOVE = 1,
} raft_membership_e;
/** Server state, as passed to raft_state_event_f. Numbering starts at 1. */
typedef enum {
    RAFT_STATE_FOLLOWER     = 1,
    RAFT_STATE_PRECANDIDATE = 2,
    RAFT_STATE_CANDIDATE    = 3,
    RAFT_STATE_LEADER       = 4,
} raft_state_e;
/** Leadership transfer result, as passed to raft_transfer_event_f. */
typedef enum {
    RAFT_LEADER_TRANSFER_TIMEOUT           = 0,
    RAFT_LEADER_TRANSFER_UNEXPECTED_LEADER = 1,
    RAFT_LEADER_TRANSFER_EXPECTED_LEADER   = 2,
} raft_leader_transfer_e;
/** Identifiers of configuration options. Numbering starts at 1.
 *
 * NOTE(review): the functions that consume these identifiers are not
 * declared in this part of the header.
 */
typedef enum {
    RAFT_CONFIG_ELECTION_TIMEOUT  = 1,
    RAFT_CONFIG_REQUEST_TIMEOUT   = 2,
    RAFT_CONFIG_AUTO_FLUSH        = 3,
    RAFT_CONFIG_LOG_ENABLED       = 4,
    RAFT_CONFIG_NONBLOCKING_APPLY = 5,
    RAFT_CONFIG_DISABLE_APPLY     = 6,
} raft_config_e;
/** Node ID constant with the value -1.
 * NOTE(review): presumably means "no node"; meaning inferred from the name
 * only, confirm against the code that uses it. */
#define RAFT_NODE_ID_NONE (-1)
/** Log entry types. */
typedef enum {
    /** Regular log type: application data intended for the FSM. */
    RAFT_LOGTYPE_NORMAL = 0,

    /** Membership change. Non-voting nodes can't cast votes. Nodes in this
     * non-voting state are used to catch up with the cluster when trying to
     * join the cluster.
     */
    RAFT_LOGTYPE_ADD_NONVOTING_NODE = 1,

    /** Membership change: add a voting node. */
    RAFT_LOGTYPE_ADD_NODE = 2,

    /** Membership change: remove a node. */
    RAFT_LOGTYPE_REMOVE_NODE = 3,

    /** No-op entry, appended automatically when a leader begins a new term
     * in order to determine the current commit index.
     */
    RAFT_LOGTYPE_NO_OP = 4,

    /** Users can piggyback on the entry mechanism by specifying log types
     * that are higher than RAFT_LOGTYPE_NUM.
     */
    RAFT_LOGTYPE_NUM = 100,
} raft_logtype_e;
/** Server statistics: a flat set of unsigned 64-bit-or-wider counters.
 *
 * Counter names follow the pattern <message>_<req|resp>_<event>.
 */
typedef struct raft_server_stats {
    /* --- Miscellaneous --- */
    unsigned long long appendentries_req_with_entry;
    unsigned long long snapshots_created;
    unsigned long long snapshots_received;
    unsigned long long exec_throttled;

    /* --- Message types: appendentries --- */
    unsigned long long appendentries_req_sent;
    unsigned long long appendentries_req_received;
    unsigned long long appendentries_req_failed;
    unsigned long long appendentries_resp_received;

    /* --- Message types: snapshot --- */
    unsigned long long snapshot_req_sent;
    unsigned long long snapshot_req_received;
    unsigned long long snapshot_req_failed;
    unsigned long long snapshot_resp_received;

    /* --- Message types: requestvote (prevote) --- */
    unsigned long long requestvote_prevote_req_sent;
    unsigned long long requestvote_prevote_req_received;
    unsigned long long requestvote_prevote_req_failed;
    unsigned long long requestvote_prevote_req_granted;
    unsigned long long requestvote_prevote_resp_received;

    /* --- Message types: requestvote --- */
    unsigned long long requestvote_req_sent;
    unsigned long long requestvote_req_received;
    unsigned long long requestvote_req_failed;
    unsigned long long requestvote_req_granted;
    unsigned long long requestvote_resp_received;
} raft_server_stats_t;
/** A single entry, as stored in the server's entry log. */
typedef struct raft_entry {
    /** Term of the entry at the point it was created. */
    raft_term_t term;

    /** Unique ID of the entry. */
    raft_entry_id_t id;

    /** Session this entry belongs to. */
    raft_session_t session;

    /** Type of the entry. */
    int type;

    /** Number of references. */
    unsigned short refs;

    /** Private local data. */
    void *user_data;

    /** Free function; when specified, it is used instead of __free. */
    void (*free_func)(struct raft_entry *entry);

    /** Length of the data. */
    raft_size_t data_len;

    /** The data itself (flexible array member, so it must stay last). */
    char data[];
} raft_entry_t;
/** Entry request: the message a client sends to a server.
 *
 * The client sends it with the intention of having it applied to the FSM.
 * It is the same type as raft_entry_t.
 */
typedef struct raft_entry raft_entry_req_t;
/** Response to an entry message.
 *
 * Tells the client whether the entry was committed or not.
 */
typedef struct {
    /** Unique ID of the entry. */
    raft_entry_id_t id;

    /** Term of the entry. */
    raft_term_t term;

    /** Index of the entry. */
    raft_index_t idx;
} raft_entry_resp_t;
/** One chunk of a snapshot. */
typedef struct {
    /** Offset of the chunk. */
    raft_size_t offset;

    /** Pointer to the chunk data. */
    void *data;

    /** Length of the chunk. */
    raft_size_t len;

    /** 1 if this is the last chunk. */
    int last_chunk;
} raft_snapshot_chunk_t;
/** Vote request message.
 *
 * A server sends this to nodes when it wants to become leader. Receiving it
 * could force a leader/candidate to become a follower.
 */
typedef struct {
    /** 1 for a prevote message, 0 otherwise. */
    int prevote;

    /** currentTerm; forces another leader/candidate to step down. */
    raft_term_t term;

    /** The candidate that is requesting the vote. */
    raft_node_id_t candidate_id;

    /** Index of the candidate's last log entry. */
    raft_index_t last_log_idx;

    /** Term of the candidate's last log entry. */
    raft_term_t last_log_term;
} raft_requestvote_req_t;
/** Response to a vote request message.
 *
 * Tells the requesting server whether the node accepted its vote request.
 */
typedef struct {
    /** 1 for a prevote message, 0 otherwise. */
    int prevote;

    /** Term of the requestvote message that was received. */
    raft_term_t request_term;

    /** currentTerm; lets the candidate update itself. */
    raft_term_t term;

    /** True means the candidate received the vote. */
    int vote_granted;
} raft_requestvote_resp_t;
/** Snapshot message; carries one chunk of a snapshot. */
typedef struct {
    /** currentTerm; lets the follower update itself. */
    raft_term_t term;

    /** Identifies the sender node. Useful when this message is received
     * from nodes that are not part of the configuration yet. */
    raft_node_id_t leader_id;

    /** ID that makes it possible to associate responses with requests. */
    raft_msg_id_t msg_id;

    /** Last included index of the snapshot. */
    raft_index_t snapshot_index;

    /** Last included term of the snapshot. */
    raft_term_t snapshot_term;

    /** The snapshot chunk. */
    raft_snapshot_chunk_t chunk;
} raft_snapshot_req_t;
/** Response to a snapshot message. */
typedef struct {
    /** The msg_id this response refers to. */
    raft_msg_id_t msg_id;

    /** Last included index of the snapshot this response refers to. */
    raft_index_t snapshot_index;

    /** currentTerm; forces another leader to step down. */
    raft_term_t term;

    /** Last snapshot offset acknowledged by the follower. */
    raft_size_t offset;

    /** 1 if the request is accepted. */
    int success;

    /** 1 if this is a response to the final chunk. */
    int last_chunk;
} raft_snapshot_resp_t;
/** Appendentries message.
 *
 * Tells nodes whether it is safe to apply entries to the FSM. It can be sent
 * without any entries as a keep-alive message, and it could force a
 * leader/candidate to become a follower.
 */
typedef struct {
    /** Identifies the sender node. Useful when this message is received
     * from nodes that are not part of the configuration yet. */
    raft_node_id_t leader_id;

    /** ID that makes it possible to associate responses with requests. */
    raft_msg_id_t msg_id;

    /** currentTerm; forces another leader/candidate to step down. */
    raft_term_t term;

    /** Index of the log just before the newest entry for the node that
     * receives this message. */
    raft_index_t prev_log_idx;

    /** Term of the log just before the newest entry for the node that
     * receives this message. */
    raft_term_t prev_log_term;

    /** Index of the entry that has been appended to the majority of the
     * cluster. Entries up to this index will be applied to the FSM. */
    raft_index_t leader_commit;

    /** Number of entries within this message. */
    raft_index_t n_entries;

    /** Array of pointers to the entries within this message. */
    raft_entry_req_t **entries;
} raft_appendentries_req_t;
/** Appendentries response message.
 *
 * Can be sent without any entries as a keep-alive message. This message
 * could force a leader/candidate to become a follower.
 */
typedef struct {
    /** The msg_id this response refers to. */
    raft_msg_id_t msg_id;

    /** currentTerm; forces another leader/candidate to step down. */
    raft_term_t term;

    /** True if the follower contained an entry matching prevLogIdx and
     * prevLogTerm. */
    int success;

    /* Non-Raft fields follow.
     * Having them allows less bookkeeping with regard to a full-fledged
     * RPC. */

    /** On success, the highest log index we have received and appended to
     * our log; otherwise, our currentIndex. */
    raft_index_t current_idx;
} raft_appendentries_resp_t;
/** Raft server handle. Only a forward declaration is given at this point, so
 * the struct is an incomplete type for the declarations that follow. */
typedef struct raft_server raft_server_t;
/** Node handle. Only a forward declaration is given at this point, so the
 * struct is an incomplete type for the declarations that follow. */
typedef struct raft_node raft_node_t;
/** Callback that sends a request vote message.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] node      The node we are sending this message to
 * @param[in] msg       The request vote message to be sent
 * @return 0 on success
 */
typedef int (*raft_send_requestvote_f)(raft_server_t *raft,
                                       void *user_data,
                                       raft_node_t *node,
                                       raft_requestvote_req_t *msg);
/** Callback that sends an appendentries message.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] node      The node we are sending this message to
 * @param[in] msg       The appendentries message to be sent
 * @return 0 on success
 */
typedef int (*raft_send_appendentries_f)(raft_server_t *raft,
                                         void *user_data,
                                         raft_node_t *node,
                                         raft_appendentries_req_t *msg);
/** Callback that sends a snapshot message.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] node      The node that needs a snapshot sent to it
 * @param[in] msg       The snapshot message
 */
typedef int (*raft_send_snapshot_f)(raft_server_t *raft,
                                    void *user_data,
                                    raft_node_t *node,
                                    raft_snapshot_req_t *msg);
/** Callback that loads the received snapshot.
 *
 * The user should load the snapshot using raft_begin_load_snapshot() and
 * raft_end_load_snapshot(), for example:
 *
 *     int loadsnapshot_callback()
 *     {
 *         // User loads the received snapshot
 *         int rc = loadSnapshotData();
 *         if (rc != 0) {
 *             return rc;
 *         }
 *
 *         rc = raft_begin_load_snapshot(raft, snapshot_term, snapshot_index);
 *         if (rc != 0) {
 *             return -1;
 *         }
 *
 *         // User should configure nodes using configuration data in the
 *         // snapshot, e.g. using raft_add_node(), raft_node_set_voting() etc.
 *         configureNodesFromSnapshot();
 *
 *         raft_end_load_snapshot(raft);
 *         return 0;
 *     }
 *
 * @param[in] raft           The Raft server making this callback
 * @param[in] user_data      User data that is passed from the Raft server
 * @param[in] snapshot_term  Term of the received snapshot
 * @param[in] snapshot_index Index of the received snapshot
 * @return 0 on success
 */
typedef int (*raft_load_snapshot_f)(raft_server_t *raft,
                                    void *user_data,
                                    raft_term_t snapshot_term,
                                    raft_index_t snapshot_index);
/** Callback that fetches a chunk from the snapshot file. The chunk will be
 * sent to the follower.
 *
 * The callback should fill the fields of the 'chunk' struct with the
 * appropriate data. To apply backpressure, return RAFT_ERR_DONE.
 *
 * @param[in]  raft      The Raft server making this callback
 * @param[in]  user_data User data that is passed from the Raft server
 * @param[in]  node      The node the chunk will be sent to
 * @param[in]  offset    The snapshot offset being requested
 * @param[out] chunk     The snapshot chunk to fill in
 * @return 0 on success
 */
typedef int (*raft_get_snapshot_chunk_f)(raft_server_t *raft,
                                         void *user_data,
                                         raft_node_t *node,
                                         raft_size_t offset,
                                         raft_snapshot_chunk_t *chunk);
/** Callback that stores a snapshot chunk received from the leader.
 *
 * @param[in] raft           The Raft server making this callback
 * @param[in] user_data      User data that is passed from the Raft server
 * @param[in] snapshot_index Last index of the received snapshot
 * @param[in] offset         Offset of the chunk we received
 * @param[in] chunk          The snapshot chunk
 * @return 0 on success
 */
typedef int (*raft_store_snapshot_chunk_f)(raft_server_t *raft,
                                           void *user_data,
                                           raft_index_t snapshot_index,
                                           raft_size_t offset,
                                           raft_snapshot_chunk_t *chunk);
/** Callback that clears the incoming snapshot file.
 *
 * It might be called to clean up a partial snapshot file, e.g. when the
 * leader takes another snapshot and starts to send it while we are still
 * receiving the previous one.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @return 0 on success
 */
typedef int (*raft_clear_snapshot_f)(raft_server_t *raft, void *user_data);
/** Callback that reports when a non-voting node has obtained enough logs.
 *
 * It triggers only when there are no pending configuration changes.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] node      The node
 * @return 0 if the application does not want to be notified again;
 *         -1 otherwise
 */
typedef int (*raft_node_has_sufficient_logs_f)(raft_server_t *raft,
                                               void *user_data,
                                               raft_node_t *node);
/** Callback that receives debug logging information.
 *
 * Implementing this callback is optional.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] buf       The buffer that was logged
 */
typedef void (*raft_log_f)(raft_server_t *raft,
                           void *user_data,
                           const char *buf);
/** Callback that saves the current term and vote to disk.
 *
 * For safety reasons this callback MUST flush the term and vote changes to
 * disk atomically.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] term      Current term
 * @param[in] vote      Vote
 * @return 0 on success
 */
typedef int (*raft_persist_metadata_f)(raft_server_t *raft,
                                       void *user_data,
                                       raft_term_t term,
                                       raft_node_id_t vote);
/** Callback that saves log entry changes.
 *
 * It is used for:
 * <ul>
 *      <li>Adding entries to the log (i.e. offer)</li>
 *      <li>Removing the first entry from the log (i.e. polling)</li>
 *      <li>Removing the last entry from the log (i.e. popping)</li>
 *      <li>Applying entries</li>
 * </ul>
 *
 * For safety reasons this callback MUST flush the change to disk.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] entry     The entry that the event is happening to
 * @param[in] entry_idx The index of the entry in the log
 * @return 0 on success
 */
typedef int (*raft_logentry_event_f)(raft_server_t *raft,
                                     void *user_data,
                                     raft_entry_t *entry,
                                     raft_index_t entry_idx);
/** Callback that determines which node a configuration log entry affects.
 *
 * This call only applies to configuration change log entries.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] entry     The entry that the event is happening to
 * @param[in] entry_idx The index of the entry in the log
 * @return the node ID of the node
 */
typedef raft_node_id_t (*raft_get_node_id_f)(raft_server_t *raft,
                                             void *user_data,
                                             raft_entry_t *entry,
                                             raft_index_t entry_idx);
/** Callback that is notified of membership changes.
 *
 * Implementing this callback is optional.
 *
 * The remove notification happens before the node is about to be removed.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] node      The node that is the subject of this log.
 *                      Could be NULL.
 * @param[in] entry     The entry that was the trigger for the event.
 *                      Could be NULL.
 * @param[in] type      The type of membership change
 */
typedef void (*raft_membership_event_f)(raft_server_t *raft,
                                        void *user_data,
                                        raft_node_t *node,
                                        raft_entry_t *entry,
                                        raft_membership_e type);
/** Callback that is notified of state changes.
 *
 * Implementing this callback is optional.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] state     The new cluster state
 */
typedef void (*raft_state_event_f)(raft_server_t *raft,
                                   void *user_data,
                                   raft_state_e state);
/** Callback that is notified of leadership transfer events.
 *
 * Implementing this callback is optional.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] result    The leadership transfer result
 */
typedef void (*raft_transfer_event_f)(raft_server_t *raft,
                                      void *user_data,
                                      raft_leader_transfer_e result);
/** Callback that sends a TimeoutNow RPC message.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] node      The node that we are sending this message to
 * @return 0 on success
 */
typedef int (*raft_send_timeoutnow_f)(raft_server_t *raft,
                                      void *user_data,
                                      raft_node_t *node);
/** Callback that can skip sending a raft_appendentries_req to a node.
 *
 * Implementing this callback is optional.
 *
 * If appendentries messages are already pending in flight, you may want to
 * skip sending more until you receive a response for the previous ones. If
 * the node is a slow consumer and you create a raft_appendentries_req for
 * each batch of new entries received, it may cause an out-of-memory
 * condition.
 *
 * This also makes batching possible. If new entries are received at
 * intervals, creating a new appendentries message for each one might be
 * inefficient: for every appendentries message the follower has to write
 * the entries to disk before sending the response. For example, with 1000
 * appendentries messages in flight, the previous 1000 disk write operations
 * must complete before a new entry can be committed. Since disk writes are
 * quite slow, 1000 of them take quite a while. A better approach is to limit
 * the in-flight appendentries messages depending on network conditions and
 * disk performance.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @param[in] node      The node that we are about to send a
 *                      raft_appendentries_req to
 * @return 0 to send the message;
 *         any other value to skip sending the message
 */
typedef int (*raft_backpressure_f)(raft_server_t *raft,
                                   void *user_data,
                                   raft_node_t *node);
/** Callback that fetches the entries to send in an appendentries message.
 *
 * It is useful when you want to limit the appendentries message size. The
 * application is supposed to fill the `entries` array by using the
 * raft_get_entry_from_idx() and raft_get_entries_from_idx() functions. To
 * limit the message size, the application can fill the array partially: as
 * this callback is invoked inside a loop, the remaining entries will be
 * fetched and sent as another appendentries message in the next callback.
 *
 * @param[in]  raft      The Raft server making this callback
 * @param[in]  user_data User data that is passed from the Raft server
 * @param[in]  node      The node that we are sending this message to
 * @param[in]  idx       Index of the first entry to fetch
 * @param[in]  entries_n Length of entries (max. entries to fetch)
 * @param[out] entries   An initialized array of raft_entry_t*
 * @return Number of entries fetched
 */
typedef raft_index_t (*raft_get_entries_to_send_f)(raft_server_t *raft,
                                                   void *user_data,
                                                   raft_node_t *node,
                                                   raft_index_t idx,
                                                   raft_index_t entries_n,
                                                   raft_entry_t **entries);
/** Callback that retrieves a monotonic timestamp in microseconds.
 *
 * @param[in] raft      The Raft server making this callback
 * @param[in] user_data User data that is passed from the Raft server
 * @return Timestamp in microseconds
 */
typedef raft_time_t (*raft_timestamp_f)(raft_server_t *raft, void *user_data);
/** The set of callbacks handed to raft_set_callbacks(). */
typedef struct {
    /** Sends request vote messages. */
    raft_send_requestvote_f send_requestvote;

    /** Sends appendentries messages. */
    raft_send_appendentries_f send_appendentries;

    /** Sends snapshot messages. */
    raft_send_snapshot_f send_snapshot;

    /** Sends the timeoutnow message. */
    raft_send_timeoutnow_f send_timeoutnow;

    /** Loads a snapshot. Called when we complete receiving a snapshot from
     * the leader. */
    raft_load_snapshot_f load_snapshot;

    /** Gets a chunk of the snapshot file. */
    raft_get_snapshot_chunk_f get_snapshot_chunk;

    /** Stores a chunk of the snapshot. */
    raft_store_snapshot_chunk_f store_snapshot_chunk;

    /** Dismisses the temporary file that is used for incoming snapshot
     * chunks. */
    raft_clear_snapshot_f clear_snapshot;

    /** Finite state machine application.
     * Return 0 on success.
     * Return RAFT_ERR_SHUTDOWN if you want the server to shut down. */
    raft_logentry_event_f applylog;

    /** Persists term and vote data.
     * For safety reasons this callback MUST flush the term and vote changes
     * to disk atomically. */
    raft_persist_metadata_f persist_metadata;

    /** Determines which node a configuration log entry affects. Only
     * applies to configuration change log entries.
     * @return the node ID of the node */
    raft_get_node_id_f get_node_id;

    /** Detects when a non-voting node has sufficient logs. */
    raft_node_has_sufficient_logs_f node_has_sufficient_logs;

    /** Retrieves a monotonic timestamp in microseconds. */
    raft_timestamp_f timestamp;

    /** (optional) Notified of membership changes. */
    raft_membership_event_f notify_membership_event;

    /** (optional) Notified of state changes. */
    raft_state_event_f notify_state_event;

    /** (optional) Notified of transfer leadership events. */
    raft_transfer_event_f notify_transfer_event;

    /** (optional) Catches debugging log messages. */
    raft_log_f log;

    /** (optional) Decides whether to send a raft_appendentries_req to a
     * node. */
    raft_backpressure_f backpressure;

    /** (optional) Prepares the entries to send in a
     * raft_appendentries_req. */
    raft_get_entries_to_send_f get_entries_to_send;
} raft_cbs_t;
/** Callback that signals when queued read requests can be processed.
 *
 * @param[in] arg      Argument passed in the original call
 * @param[in] can_read If non-zero, the read requests may be processed and
 *                     returned to the user. Otherwise the request should be
 *                     treated as if it arrived at a non-leader.
 */
typedef void (*raft_read_request_callback_f)(void *arg, int can_read);
/** Generic Raft Log implementation.
 *
 * An abstract interface for plugging in a Raft Log implementation, as
 * opposed to the built-in implementation, which is more opinionated (e.g.
 * it is entirely in-memory).
 *
 * A log implementation is expected to be persistent, so it must avoid
 * losing entries that have been appended to it.
 */
typedef struct raft_log_impl {
    /** Construct the log implementation. Called exactly once, when Raft
     * initializes.
     *
     * @param[in] raft The Raft server using the log.
     * @param[in] arg  User-specified initialization argument, as passed to
     *                 raft_new_with_log().
     * @return Initialized log handle. This handle is passed as 'log' on
     *         all subsequent calls.
     *
     * @note A common pattern may involve initializing the log engine in
     *       advance and passing a handle to it as arg. The init function
     *       can then simply return arg.
     */
    void *(*init)(void *raft, void *arg);

    /** Destroy the log implementation. Called exactly once, when Raft
     * shuts down.
     *
     * All memory and resources allocated since init() should be released.
     *
     * @param[in] log The log handle.
     */
    void (*free)(void *log);

    /** Reset the log. All entries should be deleted, and the log is
     * configured so that the next appended entry is assigned the specified
     * index.
     *
     * A log implementation that has been initialized for the first time and
     * contains no persisted data should implicitly perform reset(1).
     *
     * A reset with a higher first_idx is expected when the log is compacted
     * after a snapshot is taken. In this case the log implementation is
     * expected to persist the index and term.
     *
     * @param[in] log       The log handle.
     * @param[in] first_idx Index to assign to the first entry in the log.
     * @param[in] term      Term of the last applied entry, if reset is
     *                      called after a snapshot.
     */
    void (*reset)(void *log, raft_index_t first_idx, raft_term_t term);

    /** Append an entry to the log.
     *
     * @param[in] log   The log handle.
     * @param[in] entry Entry to append.
     * @return
     *  0 on success;
     *  RAFT_ERR_SHUTDOWN server should shutdown;
     *  RAFT_ERR_NOMEM memory allocation failure.
     *
     * @note
     *  The passed raft_entry_t is expected to be allocated by
     *  raft_entry_new(). The caller is expected to call
     *  raft_entry_release() after the append.
     *
     *  The log implementation shall call raft_entry_hold() in order to
     *  maintain its reference count, and call raft_entry_release() when
     *  the entry is no longer needed.
     *
     * @todo
     *  1. Batch append of multiple entries.
     *  2. Consider an async option to make it possible to implement
     *     I/O in a background thread.
     */
    int (*append)(void *log, raft_entry_t *entry);

    /** Remove entries from the start of the log, as needed when compacting
     * the log and deleting the oldest entries.
     *
     * The log implementation must call raft_entry_release() on any removed
     * in-memory entries.
     *
     * @param[in] log       The log handle.
     * @param[in] first_idx Index of the first entry to be left in the log.
     * @return
     *  0 on success;
     *  -1 on error (e.g. log is empty).
     */
    int (*poll)(void *log, raft_index_t first_idx);

    /** Remove entries from the end of the log, as needed when rolling back
     * append operations that have not been committed.
     *
     * The log implementation must call raft_entry_release() on any removed
     * in-memory entries.
     *
     * @param[in] log      The log handle.
     * @param[in] from_idx Index of the first entry to be removed. All
     *                     entries starting from and including this index
     *                     shall be removed.
     * @return
     *  0 on success;
     *  -1 on error.
     */
    int (*pop)(void *log, raft_index_t from_idx);

    /** Get a single entry from the log.
     *
     * The log implementation must call raft_entry_hold() on the fetched
     * entry.
     *
     * @param[in] log The log handle.
     * @param[in] idx Index of the entry to fetch.
     * @return
     *  Pointer to the entry on success;
     *  NULL if there is no entry at the specified index.
     *
     * @note
     *  The caller must use raft_entry_release() when it no longer requires
     *  the entry.
     */
    raft_entry_t *(*get)(void *log, raft_index_t idx);

    /** Get a batch of entries from the log.
     *
     * The log implementation must call raft_entry_hold() on the fetched
     * entries.
     *
     * @param[in]  log       The log handle.
     * @param[in]  idx       Index of the first entry to fetch.
     * @param[in]  entries_n Length of entries (max. entries to fetch).
     * @param[out] entries   An initialized array of raft_entry_t*.
     * @return
     *  Number of entries fetched;
     *  -1 on error.
     *
     * @note
     *  The caller must use raft_entry_release_list() when it no longer
     *  requires the returned entries.
     */
    raft_index_t (*get_batch)(void *log,
                              raft_index_t idx,
                              raft_index_t entries_n,
                              raft_entry_t **entries);

    /** Get the index of the first entry.
     *
     * @param[in] log The log handle.
     * @return Index of the first entry.
     */
    raft_index_t (*first_idx)(void *log);

    /** Get the index of the current (latest) entry.
     *
     * @param[in] log The log handle.
     * @return Index of the latest entry.
     */
    raft_index_t (*current_idx)(void *log);

    /** Get the number of entries in the log.
     *
     * @param[in] log The log handle.
     * @return Number of entries.
     */
    raft_index_t (*count)(void *log);

    /** Persist the log file to disk. Usually implemented as a call to
     * fsync() for the log file.
     *
     * @param[in] log The log handle.
     * @return 0 on success;
     *         -1 on error.
     */
    int (*sync)(void *log);
} raft_log_impl_t;
/** Create a new Raft server that uses the in-memory log implementation.
 *
 * Defaults:
 *  - request timeout: 200 milliseconds
 *  - election timeout: 1000 milliseconds
 *
 * @return the newly initialised Raft server
 */
raft_server_t *raft_new(void);
/** Create a new Raft server that uses a custom Raft Log implementation.
 *
 * @param[in] log_impl Callbacks structure of the Log implementation to use.
 * @param[in] log_arg  Argument to pass to the Log implementation's init().
 * @return the newly initialised Raft server
 */
raft_server_t *raft_new_with_log(const raft_log_impl_t *log_impl,
                                 void *log_arg);
/** De-initialise the Raft server and free all memory.
 *
 * @param[in] me The Raft server
 */
void raft_destroy(raft_server_t *me);
/** De-initialise the Raft server.
 *
 * NOTE(review): how this differs from raft_destroy() is not stated in this
 * header; confirm against the implementation.
 *
 * @param[in] me The Raft server
 */
void raft_clear(raft_server_t *me);
/** Restore the term and vote that were read from the metadata file on disk.
 *
 * On a restart, the application should set the term and vote after reading
 * the metadata file from disk. See `raft_persist_metadata_f`.
 *
 * @param[in] me   The Raft server
 * @param[in] term Term in the metadata file
 * @param[in] vote Vote in the metadata file
 * @return 0 on success
 */
int raft_restore_metadata(raft_server_t *me, raft_term_t term, raft_node_id_t vote);
/** Set the callbacks and the user data.
 *
 * @param[in] me        The Raft server
 * @param[in] funcs     Callbacks
 * @param[in] user_data "User data" - the user's context that is included in
 *                      a callback
 */
void raft_set_callbacks(raft_server_t *me, raft_cbs_t *funcs, void *user_data);
/** Add a node.
 *
 * The call fails if a voting node already exists.
 *
 * @param[in] me        The Raft server
 * @param[in] user_data The user data for the node.
 *  This is obtained using raft_node_get_udata.
 *  Examples of what this could be:
 *  - void* pointing to implementor's networking data
 *  - a (IP,Port) tuple
 * @param[in] id        The integer ID of this node.
 *  This is used for identifying clients across sessions.
 * @param[in] is_self   Set to 1 if this "node" is this server
 * @return
 *  the node if it was successfully added;
 *  NULL if a voting node already exists
 */
raft_node_t *raft_add_node(raft_server_t *me,
                           void *user_data,
                           raft_node_id_t id,
                           int is_self);
/** Add a node which does not participate in voting.