-
Notifications
You must be signed in to change notification settings - Fork 3
/
streams.coffee
174 lines (142 loc) · 4.38 KB
/
streams.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
sync = require 'synchronize'
stream = require 'stream'
util = require 'util'
Buffer = require('buffer').Buffer
# ReadStream.
exports.ReadStream = ReadStream = (args...) ->
[@openFn, @closeFn] = if args[0] instanceof Function then args
else [(-> args[0])]
null
ReadStream.size = 100 * 1024
ReadStream::open = (encoding..., cb) ->
encoding = encoding[0]
try
nStream = @openFn()
nStream.setEncoding encoding if encoding
[deferCb, error, ended] = [null, null, false]
nStream.on 'readable', -> deferCb?()
nStream.on 'error', (err) -> error = err; deferCb?()
nStream.on 'end', -> ended = true; deferCb?()
read = (size) ->
if error then throw error
else if ended then null
else
if (chunk = nStream.read(size || @size)) != null then chunk
else
deferCb = sync.defer()
sync.await()
deferCb = null
read(size)
cb read
finally
@closeFn? nStream if nStream
ReadStream::read = (encoding) ->
buff = []
@open encoding, (read) -> buff.push read()
# Buffer.concat buff
buff.join('')
ReadStream.fromFile = (path) ->
fs = require 'fs'
[fileDescriptor, nStream] = [null, null]
open = ->
nStream = fs.createReadStream path
# Opening stream.
[deferCb, error] = [null, null]
nStream.on 'open', (fd) -> deferCb?(null, fd)
nStream.on 'error', (err) -> error = err; deferCb?()
# Waiting for `open` event.
deferCb = sync.defer()
fileDescriptor = sync.await()
deferCb = null
throw error if error
nStream
close = ->
if fileDescriptor and !nStream.closed
fs.close fileDescriptor, sync.defer()
sync.await()
new ReadStream open, close
ReadStream.fromString = (string) ->
stream = new ReadStream()
stream.open = (encoding..., cb) ->
encoding = encoding[0]
throw new Error "encoding not supported String stream!" if encoding
readed = false
cb (size) ->
throw new Error "size not supported for String stream!" if size
return null if readed
readed = true
string
stream.read = (encoding) ->
throw new Error "encoding not supported String stream!" if encoding
string
stream
# WriteStream.
exports.WriteStream = WriteStream = (args...) ->
[@openFn, @closeFn] = if args[0] instanceof Function then args
else [(-> args[0])]
null
WriteStream::open = (cb) ->
try
nStream = @openFn()
[deferCb, error] = [null, null]
nStream.on 'error', (err) -> error = err; deferCb?()
# Writing chunk to stream and waiting when it will be flushed.
write = (args...) ->
throw error if error
nStream.write args..., -> deferCb?()
deferCb = sync.defer()
sync.await()
deferCb = null
throw error if error
cb write
nStream.end sync.defer()
sync.await()
# If specified also waiting for some external condition.
@alsoWaitFor?()
finally
@closeFn? nStream if nStream
WriteStream.fromFile = (path) ->
fs = require 'fs'
open = -> fs.createWriteStream path
close = (nStream) -> nStream.close()
new WriteStream open, close
# NodeWriteReadStream.
exports.NodeWriteReadStream = NodeWriteReadStream = ->
@writable = true
@readable = true
util.inherits NodeWriteReadStream, stream.Stream
NodeWriteReadStream::write = (chunk, encoding..., callback) ->
@emit 'data', chunk
process.nextTick -> callback?()
NodeWriteReadStream::end = (callback) ->
@emit 'end'
process.nextTick -> callback?()
NodeWriteReadStream::pause = -> @emit 'pause'
NodeWriteReadStream::resume = -> @emit 'resume'
# StringStream.
exports.StringStream = StringStream = (@data) ->
@readable = true
util.inherits StringStream, stream.Stream
StringStream::onWithoutWrite = StringStream::on
StringStream::on = (name, cb) ->
@_write() if name == 'data'
@onWithoutWrite name, cb
StringStream::resume = ->
@paused = false
@_write()
# StringStream::read = (size) ->
# throw new Error "read with size is not supported for StringStream" if size
# return null if @closed
# @closed = true
# @data
StringStream::setEncoding = (@encoding) ->
StringStream::pause = -> @paused = true
StringStream::destroy = ->
StringStream::_write = ->
process.nextTick =>
return if @paused or @closed
if @encoding && Buffer.isBuffer(@data) then @emit 'data', @data.toString(@encoding)
else @emit 'data', @data
@emit('end')
@emit('close')
@closed = true