Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

not working wip

  • Loading branch information...
commit d0c363c899c721825bebbbb31b79401e0565af5c 1 parent 24469cc
Isaac Z. Schlueter authored
7 lib/_stream_readable.js
@@ -37,6 +37,7 @@ function ReadableState(options, stream) {
37 37 options.bufferSize : 16 * 1024;
38 38
39 39 // number of bytes that we ought to have before emitting 'readable'
  40 + // default to emitting for every chunk we get.
40 41 this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
41 42 options.lowWaterMark : 0;
42 43
@@ -187,6 +188,7 @@ Readable.prototype.read = function(n) {
187 188 ret = null;
188 189
189 190 if (ret === null || ret.length === 0) {
  191 + // wanted something, but didn't get it.
190 192 state.needReadable = true;
191 193 n = 0;
192 194 }
@@ -219,9 +221,10 @@ function onread(er, chunk) {
219 221 // if we've ended and we have some data left, then emit
220 222 // 'readable' now to make sure it gets picked up.
221 223 if (!sync) {
222   - if (state.length > 0)
  224 + if (state.length > 0) {
  225 + state.needReadable = false;
223 226 this.emit('readable');
224   - else
  227 + } else
225 228 endReadable(this);
226 229 }
227 230 return;
24 lib/_stream_transform.js
@@ -62,12 +62,13 @@
62 62 // would be consumed, and then the rest would wait (un-transformed) until
63 63 // the results of the previous transformed chunk were consumed. Because
64 64 // the transform happens on-demand, it will only transform as much as is
65   -// necessary to fill the readable buffer to the specified lowWaterMark.
  65 +// necessary to fill the readable buffer to the specified highWaterMark.
66 66
67 67 module.exports = Transform;
68 68
69 69 var Duplex = require('_stream_duplex');
70 70 var util = require('util');
  71 +var assert = require('assert');
71 72 util.inherits(Transform, Duplex);
72 73
73 74 function TransformState() {
@@ -85,10 +86,10 @@ function Transform(options) {
85 86 // duplex defaults to making the lowWaterMark 0,
86 87 // but we should use the standard defaults for transform
87 88 // streams, since it's not a true duplex conversation.
88   - if (!options || !options.hasOwnProperty('lowWaterMark')) {
89   - this._readableState.lowWaterMark = 1024;
90   - this._writableState.lowWaterMark = 1024;
91   - }
  89 + // if (!options || !options.hasOwnProperty('lowWaterMark')) {
  90 + // this._readableState.lowWaterMark = 1024;
  91 + // this._writableState.lowWaterMark = 1024;
  92 + // }
92 93
93 94 // bind output so that it can be passed around as a regular function.
94 95 this._output = this._output.bind(this);
@@ -133,13 +134,14 @@ Transform.prototype._write = function(chunk, cb) {
133 134 if (ts.pendingReadCb) {
134 135 var readcb = ts.pendingReadCb;
135 136 ts.pendingReadCb = null;
136   - this._read(0, readcb);
  137 + this._read(rs.bufferSize, readcb);
137 138 }
138 139
139 140 // if we weren't waiting for it, but nothing is queued up, then
140 141 // still kick off a transform, just so it's there when the user asks.
141   - var doRead = rs.needReadable || rs.length <= rs.lowWaterMark;
  142 + var doRead = rs.needReadable || rs.length <= rs.highWaterMark;
142 143 if (doRead && !rs.reading) {
  144 + console.error('TS: call read(0)');
143 145 var ret = this.read(0);
144 146 if (ret !== null)
145 147 return cb(new Error('invalid stream transform state'));
@@ -154,6 +156,8 @@ Transform.prototype._read = function(n, readcb) {
154 156 if (ts.pendingReadCb)
155 157 throw new Error('_read while _read already in progress');
156 158
  159 + console.error('TS: _read(%j, readcb)', n);
  160 + assert(typeof n === 'number');
157 161 ts.pendingReadCb = readcb;
158 162
159 163 // if there's nothing pending, then we just wait.
@@ -188,6 +192,7 @@ Transform.prototype._output = function(chunk) {
188 192 var readcb = ts.pendingReadCb;
189 193 if (readcb) {
190 194 ts.pendingReadCb = null;
  195 + console.error('call readcb', this);
191 196 readcb(null, chunk);
192 197 return;
193 198 }
@@ -228,8 +233,9 @@ function done(er) {
228 233 // we may have gotten a 'null' read before, and since there is
229 234 // no more data coming from the writable side, we need to emit
230 235 // now so that the consumer knows to pick up the tail bits.
231   - if (rs.length && rs.needReadable)
  236 + if (rs.length && rs.needReadable) {
  237 + rs.needReadable = false;
232 238 this.emit('readable');
233   - else if (rs.length === 0)
  239 + } else if (rs.length === 0)
234 240 this.emit('end');
235 241 }
165 lib/_stream_writable.js
@@ -27,17 +27,31 @@ module.exports = Writable;
27 27 Writable.WritableState = WritableState;
28 28
29 29 var util = require('util');
  30 +var assert = require('assert');
30 31 var Stream = require('stream');
31 32
32 33 util.inherits(Writable, Stream);
33 34
34   -function WritableState(options) {
  35 +function WritableState(options, stream) {
35 36 options = options || {};
36   - this.highWaterMark = options.highWaterMark || 16 * 1024;
  37 +
  38 + // the point at which write() starts returning false
37 39 this.highWaterMark = options.hasOwnProperty('highWaterMark') ?
38   - options.highWaterMark : 16 * 1024;
  40 + options.highWaterMark : 1024;
  41 +
  42 + // the point that it has to get to before we call _write(chunk,cb)
  43 + // default to pushing everything out as fast as possible.
39 44 this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
40   - options.lowWaterMark : 1024;
  45 + options.lowWaterMark : 0;
  46 +
  47 + // cast to ints.
  48 + assert(typeof this.lowWaterMark === 'number');
  49 + assert(typeof this.highWaterMark === 'number');
  50 + this.lowWaterMark = ~~this.lowWaterMark;
  51 + this.highWaterMark = ~~this.highWaterMark;
  52 + assert(this.lowWaterMark >= 0);
  53 + assert(this.highWaterMark >= this.lowWaterMark);
  54 +
41 55 this.needDrain = false;
42 56 // at the start of calling end()
43 57 this.ending = false;
@@ -59,7 +73,22 @@ function WritableState(options) {
59 73 // socket or file.
60 74 this.length = 0;
61 75
  76 + // a flag to see when we're in the middle of a write.
62 77 this.writing = false;
  78 +
  79 + // a flag to be able to tell if the onwrite cb is called immediately,
  80 + // or on a later tick.
  81 + this.sync = false;
  82 +
  83 + // the callback that's passed to _write(chunk,cb)
  84 + this.onwrite = onwrite.bind(stream);
  85 +
  86 + // the callback that the user supplies to write(chunk,encoding,cb)
  87 + this.writecb = null;
  88 +
  89 + // the amount that is being written when _write is called.
  90 + this.writelen = 0;
  91 +
63 92 this.buffer = [];
64 93 }
65 94
@@ -69,7 +98,7 @@ function Writable(options) {
69 98 if (!(this instanceof Writable) && !(this instanceof Stream.Duplex))
70 99 return new Writable(options);
71 100
72   - this._writableState = new WritableState(options);
  101 + this._writableState = new WritableState(options, this);
73 102
74 103 // legacy.
75 104 this.writable = true;
@@ -77,8 +106,7 @@ function Writable(options) {
77 106 Stream.call(this);
78 107 }
79 108
80   -// Override this method for sync streams
81   -// override the _write(chunk, cb) method for async streams
  109 +// Override this method or _write(chunk, cb)
82 110 Writable.prototype.write = function(chunk, encoding, cb) {
83 111 var state = this._writableState;
84 112
@@ -95,6 +123,8 @@ Writable.prototype.write = function(chunk, encoding, cb) {
95 123 return;
96 124 }
97 125
  126 + state.writecb = cb;
  127 +
98 128 var l = chunk.length;
99 129 if (false === state.decodeStrings)
100 130 chunk = [chunk, encoding || 'utf8'];
@@ -117,72 +147,77 @@ Writable.prototype.write = function(chunk, encoding, cb) {
117 147 }
118 148
119 149 state.writing = true;
120   - var sync = true;
121   - this._write(chunk, writecb.bind(this));
122   - sync = false;
  150 + state.sync = true;
  151 + state.writelen = l;
  152 + this._write(chunk, state.onwrite);
  153 + state.sync = false;
123 154
124 155 return ret;
  156 +};
125 157
126   - function writecb(er) {
127   - state.writing = false;
128   - if (er) {
129   - if (cb) {
130   - if (sync)
131   - process.nextTick(cb.bind(null, er));
132   - else
133   - cb(er);
134   - } else
135   - this.emit('error', er);
136   - return;
137   - }
138   - state.length -= l;
  158 +function onwrite(er) {
  159 + var state = this._writableState;
  160 + var sync = state.sync;
  161 + var cb = state.writecb;
  162 + var l = state.writelen;
139 163
  164 + state.writing = false;
  165 + if (er) {
140 166 if (cb) {
141   - // don't call the cb until the next tick if we're in sync mode.
142   - // also, defer if we're about to write some more right now.
143   - if (sync || state.buffer.length)
144   - process.nextTick(cb);
145   - else
146   - cb();
147   - }
148   -
149   - if (state.length === 0 && (state.ended || state.ending)) {
150   - // emit 'finish' at the very end.
151   - state.finishing = true;
152   - this.emit('finish');
153   - state.finished = true;
154   - return;
155   - }
156   -
157   - // if there's something in the buffer waiting, then do that, too.
158   - if (state.buffer.length) {
159   - var chunkCb = state.buffer.shift();
160   - chunk = chunkCb[0];
161   - cb = chunkCb[1];
162   -
163   - if (false === state.decodeStrings)
164   - l = chunk[0].length;
  167 + if (sync)
  168 + process.nextTick(cb.bind(null, er));
165 169 else
166   - l = chunk.length;
167   -
168   - state.writing = true;
169   - this._write(chunk, writecb.bind(this));
170   - }
171   -
172   - if (state.length <= state.lowWaterMark && state.needDrain) {
173   - // Must force callback to be called on nextTick, so that we don't
174   - // emit 'drain' before the write() consumer gets the 'false' return
175   - // value, and has a chance to attach a 'drain' listener.
176   - process.nextTick(function() {
177   - if (!state.needDrain)
178   - return;
179   - state.needDrain = false;
180   - this.emit('drain');
181   - }.bind(this));
182   - }
  170 + cb(er);
  171 + } else
  172 + this.emit('error', er);
  173 + return;
  174 + }
  175 + state.length -= l;
  176 +
  177 + if (cb) {
  178 + // don't call the cb until the next tick if we're in sync mode.
  179 + // also, defer if we're about to write some more right now.
  180 + if (sync || state.buffer.length)
  181 + process.nextTick(cb);
  182 + else
  183 + cb();
183 184 }
184 185
185   -};
  186 + if (state.length === 0 && (state.ended || state.ending)) {
  187 + // emit 'finish' at the very end.
  188 + state.finishing = true;
  189 + this.emit('finish');
  190 + state.finished = true;
  191 + return;
  192 + }
  193 +
  194 + // if there's something in the buffer waiting, then do that, too.
  195 + if (state.buffer.length) {
  196 + var chunkCb = state.buffer.shift();
  197 + var chunk = chunkCb[0];
  198 + cb = chunkCb[1];
  199 +
  200 + if (false === state.decodeStrings)
  201 + l = chunk[0].length;
  202 + else
  203 + l = chunk.length;
  204 +
  205 + state.writing = true;
  206 + this._write(chunk, state.onwrite);
  207 + }
  208 +
  209 + if (state.length <= state.lowWaterMark && state.needDrain) {
  210 + // Must force callback to be called on nextTick, so that we don't
  211 + // emit 'drain' before the write() consumer gets the 'false' return
  212 + // value, and has a chance to attach a 'drain' listener.
  213 + process.nextTick(function() {
  214 + if (!state.needDrain)
  215 + return;
  216 + state.needDrain = false;
  217 + this.emit('drain');
  218 + }.bind(this));
  219 + }
  220 +}
186 221
187 222 Writable.prototype._write = function(chunk, cb) {
188 223 process.nextTick(cb.bind(this, new Error('not implemented')));
37 test/simple/test-stream2-transform.js
@@ -37,7 +37,7 @@ function run() {
37 37
38 38 var name = next[0];
39 39 var fn = next[1];
40   - console.log('# %s', name);
  40 + console.error('# %s', name);
41 41 fn({
42 42 same: assert.deepEqual,
43 43 equal: assert.equal,
@@ -203,12 +203,14 @@ test('assymetric transform (compress)', function(t) {
203 203
204 204 test('passthrough event emission', function(t) {
205 205 var pt = new PassThrough({
206   - lowWaterMark: 0
  206 + lowWaterMark: 0,
  207 + highWaterMark: 0
207 208 });
208 209 var emits = 0;
209 210 pt.on('readable', function() {
210 211 var state = pt._readableState;
211   - console.error('>>> emit readable %d', emits);
  212 + console.error(state);
  213 + console.error('emit readable ' + emits)
212 214 emits++;
213 215 });
214 216
@@ -250,7 +252,6 @@ test('passthrough event emission reordered', function(t) {
250 252 var pt = new PassThrough;
251 253 var emits = 0;
252 254 pt.on('readable', function() {
253   - console.error('emit readable', emits)
254 255 emits++;
255 256 });
256 257
@@ -259,24 +260,36 @@ test('passthrough event emission reordered', function(t) {
259 260
260 261 t.equal(pt.read(5).toString(), 'foogb');
261 262 t.equal(pt.read(5), null);
  263 + console.error('got null', pt);
262 264
263 265 console.error('need emit 0');
264 266 pt.once('readable', function() {
265   - t.equal(pt.read(5).toString(), 'arkba');
266   - t.equal(pt.read(5).toString(), 'zykue');
  267 + console.error('>>> readable', pt);
  268 + console.trace('>>> readable');
  269 + process.nextTick(function() {
  270 + console.error('>>> readable nt', pt);
  271 + });
  272 + return;
  273 +
  274 + t.equal(pt.read(5) + '', 'arkba');
267 275 t.equal(pt.read(5), null);
268 276
269 277 console.error('need emit 1');
270 278 pt.once('readable', function() {
271   - t.equal(pt.read(5).toString(), 'l');
272   - t.equal(pt.read(5), null);
273   -
274   - t.equal(emits, 2);
275   - t.end();
  279 + t.equal(pt.read(5) + '', 'zykue');
  280 + pt.once('readable', function() {
  281 + t.equal(pt.read(5).toString(), 'l');
  282 + t.equal(pt.read(5), null);
  283 +
  284 + t.equal(emits, 2);
  285 + t.end();
  286 + });
  287 + pt.end();
276 288 });
277   - pt.end();
278 289 });
  290 + console.error('\n\n\n\nabout to trigger readable...');
279 291 pt.write(new Buffer('bazy'));
  292 + console.error('^^^^ theres the problem\n\n\n\n\n\n\n');
280 293 pt.write(new Buffer('kuel'));
281 294 });
282 295

0 comments on commit d0c363c

Please sign in to comment.
Something went wrong with that request. Please try again.