/
driver.go
180 lines (150 loc) · 6.88 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package driver // import "go.mongodb.org/mongo-driver/x/mongo/driver"
import (
"context"
"go.mongodb.org/mongo-driver/x/mongo/driver/address"
"go.mongodb.org/mongo-driver/x/mongo/driver/description"
)
// Deployment is implemented by types that can select a server from a deployment.
type Deployment interface {
SelectServer(context.Context, description.ServerSelector) (Server, error)
Kind() description.TopologyKind
}
// Connector represents a type that can connect to a server.
type Connector interface {
Connect() error
}
// Disconnector represents a type that can disconnect from a server.
type Disconnector interface {
Disconnect(context.Context) error
}
// Subscription represents a subscription to topology updates. A subscriber can receive updates through the
// Updates field.
type Subscription struct {
Updates <-chan description.Topology
ID uint64
}
// Subscriber represents a type to which another type can subscribe. A subscription contains a channel that
// is updated with topology descriptions.
type Subscriber interface {
Subscribe() (*Subscription, error)
Unsubscribe(*Subscription) error
}
// Server represents a MongoDB server. Implementations should pool connections and handle the
// retrieving and returning of connections.
type Server interface {
Connection(context.Context) (Connection, error)
}
// Connection represents a connection to a MongoDB server.
type Connection interface {
WriteWireMessage(context.Context, []byte) error
ReadWireMessage(ctx context.Context, dst []byte) ([]byte, error)
Description() description.Server
Close() error
ID() string
Address() address.Address
Stale() bool
}
// LocalAddresser is a type that is able to supply its local address
type LocalAddresser interface {
LocalAddress() address.Address
}
// Expirable represents an expirable object.
type Expirable interface {
Expire() error
Alive() bool
}
// StreamerConnection represents a Connection that supports streaming wire protocol messages using the moreToCome and
// exhaustAllowed flags.
//
// The SetStreaming and CurrentlyStreaming functions correspond to the moreToCome flag on server responses. If a
// response has moreToCome set, SetStreaming(true) will be called and CurrentlyStreaming() should return true.
//
// CanStream corresponds to the exhaustAllowed flag. The operations layer will set exhaustAllowed on outgoing wire
// messages to inform the server that the driver supports streaming.
type StreamerConnection interface {
Connection
SetStreaming(bool)
CurrentlyStreaming() bool
SupportsStreaming() bool
}
// Compressor is an interface used to compress wire messages. If a Connection supports compression
// it should implement this interface as well. The CompressWireMessage method will be called during
// the execution of an operation if the wire message is allowed to be compressed.
type Compressor interface {
CompressWireMessage(src, dst []byte) ([]byte, error)
}
// ErrorProcessor implementations can handle processing errors, which may modify their internal state.
// If this type is implemented by a Server, then Operation.Execute will call it's ProcessError
// method after it decodes a wire message.
type ErrorProcessor interface {
ProcessError(err error, conn Connection)
}
// Handshaker is the interface implemented by types that can perform a MongoDB
// handshake over a provided driver.Connection. This is used during connection
// initialization. Implementations must be goroutine safe.
type Handshaker interface {
GetDescription(context.Context, address.Address, Connection) (description.Server, error)
FinishHandshake(context.Context, Connection) error
}
// SingleServerDeployment is an implementation of Deployment that always returns a single server.
type SingleServerDeployment struct{ Server }
var _ Deployment = SingleServerDeployment{}
// SelectServer implements the Deployment interface. This method does not use the
// description.SelectedServer provided and instead returns the embedded Server.
func (ssd SingleServerDeployment) SelectServer(context.Context, description.ServerSelector) (Server, error) {
return ssd.Server, nil
}
// Kind implements the Deployment interface. It always returns description.Single.
func (SingleServerDeployment) Kind() description.TopologyKind { return description.Single }
// SingleConnectionDeployment is an implementation of Deployment that always returns the same Connection. This
// implementation should only be used for connection handshakes and server heartbeats as it does not implement
// ErrorProcessor, which is necessary for application operations and wraps the connection in nopCloserConnection,
// which does not implement Compressor.
type SingleConnectionDeployment struct{ C Connection }
var _ Deployment = SingleConnectionDeployment{}
var _ Server = SingleConnectionDeployment{}
// SelectServer implements the Deployment interface. This method does not use the
// description.SelectedServer provided and instead returns itself. The Connections returned from the
// Connection method have a no-op Close method.
func (ssd SingleConnectionDeployment) SelectServer(context.Context, description.ServerSelector) (Server, error) {
return ssd, nil
}
// Kind implements the Deployment interface. It always returns description.Single.
func (ssd SingleConnectionDeployment) Kind() description.TopologyKind { return description.Single }
// Connection implements the Server interface. It always returns the embedded connection.
func (ssd SingleConnectionDeployment) Connection(context.Context) (Connection, error) {
return ssd.C, nil
}
// TODO(GODRIVER-617): We can likely use 1 type for both the Type and the RetryMode by using
// 2 bits for the mode and 1 bit for the type. Although in the practical sense, we might not want to
// do that since the type of retryability is tied to the operation itself and isn't going change,
// e.g. and insert operation will always be a write, however some operations are both reads and
// writes, for instance aggregate is a read but with a $out parameter it's a write.
// Type specifies whether an operation is a read, write, or unknown.
type Type uint
// THese are the availables types of Type.
const (
_ Type = iota
Write
Read
)
// RetryMode specifies the way that retries are handled for retryable operations.
type RetryMode uint
// These are the modes available for retrying.
const (
// RetryNone disables retrying.
RetryNone RetryMode = iota
// RetryOnce will enable retrying the entire operation once.
RetryOnce
// RetryOncePerCommand will enable retrying each command associated with an operation. For
// example, if an insert is batch split into 4 commands then each of those commands is eligible
// for one retry.
RetryOncePerCommand
// RetryContext will enable retrying until the context.Context's deadline is exceeded or it is
// cancelled.
RetryContext
)
// Enabled returns if this RetryMode enables retrying.
func (rm RetryMode) Enabled() bool {
return rm == RetryOnce || rm == RetryOncePerCommand || rm == RetryContext
}