diff --git a/txn/flusher.go b/txn/flusher.go index b1ead31d9..63b03e51c 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -13,7 +13,7 @@ func flush(r *Runner, t *transaction) error { Runner: r, goal: t, goalKeys: make(map[docKey]bool), - queue: make(map[docKey][]token), + queue: make(map[docKey][]tokenAndId), debugId: debugPrefix(), } for _, dkey := range f.goal.docKeys() { @@ -26,10 +26,36 @@ type flusher struct { *Runner goal *transaction goalKeys map[docKey]bool - queue map[docKey][]token + queue map[docKey][]tokenAndId debugId string } +type tokenAndId struct { + tt token + bid bson.ObjectId +} + +func (ti tokenAndId) id() bson.ObjectId { + return ti.bid +} + +func (ti tokenAndId) nonce() string { + return ti.tt.nonce() +} + +func (ti tokenAndId) String() string { + return string(ti.tt) +} + +func tokensWithIds(q []token) []tokenAndId { + out := make([]tokenAndId, len(q)) + for i, tt := range q { + out[i].tt = tt + out[i].bid = tt.id() + } + return out +} + func (f *flusher) run() (err error) { if chaosEnabled { defer f.handleChaos(&err) @@ -248,7 +274,7 @@ NextDoc: if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) continue NextDoc } else { @@ -310,7 +336,7 @@ NextDoc: f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) } revno[dkey] = info.Revno - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) continue NextDoc } } @@ -452,7 +478,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) break } } - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) if !found { // Rescanned transaction id was not in the queue. This could mean one // of three things: @@ -516,12 +542,13 @@ func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 { func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) { found = true + ttId := tt.id() NextDoc: for _, dkey := range dkeys { for _, dtt := range f.queue[dkey] { - if dtt == tt { + if dtt.tt == tt { continue NextDoc - } else if dtt.id() != tt.id() { + } else if dtt.id() != ttId { prereqs = true } } @@ -909,17 +936,17 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err return nil } -func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token { +func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token { var result []token for j := len(dqueue) - 1; j >= 0; j-- { dtt := dqueue[j] - if dtt == dontPull { + if dtt.tt == dontPull { continue } if _, ok := pull[dtt.id()]; ok { // It was handled before and this is a leftover invalid // nonce in the queue. Cherry-pick it out. - result = append(result, dtt) + result = append(result, dtt.tt) } } return result diff --git a/txn/txn_test.go b/txn/txn_test.go index ce9d138e2..291537802 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -703,6 +703,8 @@ func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { } var flaky = flag.Bool("flaky", false, "Include flaky tests") +var txnQueueLength = flag.Int("qlength", 100, "txn-queue length for tests") + func (s *S) TestTxnQueueStressTest(c *C) { // This fails about 20% of the time on Mongo 3.2 (I haven't tried @@ -776,3 +778,117 @@ func (s *S) TestTxnQueueStressTest(c *C) { } } } + +type txnQueue struct { + Queue []string `bson:"txn-queue"` +} + +func (s *S) TestTxnQueueAssertionGrowth(c *C) { + txn.SetDebug(false) // too much spam + err := s.accounts.Insert(M{"_id": 0, "balance": 0}) + c.Assert(err, IsNil) + // Create many assertion only transactions. + t := time.Now() + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Assert: M{"balance": 0}, + }} + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err, IsNil) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength) + c.Logf("%8.3fs to set up %d assertions", time.Since(t).Seconds(), *txnQueueLength) + t = time.Now() + txn.SetChaos(txn.Chaos{}) + ops = []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + err = s.runner.Run(ops, "", nil) + c.Logf("%8.3fs to clear N=%d assertions and add one more txn", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, 1) +} + +func (s *S) TestTxnQueueBrokenPrepared(c *C) { + txn.SetDebug(false) // too much spam + badTxnToken := "123456789012345678901234_deadbeef" + err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{badTxnToken}}) + c.Assert(err, IsNil) + t := time.Now() + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$set": M{"balance": 0}}, + }} + errString := `cannot find transaction ObjectIdHex("123456789012345678901234")` + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err.Error(), Equals, errString) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength+1) + c.Logf("%8.3fs to set up %d 'prepared' txns", time.Since(t).Seconds(), *txnQueueLength) + t = time.Now() + s.accounts.UpdateId(0, bson.M{"$pullAll": bson.M{"txn-queue": []string{badTxnToken}}}) + ops = []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + err = s.runner.ResumeAll() + c.Assert(err, IsNil) + c.Logf("%8.3fs to ResumeAll N=%d 'prepared' txns", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, 1) +} + +func (s *S) TestTxnQueuePreparing(c *C) { + txn.SetDebug(false) // too much spam + err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{}}) + c.Assert(err, IsNil) + t := time.Now() + txn.SetChaos(txn.Chaos{ + KillChance: 1.0, + Breakpoint: "set-prepared", + }) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$set": M{"balance": 0}}, + }} + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength) + c.Logf("%8.3fs to set up %d 'preparing' txns", time.Since(t).Seconds(), *txnQueueLength) + txn.SetChaos(txn.Chaos{}) + t = time.Now() + err = s.runner.ResumeAll() + c.Logf("%8.3fs to ResumeAll N=%d 'preparing' txns", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + expectedCount := 100 + if *txnQueueLength <= expectedCount { + expectedCount = *txnQueueLength - 1 + } + c.Check(len(qdoc.Queue), Equals, expectedCount) +} +