Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Further changes to the replica set refresh test to address race conditions. #165

Closed
wants to merge 4 commits into from

1 participant

Emily
Emily
Collaborator

In the first test, test_connect_and_manual_refresh_with_secondary_down, I'm stopping only one secondary as opposed to all of them so that the primary doesn't step itself down and become a secondary. I assert that the number of secondaries has decreased by one after a refresh.

In the second test, test_automated_refresh_with_secondary_down, I stop the secondary that has been pinned for reads. Then I assert that the version and read pool change after an automatic refresh.

Emily estolfo closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
2  lib/mongo/cursor.rb
@@ -552,7 +552,7 @@ def construct_query_message
552 552 message.put_int(@options)
553 553 BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")
554 554 message.put_int(@skip)
555   - message.put_int(@limit)
  555 + @batch_size > 1 ? message.put_int(@batch_size) : message.put_int(@limit)
556 556 spec = query_contains_special_fields? ? construct_query_spec : @selector
557 557 message.put_binary(BSON::BSON_CODER.serialize(spec, false, false, @connection.max_bson_size).to_s)
558 558 message.put_binary(BSON::BSON_CODER.serialize(@fields, false, false, @connection.max_bson_size).to_s) if @fields
60 test/functional/collection_test.rb
@@ -902,6 +902,66 @@ def test_save_symbol_find_string
902 902 # assert_equal :mike, @@test.find_one("foo" => "mike")["foo"]
903 903 end
904 904
  905 + def test_batch_size
  906 + n_docs = 6
  907 + batch_size = n_docs/2
  908 + n_docs.times do |i|
  909 + @@test.save(:foo => i)
  910 + end
  911 +
  912 + doc_count = 0
  913 + cursor = @@test.find({}, :batch_size => batch_size)
  914 + cursor.next
  915 + assert_equal batch_size, cursor.instance_variable_get(:@returned)
  916 + doc_count += batch_size
  917 + batch_size.times { cursor.next }
  918 + assert_equal doc_count + batch_size, cursor.instance_variable_get(:@returned)
  919 + doc_count += batch_size
  920 + assert_equal n_docs, doc_count
  921 + end
  922 +
  923 + def test_batch_size_with_smaller_limit
  924 + n_docs = 6
  925 + batch_size = n_docs/2
  926 + n_docs.times do |i|
  927 + @@test.insert(:foo => i)
  928 + end
  929 +
  930 + cursor = @@test.find({}, :batch_size => batch_size, :limit => 2)
  931 + cursor.next
  932 + assert_equal 2, cursor.instance_variable_get(:@returned)
  933 + end
  934 +
  935 + def test_batch_size_with_larger_limit
  936 + n_docs = 6
  937 + batch_size = n_docs/2
  938 + n_docs.times do |i|
  939 + @@test.insert(:foo => i)
  940 + end
  941 +
  942 + doc_count = 0
  943 + cursor = @@test.find({}, :batch_size => batch_size, :limit => n_docs + 5)
  944 + cursor.next
  945 + assert_equal batch_size, cursor.instance_variable_get(:@returned)
  946 + doc_count += batch_size
  947 + batch_size.times { cursor.next }
  948 + assert_equal doc_count + batch_size, cursor.instance_variable_get(:@returned)
  949 + doc_count += batch_size
  950 + assert_equal n_docs, doc_count
  951 +end
  952 +
  953 + def test_batch_size_with_negative_limit
  954 + n_docs = 6
  955 + batch_size = n_docs/2
  956 + n_docs.times do |i|
  957 + @@test.insert(:foo => i)
  958 + end
  959 +
  960 + cursor = @@test.find({}, :batch_size => batch_size, :limit => -7)
  961 + cursor.next
  962 + assert_equal n_docs, cursor.instance_variable_get(:@returned)
  963 + end
  964 +
