Skip to content

Commit

Permalink
Merge pull request #128 from mvidner/threads-signals
Browse files Browse the repository at this point in the history
Signal thread safety: Add a mutex to MessageQueue#push,pop
  • Loading branch information
mvidner committed Mar 21, 2023
2 parents 51913bf + 045a33d commit 4c51ecc
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 23 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Features:
* For EXTERNAL authentication, try also without the user id, to work with
containers ([#126][]).
* Thread safety, as long as the non-main threads only send signals.

[#126]: https://github.com/mvidner/ruby-dbus/issues/126

Expand Down
28 changes: 17 additions & 11 deletions lib/dbus/message_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def initialize(address)
@address = address
@buffer = ""
@is_tcp = false
@mutex = Mutex.new
connect
end

Expand All @@ -33,23 +34,28 @@ def initialize(address)
# @raise EOFError
# @todo failure modes
def pop(blocking: true)
buffer_from_socket_nonblock
message = message_from_buffer_nonblock
if blocking
# we can block
while message.nil?
r, _d, _d = IO.select([@socket])
if r && r[0] == @socket
buffer_from_socket_nonblock
message = message_from_buffer_nonblock
# FIXME: this is not enough, the R/W test deadlocks on shared connections
@mutex.synchronize do
buffer_from_socket_nonblock
message = message_from_buffer_nonblock
if blocking
# we can block
while message.nil?
r, _d, _d = IO.select([@socket])
if r && r[0] == @socket
buffer_from_socket_nonblock
message = message_from_buffer_nonblock
end
end
end
message
end
message
end

def push(message)
@socket.write(message.marshall)
@mutex.synchronize do
@socket.write(message.marshall)
end
end
alias << push

Expand Down
67 changes: 55 additions & 12 deletions spec/thread_safety_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,71 @@
require_relative "spec_helper"
require "dbus"

describe "ThreadSafetyTest" do
it "tests thread competition" do
print "Thread competition: "
jobs = []
5.times do
jobs << Thread.new do
Thread.current.abort_on_exception = true
class TestSignalRace < DBus::Object
dbus_interface "org.ruby.ServerTest" do
dbus_signal :signal_without_arguments
end
end

# Run *count* threads all doing *body*, wait for their finish
def race_threads(count, &body)
jobs = count.times.map do |j|
Thread.new do
Thread.current.abort_on_exception = true

body.call(j)
end
end
jobs.each(&:join)
end

# Repeat *count* times: { random sleep, *body* }, printing progress
def repeat_with_jitter(count, &body)
count.times do |i|
sleep 0.1 * rand
print "#{i} "
$stdout.flush

body.call
end
end

describe "thread safety" do
context "R/W: when the threads call methods with return values" do
it "it works with separate bus connections" do
race_threads(5) do |_j|
# use separate connections to avoid races
bus = DBus::ASessionBus.new
svc = bus.service("org.ruby.service")
obj = svc.object("/org/ruby/MyInstance")
obj.default_iface = "org.ruby.SampleInterface"

10.times do |i|
print "#{i} "
$stdout.flush
repeat_with_jitter(10) do
expect(obj.the_answer[0]).to eq(42)
sleep 0.1 * rand
end
end
puts
end
end

context "W/O: when the threads only send signals" do
it "it works with a shared separate bus connection" do
race_threads(5) do |j|
# shared connection
bus = DBus::SessionBus.instance
# hackish: we do not actually request the name
svc = DBus::Service.new("org.ruby.server-test#{j}", bus)

obj = TestSignalRace.new "/org/ruby/Foo"
svc.export obj

repeat_with_jitter(10) do
obj.signal_without_arguments
end

svc.unexport(obj)
end
puts
end
jobs.each(&:join)
end
end

0 comments on commit 4c51ecc

Please sign in to comment.