-
Notifications
You must be signed in to change notification settings - Fork 525
/
repl_set_connection.rb
450 lines (385 loc) · 14.1 KB
/
repl_set_connection.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# encoding: UTF-8
# --
# Copyright (C) 2008-2011 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ++
require 'sync'
module Mongo
# Instantiates and manages connections to a MongoDB replica set.
class ReplSetConnection < Connection
attr_reader :nodes, :secondaries, :arbiters, :secondary_pools,
:replica_set_name, :read_pool, :seeds, :tags_to_pools, :refresh_interval, :auto_refresh
# Create a connection to a MongoDB replica set.
#
# Once connected to a replica set, you can find out which nodes are primary, secondary, and
# arbiters with the corresponding accessors: Connection#primary, Connection#secondaries, and
# Connection#arbiters. This is useful if your application needs to connect manually to nodes other
# than the primary.
#
# @param [Array] args A list of host-port pairs to be used as seed nodes followed by a
# hash containing any options. See the examples below for exactly how to use the constructor.
#
# @option options [String] :rs_name (nil) The name of the replica set to connect to. You
# can use this option to verify that you're connecting to the right replica set.
# @option options [Boolean, Hash] :safe (false) Set the default safe-mode options
# propogated to DB objects instantiated off of this Connection. This
# default can be overridden upon instantiation of any DB by explicity setting a :safe value
# on initialization.
# @option options [:primary, :secondary] :read (:primary) The default read preference for Mongo::DB
# objects created from this connection object. If +:secondary+ is chosen, reads will be sent
# to one of the closest available secondary nodes. If a secondary node cannot be located, the
# read will be sent to the primary.
# @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log.
# @option options [Integer] :pool_size (1) The maximum number of socket connections allowed per
# connection pool. Note: this setting is relevant only for multi-threaded applications.
# @option options [Float] :pool_timeout (5.0) When all of the connections a pool are checked out,
# this is the number of seconds to wait for a new connection to be released before throwing an exception.
# Note: this setting is relevant only for multi-threaded applications.
# @option opts [Float] :op_timeout (nil) The number of seconds to wait for a read operation to time out.
# Disabled by default.
# @option opts [Float] :connect_timeout (nil) The number of seconds to wait before timing out a
# connection attempt.
# @option opts [Boolean] :ssl (false) If true, create the connection to the server using SSL.
# @option opts [Boolean] :auto_refresh (false) Set this to true to enable a background thread that
# periodically updates the state of the connection. If, for example, you initially connect while a secondary
# is down, :auto_refresh will reconnect to that secondary behind the scenes to
# prevent you from having to reconnect manually.
# @option opts [Integer] :refresh_interval (90) If :auto_refresh is enabled, this is the number of seconds
# that the background thread will sleep between calls to check the replica set's state.
# @option opts [Boolean] :require_primary (true) If true, require a primary node for the connection
# to succeed. Otherwise, connection will succeed as long as there's at least one secondary.
#
# @example Connect to a replica set and provide two seed nodes. Note that the number of seed nodes does
# not have to be equal to the number of replica set members. The purpose of seed nodes is to permit
# the driver to find at least one replica set member even if a member is down.
# ReplSetConnection.new(['localhost', 30000], ['localhost', 30001])
#
# @example Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named 'prod':
# ReplSetConnection.new(['localhost', 30000], ['localhost', 30001], :rs_name => 'prod')
#
# @example Connect to a replica set providing two seed nodes and allowing reads from a secondary node:
# ReplSetConnection.new(['localhost', 30000], ['localhost', 30001], :read_secondary => true)
#
# @see http://api.mongodb.org/ruby/current/file.REPLICA_SETS.html Replica sets in Ruby
#
# @raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the
# driver fails to connect to a replica set with that name.
def initialize(*args)
extend Sync_m
if args.last.is_a?(Hash)
opts = args.pop
else
opts = {}
end
unless args.length > 0
raise MongoArgumentError, "A ReplSetConnection requires at least one seed node."
end
# The list of seed nodes
@seeds = args
# TODO: get rid of this
@nodes = @seeds.dup
# The members of the replica set, stored as instances of Mongo::Node.
@members = []
# Connection pool for primary node
@primary = nil
@primary_pool = nil
# Connection pools for each secondary node
@secondaries = []
@secondary_pools = []
# The secondary pool to which we'll be sending reads.
# This may be identical to the primary pool.
@read_pool = nil
# A list of arbiter addresses (for client information only)
@arbiters = []
# Refresh
@auto_refresh = opts.fetch(:auto_refresh, false)
@refresh_interval = opts[:refresh_interval] || 90
# Are we allowing reads from secondaries?
if opts[:read_secondary]
warn ":read_secondary options has now been deprecated and will " +
"be removed in driver v2.0. Use the :read option instead."
@read_secondary = opts.fetch(:read_secondary, false)
@read = :secondary
else
@read = opts.fetch(:read, :primary)
Mongo::Support.validate_read_preference(@read)
end
@connected = false
# Store the refresher thread
@refresh_thread = nil
# Maps
@sockets_to_pools = {}
@tags_to_pools = {}
# Replica set name
if opts[:rs_name]
warn ":rs_name option has been deprecated and will be removed in v2.0. " +
"Please use :name instead."
@replica_set_name = opts[:rs_name]
else
@replica_set_name = opts[:name]
end
# Require a primary node to connect?
@require_primary = opts.fetch(:require_primary, false)
setup(opts)
end
def inspect
"<Mongo::ReplSetConnection:0x#{self.object_id.to_s(16)} @seeds=#{@seeds} " +
"@connected=#{@connected}>"
end
# Initiate a connection to the replica set.
def connect
log(:debug, "Connecting.")
sync_synchronize(:EX) do
return if @connected
manager = PoolManager.new(self, @seeds)
manager.connect
update_config(manager)
initiate_auto_refresh
if @require_primary && @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
raise ConnectionFailure, "Failed to connect to primary node."
elsif !@read_pool
raise ConnectionFailure, "Failed to connect to any node."
else
@connected = true
end
end
end
# Note: this method must be called from within
# an exclusive lock.
def update_config(manager)
@arbiters = manager.arbiters.nil? ? [] : manager.arbiters.dup
@primary = manager.primary.nil? ? nil : manager.primary.dup
@secondaries = manager.secondaries.dup
@hosts = manager.hosts.dup
@primary_pool = manager.primary_pool
@read_pool = manager.read_pool
@secondary_pools = manager.secondary_pools
@tags_to_pools = manager.tags_to_pools
@seeds = manager.seeds
@manager = manager
@nodes = manager.nodes
@max_bson_size = manager.max_bson_size
end
# Refresh the current replica set configuration.
def refresh(opts={})
return false if !connected?
# Return if another thread is already in the process of refreshing.
return if sync_exclusive?
sync_synchronize(:EX) do
log(:debug, "Refreshing...")
@background_manager ||= PoolManager.new(self, @seeds)
@background_manager.connect
update_config(@background_manager)
end
return true
end
def connected?
@primary_pool || @read_pool
end
# @deprecated
def connecting?
false
end
# The replica set primary's host name.
#
# @return [String]
def host
super
end
# The replica set primary's port.
#
# @return [Integer]
def port
super
end
def nodes
warn "DEPRECATED"
@seeds
end
# Determine whether we're reading from a primary node. If false,
# this connection connects to a secondary node and @read_secondaries is true.
#
# @return [Boolean]
def read_primary?
sync_synchronize(:SH) do
@read_pool == @primary_pool
end
end
alias :primary? :read_primary?
def read_preference
@read
end
# Close the connection to the database.
# TODO: we should get an exclusive lock here.
def close
sync_synchronize(:EX) do
@connected = false
super
if @refresh_thread
@refresh_thread.kill
@refresh_thread = nil
end
if @nodes
@nodes.each do |member|
member.close
end
end
@nodes = []
@read_pool = nil
if @secondary_pools
@secondary_pools.each do |pool|
pool.close
end
end
@secondaries = []
@secondary_pools = []
@arbiters = []
@tags_to_pools.clear
@sockets_to_pools.clear
end
end
# If a ConnectionFailure is raised, this method will be called
# to close the connection and reset connection values.
# @deprecated
def reset_connection
close
warn "ReplSetConnection#reset_connection is now deprecated and will be removed in v2.0. " +
"Use ReplSetConnection#close instead."
end
# Returns +true+ if it's okay to read from a secondary node.
# Since this is a replica set, this must always be true.
#
# This method exist primarily so that Cursor objects will
# generate query messages with a slaveOkay value of +true+.
#
# @return [Boolean] +true+
def slave_ok?
true
end
def authenticate_pools
super
@secondary_pools.each do |pool|
pool.authenticate_existing
end
end
def logout_pools(db)
super
@secondary_pools.each do |pool|
pool.logout_existing(db)
end
end
private
def initiate_auto_refresh
if @auto_refresh
return if @refresh_thread && @refresh_thread.alive?
@refresh_thread = Thread.new do
while true do
sleep(@refresh_interval)
refresh
end
end
else
@last_refresh = Time.now
end
end
# Checkout a socket for reading (i.e., a secondary node).
# Note that @read_pool might point to the primary pool
# if no read pool has been defined.
def checkout_reader
connect unless connected?
socket = get_socket_from_pool(@read_pool)
if !socket
refresh
socket = get_socket_from_pool(@primary_pool)
end
if socket
socket
else
raise ConnectionFailure.new("Could not connect to a node for reading.")
end
end
# Checkout a socket connected to a node with one of
# the provided tags. If no such node exists, raise
# an exception.
#
# NOTE: will be available in driver release v2.0.
def checkout_tagged(tags)
sync_synchronize(:SH) do
tags.each do |k, v|
pool = @tags_to_pools[{k.to_s => v}]
if pool
socket = pool.checkout
@sockets_to_pools[socket] = pool
return socket
end
end
end
raise NodeWithTagsNotFound,
"Could not find a connection tagged with #{tags}."
end
# Checkout a socket for writing (i.e., a primary node).
def checkout_writer
connect unless connected?
socket = get_socket_from_pool(@primary_pool)
if !socket
refresh
socket = get_socket_from_pool(@primary_pool)
end
if socket
socket
else
raise ConnectionFailure.new("Could not connect to primary node.")
end
end
def get_socket_from_pool(pool)
begin
sync_synchronize(:SH) do
if pool
socket = pool.checkout
@sockets_to_pools[socket] = pool
socket
end
end
rescue ConnectionFailure => ex
log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}")
return nil
end
end
# Checkin a socket used for reading.
def checkin_reader(socket)
warn "ReplSetConnection#checkin_writer is deprecated and will be removed " +
"in driver v2.0. Use ReplSetConnection#checkin instead."
checkin(socket)
end
# Checkin a socket used for writing.
def checkin_writer(socket)
warn "ReplSetConnection#checkin_writer is deprecated and will be removed " +
"in driver v2.0. Use ReplSetConnection#checkin instead."
checkin(socket)
end
def checkin(socket)
sync_synchronize(:SH) do
if pool = @sockets_to_pools[socket]
pool.checkin(socket)
elsif socket
socket.close
end
end
if !@auto_refresh &&
((Time.now - @last_refresh) > @refresh_interval)
refresh
end
end
end
end