905 965 def test_limit_and_skip
906 966 10.times do |i|
907 967 @@test.save(:foo => i)
30 test/replica_set/cursor_test.rb
@@ -41,10 +41,14 @@ def test_cursors_get_closed_secondary_query
41 41 assert_cursors_on_members(:secondary)
42 42 end
43 43
  44 + def test_intervening_query_secondary
  45 + setup_client(:primary)
  46 + refresh_while_iterating(:secondary)
  47 + end
  48 +
44 49 private
45 50
46 51 def setup_client(read=:primary)
47   - route_read ||= read
48 52 # Setup ReplicaSet Connection
49 53 @client = MongoReplicaSetClient.new(@rs.repl_set_seeds, :read => read)
50 54
@@ -52,9 +56,6 @@ def setup_client(read=:primary)
52 56 @db.drop_collection("cursor_tests")
53 57 @coll = @db.collection("cursor_tests")
54 58 insert_docs
55   -
56   - # Setup Direct Connections
57   - @primary = Mongo::MongoClient.new(*@client.manager.primary)
58 59 end
59 60
60 61 def insert_docs
@@ -169,4 +170,25 @@ def assert_cursors_on_members(read=:primary)
169 170 cursor_clone.next
170 171 end
171 172 end
  173 +
  174 + def refresh_while_iterating(read)
  175 + set_read_client_and_tag(read)
  176 +
  177 + read_opts = {:read => read}
  178 + read_opts[:tag_sets] = [{:node => @tag}]
  179 + read_opts[:batch_size] = 2
  180 + cursor = @coll.find({}, read_opts)
  181 +
  182 + 2.times { cursor.next }
  183 + port = cursor.instance_variable_get(:@pool).port
  184 + host = cursor.instance_variable_get(:@pool).host
  185 + # Refresh connection
  186 + @client.refresh
  187 + assert_nothing_raised do
  188 + cursor.next
  189 + end
  190 +
  191 + assert_equal port, cursor.instance_variable_get(:@pool).port
  192 + assert_equal host, cursor.instance_variable_get(:@pool).host
  193 + end
172 194 end
69 test/replica_set/refresh_test.rb
... ... @@ -1,5 +1,4 @@
1 1 require 'test_helper'
2   -require 'benchmark'
3 2
4 3 class ReplicaSetRefreshTest < Test::Unit::TestCase
5 4
@@ -7,98 +6,76 @@ def setup
7 6 ensure_cluster(:rs)
8 7 end
9 8
10   - def test_connect_and_manual_refresh_with_secondaries_down
  9 + def test_connect_and_manual_refresh_with_secondary_down
11 10 num_secondaries = @rs.secondaries.size
12 11 client = MongoReplicaSetClient.new(@rs.repl_set_seeds, :refresh_mode => false)
13 12
14 13 assert_equal num_secondaries, client.secondaries.size
15 14 assert client.connected?
16 15 assert_equal client.read_pool, client.primary_pool
  16 + old_refresh_version = client.refresh_version
17 17
18   - @rs.secondaries.each {|s| s.stop}
  18 + @rs.stop_secondary
19 19
20 20 client.refresh
21   - assert client.secondaries.empty?
  21 + assert_equal num_secondaries - 1, client.secondaries.size
22 22 assert client.connected?
23 23 assert_equal client.read_pool, client.primary_pool
  24 + assert client.refresh_version > old_refresh_version
  25 + old_refresh_version = client.refresh_version
24 26
25 27 # Test no changes after restart until manual refresh
26 28 @rs.restart
27   - assert client.secondaries.empty?
  29 + assert_equal num_secondaries - 1, client.secondaries.size
28 30 assert client.connected?
29 31 assert_equal client.read_pool, client.primary_pool
  32 + assert_equal client.refresh_version, old_refresh_version
30 33
31 34 # Refresh and ensure state
32 35 client.refresh
33   - assert_equal client.read_pool, client.primary_pool
34 36 assert_equal num_secondaries, client.secondaries.size
  37 + assert client.connected?
  38 + assert_equal client.read_pool, client.primary_pool
  39 + assert client.refresh_version > old_refresh_version
