Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
Merge 6f211ac into cdf3a26
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurnn committed Nov 9, 2013
2 parents cdf3a26 + 6f211ac commit d76e89d
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 76 deletions.
19 changes: 12 additions & 7 deletions lib/moped/authenticatable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,19 @@ def credentials
# @since 2.0.0
def login(database, username, password)
getnonce = Protocol::Command.new(database, getnonce: 1)
result = Operation::Read.new(getnonce).execute(self)
authenticate = Protocol::Commands::Authenticate.new(database, username, password, result["nonce"])
connection do |conn|
conn.write([ authenticate ])
document = conn.read.documents.first
raise Errors::AuthenticationFailure.new(authenticate, document) unless document["ok"] == 1
credentials[database] = [username, password]
self.write([getnonce])
reply = self.receive_replies([getnonce]).first#.first.documents.first
if getnonce.failure?(reply)
return
end
result = getnonce.results(reply)

authenticate = Protocol::Commands::Authenticate.new(database, username, password, result["nonce"])
self.write([ authenticate ])
document = self.read.documents.first

raise Errors::AuthenticationFailure.new(authenticate, document) unless document["ok"] == 1
credentials[database] = [username, password]
end

# Logout the user from the provided database.
Expand Down
6 changes: 4 additions & 2 deletions lib/moped/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ def with_primary(&block)
if node = nodes.find(&:primary?)
begin
node.ensure_primary do
return yield(node.apply_credentials(credentials))
node.credentials = credentials
return yield(node)
end
rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
end
Expand All @@ -251,7 +252,8 @@ def with_secondary(&block)
available_nodes = nodes.select(&:secondary?).shuffle!
while node = available_nodes.shift
begin
return yield(node.apply_credentials(credentials))
node.credentials = credentials
return yield(node)
rescue Errors::ConnectionFailure => e
next
end
Expand Down
1 change: 1 addition & 0 deletions lib/moped/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Moped
#
# @since 2.0.0
class Connection
include Authenticatable

# The default connection timeout, in seconds.
#
Expand Down
20 changes: 20 additions & 0 deletions lib/moped/connection/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ def size
end
end

# Disconnect all connections including the ones
# waiting on the pool and the ones in use.
#
# @example Disconnect all connections from the pool
# pool.disconnect
#
# @since 2.0.0
def disconnect
mutex.synchronize do
pinned.values.each(&:disconnect)
conns = []
while unpinned.size > 0
conn = unpinned.shift
conn.disconnect
conns.push(conn)
end
conns.each { |c| unpinned.push(c) }
end
end

# Get the timeout when attempting to check out items from the pool.
#
# @example Get the checkout timeout.
Expand Down
4 changes: 3 additions & 1 deletion lib/moped/failover/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ module Retry
def execute(exception, node)
node.disconnect
begin
yield if block_given?
node.connection do |conn|
yield(conn) if block_given?
end
rescue Exception => e
node.down!
raise(e)
Expand Down
112 changes: 61 additions & 51 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ module Moped
#
# @since 1.0.0
class Node
include Authenticatable
include Executable
include Instrumentable

Expand All @@ -29,6 +28,10 @@ class Node
# @return [ Time ] The last time the node did a refresh.
attr_reader :address, :down_at, :latency, :options, :refreshed_at

# @!attribute credentials
# @return [ Hash ] The credentials of the node.
attr_accessor :credentials

# Is this node equal to another?
#
# @example Is the node equal to another.
Expand Down Expand Up @@ -87,24 +90,6 @@ def command(database, cmd, options = {})
read(Protocol::Command.new(database, cmd, options))
end

# Connect the node on the underlying connection.
#
# @example Connect the node.
# node.connect
#
# @raise [ Errors::ConnectionFailure ] If connection failed.
#
# @return [ true ] If the connection suceeded.
#
# @since 2.0.0
def connect
start = Time.now
connection{ |conn| conn.connect }
@latency = Time.now - start
@down_at = nil
true
end

# Is the node currently connected?
#
# @example Is the node connected?
Expand All @@ -131,20 +116,6 @@ def connection
end
end

# Force the node to disconnect from the server.
#
# @example Disconnect the node.
# node.disconnect
#
# @return [ true ] If the disconnection succeeded.
#
# @since 1.2.0
def disconnect
credentials.clear
connection{ |conn| conn.disconnect }
true
end

