diff --git a/storage/gc_queue.go b/storage/gc_queue.go index 287b8ff8d202..24cc124d70d9 100644 --- a/storage/gc_queue.go +++ b/storage/gc_queue.go @@ -18,6 +18,7 @@ package storage import ( + "fmt" "math" "sync" "time" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/client" "github.com/cockroachdb/cockroach/config" "github.com/cockroachdb/cockroach/gossip" + "github.com/cockroachdb/cockroach/keys" "github.com/cockroachdb/cockroach/proto" "github.com/cockroachdb/cockroach/security" "github.com/cockroachdb/cockroach/storage/engine" @@ -47,6 +49,12 @@ const ( // intentAgeThreshold is the threshold after which an extant intent // will be resolved. intentAgeThreshold = 2 * time.Hour // 2 hour + // txnCleanupThreshold is the threshold after which a transaction is + // considered abandoned and fit for removal, as measured by the maximum + // of its last heartbeat and timestamp. + // TODO(tschottdorf): need to enforce at all times that this is much + // larger than the heartbeat interval used by the coordinator. + txnCleanupThreshold = time.Hour ) // gcQueue manages a queue of replicas slated to be scanned in their @@ -57,6 +65,9 @@ const ( // as implemented going forward). // - Resolve extant write intents and determine oldest non-resolvable // intent. +// - GC of old transaction entries. This should include most committed +// entries almost immediately and, after a threshold on inactivity, +// all others. // // The shouldQueue function combines the need for both tasks into a // single priority. If any task is overdue, shouldQueue returns true. @@ -127,6 +138,8 @@ func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica) error { // Compute intent expiration (intent age at which we attempt to resolve). intentExp := now intentExp.WallTime -= intentAgeThreshold.Nanoseconds() + txnExp := now + txnExp.WallTime -= txnCleanupThreshold.Nanoseconds() gcArgs := &proto.GCRequest{ RequestHeader: proto.RequestHeader{ @@ -134,6 +147,7 @@ func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica) error { RangeID: repl.Desc().RangeID, }, } + var mu sync.Mutex var oldestIntentNanos int64 = math.MaxInt64 var expBaseKey proto.Key @@ -222,9 +236,24 @@ func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica) error { gcArgs.Key = gcArgs.Keys[0].Key gcArgs.EndKey = gcArgs.Keys[len(gcArgs.Keys)-1].Key.Next() + txnKeys, err := processTransactionTable(repl, txnMap, intentMap, txnExp) + if err != nil { + return err + } + + // Append the keys only now, after having set the key range for gcArgs. + // Those keys are all range-local. + // TODO(tschottdorf): Probably need two requests because we need to have + // the full key range in the header, but we can't span from range-local + // to range-data. + gcArgs.Keys = append(gcArgs.Keys, txnKeys...) + // Process push transactions in parallel. var wg sync.WaitGroup for _, txn := range txnMap { + if txn.Status != proto.PENDING { + continue + } wg.Add(1) go pushTxn(repl, now, txn, updateOldestIntent, &wg) } @@ -261,6 +290,73 @@ func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica) error { return nil } +// processTransactionTable scans the transaction table and updates txnMap with +// those transactions which are old and either PENDING or with intents +// registered. In the first case we want to push the transaction so that it is +// aborted, and in the second case we care about checking the intents and +// updating the record accordingly so that the next gc cycle can dispose of +// them. The transaction records which can be gc'ed are returned separately and +// not added to txnMap nor intentMap. +func processTransactionTable(r *Replica, txnMap map[string]*proto.Transaction, intentMap map[string][]proto.Intent, cutoff proto.Timestamp) ([]proto.GCRequest_GCKey, error) { + snap := r.rm.Engine().NewSnapshot() + defer snap.Close() + + var gcKeys []proto.GCRequest_GCKey + handleOne := func(kv proto.KeyValue) error { + var txn proto.Transaction + if err := gogoproto.Unmarshal(kv.Value.Bytes, &txn); err != nil { + return err + } + ts := txn.Timestamp + if heartbeatTS := txn.LastHeartbeat; heartbeatTS != nil { + ts.Forward(*heartbeatTS) + } + if !ts.Less(cutoff) { + return nil + } + + id := string(txn.ID) + // Always collect all intents referred to by stale transactions. In + // practice, there should barely be any intents here, and (currently) + // they only occur on committed transactions. + if len(txn.Intents) > 0 { + intentMap[id] = append(intentMap[id], txn.Intents...) + txnMap[id] = &txn + } + + // The transaction record should be considered for removal. + switch txn.Status { + case proto.PENDING: + // Marked as running, so we need to push it to abort it. + txnMap[id] = &txn + return nil + case proto.COMMITTED: + // It's committed, so it doesn't need a push but it might be + // waiting for some intents to be cleaned up. + if len(txn.Intents) > 0 { + // Can't GC until the intents are gone. We've already + // collected them above, so nothing else to do. + return nil + } + case proto.ABORTED: + // It's aborted and old, so once removed a push to it will + // still succeed (by design). So this is safe to remove. + default: + panic(fmt.Sprintf("invalid transaction state: %s", txn)) + } + gcKeys = append(gcKeys, proto.GCRequest_GCKey{Key: kv.Key}) // zero timestamp + return nil + } + + startKey := keys.TransactionKey(proto.KeyMin, nil) + endKey := keys.TransactionKey(proto.KeyMax, nil) + + _, err := engine.MVCCIterate(snap, startKey, endKey, proto.ZeroTimestamp, true /* consistent */, nil /* txn */, false /* !reverse */, func(kv proto.KeyValue) (bool, error) { + return false, handleOne(kv) + }) + return gcKeys, err +} + // timer returns a constant duration to space out GC processing // for successive queued replicas. func (gcq *gcQueue) timer() time.Duration { diff --git a/storage/gc_queue_test.go b/storage/gc_queue_test.go index 5d8f0c42cbcd..02c32e74a538 100644 --- a/storage/gc_queue_test.go +++ b/storage/gc_queue_test.go @@ -20,13 +20,17 @@ package storage import ( "fmt" "math" + "reflect" "testing" + "time" "github.com/cockroachdb/cockroach/config" "github.com/cockroachdb/cockroach/gossip" "github.com/cockroachdb/cockroach/keys" "github.com/cockroachdb/cockroach/proto" "github.com/cockroachdb/cockroach/storage/engine" + "github.com/cockroachdb/cockroach/util" + "github.com/cockroachdb/cockroach/util/hlc" "github.com/cockroachdb/cockroach/util/leaktest" "github.com/cockroachdb/cockroach/util/log" gogoproto "github.com/gogo/protobuf/proto" @@ -293,6 +297,107 @@ func TestGCQueueProcess(t *testing.T) { } } +func TestGCQueueTransactionTable(t *testing.T) { + defer leaktest.AfterTest(t) + + const now time.Duration = 3 * 24 * time.Hour + const dAbandon = -2 * DefaultHeartbeatInterval + type spec struct { + status proto.TransactionStatus + ts time.Duration + heartbeatTS time.Duration + intents []proto.Intent + } + // Describes the state of the Txn table before the test. + before := map[string]spec{ + // Too young, should not touch. + "a": {proto.PENDING, now - txnCleanupThreshold + 1, 0, []proto.Intent{{Key: proto.Key("q")}}}, + // Old, but still heartbeat. No GC. + "b": {proto.PENDING, 0, now - txnCleanupThreshold + 1, nil}, + // Old and aborted, should delete. + "c": {proto.ABORTED, now - txnCleanupThreshold - 1, 0, nil}, + // Old and pending, so should push and abort it successfully. + "d": {proto.PENDING, now - dAbandon, 0, nil}, + // Committed and fresh, so no action. + "e": {proto.COMMITTED, now - txnCleanupThreshold + 1, 0, nil}, + // Committed, old and intentless. Bye bye. + "f": {proto.COMMITTED, now - txnCleanupThreshold - 1, 0, nil}, + // Committed and old, but with intent. No action on txn record, + // but tries to resolve intents. + "g": {proto.COMMITTED, now - txnCleanupThreshold - 1, 0, + []proto.Intent{{Key: proto.Key("z")}}}, + } + + after := map[string]*spec{} + for k := range before { + sCopy := before[k] + after[k] = &sCopy + } + + // Test outcome follows, described as changes to the previous state. + after["a"].intents = nil // expect no attempts to resolve the intent + after["c"].status = -1 + after["d"].status = proto.ABORTED + after["f"].status = -1 + + resolved := map[string][]proto.Intent{} + TestingCommandFilter = func(req proto.Request) error { + if resArgs, ok := req.(*proto.ResolveIntentRequest); ok { + id := string(resArgs.Txn.Key) + resolved[id] = append(resolved[id], proto.Intent{ + Key: resArgs.Header().Key, + EndKey: resArgs.Header().EndKey, + }) + } + return nil + } + tc := testContext{} + tc.Start(t) + defer tc.Stop() + defer func() { TestingCommandFilter = nil }() + tc.manualClock.Set(int64(now)) + + txns := map[string]proto.Transaction{} + for strKey, sp := range before { + baseKey := proto.Key(strKey) + txnClock := hlc.NewClock(hlc.NewManualClock(int64(sp.ts)).UnixNano) + txn := newTransaction("txn1", baseKey, 1, proto.SERIALIZABLE, txnClock) + txn.Status = sp.status + txn.Intents = sp.intents + txn.LastHeartbeat = &proto.Timestamp{WallTime: int64(sp.heartbeatTS)} + txns[strKey] = *txn + key := keys.TransactionKey(baseKey, txn.ID) + if err := engine.MVCCPutProto(tc.engine, nil, key, proto.ZeroTimestamp, nil, txn); err != nil { + t.Fatal(err) + } + } + + // Run GC. + gcQ := newGCQueue() + if err := gcQ.process(tc.clock.Now(), tc.rng); err != nil { + t.Fatal(err) + } + + util.SucceedsWithin(t, time.Second, func() error { + for strKey, sp := range after { + txn := &proto.Transaction{} + key := keys.TransactionKey(proto.Key(strKey), txns[strKey].ID) + ok, err := engine.MVCCGetProto(tc.engine, key, proto.ZeroTimestamp, true, nil, txn) + if err != nil { + return err + } + if expGC := (sp.status == -1); expGC != !ok { + return fmt.Errorf("%s: expected gc: %t, but found %s", strKey, expGC, txn) + } + if !reflect.DeepEqual(resolved[strKey], sp.intents) { + return fmt.Errorf("%s: unexpected intent resolutions:\nexpected: %s\nobserved: %s", + strKey, sp.intents, resolved[strKey]) + } + } + return nil + }) +} + // TestGCQueueIntentResolution verifies intent resolution with many // intents spanning just two transactions. func TestGCQueueIntentResolution(t *testing.T) {