Skip to content

Commit

Permalink
GODRIVER-2821 Extend write structs with Acknowledged
Browse files Browse the repository at this point in the history
  • Loading branch information
prestonvasquez committed May 22, 2024
1 parent 5296166 commit f88f3d4
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 47 deletions.
11 changes: 4 additions & 7 deletions internal/integration/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package integration

import (
"context"
"errors"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1698,12 +1697,10 @@ func TestCollection(t *testing.T) {
models := []mongo.WriteModel{
mongo.NewInsertOneModel().SetDocument(bson.D{{"x", 1}}),
}
_, err := mt.Coll.BulkWrite(context.Background(), models)
if !errors.Is(err, mongo.ErrUnacknowledgedWrite) {
// Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings,
// so the assertion would succeed even if the error had not been wrapped.
mt.Fatalf("expected BulkWrite error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
}

res, err := mt.Coll.BulkWrite(context.Background(), models)
assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})
mt.RunOpts("insert and delete with batches", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
// grouped together because delete requires the documents to be inserted
Expand Down
14 changes: 0 additions & 14 deletions internal/integration/index_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package integration

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -263,19 +262,6 @@ func TestIndexView(t *testing.T) {
})
}
})
unackClientOpts := options.Client().
SetWriteConcern(writeconcern.Unacknowledged())
unackMtOpts := mtest.NewOptions().
ClientOptions(unackClientOpts).
MinServerVersion("3.6")
mt.RunOpts("unacknowledged write", unackMtOpts, func(mt *mtest.T) {
_, err := mt.Coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{Keys: bson.D{{"x", 1}}})
if !errors.Is(err, mongo.ErrUnacknowledgedWrite) {
// Use a direct comparison rather than assert.Equal because assert.Equal will compare the error strings,
// so the assertion would succeed even if the error had not been wrapped.
mt.Fatalf("expected CreateOne error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
}
})
// Needs to run on these versions for failpoints
mt.RunOpts("replace error", mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.0"), func(mt *mtest.T) {
mt.SetFailPoint(mtest.FailPoint{
Expand Down
9 changes: 6 additions & 3 deletions internal/integration/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,16 @@ func TestSessions(t *testing.T) {
assert.Nil(mt, err, "StartSession error: %v", err)
defer sess.EndSession(context.Background())

var res *mongo.InsertOneResult

err = mongo.WithSession(context.Background(), sess, func(sc context.Context) error {
_, err := mt.Coll.InsertOne(sc, bson.D{{"x", 1}})
res, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}})

return err
})

assert.Equal(mt, err, mongo.ErrUnacknowledgedWrite,
"expected ErrUnacknowledgedWrite on unacknowledged write in session, got %v", err)
assert.NoError(mt, err)
assert.False(mt, res.Acknowledged)
})

// Regression test for GODRIVER-2533. Note that this test assumes the race
Expand Down
10 changes: 7 additions & 3 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,14 @@ func (bw *bulkWrite) execute(ctx context.Context) error {
}

