Skip to content

Commit

Permalink
gc the transaction table
Browse files Browse the repository at this point in the history
this is step two (and not the last) for safely garbage collecting
transaction entries (see cockroachdb#2062).

EndTransaction calls now store the non-local portion of the intents
in their transaction record (if there are any; otherwise the record
is instantly deleted; this is not new in this change).

on each run of the GC queue for a given range, its transaction table
is scanned and the following actions taken

* intents referenced from any "old" transaction records are resolved
  asynchronously
* old pending transactions are pushed (which will succeed), effectively
  aborting them
* old aborted transactions are added to the GC request.

TODO:
* necessary to send extra GC request?
* unregister intents from their Txn's entries when they've been resolved
  (currently they are never removed, so each GC run will waste work resolving
  a long gone intent every time and can never GC Txns with non-local intents)
  • Loading branch information
tbg committed Aug 14, 2015
1 parent d889c7e commit 30ed625
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 0 deletions.
96 changes: 96 additions & 0 deletions storage/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package storage

import (
"fmt"
"math"
"sync"
"time"

"github.com/cockroachdb/cockroach/client"
"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/security"
"github.com/cockroachdb/cockroach/storage/engine"
Expand All @@ -47,6 +49,12 @@ const (
// intentAgeThreshold is the threshold after which an extant intent
// will be resolved.
intentAgeThreshold = 2 * time.Hour // 2 hour
// txnCleanupThreshold is the threshold after which a transaction is
// considered abandoned and fit for removal, as measured by the maximum
// of its last heartbeat and timestamp.
// TODO(tschottdorf): need to enforce at all times that this is much
// larger than the heartbeat interval used by the coordinator.
txnCleanupThreshold = time.Hour
)

// gcQueue manages a queue of replicas slated to be scanned in their
Expand All @@ -57,6 +65,9 @@ const (
// as implemented going forward).
// - Resolve extant write intents and determine oldest non-resolvable
// intent.
// - GC of old transaction entries. This should include most committed
// entries almost immediately and, after a threshold on inactivity,
// all others.
//
// The shouldQueue function combines the need for both tasks into a
// single priority. If any task is overdue, shouldQueue returns true.
Expand Down Expand Up @@ -127,13 +138,16 @@ func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica) error {
// Compute intent expiration (intent age at which we attempt to resolve).
intentExp := now
intentExp.WallTime -= intentAgeThreshold.Nanoseconds()
txnExp := now
txnExp.WallTime -= txnCleanupThreshold.Nanoseconds()

gcArgs := &proto.GCRequest{
RequestHeader: proto.RequestHeader{
Timestamp: now,
RangeID: repl.Desc().RangeID,
},
}

var mu sync.Mutex
var oldestIntentNanos int64 = math.MaxInt64
var expBaseKey proto.Key
Expand Down Expand Up @@ -222,9 +236,24 @@ func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica) error {
gcArgs.Key = gcArgs.Keys[0].Key
gcArgs.EndKey = gcArgs.Keys[len(gcArgs.Keys)-1].Key.Next()

txnKeys, err := processTransactionTable(repl, txnMap, intentMap, txnExp)
if err != nil {
return err
}

// Append the keys only now, after having set the key range for gcArgs.
// Those keys are all range-local.
// TODO(tschottdorf): Probably need two requests because we need to have
// the full key range in the header, but we can't span from range-local
// to range-data.
gcArgs.Keys = append(gcArgs.Keys, txnKeys...)

// Process push transactions in parallel.
var wg sync.WaitGroup
for _, txn := range txnMap {
if txn.Status != proto.PENDING {
continue
}
wg.Add(1)
go pushTxn(repl, now, txn, updateOldestIntent, &wg)
}
Expand Down Expand Up @@ -261,6 +290,73 @@ func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica) error {
return nil
}

// processTransactionTable scans the transaction table and updates txnMap with
// those transactions which are old and either PENDING or with intents
// registered. In the first case we want to push the transaction so that it is
// aborted, and in the second case we care about checking the intents and
// updating the record accordingly so that the next gc cycle can dispose of
// them. The transaction records which can be gc'ed are returned separately and
// not added to txnMap nor intentMap.
func processTransactionTable(r *Replica, txnMap map[string]*proto.Transaction, intentMap map[string][]proto.Intent, cutoff proto.Timestamp) ([]proto.GCRequest_GCKey, error) {
snap := r.rm.Engine().NewSnapshot()
defer snap.Close()

var gcKeys []proto.GCRequest_GCKey
handleOne := func(kv proto.KeyValue) error {
var txn proto.Transaction
if err := gogoproto.Unmarshal(kv.Value.Bytes, &txn); err != nil {
return err
}
ts := txn.Timestamp
if heartbeatTS := txn.LastHeartbeat; heartbeatTS != nil {
ts.Forward(*heartbeatTS)
}
if !ts.Less(cutoff) {
return nil
}

id := string(txn.ID)
// Always collect all intents referred to by stale transactions. In
// practice, there should barely be any intents here, and (currently)
// they only occur on committed transactions.
if len(txn.Intents) > 0 {
intentMap[id] = append(intentMap[id], txn.Intents...)
txnMap[id] = &txn
}

// The transaction record should be considered for removal.
switch txn.Status {
case proto.PENDING:
// Marked as running, so we need to push it to abort it.
txnMap[id] = &txn
return nil
case proto.COMMITTED:
// It's committed, so it doesn't need a push but it might be
// waiting for some intents to be cleaned up.
if len(txn.Intents) > 0 {
// Can't GC until the intents are gone. We've already
// collected them above, so nothing else to do.
return nil
}
case proto.ABORTED:
// It's aborted and old, so once removed a push to it will
// still succeed (by design). So this is safe to remove.
default:
panic(fmt.Sprintf("invalid transaction state: %s", txn))
}
gcKeys = append(gcKeys, proto.GCRequest_GCKey{Key: kv.Key}) // zero timestamp
return nil
}

startKey := keys.TransactionKey(proto.KeyMin, nil)
endKey := keys.TransactionKey(proto.KeyMax, nil)

_, err := engine.MVCCIterate(snap, startKey, endKey, proto.ZeroTimestamp, true /* consistent */, nil /* txn */, false /* !reverse */, func(kv proto.KeyValue) (bool, error) {
return false, handleOne(kv)
})
return gcKeys, err
}

// timer returns a constant duration to space out GC processing
// for successive queued replicas.
func (gcq *gcQueue) timer() time.Duration {
Expand Down
105 changes: 105 additions & 0 deletions storage/gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ package storage
import (
"fmt"
"math"
"reflect"
"testing"
"time"

"github.com/cockroachdb/cockroach/config"
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/cockroachdb/cockroach/util/log"
gogoproto "github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -293,6 +297,107 @@ func TestGCQueueProcess(t *testing.T) {
}
}

func TestGCQueueTransactionTable(t *testing.T) {
defer leaktest.AfterTest(t)

const now time.Duration = 3 * 24 * time.Hour
const dAbandon = -2 * DefaultHeartbeatInterval
type spec struct {
status proto.TransactionStatus
ts time.Duration
heartbeatTS time.Duration
intents []proto.Intent
}
// Describes the state of the Txn table before the test.
before := map[string]spec{
// Too young, should not touch.
"a": {proto.PENDING, now - txnCleanupThreshold + 1, 0, []proto.Intent{{Key: proto.Key("q")}}},
// Old, but still heartbeat. No GC.
"b": {proto.PENDING, 0, now - txnCleanupThreshold + 1, nil},
// Old and aborted, should delete.
"c": {proto.ABORTED, now - txnCleanupThreshold - 1, 0, nil},
// Old and pending, so should push and abort it successfully.
"d": {proto.PENDING, now - dAbandon, 0, nil},
// Committed and fresh, so no action.
"e": {proto.COMMITTED, now - txnCleanupThreshold + 1, 0, nil},
// Committed, old and intentless. Bye bye.
"f": {proto.COMMITTED, now - txnCleanupThreshold - 1, 0, nil},
// Committed and old, but with intent. No action on txn record,
// but tries to resolve intents.
"g": {proto.COMMITTED, now - txnCleanupThreshold - 1, 0,
[]proto.Intent{{Key: proto.Key("z")}}},
}

after := map[string]*spec{}
for k := range before {
sCopy := before[k]
after[k] = &sCopy
}

// Test outcome follows, described as changes to the previous state.
after["a"].intents = nil // expect no attempts to resolve the intent
after["c"].status = -1
after["d"].status = proto.ABORTED
after["f"].status = -1

resolved := map[string][]proto.Intent{}
TestingCommandFilter = func(req proto.Request) error {
if resArgs, ok := req.(*proto.ResolveIntentRequest); ok {
id := string(resArgs.Txn.Key)
resolved[id] = append(resolved[id], proto.Intent{
Key: resArgs.Header().Key,
EndKey: resArgs.Header().EndKey,
})
}
return nil
}
tc := testContext{}
tc.Start(t)
defer tc.Stop()
defer func() { TestingCommandFilter = nil }()
tc.manualClock.Set(int64(now))

txns := map[string]proto.Transaction{}
for strKey, sp := range before {
baseKey := proto.Key(strKey)
txnClock := hlc.NewClock(hlc.NewManualClock(int64(sp.ts)).UnixNano)
txn := newTransaction("txn1", baseKey, 1, proto.SERIALIZABLE, txnClock)
txn.Status = sp.status
txn.Intents = sp.intents
txn.LastHeartbeat = &proto.Timestamp{WallTime: int64(sp.heartbeatTS)}
txns[strKey] = *txn
key := keys.TransactionKey(baseKey, txn.ID)
if err := engine.MVCCPutProto(tc.engine, nil, key, proto.ZeroTimestamp, nil, txn); err != nil {
t.Fatal(err)
}
}

// Run GC.
gcQ := newGCQueue()
if err := gcQ.process(tc.clock.Now(), tc.rng); err != nil {
t.Fatal(err)
}

util.SucceedsWithin(t, time.Second, func() error {
for strKey, sp := range after {
txn := &proto.Transaction{}
key := keys.TransactionKey(proto.Key(strKey), txns[strKey].ID)
ok, err := engine.MVCCGetProto(tc.engine, key, proto.ZeroTimestamp, true, nil, txn)
if err != nil {
return err
}
if expGC := (sp.status == -1); expGC != !ok {
return fmt.Errorf("%s: expected gc: %t, but found %s", strKey, expGC, txn)
}
if !reflect.DeepEqual(resolved[strKey], sp.intents) {
return fmt.Errorf("%s: unexpected intent resolutions:\nexpected: %s\nobserved: %s",
strKey, sp.intents, resolved[strKey])
}
}
return nil
})
}

// TestGCQueueIntentResolution verifies intent resolution with many
// intents spanning just two transactions.
func TestGCQueueIntentResolution(t *testing.T) {
Expand Down

0 comments on commit 30ed625

Please sign in to comment.