Skip to content

Commit

Permalink
GODRIVER-1726 Fix SDAM error handling for write concern errors (#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
Divjot Arora committed Aug 25, 2020
1 parent 7969412 commit 5b5e6c9
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 14 deletions.
107 changes: 101 additions & 6 deletions mongo/integration/sdam_error_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package integration

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -19,16 +20,18 @@ import (

func TestSDAMErrorHandling(t *testing.T) {
mt := mtest.New(t, noClientOpts)
clientOpts := options.Client().
ApplyURI(mt.ConnString()).
SetRetryWrites(false).
SetPoolMonitor(poolMonitor).
SetWriteConcern(mtest.MajorityWc)
baseClientOpts := func() *options.ClientOptions {
return options.Client().
ApplyURI(mt.ConnString()).
SetRetryWrites(false).
SetPoolMonitor(poolMonitor).
SetWriteConcern(mtest.MajorityWc)
}
baseMtOpts := func() *mtest.Options {
mtOpts := mtest.NewOptions().
Topologies(mtest.ReplicaSet). // Don't run on sharded clusters to avoid complexity of sharded failpoints.
MinServerVersion("4.0"). // 4.0+ is required to use failpoints on replica sets.
ClientOptions(clientOpts)
ClientOptions(baseClientOpts())

if mt.TopologyKind() == mtest.Sharded {
// Pin to a single mongos because the tests use failpoints.
Expand Down Expand Up @@ -173,5 +176,97 @@ func TestSDAMErrorHandling(t *testing.T) {
assert.False(mt, isPoolCleared(), "expected pool to not be cleared but was")
})
})
mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
// Integration tests for the SDAM error handling code path for errors in server response documents. These
// errors can be part of the top-level document in ok:0 responses or in a nested writeConcernError document.

// On 4.4, some state change errors include a topologyVersion field. Because we're triggering these errors
// via failCommand, the topologyVersion does not actually change as it would in an actual state change.
// This causes the SDAM error handling code path to think we've already handled this state change and
// ignore the error because it's stale. To avoid this altogether, we cap the test to <= 4.2.
serverErrorsMtOpts := baseMtOpts().
MinServerVersion("4.0"). // failCommand support
MaxServerVersion("4.2").
ClientOptions(baseClientOpts().SetRetryWrites(false))

testCases := []struct {
name string
errorCode int32

// For shutdown errors, the pool is always cleared. For non-shutdown errors, the pool is only cleared
// for pre-4.2 servers.
isShutdownError bool
}{
// "node is recovering" errors
{"InterruptedAtShutdown", 11600, true},
{"InterruptedDueToReplStateChange, not shutdown", 11602, false},
{"NotMasterOrSecondary", 13436, false},
{"PrimarySteppedDown", 189, false},
{"ShutdownInProgress", 91, true},

// "not master" errors
{"NotMaster", 10107, false},
{"NotMasterNoSlaveOk", 13435, false},
}
for _, tc := range testCases {
mt.RunOpts(fmt.Sprintf("command error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
clearPoolChan()

// Cause the next insert to fail with an ok:0 response.
fp := mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"insert"},
ErrorCode: tc.errorCode,
},
}
mt.SetFailPoint(fp)

runServerErrorsTest(mt, tc.isShutdownError)
})
mt.RunOpts(fmt.Sprintf("write concern error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
clearPoolChan()

// Cause the next insert to fail with a write concern error.
fp := mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"insert"},
WriteConcernError: &mtest.WriteConcernErrorData{
Code: tc.errorCode,
},
},
}
mt.SetFailPoint(fp)

runServerErrorsTest(mt, tc.isShutdownError)
})
}
})
})
}

func runServerErrorsTest(mt *mtest.T, isShutdownError bool) {
mt.Helper()

_, err := mt.Coll.InsertOne(mtest.Background, bson.D{{"x", 1}})
assert.NotNil(mt, err, "expected InsertOne error, got nil")

// The pool should always be cleared for shutdown errors, regardless of server version.
if isShutdownError {
assert.True(mt, isPoolCleared(), "expected pool to be cleared, but was not")
return
}

// For non-shutdown errors, the pool is only cleared if the error is from a pre-4.2 server.
wantCleared := mtest.CompareServerVersions(mt.ServerVersion(), "4.2") < 0
gotCleared := isPoolCleared()
assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %v; pool was cleared: %v",
wantCleared, gotCleared)
}
18 changes: 17 additions & 1 deletion x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,22 @@ func (s *Server) RequestImmediateCheck() {
}
}

// getWriteConcernErrorForProcessing extracts a driver.WriteConcernError from the provided error. This function returns
// (error, true) if the error is a WriteConcernError and the falls under the requirements for SDAM error
// handling and (nil, false) otherwise.
func getWriteConcernErrorForProcessing(err error) (*driver.WriteConcernError, bool) {
writeCmdErr, ok := err.(driver.WriteCommandError)
if !ok {
return nil, false
}

wcerr := writeCmdErr.WriteConcernError
if wcerr != nil && (wcerr.NodeIsRecovering() || wcerr.NotMaster()) {
return wcerr, true
}
return nil, false
}

// ProcessError handles SDAM error handling and implements driver.ErrorProcessor.
func (s *Server) ProcessError(err error, conn driver.Connection) {
// ignore nil error
Expand Down Expand Up @@ -365,7 +381,7 @@ func (s *Server) ProcessError(err error, conn driver.Connection) {
}
return
}
if wcerr, ok := err.(driver.WriteConcernError); ok && (wcerr.NodeIsRecovering() || wcerr.NotMaster()) {
if wcerr, ok := getWriteConcernErrorForProcessing(err); ok {
// ignore stale error
if description.CompareTopologyVersion(desc.TopologyVersion, wcerr.TopologyVersion) >= 0 {
return
Expand Down
16 changes: 9 additions & 7 deletions x/mongo/driver/topology/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,15 @@ func TestServer(t *testing.T) {
s.connectionstate = connected
s.pool.connected = connected

wce := driver.WriteConcernError{
Name: "",
Code: 10107,
Message: "not master",
Details: []byte{},
Labels: []string{},
TopologyVersion: nil,
wce := driver.WriteCommandError{
WriteConcernError: &driver.WriteConcernError{
Name: "",
Code: 10107,
Message: "not master",
Details: []byte{},
Labels: []string{},
TopologyVersion: nil,
},
}
s.ProcessError(wce, initConnection{})

Expand Down

0 comments on commit 5b5e6c9

Please sign in to comment.