forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mvcc.go
1480 lines (1357 loc) · 52.4 KB
/
mvcc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Jiang-Ming Yang (jiangming.yang@gmail.com)
// Author: Spencer Kimball (spencer.kimball@gmail.com)
package engine
import (
"bytes"
"fmt"
"math"
"sync"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/log"
gogoproto "github.com/gogo/protobuf/proto"
)
// splitReservoirSize is the size of the reservoir used by FindSplitKey.
const splitReservoirSize = 100

// mvccVersionTimestampSize is the number of bytes the encoded timestamp
// adds to an MVCC version key. It is recorded as a version's KeyBytes
// and therefore feeds the key-byte stats.
const mvccVersionTimestampSize int64 = 12
// MergeStats merges the accumulated deltas in ms into the persistent
// stat counters of the range identified by raftID. Each counter is
// handed to MVCCMergeRangeStat, which skips zero deltas.
//
// NOTE(review): errors returned by MVCCMergeRangeStat are discarded, as
// they always have been; consider surfacing them to callers.
func MergeStats(ms *proto.MVCCStats, engine Engine, raftID int64) {
	deltas := [...]struct {
		stat  proto.Key
		delta int64
	}{
		{StatLiveBytes, ms.LiveBytes},
		{StatKeyBytes, ms.KeyBytes},
		{StatValBytes, ms.ValBytes},
		{StatIntentBytes, ms.IntentBytes},
		{StatLiveCount, ms.LiveCount},
		{StatKeyCount, ms.KeyCount},
		{StatValCount, ms.ValCount},
		{StatIntentCount, ms.IntentCount},
		{StatIntentAge, ms.IntentAge},
		{StatGCBytesAge, ms.GCBytesAge},
		{StatLastUpdateNanos, ms.LastUpdateNanos},
	}
	for _, d := range deltas {
		MVCCMergeRangeStat(engine, raftID, d.stat, d.delta)
	}
}
// SetStats overwrites every stat counter of the range identified by
// raftID with the corresponding value from ms. Unlike MergeStats, zero
// values are written too.
//
// NOTE(review): errors returned by MVCCSetRangeStat are discarded, as
// they always have been; consider surfacing them to callers.
func SetStats(ms *proto.MVCCStats, engine Engine, raftID int64) {
	values := [...]struct {
		stat  proto.Key
		value int64
	}{
		{StatLiveBytes, ms.LiveBytes},
		{StatKeyBytes, ms.KeyBytes},
		{StatValBytes, ms.ValBytes},
		{StatIntentBytes, ms.IntentBytes},
		{StatLiveCount, ms.LiveCount},
		{StatKeyCount, ms.KeyCount},
		{StatValCount, ms.ValCount},
		{StatIntentCount, ms.IntentCount},
		{StatIntentAge, ms.IntentAge},
		{StatGCBytesAge, ms.GCBytesAge},
		{StatLastUpdateNanos, ms.LastUpdateNanos},
	}
	for _, v := range values {
		MVCCSetRangeStat(engine, raftID, v.stat, v.value)
	}
}
// updateStatsForKey reports whether stats should be accumulated for key.
// Nothing is tracked when ms is nil, and local keys (those sorting
// before KeyLocalMax) are always excluded.
func updateStatsForKey(ms *proto.MVCCStats, key proto.Key) bool {
	if ms == nil {
		return false
	}
	return !key.Less(KeyLocalMax)
}
// updateStatsForInline adjusts ms when an inline (non-versioned) value
// is replaced. Inline values involve no intents and no version history,
// so each one is exactly one live key plus one value. A zero key size
// means "no entry": origMetaKeySize == 0 means nothing existed before,
// and metaKeySize == 0 means the entry was removed.
func updateStatsForInline(ms *proto.MVCCStats, key proto.Key, origMetaKeySize, origMetaValSize, metaKeySize, metaValSize int64) {
	if !updateStatsForKey(ms, key) {
		return
	}
	// apply adds (sign = +1) or removes (sign = -1) a single inline entry.
	apply := func(sign, keySize, valSize int64) {
		ms.LiveBytes += sign * (keySize + valSize)
		ms.KeyBytes += sign * keySize
		ms.ValBytes += sign * valSize
		ms.LiveCount += sign
		ms.KeyCount += sign
		ms.ValCount += sign
	}
	if origMetaKeySize != 0 {
		apply(-1, origMetaKeySize, origMetaValSize)
	}
	if metaKeySize != 0 {
		apply(+1, metaKeySize, metaValSize)
	}
}
// updateStatsOnMerge accounts for a merge into an inline value. The
// merge itself is applied later, during compaction, so exact sizes are
// not known here. As an approximation, only valSize (the size of the
// merged value's byte slice) is added to the live and value byte
// counters. The resulting undercount is corrected during splits and
// merges.
func updateStatsOnMerge(ms *proto.MVCCStats, key proto.Key, valSize int64) {
	if updateStatsForKey(ms, key) {
		ms.ValBytes += valSize
		ms.LiveBytes += valSize
	}
}
// updateStatsOnPut updates stat counters for a newly put value,
// including both the metadata key & value bytes and the mvcc
// versioned value's key & value bytes. If the value is not a
// deletion tombstone, updates the live stat counters as well.
// If this value is an intent, updates the intent counters.
//
// Parameters:
//   - ms, key: nothing is updated if ms is nil or key is a local key
//     (see updateStatsForKey).
//   - origMetaKeySize, origMetaValSize: encoded sizes of the metadata
//     entry that existed before the put; only consulted when orig != nil.
//   - metaKeySize, metaValSize: encoded sizes of the metadata entry
//     written by the put.
//   - orig: the previous metadata, or nil if the key had none.
//   - meta: the new metadata; dereferenced unconditionally, so it must
//     be non-nil.
//   - origAgeSeconds: age of orig relative to the new write, in seconds
//     (mvccPutInternal computes it from the wall times of the two
//     timestamps). Only used when orig != nil.
//
// NOTE(review): when orig is an intent being replaced (orig.Txn != nil)
// and is not a tombstone, its bytes are added to GCBytesAge even though
// they are also subtracted from KeyBytes/ValBytes as no longer stored.
// Confirm this is intended.
// NOTE(review): IntentAge is decremented for a replaced intent but is
// never incremented for a new intent here; confirm that is accounted
// for elsewhere.
func updateStatsOnPut(ms *proto.MVCCStats, key proto.Key, origMetaKeySize, origMetaValSize,
	metaKeySize, metaValSize int64, orig, meta *proto.MVCCMetadata, origAgeSeconds int64) {
	if !updateStatsForKey(ms, key) {
		return
	}
	// Remove current live counts for this key.
	if orig != nil {
		// If original version value for this key wasn't deleted, subtract
		// its contribution from live bytes in anticipation of adding in
		// contribution from new version below.
		if !orig.Deleted {
			ms.LiveBytes -= orig.KeyBytes + orig.ValBytes + origMetaKeySize + origMetaValSize
			ms.LiveCount--
			// Also, add the bytes from overwritten value to the GC'able bytes age stat.
			ms.GCBytesAge += MVCCComputeGCBytesAge(orig.KeyBytes+orig.ValBytes, origAgeSeconds)
		} else {
			// Remove the meta byte previously counted for deleted value from GC'able bytes age stat.
			ms.GCBytesAge -= MVCCComputeGCBytesAge(origMetaKeySize+origMetaValSize, origAgeSeconds)
		}
		// The old metadata entry is being overwritten by the new one, so
		// its sizes (and its key) are removed here and re-added below.
		ms.KeyBytes -= origMetaKeySize
		ms.ValBytes -= origMetaValSize
		ms.KeyCount--
		// If the original metadata for this key was an intent, subtract
		// its contribution from stat counters as it's being replaced.
		if orig.Txn != nil {
			// Subtract counts attributable to intent we're replacing.
			ms.KeyBytes -= orig.KeyBytes
			ms.ValBytes -= orig.ValBytes
			ms.ValCount--
			ms.IntentBytes -= (orig.KeyBytes + orig.ValBytes)
			ms.IntentCount--
			ms.IntentAge -= origAgeSeconds
		}
	}
	// If new version isn't a deletion tombstone, add it to live counters.
	if !meta.Deleted {
		ms.LiveBytes += meta.KeyBytes + meta.ValBytes + metaKeySize + metaValSize
		ms.LiveCount++
	}
	// The new metadata entry and the new version are always counted,
	// tombstone or not.
	ms.KeyBytes += meta.KeyBytes + metaKeySize
	ms.ValBytes += meta.ValBytes + metaValSize
	ms.KeyCount++
	ms.ValCount++
	if meta.Txn != nil {
		ms.IntentBytes += meta.KeyBytes + meta.ValBytes
		ms.IntentCount++
	}
}
// updateStatsOnResolve updates stat counters with the difference
// between the original and new metadata entry sizes when an intent is
// pushed or committed. Only the metadata entry's size change is applied
// to KeyBytes/ValBytes; the versioned value's own sizes are left alone.
// If meta is a deletion tombstone, the size change is charged to
// GCBytesAge (weighted by origAgeSeconds) rather than to LiveBytes.
//
// If commit is true, the resolved value's key and value bytes
// (meta.KeyBytes + meta.ValBytes) are subtracted from the intent
// counters, and the intent count and age are decremented. For a push
// (commit == false) the intent counters are left unchanged.
//
// Nothing is updated if ms is nil or key is a local key (see
// updateStatsForKey).
func updateStatsOnResolve(ms *proto.MVCCStats, key proto.Key, origMetaKeySize, origMetaValSize,
	metaKeySize, metaValSize int64, meta *proto.MVCCMetadata, commit bool, origAgeSeconds int64) {
	if !updateStatsForKey(ms, key) {
		return
	}
	// We're pushing or committing an intent; update counts with
	// difference in bytes between old metadata and new.
	keyDiff := metaKeySize - origMetaKeySize
	valDiff := metaValSize - origMetaValSize
	if !meta.Deleted {
		ms.LiveBytes += keyDiff + valDiff
	} else {
		// Tombstone metadata isn't live; its bytes count toward GC'able age.
		ms.GCBytesAge += MVCCComputeGCBytesAge(keyDiff+valDiff, origAgeSeconds)
	}
	ms.KeyBytes += keyDiff
	ms.ValBytes += valDiff
	// If committing, subtract out intent counts.
	if commit {
		ms.IntentBytes -= (meta.KeyBytes + meta.ValBytes)
		ms.IntentCount--
		ms.IntentAge -= origAgeSeconds
	}
}
// updateStatsOnAbort updates stat counters by subtracting an
// aborted value's key and value byte sizes. If an earlier version
// was restored, the restored values are added to live bytes and
// count if the restored value isn't a deletion tombstone.
//
// Parameters:
//   - orig: metadata of the aborted intent (must be non-nil); its
//     metadata entry, its version and its intent contribution are
//     all removed.
//   - origMetaKeySize, origMetaValSize: encoded sizes of orig's metadata entry.
//   - restored: metadata of the earlier version now exposed, or nil if
//     the key has no earlier version.
//   - restoredMetaKeySize, restoredMetaValSize: encoded sizes of the
//     restored metadata entry.
//   - origAgeSeconds, restoredAgeSeconds: ages used to weight the
//     GCBytesAge adjustments for orig and restored respectively.
//
// Panics if restored is itself an intent. The panic happens only after
// ms has already been modified.
func updateStatsOnAbort(ms *proto.MVCCStats, key proto.Key, origMetaKeySize, origMetaValSize,
	restoredMetaKeySize, restoredMetaValSize int64, orig, restored *proto.MVCCMetadata,
	origAgeSeconds, restoredAgeSeconds int64) {
	if !updateStatsForKey(ms, key) {
		return
	}
	// Total footprint of the aborted intent: its version plus its metadata entry.
	origTotalBytes := orig.KeyBytes + orig.ValBytes + origMetaKeySize + origMetaValSize
	if !orig.Deleted {
		ms.LiveBytes -= origTotalBytes
		ms.LiveCount--
	} else {
		// Remove the bytes from previously deleted intent from the GC'able bytes age stat.
		ms.GCBytesAge -= MVCCComputeGCBytesAge(origTotalBytes, origAgeSeconds)
	}
	ms.KeyBytes -= (orig.KeyBytes + origMetaKeySize)
	ms.ValBytes -= (orig.ValBytes + origMetaValSize)
	ms.KeyCount--
	ms.ValCount--
	ms.IntentBytes -= (orig.KeyBytes + orig.ValBytes)
	ms.IntentCount--
	ms.IntentAge -= origAgeSeconds
	// If restored version isn't a deletion tombstone, add it to live counters.
	if restored != nil {
		if !restored.Deleted {
			ms.LiveBytes += restored.KeyBytes + restored.ValBytes + restoredMetaKeySize + restoredMetaValSize
			ms.LiveCount++
			// Also, remove the bytes from previously overwritten value from the GC'able bytes age stat.
			ms.GCBytesAge -= MVCCComputeGCBytesAge(restored.KeyBytes+restored.ValBytes, restoredAgeSeconds)
		} else {
			// Add back in the meta key/value bytes to GC'able bytes age stat.
			ms.GCBytesAge += MVCCComputeGCBytesAge(restoredMetaKeySize+restoredMetaValSize, restoredAgeSeconds)
		}
		// The restored version's own bytes were never removed, so only
		// its new metadata entry (and key) is added back here.
		ms.KeyBytes += restoredMetaKeySize
		ms.ValBytes += restoredMetaValSize
		ms.KeyCount++
		if restored.Txn != nil {
			panic("restored version should never be an intent")
		}
	}
}
// updateStatsOnGC removes a garbage-collected entry of keySize/valSize
// bytes from ms. The entry's contribution to GCBytesAge (weighted by
// ageSeconds) is also removed. A non-nil meta means the collected entry
// was the MVCC metadata row, which decrements KeyCount. Otherwise it
// was a versioned value, which decrements ValCount.
func updateStatsOnGC(ms *proto.MVCCStats, key proto.Key, keySize, valSize int64, meta *proto.MVCCMetadata, ageSeconds int64) {
	if !updateStatsForKey(ms, key) {
		return
	}
	ms.GCBytesAge -= MVCCComputeGCBytesAge(keySize+valSize, ageSeconds)
	ms.KeyBytes -= keySize
	ms.ValBytes -= valSize
	switch {
	case meta != nil:
		ms.KeyCount--
	default:
		ms.ValCount--
	}
}
// MVCCComputeGCBytesAge computes the GC bytes-age contribution of the
// given number of bytes at the given age (in seconds): the product
// bytes * ageSeconds. Negative byte counts yield negative contributions,
// which callers use to retract previously added amounts.
func MVCCComputeGCBytesAge(bytes, ageSeconds int64) int64 {
	return bytes * ageSeconds
}
// MVCCGetRangeStat reads a single stat counter of the range identified
// by raftID. A counter that has never been written reads as 0.
func MVCCGetRangeStat(engine Engine, raftID int64, stat proto.Key) (int64, error) {
	v, err := MVCCGet(engine, RangeStatKey(raftID, stat), proto.ZeroTimestamp, true, nil)
	switch {
	case err != nil:
		return 0, err
	case v == nil:
		return 0, nil
	default:
		return v.GetInteger(), nil
	}
}
// MVCCSetRangeStat overwrites a single stat counter of the range
// identified by raftID with statVal. The counter is stored as an inline
// (zero-timestamp) integer value.
func MVCCSetRangeStat(engine Engine, raftID int64, stat proto.Key, statVal int64) error {
	encoded := proto.Value{Integer: gogoproto.Int64(statVal)}
	return MVCCPut(engine, nil, RangeStatKey(raftID, stat), proto.ZeroTimestamp, encoded, nil)
}
// MVCCMergeRangeStat folds statVal into a single stat counter of the
// range identified by raftID, using an MVCC merge through the provided
// engine. Zero deltas are skipped instead of being written.
func MVCCMergeRangeStat(engine Engine, raftID int64, stat proto.Key, statVal int64) error {
	if statVal != 0 {
		operand := proto.Value{Integer: gogoproto.Int64(statVal)}
		return MVCCMerge(engine, nil, RangeStatKey(raftID, stat), operand)
	}
	return nil
}
// MVCCGetRangeSize reports the size of the range identified by raftID,
// defined as its key-byte counter plus its value-byte counter.
func MVCCGetRangeSize(engine Engine, raftID int64) (int64, error) {
	var total int64
	for _, stat := range []proto.Key{StatKeyBytes, StatValBytes} {
		n, err := MVCCGetRangeStat(engine, raftID, stat)
		if err != nil {
			return 0, err
		}
		total += n
	}
	return total, nil
}
// MVCCGetRangeStats loads every stat counter of the range identified by
// raftID into ms. Counters are read in a fixed order. On the first
// failed read, that field is set to the returned value (0) and the
// error is returned, leaving later fields untouched.
func MVCCGetRangeStats(engine Engine, raftID int64, ms *proto.MVCCStats) error {
	targets := []struct {
		stat proto.Key
		dst  *int64
	}{
		{StatLiveBytes, &ms.LiveBytes},
		{StatKeyBytes, &ms.KeyBytes},
		{StatValBytes, &ms.ValBytes},
		{StatIntentBytes, &ms.IntentBytes},
		{StatLiveCount, &ms.LiveCount},
		{StatKeyCount, &ms.KeyCount},
		{StatValCount, &ms.ValCount},
		{StatIntentCount, &ms.IntentCount},
		{StatIntentAge, &ms.IntentAge},
		{StatGCBytesAge, &ms.GCBytesAge},
		{StatLastUpdateNanos, &ms.LastUpdateNanos},
	}
	var err error
	for _, tgt := range targets {
		// Assign before checking err so a failed read still stores its
		// (zero) result.
		if *tgt.dst, err = MVCCGetRangeStat(engine, raftID, tgt.stat); err != nil {
			return err
		}
	}
	return nil
}
// MVCCGetProto reads the value at key (see MVCCGet for the meaning of
// timestamp, consistent and txn) and, if msg is non-nil, decodes its
// bytes into msg.
//
// The boolean result is false when the read failed or the key holds no
// value or an empty byte slice. It is true when a non-empty value was
// found, even if decoding it into msg then fails; in that case the
// decoding error is returned alongside true.
func MVCCGetProto(engine Engine, key proto.Key, timestamp proto.Timestamp, consistent bool, txn *proto.Transaction, msg gogoproto.Message) (bool, error) {
	value, err := MVCCGet(engine, key, timestamp, consistent, txn)
	switch {
	case err != nil:
		return false, err
	case value == nil, len(value.Bytes) == 0:
		return false, nil
	case msg == nil:
		return true, nil
	}
	return true, gogoproto.Unmarshal(value.Bytes, msg)
}
// MVCCPutProto serializes msg with protobuf, wraps the bytes in a
// checksummed proto.Value, and writes it to key at timestamp via MVCCPut.
// Marshalling errors are returned before anything is written.
func MVCCPutProto(engine Engine, ms *proto.MVCCStats, key proto.Key, timestamp proto.Timestamp, txn *proto.Transaction, msg gogoproto.Message) error {
	encoded, err := gogoproto.Marshal(msg)
	if err != nil {
		return err
	}
	wrapped := proto.Value{Bytes: encoded}
	wrapped.InitChecksum(key)
	return MVCCPut(engine, ms, key, timestamp, wrapped, txn)
}
// getBuffer bundles the scratch space used by MVCCGet and
// mvccGetInternal, so that a read can be served without allocating
// these objects each time. Instances are recycled through getBufferPool.
type getBuffer struct {
	meta  proto.MVCCMetadata // metadata decoded for the key being read
	value proto.MVCCValue    // versioned value decoded by mvccGetInternal
	key   [1024]byte         // backing storage for the encoded metadata key
}

