mgo/txn: in PurgeMissing, also remove aborted and applied transactions #2

Open
wants to merge 3 commits into
from
View
@@ -2,8 +2,8 @@ package txn_test
import (
"bytes"
- "gopkg.in/mgo.v2"
. "gopkg.in/check.v1"
+ "gopkg.in/mgo.v2"
"os/exec"
"time"
)
View
@@ -2,10 +2,10 @@ package txn_test
import (
"flag"
+ . "gopkg.in/check.v1"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2/txn"
- . "gopkg.in/check.v1"
"math/rand"
"time"
)
View
@@ -2,8 +2,8 @@ package txn
import (
"fmt"
- "gopkg.in/mgo.v2/bson"
. "gopkg.in/check.v1"
+ "gopkg.in/mgo.v2/bson"
)
type TarjanSuite struct{}
View
@@ -381,37 +381,40 @@ func (r *Runner) ChangeLog(logc *mgo.Collection) {
// a system that has seen unavoidable corruption back in a working state.
func (r *Runner) PurgeMissing(collections ...string) error {
type M map[string]interface{}
- type S []interface{}
type TDoc struct {
- DocId interface{} "_id"
- TxnIds []string "txn-queue"
+ DocId interface{} `bson:"_id"`
+ TxnIds []string `bson:"txn-queue"`
}
- found := make(map[bson.ObjectId]bool)
+ cache := newTxnsCache(r.tc)
colls := make(map[string]bool)
+ var idsToRemove []string
sort.Strings(collections)
for _, collection := range collections {
+ logf("collection %s", collection)
c := r.tc.Database.C(collection)
- iter := c.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
+ iter := c.Find(nil).Select(M{"_id": 1, "txn-queue": 1}).Iter()
var tdoc TDoc
for iter.Next(&tdoc) {
+ idsToRemove = idsToRemove[:0]
+ countToKeep := 0
for _, fullTxnId := range tdoc.TxnIds {
- txnId := bson.ObjectIdHex(fullTxnId[:24])
- if found[txnId] {
- continue
- }
- if r.tc.FindId(txnId).One(nil) == nil {
- found[txnId] = true
- continue
- }
- logf("WARNING: purging from document %s/%v the missing transaction id %s", collection, tdoc.DocId, txnId)
- err := c.UpdateId(tdoc.DocId, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
- if err != nil {
- return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
+ if cache.isPending(fullTxnId) {
+ countToKeep++
+ } else {
+ idsToRemove = append(idsToRemove, fullTxnId)
}
}
+ if len(idsToRemove) == 0 {
+ continue
+ }
+ logf("WARNING: purging from document %s/%v %s (keeping %d pending)", collection, tdoc.DocId, missingInfo(idsToRemove), countToKeep)
+ err := c.UpdateId(tdoc.DocId, M{"$pullAll": M{"txn-queue": idsToRemove}})
+ if err != nil {
+ return fmt.Errorf("cannot remove from %s/%v %s: %v", collection, tdoc.DocId, missingInfo(idsToRemove), err)
+ }
}
if err := iter.Close(); err != nil {
return err
@@ -424,28 +427,67 @@ func (r *Runner) PurgeMissing(collections ...string) error {
TxnIds []string "txn-queue"
}
- iter := r.sc.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
+ iter := r.sc.Find(nil).Select(M{"_id": 1, "txn-queue": 1}).Iter()
var stdoc StashTDoc
for iter.Next(&stdoc) {
+ idsToRemove = idsToRemove[:0]
+ countToKeep := 0
for _, fullTxnId := range stdoc.TxnIds {
- txnId := bson.ObjectIdHex(fullTxnId[:24])
- if found[txnId] {
- continue
- }
- if r.tc.FindId(txnId).One(nil) == nil {
- found[txnId] = true
- continue
- }
- logf("WARNING: purging from stash document %s/%v the missing transaction id %s", stdoc.Id.C, stdoc.Id.Id, txnId)
- err := r.sc.UpdateId(stdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
- if err != nil {
- return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
+ if cache.isPending(fullTxnId) {
+ countToKeep++
+ } else {
+ idsToRemove = append(idsToRemove, fullTxnId)
}
}
+ logf("WARNING: purging from stash document %s/%v %s (keeping %d pending)", stdoc.Id.C, stdoc.Id.Id, missingInfo(idsToRemove), countToKeep)
+ err := r.sc.UpdateId(stdoc.Id, M{"$pullAll": M{"txn-queue": idsToRemove}})
+ if err != nil {
+ return fmt.Errorf("cannot remove from %s/%v %s: %v", stdoc.Id.C, stdoc.Id.Id, missingInfo(idsToRemove), err)
+ }
}
return iter.Close()
}
+func missingInfo(ids []string) string {
+ if len(ids) < 5 {
+ return fmt.Sprintf("the missing transaction ids %v", ids)
+ }
+ return fmt.Sprintf("%d missing transaction ids", len(ids))
+}
+
+type txnsCache struct {
+ pending map[bson.ObjectId]bool
+ tc *mgo.Collection
+}
+
+func newTxnsCache(tc *mgo.Collection) *txnsCache {
+ return &txnsCache{
+ pending: make(map[bson.ObjectId]bool),
+ tc: tc,
+ }
+}
+
+// isPending reports whether the given transaction id (as found in
+// a txn-queue field) represents a currently pending transaction.
+func (c *txnsCache) isPending(id string) bool {
+ if len(id) < 24 {
+ logf("WARNING: invalid id %q", id)
+ return false
+ }
+ txnId := bson.ObjectIdHex(id[:24])
+ isPending, ok := c.pending[txnId]
+ if ok {
+ return isPending
+ }
+ var txnDoc transaction
+ err := c.tc.FindId(txnId).Select(bson.D{{"_id", 1}, {"s", 1}}).One(&txnDoc)
+ if err == nil {
+ isPending = txnDoc.State != taborted && txnDoc.State != tapplied
+ }
+ c.pending[txnId] = isPending
+ return isPending
+}
+
func (r *Runner) load(id bson.ObjectId) (*transaction, error) {
var t transaction
err := r.tc.FindId(id).One(&t)
View
@@ -427,8 +427,8 @@ func (s *S) TestChangeLog(c *C) {
type IdList []interface{}
type Log struct {
- Docs IdList "d"
- Revnos []int64 "r"
+ Docs IdList `bson:"d"`
+ Revnos []int64 `bson:"r"`
}
var m map[string]*Log
err = chglog.FindId(id).One(&m)
@@ -553,7 +553,7 @@ func (s *S) TestPurgeMissing(c *C) {
err = s.accounts.FindId(want.Id).One(&got)
if want.Balance == -1 {
if err != mgo.ErrNotFound {
- c.Errorf("Account %d should not exist, find got err=%#v", err)
+ c.Errorf("Account %d should not exist, find got err=%#v", want.Id, err)
}
} else if err != nil {
c.Errorf("Account %d should have balance of %d, but wasn't found", want.Id, want.Balance)
@@ -563,6 +563,87 @@ func (s *S) TestPurgeMissing(c *C) {
}
}
+func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
+ // This test ensures that PurgeMissing can handle very large
+ // txn-queue fields. Previous iterations of PurgeMissing would
+ // trigger a 16MB aggregation pipeline result size limit when run
+ // against documents or stashes with large numbers of txn-queue
+ // entries. PurgeMissing now no longer uses aggregation pipelines
+ // to work around this limit.
+
+ // The pipeline result size limitation was removed from MongoDB in 2.6 so
+ // this test is only run for older MongoDB versions.
+ build, err := s.session.BuildInfo()
+ c.Assert(err, IsNil)
+ if build.VersionAtLeast(2, 6) {
+ c.Skip("This tests a problem that can only happen with MongoDB < 2.6 ")
+ }
+
+ // Insert a single document to work with.
+ err = s.accounts.Insert(M{"_id": 0, "balance": 100})
+ c.Assert(err, IsNil)
+
+ ops := []txn.Op{{
+ C: "accounts",
+ Id: 0,
+ Update: M{"$inc": M{"balance": 100}},
+ }}
+
+ // Generate one successful transaction.
+ good := bson.NewObjectId()
+ c.Logf("---- Running ops under transaction %q", good.Hex())
+ err = s.runner.Run(ops, good, nil)
+ c.Assert(err, IsNil)
+
+ // Generate another transaction which will go missing.
+ missing := bson.NewObjectId()
+ c.Logf("---- Running ops under transaction %q (which will go missing)", missing.Hex())
+ err = s.runner.Run(ops, missing, nil)
+ c.Assert(err, IsNil)
+
+ err = s.tc.RemoveId(missing)
+ c.Assert(err, IsNil)
+
+ // Generate a txn-queue on the test document that's large enough
+ // that it used to cause PurgeMissing to exceed MongoDB's pipeline
+ // result 16MB size limit (MongoDB 2.4 and older only).
+ //
+ // The contents of the txn-queue field doesn't matter, only that
+ // it's big enough to trigger the size limit. The required size
+ // can also be achieved by using multiple documents as long as the
+ // cumulative size of all the txn-queue fields exceeds the
+ // pipeline limit. A single document is easier to work with for
+ // this test however.
+ //
+ // The txn id of the successful transaction is used to fill the
+ // txn-queue because this takes advantage of a short circuit in
+ // PurgeMissing, dramatically speeding up the test run time.
+ const fakeQueueLen = 250000
+ fakeTxnQueue := make([]string, fakeQueueLen)
+ token := good.Hex() + "_12345678" // txn id + nonce
+ for i := 0; i < fakeQueueLen; i++ {
+ fakeTxnQueue[i] = token
+ }
+
+ err = s.accounts.UpdateId(0, bson.M{
+ "$set": bson.M{"txn-queue": fakeTxnQueue},
+ })
+ c.Assert(err, IsNil)
+
+ // PurgeMissing could hit the same pipeline result size limit when
+ // processing the txn-queue fields of stash documents so insert
+ // the large txn-queue there too to ensure that no longer happens.
+ err = s.sc.Insert(
+ bson.D{{"c", "accounts"}, {"id", 0}},
+ bson.M{"txn-queue": fakeTxnQueue},
+ )
+ c.Assert(err, IsNil)
+
+ c.Logf("---- Purging missing transactions")
+ err = s.runner.PurgeMissing("accounts")
+ c.Assert(err, IsNil)
+}
+
func (s *S) TestTxnQueueStressTest(c *C) {
txn.SetChaos(txn.Chaos{
SlowdownChance: 0.3,