/
GetStream.purs
92 lines (86 loc) · 3.31 KB
/
GetStream.purs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
-- A majority of the below code was ported from this JavaScript library
-- https://github.com/sindresorhus/get-stream
-- Copyright `get-stream` contributors
-- MIT License: https://opensource.org/license/mit/
module Node.Library.Execa.GetStream where
import Prelude
import Data.Array as Array
import Data.Either (Either(..))
import Data.Int (toNumber)
import Data.Maybe (Maybe(..), fromMaybe, maybe)
import Data.Nullable (Nullable, toMaybe)
import Data.Number (infinity)
import Effect (Effect)
import Effect.Aff (Aff, Error, error, makeAff, nonCanceler)
import Effect.Class (liftEffect)
import Effect.Ref as Ref
import Effect.Uncurried (EffectFn1, EffectFn3, mkEffectFn1, runEffectFn3)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Buffer.Immutable (ImmutableBuffer)
import Node.Library.Execa.Utils (newPassThroughStream)
import Node.Stream (Readable, Writable, Duplex, onData)
import Unsafe.Coerce (unsafeCoerce)
-- | Handle onto a buffering pass-through stream: the stream itself plus
-- | accessors for what has been collected from it so far.
type Interface =
  { -- The duplex stream that data is piped into.
    stream :: Duplex
  -- Number of bytes collected so far.
  , getBufferedLength :: Effect Number
  -- All collected chunks joined into a single buffer.
  , getBufferedValue :: Effect Buffer
  }
-- | Pipes `inputStream` into an internal pass-through stream and collects
-- | every chunk it emits into a single buffer.
-- |
-- | - `maxBuffer`: optional upper bound (in bytes) on the collected data.
-- |   `Nothing` is treated as `infinity`, i.e. no limit.
-- |
-- | The resulting `Aff` always resolves with `Right`:
-- | - `buffer`: everything collected up to the moment the result was produced.
-- | - `inputError`: `Nothing` on a clean finish; otherwise the error reported
-- |   by `pipeline`, or a "max buffer exceeded" error when the limit is hit.
-- |
-- | Note: nothing in this function stops or destroys the streams when the
-- | limit is exceeded; see the implementation note below.
getStreamBuffer
  :: forall r
   . Readable r
  -> { maxBuffer :: Maybe Number }
  -> Aff { buffer :: ImmutableBuffer, inputError :: Maybe Error }
getStreamBuffer inputStream initialOptions = do
  let options = { maxBuffer: fromMaybe infinity initialOptions.maxBuffer }
  interface <- liftEffect bufferStream
  -- PureScript implementation note:
  -- Execa gets the buffered data whether the stream
  -- fails or not. It also destroys the input stream if an error occurs
  -- but we'll handle that outside of this function.
  makeAff \cb -> do
    -- Guard so the result is built and delivered only once. Without it,
    -- every chunk arriving after the limit was exceeded would concatenate
    -- the whole buffer again and invoke `cb` again, and the pipeline's
    -- completion callback would then invoke `cb` yet another time.
    settledRef <- Ref.new false
    let
      settle inputError = do
        alreadySettled <- Ref.read settledRef
        unless alreadySettled do
          Ref.write true settledRef
          bufferedData <- interface.getBufferedValue
          buff <- Buffer.unsafeFreeze bufferedData
          cb $ Right { buffer: buff, inputError }
    runEffectFn3 pipeline inputStream interface.stream $ mkEffectFn1 \err ->
      settle $ toMaybe err
    -- This listener is registered after the one in `bufferStream`, so the
    -- length it reads is expected to already include the current chunk.
    onData interface.stream \_ -> do
      bufferedLen <- interface.getBufferedLength
      when (bufferedLen > options.maxBuffer) do
        -- The `Nothing` branch is effectively unreachable: without a limit
        -- `options.maxBuffer` is `infinity`, which is never exceeded.
        -- NOTE(review): the reported `size` is the configured limit, not the
        -- number of bytes actually buffered.
        settle $ Just $ error $ maybe
          ("Max buffer exceeded")
          (\size -> "Max buffer size exceeded. Buffer size was: " <> show size)
          initialOptions.maxBuffer
    pure nonCanceler
  where
  -- Creates the pass-through stream together with the accessors over the
  -- chunks it has emitted (same shape as the `Interface` record).
  --
  -- PureScript implementation note:
  -- - object mode == false due to 'buffer' usage
  -- - encoding = null due to 'buffer' usage
  bufferStream = do
    chunksRef <- Ref.new []
    lengthRef <- Ref.new 0.0
    stream <- newPassThroughStream
    -- Record each chunk and keep a running byte count.
    onData stream \buf -> do
      Ref.modify_ (\chunks -> Array.snoc chunks buf) chunksRef
      bufLen <- Buffer.size buf
      Ref.modify_ (_ + (toNumber bufLen)) lengthRef
    pure
      { getBufferedValue: do
          chunks <- Ref.read chunksRef
          len <- Ref.read lengthRef
          let
            -- PureScript implementation note:
            -- maxBufferLength = 2^32
            -- PS Int type = 2^31
            -- The running length is kept as a `Number` because it may not
            -- fit in an `Int`; it is reinterpreted (not converted) as an
            -- `Int` only to satisfy the type of `Buffer.concat'`.
            asTooLargeInt :: Number -> Int
            asTooLargeInt = unsafeCoerce
          Buffer.concat' chunks $ asTooLargeInt len
      , getBufferedLength: Ref.read lengthRef
      , stream
      }
-- | Maximum buffer length, supplied by this module's foreign (JavaScript)
-- | implementation. A note in `getStreamBuffer` gives its value as 2^32,
-- | which is why it is a `Number` rather than an `Int`.
-- | NOTE(review): presumably mirrors Node's maximum `Buffer` length — confirm
-- | against the FFI file.
foreign import maxBufferLength :: Number
-- | Foreign (JavaScript) function taking a source stream, a destination
-- | stream and a completion callback. The callback receives a `Nullable Error`;
-- | `getStreamBuffer` treats a null value as "no error".
-- | NOTE(review): presumably a thin wrapper around Node's `stream.pipeline` —
-- | confirm against the FFI file.
foreign import pipeline :: forall w r. EffectFn3 (Readable w) (Writable r) (EffectFn1 (Nullable Error) Unit) Unit