// getBufferPool recycles getBuffers between calls to MVCCGet.
var getBufferPool = sync.Pool{
	New: func() interface{} { return new(getBuffer) },
}
// MVCCGet returns the value of key as of timestamp. The key may contain
// arbitrary bytes. A nil value is returned when the key does not exist
// or its visible version is a deletion tombstone.
//
// Multiple versions of a key are laid out as follows:
//   ...
//   keyA : MVCCMetadata of keyA
//   keyA_Timestamp_n : value of version_n
//   keyA_Timestamp_n-1 : value of version_n-1
//   ...
//   keyA_Timestamp_0 : value of version_0
//   keyB : MVCCMetadata of keyB
//   ...
//
// When consistent is true, intents cause WriteIntentErrors. When it is
// false, intents are ignored; a key that has an intent but no earlier
// committed version is skipped.
func MVCCGet(engine Engine, key proto.Key, timestamp proto.Timestamp, consistent bool, txn *proto.Transaction) (*proto.Value, error) {
	if len(key) == 0 {
		return nil, emptyKeyError()
	}

	buf := getBufferPool.Get().(*getBuffer)
	defer getBufferPool.Put(buf)

	metaKey := mvccEncodeKey(buf.key[0:0], key)
	found, _, _, err := engine.GetProto(metaKey, &buf.meta)
	if err != nil {
		return nil, err
	}
	if !found {
		return nil, nil
	}

	// firstInRange seeks a fresh iterator to start. If the first key it
	// lands on sorts before end, it decodes that entry into msg and
	// returns the key; otherwise it returns a nil key.
	var firstInRange getValueFunc = func(eng Engine, start, end proto.EncodedKey,
		msg gogoproto.Message) (proto.EncodedKey, error) {
		it := eng.NewIterator()
		defer it.Close()
		it.Seek(start)
		if !it.Valid() {
			return nil, it.Error()
		}
		k := it.Key()
		if bytes.Compare(k, end) >= 0 {
			return nil, it.Error()
		}
		return k, it.ValueProto(msg)
	}
	return mvccGetInternal(engine, key, metaKey, timestamp, consistent, txn, firstInRange, buf)
}
// getValueFunc scans for the first key in the half-open range
// [start, end) and, if one exists, decodes its value into msg. It
// returns the encoded key that was found, or nil if no key lies in the
// range, together with any iteration or decoding error. mvccGetInternal
// uses it to locate historical versions; MVCCGet supplies an
// iterator-based implementation.
type getValueFunc func(engine Engine, start, end proto.EncodedKey,
	msg gogoproto.Message) (proto.EncodedKey, error)
