txn.go (forked from twmb/franz-go)
package kgo
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/tdx/franz-go/pkg/kerr"
"github.com/tdx/franz-go/pkg/kmsg"
)
// TransactionEndTry is simply a named bool.
type TransactionEndTry bool
const (
// TryAbort attempts to end a transaction with an abort.
TryAbort TransactionEndTry = false
// TryCommit attempts to end a transaction with a commit.
TryCommit TransactionEndTry = true
)
// GroupTransactSession abstracts away the proper way to begin and end a
// transaction when consuming in a group, modifying records, and producing
// (EOS).
//
// If you are running Kafka 2.5+, it is strongly recommended that you also use
// RequireStableFetchOffsets. See that config option's documentation for more
// details.
type GroupTransactSession struct {
cl *Client
cooperative bool
failMu sync.Mutex
revoked bool
revokedCh chan struct{} // closed once when revoked is set; reset after End
lost bool
lostCh chan struct{} // closed once when lost is set; reset after End
}
// NewGroupTransactSession is exactly the same as NewClient, but wraps the
// client's OnPartitionsRevoked / OnPartitionsLost to ensure that transactions
// are correctly aborted whenever necessary so as to properly provide EOS.
//
// When ETLing in a group in a transaction, if a rebalance happens before the
// transaction is ended, you either (a) must block the rebalance from finishing
// until you are done producing, and then commit before unblocking, or (b)
// allow the rebalance to happen, but abort any work you did.
//
// The problem with (a) is that if your ETL work loop is slow, you run the risk
// of exceeding the rebalance timeout and being kicked from the group. You will
// try to commit, and depending on the Kafka version, the commit may even be
// erroneously successful (pre Kafka 2.5). This will lead to duplicates.
//
// Instead, for safety, a GroupTransactSession favors (b). If a rebalance
// occurs at any time before ending a transaction with a commit, this will
// abort the transaction.
//
// This leaves the risk that ending the transaction itself exceeds the
// rebalance timeout, but this is just one request with no cpu logic. With a
// proper rebalance timeout, this single request will not fail and the commit
// will succeed properly.
//
// If this client detects you are talking to a pre-2.5 cluster, OR if you have
// not enabled RequireStableFetchOffsets, the client will sleep for 200ms after
// a successful commit to allow Kafka's txn markers to propagate. This is not
// foolproof in the event of some extremely unlikely communication patterns and
// **potentially** could allow duplicates. See this repo's transactions
// documentation for more details.
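//
// A rough sketch of the intended consume-modify-produce loop follows; the
// broker address, topics, group, transactional ID, and the pass-through
// transform are placeholder assumptions, not requirements of this package:
//
//	sess, err := NewGroupTransactSession(
//		SeedBrokers("localhost:9092"),
//		TransactionalID("my-txn-id"),
//		ConsumerGroup("my-group"),
//		ConsumeTopics("input-topic"),
//		RequireStableFetchOffsets(),
//	)
//	if err != nil {
//		// handle error
//	}
//	defer sess.Close()
//
//	for {
//		fetches := sess.PollFetches(ctx)
//		if err := sess.Begin(); err != nil {
//			break
//		}
//		fetches.EachRecord(func(r *Record) {
//			sess.Produce(ctx, &Record{Topic: "output-topic", Value: r.Value}, nil)
//		})
//		// A false committed return means a rebalance forced an abort; the
//		// records are simply re-consumed after the group stabilizes.
//		if _, err := sess.End(ctx, TryCommit); err != nil {
//			break // see End: returned errors are not retriable
//		}
//	}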
func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error) {
s := &GroupTransactSession{
revokedCh: make(chan struct{}),
lostCh: make(chan struct{}),
}
var noGroup error
// We append one option, which will get applied last. Because it is
// applied last, we can execute some logic and override some existing
// options.
opts = append(opts, groupOpt{func(cfg *cfg) {
if cfg.group == "" {
cfg.seedBrokers = nil // force a validation error
noGroup = errors.New("missing required group")
return
}
s.cooperative = cfg.cooperative()
userRevoked := cfg.onRevoked
cfg.onRevoked = func(ctx context.Context, cl *Client, rev map[string][]int32) {
s.failMu.Lock()
defer s.failMu.Unlock()
if s.revoked {
return
}
if s.cooperative && len(rev) == 0 && !s.revoked {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke with nothing to revoke; allowing next commit")
} else {
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_revoke; aborting next commit if we are currently in a transaction")
s.revoked = true
close(s.revokedCh)
}
if userRevoked != nil {
userRevoked(ctx, cl, rev)
}
}
userLost := cfg.onLost
cfg.onLost = func(ctx context.Context, cl *Client, lost map[string][]int32) {
s.failMu.Lock()
defer s.failMu.Unlock()
if s.lost {
return
}
cl.cfg.logger.Log(LogLevelInfo, "transact session in on_lost; aborting next commit if we are currently in a transaction")
s.lost = true
close(s.lostCh)
if userLost != nil {
userLost(ctx, cl, lost)
} else if userRevoked != nil {
userRevoked(ctx, cl, lost)
}
}
}})
cl, err := NewClient(opts...)
if err != nil {
if noGroup != nil {
err = noGroup
}
return nil, err
}
s.cl = cl
return s, nil
}
// Client returns the underlying client that this transact session wraps. This
// can be useful for functions that require a client, such as raw requests. The
// returned client should not be used to manage transactions (leave that to the
// GroupTransactSession).
func (s *GroupTransactSession) Client() *Client {
return s.cl
}
// Close is a wrapper around Client.Close, with the exact same semantics.
// Refer to that function's documentation.
//
// This function must be called to leave the group before shutting down.
func (s *GroupTransactSession) Close() {
s.cl.Close()
}
// PollFetches is a wrapper around Client.PollFetches, with the exact same
// semantics. Refer to that function's documentation.
//
// It is invalid to call PollFetches concurrently with Begin or End.
func (s *GroupTransactSession) PollFetches(ctx context.Context) Fetches {
return s.cl.PollFetches(ctx)
}
// PollRecords is a wrapper around Client.PollRecords, with the exact same
// semantics. Refer to that function's documentation.
//
// It is invalid to call PollRecords concurrently with Begin or End.
func (s *GroupTransactSession) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
return s.cl.PollRecords(ctx, maxPollRecords)
}
// ProduceSync is a wrapper around Client.ProduceSync, with the exact same
// semantics. Refer to that function's documentation.
//
// It is invalid to call ProduceSync concurrently with Begin or End.
func (s *GroupTransactSession) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults {
return s.cl.ProduceSync(ctx, rs...)
}
// Produce is a wrapper around Client.Produce, with the exact same semantics.
// Refer to that function's documentation.
//
// It is invalid to call Produce concurrently with Begin or End.
func (s *GroupTransactSession) Produce(ctx context.Context, r *Record, promise func(*Record, error)) {
s.cl.Produce(ctx, r, promise)
}
// TryProduce is a wrapper around Client.TryProduce, with the exact same
// semantics. Refer to that function's documentation.
//
// It is invalid to call TryProduce concurrently with Begin or End.
func (s *GroupTransactSession) TryProduce(ctx context.Context, r *Record, promise func(*Record, error)) {
s.cl.TryProduce(ctx, r, promise)
}
// Begin begins a transaction, returning an error if the client has no
// transactional id or is already in a transaction. Begin must be called
// before producing records in a transaction.
func (s *GroupTransactSession) Begin() error {
s.cl.cfg.logger.Log(LogLevelInfo, "beginning transact session")
return s.cl.BeginTransaction()
}
func (s *GroupTransactSession) failed() bool {
return s.revoked || s.lost
}
// End ends a transaction, committing if commit is true, if the group did not
// rebalance since the transaction began, and if committing offsets is
// successful. If any of these conditions are false, this aborts. Before
// ending, this flushes buffered records when committing or fails any
// buffered records when aborting.
//
// This returns whether the transaction committed or any error that occurred.
// No returned error is retriable. Either the transactional ID has entered a
// failed state, or the client retried so much that the retry limit was hit,
// and odds are you should not continue. While a context is allowed, canceling
// it will likely leave the client in an invalid state. Canceling should only
// be done if you want to shut down.
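//
// A sketch of a typical call site, where sess is a *GroupTransactSession:
//
//	committed, err := sess.End(ctx, TryCommit)
//	switch {
//	case err != nil:
//		// Not retriable: the transactional ID failed or retries were exhausted.
//	case !committed:
//		// A rebalance or commit problem forced an abort; the polled records
//		// will be re-consumed once the group stabilizes.
//	default:
//		// Committed successfully.
//	}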
func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry) (committed bool, err error) {
defer func() {
s.failMu.Lock()
s.revoked = false
s.revokedCh = make(chan struct{})
s.lost = false
s.lostCh = make(chan struct{})
s.failMu.Unlock()
}()
switch commit {
case TryCommit:
if err := s.cl.Flush(ctx); err != nil {
return false, err // we do not abort below, because an error here is ctx closing
}
case TryAbort:
if err := s.cl.AbortBufferedRecords(ctx); err != nil {
return false, err // same
}
}
wantCommit := bool(commit)
s.failMu.Lock()
failed := s.failed()
precommit := s.cl.CommittedOffsets()
postcommit := s.cl.UncommittedOffsets()
s.failMu.Unlock()
var hasAbortableCommitErr bool
var commitErr error
var g *groupConsumer
kip447 := false
if wantCommit && !failed {
isAbortableCommitErr := func(err error) bool {
// ILLEGAL_GENERATION: rebalance began and completed
// before we committed.
//
// REBALANCE_IN_PROGRESS: rebalance began, abort.
//
// COORDINATOR_NOT_AVAILABLE,
// COORDINATOR_LOAD_IN_PROGRESS,
// NOT_COORDINATOR: request failed too many times
//
// CONCURRENT_TRANSACTIONS: Kafka not harmonized,
// we can just abort.
//
// UNKNOWN_SERVER_ERROR: technically should not happen,
// but we can just abort. Redpanda returns this in
// certain versions.
switch {
case errors.Is(err, kerr.IllegalGeneration),
errors.Is(err, kerr.RebalanceInProgress),
errors.Is(err, kerr.CoordinatorNotAvailable),
errors.Is(err, kerr.CoordinatorLoadInProgress),
errors.Is(err, kerr.NotCoordinator),
errors.Is(err, kerr.ConcurrentTransactions),
errors.Is(err, kerr.UnknownServerError):
return true
}
return false
}
var commitErrs []string
committed := make(chan struct{})
g = s.cl.commitTransactionOffsets(context.Background(), postcommit,
func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
defer close(committed)
if err != nil {
if isAbortableCommitErr(err) {
hasAbortableCommitErr = true
return
}
commitErrs = append(commitErrs, err.Error())
return
}
kip447 = resp.Version >= 3
for _, t := range resp.Topics {
for _, p := range t.Partitions {
if err := kerr.ErrorForCode(p.ErrorCode); err != nil {
if isAbortableCommitErr(err) {
hasAbortableCommitErr = true
} else {
commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err))
}
}
}
}
},
)
<-committed
if len(commitErrs) > 0 {
commitErr = fmt.Errorf("unable to commit transaction offsets: %s", strings.Join(commitErrs, ", "))
}
}
// Now that we have committed our offsets, before we allow them to be
// used, we force a heartbeat. By forcing a heartbeat, if there is no
// error, then we know we have up to RebalanceTimeout to write our
// EndTxnRequest without a problem.
//
// We should not be booted from the group if we receive an ok
// heartbeat, meaning that, as mentioned, we should be able to end the
// transaction safely.
var okHeartbeat bool
if g != nil && commitErr == nil {
waitHeartbeat := make(chan struct{})
var heartbeatErr error
select {
case g.heartbeatForceCh <- func(err error) {
defer close(waitHeartbeat)
heartbeatErr = err
}:
select {
case <-waitHeartbeat:
okHeartbeat = heartbeatErr == nil
case <-s.revokedCh:
case <-s.lostCh:
}
case <-s.revokedCh:
case <-s.lostCh:
}
}
s.failMu.Lock()
// If we know we are KIP-447 and the user is requiring stable, we can
// unlock immediately because Kafka will itself block a rebalance
// fetching offsets from outstanding transactions.
//
// If either of these are false, we spin up a goroutine that sleeps for
// 200ms before unlocking to give Kafka a chance to avoid some odd race
// that would permit duplicates (i.e., what KIP-447 is preventing).
//
// This 200ms is not perfect but it should be well enough time on a
// stable cluster. On an unstable cluster, I still expect clients to be
// slower than intra-cluster communication, but there is a risk.
if kip447 && s.cl.cfg.requireStable {
defer s.failMu.Unlock()
} else {
defer func() {
if committed {
s.cl.cfg.logger.Log(LogLevelDebug, "sleeping 200ms before allowing a rebalance to continue to give the brokers a chance to write txn markers and avoid duplicates")
go func() {
time.Sleep(200 * time.Millisecond)
s.failMu.Unlock()
}()
} else {
s.failMu.Unlock()
}
}()
}
tryCommit := !s.failed() && commitErr == nil && !hasAbortableCommitErr && okHeartbeat
willTryCommit := wantCommit && tryCommit
s.cl.cfg.logger.Log(LogLevelInfo, "transaction session ending",
"was_failed", s.failed(),
"want_commit", wantCommit,
"can_try_commit", tryCommit,
"will_try_commit", willTryCommit,
)
// We have a few potential retryable errors from EndTransaction.
// OperationNotAttempted will be returned at most once.
//
// UnknownServerError should not be returned, but some brokers do:
// technically this is fatal, but there is no downside to retrying
// (even retrying a commit) and seeing if we are successful or if we
// get a better error.
var tries int
retry:
endTxnErr := s.cl.EndTransaction(ctx, TransactionEndTry(willTryCommit))
tries++
if endTxnErr != nil && tries < 10 {
switch {
case errors.Is(endTxnErr, kerr.OperationNotAttempted):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit not attempted; retrying as abort")
willTryCommit = false
goto retry
case errors.Is(endTxnErr, kerr.UnknownServerError):
s.cl.cfg.logger.Log(LogLevelInfo, "end transaction with commit unknown server error; retrying")
after := time.NewTimer(s.cl.cfg.retryBackoff(tries))
select {
case <-after.C:
case <-s.cl.ctx.Done(): // context canceled; we will see when we retry
after.Stop()
}
goto retry
}
}
if !willTryCommit || endTxnErr != nil {
currentCommit := s.cl.CommittedOffsets()
s.cl.cfg.logger.Log(LogLevelInfo, "transact session resetting to current committed state (potentially after a rejoin)",
"tried_commit", willTryCommit,
"commit_err", endTxnErr,
"state_precommit", precommit,
"state_currently_committed", currentCommit,
)
s.cl.setOffsets(currentCommit, false)
} else if willTryCommit && endTxnErr == nil {
s.cl.cfg.logger.Log(LogLevelInfo, "transact session successful, setting to newly committed state",
"tried_commit", willTryCommit,
"postcommit", postcommit,
)
s.cl.setOffsets(postcommit, false)
}
switch {
case commitErr != nil && endTxnErr == nil:
return false, commitErr
case commitErr == nil && endTxnErr != nil:
return false, endTxnErr
case commitErr != nil && endTxnErr != nil:
return false, endTxnErr
default: // both errs nil
committed = willTryCommit
return willTryCommit, nil
}
}
// BeginTransaction sets the client to a transactional state, erroring if there
// is no transactional ID, or if the producer is currently in a fatal
// (unrecoverable) state, or if the client is already in a transaction.
//
// This must not be called concurrently with other client functions.
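//
// A minimal producer-only transaction, as a sketch (cl is a transactional
// *Client; the topic, value, and error handling are placeholders):
//
//	if err := cl.BeginTransaction(); err != nil {
//		// handle error
//	}
//	cl.Produce(ctx, &Record{Topic: "output-topic", Value: []byte("v")}, nil)
//	if err := cl.Flush(ctx); err != nil {
//		// handle error (an error here means ctx was canceled)
//	}
//	if err := cl.EndTransaction(ctx, TryCommit); err != nil {
//		// handle error; see EndTransaction for when retrying with TryAbort
//		// is appropriate
//	}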
func (cl *Client) BeginTransaction() error {
if cl.cfg.txnID == nil {
return errNotTransactional
}
cl.producer.txnMu.Lock()
defer cl.producer.txnMu.Unlock()
if cl.producer.inTxn {
return errors.New("invalid attempt to begin a transaction while already in a transaction")
}
needRecover, didRecover, err := cl.maybeRecoverProducerID()
if needRecover && !didRecover {
cl.cfg.logger.Log(LogLevelInfo, "unable to begin transaction due to unrecoverable producer id error", "err", err)
return fmt.Errorf("producer ID has a fatal, unrecoverable error, err: %w", err)
}
cl.producer.inTxn = true
atomic.StoreUint32(&cl.producer.producingTxn, 1) // allow produces for txns now
cl.cfg.logger.Log(LogLevelInfo, "beginning transaction", "transactional_id", *cl.cfg.txnID)
return nil
}
// EndBeginTxnHow controls the safety of how EndAndBeginTransaction executes.
type EndBeginTxnHow uint8
const (
// EndBeginTxnSafe ensures a "safe" execution of EndAndBeginTransaction
// at the expense of speed. This option blocks all produce requests and
// only resumes produce requests when onEnd finishes. Note that some
// produce requests may have finished successfully and records that
// were a part of a transaction may have their promises waiting to be
// called: not all promises are guaranteed to be called.
EndBeginTxnSafe EndBeginTxnHow = iota
// EndBeginTxnUnsafe opts for less safe EndAndBeginTransaction flow to
// achieve higher throughput. This option allows produce requests to
// continue while EndTxn actually commits. This is unsafe because a
// produce request itself only half begins a transaction. Internally,
// AddPartitionsToTxn actually begins a transaction. If your
// application dies before the client is able to successfully issue
// AddPartitionsToTxn, then a transaction will have partially begun
// within Kafka: the partial transaction will prevent the partition
// from being consumable past where the transaction began, and the
// transaction will not time out. You will have to restart your
// application with the SAME transactional ID and produce to all the
// same partitions to resume the transaction and unstick the
// partitions.
//
// Also note: this option does not work on all broker implementations.
// This relies on Kafka internals. Some brokers (notably Redpanda) are
// more strict with enforcing transaction correctness and this option
// cannot be used and will cause errors.
EndBeginTxnUnsafe
)
// EndAndBeginTransaction is a combination of EndTransaction and
// BeginTransaction, and relaxes the restriction that the client must have no
// buffered records. This function does not flush nor abort any buffered
// records. It is ok to concurrently produce while this function executes.
//
// This function has different safety guarantees which are up to the user to
// decide. See the documentation on EndBeginTxnHow for which you would like to
// choose.
//
// The onEnd function is called with your input context and the result of
// EndTransaction. Promises are paused while onEnd executes. If onEnd returns
// an error, BeginTransaction is not called and this function returns the
// result of onEnd. Otherwise, this function returns the result of
// BeginTransaction. See the documentation on EndTransaction and
// BeginTransaction for further details. It is invalid to call this function
// more than once at a time, and it is invalid to call it concurrently with
// EndTransaction or BeginTransaction.
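//
// As a sketch, committing the current transaction and immediately beginning
// the next one while produces continue in the background:
//
//	err := cl.EndAndBeginTransaction(ctx, EndBeginTxnSafe, TryCommit,
//		func(ctx context.Context, endErr error) error {
//			// Inspect or log the result of EndTransaction here; promises
//			// are paused until this returns. Returning a non-nil error
//			// skips BeginTransaction.
//			return endErr
//		},
//	)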
func (cl *Client) EndAndBeginTransaction(
ctx context.Context,
how EndBeginTxnHow,
commit TransactionEndTry,
onEnd func(context.Context, error) error,
) (rerr error) {
if g := cl.consumer.g; g != nil {
return errors.New("cannot use EndAndBeginTransaction with EOS")
}
cl.producer.txnMu.Lock()
defer cl.producer.txnMu.Unlock()
// From BeginTransaction: if we return with no error, we begin. Unlike
// BeginTransaction, we do not error if in a transaction, because we
// expect to be in one.
defer func() {
if rerr == nil {
needRecover, didRecover, err := cl.maybeRecoverProducerID()
if needRecover && !didRecover {
cl.cfg.logger.Log(LogLevelInfo, "unable to begin transaction due to unrecoverable producer id error", "err", err)
rerr = fmt.Errorf("producer ID has a fatal, unrecoverable error, err: %w", err)
return
}
cl.producer.inTxn = true
cl.cfg.logger.Log(LogLevelInfo, "beginning transaction", "transactional_id", *cl.cfg.txnID)
}
}()
// If end/beginning safely, we have to pause AddPartitionsToTxn and
// ProduceRequest, and we only resume after the user's onEnd has been
// called.
if how == EndBeginTxnSafe {
if err := cl.producer.pause(ctx); err != nil {
return err
}
defer cl.producer.resume()
}
// Before BeginTransaction, we block promises & call onEnd with whatever
// the return error is.
cl.producer.promisesMu.Lock()
var promisesUnblocked bool
unblockPromises := func() {
if promisesUnblocked {
return
}
promisesUnblocked = true
defer cl.producer.promisesMu.Unlock()
rerr = onEnd(ctx, rerr)
}
defer unblockPromises()
if !cl.producer.inTxn {
return nil
}
var anyAdded bool
var readd map[string][]int32
for topic, parts := range cl.producer.topics.load() {
for i, part := range parts.load().partitions {
if part.records.addedToTxn.swap(false) {
if how == EndBeginTxnUnsafe {
if readd == nil {
readd = make(map[string][]int32)
}
readd[topic] = append(readd[topic], int32(i))
}
anyAdded = true
}
}
}
anyAdded = anyAdded || cl.producer.readded
// EndTxn when no txn was started returns INVALID_TXN_STATE.
if !anyAdded {
cl.cfg.logger.Log(LogLevelDebug, "no records were produced during the commit; thus no transaction was begun; ending without doing anything")
return nil
}
// From EndTransaction: if the pid has an error, we may try to recover.
id, epoch, err := cl.producerID()
if err != nil {
if commit {
return kerr.OperationNotAttempted
}
if _, didRecover, _ := cl.maybeRecoverProducerID(); didRecover {
return nil
}
}
cl.cfg.logger.Log(LogLevelInfo, "ending transaction",
"transactional_id", *cl.cfg.txnID,
"producer_id", id,
"epoch", epoch,
"commit", commit,
)
cl.producer.readded = false
err = cl.doWithConcurrentTransactions(ctx, "EndTxn", func() error {
req := kmsg.NewPtrEndTxnRequest()
req.TransactionalID = *cl.cfg.txnID
req.ProducerID = id
req.ProducerEpoch = epoch
req.Commit = bool(commit)
resp, err := req.RequestWith(ctx, cl)
if err != nil {
return err
}
// When ending a transaction, if the user is using unsafe mode,
// there is a logic race where the user can actually end before
// AddPartitionsToTxn is issued. This should be rare and is
// most likely only to happen whenever a new transaction is
// starting from a not-in-transaction state (i.e., the first
// transaction). If we see InvalidTxnState in unsafe mode, we
// assume that a transaction was not actually begun and we
// return success.
//
// In Kafka, InvalidTxnState is also returned when producing
// non-transactional records from a producer that is currently
// in a transaction.
//
// All other cases it is returned is in EndTxn:
// * state == CompleteCommit and EndTxn != commit
// * state == CompleteAbort and EndTxn != abort
// * state == PrepareCommit and EndTxn != commit (otherwise, returns concurrent transactions)
// * state == PrepareAbort and EndTxn != abort (otherwise, returns concurrent transactions)
// * state == Empty
//
// This basically guards against the final case, all others are
// Kafka internal state transitioning and we should never hit
// them.
if how == EndBeginTxnUnsafe && resp.ErrorCode == kerr.InvalidTxnState.Code {
return nil
}
return kerr.ErrorForCode(resp.ErrorCode)
})
var ke *kerr.Error
if errors.As(err, &ke) && !ke.Retriable {
cl.failProducerID(id, epoch, err)
}
if err != nil || how != EndBeginTxnUnsafe {
return err
}
unblockPromises()
// If we are end/beginning unsafely, then we need to re-add all
// partitions to a new transaction immediately. Timing makes it
// impossible to know what was truly added before EndTxn, so we
// pessimistically assume that every partition must be re-added.
//
// We track readd before the txn and swap those to un-added, but we
// also need to track anything that is newly added that raced with our
// EndTxn. We swap before the txn to ensure that *eventually*,
// partitions will be tracked as not in a transaction if people stop
// producing.
//
// We do this before the user callback because we *need* to start a new
// transaction within Kafka to ensure there will be a timeout. Per the
// unsafe aspect, the client could die or this request could error and
// there could be a stranded txn within Kafka's ProducerStateManager,
// but ideally the user will reconnect with the same txnal id.
cl.producer.readded = true
return cl.doWithConcurrentTransactions(ctx, "AddPartitionsToTxn", func() error {
req := kmsg.NewPtrAddPartitionsToTxnRequest()
req.TransactionalID = *cl.cfg.txnID
req.ProducerID = id
req.ProducerEpoch = epoch
for topic, parts := range cl.producer.topics.load() {
for i, part := range parts.load().partitions {
if part.records.addedToTxn.get() {
readd[topic] = append(readd[topic], int32(i))
}
}
}
ps := make(map[int32]struct{})
for topic, parts := range readd {
t := kmsg.NewAddPartitionsToTxnRequestTopic()
t.Topic = topic
for _, part := range parts {
ps[part] = struct{}{}
}
for p := range ps {
t.Partitions = append(t.Partitions, p)
delete(ps, p)
}
if len(t.Partitions) > 0 {
req.Topics = append(req.Topics, t)
}
}
resp, err := req.RequestWith(ctx, cl)
if err != nil {
return err
}
for i := range resp.Topics {
t := &resp.Topics[i]
for j := range t.Partitions {
p := &t.Partitions[j]
if err := kerr.ErrorForCode(p.ErrorCode); err != nil {
return err
}
}
}
return nil
})
}
// AbortBufferedRecords fails all unflushed records with ErrAborted and waits
// for there to be no buffered records.
//
// This accepts a context to quit the wait early, but quitting the wait may
// lead to an invalid state and should only be used if you are quitting your
// application. This function waits to abort records at safe points: when
// records are known to not be in flight. This function is safe to call
// multiple times concurrently, and safe to call concurrently with Flush.
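//
// For example, when abandoning a transaction during shutdown or after a
// fatal error (a sketch):
//
//	if err := cl.AbortBufferedRecords(ctx); err != nil {
//		// only non-nil if ctx was canceled
//	}
//	if err := cl.EndTransaction(ctx, TryAbort); err != nil {
//		// handle error
//	}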
func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
atomic.AddInt32(&cl.producer.aborting, 1)
defer atomic.AddInt32(&cl.producer.aborting, -1)
cl.cfg.logger.Log(LogLevelInfo, "producer state set to aborting; continuing to wait via flushing")
defer cl.cfg.logger.Log(LogLevelDebug, "aborted buffered records")
// We must clear unknown topics ourselves, because flush just waits
// like normal.
p := &cl.producer
p.unknownTopicsMu.Lock()
for _, unknown := range p.unknownTopics {
select {
case unknown.fatal <- ErrAborting:
default:
}
}
p.unknownTopicsMu.Unlock()
// Setting the aborting state allows records to fail before
// or after produce requests; thus, now we just flush.
return cl.Flush(ctx)
}
// EndTransaction ends a transaction and resets the client's internal state to
// not be in a transaction.
//
// Flush and CommitOffsetsForTransaction must be called before this function;
// this function does not flush and does not itself ensure that all buffered
// records are flushed. If no record yet has caused a partition to be added to
// the transaction, this function does nothing and returns nil. Alternatively,
// AbortBufferedRecords should be called before aborting a transaction to
// ensure that any buffered records not yet flushed will not be a part of a new
// transaction.
//
// If the producer ID has an error and you are trying to commit, this will
// return with kerr.OperationNotAttempted. If this happened, retry
// EndTransaction with TryAbort. No other error is retriable, and you should
// not retry with TryAbort.
//
// If records failed with UnknownProducerID and your Kafka version is at least
// 2.5, then aborting here will potentially allow the client to recover for
// more production.
//
// Note that canceling the context will likely leave the client in an
// undesirable state, because canceling the context may cancel the in-flight
// EndTransaction request, making it impossible to know whether the commit or
// abort was successful. It is recommended to not cancel the context.
func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) error {
cl.producer.txnMu.Lock()
defer cl.producer.txnMu.Unlock()
if !cl.producer.inTxn {
return nil
}
cl.producer.inTxn = false
atomic.StoreUint32(&cl.producer.producingTxn, 0) // forbid any new produces while ending txn
// anyAdded tracks if any partitions were added to this txn, because
// any partition that is written to triggers AddPartitionsToTxn, which
// causes the txn to actually begin within Kafka.
//
// If we consumed at all but did not produce, the transaction ending
// issues AddOffsetsToTxn, which internally adds a __consumer_offsets
// partition to the transaction. Thus, if we added offsets, then we
// also produced.
var anyAdded bool
if g := cl.consumer.g; g != nil {
// We do not lock because we expect commitTransactionOffsets to
// be called *before* ending a transaction.
if g.offsetsAddedToTxn {
g.offsetsAddedToTxn = false
anyAdded = true
}
} else {
cl.cfg.logger.Log(LogLevelDebug, "transaction ending, no group loaded; this must be a producer-only transaction, not consume-modify-produce EOS")
}
// After the flush, no records are being produced to, and we can set
// addedToTxn to false outside of any mutex.
for _, parts := range cl.producer.topics.load() {
for _, part := range parts.load().partitions {
anyAdded = part.records.addedToTxn.swap(false) || anyAdded
}
}
// If the user previously used EndAndBeginTransaction with
// EndBeginTxnUnsafe, we may have to end a transaction even though
// nothing may be in it.
anyAdded = anyAdded || cl.producer.readded
// If no partition was added to a transaction, then we have nothing to commit.
//
// Note that anyAdded is true if the producer ID was failed, meaning we will
// get to the potential recovery logic below if necessary.
if !anyAdded {
cl.cfg.logger.Log(LogLevelDebug, "no records were produced during the commit; thus no transaction was begun; ending without doing anything")
return nil
}
id, epoch, err := cl.producerID()
if err != nil {
if commit {
return kerr.OperationNotAttempted
}
// If we recovered the producer ID, we return early, since
// there is no reason to issue an abort now that the id is
// different. Otherwise, we issue our EndTxn which will likely
// fail, but that is ok, we will just return error.
_, didRecover, _ := cl.maybeRecoverProducerID()
if didRecover {
return nil
}
}
cl.cfg.logger.Log(LogLevelInfo, "ending transaction",
"transactional_id", *cl.cfg.txnID,
"producer_id", id,
"epoch", epoch,
"commit", commit,
)
cl.producer.readded = false
err = cl.doWithConcurrentTransactions(ctx, "EndTxn", func() error {
req := kmsg.NewPtrEndTxnRequest()
req.TransactionalID = *cl.cfg.txnID
req.ProducerID = id
req.ProducerEpoch = epoch
req.Commit = bool(commit)
resp, err := req.RequestWith(ctx, cl)
if err != nil {
return err
}
return kerr.ErrorForCode(resp.ErrorCode)
})
// If the returned error is still a Kafka error, this is fatal and we
// need to fail our producer ID we loaded above.
//
// UNKNOWN_SERVER_ERROR can theoretically be returned (not all brokers
// do). This technically is fatal, but we do not really know whether it
// is. We can just return this error and let the caller decide to
// continue, if the caller does continue, we will try something and
// eventually then receive our proper transactional error, if any.
var ke *kerr.Error
if errors.As(err, &ke) && !ke.Retriable && ke.Code != kerr.UnknownServerError.Code {
cl.failProducerID(id, epoch, err)
}
return err
}
// This returns if it is necessary to recover the producer ID (it has an
// error), whether it is possible to recover, and, if not, the error.
//
// We call this when beginning a transaction or when ending with an abort.
func (cl *Client) maybeRecoverProducerID() (necessary, did bool, err error) {
cl.producer.mu.Lock()
defer cl.producer.mu.Unlock()
id, epoch, err := cl.producerID()
if err == nil {
return false, false, nil
}
var ke *kerr.Error
if ok := errors.As(err, &ke); !ok {
return true, false, err
}
kip360 := cl.producer.idVersion >= 3 && (errors.Is(ke, kerr.UnknownProducerID) || errors.Is(ke, kerr.InvalidProducerIDMapping))
kip588 := cl.producer.idVersion >= 4 && errors.Is(ke, kerr.InvalidProducerEpoch /* || err == kerr.TransactionTimedOut when implemented in Kafka */)
recoverable := kip360 || kip588
if !recoverable {
return true, false, err // fatal, unrecoverable
}
// Storing errReloadProducerID will reset sequence numbers as appropriate
// when the producer ID is reloaded successfully.
cl.producer.id.Store(&producerID{
id: id,
epoch: epoch,
err: errReloadProducerID,
})
return true, true, nil
}
// If a transaction is begun too quickly after finishing an old transaction,
// Kafka may still be finalizing its commit / abort and will return a
// concurrent transactions error. We handle that by retrying for a bit.
func (cl *Client) doWithConcurrentTransactions(ctx context.Context, name string, fn func() error) error {
start := time.Now()
tries := 0
backoff := cl.cfg.txnBackoff
start:
err := fn()
if errors.Is(err, kerr.ConcurrentTransactions) {
// The longer we are stalled, the more we enforce a minimum
// backoff.
since := time.Since(start)
switch {
case since > 5*time.Second:
if backoff < time.Second {
backoff = time.Second
}
case since > 5*time.Second/2:
if backoff < 500*time.Millisecond {
backoff = 500 * time.Millisecond
}
case since > time.Second:
if backoff < 200*time.Millisecond {
backoff = 200 * time.Millisecond
}
}
tries++
cl.cfg.logger.Log(LogLevelDebug, fmt.Sprintf("%s failed with CONCURRENT_TRANSACTIONS, which may be because we ended a txn and began producing in a new txn too quickly; backing off and retrying", name),
"backoff", backoff,
"since_request_tries_start", time.Since(start),
"tries", tries,
)
select {
case <-time.After(backoff):
case <-ctx.Done():
cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("abandoning %s retry due to client ctx quitting", name))
return err
case <-cl.ctx.Done():
cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("abandoning %s retry due to client ctx quitting", name))
return err
}
goto start
}
return err
}
////////////////////////////////////////////////////////////////////////////////////////////
// TRANSACTIONAL COMMITTING //
// MOSTLY DUPLICATED CODE DUE TO NO GENERICS AND BECAUSE THE TYPES ARE SLIGHTLY DIFFERENT //
////////////////////////////////////////////////////////////////////////////////////////////
// commitTransactionOffsets is exactly like CommitOffsets, but specifically for