diff --git a/etc/compile_check.sh b/etc/compile_check.sh index 1f17248695..f63fd62d85 100755 --- a/etc/compile_check.sh +++ b/etc/compile_check.sh @@ -12,7 +12,7 @@ function version { echo "$@" | awk -F. '{ printf("%d%03d%03d%03d\n", $1,$2,$3,$4); }'; } -# compile_check will attempt to build the the internal/test/compilecheck project +# compile_check will attempt to build the internal/test/compilecheck project # using the provided Go version. This is to simulate an end-to-end use case. # This check will only run on environments where the Go version is greater than # or equal to the given version. diff --git a/internal/docexamples/examples.go b/internal/docexamples/examples.go index df1ccae692..2f95bce65f 100644 --- a/internal/docexamples/examples.go +++ b/internal/docexamples/examples.go @@ -1758,8 +1758,10 @@ func UpdateEmployeeInfo(ctx context.Context, client *mongo.Client) error { employees := client.Database("hr").Collection("employees") events := client.Database("reporting").Collection("events") - return client.UseSession(ctx, func(sctx mongo.SessionContext) error { - err := sctx.StartTransaction(options.Transaction(). + return client.UseSession(ctx, func(ctx context.Context) error { + sess := mongo.SessionFromContext(ctx) + + err := sess.StartTransaction(options.Transaction(). SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.Majority()), ) @@ -1767,21 +1769,21 @@ func UpdateEmployeeInfo(ctx context.Context, client *mongo.Client) error { return err } - _, err = employees.UpdateOne(sctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}}) + _, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}}) if err != nil { - sctx.AbortTransaction(sctx) + sess.AbortTransaction(ctx) log.Println("caught exception during transaction, aborting.") return err } - _, err = events.InsertOne(sctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}}) + _, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}}) if err != nil { - sctx.AbortTransaction(sctx) + sess.AbortTransaction(ctx) log.Println("caught exception during transaction, aborting.") return err } for { - err = sctx.CommitTransaction(sctx) + err = sess.CommitTransaction(ctx) switch e := err.(type) { case nil: return nil @@ -1805,9 +1807,9 @@ func UpdateEmployeeInfo(ctx context.Context, client *mongo.Client) error { // Start Transactions Retry Example 1 // RunTransactionWithRetry is an example function demonstrating transaction retry logic. -func RunTransactionWithRetry(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error { +func RunTransactionWithRetry(ctx context.Context, txnFn func(context.Context) error) error { for { - err := txnFn(sctx) // Performs transaction. + err := txnFn(ctx) // Performs transaction. if err == nil { return nil } @@ -1828,9 +1830,11 @@ func RunTransactionWithRetry(sctx mongo.SessionContext, txnFn func(mongo.Session // Start Transactions Retry Example 2 // CommitWithRetry is an example function demonstrating transaction commit with retry logic. -func CommitWithRetry(sctx mongo.SessionContext) error { +func CommitWithRetry(ctx context.Context) error { + sess := mongo.SessionFromContext(ctx) + for { - err := sctx.CommitTransaction(sctx) + err := sess.CommitTransaction(ctx) switch e := err.(type) { case nil: log.Println("Transaction committed.") @@ -1872,9 +1876,9 @@ func TransactionsExamples(ctx context.Context, client *mongo.Client) error { } // Start Transactions Retry Example 3 - runTransactionWithRetry := func(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error { + runTransactionWithRetry := func(ctx context.Context, txnFn func(context.Context) error) error { for { - err := txnFn(sctx) // Performs transaction. + err := txnFn(ctx) // Performs transaction. if err == nil { return nil } @@ -1890,9 +1894,11 @@ func TransactionsExamples(ctx context.Context, client *mongo.Client) error { } } - commitWithRetry := func(sctx mongo.SessionContext) error { + commitWithRetry := func(ctx context.Context) error { + sess := mongo.SessionFromContext(ctx) + for { - err := sctx.CommitTransaction(sctx) + err := sess.CommitTransaction(ctx) switch e := err.(type) { case nil: log.Println("Transaction committed.") @@ -1913,11 +1919,13 @@ func TransactionsExamples(ctx context.Context, client *mongo.Client) error { } // Updates two collections in a transaction. - updateEmployeeInfo := func(sctx mongo.SessionContext) error { + updateEmployeeInfo := func(ctx context.Context) error { employees := client.Database("hr").Collection("employees") events := client.Database("reporting").Collection("events") - err := sctx.StartTransaction(options.Transaction(). + sess := mongo.SessionFromContext(ctx) + + err := sess.StartTransaction(options.Transaction(). SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.Majority()), ) @@ -1925,26 +1933,26 @@ func TransactionsExamples(ctx context.Context, client *mongo.Client) error { return err } - _, err = employees.UpdateOne(sctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}}) + _, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}}) if err != nil { - sctx.AbortTransaction(sctx) + sess.AbortTransaction(ctx) log.Println("caught exception during transaction, aborting.") return err } - _, err = events.InsertOne(sctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}}) + _, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}}) if err != nil { - sctx.AbortTransaction(sctx) + sess.AbortTransaction(ctx) log.Println("caught exception during transaction, aborting.") return err } - return commitWithRetry(sctx) + return commitWithRetry(ctx) } return client.UseSessionWithOptions( ctx, options.Session().SetDefaultReadPreference(readpref.Primary()), - func(sctx mongo.SessionContext) error { - return runTransactionWithRetry(sctx, updateEmployeeInfo) + func(ctx context.Context) error { + return runTransactionWithRetry(ctx, updateEmployeeInfo) }, ) } @@ -1976,13 +1984,13 @@ func WithTransactionExample(ctx context.Context) error { barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts) // Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction. - callback := func(sessCtx mongo.SessionContext) (interface{}, error) { - // Important: You must pass sessCtx as the Context parameter to the operations for them to be executed in the + callback := func(sesctx context.Context) (interface{}, error) { + // Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the // transaction. - if _, err := fooColl.InsertOne(sessCtx, bson.D{{"abc", 1}}); err != nil { + if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil { return nil, err } - if _, err := barColl.InsertOne(sessCtx, bson.D{{"xyz", 999}}); err != nil { + if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil { return nil, err } @@ -2560,15 +2568,15 @@ func CausalConsistencyExamples(client *mongo.Client) error { } defer session1.EndSession(context.TODO()) - err = client.UseSessionWithOptions(context.TODO(), opts, func(sctx mongo.SessionContext) error { + err = client.UseSessionWithOptions(context.TODO(), opts, func(ctx context.Context) error { // Run an update with our causally-consistent session - _, err = coll.UpdateOne(sctx, bson.D{{"sku", 111}}, bson.D{{"$set", bson.D{{"end", currentDate}}}}) + _, err = coll.UpdateOne(ctx, bson.D{{"sku", 111}}, bson.D{{"$set", bson.D{{"end", currentDate}}}}) if err != nil { return err } // Run an insert with our causally-consistent session - _, err = coll.InsertOne(sctx, bson.D{{"sku", "nuts-111"}, {"name", "Pecans"}, {"start", currentDate}}) + _, err = coll.InsertOne(ctx, bson.D{{"sku", "nuts-111"}, {"name", "Pecans"}, {"start", currentDate}}) if err != nil { return err } @@ -2593,7 +2601,7 @@ func CausalConsistencyExamples(client *mongo.Client) error { } defer session2.EndSession(context.TODO()) - err = client.UseSessionWithOptions(context.TODO(), opts, func(sctx mongo.SessionContext) error { + err = client.UseSessionWithOptions(context.TODO(), opts, func(ctx context.Context) error { // Set cluster time of session2 to session1's cluster time clusterTime := session1.ClusterTime() session2.AdvanceClusterTime(clusterTime) @@ -2602,13 +2610,13 @@ func CausalConsistencyExamples(client *mongo.Client) error { operationTime := session1.OperationTime() session2.AdvanceOperationTime(operationTime) // Run a find on session2, which should find all the writes from session1 - cursor, err := coll.Find(sctx, bson.D{{"end", nil}}) + cursor, err := coll.Find(ctx, bson.D{{"end", nil}}) if err != nil { return err } - for cursor.Next(sctx) { + for cursor.Next(ctx) { doc := cursor.Current fmt.Printf("Document: %v\n", doc.String()) } @@ -2984,7 +2992,7 @@ func snapshotQueryPetExample(mt *mtest.T) error { defer sess.EndSession(ctx) var adoptablePetsCount int32 - err = mongo.WithSession(ctx, sess, func(ctx mongo.SessionContext) error { + err = mongo.WithSession(ctx, sess, func(ctx context.Context) error { // Count the adoptable cats const adoptableCatsOutput = "adoptableCatsCount" cursor, err := db.Collection("cats").Aggregate(ctx, mongo.Pipeline{ @@ -3048,7 +3056,7 @@ func snapshotQueryRetailExample(mt *mtest.T) error { defer sess.EndSession(ctx) var totalDailySales int32 - err = mongo.WithSession(ctx, sess, func(ctx mongo.SessionContext) error { + err = mongo.WithSession(ctx, sess, func(ctx context.Context) error { // Count the total daily sales const totalDailySalesOutput = "totalDailySales" cursor, err := db.Collection("sales").Aggregate(ctx, mongo.Pipeline{ diff --git a/internal/integration/causal_consistency_test.go b/internal/integration/causal_consistency_test.go index f48c5b75b7..25a33466f2 100644 --- a/internal/integration/causal_consistency_test.go +++ b/internal/integration/causal_consistency_test.go @@ -41,8 +41,8 @@ func TestCausalConsistency_Supported(t *testing.T) { // first read in a causally consistent session must not send afterClusterTime to the server ccOpts := options.Session().SetCausalConsistency(true) - _ = mt.Client.UseSessionWithOptions(context.Background(), ccOpts, func(sc mongo.SessionContext) error { - _, _ = mt.Coll.Find(sc, bson.D{}) + _ = mt.Client.UseSessionWithOptions(context.Background(), ccOpts, func(ctx context.Context) error { + _, _ = mt.Coll.Find(ctx, bson.D{}) return nil }) @@ -57,8 +57,8 @@ func TestCausalConsistency_Supported(t *testing.T) { assert.Nil(mt, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { - _, _ = mt.Coll.Find(sc, bson.D{}) + _ = mongo.WithSession(context.Background(), sess, func(ctx context.Context) error { + _, _ = mt.Coll.Find(ctx, bson.D{}) return nil }) @@ -85,8 +85,8 @@ func TestCausalConsistency_Supported(t *testing.T) { assert.Nil(mt, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { - _ = mt.Coll.FindOne(sc, bson.D{}) + _ = mongo.WithSession(context.Background(), sess, func(ctx context.Context) error { + _ = mt.Coll.FindOne(ctx, bson.D{}) return nil }) currOptime := sess.OperationTime() @@ -120,8 +120,8 @@ func TestCausalConsistency_Supported(t *testing.T) { assert.NotNil(mt, currOptime, "expected session operation time, got nil") mt.ClearEvents() - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { - _ = mt.Coll.FindOne(sc, bson.D{}) + _ = mongo.WithSession(context.Background(), sess, func(ctx context.Context) error { + _ = mt.Coll.FindOne(ctx, bson.D{}) return nil }) _, sentOptime := getReadConcernFields(mt, mt.GetStartedEvent().Command) @@ -134,10 +134,10 @@ func TestCausalConsistency_Supported(t *testing.T) { // a read operation in a non causally-consistent session should not include afterClusterTime sessOpts := options.Session().SetCausalConsistency(false) - _ = mt.Client.UseSessionWithOptions(context.Background(), sessOpts, func(sc mongo.SessionContext) error { - _, _ = mt.Coll.Find(sc, bson.D{}) + _ = mt.Client.UseSessionWithOptions(context.Background(), sessOpts, func(ctx context.Context) error { + _, _ = mt.Coll.Find(ctx, bson.D{}) mt.ClearEvents() - _, _ = mt.Coll.Find(sc, bson.D{}) + _, _ = mt.Coll.Find(ctx, bson.D{}) return nil }) evt := mt.GetStartedEvent() @@ -152,14 +152,14 @@ func TestCausalConsistency_Supported(t *testing.T) { assert.Nil(mt, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { - _ = mt.Coll.FindOne(sc, bson.D{}) + _ = mongo.WithSession(context.Background(), sess, func(ctx context.Context) error { + _ = mt.Coll.FindOne(ctx, bson.D{}) return nil }) currOptime := sess.OperationTime() mt.ClearEvents() - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { - _ = mt.Coll.FindOne(sc, bson.D{}) + _ = mongo.WithSession(context.Background(), sess, func(ctx context.Context) error { + _ = mt.Coll.FindOne(ctx, bson.D{}) return nil }) @@ -174,14 +174,14 @@ func TestCausalConsistency_Supported(t *testing.T) { assert.Nil(mt, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { - _ = mt.Coll.FindOne(sc, bson.D{}) + _ = mongo.WithSession(context.Background(), sess, func(ctx context.Context) error { + _ = mt.Coll.FindOne(ctx, bson.D{}) return nil }) currOptime := sess.OperationTime() mt.ClearEvents() - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { - _ = mt.Coll.FindOne(sc, bson.D{}) + _ = mongo.WithSession(context.Background(), sess, func(ctx context.Context) error { + _ = mt.Coll.FindOne(ctx, bson.D{}) return nil }) @@ -215,8 +215,8 @@ func TestCausalConsistency_NotSupported(t *testing.T) { // support cluster times sessOpts := options.Session().SetCausalConsistency(true) - _ = mt.Client.UseSessionWithOptions(context.Background(), sessOpts, func(sc mongo.SessionContext) error { - _, _ = mt.Coll.Find(sc, bson.D{}) + _ = mt.Client.UseSessionWithOptions(context.Background(), sessOpts, func(ctx context.Context) error { + _, _ = mt.Coll.Find(ctx, bson.D{}) return nil }) diff --git a/internal/integration/client_test.go b/internal/integration/client_test.go index 677bb44868..ceae58ac81 100644 --- a/internal/integration/client_test.go +++ b/internal/integration/client_test.go @@ -371,8 +371,7 @@ func TestClient(t *testing.T) { sess, err := mt.Client.StartSession(tc.opts) assert.Nil(mt, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - xs := sess.(mongo.XSession) - consistent := xs.ClientSession().Consistent + consistent := sess.ClientSession().Consistent assert.Equal(mt, tc.consistent, consistent, "expected consistent to be %v, got %v", tc.consistent, consistent) }) } diff --git a/internal/integration/crud_helpers_test.go b/internal/integration/crud_helpers_test.go index 973370c1a1..355f934add 100644 --- a/internal/integration/crud_helpers_test.go +++ b/internal/integration/crud_helpers_test.go @@ -156,7 +156,7 @@ type watcher interface { Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error) } -func executeAggregate(mt *mtest.T, agg aggregator, sess mongo.Session, args bson.Raw) (*mongo.Cursor, error) { +func executeAggregate(mt *mtest.T, agg aggregator, sess *mongo.Session, args bson.Raw) (*mongo.Cursor, error) { mt.Helper() var pipeline []interface{} @@ -186,7 +186,7 @@ func executeAggregate(mt *mtest.T, agg aggregator, sess mongo.Session, args bson if sess != nil { var cur *mongo.Cursor - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var aerr error cur, aerr = agg.Aggregate(sc, pipeline, opts) return aerr @@ -196,7 +196,7 @@ func executeAggregate(mt *mtest.T, agg aggregator, sess mongo.Session, args bson return agg.Aggregate(context.Background(), pipeline, opts) } -func executeWatch(mt *mtest.T, w watcher, sess mongo.Session, args bson.Raw) (*mongo.ChangeStream, error) { +func executeWatch(mt *mtest.T, w watcher, sess *mongo.Session, args bson.Raw) (*mongo.ChangeStream, error) { mt.Helper() pipeline := []interface{}{} @@ -215,7 +215,7 @@ func executeWatch(mt *mtest.T, w watcher, sess mongo.Session, args bson.Raw) (*m if sess != nil { var stream *mongo.ChangeStream - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var csErr error stream, csErr = w.Watch(sc, pipeline) return csErr @@ -225,7 +225,7 @@ func executeWatch(mt *mtest.T, w watcher, sess mongo.Session, args bson.Raw) (*m return w.Watch(context.Background(), pipeline) } -func executeCountDocuments(mt *mtest.T, sess mongo.Session, args bson.Raw) (int64, error) { +func executeCountDocuments(mt *mtest.T, sess *mongo.Session, args bson.Raw) (int64, error) { mt.Helper() filter := emptyDoc @@ -253,7 +253,7 @@ func executeCountDocuments(mt *mtest.T, sess mongo.Session, args bson.Raw) (int6 if sess != nil { var count int64 - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var countErr error count, countErr = mt.Coll.CountDocuments(sc, filter, opts) return countErr @@ -263,7 +263,7 @@ func executeCountDocuments(mt *mtest.T, sess mongo.Session, args bson.Raw) (int6 return mt.Coll.CountDocuments(context.Background(), filter, opts) } -func executeInsertOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.InsertOneResult, error) { +func executeInsertOne(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.InsertOneResult, error) { mt.Helper() doc := emptyDoc @@ -287,7 +287,7 @@ func executeInsertOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.In if sess != nil { var res *mongo.InsertOneResult - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var insertErr error res, insertErr = mt.Coll.InsertOne(sc, doc, opts) return insertErr @@ -297,7 +297,7 @@ func executeInsertOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.In return mt.Coll.InsertOne(context.Background(), doc, opts) } -func executeInsertMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.InsertManyResult, error) { +func executeInsertMany(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.InsertManyResult, error) { mt.Helper() var docs []interface{} @@ -325,7 +325,7 @@ func executeInsertMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.I if sess != nil { var res *mongo.InsertManyResult - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var insertErr error res, insertErr = mt.Coll.InsertMany(sc, docs, opts) return insertErr @@ -360,7 +360,7 @@ func setFindModifiers(modifiersDoc bson.Raw, opts *options.FindOptions) { } } -func executeFind(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Cursor, error) { +func executeFind(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.Cursor, error) { mt.Helper() filter := emptyDoc @@ -398,7 +398,7 @@ func executeFind(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Cursor, if sess != nil { var c *mongo.Cursor - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var findErr error c, findErr = mt.Coll.Find(sc, filter, opts) return findErr @@ -408,7 +408,7 @@ func executeFind(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Cursor, return mt.Coll.Find(context.Background(), filter, opts) } -func executeRunCommand(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult { +func executeRunCommand(mt *mtest.T, sess *mongo.Session, args bson.Raw) *mongo.SingleResult { mt.Helper() cmd := emptyDoc @@ -432,7 +432,7 @@ func executeRunCommand(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.Si if sess != nil { var sr *mongo.SingleResult - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + _ = mongo.WithSession(context.Background(), sess, func(sc context.Context) error { sr = mt.DB.RunCommand(sc, cmd, opts) return nil }) @@ -441,7 +441,7 @@ func executeRunCommand(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.Si return mt.DB.RunCommand(context.Background(), cmd, opts) } -func executeListCollections(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Cursor, error) { +func executeListCollections(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.Cursor, error) { mt.Helper() filter := emptyDoc @@ -460,7 +460,7 @@ func executeListCollections(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mo if sess != nil { var c *mongo.Cursor - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var lcErr error c, lcErr = mt.DB.ListCollections(sc, filter) return lcErr @@ -470,7 +470,7 @@ func executeListCollections(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mo return mt.DB.ListCollections(context.Background(), filter) } -func executeListCollectionNames(mt *mtest.T, sess mongo.Session, args bson.Raw) ([]string, error) { +func executeListCollectionNames(mt *mtest.T, sess *mongo.Session, args bson.Raw) ([]string, error) { mt.Helper() filter := emptyDoc @@ -489,7 +489,7 @@ func executeListCollectionNames(mt *mtest.T, sess mongo.Session, args bson.Raw) if sess != nil { var res []string - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var lcErr error res, lcErr = mt.DB.ListCollectionNames(sc, filter) return lcErr @@ -499,7 +499,7 @@ func executeListCollectionNames(mt *mtest.T, sess mongo.Session, args bson.Raw) return mt.DB.ListCollectionNames(context.Background(), filter) } -func executeListDatabaseNames(mt *mtest.T, sess mongo.Session, args bson.Raw) ([]string, error) { +func executeListDatabaseNames(mt *mtest.T, sess *mongo.Session, args bson.Raw) ([]string, error) { mt.Helper() filter := emptyDoc @@ -518,7 +518,7 @@ func executeListDatabaseNames(mt *mtest.T, sess mongo.Session, args bson.Raw) ([ if sess != nil { var res []string - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var ldErr error res, ldErr = mt.Client.ListDatabaseNames(sc, filter) return ldErr @@ -528,7 +528,7 @@ func executeListDatabaseNames(mt *mtest.T, sess mongo.Session, args bson.Raw) ([ return mt.Client.ListDatabaseNames(context.Background(), filter) } -func executeListDatabases(mt *mtest.T, sess mongo.Session, args bson.Raw) (mongo.ListDatabasesResult, error) { +func executeListDatabases(mt *mtest.T, sess *mongo.Session, args bson.Raw) (mongo.ListDatabasesResult, error) { mt.Helper() filter := emptyDoc @@ -547,7 +547,7 @@ func executeListDatabases(mt *mtest.T, sess mongo.Session, args bson.Raw) (mongo if sess != nil { var res mongo.ListDatabasesResult - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var ldErr error res, ldErr = mt.Client.ListDatabases(sc, filter) return ldErr @@ -557,7 +557,7 @@ func executeListDatabases(mt *mtest.T, sess mongo.Session, args bson.Raw) (mongo return mt.Client.ListDatabases(context.Background(), filter) } -func executeFindOne(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult { +func executeFindOne(mt *mtest.T, sess *mongo.Session, args bson.Raw) *mongo.SingleResult { mt.Helper() filter := emptyDoc @@ -576,7 +576,7 @@ func executeFindOne(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.Singl if sess != nil { var res *mongo.SingleResult - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + _ = mongo.WithSession(context.Background(), sess, func(sc context.Context) error { res = mt.Coll.FindOne(sc, filter) return nil }) @@ -585,14 +585,14 @@ func executeFindOne(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.Singl return mt.Coll.FindOne(context.Background(), filter) } -func executeListIndexes(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Cursor, error) { +func executeListIndexes(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.Cursor, error) { mt.Helper() // no arguments expected. add a Fatal in case arguments are added in the future assert.Equal(mt, 0, len(args), "unexpected listIndexes arguments: %v", args) if sess != nil { var cursor *mongo.Cursor - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var listErr error cursor, listErr = mt.Coll.Indexes().List(sc) return listErr @@ -602,7 +602,7 @@ func executeListIndexes(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo. return mt.Coll.Indexes().List(context.Background()) } -func executeDistinct(mt *mtest.T, sess mongo.Session, args bson.Raw) (bson.RawArray, error) { +func executeDistinct(mt *mtest.T, sess *mongo.Session, args bson.Raw) (bson.RawArray, error) { mt.Helper() var fieldName string @@ -629,8 +629,8 @@ func executeDistinct(mt *mtest.T, sess mongo.Session, args bson.Raw) (bson.RawAr var res *mongo.DistinctResult if sess != nil { - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { - res = mt.Coll.Distinct(sc, fieldName, filter, opts) + err := mongo.WithSession(context.Background(), sess, func(ctx context.Context) error { + res = mt.Coll.Distinct(ctx, fieldName, filter, opts) return res.Err() }) @@ -645,7 +645,7 @@ func executeDistinct(mt *mtest.T, sess mongo.Session, args bson.Raw) (bson.RawAr return res.Raw() } -func executeFindOneAndDelete(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult { +func executeFindOneAndDelete(mt *mtest.T, sess *mongo.Session, args bson.Raw) *mongo.SingleResult { mt.Helper() filter := emptyDoc @@ -675,7 +675,7 @@ func executeFindOneAndDelete(mt *mtest.T, sess mongo.Session, args bson.Raw) *mo if sess != nil { var res *mongo.SingleResult - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + _ = mongo.WithSession(context.Background(), sess, func(sc context.Context) error { res = mt.Coll.FindOneAndDelete(sc, filter, opts) return nil }) @@ -684,7 +684,7 @@ func executeFindOneAndDelete(mt *mtest.T, sess mongo.Session, args bson.Raw) *mo return mt.Coll.FindOneAndDelete(context.Background(), filter, opts) } -func executeFindOneAndUpdate(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult { +func executeFindOneAndUpdate(mt *mtest.T, sess *mongo.Session, args bson.Raw) *mongo.SingleResult { mt.Helper() filter := emptyDoc @@ -732,7 +732,7 @@ func executeFindOneAndUpdate(mt *mtest.T, sess mongo.Session, args bson.Raw) *mo if sess != nil { var res *mongo.SingleResult - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + _ = mongo.WithSession(context.Background(), sess, func(sc context.Context) error { res = mt.Coll.FindOneAndUpdate(sc, filter, update, opts) return nil }) @@ -741,7 +741,7 @@ func executeFindOneAndUpdate(mt *mtest.T, sess mongo.Session, args bson.Raw) *mo return mt.Coll.FindOneAndUpdate(context.Background(), filter, update, opts) } -func executeFindOneAndReplace(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult { +func executeFindOneAndReplace(mt *mtest.T, sess *mongo.Session, args bson.Raw) *mongo.SingleResult { mt.Helper() filter := emptyDoc @@ -785,7 +785,7 @@ func executeFindOneAndReplace(mt *mtest.T, sess mongo.Session, args bson.Raw) *m if sess != nil { var res *mongo.SingleResult - _ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + _ = mongo.WithSession(context.Background(), sess, func(sc context.Context) error { res = mt.Coll.FindOneAndReplace(sc, filter, replacement, opts) return nil }) @@ -794,7 +794,7 @@ func executeFindOneAndReplace(mt *mtest.T, sess mongo.Session, args bson.Raw) *m return mt.Coll.FindOneAndReplace(context.Background(), filter, replacement, opts) } -func executeDeleteOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.DeleteResult, error) { +func executeDeleteOne(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.DeleteResult, error) { mt.Helper() filter := emptyDoc @@ -820,7 +820,7 @@ func executeDeleteOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.De if sess != nil { var res *mongo.DeleteResult - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var derr error res, derr = mt.Coll.DeleteOne(sc, filter, opts) return derr @@ -830,7 +830,7 @@ func executeDeleteOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.De return mt.Coll.DeleteOne(context.Background(), filter, opts) } -func executeDeleteMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.DeleteResult, error) { +func executeDeleteMany(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.DeleteResult, error) { mt.Helper() filter := emptyDoc @@ -856,7 +856,7 @@ func executeDeleteMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.D if sess != nil { var res *mongo.DeleteResult - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var derr error res, derr = mt.Coll.DeleteMany(sc, filter, opts) return derr @@ -866,7 +866,7 @@ func executeDeleteMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.D return mt.Coll.DeleteMany(context.Background(), filter, opts) } -func executeUpdateOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.UpdateResult, error) { +func executeUpdateOne(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.UpdateResult, error) { mt.Helper() filter := emptyDoc @@ -904,7 +904,7 @@ func executeUpdateOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Up if sess != nil { var res *mongo.UpdateResult - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var uerr error res, uerr = mt.Coll.UpdateOne(sc, filter, update, opts) return uerr @@ -914,7 +914,7 @@ func executeUpdateOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Up return mt.Coll.UpdateOne(context.Background(), filter, update, opts) } -func executeUpdateMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.UpdateResult, error) { +func executeUpdateMany(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.UpdateResult, error) { mt.Helper() filter := emptyDoc @@ -952,7 +952,7 @@ func executeUpdateMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.U if sess != nil { var res *mongo.UpdateResult - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var uerr error res, uerr = mt.Coll.UpdateMany(sc, filter, update, opts) return uerr @@ -962,7 +962,7 @@ func executeUpdateMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.U return mt.Coll.UpdateMany(context.Background(), filter, update, opts) } -func executeReplaceOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.UpdateResult, error) { +func executeReplaceOne(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.UpdateResult, error) { mt.Helper() filter := emptyDoc @@ -996,7 +996,7 @@ func executeReplaceOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.U if sess != nil { var res *mongo.UpdateResult - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var uerr error res, uerr = mt.Coll.ReplaceOne(sc, filter, replacement, opts) return uerr @@ -1013,7 +1013,7 @@ type withTransactionArgs struct { Options bson.Raw `bson:"options"` } -func runWithTransactionOperations(mt *mtest.T, operations []*operation, sess mongo.Session) error { +func runWithTransactionOperations(mt *mtest.T, operations []*operation, sess *mongo.Session) error { mt.Helper() for _, op := range operations { @@ -1041,7 +1041,7 @@ func runWithTransactionOperations(mt *mtest.T, operations []*operation, sess mon return nil } -func executeWithTransaction(mt *mtest.T, sess mongo.Session, args bson.Raw) error { +func executeWithTransaction(mt *mtest.T, sess *mongo.Session, args bson.Raw) error { mt.Helper() var testArgs withTransactionArgs @@ -1049,14 +1049,14 @@ func executeWithTransaction(mt *mtest.T, sess mongo.Session, args bson.Raw) erro assert.Nil(mt, err, "error creating withTransactionArgs: %v", err) opts := createTransactionOptions(mt, testArgs.Options) - _, err = sess.WithTransaction(context.Background(), func(sc mongo.SessionContext) (interface{}, error) { + _, err = sess.WithTransaction(context.Background(), func(sc context.Context) (interface{}, error) { err := runWithTransactionOperations(mt, testArgs.Callback.Operations, sess) return nil, err }, opts) return err } -func executeBulkWrite(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.BulkWriteResult, error) { +func executeBulkWrite(mt *mtest.T, sess *mongo.Session, args bson.Raw) (*mongo.BulkWriteResult, error) { mt.Helper() models := createBulkWriteModels(mt, bson.Raw(args.Lookup("requests").Array())) @@ -1080,7 +1080,7 @@ func executeBulkWrite(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Bu if sess != nil { var res *mongo.BulkWriteResult - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var bwerr error res, bwerr = mt.Coll.BulkWrite(sc, models, opts) return bwerr @@ -1200,7 +1200,7 @@ func createBulkWriteModel(mt *mtest.T, rawModel bson.Raw) mongo.WriteModel { return nil } -func executeEstimatedDocumentCount(mt *mtest.T, sess mongo.Session, args bson.Raw) (int64, error) { +func executeEstimatedDocumentCount(mt *mtest.T, sess *mongo.Session, args bson.Raw) (int64, error) { mt.Helper() // no arguments expected. add a Fatal in case arguments are added in the future @@ -1209,7 +1209,7 @@ func executeEstimatedDocumentCount(mt *mtest.T, sess mongo.Session, args bson.Ra if sess != nil { var res int64 - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var countErr error res, countErr = mt.Coll.EstimatedDocumentCount(sc) return countErr @@ -1259,7 +1259,7 @@ func executeGridFSDownloadByName(mt *mtest.T, bucket *mongo.GridFSBucket, args b return bucket.DownloadToStreamByName(context.Background(), file, new(bytes.Buffer)) } -func executeCreateIndex(mt *mtest.T, sess mongo.Session, args bson.Raw) (string, error) { +func executeCreateIndex(mt *mtest.T, sess *mongo.Session, args bson.Raw) (string, error) { mt.Helper() model := mongo.IndexModel{ @@ -1283,7 +1283,7 @@ func executeCreateIndex(mt *mtest.T, sess mongo.Session, args bson.Raw) (string, if sess != nil { var indexName string - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var indexErr error indexName, indexErr = mt.Coll.Indexes().CreateOne(sc, model) return indexErr @@ -1293,7 +1293,7 @@ func executeCreateIndex(mt *mtest.T, sess mongo.Session, args bson.Raw) (string, return mt.Coll.Indexes().CreateOne(context.Background(), model) } -func executeDropIndex(mt *mtest.T, sess mongo.Session, args bson.Raw) (bson.Raw, error) { +func executeDropIndex(mt *mtest.T, sess *mongo.Session, args bson.Raw) (bson.Raw, error) { mt.Helper() var name string @@ -1312,7 +1312,7 @@ func executeDropIndex(mt *mtest.T, sess mongo.Session, args bson.Raw) (bson.Raw, if sess != nil { var res bson.Raw - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { var indexErr error res, indexErr = mt.Coll.Indexes().DropOne(sc, name) return indexErr @@ -1322,7 +1322,7 @@ func executeDropIndex(mt *mtest.T, sess mongo.Session, args bson.Raw) (bson.Raw, return mt.Coll.Indexes().DropOne(context.Background(), name) } -func executeDropCollection(mt *mtest.T, sess mongo.Session, args bson.Raw) error { +func executeDropCollection(mt *mtest.T, sess *mongo.Session, args bson.Raw) error { mt.Helper() var collName string @@ -1344,7 +1344,7 @@ func executeDropCollection(mt *mtest.T, sess mongo.Session, args bson.Raw) error coll := mt.DB.Collection(collName) if sess != nil { - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { return coll.Drop(sc, dco) }) return err @@ -1352,7 +1352,7 @@ func executeDropCollection(mt *mtest.T, sess mongo.Session, args bson.Raw) error return coll.Drop(context.Background(), dco) } -func executeCreateCollection(mt *mtest.T, sess mongo.Session, args bson.Raw) error { +func executeCreateCollection(mt *mtest.T, sess *mongo.Session, args bson.Raw) error { mt.Helper() cco := options.CreateCollection() @@ -1377,7 +1377,7 @@ func executeCreateCollection(mt *mtest.T, sess mongo.Session, args bson.Raw) err } if sess != nil { - err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err := mongo.WithSession(context.Background(), sess, func(sc context.Context) error { return mt.DB.CreateCollection(sc, collName, cco) }) return err diff --git a/internal/integration/mongos_pinning_test.go b/internal/integration/mongos_pinning_test.go index 06b31762c9..76477c02be 100644 --- a/internal/integration/mongos_pinning_test.go +++ b/internal/integration/mongos_pinning_test.go @@ -31,21 +31,22 @@ func TestMongosPinning(t *testing.T) { mt.Run("unpin for next transaction", func(mt *mtest.T) { addresses := map[string]struct{}{} - _ = mt.Client.UseSession(context.Background(), func(sc mongo.SessionContext) error { + _ = mt.Client.UseSession(context.Background(), func(sctx context.Context) error { + sess := mongo.SessionFromContext(sctx) // Insert a document in a transaction to pin session to a mongos - err := sc.StartTransaction() + err := sess.StartTransaction() assert.Nil(mt, err, "StartTransaction error: %v", err) - _, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}}) + _, err = mt.Coll.InsertOne(sctx, bson.D{{"x", 1}}) assert.Nil(mt, err, "InsertOne error: %v", err) - err = sc.CommitTransaction(sc) + err = sess.CommitTransaction(sctx) assert.Nil(mt, err, "CommitTransaction error: %v", err) for i := 0; i < 50; i++ { // Call Find in a new transaction to unpin from the old mongos and select a new one - err = sc.StartTransaction() + err = sess.StartTransaction() assert.Nil(mt, err, iterationErrmsg("StartTransaction", i, err)) - cursor, err := mt.Coll.Find(sc, bson.D{}) + cursor, err := mt.Coll.Find(sctx, bson.D{}) assert.Nil(mt, err, iterationErrmsg("Find", i, err)) assert.True(mt, cursor.Next(context.Background()), "Next returned false on iteration %v", i) @@ -55,7 +56,7 @@ func TestMongosPinning(t *testing.T) { err = descConn.Close() assert.Nil(mt, err, iterationErrmsg("connection Close", i, err)) - err = sc.CommitTransaction(sc) + err = sess.CommitTransaction(sctx) assert.Nil(mt, err, iterationErrmsg("CommitTransaction", i, err)) } return nil @@ -64,18 +65,20 @@ func TestMongosPinning(t *testing.T) { }) mt.Run("unpin for non transaction operation", func(mt *mtest.T) { addresses := map[string]struct{}{} - _ = mt.Client.UseSession(context.Background(), func(sc mongo.SessionContext) error { + _ = mt.Client.UseSession(context.Background(), func(sctx context.Context) error { + sess := mongo.SessionFromContext(sctx) + // Insert a document in a transaction to pin session to a mongos - err := sc.StartTransaction() + err := sess.StartTransaction() assert.Nil(mt, err, "StartTransaction error: %v", err) - _, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}}) + _, err = mt.Coll.InsertOne(sctx, bson.D{{"x", 1}}) assert.Nil(mt, err, "InsertOne error: %v", err) - err = sc.CommitTransaction(sc) + err = sess.CommitTransaction(sctx) assert.Nil(mt, err, "CommitTransaction error: %v", err) for i := 0; i < 50; i++ { // Call Find with the session but outside of a transaction - cursor, err := mt.Coll.Find(sc, bson.D{}) + cursor, err := mt.Coll.Find(sctx, bson.D{}) assert.Nil(mt, err, iterationErrmsg("Find", i, err)) assert.True(mt, cursor.Next(context.Background()), "Next returned false on iteration %v", i) diff --git a/internal/integration/retryable_writes_prose_test.go b/internal/integration/retryable_writes_prose_test.go index 8f70354962..1dbd881482 100644 --- a/internal/integration/retryable_writes_prose_test.go +++ b/internal/integration/retryable_writes_prose_test.go @@ -108,7 +108,7 @@ func TestRetryableWritesProse(t *testing.T) { mt.ClearEvents() - err = mongo.WithSession(context.Background(), sess, func(ctx mongo.SessionContext) error { + err = mongo.WithSession(context.Background(), sess, func(ctx context.Context) error { doc := bson.D{{"foo", 1}} _, err := mt.Coll.InsertOne(ctx, doc) return err diff --git a/internal/integration/sessions_test.go b/internal/integration/sessions_test.go index a9f4eadb44..0150a21fa2 100644 --- a/internal/integration/sessions_test.go +++ b/internal/integration/sessions_test.go @@ -35,14 +35,14 @@ func TestSessionPool(t *testing.T) { sess, err := mt.Client.StartSession() assert.Nil(mt, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - initialLastUsedTime := getSessionLastUsedTime(mt, sess) + initialLastUsedTime := sess.ClientSession().LastUsed - err = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err = mongo.WithSession(context.Background(), sess, func(sc context.Context) error { return mt.Client.Ping(sc, readpref.Primary()) }) assert.Nil(mt, err, "WithSession error: %v", err) - newLastUsedTime := getSessionLastUsedTime(mt, sess) + newLastUsedTime := sess.ClientSession().LastUsed assert.True(mt, newLastUsedTime.After(initialLastUsedTime), "last used time %s is not after the initial last used time %s", newLastUsedTime, initialLastUsedTime) }) @@ -63,7 +63,6 @@ func TestSessions(t *testing.T) { defer sess.EndSession(context.Background()) ctx := mongo.NewSessionContext(context.Background(), sess) - assert.Equal(mt, sess.ID(), ctx.ID(), "expected Session ID %v, got %v", sess.ID(), ctx.ID()) gotSess := mongo.SessionFromContext(ctx) assert.NotNil(mt, gotSess, "expected SessionFromContext to return non-nil value, got nil") @@ -77,7 +76,7 @@ func TestSessions(t *testing.T) { mt.RunOpts("run transaction", txnOpts, func(mt *mtest.T) { // Test that the imperative sessions API can be used to run a transaction. - createSessionContext := func(mt *mtest.T) mongo.SessionContext { + createSessionContext := func(mt *mtest.T) context.Context { sess, err := mt.Client.StartSession() assert.Nil(mt, err, "StartSession error: %v", err) @@ -114,7 +113,7 @@ func TestSessions(t *testing.T) { assert.Nil(mt, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - err = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + err = mongo.WithSession(context.Background(), sess, func(sc context.Context) error { _, err := mt.Coll.InsertOne(sc, bson.D{{"x", 1}}) return err }) @@ -513,7 +512,7 @@ type sessionFunction struct { params []interface{} // should not include context } -func (sf sessionFunction) execute(mt *mtest.T, sess mongo.Session) error { +func (sf sessionFunction) execute(mt *mtest.T, sess *mongo.Session) error { var target reflect.Value switch sf.target { case "client": @@ -538,7 +537,7 @@ func (sf sessionFunction) execute(mt *mtest.T, sess mongo.Session) error { paramsValues := interfaceSliceToValueSlice(sf.params) if sess != nil { - return mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error { + return mongo.WithSession(context.Background(), sess, func(sc context.Context) error { valueArgs := []reflect.Value{reflect.ValueOf(sc)} valueArgs = append(valueArgs, paramsValues...) returnValues := fn.Call(valueArgs) @@ -641,9 +640,3 @@ func extractSentSessionID(mt *mtest.T) []byte { _, data := lsid.Document().Lookup("id").Binary() return data } - -func getSessionLastUsedTime(mt *mtest.T, sess mongo.Session) time.Time { - xsess, ok := sess.(mongo.XSession) - assert.True(mt, ok, "expected session to implement mongo.XSession, but got %T", sess) - return xsess.ClientSession().LastUsed -} diff --git a/internal/integration/unified/entity.go b/internal/integration/unified/entity.go index 75bbee6035..873a828da7 100644 --- a/internal/integration/unified/entity.go +++ b/internal/integration/unified/entity.go @@ -191,7 +191,7 @@ type EntityMap struct { clientEntities map[string]*clientEntity dbEntites map[string]*mongo.Database collEntities map[string]*mongo.Collection - sessions map[string]mongo.Session + sessions map[string]*mongo.Session gridfsBuckets map[string]*mongo.GridFSBucket bsonValues map[string]bson.RawValue eventListEntities map[string][]bson.Raw @@ -225,7 +225,7 @@ func newEntityMap() *EntityMap { clientEntities: make(map[string]*clientEntity), collEntities: make(map[string]*mongo.Collection), dbEntites: make(map[string]*mongo.Database), - sessions: make(map[string]mongo.Session), + sessions: make(map[string]*mongo.Session), eventListEntities: make(map[string][]bson.Raw), bsonArrayEntities: make(map[string][]bson.Raw), successValues: make(map[string]int32), @@ -422,7 +422,7 @@ func (em *EntityMap) database(id string) (*mongo.Database, error) { return db, nil } -func (em *EntityMap) session(id string) (mongo.Session, error) { +func (em *EntityMap) session(id string) (*mongo.Session, error) { sess, ok := em.sessions[id] if !ok { return nil, newEntityNotFoundError("session", id) diff --git a/internal/integration/unified/session_operation_execution.go b/internal/integration/unified/session_operation_execution.go index 98e576093d..f8c91eb1da 100644 --- a/internal/integration/unified/session_operation_execution.go +++ b/internal/integration/unified/session_operation_execution.go @@ -11,7 +11,6 @@ import ( "fmt" "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -103,7 +102,7 @@ func executeWithTransaction(ctx context.Context, op *operation, loopDone <-chan return fmt.Errorf("error unmarshalling arguments to transactionOptions: %v", err) } - _, err = sess.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { + _, err = sess.WithTransaction(ctx, func(ctx context.Context) (interface{}, error) { for idx, oper := range operations { if err := oper.execute(ctx, loopDone); err != nil { return nil, fmt.Errorf("error executing operation %q at index %d: %v", oper.Name, idx, err) diff --git a/internal/integration/unified/testrunner_operation.go b/internal/integration/unified/testrunner_operation.go index 38f81bfed3..1079f33840 100644 --- a/internal/integration/unified/testrunner_operation.go +++ b/internal/integration/unified/testrunner_operation.go @@ -443,8 +443,8 @@ func waitForEvent(ctx context.Context, args waitForEventArguments) error { } } -func extractClientSession(sess mongo.Session) *session.Client { - return sess.(mongo.XSession).ClientSession() +func extractClientSession(sess *mongo.Session) *session.Client { + return sess.ClientSession() } func verifySessionPinnedState(ctx context.Context, sessionID string, expectedPinned bool) error { diff --git a/internal/integration/unified_spec_test.go b/internal/integration/unified_spec_test.go index 88c762d8a1..cba3244db3 100644 --- a/internal/integration/unified_spec_test.go +++ b/internal/integration/unified_spec_test.go @@ -363,12 +363,12 @@ func createBucket(mt *mtest.T, testFile testFile, testCase *testCase) { testCase.bucket = mt.DB.GridFSBucket(bucketOpts) } -func runOperation(mt *mtest.T, testCase *testCase, op *operation, sess0, sess1 mongo.Session) error { +func runOperation(mt *mtest.T, testCase *testCase, op *operation, sess0, sess1 *mongo.Session) error { if op.Name == "count" { mt.Skip("count has been deprecated") } - var sess mongo.Session + var sess *mongo.Session if sessVal, err := op.Arguments.LookupErr("session"); err == nil { sessStr := sessVal.StringValue() switch sessStr { @@ -439,14 +439,10 @@ func executeGridFSOperation(mt *mtest.T, bucket *mongo.GridFSBucket, op *operati return nil } -func executeTestRunnerOperation(mt *mtest.T, testCase *testCase, op *operation, sess mongo.Session) error { +func executeTestRunnerOperation(mt *mtest.T, testCase *testCase, op *operation, sess *mongo.Session) error { var clientSession *session.Client if sess != nil { - xsess, ok := sess.(mongo.XSession) - if !ok { - return fmt.Errorf("expected session type %T to implement mongo.XSession", sess) - } - clientSession = xsess.ClientSession() + clientSession = sess.ClientSession() } switch op.Name { @@ -632,7 +628,7 @@ func lastTwoIDs(mt *mtest.T) (bson.RawValue, bson.RawValue) { return first, second } -func executeSessionOperation(mt *mtest.T, op *operation, sess mongo.Session) error { +func executeSessionOperation(mt *mtest.T, op *operation, sess *mongo.Session) error { switch op.Name { case "startTransaction": var txnOpts *options.TransactionOptions @@ -651,7 +647,7 @@ func executeSessionOperation(mt *mtest.T, op *operation, sess mongo.Session) err } } -func executeCollectionOperation(mt *mtest.T, op *operation, sess mongo.Session) error { +func executeCollectionOperation(mt *mtest.T, op *operation, sess *mongo.Session) error { switch op.Name { case "countDocuments": // no results to verify with count @@ -795,7 +791,7 @@ func executeCollectionOperation(mt *mtest.T, op *operation, sess mongo.Session) return nil } -func executeDatabaseOperation(mt *mtest.T, op *operation, sess mongo.Session) error { +func executeDatabaseOperation(mt *mtest.T, op *operation, sess *mongo.Session) error { switch op.Name { case "runCommand": res := executeRunCommand(mt, sess, op.Arguments) @@ -850,7 +846,7 @@ func executeDatabaseOperation(mt *mtest.T, op *operation, sess mongo.Session) er return nil } -func executeClientOperation(mt *mtest.T, op *operation, sess mongo.Session) error { +func executeClientOperation(mt *mtest.T, op *operation, sess *mongo.Session) error { switch op.Name { case "listDatabaseNames": _, err := executeListDatabaseNames(mt, sess, op.Arguments) @@ -879,7 +875,7 @@ func executeClientOperation(mt *mtest.T, op *operation, sess mongo.Session) erro return nil } -func setupSessions(mt *mtest.T, test *testCase) (mongo.Session, mongo.Session) { +func setupSessions(mt *mtest.T, test *testCase) (*mongo.Session, *mongo.Session) { mt.Helper() var sess0Opts, sess1Opts *options.SessionOptions diff --git a/mongo/client.go b/mongo/client.go index 6cba70ce8d..36f6fbc35f 100644 --- a/mongo/client.go +++ b/mongo/client.go @@ -373,7 +373,7 @@ func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error { // // If the DefaultReadConcern, DefaultWriteConcern, or DefaultReadPreference options are not set, the client's read // concern, write concern, or read preference will be used, respectively. -func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) { +func (c *Client) StartSession(opts ...*options.SessionOptions) (*Session, error) { if c.sessionPool == nil { return nil, ErrClientDisconnected } @@ -438,7 +438,7 @@ func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) sess.RetryWrite = false sess.RetryRead = c.retryReads - return &sessionImpl{ + return &Session{ clientSession: sess, client: c, deployment: c.deployment, @@ -775,40 +775,51 @@ func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts return names, nil } -// WithSession creates a new SessionContext from the ctx and sess parameters and uses it to call the fn callback. The -// SessionContext must be used as the Context parameter for any operations in the fn callback that should be executed -// under the session. +// WithSession creates a new session context from the ctx and sess parameters +// and uses it to call the fn callback. // -// WithSession is safe to call from multiple goroutines concurrently. However, the SessionContext passed to the -// WithSession callback function is not safe for concurrent use by multiple goroutines. +// WithSession is safe to call from multiple goroutines concurrently. However, +// the context passed to the WithSession callback function is not safe for +// concurrent use by multiple goroutines. // -// If the ctx parameter already contains a Session, that Session will be replaced with the one provided. +// If the ctx parameter already contains a Session, that Session will be +// replaced with the one provided. // -// Any error returned by the fn callback will be returned without any modifications. -func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error { +// Any error returned by the fn callback will be returned without any +// modifications. +func WithSession(ctx context.Context, sess *Session, fn func(context.Context) error) error { return fn(NewSessionContext(ctx, sess)) } -// UseSession creates a new Session and uses it to create a new SessionContext, which is used to call the fn callback. -// The SessionContext parameter must be used as the Context parameter for any operations in the fn callback that should -// be executed under a session. After the callback returns, the created Session is ended, meaning that any in-progress -// transactions started by fn will be aborted even if fn returns an error. +// UseSession creates a new Session and uses it to create a new session context, +// which is used to call the fn callback. After the callback returns, the +// created Session is ended, meaning that any in-progress transactions started +// by fn will be aborted even if fn returns an error. // -// UseSession is safe to call from multiple goroutines concurrently. However, the SessionContext passed to the -// UseSession callback function is not safe for concurrent use by multiple goroutines. +// UseSession is safe to call from multiple goroutines concurrently. However, +// the context passed to the UseSession callback function is not safe for +// concurrent use by multiple goroutines. // -// If the ctx parameter already contains a Session, that Session will be replaced with the newly created one. +// If the ctx parameter already contains a Session, that Session will be +// replaced with the newly created one. // -// Any error returned by the fn callback will be returned without any modifications. -func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error { +// Any error returned by the fn callback will be returned without any +// modifications. +func (c *Client) UseSession(ctx context.Context, fn func(context.Context) error) error { return c.UseSessionWithOptions(ctx, options.Session(), fn) } -// UseSessionWithOptions operates like UseSession but uses the given SessionOptions to create the Session. +// UseSessionWithOptions operates like UseSession but uses the given +// SessionOptions to create the Session. // -// UseSessionWithOptions is safe to call from multiple goroutines concurrently. However, the SessionContext passed to -// the UseSessionWithOptions callback function is not safe for concurrent use by multiple goroutines. -func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error { +// UseSessionWithOptions is safe to call from multiple goroutines concurrently. +// However, the context passed to the UseSessionWithOptions callback function is +// not safe for concurrent use by multiple goroutines. +func (c *Client) UseSessionWithOptions( + ctx context.Context, + opts *options.SessionOptions, + fn func(context.Context) error, +) error { defaultSess, err := c.StartSession(opts) if err != nil { return err diff --git a/mongo/client_test.go b/mongo/client_test.go index ddba3062fe..e5d08642b3 100644 --- a/mongo/client_test.go +++ b/mongo/client_test.go @@ -380,7 +380,7 @@ func TestClient(t *testing.T) { // Do an application operation and create the number of sessions specified by the test. _, err = coll.CountDocuments(bgCtx, bson.D{}) assert.Nil(t, err, "CountDocuments error: %v", err) - var sessions []Session + var sessions []*Session for i := 0; i < tc.numSessions; i++ { sess, err := client.StartSession() assert.Nil(t, err, "StartSession error at index %d: %v", i, err) diff --git a/mongo/crud_examples_test.go b/mongo/crud_examples_test.go index cb839c7e92..9ef1a63acd 100644 --- a/mongo/crud_examples_test.go +++ b/mongo/crud_examples_test.go @@ -630,8 +630,8 @@ func ExampleWithSession() { err = mongo.WithSession( context.TODO(), sess, - func(ctx mongo.SessionContext) error { - // Use the mongo.SessionContext as the Context parameter for + func(ctx context.Context) error { + // Use the context.Context as the Context parameter for // InsertOne and FindOne so both operations are run under the new // Session. @@ -687,12 +687,13 @@ func ExampleClient_UseSessionWithOptions() { err := client.UseSessionWithOptions( context.TODO(), opts, - func(ctx mongo.SessionContext) error { - // Use the mongo.SessionContext as the Context parameter for + func(ctx context.Context) error { + sess := mongo.SessionFromContext(ctx) + // Use the context.Context as the Context parameter for // InsertOne and FindOne so both operations are run under the new // Session. - if err := ctx.StartTransaction(); err != nil { + if err := sess.StartTransaction(); err != nil { return err } @@ -703,7 +704,7 @@ func ExampleClient_UseSessionWithOptions() { // context.Background() to ensure that the abort can complete // successfully even if the context passed to mongo.WithSession // is changed to have a timeout. - _ = ctx.AbortTransaction(context.Background()) + _ = sess.AbortTransaction(context.Background()) return err } @@ -717,7 +718,7 @@ func ExampleClient_UseSessionWithOptions() { // context.Background() to ensure that the abort can complete // successfully even if the context passed to mongo.WithSession // is changed to have a timeout. - _ = ctx.AbortTransaction(context.Background()) + _ = sess.AbortTransaction(context.Background()) return err } fmt.Println(result) @@ -725,7 +726,7 @@ func ExampleClient_UseSessionWithOptions() { // Use context.Background() to ensure that the commit can complete // successfully even if the context passed to mongo.WithSession is // changed to have a timeout. - return ctx.CommitTransaction(context.Background()) + return sess.CommitTransaction(context.Background()) }) if err != nil { log.Fatal(err) @@ -755,8 +756,8 @@ func ExampleClient_StartSession_withTransaction() { SetReadPreference(readpref.PrimaryPreferred()) result, err := sess.WithTransaction( context.TODO(), - func(ctx mongo.SessionContext) (interface{}, error) { - // Use the mongo.SessionContext as the Context parameter for + func(ctx context.Context) (interface{}, error) { + // Use the context.Context as the Context parameter for // InsertOne and FindOne so both operations are run in the same // transaction. @@ -794,7 +795,7 @@ func ExampleNewSessionContext() { defer sess.EndSession(context.TODO()) ctx := mongo.NewSessionContext(context.TODO(), sess) - // Start a transaction and use the mongo.SessionContext as the Context + // Start a transaction and use the context.Context as the Context // parameter for InsertOne and FindOne so both operations are run in the // transaction. if err = sess.StartTransaction(); err != nil { diff --git a/mongo/options/mongooptions.go b/mongo/options/mongooptions.go index 2279f66d0d..756684af7b 100644 --- a/mongo/options/mongooptions.go +++ b/mongo/options/mongooptions.go @@ -102,7 +102,7 @@ const ( // UpdateLookup includes a delta describing the changes to the document and a copy of the entire document that // was changed. UpdateLookup FullDocument = "updateLookup" - // WhenAvailable includes a post-image of the the modified document for replace and update change events + // WhenAvailable includes a post-image of the modified document for replace and update change events // if the post-image for this event is available. WhenAvailable FullDocument = "whenAvailable" ) diff --git a/mongo/session.go b/mongo/session.go index 3af4452f5f..bbcdf6a7f5 100644 --- a/mongo/session.go +++ b/mongo/session.go @@ -26,45 +26,47 @@ var ErrWrongClient = errors.New("session was not created by this client") var withTransactionTimeout = 120 * time.Second -// SessionContext combines the context.Context and mongo.Session interfaces. It should be used as the Context arguments -// to operations that should be executed in a session. -// -// Implementations of SessionContext are not safe for concurrent use by multiple goroutines. +// Session is a MongoDB logical session. Sessions can be used to enable causal +// consistency for a group of operations or to execute operations in an ACID +// transaction. A new Session can be created from a Client instance. A Session +// created from a Client must only be used to execute operations using that +// Client or a Database or Collection created from that Client. For more +// information about sessions, and their use cases, see +// https://www.mongodb.com/docs/manual/reference/server-sessions/, +// https://www.mongodb.com/docs/manual/core/read-isolation-consistency-recency/#causal-consistency, and +// https://www.mongodb.com/docs/manual/core/transactions/. // -// There are two ways to create a SessionContext and use it in a session/transaction. The first is to use one of the -// callback-based functions such as WithSession and UseSession. These functions create a SessionContext and pass it to -// the provided callback. The other is to use NewSessionContext to explicitly create a SessionContext. -type SessionContext interface { - context.Context - Session -} - -type sessionContext struct { - context.Context - Session +// Implementations of Session are not safe for concurrent use by multiple +// goroutines. +type Session struct { + clientSession *session.Client + client *Client + deployment driver.Deployment + didCommitAfterStart bool // true if commit was called after start with no other operations } -type sessionKey struct { -} +type sessionKey struct{} -// NewSessionContext creates a new SessionContext associated with the given Context and Session parameters. -func NewSessionContext(ctx context.Context, sess Session) SessionContext { - return &sessionContext{ - Context: context.WithValue(ctx, sessionKey{}, sess), - Session: sess, - } +// NewSessionContext returns a Context that holds the given Session. If the +// Context already contains a Session, that Session will be replaced with the +// one provided. +// +// The returned Context can be used with Collection methods like +// [Collection.InsertOne] or [Collection.Find] to run operations in a Session. +func NewSessionContext(parent context.Context, sess *Session) context.Context { + return context.WithValue(parent, sessionKey{}, sess) } // SessionFromContext extracts the mongo.Session object stored in a Context. This can be used on a SessionContext that // was created implicitly through one of the callback-based session APIs or explicitly by calling NewSessionContext. If // there is no Session stored in the provided Context, nil is returned. -func SessionFromContext(ctx context.Context) Session { +func SessionFromContext(ctx context.Context) *Session { val := ctx.Value(sessionKey{}) if val == nil { return nil } - sess, ok := val.(Session) + sess, ok := val.(*Session) if !ok { return nil } @@ -72,104 +74,22 @@ func SessionFromContext(ctx context.Context) Session { return sess } -// Session is an interface that represents a MongoDB logical session. Sessions can be used to enable causal consistency -// for a group of operations or to execute operations in an ACID transaction. A new Session can be created from a Client -// instance. A Session created from a Client must only be used to execute operations using that Client or a Database or -// Collection created from that Client. Custom implementations of this interface should not be used in production. For -// more information about sessions, and their use cases, see -// https://www.mongodb.com/docs/manual/reference/server-sessions/, -// https://www.mongodb.com/docs/manual/core/read-isolation-consistency-recency/#causal-consistency, and -// https://www.mongodb.com/docs/manual/core/transactions/. -// -// Implementations of Session are not safe for concurrent use by multiple goroutines. -type Session interface { - // StartTransaction starts a new transaction, configured with the given options, on this - // session. This method returns an error if there is already a transaction in-progress for this - // session. - StartTransaction(...*options.TransactionOptions) error - - // AbortTransaction aborts the active transaction for this session. This method returns an error - // if there is no active transaction for this session or if the transaction has been committed - // or aborted. - AbortTransaction(context.Context) error - - // CommitTransaction commits the active transaction for this session. This method returns an - // error if there is no active transaction for this session or if the transaction has been - // aborted. - CommitTransaction(context.Context) error - - // WithTransaction starts a transaction on this session and runs the fn callback. Errors with - // the TransientTransactionError and UnknownTransactionCommitResult labels are retried for up to - // 120 seconds. Inside the callback, the SessionContext must be used as the Context parameter - // for any operations that should be part of the transaction. If the ctx parameter already has a - // Session attached to it, it will be replaced by this session. The fn callback may be run - // multiple times during WithTransaction due to retry attempts, so it must be idempotent. - // Non-retryable operation errors or any operation errors that occur after the timeout expires - // will be returned without retrying. If the callback fails, the driver will call - // AbortTransaction. Because this method must succeed to ensure that server-side resources are - // properly cleaned up, context deadlines and cancellations will not be respected during this - // call. For a usage example, see the Client.StartSession method documentation. - WithTransaction(ctx context.Context, fn func(ctx SessionContext) (interface{}, error), - opts ...*options.TransactionOptions) (interface{}, error) - - // EndSession aborts any existing transactions and close the session. - EndSession(context.Context) - - // ClusterTime returns the current cluster time document associated with the session. - ClusterTime() bson.Raw - - // OperationTime returns the current operation time document associated with the session. - OperationTime() *bson.Timestamp - - // Client the Client associated with the session. - Client() *Client - - // ID returns the current ID document associated with the session. The ID document is in the - // form {"id": }. - ID() bson.Raw - - // AdvanceClusterTime advances the cluster time for a session. This method returns an error if - // the session has ended. - AdvanceClusterTime(bson.Raw) error - - // AdvanceOperationTime advances the operation time for a session. This method returns an error - // if the session has ended. - AdvanceOperationTime(*bson.Timestamp) error - - session() -} - -// XSession is an unstable interface for internal use only. +// ClientSession returns the experimental client session. // -// Deprecated: This interface is unstable because it provides access to a session.Client object, which exists in the -// "x" package. It should not be used by applications and may be changed or removed in any release. -type XSession interface { - ClientSession() *session.Client -} - -// sessionImpl represents a set of sequential operations executed by an application that are related in some way. -type sessionImpl struct { - clientSession *session.Client - client *Client - deployment driver.Deployment - didCommitAfterStart bool // true if commit was called after start with no other operations -} - -var _ Session = &sessionImpl{} -var _ XSession = &sessionImpl{} - -// ClientSession implements the XSession interface. -func (s *sessionImpl) ClientSession() *session.Client { +// Deprecated: This method is for internal use only and should not be used (see +// GODRIVER-2700). It may be changed or removed in any release. +func (s *Session) ClientSession() *session.Client { return s.clientSession } -// ID implements the Session interface. -func (s *sessionImpl) ID() bson.Raw { +// ID returns the current ID document associated with the session. The ID +// document is in the form {"id": }. +func (s *Session) ID() bson.Raw { return bson.Raw(s.clientSession.SessionID) } -// EndSession implements the Session interface. -func (s *sessionImpl) EndSession(ctx context.Context) { +// EndSession aborts any existing transactions and close the session. +func (s *Session) EndSession(ctx context.Context) { if s.clientSession.TransactionInProgress() { // ignore all errors aborting during an end session _ = s.AbortTransaction(ctx) @@ -177,8 +97,21 @@ func (s *sessionImpl) EndSession(ctx context.Context) { s.clientSession.EndSession() } -// WithTransaction implements the Session interface. -func (s *sessionImpl) WithTransaction(ctx context.Context, fn func(ctx SessionContext) (interface{}, error), +// WithTransaction starts a transaction on this session and runs the fn +// callback. Errors with the TransientTransactionError and +// UnknownTransactionCommitResult labels are retried for up to 120 seconds. +// Inside the callback, the SessionContext must be used as the Context parameter +// for any operations that should be part of the transaction. If the ctx +// parameter already has a Session attached to it, it will be replaced by this +// session. The fn callback may be run multiple times during WithTransaction due +// to retry attempts, so it must be idempotent. Non-retryable operation errors +// or any operation errors that occur after the timeout expires will be returned +// without retrying. If the callback fails, the driver will call +// AbortTransaction. Because this method must succeed to ensure that server-side +// resources are properly cleaned up, context deadlines and cancellations will +// not be respected during this call. For a usage example, see the +// Client.StartSession method documentation. +func (s *Session) WithTransaction(ctx context.Context, fn func(ctx context.Context) (interface{}, error), opts ...*options.TransactionOptions) (interface{}, error) { timeout := time.NewTimer(withTransactionTimeout) defer timeout.Stop() @@ -257,8 +190,9 @@ func (s *sessionImpl) WithTransaction(ctx context.Context, fn func(ctx SessionCo } } -// StartTransaction implements the Session interface. -func (s *sessionImpl) StartTransaction(opts ...*options.TransactionOptions) error { +// StartTransaction starts a new transaction. This method returns an error if +// there is already a transaction in-progress for this session. +func (s *Session) StartTransaction(opts ...*options.TransactionOptions) error { err := s.clientSession.CheckStartTransaction() if err != nil { return err @@ -294,8 +228,10 @@ func (s *sessionImpl) StartTransaction(opts ...*options.TransactionOptions) erro return s.clientSession.StartTransaction(coreOpts) } -// AbortTransaction implements the Session interface. -func (s *sessionImpl) AbortTransaction(ctx context.Context) error { +// AbortTransaction aborts the active transaction for this session. This method +// returns an error if there is no active transaction for this session or if the +// transaction has been committed or aborted. +func (s *Session) AbortTransaction(ctx context.Context) error { err := s.clientSession.CheckAbortTransaction() if err != nil { return err @@ -320,8 +256,10 @@ func (s *sessionImpl) AbortTransaction(ctx context.Context) error { return nil } -// CommitTransaction implements the Session interface. -func (s *sessionImpl) CommitTransaction(ctx context.Context) error { +// CommitTransaction commits the active transaction for this session. This +// method returns an error if there is no active transaction for this session or +// if the transaction has been aborted. +func (s *Session) CommitTransaction(ctx context.Context) error { err := s.clientSession.CheckCommitTransaction() if err != nil { return err @@ -364,40 +302,39 @@ func (s *sessionImpl) CommitTransaction(ctx context.Context) error { return commitErr } -// ClusterTime implements the Session interface. -func (s *sessionImpl) ClusterTime() bson.Raw { +// ClusterTime returns the current cluster time document associated with the +// session. +func (s *Session) ClusterTime() bson.Raw { return s.clientSession.ClusterTime } -// AdvanceClusterTime implements the Session interface. -func (s *sessionImpl) AdvanceClusterTime(d bson.Raw) error { +// AdvanceClusterTime advances the cluster time for a session. This method +// returns an error if the session has ended. +func (s *Session) AdvanceClusterTime(d bson.Raw) error { return s.clientSession.AdvanceClusterTime(d) } -// OperationTime implements the Session interface. -func (s *sessionImpl) OperationTime() *bson.Timestamp { +// OperationTime returns the current operation time document associated with the +// session. +func (s *Session) OperationTime() *bson.Timestamp { return s.clientSession.OperationTime } -// AdvanceOperationTime implements the Session interface. -func (s *sessionImpl) AdvanceOperationTime(ts *bson.Timestamp) error { +// AdvanceOperationTime advances the operation time for a session. This method +// returns an error if the session has ended. +func (s *Session) AdvanceOperationTime(ts *bson.Timestamp) error { return s.clientSession.AdvanceOperationTime(ts) } -// Client implements the Session interface. -func (s *sessionImpl) Client() *Client { +// Client is the Client associated with the session. +func (s *Session) Client() *Client { return s.client } -// session implements the Session interface. -func (*sessionImpl) session() { -} - // sessionFromContext checks for a sessionImpl in the argued context and returns the session if it // exists func sessionFromContext(ctx context.Context) *session.Client { - s := ctx.Value(sessionKey{}) - if ses, ok := s.(*sessionImpl); ses != nil && ok { + if ses := SessionFromContext(ctx); ses != nil { return ses.clientSession } diff --git a/mongo/with_transactions_test.go b/mongo/with_transactions_test.go index f65ba7b4f1..30835fc5e9 100644 --- a/mongo/with_transactions_test.go +++ b/mongo/with_transactions_test.go @@ -76,7 +76,7 @@ func TestConvenientTransactions(t *testing.T) { defer sess.EndSession(context.Background()) testErr := errors.New("test error") - _, err = sess.WithTransaction(context.Background(), func(SessionContext) (interface{}, error) { + _, err = sess.WithTransaction(context.Background(), func(context.Context) (interface{}, error) { return nil, testErr }) assert.Equal(t, testErr, err, "expected error %v, got %v", testErr, err) @@ -90,7 +90,7 @@ func TestConvenientTransactions(t *testing.T) { assert.Nil(t, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - res, err := sess.WithTransaction(context.Background(), func(SessionContext) (interface{}, error) { + res, err := sess.WithTransaction(context.Background(), func(context.Context) (interface{}, error) { return false, nil }) assert.Nil(t, err, "WithTransaction error: %v", err) @@ -110,7 +110,7 @@ func TestConvenientTransactions(t *testing.T) { assert.Nil(t, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - _, err = sess.WithTransaction(context.Background(), func(SessionContext) (interface{}, error) { + _, err = sess.WithTransaction(context.Background(), func(context.Context) (interface{}, error) { return nil, CommandError{Name: "test Error", Labels: []string{driver.TransientTransactionError}} }) assert.NotNil(t, err, "expected WithTransaction error, got nil") @@ -142,7 +142,7 @@ func TestConvenientTransactions(t *testing.T) { assert.Nil(t, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - _, err = sess.WithTransaction(context.Background(), func(ctx SessionContext) (interface{}, error) { + _, err = sess.WithTransaction(context.Background(), func(ctx context.Context) (interface{}, error) { _, err := coll.InsertOne(ctx, bson.D{{"x", 1}}) return nil, err }) @@ -175,7 +175,7 @@ func TestConvenientTransactions(t *testing.T) { assert.Nil(t, err, "StartSession error: %v", err) defer sess.EndSession(context.Background()) - _, err = sess.WithTransaction(context.Background(), func(ctx SessionContext) (interface{}, error) { + _, err = sess.WithTransaction(context.Background(), func(ctx context.Context) (interface{}, error) { _, err := coll.InsertOne(ctx, bson.D{{"x", 1}}) return nil, err }) @@ -240,8 +240,8 @@ func TestConvenientTransactions(t *testing.T) { // insert succeeds, it cancels the Context created above and returns a non-retryable error, which forces // WithTransaction to abort the txn. callbackErr := errors.New("error") - callback := func(sc SessionContext) (interface{}, error) { - _, err = coll.InsertOne(sc, bson.D{{"x", 1}}) + callback := func(ctx context.Context) (interface{}, error) { + _, err = coll.InsertOne(ctx, bson.D{{"x", 1}}) if err != nil { return nil, err } @@ -306,17 +306,17 @@ func TestConvenientTransactions(t *testing.T) { defer session.EndSession(bgCtx) assert.Nil(t, err, "StartSession error: %v", err) - _ = WithSession(bgCtx, session, func(sessionContext SessionContext) error { + _ = WithSession(bgCtx, session, func(ctx context.Context) error { // Start transaction. err = session.StartTransaction() assert.Nil(t, err, "StartTransaction error: %v", err) // Insert a document. - _, err := coll.InsertOne(sessionContext, bson.D{{"val", 17}}) + _, err := coll.InsertOne(ctx, bson.D{{"val", 17}}) assert.Nil(t, err, "InsertOne error: %v", err) // Set a timeout of 0 for commitTransaction. - commitTimeoutCtx, commitCancel := context.WithTimeout(sessionContext, 0) + commitTimeoutCtx, commitCancel := context.WithTimeout(ctx, 0) defer commitCancel() // CommitTransaction results in context.DeadlineExceeded. @@ -325,7 +325,7 @@ func TestConvenientTransactions(t *testing.T) { "expected timeout error error; got %v", commitErr) // Assert session state is not Committed. - clientSession := session.(XSession).ClientSession() + clientSession := session.ClientSession() assert.False(t, clientSession.TransactionCommitted(), "expected session state to not be Committed") // AbortTransaction without error. @@ -402,7 +402,7 @@ func TestConvenientTransactions(t *testing.T) { callback := func(ctx context.Context) { transactionCtx, cancel := context.WithCancel(ctx) - _, _ = sess.WithTransaction(transactionCtx, func(ctx SessionContext) (interface{}, error) { + _, _ = sess.WithTransaction(transactionCtx, func(ctx context.Context) (interface{}, error) { _, err := coll.InsertOne(ctx, bson.M{"x": 1}) assert.Nil(t, err, "InsertOne error: %v", err) cancel() @@ -426,7 +426,7 @@ func TestConvenientTransactions(t *testing.T) { // returnError tracks whether or not the callback is being retried returnError := true - res, err := sess.WithTransaction(context.Background(), func(SessionContext) (interface{}, error) { + res, err := sess.WithTransaction(context.Background(), func(context.Context) (interface{}, error) { if returnError { returnError = false return nil, fmt.Errorf("%w", @@ -464,7 +464,7 @@ func TestConvenientTransactions(t *testing.T) { withTransactionContext, cancel := context.WithTimeout(ctx, time.Nanosecond) defer cancel() - _, _ = sess.WithTransaction(withTransactionContext, func(ctx SessionContext) (interface{}, error) { + _, _ = sess.WithTransaction(withTransactionContext, func(ctx context.Context) (interface{}, error) { _, err := coll.InsertOne(ctx, bson.D{{}}) return nil, err }) @@ -494,7 +494,7 @@ func TestConvenientTransactions(t *testing.T) { withTransactionContext, cancel := context.WithTimeout(ctx, 2*time.Second) cancel() - _, _ = sess.WithTransaction(withTransactionContext, func(ctx SessionContext) (interface{}, error) { + _, _ = sess.WithTransaction(withTransactionContext, func(ctx context.Context) (interface{}, error) { _, err := coll.InsertOne(ctx, bson.D{{}}) return nil, err }) @@ -541,7 +541,7 @@ func TestConvenientTransactions(t *testing.T) { defer sess.EndSession(context.Background()) callback := func(ctx context.Context) { - _, err = sess.WithTransaction(ctx, func(ctx SessionContext) (interface{}, error) { + _, err = sess.WithTransaction(ctx, func(ctx context.Context) (interface{}, error) { // Set a timeout of 300ms to cause a timeout on first insertOne // and force a retry. c, cancel := context.WithTimeout(ctx, 300*time.Millisecond) diff --git a/mongo/writeconcern/writeconcern.go b/mongo/writeconcern/writeconcern.go index 59695a3458..2e4d2ade16 100644 --- a/mongo/writeconcern/writeconcern.go +++ b/mongo/writeconcern/writeconcern.go @@ -51,7 +51,7 @@ var ErrNegativeWTimeout = errors.New("write concern `wtimeout` field cannot be n type WriteConcern struct { // W requests acknowledgment that the write operation has propagated to a // specified number of mongod instances or to mongod instances with - // specified tags. It sets the the "w" option in a MongoDB write concern. + // specified tags. It sets the "w" option in a MongoDB write concern. // // W values must be a string or an int. // diff --git a/x/mongo/driver/auth/speculative_x509_test.go b/x/mongo/driver/auth/speculative_x509_test.go index 6ec2b8ea64..4c74f921fc 100644 --- a/x/mongo/driver/auth/speculative_x509_test.go +++ b/x/mongo/driver/auth/speculative_x509_test.go @@ -130,7 +130,7 @@ func createSpeculativeX509Handshake() []bsoncore.Document { return []bsoncore.Document{hello} } -// createSpeculativeX509Handshake creates the server replies for a handshake + X509 authentication attempt. +// createRegularX509Handshake creates the server replies for a handshake + X509 authentication attempt. // There are two replies: // // 1. hello reply diff --git a/x/mongo/driver/operation.go b/x/mongo/driver/operation.go index cef93b1533..432692295f 100644 --- a/x/mongo/driver/operation.go +++ b/x/mongo/driver/operation.go @@ -599,7 +599,7 @@ func (op Operation) Execute(ctx context.Context) error { } }() for { - // If we're starting a retry and the the error from the previous try was + // If we're starting a retry and the error from the previous try was // a context canceled or deadline exceeded error, stop retrying and // return that error. if errors.Is(prevErr, context.Canceled) || errors.Is(prevErr, context.DeadlineExceeded) { diff --git a/x/mongo/driver/operation_test.go b/x/mongo/driver/operation_test.go index 964a23f6c3..87c629c662 100644 --- a/x/mongo/driver/operation_test.go +++ b/x/mongo/driver/operation_test.go @@ -79,7 +79,7 @@ func TestOperation(t *testing.T) { _, err := op.selectServer(context.Background(), 1, nil) noerr(t, err) - // Assert the the selector is an operation selector wrapper. + // Assert the selector is an operation selector wrapper. oss, ok := d.params.selector.(*opServerSelector) require.True(t, ok) diff --git a/x/mongo/driver/topology/polling_srv_records_test.go b/x/mongo/driver/topology/polling_srv_records_test.go index 99161f0ce0..df0704afc6 100644 --- a/x/mongo/driver/topology/polling_srv_records_test.go +++ b/x/mongo/driver/topology/polling_srv_records_test.go @@ -309,7 +309,7 @@ func TestPollingSRVRecordsLoadBalanced(t *testing.T) { func TestPollSRVRecordsMaxHosts(t *testing.T) { // simulateSRVPoll creates a topology with srvMaxHosts, mocks the DNS changes described by - // recordsToAdd and recordsToRemove, and returns the the topology. + // recordsToAdd and recordsToRemove, and returns the topology. simulateSRVPoll := func(srvMaxHosts int, recordsToAdd []*net.SRV, recordsToRemove []*net.SRV) (*Topology, func(ctx context.Context) error) { t.Helper() diff --git a/x/mongo/driver/wiremessage/wiremessage.go b/x/mongo/driver/wiremessage/wiremessage.go index abf09c15bd..a13dda4e89 100644 --- a/x/mongo/driver/wiremessage/wiremessage.go +++ b/x/mongo/driver/wiremessage/wiremessage.go @@ -486,7 +486,7 @@ func ReadReplyCursorID(src []byte) (cursorID int64, rem []byte, ok bool) { return readi64(src) } -// ReadReplyStartingFrom reads the starting from from src. +// ReadReplyStartingFrom reads the starting from src. func ReadReplyStartingFrom(src []byte) (startingFrom int32, rem []byte, ok bool) { return readi32(src) }