From 96f263765edd73b8150df5722d04b01c9b5eaf54 Mon Sep 17 00:00:00 2001 From: Emily Date: Tue, 31 May 2016 11:56:26 +0200 Subject: [PATCH 1/9] RUBY-1104 Add CursorManager for periodically sending kill cursor operations --- lib/mongo/cluster.rb | 7 + lib/mongo/cluster/cursor_manager.rb | 161 +++++++++++++++ lib/mongo/cursor.rb | 34 +++- .../cursor/builder/kill_cursors_command.rb | 28 +++ lib/mongo/cursor/builder/op_kill_cursors.rb | 28 +++ spec/mongo/cluster/cursor_manager_spec.rb | 187 ++++++++++++++++++ spec/mongo/cursor_spec.rb | 89 +++++++++ 7 files changed, 531 insertions(+), 3 deletions(-) create mode 100644 lib/mongo/cluster/cursor_manager.rb create mode 100644 spec/mongo/cluster/cursor_manager_spec.rb diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index 0268ccb75f..70b93c29df 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -13,6 +13,7 @@ # limitations under the License. require 'mongo/cluster/topology' +require 'mongo/cluster/cursor_manager' module Mongo @@ -42,6 +43,7 @@ class Cluster attr_reader :topology def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown? + def_delegators :@cursor_manager, :register_cursor, :schedule_kill_cursor, :unregister_cursor # Determine if this cluster of servers is equal to another object. Checks the # servers currently in the cluster, not what was configured. @@ -113,6 +115,10 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) subscribe_to(Event::PRIMARY_ELECTED, Event::PrimaryElected.new(self)) seeds.each{ |seed| add(seed) } + + @cursor_manager = CursorManager.new(self) + @cursor_manager.run + ObjectSpace.define_finalizer(self, self.class.finalize(pools)) end @@ -133,6 +139,7 @@ def self.finalize(pools) pools.values.each do |pool| pool.disconnect! end + cursor_manager.kill_cursors end end diff --git a/lib/mongo/cluster/cursor_manager.rb b/lib/mongo/cluster/cursor_manager.rb new file mode 100644 index 0000000000..db78afdf9c --- /dev/null +++ b/lib/mongo/cluster/cursor_manager.rb @@ -0,0 +1,161 @@ +# Copyright (C) 2014-2015 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'set' + +module Mongo + class Cluster + + # A manager that sends kill cursors operations at regular intervals to close + # cursors that have been garbage collected without being exhausted. + # + # @since 2.3.0 + class CursorManager + extend Forwardable + include Retryable + + # @return [ Mongo::Cluster ] The cluster associated with this cursor manager. + attr_reader :cluster + + # The default time interval for the cursor manager to send pending kill cursors operations. + # + # @since 2.3.0 + FREQUENCY = 1.freeze + + # Create a cursor manager. + # + # @example Create a CursorManager. + # Mongo::Cluster::CursorManager.new(cluster) + # + # @api private + # + # @since 2.3.0 + def initialize(cluster) + @to_kill = {} + @active_cursors = Set.new + @mutex = Mutex.new + @cluster = cluster + end + + # Start the cursor manager's reaper thread. + # + # @example Start the cursor manager's reaper thread. + # manager.run + # + # @api private + # + # @since 2.3.0 + def run + @reaper ||= Thread.new(FREQUENCY) do |i| + loop do + sleep(i) + kill_cursors + end + end + end + + # Schedule a kill cursors operation to be eventually executed. + # + # @example Schedule a kill cursors operation. + # manager.schedule_kill_cursor(id, op_spec, server) + # + # @param [ Integer ] id The id of the cursor to kill. + # @param [ Hash ] op_spec The spec for the kill cursors op. + # @param [ Mongo::Server ] server The server to send the kill cursors operation to. + # + # @api private + # + # @since 2.3.0 + def schedule_kill_cursor(id, op_spec, server) + @mutex.synchronize do + if @active_cursors.include?(id) + @to_kill[server] ||= Set.new + @to_kill[server] << op_spec + end + end + end + + # Register a cursor id as active. + # + # @example Register a cursor as active. + # manager.register_cursor(id) + # + # @param [ Integer ] id The id of the cursor to register as active. + # + # @api private + # + # @since 2.3.0 + def register_cursor(id) + if id && id > 0 + @mutex.synchronize do + @active_cursors << id + end + end + end + + # Unregister a cursor id, indicating that it's no longer active. + # + # @example Unregister a cursor. + # manager.unregister_cursor(id) + # + # @param [ Integer ] id The id of the cursor to unregister. + # + # @api private + # + # @since 2.3.0 + def unregister_cursor(id) + @mutex.synchronize do + @active_cursors.delete(id) + end + end + + # Execute all pending kill cursors operations. + # + # @example Execute pending kill cursors operations. + # manager.kill_cursors + # + # @api private + # + # @since 2.3.0 + def kill_cursors + to_kill_copy = {} + active_cursors_copy = [] + + @mutex.synchronize do + to_kill_copy = @to_kill.dup + active_cursors_copy = @active_cursors.dup + @to_kill = {} + end + + to_kill_copy.each do |server, op_specs| + op_specs.each do |op_spec| + read_with_retry do + if server.features.find_command_enabled? + Cursor::Builder::KillCursorsCommand.update_cursors(op_spec, active_cursors_copy.to_a) + if Cursor::Builder::KillCursorsCommand.cursors(op_spec).size > 0 + Operation::Commands::Command.new(op_spec).execute(server.context) + end + else + Cursor::Builder::OpKillCursors.update_cursors(op_spec, active_cursors_copy.to_a) + if Cursor::Builder::OpKillCursors.cursors(op_spec).size > 0 + Operation::KillCursors.new(op_spec).execute(server.context) + end + end + end + end + end + end + end + end +end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 58cbeece55..da7f840cb9 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -57,6 +57,16 @@ def initialize(view, result, server) @server = server @initial_result = result @remaining = limit if limited? + @cursor_id = result.cursor_id + register_cursor + ObjectSpace.define_finalizer(self, self.class.finalize(result.cursor_id, + cluster, + kill_cursors_op_spec, + server)) + end + + def self.finalize(cursor_id, cluster, op_spec, server) + proc { cluster.schedule_kill_cursor(cursor_id, op_spec, server) } end # Get a human-readable string representation of +Cursor+. @@ -173,6 +183,7 @@ def get_more_operation end def kill_cursors + unregister_cursor read_with_retry do kill_cursors_operation.execute(@server) end @@ -180,9 +191,17 @@ def kill_cursors def kill_cursors_operation if @server.features.find_command_enabled? - Operation::Commands::Command.new(Builder::KillCursorsCommand.new(self).specification) + Operation::Commands::Command.new(kill_cursors_op_spec) + else + Operation::KillCursors.new(kill_cursors_op_spec) + end + end + + def kill_cursors_op_spec + if @server.features.find_command_enabled? + Builder::KillCursorsCommand.new(self).specification else - Operation::KillCursors.new(Builder::OpKillCursors.new(self).specification) + Builder::OpKillCursors.new(self).specification end end @@ -196,13 +215,22 @@ def more? def process(result) @remaining -= result.returned_count if limited? - @cursor_id = result.cursor_id @coll_name ||= result.namespace.sub("#{database.name}.", '') if result.namespace + unregister_cursor if result.cursor_id == 0 + @cursor_id = result.cursor_id result.documents end def use_limit? limited? && batch_size >= @remaining end + + def register_cursor + cluster.register_cursor(@cursor_id) + end + + def unregister_cursor + cluster.unregister_cursor(@cursor_id) + end end end diff --git a/lib/mongo/cursor/builder/kill_cursors_command.rb b/lib/mongo/cursor/builder/kill_cursors_command.rb index ef8d6fc110..92ba1e227d 100644 --- a/lib/mongo/cursor/builder/kill_cursors_command.rb +++ b/lib/mongo/cursor/builder/kill_cursors_command.rb @@ -56,6 +56,34 @@ def specification def kill_cursors_command { :killCursors => collection_name, :cursors => [ cursor.id ] } end + + class << self + + # Update a specification's list of cursor ids. + # + # @example Update a specification's list of cursor ids. + # KillCursorsCommand.update_cursors(spec, ids) + # + # @return [ Hash ] The specification. + # @return [ Array ] The ids to update with. + # + # @since 2.3.0 + def update_cursors(spec, ids) + spec[:selector].merge!(cursors: spec[:selector][:cursors] & ids) + end + + # Get the list of cursor ids from a spec generated by this Builder. + # + # @example Get the list of cursor ids. + # KillCursorsCommand.cursors(spec) + # + # @return [ Hash ] The specification. + # + # @since 2.3.0 + def cursors(spec) + spec[:selector][:cursors] + end + end end end end diff --git a/lib/mongo/cursor/builder/op_kill_cursors.rb b/lib/mongo/cursor/builder/op_kill_cursors.rb index 7af36c00b2..c650867223 100644 --- a/lib/mongo/cursor/builder/op_kill_cursors.rb +++ b/lib/mongo/cursor/builder/op_kill_cursors.rb @@ -50,6 +50,34 @@ def initialize(cursor) def specification { :coll_name => collection_name, :db_name => database.name, :cursor_ids => [ cursor.id ] } end + + class << self + + # Update a specification's list of cursor ids. + # + # @example Update a specification's list of cursor ids. + # OpKillCursors.update_cursors(spec, ids) + # + # @return [ Hash ] The specification. + # @return [ Array ] The ids to update with. + # + # @since 2.3.0 + def update_cursors(spec, ids) + spec.merge!(cursor_ids: spec[:cursor_ids] & ids) + end + + # Get the list of cursor ids from a spec generated by this Builder. + # + # @example Get the list of cursor ids. + # OpKillCursors.cursors(spec) + # + # @return [ Hash ] The specification. + # + # @since 2.3.0 + def cursors(spec) + spec[:cursor_ids] + end + end end end end diff --git a/spec/mongo/cluster/cursor_manager_spec.rb b/spec/mongo/cluster/cursor_manager_spec.rb new file mode 100644 index 0000000000..b1c823d40b --- /dev/null +++ b/spec/mongo/cluster/cursor_manager_spec.rb @@ -0,0 +1,187 @@ +require 'spec_helper' + +describe Mongo::Cluster::CursorManager do + + after do + authorized_collection.delete_many + end + + let(:manager) do + described_class.new(authorized_client.cluster) + end + + let(:active_cursors) do + manager.instance_variable_get(:@active_cursors) + end + + describe '#intialize' do + + it 'initializes a hash for servers and their kill cursors ops' do + expect(manager.instance_variable_get(:@to_kill)).to be_a(Hash) + end + + it 'initializes a set for the list of active cursors' do + expect(manager.instance_variable_get(:@active_cursors)).to be_a(Set) + end + end + + describe '#run' do + + it 'starts a thread calling #kill_cursors' do + manager.run + expect(manager.instance_variable_get(:@reaper)).to be_a(Thread) + end + + context 'when run is called more than once' do + + let!(:reaper_thread) do + manager.run + manager.instance_variable_get(:@reaper) + end + + it 'only starts a thread once' do + manager.run + expect(manager.instance_variable_get(:@reaper)).to be(reaper_thread) + end + end + + context 'when there are ops in the list to execute' do + + let(:server) { double('server') } + let(:cursor_id) { 1 } + let(:op_spec_1) { double('op_spec_1') } + let(:op_spec_2) { double('op_spec_2') } + let(:to_kill) { manager.instance_variable_get(:@to_kill)} + + before do + manager.register_cursor(cursor_id) + manager.schedule_kill_cursor(cursor_id, op_spec_1, server) + manager.run + sleep(Mongo::Cluster::CursorManager::FREQUENCY + 0.5) + end + + it 'executes the ops in the thread' do + expect(manager.instance_variable_get(:@to_kill).size).to eq(0) + end + end + end + + describe '#schedule_kill_cursor' do + + let(:server) { double('server') } + let(:cursor_id) { 1 } + let(:op_spec_1) { double('op_spec_1') } + let(:op_spec_2) { double('op_spec_2') } + let(:to_kill) { manager.instance_variable_get(:@to_kill)} + + context 'when the cursor is on the list of active cursors' do + + before do + manager.register_cursor(cursor_id) + end + + context 'when there is not a list already for the server' do + + before do + manager.schedule_kill_cursor(cursor_id, op_spec_1, server) + end + + it 'initializes the list of op specs to a set' do + expect(to_kill.keys).to eq([ server ]) + expect(to_kill[server]).to eq(Set.new([op_spec_1])) + end + end + + context 'when there is a list of ops already for the server' do + + before do + manager.schedule_kill_cursor(cursor_id, op_spec_1, server) + manager.schedule_kill_cursor(cursor_id, op_spec_2, server) + end + + it 'adds the op to the server list' do + expect(to_kill.keys).to eq([ server ]) + expect(to_kill[server]).to eq(Set.new([op_spec_1, op_spec_2])) + end + + context 'when the same op is added more than once' do + + before do + manager.schedule_kill_cursor(cursor_id, op_spec_2, server) + end + + it 'does not allow duplicates ops for a server' do + expect(to_kill.keys).to eq([ server ]) + expect(to_kill[server]).to eq(Set.new([op_spec_1, op_spec_2])) + end + end + end + end + + context 'when the cursor is not on the list of active cursors' do + + before do + manager.schedule_kill_cursor(cursor_id, op_spec_1, server) + end + + it 'does not add the kill cursors op spec to the list' do + expect(to_kill).to eq({}) + end + end + end + + describe '#register_cursor' do + + before do + manager.register_cursor(cursor_id) + end + + context 'when the cursor id is nil' do + + let(:cursor_id) do + nil + end + + it 'does not register the cursor' do + expect(active_cursors.size).to be(0) + end + end + + context 'when the cursor id is 0' do + + let(:cursor_id) do + 0 + end + + it 'does not register the cursor' do + expect(active_cursors.size).to be(0) + end + end + + context 'when the cursor id is a valid id' do + + let(:cursor_id) do + 2 + end + + it 'registers the cursor id as active' do + expect(active_cursors).to eq(Set.new([2])) + end + end + end + + describe '#unregister_cursor' do + + context 'when the cursor id is in the active cursors list' do + + before do + manager.register_cursor(2) + manager.unregister_cursor(2) + end + + it 'removes the cursor id' do + expect(active_cursors).to eq(Set.new) + end + end + end +end diff --git a/spec/mongo/cursor_spec.rb b/spec/mongo/cursor_spec.rb index 5ae1b3eeef..09116dc37e 100644 --- a/spec/mongo/cursor_spec.rb +++ b/spec/mongo/cursor_spec.rb @@ -232,6 +232,95 @@ end end end + + context 'when the cursor is not fully iterated and is garbage collected' do + + let(:documents) do + (1..3).map{ |i| { field: "test#{i}" }} + end + + before do + authorized_collection.insert_many(documents) + cursor_manager.schedule_kill_cursor(cursor.id, + cursor.send(:kill_cursors_op_spec), + cursor.instance_variable_get(:@server)) + end + + after do + authorized_collection.delete_many + end + + let(:view) do + Mongo::Collection::View.new( + authorized_collection, + {}, + :batch_size => 2 + ) + end + + let!(:cursor) do + view.to_enum.next + view.instance_variable_get(:@cursor) + end + + let(:cursor_manager) do + authorized_client.cluster.instance_variable_get(:@cursor_manager) + end + + + it 'schedules a kill cursors op' do + sleep(Mongo::Cluster::CursorManager::FREQUENCY + 0.5) + expect { + cursor.to_a + }.to raise_exception(Mongo::Error::OperationFailure) + end + + context 'when the cursor is unregistered before the kill cursors operations are executed' do + + it 'does not send a kill cursors operation for the unregistered cursor' do + cursor_manager.unregister_cursor(cursor.id) + expect(cursor.to_a.size).to eq(documents.size) + end + end + end + + context 'when the cursor is fully iterated' do + + let(:documents) do + (1..3).map{ |i| { field: "test#{i}" }} + end + + before do + authorized_collection.insert_many(documents) + end + + after do + authorized_collection.delete_many + end + + let(:view) do + authorized_collection.find({}, batch_size: 2) + end + + let!(:cursor_id) do + enum.next + enum.next + view.instance_variable_get(:@cursor).id + end + + let(:enum) do + view.to_enum + end + + let(:cursor_manager) do + authorized_collection.client.cluster.instance_variable_get(:@cursor_manager) + end + + it 'removes the cursor id from the active cursors tracked by the cluster cursor manager' do + enum.next + expect(cursor_manager.instance_variable_get(:@active_cursors)).not_to include(cursor_id) + end + end end describe '#inspect' do From 7df438e492a9c25715a4a198f995c8e88fc6b459 Mon Sep 17 00:00:00 2001 From: Emily Date: Thu, 2 Jun 2016 15:48:35 +0200 Subject: [PATCH 2/9] RUBY-1104 Change name to CursorReaper --- lib/mongo/cluster.rb | 10 ++-- .../{cursor_manager.rb => cursor_reaper.rb} | 28 +++++----- lib/mongo/cursor.rb | 15 ++++++ ..._manager_spec.rb => cursor_reaper_spec.rb} | 54 +++++++++---------- spec/mongo/cursor_spec.rb | 16 +++--- 5 files changed, 69 insertions(+), 54 deletions(-) rename lib/mongo/cluster/{cursor_manager.rb => cursor_reaper.rb} (86%) rename spec/mongo/cluster/{cursor_manager_spec.rb => cursor_reaper_spec.rb} (69%) diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index 70b93c29df..69e7ea312b 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -13,7 +13,7 @@ # limitations under the License. require 'mongo/cluster/topology' -require 'mongo/cluster/cursor_manager' +require 'mongo/cluster/cursor_reaper' module Mongo @@ -43,7 +43,7 @@ class Cluster attr_reader :topology def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown? - def_delegators :@cursor_manager, :register_cursor, :schedule_kill_cursor, :unregister_cursor + def_delegators :@cursor_reaper, :register_cursor, :schedule_kill_cursor, :unregister_cursor # Determine if this cluster of servers is equal to another object. Checks the # servers currently in the cluster, not what was configured. @@ -116,8 +116,8 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) seeds.each{ |seed| add(seed) } - @cursor_manager = CursorManager.new(self) - @cursor_manager.run + @cursor_reaper = CursorReaper.new(self) + @cursor_reaper.run ObjectSpace.define_finalizer(self, self.class.finalize(pools)) end @@ -136,10 +136,10 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) # @since 2.2.0 def self.finalize(pools) proc do + cursor_reaper.kill_cursors pools.values.each do |pool| pool.disconnect! end - cursor_manager.kill_cursors end end diff --git a/lib/mongo/cluster/cursor_manager.rb b/lib/mongo/cluster/cursor_reaper.rb similarity index 86% rename from lib/mongo/cluster/cursor_manager.rb rename to lib/mongo/cluster/cursor_reaper.rb index db78afdf9c..f9e7e0d4a7 100644 --- a/lib/mongo/cluster/cursor_manager.rb +++ b/lib/mongo/cluster/cursor_reaper.rb @@ -21,22 +21,22 @@ class Cluster # cursors that have been garbage collected without being exhausted. # # @since 2.3.0 - class CursorManager + class CursorReaper extend Forwardable include Retryable - # @return [ Mongo::Cluster ] The cluster associated with this cursor manager. + # @return [ Mongo::Cluster ] The cluster associated with this cursor reaper. attr_reader :cluster - # The default time interval for the cursor manager to send pending kill cursors operations. + # The default time interval for the cursor reaper to send pending kill cursors operations. # # @since 2.3.0 FREQUENCY = 1.freeze - # Create a cursor manager. + # Create a cursor reaper. # - # @example Create a CursorManager. - # Mongo::Cluster::CursorManager.new(cluster) + # @example Create a CursorReaper. + # Mongo::Cluster::CursorReaper.new(cluster) # # @api private # @@ -48,16 +48,16 @@ def initialize(cluster) @cluster = cluster end - # Start the cursor manager's reaper thread. + # Start the cursor reapers's thread. # - # @example Start the cursor manager's reaper thread. - # manager.run + # @example Start the cursor reaper's thread. + # reaper.run # # @api private # # @since 2.3.0 def run - @reaper ||= Thread.new(FREQUENCY) do |i| + @thread ||= Thread.new(FREQUENCY) do |i| loop do sleep(i) kill_cursors @@ -68,7 +68,7 @@ def run # Schedule a kill cursors operation to be eventually executed. # # @example Schedule a kill cursors operation. - # manager.schedule_kill_cursor(id, op_spec, server) + # cursor_reaper.schedule_kill_cursor(id, op_spec, server) # # @param [ Integer ] id The id of the cursor to kill. # @param [ Hash ] op_spec The spec for the kill cursors op. @@ -89,7 +89,7 @@ def schedule_kill_cursor(id, op_spec, server) # Register a cursor id as active. # # @example Register a cursor as active. - # manager.register_cursor(id) + # cursor_reaper.register_cursor(id) # # @param [ Integer ] id The id of the cursor to register as active. # @@ -107,7 +107,7 @@ def register_cursor(id) # Unregister a cursor id, indicating that it's no longer active. # # @example Unregister a cursor. - # manager.unregister_cursor(id) + # cursor_reaper.unregister_cursor(id) # # @param [ Integer ] id The id of the cursor to unregister. # @@ -123,7 +123,7 @@ def unregister_cursor(id) # Execute all pending kill cursors operations. # # @example Execute pending kill cursors operations. - # manager.kill_cursors + # cursor_reaper.kill_cursors # # @api private # diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index da7f840cb9..aba1acb1f8 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -65,6 +65,21 @@ def initialize(view, result, server) server)) end + + # Finalize the cursor for garbage collection. Schedules this cursor to be included + # in a killCursors operation executed by the Cluster's CursorReaper. + # + # @example Finalize the cluster. + # Cursor.finalize(id, cluster, op, server) + # + # @param [ Integer ] cursor_id The cursor's id. + # @param [ Mongo::Cluster ] cluster The cluster associated with this cursor and its server. + # @param [ Hash ] op_spec The killCursors operation specification. + # @param [ Mongo::Server ] server The server to send the killCursors operation to. + # + # @return [ Proc ] The Finalizer. + # + # @since 2.3.0 def self.finalize(cursor_id, cluster, op_spec, server) proc { cluster.schedule_kill_cursor(cursor_id, op_spec, server) } end diff --git a/spec/mongo/cluster/cursor_manager_spec.rb b/spec/mongo/cluster/cursor_reaper_spec.rb similarity index 69% rename from spec/mongo/cluster/cursor_manager_spec.rb rename to spec/mongo/cluster/cursor_reaper_spec.rb index b1c823d40b..0b7fdd135c 100644 --- a/spec/mongo/cluster/cursor_manager_spec.rb +++ b/spec/mongo/cluster/cursor_reaper_spec.rb @@ -1,47 +1,47 @@ require 'spec_helper' -describe Mongo::Cluster::CursorManager do +describe Mongo::Cluster::CursorReaper do after do authorized_collection.delete_many end - let(:manager) do + let(:reaper) do described_class.new(authorized_client.cluster) end let(:active_cursors) do - manager.instance_variable_get(:@active_cursors) + reaper.instance_variable_get(:@active_cursors) end describe '#intialize' do it 'initializes a hash for servers and their kill cursors ops' do - expect(manager.instance_variable_get(:@to_kill)).to be_a(Hash) + expect(reaper.instance_variable_get(:@to_kill)).to be_a(Hash) end it 'initializes a set for the list of active cursors' do - expect(manager.instance_variable_get(:@active_cursors)).to be_a(Set) + expect(reaper.instance_variable_get(:@active_cursors)).to be_a(Set) end end describe '#run' do it 'starts a thread calling #kill_cursors' do - manager.run - expect(manager.instance_variable_get(:@reaper)).to be_a(Thread) + reaper.run + expect(reaper.instance_variable_get(:@thread)).to be_a(Thread) end context 'when run is called more than once' do let!(:reaper_thread) do - manager.run - manager.instance_variable_get(:@reaper) + reaper.run + reaper.instance_variable_get(:@reaper) end it 'only starts a thread once' do - manager.run - expect(manager.instance_variable_get(:@reaper)).to be(reaper_thread) + reaper.run + expect(reaper.instance_variable_get(:@reaper)).to be(reaper_thread) end end @@ -51,17 +51,17 @@ let(:cursor_id) { 1 } let(:op_spec_1) { double('op_spec_1') } let(:op_spec_2) { double('op_spec_2') } - let(:to_kill) { manager.instance_variable_get(:@to_kill)} + let(:to_kill) { reaper.instance_variable_get(:@to_kill)} before do - manager.register_cursor(cursor_id) - manager.schedule_kill_cursor(cursor_id, op_spec_1, server) - manager.run - sleep(Mongo::Cluster::CursorManager::FREQUENCY + 0.5) + reaper.register_cursor(cursor_id) + reaper.schedule_kill_cursor(cursor_id, op_spec_1, server) + reaper.run + sleep(Mongo::Cluster::CursorReaper::FREQUENCY + 0.5) end it 'executes the ops in the thread' do - expect(manager.instance_variable_get(:@to_kill).size).to eq(0) + expect(reaper.instance_variable_get(:@to_kill).size).to eq(0) end end end @@ -72,18 +72,18 @@ let(:cursor_id) { 1 } let(:op_spec_1) { double('op_spec_1') } let(:op_spec_2) { double('op_spec_2') } - let(:to_kill) { manager.instance_variable_get(:@to_kill)} + let(:to_kill) { reaper.instance_variable_get(:@to_kill)} context 'when the cursor is on the list of active cursors' do before do - manager.register_cursor(cursor_id) + reaper.register_cursor(cursor_id) end context 'when there is not a list already for the server' do before do - manager.schedule_kill_cursor(cursor_id, op_spec_1, server) + reaper.schedule_kill_cursor(cursor_id, op_spec_1, server) end it 'initializes the list of op specs to a set' do @@ -95,8 +95,8 @@ context 'when there is a list of ops already for the server' do before do - manager.schedule_kill_cursor(cursor_id, op_spec_1, server) - manager.schedule_kill_cursor(cursor_id, op_spec_2, server) + reaper.schedule_kill_cursor(cursor_id, op_spec_1, server) + reaper.schedule_kill_cursor(cursor_id, op_spec_2, server) end it 'adds the op to the server list' do @@ -107,7 +107,7 @@ context 'when the same op is added more than once' do before do - manager.schedule_kill_cursor(cursor_id, op_spec_2, server) + reaper.schedule_kill_cursor(cursor_id, op_spec_2, server) end it 'does not allow duplicates ops for a server' do @@ -121,7 +121,7 @@ context 'when the cursor is not on the list of active cursors' do before do - manager.schedule_kill_cursor(cursor_id, op_spec_1, server) + reaper.schedule_kill_cursor(cursor_id, op_spec_1, server) end it 'does not add the kill cursors op spec to the list' do @@ -133,7 +133,7 @@ describe '#register_cursor' do before do - manager.register_cursor(cursor_id) + reaper.register_cursor(cursor_id) end context 'when the cursor id is nil' do @@ -175,8 +175,8 @@ context 'when the cursor id is in the active cursors list' do before do - manager.register_cursor(2) - manager.unregister_cursor(2) + reaper.register_cursor(2) + reaper.unregister_cursor(2) end it 'removes the cursor id' do diff --git a/spec/mongo/cursor_spec.rb b/spec/mongo/cursor_spec.rb index 09116dc37e..b19ccb0306 100644 --- a/spec/mongo/cursor_spec.rb +++ b/spec/mongo/cursor_spec.rb @@ -241,7 +241,7 @@ before do authorized_collection.insert_many(documents) - cursor_manager.schedule_kill_cursor(cursor.id, + cursor_reaper.schedule_kill_cursor(cursor.id, cursor.send(:kill_cursors_op_spec), cursor.instance_variable_get(:@server)) end @@ -263,13 +263,13 @@ view.instance_variable_get(:@cursor) end - let(:cursor_manager) do - authorized_client.cluster.instance_variable_get(:@cursor_manager) + let(:cursor_reaper) do + authorized_client.cluster.instance_variable_get(:@cursor_reaper) end it 'schedules a kill cursors op' do - sleep(Mongo::Cluster::CursorManager::FREQUENCY + 0.5) + sleep(Mongo::Cluster::CursorReaper::FREQUENCY + 0.5) expect { cursor.to_a }.to raise_exception(Mongo::Error::OperationFailure) @@ -278,7 +278,7 @@ context 'when the cursor is unregistered before the kill cursors operations are executed' do it 'does not send a kill cursors operation for the unregistered cursor' do - cursor_manager.unregister_cursor(cursor.id) + cursor_reaper.unregister_cursor(cursor.id) expect(cursor.to_a.size).to eq(documents.size) end end @@ -312,13 +312,13 @@ view.to_enum end - let(:cursor_manager) do - authorized_collection.client.cluster.instance_variable_get(:@cursor_manager) + let(:cursor_reaper) do + authorized_collection.client.cluster.instance_variable_get(:@cursor_reaper) end it 'removes the cursor id from the active cursors tracked by the cluster cursor manager' do enum.next - expect(cursor_manager.instance_variable_get(:@active_cursors)).not_to include(cursor_id) + expect(cursor_reaper.instance_variable_get(:@active_cursors)).not_to include(cursor_id) end end end From 20c1a57594cc7243003923fdc301101b327ebe68 Mon Sep 17 00:00:00 2001 From: Emily Date: Tue, 7 Jun 2016 11:57:08 +0200 Subject: [PATCH 3/9] RUBY-1104 Add methods for restarting and stopping the cursor reaper --- lib/mongo/cluster.rb | 10 +++-- lib/mongo/cluster/cursor_reaper.rb | 57 +++++++++++++++--------- spec/mongo/cluster/cursor_reaper_spec.rb | 36 +++++++++++++-- spec/mongo/cluster_spec.rb | 14 +++++- 4 files changed, 88 insertions(+), 29 deletions(-) diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index 69e7ea312b..5afdb2c287 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -117,7 +117,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) seeds.each{ |seed| add(seed) } @cursor_reaper = CursorReaper.new(self) - @cursor_reaper.run + @cursor_reaper.run! ObjectSpace.define_finalizer(self, self.class.finalize(pools)) end @@ -136,7 +136,8 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) # @since 2.2.0 def self.finalize(pools) proc do - cursor_reaper.kill_cursors + begin; @cursor_reaper.kill_cursors; rescue; end + @cursor_reaper.stop! pools.values.each do |pool| pool.disconnect! end @@ -295,6 +296,8 @@ def servers # # @since 2.1.0 def disconnect! + begin; @cursor_reaper.kill_cursors; rescue; end + @cursor_reaper.stop! @servers.each { |server| server.disconnect! } and true end @@ -308,7 +311,8 @@ def disconnect! # @since 2.1.0 def reconnect! scan! - servers.each { |server| server.reconnect! } and true + servers.each { |server| server.reconnect! } + @cursor_reaper.restart! and true end # Add hosts in a description to the cluster. diff --git a/lib/mongo/cluster/cursor_reaper.rb b/lib/mongo/cluster/cursor_reaper.rb index f9e7e0d4a7..f187d8be05 100644 --- a/lib/mongo/cluster/cursor_reaper.rb +++ b/lib/mongo/cluster/cursor_reaper.rb @@ -48,22 +48,18 @@ def initialize(cluster) @cluster = cluster end - # Start the cursor reapers's thread. + # Start the cursor reaper's thread. # # @example Start the cursor reaper's thread. - # reaper.run + # reaper.run! # # @api private # # @since 2.3.0 - def run - @thread ||= Thread.new(FREQUENCY) do |i| - loop do - sleep(i) - kill_cursors - end - end + def run! + @thread && @thread.alive? ? @thread : start! end + alias :restart! :run! # Schedule a kill cursors operation to be eventually executed. # @@ -120,6 +116,18 @@ def unregister_cursor(id) end end + # Stop the cursor reaper's thread. + # + # @example Stop the cursor reaper's thread. + # reaper.stop! + # + # @api private + # + # @since 2.3.0 + def stop! + @thread.kill && @thread.stop? + end + # Execute all pending kill cursors operations. # # @example Execute pending kill cursors operations. @@ -140,22 +148,31 @@ def kill_cursors to_kill_copy.each do |server, op_specs| op_specs.each do |op_spec| - read_with_retry do - if server.features.find_command_enabled? - Cursor::Builder::KillCursorsCommand.update_cursors(op_spec, active_cursors_copy.to_a) - if Cursor::Builder::KillCursorsCommand.cursors(op_spec).size > 0 - Operation::Commands::Command.new(op_spec).execute(server.context) - end - else - Cursor::Builder::OpKillCursors.update_cursors(op_spec, active_cursors_copy.to_a) - if Cursor::Builder::OpKillCursors.cursors(op_spec).size > 0 - Operation::KillCursors.new(op_spec).execute(server.context) - end + if server.features.find_command_enabled? + Cursor::Builder::KillCursorsCommand.update_cursors(op_spec, active_cursors_copy.to_a) + if Cursor::Builder::KillCursorsCommand.cursors(op_spec).size > 0 + Operation::Commands::Command.new(op_spec).execute(server.context) + end + else + Cursor::Builder::OpKillCursors.update_cursors(op_spec, active_cursors_copy.to_a) + if Cursor::Builder::OpKillCursors.cursors(op_spec).size > 0 + Operation::KillCursors.new(op_spec).execute(server.context) end end end end end + + private + + def start! + @thread = Thread.new(FREQUENCY) do |i| + loop do + sleep(i) + kill_cursors + end + end + end end end end diff --git a/spec/mongo/cluster/cursor_reaper_spec.rb b/spec/mongo/cluster/cursor_reaper_spec.rb index 0b7fdd135c..9c7c44b793 100644 --- a/spec/mongo/cluster/cursor_reaper_spec.rb +++ b/spec/mongo/cluster/cursor_reaper_spec.rb @@ -28,19 +28,19 @@ describe '#run' do it 'starts a thread calling #kill_cursors' do - reaper.run + reaper.run! expect(reaper.instance_variable_get(:@thread)).to be_a(Thread) end context 'when run is called more than once' do let!(:reaper_thread) do - reaper.run + reaper.run! reaper.instance_variable_get(:@reaper) end it 'only starts a thread once' do - reaper.run + reaper.run! expect(reaper.instance_variable_get(:@reaper)).to be(reaper_thread) end end @@ -56,7 +56,7 @@ before do reaper.register_cursor(cursor_id) reaper.schedule_kill_cursor(cursor_id, op_spec_1, server) - reaper.run + reaper.run! sleep(Mongo::Cluster::CursorReaper::FREQUENCY + 0.5) end @@ -184,4 +184,32 @@ end end end + + describe '#stop!' do + + let(:thread) do + reaper.run! + reaper.stop! + sleep(0.5) + reaper.instance_variable_get(:@thread) + end + + it 'stops the thread from running' do + expect(thread.alive?).to be(false) + end + end + + describe '#restart!' do + + let(:thread) do + reaper.run! + reaper.stop! + reaper.restart! + reaper.instance_variable_get(:@thread) + end + + it 'stops the thread from running' do + expect(thread.alive?).to be(true) + end + end end diff --git a/spec/mongo/cluster_spec.rb b/spec/mongo/cluster_spec.rb index f021c7a1b6..7fb380f3da 100644 --- a/spec/mongo/cluster_spec.rb +++ b/spec/mongo/cluster_spec.rb @@ -223,26 +223,36 @@ cluster.instance_variable_get(:@servers) end + let(:cursor_reaper) do + cluster.instance_variable_get(:@cursor_reaper) + end + before do known_servers.each do |server| expect(server).to receive(:disconnect!).and_call_original end + expect(cursor_reaper).to receive(:stop!).and_call_original end - it 'disconnects each server and returns true' do + it 'disconnects each server and the cursor reaper and returns true' do expect(cluster.disconnect!).to be(true) end end describe '#reconnect!' do + let(:cursor_reaper) do + cluster.instance_variable_get(:@cursor_reaper) + end + before do cluster.servers.each do |server| expect(server).to receive(:reconnect!).and_call_original end + expect(cursor_reaper).to receive(:restart!).and_call_original end - it 'reconnects each server and returns true' do + it 'reconnects each server and the cursor reaper and returns true' do expect(cluster.reconnect!).to be(true) end end From 091622154cfb3a9e6073eba2543e017a00f7f46e Mon Sep 17 00:00:00 2001 From: Emily Date: Wed, 8 Jun 2016 15:16:55 +0200 Subject: [PATCH 4/9] RUBY-1104 No need to pass the cluster to CursorReaper --- lib/mongo/cluster.rb | 2 +- lib/mongo/cluster/cursor_reaper.rb | 6 +----- spec/mongo/cluster/cursor_reaper_spec.rb | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/mongo/cluster.rb b/lib/mongo/cluster.rb index 5afdb2c287..9e7cf96f1a 100644 --- a/lib/mongo/cluster.rb +++ b/lib/mongo/cluster.rb @@ -116,7 +116,7 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) seeds.each{ |seed| add(seed) } - @cursor_reaper = CursorReaper.new(self) + @cursor_reaper = CursorReaper.new @cursor_reaper.run! ObjectSpace.define_finalizer(self, self.class.finalize(pools)) diff --git a/lib/mongo/cluster/cursor_reaper.rb b/lib/mongo/cluster/cursor_reaper.rb index f187d8be05..945b52a53d 100644 --- a/lib/mongo/cluster/cursor_reaper.rb +++ b/lib/mongo/cluster/cursor_reaper.rb @@ -25,9 +25,6 @@ class CursorReaper extend Forwardable include Retryable - # @return [ Mongo::Cluster ] The cluster associated with this cursor reaper. - attr_reader :cluster - # The default time interval for the cursor reaper to send pending kill cursors operations. # # @since 2.3.0 @@ -41,11 +38,10 @@ class CursorReaper # @api private # # @since 2.3.0 - def initialize(cluster) + def initialize @to_kill = {} @active_cursors = Set.new @mutex = Mutex.new - @cluster = cluster end # Start the cursor reaper's thread. diff --git a/spec/mongo/cluster/cursor_reaper_spec.rb b/spec/mongo/cluster/cursor_reaper_spec.rb index 9c7c44b793..dd5692f9ad 100644 --- a/spec/mongo/cluster/cursor_reaper_spec.rb +++ b/spec/mongo/cluster/cursor_reaper_spec.rb @@ -7,7 +7,7 @@ end let(:reaper) do - described_class.new(authorized_client.cluster) + described_class.new end let(:active_cursors) do From 129884ff4ca7fadcf43531d21ee137fba846713c Mon Sep 17 00:00:00 2001 From: Emily Date: Tue, 12 Jul 2016 18:42:58 +0200 Subject: [PATCH 5/9] RUBY-1104 Retry a kill cursors command once --- lib/mongo/cursor.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index aba1acb1f8..524ac96d0f 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -199,7 +199,7 @@ def get_more_operation def kill_cursors unregister_cursor - read_with_retry do + read_with_one_retry do kill_cursors_operation.execute(@server) end end From 8dbb11f99226683c0388fa3bc9c7983cd55d21ae Mon Sep 17 00:00:00 2001 From: Emily Date: Thu, 4 Aug 2016 15:50:20 +0200 Subject: [PATCH 6/9] RUBY-1104 Make method names more precise --- lib/mongo/cluster/cursor_reaper.rb | 4 ++-- lib/mongo/cursor/builder/kill_cursors_command.rb | 2 +- lib/mongo/cursor/builder/op_kill_cursors.rb | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/mongo/cluster/cursor_reaper.rb b/lib/mongo/cluster/cursor_reaper.rb index 945b52a53d..dc64e551cc 100644 --- a/lib/mongo/cluster/cursor_reaper.rb +++ b/lib/mongo/cluster/cursor_reaper.rb @@ -146,12 +146,12 @@ def kill_cursors op_specs.each do |op_spec| if server.features.find_command_enabled? Cursor::Builder::KillCursorsCommand.update_cursors(op_spec, active_cursors_copy.to_a) - if Cursor::Builder::KillCursorsCommand.cursors(op_spec).size > 0 + if Cursor::Builder::KillCursorsCommand.get_cursors_list(op_spec).size > 0 Operation::Commands::Command.new(op_spec).execute(server.context) end else Cursor::Builder::OpKillCursors.update_cursors(op_spec, active_cursors_copy.to_a) - if Cursor::Builder::OpKillCursors.cursors(op_spec).size > 0 + if Cursor::Builder::OpKillCursors.get_cursors_list(op_spec).size > 0 Operation::KillCursors.new(op_spec).execute(server.context) end end diff --git a/lib/mongo/cursor/builder/kill_cursors_command.rb b/lib/mongo/cursor/builder/kill_cursors_command.rb index 92ba1e227d..cc92b464db 100644 --- a/lib/mongo/cursor/builder/kill_cursors_command.rb +++ b/lib/mongo/cursor/builder/kill_cursors_command.rb @@ -80,7 +80,7 @@ def update_cursors(spec, ids) # @return [ Hash ] The specification. # # @since 2.3.0 - def cursors(spec) + def get_cursors_list(spec) spec[:selector][:cursors] end end diff --git a/lib/mongo/cursor/builder/op_kill_cursors.rb b/lib/mongo/cursor/builder/op_kill_cursors.rb index c650867223..98dab29484 100644 --- a/lib/mongo/cursor/builder/op_kill_cursors.rb +++ b/lib/mongo/cursor/builder/op_kill_cursors.rb @@ -74,7 +74,7 @@ def update_cursors(spec, ids) # @return [ Hash ] The specification. # # @since 2.3.0 - def cursors(spec) + def get_cursors_list(spec) spec[:cursor_ids] end end From e56571517b8dfeeb60f63eef30e8753fb3a4b8c1 Mon Sep 17 00:00:00 2001 From: Emily Date: Thu, 4 Aug 2016 15:53:41 +0200 Subject: [PATCH 7/9] RUBY-1104 No need to mention cursor in method names on Cursor --- lib/mongo/cursor.rb | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 524ac96d0f..bf8e754609 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -58,7 +58,7 @@ def initialize(view, result, server) @initial_result = result @remaining = limit if limited? @cursor_id = result.cursor_id - register_cursor + register ObjectSpace.define_finalizer(self, self.class.finalize(result.cursor_id, cluster, kill_cursors_op_spec, @@ -69,7 +69,7 @@ def initialize(view, result, server) # Finalize the cursor for garbage collection. Schedules this cursor to be included # in a killCursors operation executed by the Cluster's CursorReaper. # - # @example Finalize the cluster. + # @example Finalize the cursor. # Cursor.finalize(id, cluster, op, server) # # @param [ Integer ] cursor_id The cursor's id. @@ -198,7 +198,7 @@ def get_more_operation end def kill_cursors - unregister_cursor + unregister read_with_one_retry do kill_cursors_operation.execute(@server) end @@ -231,7 +231,7 @@ def more? def process(result) @remaining -= result.returned_count if limited? @coll_name ||= result.namespace.sub("#{database.name}.", '') if result.namespace - unregister_cursor if result.cursor_id == 0 + unregister if result.cursor_id == 0 @cursor_id = result.cursor_id result.documents end @@ -240,11 +240,11 @@ def use_limit? limited? && batch_size >= @remaining end - def register_cursor + def register cluster.register_cursor(@cursor_id) end - def unregister_cursor + def unregister cluster.unregister_cursor(@cursor_id) end end From 7ae74df7f97521e0a3ea6afa1cceeb6e61be2633 Mon Sep 17 00:00:00 2001 From: Emily Date: Thu, 4 Aug 2016 15:54:06 +0200 Subject: [PATCH 8/9] RUBY-1104 Improve test language and matcher use --- spec/mongo/cluster/cursor_reaper_spec.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/mongo/cluster/cursor_reaper_spec.rb b/spec/mongo/cluster/cursor_reaper_spec.rb index dd5692f9ad..752290a3e1 100644 --- a/spec/mongo/cluster/cursor_reaper_spec.rb +++ b/spec/mongo/cluster/cursor_reaper_spec.rb @@ -101,7 +101,7 @@ it 'adds the op to the server list' do expect(to_kill.keys).to eq([ server ]) - expect(to_kill[server]).to eq(Set.new([op_spec_1, op_spec_2])) + expect(to_kill[server]).to contain_exactly(op_spec_1, op_spec_2) end context 'when the same op is added more than once' do @@ -112,7 +112,7 @@ it 'does not allow duplicates ops for a server' do expect(to_kill.keys).to eq([ server ]) - expect(to_kill[server]).to eq(Set.new([op_spec_1, op_spec_2])) + expect(to_kill[server]).to contain_exactly(op_spec_1, op_spec_2) end end end @@ -180,7 +180,7 @@ end it 'removes the cursor id' do - expect(active_cursors).to eq(Set.new) + expect(active_cursors.size).to eq(0) end end end @@ -208,7 +208,7 @@ reaper.instance_variable_get(:@thread) end - it 'stops the thread from running' do + it 'restarts the thread' do expect(thread.alive?).to be(true) end end From 467305fa0b83e682ea0c5acf082d5e5b5cc1d544 Mon Sep 17 00:00:00 2001 From: Emily Date: Thu, 4 Aug 2016 15:55:57 +0200 Subject: [PATCH 9/9] RUBY-1104 Operations are executed on servers directly now --- lib/mongo/cluster/cursor_reaper.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/mongo/cluster/cursor_reaper.rb b/lib/mongo/cluster/cursor_reaper.rb index dc64e551cc..155e463e6b 100644 --- a/lib/mongo/cluster/cursor_reaper.rb +++ b/lib/mongo/cluster/cursor_reaper.rb @@ -147,12 +147,12 @@ def kill_cursors if server.features.find_command_enabled? Cursor::Builder::KillCursorsCommand.update_cursors(op_spec, active_cursors_copy.to_a) if Cursor::Builder::KillCursorsCommand.get_cursors_list(op_spec).size > 0 - Operation::Commands::Command.new(op_spec).execute(server.context) + Operation::Commands::Command.new(op_spec).execute(server) end else Cursor::Builder::OpKillCursors.update_cursors(op_spec, active_cursors_copy.to_a) if Cursor::Builder::OpKillCursors.get_cursors_list(op_spec).size > 0 - Operation::KillCursors.new(op_spec).execute(server.context) + Operation::KillCursors.new(op_spec).execute(server) end end end