Permalink
Browse files

* lib/thread.rb (Queue#pop): Fixed double registration issue when

  mutex.sleep is interrupted. [Bug #5258] [ruby-dev:44448]
* lib/thread.rb (SizedQueue#push): ditto.

* test/thread/test_queue.rb (test_sized_queue_and_wakeup,
test_queue_pop_interrupt, test_sized_queue_pop_interrupt,
test_sized_queue_push_interrupt): new tests.

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36938 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
  • Loading branch information...
1 parent 61f5305 commit 7198053a49d39a5c80551fc9147b9c0ab21e75a2 @kosaki kosaki committed Sep 9, 2012
Showing with 73 additions and 13 deletions.
  1. +10 −0 ChangeLog
  2. +21 −13 lib/thread.rb
  3. +42 −0 test/thread/test_queue.rb
View
10 ChangeLog
@@ -1,3 +1,13 @@
+Sun Sep 9 21:21:15 2012 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
+
+ * lib/thread.rb (Queue#pop): Fixed double registration issue when
+ mutex.sleep is interrupted. [Bug #5258] [ruby-dev:44448]
+ * lib/thread.rb (SizedQueue#push): ditto.
+
+ * test/thread/test_queue.rb (test_sized_queue_and_wakeup,
+ test_queue_pop_interrupt, test_sized_queue_pop_interrupt,
+ test_sized_queue_push_interrupt): new tests.
+
Sun Sep 9 20:20:31 2012 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* lib/sync.rb (Sync_m#sync_lock): Fixed wakeup/raise unsafe code.
View
34 lib/thread.rb
@@ -182,16 +182,20 @@ def push(obj)
#
def pop(non_block=false)
@mutex.synchronize{
- while true
- if @que.empty?
- raise ThreadError, "queue empty" if non_block
- # @waiting.include? check is necessary for avoiding a race against
- # Thread.wakeup [Bug 5195]
- @waiting.push Thread.current unless @waiting.include?(Thread.current)
- @mutex.sleep
- else
- return @que.shift
+ begin
+ while true
+ if @que.empty?
+ raise ThreadError, "queue empty" if non_block
+ # @waiting.include? check is necessary for avoiding a race against
+ # Thread.wakeup [Bug 5195]
+ @waiting.push Thread.current unless @waiting.include?(Thread.current)
+ @mutex.sleep
+ else
+ return @que.shift
+ end
end
+ ensure
+ @waiting.delete(Thread.current)
end
}
end
@@ -298,10 +302,14 @@ def max=(max)
#
def push(obj)
@mutex.synchronize{
- while true
- break if @que.length < @max
- @queue_wait.push Thread.current
- @mutex.sleep
+ begin
+ while true
+ break if @que.length < @max
+ @queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
+ @mutex.sleep
+ end
+ ensure
+ @queue_wait.delete(Thread.current)
end
@que.push obj
View
42 test/thread/test_queue.rb
@@ -56,6 +56,48 @@ def test_sized_queue_assign_max
assert_equal(1, q.max)
end
+ def test_sized_queue_and_wakeup
+ sq = SizedQueue.new(1)
+ sq.push(0)
+
+ t1 = Thread.start { sq.push(1) ; sleep }
+
+ sleep 0.1 until t1.stop?
+ t1.wakeup
+ sleep 0.1 until t1.stop?
+
+ t2 = Thread.start { sq.push(2) }
+ sleep 0.1 until t1.stop? && t2.stop?
+
+ queue_wait = sq.instance_eval{ @queue_wait }
+ assert_equal(queue_wait.uniq, queue_wait)
+ end
+
+ def test_queue_pop_interrupt
+ q = Queue.new
+ t1 = Thread.new { q.pop }
+ sleep 0.01 until t1.stop?
+ t1.kill.join
+ assert_equal(0, q.num_waiting)
+ end
+
+ def test_sized_queue_pop_interrupt
+ q = SizedQueue.new(1)
+ t1 = Thread.new { q.pop }
+ sleep 0.01 until t1.stop?
+ t1.kill.join
+ assert_equal(0, q.num_waiting)
+ end
+
+ def test_sized_queue_push_interrupt
+ q = SizedQueue.new(1)
+ q.push(1)
+ t1 = Thread.new { q.push(2) }
+ sleep 0.01 until t1.stop?
+ t1.kill.join
+ assert_equal(0, q.num_waiting)
+ end
+
def test_thr_kill
bug5343 = '[ruby-core:39634]'
Dir.mktmpdir {|d|

0 comments on commit 7198053

Please sign in to comment.