Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ func (s *PruneSuite) maybePrune(c *gc.C, pruneFactor float32) {
}

func (s *PruneSuite) maybePruneWithTimestamp(c *gc.C, pruneFactor float32, timestamp time.Time) {
r := jujutxn.NewRunner(jujutxn.RunnerParams{
r, err := jujutxn.NewRunner(jujutxn.RunnerParams{
Database: s.db,
TransactionCollectionName: s.txns.Name,
ChangeLogName: s.txns.Name + ".log",
Clock: testclock.NewClock(time.Now()),
})
err := r.MaybePruneTransactions(jujutxn.PruneOptions{
c.Assert(err, jc.ErrorIsNil)
err = r.MaybePruneTransactions(jujutxn.PruneOptions{
PruneFactor: pruneFactor,
MinNewTransactions: 1,
MaxNewTransactions: 1000,
Expand Down
21 changes: 12 additions & 9 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,15 @@ type RunnerParams struct {
// NewRunner returns a Runner which runs transactions for the database specified in params.
// Collection names used to manage the transactions and change log may also be specified in
// params, but if not, default values will be used.
func NewRunner(params RunnerParams) Runner {
func NewRunner(params RunnerParams) (Runner, error) {
sstxn := params.ServerSideTransactions
if sstxn {
sstxn = SupportsServerSideTransactions(params.Database)
if !sstxn {
logger.Warningf("server-side transactions requested, but database does not support them")
supported, err := SupportsServerSideTransactions(params.Database)
if err != nil {
return nil, errors.Trace(err)
}
if !supported {
return nil, errors.New("server-side transactions requested, but mongod does not support them")
}
}
txnRunner := &transactionRunner{
Expand Down Expand Up @@ -284,7 +287,7 @@ func NewRunner(params RunnerParams) Runner {
// they also specify a RunTransactionObserver.
txnRunner.clock = clock.WallClock
}
return txnRunner
return txnRunner, nil
}

func (tr *transactionRunner) newRunnerImpl() txnRunner {
Expand Down Expand Up @@ -461,13 +464,13 @@ func TestHooks(runner Runner) chan ([]TestHook) {

// SupportsServerSideTransactions lets you know if the given database can support
// server-side transactions.
func SupportsServerSideTransactions(db *mgo.Database) bool {
func SupportsServerSideTransactions(db *mgo.Database) (bool, error) {
info, err := db.Session.BuildInfo()
if err != nil {
return false
return false, errors.Annotate(err, "checking mongo version for sstxn support")
}
if len(info.VersionArray) < 1 || info.VersionArray[0] < 4 {
return false
return false, nil
}
return true
return true, nil
}
51 changes: 33 additions & 18 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ func (s *txnSuite) SetUpTest(c *gc.C) {
txnsLog := db.C("txns.log")
txnsLog.Create(&mgo.CollectionInfo{})
s.backoffs = nil
s.txnRunner = jujutxn.NewRunner(jujutxn.RunnerParams{
var err error
s.txnRunner, err = jujutxn.NewRunner(jujutxn.RunnerParams{
Database: db,
ChangeLogName: "txns.log",
ServerSideTransactions: false,
PauseFunc: func(dur time.Duration) {
s.backoffs = append(s.backoffs, dur)
},
})
c.Assert(err, jc.ErrorIsNil)
s.supportsSST = false
}

Expand Down Expand Up @@ -99,7 +101,9 @@ func (s *sstxnSuite) CheckSSTXNSupported(c *gc.C) {
c.Assert(err, gc.IsNil)
defer session.Close()
db := session.DB(info.Database)
if !jujutxn.SupportsServerSideTransactions(db) {
supported, err := jujutxn.SupportsServerSideTransactions(db)
c.Assert(err, gc.IsNil)
if !supported {
c.Skip(fmt.Sprintf("mongo version doesn't support server-side-transactions"))
}
}
Expand All @@ -115,26 +119,30 @@ func (s *sstxnSuite) TearDownSuite(c *gc.C) {

func (s *sstxnSuite) SetUpTest(c *gc.C) {
s.txnSuite.SetUpTest(c)
s.txnRunner = jujutxn.NewRunner(jujutxn.RunnerParams{
var err error
s.txnRunner, err = jujutxn.NewRunner(jujutxn.RunnerParams{
Database: s.collection.Database,
ChangeLogName: "txns.log",
ServerSideTransactions: true,
PauseFunc: func(dur time.Duration) {
s.backoffs = append(s.backoffs, dur)
},
})
c.Assert(err, jc.ErrorIsNil)
s.supportsSST = true
}

func (s *sstxnSuite) TestNoChangeLog(c *gc.C) {
s.txnRunner = jujutxn.NewRunner(jujutxn.RunnerParams{
var err error
s.txnRunner, err = jujutxn.NewRunner(jujutxn.RunnerParams{
Database: s.collection.Database,
ChangeLogName: "-",
ServerSideTransactions: true,
PauseFunc: func(dur time.Duration) {
s.backoffs = append(s.backoffs, dur)
},
})
c.Assert(err, jc.ErrorIsNil)

before, err := s.collection.Database.C("txns.log").Count()
c.Assert(err, gc.IsNil)
Expand Down Expand Up @@ -269,14 +277,15 @@ func (s *sstxnSuite) TestStartedHooks(c *gc.C) {

secondSession := s.collection.Database.Session.Copy()
defer secondSession.Close()
secondRunner := jujutxn.NewRunner(jujutxn.RunnerParams{
secondRunner, err := jujutxn.NewRunner(jujutxn.RunnerParams{
Database: secondSession.DB("juju"),
ChangeLogName: "txns.log",
ServerSideTransactions: true,
PauseFunc: func(dur time.Duration) {
s.backoffs = append(s.backoffs, dur)
},
})
c.Assert(err, jc.ErrorIsNil)

setDocName := func(id, name string) {
ops := []txn.Op{{
Expand Down Expand Up @@ -308,7 +317,7 @@ func (s *sstxnSuite) TestStartedHooks(c *gc.C) {
}}
return ops, nil
}
err := s.txnRunner.Run(buildTxn)
err = s.txnRunner.Run(buildTxn)
c.Assert(err, gc.IsNil)
var found simpleDoc
err = s.collection.FindId("1").One(&found)
Expand All @@ -323,14 +332,15 @@ func (s *sstxnSuite) TestAssertedHooks(c *gc.C) {

secondSession := s.collection.Database.Session.Copy()
defer secondSession.Close()
secondRunner := jujutxn.NewRunner(jujutxn.RunnerParams{
secondRunner, err := jujutxn.NewRunner(jujutxn.RunnerParams{
Database: secondSession.DB("juju"),
ChangeLogName: "txns.log",
ServerSideTransactions: true,
PauseFunc: func(dur time.Duration) {
s.backoffs = append(s.backoffs, dur)
},
})
c.Assert(err, jc.ErrorIsNil)

setDocName := func(id, name string) {
ops := []txn.Op{{
Expand Down Expand Up @@ -366,7 +376,7 @@ func (s *sstxnSuite) TestAssertedHooks(c *gc.C) {
}}
return ops, nil
}
err := s.txnRunner.Run(buildTxn)
err = s.txnRunner.Run(buildTxn)
c.Assert(err, gc.IsNil)
c.Assert(asserted, jc.IsTrue)
var found simpleDoc
Expand Down Expand Up @@ -571,7 +581,8 @@ func (s *txnSuite) TestTransientFailure(c *gc.C) {
}

func (s *txnSuite) TestRunFailureIntermittentUnexpectedMessage(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
runner, err := jujutxn.NewRunner(jujutxn.RunnerParams{})
c.Assert(err, jc.ErrorIsNil)
fake := &fakeRunner{errors: []error{errors.New("unexpected message")}}
jujutxn.SetRunnerFunc(runner, fake.new)
tries := 0
Expand All @@ -581,13 +592,14 @@ func (s *txnSuite) TestRunFailureIntermittentUnexpectedMessage(c *gc.C) {
// return 1 op that happens to do nothing
return []txn.Op{{}}, nil
}
err := runner.Run(buildTxn)
err = runner.Run(buildTxn)
c.Check(err, gc.Equals, nil)
c.Check(tries, gc.Equals, 2)
}

func (s *txnSuite) TestRunFailureAlwaysUnexpectedMessage(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
runner, err := jujutxn.NewRunner(jujutxn.RunnerParams{})
c.Assert(err, jc.ErrorIsNil)
fake := &fakeRunner{errors: []error{
errors.New("unexpected message"),
errors.New("unexpected message"),
Expand All @@ -602,13 +614,14 @@ func (s *txnSuite) TestRunFailureAlwaysUnexpectedMessage(c *gc.C) {
// return 1 op that happens to do nothing
return []txn.Op{{}}, nil
}
err := runner.Run(buildTxn)
err = runner.Run(buildTxn)
c.Check(err, gc.ErrorMatches, "unexpected message")
c.Check(tries, gc.Equals, 3)
}

func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
runner, err := jujutxn.NewRunner(jujutxn.RunnerParams{})
c.Assert(err, jc.ErrorIsNil)
fake := &fakeRunner{errors: []error{errors.New("i/o timeout")}}
jujutxn.SetRunnerFunc(runner, fake.new)
tries := 0
Expand All @@ -618,13 +631,14 @@ func (s *txnSuite) TestRunFailureIOTimeout(c *gc.C) {
// return 1 op that happens to do nothing
return []txn.Op{{}}, nil
}
err := runner.Run(buildTxn)
err = runner.Run(buildTxn)
c.Check(err, gc.Equals, nil)
c.Check(tries, gc.Equals, 2)
}

func (s *txnSuite) TestRunFailureAlwaysIOTimeout(c *gc.C) {
runner := jujutxn.NewRunner(jujutxn.RunnerParams{})
runner, err := jujutxn.NewRunner(jujutxn.RunnerParams{})
c.Assert(err, jc.ErrorIsNil)
fake := &fakeRunner{errors: []error{
errors.New("i/o timeout"),
errors.New("i/o timeout"),
Expand All @@ -639,20 +653,21 @@ func (s *txnSuite) TestRunFailureAlwaysIOTimeout(c *gc.C) {
// return 1 op that happens to do nothing
return []txn.Op{{}}, nil
}
err := runner.Run(buildTxn)
err = runner.Run(buildTxn)
c.Check(err, gc.ErrorMatches, "i/o timeout")
c.Check(tries, gc.Equals, 3)
}

func (s *txnSuite) TestRunTransactionObserver(c *gc.C) {
var calls []jujutxn.Transaction
clock := testclock.NewClock(time.Now())
runner := jujutxn.NewRunner(jujutxn.RunnerParams{
runner, err := jujutxn.NewRunner(jujutxn.RunnerParams{
RunTransactionObserver: func(txn jujutxn.Transaction) {
calls = append(calls, txn)
},
Clock: clock,
})
c.Assert(err, jc.ErrorIsNil)
fake := &fakeRunner{
errors: []error{
txn.ErrAborted,
Expand All @@ -674,7 +689,7 @@ func (s *txnSuite) TestRunTransactionObserver(c *gc.C) {
buildTxn := func(attempt int) ([]txn.Op, error) {
return ops, nil
}
err := runner.Run(buildTxn)
err = runner.Run(buildTxn)
c.Check(err, gc.IsNil)
c.Check(calls, gc.HasLen, 2)
c.Check(calls[0].Ops, gc.DeepEquals, ops)
Expand Down