35 40 end
36 41
37   - def test_automated_refresh_with_secondaries_down
  42 + def test_automated_refresh_with_secondary_down
38 43 num_secondaries = @rs.secondaries.size
39 44 client = MongoReplicaSetClient.new(@rs.repl_set_seeds,
40 45 :refresh_interval => 1, :refresh_mode => :sync, :read => :secondary_preferred)
41 46
42   - # Ensure secondaries not available and read from primary
  47 + # Ensure secondaries are all recognized by client and client is connected
43 48 assert_equal num_secondaries, client.secondaries.size
44 49 assert client.connected?
45 50 assert client.secondary_pools.include?(client.read_pool)
  51 + pool = client.read_pool
46 52
47   - @rs.secondaries.each{|s| s.stop}
  53 + @rs.member_by_name(pool.host_string).stop
48 54 sleep(2)
49 55
  56 + old_refresh_version = client.refresh_version
  57 + # Trigger synchronous refresh
50 58 client['foo']['bar'].find_one
51 59
52   - assert client.secondaries.empty?
53 60 assert client.connected?
54   - assert_equal client.read_pool, client.primary_pool
55   -
56   - old_refresh_version = client.refresh_version
  61 + assert client.refresh_version > old_refresh_version
  62 + assert_equal num_secondaries - 1, client.secondaries.size
  63 + assert client.secondary_pools.include?(client.read_pool)
  64 + assert_not_equal pool, client.read_pool
57 65
58 66 # Restart nodes and ensure refresh interval has passed
59 67 @rs.restart
60 68 sleep(2)
61 69
62   - assert client.refresh_version == old_refresh_version,
63   - "Refresh version has changed."
64   -
  70 + old_refresh_version = client.refresh_version
65 71 # Trigger synchronous refresh
66 72 client['foo']['bar'].find_one
67 73
  74 + assert client.connected?
68 75 assert client.refresh_version > old_refresh_version,
69 76 "Refresh version hasn't changed."
70 77 assert_equal num_secondaries, client.secondaries.size
71 78 "No secondaries have been added."
72   - assert client.manager.read_pool != client.manager.primary,
73   - "Read pool and primary pool are identical."
74   - end
75   -
76   - def test_automated_refresh_when_secondary_goes_down
77   - client = MongoReplicaSetClient.new(@rs.repl_set_seeds,
78   - :refresh_interval => 1, :refresh_mode => :sync)
79   -
80   - num_secondaries = client.secondary_pools.size
81   - old_refresh_version = client.refresh_version
82   -
83   - @rs.stop_secondary
84   - sleep(2)
85   -
86   - assert client.refresh_version == old_refresh_version,
87   - "Refresh version has changed."
88   -
89   - client['foo']['bar'].find_one
90   -
91   - assert client.refresh_version > old_refresh_version,
92   - "Refresh version hasn't changed."
93   - assert_equal num_secondaries - 1, client.secondaries.size
94   - assert_equal num_secondaries - 1, client.secondary_pools.size
95   -
96   - @rs.restart
97   - sleep(2)
98   -
99   - client['foo']['bar'].find_one
100   -
101   - assert_equal num_secondaries, client.secondaries.size
102 79 assert_equal num_secondaries, client.secondary_pools.size
103 80 end
104 81 =begin
6 test/tools/mongo_config.rb
@@ -430,10 +430,14 @@ def arbiter_names
430 430
431 431 def members_by_name(names)
432 432 names.collect do |name|
433   - servers.find{|server| server.host_port == name}
  433 + member_by_name(name)
434 434 end.compact
435 435 end
436 436
  437 + def member_by_name(name)
  438 + servers.find{|server| server.host_port == name}
  439 + end
  440 +
437 441 def primary
438 442 members_by_name([primary_name]).first
439 443 end

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.