Skip to content

Commit

Permalink
fs.ReadStream should emit Buffers
Browse files Browse the repository at this point in the history
And do proper utf8 encoding.
  • Loading branch information
ry committed May 24, 2010
1 parent 3768aaa commit 19f475c
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 75 deletions.
98 changes: 72 additions & 26 deletions lib/fs.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
var sys = require('sys'),
events = require('events'),
Buffer = require('buffer').Buffer;
var sys = require('sys');
var events = require('events');
var Buffer = require('buffer').Buffer;

var binding = process.binding('fs');
var fs = exports;

var kMinPoolSpace = 128;
var kPoolSize = 40*1024;

fs.Stats = binding.Stats;

fs.Stats.prototype._checkModeProperty = function (property) {
Expand Down Expand Up @@ -565,8 +568,16 @@ fs.realpath = function (path, callback) {
}
}
next();
};

// Module-wide read buffer shared by ReadStream reads. Its `used` property
// records how many bytes at the front have already been handed out.
var pool;

// Replaces the shared pool with a brand-new, completely unused buffer of
// kPoolSize bytes.
function allocNewPool () {
  var fresh = new Buffer(kPoolSize);
  fresh.used = 0;
  pool = fresh;
}



// Convenience factory: builds a ReadStream for `path`, forwarding `options`
// to the constructor untouched, and hands the new stream back to the caller.
fs.createReadStream = function(path, options) {
  var stream = new ReadStream(path, options);
  return stream;
};
Expand All @@ -580,7 +591,6 @@ var ReadStream = fs.ReadStream = function(path, options) {
this.paused = false;

this.flags = 'r';
this.encoding = 'binary';
this.mode = 0666;
this.bufferSize = 4 * 1024;

Expand Down Expand Up @@ -614,19 +624,40 @@ sys.inherits(ReadStream, events.EventEmitter);

fs.FileReadStream = fs.ReadStream; // support the legacy name

ReadStream.prototype.setEncoding = function(encoding) {
this.encoding = encoding;
ReadStream.prototype.setEncoding = function (encoding) {
var Utf8Decoder = require("utf8decoder").Utf8Decoder; // lazy load
var self = this;
this._encoding = enc.toLowerCase();
if (this._encoding == 'utf-8' || this._encoding == 'utf8') {
this._decoder = new Utf8Decoder();
this._decoder.onString = function(str) {
self.emit('data', str);
};
} else if (this._decoder) {
delete this._decoder;
}
};


ReadStream.prototype._read = function () {
var self = this;
if (!self.readable || self.paused) return;

fs.read(self.fd,
self.bufferSize,
undefined,
self.encoding,
function(err, data, bytesRead) {
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool. Can't add to the free list because
// users might have references to slices on it.
pool = null;
allocNewPool();
}

// Grab another reference to the pool in the case that while we're in the
// thread pool another read() finishes up the pool, and allocates a new
// one.
var thisPool = pool;
var toRead = Math.min(pool.length - pool.used, this.bufferSize);
var start = pool.used;

function afterRead (err, bytesRead) {
if (err) {
self.emit('error', err);
self.readable = false;
Expand All @@ -639,20 +670,39 @@ ReadStream.prototype._read = function () {
return;
}

var b = thisPool.slice(start, start+bytesRead);

// Possible optimization here?
// Reclaim some bytes if bytesRead < toRead?
// Would need to ensure that pool === thisPool.

// do not emit events if the stream is paused
if (self.paused) {
self.buffer = data;
self.buffer = b;
return;
}

// do not emit events anymore after we declared the stream unreadable
if (!self.readable) {
return;
}
if (!self.readable) return;

self.emit('data', data);
self._emitData(b);
self._read();
});
}

fs.read(self.fd, pool, pool.used, toRead, undefined, afterRead);
pool.used += toRead;
};


// Delivers one chunk `d` to 'data' listeners in the form selected for this
// stream:
//   - no `_encoding` set     -> the chunk is emitted unchanged
//   - a `_decoder` installed -> the chunk is written to the decoder, and
//                               nothing is emitted from here
//   - otherwise              -> the chunk is converted to a string with
//                               `_encoding` and that string is emitted
ReadStream.prototype._emitData = function (d) {
  var encoding = this._encoding;

  if (!encoding) {
    this.emit('data', d);
    return;
  }

  if (this._decoder) {
    this._decoder.write(d);
    return;
  }

  this.emit('data', d.toString(encoding, 0, d.length));
};


Expand All @@ -664,7 +714,7 @@ ReadStream.prototype.forceClose = function (cb) {
sys.error(readStreamForceCloseWarning);
}
return this.destroy(cb);
}
};


