-
Notifications
You must be signed in to change notification settings - Fork 3
/
BoundedQueue.purs
90 lines (81 loc) · 2.84 KB
/
BoundedQueue.purs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
-- | A concurrent FIFO data structure with bounded capacity.
-- |
-- | This datastructure is useful in various consumer/producer situations.
module Concurrent.BoundedQueue
( new
, write
, read
, isEmpty
, tryRead
, tryWrite
, module Export
) where
import Prelude
import Concurrent.BoundedQueue.Internal (BoundedQueue(..))
import Concurrent.BoundedQueue.Internal (BoundedQueue) as Export
import Data.Array (unsafeIndex)
import Data.Maybe (Maybe(..))
import Data.Unfoldable (replicateA)
import Effect.Aff (Aff)
import Effect.Aff.AVar as AVar
import Partial.Unsafe (unsafePartial)
-- | Creates a new `BoundedQueue` with the given capacity,
new ∷ ∀ a. Int → Aff (BoundedQueue a)
new size = do
contents ← replicateA size AVar.empty
readPos ← AVar.new 0
writePos ← AVar.new 0
pure (BoundedQueue { size, contents, readPos, writePos })
-- | Writes an element to the given queue. Will block if the queue is full until
-- | someone reads from it.
write ∷ ∀ a. BoundedQueue a → a → Aff Unit
write (BoundedQueue q) a = do
w ← AVar.take q.writePos
AVar.put a (unsafePartial unsafeIndex q.contents w)
AVar.put ((w + 1) `mod` q.size) q.writePos
-- | Reads an element from the given queue, will block if the queue is empty,
-- | until someone writes to it.
read ∷ ∀ a. BoundedQueue a → Aff a
read (BoundedQueue q) = do
r ← AVar.take q.readPos
v ← AVar.take (unsafePartial unsafeIndex q.contents r)
AVar.put ((r + 1) `mod` q.size) q.readPos
pure v
-- | Checks whether the given queue is empty. Never blocks.
isEmpty ∷ ∀ a. BoundedQueue a → Aff Boolean
isEmpty (BoundedQueue q) = do
AVar.tryRead q.readPos >>= case _ of
Nothing → pure true
Just r → AVar.tryRead (unsafePartial unsafeIndex q.contents r) <#> case _ of
Nothing → true
Just _ → false
-- | Attempts to read an element from the given queue. If the queue is empty,
-- | returns `Nothing`.
-- |
-- | *Careful!* If other readers are blocked on the queue `tryRead` will also
-- | block.
tryRead ∷ ∀ a. BoundedQueue a → Aff (Maybe a)
tryRead (BoundedQueue q) = do
r ← AVar.take q.readPos
AVar.tryTake (unsafePartial unsafeIndex q.contents r) >>= case _ of
Just v → do
AVar.put ((r + 1) `mod` q.size) q.readPos
pure (Just v)
Nothing → do
AVar.put r q.readPos
pure Nothing
-- | Attempts to write an element into the given queue. If the queue is full,
-- | returns `false` otherwise `true`.
-- |
-- | *Careful!* If other writers are blocked on the queue `tryWrite` will also
-- | block.
tryWrite ∷ ∀ a. BoundedQueue a → a → Aff Boolean
tryWrite (BoundedQueue q) a = do
w ← AVar.take q.writePos
AVar.tryPut a (unsafePartial unsafeIndex q.contents w) >>= if _
then do
AVar.put ((w + 1) `mod` q.size) q.writePos
pure true
else do
AVar.put w q.writePos
pure false