Skip to content

Commit

Permalink
GODRIVER-2828 Use topology version from Server instead of Connection …
Browse files Browse the repository at this point in the history
…in ProcessError. (#1252)

Co-authored-by: Preston Vasquez <prestonvasquez@icloud.com>
  • Loading branch information
matthewdale and prestonvasquez committed Jun 5, 2023
1 parent b1ab40e commit d797822
Show file tree
Hide file tree
Showing 3 changed files with 439 additions and 181 deletions.
6 changes: 0 additions & 6 deletions x/mongo/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,6 @@ const (
ConnectionPoolCleared
)

// ServerChanged returns true if the ProcessErrorResult indicates that the server changed from an SDAM perspective
// during a ProcessError() call.
func (p ProcessErrorResult) ServerChanged() bool {
return p != NoChange
}

// ErrorProcessor implementations can handle processing errors, which may modify their internal state.
// If this type is implemented by a Server, then Operation.Execute will call it's ProcessError
// method after it decodes a wire message.
Expand Down
62 changes: 48 additions & 14 deletions x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

const minHeartbeatInterval = 500 * time.Millisecond
const wireVersion42 = 8 // Wire version for MongoDB 4.2

// Server state constants.
const (
Expand Down Expand Up @@ -294,6 +295,8 @@ func (s *Server) ProcessHandshakeError(err error, startingGenerationNumber uint6
return
}

// Unwrap any connection errors. If there is no wrapped connection error, then the error should
// not result in any Server state change (e.g. a command error from the database).
wrappedConnErr := unwrapConnectionError(err)
if wrappedConnErr == nil {
return
Expand Down Expand Up @@ -384,27 +387,58 @@ func getWriteConcernErrorForProcessing(err error) (*driver.WriteConcernError, bo

// ProcessError handles SDAM error handling and implements driver.ErrorProcessor.
func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessErrorResult {
// ignore nil error
// Ignore nil errors.
if err == nil {
return driver.NoChange
}

// Ignore errors from stale connections because the error came from a previous generation of the
// connection pool. The root cause of the error has aleady been handled, which is what caused
// the pool generation to increment. Processing errors for stale connections could result in
// handling the same error root cause multiple times (e.g. a temporary network interrupt causing
// all connections to the same server to return errors).
if conn.Stale() {
return driver.NoChange
}

// Must hold the processErrorLock while updating the server description and clearing the pool.
// Not holding the lock leads to possible out-of-order processing of pool.clear() and
// pool.ready() calls from concurrent server description updates.
s.processErrorLock.Lock()
defer s.processErrorLock.Unlock()

// ignore stale error
if conn.Stale() {
return driver.NoChange
// Get the wire version and service ID from the connection description because they will never
// change for the lifetime of a connection and can possibly be different between connections to
// the same server.
connDesc := conn.Description()
wireVersion := connDesc.WireVersion
serviceID := connDesc.ServiceID

// Get the topology version from the Server description because the Server description is
// updated by heartbeats and errors, so typically has a more up-to-date topology version.
serverDesc := s.desc.Load().(description.Server)
topologyVersion := serverDesc.TopologyVersion

// We don't currently update the Server topology version when we create new application
// connections, so it's possible for a connection's topology version to be newer than the
// Server's topology version. Pick the "newest" of the two topology versions.
// Technically a nil topology version on a new database response should be considered a new
// topology version and replace the Server's topology version. However, we don't know if the
// connection's topology version is based on a new or old database response, so we ignore a nil
// topology version on the connection for now.
//
// TODO(GODRIVER-2841): Remove this logic once we set the Server description when we create
// TODO application connections because then the Server's topology version will always be the
// TODO latest known.
if tv := connDesc.TopologyVersion; tv != nil && topologyVersion.CompareToIncoming(tv) < 0 {
topologyVersion = tv
}

// Invalidate server description if not primary or node recovering error occurs.
// These errors can be reported as a command error or a write concern error.
desc := conn.Description()
if cerr, ok := err.(driver.Error); ok && (cerr.NodeIsRecovering() || cerr.NotPrimary()) {
// ignore stale error
if desc.TopologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 {
// Ignore errors that came from when the database was on a previous topology version.
if topologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 {
return driver.NoChange
}

Expand All @@ -414,16 +448,16 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE

res := driver.ServerMarkedUnknown
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
if cerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
if cerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 {
res = driver.ConnectionPoolCleared
s.pool.clear(err, desc.ServiceID)
s.pool.clear(err, serviceID)
}

return res
}
if wcerr, ok := getWriteConcernErrorForProcessing(err); ok {
// ignore stale error
if desc.TopologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 {
// Ignore errors that came from when the database was on a previous topology version.
if topologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 {
return driver.NoChange
}

Expand All @@ -433,9 +467,9 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE

res := driver.ServerMarkedUnknown
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
if wcerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
if wcerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 {
res = driver.ConnectionPoolCleared
s.pool.clear(err, desc.ServiceID)
s.pool.clear(err, serviceID)
}
return res
}
Expand All @@ -457,7 +491,7 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE
// monitoring check. The check is cancelled last to avoid a post-cancellation reconnect racing with
// updateDescription.
s.updateDescription(description.NewServerFromError(s.address, err, nil))
s.pool.clear(err, desc.ServiceID)
s.pool.clear(err, serviceID)
s.cancelCheck()
return driver.ConnectionPoolCleared
}
Expand Down
Loading

0 comments on commit d797822

Please sign in to comment.