diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb
index 0268ccb75f..9e7cf96f1a 100644
--- a/lib/mongo/cluster.rb
+++ b/lib/mongo/cluster.rb
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 require 'mongo/cluster/topology'
+require 'mongo/cluster/cursor_reaper'
 
 module Mongo
 
@@ -42,6 +43,7 @@ class Cluster
     attr_reader :topology
 
     def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown?
+    def_delegators :@cursor_reaper, :register_cursor, :schedule_kill_cursor, :unregister_cursor
 
     # Determine if this cluster of servers is equal to another object. Checks the
     # servers currently in the cluster, not what was configured.
@@ -113,6 +115,10 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
       subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self))
 
       seeds.each{ |seed| add(seed) }
+
+      @cursor_reaper = CursorReaper.new
+      @cursor_reaper.run!
+
-      ObjectSpace.define_finalizer(self, self.class.finalize(pools))
+      ObjectSpace.define_finalizer(self, self.class.finalize(pools, @cursor_reaper))
     end
 
@@ -130,6 +136,12 @@ def initialize(seeds, monitoring, options = Options::Redacted.new)
     # @since 2.2.0
-    def self.finalize(pools)
+    def self.finalize(pools, cursor_reaper = nil)
       proc do
+        # `self` inside this proc is the Cluster class, whose @cursor_reaper is
+        # nil, so the reaper has to be passed in explicitly by the caller.
+        if cursor_reaper
+          begin; cursor_reaper.kill_cursors; rescue; end
+          cursor_reaper.stop!
+        end
         pools.values.each do |pool|
           pool.disconnect!
         end
@@ -288,6 +300,8 @@ def servers
     #
     # @since 2.1.0
     def disconnect!
+      begin; @cursor_reaper.kill_cursors; rescue; end
+      @cursor_reaper.stop!
       @servers.each { |server| server.disconnect! } and true
     end
 
@@ -301,7 +315,8 @@ def disconnect!
     # @since 2.1.0
     def reconnect!
       scan!
-      servers.each { |server| server.reconnect! } and true
+      servers.each { |server| server.reconnect! }
+      @cursor_reaper.restart! and true
     end
 
     # Add hosts in a description to the cluster.
