Skip to content
Browse files

New stack implementation

  • Loading branch information...
1 parent e6a1381 commit 5b8a80b06595f24a514f93e6096ab553b0c27618 @wdavidw committed Nov 29, 2012
Showing with 443 additions and 0 deletions.
  1. +172 −0 lib/StackBufferedStream.js
  2. +146 −0 src/StackBufferedStream.coffee
  3. +125 −0 test/StackBufferedStream.coffee
View
172 lib/StackBufferedStream.js
@@ -0,0 +1,172 @@
+// Generated by CoffeeScript 1.4.0
+var BufferStream, Stream, util;
+
+util = require('util');
+
+Stream = require('stream');
+
+/*
+
+Performances
+------------
+
+The results presented below are obtained by running `coffee samples/speed.coffee`.
+
+Writting 100000 lines of 100 bytes (about 9.5 Mo)
+```
+0 b : 2 s 105 ms
+64 b : 2 s 226 ms
+128 b : 2 s 205 ms
+256 b : 1 s 294 ms
+512 b : 771 ms
+1 Kb : 548 ms
+1 Mb : 344 ms
+4 Mb : 345 ms
+16 Mb : 326 ms
+64 Mb : 322 ms
+128 Mb : 329 ms
+```
+
+Writting 1000000 lines of 100 bytes (about 95 Mo)
+```
+0 b : 21 s 122 ms
+64 b : 21 s 345 ms
+128 b : 21 s 156 ms
+256 b : 12 s 224 ms
+512 b : 7 s 344 ms
+1 Kb : 5 s 237 ms
+1 Mb : 3 s 76 ms
+4 Mb : 3 s 138 ms
+16 Mb : 3 s 186 ms
+64 Mb : 3 s 380 ms
+128 Mb : 3 s 314 ms
+```
+*/
+
+
+BufferStream = function(size) {
+ Stream.call(this);
+ this.readable = true;
+ this.writable = true;
+ this.bufferSize = size != null ? parseInt(size, 10) : 1024 * 1024;
+ this.paused = false;
+ this.buffers = [];
+ return this.stackSize = 1;
+};
+
+util.inherits(BufferStream, Stream);
+
+/*
+Emit "end" if the "end" function has been called and there is no more buffer to flush.
+*/
+
+
+BufferStream.prototype.flush = function() {
+ var buffer, ended;
+ ended = !this.writable;
+ if (!ended && this.paused) {
+ return;
+ }
+ if (!ended && this.buffers.length <= 1) {
+ return this.emit('drain');
+ }
+ if ((ended && this.buffers.length) || this.buffers.length > 1) {
+ buffer = this.buffers.shift();
+ this.emit('data', buffer.slice(0, buffer.position));
+ }
+ if (ended && !this.buffers.length) {
+ if (this.paused) {
+ this.on('drain', function() {
+ return this.emit('end');
+ });
+ } else {
+ this.emit('end');
+ }
+ return;
+ }
+ return this.flush();
+};
+
+BufferStream.prototype.destroy = function() {
+ return this.destroySoon();
+};
+
+BufferStream.prototype.destroySoon = function() {
+ this.end();
+ this.readable = false;
+ return this.writable = false;
+};
+
+/*
+Write API
+drain Emitted after a write() method was called that returned false to indicate that it is safe to write again
+error Emitted on error with the exception exception
+close Emitted when the underlying file descriptor has been closed
+pipe Emitted when the stream is passed to a readable stream's pipe method
+*/
+
+
+BufferStream.prototype.write = function(data, opt_encoding) {
+ var buffer, encoding, flush;
+ if (data) {
+ flush = false;
+ encoding = opt_encoding || 'utf8';
+ if (!Buffer.isBuffer(data)) {
+ data = new Buffer(data, encoding);
+ }
+ if (data.length > this.bufferSize) {
+ this.emit('data', data);
+ return !this.paused;
+ }
+ if (data.length > this.bufferSize) {
+ throw new Error('Data length greater than buffer');
+ }
+ if (!this.buffers.length) {
+ buffer = new Buffer(this.bufferSize);
+ buffer.position = 0;
+ this.buffers.push(buffer);
+ } else {
+ buffer = this.buffers[this.buffers.length - 1];
+ }
+ if (buffer.position + data.length > buffer.length) {
+ buffer = new Buffer(this.bufferSize);
+ buffer.position = 0;
+ this.buffers.push(buffer);
+ flush = true;
+ }
+ data.copy(buffer, buffer.position);
+ buffer.position += data.length;
+ if (flush) {
+ this.flush();
+ }
+ }
+ return this.buffers.length <= this.stackSize;
+};
+
+BufferStream.prototype.end = function(data, opt_encoding) {
+ if (data) {
+ this.write(data, opt_encoding);
+ }
+ this.writable = false;
+ return this.flush();
+};
+
+/*
+Read API
+data The 'data' event emits either a Buffer (by default) or a string if setEncoding() was used.
+end Emitted when the stream has received an EOF (FIN in TCP terminology). Indicates that no more 'data' events will happen. If the stream is also writable, it may be possible to continue writing.
+error Emitted if there was an error receiving data.
+close Emitted when the underlying file descriptor has been closed. Not all streams will emit this. (For example, an incoming HTTP request will not emit 'close'.)
+*/
+
+
+BufferStream.prototype.pause = function() {
+ return this.paused = true;
+};
+
+BufferStream.prototype.resume = function() {
+ this.paused = false;
+ return this.flush();
+};
+
+module.exports = BufferStream;
View
146 src/StackBufferedStream.coffee
@@ -0,0 +1,146 @@
+util = require 'util'
+Stream = require 'stream'
+
+###
+
+Performances
+------------
+
+The results presented below are obtained by running `coffee samples/speed.coffee`.
+
+Writting 100000 lines of 100 bytes (about 9.5 Mo)
+
+```
+0 b : 2 s 146 ms
+64 b : 2 s 172 ms
+128 b : 2 s 155 ms
+256 b : 1 s 256 ms
+512 b : 749 ms
+1 Kb : 565 ms
+1 Mb : 333 ms
+4 Mb : 341 ms
+16 Mb : 342 ms
+64 Mb : 350 ms
+128 Mb : 351 ms
+```
+
+Writting 1000000 lines of 100 bytes (about 95 Mo)
+
+```
+0 b : 20 s 636 ms
+64 b : 20 s 217 ms
+128 b : 21 s 749 ms
+256 b : 12 s 769 ms
+512 b : 7 s 520 ms
+1 Kb : 5 s 452 ms
+1 Mb : 3 s 193 ms
+4 Mb : 3 s 218 ms
+16 Mb : 3 s 326 ms
+64 Mb : 3 s 415 ms
+128 Mb : 3 s 368 ms
+```
+
+###
+BufferStream = (size) ->
+ Stream.call @
+ @readable = true
+ @writable = true
+ @bufferSize = if size? then parseInt(size, 10) else 1024 * 1024
+ @paused = false
+ @buffers = []
+ @stackSize = 1
+util.inherits BufferStream, Stream
+
+###
+Emit "end" if the "end" function has been called and there is no more buffer to flush.
+###
+BufferStream.prototype.flush = () ->
+ ended = not @writable
+ return if not ended and @paused
+ # Restart the pump if running and buffer back to 1
+ # Note, used to be:
+ # if (ended and not @buffers.length) or (not ended and @buffers.length <= 1)
+ if not ended and @buffers.length <= 1
+ return @emit 'drain'
+ if (ended and @buffers.length) or @buffers.length > 1
+ buffer = @buffers.shift();
+ @emit 'data', buffer.slice 0, buffer.position
+ if ended and not @buffers.length
+ if @paused
+ @on 'drain', ->
+ @emit 'end'
+ else
+ @emit 'end'
+ return
+ @flush()
+
+
+BufferStream.prototype.destroy = ->
+ @destroySoon()
+
+BufferStream.prototype.destroySoon = ->
+ @end()
+ @readable = false
+ @writable = false
+
+###
+Write API
+drain Emitted after a write() method was called that returned false to indicate that it is safe to write again
+error Emitted on error with the exception exception
+close Emitted when the underlying file descriptor has been closed
+pipe Emitted when the stream is passed to a readable stream's pipe method
+###
+BufferStream.prototype.write = (data, opt_encoding) ->
+ if data
+ flush = false
+ encoding = opt_encoding or 'utf8'
+ data = new Buffer data, encoding unless Buffer.isBuffer data
+ if data.length > @bufferSize
+ @emit 'data', data
+ return not @paused
+ if data.length > @bufferSize
+ throw new Error 'Data length greater than buffer'
+ if not @buffers.length
+ # Create a new buffer if none is present
+ buffer = new Buffer @bufferSize
+ buffer.position = 0
+ @buffers.push buffer
+ else
+ # Get the last buffer
+ buffer = @buffers[@buffers.length-1]
+ # Check if the last buffer is not about to overflow
+ if buffer.position + data.length > buffer.length
+ # If so, create a new buffer
+ buffer = new Buffer @bufferSize
+ buffer.position = 0
+ @buffers.push buffer
+ flush = true
+ # Now, write into our buffer
+ # buffer.write data, encoding, buffer.position
+ data.copy buffer, buffer.position
+ buffer.position += data.length
+ # Flush old buffer
+ @flush() if flush
+ # Pause stream if number of buffers greater than 1
+ return @buffers.length <= @stackSize
+
+BufferStream.prototype.end = (data, opt_encoding) ->
+ @write data, opt_encoding if data
+ @writable = false
+ @flush()
+
+###
+Read API
+data The 'data' event emits either a Buffer (by default) or a string if setEncoding() was used.
+end Emitted when the stream has received an EOF (FIN in TCP terminology). Indicates that no more 'data' events will happen. If the stream is also writable, it may be possible to continue writing.
+error Emitted if there was an error receiving data.
+close Emitted when the underlying file descriptor has been closed. Not all streams will emit this. (For example, an incoming HTTP request will not emit 'close'.)
+###
+BufferStream.prototype.pause = ->
+ @paused = true
+
+BufferStream.prototype.resume = ->
+ @paused = false
+ @flush()
+
+module.exports = BufferStream
View
125 test/StackBufferedStream.coffee
@@ -0,0 +1,125 @@
+
+should = require 'should'
+stream = require 'stream'
+fs = require 'fs'
+BufferedStream = if process.env.BF_COV then require '../lib-cov/StackBufferedStream' else require '../lib/StackBufferedStream'
+
+file = "/tmp/buffered-stream"
+
+# Create an Input Stream
+Input = ->
+ @readable = true
+ stream.call @
+Input.prototype.__proto__ = stream.prototype
+Input.prototype.pause = ->
+ @paused = true
+Input.prototype.resume = ->
+ @paused = false
+Input.prototype.end = -> @ended = true && @emit 'end'
+Input.prototype.destroySoon = -> @emit 'end' unless @ended
+Input.prototype.destroy = -> @emit 'close'
+# Create an Output Stream
+Output = ->
+ @count = 0
+ @writable = true
+ stream.call @
+Output.prototype.__proto__ = stream.prototype
+Output.prototype.write = ->
+ @count++
+ return true if Math.floor(Math.random()*10) is 0
+ setTimeout (self) ->
+ self.emit 'drain'
+ , Math.floor(Math.random()*10), @
+ false
+Output.prototype.end = ->
+ setTimeout (self) ->
+ self.emit 'close'
+ , 10, @
+
+describe 'fixed buffered stream', ->
+
+ it 'should write utf8 each 100 chars as 1 Ko chuncks', (next) ->
+ data = ''
+ for i in [0...100000] then data += "#{i} ♥✈☺♬☑♠☎☻♫☒♤☤☹♪♀✩✉☠✔♂★✇♺✖♨❦☁✌♛❁☪☂✏♝❀☭☃☛♞✿☮☼☚♘✾☯☾☝♖✽✝☄☟♟✺☥✂✍♕✵\n"
+ buffer = new BufferedStream 3*1024*1024
+ out = fs.createWriteStream file
+ buffer.on 'error', (err) ->
+ next err
+ out.on 'error', (err) ->
+ next err
+ out.on 'close', ->
+ fs.readFile file, 'utf8', (err, content) ->
+ should.not.exist err
+ content.should.eql data
+ fs.unlink file, (err) ->
+ should.not.exist err
+ next()
+ buffer.pipe out
+ offset = 0
+ length = 100
+ while offset + length < data.length
+ buffer.write data.substr offset, length
+ offset += length
+ buffer.write data.substr offset
+ buffer.end()
+
+ it 'should pipe between file input and output stream', (next) ->
+ data = ''
+ for i in [0...1000000] then data += "àèêûîô#{i}\n"
+ fs.writeFile "#{file}-input", data, 'utf8', (err) ->
+ input = fs.createReadStream "#{file}-input", flags: 'r'
+ buffer = new BufferedStream 1024*1024
+ output = fs.createWriteStream file
+ input.pipe(buffer).pipe(output)
+ output.on 'close', ->
+ fs.readFile file, 'utf8', (err, content) ->
+ should.not.exist err
+ content.should.eql data
+ fs.unlink file, (err) ->
+ should.not.exist err
+ next()
+
+ it 'should pipe with a small buffer', (next) ->
+ data = ''
+ for i in [0...1000000] then data += "àèêûîô#{i}\n"
+ fs.writeFile "#{file}-input", data, 'utf8', (err) ->
+ input = fs.createReadStream "#{file}-input", flags: 'r'
+ buffer = new BufferedStream 1024*1024
+ output = fs.createWriteStream file
+ input.pipe(buffer).pipe(output)
+ output.on 'close', ->
+ fs.readFile file, 'utf8', (err, content) ->
+ should.not.exist err
+ content.should.eql data
+ fs.unlink file, (err) ->
+ should.not.exist err
+ next()
+
+ it 'should pipe between custom input and output stream', (next) ->
+ start = Date.now()
+ count = 0
+ input = new Input
+ output = new Output
+ wait = ->
+ setTimeout ->
+ if input.paused
+ then process.nextTick wait
+ else process.nextTick run
+ , 1
+ run = ->
+ input.emit 'data', "#{count++} ♥✈☺♬☑♠☎☻♫☒♤☤☹♪♀✩✉☠✔♂★✇♺✖♨❦☁✌♛❁☪☂✏♝❀☭☃☛♞✿☮☼☚♘✾☯☾☝♖✽✝☄☟♟✺☥✂✍♕✵\n"
+ return input.end() if Date.now() - start > 1000
+ if input.paused
+ then process.nextTick wait
+ else process.nextTick run
+ output.on 'close', ->
+ next()
+ input.pipe(new BufferedStream).pipe(output)
+ run()
+
+
+
+
+
+
+

0 comments on commit 5b8a80b

Please sign in to comment.
Something went wrong with that request. Please try again.