Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

fix a deadlock with threaded queries

  • Loading branch information...
commit 1e4728a77f32c4c339417729ea14377aa9425e73 1 parent 691e65f
Mike Dirolf authored
View
14 lib/mongo/cursor.rb
@@ -194,8 +194,10 @@ def next_object_on_wire
def refill_via_get_more
send_query_if_needed
return if @cursor_id == 0
- @db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id))
- read_all
+ @db._synchronize {
+ @db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id))
+ read_all
+ }
end
def object_from_stream
@@ -212,9 +214,11 @@ def object_from_stream
def send_query_if_needed
# Run query first time we request an object from the wire
unless @query_run
- @db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query))
- @query_run = true
- read_all
+ @db._synchronize {
+ @db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query))
+ @query_run = true
+ read_all
+ }
end
end
View
18 lib/mongo/db.rb
@@ -345,15 +345,13 @@ def query(collection, query, admin=false)
# Used by a Cursor to lazily send the query to the database.
def send_query_message(query_message)
- @semaphore.synchronize {
- send_to_db(query_message)
- }
+ send_to_db(query_message)
end
# Remove the records that match +selector+ from +collection_name+.
# Normally called by Collection#remove or Collection#clear.
def remove_from_db(collection_name, selector)
- @semaphore.synchronize {
+ _synchronize {
send_to_db(RemoveMessage.new(@name, collection_name, selector))
}
end
@@ -361,7 +359,7 @@ def remove_from_db(collection_name, selector)
# Update records in +collection_name+ that match +selector+ by
# applying +obj+ as an update. Normally called by Collection#replace.
def replace_in_db(collection_name, selector, obj)
- @semaphore.synchronize {
+ _synchronize {
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false))
}
end
@@ -373,7 +371,7 @@ def replace_in_db(collection_name, selector, obj)
# applying +obj+ as an update. If no match, inserts (???). Normally
# called by Collection#repsert.
def repsert_in_db(collection_name, selector, obj)
- @semaphore.synchronize {
+ _synchronize {
obj = @pk_factory.create_pk(obj) if @pk_factory
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true))
obj
@@ -469,7 +467,7 @@ def create_index(collection_name, field_or_spec, unique=false)
:key => field_h,
:unique => unique
}
- @semaphore.synchronize {
+ _synchronize {
send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, false, sel))
}
name
@@ -479,7 +477,7 @@ def create_index(collection_name, field_or_spec, unique=false)
# Collection#insert. Returns a new array containing +objects+,
# possibly modified by @pk_factory.
def insert_into_db(collection_name, objects)
- @semaphore.synchronize {
+ _synchronize {
if @pk_factory
objects.collect! { |o|
@pk_factory.create_pk(o)
@@ -532,6 +530,10 @@ def db_command(selector, use_admin_db=false)
query(Collection.new(self, SYSTEM_COMMAND_COLLECTION), q, use_admin_db).next_object
end
+ def _synchronize &block
+ @semaphore.synchronize &block
+ end
+
private
def hash_password(username, plaintext)
View
1  mongo-ruby-driver.gemspec
@@ -75,6 +75,7 @@ TEST_FILES = ['tests/mongo-qa/_common.rb',
'tests/test_mongo.rb',
'tests/test_objectid.rb',
'tests/test_ordered_hash.rb',
+ 'tests/test_threading.rb',
'tests/test_round_trip.rb']
Gem::Specification.new do |s|
View
37 tests/test_threading.rb
@@ -0,0 +1,37 @@
+$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
+require 'mongo'
+require 'test/unit'
+
+class TestThreading < Test::Unit::TestCase
+
+ include XGen::Mongo::Driver
+
+ @@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
+ @@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT
+ @@db = Mongo.new(@@host, @@port).db('ruby-mongo-test')
+ @@coll = @@db.collection('thread-test-collection')
+
+ def test_threading
+ @@coll.clear
+
+ 1000.times do |i|
+ @@coll.insert("x" => i)
+ end
+
+ threads = []
+
+ 10.times do |i|
+ threads[i] = Thread.new{
+ sum = 0
+ @@coll.find().each { |document|
+ sum += document["x"]
+ }
+ assert_equal 499500, sum
+ }
+ end
+
+ 10.times do |i|
+ threads[i].join
+ end
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.