diff --git a/doc/channel.md b/doc/channel.md index 0a921fc89..1461803c1 100644 --- a/doc/channel.md +++ b/doc/channel.md @@ -57,7 +57,7 @@ msg = messages.take puts msg ``` -By default, channels are *unbuffered*, meaning that they have a size of zero and only accept puts and takes when both a putting and a taking thread are available. If a `put` is started when there is no taker thread the call will block. As soon as another thread calls `take` the exchange will occur and both calls will return on their respective threads. Similarly, is a `take` is started when there is no putting thread the call will block until another thread calls `put`. +By default, channels are *unbuffered*, meaning that they have a capacity of zero and only accept puts and takes when both a putting and a taking thread are available. If a `put` is started when there is no taker thread the call will block. As soon as another thread calls `take` the exchange will occur and both calls will return on their respective threads. Similarly, is a `take` is started when there is no putting thread the call will block until another thread calls `put`. The following, slightly more complex example, concurrently sums two different halves of a list then combines the results. It uses an unbuffered channel to pass the results from the two goroutines back to the main thread. The main thread blocks on the two `take` calls until the worker goroutines are done. This example also uses the convenience aliases {#<<} and {#~}. Since channels in Go are part of the language, channel operations are performed using special channel operators rather than functions. These operators help clearly indicate that channel operations are being performed. The operator overloads `<<` for `put` and `~` for `take` help reinforce this idea in Ruby. @@ -80,12 +80,12 @@ puts [x, y, x+y].join(' ') ## Channel Buffering -One common channel variation is a *buffered* channel. A buffered channel has a finite number of slots in the buffer which can be filled. Putting threads can put values into the channel even if there is no taking threads, up to the point where the buffer is filled. Once a buffer becomes full the normal blocking behavior resumes. A buffered channel is created by giving a `:size` option on channel creation: +One common channel variation is a *buffered* channel. A buffered channel has a finite number of slots in the buffer which can be filled. Putting threads can put values into the channel even if there is no taking threads, up to the point where the buffer is filled. Once a buffer becomes full the normal blocking behavior resumes. A buffered channel is created by giving a `:capacity` option on channel creation: The following example creates a buffered channel with two slots. It then makes two `put` calls, adding values to the channel. These calls do not block because the buffer has room. Were a third `put` call to be made before an `take` calls, the third `put` would block. ```ruby -ch = Concurrent::Channel.new(size: 2) +ch = Concurrent::Channel.new(capacity: 2) ch << 1 ch << 2 @@ -95,7 +95,7 @@ puts ~ch ## Channel Synchronization -The main purpose of channels is to synchronize operations across goroutines. One common pattern for this is to created a `size: 1` buffered channel which is used to signal that work is complete. The following example calls a `worker` function on a goroutine and passes it a "done" channel. The main thread then calls `take` on the "done" channel and blocks until signaled. +The main purpose of channels is to synchronize operations across goroutines. One common pattern for this is to created a `capacity: 1` buffered channel which is used to signal that work is complete. The following example calls a `worker` function on a goroutine and passes it a "done" channel. The main thread then calls `take` on the "done" channel and blocks until signaled. ```ruby def worker(done_channel) @@ -106,7 +106,7 @@ def worker(done_channel) done_channel << true end -done = Concurrent::Channel.new(size: 1) +done = Concurrent::Channel.new(capacity: 1) Concurrent::Channel.go{ worker(done) } ~done # block until signaled @@ -176,7 +176,7 @@ fibonacci(c, quit) ## Closing and Iterating Over Channels -Newly created channels are in an "open" state. Open channels can receive values via `put` operations. When a program is done with a channel it can be closed by calling the {#close} method. Once a channel is closed it will no longer allow values to be `put`. If the channel is buffered and values are in the buffer when the channel is closed, the remaining values can still be removed via `take` operations. +Newly created channels are in an "open" state. Open channels can receive values via `put` operations. When a program is done with a channel it can be closed by calling the `#close` method. Once a channel is closed it will no longer allow values to be `put`. If the channel is buffered and values are in the buffer when the channel is closed, the remaining values can still be removed via `take` operations. The `Channel` class implements an {#each} method which can be used to retrieve successive values from the channel. The `each` method is a blocking method. When the channel is open and there are no values in the buffer, `each` will block until a new item is `put`. The `each` method will not exit until the channel is closed. @@ -192,7 +192,7 @@ def fibonacci(n, c) c.close end -chan = Concurrent::Channel.new(size: 10) +chan = Concurrent::Channel.new(capacity: 10) Concurrent::Channel.go { fibonacci(chan.capacity, c) } chan.each { |i| puts i } ``` diff --git a/doc/synchronization.md b/doc/synchronization.md index a5bd5fc26..db69c010f 100644 --- a/doc/synchronization.md +++ b/doc/synchronization.md @@ -7,7 +7,7 @@ Provides common parent for all objects which need to be synchronized or be using other synchronization tools. It provides: - Synchronized block -- Methods for waiting and signaling +- Methods for waiting and signaling - Volatile fields - Ensure visibility of final fields - Fields with CAS operations @@ -49,7 +49,7 @@ private def ns_compute ns_compute_reduce ns_compute_map end -``` +``` where `compute` defines how is it synchronized and `ns_compute` handles the behavior (in this case the computation). `ns_` methods should only call other `ns_` methods or `pr_` methods. They can call normal methods on other objects, but that should be done with care (better to avoid) because the thread escapes this object while the lock is still held, which can lead to deadlock. That's why the `report` method is called in `compute` and not in `ns_compute`. `pr_` methods are pure functions they can be used in and outside of synchronized blocks. @@ -60,16 +60,16 @@ Sometimes while already inside the synchronized block some condition is not met. To fulfill these needs there are private methods: -- `ns_wait` {include:Concurrent::Synchronization::AbstractObject#ns_wait} -- `ns_wait_until` {include:Concurrent::Synchronization::AbstractObject#ns_wait_until} -- `ns_signal` {include:Concurrent::Synchronization::AbstractObject#ns_signal} -- `ns_broadcast` {include:Concurrent::Synchronization::AbstractObject#ns_broadcast} +- `ns_wait` {include:Concurrent::Synchronization::AbstractLockableObject#ns_wait} +- `ns_wait_until` {include:Concurrent::Synchronization::AbstractLockableObject#ns_wait_until} +- `ns_signal` {include:Concurrent::Synchronization::AbstractLockableObject#ns_signal} +- `ns_broadcast` {include:Concurrent::Synchronization::AbstractLockableObject#ns_broadcast} All methods have to be called inside synchronized block. ## Volatile fields -`Synchronization::Object` can have volatile fields (Java semantic). They are defined by `attr_volatile :field_name`. `attr_volatile` defines reader and writer with the `field_name`. Any write is always immediately visible for any subsequent reads of the same field. +`Synchronization::Object` can have volatile fields (Java semantic). They are defined by `attr_volatile :field_name`. `attr_volatile` defines reader and writer with the `field_name`. Any write is always immediately visible for any subsequent reads of the same field. ## Ensure visibility of final fields @@ -83,7 +83,7 @@ class AbstractPromise < Synchronization::Object ensure_ivar_visibility! end # ... -end +end ``` ### Naming conventions @@ -112,10 +112,10 @@ class Event < Synchronization::Object self end # ... -end +end ``` -Operations on `@Touched` field have volatile semantic. +Operations on `@Touched` field have volatile semantic. ## Memory model @@ -125,7 +125,7 @@ When writing libraries in `concurrent-ruby` we are reasoning based on following The memory model is constructed based on our best effort and knowledge of the 3 main Ruby implementations (CRuby, JRuby, Rubinius). When considering certain aspect we always choose the weakest guarantee (e.g. local variable updates are always visible in CRuby but not in JRuby, so in this case JRuby behavior is picked). If some Ruby behavior is omitted here it is considered unsafe for use in parallel environment (Reasons may be lack of information, or difficulty of verification). -This takes in account following implementations: +This takes in account following implementations: - CRuby 1.9 - 2.2 (no differences found) - JRuby 1.7 @@ -139,10 +139,10 @@ We are interested in following behaviors: ### Variables -- **Local variables** - atomic assignment (only Integer and Object), non-volatile. +- **Local variables** - atomic assignment (only Integer and Object), non-volatile. - Consequence: a lambda defined on `thread1` executing on `thread2` may not see updated values in local variables captured in its closure. - Reason: local variables are non-volatile on Jruby and Rubinius. -- **Instance variables** - atomic assignment (only Integer and Object), non-volatile. +- **Instance variables** - atomic assignment (only Integer and Object), non-volatile. - Consequence: Different thread may see old values; different thread may see not fully-initialized object. - Reason: local variables are non-volatile on Jruby and Rubinius. - **Constants** - atomic assignment, volatile. diff --git a/examples/a-tour-of-go-channels/buffered-channels.rb b/examples/a-tour-of-go-channels/buffered-channels.rb index ebd9743d1..a372f1faa 100755 --- a/examples/a-tour-of-go-channels/buffered-channels.rb +++ b/examples/a-tour-of-go-channels/buffered-channels.rb @@ -5,16 +5,15 @@ Channel = Concurrent::Channel ## A Tour of Go: Buffered Channels -# https://tour.golang.org/concurrency/3 +# https://tour.golang.org/concurrency/3 -ch = Channel.new(size: 2) +ch = Channel.new(capacity: 2) ch << 1 ch << 2 puts ~ch puts ~ch -expected = <<-STDOUT +__END__ 1 2 -STDOUT diff --git a/examples/a-tour-of-go-channels/channels.rb b/examples/a-tour-of-go-channels/channels.rb index 685012ed4..be27aaf23 100755 --- a/examples/a-tour-of-go-channels/channels.rb +++ b/examples/a-tour-of-go-channels/channels.rb @@ -5,7 +5,7 @@ Channel = Concurrent::Channel ## A Tour of Go: Channels -# https://tour.golang.org/concurrency/2 +# https://tour.golang.org/concurrency/2 def sum(a, c) sum = a.reduce(0, &:+) @@ -22,6 +22,5 @@ def sum(a, c) puts [x, y, x+y].join(' ') -expected = <<-STDOUT +__END__ -5 17 12 -STDOUT diff --git a/examples/a-tour-of-go-channels/default-selection.rb b/examples/a-tour-of-go-channels/default-selection.rb index e181553b0..1c05f3679 100755 --- a/examples/a-tour-of-go-channels/default-selection.rb +++ b/examples/a-tour-of-go-channels/default-selection.rb @@ -12,7 +12,7 @@ loop do Channel.select do |s| - s.take(tick) { print "tick.\n" } + s.take(tick) { |t| print "tick.\n" if t } s.take(boom) do print "BOOM!\n" exit @@ -24,7 +24,7 @@ end end -expected = <<-STDOUT +__END__ . . tick. @@ -41,4 +41,3 @@ . tick. BOOM! -STDOUT diff --git a/examples/a-tour-of-go-channels/equivalent-binary-trees.rb b/examples/a-tour-of-go-channels/equivalent-binary-trees.rb index b617692a6..2f1d594d3 100755 --- a/examples/a-tour-of-go-channels/equivalent-binary-trees.rb +++ b/examples/a-tour-of-go-channels/equivalent-binary-trees.rb @@ -65,7 +65,6 @@ def same(t1, t2) puts same(new_tree(1), new_tree(1)) puts same(new_tree(1), new_tree(2)) -expected = <<-STDOUT +__END__ true false -STDOUT diff --git a/examples/a-tour-of-go-channels/range-and-close.rb b/examples/a-tour-of-go-channels/range-and-close.rb index 0176c0fe3..a1bd58172 100755 --- a/examples/a-tour-of-go-channels/range-and-close.rb +++ b/examples/a-tour-of-go-channels/range-and-close.rb @@ -5,7 +5,7 @@ Channel = Concurrent::Channel ## A Tour of Go: Range and Close -# https://tour.golang.org/concurrency/4 +# https://tour.golang.org/concurrency/4 def fibonacci(n, c) x, y = 0, 1 @@ -16,11 +16,11 @@ def fibonacci(n, c) c.close end -c = Channel.new(size: 10) +c = Channel.new(capacity: 10) Channel.go { fibonacci(c.capacity, c) } c.each { |i| puts i } -expected = <<-STDOUT +__END__ 0 1 1 @@ -31,4 +31,3 @@ def fibonacci(n, c) 13 21 34 -STDOUT diff --git a/examples/a-tour-of-go-channels/select.rb b/examples/a-tour-of-go-channels/select.rb index 1f229a809..476a73588 100755 --- a/examples/a-tour-of-go-channels/select.rb +++ b/examples/a-tour-of-go-channels/select.rb @@ -30,7 +30,7 @@ def fibonacci(c, quit) fibonacci(c, quit) -expected = <<-STDOUT +__END__ 0 1 1 @@ -42,4 +42,3 @@ def fibonacci(c, quit) 21 34 quit -STDOUT diff --git a/examples/go-by-example-channels/channel-buffering.rb b/examples/go-by-example-channels/channel-buffering.rb index 24e18a135..ec9ac2532 100755 --- a/examples/go-by-example-channels/channel-buffering.rb +++ b/examples/go-by-example-channels/channel-buffering.rb @@ -7,7 +7,7 @@ ## Go by Example: Channel Buffering # https://gobyexample.com/channel-buffering -messages = Channel.new(size: 2) # buffered +messages = Channel.new(capacity: 2) # buffered messages.put 'buffered' messages.put 'channel' @@ -15,7 +15,6 @@ puts messages.take puts messages.take -expected = <<-STDOUT +__END__ buffered channel -STDOUT diff --git a/examples/go-by-example-channels/channel-directions.rb b/examples/go-by-example-channels/channel-directions.rb index 8051c8e77..4e1169ae5 100755 --- a/examples/go-by-example-channels/channel-directions.rb +++ b/examples/go-by-example-channels/channel-directions.rb @@ -19,14 +19,13 @@ def pong(pings, pongs) pongs << msg end -pings = Channel.new(size: 1) # buffered -pongs = Channel.new(size: 1) # buffered +pings = Channel.new(capacity: 1) # buffered +pongs = Channel.new(capacity: 1) # buffered ping(pings, 'passed message') pong(pings, pongs) puts ~pongs -expected = <<-STDOUT +__END__ passed message -STDOUT diff --git a/examples/go-by-example-channels/channel-synchronization.rb b/examples/go-by-example-channels/channel-synchronization.rb index 2caf5e2da..3e869b40c 100755 --- a/examples/go-by-example-channels/channel-synchronization.rb +++ b/examples/go-by-example-channels/channel-synchronization.rb @@ -15,12 +15,11 @@ def worker(done_channel) done_channel << true # alias for `#put` end -done = Channel.new(size: 1) # buffered +done = Channel.new(capacity: 1) # buffered Channel.go{ worker(done) } ~done # alias for `#take` -expected = <<-STDOUT +__END__ working... done -STDOUT diff --git a/examples/go-by-example-channels/channels.rb b/examples/go-by-example-channels/channels.rb index 92a128e4b..cfb30f011 100755 --- a/examples/go-by-example-channels/channels.rb +++ b/examples/go-by-example-channels/channels.rb @@ -5,7 +5,7 @@ Channel = Concurrent::Channel ## Go by Example: Unbuffered Channel -# https://gobyexample.com/channels +# https://gobyexample.com/channels messages = Channel.new # unbuffered @@ -16,6 +16,5 @@ msg = messages.take puts msg -expected = <<-STDOUT +__END__ ping -STDOUT diff --git a/examples/go-by-example-channels/closing-channels.rb b/examples/go-by-example-channels/closing-channels.rb index 915da06e9..33d865631 100755 --- a/examples/go-by-example-channels/closing-channels.rb +++ b/examples/go-by-example-channels/closing-channels.rb @@ -5,10 +5,10 @@ Channel = Concurrent::Channel ## Go by Example: Closing Channels -# https://gobyexample.com/closing-channels +# https://gobyexample.com/closing-channels validator = ->(v){ v.is_a? Numeric } -jobs = Channel.new(buffer: :buffered, size: 5, +jobs = Channel.new(buffer: :buffered, capacity: 5, validator: validator) done = Channel.new(buffer: :unbuffered) @@ -34,7 +34,7 @@ print "sent all jobs\n" ~done -expected = <<-STDOUT +__END__ sent job 1 received job 1 sent job 2 @@ -43,4 +43,3 @@ received job 3 sent all jobs received all jobs -STDOUT diff --git a/examples/go-by-example-channels/non-blocking-channel-operations.rb b/examples/go-by-example-channels/non-blocking-channel-operations.rb index 5a0bba4dc..572d4b1f7 100755 --- a/examples/go-by-example-channels/non-blocking-channel-operations.rb +++ b/examples/go-by-example-channels/non-blocking-channel-operations.rb @@ -5,7 +5,7 @@ Channel = Concurrent::Channel ## Go by Example: Non-Blocking Channel Operations -# https://gobyexample.com/non-blocking-channel-operations +# https://gobyexample.com/non-blocking-channel-operations messages = Channel.new # unbuffered signals = Channel.new # unbuffered @@ -27,8 +27,7 @@ s.default { print "no activity\n" } end -expected = <<-STDOUT +__END__ no message received no message sent no activity -STDOUT diff --git a/examples/go-by-example-channels/range-over-channels.rb b/examples/go-by-example-channels/range-over-channels.rb index 4e8bfdce6..c638a5cef 100755 --- a/examples/go-by-example-channels/range-over-channels.rb +++ b/examples/go-by-example-channels/range-over-channels.rb @@ -5,9 +5,9 @@ Channel = Concurrent::Channel ## Go by Example: Range over Channels -# https://gobyexample.com/range-over-channels +# https://gobyexample.com/range-over-channels -queue = Channel.new(size: 2) # buffered +queue = Channel.new(capacity: 2) # buffered queue << 'one' queue << 'two' queue.close @@ -16,31 +16,6 @@ print "#{elem}\n" end -expected = <<-STDOUT +__END__ one two -STDOUT - -def blocking_variant - queue = Channel.new(size: 2) - queue << 'one' - queue << 'two' - - Channel.go do - sleep(1) - queue.close - end - - queue.each do |elem| - print "#{elem}\n" - end -end - -def sorting - count = 10 - queue = Channel.new(size: count) - count.times { queue << rand(100) } - queue.close - - puts queue.sort -end diff --git a/examples/go-by-example-channels/rate-limiting.rb b/examples/go-by-example-channels/rate-limiting.rb index c6642781a..81afab790 100755 --- a/examples/go-by-example-channels/rate-limiting.rb +++ b/examples/go-by-example-channels/rate-limiting.rb @@ -9,7 +9,7 @@ ## Go by Example: Rate Limiting # https://gobyexample.com/tickers -requests = Channel.new(buffer: :buffered, size: 5) +requests = Channel.new(buffer: :buffered, capacity: 5) (1..5).each do |i| requests << i end @@ -17,23 +17,23 @@ limiter = Channel.ticker(0.2) requests.each do |req| - ~limiter - print "request #{req} #{Channel::Tick.new}\n" + print "request #{req} #{Channel::Tick.new}\n" if ~limiter end print "\n" -bursty_limiter = Channel.new(buffer: :buffered, size: 3) +bursty_limiter = Channel.new(buffer: :buffered, capacity: 3) (1..3).each do bursty_limiter << Channel::Tick.new end +ticker = Channel.ticker(0.2) Channel.go do - Channel.ticker(0.2).each do |t| + ticker.each do |t| bursty_limiter << t end end -bursty_requests = Channel.new(buffer: :buffered, size: 5) +bursty_requests = Channel.new(buffer: :buffered, capacity: 5) (1..5).each do |i| bursty_requests << i end @@ -44,7 +44,10 @@ print "request #{req} #{Channel::Tick.new}\n" end -expected = <<-STDOUT +limiter.close +ticker.close + +__END__ request 1 2012-10-19 00:38:18.687438 +0000 UTC request 2 2012-10-19 00:38:18.887471 +0000 UTC request 3 2012-10-19 00:38:19.087238 +0000 UTC @@ -56,4 +59,3 @@ request 3 2012-10-19 00:38:20.487676 +0000 UTC request 4 2012-10-19 00:38:20.687483 +0000 UTC request 5 2012-10-19 00:38:20.887542 +0000 UTC -STDOUT diff --git a/examples/go-by-example-channels/select.rb b/examples/go-by-example-channels/select.rb index c3276e2b0..0848c8ef4 100755 --- a/examples/go-by-example-channels/select.rb +++ b/examples/go-by-example-channels/select.rb @@ -27,7 +27,6 @@ end end -expected = <<-STDOUT +__END__ received one received two -STDOUT diff --git a/examples/go-by-example-channels/ticker.rb b/examples/go-by-example-channels/ticker.rb index efa149899..6e6701444 100755 --- a/examples/go-by-example-channels/ticker.rb +++ b/examples/go-by-example-channels/ticker.rb @@ -10,7 +10,7 @@ ticker = Channel.ticker(0.5) Channel.go do ticker.each do |tick| - print "Tick at #{tick}\n" + print "Tick at #{tick}\n" if tick end end @@ -18,9 +18,8 @@ ticker.stop print "Ticker stopped\n" -expected = <<-STDOUT +__END__ Tick at 2012-09-23 11:29:56.487625 -0700 PDT Tick at 2012-09-23 11:29:56.988063 -0700 PDT Tick at 2012-09-23 11:29:57.488076 -0700 PDT Ticker stopped -STDOUT diff --git a/examples/go-by-example-channels/timeouts.rb b/examples/go-by-example-channels/timeouts.rb index c6c8d5896..0568924ca 100755 --- a/examples/go-by-example-channels/timeouts.rb +++ b/examples/go-by-example-channels/timeouts.rb @@ -7,7 +7,7 @@ ## Go by Example: Timeouts # https://gobyexample.com/timeouts -c1 = Channel.new(size: 1) # buffered +c1 = Channel.new(capacity: 1) # buffered Channel.go do sleep(2) c1 << 'result 1' @@ -18,7 +18,7 @@ s.after(1) { print "timeout 1\n" } end -c2 = Channel.new(size: 1) # buffered +c2 = Channel.new(capacity: 1) # buffered Channel.go do sleep(2) c2 << 'result 2' @@ -29,7 +29,6 @@ s.after(3) { print "timeout 2\n" } end -expected = <<-STDOUT +__END__ timeout 1 result 2 -STDOUT diff --git a/examples/go-by-example-channels/timers.rb b/examples/go-by-example-channels/timers.rb index 2dd36864c..2e667c072 100755 --- a/examples/go-by-example-channels/timers.rb +++ b/examples/go-by-example-channels/timers.rb @@ -9,19 +9,16 @@ timer1 = Channel.timer(2) -~timer1 -puts 'Timer 1 expired' +puts 'Timer 1 expired' if ~timer1 timer2 = Channel.timer(1) Channel.go do - ~timer2 - print "Timer 2 expired\n" + print "Timer 2 expired\n" if ~timer2 end stop2 = timer2.stop print "Timer 2 stopped\n" if stop2 -expected = <<-STDOUT +__END__ Timer 1 expired Timer 2 stopped -STDOUT diff --git a/examples/go-by-example-channels/worker-pools.rb b/examples/go-by-example-channels/worker-pools.rb index 54b009a85..9cce8e2e1 100755 --- a/examples/go-by-example-channels/worker-pools.rb +++ b/examples/go-by-example-channels/worker-pools.rb @@ -15,8 +15,8 @@ def worker(id, jobs, results) end end -jobs = Channel.new(buffer: :buffered, size: 100) -results = Channel.new(buffer: :buffered, size: 100) +jobs = Channel.new(buffer: :buffered, capacity: 100) +results = Channel.new(buffer: :buffered, capacity: 100) (1..3).each do |w| Channel.go { worker(w, jobs, results) } @@ -31,7 +31,7 @@ def worker(id, jobs, results) ~results end -expected = <<-STDOUT +__END__ worker 1 processing job 1 worker 2 processing job 2 worker 3 processing job 3 @@ -41,4 +41,3 @@ def worker(id, jobs, results) worker 1 processing job 7 worker 2 processing job 8 worker 3 processing job 9 -STDOUT diff --git a/examples/who.rb b/examples/who.rb new file mode 100755 index 000000000..9a7799300 --- /dev/null +++ b/examples/who.rb @@ -0,0 +1,28 @@ +#!/usr/bin/env ruby + +require 'net/http' +require 'json' + +# http://www.schneems.com/blogs/2015-09-30-reverse-rubygems/ + +gem_name = "concurrent-ruby" + +def rubygems_get(gem_name: "", endpoint: "") + path = File.join("/api/v1/gems/", gem_name, endpoint).chomp("/") + ".json" + JSON.parse(Net::HTTP.get("rubygems.org", path)) +end + +results = rubygems_get(gem_name: gem_name, endpoint: "reverse_dependencies") + +weighted_results = {} +results.each do |name| + begin + weighted_results[name] = rubygems_get(gem_name: name)["downloads"] + rescue => e + puts "#{name} #{e.message}" + end +end + +weighted_results.sort {|(k1, v1), (k2, v2)| v2 <=> v1 }.first(50).each_with_index do |(k, v), i| + puts "#{i}) #{k}: #{v}" +end diff --git a/lib/concurrent/agent.rb b/lib/concurrent/agent.rb index 8af58aed6..38cc8c772 100644 --- a/lib/concurrent/agent.rb +++ b/lib/concurrent/agent.rb @@ -166,6 +166,7 @@ class Agent < Synchronization::LockableObject class Error < StandardError def initialize(message = nil) message ||= 'agent must be restarted before jobs can post' + super(message) end end @@ -174,6 +175,7 @@ def initialize(message = nil) class ValidationError < Error def initialize(message = nil) message ||= 'invalid value' + super(message) end end diff --git a/lib/concurrent/channel.rb b/lib/concurrent/channel.rb index ccaaae7be..a1b6dc118 100644 --- a/lib/concurrent/channel.rb +++ b/lib/concurrent/channel.rb @@ -12,7 +12,7 @@ class Channel include Enumerable # NOTE: Move to global IO pool once stable - GOROUTINES = Concurrent::CachedThreadPool.new + GOROUTINES = Concurrent::CachedThreadPool.new(auto_terminate: true) private_constant :GOROUTINES BUFFER_TYPES = { @@ -48,22 +48,22 @@ def initialize(opts = {}) return end - size = opts[:size] + capacity = opts[:capacity] || opts[:size] buffer = opts[:buffer] - if size && buffer == :unbuffered - raise ArgumentError.new('unbuffered channels cannot have a size') - elsif size.nil? && buffer.nil? + if capacity && buffer == :unbuffered + raise ArgumentError.new('unbuffered channels cannot have a capacity') + elsif capacity.nil? && buffer.nil? self.buffer = BUFFER_TYPES[:unbuffered].new - elsif size == 0 && buffer == :buffered + elsif capacity == 0 && buffer == :buffered self.buffer = BUFFER_TYPES[:unbuffered].new elsif buffer == :unbuffered self.buffer = BUFFER_TYPES[:unbuffered].new - elsif size.nil? || size < 1 - raise ArgumentError.new('size must be at least 1 for this buffer type') + elsif capacity.nil? || capacity < 1 + raise ArgumentError.new('capacity must be at least 1 for this buffer type') else buffer ||= :buffered - self.buffer = BUFFER_TYPES[buffer].new(size) + self.buffer = BUFFER_TYPES[buffer].new(capacity) end self.validator = opts.fetch(:validator, DEFAULT_VALIDATOR) @@ -116,20 +116,25 @@ def offer?(item) end def take - item, _ = self.next - item + item = do_take + item == Concurrent::NULL ? nil : item end alias_method :receive, :take alias_method :~, :take def take! - item, _ = do_next - raise Error if item == Buffer::NO_VALUE + item = do_take + raise Error if item == Concurrent::NULL item end def take? - item, _ = self.next? + item = do_take + item = if item == Concurrent::NULL + Concurrent::Maybe.nothing + else + Concurrent::Maybe.just(item) + end item end @@ -150,13 +155,13 @@ def take? # end def next item, more = do_next - item = nil if item == Buffer::NO_VALUE + item = nil if item == Concurrent::NULL return item, more end def next? item, more = do_next - item = if item == Buffer::NO_VALUE + item = if item == Concurrent::NULL Concurrent::Maybe.nothing else Concurrent::Maybe.just(item) @@ -165,17 +170,17 @@ def next? end def poll - (item = do_poll) == Buffer::NO_VALUE ? nil : item + (item = do_poll) == Concurrent::NULL ? nil : item end def poll! item = do_poll - raise Error if item == Buffer::NO_VALUE + raise Error if item == Concurrent::NULL item end def poll? - if (item = do_poll) == Buffer::NO_VALUE + if (item = do_poll) == Concurrent::NULL Concurrent::Maybe.nothing else Concurrent::Maybe.just(item) @@ -186,7 +191,7 @@ def each raise ArgumentError.new('no block given') unless block_given? loop do item, more = do_next - if item != Buffer::NO_VALUE + if item != Concurrent::NULL yield(item) elsif !more break @@ -238,7 +243,21 @@ def go_loop_via(executor, *args, &block) private - attr_accessor :buffer, :validator + def validator + @validator + end + + def validator=(value) + @validator = value + end + + def buffer + @buffer + end + + def buffer=(value) + @buffer = value + end def validate(value, allow_nil, raise_error) if !allow_nil && value.nil? @@ -261,6 +280,10 @@ def do_offer(item) buffer.offer(item) end + def do_take + buffer.take + end + def do_next buffer.next end diff --git a/lib/concurrent/channel/buffer/base.rb b/lib/concurrent/channel/buffer/base.rb index c6817a33b..c558c8a35 100644 --- a/lib/concurrent/channel/buffer/base.rb +++ b/lib/concurrent/channel/buffer/base.rb @@ -4,9 +4,6 @@ module Concurrent class Channel module Buffer - # Placeholder for when a buffer slot contains no value. - NO_VALUE = Object.new - # Abstract base class for all Channel buffers. # # {Concurrent::Channel} objects maintain an internal, queue-like @@ -18,11 +15,6 @@ module Buffer # used as a channel buffer should extend this class. class Base < Synchronization::LockableObject - # @!macro [attach] channel_buffer_size_reader - # - # The number of items currently in the buffer. - attr_reader :size - # @!macro [attach] channel_buffer_capacity_reader # # The maximum number of values which can be {#put} onto the buffer @@ -53,6 +45,13 @@ def blocking? true end + # @!macro [attach] channel_buffer_size_reader + # + # The number of items currently in the buffer. + def size + synchronize { ns_size } + end + # @!macro [attach] channel_buffer_empty_question # # Predicate indicating if the buffer is empty. @@ -61,7 +60,7 @@ def blocking? # # @raise [NotImplementedError] until overridden in a subclass. def empty? - raise NotImplementedError + synchronize { ns_empty? } end # @!macro [attach] channel_buffer_full_question @@ -72,7 +71,7 @@ def empty? # # @raise [NotImplementedError] until overridden in a subclass. def full? - raise NotImplementedError + synchronize { ns_full? } end # @!macro [attach] channel_buffer_put @@ -116,7 +115,7 @@ def offer(item) # are available the remaining items can still be taken. Once the # buffer closes, no remaining items can be taken. # - # @return [Object] the item removed from the buffer; `NO_VALUE` once + # @return [Object] the item removed from the buffer; `Concurrent::NULL` once # the buffer has closed. # # @raise [NotImplementedError] until overridden in a subclass. @@ -126,19 +125,16 @@ def take # @!macro [attach] channel_buffer_next # - # Take the next item from the buffer and also return a boolean - # indicating if subsequent items can be taken. Used for iterating + # Take the next "item" from the buffer and also return a boolean + # indicating if "more" items can be taken. Used for iterating # over a buffer until it is closed and empty. # # If the buffer is open but no items remain the calling thread will # block until an item is available. The second of the two return - # values, a boolean, will always be `true` when the buffer is open. - # When the buffer is closed but more items remain the second return - # value will also be `true`. When the buffer is closed and the last - # item is taken the second return value will be `false`. When the - # buffer is both closed and empty the first return value will be - # `NO_VALUE` and the second return value will be `false`. - # be `false` when the buffer is both closed and empty. + # values, "more" (a boolean), will always be `true` when the buffer is + # open. The "more" value will be `false` when the channel has been + # closed and all values have already been received. When "more" is + # false the returned item will be `Concurrent::NULL`. # # Note that when multiple threads access the same channel a race # condition can occur when using this method. A call to `next` from @@ -162,7 +158,7 @@ def next # immediately. Failing to return a value does not necessarily # indicate that the buffer is closed, just that it is empty. # - # @return [Object] the next item from the buffer or `NO_VALUE` if + # @return [Object] the next item from the buffer or `Concurrent::NULL` if # the buffer is empty. # # @raise [NotImplementedError] until overridden in a subclass. @@ -194,12 +190,44 @@ def closed? private - attr_accessor :buffer - attr_writer :closed, :capacity, :size + def buffer + @buffer + end + + def buffer=(value) + @buffer = value + end + + def closed=(value) + @closed = value + end + + def capacity=(value) + @capacity = value + end + + def size=(value) + @size = value + end def ns_initialize(*args) end + # @!macro channel_buffer_size_reader + def ns_size + raise NotImplementedError + end + + # @!macro channel_buffer_empty_question + def ns_empty? + raise NotImplementedError + end + + # @!macro channel_buffer_full_question + def ns_full? + raise NotImplementedError + end + # @!macro channel_buffer_closed_question def ns_closed? @closed diff --git a/lib/concurrent/channel/buffer/buffered.rb b/lib/concurrent/channel/buffer/buffered.rb index 5985b90f8..fd61ecfe0 100644 --- a/lib/concurrent/channel/buffer/buffered.rb +++ b/lib/concurrent/channel/buffer/buffered.rb @@ -1,3 +1,4 @@ +require 'concurrent/constants' require 'concurrent/channel/buffer/base' module Concurrent @@ -10,19 +11,6 @@ module Buffer # an item is removed from the buffer, creating spare capacity. class Buffered < Base - # @!macro channel_buffer_empty_question - def empty? - synchronize { ns_empty? } - end - - # @!macro channel_buffer_full_question - # - # Will return `true` once the number of items in the buffer reaches - # the {#size} value specified during initialization. - def full? - synchronize { ns_full? } - end - # @!macro channel_buffer_put # # New items can be put onto the buffer until the number of items in @@ -69,11 +57,10 @@ def next loop do synchronize do if ns_closed? && ns_empty? - return NO_VALUE, false + return Concurrent::NULL, false elsif !ns_empty? item = buffer.shift - more = !ns_empty? || !ns_closed? - return item, more + return item, true end end Thread.pass @@ -84,7 +71,7 @@ def next def poll synchronize do if ns_empty? - NO_VALUE + Concurrent::NULL else buffer.shift end @@ -104,14 +91,19 @@ def ns_initialize(size) self.buffer = [] end + # @!macro channel_buffer_size_reader + def ns_size + buffer.size + end + # @!macro channel_buffer_empty_question def ns_empty? - buffer.length == 0 + ns_size == 0 end # @!macro channel_buffer_full_question def ns_full? - buffer.length == capacity + ns_size == capacity end # @!macro channel_buffer_put diff --git a/lib/concurrent/channel/buffer/ticker.rb b/lib/concurrent/channel/buffer/ticker.rb index 2286bd4c6..169fc16e1 100644 --- a/lib/concurrent/channel/buffer/ticker.rb +++ b/lib/concurrent/channel/buffer/ticker.rb @@ -1,74 +1,33 @@ +require 'concurrent/constants' require 'concurrent/utility/monotonic_time' require 'concurrent/channel/tick' -require 'concurrent/channel/buffer/base' +require 'concurrent/channel/buffer/timer' module Concurrent class Channel module Buffer - class Ticker < Base - - def size() 1; end - - def empty?() false; end - - def full?() true; end - - def put(item) - false - end - - def offer(item) - false - end - - def take - loop do - result, _ = do_poll - if result.nil? - return NO_VALUE - elsif result != NO_VALUE - return result - end - end - end - - def next - loop do - result, _ = do_poll - if result.nil? - return NO_VALUE, false - elsif result != NO_VALUE - return result, true - end - end - end - - def poll - result, _ = do_poll - if result.nil? || result == NO_VALUE - NO_VALUE - else - result - end - end + class Ticker < Timer private def ns_initialize(interval) @interval = interval.to_f @next_tick = Concurrent.monotonic_time + interval + self.capacity = 1 end def do_poll - if ns_closed? - return nil, false - elsif (now = Concurrent.monotonic_time) > @next_tick - tick = Concurrent::Channel::Tick.new(@next_tick) - @next_tick = now + @interval - return tick, true - else - return NO_VALUE, true + synchronize do + if ns_closed? + return Concurrent::NULL, false + elsif (now = Concurrent.monotonic_time) >= @next_tick + tick = Concurrent::Channel::Tick.new(@next_tick) + @next_tick = now + @interval + return tick, true + else + return nil, true + end end end end diff --git a/lib/concurrent/channel/buffer/timer.rb b/lib/concurrent/channel/buffer/timer.rb index dca5c5442..29fa8146b 100644 --- a/lib/concurrent/channel/buffer/timer.rb +++ b/lib/concurrent/channel/buffer/timer.rb @@ -1,3 +1,4 @@ +require 'concurrent/constants' require 'concurrent/utility/monotonic_time' require 'concurrent/channel/tick' require 'concurrent/channel/buffer/base' @@ -8,16 +9,6 @@ module Buffer class Timer < Base - def size() 1; end - - def empty? - synchronized { @empty } - end - - def full? - !empty? - end - def put(item) false end @@ -27,44 +18,59 @@ def offer(item) end def take - self.next.first + loop do + tick, _ = do_poll + if tick + return tick + else + Thread.pass + end + end end def next loop do - status, tick = do_poll - if status == :tick - return tick, false - # AFAIK a Go timer will block forever if stopped - #elsif status == :closed - #return false, false - end + tick, more = do_poll + return tick, more if tick Thread.pass end end def poll - status, tick = do_poll - status == :tick ? tick : NO_VALUE + tick, _ = do_poll + tick = Concurrent::NULL unless tick + tick end private def ns_initialize(delay) @tick = Concurrent.monotonic_time + delay.to_f - @closed = false - @empty = false + self.capacity = 1 + end + + def ns_size + 0 + end + + def ns_empty? + false + end + + def ns_full? + true end def do_poll synchronize do - return :closed, false if ns_closed? - - if Concurrent.monotonic_time > @tick + if ns_closed? + return Concurrent::NULL, false + elsif Concurrent.monotonic_time >= @tick # only one listener gets notified - return :tick, Concurrent::Channel::Tick.new(@tick) + self.closed = true + return Concurrent::Channel::Tick.new(@tick), false else - return :wait, true + return nil, true end end end diff --git a/lib/concurrent/channel/buffer/unbuffered.rb b/lib/concurrent/channel/buffer/unbuffered.rb index 390609d60..fd68ddc70 100644 --- a/lib/concurrent/channel/buffer/unbuffered.rb +++ b/lib/concurrent/channel/buffer/unbuffered.rb @@ -1,3 +1,4 @@ +require 'concurrent/constants' require 'concurrent/channel/buffer/base' require 'concurrent/atomic/atomic_reference' @@ -16,19 +17,21 @@ module Buffer class Unbuffered < Base # @!macro channel_buffer_size_reader - # - # Always returns zero (0). - def size() 0; end + def size + synchronize do + putting.empty? ? 0 : 1 + end + end # @!macro channel_buffer_empty_question - # - # Always returns `true`. - def empty?() true; end + def empty? + size == 0 + end # @!macro channel_buffer_full_question - # - # Always returns `false`. - def full?() false; end + def full? + !empty? + end # @!macro channel_buffer_put # @@ -85,7 +88,7 @@ def offer(item) # and this method will return. def take mine = synchronize do - return NO_VALUE if ns_closed? && putting.empty? + return Concurrent::NULL if ns_closed? && putting.empty? ref = Concurrent::AtomicReference.new(nil) if putting.empty? @@ -110,10 +113,10 @@ def take # waiting to {#put} items onto the buffer. When there is a thread # waiting to put an item this method will take the item and return # it immediately. When there are no threads waiting to put or the - # buffer is closed, this method will return `NO_VALUE` immediately. + # buffer is closed, this method will return `Concurrent::NULL` immediately. def poll synchronize do - return NO_VALUE if putting.empty? + return Concurrent::NULL if putting.empty? put = putting.shift value = put.value @@ -131,19 +134,21 @@ def poll # @see {#take} def next item = take - more = synchronize { !putting.empty? } + more = (item != Concurrent::NULL) return item, more end private - attr_accessor :putting, :taking + def putting() @putting; end + + def taking() @taking; end # @!macro channel_buffer_initialize def ns_initialize # one will always be empty - self.putting = [] - self.taking = [] + @putting = [] + @taking = [] self.closed = false self.capacity = 1 end diff --git a/lib/concurrent/channel/selector.rb b/lib/concurrent/channel/selector.rb index cde7b3311..105987c4e 100644 --- a/lib/concurrent/channel/selector.rb +++ b/lib/concurrent/channel/selector.rb @@ -54,6 +54,7 @@ def error(&block) end def execute + raise Channel::Error.new('no clauses given') if @clauses.empty? loop do done = @clauses.each do |clause| result = clause.execute @@ -63,7 +64,11 @@ def execute Thread.pass end rescue => ex - @error_handler.call(ex) if @error_handler + if @error_handler + @error_handler.call(ex) + else + raise ex + end end end end diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index b4d16275f..b8fe0a12d 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -304,7 +304,7 @@ def self.execute(opts = {}, &block) # @return [Promise] the new promise def then(rescuer = nil, &block) raise ArgumentError.new('rescuers and block are both missing') if rescuer.nil? && !block_given? - block = Proc.new { |result| result } if block.nil? + block = Proc.new { |result| result } unless block_given? child = Promise.new( parent: self, executor: @executor, diff --git a/lib/concurrent/synchronization/object.rb b/lib/concurrent/synchronization/object.rb index 36e0329fd..732a1efab 100644 --- a/lib/concurrent/synchronization/object.rb +++ b/lib/concurrent/synchronization/object.rb @@ -20,7 +20,7 @@ module Synchronization # @!macro [attach] synchronization_object # # Safe synchronization under any Ruby implementation. - # It provides methods like {#synchronize}, {#wait}, {#signal} and {#broadcast}. + # It provides methods like `#synchronize`, `#wait`, `#signal` and `#broadcast`. # Provides a single layer which can improve its implementation over time without changes needed to # the classes using it. Use {Synchronization::Object} not this abstract class. # diff --git a/spec/concurrent/atomic/cyclic_barrier_spec.rb b/spec/concurrent/atomic/cyclic_barrier_spec.rb index 46c6b7f00..fdcc34026 100644 --- a/spec/concurrent/atomic/cyclic_barrier_spec.rb +++ b/spec/concurrent/atomic/cyclic_barrier_spec.rb @@ -56,7 +56,7 @@ module Concurrent end describe 'reset' do - it 'should release all waiting threads' do + it 'should release all waiting threads', buggy: true do start_latch = CountDownLatch.new(1) continue_latch = CountDownLatch.new(1) diff --git a/spec/concurrent/channel/buffer/base_shared.rb b/spec/concurrent/channel/buffer/base_shared.rb index fdc344e46..43f13016b 100644 --- a/spec/concurrent/channel/buffer/base_shared.rb +++ b/spec/concurrent/channel/buffer/base_shared.rb @@ -1,5 +1,9 @@ shared_examples :channel_buffer do + specify do + expect(subject).to respond_to(:blocking?) + end + context '#capacity' do specify { expect(subject.capacity).to be >= 0 } end @@ -61,17 +65,17 @@ end context '#take' do - it 'returns NO_VALUE when closed' do + it 'returns Concurrent::NULL when closed' do subject.close - expect(subject.take).to eq Concurrent::Channel::Buffer::NO_VALUE + expect(subject.take).to eq Concurrent::NULL end end context '#next' do - it 'returns NO_VALUE, false when closed' do + it 'returns Concurrent::NULL, false when closed' do subject.close item, more = subject.next - expect(item).to eq Concurrent::Channel::Buffer::NO_VALUE + expect(item).to eq Concurrent::NULL expect(more).to be false end end @@ -88,13 +92,13 @@ expect(subject.poll).to eq 42 end - it 'returns NO_VALUE immediately if no item is available' do - expect(subject.poll).to eq Concurrent::Channel::Buffer::NO_VALUE + it 'returns Concurrent::NULL immediately if no item is available' do + expect(subject.poll).to eq Concurrent::NULL end - it 'returns NO_VALUE when closed' do + it 'returns Concurrent::NULL when closed' do subject.close - expect(subject.poll).to eq Concurrent::Channel::Buffer::NO_VALUE + expect(subject.poll).to eq Concurrent::NULL end end diff --git a/spec/concurrent/channel/buffer/base_spec.rb b/spec/concurrent/channel/buffer/base_spec.rb new file mode 100644 index 000000000..86d2ebc1e --- /dev/null +++ b/spec/concurrent/channel/buffer/base_spec.rb @@ -0,0 +1,74 @@ +require_relative 'buffered_shared' + +module Concurrent::Channel::Buffer + + describe Base do + + subject { described_class.new } + + specify do + expect(subject.capacity).to eq 0 + end + + specify do + expect(subject).to be_blocking + end + + specify do + expect { + subject.size + }.to raise_error(NotImplementedError) + end + + specify do + expect { + subject.empty? + }.to raise_error(NotImplementedError) + end + + specify do + expect { + subject.full? + }.to raise_error(NotImplementedError) + end + + specify do + expect { + subject.put(42) + }.to raise_error(NotImplementedError) + end + + specify do + expect { + subject.offer(42) + }.to raise_error(NotImplementedError) + end + + specify do + expect { + subject.take + }.to raise_error(NotImplementedError) + end + + specify do + expect { + subject.poll + }.to raise_error(NotImplementedError) + end + + specify do + expect { + subject.next + }.to raise_error(NotImplementedError) + end + + specify do + expect(subject).to_not be_closed + end + + specify do + subject.close + expect(subject).to be_closed + end + end +end diff --git a/spec/concurrent/channel/buffer/buffered_shared.rb b/spec/concurrent/channel/buffer/buffered_shared.rb index 7789fe0d7..b6500c5d5 100644 --- a/spec/concurrent/channel/buffer/buffered_shared.rb +++ b/spec/concurrent/channel/buffer/buffered_shared.rb @@ -4,8 +4,7 @@ it_behaves_like :channel_buffer - context 'initialization' do - + context '#initialize' do it 'raises an exception if size <= 0' do expect { described_class.new(0) @@ -14,15 +13,33 @@ end context '#capacity' do - it 'returns the maximum capacity of the buffer' do subject = described_class.new(10) expect(subject.capacity).to eq 10 end end - context '#empty?' do + context '#size' do + + it 'is 0 when first created' do + expect(subject.size).to eq 0 + end + + it 'returns the number of items in the buffer' do + fill = subject.capacity / 2 + fill.times { subject.put(:foo) } + expect(subject.size).to eq fill + end + it 'is 0 when there are taking threads but no putting threads' do + t = Thread.new { subject.take } + t.join(0.1) + expect(subject.size).to eq 0 + t.kill # cleanup + end + end + + context '#empty?' do it 'returns true when empty' do subject = described_class.new(10) expect(subject).to be_empty @@ -42,7 +59,6 @@ end context '#offer' do - it 'enqueues the item immediately when not full and not closed' do subject.offer(:foo) expect(subject.take).to eq :foo @@ -74,9 +90,9 @@ expect(t.status).to be false end - it 'returns NO_VALUE when closed and empty' do + it 'returns Concurrent::NULL when closed and empty' do subject.close - expect(subject.take).to eq Concurrent::Channel::Buffer::NO_VALUE + expect(subject.take).to eq Concurrent::NULL end end @@ -128,19 +144,32 @@ expect(more3).to be true end - it 'returns false when closed and last item' do - subject.offer(:foo) - subject.offer(:bar) - subject.offer(:baz) + it 'returns , true when closed and last item' do + capacity = subject.capacity + expect(capacity).to be >= 1 + + capacity.times { subject.put(:foo) } subject.close - _, more1 = subject.next - _, more2 = subject.next - _, more3 = subject.next + capacity.times do + item, more = subject.next + expect(item).to eq :foo + expect(more).to be true + end + end - expect(more1).to be true - expect(more2).to be true - expect(more3).to be false + it 'returns Concurrent::NULL, false when closed and no items remain' do + capacity = subject.capacity + expect(capacity).to be >= 1 + + capacity.times { subject.put(:foo) } + subject.close + + capacity.times { subject.next } + + item, more = subject.next + expect(item).to eq Concurrent::NULL + expect(more).to be false end end end diff --git a/spec/concurrent/channel/buffer/buffered_spec.rb b/spec/concurrent/channel/buffer/buffered_spec.rb index e819b5729..78d2c3de3 100644 --- a/spec/concurrent/channel/buffer/buffered_spec.rb +++ b/spec/concurrent/channel/buffer/buffered_spec.rb @@ -4,22 +4,47 @@ module Concurrent::Channel::Buffer describe Buffered do - specify { expect(subject).to be_blocking } + let(:capacity) { 10 } + subject { described_class.new(capacity) } - subject { described_class.new(10) } it_behaves_like :channel_buffered_buffer - context '#full?' do + specify do + expect(subject).to be_blocking + end - it 'returns true when full' do + context '#full?' do + it 'returns true when at max capacity' do subject = described_class.new(1) subject.put(:foo) expect(subject).to be_full end end - context '#offer' do + context '#put' do + it 'blocks when at capacity until a thread is ready to take' do + subject = described_class.new(1) + subject.put(13) + bucket = Concurrent::AtomicReference.new(nil) + t = Thread.new do + subject.put(42) + bucket.value = 42 + end + + t.join(0.1) + + before = bucket.value + subject.take + t.join(0.1) + after = bucket.value + + expect(before).to be nil + expect(after).to eq 42 + expect(t.status).to be false + end + end + context '#offer' do it 'returns false immediately when full' do subject = described_class.new(1) subject.put(:foo) diff --git a/spec/concurrent/channel/buffer/dropping_spec.rb b/spec/concurrent/channel/buffer/dropping_spec.rb index b279c3494..ecb321a98 100644 --- a/spec/concurrent/channel/buffer/dropping_spec.rb +++ b/spec/concurrent/channel/buffer/dropping_spec.rb @@ -4,11 +4,14 @@ module Concurrent::Channel::Buffer describe Dropping do - specify { expect(subject).to_not be_blocking } - subject { described_class.new(10) } + it_behaves_like :channel_buffered_buffer + specify do + expect(subject).to_not be_blocking + end + context '#put' do it 'does not block when full' do diff --git a/spec/concurrent/channel/buffer/sliding_spec.rb b/spec/concurrent/channel/buffer/sliding_spec.rb index 6287e06e3..eb6aa9300 100644 --- a/spec/concurrent/channel/buffer/sliding_spec.rb +++ b/spec/concurrent/channel/buffer/sliding_spec.rb @@ -4,11 +4,14 @@ module Concurrent::Channel::Buffer describe Sliding do - specify { expect(subject).to_not be_blocking } - subject { described_class.new(10) } + it_behaves_like :channel_buffered_buffer + specify do + expect(subject).to_not be_blocking + end + context '#put' do it 'does not block when full' do diff --git a/spec/concurrent/channel/buffer/ticker_spec.rb b/spec/concurrent/channel/buffer/ticker_spec.rb new file mode 100644 index 000000000..9e7ac2df6 --- /dev/null +++ b/spec/concurrent/channel/buffer/ticker_spec.rb @@ -0,0 +1,60 @@ +require_relative 'timing_buffer_shared' + +module Concurrent::Channel::Buffer + + describe Ticker do + + let(:delay) { 0.1 } + subject { described_class.new(delay) } + + it_behaves_like :channel_timing_buffer + + context '#take' do + it 'triggers until closed' do + expected = 3 + actual = 0 + expected.times { actual += 1 if subject.take.is_a? Concurrent::Channel::Tick } + expect(actual).to eq expected + end + + it 'returns Concurrent::NULL when closed after trigger' do + subject.take + subject.close + expect(subject).to be_closed + expect(subject.take).to eq Concurrent::NULL + end + end + + context '#poll' do + it 'triggers until closed' do + expected = 3 + actual = 0 + expected.times do + until subject.poll.is_a?(Concurrent::Channel::Tick) + actual += 1 + end + end + end + end + + context '#next' do + it 'triggers until closed' do + expected = 3 + actual = 0 + expected.times { actual += 1 if subject.next.first.is_a? Concurrent::Channel::Tick } + expect(actual).to eq expected + end + + it 'returns true for more while open' do + _, more = subject.next + expect(more).to be true + end + + it 'returns false for more once closed' do + subject.close + _, more = subject.next + expect(more).to be false + end + end + end +end diff --git a/spec/concurrent/channel/buffer/timer_spec.rb b/spec/concurrent/channel/buffer/timer_spec.rb index 46aa0813c..82c349180 100644 --- a/spec/concurrent/channel/buffer/timer_spec.rb +++ b/spec/concurrent/channel/buffer/timer_spec.rb @@ -1,39 +1,43 @@ +require_relative 'timing_buffer_shared' + module Concurrent::Channel::Buffer describe Timer do - subject { described_class.new(0) } - - specify { expect(subject).to be_blocking } + let(:delay) { 0.1 } + subject { described_class.new(0.1) } - specify { expect(subject.size).to eq 1 } - - context '#empty?' do - pending - end + it_behaves_like :channel_timing_buffer - context '#full?' do - pending - end - - context '#put' do - pending - end - - context '#offer' do - pending + context '#take' do + it 'closes automatically on first take' do + expect(subject.take).to be_truthy + expect(subject).to be_closed + end end - context '#take' do - pending + context '#poll' do + it 'closes automatically on first take' do + loop do + break if subject.poll != Concurrent::NULL + end + expect(subject).to be_closed + end end context '#next' do - pending + it 'closes automatically on first take' do + loop do + value, _ = subject.next + break if value != Concurrent::NULL + end + expect(subject).to be_closed + end + + it 'returns false for more' do + _, more = subject.next + expect(more).to be false end - - context '#poll' do - pending end end end diff --git a/spec/concurrent/channel/buffer/timing_buffer_shared.rb b/spec/concurrent/channel/buffer/timing_buffer_shared.rb new file mode 100644 index 000000000..c889d79da --- /dev/null +++ b/spec/concurrent/channel/buffer/timing_buffer_shared.rb @@ -0,0 +1,167 @@ +require_relative 'base_shared' + +shared_examples :channel_timing_buffer do + + specify do + expect(subject).to be_blocking + end + + context '#capacity' do + specify do + expect(subject.capacity).to eq 1 + end + end + + context '#size' do + specify do + expect(subject.size).to eq 0 + end + end + + context '#empty?' do + specify do + expect(subject).to_not be_empty + end + end + + context '#full?' do + specify do + expect(subject).to be_full + end + end + + context '#put' do + specify do + expect(subject.put(:foo)).to be false + end + end + + context '#offer' do + specify do + expect(subject.offer(:foo)).to be false + end + end + + context '#take' do + + it 'blocks when the timer is not ready' do + actual = Concurrent::AtomicBoolean.new(false) + subject = described_class.new(10) + t = Thread.new do + subject.take + actual.make_true + end + t.join(0.1) + actual = actual.value + t.kill # clean up + expect(actual).to be false + end + + it 'returns a Tick' do + subject = described_class.new(0.1) + expect(subject.take).to be_a Concurrent::Channel::Tick + end + + it 'triggers after the specified time interval' do + start = Concurrent::Channel::Tick.new.monotonic + subject = described_class.new(0.1) + actual = subject.take.monotonic + expect(actual - start).to be >= 0.1 + end + + it 'returns Concurrent::NULL when closed' do + subject.close + expect(subject.take).to eq Concurrent::NULL + end + end + + context '#poll' do + + it 'returns Concurrent::NULL when the timer is not ready' do + subject = described_class.new(0.1) + expect(subject.poll).to eq Concurrent::NULL + end + + it 'returns a Tick' do + subject = described_class.new(0.1) + sleep(0.2) + expect(subject.poll).to be_a Concurrent::Channel::Tick + end + + it 'returns Concurrent::NULL when closed' do + subject.close + expect(subject.poll).to eq Concurrent::NULL + end + + it 'triggers after the specified time interval' do + start = Concurrent::Channel::Tick.new.monotonic + subject = described_class.new(0.1) + sleep(0.2) + actual = subject.poll.monotonic + expect(actual - start).to be >= 0.1 + end + end + + context '#next' do + + it 'blocks when the timer is not ready' do + actual = Concurrent::AtomicBoolean.new(false) + subject = described_class.new(10) + t = Thread.new do + subject.next + actual.make_true + end + t.join(0.1) + actual = actual.value + t.kill # clean up + expect(actual).to be false + end + + it 'returns a Tick when open' do + subject = described_class.new(0.1) + value, _ = subject.next + expect(value).to be_a Concurrent::Channel::Tick + end + + it 'returns Concurrent::NULL, false when closed' do + subject.close + expect(subject.take).to eq Concurrent::NULL + end + + it 'triggers after the specified time interval' do + start = Concurrent::Channel::Tick.new.monotonic + subject = described_class.new(0.1) + actual, _ = subject.next + expect(actual.monotonic - start).to be >= 0.1 + end + end + + context '#close' do + + it 'sets #closed? to false' do + subject.close + expect(subject).to be_closed + end + + it 'returns true when not previously closed' do + expect(subject.close).to be true + end + + it 'returns false when already closed' do + subject.close + expect(subject.close).to be false + end + end + + context '#closed?' do + + it 'returns true when new' do + expect(subject).to_not be_closed + end + + it 'returns false after #close' do + subject.close + expect(subject).to be_closed + end + end +end diff --git a/spec/concurrent/channel/buffer/unbuffered_spec.rb b/spec/concurrent/channel/buffer/unbuffered_spec.rb index 4e4bbe6b8..19470cce2 100644 --- a/spec/concurrent/channel/buffer/unbuffered_spec.rb +++ b/spec/concurrent/channel/buffer/unbuffered_spec.rb @@ -4,13 +4,80 @@ module Concurrent::Channel::Buffer describe Unbuffered do - specify { expect(subject).to be_blocking } - subject { described_class.new } + it_behaves_like :channel_buffer + specify do + expect(subject).to be_blocking + end + + specify do + expect(subject.capacity).to eq 1 + end + + context '#size' do + + it 'is 0 when first created' do + expect(subject.size).to eq 0 + end + + it 'is 1 when a putting thread is waiting' do + t = Thread.new { subject.put(:foo) } + t.join(0.1) + expect(subject.size).to eq 1 + t.kill # cleanup + end + + it 'is 0 when there are taking threads but no putting threads' do + t = Thread.new { subject.take } + t.join(0.1) + expect(subject.size).to eq 0 + t.kill # cleanup + end + end + + context '#empty?' do + + it 'is true when there are no putting threads' do + expect(subject).to be_empty + end + + it 'is false when there are waiting putting threads' do + t = Thread.new { subject.put(:foo) } + t.join(0.1) + expect(subject).to_not be_empty + t.kill # cleanup + end + end + + context '#full?' do + + it 'is false when there are no putting threads' do + expect(subject).to_not be_full + end + + it 'is false when there are waiting putting threads' do + t = Thread.new { subject.put(:foo) } + t.join(0.1) + expect(subject).to be_full + t.kill # cleanup + end + end + context '#put' do + it 'does not enqueue the item when closed' do + subject.close + subject.put(:foo) + expect(subject).to be_empty + end + + it 'returns false when closed' do + subject.close + expect(subject.put(:foo)).to be false + end + it 'blocks until a thread is ready to take' do subject # initialize on this thread bucket = Concurrent::AtomicReference.new(nil) @@ -30,11 +97,71 @@ module Concurrent::Channel::Buffer expect(after).to eq 42 expect(t.status).to be false end + + it 'delivers when closed after put starts' do + t = Thread.new do + subject.put(:foo) + end + t.join(0.1) + subject.close + + item = subject.take + t.kill #clean up + + expect(item).to eq :foo + end + end + + context '#offer' do + + it 'returns false immediately when a put in in progress' do + subject # initialize on this thread + t = Thread.new do + subject.put(:foo) # block the thread + end + t.join(0.1) + + ok = subject.offer(:bar) + subject.poll # release the blocked thread + + expect(ok).to be false + end + + it 'gives the item to a waiting taker and returns true' do + subject # initialize on this thread + bucket = Concurrent::AtomicReference.new(nil) + t = Thread.new do + bucket.value = subject.take + end + t.join(0.1) + + before = bucket.value + ok = subject.offer(42) + t.join(0.1) + after = bucket.value + + expect(ok).to be true + expect(before).to be nil + expect(after).to eq 42 + end end context '#take' do - it 'blocks until not empty the returns the first item' do + it 'returns false immediately when a put in in progress' do + subject # initialize on this thread + t = Thread.new do + subject.put(:foo) # block the thread + end + t.join(0.1) + + ok = subject.offer(:bar) + subject.poll # release the blocked thread + + expect(ok).to be false + end + + it 'gives the item to a waiting taker and returns true' do subject # initialize on this thread bucket = Concurrent::AtomicReference.new(nil) t = Thread.new do @@ -43,13 +170,13 @@ module Concurrent::Channel::Buffer t.join(0.1) before = bucket.value - subject.put(42) + ok = subject.offer(42) t.join(0.1) after = bucket.value + expect(ok).to be true expect(before).to be nil expect(after).to eq 42 - expect(t.status).to be false end end @@ -70,7 +197,7 @@ module Concurrent::Channel::Buffer expect(before).to eq [] expect(after.first).to eq 42 - expect(after.last).to be false + expect(after.last).to be true expect(t.status).to be false end @@ -89,39 +216,33 @@ module Concurrent::Channel::Buffer expect(item).to eq 42 expect(more).to be true end - end - - context '#offer' do - it 'returns false immediately when a put in in progress' do - subject # initialize on this thread + it 'returns , true when closed and last item' do t = Thread.new do - subject.put(:foo) # block the thread + subject.put(:foo) end t.join(0.1) + subject.close - ok = subject.offer(:bar) - subject.poll # release the blocked thread + item, more = subject.next + t.kill #clean up - expect(ok).to be false + expect(item).to eq :foo + expect(more).to be true end - it 'gives the item to a waiting taker and returns true' do - subject # initialize on this thread - bucket = Concurrent::AtomicReference.new(nil) + it 'returns Concurrent::NULL, false when closed and no items remain' do t = Thread.new do - bucket.value = subject.take + subject.put(:foo) end - t.join(0.1) + subject.close - before = bucket.value - ok = subject.offer(42) - t.join(0.1) - after = bucket.value + subject.next + item, more = subject.next + t.kill #clean up - expect(ok).to be true - expect(before).to be nil - expect(after).to eq 42 + expect(item).to eq Concurrent::NULL + expect(more).to be false end end end diff --git a/spec/concurrent/channel/integration_spec.rb b/spec/concurrent/channel/integration_spec.rb index 9d5ae1dc3..98d03872d 100644 --- a/spec/concurrent/channel/integration_spec.rb +++ b/spec/concurrent/channel/integration_spec.rb @@ -1,4 +1,4 @@ -describe 'channel integration tests', notravis: true do +describe 'channel integration tests' do let!(:examples_root) { File.expand_path(File.join(File.dirname(__FILE__), '../../../examples')) } @@ -65,7 +65,7 @@ expect(result).to eq expected end - specify 'default-selection.rb' do + specify 'default-selection.rb', notravis: true do expected = <<-STDOUT . . diff --git a/spec/concurrent/channel/tick_spec.rb b/spec/concurrent/channel/tick_spec.rb index 2237cc911..08c70e80e 100644 --- a/spec/concurrent/channel/tick_spec.rb +++ b/spec/concurrent/channel/tick_spec.rb @@ -26,6 +26,10 @@ class Channel expect(subject.utc.to_f).to eq subject.epoch end + specify do + expect(subject.to_s).to match /\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6} \+\d{4} UTC/ + end + context 'comparison' do it 'correctly compares to a Numeric (monotonic)' do diff --git a/spec/concurrent/channel_spec.rb b/spec/concurrent/channel_spec.rb index 3683b700d..62f85ad9d 100644 --- a/spec/concurrent/channel_spec.rb +++ b/spec/concurrent/channel_spec.rb @@ -10,7 +10,7 @@ module Concurrent }.to raise_error(ArgumentError) end - it 'is :unbuffered when neither :buffer nore :size is given' do + it 'is :unbuffered when neither :buffer nore :capacity is given' do expect(Channel::Buffer::Unbuffered).to receive(:new).with(no_args).and_call_original Channel.new end @@ -20,184 +20,313 @@ module Concurrent Channel.new(buffer: :unbuffered) end - it 'is :unbuffered when :buffered and size: 0' do + it 'is :unbuffered when :buffered and capacity: 0' do expect(Channel::Buffer::Unbuffered).to receive(:new).with(no_args).and_call_original - Channel.new(buffer: :buffered, size: 0) + Channel.new(buffer: :buffered, capacity: 0) end - it 'raises an exception when both :unbuffered and :size are given' do + it 'raises an exception when both :unbuffered and :capacity are given' do expect { - Channel.new(buffer: :unbuffered, size: 0) + Channel.new(buffer: :unbuffered, capacity: 0) }.to raise_error(ArgumentError) end - it 'is :buffered when :size > 0 and no :buffer given' do + it 'is :buffered when :capacity > 0 and no :buffer given' do expect(Channel::Buffer::Buffered).to receive(:new).with(5).and_call_original - Channel.new(size: 5) + Channel.new(capacity: 5) end it 'is :buffered when :buffered given' do expect(Channel::Buffer::Buffered).to receive(:new).with(5).and_call_original - Channel.new(buffer: :buffered, size: 5) + Channel.new(buffer: :buffered, capacity: 5) end - it 'raises an exception when :buffered given without :size' do + it 'raises an exception when :buffered given without :capacity' do expect { Channel.new(buffer: :buffered) }.to raise_error(ArgumentError) end - it 'raises an exception when :buffered and :size < 0' do + it 'raises an exception when :buffered and :capacity < 0' do expect { - Channel.new(buffer: :buffered, size: -1) + Channel.new(buffer: :buffered, capacity: -1) }.to raise_error(ArgumentError) end - it 'is :dropping when :dropping and :size > 0' do + it 'is :dropping when :dropping and :capacity > 0' do expect(Channel::Buffer::Dropping).to receive(:new).with(5).and_call_original - Channel.new(buffer: :dropping, size: 5) + Channel.new(buffer: :dropping, capacity: 5) end - it 'raises an exception when :dropping given without :size' do + it 'raises an exception when :dropping given without :capacity' do expect { Channel.new(buffer: :dropping) }.to raise_error(ArgumentError) end - it 'raises an exception when :dropping and :size < 1' do + it 'raises an exception when :dropping and :capacity < 1' do expect { - Channel.new(buffer: :dropping, size: 0) + Channel.new(buffer: :dropping, capacity: 0) }.to raise_error(ArgumentError) end - it 'is :sliding when :sliding and :size > 0' do + it 'is :sliding when :sliding and :capacity > 0' do expect(Channel::Buffer::Sliding).to receive(:new).with(5).and_call_original - Channel.new(buffer: :sliding, size: 5) + Channel.new(buffer: :sliding, capacity: 5) end - it 'raises an exception when :sliding given without :size' do + it 'raises an exception when :sliding given without :capacity' do expect { Channel.new(buffer: :sliding) }.to raise_error(ArgumentError) end - it 'raises an exception when :sliding and :size < 1' do + it 'raises an exception when :sliding and :capacity < 1' do expect { - Channel.new(buffer: :sliding, size: 0) + Channel.new(buffer: :sliding, capacity: 0) }.to raise_error(ArgumentError) end + + it 'uses the given buffer' do + buffer = Channel::Buffer::Buffered.new(10) + subject = Channel.new(buffer) + expect(subject).to receive(:put).with(42) + subject.put(42) + end end - context '#put' do + context 'factories' do - it 'enqueues the item when not full and not closed' do - subject = Channel.new(buffer: :buffered, size: 2) - subject.put(:foo) - internal_buffer = subject.instance_variable_get(:@buffer) - expect(internal_buffer).to_not be_empty + specify do + expect(Channel::Buffer::Ticker).to receive(:new).with(10).and_call_original + Channel.ticker(10) end + specify do + expect(Channel::Buffer::Timer).to receive(:new).with(10).and_call_original + Channel.timer(10) + end + end + + context '#put' do + it 'returns true on success' do - subject = Channel.new(buffer: :buffered, size: 2) + subject = Channel.new(buffer: :buffered, capacity: 2) expect(subject.put(:foo)).to be true end - it 'returns false when closed' do - subject = Channel.new(buffer: :buffered, size: 2) + it 'returns false on failure' do + subject = Channel.new(buffer: :buffered, capacity: 2) subject.close expect(subject.put(:foo)).to be false end + + it 'rejects when the validator returns false' do + validator = ->(value) { false } + subject = Channel.new(capacity: 10, validator: validator) + expect(subject.put(42)).to be false + end + + it 'rejects when the validator raises an exception' do + validator = ->(value) { raise StandardError } + subject = Channel.new(capacity: 10, validator: validator) + expect(subject.put(42)).to be false + end + + it 'rejects nil' do + expect(subject.put(nil)).to be false + end end context 'put!' do + it 'returns true on success' do + subject = Channel.new(buffer: :buffered, capacity: 2) + expect(subject.put!(:foo)).to be true + end + it 'raises an exception on failure' do - subject = Channel.new(buffer: :buffered, size: 2) + subject = Channel.new(buffer: :buffered, capacity: 2) subject.close expect { subject.put!(:foo) }.to raise_error(Channel::Error) end + + it 'rejects when the validator returns false' do + validator = ->(value) { false } + subject = Channel.new(capacity: 10, validator: validator) + expect{ + subject.put!(42) + }.to raise_error(Channel::ValidationError) + end + + it 'rejects when the validator raises an exception' do + validator = ->(value) { raise StandardError } + subject = Channel.new(capacity: 10, validator: validator) + expect{ + subject.put!(42) + }.to raise_error(StandardError) + end + + it 'rejects nil' do + expect { + subject.put!(nil) + }.to raise_error(Channel::ValidationError) + end end context 'put?' do it 'returns a just Maybe on success' do - subject = Channel.new(buffer: :buffered, size: 2) + subject = Channel.new(buffer: :buffered, capacity: 2) result = subject.put?(:foo) expect(result).to be_a Concurrent::Maybe expect(result).to be_just end it 'returns a nothing Maybe on failure' do - subject = Channel.new(buffer: :buffered, size: 2) + subject = Channel.new(buffer: :buffered, capacity: 2) subject.close result = subject.put?(:foo) expect(result).to be_a Concurrent::Maybe expect(result).to be_nothing end - end - context '#offer' do + it 'rejects when the validator returns false' do + validator = ->(value) { false } + subject = Channel.new(capacity: 10, validator: validator) + expect(subject.put?(42)).to be_nothing + end - it 'enqueues the item when not full and not closed' do - subject = Channel.new(buffer: :buffered, size: 2) - subject.offer(:foo) - internal_buffer = subject.instance_variable_get(:@buffer) - expect(internal_buffer).to_not be_empty + it 'rejects when the validator raises an exception' do + validator = ->(value) { false } + subject = Channel.new(capacity: 10, validator: validator) + expect(subject.put?(42)).to be_nothing + end + + it 'accepts nil' do + result = subject.put?(nil) + expect(result).to be_a Concurrent::Maybe + expect(result).to be_just end + end + + context '#offer' do it 'returns true on success' do - subject = Channel.new(buffer: :buffered, size: 2) + subject = Channel.new(buffer: :buffered, capacity: 2) expect(subject.offer(:foo)).to be true end - it 'returns false when closed' do - subject = Channel.new(buffer: :buffered, size: 2) + it 'returns false on failure' do + subject = Channel.new(buffer: :buffered, capacity: 2) subject.close expect(subject.offer(:foo)).to be false end + + it 'rejects when the validator returns false' do + validator = ->(value) { false } + subject = Channel.new(capacity: 10, validator: validator) + expect(subject.offer(42)).to be false + end + + it 'rejects when the validator raises an exception' do + validator = ->(value) { raise StandardError } + subject = Channel.new(capacity: 10, validator: validator) + expect(subject.offer(42)).to be false + end + + it 'rejects nil' do + expect(subject.offer(nil)).to be false + end end context 'offer!' do + it 'returns true on success' do + subject = Channel.new(buffer: :buffered, capacity: 2) + expect(subject.offer!(:foo)).to be true + end + it 'raises an exception on failure' do - subject = Channel.new(buffer: :buffered, size: 2) + subject = Channel.new(buffer: :buffered, capacity: 2) subject.close expect { subject.offer!(:foo) }.to raise_error(Channel::Error) end + + it 'rejects when the validator returns false' do + validator = ->(value) { false } + subject = Channel.new(capacity: 10, validator: validator) + expect{ + subject.offer!(42) + }.to raise_error(Channel::ValidationError) + end + + it 'rejects when the validator raises an exception' do + validator = ->(value) { raise StandardError } + subject = Channel.new(capacity: 10, validator: validator) + expect{ + subject.offer!(42) + }.to raise_error(StandardError) + end + + it 'rejects nil' do + expect { + subject.offer!(nil) + }.to raise_error(Channel::ValidationError) + end end context 'offer?' do it 'returns a just Maybe on success' do - subject = Channel.new(buffer: :buffered, size: 2) + subject = Channel.new(buffer: :buffered, capacity: 2) result = subject.offer?(:foo) expect(result).to be_a Concurrent::Maybe expect(result).to be_just end it 'returns a nothing Maybe on failure' do - subject = Channel.new(buffer: :buffered, size: 2) + subject = Channel.new(buffer: :buffered, capacity: 2) subject.close result = subject.offer?(:foo) expect(result).to be_a Concurrent::Maybe expect(result).to be_nothing end + + it 'rejects when the validator returns false' do + validator = ->(value) { false } + subject = Channel.new(capacity: 10, validator: validator) + expect(subject.offer?(42)).to be_nothing + end + + it 'rejects when the validator raises an exception' do + validator = ->(value) { false } + subject = Channel.new(capacity: 10, validator: validator) + expect(subject.offer?(42)).to be_nothing + end + + it 'accepts nil' do + subject = Channel.new(buffer: :buffered, capacity: 2) + result = subject.offer?(nil) + expect(result).to be_a Concurrent::Maybe + expect(result).to be_just + end end context '#take' do - subject { Channel.new(buffer: :buffered, size: 2) } + subject { Channel.new(buffer: :buffered, capacity: 2) } it 'takes the next item when not empty' do subject.put(:foo) expect(subject.take).to eq :foo end - it 'returns nil when empty and closed' do + it 'returns nil on failure' do subject.close expect(subject.take).to be nil end @@ -205,7 +334,12 @@ module Concurrent context '#take!' do - subject { Channel.new(buffer: :buffered, size: 2) } + subject { Channel.new(buffer: :buffered, capacity: 2) } + + it 'takes the next item when not empty' do + subject.put(:foo) + expect(subject.take!).to eq :foo + end it 'raises an exception on failure' do subject.close @@ -217,7 +351,7 @@ module Concurrent context '#take?' do - subject { Channel.new(buffer: :buffered, size: 2) } + subject { Channel.new(buffer: :buffered, capacity: 2) } it 'returns a just Maybe on success' do subject.put(:foo) @@ -237,7 +371,7 @@ module Concurrent context '#next' do - subject { Channel.new(buffer: :buffered, size: 3) } + subject { Channel.new(buffer: :buffered, capacity: 3) } it 'returns , true when there is one item' do subject.put(:foo) @@ -263,25 +397,38 @@ module Concurrent expect(more).to be false end - it 'returns , false when closed and last item' do - subject.offer(:foo) - subject.offer(:bar) - subject.offer(:baz) + it 'returns , true when closed and last item' do + capacity = subject.capacity + expect(capacity).to be >= 1 + + capacity.times { subject.put(:foo) } + subject.close + + capacity.times do + item, more = subject.next + expect(item).to eq :foo + expect(more).to be true + end + end + + it 'returns nil, false when closed and no items remain' do + capacity = subject.capacity + expect(capacity).to be >= 1 + + capacity.times { subject.put(:foo) } subject.close - _, more1 = subject.next - _, more2 = subject.next - _, more3 = subject.next + capacity.times { subject.next } - expect(more1).to be true - expect(more2).to be true - expect(more3).to be false + item, more = subject.next + expect(item).to be_nil + expect(more).to be false end end context '#next?' do - subject { Channel.new(buffer: :buffered, size: 2) } + subject { Channel.new(buffer: :buffered, capacity: 2) } it 'returns a just Maybe and true when there is one item' do subject.put(:foo) @@ -329,7 +476,7 @@ module Concurrent expect(subject.poll).to be nil end - it 'returns nil when closed' do + it 'returns nil on failure' do subject.close expect(subject.poll).to be nil end @@ -337,13 +484,23 @@ module Concurrent context '#poll!' do + it 'returns the next item immediately if available' do + subject # initialize on this thread + t = Thread.new do + subject.put(42) + end + t.join(0.1) + + expect(subject.poll!).to eq 42 + end + it 'raises an exception immediately if no item is available' do expect { subject.poll! }.to raise_error(Channel::Error) end - it 'raises an exception when closed' do + it 'raises an exception on failure' do subject.close expect { subject.poll! @@ -372,7 +529,7 @@ module Concurrent expect(result).to be_nothing end - it 'returns a nothing Maybe when closed' do + it 'returns a nothing Maybe on failure' do subject.close result = subject.poll? expect(result).to be_a Concurrent::Maybe @@ -381,15 +538,124 @@ module Concurrent end context '.each' do - pending + + it 'raises and exception when no block is given' do + expect { + subject.each + }.to raise_error(ArgumentError) + end + + it 'iterates until the channel is closed' do + expected = [13, 42, 2001] + subject = Channel.new(capacity: expected.length) + expected.each { |value| subject.put(value) } + subject.close + + actual = [] + subject.each { |value| actual << value } + expect(actual).to eq expected + end end - context '.go' do - pending + context 'goroutines', notravis: true do + + let(:default_executor) { Channel.const_get(:GOROUTINES) } + + context '.go' do + + it 'raises an exception when no block is given' do + expect { + Channel.go + }.to raise_error(ArgumentError) + end + + specify do + expect(default_executor).to receive(:post).with(1, 2, 3) + Channel.go(1, 2, 3) { nil } + end + end + + context '.go_via' do + + it 'raises an exception when no block is given' do + expect { + Channel.go_via + }.to raise_error(ArgumentError) + end + + specify do + executor = ImmediateExecutor.new + expect(executor).to receive(:post).with(1, 2, 3) + Channel.go_via(executor, 1, 2, 3) { nil } + end + end + + context '.go_loop' do + + it 'raises an exception when no block is given' do + expect { + Channel.go_loop + }.to raise_error(ArgumentError) + end + + it 'loops until the block returns false' do + actual = 0 + expected = 3 + latch = Concurrent::CountDownLatch.new(expected) + Channel.go_loop do + actual += 1 + latch.count_down + actual < expected + end + + latch.wait(10) + expect(actual).to eq expected + end + end + + context '.go_loop_via' do + + it 'raises an exception when no block is given' do + expect { + Channel.go_loop_via + }.to raise_error(ArgumentError) + end + + it 'loops until the block returns false' do + actual = 0 + expected = 3 + executor = ImmediateExecutor.new + latch = Concurrent::CountDownLatch.new(expected) + Channel.go_loop_via(executor) do + actual += 1 + latch.count_down + actual < expected + end + + latch.wait(3) + expect(actual).to eq expected + end + end end - context '.timer' do - pending + context 'select' do + + it 'raises an exception when no block is given' do + expect { + Channel.select + }.to raise_error(ArgumentError) + end + + it 'passes a selector to the block' do + actual = nil + Channel.select { |s| actual = s; s.error { } } + expect(actual).to be_a Channel::Selector + end + + specify do + expect_any_instance_of(Channel::Selector).to receive(:execute) + Channel.select { |s| s.error { } } + end end end end diff --git a/spec/concurrent/edge/lock_free_linked_set_spec.rb b/spec/concurrent/edge/lock_free_linked_set_spec.rb index 4231d4b8e..dbca94e44 100644 --- a/spec/concurrent/edge/lock_free_linked_set_spec.rb +++ b/spec/concurrent/edge/lock_free_linked_set_spec.rb @@ -19,7 +19,7 @@ expect(subject.add 'test string1').to be true end - context 'in a multi-threaded environment' do + context 'in a multi-threaded environment', buggy: true do it 'adds the items to the set' do to_insert = %w(one two three four five six) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c0075c151..f369238d9 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,10 +1,7 @@ $VERBOSE = nil # suppress our deprecation warnings -require 'concurrent' -require 'concurrent-edge' -Concurrent.use_stdlib_logger Logger::FATAL - -unless Concurrent.on_jruby_9000? +# wwe can't use our helpers here because we need to load the gem _after_ simplecov +unless RUBY_ENGINE == 'jruby' && 0 == (JRUBY_VERSION =~ /^9\.0\.0\.0/) if ENV['COVERAGE'] || ENV['CI'] || ENV['TRAVIS'] require 'simplecov' require 'coveralls' @@ -21,18 +18,17 @@ SimpleCov.start do project_name 'concurrent-ruby' add_filter '/build-tests/' - add_filter '/coverage/' - add_filter '/doc/' add_filter '/examples/' - add_filter '/pkg/' add_filter '/spec/' - add_filter '/tasks/' - add_filter '/yard-template/' - add_filter '/yardoc/' end end end +require 'concurrent' +require 'concurrent-edge' + +Concurrent.use_stdlib_logger Logger::FATAL + # import all the support files Dir[File.join(File.dirname(__FILE__), 'support/**/*.rb')].each { |f| require File.expand_path(f) }