Skip to content

Commit

Permalink
⚡ mongo: disables using the mongo transactional api.
Browse files Browse the repository at this point in the history
If attempting to use the transactions api mongo <4.0 mongo returns a BSON serialization error.
For now, since sessions now work properly, default to creating and using a unique session. Sessions in the mongo driver default to casual consistency, which should provide the required atomicity.

Refer: https://jira.mongodb.org/projects/GODRIVER/issues/GODRIVER-1732
  • Loading branch information
matthewhartstonge committed Aug 29, 2020
1 parent e415e71 commit b03b61d
Showing 1 changed file with 32 additions and 26 deletions.
58 changes: 32 additions & 26 deletions mongo/transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import (
"github.com/ory/fosite"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
)

func (s *Store) BeginTX(ctx context.Context) (context context.Context, err error) {
Expand All @@ -25,24 +21,32 @@ func (s *Store) BeginTX(ctx context.Context) (context context.Context, err error
return ctx, err
}

// Define mongo transaction options...
opts := options.Transaction().
SetReadPreference(readpref.Primary()).
SetReadConcern(readconcern.Majority()).
SetWriteConcern(writeconcern.New(writeconcern.WMajority())).
SetMaxCommitTime(&s.timeout)
// TODO: reimplement when we can detect running mongodb version.
// If attempting to use the transactions api mongo <4.0 mongo returns a
// BSON serialization error.
// For now, default to creating and using a unique session - sessions in
// the mongo driver default to casual consistency, which should provide the
// required atomicity.
// Refer: https://jira.mongodb.org/projects/GODRIVER/issues/GODRIVER-1732

err = session.StartTransaction(opts)
if err != nil {
_ = session.AbortTransaction(ctx)
return ctx, err
}
// // Define mongo transaction options...
// opts := options.Transaction().
// SetReadPreference(readpref.Primary()).
// SetReadConcern(readconcern.Majority()).
// SetWriteConcern(writeconcern.New(writeconcern.WMajority())).
// SetMaxCommitTime(&s.timeout)
//
// err = session.StartTransaction(opts)
// if err != nil {
// _ = session.AbortTransaction(ctx)
// return ctx, err
// }

return TransactionToContext(ctx, session), nil
return SessionToContext(ctx, session), nil
}

func (s *Store) Commit(ctx context.Context) error {
txn, ok := ContextToTransaction(ctx)
txn, ok := ContextToSession(ctx)
if !ok {
return errors.Wrap(
fosite.ErrServerError,
Expand All @@ -51,16 +55,17 @@ func (s *Store) Commit(ctx context.Context) error {
}
defer txn.EndSession(ctx)

err := txn.CommitTransaction(ctx)
if err != nil {
return errors.Wrap(fosite.ErrSerializationFailure, err.Error())
}
// TODO: reimplement when we can detect running mongodb version.
// err := txn.CommitTransaction(ctx)
// if err != nil {
// return errors.Wrap(fosite.ErrSerializationFailure, err.Error())
// }

return nil
}

func (s *Store) Rollback(ctx context.Context) error {
txn, ok := ContextToTransaction(ctx)
txn, ok := ContextToSession(ctx)
if !ok {
return errors.Wrap(
fosite.ErrServerError,
Expand All @@ -69,10 +74,11 @@ func (s *Store) Rollback(ctx context.Context) error {
}
defer txn.EndSession(ctx)

err := txn.AbortTransaction(ctx)
if err != nil {
return errors.Wrap(fosite.ErrSerializationFailure, err.Error())
}
// TODO: reimplement when we can detect running mongodb version.
// err := txn.AbortTransaction(ctx)
// if err != nil {
// return errors.Wrap(fosite.ErrSerializationFailure, err.Error())
// }

return nil
}

0 comments on commit b03b61d

Please sign in to comment.