Skip to content

Conversation

jdantonio
Copy link
Member

Go-like Channels

This is a spike of a new channel implementation. It is not ready for merging. It is functional enough that I was able to replicate all the channel examples from Go by Example and A Tour of Go. The sample code can be found in examples/a-tour-of-go-channels and examples/go-by-example-channels. I'm very happy with this direction and think it should eventually replace the implementation currently in Edge. I've created the PR to solicit feedback from anyone interested in the topic.

The primary model for this implementation is Go channels. Some features were also inspired by Clojure core.async, which may provide additional inspiration in the future. The select syntax was inspired by the Ruby Agent gem, but all code in this spike is original.

Random Notes

  • Clearly we don't have access to Go's runtime so we don't have real goroutines. Channels can be used with any of the executors but, for semantic consistency with Go, I created a Channel.go function. It just posts to a thread pool.
  • Go's basic operations are send and receive, but I when with the Clojure terms take and put. They are functionally the same (and I've created aliases for send and receive), but I also implemented Clojure's offer and pool so I remained consistent with terms.
  • Because both Go and Clojure use arrow-like symbols for send/receive operations, I wanted to do something similar here. I used << for send/put since that aligns with the post methods of our executors. For receive/take I needed a unary operator, which limited my choices. I went with ~, e.g. bucket = ~channel. I think it works well (see the code examples).
  • Ruby doesn't include the return value in the signature so implementing both of Go's receive variants with the same method name in Ruby doesn't work. I implemented the 2-value form as a method called next. It returns two values.
  • I implemented Go's range using Ruby's idiomatic each. I also included Enumerable in Channel. Most of the enumerable methods will block until the channel is closed. This is OK, it aligns with Go's range and it mirrors the behavior of similar methods in Clojure's core.async.
  • Go's channels are strongly typed. This doesn't work very well in Ruby. To support better typing I implemented a :validator option on the Channel constructor. It follows the same rules as Agent. Validation occurs before the given item is enqueued.
  • In addition to Go's buffered and unbuffered channels, I've also implemented Clojure's dropping and sliding buffers.
  • Go implements timers and tickers as channels. Although we already have ScheduledTask and TimerTask, they don't fit cleanly into channel operations. So I implemented Go-like timer and ticker channels. Unfortunately, Go's "timer" is analogous to our ScheduledTask and Go's "ticker" is analogous to our TimerTask. I've retained Go's nomenclature.

To-do

  • Full unit tests
  • Full yardoc
  • Stress testing

Examples

More examples can be found in examples/go-by-example-channels and examples/a-tour-of-go-channels.

A simple put/take example:

require 'concurrent-edge'
Channel = Concurrent::Edge::Channel

ch = Channel.new(size: 2)
ch << 1
ch << 2

puts ~ch
puts ~ch

produces:

1
2

A more complext example:

require 'concurrent-edge'
Channel = Concurrent::Edge::Channel

def fibonacci(c, quit)
  x, y = 0, 1
  loop do
    Channel.select do |s|
      s.put(c, x) { x, y = y, x+y; x }
      s.take(quit) do
        puts 'quit'
        return
      end
    end
  end
end

c = Channel.new
quit = Channel.new

Channel.go do
  10.times { puts ~c }
  quit << 0
end

fibonacci(c, quit)

produces:

0
1
1
2
3
5
8
13
21
34
quit

Finally, tickers, timers, and select:

require 'concurrent-edge'
Channel = Concurrent::Edge::Channel

tick = Channel.tick(0.1)
boom = Channel.after(0.5)

loop do
  Channel.select do |s|
    s.take(tick) { print "tick.\n" }
    s.take(boom) do
      print "BOOM!\n"
      exit
    end
    s.default do
      print "    .\n"
      sleep(0.05)
    end
  end
end

produces:

.
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
.
.
tick.
BOOM!

@jdantonio jdantonio changed the title New channel implementation; functionally equivalent to Go. [SPIKE] New channel implementation Sep 3, 2015
@jdantonio
Copy link
Member Author

The most recent commit interoperates the new Edge::Future with the new Edge::Channel.

@sschepens
Copy link
Contributor

I'm not very fond of the select syntax, but I guess it's as close as you can get in ruby to go's syntax.
Great job!

@jdantonio
Copy link
Member Author

The Agent gem uses case in select, but it requires a direction symbol as a parameter:

  Channel.select do |s|
    s.case(tick, :receive) { print "tick.\n" }
    s.case(boom, :receive) do
      print "BOOM!\n"
      exit
    end
    s.default do
      print "    .\n"
      sleep(0.05)
    end
  end

I have mixed feelings: The use of case makes the code look more like Go, but it's also redundant. Why pass in a parameter when the method name can be used?

@sschepens Do you like the Agent syntax better?

@sschepens
Copy link
Contributor

@jdantonio No, I also feel it's redundant, I prefer your syntax, what i'm not fond of is having to receive and use the select object in the block, though I don't know if there's a way around it.
Maybe changing the method names receive and send feel more natural to me, though I don't know if it's a good decision to replace ruby's default send method.

@pitr-ch
Copy link
Member

pitr-ch commented Sep 7, 2015

Wow, looks very nice. I'll look at the code more deeply later.

@jdantonio jdantonio force-pushed the new-channel branch 4 times, most recently from 8569d36 to 00ba136 Compare September 11, 2015 02:38
@jdantonio jdantonio changed the title [SPIKE] New channel implementation New channel implementation Sep 17, 2015
@jdantonio jdantonio mentioned this pull request Sep 21, 2015
@jdantonio jdantonio added the enhancement Adding features, adding tests, improving documentation. label Sep 24, 2015
@jdantonio jdantonio added this to the 1.0.0 Release milestone Sep 24, 2015
@jdantonio jdantonio self-assigned this Sep 24, 2015
jdantonio added a commit that referenced this pull request Sep 24, 2015
@jdantonio jdantonio merged commit c65962c into master Sep 24, 2015
include Enumerable

GOROUTINES = Concurrent::CachedThreadPool.new
private_constant :GOROUTINES
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is extra pool created? It could use global IO pool afaik.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely it could. But since this is still early in development I wanted to keep these separate from the global pools, just in case, should channels exhibit any erratic behavior. I discuss this a little in channel.md. Should this ever move from Edge to Core I expect that they won't need a separate pool any more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Could you also add a TODO to the code if you are editing it?

@jdantonio jdantonio deleted the new-channel branch October 5, 2015 16:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Adding features, adding tests, improving documentation.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants