Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions doc/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ msg = messages.take
puts msg
```

By default, channels are *unbuffered*, meaning that they have a size of zero and only accept puts and takes when both a putting and a taking thread are available. If a `put` is started when there is no taker thread the call will block. As soon as another thread calls `take` the exchange will occur and both calls will return on their respective threads. Similarly, is a `take` is started when there is no putting thread the call will block until another thread calls `put`.
By default, channels are *unbuffered*, meaning that they have a capacity of zero and only accept puts and takes when both a putting and a taking thread are available. If a `put` is started when there is no taker thread the call will block. As soon as another thread calls `take` the exchange will occur and both calls will return on their respective threads. Similarly, is a `take` is started when there is no putting thread the call will block until another thread calls `put`.

The following, slightly more complex example, concurrently sums two different halves of a list then combines the results. It uses an unbuffered channel to pass the results from the two goroutines back to the main thread. The main thread blocks on the two `take` calls until the worker goroutines are done. This example also uses the convenience aliases {#<<} and {#~}. Since channels in Go are part of the language, channel operations are performed using special channel operators rather than functions. These operators help clearly indicate that channel operations are being performed. The operator overloads `<<` for `put` and `~` for `take` help reinforce this idea in Ruby.

Expand All @@ -80,12 +80,12 @@ puts [x, y, x+y].join(' ')

## Channel Buffering

One common channel variation is a *buffered* channel. A buffered channel has a finite number of slots in the buffer which can be filled. Putting threads can put values into the channel even if there is no taking threads, up to the point where the buffer is filled. Once a buffer becomes full the normal blocking behavior resumes. A buffered channel is created by giving a `:size` option on channel creation:
One common channel variation is a *buffered* channel. A buffered channel has a finite number of slots in the buffer which can be filled. Putting threads can put values into the channel even if there is no taking threads, up to the point where the buffer is filled. Once a buffer becomes full the normal blocking behavior resumes. A buffered channel is created by giving a `:capacity` option on channel creation:

The following example creates a buffered channel with two slots. It then makes two `put` calls, adding values to the channel. These calls do not block because the buffer has room. Were a third `put` call to be made before an `take` calls, the third `put` would block.

```ruby
ch = Concurrent::Channel.new(size: 2)
ch = Concurrent::Channel.new(capacity: 2)
ch << 1
ch << 2

Expand All @@ -95,7 +95,7 @@ puts ~ch

## Channel Synchronization

The main purpose of channels is to synchronize operations across goroutines. One common pattern for this is to created a `size: 1` buffered channel which is used to signal that work is complete. The following example calls a `worker` function on a goroutine and passes it a "done" channel. The main thread then calls `take` on the "done" channel and blocks until signaled.
The main purpose of channels is to synchronize operations across goroutines. One common pattern for this is to created a `capacity: 1` buffered channel which is used to signal that work is complete. The following example calls a `worker` function on a goroutine and passes it a "done" channel. The main thread then calls `take` on the "done" channel and blocks until signaled.

```ruby
def worker(done_channel)
Expand All @@ -106,7 +106,7 @@ def worker(done_channel)
done_channel << true
end

done = Concurrent::Channel.new(size: 1)
done = Concurrent::Channel.new(capacity: 1)
Concurrent::Channel.go{ worker(done) }

~done # block until signaled
Expand Down Expand Up @@ -176,7 +176,7 @@ fibonacci(c, quit)

## Closing and Iterating Over Channels

Newly created channels are in an "open" state. Open channels can receive values via `put` operations. When a program is done with a channel it can be closed by calling the {#close} method. Once a channel is closed it will no longer allow values to be `put`. If the channel is buffered and values are in the buffer when the channel is closed, the remaining values can still be removed via `take` operations.
Newly created channels are in an "open" state. Open channels can receive values via `put` operations. When a program is done with a channel it can be closed by calling the `#close` method. Once a channel is closed it will no longer allow values to be `put`. If the channel is buffered and values are in the buffer when the channel is closed, the remaining values can still be removed via `take` operations.

