Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Channels

ossareh edited this page Mar 14, 2011 · 28 revisions

Introduction

The publish/subscribe model is simple. Subscribers register a callback function with the publisher, and the publisher passes every new event into all the registered callbacks. If there are no subscribers, the event is ignored.

But there’s a problem: if we allow multiple threads to publish events, our event stream is no longer ordered. The same callback may handle two different events at the same time, which means that we can’t rely on using one event to alter how we handle the next event. Many event-driven systems handle this by only allowing a single thread to publish events, but that’s a big limitation to impose on ourselves.

The publish/subscribe model also has difficulties with Clojure’s STM. If any of our callbacks cause side-effects, publishing inside of a transaction can cause the side-effect to be be executed multiple times. The only way to get around this is to create the event stream within the transaction, but delay the consumption of the event stream until we’ve exited the transaction.

What we need is finer-grained control over our event streams. We can push events using the publish/subscribe mechanism, but we also want to be able to pull them out of the stream at our own pace without losing messages. Channels allow us to do both.

The basics

A channel in Lamina is just an event-driven queue. This means that it uses a FIFO (first-in, first-out) ordering of messages, and that messages can only be consumed using callbacks. Channels are created using (channel & messages).

> (channel 1 2 3)
<== [1 2 3]

Messages are published using (enqueue channel & messages). If there are no subscribers, the messages will simply queue up inside the channel.

> (def ch (channel 1))
<== [1]
> (enqueue ch 2 3)
true
> ch
<== [1 2 3] 

Subscribers are registered using (receive-all channel & callbacks). The callback will be passed all messages currently in the channel, and all messages subsequently enqueued into the channel.

> (def ch (channel 1 2))
<== [1 2]
> (receive-all ch #(println "message:" %))
message: 1
message: 2
true
> (enqueue ch 3)
message: 3
true

We can also receive messages one at a time using (receive channel & callbacks).

> (def ch (channel 1 2 3))
<== [1 2 3]
> (receive ch #(println "message:" %))
message: 1
true
> ch
<== [1 2]

If we register multiple receive callbacks on an empty channel, they all receive the next message.

> (def ch (channel))
<== []
> (receive ch #(println "callback a:" %) #(println "callback b:" %))
true
> (enqueue ch 1)
callback a: 1
callback b: 1
true

To cancel a callback registered with receive or receive-all, call (cancel-callback channel callback).

You can prevent messages from being enqueued into a channel using (close channel) or (enqueue-and-close channel & messages). Once close has been called, the channel is closed. Any attempt to enqueue a message into a closed channel will return false.

> (def ch (channel))
<== []
> (close ch)
true
> (`? ch)
true
> (enqueue ch 1)
false

Once all messages in the channel have been consumed, the channel is drained. Any attempt to consume messages from a drained channel will return false.

> (receive ch println)
1
nil
> (drained? ch)
true
> (receive ch println)
false

To create a channel that is already closed, use (closed-channel & messages).

We can hook into the channel’s closing and sealing using the on-drained and on-closed functions. Both functions take callbacks which have zero parameters. However, it can also be convenient to simply test whether the channel is drained when we receive a message. For this reason, if there are no messages queued up in the channel when close is called, a final nil message will be enqueued. All functions which operate on channels ignore this trailing nil.

constant-channels

Channels represent streams of messages, but sometimes we only want to represent a single asynchronous value. A constant-channel is a channel that will only accept a single value, and will repeatedly emit that value once it is enqueued.

> (def ch (constant-channel 42))
<== [42 ...]
> (receive ch #(println "The answer is" %))
The answer is 42
true
> (receive-all ch #(println "The answer is always" %))
The answer is always 42
true
> ch
<== [42 ...]

This is basically an asynchronous variation on Clojure’s promise, and can be used for the same purposes.

Duplicating message streams

If a receive-all callback is registered on a channel, messages will never queue up. This means that if we have another consumer that’s pulling out messages one at a time, we may miss messages that come in between our receive calls.

(fork channel) returns a duplicate of channel, but consuming a message from one channel will not consume it from the other. This allows for the same message stream to be consumed at different rates without losing any messages.

> (def a (channel 1 2 3))
<== [1 2 3]
> (def b (fork a))
<== [1 2 3]
> (receive-all a println)
1
2
3
true
> a
<== []
> b
<== [1 2 3]

(fork n channel) will return a list of n-many copies.

Forked channels are copies of the original, but while messages enqueued into the original will appear in the copy, the inverse is not true. Similarly, closing the original channel will close the copy, but closing the copy will not affect the original.

Modifying message streams

You can map functions across a stream, or filter messages from a stream using (map* f channel) and (filter* predicate channel).

> (channel 1 2 3)
<== [1 2 3]
> (map* inc *1)
<== [2 3 4]
> (filter* even? *1)
<== [2 4]

Modifying a message stream involves consuming all messages from a channel, and returning a channel that represents the modified stream. We can create multiple modifications of the same stream:

> (def a (channel))
<== []
> (def b (map* inc a))
<== []
> (def c (map* dec a))
<== []
> (enqueue a 1 2 3)
true
> a
<== []
> b
<== [2 3 4]
> c
<== [0 1 2]

Consuming message streams

We can partially consume a stream using take* and take-while*.

> (def ch (channel 1 2 3 4 5))
<== [1 2 3 4 5]
> (take* 2 ch)
<== [1 2]
> ch
<== [3 4 5]
> (take-while* #(< % 5) ch)
<== [3 4]
> ch
<== [5]

If there are multiple consumers on the same channel, they’ll receive the same messages:

> (def ch (channel))
<== []
> (def a (take* 2 ch))
<== []
> (def b (take-while* #(< % 5) ch))
<== []
> (enqueue ch 1 2 3 4 5)
true
> a
<== [1 2]
> b
<== [1 2 3 4]
> ch
<== [5]

The channels returned by take* and take-while* will be closed once they have finished consuming the stream.

Reducing message streams

We can reduce over message streams using reduce* or reductions*.

reduce* returns a constant-channel that will emit a value once the source channel has been completely consumed:

> (reduce* + (closed-channel 1 2 3))
<== [6 ...]
> (reduce* conj [] (closed-channel 1 2 3))
<== [[1 2 3] ...]

reductions* returns a channel that emits the intermediate reduction for every new message:

> (reductions* + (channel 1 2 3))
<== [1 3 6]
> (reductions* conj [] (channel 1 2 3))
<== [[] [1] [1 2] [1 2 3]]
> (reductions* max (channel 1 2 3 2 1))
<== [1 2 3 3 3]

The channel returned by reductions* will be closed once the source channel has been consumed.

Connecting channels together

All of the functions presented so far take message streams and construct new message streams. But once we’ve created the message stream we want, what do we do with it? Sometimes, we want to consume it using receive or receive-all. Other times, we may simply wish to forward it to someone else, often via a channel representing a network connection. It’s easy to take all messages from one channel and send them to another:

> (def a (channel))
<== []
> (def b (channel))
<== []
> (receive-all a #(enqueue b %))
true
> (enqueue a 1 2 3)
true
> b
<== [1 2 3]

But what if the destination channel is closed? Enqueueing messages into a closed channel is a no-op, so this isn’t harmful, but this also means that we’ll never clean up after ourselves. If we repeatedly create and close destination channels, they’ll just stick around until the source channel is closed, receiving (and ignoring) messages.

However, if we simply stop consuming messages from the source channel, they’ll just queue up forever. This may be the desired behavior (if the destination channel closes, maybe we want to send the messages somewhere else), but consider this channel definition:

(->> (channel) (map* inc) (filter* even?))

This actually creates three channels: the original channel, the channel which maps inc over all messages, and the channel which filters out any odd numbers. Each feeds into the next, and if we close the filtered channel, we definitely don’t want messages to just queue up in the incremented channel.

(siphon source & destinations) is a declaration that the source channel exists to feed into the destination channels. All messages from source will be fed into the destinations. If all the destinations are closed, then the source is closed. Message stream operations like map* and filter* use siphon, so if we close the channel returned by the above example, all three channels will be closed.

siphon is idempotent; calling it with the same destination multiple times will not cause the same messages to be enqueued multiple times. When testing if there are no open destinations, siphon will also take into account callbacks registered via receive-all.

To create a channel which will not be closed by siphon when there are no consumers, use (permanent-channel & messages).

Using channels synchronously

receive and receive-all are both asynchronous methods. They return immediately, and the callbacks are executed by the same thread that enqueues the message.

There are several functions for interacting synchronously with channels:

wait-for-message

(wait-for-message channel) receives and returns a message from a channel. If there is not a message currently in the channel, the thread will wait until a message is enqueued. A timeout (in milliseconds) can be optionally specified, using (wait-for-message channel timeout). If the timeout elapses, it will throw a java.util.concurrent.TimeoutException.

> (def ch (channel 1))
<== [1]
> (wait-for-message channel)
1

channel-seq

(channel-seq channel) consumes all messages currently in the channel, and returns them as a sequence. If there are no messages, it returns an empty sequence. If a timeout is specified using (channel-seq channel timeout), the call will wait until the timeout elapses or the channel is drained, and then returns all messages it received in that time.

So if we are enqueuing ten messages per second into a channel, and call (channel-seq ch 1000), the call will wait for one second and then return a sequence of ten messages.

> (def ch (channel))
<== []
> (future
     (dotimes [i 100]
       (enqueue ch i)
       (Thread/sleep 100)))
nil
> (channel-seq ch 1000)
(0 1 2 3 4 5 6 7 8 9)

Trailing nils will not show up in the returned seq.

lazy-channel-seq

(lazy-channel-seq channel) returns a lazy sequence which will only receive messages as it is realized. If it is realized when no messages are in the channel, it will wait until a message is enqueued. (take 1 (lazy-seq-channel channel)) has identical behavior to (wait-for-message channel). If you specify a timeout, using (lazy-channel-seq channel timeout), it will wait that many milliseconds for each message. If the timeout elapses, the sequence will terminate.

This means that if we’re willing to dedicate a thread to a channel, we can iterate over all the messages that pass through it.

(doseq [msg (lazy-channel-seq ch)]
  (println "received" msg))

Trailing nils will not show up in the returned seq.

Further topics