bw.result.MatchedCount -= bw.result.UpsertedCount
if lastErr != nil {
_, lastErr = processWriteError(lastErr)
return lastErr

rr, err := processWriteError(lastErr)
if err != nil {
return err
}

bw.result.Acknowledged = rr.isAcknowledged()

if len(bwErr.WriteErrors) > 0 || bwErr.WriteConcernError != nil {
return bwErr
}
Expand Down
30 changes: 21 additions & 9 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,14 @@ func (coll *Collection) InsertOne(ctx context.Context, document interface{},
res, err := coll.insert(ctx, []interface{}{document}, imOpts)

rr, err := processWriteError(err)
if rr&rrOne == 0 {
if rr&rrOne == 0 && rr.isAcknowledged() {
return nil, err
}
return &InsertOneResult{InsertedID: res[0]}, err

return &InsertOneResult{
InsertedID: res[0],
Acknowledged: rr.isAcknowledged(),
}, err
}

// InsertMany executes an insert command to insert multiple documents into the collection. If write errors occur
Expand Down Expand Up @@ -461,7 +465,10 @@ func (coll *Collection) InsertMany(ctx context.Context, documents interface{},
return nil, err
}

imResult := &InsertManyResult{InsertedIDs: result}
imResult := &InsertManyResult{
InsertedIDs: result,
Acknowledged: rr.isAcknowledged(),
}
var writeException WriteException
if !errors.As(err, &writeException) {
return imResult, err
Expand Down Expand Up @@ -593,7 +600,10 @@ func (coll *Collection) delete(ctx context.Context, filter interface{}, deleteOn
if rr&expectedRr == 0 {
return nil, err
}
return &DeleteResult{DeletedCount: op.Result().N}, err
return &DeleteResult{
DeletedCount: op.Result().N,
Acknowledged: rr.isAcknowledged(),
}, err
}

// DeleteOne executes a delete command to delete at most one document from the collection.
Expand Down Expand Up @@ -744,6 +754,7 @@ func (coll *Collection) updateOrReplace(ctx context.Context, filter bsoncore.Doc
MatchedCount: opRes.N,
ModifiedCount: opRes.NModified,
UpsertedCount: int64(len(opRes.Upserted)),
Acknowledged: rr.isAcknowledged(),
}
if len(opRes.Upserted) > 0 {
res.UpsertedID = opRes.Upserted[0].ID
Expand Down Expand Up @@ -1732,16 +1743,17 @@ func (coll *Collection) findAndModify(ctx context.Context, op *operation.FindAnd
Retry(retry).
Crypt(coll.client.cryptFLE)

_, err = processWriteError(op.Execute(ctx))
rr, err := processWriteError(op.Execute(ctx))
if err != nil {
return &SingleResult{err: err}
}

return &SingleResult{
ctx: ctx,
rdr: bson.Raw(op.Result().Value),
bsonOpts: coll.bsonOpts,
reg: coll.registry,
ctx: ctx,
rdr: bson.Raw(op.Result().Value),
bsonOpts: coll.bsonOpts,
reg: coll.registry,
Acknowledged: rr.isAcknowledged(),
}
}

Expand Down
13 changes: 7 additions & 6 deletions mongo/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,14 @@ func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts

err = op.Execute(ctx)
// RunCommand can be used to run a write, thus execute may return a write error
_, convErr := processWriteError(err)
rr, convErr := processWriteError(err)
return &SingleResult{
ctx: ctx,
err: convErr,
rdr: bson.Raw(op.Result()),
bsonOpts: db.bsonOpts,
reg: db.registry,
ctx: ctx,
err: convErr,
rdr: bson.Raw(op.Result()),
bsonOpts: db.bsonOpts,
reg: db.registry,
Acknowledged: rr.isAcknowledged(),
}
}

Expand Down
7 changes: 6 additions & 1 deletion mongo/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,10 +617,15 @@ const (
rrNone returnResult = 1 << iota // None means do not return the result ever.
rrOne // One means return the result if this was called by a *One method.
rrMany // Many means return the result is this was called by a *Many method.
rrUnacknowledgedWrite

rrAll returnResult = rrOne | rrMany // All means always return the result.
)

func (rr returnResult) isAcknowledged() bool {
return rr != rrUnacknowledgedWrite
}

// processWriteError handles processing the result of a write operation. If the retrunResult matches
// the calling method's type, it should return the result object in addition to the error.
// This function will wrap the errors from other packages and return them as errors from this package.
Expand All @@ -629,7 +634,7 @@ const (
func processWriteError(err error) (returnResult, error) {
switch {
case errors.Is(err, driver.ErrUnacknowledgedWrite):
return rrAll, ErrUnacknowledgedWrite
return rrUnacknowledgedWrite, nil
case err != nil:
switch tt := err.(type) {
case driver.WriteCommandError:
Expand Down
17 changes: 13 additions & 4 deletions mongo/index_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ func (iv IndexView) ListSpecifications(ctx context.Context, opts ...*options.Lis

// CreateOne executes a createIndexes command to create an index on the collection and returns the name of the new
// index. See the IndexView.CreateMany documentation for more information and an example.
func (iv IndexView) CreateOne(ctx context.Context, model IndexModel, opts ...*options.CreateIndexesOptions) (string, error) {
func (iv IndexView) CreateOne(
ctx context.Context,
model IndexModel,
opts ...*options.CreateIndexesOptions,
) (string, error) {
names, err := iv.CreateMany(ctx, []IndexModel{model}, opts...)
if err != nil {
return "", err
Expand All @@ -183,7 +187,11 @@ func (iv IndexView) CreateOne(ctx context.Context, model IndexModel, opts ...*op
// documentation).
//
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/createIndexes/.
func (iv IndexView) CreateMany(ctx context.Context, models []IndexModel, opts ...*options.CreateIndexesOptions) ([]string, error) {
func (iv IndexView) CreateMany(
ctx context.Context,
models []IndexModel,
opts ...*options.CreateIndexesOptions,
) ([]string, error) {
names := make([]string, 0, len(models))

var indexes bsoncore.Document
Expand Down Expand Up @@ -286,9 +294,10 @@ func (iv IndexView) CreateMany(ctx context.Context, models []IndexModel, opts ..
op.CommitQuorum(commitQuorum)
}

err = op.Execute(ctx)
// Checking for unacknowledged writes when creating indexes appear unnecessary
// for DDL operations.
_, err = processWriteError(op.Execute(ctx))
if err != nil {
_, err = processWriteError(err)
return nil, err
}

Expand Down
20 changes: 20 additions & 0 deletions mongo/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,41 @@ type BulkWriteResult struct {

// A map of operation index to the _id of each upserted document.
UpsertedIDs map[int64]interface{}

// Operation performed with an acknowledged write. Values for other fields may
// not be deterministic if the write operation was unacknowledged.
Acknowledged bool
}

// InsertOneResult is the result type returned by an InsertOne operation.
type InsertOneResult struct {
// The _id of the inserted document. A value generated by the driver will be of type bson.ObjectID.
InsertedID interface{}

// Operation performed with an acknowledged write. Values for other fields may
// not be deterministic if the write operation was unacknowledged.
Acknowledged bool
}

// InsertManyResult is a result type returned by an InsertMany operation.
type InsertManyResult struct {
// The _id values of the inserted documents. Values generated by the driver will be of type bson.ObjectID.
InsertedIDs []interface{}

// Operation performed with an acknowledged write. Values for other fields may
// not be deterministic if the write operation was unacknowledged.
Acknowledged bool
}

// TODO(GODRIVER-2367): Remove the BSON struct tags on DeleteResult.

// DeleteResult is the result type returned by DeleteOne and DeleteMany operations.
type DeleteResult struct {
DeletedCount int64 // The number of documents deleted.

// Operation performed with an acknowledged write. Values for other fields may
// not be deterministic if the write operation was unacknowledged.
Acknowledged bool
}

// RewrapManyDataKeyResult is the result of the bulk write operation used to update the key vault collection with
Expand Down Expand Up @@ -93,6 +109,10 @@ type UpdateResult struct {
ModifiedCount int64 // The number of documents modified by the operation.
UpsertedCount int64 // The number of documents upserted by the operation.
UpsertedID interface{} // The _id field of the upserted document, or nil if no upsert was done.

// Operation performed with an acknowledged write. Values for other fields may
// not be deterministic if the write operation was unacknowledged.
Acknowledged bool
}

// IndexSpecification represents an index in a database. This type is returned by the IndexView.ListSpecifications
Expand Down
5 changes: 5 additions & 0 deletions mongo/single_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type SingleResult struct {
rdr bson.Raw
bsonOpts *options.BSONOptions
reg *bson.Registry

// Operation performed with an acknowledged write. Values returned by
// SingleResult methods may not be deterministic if the write operation was
// unacknowledged and so should not be relied upon.
Acknowledged bool
}

// NewSingleResultFromDocument creates a SingleResult with the provided error, registry, and an underlying Cursor pre-loaded with
Expand Down

0 comments on commit f88f3d4

Please sign in to comment.