// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Tobias Schottdorf (tobias.schottdorf@gmail.com)
// Author: Spencer Kimball (spencer.kimball@gmail.com)
package kv
import (
"fmt"
"sync/atomic"
"time"
basictracer "github.com/opentracing/basictracer-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/base"
"github.com/cockroachdb/cockroach/internal/client"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/metric"
"github.com/cockroachdb/cockroach/util/protoutil"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/cockroachdb/cockroach/util/syncutil"
"github.com/cockroachdb/cockroach/util/timeutil"
"github.com/cockroachdb/cockroach/util/tracing"
"github.com/cockroachdb/cockroach/util/uuid"
)
const (
statusLogInterval = 5 * time.Second
opTxnCoordSender = "txn coordinator"
opHeartbeatLoop = "heartbeat"
)
var errNoState = errors.New("writing transaction timed out " +
"or ran on multiple coordinators")
// txnMetadata holds information about an ongoing transaction, as
// seen from the perspective of this coordinator. It records all
// keys (and key ranges) mutated as part of the transaction for
// resolution upon transaction commit or abort.
type txnMetadata struct {
// txn is a copy of the transaction record, updated with each request.
txn roachpb.Transaction
// keys stores key ranges affected by this transaction through this
// coordinator. By keeping this record, the coordinator will be able
// to update the write intent when the transaction is committed.
keys []roachpb.Span
// lastUpdateNanos is the latest wall time in nanos the client sent
// transaction operations to this coordinator. Accessed and updated
// atomically.
lastUpdateNanos int64
// Analogous to lastUpdateNanos, this is the wall time at which the
// transaction was instantiated.
firstUpdateNanos int64
// timeoutDuration is the time after which the transaction should be
// considered abandoned by the client. That is, when
// current_timestamp > lastUpdateTS + timeoutDuration.
timeoutDuration time.Duration
// txnEnd is closed when the transaction is aborted or committed,
// terminating the associated heartbeat instance.
txnEnd chan struct{}
}
// setLastUpdate records the wall time (in nanoseconds) of the most
// recent client operation for this transaction through the coordinator.
func (tm *txnMetadata) setLastUpdate(nowNanos int64) {
atomic.StoreInt64(&tm.lastUpdateNanos, nowNanos)
}
// getLastUpdate atomically loads the nanosecond wall time of the most
// recent client operation.
func (tm *txnMetadata) getLastUpdate() int64 {
return atomic.LoadInt64(&tm.lastUpdateNanos)
}
// hasClientAbandonedCoord returns true if the client has not issued a
// request for this transaction within the allowed timeout.
func (tm *txnMetadata) hasClientAbandonedCoord(nowNanos int64) bool {
timeout := nowNanos - tm.timeoutDuration.Nanoseconds()
return tm.getLastUpdate() < timeout
}
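// Illustrative sketch (not part of the original code) of how the heartbeat
// loop uses this check against the coordinator's physical clock:
//
//   now := tc.clock.PhysicalNow()
//   if txnMeta.hasClientAbandonedCoord(now) {
//       tc.tryAsyncAbort(txnID) // roll back the abandoned transaction
//   }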
// TxnMetrics holds all metrics relating to KV transactions.
type TxnMetrics struct {
Aborts metric.Rates
Commits metric.Rates
Commits1PC metric.Rates // Commits which finished in a single phase
Abandons metric.Rates
Durations metric.Histograms
// Restarts is the number of times we had to restart the transaction.
Restarts *metric.Histogram
}
var (
metaAbortsRates = metric.Metadata{Name: "txn.aborts"}
metaCommitsRates = metric.Metadata{Name: "txn.commits"}
metaCommits1PCRates = metric.Metadata{Name: "txn.commits1PC"}
metaAbandonsRates = metric.Metadata{Name: "txn.abandons"}
metaDurationsHistograms = metric.Metadata{Name: "txn.durations"}
metaRestartsHistogram = metric.Metadata{Name: "txn.restarts"}
)
// MakeTxnMetrics returns a TxnMetrics struct with all transaction metrics initialized.
func MakeTxnMetrics() TxnMetrics {
return TxnMetrics{
Aborts: metric.NewRates(metaAbortsRates),
Commits: metric.NewRates(metaCommitsRates),
Commits1PC: metric.NewRates(metaCommits1PCRates),
Abandons: metric.NewRates(metaAbandonsRates),
Durations: metric.NewLatency(metaDurationsHistograms),
Restarts: metric.NewHistogram(metaRestartsHistogram, 60*time.Second, 100, 3),
}
}
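// Note on the Restarts histogram above: the extra arguments to
// metric.NewHistogram appear to configure a 60-second sliding window, a
// maximum recordable value of 100 restarts, and 3 significant figures of
// precision (this reading of the call site is an assumption; consult the
// metric package for the exact semantics).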
// A TxnCoordSender is an implementation of client.Sender which
// wraps a lower-level Sender (either a storage.Stores or a DistSender)
// to which it sends commands. It acts as a man-in-the-middle,
// coordinating transaction state for clients. After a transaction is
// started, the TxnCoordSender starts asynchronously sending heartbeat
// messages to that transaction's txn record, to keep it live. It also
// keeps track of each written key or key range over the course of the
// transaction. When the transaction is committed or aborted, it
// clears accumulated write intents for the transaction.
type TxnCoordSender struct {
ctx context.Context
wrapped client.Sender
clock *hlc.Clock
heartbeatInterval time.Duration
clientTimeout time.Duration
syncutil.Mutex // protects txns and txnStats
txns map[uuid.UUID]*txnMetadata // txn key to metadata
linearizable bool // enables linearizable behaviour
stopper *stop.Stopper
metrics TxnMetrics
}
var _ client.Sender = &TxnCoordSender{}
// NewTxnCoordSender creates a new TxnCoordSender for use from a KV
// distributed DB instance.
// ctx is the base context and is used for logs and traces when there isn't a
// more specific context available; it must have a Tracer set.
func NewTxnCoordSender(
ctx context.Context,
wrapped client.Sender,
clock *hlc.Clock,
linearizable bool,
stopper *stop.Stopper,
txnMetrics TxnMetrics,
) *TxnCoordSender {
if ctx == nil || tracing.TracerFromCtx(ctx) == nil {
panic("ctx with tracer must be supplied")
}
if ctx.Done() != nil {
panic("context with cancel or deadline")
}
tc := &TxnCoordSender{
ctx: ctx,
wrapped: wrapped,
clock: clock,
heartbeatInterval: base.DefaultHeartbeatInterval,
clientTimeout: defaultClientTimeout,
txns: map[uuid.UUID]*txnMetadata{},
linearizable: linearizable,
stopper: stopper,
metrics: txnMetrics,
}
tc.stopper.RunWorker(tc.startStats)
return tc
}
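// Example (hypothetical usage sketch; distSender, clock and stopper are
// placeholders, and the tracing helper used to attach a Tracer to the
// context is an assumption, not verified against this package):
//
//   ctx := tracing.WithTracer(context.Background(), tracing.NewTracer())
//   tc := NewTxnCoordSender(ctx, distSender, clock,
//       false /* linearizable */, stopper, MakeTxnMetrics())
//   db := client.NewDB(tc) // all transactional requests now flow through tc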
// startStats blocks and periodically logs transaction statistics (throughput,
// success rates, durations, ...). Note that this only captures write txns,
// since read-only txns are stateless as far as TxnCoordSender is concerned.
func (tc *TxnCoordSender) startStats() {
res := time.Millisecond // for duration logging resolution
var statusLogTimer timeutil.Timer
defer statusLogTimer.Stop()
scale := metric.Scale1M
for {
statusLogTimer.Reset(statusLogInterval)
select {
case <-statusLogTimer.C:
statusLogTimer.Read = true
if !log.V(1) {
continue
}
// Take a snapshot of metrics. There's some chance of skew, since the snapshots are
// not done atomically, but that should be fine for these debug stats.
metrics := tc.metrics
durations := metrics.Durations[scale].Current()
restarts := metrics.Restarts.Current()
commitRate := metrics.Commits.Rates[scale].Value()
commit1PCRate := metrics.Commits1PC.Rates[scale].Value()
abortRate := metrics.Aborts.Rates[scale].Value()
abandonRate := metrics.Abandons.Rates[scale].Value()
// Show transaction stats over the last minute. Maybe this should be shorter in the future.
// We'll revisit if we get sufficient feedback.
totalRate := commitRate + abortRate + abandonRate
var pCommitted, pCommitted1PC, pAbandoned, pAborted float64
if totalRate > 0 {
pCommitted = 100 * (commitRate / totalRate)
pCommitted1PC = 100 * (commit1PCRate / totalRate)
pAborted = 100 * (abortRate / totalRate)
pAbandoned = 100 * (abandonRate / totalRate)
}
dMean := durations.Mean()
dDev := durations.StdDev()
dMax := durations.Max()
rMean := restarts.Mean()
rDev := restarts.StdDev()
rMax := restarts.Max()
num := durations.TotalCount()
log.Infof(tc.ctx,
"txn coordinator: %.2f txn/sec, %.2f/%.2f/%.2f/%.2f %%cmmt/cmmt1pc/abrt/abnd, %s/%s/%s avg/σ/max duration, %.1f/%.1f/%d avg/σ/max restarts (%d samples)",
totalRate, pCommitted, pCommitted1PC, pAborted, pAbandoned,
util.TruncateDuration(time.Duration(dMean), res),
util.TruncateDuration(time.Duration(dDev), res),
util.TruncateDuration(time.Duration(dMax), res),
rMean, rDev, rMax, num,
)
case <-tc.stopper.ShouldStop():
return
}
}
}
// Send implements the batch.Sender interface. If the request is part of a
// transaction, the TxnCoordSender adds the transaction to a map of active
// transactions and begins heartbeating it. Every subsequent request for the
// same transaction updates the lastUpdate timestamp to prevent live
// transactions from being considered abandoned and garbage collected.
// Read/write mutating requests have their key or key range added to the
// transaction's set of tracked key spans for eventual cleanup via resolved
// write intents; they're tagged to an outgoing EndTransaction request, with
// the receiving replica in charge of resolving them.
func (tc *TxnCoordSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
// Start new or pick up active trace. From here on, there's always an active
// Trace, though its overhead is small unless it's sampled.
sp := opentracing.SpanFromContext(ctx)
var tracer opentracing.Tracer
if sp == nil {
tracer = tracing.TracerFromCtx(tc.ctx)
sp = tracer.StartSpan(opTxnCoordSender)
defer sp.Finish()
ctx = opentracing.ContextWithSpan(ctx, sp)
} else {
tracer = sp.Tracer()
}
startNS := tc.clock.PhysicalNow()
if ba.Txn != nil {
// If this request is part of a transaction...
if err := tc.maybeBeginTxn(&ba); err != nil {
return nil, roachpb.NewError(err)
}
// Associate the txnID with the trace. We need to do this after the
// maybeBeginTxn call. We set both a baggage item and a tag because only
// tags show up in the Lightstep UI.
txnIDStr := ba.Txn.ID.String()
sp.SetTag("txnID", txnIDStr)
sp.SetBaggageItem("txnID", txnIDStr)
var et *roachpb.EndTransactionRequest
var hasET bool
{
var rArgs roachpb.Request
rArgs, hasET = ba.GetArg(roachpb.EndTransaction)
if hasET {
et = rArgs.(*roachpb.EndTransactionRequest)
if len(et.Key) != 0 {
return nil, roachpb.NewErrorf("EndTransaction must not have a Key set")
}
et.Key = ba.Txn.Key
if len(et.IntentSpans) > 0 {
// TODO(tschottdorf): it may be useful to allow this later.
// That would be part of a possible plan to allow txns which
// write on multiple coordinators.
return nil, roachpb.NewErrorf("client must not pass intents to EndTransaction")
}
}
}
if pErr := func() *roachpb.Error {
tc.Lock()
defer tc.Unlock()
if pErr := tc.maybeRejectClientLocked(ctx, *ba.Txn); pErr != nil {
return pErr
}
if !hasET {
return nil
}
// Everything below is carried out only when trying to commit.
// Populate et.IntentSpans, taking into account both any existing
// and new writes, and taking care to perform proper deduplication.
txnMeta := tc.txns[*ba.Txn.ID]
distinctSpans := true
if txnMeta != nil {
et.IntentSpans = txnMeta.keys
// Defensively set distinctSpans to false if we had any previous
// requests in this transaction. This effectively limits the distinct
// spans optimization to 1pc transactions.
distinctSpans = len(txnMeta.keys) == 0
}
ba.IntentSpanIterate(func(key, endKey roachpb.Key) {
et.IntentSpans = append(et.IntentSpans, roachpb.Span{
Key: key,
EndKey: endKey,
})
})
// TODO(peter): Populate DistinctSpans on all batches, not just batches
// which contain an EndTransactionRequest.
var distinct bool
// The request might already be used by an outgoing goroutine, so
// we can't safely mutate anything in-place (as MergeSpans does).
et.IntentSpans = append([]roachpb.Span(nil), et.IntentSpans...)
et.IntentSpans, distinct = roachpb.MergeSpans(et.IntentSpans)
ba.Header.DistinctSpans = distinct && distinctSpans
if len(et.IntentSpans) == 0 {
// If there aren't any intents, then there's factually no
// transaction to end. Read-only txns have all of their state
// in the client.
return roachpb.NewErrorf("cannot commit a read-only transaction")
}
if txnMeta != nil {
txnMeta.keys = et.IntentSpans
}
return nil
}(); pErr != nil {
return nil, pErr
}
if hasET && log.V(1) {
for _, intent := range et.IntentSpans {
log.Eventf(ctx, "intent: [%s,%s)", intent.Key, intent.EndKey)
}
}
}
// Embed the trace metadata into the header for use by RPC recipients. We need
// to do this after the maybeBeginTxn call above.
// TODO(tschottdorf): To get rid of the spurious alloc below we need to
// implement the carrier interface on ba.Header or make Span non-nullable,
// both of which force all of ba on the Heap. It's already there, so may
// not be a big deal, but ba should live on the stack. Also not easy to use
// a buffer pool here since anything that goes into the RPC layer could be
// used by goroutines we didn't wait for.
if ba.Trace == nil {
ba.Trace = &tracing.Span{}
} else {
// We didn't make this object but are about to mutate it, so we
// have to take a copy - the original might already have been
// passed to the RPC layer.
ba.Trace = protoutil.Clone(ba.Trace).(*tracing.Span)
}
if err := tracer.Inject(sp.Context(), basictracer.Delegator, ba.Trace); err != nil {
return nil, roachpb.NewError(err)
}
// Send the command through wrapped sender, taking appropriate measures
// on error.
var br *roachpb.BatchResponse
{
var pErr *roachpb.Error
br, pErr = tc.wrapped.Send(ctx, ba)
if _, ok := pErr.GetDetail().(*roachpb.OpRequiresTxnError); ok {
// TODO(tschottdorf): needs to keep the trace.
br, pErr = tc.resendWithTxn(ba)
}
if pErr = tc.updateState(startNS, ctx, ba, br, pErr); pErr != nil {
log.Eventf(ctx, "error: %s", pErr)
return nil, pErr
}
}
if br.Txn == nil {
return br, nil
}
if _, ok := ba.GetArg(roachpb.EndTransaction); !ok {
return br, nil
}
// If the --linearizable flag is set, we want to make sure that
// all the clocks in the system are past the commit timestamp
// of the transaction. This is guaranteed if either
// - the commit timestamp is MaxOffset behind startNS
// - MaxOffset ns were spent in this function
// when returning to the client. Below we choose the option
// that involves less waiting, which is likely the first one
// unless a transaction commits with an odd timestamp.
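// Illustrative example: with MaxOffset = 250ms and a commit that spent
// 40ms inside this function, the linearizable path sleeps the remaining
// ~210ms before returning to the client (numbers are made up).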
if tsNS := br.Txn.Timestamp.WallTime; startNS > tsNS {
startNS = tsNS
}
sleepNS := tc.clock.MaxOffset() -
time.Duration(tc.clock.PhysicalNow()-startNS)
if tc.linearizable && sleepNS > 0 {
defer func() {
if log.V(1) {
log.Infof(ctx, "%v: waiting %s on EndTransaction for linearizability", br.Txn.ID.Short(), util.TruncateDuration(sleepNS, time.Millisecond))
}
time.Sleep(sleepNS)
}()
}
if br.Txn.Status != roachpb.PENDING {
tc.Lock()
tc.cleanupTxnLocked(ctx, *br.Txn)
tc.Unlock()
}
return br, nil
}
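// Illustrative client-visible flow (sketch only; the literal batch
// construction below is hypothetical and stands in for what the client
// package assembles on commit):
//
//   var ba roachpb.BatchRequest
//   ba.Txn = txnProto                                    // transactional batch
//   ba.Add(&roachpb.BeginTransactionRequest{})           // anchors the txn key
//   ba.Add(&roachpb.PutRequest{})                        // tracked as an intent span
//   ba.Add(&roachpb.EndTransactionRequest{Commit: true}) // IntentSpans attached here
//   br, pErr := tc.Send(ctx, ba)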
// maybeRejectClientLocked checks whether the (transactional) request is in a
// state that prevents it from continuing, such as the coordinator having
// considered the client abandoned, or a heartbeat having reported an error.
func (tc *TxnCoordSender) maybeRejectClientLocked(
ctx context.Context,
txn roachpb.Transaction,
) *roachpb.Error {
if !txn.Writing {
return nil
}
txnMeta, ok := tc.txns[*txn.ID]
// Check whether the transaction is still tracked and has a chance of
// completing. It's possible that the coordinator learns about the
// transaction having terminated from a heartbeat, and GC queue correctness
// (along with common sense) mandates that we don't let the client
// continue.
switch {
case !ok:
log.VEventf(2, ctx, "rejecting unknown txn: %s", txn.ID)
// TODO(spencerkimball): Could add coordinator node ID to the
// transaction session so that we can definitively return the right
// error between these possible errors. Or update the code to make an
// educated guess based on the incoming transaction timestamp.
return roachpb.NewError(errNoState)
case txnMeta.txn.Status == roachpb.ABORTED:
txn := txnMeta.txn.Clone()
tc.cleanupTxnLocked(ctx, txn)
return roachpb.NewErrorWithTxn(roachpb.NewTransactionAbortedError(),
&txn)
case txnMeta.txn.Status == roachpb.COMMITTED:
txn := txnMeta.txn.Clone()
tc.cleanupTxnLocked(ctx, txn)
return roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(
"transaction is already committed"), &txn)
default:
return nil
}
}
// maybeBeginTxn begins a new transaction if a txn has been specified
// in the request but has a nil ID. The new transaction is initialized
// using the name and isolation in the otherwise uninitialized txn.
// The Priority, if non-zero, is used as a minimum.
//
// No transactional writes are allowed unless preceded by a begin
// transaction request within the same batch. The exception is if the
// transaction is already in state txn.Writing=true.
func (tc *TxnCoordSender) maybeBeginTxn(ba *roachpb.BatchRequest) error {
if len(ba.Requests) == 0 {
return errors.Errorf("empty batch with txn")
}
if ba.Txn.ID == nil {
// Create transaction without a key. The key is set when a begin
// transaction request is received.
// The initial timestamp may be communicated by a higher layer.
// If so, use that. Otherwise make up a new one.
timestamp := ba.Txn.OrigTimestamp
if timestamp == hlc.ZeroTimestamp {
timestamp = tc.clock.Now()
}
newTxn := roachpb.NewTransaction(ba.Txn.Name, nil, ba.UserPriority,
ba.Txn.Isolation, timestamp, tc.clock.MaxOffset().Nanoseconds())
// Use existing priority as a minimum. This is used on transaction
// aborts to ratchet priority when creating successor transaction.
if newTxn.Priority < ba.Txn.Priority {
newTxn.Priority = ba.Txn.Priority
}
ba.Txn = newTxn
}
// Check for a begin transaction to set txn key based on the key of
// the first transactional write. Also enforce that no transactional
// writes occur before a begin transaction.
var haveBeginTxn bool
for _, req := range ba.Requests {
args := req.GetInner()
if bt, ok := args.(*roachpb.BeginTransactionRequest); ok {
if haveBeginTxn || ba.Txn.Writing {
return errors.Errorf("begin transaction requested twice in the same transaction: %s", ba.Txn)
}
haveBeginTxn = true
if ba.Txn.Key == nil {
ba.Txn.Key = bt.Key
}
}
if roachpb.IsTransactionWrite(args) && !haveBeginTxn && !ba.Txn.Writing {
return errors.Errorf("transactional write before begin transaction")
}
}
return nil
}
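// Sketch of the ordering rule enforced above (illustrative only): in a
// batch for a not-yet-writing transaction,
//
//   ba.Add(&roachpb.BeginTransactionRequest{Span: roachpb.Span{Key: keyA}})
//   ba.Add(&roachpb.PutRequest{Span: roachpb.Span{Key: keyA}})
//
// is accepted and anchors the transaction record at keyA, whereas placing
// the Put before the BeginTransaction returns a "transactional write
// before begin transaction" error.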
// cleanupTxnLocked is called when a transaction ends. The transaction record
// is updated and the heartbeat goroutine signaled to clean up the transaction
// gracefully.
func (tc *TxnCoordSender) cleanupTxnLocked(ctx context.Context, txn roachpb.Transaction) {
log.Event(ctx, "coordinator stops")
txnMeta, ok := tc.txns[*txn.ID]
// The heartbeat might've already removed the record. Or we may have already
// closed txnEnd but we are racing with the heartbeat cleanup.
if !ok || txnMeta.txnEnd == nil {
return
}
// The supplied txn may be newer than the one in txnMeta, which is relevant
// for stats.
txnMeta.txn = txn
// Trigger heartbeat shutdown.
close(txnMeta.txnEnd)
txnMeta.txnEnd = nil
}
// unregisterTxnLocked deletes a txnMetadata object from the sender
// and collects its stats. It assumes the lock is held. It returns
// the transaction's duration, restart count, and finalized status.
func (tc *TxnCoordSender) unregisterTxnLocked(txnID uuid.UUID) (
duration, restarts int64, status roachpb.TransactionStatus) {
txnMeta := tc.txns[txnID] // guaranteed to exist
if txnMeta == nil {
panic(fmt.Sprintf("attempt to unregister non-existent transaction: %s", txnID))
}
duration = tc.clock.PhysicalNow() - txnMeta.firstUpdateNanos
restarts = int64(txnMeta.txn.Epoch)
status = txnMeta.txn.Status
txnMeta.keys = nil
delete(tc.txns, txnID)
return duration, restarts, status
}
// heartbeatLoop periodically sends a HeartbeatTxn RPC to an extant transaction,
// stopping in the event the transaction is aborted or committed after
// attempting to resolve the intents. When the heartbeat stops, the transaction
// is unregistered from the coordinator.
//
// TODO(dan): The Context we use for this is currently the one from the first
// request in a Txn, but the semantics of this aren't good. Each context has its
// own associated lifetime and we're ignoring all but the first. It happens now
// that we pass the same one in every request, but it's brittle to rely on this
// forever.
// TODO(wiz): Update (*DBServer).Batch to not use context.TODO().
func (tc *TxnCoordSender) heartbeatLoop(ctx context.Context, txnID uuid.UUID) {
var tickChan <-chan time.Time
{
ticker := time.NewTicker(tc.heartbeatInterval)
tickChan = ticker.C
defer ticker.Stop()
}
defer func() {
tc.Lock()
duration, restarts, status := tc.unregisterTxnLocked(txnID)
tc.Unlock()
tc.updateStats(duration, restarts, status, false)
}()
var closer <-chan struct{}
// TODO(tschottdorf): this should join to the trace of the request
// which starts this goroutine.
sp := tracing.TracerFromCtx(tc.ctx).StartSpan(opHeartbeatLoop)
defer sp.Finish()
ctx = opentracing.ContextWithSpan(ctx, sp)
{
tc.Lock()
txnMeta := tc.txns[txnID] // do not leak to outer scope
closer = txnMeta.txnEnd
tc.Unlock()
}
if closer == nil {
// Avoid race in which a Txn is cleaned up before the heartbeat
// goroutine gets a chance to start.
return
}
// Loop with ticker for periodic heartbeats.
for {
select {
case <-tickChan:
if !tc.heartbeat(ctx, txnID) {
return
}
case <-closer:
// Transaction finished normally.
return
case <-ctx.Done():
// Note that if ctx is not cancellable, then ctx.Done() returns a nil
// channel, which blocks forever. In this case, the heartbeat loop is
// responsible for timing out transactions. If ctx.Done() is not nil, then
// the heartbeat loop ignores the timeout check and this case is
// responsible for client timeouts.
tc.tryAsyncAbort(txnID)
return
case <-tc.stopper.ShouldQuiesce():
return
}
}
}
// tryAsyncAbort (synchronously) grabs a copy of the txn proto and the intents
// (which it then clears from txnMeta), and asynchronously tries to abort the
// transaction.
func (tc *TxnCoordSender) tryAsyncAbort(txnID uuid.UUID) {
tc.Lock()
txnMeta := tc.txns[txnID]
// Clone the intents and the txn to avoid data races.
intentSpans, _ := roachpb.MergeSpans(append([]roachpb.Span(nil), txnMeta.keys...))
txnMeta.keys = nil
txn := txnMeta.txn.Clone()
tc.Unlock()
// Since we don't hold the lock continuously, it's possible that two aborts
// raced here. That's fine (and probably better than the alternative, which
// is missing new intents sometimes).
if txn.Status != roachpb.PENDING {
return
}
ba := roachpb.BatchRequest{}
ba.Txn = &txn
et := &roachpb.EndTransactionRequest{
Span: roachpb.Span{
Key: txn.Key,
},
Commit: false,
IntentSpans: intentSpans,
}
ba.Add(et)
if err := tc.stopper.RunAsyncTask(tc.ctx, func(ctx context.Context) {
// Use the wrapped sender since the normal Sender does not allow
// clients to specify intents.
if _, pErr := tc.wrapped.Send(ctx, ba); pErr != nil {
if log.V(1) {
log.Warningf(ctx, "abort due to inactivity failed for %s: %s ", txn, pErr)
}
}
}); err != nil {
log.Warning(tc.ctx, err)
}
}
func (tc *TxnCoordSender) heartbeat(ctx context.Context, txnID uuid.UUID) bool {
tc.Lock()
txnMeta := tc.txns[txnID]
txn := txnMeta.txn.Clone()
hasAbandoned := txnMeta.hasClientAbandonedCoord(tc.clock.PhysicalNow())
tc.Unlock()
if txn.Status != roachpb.PENDING {
// A previous iteration has already determined that the transaction is
// already finalized, so we wait for the client to realize that and
// want to keep our state for the time being (to dish out the right
// error once it returns).
return true
}
// Before we send a heartbeat, determine whether this transaction should be
// considered abandoned. If so, exit heartbeat. If ctx.Done() is not nil, then
// it is a cancellable Context and we skip this check and use the ctx lifetime
// instead of a timeout.
if ctx.Done() == nil && hasAbandoned {
if log.V(1) {
log.Infof(ctx, "transaction %s abandoned; stopping heartbeat", txnMeta.txn)
}
tc.tryAsyncAbort(txnID)
return false
}
ba := roachpb.BatchRequest{}
ba.Txn = &txn
hb := &roachpb.HeartbeatTxnRequest{
Now: tc.clock.Now(),
}
hb.Key = txn.Key
ba.Add(hb)
log.Event(ctx, "heartbeat")
br, pErr := tc.wrapped.Send(ctx, ba)
// Correctness mandates that when we can't heartbeat the transaction, we
// make sure the client doesn't keep going. This is particularly relevant
// in the case of an ABORTED transaction, but if we can't reach the
// transaction record at all, we're going to have to assume we're aborted
// as well.
if pErr != nil {
log.Warningf(ctx, "heartbeat to %s failed: %s", txn, pErr)
// We're not going to let the client carry out additional requests, so
// try to clean up.
tc.tryAsyncAbort(*txn.ID)
txn.Status = roachpb.ABORTED
} else {
txn.Update(br.Responses[0].GetInner().(*roachpb.HeartbeatTxnResponse).Txn)
}
// Give the news to the txn in the txns map. This will update long-running
// transactions (which may find out that they have to restart in that way),
// but in particular makes sure that they notice when they've been aborted
// (in which case we'll give them an error on their next request).
tc.Lock()
tc.txns[txnID].txn.Update(&txn)
tc.Unlock()
return true
}
// updateState updates the transaction state in both the success and
// error cases, applying those updates to the corresponding txnMeta
// object when adequate. It also updates certain errors with the
// updated transaction for use by client restarts.
func (tc *TxnCoordSender) updateState(
startNS int64,
ctx context.Context,
ba roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
) *roachpb.Error {
tc.Lock()
defer tc.Unlock()
if ba.Txn == nil {
// Not a transactional request.
return pErr
}
var newTxn roachpb.Transaction
newTxn.Update(ba.Txn)
if pErr == nil {
newTxn.Update(br.Txn)
} else if errTxn := pErr.GetTxn(); errTxn != nil {
newTxn.Update(errTxn)
}
switch t := pErr.GetDetail().(type) {
case *roachpb.TransactionStatusError:
// Likely already committed or more obscure errors such as epoch or
// timestamp regressions; consider txn dead.
if txn := pErr.GetTxn(); txn != nil {
defer tc.cleanupTxnLocked(ctx, *txn)
}
case *roachpb.OpRequiresTxnError:
panic("OpRequiresTxnError must not happen at this level")
case *roachpb.ReadWithinUncertaintyIntervalError:
// If the reader encountered a newer write within the uncertainty
// interval, we advance the txn's timestamp just past the last observed
// timestamp from the node.
restartTS, ok := newTxn.GetObservedTimestamp(pErr.OriginNode)
if !ok {
pErr = roachpb.NewError(errors.Errorf("no observed timestamp for node %d found on uncertainty restart", pErr.OriginNode))
} else {
newTxn.Timestamp.Forward(restartTS)
newTxn.Restart(ba.UserPriority, newTxn.Priority, newTxn.Timestamp)
}
case *roachpb.TransactionAbortedError:
// Increase timestamp if applicable.
newTxn.Timestamp.Forward(pErr.GetTxn().Timestamp)
newTxn.Priority = pErr.GetTxn().Priority
// Clean up the freshly aborted transaction in defer(), avoiding a
// race with the state update below.
defer tc.cleanupTxnLocked(ctx, newTxn)
case *roachpb.TransactionPushError:
// Increase timestamp if applicable, ensuring that we're
// just ahead of the pushee.
newTxn.Timestamp.Forward(t.PusheeTxn.Timestamp)
newTxn.Restart(ba.UserPriority, t.PusheeTxn.Priority-1, newTxn.Timestamp)
case *roachpb.TransactionRetryError:
// Increase timestamp so on restart, we're ahead of any timestamp
// cache entries or newer versions which caused the restart.
newTxn.Restart(ba.UserPriority, pErr.GetTxn().Priority, newTxn.Timestamp)
case *roachpb.WriteTooOldError:
newTxn.Restart(ba.UserPriority, newTxn.Priority, t.ActualTimestamp)
case nil:
// Nothing to do here, avoid the default case.
default:
// Do not clean up the transaction here since the client might still
// want to continue the transaction. For example, a client might
// continue its transaction after receiving ConditionFailedError, which
// can come from a unique index violation.
//
// TODO(bdarnell): Is this valid? Unless there is a single CPut in
// the batch, it is difficult to be able to continue after a
// ConditionFailedError because it is unclear which parts of the
// batch had succeeded on other ranges before one range hit the
// failed condition. It may be better to clean up the transaction here.
}
txnID := *newTxn.ID
txnMeta := tc.txns[txnID]
// For successful transactional requests, keep the written intents and
// the updated transaction record to be sent along with the reply.
// The transaction metadata is created with the first writing operation.
// A tricky edge case is that of a transaction which "fails" on the
// first writing request, but actually manages to write some intents
// (for example, due to being multi-range). In this case, there will
// be an error, but the transaction will be marked as Writing and the
// coordinator must track the state, for the client's retry will be
// performed with a Writing transaction which the coordinator rejects
// unless it is tracking it (on top of it making sense to track it;
// after all, it **has** laid down intents and only the coordinator
// can augment a potential EndTransaction call). See #3303.
if txnMeta != nil || pErr == nil || newTxn.Writing {
// Adding the intents even on error reduces the likelihood of dangling
// intents blocking concurrent writers for extended periods of time.
// See #3346.
var keys []roachpb.Span
if txnMeta != nil {
keys = txnMeta.keys
}
ba.IntentSpanIterate(func(key, endKey roachpb.Key) {
keys = append(keys, roachpb.Span{
Key: key,
EndKey: endKey,
})
})
if txnMeta != nil {
txnMeta.keys = keys
} else if len(keys) > 0 {
if !newTxn.Writing {
panic("txn with intents marked as non-writing")
}
// If the transaction is already over, there's no point in
// launching a one-off coordinator which will shut down right
// away. If we ended up here with an error, we'll always start
// the coordinator - the transaction has laid down intents, so
// we expect it to be committed/aborted at some point in the
// future.
if _, isEnding := ba.GetArg(roachpb.EndTransaction); pErr != nil || !isEnding {
log.Event(ctx, "coordinator spawns")
txnMeta = &txnMetadata{
txn: newTxn,
keys: keys,
firstUpdateNanos: startNS,
lastUpdateNanos: tc.clock.PhysicalNow(),
timeoutDuration: tc.clientTimeout,
txnEnd: make(chan struct{}),
}
tc.txns[txnID] = txnMeta
if err := tc.stopper.RunAsyncTask(ctx, func(ctx context.Context) {
tc.heartbeatLoop(ctx, txnID)
}); err != nil {
// The system is already draining and we can't start the
// heartbeat. We refuse new transactions for now because
// they're likely not going to have all intents committed.
// In principle, we can relax this as needed though.
tc.unregisterTxnLocked(txnID)
return roachpb.NewError(err)
}
} else {
// If this was a successful one phase commit, update stats
// directly as they won't otherwise be updated on heartbeat
// loop shutdown.
etArgs, ok := br.Responses[len(br.Responses)-1].GetInner().(*roachpb.EndTransactionResponse)
tc.updateStats(tc.clock.PhysicalNow()-startNS, 0, newTxn.Status, ok && etArgs.OnePhaseCommit)
}
}
}
// Update our record of this transaction, even on error.
if txnMeta != nil {
txnMeta.txn.Update(&newTxn)
if !txnMeta.txn.Writing {
panic("tracking a non-writing txn")
}
txnMeta.setLastUpdate(tc.clock.PhysicalNow())
}
if pErr == nil {
// For successful transactional requests, always send the updated txn
// record back. Note that we make sure not to share data with newTxn
// (which may have made it into txnMeta).
if br.Txn != nil {
br.Txn.Update(&newTxn)
} else {
clonedTxn := newTxn.Clone()
br.Txn = &clonedTxn
}
} else if pErr.GetTxn() != nil {
// Avoid changing existing errors because sometimes they escape into
// goroutines and data races can occur.
pErrShallow := *pErr
pErrShallow.SetTxn(&newTxn) // SetTxn clones newTxn
pErr = &pErrShallow
}
return pErr
}
// TODO(tschottdorf): this method is somewhat awkward but unless we want to
// give this error back to the client, our options are limited. We'll have to
// run the whole thing for them, or any restart will still end up at the client
// which will not be prepared to be handed a Txn.
func (tc *TxnCoordSender) resendWithTxn(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
// Run a one-off transaction with that single command.
if log.V(1) {
log.Infof(tc.ctx, "%s: auto-wrapping in txn and re-executing", ba)
}
// TODO(bdarnell): need to be able to pass other parts of DBContext
// through here.
dbCtx := client.DefaultDBContext()
dbCtx.UserPriority = ba.UserPriority
tmpDB := client.NewDBWithContext(tc, dbCtx)
var br *roachpb.BatchResponse
err := tmpDB.Txn(context.TODO(), func(txn *client.Txn) error {
txn.SetDebugName("auto-wrap", 0)
b := txn.NewBatch()
b.Header = ba.Header
for _, arg := range ba.Requests {
req := arg.GetInner()
b.AddRawRequest(req)
}
err := txn.CommitInBatch(b)
br = b.RawResponse()
return err
})
if err != nil {
return nil, roachpb.NewError(err)
}
br.Txn = nil // hide the evidence
return br, nil
}
// updateStats updates transaction metrics after a transaction finishes.
func (tc *TxnCoordSender) updateStats(duration, restarts int64, status roachpb.TransactionStatus, onePC bool) {
tc.metrics.Durations.RecordValue(duration)
tc.metrics.Restarts.RecordValue(restarts)
switch status {
case roachpb.ABORTED:
tc.metrics.Aborts.Add(1)
case roachpb.PENDING:
tc.metrics.Abandons.Add(1)
case roachpb.COMMITTED:
tc.metrics.Commits.Add(1)
if onePC {
tc.metrics.Commits1PC.Add(1)
}
}
}