// mvccGetInternal reads the versioned value of key visible at
// timestamp, taking the transaction txn into account. The key's
// MVCCMetadata must already be decoded into buf.meta by the caller
// (MVCCGet does this), and metaKey must be its encoded metadata key.
// getValue is a helper function used to scan for an earlier version of
// the value when doing historical reads; versions are decoded into
// buf.value.
//
// The consistent parameter controls how pending write intents are
// treated. When consistent is true, another transaction's intent at or
// below the read timestamp causes a WriteIntentError and no value. When
// consistent is false, the read is moved to just below the intent's
// timestamp so that the most recent _committed_ value is returned. The
// intent is then also reported via a WriteIntentError, in addition to
// the result. Inconsistent reads are rejected inside a transaction.
//
// For transactional reads whose timestamp is below txn.MaxTimestamp, a
// version found in the uncertainty window (timestamp, MaxTimestamp]
// yields a ReadWithinUncertaintyIntervalError.
//
// A nil value is returned when no visible version exists or the
// visible version is a deletion tombstone.
func mvccGetInternal(engine Engine, key proto.Key, metaKey proto.EncodedKey, timestamp proto.Timestamp,
	consistent bool, txn *proto.Transaction, getValue getValueFunc, buf *getBuffer) (*proto.Value, error) {
	if !consistent && txn != nil {
		return nil, util.Errorf("cannot allow inconsistent reads within a transaction")
	}
	var err error
	meta := &buf.meta
	// If value is inline, return immediately; txn & timestamp are irrelevant.
	if meta.IsInline() {
		if err := meta.Value.Verify(key); err != nil {
			return nil, err
		}
		return meta.Value, nil
	}
	// If we're doing inconsistent reads and there's an intent, we
	// ignore the intent by insisting that the timestamp we're reading
	// at is a historical timestamp < the intent timestamp. However, we
	// return a write intent error so that the intents can be resolved.
	var wiErr error
	if !consistent && meta.Txn != nil && !timestamp.Less(meta.Timestamp) {
		timestamp = meta.Timestamp.Prev()
		wiErr = &proto.WriteIntentError{Intents: []proto.WriteIntentError_Intent{{Key: key, Txn: *meta.Txn}}}
	}
	// valueKey is the encoded version key whose value was decoded into
	// value; it stays nil if no suitable version was found.
	var valueKey proto.EncodedKey
	value := &buf.value
	// First case: Our read timestamp is ahead of the latest write, or the
	// latest write and current read are within the same transaction.
	if !timestamp.Less(meta.Timestamp) ||
		(meta.Txn != nil && txn != nil && bytes.Equal(meta.Txn.ID, txn.ID)) {
		if meta.Txn != nil && (txn == nil || !bytes.Equal(meta.Txn.ID, txn.ID)) {
			// Trying to read the last value, but it's another transaction's
			// intent; the reader will have to act on this.
			return nil, &proto.WriteIntentError{Intents: []proto.WriteIntentError_Intent{{Key: key, Txn: *meta.Txn}}}
		}
		latestKey := mvccEncodeTimestamp(metaKey, meta.Timestamp)
		// Check for case where we're reading our own txn's intent
		// but it's got a different epoch. This can happen if the
		// txn was restarted and an earlier iteration wrote the value
		// we're now reading. In this case, we skip the intent.
		// (txn is non-nil here: a non-nil meta.Txn with a nil txn
		// returned above.)
		if meta.Txn != nil && txn.Epoch != meta.Txn.Epoch {
			valueKey, err = getValue(engine, latestKey.Next(), MVCCEncodeKey(key.Next()), value)
		} else {
			var ok bool
			ok, _, _, err = engine.GetProto(latestKey, value)
			if ok {
				valueKey = latestKey
			}
		}
	} else if txn != nil && timestamp.Less(txn.MaxTimestamp) {
		// In this branch, the latest timestamp is ahead, and so the read of an
		// "old" value in a transactional context at time (timestamp, MaxTimestamp]
		// occurs, leading to a clock uncertainty error if a version exists in
		// that time interval.
		if !txn.MaxTimestamp.Less(meta.Timestamp) {
			// Second case: Our read timestamp is behind the latest write, but the
			// latest write could possibly have happened before our read in
			// absolute time if the writer had a fast clock.
			// The reader should try again with a later timestamp than the
			// one given below.
			return nil, &proto.ReadWithinUncertaintyIntervalError{
				Timestamp:         timestamp,
				ExistingTimestamp: meta.Timestamp,
			}
		}
		// We want to know if anything has been written ahead of timestamp, but
		// before MaxTimestamp.
		nextKey := MVCCEncodeVersionKey(key, txn.MaxTimestamp)
		valueKey, err = getValue(engine, nextKey, MVCCEncodeKey(key.Next()), value)
		if err == nil && valueKey != nil {
			_, ts, _ := MVCCDecodeKey(valueKey)
			if timestamp.Less(ts) {
				// Third case: Our read timestamp is sufficiently behind the newest
				// value, but there is another previous write with the same issues
				// as in the second case, so the reader will have to come again
				// with a higher read timestamp.
				return nil, &proto.ReadWithinUncertaintyIntervalError{
					Timestamp:         timestamp,
					ExistingTimestamp: ts,
				}
			}
		}
		// Fourth case: There's no value in our future up to MaxTimestamp, and
		// those are the only ones that we're not certain about. The correct
		// key has already been read above, so there's nothing left to do.
	} else {
		// Fifth case: We're reading a historic value either outside of
		// a transaction, or in the absence of future versions that clock
		// uncertainty would apply to.
		nextKey := MVCCEncodeVersionKey(key, timestamp)
		valueKey, err = getValue(engine, nextKey, MVCCEncodeKey(key.Next()), value)
	}
	if err != nil {
		return nil, err
	} else if valueKey == nil {
		// No visible version; still surface any intent found by an
		// inconsistent read.
		return nil, wiErr
	}
	// The version's timestamp is encoded in its key, not in the stored value.
	_, ts, isValue := MVCCDecodeKey(valueKey)
	if !isValue {
		return nil, util.Errorf("expected scan to versioned value reading key %q; got %q", key, valueKey)
	}
	if value.Deleted {
		value.Value = nil
	}
	// Set the timestamp if the value is not nil (i.e. not a deletion tombstone).
	if value.Value != nil {
		value.Value.Timestamp = &ts
		if err := value.Value.Verify(key); err != nil {
			return nil, err
		}
	} else if !value.Deleted {
		// Sanity check.
		panic(fmt.Sprintf("encountered MVCC value at key %q with a nil proto.Value but with !Deleted: %+v", key, value))
	}
	return value.Value, wiErr
}
// putBuffer is the scratch space used by MVCCPut, MVCCDelete and
// mvccPutInternal. Keeping these objects in one pooled struct avoids
// allocating them on every write; recycling it through putBufferPool
// removes allocation from the common put path.
type putBuffer struct {
	meta    proto.MVCCMetadata // existing metadata read for the key
	newMeta proto.MVCCMetadata // metadata written for a versioned put
	value   proto.MVCCValue    // versioned value that gets serialized
	pvalue  proto.Value        // caller's value; value.Value points here
	key     [1024]byte         // backing storage for the encoded metadata key
}

