Skip to content
13 changes: 12 additions & 1 deletion lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

require 'mongo/cluster/topology'
require 'mongo/cluster/cursor_reaper'

module Mongo

Expand Down Expand Up @@ -42,6 +43,7 @@ class Cluster
attr_reader :topology

def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown?
def_delegators :@cursor_reaper, :register_cursor, :schedule_kill_cursor, :unregister_cursor

# Determine if this cluster of servers is equal to another object. Checks the
# servers currently in the cluster, not what was configured.
Expand Down Expand Up @@ -113,6 +115,10 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))

seeds.each{ |seed| add(seed) }

@cursor_reaper = CursorReaper.new
@cursor_reaper.run!

ObjectSpace.define_finalizer(self, self.class.finalize(pools))
end

Expand All @@ -130,6 +136,8 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
# @since 2.2.0
def self.finalize(pools)
proc do
begin; @cursor_reaper.kill_cursors; rescue; end
@cursor_reaper.stop!
pools.values.each do |pool|
pool.disconnect!
end
Expand Down Expand Up @@ -288,6 +296,8 @@ def servers
#
# @since 2.1.0
def disconnect!
begin; @cursor_reaper.kill_cursors; rescue; end
@cursor_reaper.stop!
@servers.each { |server| server.disconnect! } and true
end

Expand All @@ -301,7 +311,8 @@ def disconnect!
# @since 2.1.0
def reconnect!
scan!
servers.each { |server| server.reconnect! } and true
servers.each { |server| server.reconnect! }
@cursor_reaper.restart! and true
end

# Add hosts in a description to the cluster.
Expand Down
174 changes: 174 additions & 0 deletions lib/mongo/cluster/cursor_reaper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# Copyright (C) 2014-2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'set'

module Mongo
class Cluster

# A manager that sends kill cursors operations at regular intervals to close
# cursors that have been garbage collected without being exhausted.
#
# @since 2.3.0
class CursorReaper
extend Forwardable
include Retryable

# The default time interval for the cursor reaper to send pending kill cursors operations.
#
# @since 2.3.0
FREQUENCY = 1.freeze

# Create a cursor reaper.
#
# @example Create a CursorReaper.
# Mongo::Cluster::CursorReaper.new(cluster)
#
# @api private
#
# @since 2.3.0
def initialize
@to_kill = {}
@active_cursors = Set.new
@mutex = Mutex.new
end

# Start the cursor reaper's thread.
#
# @example Start the cursor reaper's thread.
# reaper.run!
#
# @api private
#
# @since 2.3.0
def run!
@thread && @thread.alive? ? @thread : start!
end
alias :restart! :run!

# Schedule a kill cursors operation to be eventually executed.
#
# @example Schedule a kill cursors operation.
# cursor_reaper.schedule_kill_cursor(id, op_spec, server)
#
# @param [ Integer ] id The id of the cursor to kill.
# @param [ Hash ] op_spec The spec for the kill cursors op.
# @param [ Mongo::Server ] server The server to send the kill cursors operation to.
#
# @api private
#
# @since 2.3.0
def schedule_kill_cursor(id, op_spec, server)
@mutex.synchronize do
if @active_cursors.include?(id)
@to_kill[server] ||= Set.new
@to_kill[server] << op_spec
end
end
end

# Register a cursor id as active.
#
# @example Register a cursor as active.
# cursor_reaper.register_cursor(id)
#
# @param [ Integer ] id The id of the cursor to register as active.
#
# @api private
#
# @since 2.3.0
def register_cursor(id)
if id && id > 0
@mutex.synchronize do
@active_cursors << id
end
end
end

# Unregister a cursor id, indicating that it's no longer active.
#
# @example Unregister a cursor.
# cursor_reaper.unregister_cursor(id)
#
# @param [ Integer ] id The id of the cursor to unregister.
#
# @api private
#
# @since 2.3.0
def unregister_cursor(id)
@mutex.synchronize do
@active_cursors.delete(id)
end
end

# Stop the cursor reaper's thread.
#
# @example Stop the cursor reaper's thread.
# reaper.stop!
#
# @api private
#
# @since 2.3.0
def stop!
@thread.kill && @thread.stop?
end

# Execute all pending kill cursors operations.
#
# @example Execute pending kill cursors operations.
# cursor_reaper.kill_cursors
#
# @api private
#
# @since 2.3.0
def kill_cursors
to_kill_copy = {}
active_cursors_copy = []

@mutex.synchronize do
to_kill_copy = @to_kill.dup
active_cursors_copy = @active_cursors.dup
@to_kill = {}
end