diff --git a/lib/mongo/cluster/cursor_reaper.rb b/lib/mongo/cluster/cursor_reaper.rb
new file mode 100644
index 0000000000..155e463e6b
--- /dev/null
+++ b/lib/mongo/cluster/cursor_reaper.rb
@@ -0,0 +1,189 @@
+# Copyright (C) 2014-2015 MongoDB, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'set'
+
+module Mongo
+  class Cluster
+
+    # A manager that sends kill cursors operations at regular intervals to close
+    # cursors that have been garbage collected without being exhausted.
+    #
+    # @since 2.3.0
+    class CursorReaper
+      extend Forwardable
+      include Retryable
+
+      # The default time interval for the cursor reaper to send pending kill cursors operations.
+      #
+      # @since 2.3.0
+      FREQUENCY = 1
+
+      # Create a cursor reaper.
+      #
+      # @example Create a CursorReaper.
+      #   Mongo::Cluster::CursorReaper.new
+      #
+      # @api private
+      #
+      # @since 2.3.0
+      def initialize
+        @to_kill = {}
+        @active_cursors = Set.new
+        @mutex = Mutex.new
+      end
+
+      # Start the cursor reaper's thread.
+      #
+      # @example Start the cursor reaper's thread.
+      #   reaper.run!
+      #
+      # @api private
+      #
+      # @since 2.3.0
+      def run!
+        @thread && @thread.alive? ? @thread : start!
+      end
+      alias :restart! :run!
+
+      # Schedule a kill cursors operation to be eventually executed.
+      #
+      # @example Schedule a kill cursors operation.
+      #   cursor_reaper.schedule_kill_cursor(id, op_spec, server)
+      #
+      # @param [ Integer ] id The id of the cursor to kill.
+      # @param [ Hash ] op_spec The spec for the kill cursors op.
+      # @param [ Mongo::Server ] server The server to send the kill cursors operation to.
+      #
+      # @api private
+      #
+      # @since 2.3.0
+      def schedule_kill_cursor(id, op_spec, server)
+        @mutex.synchronize do
+          if @active_cursors.include?(id)
+            @to_kill[server] ||= Set.new
+            @to_kill[server] << op_spec
+          end
+        end
+      end
+
+      # Register a cursor id as active.
+      #
+      # @example Register a cursor as active.
+      #   cursor_reaper.register_cursor(id)
+      #
+      # @param [ Integer ] id The id of the cursor to register as active.
+      #
+      # @api private
+      #
+      # @since 2.3.0
+      def register_cursor(id)
+        if id && id > 0
+          @mutex.synchronize do
+            @active_cursors << id
+          end
+        end
+      end
+
+      # Unregister a cursor id, indicating that it's no longer active.
+      #
+      # @example Unregister a cursor.
+      #   cursor_reaper.unregister_cursor(id)
+      #
+      # @param [ Integer ] id The id of the cursor to unregister.
+      #
+      # @api private
+      #
+      # @since 2.3.0
+      def unregister_cursor(id)
+        @mutex.synchronize do
+          @active_cursors.delete(id)
+        end
+      end
+
+      # Stop the cursor reaper's thread.
+      #
+      # @example Stop the cursor reaper's thread.
+      #   reaper.stop!
+      #
+      # @api private
+      #
+      # @since 2.3.0
+      def stop!
+        # Nothing to stop if #run! was never called.
+        return true unless @thread
+        @thread.kill && @thread.stop?
+      end
+
+      # Execute all pending kill cursors operations. Cursor ids that were sent
+      # to the server are then removed from the set of active cursors.
+      #
+      # @example Execute pending kill cursors operations.
+      #   cursor_reaper.kill_cursors
+      #
+      # @api private
+      #
+      # @since 2.3.0
+      def kill_cursors
+        to_kill_copy = {}
+        active_cursors_copy = []
+
+        @mutex.synchronize do
+          to_kill_copy = @to_kill.dup
+          active_cursors_copy = @active_cursors.dup
+          @to_kill = {}
+        end
+
+        to_kill_copy.each do |server, op_specs|
+          op_specs.each do |op_spec|
+            if server.features.find_command_enabled?
+              Cursor::Builder::KillCursorsCommand.update_cursors(op_spec, active_cursors_copy.to_a)
+              cursor_ids = Cursor::Builder::KillCursorsCommand.get_cursors_list(op_spec)
+              if cursor_ids.size > 0
+                Operation::Commands::Command.new(op_spec).execute(server)
+                forget_cursors(cursor_ids)
+              end
+            else
+              Cursor::Builder::OpKillCursors.update_cursors(op_spec, active_cursors_copy.to_a)
+              cursor_ids = Cursor::Builder::OpKillCursors.get_cursors_list(op_spec)
+              if cursor_ids.size > 0
+                Operation::KillCursors.new(op_spec).execute(server)
+                forget_cursors(cursor_ids)
+              end
+            end
+          end
+        end
+      end
+
+      private
+
+      # Drop cursor ids whose kill has been sent; otherwise they would stay in
+      # the active set forever, since nothing else removes them after the kill.
+      def forget_cursors(ids)
+        @mutex.synchronize { @active_cursors.subtract(ids) }
+      end
+
+      def start!
+        @thread = Thread.new(FREQUENCY) do |i|
+          loop do
+            sleep(i)
+            # Best effort: one failed kill (e.g. an unreachable server) must
+            # not terminate the reaper thread.
+            begin; kill_cursors; rescue StandardError; end
+          end
+        end
+      end
+    end
+  end
+end
diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb
index 58cbeece55..bf8e754609 100644
--- a/lib/mongo/cursor.rb
+++ b/lib/mongo/cursor.rb
@@ -57,6 +57,31 @@ def initialize(view, result, server)
       @server = server
       @initial_result = result
       @remaining = limit if limited?