ReadStream.prototype.destroy = function (cb) {
Expand All @@ -674,16 +724,12 @@ ReadStream.prototype.destroy = function (cb) {
function close() {
fs.close(self.fd, function(err) {
if (err) {
if (cb) {
cb(err);
}
if (cb) cb(err);
self.emit('error', err);
return;
}

if (cb) {
cb(null);
}
if (cb) cb(null);
self.emit('close');
});
}
Expand All @@ -705,7 +751,7 @@ ReadStream.prototype.resume = function() {
this.paused = false;

if (this.buffer) {
this.emit('data', this.buffer);
this._emitData(this.buffer);
this.buffer = null;
}

Expand Down Expand Up @@ -858,7 +904,7 @@ WriteStream.prototype.forceClose = function (cb) {
sys.error(writeStreamForceCloseWarning);
}
return this.destroy(cb);
}
};


WriteStream.prototype.forceClose = function (cb) {
Expand Down
109 changes: 60 additions & 49 deletions test/simple/test-file-read-stream.js
Original file line number Diff line number Diff line change
@@ -1,54 +1,65 @@
require('../common');

var
path = require('path'),
fs = require('fs'),
fn = path.join(fixturesDir, 'test_ca.pem'),
file = fs.createReadStream(fn),

callbacks = {
open: -1,
end: -1,
close: -1,
destroy: -1
},

paused = false,

fileContent = '';

file
.addListener('open', function(fd) {
callbacks.open++;
assert.equal('number', typeof fd);
assert.ok(file.readable);
})
.addListener('error', function(err) {
throw err;
})
.addListener('data', function(data) {
assert.ok(!paused);
fileContent += data;

paused = true;
file.pause();
assert.ok(file.paused);

setTimeout(function() {
paused = false;
file.resume();
assert.ok(!file.paused);
}, 10);
})
.addListener('end', function(chunk) {
callbacks.end++;
})
.addListener('close', function() {
callbacks.close++;
assert.ok(!file.readable);

assert.equal(fs.readFileSync(fn), fileContent);
});
// TODO Improve this test. test_ca.pem is too small. A proper test would
// create a large utf8 (with multibyte chars) file and stream it in,
// performing sanity checks throughout.

// Streams the test_ca.pem fixture through fs.createReadStream and checks the
// events it emits. `fixturesDir` and `assert` are not defined here; they are
// presumably provided by the '../common' harness -- TODO confirm.
Buffer = require('buffer').Buffer;
path = require('path');
fs = require('fs');
fn = path.join(fixturesDir, 'test_ca.pem');

file = fs.createReadStream(fn);

// Per-event counters, each starting at -1 and incremented by the matching
// listener below. Nothing in this part of the script increments `destroy`.
callbacks = {
open: -1,
end: -1,
data: -1,
close: -1,
destroy: -1
};

// Mirrors the pause state this test has requested on the stream.
paused = false;

// Accumulates every chunk received through 'data'.
fileContent = '';

// 'open' must hand over a numeric file descriptor on a readable stream.
file.addListener('open', function(fd) {
callbacks.open++;
assert.equal('number', typeof fd);
assert.ok(file.readable);
});

// Any stream error fails the test by rethrowing.
file.addListener('error', function(err) {
throw err;
});

// Every chunk must be a Buffer and must never arrive while the test has the
// stream paused. After each chunk the stream is paused and then resumed 10ms
// later.
file.addListener('data', function(data) {
callbacks.data++;
assert.ok(data instanceof Buffer);
assert.ok(!paused);
// Appending to a string coerces the Buffer chunk to a string.
fileContent += data;

paused = true;
file.pause();
assert.ok(file.paused);

setTimeout(function() {
paused = false;
file.resume();
assert.ok(!file.paused);
}, 10);
});

// Only counts the event; the `chunk` argument is unused.
file.addListener('end', function(chunk) {
callbacks.end++;
});

// On close the stream must no longer be readable, and the accumulated
// content is compared against a synchronous read of the same file.
file.addListener('close', function() {
callbacks.close++;
assert.ok(!file.readable);

assert.equal(fs.readFileSync(fn), fileContent);
});

var file2 = fs.createReadStream(fn);
file2.destroy(function(err) {
Expand Down

0 comments on commit 19f475c

Please sign in to comment.