Skip to content

Commit

Permalink
Remove thread-local map and socket map (complexity creep).
Browse files Browse the repository at this point in the history
  • Loading branch information
banker committed Nov 18, 2011
1 parent 177fad3 commit fa10508
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 297 deletions.
66 changes: 0 additions & 66 deletions lib/mongo/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -481,68 +481,6 @@ def max_bson_size
@max_bson_size
end

def get_local_reader
self.connections ||= {}
if !connected? && self.connections[self.object_id]
self.connections[self.object_id]
else
self.connections[self.object_id] = {}
end
self.connections[self.object_id][:reader] ||= checkout_reader
end

def get_local_writer
self.connections ||= {}
if !connected? && self.connections[self.object_id]
self.connections[self.object_id]
else
self.connections[self.object_id] = {}
end
self.connections[self.object_id][:writer] ||= checkout_writer
end

# Allow the current thread’s connection to return to the pool.
#
# Calling this method allows the socket that has been reserved
# for this thread to be returned to the pool. Other threads will
# then be able to re-use that socket. If your application uses many
# threads, or has long-running threads that infrequently perform MongoDB
# operations, then judicious use of this method can lead to performance gains.
# Care should be taken, however, to make sure that end_request is not called
# in the middle of a sequence of operations in which ordering is important. This
# could lead to unexpected results.
#
# One important case is when a thread is dying permanently. It is best to call
# end_request when you know a thread is finished, as otherwise its socket will
# not be reclaimed.
def end_request
if socket = self.connections[self.object_id][:reader]
checkin(socket)
end

if socket = self.connections[self.object_id][:writer]
checkin(socket)
end
end

# Used to close, check in, or refresh sockets held
# in thread-local variables.
def local_socket_done(socket)
if self.connections[self.object_id][:reader] == socket
if self.read_pool.sockets_low?
checkin(socket)
self.connections[self.object_id][:reader] = nil
end
end

if self.connections[self.object_id][:writer] == socket
if self.primary_pool && self.primary_pool.sockets_low?
checkin(socket)
self.connections[self.object_id][:writer] = nil
end
end
end

# Checkout a socket for reading (i.e., a secondary node).
# Note: this is overridden in ReplSetConnection.
def checkout_reader
Expand All @@ -560,16 +498,12 @@ def checkout_writer
# Checkin a socket used for reading.
# Note: this is overridden in ReplSetConnection.
def checkin_reader(socket)
warn "Connection#checkin_writer is not deprecated and will be removed " +
"in driver v2.0. Use Connection#checkin instead."
checkin(socket)
end

# Checkin a socket used for writing.
# Note: this is overridden in ReplSetConnection.
def checkin_writer(socket)
warn "Connection#checkin_writer is not deprecated and will be removed " +
"in driver v2.0. Use Connection#checkin instead."
checkin(socket)
end

Expand Down
16 changes: 12 additions & 4 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,10 @@ def send_get_more
def checkout_socket_from_connection
@checkin_connection = true
if @command || @read_preference == :primary
@connection.get_local_writer
@connection.checkout_writer
else
@read_pool = @connection.read_pool
@connection.get_local_reader
@connection.checkout_reader
end
end

Expand Down Expand Up @@ -556,7 +556,11 @@ def checkin_socket(sock)
@read_pool.checkin(sock)
@checkin_read_pool = false
elsif @checkin_connection
@connection.local_socket_done(sock)
if @command || @read_preference == :primary
@connection.checkin_writer(sock)
else
@connection.checkin_reader(sock)
end
@checkin_connection = false
end
end
Expand All @@ -566,7 +570,11 @@ def force_checkin_socket(sock)
@read_pool.checkin(sock)
@checkin_read_pool = false
else
@connection.checkin(sock)
if @command || @read_preference == :primary
@connection.checkin_writer(sock)
else
@connection.checkin_reader(sock)
end
@checkin_connection = false
end
end
Expand Down
74 changes: 40 additions & 34 deletions lib/mongo/networking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,23 @@ def send_message(operation, message, opts={})

connection = opts.fetch(:connection, :writer)

begin
add_message_headers(message, operation)
packed_message = message.to_s
add_message_headers(message, operation)
packed_message = message.to_s

if connection == :writer
sock = checkout_writer
else
sock = checkout_reader
end

begin
send_message_on_socket(packed_message, sock)
ensure
if connection == :writer
sock = get_local_writer
checkin_writer(sock)
else
sock = get_local_reader
checkin_reader(sock)
end

send_message_on_socket(packed_message, sock)
local_socket_done(sock)
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
checkin(sock)
raise ex
end
end

Expand All @@ -64,13 +66,13 @@ def send_message_with_safe_check(operation, message, db_name, log_message=nil, l
last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)

packed_message = message.append!(last_error_message).to_s
sock = checkout_writer
begin
sock = get_local_writer
send_message_on_socket(packed_message, sock)
docs, num_received, cursor_id = receive(sock, last_error_id)
local_socket_done(sock)
checkin_writer(sock)
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
checkin(sock)
checkin_writer(sock)
raise ex
end

Expand Down Expand Up @@ -101,30 +103,34 @@ def receive_message(operation, message, log_message=nil, socket=nil, command=fal
read=:primary, exhaust=false)
request_id = add_message_headers(message, operation)
packed_message = message.to_s
begin
if socket
sock = socket
should_checkin = false
if socket
sock = socket
should_checkin = false
else
if command || read == :primary
sock = checkout_writer
elsif read == :secondary
sock = checkout_reader
else
if command
sock = get_local_writer
elsif read == :primary
sock = get_local_writer
elsif read == :secondary
sock = get_local_reader
else
sock = checkout_tagged(read)
end
should_checkin = true
sock = checkout_tagged(read)
end
should_checkin = true
end

result = ''
result = ''
begin
send_message_on_socket(packed_message, sock)
result = receive(sock, request_id, exhaust)
local_socket_done(sock) if should_checkin
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
checkin(sock) if should_checkin
raise ex
ensure
if should_checkin
if command || read == :primary
checkin_writer(sock)
elsif read == :secondary
checkin_reader(sock)
else
# TODO: sock = checkout_tagged(read)
end
end
end
result
end
Expand Down Expand Up @@ -281,7 +287,7 @@ def send_message_on_socket(packed_message, socket)
total_bytes_sent
rescue => ex
close
raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
raise ConnectionFailure, "Operation failed with the following exception: #{ex}:#{ex.message}"
end
end

Expand Down
Loading

0 comments on commit fa10508

Please sign in to comment.