+      @cursor_id = result.cursor_id
+      register
+      ObjectSpace.define_finalizer(self, self.class.finalize(result.cursor_id,
+                                                             cluster,
+                                                             kill_cursors_op_spec,
+                                                             server))
+    end
+
+
+    # Finalize the cursor for garbage collection. Schedules this cursor to be included
+    # in a killCursors operation executed by the Cluster's CursorReaper.
+    #
+    # @example Finalize the cursor.
+    #   Cursor.finalize(id, cluster, op, server)
+    #
+    # @param [ Integer ] cursor_id The cursor's id.
+    # @param [ Mongo::Cluster ] cluster The cluster associated with this cursor and its server.
+    # @param [ Hash ] op_spec The killCursors operation specification.
+    # @param [ Mongo::Server ] server The server to send the killCursors operation to.
+    #
+    # @return [ Proc ] The Finalizer.
+    #
+    # @since 2.3.0
+    def self.finalize(cursor_id, cluster, op_spec, server)
+      proc { cluster.schedule_kill_cursor(cursor_id, op_spec, server) }
     end
 
     # Get a human-readable string representation of +Cursor+.
@@ -173,16 +198,25 @@ def get_more_operation
     end
 
     def kill_cursors
-      read_with_retry do
+      unregister
+      read_with_one_retry do
         kill_cursors_operation.execute(@server)
       end
     end
 
     def kill_cursors_operation
       if @server.features.find_command_enabled?
-        Operation::Commands::Command.new(Builder::KillCursorsCommand.new(self).specification)
+        Operation::Commands::Command.new(kill_cursors_op_spec)
+      else
+        Operation::KillCursors.new(kill_cursors_op_spec)
+      end
+    end
+
+    def kill_cursors_op_spec
+      if @server.features.find_command_enabled?
+        Builder::KillCursorsCommand.new(self).specification
       else
-        Operation::KillCursors.new(Builder::OpKillCursors.new(self).specification)
+        Builder::OpKillCursors.new(self).specification
       end
     end
 
@@ -196,13 +230,22 @@ def more?
 
     def process(result)
       @remaining -= result.returned_count if limited?
-      @cursor_id = result.cursor_id
       @coll_name ||= result.namespace.sub("#{database.name}.", '') if result.namespace
+      unregister if result.cursor_id == 0
+      @cursor_id = result.cursor_id
       result.documents
     end
 
     def use_limit?
       limited? && batch_size >= @remaining
     end
+
+    def register
+      cluster.register_cursor(@cursor_id)
+    end
+
+    def unregister
+      cluster.unregister_cursor(@cursor_id)
+    end
   end
 end
diff --git a/lib/mongo/cursor/builder/kill_cursors_command.rb b/lib/mongo/cursor/builder/kill_cursors_command.rb
index ef8d6fc110..cc92b464db 100644
--- a/lib/mongo/cursor/builder/kill_cursors_command.rb
+++ b/lib/mongo/cursor/builder/kill_cursors_command.rb
@@ -56,6 +56,39 @@ def specification
         def kill_cursors_command
           { :killCursors => collection_name, :cursors => [ cursor.id ] }
         end
+
+        class << self
+
+          # Update a specification's list of cursor ids, keeping only the ids that
+          # also appear in +ids+. The specification is modified in place.
+          #
+          # @example Update a specification's list of cursor ids.
+          #   KillCursorsCommand.update_cursors(spec, ids)
+          #
+          # @param [ Hash ] spec The specification.
+          # @param [ Array ] ids The ids to update with.
+          #
+          # @return [ Hash ] The updated selector.
+          #
+          # @since 2.3.0
+          def update_cursors(spec, ids)
+            spec[:selector].merge!(cursors: spec[:selector][:cursors] & ids)
+          end
+
+          # Get the list of cursor ids from a spec generated by this Builder.
+          #
+          # @example Get the list of cursor ids.
+          #   KillCursorsCommand.get_cursors_list(spec)
+          #
+          # @param [ Hash ] spec The specification.
+          #
+          # @return [ Array ] The cursor ids.
+          #
+          # @since 2.3.0
+          def get_cursors_list(spec)
+            spec[:selector][:cursors]
+          end
+        end
       end
     end
   end
