#include "common.h"
#include <fs.h>
#include <network.h>
#include <switch.h>
#include <switch_ll.h>
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <thread.h>
#include <ext/ebtree/eb64tree.h>
#include <thread>
#include <text.h>
#include <unordered_set>
#include <deque>
#include <switch_bitops.h>
#include <queue>
#include <zlib.h>
#include <crypto.h>
#include <switch_mallocators.h>
#include <condition_variable>
#include <atomic>
#include <unordered_map>
#ifdef __clang__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wc99-designator"
#endif
// if HWM_UPDATE_BASED_ON_ACKS is defined, the current semantics apply:
// Assuming two producers PR1 and PR2, both publishing to partition P0.
// Assuming PR1 publishes a single message with LSN 100 with required acks = 2 (+1 for implicit local ack)
// and PR2 publishing a single message that gets LSN 101, with required acks = 1 (+1 for implicit local ack)
// If HWM_UPDATE_BASED_ON_ACKS is defined, then the HWM will advance as soon as both messages 100 and 101 are acknowledged.
//
// Assume one of the RS nodes asks to consume from 102. That will acknowledge the message from PR2 (because it required
// just 1 ack from a remote RS peer). However, PR1's message can't be acknowledged yet (it requires one more consumer to fetch
// from 101+). We would only acknowledge PR2's message once all other messages before it (i.e. PR1's) were acknowledged, and we would only
// advance the HWM to X as long as no messages < X remain unacknowledged.
// This is very powerful, but apparently it doesn't comply with Kafka semantics. Kafka semantics are much simpler to implement.
//
// See: https://gist.github.com/markpapadakis/5c0e0ee74fe5fcc4d06fd87563c29236
//
// Kafka semantics in a nutshell, based on Gwen's input (thanks, Gwen!):
// - HWM advances based on the "lowest offset replicated by the entire ISR" (so really, just the lowest LSN
// in a ConsumePeer request among all nodes in the ISR)
// - Producers get an ack **regardless** of HWM advancement (i.e. if producer P produces a message with LSN 100 and acks = 2 (requiring
// the implicit ack from the leader and 2 acks from the remote nodes in the ISR), then as soon as 2+ different remote peers
// ConsumePeer for LSN > 100, that producer P gets the ack).
//
//#define HWM_UPDATE_BASED_ON_ACKS 1
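//
// Illustrative sketch (not part of the build) of the Kafka-style semantics described above,
// where the HWM is simply the lowest LSN confirmed by every ISR member:
//
//	// hypothetical helper; isr_lsns holds, for each ISR member (leader included),
//	// the highest LSN that member is known to have replicated
//	static uint64_t compute_hwm(const std::vector<uint64_t> &isr_lsns) {
//		return *std::min_element(isr_lsns.begin(), isr_lsns.end());
//	}
//
// e.g. with ISR = {leader: 101, peerA: 101, peerB: 100}, the HWM is 100; meanwhile PR1's
// message (LSN 100, acks = 2) is acknowledged as soon as both peers have replicated it,
// regardless of whether the HWM has advanced.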
namespace TANKUtil {
// A handy class for reading ahead
// as an alternative to entering the kernel too often.
//
// This works great in practice, but it's not optimal.
//
// We should cache blocks of e.g. 8k each. A simple eviction policy wouldn't really work,
// because whenever we got rid of an ROLog or the current log we'd need to purge whatever blocks we had cached for them.
//
// However, we could also maintain a RA cache per ROLog/iCur,
// where the key is the offset aligned to e.g. 4096.
// TODO: figure out a better alternative.
//
// We need a simple API where we request data by (offset, range)
// and we'll get back either a str_view32 to that data, or a str_view32 to data copied somewhere else if
// the data crosses block boundaries.
// See the 2Q eviction algorithm.
static constexpr const std::size_t read_ahead_default_stride = 4096;
struct range_start final {
bool first_bundle_is_sparse;
uint64_t abs_seqnum;
uint32_t file_offset;
};
template <size_t BUF_SIZE = read_ahead_default_stride>
class read_ahead final {
enum {
trace = false
};
private:
int fd{-1};
uint8_t buf[BUF_SIZE];
uint64_t start{0};
uint64_t end{0};
private:
auto at(const uint64_t offset) const TANK_NOEXCEPT_IF_NORUNTIME_CHECKS {
if (trace) {
SLog("Accessing offset = ", offset, ", start = ", start, ", end = ", end, "\n");
}
TANK_EXPECT(offset >= start);
TANK_EXPECT(offset < end);
return range_base<const uint8_t *, size_t>(buf + (offset - start),
end - offset);
}
auto buffer() const noexcept {
return str_view32(reinterpret_cast<const char *>(buf), end - start);
}
int read_impl([[maybe_unused]] const size_t bytes) {
enum {
trace = false,
};
const auto at = size();
const auto capacity = sizeof(buf) - at;
int r;
assert(fd > 2);
if (trace) {
struct stat st;
fstat(fd, &st);
SLog("Attempting to read ", capacity, " at ", end, " in buffer + ", at, " for fd = ", fd, "\n");
SLog("filesize = ", st.st_size, "\n");
}
for (;;) {
r = pread64(fd, buf + at, capacity, end);
if (-1 == r) {
if (EINTR == errno) {
continue;
} else {
if (trace) {
SLog("pread64() into ", at, " for ", capacity, " at ", end, " failed with:", strerror(errno), "\n");
}
return -1;
}
} else {
break;
}
}
if (trace) {
SLog("end += ", r, " => ", end + r, "\n");
}
end += r;
return 0;
}
bool reset_to(const uint64_t offset, const size_t n) {
start = offset;
end = offset;
return read_impl(sizeof(buf)) != -1;
}
bool expand(const uint64_t offset, const size_t n) {
const auto upto = offset + n;
if (upto <= end) {
// hit
return true;
}
const auto required = upto - end;
const auto buffer_len = size();
// can we expand the buffer?
if (required + buffer_len <= sizeof(buf)) {
return read_impl(required) != -1;
}
const auto useful_span = end - offset;
memmove(buf, buf + (offset - start), useful_span);
start = offset;
end = start + useful_span;
return read_impl(required) != -1;
}
bool read1(const uint64_t offset, const size_t n) {
if (offset < start) {
// we are optimized for reading ahead, so for accesses
// before the buffered range we just reset (i.e. not optimized for this)
return reset_to(offset, n);
}
if (offset >= end) {
return reset_to(offset, n);
} else if (offset + n <= end) {
// hit
return true;
} else {
return expand(offset, n);
}
}
public:
range_base<const uint8_t *, std::size_t> read(const uint64_t offset, const size_t n) {
if (not read1(offset, n)) {
return {nullptr, 0};
}
return at(offset);
}
void clear() noexcept {
start = 0;
end = 0;
}
void reset_to(int _fd) {
assert(fd == -1 or fd > 2);
fd = _fd;
clear();
}
read_ahead(const int _fd = -1) TANK_NOEXCEPT_IF_NORUNTIME_CHECKS
: fd{_fd} {
//
assert(fd == -1 or fd > 2);
}
std::size_t size() const noexcept {
return end - start;
}
auto get_fd() const noexcept {
return fd;
}
};
} // namespace TANKUtil
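// Minimal usage sketch for TANKUtil::read_ahead (illustrative only; the file name,
// fd handling and offsets below are hypothetical):
//
//	int fd = open("segment.log", O_RDONLY); // caller retains ownership of the fd
//	TANKUtil::read_ahead<> ra(fd);          // default 4k read-ahead stride
//	const auto res = ra.read(128, 64);      // (offset, length); {nullptr, 0} on failure
//	if (res.offset) {
//		// res.offset points at the first requested byte; the requested range is
//		// buffered, so nearby subsequent reads won't have to enter the kernel
//	}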
struct index_record final {
uint32_t relSeqNum;
uint32_t absPhysical;
};
struct ro_segment_lookup_res final {
index_record record;
uint32_t span;
};
struct fd_handle final {
int fd{-1};
fd_handle(int f)
: fd{f} {
}
~fd_handle() {
if (fd != -1) {
fdatasync(fd);
TANKUtil::safe_close(fd);
}
}
};
struct adjust_range_start_cache_value final {
bool first_bundle_is_sparse;
uint32_t file_offset;
uint64_t seq_num;
};
struct topic_partition;
// A read-only (immutable, frozen/sealed) partition commit log segment (and the index file for quick lookups).
// We don't need to acquire a lock to access this.
//
// We need to ref-count ro_segments so that we can hand off the list of current ro segments to another thread for compaction;
// while the compaction is in progress, we won't delete any of those segments passed to the thread (i.e. in consider_ro_segments()).
// For now, we can return immediately from consider_ro_segments() if compaction is scheduled for the partition, and later we can
// do this properly.
struct ro_segment final {
// the absolute sequence number of the first message in this segment
const uint64_t baseSeqNum;
// the absolute sequence number of the last message in this segment
// i.e. this segment contains [baseSeqNum, lastAssignedSeqNum]
// See: https://github.com/phaistos-networks/TANK/issues/2 for rationale
uint64_t lastAvailSeqNum;
// For RO segments, this used to be set to the creation time of the
// mutable segment that was then turned into a R/O segment.
//
// This however turned out to be problematic, because the retention logic would
// consider that timestamp for retention, instead of what makes more sense: the time
// the last message was appended to the mutable segment, before it was frozen as an RO segment.
//
// We need to encode this in the file path, because a process may update the mtime of the RO segment for whatever reason
// and so it's important that we do not depend on the file's mtime, and instead encode it in the path.
// see: https://github.com/phaistos-networks/TANK/issues/37
const uint32_t createdTS;
std::shared_ptr<fd_handle> fdh;
uint32_t fileSize;
// In order to support compactions (in the future): in the very improbable case where, because of deduplication,
// compaction would force us to store messages in a segment where for any of those messages
// message.absSeqNum - segment.baseSeqNum > UINT32_MAX, and we don't want to just create a new immutable segment to deal with it
// (maybe because that'd lead to creating very small segments), we instead encode {absSeqNum:u64, fileOffset:u32} in the immutable
// segment's index, and to denote that, we'll use a different name for those .index files (see the layout sketch after this struct).
//
// For now, the implementation is missing, but we'll implement what's required if and when we need to.
const bool haveWideEntries;
// Every log file is associated with this skip-list index
struct
{
const uint8_t *data{nullptr};
uint32_t fileSize{0};
// last record in the index
index_record lastRecorded;
} index;
ro_segment(const uint64_t absSeqNum, const uint64_t lastAbsSeqNum, const uint32_t creationTS)
: baseSeqNum{absSeqNum}
, lastAvailSeqNum{lastAbsSeqNum}
, createdTS{creationTS}
, haveWideEntries{false} {
assert(not fdh);
}
ro_segment(const uint64_t absSeqNum, uint64_t lastAbsSeqNum, const str_view32 base, const uint32_t, const bool haveWideEntries);
~ro_segment() {
if (index.data && index.data != MAP_FAILED) {
munmap((void *)index.data, index.fileSize);
}
}
bool prepare_access(const topic_partition *);
};
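// Index entry layouts referenced above (an illustrative sketch, not a definition from this codebase):
// the regular index stores (relative seqnum, file offset) pairs, while a "wide entries" index
// would store the absolute seqnum instead, so the u32 delta limit no longer applies:
//
//	struct narrow_index_entry { uint32_t relSeqNum; uint32_t fileOffset; }; // relSeqNum = absSeqNum - baseSeqNum
//	struct wide_index_entry { uint64_t absSeqNum; uint32_t fileOffset; };   // for haveWideEntries segments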
struct timer_node final {
enum class ContainerType : uint8_t {
ConnEstTimeout = 0,
PeerConnEstTimeout,
Connection,
WaitCtx,
CleanupTracker,
ScheduleRenewConsulSess,
TryConsulClusterReg,
SchedConsulReq,
ShutdownConsumerConn,
ForceSetReactorStateIdle,
TryBecomeClusterLeader,
} type;
eb64_node node;
void reset() {
node.node.leaf_p = nullptr;
}
bool is_linked() const noexcept {
return node.node.leaf_p;
}
};
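// Scheduling sketch (illustrative; timers_root and the timeout values are hypothetical):
// timer_node instances are keyed by their absolute expiration time and linked into an eb64 tree:
//
//	timer_node t;
//	t.type = timer_node::ContainerType::Connection;
//	t.node.key = now_ms + timeout_ms;   // absolute expiration, in ms
//	eb64_insert(&timers_root, &t.node); // timers_root is an eb_root of pending timers
//	// t.is_linked() remains true until eb64_delete(&t.node) unlinks it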
struct append_res final {
std::shared_ptr<fd_handle> fdh;
range32_t dataRange;
range_base<uint64_t, uint16_t> msgSeqNumRange;
};
struct lookup_res final {
enum class Fault : uint8_t {
NoFault = 0,
Empty,
BoundaryCheck,
PastMax,
AtEOF,
SystemFault,
} fault;
// This is set to either the fileSize of the segment log file, or lower if
// we are setting a boundary based on the physical offset of the last assigned committed sequence number.
// adjust_range() cannot exceed that offset.
uint32_t fileOffsetCeiling;
std::shared_ptr<fd_handle> fdh;
// Absolute base sequence number of the first message in the first bundle
// of all bundles in the log chunk in range.
// Increment this by each bundle.header.msgSetMsgsCnt to compute the absolute sequence number
// of each bundle message (use post-increment!)
uint64_t absBaseSeqNum;
// file offset for the bundle with the first message == absBaseSeqNum
uint32_t fileOffset;
bool first_bundle_is_sparse;
lookup_res(lookup_res &&o)
: fault{o.fault}
, fileOffsetCeiling{o.fileOffsetCeiling}
, fdh(std::move(o.fdh))
, absBaseSeqNum{o.absBaseSeqNum}
, fileOffset{o.fileOffset}
, first_bundle_is_sparse{o.first_bundle_is_sparse} {
}
lookup_res(const lookup_res &o)
: fault{o.fault}
, fileOffsetCeiling{o.fileOffsetCeiling}
, fdh{o.fdh}
, absBaseSeqNum{o.absBaseSeqNum}
, fileOffset{o.fileOffset}
, first_bundle_is_sparse{o.first_bundle_is_sparse} {
}
lookup_res()
: fault{Fault::NoFault} {
}
lookup_res(std::shared_ptr<fd_handle> f, const uint32_t c, const uint64_t seqNum, const uint32_t o, const bool s)
: fault{Fault::NoFault}
, fileOffsetCeiling{c}
, fdh{f}
, absBaseSeqNum{seqNum}
, fileOffset{o}
, first_bundle_is_sparse{s} {
}
lookup_res(const Fault f)
: fault{f} {
}
auto &operator=(const lookup_res &o) {
fault = o.fault;
fileOffsetCeiling = o.fileOffsetCeiling;
fdh = o.fdh;
absBaseSeqNum = o.absBaseSeqNum;
fileOffset = o.fileOffset;
first_bundle_is_sparse = o.first_bundle_is_sparse;
return *this;
}
};
enum class CleanupPolicy : uint8_t {
DELETE = 0,
CLEANUP
};
extern struct partition_config final {
// Kafka defaults
size_t roSegmentsCnt{0}; // maximum segments to retain (0 all)
uint64_t roSegmentsSize{0};
uint64_t maxSegmentSize{1 * 1024 * 1024 * 1024};
size_t indexInterval{4096};
size_t maxIndexSize{10 * 1024 * 1024};
size_t maxRollJitterSecs{0};
size_t lastSegmentMaxAge{0}; // Kafka's default is 1 week; we don't want to explicitly specify a retention limit
size_t curSegmentMaxAge{86400 * 7}; // 1 week (soft limit)
size_t flushIntervalMsgs{0}; // never
size_t flushIntervalSecs{0}; // never
CleanupPolicy logCleanupPolicy{CleanupPolicy::DELETE};
float logCleanRatioMin{0.5}; //
} config;
static void PrintImpl(Buffer &out, const lookup_res &res) {
if (res.fault == lookup_res::Fault::NoFault) {
out.append("{fd = ", res.fdh ? res.fdh->fd : -1, ", absBaseSeqNum = ", res.absBaseSeqNum, ", fileOffset = ", res.fileOffset, "}");
} else {
out.append("{fault = ", unsigned(res.fault), "}");
}
}
// An append-only log for storing bundles, divided into segments
struct topic_partition;
struct topic_partition_log final {
// the absolute sequence number of the first available message across all log segments
uint64_t firstAvailableSeqNum;
// the absolute sequence number of the last available message across all log segments
uint64_t lastAssignedSeqNum{0};
topic_partition *partition;
std::atomic<bool> compacting{false};
// Whenever we clean up, we update lastCleanupMaxSeqNum with the lastAvailSeqNum of the latest compacted ro segment
uint64_t lastCleanupMaxSeqNum{0};
auto first_dirty_offset() const {
return lastCleanupMaxSeqNum + 1;
}
struct {
std::shared_ptr<fd_handle> fdh;
// The absolute sequence number of the first message in the current segment
uint64_t baseSeqNum;
uint32_t fileSize;
// We are going to be updating the index and skiplist frequently.
// This is always initialized to UINT32_MAX, so that we always index the first bundle in the segment, for implementation simplicity.
uint32_t sinceLastUpdate;
// Computed whenever we roll/create a new mutable segment
uint32_t rollJitterSecs{0};
// When this was created, in seconds
uint32_t createdTS{0};
bool nameEncodesTS;
// make sure we flush when we rotate
robin_hood::unordered_map<uint64_t, adjust_range_start_cache_value> triangulation_cache;
struct {
TANKUtil::read_ahead<TANKUtil::read_ahead_default_stride> ra;
void clear() {
ra.clear();
}
} ra_proxy;
struct
{
int fd{-1};
// relative sequence number to (absolute base sequence number, file offset)
// See https://github.com/phaistos-networks/TANK/issues/63
robin_hood::unordered_map<uint64_t, std::pair<uint64_t, uint32_t>> cache;
// this is populated by append_bundle().
// When it reaches 64k or so entries, it's flushed.
//
// We consult it whenever we can, but because we only populate it with
// monotonically increasing sequence numbers, we also use a tiny cache (ondisk.cache)
// that protects us from what would otherwise require accessing the on-disk memory-mapped index,
// and should provide some real benefits.
// relative sequence number => file physical offset
// relative sequence number = absSeqNum - baseSeqNum
std::vector<std::pair<uint32_t, uint32_t>> skipList;
// see above
bool haveWideEntries;
// We may access the index both directly, if ondisk is valid, and via the skipList
// This is so that we won't have to restore the skiplist on init
struct
{
const uint8_t *data;
uint32_t span;
// small cache that helps with tailing semantics
// make sure we flush whenever we rotate
robin_hood::unordered_map<uint32_t, index_record> cache;
// last recorded tuple in the index; we need this here
struct
{
uint32_t relSeqNum;
uint32_t absPhysical;
} lastRecorded;
} ondisk;
} index;
// TODO:
// This is definitely not optimal
// We should perhaps have a fixed-size cache of N lanes, each lane M slots wide.
// For now, we'll use a hash map though.
std::unordered_map<uint64_t, TANKUtil::range_start> file_range_start_cache;
void reset_cache() {
file_range_start_cache.clear();
index.ondisk.cache.clear();
ra_proxy.clear();
}
struct
{
uint64_t pendingFlushMsgs{0};
uint32_t nextFlushTS;
} flush_state;
void sanity_checks() const {
// no-op
}
} cur; // the _current_ (latest) segment
partition_config config;
// A topic partition comprises a set of segments (log file, index file), which
// are immutable so we don't need to serialize access to them, and a cur(rent) segment, which is mutable.
//
// roSegments can also be atomically exchanged with a new vector, so we don't need to protect that either.
// Make sure roSegments is sorted.
std::shared_ptr<std::vector<ro_segment *>> roSegments;
~topic_partition_log() {
if (auto ptr = reinterpret_cast<void *>(const_cast<uint8_t *>(cur.index.ondisk.data)); ptr && ptr != MAP_FAILED) {
munmap(ptr, cur.index.ondisk.span);
}
if (roSegments) {
for (ro_segment *it : *roSegments) {
delete it;
}
}
if (cur.index.fd != -1) {
fdatasync(cur.index.fd);
TANKUtil::safe_close(cur.index.fd);
}
}
lookup_res read_cur(const uint64_t absSeqNum, const uint32_t maxSize, const uint64_t maxAbsSeqNum);
lookup_res range_for(uint64_t, const uint32_t, uint64_t);
lookup_res range_for_immutable_segments(uint64_t, const uint32_t, uint64_t);
lookup_res no_immutable_segment(const bool);
lookup_res from_immutable_segment(const topic_partition_log *,
ro_segment *,
const uint64_t,
const uint32_t,
const uint64_t);
append_res append_bundle(const time_t, const void *bundle, const size_t bundleSize, const uint32_t bundleMsgsCnt, const uint64_t, const uint64_t);
// utility method: appends a non-sparse bundle
// This is handy for appending bundles to the internal topics/partitions
append_res append_bundle(const time_t ts, const void *bundle, const size_t bundleSize, const uint32_t bundleMsgsCnt) {
return append_bundle(ts, bundle, bundleSize, bundleMsgsCnt, 0, 0);
}
append_res append_msg(const time_t ts, const strwlen8_t key, const str_view32 msg);
bool should_roll(const uint32_t) const;
void roll(const uint64_t, const uint64_t);
void flush_index_skiplist();
bool may_switch_index_wide(const uint64_t);
void schedule_flush(const uint32_t);
void consider_ro_segments();
};
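// Index lookup sketch (illustrative; floor_offset is a hypothetical helper): the in-memory
// skip list above holds ascending (relative seqnum, file offset) pairs, so locating a sequence
// number means finding the last indexed entry at or before it and scanning forward from there:
//
//	static uint32_t floor_offset(const std::vector<std::pair<uint32_t, uint32_t>> &sl, const uint32_t relSeqNum) {
//		const auto it = std::upper_bound(sl.begin(), sl.end(), relSeqNum,
//			[](const uint32_t v, const std::pair<uint32_t, uint32_t> &e) noexcept { return v < e.first; });
//		return it == sl.begin() ? 0 : std::prev(it)->second;
//	}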
struct pending_compaction final {
pending_compaction *next;
char basePartitionPath[PATH_MAX];
std::vector<ro_segment *> prevSegments;
topic_partition_log *log;
};
using nodeid_t = uint16_t;
struct cluster_node;
namespace NodesPartitionsUpdates {
// the partition's number of replicas was reduced to new_size
struct reduced_rs final {
topic_partition *p;
uint16_t new_size;
};
// the partition's number of replicas increased; new replicas are appended to
// p->cluster.replicas.nodes,
// and the previous size of p->cluster.replicas.nodes is tracked in original_size
// (for convenience we will append and then later resize back to original_size)
struct expanded_rs final {
topic_partition *p;
uint16_t original_size;
};
// partition leader has changed to n (can be nullptr)
struct leadership_promotion final {
topic_partition *p;
cluster_node *n;
};
} // namespace NodesPartitionsUpdates
struct connection;
// In order to support minBytes semantics, we will
// need to track produced data for each tracked topic partition, so that
// we will be able to flush once we can satisfy the minBytes semantics
struct wait_ctx_partition final {
std::shared_ptr<fd_handle> fdh;
uint64_t seqNum;
uint64_t hwmark_threshold;
range32_t range;
topic_partition *partition;
};
struct wait_ctx final {
connection *c; // connection of the consumer, or of a peer replicating partition content from this node
TankAPIMsgType _msg; // ConsumePeer if this is for a peer replicating content from this node
uint32_t requestId; // this is meaningful for Consume requests
switch_dlist list; // ll for c->as.tank.waitCtxList
timer_node exp_tree_node; // node for timer
uint32_t minBytes;
// A request may involve multiple partitions
// minBytes applies to the sum of all captured content for all specified partitions
uint32_t capturedSize;
// we don't really need this anymore
wait_ctx_partition *find(const topic_partition *tp) const noexcept {
for (size_t i{0}; i < total_partitions; ++i) {
auto &it = partitions[i];
if (it.partition == tp) {
return const_cast<wait_ctx_partition *>(&it);
}
}
return nullptr;
}
uint16_t total_partitions;
wait_ctx_partition partitions[0];
};
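// Allocation sketch for the flexible array member above (illustrative; the service's actual
// allocator and initialization may differ):
//
//	auto ctx = static_cast<wait_ctx *>(malloc(sizeof(wait_ctx) + n * sizeof(wait_ctx_partition)));
//	ctx->total_partitions = n; // partitions[0 .. n) are now addressable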
// tracks an ISR (In-Sync Replica) (node, partition) pair
struct isr_entry final {
// XXX: if this becomes expensive because
// we are tracking pointers, we can use a handle:u32 instead
// which just points to a global vector that holds all distinct partitions and nodes
cluster_node *node_;
topic_partition *partition_;
#ifdef HWM_UPDATE_BASED_ON_ACKS
// see partition::cluster::pending_client_produce_acks_tracker::pending_ack
// this is a very important optimization
//
// see topic_partition::cluster::pending_client_produce_acks_tracker
uint8_t partition_isr_node_id;
#endif
auto node() {
return node_;
}
auto partition() {
return partition_;
}
switch_dlist node_ll; // cluster_node::isr::list
switch_dlist partition_ll; // partition::Cluster::isr::list
// for tracking ack.expiration
switch_dlist pending_next_ack_ll;
uint64_t pending_next_ack_timeout;
#ifndef HWM_UPDATE_BASED_ON_ACKS
uint64_t last_msg_lsn;
#endif
void reset() {
node_ll.reset();
partition_ll.reset();
pending_next_ack_ll.reset();
#ifdef HWM_UPDATE_BASED_ON_ACKS
partition_isr_node_id = 0;
#endif
node_ = nullptr;
partition_ = nullptr;
#ifndef HWM_UPDATE_BASED_ON_ACKS
last_msg_lsn = 0;
#endif
}
};
struct connection_handle final {
connection *c_{nullptr};
uint64_t c_gen;
void reset() {
c_gen = std::numeric_limits<uint64_t>::max();
c_ = nullptr;
}
inline void set(connection *const c) TANK_NOEXCEPT_IF_NORUNTIME_CHECKS;
inline connection *get() noexcept;
inline const connection *get() const noexcept;
};
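// Generation-check sketch (illustrative; this assumes connection carries a matching generation
// counter): get() is expected to yield nullptr once the underlying connection has been recycled:
//
//	connection *connection_handle::get() noexcept {
//		return (c_ && c_->gen == c_gen) ? c_ : nullptr;
//	}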
// For each client PRODUCE request, we collect
// all participants; the partitions involved
// in the request.
struct topic;
struct produce_response final {
struct {
connection_handle ch;
uint32_t req_id;
switch_dlist connection_ll; // c->as.tank.deferred_produce_responses_list.push_back(&dpr->connection_ll);
void reset() {
ch.reset();
req_id = 0;
connection_ll.reset();
}
} client_ctx;
// a partition participating in the produce op.
struct participant final {
str_view8 topic_name; // needed because topic can be == nullptr
topic *topic;
topic_partition *p;
uint16_t partition;
struct {
range_base<const uint8_t *, size_t> bundle;
uint64_t first_msg_seq_num;
} update;
enum class OpRes : uint8_t {
OK = 0,
ReadOnly,
InvalidSeqNums,
UnknownTopic,
UnknownPartition,
NoLeader,
OtherLeader,
Pending,
IO_Fault,
InsufficientReplicas,
} res;
};
#if 1
uint64_t gen;
#else
// instead of a gen, we can just use an rc
// so 1 for when associated with a node, and 1 for when in the std::deque of pending produce reqs to ack
uint8_t rc;
#endif
std::vector<participant> participants;
std::vector<char *> unknown_topic_names;
// a produce response may be DEFERRED
struct {
uint8_t pending_partitions;
struct {
// see deferred_produce_responses_expiration_list
switch_dlist ll;
uint64_t ts;
} expiration;
void reset() {
expiration.ll.reset();
expiration.ts = 0;
pending_partitions = 0;
}
} deferred;
void reset() {
participants.clear();
deferred.reset();
for (auto ptr : unknown_topic_names) {
std::free(ptr);
}
unknown_topic_names.clear();
client_ctx.reset();
}
};
struct repl_stream final {
topic_partition *partition; // fetching content for this partition
cluster_node *src; // from this peer
size_t min_fetch_size; // next request must be for at least as much data
switch_dlist repl_streams_ll; // links to cluster_state.replication_streams
connection_handle ch;
void reset() {
partition = nullptr;
src = nullptr;
min_fetch_size = 512;
repl_streams_ll.reset();
ch.reset();
}
};
struct cluster_node final {
const nodeid_t id;
bool available_{true}; // lock (session) set for the node ns key
Switch::endpoint ep{}; // if we have no endpoint, we consider it N/A
// TODO: if we fail to connect, block it
// for a while and enable it again later (when you do, try_replicate_from(node))
bool blocked{false};
cluster_node(const nodeid_t _id)
: id{_id} {
}
bool likely_reachable() const noexcept {
return available_ and not blocked;
}
bool available() const noexcept {
return available_ and ep;
}
struct Leadership final {
// track partitions this node is a leader for
size_t partitions_cnt{0};
switch_dlist list{&list, &list};
bool dirty{false};
// Whenever needed, we will rebuild this list.
// It will include all partitions this node is a leader of that we are interested in (i.e. that we
// are a replica of),
// i.e. all partitions this node leads that are also replicated to the local node.
std::unique_ptr<std::vector<topic_partition *>> local_replication_list;
} leadership;
// this node may also be a replica of 0+ partitions
// this rarely changes so we should use an array
std::vector<topic_partition *> replica_for;
bool is_replica_for(const topic_partition *) const TANK_NOEXCEPT_IF_NORUNTIME_CHECKS;
struct {
// A node may also be in the ISR of a partition
// this should change fairly often, so we need to
// use a list instead.
// see isr_entry
//
// A problem with this idea is that we may have 50k partitions
// and maybe just 2 nodes. We can't realistically expect to scan a linked list, so we
// would need a fast hash map instead.
// Thankfully we never have to use linear search to find anything in this list.
switch_dlist list{&list, &list};
uint16_t cnt{0}; // how many ISR lists is this node in?
auto size() const TANK_NOEXCEPT_IF_NORUNTIME_CHECKS {
TANK_EXPECT(cnt == list.size());
return cnt;
}
} isr;
struct {
// a connection from this node to that peer
// for CONSUMING from it
connection_handle ch;
// total replication streams in cluster_state.local_node.replication_streams
// that depend on this connection
size_t repl_streams_cnt{0};
} consume_conn;
struct {
switch_dlist ll{&ll, &ll};
uint64_t when{0};
} consume_retry_ctx;
};
struct node_partition_rel_update final {
cluster_node *n;
topic_partition *p;
bool assign; // if false, it's no longer assigned
};
struct cluster_nodeid_update final {
nodeid_t id;
Switch::endpoint ep;
bool have_owner;
};
struct topic;
struct topic_partition final {
enum class Flags : uint8_t {
// this is set when reset_partition_log()
// runs, so that subsequent calls to reset_partition_log() won't need to iterate the dir. contents
// it is cleared when we open() a file in the partition dir.
NoDataFiles = 1u << 0,
// Failed to persist messages
// likely ran out of disk space or disk is busted
IOFailed = 1u << 1,
};
uint16_t idx; // (0, ...)
uint32_t distinctId;
topic *owner{nullptr};
partition_config config;
uint8_t flags{0};
struct {