/
conduits-conduits.html
311 lines (295 loc) · 25.5 KB
/
conduits-conduits.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
<h1>Review and Status</h1>
<p class="hascomments">This is part 4 in the conduits series. You can see the previous posts at:</p>
<ol id="file1715-conceptId-ol_8b450736-bc88-43ca-8b94-17fb0d6b1d77"><li><a href="http://www.yesodweb.com/blog/2011/12/resourcet">The Resource monad transformer</a></li>
<li><a href="http://www.yesodweb.com/blog/2011/12/conduits">Sources</a></li>
<li><a href="http://www.yesodweb.com/blog/2011/12/conduits-sink">Sinks</a></li>
</ol>
<p class="hascomments">This part covers the final major datatype in our package, conduits. While sources produce a
stream of data and sinks consume a stream, conduits transform a stream.</p>
<p class="hascomments">Also, just wanted to give an update on conduits activity. A few of the major enumerator
libraries have been converted over to conduits (<a href="http://hackage.haskell.org/package/http-conduit">http-conduit</a> and
<a href="http://hackage.haskell.org/package/xml-conduit">xml-conduit</a>) and released to Hackage. Also, the Github versions of WAI
(including Warp), Persistent and Yesod have been converted over as well.</p>
<p class="hascomments">Not only does the code all work, it's already allowing <a href="https://plus.google.com/u/0/118323070335349339293/posts/ALpCncffnsK">enhancements we hadn't even thought of</a>. All in all, the change to
conduits has been a very pleasant one.</p>
<h1>Types</h1>
<p class="hascomments">As we did previously, let's start off by looking at the types involved.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: green;font-weight: bold;">data</span> <span style="color: blue;">ConduitResult</span> input m output =
<span style="color: blue;">Producing</span> (<span style="color: blue;">Conduit</span> input m output) [output]
| <span style="color: blue;">Finished</span> (<span style="color: blue;">Maybe</span> input) [output]
<span style="color: green;font-weight: bold;">data</span> <span style="color: blue;">Conduit</span> input m output = <span style="color: blue;">Conduit</span>
{ conduitPush :: input -> <span style="color: blue;">ResourceT</span> m (<span style="color: blue;">ConduitResult</span> input m output)
, conduitClose :: <span style="color: blue;">ResourceT</span> m [output]
}
</pre></pre>
<p class="hascomments">This should look very similar to what we've seen with sinks. A conduit can be pushed to, in
which case it returns a result. A result either indicates that it is still producing data, or
that it is finished. When a conduit is closed, it returns some more output.</p>
<p class="hascomments">But let's examine the idiosyncracies a bit. Like sinks, we can only push one piece of input at
a time, and leftover data may be 0 or 1 pieces. However, there are a few changes:</p>
<ul id="file1716-conceptId-ul_5aaf20d0-ba07-4960-9fbc-c9dd5241843b"><li>When producing (the equivalent of processing for a sink), we can return output. This is
because a conduit will product a new stream of output instead of producing a single output value
at the end of processing.</li>
<li>A sink always returns a single output value, while a conduit returns 0 or more outputs (a
list). To understand why, consider conduits such as <code>concatMap</code> (produces
multiple outputs for one input) and <code>filter</code> (returns 0 or 1 output for each
input).</li>
<li>We have no special constructor like <code>SinkNoData</code>. That's because we provide no
<code>Monad</code> instance for conduits. We'll see later how you can still use a familiar
Monadic approach to creating conduits.</li>
</ul>
<p class="hascomments">Overall conduits should seem very similar to what we've covered so far.</p>
<h1>Simple conduits</h1>
<p class="hascomments">We'll start off by defining some simple conduits that don't have any state.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: green;font-weight: bold;">import</span> <span style="color: blue;">Prelude</span> <span style="color: green;font-weight: bold;">hiding</span> (map, concatMap)
<span style="color: green;font-weight: bold;">import</span> <span style="color: blue;">Data.Conduit</span>
<span style="color: gray;">-- A simple conduit that just passes on the data as-is.</span>
<span style="color: blue;font-weight: bold;">passThrough</span> :: <span style="color: blue;">Monad</span> m => <span style="color: blue;">Conduit</span> input m input
<span style="color: blue;font-weight: bold;">passThrough</span> = <span style="color: blue;">Conduit</span>
{ conduitPush = \input -> return $ <span style="color: blue;">Producing</span> passThrough [input]
, conduitClose = return []
}
<span style="color: gray;">-- map values in a stream</span>
<span style="color: blue;font-weight: bold;">map</span> :: <span style="color: blue;">Monad</span> m => (input -> output) -> <span style="color: blue;">Conduit</span> input m output
<span style="color: blue;font-weight: bold;">map</span> f = <span style="color: blue;">Conduit</span>
{ conduitPush = \input -> return $ <span style="color: blue;">Producing</span> (map f) [f input]
, conduitClose = return []
}
<span style="color: gray;">-- map and concatenate</span>
<span style="color: blue;font-weight: bold;">concatMap</span> :: <span style="color: blue;">Monad</span> m => (input -> [output]) -> <span style="color: blue;">Conduit</span> input m output
<span style="color: blue;font-weight: bold;">concatMap</span> f = <span style="color: blue;">Conduit</span>
{ conduitPush = \input -> return $ <span style="color: blue;">Producing</span> (concatMap f) $ f input
, conduitClose = return []
}
</pre></pre>
<h1>Stateful conduits</h1>
<p class="hascomments">Of course, not all conduits can be declared without state. Doing so on the bare metal is not
too difficult.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: green;font-weight: bold;">import</span> <span style="color: blue;">Prelude</span> <span style="color: green;font-weight: bold;">hiding</span> (reverse)
<span style="color: green;font-weight: bold;">import</span> <span style="color: green;font-weight: bold;">qualified</span> <span style="color: blue;">Data.List</span>
<span style="color: green;font-weight: bold;">import</span> <span style="color: blue;">Data.Conduit</span>
<span style="color: green;font-weight: bold;">import</span> <span style="color: blue;">Control.Monad.Trans.Resource</span>
<span style="color: gray;">-- Reverse the elements in the stream. Note that this has the same downside as</span>
<span style="color: gray;">-- the standard reverse function: you have to read the entire stream into</span>
<span style="color: gray;">-- memory before producing any output.</span>
<span style="color: blue;font-weight: bold;">reverse</span> :: <span style="color: blue;">Resource</span> m => <span style="color: blue;">Conduit</span> input m input
<span style="color: blue;font-weight: bold;">reverse</span> =
mkConduit []
<span style="color: green;font-weight: bold;">where</span>
mkConduit state = <span style="color: blue;">Conduit</span> (push state) (close state)
push state input = return $ <span style="color: blue;">Producing</span> (mkConduit $ input : state) []
close state = return state
<span style="color: gray;">-- Same thing with sort: it will pull everything into memory</span>
<span style="color: blue;font-weight: bold;">sort</span> :: (<span style="color: blue;">Ord</span> input, <span style="color: blue;">Resource</span> m) => <span style="color: blue;">Conduit</span> input m input
<span style="color: blue;font-weight: bold;">sort</span> =
mkConduit []
<span style="color: green;font-weight: bold;">where</span>
mkConduit state = <span style="color: blue;">Conduit</span> (push state) (close state)
push state input = return $ <span style="color: blue;">Producing</span> (mkConduit $ input : state) []
close state = return $ <span style="color: blue;">Data.List.</span>sort state
</pre></pre>
<p class="hascomments">But we can do better. Just like <code>sourceState</code> and <code>sinkState</code>, we
have <code>conduitState</code> to simplify things.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: green;font-weight: bold;">import</span> <span style="color: blue;">Prelude</span> <span style="color: green;font-weight: bold;">hiding</span> (reverse)
<span style="color: green;font-weight: bold;">import</span> <span style="color: green;font-weight: bold;">qualified</span> <span style="color: blue;">Data.List</span>
<span style="color: green;font-weight: bold;">import</span> <span style="color: blue;">Data.Conduit</span>
<span style="color: gray;">-- Reverse the elements in the stream. Note that this has the same downside as</span>
<span style="color: gray;">-- the standard reverse function: you have to read the entire stream into</span>
<span style="color: gray;">-- memory before producing any output.</span>
<span style="color: blue;font-weight: bold;">reverse</span> :: <span style="color: blue;">Resource</span> m => <span style="color: blue;">Conduit</span> input m input
<span style="color: blue;font-weight: bold;">reverse</span> =
conduitState [] push close
<span style="color: green;font-weight: bold;">where</span>
push state input = return $ <span style="color: blue;">StateProducing</span> (input : state) []
close state = return state
<span style="color: gray;">-- Same thing with sort: it will pull everything into memory</span>
<span style="color: blue;font-weight: bold;">sort</span> :: (<span style="color: blue;">Ord</span> input, <span style="color: blue;">Resource</span> m) => <span style="color: blue;">Conduit</span> input m input
<span style="color: blue;font-weight: bold;">sort</span> =
conduitState [] push close
<span style="color: green;font-weight: bold;">where</span>
push state input = return $ <span style="color: blue;">StateProducing</span> (input : state) []
close state = return $ <span style="color: blue;">Data.List.</span>sort state
</pre></pre>
<h1>Using conduits</h1>
<p class="hascomments">The way <code>Conduit</code>s interact with the rest of the package is via
<dfn>fusing</dfn>. A conduit can be fused into a source, producing a new source, fused into a
sink to produce a new sink, or fused with another conduit to produce a new conduit. It's best to
just look at the fusion operators.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: gray;">-- Left fusion: source + conduit = source</span>
($=) :: (<span style="color: blue;">Resource</span> m, <span style="color: blue;">IsSource</span> src) => src m a -> <span style="color: blue;">Conduit</span> a m b -> <span style="color: blue;">Source</span> m b
<span style="color: gray;">-- Right fusion: conduit + sink = sink</span>
(=$) :: <span style="color: blue;">Resource</span> m => <span style="color: blue;">Conduit</span> a m b -> <span style="color: blue;">Sink</span> b m c -> <span style="color: blue;">Sink</span> a m c
<span style="color: gray;">-- Middle fusion: conduit + conduit = conduit</span>
(=$=) :: <span style="color: blue;">Resource</span> m => <span style="color: blue;">Conduit</span> a m b -> <span style="color: blue;">Conduit</span> b m c -> <span style="color: blue;">Conduit</span> a m c
</pre></pre>
<p class="hascomments">Using these operators is straightforward.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: blue;font-weight: bold;">useConduits</span> = <span style="color: green;font-weight: bold;">do</span>
runResourceT
$ <span style="color: blue;">CL.</span>sourceList [<span style="color: teal;">1</span>..<span style="color: teal;">10</span>]
$= reverse
$= <span style="color: blue;">CL.</span>map show
$$ <span style="color: blue;">CL.</span>consume
<span style="color: gray;">-- equivalent to</span>
runResourceT
$ <span style="color: blue;">CL.</span>sourceList [<span style="color: teal;">1</span>..<span style="color: teal;">10</span>]
$$ reverse
=$ <span style="color: blue;">CL.</span>map show
=$ <span style="color: blue;">CL.</span>consume
<span style="color: gray;">-- and equivalent to</span>
runResourceT
$ <span style="color: blue;">CL.</span>sourceList [<span style="color: teal;">1</span>..<span style="color: teal;">10</span>]
$$ (reverse =$= <span style="color: blue;">CL.</span>map show)
=$ <span style="color: blue;">CL.</span>consume
</pre></pre>
<p class="hascomments">There is in fact one last way of expressing the same idea. I'll leave it as an exercise to the
reader to discover it.</p>
<p class="hascomments">It may seem like all these different approaches are redundant. While occasionally you can in
fact choose whichever approach you feel like using, in many cases you will need a specific
approach. For example:</p>
<ul id="file1719-conceptId-ul_c3efe67b-809e-4b36-a534-dace20035bc7"><li>If you have a stream of numbers, and you want to apply a conduit (e.g., <code>map
show</code>) to only some of the stream that will be passed to a specific sink, you'll want
to use the right fusion operator.</li>
<li>If you're reading a file, and want to parse the entire file as textual data, you'll want to
use left fusion to convert the entire stream.</li>
<li>If you want to create reusable conduits that combine together individual, smaller conduits,
you'll use middle fusion.</li>
</ul>
<h1>Data loss</h1>
<p class="hascomments">Let's forget about conduits for a moment. Instead, suppose we want to write a program- using
plain old lists- that will take a list of numbers, apply some kind of transformation to them,
take the first five transformed values and do something with them, and then do something else
with the remaining <b>non-transformed</b> values. For example, we want something like:</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: blue;font-weight: bold;">main</span> = <span style="color: green;font-weight: bold;">do</span>
<span style="color: green;font-weight: bold;">let</span> list = [<span style="color: teal;">1</span>..<span style="color: teal;">10</span>]
transformed = map show list
(begin, end) = splitAt <span style="color: teal;">5</span> transformed
untransformed = map read end
mapM_ putStrLn begin
print $ sum untransformed
</pre></pre>
<p class="hascomments">But clearly this isn't a good general solution, since we don't want to have to transform and
then untransform every element in the list. For one thing, we may not always have an inverse
function. Another issue is efficiency. In this case, we can write something more efficient:</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: blue;font-weight: bold;">main</span> = <span style="color: green;font-weight: bold;">do</span>
<span style="color: green;font-weight: bold;">let</span> list = [<span style="color: teal;">1</span>..<span style="color: teal;">10</span>]
(begin, end) = splitAt <span style="color: teal;">5</span> list
transformed = map show begin
mapM_ putStrLn transformed
print $ sum end
</pre></pre>
<p class="hascomments">Note the change: we perform our split before transforming any elements. This works because,
with <code>map</code>, we have a 1-to-1 correspondence between the input and output elements.
So splitting at 5 before or after mapping <code>show</code> is the same thing. But what
happens if we replace <code>map show</code> with something more devious.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: blue;font-weight: bold;">deviousTransform</span> =
concatMap go
<span style="color: green;font-weight: bold;">where</span>
go <span style="color: teal;">1</span> = [show <span style="color: teal;">1</span>]
go <span style="color: teal;">2</span> = [show <span style="color: teal;">2</span>, <span style="color: green;">"two"</span>]
go <span style="color: teal;">3</span> = replicate <span style="color: teal;">5</span> <span style="color: green;">"three"</span>
go x = [show x]
</pre></pre>
<p class="hascomments">We no longer have the 1-to-1 correspondence. As a result, we can't use the second method. But
it's even worse: we can't use the first method either, since there's no inverse of our
<code>deviousTransform</code>.</p>
<p class="hascomments">There's only one solution to the problem that I'm aware of: transform elements one at a time.
The final program looks like this:</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: blue;font-weight: bold;">deviousTransform</span> <span style="color: teal;">1</span> = [show <span style="color: teal;">1</span>]
<span style="color: blue;font-weight: bold;">deviousTransform</span> <span style="color: teal;">2</span> = [show <span style="color: teal;">2</span>, <span style="color: green;">"two"</span>]
<span style="color: blue;font-weight: bold;">deviousTransform</span> <span style="color: teal;">3</span> = replicate <span style="color: teal;">5</span> <span style="color: green;">"three"</span>
<span style="color: blue;font-weight: bold;">deviousTransform</span> x = [show x]
<span style="color: blue;font-weight: bold;">transform5</span> :: [<span style="color: blue;">Int</span>] -> ([<span style="color: blue;">String</span>], [<span style="color: blue;">Int</span>])
<span style="color: blue;font-weight: bold;">transform5</span> list =
go [] list
<span style="color: green;font-weight: bold;">where</span>
go output (x:xs)
| newLen >= <span style="color: teal;">5</span> = (take <span style="color: teal;">5</span> output', xs)
| otherwise = go output' xs
<span style="color: green;font-weight: bold;">where</span>
output' = output ++ deviousTransform x
newLen = length output'
<span style="color: gray;">-- Degenerate case: not enough input to make 5 outputs</span>
go output [] = (output, [])
<span style="color: blue;font-weight: bold;">main</span> = <span style="color: green;font-weight: bold;">do</span>
<span style="color: green;font-weight: bold;">let</span> list = [<span style="color: teal;">1</span>..<span style="color: teal;">10</span>]
(begin, end) = transform5 list
mapM_ putStrLn begin
print $ sum end
</pre></pre>
<p class="hascomments">The final output of this program is<pre>1
2
two
three
three
49</pre>What's important
to note is that the number 3 is converted into five copies of the word "three", yet only two of
them show up in the output. The rest are discarded in the <code>take 5</code> call.</p>
<p class="hascomments">This whole exercise is just to demonstrate the issue of data loss in conduits. By forcing
conduits to accept only one input at a time, we avoid the issue of transforming too many elements
at once. That doesn't mean we don't lose <i>any</i> data: if a conduit produces too much output
for the receiving sink to handle, some of it may be lost.</p>
<p class="hascomments">To put all this another way: conduits avoid chunking to get away from data loss. This is not an
issue unique to conduits. If you look in the implementation of <code>concatMapM</code> for
<a href="http://hackage.haskell.org/package/enumerator">enumerator</a>, you'll see that it forces elements to be handled one at a time.
In conduits, we opted to force the issue at the type level.</p>
<h1>SequencedSink</h1>
<p class="hascomments">Suppose we want to be able to combine up existing conduits and sinks to produce a new, more
powerful conduit. For example, we want to write a conduit that takes a stream of numbers and sums
up every five. In other words, for the input <code>[1..50]</code>, it should result in the
sequence <code>[15,40,65,90,115,140,165,190,215,240]</code>. We can definitely do this with
the low-level conduit interface.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: blue;font-weight: bold;">sum5Raw</span> :: <span style="color: blue;">Resource</span> m => <span style="color: blue;">Conduit</span> <span style="color: blue;">Int</span> m <span style="color: blue;">Int</span>
<span style="color: blue;font-weight: bold;">sum5Raw</span> =
conduitState (<span style="color: teal;">0</span>, <span style="color: teal;">0</span>) push close
<span style="color: green;font-weight: bold;">where</span>
push (total, count) input
| newCount == <span style="color: teal;">5</span> = return $ <span style="color: blue;">StateProducing</span> (<span style="color: teal;">0</span>, <span style="color: teal;">0</span>) [newTotal]
| otherwise = return $ <span style="color: blue;">StateProducing</span> (newTotal, newCount) []
<span style="color: green;font-weight: bold;">where</span>
newTotal = total + input
newCount = count + <span style="color: teal;">1</span>
close (total, count)
| count == <span style="color: teal;">0</span> = return []
| otherwise = return [total]
</pre></pre>
<p class="hascomments">But this is frustrating, since we already have all the tools we need to do this at a high
level! There's the <code>fold</code> sink for adding up the numbers, and the
<code>isolate</code> conduit which will only allow up to a certain number of elements to be
passed to a sink. Can't we combine these somehow?</p>
<p class="hascomments">The answer is a <code>SequencedSink</code>. The idea is to create a normal
<code>Sink</code>, except it returns a special output called a
<code>SequencedSinkResponse</code>. This value can emit new output, stop processing data, or
transfer control to a new conduit. (See the Haddocks for more information.) Then we can turn this
into a <code>Conduit</code> using the <code>sequenceSink</code> function. This function
also takes some state value that gets passed through to the sink.</p>
<p class="hascomments">So we can rewrite <code>sum5Raw</code> in a much more high-level manner.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: blue;font-weight: bold;">sum5</span> :: <span style="color: blue;">Resource</span> m => <span style="color: blue;">Conduit</span> <span style="color: blue;">Int</span> m <span style="color: blue;">Int</span>
<span style="color: blue;font-weight: bold;">sum5</span> = sequenceSink () $ \() -> <span style="color: green;font-weight: bold;">do</span>
nextSum <- <span style="color: blue;">CL.</span>isolate <span style="color: teal;">5</span> =$ <span style="color: blue;">CL.</span>fold (+) <span style="color: teal;">0</span>
return $ <span style="color: blue;">Emit</span> () [nextSum]
</pre></pre>
<p class="hascomments">All of the <code>()</code> in there are simply the unused state variable being passed
around, they can be ignored. Otherwise, we're doing exactly what we want. We fuse
<code>isolate</code> to <code>fold</code> to get the sum of the next five elements from
the stream. We then emit that value, and start all over again.</p>
<p class="hascomments">Let's say we want to modify this slightly. We want to get the first 8 sums, and then pass
through the remaining values, multiplied by 2. We can keep track of how many values we've
returned in our state, and then use the <code>StartConduit</code> constructor to pass control
to the multiply-by-2 conduit next.</p>
<pre class="codeblock"><pre style="padding: 0;margin: 0;"><span style="color: blue;font-weight: bold;">sum5Pass</span> :: <span style="color: blue;">Resource</span> m => <span style="color: blue;">Conduit</span> <span style="color: blue;">Int</span> m <span style="color: blue;">Int</span>
<span style="color: blue;font-weight: bold;">sum5Pass</span> = sequenceSink <span style="color: teal;">0</span> $ \count -> <span style="color: green;font-weight: bold;">do</span>
<span style="color: green;font-weight: bold;">if</span> count == <span style="color: teal;">8</span>
<span style="color: green;font-weight: bold;">then</span> return $ <span style="color: blue;">StartConduit</span> $ <span style="color: blue;">CL.</span>map (* <span style="color: teal;">2</span>)
<span style="color: green;font-weight: bold;">else</span> <span style="color: green;font-weight: bold;">do</span>
nextSum <- <span style="color: blue;">CL.</span>isolate <span style="color: teal;">5</span> =$ <span style="color: blue;">CL.</span>fold (+) <span style="color: teal;">0</span>
return $ <span style="color: blue;">Emit</span> (count + <span style="color: teal;">1</span>) [nextSum]
</pre></pre>
<p class="hascomments">These are obviously very contrived examples, but I hope it makes clear the power and simplicity
available from this approach.</p>
<h1>Summary</h1>
<p class="hascomments">We're nearing the end of our conduits series. The last remaining major point is buffering and
resumable sources. (I would have included it here, but (a) it doesn't exactly fit with the rest
of the material and (b) it's 10:30 at night and I want to go to sleep.) In addition, we'll try to
cover some real-life use cases for conduits, and give examples of where libraries like
<a href="http://hackage.haskell.org/package/http-conduit">http-conduit</a> and the upcoming conduit-based <a href="http://hackage.haskell.org/package/wai">wai</a> can be
used together.</p>