diff --git a/lib/mongo/cursor/builder/op_kill_cursors.rb b/lib/mongo/cursor/builder/op_kill_cursors.rb
index 7af36c00b2..98dab29484 100644
--- a/lib/mongo/cursor/builder/op_kill_cursors.rb
+++ b/lib/mongo/cursor/builder/op_kill_cursors.rb
@@ -50,6 +50,39 @@ def initialize(cursor)
         def specification
           { :coll_name => collection_name, :db_name => database.name, :cursor_ids => [ cursor.id ] }
         end
+
+        class << self
+
+          # Update a specification's list of cursor ids, keeping only the ids that
+          # also appear in +ids+. The specification is modified in place.
+          #
+          # @example Update a specification's list of cursor ids.
+          #   OpKillCursors.update_cursors(spec, ids)
+          #
+          # @param [ Hash ] spec The specification.
+          # @param [ Array ] ids The ids to update with.
+          #
+          # @return [ Hash ] The updated specification.
+          #
+          # @since 2.3.0
+          def update_cursors(spec, ids)
+            spec.merge!(cursor_ids: spec[:cursor_ids] & ids)
+          end
+
+          # Get the list of cursor ids from a spec generated by this Builder.
+          #
+          # @example Get the list of cursor ids.
+          #   OpKillCursors.get_cursors_list(spec)
+          #
+          # @param [ Hash ] spec The specification.
+          #
+          # @return [ Array ] The cursor ids.
+          #
+          # @since 2.3.0
+          def get_cursors_list(spec)
+            spec[:cursor_ids]
+          end
+        end
       end
     end
   end
