/
Actors.lhs
705 lines (609 loc) · 25.3 KB
/
Actors.lhs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
> {-# LANGUAGE GeneralizedNewtypeDeriving, MultiParamTypeClasses, TypeFamilies, TypeOperators #-}
This module exports a simple, idiomatic implementation of the Actor Model.
> module Control.Concurrent.Actors (
>
> {- |
> Here we demonstrate a binary tree of actors that supports concurrent
> insert and query operations:
>
> > import Control.Concurrent.Actors
> > import Control.Applicative
> > import Control.Concurrent.MVar
> >
> > -- the actor equivalent of a Nil leaf node:
> > nil :: Behavior Operation
> > nil = Receive $ do
> > (Query _ var) <- received
> > send var False -- signal Int is not present in tree
> > return nil -- await next message
> >
> > <|> do -- else, Insert received
> > l <- spawn nil -- spawn child nodes
> > r <- spawn nil
> > branch l r . val <$> received -- create branch from inserted val
> >
> > -- a branch node with a value 'v' and two children
> > branch :: Node -> Node -> Int -> Behavior Operation
> > branch l r v = loop where
> > loop = Receive $ do
> > m <- received
> > case compare (val m) v of
> > LT -> send l m
> > GT -> send r m
> > EQ -> case m of -- signal Int present in tree:
> > (Query _ var) -> send var True
> > _ -> return ()
> > return loop
> >
> > type Node = Mailbox Operation
> >
> > -- operations supported by the network:
> > data Operation = Insert { val :: Int }
> > | Query { val :: Int
> > , sigVar :: Mailbox Bool }
> >
> > insert :: Node -> Int -> IO ()
> > insert t = send t . Insert
> >
> > query :: Node -> Int -> IO Bool
> > query t a = do
> > -- turn an MVar into a Mailbox actors can send to with 'out'
> > v <- newEmptyMVar
> > send t (Query a $ out v)
> > takeMVar v
>
> You can use the tree defined above in GHCi:
>
> >>> :l TreeExample.hs
> Ok
> >>> t <- spawn nil
> >>> query t 7
> False
> >>> insert t 7
> >>> query t 7
> True
>
> -}
>
> -- * Actor Behaviors
> Action()
> , Behavior(..)
> -- ** Composing and Transforming Behaviors
> , (<.|>)
>
> -- * Available actions
> -- ** Message passing
> , Mailbox()
> , out
> , send , send' , (<->)
> , received
> , guardReceived
> -- ** Spawning actors
> , Sources(), Joined
> --, (:-:)(..)
> , spawn
> -- *** Mailboxes and scoping
> {- |
> Straightforward use of the 'spawn' function will be sufficient for
> forking actors in most cases, but launching mutually-communicating actors
> presents a problem.
>
> In cases where a 'Behavior' needs access to its own 'Mailbox' or that of
> an actor that must be forked later, the 'MonadFix' instance should be
> used. GHC\'s \"Recursive Do\" notation make this especially easy:
>
> > {-# LANGUAGE DoRec #-}
> > beh = Receive $ do
> > i <- received
> > -- similar to the scoping in a "let" block:
> > rec b1 <- spawn (senderTo b2)
> > b2 <- spawn (senderTo b1)
> > b3 <- spawn (senderTo b3)
> > -- send initial messages to actors spawned above:
> > send b3 i
> > send b2 "first"
> > yield
>
> -}
>
> -- ** Building an actor computation
> {- |
> An actor computation can be halted immediately by calling 'yield',
> a synonym for 'mzero'. When an 'Action' calling @yield@ is composed with
> another using @\<|\>@ the second takes over processing the /same/ input
> which the former @yield@-ed on.
>
> Here is an example of a computation using 'guard' which returns @mzero@ if
> the test is false:
>
> > foo c n = Receive $
> > do i <- received
> > guard (n<10)
> > send c i
> > return (foo c $ n+1)
> >
> > <|> do i <- received -- same as the 'i' above
> > send c $ "TENTH INPUT: "++i
> > return (foo c 0)
>
> The @Monoid@ instance for 'Behavior' works on the same principle.
> -}
> , yield
> , receive
>
> -- ** Composing and Transforming Mailboxes
> {- |
> We offer some operations to split and combine 'Mailbox'es of sum and
> product types.
> -}
> , coproductMb
> , contraProduct
> , zipMb
> , contraFanin
> , contraFanout
>
> -- * Utility functions
> {- |
> These are useful for debugging 'Behavior's
> -}
> , runBehavior_
> , runBehavior
>
> -- * Useful predefined @Behavior@s
> , printB
> , putStrB
> , signalB
> , constB
>
> ) where
>
> import Control.Monad
> import Control.Applicative
> import Control.Monad.Reader(ask)
> import qualified Data.Foldable as F
> import Control.Monad.IO.Class
> import Control.Concurrent(forkIO)
> import Data.Monoid
> import Control.Arrow((***),(&&&),(|||))
>
> -- from the contravariant package
> import Data.Functor.Contravariant
> -- from the chan-split package
> import Control.Concurrent.Chan.Split
> -- internal:
> import Control.Concurrent.Actors.Behavior
TODO
-----
0.4.1
- performance tuning / benchmarking:
- first optimize TreeExample, by way of Benchmark.hs
- criterion and profiling w/r/t lib.:
- play with underlying Behavior Monad stack?
- be more controlled about the source lists (do once before defaultMain), use 'evaluate'
- run with +RTS -s and make sure everything is 0
- see if case-based nil is better
- try storing the same chan (observable sharing) in each node, and use for streaming
send an MVar with messages for the query operation
- get accurate baseline comparison between actors and set
- use INLINABLE
- test again with SPECIALIZE instead
- try adding INLINE to all with higher-order args (or higher-order newtype wrappers)
and make sure our LHS looks good for inlining
- specialize `Action i (Behavior i)` or allow lots of unfolding... ? Optimize those loops, somehow. Rewrite rules?
- look at "let floating" and INLINEABLE to get functions with "fully-applied (syntactically) LHS"
- split-chan ChItem in heap profile -hy
- take a look at threadscope for random tree test
- forkOnIO to keep communicating actors on same HEC?
- compare with previous version (cp to /tmp to use previous version)
Later:
- make that also work with Behaviors of arbitrary input types using new GHC generics?
- can we make joins work with arbitrary types using Generics?
- can we support Either in Sources?
- get complete code coverage into simple test module
- interesting solution to exit detection:
http://en.wikipedia.org/wiki/Huang%27s_algorithm
- dynamically-bounded chans, based on number of writers to control
producer/consumer issues? Possibly add more goodies to chan-split
see: http://hackage.haskell.org/package/stm-chans
- look at what Functor/Contravariant for read/write ends, and corresponding
natural transformations those allow suggest about limits of Actor model
and investigate inverse of Actors (Reducers?)
- create an experimental Collectors sub-module
- investigate ways of positively influencing thread scheduling based on
actor work agenda?
- export some more useful Actors and global thingies
- 'loop' which keeps consuming (is this provided by a class?)
- function returning an actor to "load balance" inputs over multiple
actors
- an actor that sends a random stream?
- a pre-declared Mailbox for IO?
Eventually:
- some sort of exception handling technique a.la erlang
- abilty to launch an actor that automatically "replicates" if its chan needs more
consumers. This should probably be restricted to an `Action i ()` that we
repeat.
- can we automatically throttle producers on an Actor system level,
optimizing message flow with some algorithm?
- provide an "adapter" for amazon SQS, allowing truly distributed message
passing
- play w/ distributed-process (cloud haskell)
- consider: combining TChans, where values are popped off when available,
for chan-split?
- look at ways we can represent network IO as channels to interface with
this. E.g:
- https://github.com/ztellman/aleph
- http://akka.io/ (scala remote actors lib)
- http://www.zeromq.org/intro:read-the-manual
- interface to amazon SQS
- http://msgpack.org/
- "shared memory" approaches?
- cloudhaskell, haskell-mpi, etc. see:
http://stackoverflow.com/questions/8362998/distributed-haskell-state-of-the-art-in-2011
-Behavior -> enumeratee package translator (and vice versa)
(maybe letting us use useful enumerators)
...also now pipes, conduits, etc. etc.
- study ambient/join/fusion calculi for clues to where it's really at
CHAN TYPES
==========
By defining our Mailbox as the bare "send" operation we get a very convenient
way of defining contravariant instance, without all the overhead we had before,
while ALSO now supporting some great natural transformations on Mailboxes &
Messages.
We use this newtype to get 'Contravariant' for free, possibly revealing other
insights:
> type Sender a = Op (IO ()) a
>
> mailbox :: (a -> IO ()) -> Mailbox a
> mailbox = Mailbox . Op
>
> runMailbox :: Mailbox a -> a -> IO ()
> runMailbox = getOp . sender
>
> mkMessages :: OutChan a -> Messages a
> mkMessages = Messages . readChan
>
> -- | One can 'send' a messages to a @Mailbox@ where it will be processed
> -- according to an actor\'s defined 'Behavior'
> --
> -- > type Joined (Mailbox a) = a
> newtype Mailbox a = Mailbox { sender :: Sender a }
> deriving (Contravariant)
Previously we were polymorphic in SplitChan in many places. Now that spawn
has polymorphic result type we simply export a function to convert from
any SplitChan type. Otherwise we'd have to provide type annotations everywhere.
I liked the previous version, since a send within an actor is semantically-
identical regardless of the channel type.
> -- | Convert the input side of a @SplitChan@ to a @Mailbox@. Useful for
> -- sending data out from an actor system via a channel created in IO.
> out :: (SplitChan i x)=> i a -> Mailbox a
> out = mailbox . writeChan
We don't need to expose this thanks to the miracle of MonadFix and recursive do,
but this can be generated via the NewSplitChan class below if the user imports
the library:
> newtype Messages a = Messages { readMsg :: IO a }
> deriving (Functor)
>
> -- Not sure how to derive this or if possible:
> instance SplitChan Mailbox Messages where
> readChan = readMsg
> writeChan = runMailbox
>
> instance NewSplitChan Mailbox Messages where
> newSplitChan = (out *** mkMessages) `fmap` newSplitChan
For Mailboxes we can define all transformations associated with Cartesian and
CoCartesian (from 'categories') but where the category is Dual (->), i.e. the
order of the transformation is flipped.
I don't know if/how these precisely fit into an existing class, but for now here
are a handful of useful combinators:
> coproductMb :: Mailbox a -> Mailbox b -> Mailbox (Either a b)
> coproductMb m1 m2 = mailbox $ either (writeChan m1) (writeChan m2)
>
> zipMb :: Mailbox a -> Mailbox b -> Mailbox (a,b)
> zipMb m1 m2 = mailbox $ \(a,b) -> writeChan m1 a >> writeChan m2 b
>
The naming here doesn't make much sense now that these are general. Keep for
now and hope we can deprecate in favor of functionality in one of E.K.'s
libs?
> -- | > contraProduct = contramap Left &&& contramap Right
> contraProduct :: Contravariant f => f (Either a b) -> (f a, f b)
> contraProduct = contramap Left &&& contramap Right
>
> -- | > contraFanin f g = contramap (f ||| g)
> contraFanin :: Contravariant f => (b -> a) -> (c -> a) -> f a -> f (Either b c)
> contraFanin f g = contramap (f ||| g)
>
> -- | > contraFanout f g = contramap (f &&& g)
> contraFanout :: Contravariant f=> (a -> b) -> (a -> c) -> f (b,c) -> f a
> contraFanout f g = contramap (f &&& g)
ACTIONS
=======
Functionality is based on our underlying type classes, but users shouldn't need
to import a bunch of libraries to get basic Behavior building functionality.
> infixl 3 <.|>
>
> -- | Sequence two @Behavior@s. After the first 'yield's the second takes over,
> -- discarding the message the former was processing. See also the 'Monoid'
> -- instance for @Behavior@.
> --
> -- > b <.|> b' = b `mappend` constB b'
> (<.|>) :: Behavior i -> Behavior i -> Behavior i
> b <.|> b' = b `mappend` constB b'
The 'yield' function is so named because it is "relinquishing control", i.e. I
think the name reminds of the functionality of <|> and mappend (the last input
is passed along) and also has the meaning "quit".
Its similarity (or not) to the 'enumerator' function of the same same may be a
source of confusion (or the opposite)... I'm not sure.
> -- | Immediately give up processing an input, perhaps relinquishing the input
> -- to an 'Alternative' computation or exiting the actor.
> --
> -- > yield = mzero
> yield :: Action i a
> yield = mzero
>
> -- | Useful to make defining a continuing Behavior more readable as a
> -- \"receive block\", e.g.
> --
> -- > pairUpAndSendTo mb = Receive $ do
> -- > a <- received
> -- > receive $ do
> -- > b <- received
> -- > send mb (b,a)
> -- > return (pairUpAndSendTo mb)
> --
> -- Defined as:
> --
> -- > receive = return . Receive
> receive :: Action i (Behavior i) -> Action i (Behavior i)
> receive = return . Receive
> -- | Return the message received to start this 'Action' block. /N.B/ the value
> -- returned here does not change between calls in the same 'Action'.
> --
> -- > received = ask
> received :: Action i i
> received = ask
> -- | Return 'received' message matching predicate, otherwise 'yield'.
> --
> -- > guardReceived p = ask >>= \i-> guard (p i) >> return i
> guardReceived :: (i -> Bool) -> Action i i
> guardReceived p = ask >>= \i-> guard (p i) >> return i
> -- | Send a message asynchronously to an actor receiving from Mailbox. See
> -- also 'out' for converting other types of chans to 'Mailbox'.
> --
> -- > send b = liftIO . writeChan b
> send :: (MonadIO m)=> Mailbox a -> a -> m ()
> send b = liftIO . writeChan b
> -- | A strict 'send':
> --
> -- > send' b a = a `seq` send b a
> send' :: (MonadIO m)=> Mailbox a -> a -> m ()
> send' b a = a `seq` send b a
> infixr 1 <->
>
> -- | Like 'send' but supports chaining sends by returning the Mailbox.
> -- Convenient for initializing an Actor with its first input after spawning,
> -- e.g.
> --
> -- > do mb <- 0 <-> spawn foo
> (<->) :: (MonadIO m)=> a -> m (Mailbox a) -> m (Mailbox a)
> a <-> mmb = mmb >>= \mb-> send mb a >> return mb
FORKING AND RUNNING ACTORS:
===========================
The strict Actor Model is limited in expressiveness, in that it doesn't allow
for a method of synchronization, e.g. we cannot have an actor that pairs up
incoming messages from two different channels. I think this leads to nonsense
like "selective receive" in Erlang (disclaimer: IANA erlang-xpert).
I've realized that I can keep all the nice semantics of actors (i.e. this
change doesn't affect Behaviors) , while supporting synchronization and
simplifying the API all at the same time! This method is inspired by the "join
calculus", and I'm sure this isn't a new idea.
To support this elegantly in the API, we define a class with associated type,
and make 'spawn' the method. This allows the pattern of joins to be determined
polymorphically based on users' pattern match!
NOTE: My original goal was to use GHC.Generic to support arbitrary joins on
any Generic a=> Behavior a ...but it wasn't coming together. Let me know
if you can figure it out.
> -- | We extend the actor model to support joining (or synchronizing) multiple
> -- 'Mailbox'es to a single 'Behavior' input type, using a new class with an
> -- associated type. Functionality is best explained by example:
> --
> -- Spawn an actor returning it's 'Mailbox', and send it its first message:
> --
> -- > sumTuple :: Behavior (Int, Int)
> -- >
> -- > do b <- spawn sumTuple
> -- > send b (4, 1)
> -- > ...
> --
> -- But now we would like our @sumTuple@ actor to receive each number from a different
> -- concurrent actor:
> --
> -- > do (b1, b2) <- spawn sumTuple
> -- > b3 <- spawn (multipliesBy2AndSendsTo b1)
> -- > send b3 2
> -- > send b2 1
> -- > ...
> --
> -- Lastly spawn an actor that starts immediately on an infinite supply of @()@s,
> -- and supplies an endless stream of @Int@s to @sumTuple@
> --
> -- > do (b1, b2) <- spawn sumTuple
> -- > () <- spawn (sendsIntsTo b2)
> -- > send b1 4
> -- > ...
> class Sources s where
> type Joined s
> newJoinedChan :: IO (s, Messages (Joined s)) -- private
Spawn uses un-exported newJoinedChan where we used newSplitChan previously:
> -- | Fork an actor performing the specified 'Behavior'. /N.B./ an actor
> -- begins execution of its 'headBehavior' only after a message becomes
> -- available to process; for sending an initial message to an actor right
> -- after 'spawn'ing it, ('<|>') can be convenient.
> spawn :: (MonadIO m, Sources s)=> Behavior (Joined s) -> m s
> spawn b = liftIO $ do
> (srcs, msgs) <- newJoinedChan
> let runner b' = readChan msgs >>= runBehaviorStep b' >>= F.mapM_ runner
> void $ forkIO (runner b)
> return srcs
...and our instance for Mailbox completes previous simple spawn functionality:
> instance Sources (Mailbox a) where
> type Joined (Mailbox a) = a
> newJoinedChan = newSplitChan
By adding an instance for (,) synchronization and wonderful new things become
possible!
> instance (Sources a, Sources b)=> Sources (a,b) where
> type Joined (a,b) = (Joined a, Joined b)
> newJoinedChan = do
> (sa, ma) <- newJoinedChan
> (sb, mb) <- newJoinedChan
> let m' = Messages $ liftM2 (,) (readMsg ma) (readMsg mb)
> return ((sa,sb), m')
We'll add instances up to 7-tuples, since that seems to be standard, but people
can use nested tuples:
> instance (Sources a, Sources b, Sources c, Sources d, Sources e, Sources f, Sources g)=> Sources (a,b,c,d,e,f,g) where
> type Joined (a,b,c,d,e,f,g) = (Joined a, Joined b,Joined c,Joined d,Joined e,Joined f,Joined g)
> newJoinedChan = do
> (sa, ma) <- newJoinedChan
> (sb, mb) <- newJoinedChan
> (sc, mc) <- newJoinedChan
> (sd, md) <- newJoinedChan
> (se, me) <- newJoinedChan
> (sf, mf) <- newJoinedChan
> (sg, mg) <- newJoinedChan
> let m' = Messages $ (,,,,,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc <*> readMsg md <*> readMsg me <*> readMsg mf <*> readMsg mg
> return ((sa,sb,sc,sd,se,sf,sg), m')
>
> instance (Sources a, Sources b, Sources c, Sources d, Sources e, Sources f)=> Sources (a,b,c,d,e,f) where
> type Joined (a,b,c,d,e,f) = (Joined a, Joined b,Joined c,Joined d,Joined e,Joined f)
> newJoinedChan = do
> (sa, ma) <- newJoinedChan
> (sb, mb) <- newJoinedChan
> (sc, mc) <- newJoinedChan
> (sd, md) <- newJoinedChan
> (se, me) <- newJoinedChan
> (sf, mf) <- newJoinedChan
> let m' = Messages $ (,,,,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc <*> readMsg md <*> readMsg me <*> readMsg mf
> return ((sa,sb,sc,sd,se,sf), m')
>
> instance (Sources a, Sources b, Sources c, Sources d, Sources e)=> Sources (a,b,c,d,e) where
> type Joined (a,b,c,d,e) = (Joined a, Joined b,Joined c,Joined d,Joined e)
> newJoinedChan = do
> (sa, ma) <- newJoinedChan
> (sb, mb) <- newJoinedChan
> (sc, mc) <- newJoinedChan
> (sd, md) <- newJoinedChan
> (se, me) <- newJoinedChan
> let m' = Messages $ (,,,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc <*> readMsg md <*> readMsg me
> return ((sa,sb,sc,sd,se), m')
>
> instance (Sources a, Sources b, Sources c, Sources d)=> Sources (a,b,c,d) where
> type Joined (a,b,c,d) = (Joined a, Joined b,Joined c,Joined d)
> newJoinedChan = do
> (sa, ma) <- newJoinedChan
> (sb, mb) <- newJoinedChan
> (sc, mc) <- newJoinedChan
> (sd, md) <- newJoinedChan
> let m' = Messages $ (,,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc <*> readMsg md
> return ((sa,sb,sc,sd), m')
>
> instance (Sources a, Sources b, Sources c)=> Sources (a,b,c) where
> type Joined (a,b,c) = (Joined a, Joined b,Joined c)
> newJoinedChan = do
> (sa, ma) <- newJoinedChan
> (sb, mb) <- newJoinedChan
> (sc, mc) <- newJoinedChan
> let m' = Messages $ (,,) <$> readMsg ma <*> readMsg mb <*> readMsg mc
> return ((sa,sb,sc), m')
I give up for now on defining an instance for sums. This probably requires a
different formulation for class
...and we also support Either as a source, since this is the only way to get a joined
product of sums; otherwise users could just use 'contraProduct', a pure operation.
> -- | > type Joined (a :-: b) = Either (Joined a) (Joined b)
> --
> -- A product of 'Sources' corresponding to a @Behavior (Either a b)@. Allows
> -- 'spawn'-ing a @Behavior@ which receives a sum of perhaps-'Joined' products.
> --
> -- See also: 'contraProduct'
> data a :-: b = (:-:) { sourceLeft :: a
> , sourceRight :: b }
>
> instance (Sources a, Sources b)=> Sources (a :-: b) where
> type Joined (a :-: b) = Either (Joined a) (Joined b)
> --newJoinedChan :: IO (a :-: b, Messages (Either (Joined a) (Joined b)))
> newJoinedChan = do
> (src, msgs) <- newSplitChan
> let (s1, s2) = contraProduct src
> return (decompose s1 :-: decompose s2, msgs)
class Sources s where
type Joined s :: *
newJoinedChan :: IO (s, Messages (Joined s))
decomp :: Mailbox (a,b) -> (Mailbox a, Mailbox b)
decomp :: Mailbox a -> Mailbox a
decomp :: Mailbox (Either a b) -> (Mailbox a :-: Mailbox b)
We can subsume the old 'spawn_' functionality in our class as well, and imagine
returning an infinite source of ()s:
> -- | > type Joined () = ()
> --
> -- Represents an endless supply of @()@s. Allows 'spawn'-ing
> -- a @Behavior ()@ that starts immediately and loops until it 'yield'-s, e.g.
> --
> -- > do () <- spawn startsImmediately -- :: Behavior ()
> instance Sources () where
> type Joined () = ()
> newJoinedChan =
> return ((), Messages $ return ())
Replace polymorphic craziness with old spawn_ function, when we can:
> {-# RULES "spawn_" spawn = spawn_ #-}
> spawn_ :: (MonadIO m)=> Behavior () -> m ()
> spawn_ = liftIO . void . forkIO . runBehavior_
NOTE: spawnReading removed in 0.4, since it was unused (by me), exposed
confusing implementation details, supports e.g. launching an actor on a
bounded channel which violates the Model, and doesn't provide an effective
way to do much cool stuff like reading from a network socket.
Instead I guess we should expose enough internals in a separate module to
support future cool stuff.
RUNNING ACTORS
--------------
These work in IO, returning () when the actor finishes with done/mzero:
> -- | Run a @Behavior ()@ in the main thread, returning when the computation
> -- exits.
> runBehavior_ :: Behavior () -> IO ()
> runBehavior_ b = runBehavior b $ repeat ()
>
> -- | run a 'Behavior' in the IO monad, taking its \"messages\" from the list.
> runBehavior :: Behavior a -> [a] -> IO ()
> runBehavior b (a:as) = runBehaviorStep b a >>= F.mapM_ (`runBehavior` as)
> runBehavior _ _ = return ()
USEFUL GENERAL BEHAVIORS
========================
> -- | Prints all messages to STDOUT in the order they are received,
> -- 'yield'-ing /immediately/ after @n@ inputs are printed.
> printB :: (Show s, Eq n, Num n)=> n -> Behavior s
> printB = contramap (unlines . return . show) . putStrB
We want to yield right after printing the last input to print. This lets us
compose with signalB for instance:
write5ThenExit = putStrB 5 `mappend` signalB c
and the above will signal as soon as it has printed the last message. If we try
to define this in a more traditional recursive way the signal above would only
happen as soon as the sixth message was received.
For now we allow negative
> -- | Like 'printB' but using @putStr@.
> putStrB :: (Eq n, Num n)=> n -> Behavior String
> putStrB 0 = mempty --special case when called directly w/ 0
> putStrB n = Receive $ do
> s <- received
> liftIO $ putStr s
> guard (n /= 1)
> return $ putStrB (n-1)
> -- | Sends a @()@ to the passed chan. This is useful with 'mappend' for
> -- signalling the end of some other 'Behavior'.
> --
> -- > signalB c = Receive (send c () >> yield)
> signalB :: Mailbox () -> Behavior i
> signalB c = Receive (send c () >> yield)
> -- | A @Behavior@ that discard its first input, returning the passed Behavior
> -- for processing subsequent inputs. Useful with 'Alternative' or 'Monoid'
> -- compositions when one wants to ignore the leftover 'yield'ed message.
> --
> -- > constB = Receive . return
> constB :: Behavior i -> Behavior i
> constB = Receive . return