// putBufferPool recycles putBuffers between writes.
var putBufferPool = sync.Pool{
	New: func() interface{} { return new(putBuffer) },
}
// MVCCPut writes value to key at timestamp and updates the key's
// metadata. Each write is saved as a separate version according to its
// timestamp. It is assumed that the range has already checked for
// conflicting write intents before calling this.
//
// If value carries a timestamp, it must equal timestamp.
//
// A timestamp of proto.ZeroTimestamp stores the value inline instead of
// as a timestamp-versioned value. Inline and versioned writes cannot be
// mixed on the same key: a zero-timestamp write precludes later
// non-zero-timestamp writes, and vice versa. An inline value occupies a
// single row and never accumulates history. Successive zero-timestamp
// writes replace it, deletes clear it, and it may also be merged into.
func MVCCPut(engine Engine, ms *proto.MVCCStats, key proto.Key, timestamp proto.Timestamp,
	value proto.Value, txn *proto.Transaction) error {
	if ts := value.Timestamp; ts != nil && !ts.Equal(timestamp) {
		return util.Errorf(
			"the timestamp %+v provided in value does not match the timestamp %+v in request",
			value.Timestamp, timestamp)
	}
	pb := putBufferPool.Get().(*putBuffer)
	pb.value.Reset()
	pb.pvalue = value
	pb.value.Value = &pb.pvalue
	err := mvccPutInternal(engine, ms, key, timestamp, pb.value, txn, pb)
	// The buffer is returned explicitly rather than via defer, which
	// was measured to be slower here.
	putBufferPool.Put(pb)
	return err
}
// MVCCDelete writes a deletion tombstone for key at timestamp, so that
// later reads at or after timestamp see no value.
func MVCCDelete(engine Engine, ms *proto.MVCCStats, key proto.Key, timestamp proto.Timestamp,
	txn *proto.Transaction) error {
	pb := putBufferPool.Get().(*putBuffer)
	pb.value.Reset()
	// A tombstone has Deleted set and no proto.Value payload.
	pb.value.Deleted = true
	err := mvccPutInternal(engine, ms, key, timestamp, pb.value, txn, pb)
	// The buffer is returned explicitly rather than via defer, which
	// was measured to be slower here.
	putBufferPool.Put(pb)
	return err
}
// mvccPutInternal adds a new timestamped value to the specified key.
// If value.Deleted is set, a deletion tombstone is written instead.
//
// A zero timestamp requests an inline write. The value is stored
// directly in the key's MVCCMetadata, or that entry is cleared for a
// delete, and no versioned key is written. Mixing inline and versioned
// writes on the same key is rejected.
//
// For a versioned write, the new version (keyed by timestamp) and the
// updated metadata are written, and ms (if non-nil) is updated through
// updateStatsOnPut. Possible outcomes:
//   - a write conflicting with another transaction's intent fails with
//     WriteIntentError;
//   - a write older than an existing committed version fails with
//     WriteTooOldError;
//   - an out-of-order (older timestamp or older epoch) write from the
//     intent's own transaction is ignored;
//   - a delete of a key with no metadata is a no-op.
//
// buf supplies scratch space. The versioned value is serialized from
// buf.value, not from the value argument, so value must be a copy of
// buf.value (as MVCCPut and MVCCDelete arrange).
func mvccPutInternal(engine Engine, ms *proto.MVCCStats, key proto.Key, timestamp proto.Timestamp,
	value proto.MVCCValue, txn *proto.Transaction, buf *putBuffer) error {
	if len(key) == 0 {
		return emptyKeyError()
	}
	if value.Value != nil && value.Value.Bytes != nil && value.Value.Integer != nil {
		return util.Errorf("key %q value contains both a byte slice and an integer value: %+v", key, value)
	}
	meta := &buf.meta
	metaKey := mvccEncodeKey(buf.key[0:0], key)
	ok, origMetaKeySize, origMetaValSize, err := engine.GetProto(metaKey, meta)
	if err != nil {
		return err
	}
	if !ok {
		// buf is recycled through putBufferPool, so meta may still hold
		// metadata decoded by an earlier call for a different key. Clear
		// it so that no stale fields are written by the inline path below.
		meta.Reset()
	}
	// Only meaningful when ok; updateStatsOnPut uses it only when the
	// original metadata exists.
	origAgeSeconds := timestamp.WallTime/1E9 - meta.Timestamp.WallTime/1E9
	// Verify we're not mixing inline and non-inline values.
	putIsInline := timestamp.Equal(proto.ZeroTimestamp)
	if ok && putIsInline != meta.IsInline() {
		return util.Errorf("%q: put is inline=%t, but existing value is inline=%t",
			metaKey, putIsInline, meta.IsInline())
	}
	if putIsInline {
		// Inline values live directly in the metadata entry; deleting one
		// removes that entry.
		var metaKeySize, metaValSize int64
		if value.Deleted {
			metaKeySize, metaValSize, err = 0, 0, engine.Clear(metaKey)
		} else {
			meta.Value = value.Value
			metaKeySize, metaValSize, err = PutProto(engine, metaKey, meta)
		}
		updateStatsForInline(ms, key, origMetaKeySize, origMetaValSize, metaKeySize, metaValSize)
		return err
	}
	var newMeta *proto.MVCCMetadata
	// In case the key metadata exists.
	if ok {
		// There is an uncommitted write intent and the current Put
		// operation does not come from the same transaction.
		// This should not happen since range should check the existing
		// write intent before executing any Put action at MVCC level.
		if meta.Txn != nil && (txn == nil || !bytes.Equal(meta.Txn.ID, txn.ID)) {
			return &proto.WriteIntentError{Intents: []proto.WriteIntentError_Intent{{Key: key, Txn: *meta.Txn}}}
		}
		// We can update the current metadata only if both the timestamp
		// and epoch of the new intent are greater than or equal to
		// existing. If either of these conditions doesn't hold, it's
		// likely the case that an older RPC is arriving out of order.
		//
		// Note that if meta.Txn!=nil and txn==nil, a WriteIntentError was
		// returned above.
		if !timestamp.Less(meta.Timestamp) &&
			(meta.Txn == nil || txn.Epoch >= meta.Txn.Epoch) {
			// If this is an intent and timestamps have changed,
			// need to remove old version.
			if meta.Txn != nil && !timestamp.Equal(meta.Timestamp) {
				versionKey := mvccEncodeTimestamp(metaKey, meta.Timestamp)
				if err := engine.Clear(versionKey); err != nil {
					return err
				}
			}
			newMeta = &buf.newMeta
			*newMeta = proto.MVCCMetadata{Txn: txn, Timestamp: timestamp}
		} else if timestamp.Less(meta.Timestamp) && meta.Txn == nil {
			// If we receive a Put request to write before an already-
			// committed version, send write too old error.
			return &proto.WriteTooOldError{Timestamp: timestamp, ExistingTimestamp: meta.Timestamp}
		} else {
			// Otherwise, it's an old write to the current transaction. Just ignore.
			return nil
		}
	} else { // In case the key metadata does not exist yet.
		// If this is a delete, do nothing!
		if value.Deleted {
			return nil
		}
		// Create key metadata. meta is set to nil so that updateStatsOnPut
		// sees no original metadata.
		meta = nil
		newMeta = &buf.newMeta
		*newMeta = proto.MVCCMetadata{Txn: txn, Timestamp: timestamp}
	}
	// Make sure to zero the redundant timestamp (timestamp is encoded
	// into the key, so don't need it in both places).
	if value.Value != nil {
		value.Value.Timestamp = nil
	}
	// The metaKey is always the prefix of the versionKey.
	versionKey := mvccEncodeTimestamp(metaKey, timestamp)
	_, valueSize, err := PutProto(engine, versionKey, &buf.value)
	if err != nil {
		return err
	}
	// Write the mvcc metadata now that we have sizes for the latest versioned value.
	newMeta.KeyBytes = mvccVersionTimestampSize
	newMeta.ValBytes = valueSize
	newMeta.Deleted = value.Deleted
	metaKeySize, metaValSize, err := PutProto(engine, metaKey, newMeta)
	if err != nil {
		return err
	}
	// Update MVCC stats.
	updateStatsOnPut(ms, key, origMetaKeySize, origMetaValSize, metaKeySize, metaValSize, meta, newMeta, origAgeSeconds)
	return nil
}
// MVCCIncrement adds inc to the integer stored at key and writes the
// result back at timestamp, returning the new value. A missing key is
// treated as 0. It fails if the existing value is not an integer or if
// the addition would overflow. Incrementing an existing value by zero
// returns it without writing.
func MVCCIncrement(engine Engine, ms *proto.MVCCStats, key proto.Key, timestamp proto.Timestamp, txn *proto.Transaction, inc int64) (int64, error) {
	// Reading at MaxTimestamp lets the read detect a write intent laid
	// down by a concurrent transaction at a newer timestamp, which a
	// plain non-existence check could otherwise miss.
	existing, err := MVCCGet(engine, key, proto.MaxTimestamp, true, txn)
	if err != nil {
		return 0, err
	}

	var current int64
	if existing != nil {
		// Only integer-typed values can be incremented.
		if existing.Bytes != nil || existing.Integer == nil {
			return 0, util.Errorf("cannot increment key %q which already has a generic byte value: %+v", key, *existing)
		}
		current = existing.GetInteger()
	}

	if encoding.WillOverflow(current, inc) {
		return 0, util.Errorf("key %s with value %d incremented by %d results in overflow", key, current, inc)
	}

	if existing != nil && inc == 0 {
		return current, nil
	}

	sum := current + inc
	updated := proto.Value{Integer: gogoproto.Int64(sum)}
	updated.InitChecksum(key)
	return sum, MVCCPut(engine, ms, key, timestamp, updated, txn)
}
// MVCCConditionalPut sets the value for a specified key only if the
// expected value matches. If not, it returns a ConditionFailedError
// containing the actual value (when one exists).
//
// The conditions are checked in order:
//   - expValue == nil: succeeds only if no value is found for key.
//   - expValue != nil: a value must be found for key; if expValue.Bytes is
//     non-nil the found bytes must be equal, and if expValue.Integer is
//     non-nil the found value must carry an equal integer.
//
// When all conditions hold, value is written via MVCCPut.
func MVCCConditionalPut(engine Engine, ms *proto.MVCCStats, key proto.Key, timestamp proto.Timestamp, value proto.Value,
	expValue *proto.Value, txn *proto.Transaction) error {
	// Read at the max timestamp so that a write intent left behind by a
	// concurrent transaction at a newer timestamp is still detected.
	actual, err := MVCCGet(engine, key, proto.MaxTimestamp, true, txn)
	if err != nil {
		return err
	}

	switch {
	case expValue == nil:
		// The caller expects the key to hold no value.
		if actual != nil {
			return &proto.ConditionFailedError{ActualValue: actual}
		}
	case actual == nil:
		// A value was expected but none was found.
		return &proto.ConditionFailedError{}
	case expValue.Bytes != nil && !bytes.Equal(expValue.Bytes, actual.Bytes):
		return &proto.ConditionFailedError{ActualValue: actual}
	case expValue.Integer != nil && (actual.Integer == nil || expValue.GetInteger() != actual.GetInteger()):
		return &proto.ConditionFailedError{ActualValue: actual}
	}
	return MVCCPut(engine, ms, key, timestamp, value, txn)
}
// MVCCMerge implements a merge operation. Merge adds integer values,
// concatenates undifferentiated byte slice values, and efficiently
// combines time series observations if the proto.Value tag value
// indicates the value byte slice is of type _CR_TS (the internal
// cockroach time series data tag).
//
// The value is wrapped, inline, in an MVCCMetadata record that is handed
// to engine.Merge under the encoded metadata key. No versioned key is
// written. An empty key is rejected.
func MVCCMerge(engine Engine, ms *proto.MVCCStats, key proto.Key, value proto.Value) error {
	if len(key) == 0 {
		return emptyKeyError()
	}
	encKey := MVCCEncodeKey(key)

	data, err := gogoproto.Marshal(&proto.MVCCMetadata{Value: &value})
	if err != nil {
		return err
	}

	// NOTE(review): any result returned by engine.Merge is not checked
	// here; confirm against the Engine interface whether it can fail.
	engine.Merge(encKey, data)
	// Only the byte payload length is reported to the stats update.
	updateStatsOnMerge(ms, key, int64(len(value.Bytes)))
	return nil
}
// MVCCDeleteRange deletes the range of key/value pairs specified by
// start and end keys. Specify max=0 for unbounded deletes.
//
// It returns the number of keys deleted. If the initial scan fails, the
// count is 0. If a delete fails part-way through, the number of keys
// deleted before the failure is returned together with that error.
func MVCCDeleteRange(engine Engine, ms *proto.MVCCStats, key, endKey proto.Key, max int64, timestamp proto.Timestamp, txn *proto.Transaction) (int64, error) {
	// Scan at the max timestamp so that a write intent left behind by a
	// concurrent transaction at a newer timestamp is detected.
	kvs, err := MVCCScan(engine, key, endKey, max, proto.MaxTimestamp, true, txn)
	if err != nil {
		return 0, err
	}

	var deleted int64
	for i := range kvs {
		if err := MVCCDelete(engine, ms, kvs[i].Key, timestamp, txn); err != nil {
			return deleted, err
		}
		deleted++
	}
	return deleted, nil
}
// MVCCScan scans the key range specified by start key through end key
// up to some maximum number of results. Specify max=0 for unbounded
// scans.
//
// On success the result is never nil (an empty scan yields an empty
// slice). On error the collected results are discarded, with one
// exception: an inconsistent read that fails with a
// *proto.WriteIntentError returns the key/values gathered so far
// alongside the error.
func MVCCScan(engine Engine, key, endKey proto.Key, max int64, timestamp proto.Timestamp,
	consistent bool, txn *proto.Transaction) ([]proto.KeyValue, error) {
	kvs := []proto.KeyValue{}
	collect := func(kv proto.KeyValue) (bool, error) {
		kvs = append(kvs, kv)
		// Stop once exactly max results are held; max == 0 means no limit.
		done := max != 0 && int64(len(kvs)) == max
		return done, nil
	}

	err := MVCCIterate(engine, key, endKey, timestamp, consistent, txn, collect)
	if err == nil {
		return kvs, nil
	}
	if _, isIntentErr := err.(*proto.WriteIntentError); isIntentErr && !consistent {
		return kvs, err
	}
	return nil, err
}
// MVCCIterate iterates over the key range specified by start and end
// keys, At each step of the iteration, f() is invoked with the
// current key/value pair. If f returns true (done) or an error, the
// iteration stops and the error is propagated.
func MVCCIterate(engine Engine, key, endKey proto.Key, timestamp proto.Timestamp,
consistent bool, txn *proto.Transaction, f func(proto.KeyValue) (bool, error)) error {
if !consistent && txn != nil {
return util.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(endKey) == 0 {
return emptyKeyError()
}
buf := getBufferPool.Get().(*getBuffer)
defer getBufferPool.Put(buf)
// We store encEndKey and encKey in the same buffer to avoid memory
// allocations.
encEndKey := mvccEncodeKey(buf.key[0:0], endKey)
keyBuf := encEndKey[len(encEndKey):]
encKey := mvccEncodeKey(keyBuf, key)
// Get a new iterator and define our getEarlierFunc using iter.Seek.
iter := engine.NewIterator()
defer iter.Close()
getValue := func(engine Engine, start, end proto.EncodedKey,
msg gogoproto.Message) (proto.EncodedKey, error) {
iter.Seek(start)
if !iter.Valid() {
return nil, iter.Error()
}
key := iter.Key()
if bytes.Compare(key, end) >= 0 {
return nil, iter.Error()
}
return key, iter.ValueProto(msg)
}
// A cumulative write intent error to gather all write intents.
var wiErr error
for {
iter.Seek(encKey)
if !iter.Valid() {
if err := iter.Error(); err != nil {
return err
}
return wiErr
}
metaKey := iter.Key()
if bytes.Compare(metaKey, encEndKey) >= 0 {
if err := iter.Error(); err != nil {
return err
}
return wiErr
}
key, _, isValue := MVCCDecodeKey(metaKey)
if isValue {
return util.Errorf("expected an MVCC metadata key: %q", metaKey)
}
if err := iter.ValueProto(&buf.meta); err != nil {
return err
}
value, err := mvccGetInternal(engine, key, metaKey, timestamp, consistent, txn, getValue, buf)
if err != nil {
switch t := err.(type) {
case *proto.WriteIntentError:
// In the case of WriteIntentErrors, accumulate affected keys but continue scan.
if wiErr == nil {
wiErr = t
} else {
wiErr.(*proto.WriteIntentError).Intents = append(wiErr.(*proto.WriteIntentError).Intents, t.Intents...)
}
default:
return err
}
}
if value != nil {
done, err := f(proto.KeyValue{Key: key, Value: *value})
if done || err != nil {
if err != nil {
return err
}