The `Channel` class implements an {#each} method which can be used to retrieve successive values from the channel. The `each` method is a blocking method. When the channel is open and there are no values in the buffer, `each` will block until a new item is `put`. The `each` method will not exit until the channel is closed.

Expand All @@ -192,7 +192,7 @@ def fibonacci(n, c)
c.close
end

chan = Concurrent::Channel.new(size: 10)
chan = Concurrent::Channel.new(capacity: 10)
Concurrent::Channel.go { fibonacci(chan.capacity, c) }
chan.each { |i| puts i }
```
Expand Down
26 changes: 13 additions & 13 deletions doc/synchronization.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
Provides common parent for all objects which need to be synchronized or be using other synchronization tools. It provides:

- Synchronized block
- Methods for waiting and signaling
- Methods for waiting and signaling
- Volatile fields
- Ensure visibility of final fields
- Fields with CAS operations
Expand Down Expand Up @@ -49,7 +49,7 @@ private
def ns_compute
ns_compute_reduce ns_compute_map
end
```
```
where `compute` defines how is it synchronized and `ns_compute` handles the behavior (in this case the computation). `ns_` methods should only call other `ns_` methods or `pr_` methods. They can call normal methods on other objects, but that should be done with care (better to avoid) because the thread escapes this object while the lock is still held, which can lead to deadlock. That's why the `report` method is called in `compute` and not in `ns_compute`.

`pr_` methods are pure functions they can be used in and outside of synchronized blocks.
Expand All @@ -60,16 +60,16 @@ Sometimes while already inside the synchronized block some condition is not met.

To fulfill these needs there are private methods:

- `ns_wait` {include:Concurrent::Synchronization::AbstractObject#ns_wait}
- `ns_wait_until` {include:Concurrent::Synchronization::AbstractObject#ns_wait_until}
- `ns_signal` {include:Concurrent::Synchronization::AbstractObject#ns_signal}
- `ns_broadcast` {include:Concurrent::Synchronization::AbstractObject#ns_broadcast}
- `ns_wait` {include:Concurrent::Synchronization::AbstractLockableObject#ns_wait}
- `ns_wait_until` {include:Concurrent::Synchronization::AbstractLockableObject#ns_wait_until}
- `ns_signal` {include:Concurrent::Synchronization::AbstractLockableObject#ns_signal}
- `ns_broadcast` {include:Concurrent::Synchronization::AbstractLockableObject#ns_broadcast}

All methods have to be called inside synchronized block.

## Volatile fields

`Synchronization::Object` can have volatile fields (Java semantic). They are defined by `attr_volatile :field_name`. `attr_volatile` defines reader and writer with the `field_name`. Any write is always immediately visible for any subsequent reads of the same field.
`Synchronization::Object` can have volatile fields (Java semantic). They are defined by `attr_volatile :field_name`. `attr_volatile` defines reader and writer with the `field_name`. Any write is always immediately visible for any subsequent reads of the same field.

## Ensure visibility of final fields

Expand All @@ -83,7 +83,7 @@ class AbstractPromise < Synchronization::Object
ensure_ivar_visibility!
end
# ...
end
end
```

### Naming conventions
Expand Down Expand Up @@ -112,10 +112,10 @@ class Event < Synchronization::Object
self
end
# ...
end
end
```

Operations on `@Touched` field have volatile semantic.
Operations on `@Touched` field have volatile semantic.

## Memory model

Expand All @@ -125,7 +125,7 @@ When writing libraries in `concurrent-ruby` we are reasoning based on following

The memory model is constructed based on our best effort and knowledge of the 3 main Ruby implementations (CRuby, JRuby, Rubinius). When considering certain aspect we always choose the weakest guarantee (e.g. local variable updates are always visible in CRuby but not in JRuby, so in this case JRuby behavior is picked). If some Ruby behavior is omitted here it is considered unsafe for use in parallel environment (Reasons may be lack of information, or difficulty of verification).

This takes in account following implementations:
This takes in account following implementations:

- CRuby 1.9 - 2.2 (no differences found)
- JRuby 1.7
Expand All @@ -139,10 +139,10 @@ We are interested in following behaviors:

### Variables

- **Local variables** - atomic assignment (only Integer and Object), non-volatile.
- **Local variables** - atomic assignment (only Integer and Object), non-volatile.
- Consequence: a lambda defined on `thread1` executing on `thread2` may not see updated values in local variables captured in its closure.
- Reason: local variables are non-volatile on Jruby and Rubinius.
- **Instance variables** - atomic assignment (only Integer and Object), non-volatile.
- **Instance variables** - atomic assignment (only Integer and Object), non-volatile.
- Consequence: Different thread may see old values; different thread may see not fully-initialized object.
- Reason: local variables are non-volatile on Jruby and Rubinius.
- **Constants** - atomic assignment, volatile.
Expand Down
7 changes: 3 additions & 4 deletions examples/a-tour-of-go-channels/buffered-channels.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
Channel = Concurrent::Channel

## A Tour of Go: Buffered Channels
# https://tour.golang.org/concurrency/3
# https://tour.golang.org/concurrency/3

ch = Channel.new(size: 2)
ch = Channel.new(capacity: 2)
ch << 1
ch << 2

puts ~ch
puts ~ch

expected = <<-STDOUT
__END__
1
2
STDOUT
5 changes: 2 additions & 3 deletions examples/a-tour-of-go-channels/channels.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Channel = Concurrent::Channel

## A Tour of Go: Channels
# https://tour.golang.org/concurrency/2
# https://tour.golang.org/concurrency/2

def sum(a, c)
sum = a.reduce(0, &:+)
Expand All @@ -22,6 +22,5 @@ def sum(a, c)

puts [x, y, x+y].join(' ')

expected = <<-STDOUT
__END__
-5 17 12
STDOUT
5 changes: 2 additions & 3 deletions examples/a-tour-of-go-channels/default-selection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

loop do
Channel.select do |s|
s.take(tick) { print "tick.\n" }
s.take(tick) { |t| print "tick.\n" if t }
s.take(boom) do
print "BOOM!\n"
exit
Expand All @@ -24,7 +24,7 @@
end
end

expected = <<-STDOUT
__END__
.
.
tick.
Expand All @@ -41,4 +41,3 @@
.
tick.
BOOM!
STDOUT
3 changes: 1 addition & 2 deletions examples/a-tour-of-go-channels/equivalent-binary-trees.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ def same(t1, t2)
puts same(new_tree(1), new_tree(1))
puts same(new_tree(1), new_tree(2))

expected = <<-STDOUT
__END__
true
false
STDOUT
7 changes: 3 additions & 4 deletions examples/a-tour-of-go-channels/range-and-close.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Channel = Concurrent::Channel

## A Tour of Go: Range and Close
# https://tour.golang.org/concurrency/4
# https://tour.golang.org/concurrency/4

def fibonacci(n, c)
x, y = 0, 1
Expand All @@ -16,11 +16,11 @@ def fibonacci(n, c)
c.close
end

c = Channel.new(size: 10)
c = Channel.new(capacity: 10)
Channel.go { fibonacci(c.capacity, c) }
c.each { |i| puts i }

expected = <<-STDOUT
__END__
0
1
1
Expand All @@ -31,4 +31,3 @@ def fibonacci(n, c)
13
21
34
STDOUT
3 changes: 1 addition & 2 deletions examples/a-tour-of-go-channels/select.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def fibonacci(c, quit)

fibonacci(c, quit)

expected = <<-STDOUT
__END__
0
1
1
Expand All @@ -42,4 +42,3 @@ def fibonacci(c, quit)
21
34
quit
STDOUT
5 changes: 2 additions & 3 deletions examples/go-by-example-channels/channel-buffering.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@
## Go by Example: Channel Buffering
# https://gobyexample.com/channel-buffering

messages = Channel.new(size: 2) # buffered
messages = Channel.new(capacity: 2) # buffered

messages.put 'buffered'
messages.put 'channel'

puts messages.take
puts messages.take

expected = <<-STDOUT
__END__
buffered
channel
STDOUT
7 changes: 3 additions & 4 deletions examples/go-by-example-channels/channel-directions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ def pong(pings, pongs)
pongs << msg
end

pings = Channel.new(size: 1) # buffered
pongs = Channel.new(size: 1) # buffered
pings = Channel.new(capacity: 1) # buffered
pongs = Channel.new(capacity: 1) # buffered

ping(pings, 'passed message')
pong(pings, pongs)

puts ~pongs

expected = <<-STDOUT
__END__
passed message
STDOUT
5 changes: 2 additions & 3 deletions examples/go-by-example-channels/channel-synchronization.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ def worker(done_channel)
done_channel << true # alias for `#put`
end

done = Channel.new(size: 1) # buffered
done = Channel.new(capacity: 1) # buffered
Channel.go{ worker(done) }

~done # alias for `#take`

expected = <<-STDOUT
__END__
working...
done
STDOUT
5 changes: 2 additions & 3 deletions examples/go-by-example-channels/channels.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Channel = Concurrent::Channel

## Go by Example: Unbuffered Channel
# https://gobyexample.com/channels
# https://gobyexample.com/channels

messages = Channel.new # unbuffered

Expand All @@ -16,6 +16,5 @@
msg = messages.take
puts msg

expected = <<-STDOUT
__END__
ping
STDOUT
7 changes: 3 additions & 4 deletions examples/go-by-example-channels/closing-channels.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
Channel = Concurrent::Channel

## Go by Example: Closing Channels
# https://gobyexample.com/closing-channels
# https://gobyexample.com/closing-channels

validator = ->(v){ v.is_a? Numeric }
jobs = Channel.new(buffer: :buffered, size: 5,
jobs = Channel.new(buffer: :buffered, capacity: 5,
validator: validator)
done = Channel.new(buffer: :unbuffered)

Expand All @@ -34,7 +34,7 @@
print "sent all jobs\n"
~done

expected = <<-STDOUT
__END__
sent job 1
received job 1
sent job 2
Expand All @@ -43,4 +43,3 @@
received job 3
sent all jobs
received all jobs
STDOUT
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Channel = Concurrent::Channel

## Go by Example: Non-Blocking Channel Operations
# https://gobyexample.com/non-blocking-channel-operations
# https://gobyexample.com/non-blocking-channel-operations

messages = Channel.new # unbuffered
signals = Channel.new # unbuffered
Expand All @@ -27,8 +27,7 @@
s.default { print "no activity\n" }
end

expected = <<-STDOUT
__END__
no message received
no message sent
no activity
STDOUT
Loading