diff --git a/spec/mongo/cluster/cursor_reaper_spec.rb b/spec/mongo/cluster/cursor_reaper_spec.rb
new file mode 100644
index 0000000000..752290a3e1
--- /dev/null
+++ b/spec/mongo/cluster/cursor_reaper_spec.rb
@@ -0,0 +1,215 @@
+require 'spec_helper'
+
+describe Mongo::Cluster::CursorReaper do
+
+  after do
+    authorized_collection.delete_many
+  end
+
+  let(:reaper) do
+    described_class.new
+  end
+
+  let(:active_cursors) do
+    reaper.instance_variable_get(:@active_cursors)
+  end
+
+  describe '#initialize' do
+
+    it 'initializes a hash for servers and their kill cursors ops' do
+      expect(reaper.instance_variable_get(:@to_kill)).to be_a(Hash)
+    end
+
+    it 'initializes a set for the list of active cursors' do
+      expect(reaper.instance_variable_get(:@active_cursors)).to be_a(Set)
+    end
+  end
+
+  describe '#run' do
+
+    it 'starts a thread calling #kill_cursors' do
+      reaper.run!
+      expect(reaper.instance_variable_get(:@thread)).to be_a(Thread)
+    end
+
+    context 'when run is called more than once' do
+
+      let!(:reaper_thread) do
+        reaper.run!
+        reaper.instance_variable_get(:@thread)
+      end
+
+      it 'only starts a thread once' do
+        reaper.run!
+        expect(reaper.instance_variable_get(:@thread)).to be(reaper_thread)
+      end
+    end
+
+    context 'when there are ops in the list to execute' do
+
+      let(:server) { double('server') }
+      let(:cursor_id) { 1 }
+      let(:op_spec_1) { double('op_spec_1') }
+      let(:op_spec_2) { double('op_spec_2') }
+      let(:to_kill) { reaper.instance_variable_get(:@to_kill)}
+
+      before do
+        reaper.register_cursor(cursor_id)
+        reaper.schedule_kill_cursor(cursor_id, op_spec_1, server)
+        reaper.run!
+        sleep(Mongo::Cluster::CursorReaper::FREQUENCY + 0.5)
+      end
+
+      it 'executes the ops in the thread' do
+        expect(reaper.instance_variable_get(:@to_kill).size).to eq(0)
+      end
+    end
+  end
+
+  describe '#schedule_kill_cursor' do
+
+    let(:server) { double('server') }
+    let(:cursor_id) { 1 }
+    let(:op_spec_1) { double('op_spec_1') }
+    let(:op_spec_2) { double('op_spec_2') }
+    let(:to_kill) { reaper.instance_variable_get(:@to_kill)}
+
+    context 'when the cursor is on the list of active cursors' do
+
+      before do
+        reaper.register_cursor(cursor_id)
+      end
+
+      context 'when there is not a list already for the server' do
+
+        before do
+          reaper.schedule_kill_cursor(cursor_id, op_spec_1, server)
+        end
+
+        it 'initializes the list of op specs to a set' do
+          expect(to_kill.keys).to eq([ server ])
+          expect(to_kill[server]).to eq(Set.new([op_spec_1]))
+        end
+      end
+
+      context 'when there is a list of ops already for the server' do
+
+        before do
+          reaper.schedule_kill_cursor(cursor_id, op_spec_1, server)
+          reaper.schedule_kill_cursor(cursor_id, op_spec_2, server)
+        end
+
+        it 'adds the op to the server list' do
+          expect(to_kill.keys).to eq([ server ])
+          expect(to_kill[server]).to contain_exactly(op_spec_1, op_spec_2)
+        end
+
+        context 'when the same op is added more than once' do
+
+          before do
+            reaper.schedule_kill_cursor(cursor_id, op_spec_2, server)
+          end
+
+          it 'does not allow duplicates ops for a server' do
+            expect(to_kill.keys).to eq([ server ])
+            expect(to_kill[server]).to contain_exactly(op_spec_1, op_spec_2)
+          end
+        end
+      end
+    end
+
+    context 'when the cursor is not on the list of active cursors' do
+
+      before do
+        reaper.schedule_kill_cursor(cursor_id, op_spec_1, server)
+      end
+
+      it 'does not add the kill cursors op spec to the list' do
+        expect(to_kill).to eq({})
+      end
+    end
+  end
+
+  describe '#register_cursor' do
+
+    before do
+      reaper.register_cursor(cursor_id)
+    end
+
+    context 'when the cursor id is nil' do
+
+      let(:cursor_id) do
+        nil
+      end
+
+      it 'does not register the cursor' do
+        expect(active_cursors.size).to be(0)
+      end
+    end
+
+    context 'when the cursor id is 0' do
+
+      let(:cursor_id) do
+        0
+      end
+
+      it 'does not register the cursor' do
+        expect(active_cursors.size).to be(0)
+      end
+    end
+
+    context 'when the cursor id is a valid id' do
+
+      let(:cursor_id) do
+        2
+      end
+
+      it 'registers the cursor id as active' do
+        expect(active_cursors).to eq(Set.new([2]))
+      end
+    end
+  end
+
+  describe '#unregister_cursor' do
+
+    context 'when the cursor id is in the active cursors list' do
+
+      before do
+        reaper.register_cursor(2)
+        reaper.unregister_cursor(2)
+      end
+
+      it 'removes the cursor id' do
+        expect(active_cursors.size).to eq(0)
+      end
+    end
+  end
+
+  describe '#stop!' do
+
+    let(:thread) do
+      reaper.run!
+      reaper.stop!
+      sleep(0.5)
+      reaper.instance_variable_get(:@thread)
+    end
+
+    it 'stops the thread from running' do
+      expect(thread.alive?).to be(false)
+    end
+  end
+
+  describe '#restart!' do
+
+    let(:thread) do
+      reaper.run!
+      reaper.stop!
+      reaper.restart!
+      reaper.instance_variable_get(:@thread)
+    end
+
+    it 'restarts the thread' do
+      expect(thread.alive?).to be(true)
+    end
+  end
+end
diff --git a/spec/mongo/cluster_spec.rb b/spec/mongo/cluster_spec.rb
index f021c7a1b6..7fb380f3da 100644
--- a/spec/mongo/cluster_spec.rb
+++ b/spec/mongo/cluster_spec.rb
@@ -223,26 +223,36 @@
       cluster.instance_variable_get(:@servers)
     end
 
