From d9a10ea91febc3b6decf3d2616448dc12b883fbc Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 20 Nov 2015 20:35:15 -0500 Subject: [PATCH] GC Txn entries through GC queue see #2062. on each run of the GC queue for a given range, the transaction and sequence prefixes are scanned and the following actions taken: * old pending transactions are pushed (which will succeed), effectively aborting them * old aborted transactions are added to the GC request. * aborted and committed transactions will have the intents referenced in their record resolved synchronously and are GCed (on success) * sequence cache entries which are "old" and belong to "old" (or nonexistent) transactions are deleted. --- roachpb/data.go | 13 +- roachpb/data.pb.go | 33 +-- roachpb/data.proto | 5 +- .../rocksdb/cockroach/roachpb/data.pb.cc | 117 ++-------- .../rocksdb/cockroach/roachpb/data.pb.h | 34 --- storage/gc_queue.go | 210 ++++++++++++++---- storage/gc_queue_test.go | 138 +++++++++++- storage/replica.go | 14 +- storage/replica_command.go | 16 +- storage/sequence_cache.go | 27 +++ 10 files changed, 372 insertions(+), 235 deletions(-) diff --git a/roachpb/data.go b/roachpb/data.go index 8612a160612b..302e718d8171 100644 --- a/roachpb/data.go +++ b/roachpb/data.go @@ -666,8 +666,7 @@ func (t Transaction) Short() string { // nanoseconds since the Unix epoch. func NewGCMetadata(nowNanos int64) *GCMetadata { return &GCMetadata{ - LastScanNanos: nowNanos, - OldestIntentNanos: proto.Int64(nowNanos), + LastScanNanos: nowNanos, } } @@ -713,6 +712,16 @@ func (l Lease) OwnedBy(storeID StoreID) bool { return l.Replica.StoreID == storeID } +// AsIntents takes a slice of spans and returns it as a slice of intents for +// the given transaction. +func AsIntents(spans []Span, txn *Transaction) []Intent { + ret := make([]Intent, len(spans)) + for i := range spans { + ret[i].Span, ret[i].Txn = spans[i], *txn + } + return ret +} + // RSpan is a key range with an inclusive start RKey and an exclusive end RKey. type RSpan struct { Key, EndKey RKey diff --git a/roachpb/data.pb.go b/roachpb/data.pb.go index c573f9d33f52..fc7e9c674e91 100644 --- a/roachpb/data.pb.go +++ b/roachpb/data.pb.go @@ -465,12 +465,11 @@ func (*Lease) ProtoMessage() {} // GCMetadata holds information about the last complete key/value // garbage collection scan of a range. +// TODO(tschottdorf): can avoid an extra message unless we're planning +// to add more content. type GCMetadata struct { // The last GC scan timestamp in nanoseconds since the Unix epoch. LastScanNanos int64 `protobuf:"varint,1,opt,name=last_scan_nanos" json:"last_scan_nanos"` - // The oldest unresolved write intent in nanoseconds since epoch. - // Null if there are no unresolved write intents. 
- OldestIntentNanos *int64 `protobuf:"varint,2,opt,name=oldest_intent_nanos" json:"oldest_intent_nanos,omitempty"` } func (m *GCMetadata) Reset() { *m = GCMetadata{} } @@ -1101,11 +1100,6 @@ func (m *GCMetadata) MarshalTo(data []byte) (int, error) { data[i] = 0x8 i++ i = encodeVarintData(data, i, uint64(m.LastScanNanos)) - if m.OldestIntentNanos != nil { - data[i] = 0x10 - i++ - i = encodeVarintData(data, i, uint64(*m.OldestIntentNanos)) - } return i, nil } @@ -1373,9 +1367,6 @@ func (m *GCMetadata) Size() (n int) { var l int _ = l n += 1 + sovData(uint64(m.LastScanNanos)) - if m.OldestIntentNanos != nil { - n += 1 + sovData(uint64(*m.OldestIntentNanos)) - } return n } @@ -3421,26 +3412,6 @@ func (m *GCMetadata) Unmarshal(data []byte) error { break } } - case 2: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field OldestIntentNanos", wireType) - } - var v int64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowData - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - v |= (int64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - m.OldestIntentNanos = &v default: iNdEx = preIndex skippy, err := skipData(data[iNdEx:]) diff --git a/roachpb/data.proto b/roachpb/data.proto index 8764224a0ed7..105233bd19a1 100644 --- a/roachpb/data.proto +++ b/roachpb/data.proto @@ -295,12 +295,11 @@ message Lease { // GCMetadata holds information about the last complete key/value // garbage collection scan of a range. +// TODO(tschottdorf): can avoid an extra message unless we're planning +// to add more content. message GCMetadata { // The last GC scan timestamp in nanoseconds since the Unix epoch. optional int64 last_scan_nanos = 1 [(gogoproto.nullable) = false]; - // The oldest unresolved write intent in nanoseconds since epoch. - // Null if there are no unresolved write intents. 
- optional int64 oldest_intent_nanos = 2; } // SequenceCacheEntry holds information which together with the key at which diff --git a/storage/engine/rocksdb/cockroach/roachpb/data.pb.cc b/storage/engine/rocksdb/cockroach/roachpb/data.pb.cc index 23fc48956846..6641bea03369 100644 --- a/storage/engine/rocksdb/cockroach/roachpb/data.pb.cc +++ b/storage/engine/rocksdb/cockroach/roachpb/data.pb.cc @@ -327,9 +327,8 @@ void protobuf_AssignDesc_cockroach_2froachpb_2fdata_2eproto() { GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Lease, _internal_metadata_), -1); GCMetadata_descriptor_ = file->message_type(14); - static const int GCMetadata_offsets_[2] = { + static const int GCMetadata_offsets_[1] = { GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(GCMetadata, last_scan_nanos_), - GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(GCMetadata, oldest_intent_nanos_), }; GCMetadata_reflection_ = ::google::protobuf::internal::GeneratedMessageReflection::NewGeneratedMessageReflection( @@ -515,19 +514,19 @@ void protobuf_AddDesc_cockroach_2froachpb_2fdata_2eproto() { "\034.cockroach.roachpb.TimestampB\004\310\336\037\000\0226\n\ne" "xpiration\030\002 \001(\0132\034.cockroach.roachpb.Time" "stampB\004\310\336\037\000\022;\n\007replica\030\003 \001(\0132$.cockroach" - ".roachpb.ReplicaDescriptorB\004\310\336\037\000:\004\230\240\037\000\"H" + ".roachpb.ReplicaDescriptorB\004\310\336\037\000:\004\230\240\037\000\"+" "\n\nGCMetadata\022\035\n\017last_scan_nanos\030\001 \001(\003B\004\310" - "\336\037\000\022\033\n\023oldest_intent_nanos\030\002 \001(\003\"a\n\022Sequ" - "enceCacheEntry\022\024\n\003key\030\001 \001(\014B\007\372\336\037\003Key\0225\n\t" - "timestamp\030\002 \001(\0132\034.cockroach.roachpb.Time" - "stampB\004\310\336\037\000*Q\n\tValueType\022\013\n\007UNKNOWN\020\000\022\007\n" - "\003INT\020\001\022\t\n\005FLOAT\020\002\022\t\n\005BYTES\020\003\022\010\n\004TIME\020\004\022\016" - "\n\nTIMESERIES\020d*>\n\021ReplicaChangeType\022\017\n\013A" - "DD_REPLICA\020\000\022\022\n\016REMOVE_REPLICA\020\001\032\004\210\243\036\000*5" - "\n\rIsolationType\022\020\n\014SERIALIZABLE\020\000\022\014\n\010SNA" - "PSHOT\020\001\032\004\210\243\036\000*B\n\021TransactionStatus\022\013\n\007PE" - "NDING\020\000\022\r\n\tCOMMITTED\020\001\022\013\n\007ABORTED\020\002\032\004\210\243\036" - "\000B\035Z\007roachpb\310\341\036\000\220\343\036\000\310\342\036\001\340\342\036\001\320\342\036\001X\001", 2954); + "\336\037\000\"a\n\022SequenceCacheEntry\022\024\n\003key\030\001 \001(\014B\007" + "\372\336\037\003Key\0225\n\ttimestamp\030\002 \001(\0132\034.cockroach.r" + "oachpb.TimestampB\004\310\336\037\000*Q\n\tValueType\022\013\n\007U" + "NKNOWN\020\000\022\007\n\003INT\020\001\022\t\n\005FLOAT\020\002\022\t\n\005BYTES\020\003\022" + "\010\n\004TIME\020\004\022\016\n\nTIMESERIES\020d*>\n\021ReplicaChan" + "geType\022\017\n\013ADD_REPLICA\020\000\022\022\n\016REMOVE_REPLIC" + "A\020\001\032\004\210\243\036\000*5\n\rIsolationType\022\020\n\014SERIALIZAB" + "LE\020\000\022\014\n\010SNAPSHOT\020\001\032\004\210\243\036\000*B\n\021TransactionS" + "tatus\022\013\n\007PENDING\020\000\022\r\n\tCOMMITTED\020\001\022\013\n\007ABO" + "RTED\020\002\032\004\210\243\036\000B\035Z\007roachpb\310\341\036\000\220\343\036\000\310\342\036\001\340\342\036\001\320" + "\342\036\001X\001", 2925); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "cockroach/roachpb/data.proto", &protobuf_RegisterTypes); Span::default_instance_ = new Span(); @@ -7305,7 +7304,6 @@ void Lease::clear_replica() { #ifndef _MSC_VER const int 
GCMetadata::kLastScanNanosFieldNumber; -const int GCMetadata::kOldestIntentNanosFieldNumber; #endif // !_MSC_VER GCMetadata::GCMetadata() @@ -7328,7 +7326,6 @@ GCMetadata::GCMetadata(const GCMetadata& from) void GCMetadata::SharedCtor() { _cached_size_ = 0; last_scan_nanos_ = GOOGLE_LONGLONG(0); - oldest_intent_nanos_ = GOOGLE_LONGLONG(0); ::memset(_has_bits_, 0, sizeof(_has_bits_)); } @@ -7368,19 +7365,7 @@ GCMetadata* GCMetadata::New(::google::protobuf::Arena* arena) const { } void GCMetadata::Clear() { -#define ZR_HELPER_(f) reinterpret_cast(\ - &reinterpret_cast(16)->f) - -#define ZR_(first, last) do {\ - ::memset(&first, 0,\ - ZR_HELPER_(last) - ZR_HELPER_(first) + sizeof(last));\ -} while (0) - - ZR_(last_scan_nanos_, oldest_intent_nanos_); - -#undef ZR_HELPER_ -#undef ZR_ - + last_scan_nanos_ = GOOGLE_LONGLONG(0); ::memset(_has_bits_, 0, sizeof(_has_bits_)); if (_internal_metadata_.have_unknown_fields()) { mutable_unknown_fields()->Clear(); @@ -7407,21 +7392,6 @@ bool GCMetadata::MergePartialFromCodedStream( } else { goto handle_unusual; } - if (input->ExpectTag(16)) goto parse_oldest_intent_nanos; - break; - } - - // optional int64 oldest_intent_nanos = 2; - case 2: { - if (tag == 16) { - parse_oldest_intent_nanos: - DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< - ::google::protobuf::int64, ::google::protobuf::internal::WireFormatLite::TYPE_INT64>( - input, &oldest_intent_nanos_))); - set_has_oldest_intent_nanos(); - } else { - goto handle_unusual; - } if (input->ExpectAtEnd()) goto success; break; } @@ -7456,11 +7426,6 @@ void GCMetadata::SerializeWithCachedSizes( ::google::protobuf::internal::WireFormatLite::WriteInt64(1, this->last_scan_nanos(), output); } - // optional int64 oldest_intent_nanos = 2; - if (has_oldest_intent_nanos()) { - ::google::protobuf::internal::WireFormatLite::WriteInt64(2, this->oldest_intent_nanos(), output); - } - if (_internal_metadata_.have_unknown_fields()) { ::google::protobuf::internal::WireFormat::SerializeUnknownFields( unknown_fields(), output); @@ -7476,11 +7441,6 @@ ::google::protobuf::uint8* GCMetadata::SerializeWithCachedSizesToArray( target = ::google::protobuf::internal::WireFormatLite::WriteInt64ToArray(1, this->last_scan_nanos(), target); } - // optional int64 oldest_intent_nanos = 2; - if (has_oldest_intent_nanos()) { - target = ::google::protobuf::internal::WireFormatLite::WriteInt64ToArray(2, this->oldest_intent_nanos(), target); - } - if (_internal_metadata_.have_unknown_fields()) { target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray( unknown_fields(), target); @@ -7492,22 +7452,13 @@ ::google::protobuf::uint8* GCMetadata::SerializeWithCachedSizesToArray( int GCMetadata::ByteSize() const { int total_size = 0; - if (_has_bits_[0 / 32] & 3) { - // optional int64 last_scan_nanos = 1; - if (has_last_scan_nanos()) { - total_size += 1 + - ::google::protobuf::internal::WireFormatLite::Int64Size( - this->last_scan_nanos()); - } - - // optional int64 oldest_intent_nanos = 2; - if (has_oldest_intent_nanos()) { - total_size += 1 + - ::google::protobuf::internal::WireFormatLite::Int64Size( - this->oldest_intent_nanos()); - } - + // optional int64 last_scan_nanos = 1; + if (has_last_scan_nanos()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::Int64Size( + this->last_scan_nanos()); } + if (_internal_metadata_.have_unknown_fields()) { total_size += ::google::protobuf::internal::WireFormat::ComputeUnknownFieldsSize( @@ -7537,9 +7488,6 @@ void GCMetadata::MergeFrom(const 
GCMetadata& from) { if (from.has_last_scan_nanos()) { set_last_scan_nanos(from.last_scan_nanos()); } - if (from.has_oldest_intent_nanos()) { - set_oldest_intent_nanos(from.oldest_intent_nanos()); - } } if (from._internal_metadata_.have_unknown_fields()) { mutable_unknown_fields()->MergeFrom(from.unknown_fields()); @@ -7569,7 +7517,6 @@ void GCMetadata::Swap(GCMetadata* other) { } void GCMetadata::InternalSwap(GCMetadata* other) { std::swap(last_scan_nanos_, other->last_scan_nanos_); - std::swap(oldest_intent_nanos_, other->oldest_intent_nanos_); std::swap(_has_bits_[0], other->_has_bits_[0]); _internal_metadata_.Swap(&other->_internal_metadata_); std::swap(_cached_size_, other->_cached_size_); @@ -7610,30 +7557,6 @@ void GCMetadata::clear_last_scan_nanos() { // @@protoc_insertion_point(field_set:cockroach.roachpb.GCMetadata.last_scan_nanos) } -// optional int64 oldest_intent_nanos = 2; -bool GCMetadata::has_oldest_intent_nanos() const { - return (_has_bits_[0] & 0x00000002u) != 0; -} -void GCMetadata::set_has_oldest_intent_nanos() { - _has_bits_[0] |= 0x00000002u; -} -void GCMetadata::clear_has_oldest_intent_nanos() { - _has_bits_[0] &= ~0x00000002u; -} -void GCMetadata::clear_oldest_intent_nanos() { - oldest_intent_nanos_ = GOOGLE_LONGLONG(0); - clear_has_oldest_intent_nanos(); -} - ::google::protobuf::int64 GCMetadata::oldest_intent_nanos() const { - // @@protoc_insertion_point(field_get:cockroach.roachpb.GCMetadata.oldest_intent_nanos) - return oldest_intent_nanos_; -} - void GCMetadata::set_oldest_intent_nanos(::google::protobuf::int64 value) { - set_has_oldest_intent_nanos(); - oldest_intent_nanos_ = value; - // @@protoc_insertion_point(field_set:cockroach.roachpb.GCMetadata.oldest_intent_nanos) -} - #endif // PROTOBUF_INLINE_NOT_IN_HEADERS // =================================================================== diff --git a/storage/engine/rocksdb/cockroach/roachpb/data.pb.h b/storage/engine/rocksdb/cockroach/roachpb/data.pb.h index 0715d15300eb..33d3e45939f7 100644 --- a/storage/engine/rocksdb/cockroach/roachpb/data.pb.h +++ b/storage/engine/rocksdb/cockroach/roachpb/data.pb.h @@ -1877,25 +1877,15 @@ class GCMetadata : public ::google::protobuf::Message { ::google::protobuf::int64 last_scan_nanos() const; void set_last_scan_nanos(::google::protobuf::int64 value); - // optional int64 oldest_intent_nanos = 2; - bool has_oldest_intent_nanos() const; - void clear_oldest_intent_nanos(); - static const int kOldestIntentNanosFieldNumber = 2; - ::google::protobuf::int64 oldest_intent_nanos() const; - void set_oldest_intent_nanos(::google::protobuf::int64 value); - // @@protoc_insertion_point(class_scope:cockroach.roachpb.GCMetadata) private: inline void set_has_last_scan_nanos(); inline void clear_has_last_scan_nanos(); - inline void set_has_oldest_intent_nanos(); - inline void clear_has_oldest_intent_nanos(); ::google::protobuf::internal::InternalMetadataWithArena _internal_metadata_; ::google::protobuf::uint32 _has_bits_[1]; mutable int _cached_size_; ::google::protobuf::int64 last_scan_nanos_; - ::google::protobuf::int64 oldest_intent_nanos_; friend void protobuf_AddDesc_cockroach_2froachpb_2fdata_2eproto(); friend void protobuf_AssignDesc_cockroach_2froachpb_2fdata_2eproto(); friend void protobuf_ShutdownFile_cockroach_2froachpb_2fdata_2eproto(); @@ -3859,30 +3849,6 @@ inline void GCMetadata::set_last_scan_nanos(::google::protobuf::int64 value) { // @@protoc_insertion_point(field_set:cockroach.roachpb.GCMetadata.last_scan_nanos) } -// optional int64 oldest_intent_nanos = 2; -inline bool 
GCMetadata::has_oldest_intent_nanos() const { - return (_has_bits_[0] & 0x00000002u) != 0; -} -inline void GCMetadata::set_has_oldest_intent_nanos() { - _has_bits_[0] |= 0x00000002u; -} -inline void GCMetadata::clear_has_oldest_intent_nanos() { - _has_bits_[0] &= ~0x00000002u; -} -inline void GCMetadata::clear_oldest_intent_nanos() { - oldest_intent_nanos_ = GOOGLE_LONGLONG(0); - clear_has_oldest_intent_nanos(); -} -inline ::google::protobuf::int64 GCMetadata::oldest_intent_nanos() const { - // @@protoc_insertion_point(field_get:cockroach.roachpb.GCMetadata.oldest_intent_nanos) - return oldest_intent_nanos_; -} -inline void GCMetadata::set_oldest_intent_nanos(::google::protobuf::int64 value) { - set_has_oldest_intent_nanos(); - oldest_intent_nanos_ = value; - // @@protoc_insertion_point(field_set:cockroach.roachpb.GCMetadata.oldest_intent_nanos) -} - // ------------------------------------------------------------------- // SequenceCacheEntry diff --git a/storage/gc_queue.go b/storage/gc_queue.go index fb13ced7bcd7..28e7cb375b5e 100644 --- a/storage/gc_queue.go +++ b/storage/gc_queue.go @@ -19,13 +19,13 @@ package storage import ( "fmt" - "math" "sync" "time" "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/config" "github.com/cockroachdb/cockroach/gossip" + "github.com/cockroachdb/cockroach/keys" "github.com/cockroachdb/cockroach/roachpb" "github.com/cockroachdb/cockroach/storage/engine" "github.com/cockroachdb/cockroach/util/log" @@ -46,6 +46,12 @@ const ( // intentAgeThreshold is the threshold after which an extant intent // will be resolved. intentAgeThreshold = 2 * time.Hour // 2 hour + // txnCleanupThreshold is the threshold after which a transaction is + // considered abandoned and fit for removal, as measured by the maximum + // of its last heartbeat and timestamp. + // TODO(tschottdorf): need to enforce at all times that this is much + // larger than the heartbeat interval used by the coordinator. + txnCleanupThreshold = time.Hour ) // gcQueue manages a queue of replicas slated to be scanned in their @@ -54,8 +60,10 @@ const ( // // - GC of version data via TTL expiration (and more complex schemes // as implemented going forward). -// - Resolve extant write intents and determine oldest non-resolvable -// intent. +// - Resolve extant write intents (pushing their transactions). +// - GC of old transaction and sequence cache entries. This should include +// most committed entries almost immediately and, after a threshold on +// inactivity, all others. // // The shouldQueue function combines the need for both tasks into a // single priority. If any task is overdue, shouldQueue returns true. @@ -139,14 +147,17 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, // Compute intent expiration (intent age at which we attempt to resolve). intentExp := now intentExp.WallTime -= intentAgeThreshold.Nanoseconds() + txnExp := now + txnExp.WallTime -= txnCleanupThreshold.Nanoseconds() - // TODO(tschottdorf): execution will use a leader-assigned local - // timestamp to compute intent age. While this should be fine, could - // consider adding a Now timestamp to GCRequest which would be used - // instead. 
gcArgs := &roachpb.GCRequest{} - var mu sync.Mutex - var oldestIntentNanos int64 = math.MaxInt64 + // TODO(tschottdorf): This is one of these instances in which we want + // to be more careful that the request ends up on the correct Replica, + // and we might have to worry about mixing range-local and global keys + // in a batch which might up spanning Ranges by the time it executes. + gcArgs.Key = desc.StartKey.AsRawKey() + gcArgs.EndKey = desc.EndKey.AsRawKey() + var expBaseKey roachpb.Key var keys []engine.MVCCKey var vals [][]byte @@ -155,15 +166,6 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, txnMap := map[string]*roachpb.Transaction{} intentSpanMap := map[string][]roachpb.Span{} - // updateOldestIntent atomically updates the oldest intent. - updateOldestIntent := func(intentNanos int64) { - mu.Lock() - defer mu.Unlock() - if intentNanos < oldestIntentNanos { - oldestIntentNanos = intentNanos - } - } - // processKeysAndValues is invoked with each key and its set of // values. Intents older than the intent age threshold are sent for // resolution and values after the MVCC metadata, and possible @@ -185,8 +187,6 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, id := string(meta.Txn.ID) txnMap[id] = meta.Txn intentSpanMap[id] = append(intentSpanMap[id], roachpb.Span{Key: expBaseKey}) - } else { - updateOldestIntent(meta.Txn.OrigTimestamp.WallTime) } // With an active intent, GC ignores MVCC metadata & intent value. startIdx = 2 @@ -230,11 +230,26 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, // Handle last collected set of keys/vals. processKeysAndValues() + txnKeys, err := processTransactionTable(repl, txnMap, txnExp) + if err != nil { + return err + } + + // From now on, all newly added keys are range-local. + // TODO(tschottdorf): Might need to use two requests at some point since we + // hard-coded the full non-local key range in the header, but that does + // not take into account the range-local keys. It will be OK as long as + // we send directly to the Replica, though. + gcArgs.Keys = append(gcArgs.Keys, txnKeys...) + // Process push transactions in parallel. var wg sync.WaitGroup for _, txn := range txnMap { + if txn.Status != roachpb.PENDING { + continue + } wg.Add(1) - go gcq.pushTxn(repl, now, txn, updateOldestIntent, &wg) + go pushTxn(repl, now, txn, roachpb.ABORT_TXN, &wg) } wg.Wait() @@ -248,32 +263,21 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, } } - done := true - if len(intents) > 0 { - done = false - if err := repl.resolveIntents(repl.context(), intents, true /* wait */); err != nil { - return err - } - } - - // Set start and end keys. - if len(gcArgs.Keys) > 0 { - done = false - gcArgs.Key = gcArgs.Keys[0].Key - gcArgs.EndKey = gcArgs.Keys[len(gcArgs.Keys)-1].Key.Next() + if err := repl.resolveIntents(repl.context(), intents, true /* wait */); err != nil { + return err } - if done { - return nil - } + // Deal with any leftover sequence cache keys. There shouldn't be many of + // them. + gcArgs.Keys = append(gcArgs.Keys, processSequenceCache(repl, now, txnExp)...) // Send GC request through range. - gcMeta.OldestIntentNanos = proto.Int64(oldestIntentNanos) gcArgs.GCMeta = *gcMeta var ba roachpb.BatchRequest // Technically not needed since we're talking directly to the Range. 
ba.RangeID = desc.RangeID + ba.Timestamp = now ba.Add(gcArgs) if _, pErr := repl.Send(repl.context(), ba); pErr != nil { return pErr.GoError() @@ -288,17 +292,134 @@ func (gcq *gcQueue) process(now roachpb.Timestamp, repl *Replica, return nil } +// processSequenceCache iterates through the local sequence cache entries, +// pushing the transactions (in cleanup mode) for those entries which appear +// to be old enough. In case the transaction indicates that it's terminated, +// the sequence cache keys are included in the result. +func processSequenceCache(r *Replica, now, cutoff roachpb.Timestamp) []roachpb.GCRequest_GCKey { + snap := r.store.Engine().NewSnapshot() + defer snap.Close() + + txns := make(map[string]*roachpb.Transaction) + idToKeys := make(map[string][]roachpb.GCRequest_GCKey) + r.sequence.Iterate(snap, func(key, id []byte, v roachpb.SequenceCacheEntry) { + if !cutoff.Less(v.Timestamp) { + idStr := string(id) + txns[idStr] = &roachpb.Transaction{ID: id, Key: v.Key, Status: roachpb.PENDING} + idToKeys[idStr] = append(idToKeys[idStr], roachpb.GCRequest_GCKey{Key: key}) + } + }) + + var wg sync.WaitGroup + wg.Add(len(txns)) + for _, txn := range txns { + // Check if the Txn is still alive. If this indicates that the Txn is + // aborted and old enough to guarantee that any running coordinator + // would have realized that the transaction wasn't running by means + // of a heartbeat, then we're free to remove the sequence cache entry. + // In the most likely case, there isn't even an entry (which will + // be apparent by a zero timestamp and nil last heartbeat). + go pushTxn(r, now, txn, roachpb.CLEANUP_TXN, &wg) + } + wg.Wait() + + var gcKeys []roachpb.GCRequest_GCKey + for idStr, txn := range txns { + if txn.Status == roachpb.PENDING { + continue + } + ts := txn.Timestamp + if txn.LastHeartbeat != nil { + ts.Forward(*txn.LastHeartbeat) + } + if !cutoff.Less(ts) { + // This is it, we can delete our sequence cache entries. + gcKeys = append(gcKeys, idToKeys[idStr]...) + } + } + return gcKeys +} + +// processTransactionTable scans the transaction table and updates txnMap with +// those transactions which are old and either PENDING or with intents +// registered. In the first case we want to push the transaction so that it is +// aborted, and in the second case we may have to resolve the intents success- +// fully before GCing the entry. The transaction records which can be gc'ed are +// returned separately and are not added to txnMap nor intentSpanMap. +func processTransactionTable(r *Replica, txnMap map[string]*roachpb.Transaction, cutoff roachpb.Timestamp) ([]roachpb.GCRequest_GCKey, error) { + snap := r.store.Engine().NewSnapshot() + defer snap.Close() + + var gcKeys []roachpb.GCRequest_GCKey + handleOne := func(kv roachpb.KeyValue) error { + var txn roachpb.Transaction + if err := kv.Value.GetProto(&txn); err != nil { + return err + } + ts := txn.Timestamp + if heartbeatTS := txn.LastHeartbeat; heartbeatTS != nil { + ts.Forward(*heartbeatTS) + } + if !ts.Less(cutoff) { + return nil + } + + id := string(txn.ID) + + // The transaction record should be considered for removal. + switch txn.Status { + case roachpb.PENDING: + // Marked as running, so we need to push it to abort it but won't + // try to GC it in this cycle. + txnMap[id] = &txn + return nil + case roachpb.ABORTED: + // If we remove this transaction, it effectively still counts as + // ABORTED (by design). So this can be GC'ed even if we can't + // resolve the intents. 
+ // Note: Most aborted transaction weren't aborted by their client, + // but instead by the coordinator - those will not have any intents + // persisted, though they still might exist in the system. + if err := r.resolveIntents(r.context(), + roachpb.AsIntents(txn.Intents, &txn), true /* wait */); err != nil { + log.Warningf("failed to resolve intents of aborted txn on gc: %s", err) + } + case roachpb.COMMITTED: + // It's committed, so it doesn't need a push but we can only + // GC it after its intents are resolved. + if err := r.resolveIntents(r.context(), + roachpb.AsIntents(txn.Intents, &txn), true /* wait */); err != nil { + log.Warningf("unable to resolve intents of committed txn on gc: %s", err) + // Returning the error here would abort the whole GC run, and + // we don't want that. Instead, we simply don't GC this entry. + return nil + } + default: + panic(fmt.Sprintf("invalid transaction state: %s", txn)) + } + gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: kv.Key}) // zero timestamp + return nil + } + + startKey := keys.TransactionKey(roachpb.KeyMin, nil) + endKey := keys.TransactionKey(roachpb.KeyMax, nil) + + _, err := engine.MVCCIterate(snap, startKey, endKey, roachpb.ZeroTimestamp, true /* consistent */, nil /* txn */, false /* !reverse */, func(kv roachpb.KeyValue) (bool, error) { + return false, handleOne(kv) + }) + return gcKeys, err +} + // timer returns a constant duration to space out GC processing // for successive queued replicas. func (*gcQueue) timer() time.Duration { return gcQueueTimerDuration } -// pushTxn attempts to abort the txn via push. If the transaction -// cannot be aborted, the oldestIntentNanos value is atomically -// updated to the min of oldestIntentNanos and the intent's -// timestamp. The wait group is signaled on completion. -func (*gcQueue) pushTxn(repl *Replica, now roachpb.Timestamp, txn *roachpb.Transaction, updateOldestIntent func(int64), wg *sync.WaitGroup) { +// pushTxn attempts to abort the txn via push. The wait group is signaled on +// completion. +func pushTxn(repl *Replica, now roachpb.Timestamp, txn *roachpb.Transaction, + typ roachpb.PushTxnType, wg *sync.WaitGroup) { defer wg.Done() // signal wait group always on completion if log.V(1) { log.Infof("pushing txn %s ts=%s", txn, txn.OrigTimestamp) @@ -312,14 +433,13 @@ func (*gcQueue) pushTxn(repl *Replica, now roachpb.Timestamp, txn *roachpb.Trans Now: now, PusherTxn: roachpb.Transaction{Priority: roachpb.MaxPriority}, PusheeTxn: *txn, - PushType: roachpb.ABORT_TXN, + PushType: typ, } b := &client.Batch{} b.InternalAddRequest(pushArgs) br, err := repl.store.DB().RunWithResponse(b) if err != nil { log.Warningf("push of txn %s failed: %s", txn, err) - updateOldestIntent(txn.OrigTimestamp.WallTime) return } // Update the supplied txn on successful push. 
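Both processTransactionTable and processSequenceCache above gate their work on the same staleness test: a record only becomes a cleanup candidate once neither its timestamp nor its last heartbeat is newer than now minus txnCleanupThreshold. A minimal sketch of that check follows, assuming it sits in package storage next to the GC queue code; the helper name isAbandoned is invented here purely for illustration and is not part of this change:

// isAbandoned reports whether a transaction with provisional timestamp txnTS
// and last heartbeat lastHeartbeat (possibly nil) has been inactive for longer
// than txnCleanupThreshold, measured against now. Illustrative only; it mirrors
// the checks performed in processTransactionTable and processSequenceCache.
func isAbandoned(now, txnTS roachpb.Timestamp, lastHeartbeat *roachpb.Timestamp) bool {
	cutoff := now
	cutoff.WallTime -= txnCleanupThreshold.Nanoseconds()
	ts := txnTS
	if lastHeartbeat != nil {
		// Activity is the maximum of the transaction timestamp and its
		// last heartbeat.
		ts.Forward(*lastHeartbeat)
	}
	return ts.Less(cutoff)
}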
diff --git a/storage/gc_queue_test.go b/storage/gc_queue_test.go index 220f25d8d9e3..6f9f7f04faa0 100644 --- a/storage/gc_queue_test.go +++ b/storage/gc_queue_test.go @@ -20,13 +20,16 @@ package storage import ( "fmt" "math" + "reflect" "testing" + "time" "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/keys" "github.com/cockroachdb/cockroach/roachpb" "github.com/cockroachdb/cockroach/storage/engine" "github.com/cockroachdb/cockroach/util" + "github.com/cockroachdb/cockroach/util/hlc" "github.com/cockroachdb/cockroach/util/leaktest" "github.com/cockroachdb/cockroach/util/log" "github.com/gogo/protobuf/proto" @@ -300,9 +303,6 @@ func TestGCQueueProcess(t *testing.T) { if gcMeta.LastScanNanos != now { t.Errorf("expected last scan nanos=%d; got %d", now, gcMeta.LastScanNanos) } - if *gcMeta.OldestIntentNanos != ts4.WallTime { - t.Errorf("expected oldest intent nanos=%d; got %d", ts4.WallTime, gcMeta.OldestIntentNanos) - } // Verify that the last verification timestamp was updated as whole range was scanned. ts, err := tc.rng.GetLastVerificationTimestamp() @@ -314,6 +314,138 @@ func TestGCQueueProcess(t *testing.T) { } } +func TestGCQueueTransactionTable(t *testing.T) { + defer leaktest.AfterTest(t) + + const now time.Duration = 3 * 24 * time.Hour + const dAbandon = -2 * DefaultHeartbeatInterval + type spec struct { + status roachpb.TransactionStatus + ts time.Duration + heartbeatTS time.Duration + intentSpans []roachpb.Span + seqGC bool // expect sequence cache entries removed? + } + // Describes the state of the Txn table before the test. + before := map[string]spec{ + // Too young, should not touch. + "a": {roachpb.PENDING, now - txnCleanupThreshold + 1, 0, []roachpb.Span{{Key: roachpb.Key("q")}}, false}, + // Old, but still heartbeat. No GC. + "b": {roachpb.PENDING, 0, now - txnCleanupThreshold + 1, nil, false}, + // Old and aborted, should delete. + "c": {roachpb.ABORTED, now - txnCleanupThreshold - 1, 0, nil, true}, + // Abandoned and pending, so should push and abort it successfully. + // But it's not old enough to actively go after the sequence cache. + // There's some room for optimization here by ways of sharing the + // transaction grooming results with the corresponding operation for + // the sequence cache, but that's only worthwhile in weird cases. + "d": {roachpb.PENDING, now - dAbandon, 0, nil, false}, + // Committed and fresh, so no action. + "e": {roachpb.COMMITTED, now - txnCleanupThreshold + 1, 0, nil, false}, + // Committed, old and intentless. Bye bye. + "f": {roachpb.COMMITTED, now - txnCleanupThreshold - 1, 0, nil, true}, + // Committed and old, but with intent. The intent is resolvable, + // so the txn entry should be GC'ed. + "g": {roachpb.COMMITTED, now - txnCleanupThreshold - 1, 0, + []roachpb.Span{{Key: roachpb.Key("z")}}, true}, + // Same as the previous one, but we've rigged things so that the intent + // resolution here will fail and consequently no GC is expected. + "h": {roachpb.COMMITTED, now - txnCleanupThreshold - 1, 0, + []roachpb.Span{{Key: roachpb.Key("z")}}, true}, + } + + after := map[string]*spec{} + for k := range before { + sCopy := before[k] + after[k] = &sCopy + } + + // Test outcome follows, described as changes to the previous state. + // A status of -1 corresponds to a GC'ed record. 
+ after["a"].intentSpans = nil // expect no attempts to resolve the intent + after["c"].status = -1 + after["d"].status = roachpb.ABORTED + after["f"].status = -1 + after["g"].status = -1 + + resolved := map[string][]roachpb.Span{} + TestingCommandFilter = func(req roachpb.Request, _ roachpb.Header) error { + if resArgs, ok := req.(*roachpb.ResolveIntentRequest); ok { + id := string(resArgs.IntentTxn.Key) + resolved[id] = append(resolved[id], roachpb.Span{ + Key: resArgs.Key, + EndKey: resArgs.EndKey, + }) + // We've special cased one test case. Note that the intent is still + // counted in `resolved`. + if id == "h" { + return util.Errorf("boom") + } + } + return nil + } + tc := testContext{} + tc.Start(t) + defer tc.Stop() + defer func() { TestingCommandFilter = nil }() + tc.manualClock.Set(int64(now)) + + txns := map[string]roachpb.Transaction{} + var epo uint32 + for strKey, sp := range before { + epo++ + baseKey := roachpb.Key(strKey) + txnClock := hlc.NewClock(hlc.NewManualClock(int64(sp.ts)).UnixNano) + txn := newTransaction("txn1", baseKey, 1, roachpb.SERIALIZABLE, txnClock) + txn.Status = sp.status + txn.Intents = sp.intentSpans + txn.LastHeartbeat = &roachpb.Timestamp{WallTime: int64(sp.heartbeatTS)} + txns[strKey] = *txn + key := keys.TransactionKey(baseKey, txn.ID) + if err := engine.MVCCPutProto(tc.engine, nil, key, roachpb.ZeroTimestamp, nil, txn); err != nil { + t.Fatal(err) + } + if err := tc.rng.sequence.Put(tc.engine, txn.ID, epo, 2*epo, txn.Key, txn.Timestamp, nil /* err */); err != nil { + t.Fatal(err) + } + } + + // Run GC. + gcQ := newGCQueue(tc.gossip) + cfg := tc.gossip.GetSystemConfig() + if cfg == nil { + t.Fatal("nil config") + } + + if err := gcQ.process(tc.clock.Now(), tc.rng, cfg); err != nil { + t.Fatal(err) + } + + util.SucceedsWithin(t, time.Second, func() error { + for strKey, sp := range after { + txn := &roachpb.Transaction{} + key := keys.TransactionKey(roachpb.Key(strKey), txns[strKey].ID) + ok, err := engine.MVCCGetProto(tc.engine, key, roachpb.ZeroTimestamp, true, nil, txn) + if err != nil { + return err + } + if expGC := (sp.status == -1); expGC != !ok { + return fmt.Errorf("%s: expected gc: %t, but found %s", strKey, expGC, txn) + } + if !reflect.DeepEqual(resolved[strKey], sp.intentSpans) { + return fmt.Errorf("%s: unexpected intent resolutions:\nexpected: %s\nobserved: %s", + strKey, sp.intentSpans, resolved[strKey]) + } + if kvs, err := tc.rng.sequence.GetAllID(tc.store.Engine(), txns[strKey].ID); err != nil { + t.Fatal(err) + } else if (len(kvs) != 0) == sp.seqGC { + return fmt.Errorf("%s: expected sequence cache gc: %t, found %+v", strKey, sp.seqGC, kvs) + } + } + return nil + }) +} + // TestGCQueueIntentResolution verifies intent resolution with many // intents spanning just two transactions. 
func TestGCQueueIntentResolution(t *testing.T) { diff --git a/storage/replica.go b/storage/replica.go index a6e23a5adbe7..9795c45520b2 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -1416,12 +1416,9 @@ func (r *Replica) handleSkippedIntents(intents []intentsWithArg) { Span: roachpb.Span{Key: r.Desc().StartKey.AsRawKey()}, } { - prefix := keys.SequenceCacheKeyPrefix(r.Desc().RangeID, txn.ID) - kvs, _, err := engine.MVCCScan(r.store.Engine(), prefix, - prefix.PrefixEnd(), 0 /* max */, roachpb.ZeroTimestamp, - false /* consistent */, nil /* txn */) + kvs, err := r.sequence.GetAllID(r.store.Engine(), txn.ID) if err != nil { - log.Warning(err) + panic(err) // TODO(tschottdorf): ReplicaCorruptionError } // Allocate slots for the transaction key and the sequence // cache keys. @@ -1522,9 +1519,6 @@ func (r *Replica) maybeSetCorrupt(err error) error { // commands have been **proposed** (not executed). This ensures that if a // waiting client retries immediately after calling this function, it will not // hit the same intents again. -// TODO(tschottdorf): once Txn records have a list of possibly open intents, -// resolveIntents should send an RPC to update the transaction(s) as well (for -// those intents with non-pending Txns). func (r *Replica) resolveIntents(ctx context.Context, intents []roachpb.Intent, wait bool) error { trace := tracer.FromCtx(ctx) tracer.ToCtx(ctx, nil) // we're doing async stuff below; those need new traces @@ -1589,8 +1583,8 @@ func (r *Replica) resolveIntents(ctx context.Context, intents []roachpb.Intent, case *roachpb.NotLeaderError: case *roachpb.RangeNotFoundError: default: - // TODO(tschottdorf): Does this need to be a panic? - panic(fmt.Sprintf("local intent resolution failed with unexpected error: %s", err)) + log.Warningf("local intent resolution failed with unexpected error: %s", err) + return err } } } diff --git a/storage/replica_command.go b/storage/replica_command.go index eb05f9ca7508..459f1b4793d4 100644 --- a/storage/replica_command.go +++ b/storage/replica_command.go @@ -327,14 +327,6 @@ func (r *Replica) EndTransaction(batch engine.Engine, ms *engine.MVCCStats, h ro reply.Txn.Status = roachpb.ABORTED } - asSkippedIntents := func(txn *roachpb.Transaction, spans []roachpb.Span) []roachpb.Intent { - ret := make([]roachpb.Intent, len(spans)) - for i := range spans { - ret[i].Span, ret[i].Txn = spans[i], *txn - } - return ret - } - if deadlineLapsed { // FIXME(#3037): // If the deadline has lapsed, return all the intents for @@ -342,7 +334,7 @@ func (r *Replica) EndTransaction(batch engine.Engine, ms *engine.MVCCStats, h ro // and (b) not able to write on error (see #1989), we can't write // ABORTED into the master transaction record, which remains // PENDING, and that's pretty bad. - return reply, asSkippedIntents(reply.Txn, args.IntentSpans), roachpb.NewTransactionAbortedError(reply.Txn) + return reply, roachpb.AsIntents(args.IntentSpans, reply.Txn), roachpb.NewTransactionAbortedError(reply.Txn) } // Verify that we can either commit it or abort it (according @@ -356,7 +348,7 @@ func (r *Replica) EndTransaction(batch engine.Engine, ms *engine.MVCCStats, h ro // that we know them, so we return them all for asynchronous // resolution (we're currently not able to write on error, but // see #1989). 
- return reply, asSkippedIntents(reply.Txn, args.IntentSpans), roachpb.NewTransactionAbortedError(reply.Txn) + return reply, roachpb.AsIntents(args.IntentSpans, reply.Txn), roachpb.NewTransactionAbortedError(reply.Txn) } else if h.Txn.Epoch < reply.Txn.Epoch { // TODO(tschottdorf): this leaves the Txn record (and more // importantly, intents) dangling; we can't currently write on @@ -924,6 +916,10 @@ func (r *Replica) PushTxn(batch engine.Engine, ms *engine.MVCCStats, h roachpb.H // The transaction doesn't exist yet on disk; we're allowed to abort it. reply.PusheeTxn = *args.PusheeTxn.Clone() reply.PusheeTxn.Status = roachpb.ABORTED + if args.PushType == roachpb.CLEANUP_TXN { + // If we're only here to clean up, no reason to persist anything. + return reply, nil + } return reply, engine.MVCCPutProto(batch, ms, key, roachpb.ZeroTimestamp, nil, &reply.PusheeTxn) } diff --git a/storage/sequence_cache.go b/storage/sequence_cache.go index 5bbb377087e1..45c910efe69c 100644 --- a/storage/sequence_cache.go +++ b/storage/sequence_cache.go @@ -104,6 +104,33 @@ func (sc *SequenceCache) Get(e engine.Engine, id []byte, dest *roachpb.SequenceC return epoch, seq, nil } +// GetAllID returns all the key-value pairs for the given ID from the engine. +func (sc *SequenceCache) GetAllID(e engine.Engine, id []byte) ([]roachpb.KeyValue, error) { + prefix := keys.SequenceCacheKeyPrefix(sc.rangeID, id) + kvs, _, err := engine.MVCCScan(e, prefix, prefix.PrefixEnd(), 0, /* max */ + roachpb.ZeroTimestamp, true /* consistent */, nil /* txn */) + return kvs, err +} + +// Iterate walks through the sequence cache, invoking the given callback for +// each unmarshaled entry with the key, the ID and the decoded entry. +func (sc *SequenceCache) Iterate(e engine.Engine, f func([]byte, []byte, roachpb.SequenceCacheEntry)) { + _, _ = engine.MVCCIterate(e, sc.min, sc.max, roachpb.ZeroTimestamp, + true /* consistent */, nil /* txn */, false, /* !reverse */ + func(kv roachpb.KeyValue) (bool, error) { + var entry roachpb.SequenceCacheEntry + id, _, _, err := decodeSequenceCacheKey(kv.Key, nil) + if err != nil { + panic(err) // TODO(tschottdorf): ReplicaCorruptionError + } + if err := kv.Value.GetProto(&entry); err != nil { + panic(err) // TODO(tschottdorf): ReplicaCorruptionError + } + f(kv.Key, id, entry) + return false, nil + }) +} + func copySeqCache(e engine.Engine, srcID, dstID roachpb.RangeID, keyMin, keyMax engine.MVCCKey) error { var scratch [64]byte return e.Iterate(keyMin, keyMax,