# Is the node down?
#
# @example Is the node down?
Expand All @@ -157,6 +128,20 @@ def down?
@down_at
end

# Force the node to disconnect from the server.
#
# @example Disconnect the node.
# node.disconnect
#
# @return [ true ] If the disconnection succeeded.
#
# @since 1.2.0
def disconnect
@credentials.clear
pool.disconnect
true
end

# Mark the node as down.
#
# @example Mark the node as down.
Expand All @@ -168,7 +153,7 @@ def down?
def down!
@down_at = Time.new
@latency = nil
disconnect if connected?
disconnect
end

# Yields the block if a connection can be established, retrying when a
Expand All @@ -185,15 +170,23 @@ def down!
#
# @since 1.0.0
def ensure_connected(&block)
return yield if executing?(:connection)
execute(:connection) do
begin
connect unless connected?
yield(self)
rescue Exception => e
Failover.get(e).execute(e, self, &block)
unless (conn = stack(:connection)).empty?
return yield(conn.first)
end

begin
connection do |conn|
stack(:connection) << conn
connect(conn) unless conn.connected?
conn.apply_credentials(@credentials)
yield(conn)
end
rescue Exception => e
Failover.get(e).execute(e, self, &block)
ensure
end_execution(:connection)
end

end

# Set a flag on the node for the duration of provided block so that an
Expand Down Expand Up @@ -259,6 +252,7 @@ def initialize(address, options = {})
@latency = nil
@primary = nil
@secondary = nil
@credentials = {}
@instrumenter = options[:instrumenter] || Instrumentable::Log
@address = Address.new(address, timeout)
@address.resolve(self)
Expand Down Expand Up @@ -523,6 +517,24 @@ def inspect

private

# Connect the node on the underlying connection.
#
# @example Connect the node.
# node.connect
#
# @raise [ Errors::ConnectionFailure ] If connection failed.
#
# @return [ true ] If the connection suceeded.
#
# @since 2.0.0
def connect(conn)
start = Time.now
conn.connect
@latency = Time.now - start
@down_at = nil
true
end

# Configure the node based on the return from the ismaster command.
#
# @api private
Expand Down Expand Up @@ -554,7 +566,7 @@ def configure(settings)
def discover(*nodes)
nodes.flatten.compact.each do |peer|
node = Node.new(peer, options)
node.credentials.merge!(credentials)
node.credentials.merge!(@credentials)
peers.push(node)
end
end
Expand All @@ -574,16 +586,14 @@ def discover(*nodes)
def flush(ops = queue)
operations, callbacks = ops.transpose
logging(operations) do
ensure_connected do
replies = nil
connection do |conn|
conn.write(operations)
replies = conn.receive_replies(operations)
end
replies.zip(callbacks).map do |reply, callback|
callback ? callback[reply] : reply
end.last
replies = nil
ensure_connected do |conn|
conn.write(operations)
replies = conn.receive_replies(operations)
end
replies.zip(callbacks).map do |reply, callback|
callback ? callback[reply] : reply
end.last
end
ensure
ops.clear
Expand Down
7 changes: 1 addition & 6 deletions lib/moped/operation/read.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,7 @@ def initialize(operation)
def execute(node)
node.process(operation) do |reply|
if operation.failure?(reply)
if reply.unauthorized? && node.credentials.has_key?(database)
node.login(database, *node.credentials[database])
return execute(node)
else
raise operation.failure_exception(reply)
end
raise operation.failure_exception(reply)
end
operation.results(reply)
end
Expand Down
4 changes: 0 additions & 4 deletions spec/moped/failover/disconnect_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@
Moped::Node.new("127.0.0.1:27017")
end

before do
node.send(:connect)
end

it "raises a socket error" do
expect {
described_class.execute(exception, node)
Expand Down
4 changes: 0 additions & 4 deletions spec/moped/failover/retry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@

context "when the retry fails" do

before do
node.send(:connect)
end

it "re-raises the exception" do
expect {
described_class.execute(exception, node) do
Expand Down
4 changes: 3 additions & 1 deletion spec/moped/node_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@
context "when the node is connected" do

before do
node.connect
node.connection do |conn|
node.send(:connect, conn)
end
end

it "returns the latency in seconds" do
Expand Down

0 comments on commit d76e89d

Please sign in to comment.