Skip to content

Commit

Permalink
Merge pull request globalsign#10 from globalsign/jameinel-txn-id-caching
Browse files Browse the repository at this point in the history
Cache TXN ID
  • Loading branch information
domodwyer committed Jun 15, 2017
2 parents 6bfcb19 + 20f026c commit bf84b71
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 10 deletions.
47 changes: 37 additions & 10 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func flush(r *Runner, t *transaction) error {
Runner: r,
goal: t,
goalKeys: make(map[docKey]bool),
queue: make(map[docKey][]token),
queue: make(map[docKey][]tokenAndId),
debugId: debugPrefix(),
}
for _, dkey := range f.goal.docKeys() {
Expand All @@ -26,10 +26,36 @@ type flusher struct {
*Runner
goal *transaction
goalKeys map[docKey]bool
queue map[docKey][]token
queue map[docKey][]tokenAndId
debugId string
}

type tokenAndId struct {
tt token
bid bson.ObjectId
}

func (ti tokenAndId) id() bson.ObjectId {
return ti.bid
}

func (ti tokenAndId) nonce() string {
return ti.tt.nonce()
}

func (ti tokenAndId) String() string {
return string(ti.tt)
}

func tokensWithIds(q []token) []tokenAndId {
out := make([]tokenAndId, len(q))
for i, tt := range q {
out[i].tt = tt
out[i].bid = tt.id()
}
return out
}

func (f *flusher) run() (err error) {
if chaosEnabled {
defer f.handleChaos(&err)
Expand Down Expand Up @@ -248,7 +274,7 @@ NextDoc:
if info.Remove == "" {
// Fast path, unless workload is insert/remove heavy.
revno[dkey] = info.Revno
f.queue[dkey] = info.Queue
f.queue[dkey] = tokensWithIds(info.Queue)
f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
continue NextDoc
} else {
Expand Down Expand Up @@ -310,7 +336,7 @@ NextDoc:
f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
}
revno[dkey] = info.Revno
f.queue[dkey] = info.Queue
f.queue[dkey] = tokensWithIds(info.Queue)
continue NextDoc
}
}
Expand Down Expand Up @@ -452,7 +478,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error)
break
}
}
f.queue[dkey] = info.Queue
f.queue[dkey] = tokensWithIds(info.Queue)
if !found {
// Rescanned transaction id was not in the queue. This could mean one
// of three things:
Expand Down Expand Up @@ -516,12 +542,13 @@ func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 {

func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) {
found = true
ttId := tt.id()
NextDoc:
for _, dkey := range dkeys {
for _, dtt := range f.queue[dkey] {
if dtt == tt {
if dtt.tt == tt {
continue NextDoc
} else if dtt.id() != tt.id() {
} else if dtt.id() != ttId {
prereqs = true
}
}
Expand Down Expand Up @@ -909,17 +936,17 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
return nil
}

func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token {
func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token {
var result []token
for j := len(dqueue) - 1; j >= 0; j-- {
dtt := dqueue[j]
if dtt == dontPull {
if dtt.tt == dontPull {
continue
}
if _, ok := pull[dtt.id()]; ok {
// It was handled before and this is a leftover invalid
// nonce in the queue. Cherry-pick it out.
result = append(result, dtt)
result = append(result, dtt.tt)
}
}
return result
Expand Down
116 changes: 116 additions & 0 deletions txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
}

var flaky = flag.Bool("flaky", false, "Include flaky tests")
var txnQueueLength = flag.Int("qlength", 100, "txn-queue length for tests")


func (s *S) TestTxnQueueStressTest(c *C) {
// This fails about 20% of the time on Mongo 3.2 (I haven't tried
Expand Down Expand Up @@ -776,3 +778,117 @@ func (s *S) TestTxnQueueStressTest(c *C) {
}
}
}

type txnQueue struct {
Queue []string `bson:"txn-queue"`
}

func (s *S) TestTxnQueueAssertionGrowth(c *C) {
txn.SetDebug(false) // too much spam
err := s.accounts.Insert(M{"_id": 0, "balance": 0})
c.Assert(err, IsNil)
// Create many assertion only transactions.
t := time.Now()
ops := []txn.Op{{
C: "accounts",
Id: 0,
Assert: M{"balance": 0},
}}
for n := 0; n < *txnQueueLength; n++ {
err = s.runner.Run(ops, "", nil)
c.Assert(err, IsNil)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, *txnQueueLength)
c.Logf("%8.3fs to set up %d assertions", time.Since(t).Seconds(), *txnQueueLength)
t = time.Now()
txn.SetChaos(txn.Chaos{})
ops = []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
err = s.runner.Run(ops, "", nil)
c.Logf("%8.3fs to clear N=%d assertions and add one more txn",
time.Since(t).Seconds(), *txnQueueLength)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, 1)
}

func (s *S) TestTxnQueueBrokenPrepared(c *C) {
txn.SetDebug(false) // too much spam
badTxnToken := "123456789012345678901234_deadbeef"
err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{badTxnToken}})
c.Assert(err, IsNil)
t := time.Now()
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$set": M{"balance": 0}},
}}
errString := `cannot find transaction ObjectIdHex("123456789012345678901234")`
for n := 0; n < *txnQueueLength; n++ {
err = s.runner.Run(ops, "", nil)
c.Assert(err.Error(), Equals, errString)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, *txnQueueLength+1)
c.Logf("%8.3fs to set up %d 'prepared' txns", time.Since(t).Seconds(), *txnQueueLength)
t = time.Now()
s.accounts.UpdateId(0, bson.M{"$pullAll": bson.M{"txn-queue": []string{badTxnToken}}})
ops = []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
err = s.runner.ResumeAll()
c.Assert(err, IsNil)
c.Logf("%8.3fs to ResumeAll N=%d 'prepared' txns",
time.Since(t).Seconds(), *txnQueueLength)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, 1)
}

func (s *S) TestTxnQueuePreparing(c *C) {
txn.SetDebug(false) // too much spam
err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{}})
c.Assert(err, IsNil)
t := time.Now()
txn.SetChaos(txn.Chaos{
KillChance: 1.0,
Breakpoint: "set-prepared",
})
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$set": M{"balance": 0}},
}}
for n := 0; n < *txnQueueLength; n++ {
err = s.runner.Run(ops, "", nil)
c.Assert(err, Equals, txn.ErrChaos)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, *txnQueueLength)
c.Logf("%8.3fs to set up %d 'preparing' txns", time.Since(t).Seconds(), *txnQueueLength)
txn.SetChaos(txn.Chaos{})
t = time.Now()
err = s.runner.ResumeAll()
c.Logf("%8.3fs to ResumeAll N=%d 'preparing' txns",
time.Since(t).Seconds(), *txnQueueLength)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
expectedCount := 100
if *txnQueueLength <= expectedCount {
expectedCount = *txnQueueLength - 1
}
c.Check(len(qdoc.Queue), Equals, expectedCount)
}

0 comments on commit bf84b71

Please sign in to comment.