This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Channels

ztellman edited this page Jan 3, 2011 · 28 revisions

The basics

A channel in Aleph is just an event-driven queue. This means that it uses a FIFO (first-in, first-out) ordering of messages, and that messages can only be consumed using callbacks. Channels are created using (channel). Messages are enqueued using (enqueue channel message), and consumed using (receive channel callback).

If there are no callbacks registered when a message is enqueued, the message will remain in the channel.

> (def ch (channel))
<== []
> (enqueue ch 1 2 3) 
true

> (receive ch #(println "received" %))
received 1
true

If two callbacks are registered on an empty channel, they both receive the next message.

> (def ch (channel))
<== []
> (receive ch #(println "callback a:" %) #(println "callback b:" %))
true
> (enqueue ch 1)
callback a: 1
callback b: 1
true
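Here is a toy model of the queue semantics above, written with only clojure.core. It is not how Aleph implements channels, and toy-channel, toy-enqueue, and toy-receive are hypothetical names. A channel is an atom holding queued messages and pending one-shot callbacks. Enqueueing hands the message to every pending callback if there are any, and otherwise leaves it queued. swap-vals! requires Clojure 1.9 or later.

```clojure
;; Toy model only (not Aleph's implementation): an atom holding queued
;; messages and pending one-shot callbacks.
(defn toy-channel [& messages]
  (atom {:messages (vec messages) :callbacks []}))

(defn toy-enqueue
  "If callbacks are pending, hand msg to every one of them and clear them;
  otherwise leave msg in the queue."
  [ch msg]
  (let [[old _] (swap-vals! ch (fn [{:keys [callbacks] :as state}]
                                 (if (seq callbacks)
                                   (assoc state :callbacks [])
                                   (update state :messages conj msg))))]
    ;; old is the state our successful swap replaced
    (doseq [f (:callbacks old)]
      (f msg))
    true))

(defn toy-receive
  "If a message is queued, remove it and pass it to every callback;
  otherwise register the callbacks for the next enqueued message."
  [ch & callbacks]
  (let [[old _] (swap-vals! ch (fn [{:keys [messages] :as state}]
                                 (if (seq messages)
                                   (update state :messages subvec 1)
                                   (update state :callbacks into callbacks))))]
    (when (seq (:messages old))
      (doseq [f callbacks]
        (f (first (:messages old)))))
    true))

;; Two callbacks registered on an empty channel both get the next message.
(def ch (toy-channel))
(toy-receive ch #(println "callback a:" %) #(println "callback b:" %))
(toy-enqueue ch 1)
;; prints "callback a: 1", then "callback b: 1"
```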

To consume all messages with a callback, use (receive-all channel callback).

> (def ch (channel 1 2))
<== [1 2]
> (receive-all ch println)
1
2
true
> (enqueue ch 3)
3
true

To cancel a callback registered with receive or receive-all, call (cancel-callback channel callback).

You can prevent messages from being enqueued into a channel using (close channel) or (enqueue-and-close channel message). Once close has been called, the channel is sealed. Any attempt to enqueue a message into a sealed channel will return false.

> (def ch (channel))
<== []
> (close ch)
true
> (sealed? ch)
true
> (enqueue ch 1)
false

Once all messages in a sealed channel have been consumed, the channel is closed. Any attempt to consume messages from a closed channel will return false.

> (def ch (channel 1))
<== [1]
> (close ch)
true
> (receive ch println)
1
nil
> (closed? ch)
true
> (receive ch println)
false
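The sealed/closed distinction can be summarized as a small state model. This sketch uses plain maps and pure functions, and it ignores callbacks and the trailing nil. seal, sealed-ch?, closed-ch?, enqueue-ch, and consume-ch are hypothetical names, not Aleph's API. A sealed channel rejects new messages, and it counts as closed only once its queue is empty.

```clojure
;; Pure toy model of sealed vs. closed (illustration only, not Aleph's code).
;; A channel is a map with a :messages vector and an optional :sealed? flag.
(defn seal [ch] (assoc ch :sealed? true))

(defn sealed-ch? [ch] (boolean (:sealed? ch)))

(defn closed-ch?
  "Closed means sealed and fully drained."
  [ch]
  (and (sealed-ch? ch) (empty? (:messages ch))))

(defn enqueue-ch
  "Returns [channel accepted?]; a sealed channel rejects new messages."
  [ch msg]
  (if (sealed-ch? ch)
    [ch false]
    [(update ch :messages conj msg) true]))

(defn consume-ch
  "Returns [channel msg]; msg is false once the channel is closed, and nil
  if nothing is queued yet (a real receive would wait instead)."
  [ch]
  (cond
    (closed-ch? ch)         [ch false]
    (empty? (:messages ch)) [ch nil]
    :else                   [(update ch :messages subvec 1) (first (:messages ch))]))

(let [ch       (seal {:messages [1]})
      [ch msg] (consume-ch ch)]
  [msg (closed-ch? ch) (second (enqueue-ch ch 2))])
;; => [1 true false]
```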

We can hook into the channel’s closing and sealing using the on-closed and on-sealed functions, both of which take zero-argument callbacks. However, it is often more convenient to simply check whether the channel is closed when we receive a message. For this reason, if there are no messages queued in the channel when close is called, a final nil message is enqueued. All functions that operate on channels ignore this trailing nil.

Let’s further consider the case of modeling a network socket. Channels are unidirectional, so to model bidirectional communication, we’d need two channels: one for receiving messages, and the other for enqueueing messages.

But this is needlessly complex; it’s much simpler to model bidirectional communication as a pair of endpoints.

Making a pair of endpoints is easy: just call (channel-pair), which will return a list of two channels. Any message enqueued into one channel can be received from the other, so each channel of the pair can be used both to send and to receive messages.
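A pair of endpoints can be pictured as two one-way queues wired back to back. The sketch below assumes java.util.concurrent.LinkedBlockingQueue as a stand-in for a channel. toy-channel-pair, send!, and recv! are hypothetical names, and recv! blocks rather than taking a callback.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn toy-channel-pair
  "Returns two endpoints wired back to back: whatever one endpoint sends,
  the other receives."
  []
  (let [a->b (LinkedBlockingQueue.)
        b->a (LinkedBlockingQueue.)]
    [{:send a->b :recv b->a}
     {:send b->a :recv a->b}]))

(defn send! [endpoint msg]
  (.put ^LinkedBlockingQueue (:send endpoint) msg))

(defn recv!
  "Blocks until a message sent from the other endpoint is available."
  [endpoint]
  (.take ^LinkedBlockingQueue (:recv endpoint)))

(let [[client server] (toy-channel-pair)]
  (send! client :ping)
  (send! server :pong)
  [(recv! server) (recv! client)])
;; => [:ping :pong]
```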

For more on this topic, check out the wiki entry on connections.

Using channels

fork

receive-all is only useful when every message can be handled independently of the others; if the contents of one message affect how we handle the next one, the only solution is to use receive, and to register a second callback based on what we receive.

This is fine if we’re the only one receiving messages from the channel, but if other callbacks are registered, they might consume a message in between when we receive the first message and register our second callback. To prevent this, we need to create our own copy of the channel.

(fork ch) will return a new channel that is a copy of ch, allowing for the same stream of messages to be consumed at different rates. Channels returned by fork are receive-only, and will close when ch closes.

> (def ch (channel 1 2 3))
#'ch
> (fork ch)
<== [1 2 3]
> ch
<== []

(fork n ch) will give us a list of n-many copies of ch.

> (def ch (channel 1 2 3))
#'ch
> (fork 2 ch)
[<== [1 2 3], <== [1 2 3]]
> ch
<== []

Sequence operations on channels

map*, filter*, reduce*, reductions*, take*, and take-while* all do pretty much what you’d expect from their clojure.core counterparts. Each returns a new channel and consumes messages from the source channel.

(map* f ch) returns a new channel which maps f over all messages coming from ch.

> (map* inc (channel 1 2 3))
<== [2 3 4]

(filter* f ch) returns a new channel which contains all messages from ch which satisfy f.

> (filter* even? (channel 2 3 4))
<== [2 4]

(reduce* f val? ch), where the initial value val is optional, returns a constant channel (see Advanced topics below) which will emit the reduced value once all messages from ch have been consumed.

> (reduce* + (sealed-channel 1 2 3))
<== [6 ...]

(reductions* f val? ch) returns a channel which will emit all intermediate values for the reduce.

> (reductions* max (channel 1 3 2))
<== [1 3 3]

(take* n ch) returns a channel which will contain the first n messages from ch.

> (def ch (channel 1 2 3))
#'ch
> (take* 1 ch)
<== [1]
> ch
<== [2 3]

(take-while* f ch) will return a channel which contains all messages from ch until one doesn’t satisfy f. This channel will end on a trailing nil.

> (def ch (channel 1 2 3))
#'ch
> (take-while* #(< % 3) ch)
<== [1 2 nil]
> ch
<== [3]
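For comparison, the clojure.core functions these operations are named after give the same results on plain sequences holding the messages from the transcripts above. The differences are that the channel versions consume messages from their source, and take-while* ends with a trailing nil.

```clojure
;; clojure.core sequence functions applied to the same messages used in the
;; channel examples above.
(def seq-results
  {:map        (map inc [1 2 3])                 ; (2 3 4)
   :filter     (filter even? [2 3 4])            ; (2 4)
   :reduce     (reduce + [1 2 3])                ; 6
   :reductions (reductions max [1 3 2])          ; (1 3 3)
   :take       (take 1 [1 2 3])                  ; (1)
   :take-while (take-while #(< % 3) [1 2 3])})   ; (1 2)
```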

siphon

(siphon source destination) will route all messages from source into destination. destination will not close once source is exhausted, so siphon can be used to merge the output of many channels together.
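The merging use case can be approximated with java.util.concurrent queues standing in for channels. drain-into! is a hypothetical helper, and it only moves the messages present at call time, whereas siphon keeps forwarding. The point it illustrates is that the destination stays open after each source is drained, so several sources can feed it.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn drain-into!
  "One-shot approximation of siphon: move every message currently queued in
  source into destination, leaving destination open for other sources.
  Returns the number of messages moved."
  [^LinkedBlockingQueue source ^LinkedBlockingQueue destination]
  (.drainTo source destination))

;; Merge two sources into one destination.
(def merged (LinkedBlockingQueue.))
(drain-into! (LinkedBlockingQueue. [1 2]) merged)
(drain-into! (LinkedBlockingQueue. [3]) merged)
(vec merged)
;; => [1 2 3]
```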

Using channels inside transactions

Enqueueing a message into a channel that represents an external resource, such as a network socket, has a side effect. These operations are wrapped in io!, so attempting to enqueue into such a channel while in a transaction will throw an exception. This can happen through a direct call to enqueue, or through a callback that calls enqueue and ends up running inside the transaction.

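;; this will throw an exception: the callback runs on the thread
;; that enqueues, which here is inside the transaction
(receive channel #(enqueue io-channel %))
(dosync
  (enqueue channel "hello"))

;; so will this: the queued message is handed to the callback
;; as soon as it is registered, inside the transaction
(enqueue channel "hello")
(dosync
  (receive channel #(enqueue io-channel %)))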

fork and all sequence operations are transaction-safe. So, to use channels within a transaction, we create the channel we need inside the transaction and siphon it into the I/O channel outside of it. This code accomplishes what the two examples above attempt, but does it safely:

(siphon
  (dosync (take* 1 channel))
  io-channel)
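The exception comes from clojure.core's io! macro, which throws an IllegalStateException with the message "I/O in transaction" when its body runs inside dosync. A minimal demonstration, where send-over-network! is a hypothetical side-effecting function:

```clojure
(defn send-over-network!
  "Hypothetical side-effecting operation, guarded by io!."
  [msg]
  (io! (println "sending" msg)
       :sent))

;; Outside a transaction the body runs normally.
(send-over-network! "hello")   ; prints "sending hello", returns :sent

;; Inside dosync, io! throws before the body runs.
(def in-transaction-result
  (try
    (dosync (send-over-network! "hello"))
    (catch IllegalStateException e
      (.getMessage e))))
;; in-transaction-result => "I/O in transaction"
```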

Using channels synchronously

receive and receive-all are both asynchronous methods. They return immediately, and the callbacks are executed by the same thread that enqueues the message.

There are several functions for interacting synchronously with channels:

wait-for-message

(wait-for-message channel) receives and returns a message from a channel. If there is not a message currently in the channel, the thread will wait until a message is enqueued. A timeout (in milliseconds) can be optionally specified, using (wait-for-message channel timeout). If the timeout elapses, it will throw a java.util.concurrent.TimeoutException.

> (def ch (channel 1))
<== [1]
> (wait-for-message ch)
1
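The blocking behavior of wait-for-message can be sketched with a java.util.concurrent.LinkedBlockingQueue standing in for the channel. This is an assumption for illustration, and toy-wait-for-message is a hypothetical name. take blocks indefinitely, while the timed variant polls and throws TimeoutException if nothing arrives in time.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit TimeoutException))

(defn toy-wait-for-message
  "Blocks until q holds a message and returns it. With timeout-ms, throws
  TimeoutException if nothing arrives in time."
  ([^LinkedBlockingQueue q]
   (.take q))
  ([^LinkedBlockingQueue q timeout-ms]
   (let [msg (.poll q (long timeout-ms) TimeUnit/MILLISECONDS)]
     (if (nil? msg)
       (throw (TimeoutException. "no message arrived before the timeout"))
       msg))))

(def q (LinkedBlockingQueue. [1]))
(toy-wait-for-message q)                  ; => 1
(try
  (toy-wait-for-message q 100)            ; the queue is now empty
  (catch TimeoutException _ :timed-out))  ; => :timed-out
```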

channel-seq

(channel-seq channel) consumes all messages currently in the channel and returns them as a sequence; if there are no messages, it returns an empty sequence. If a timeout is specified using (channel-seq channel timeout), the call will wait until the timeout elapses or the channel is closed, and then return all messages it received in that time.

So if we are enqueuing ten messages per second into a channel, and call (channel-seq ch 1000), the call will wait for one second and then return a sequence of ten messages.

> (def ch (channel))
<== []
> (future
     (dotimes [i 100]
       (enqueue ch i)
       (Thread/sleep 100)))
nil
> (channel-seq ch 1000)
(0 1 2 3 4 5 6 7 8 9)

Trailing nils will not show up in the returned seq.
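A rough model of channel-seq, again assuming a LinkedBlockingQueue in place of a channel. toy-channel-seq is a hypothetical name, and the queue has no notion of being closed, so in this sketch only the timeout ends the wait.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

(defn toy-channel-seq
  ([^LinkedBlockingQueue q]
   ;; No timeout: take whatever is queued right now (possibly nothing).
   (let [out (java.util.ArrayList.)]
     (.drainTo q out)
     (apply list out)))
  ([^LinkedBlockingQueue q timeout-ms]
   ;; With a timeout: keep collecting messages until the deadline passes.
   (let [deadline (+ (System/currentTimeMillis) timeout-ms)]
     (loop [acc []]
       (let [remaining (- deadline (System/currentTimeMillis))
             msg       (when (pos? remaining)
                         (.poll q remaining TimeUnit/MILLISECONDS))]
         (if (nil? msg)
           (apply list acc)
           (recur (conj acc msg))))))))

(toy-channel-seq (LinkedBlockingQueue. [0 1 2]) 50)  ; => (0 1 2), after ~50 ms
(toy-channel-seq (LinkedBlockingQueue.))             ; => ()
```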

lazy-channel-seq

(lazy-channel-seq channel) returns a lazy sequence which only receives messages as it is realized. If it is realized when no messages are in the channel, it will wait until a message is enqueued, so (first (lazy-channel-seq channel)) behaves like (wait-for-message channel). If you specify a timeout, using (lazy-channel-seq channel timeout), it will wait that many milliseconds for each message; if the timeout elapses, the sequence will terminate.

This means that if we’re willing to dedicate a thread to a channel, we can iterate over all the messages that pass through it.

(doseq [msg (lazy-channel-seq ch)]
  (println "received" msg))

Trailing nils will not show up in the returned seq.
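lazy-channel-seq can be modeled with lazy-seq, so that each message is taken from the queue only when that element is realized. This sketch assumes a LinkedBlockingQueue stand-in for the channel; toy-lazy-channel-seq is a hypothetical name.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

(defn toy-lazy-channel-seq
  "Lazy seq over q: each element is taken from q only when realized.
  Without a timeout it blocks until a message arrives; with timeout-ms,
  the seq ends once a message fails to arrive within timeout-ms."
  ([^LinkedBlockingQueue q]
   (lazy-seq (cons (.take q) (toy-lazy-channel-seq q))))
  ([^LinkedBlockingQueue q timeout-ms]
   (lazy-seq
     (let [msg (.poll q (long timeout-ms) TimeUnit/MILLISECONDS)]
       (when-not (nil? msg)
         (cons msg (toy-lazy-channel-seq q timeout-ms)))))))

(first (toy-lazy-channel-seq (LinkedBlockingQueue. [7])))         ; => 7
(doall (toy-lazy-channel-seq (LinkedBlockingQueue. [1 2 3]) 50))  ; => (1 2 3)
```

The final 50 ms wait is what ends the second sequence, mirroring how a timeout terminates lazy-channel-seq.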

Some simple applications

Pub-sub model

A channel can be used as a simple publish/subscribe mechanism, if we think of enqueue as publish and receive-all as subscribe.
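The same idea, stripped down to clojure.core: subscribe! plays the role of receive-all and publish! the role of enqueue. These are hypothetical helpers rather than Aleph functions, and unlike a channel, this topic drops messages published while it has no subscribers.

```clojure
(defn make-topic
  "A topic is just an atom holding a vector of subscriber functions."
  []
  (atom []))

(defn subscribe! [topic f]
  (swap! topic conj f)
  true)

(defn publish!
  "Calls every current subscriber with msg, in subscription order."
  [topic msg]
  (doseq [f @topic]
    (f msg))
  true)

(def news (make-topic))
(subscribe! news #(println "subscriber 1 got" %))
(subscribe! news #(println "subscriber 2 got" %))
(publish! news "hello")
;; prints "subscriber 1 got hello", then "subscriber 2 got hello"
```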

Advanced topics

constant-channel

A future in Clojure is a convenient way to represent an asynchronous computation, but getting the value is a synchronous operation. A constant channel is an asynchronous alternative. A constant channel is created using (constant-channel). Once a value has been enqueued into the channel, any attempt to receive a message from that channel will always return that value.

> (def ch (constant-channel))
<= []
> (enqueue ch 5)
nil
> (wait-for-message ch)
5
> (take 5 (lazy-channel-seq ch))
(5 5 5 5 5)
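Clojure's promise already provides the "set once, then always return that value" part, but deref blocks. The sketch below adds the callback side with an atom. toy-constant-channel, toy-enqueue!, and toy-receive! are hypothetical names, and ignoring enqueues after the first is an assumption of this sketch, not documented Aleph behavior.

```clojure
;; Toy constant channel: an atom holding either ::unset or the value,
;; plus callbacks waiting for the value. Requires Clojure 1.9+ (swap-vals!).
(defn toy-constant-channel []
  (atom {:value ::unset :callbacks []}))

(defn toy-enqueue!
  "Stores v and fires any waiting callbacks. In this sketch, enqueues after
  the first are ignored (an assumption)."
  [ch v]
  (let [[old _] (swap-vals! ch (fn [state]
                                 (if (= ::unset (:value state))
                                   {:value v :callbacks []}
                                   state)))]
    (when (= ::unset (:value old))
      (doseq [f (:callbacks old)]
        (f v)))
    nil))

(defn toy-receive!
  "Calls f with the value right away if it is set, otherwise once it arrives."
  [ch f]
  (let [[old _] (swap-vals! ch (fn [state]
                                 (if (= ::unset (:value state))
                                   (update state :callbacks conj f)
                                   state)))]
    (when-not (= ::unset (:value old))
      (f (:value old)))
    true))

(def ch (toy-constant-channel))
(def seen (atom []))
(toy-receive! ch #(swap! seen conj [:early %]))
(toy-enqueue! ch 5)
(toy-receive! ch #(swap! seen conj [:late %]))
@seen
;; => [[:early 5] [:late 5]]
```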

poll

Channels are not pure data structures; receiving a message has a side effect, namely that no one else can subsequently receive that same message from the channel. For this reason, we want to be able to carefully limit how many messages we receive. Using receive instead of receive-all is a start, but when presented with multiple channels, calling receive on each of them means receiving at least one message from each.

So, what if we want to only receive a single message from any of them? For this, we use poll. Let’s say we have two channels, a and b, and only want to receive from the first channel that has a message.

(poll {:a a, :b b} 1000)

This will return a constant channel. When a message is received, the constant channel will be populated with [channel-name message]. For instance, if 5 is enqueued into a, the constant channel will contain [:a 5]. If the timeout elapses without a message, the channel will contain nil.
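A simplified, synchronous model of poll, assuming LinkedBlockingQueues in place of channels. toy-poll is a hypothetical name, and unlike Aleph's poll it blocks and returns the result directly instead of returning a constant channel. It checks each queue with a non-blocking .poll and stops at the first hit, so at most one message is consumed across all the queues. A short sleep between rounds keeps the loop simple at the cost of roughly a millisecond of extra latency.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn toy-poll
  "Returns [name msg] for the first queue in named-queues that yields a
  message, taking exactly one message overall; returns nil if none arrives
  within timeout-ms."
  [named-queues timeout-ms]
  (let [deadline (+ (System/currentTimeMillis) timeout-ms)]
    (loop []
      (or (some (fn [[k ^LinkedBlockingQueue q]]
                  ;; non-blocking check; nil means this queue is empty
                  (when-some [msg (.poll q)]
                    [k msg]))
                named-queues)
          (when (< (System/currentTimeMillis) deadline)
            (Thread/sleep 1)
            (recur))))))

(def a (LinkedBlockingQueue.))
(def b (LinkedBlockingQueue. [5]))
(toy-poll {:a a, :b b} 1000)  ; => [:b 5]
(toy-poll {:a a, :b b} 20)    ; => nil, after ~20 ms
```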