Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
require 'mongo/id'
require 'mongo/bson'
require 'mongo/semaphore'
require 'mongo/distinguishing_semaphore'
require 'mongo/options'
require 'mongo/loggable'
require 'mongo/cluster_time'
Expand Down
55 changes: 55 additions & 0 deletions lib/mongo/distinguishing_semaphore.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright (C) 2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
# This is a semaphore that distinguishes waits ending due to the timeout
# being reached from waits ending due to the semaphore being signaled.
#
# @api private
class DistinguishingSemaphore
def initialize
@lock = Mutex.new
@cv = ConditionVariable.new
@queue = []
end

# Waits for the semaphore to be signaled up to timeout seconds.
# If semaphore is not signaled, returns after timeout seconds.
#
# @return [ true | false ] true if semaphore was signaled, false if
# timeout was reached.
def wait(timeout = nil)
@lock.synchronize do
@cv.wait(@lock, timeout)
(!@queue.empty?).tap do
@queue.clear
end
end
end

def broadcast
@lock.synchronize do
@queue.push(true)
@cv.broadcast
end
end

def signal
@lock.synchronize do
@queue.push(true)
@cv.signal
end
end
end
end
63 changes: 63 additions & 0 deletions spec/mongo/distinguishing_semaphore_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
require 'lite_spec_helper'

describe Mongo::DistinguishingSemaphore do
let(:semaphore) do
described_class.new
end

it 'waits until signaled' do
result = nil

consumer = Thread.new do
result = semaphore.wait(3)
end

# Context switch to start the thread
sleep 0.1

start_time = Time.now
semaphore.signal
consumer.join

(Time.now - start_time).should < 1

result.should be true
end

it 'waits until broadcast' do
result = nil

consumer = Thread.new do
result = semaphore.wait(3)
end

# Context switch to start the thread
sleep 0.1

start_time = Time.now
semaphore.broadcast
consumer.join

(Time.now - start_time).should < 1

result.should be true
end

it 'times out' do
result = nil

consumer = Thread.new do
result = semaphore.wait(2)
end

# Context switch to start the thread
sleep 0.1

start_time = Time.now
consumer.join

(Time.now - start_time).should > 1

result.should be false
end
end
51 changes: 51 additions & 0 deletions spec/mongo/semaphore_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
require 'lite_spec_helper'

describe Mongo::Semaphore do
let(:semaphore) do
described_class.new
end

it 'waits until signaled' do
consumer = Thread.new do
semaphore.wait(3)
end

# Context switch to start the thread
sleep 0.1

start_time = Time.now
semaphore.signal
consumer.join

(Time.now - start_time).should < 1
end

it 'waits until broadcast' do
consumer = Thread.new do
semaphore.wait(3)
end

# Context switch to start the thread
sleep 0.1

start_time = Time.now
semaphore.broadcast
consumer.join

(Time.now - start_time).should < 1
end

it 'times out' do
consumer = Thread.new do
semaphore.wait(2)
end

# Context switch to start the thread
sleep 0.1

start_time = Time.now
consumer.join

(Time.now - start_time).should > 1
end
end