+    let(:cursor_reaper) do
+      cluster.instance_variable_get(:@cursor_reaper)
+    end
+
     before do
       known_servers.each do |server|
         expect(server).to receive(:disconnect!).and_call_original
       end
+      expect(cursor_reaper).to receive(:stop!).and_call_original
     end
 
-    it 'disconnects each server and returns true' do
+    it 'disconnects each server and the cursor reaper and returns true' do
       expect(cluster.disconnect!).to be(true)
     end
   end
 
   describe '#reconnect!' do
 
+    let(:cursor_reaper) do
+      cluster.instance_variable_get(:@cursor_reaper)
+    end
+
     before do
       cluster.servers.each do |server|
         expect(server).to receive(:reconnect!).and_call_original
       end
+      expect(cursor_reaper).to receive(:restart!).and_call_original
     end
 
-    it 'reconnects each server and returns true' do
+    it 'reconnects each server and the cursor reaper and returns true' do
       expect(cluster.reconnect!).to be(true)
     end
   end
diff --git a/spec/mongo/cursor_spec.rb b/spec/mongo/cursor_spec.rb
index 5ae1b3eeef..b19ccb0306 100644
--- a/spec/mongo/cursor_spec.rb
+++ b/spec/mongo/cursor_spec.rb
@@ -232,6 +232,95 @@
         end
       end
     end
+
+    context 'when the cursor is not fully iterated and is garbage collected' do
+
+      let(:documents) do
+        (1..3).map{ |i| { field: "test#{i}" }}
+      end
+
+      before do
+        authorized_collection.insert_many(documents)
+        cursor_reaper.schedule_kill_cursor(cursor.id,
+                                           cursor.send(:kill_cursors_op_spec),
+                                           cursor.instance_variable_get(:@server))
+      end
+
+      after do
+        authorized_collection.delete_many
+      end
+
+      let(:view) do
+        Mongo::Collection::View.new(
+          authorized_collection,
+          {},
+          :batch_size => 2
+        )
+      end
+
+      let!(:cursor) do
+        view.to_enum.next
+        view.instance_variable_get(:@cursor)
+      end
+
+      let(:cursor_reaper) do
+        authorized_client.cluster.instance_variable_get(:@cursor_reaper)
+      end
+
+
+      it 'schedules a kill cursors op' do
+        sleep(Mongo::Cluster::CursorReaper::FREQUENCY + 0.5)
+        expect {
+          cursor.to_a
+        }.to raise_exception(Mongo::Error::OperationFailure)
+      end
+
+      context 'when the cursor is unregistered before the kill cursors operations are executed' do
+
+        it 'does not send a kill cursors operation for the unregistered cursor' do
+          cursor_reaper.unregister_cursor(cursor.id)
+          expect(cursor.to_a.size).to eq(documents.size)
+        end
+      end
+    end
+
+    context 'when the cursor is fully iterated' do
+
+      let(:documents) do
+        (1..3).map{ |i| { field: "test#{i}" }}
+      end
+
+      before do
+        authorized_collection.insert_many(documents)
+      end
+
+      after do
+        authorized_collection.delete_many
+      end
+
+      let(:view) do
+        authorized_collection.find({}, batch_size: 2)
+      end
+
+      let!(:cursor_id) do
+        enum.next
+        enum.next
+        view.instance_variable_get(:@cursor).id
+      end
+
+      let(:enum) do
+        view.to_enum
+      end
+
+      let(:cursor_reaper) do
+        authorized_collection.client.cluster.instance_variable_get(:@cursor_reaper)
+      end
+
+      it 'removes the cursor id from the active cursors tracked by the cluster cursor manager' do
+        enum.next
+        expect(cursor_reaper.instance_variable_get(:@active_cursors)).not_to include(cursor_id)
+      end
+    end
   end
 
   describe '#inspect' do