to_kill_copy.each do |server, op_specs|
op_specs.each do |op_spec|
if server.features.find_command_enabled?
Cursor::Builder::KillCursorsCommand.update_cursors(op_spec, active_cursors_copy.to_a)
if Cursor::Builder::KillCursorsCommand.get_cursors_list(op_spec).size > 0
Operation::Commands::Command.new(op_spec).execute(server)
end
else
Cursor::Builder::OpKillCursors.update_cursors(op_spec, active_cursors_copy.to_a)
if Cursor::Builder::OpKillCursors.get_cursors_list(op_spec).size > 0
Operation::KillCursors.new(op_spec).execute(server)
end
end
end
end
end

private

def start!
@thread = Thread.new(FREQUENCY) do |i|
loop do
sleep(i)
kill_cursors
end
end
end
end
end
end
51 changes: 47 additions & 4 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,31 @@ def initialize(view, result, server)
@server = server
@initial_result = result
@remaining = limit if limited?
@cursor_id = result.cursor_id
register
ObjectSpace.define_finalizer(self, self.class.finalize(result.cursor_id,
cluster,
kill_cursors_op_spec,
server))
end


# Finalize the cursor for garbage collection. Schedules this cursor to be included
# in a killCursors operation executed by the Cluster's CursorReaper.
#
# @example Finalize the cursor.
# Cursor.finalize(id, cluster, op, server)
#
# @param [ Integer ] cursor_id The cursor's id.
# @param [ Mongo::Cluster ] cluster The cluster associated with this cursor and its server.
# @param [ Hash ] op_spec The killCursors operation specification.
# @param [ Mongo::Server ] server The server to send the killCursors operation to.
#
# @return [ Proc ] The Finalizer.
#
# @since 2.3.0
def self.finalize(cursor_id, cluster, op_spec, server)
proc { cluster.schedule_kill_cursor(cursor_id, op_spec, server) }
end

# Get a human-readable string representation of +Cursor+.
Expand Down Expand Up @@ -173,16 +198,25 @@ def get_more_operation
end

def kill_cursors
read_with_retry do
unregister
read_with_one_retry do
kill_cursors_operation.execute(@server)
end
end

def kill_cursors_operation
if @server.features.find_command_enabled?
Operation::Commands::Command.new(Builder::KillCursorsCommand.new(self).specification)
Operation::Commands::Command.new(kill_cursors_op_spec)
else
Operation::KillCursors.new(kill_cursors_op_spec)
end
end

def kill_cursors_op_spec
if @server.features.find_command_enabled?
Builder::KillCursorsCommand.new(self).specification
else
Operation::KillCursors.new(Builder::OpKillCursors.new(self).specification)
Builder::OpKillCursors.new(self).specification
end
end

Expand All @@ -196,13 +230,22 @@ def more?

def process(result)
@remaining -= result.returned_count if limited?
@cursor_id = result.cursor_id
@coll_name ||= result.namespace.sub("#{database.name}.", '') if result.namespace
unregister if result.cursor_id == 0
@cursor_id = result.cursor_id
result.documents
end

def use_limit?
limited? && batch_size >= @remaining
end

def register
cluster.register_cursor(@cursor_id)
end

def unregister
cluster.unregister_cursor(@cursor_id)
end
end
end
28 changes: 28 additions & 0 deletions lib/mongo/cursor/builder/kill_cursors_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,34 @@ def specification
def kill_cursors_command
{ :killCursors => collection_name, :cursors => [ cursor.id ] }
end

class << self

# Update a specification's list of cursor ids.
#
# @example Update a specification's list of cursor ids.
# KillCursorsCommand.update_cursors(spec, ids)
#
# @return [ Hash ] The specification.
# @return [ Array ] The ids to update with.
#
# @since 2.3.0
def update_cursors(spec, ids)
spec[:selector].merge!(cursors: spec[:selector][:cursors] & ids)
end

# Get the list of cursor ids from a spec generated by this Builder.
#
# @example Get the list of cursor ids.
# KillCursorsCommand.cursors(spec)
#
# @return [ Hash ] The specification.
#
# @since 2.3.0
def get_cursors_list(spec)
spec[:selector][:cursors]
end
end
end
end
end
Expand Down
28 changes: 28 additions & 0 deletions lib/mongo/cursor/builder/op_kill_cursors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,34 @@ def initialize(cursor)
def specification
{ :coll_name => collection_name, :db_name => database.name, :cursor_ids => [ cursor.id ] }
end

class << self

# Update a specification's list of cursor ids.
#
# @example Update a specification's list of cursor ids.
# OpKillCursors.update_cursors(spec, ids)
#
# @return [ Hash ] The specification.
# @return [ Array ] The ids to update with.
#
# @since 2.3.0
def update_cursors(spec, ids)
spec.merge!(cursor_ids: spec[:cursor_ids] & ids)
end

# Get the list of cursor ids from a spec generated by this Builder.
#
# @example Get the list of cursor ids.
# OpKillCursors.cursors(spec)
#
# @return [ Hash ] The specification.
#
# @since 2.3.0
def get_cursors_list(spec)
spec[:cursor_ids]
end
end
end
end
end
Expand Down
Loading