This repository has been archived by the owner on Aug 5, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
bufferstream.coffee
224 lines (162 loc) · 4.85 KB
/
bufferstream.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
Stream = require 'stream'
###
BufferStream
A streaming interface for a buffer.
You can stream (e.g., `pipe`) data into a `BufferStream` object
and then `pipe` it into another object.
BufferStreams have a few benefits:
- Automatically buffer any amount of data (memory permitting).
- Automatically increase in size in a semi-intelligent fashion.
- No need to wait for `drain` on the write side.
- Makes efficient use of buffers.
- You can `pause` a BufferStream and collect data for later.
###
class BufferStream extends Stream
  # Smallest backing buffer `ensureSpace` will allocate; also the number of
  # buffered bytes that must accumulate before an unforced "data" emit.
  MIN_BUFFER_SIZE = 65536
  # Largest chunk handed to a single "data" event.
  MAX_BUFFER_SIZE = 65536

  # Allocate a backing buffer of `size` bytes.
  # Prefers `Buffer.allocUnsafe` (the non-deprecated equivalent of
  # `new Buffer(size)`) and falls back to the legacy constructor on runtimes
  # that predate it. Uninitialised memory is acceptable here because only the
  # bytes between @readIndex and @writeIndex are ever handed out.
  allocate = (size) ->
    if typeof Buffer.allocUnsafe is 'function'
      Buffer.allocUnsafe size
    else
      new Buffer size

  # Create a stream backed by a buffer of `size` bytes
  # (defaults to MIN_BUFFER_SIZE).
  constructor: (size = MIN_BUFFER_SIZE) ->
    # Run the Stream constructor before touching `this`.
    super()

    @writeIndex = 0
    @readIndex = 0
    @buffer = allocate size
    # NOTE: @readBuffer and @emitStrings are initialised here but are not
    # consulted anywhere else in this class.
    @readBuffer = 1024*16
    @encoding = 'utf8'
    @emitStrings = false

    # Lifecycle flags
    @_endWrite = false
    @_endRead = false
    @_destroySoon = false
    @_destroyed = false

    @writable = true
    @readable = true
    @paused = false

  ###
  # Buffer Management
  ###

  # Guarantee that at least `bytes` bytes can be written at @writeIndex.
  # Returns undefined when the current buffer already has room, or true after
  # the unread bytes were moved into a freshly allocated buffer.
  ensureSpace: (bytes) ->
    bytesAvailable = @buffer.length - @writeIndex

    # Current buffer might be large enough
    if bytesAvailable >= bytes
      return

    # Size the new buffer: start at MIN_BUFFER_SIZE and double until the
    # unread bytes plus the incoming bytes fit.
    currentSize = @writeIndex - @readIndex
    desiredSize = currentSize + bytes
    targetSize = MIN_BUFFER_SIZE
    while targetSize < desiredSize
      targetSize *= 2

    # Copying to a new buffer prevents mangling of buffers
    # that were returned by _readBuffer (they are slices of the old buffer).
    newBuffer = allocate targetSize
    @buffer.copy newBuffer, 0, @readIndex, @writeIndex
    @writeIndex = currentSize
    @readIndex = 0
    @buffer = newBuffer
    true

  ###
  # Internal Buffer Access
  ###

  # Append the contents of `buffer` to the internal buffer.
  _writeBuffer: (buffer) ->
    @ensureSpace buffer.length
    buffer.copy @buffer, @writeIndex
    @writeIndex = @writeIndex + buffer.length
    true

  # Append `string`, encoded with @encoding, to the internal buffer.
  _writeString: (string) ->
    bytes = Buffer.byteLength string, @encoding
    @ensureSpace bytes
    @buffer.write string, @writeIndex, bytes, @encoding
    @writeIndex = @writeIndex + bytes
    true

  # Consume up to `maxBytes` unread bytes (0 means "everything unread") and
  # return them as a slice of the internal buffer. The slice shares memory
  # with @buffer; it is not a copy.
  _readBuffer: (maxBytes = 0) ->
    targetIndex =
      if maxBytes == 0
        @writeIndex
      else
        Math.min @readIndex + maxBytes, @writeIndex

    buffer = @buffer.slice @readIndex, targetIndex
    @readIndex = targetIndex
    buffer

  # Number of bytes written but not yet read.
  _getLength: () ->
    @writeIndex - @readIndex

  ###
  # Readable Stream Methods
  ###

  # Not supported: always throws. A non-'utf8' encoding is rejected with its
  # own message; 'utf8' is rejected as not implemented. @encoding and
  # @emitStrings therefore always keep the values set by the constructor.
  setEncoding: (encoding) ->
    if encoding != 'utf8'
      throw new Error "Only UTF8 is a supported encoding"
    throw new Error "Not Implemented."

  # Stop emitting events from _flush until resume() is called.
  # Writes are still accepted and buffered while paused.
  pause: () ->
    @paused = true

  # Re-enable emitting and schedule a flush of whatever was buffered.
  resume: () ->
    @paused = false
    @flush()

  # Emit pending data and, once the writer has ended and the buffer is
  # drained, emit "end". Does nothing while paused or after destruction.
  _flush: () ->
    if @paused or @_destroyed
      return

    # Flush some data; force it out once no more writes can arrive.
    @_emitData(@_endWrite or @_destroySoon)

    # A "data" listener may have destroyed the stream during the emit above.
    # destroy() has then already emitted "close" and dropped the buffer, so
    # "end" must not follow.
    if @_destroyed
      return

    # Trigger End Event. _emitEnd() also destroys the stream, which covers
    # the destroySoon() case (destroySoon() always ends the stream first).
    if @_endWrite and !@_getLength()
      @_emitEnd()

  # Emit one "data" chunk of at most MAX_BUFFER_SIZE bytes. Unless `force` is
  # set, nothing is emitted until MIN_BUFFER_SIZE bytes are buffered.
  _emitData: (force = false) ->
    bytesRemaining = @_getLength()
    if (bytesRemaining >= MIN_BUFFER_SIZE) or force
      data = @_readBuffer MAX_BUFFER_SIZE
      if data.length
        @emit "data", data
        # There may be more data
        @flush()

  # Mark the readable side finished, emit "end", then destroy the stream.
  # Because @_endRead is set first, destroy() does not emit "close" here.
  _emitEnd: () ->
    @readable = false
    @_endRead = true
    @emit "end"
    @destroy()

  ###
  # Writable Stream Methods
  ###

  # Buffer `data` (a Buffer or a utf8 string) and schedule a flush.
  # `encoding` only applies to strings and is ignored for Buffers.
  # Always returns true, so callers never need to wait for "drain".
  # Throws if a string is written with an encoding other than 'utf8', or if
  # the stream has been ended or destroyed.
  write: (data, encoding = 'utf8') ->
    isBuffer = Buffer.isBuffer data
    if !isBuffer and encoding != 'utf8'
      throw new Error "Only supports utf8."

    if @_endWrite or @_destroyed
      throw new Error "Cannot write to stream, has been ended or destroyed."

    if isBuffer
      @_writeBuffer data
    else
      # String
      @_writeString data

    @flush()
    true

  # Optionally write a final chunk, then close the writable side.
  # Buffered data is still delivered before "end" is emitted.
  end: (data, encoding) ->
    if data?
      @write data, encoding
    @writable = false
    @_endWrite = true
    @flush()

  ###
  # Readable, Writable Stream Methods
  ###

  # Schedule _flush() on the next tick, so events are never emitted
  # synchronously from inside write()/end()/resume().
  flush: () ->
    process.nextTick () =>
      @_flush()

  # Tear the stream down immediately, discarding any buffered data.
  # Emits "close" unless "end" has already been emitted. Safe to call twice.
  destroy: () ->
    if @_destroyed
      return
    @_destroyed = true
    @writable = false
    @readable = false
    if !@_endRead
      @emit "close"
    @cleanup()

  # End the writable side and destroy the stream once buffered data has been
  # flushed.
  destroySoon: () ->
    @end()
    @_destroySoon = true
    @flush()

  # Reset the indices and drop the reference to the backing buffer.
  cleanup: () ->
    @readIndex = 0
    @writeIndex = 0
    @buffer = null
# Export the BufferStream class as this module's value.
module.exports = BufferStream