Skip to content

Commit

Permalink
add accum.string
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffbski committed Nov 29, 2012
1 parent 2737ffc commit 5bdb45c
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 13 deletions.
44 changes: 31 additions & 13 deletions lib/accum.js
Expand Up @@ -2,14 +2,17 @@

var passStream = require('pass-stream');

function accum(type, cb) {
if (arguments.length === 1) { // type is optional
cb = type;
type = null;
function accum(options, cb) {
if (arguments.length === 1) { // options is optional
cb = options;
options = {};
}
options = options || {};
options.encoding = options.encoding || 'utf8';
if (typeof cb !== 'function') throw new Error('accum requires a cb function');
if (type && (type !== 'buffer' && type !== 'string' && type !== 'array')) {
throw new Error('if type is provided to accum, it must be one of [buffer, string, array]');
if (options.type &&
(options.type !== 'buffer' && options.type !== 'string' && options.type !== 'array')) {
throw new Error('if options.type is provided to accum, it must be one of [buffer, string, array]');
}

var arrData = [];
Expand All @@ -24,21 +27,27 @@ function accum(type, cb) {
return (Buffer.isBuffer(data)) ? data : new Buffer(data);
}

function ensureString(data) {
return (typeof data === 'string') ? data :
Buffer.isBuffer(data) ? data.toString(options.encoding) :
data.toString();
}

var endConcatFns = {
buffer: function (arrData, cb) { cb(null, Buffer.concat(arrData.map(ensureBuffer))); },
string: function (arrData, cb) { cb(null, arrData.join('')); },
string: function (arrData, cb) { cb(null, arrData.map(ensureString).join('')); },
array: function (arrData, cb) { cb(null, arrData); }
};

function endFn() {
/*jshint validthis:true */
if (!type) { // not provided need to detect
if (!options.type) { // not provided need to detect
var first = arrData[0];
if (Buffer.isBuffer(first)) type = 'buffer';
else if (typeof first === 'string') type = 'string';
else type = 'array';
if (Buffer.isBuffer(first)) options.type = 'buffer';
else if (typeof first === 'string') options.type = 'string';
else options.type = 'array';
}
var fn = endConcatFns[type];
var fn = endConcatFns[options.type];
fn(arrData, cb);
this.queueEnd();
}
Expand All @@ -58,7 +67,16 @@ function accum(type, cb) {
return stream;
}

function buffer(cb) { return accum('buffer', cb); }
function buffer(cb) { return accum({ type: 'buffer'}, cb); }
function string(optEncoding, cb) {
if (arguments.length === 1) { // optEncoding not provided, shift
cb = optEncoding;
optEncoding = null;
}
optEncoding = optEncoding || 'utf8';
return accum({ type: 'string', encoding: optEncoding }, cb);
}

accum.buffer = buffer;
accum.string = string;
module.exports = accum;
35 changes: 35 additions & 0 deletions test/spec.mocha.js
Expand Up @@ -144,3 +144,38 @@ test('spec random pausing Buffer stream with binary data - accum.buffer factory'
});
});

// accum.string

test('spec random pausing string stream - accum.string factory', function (done) {
var result;
var astream = accum.string(function (err, alldata) {
result = alldata;
});
spec(astream)
.through({strict: false})
.validateOnExit();

var master = tester.createConsistentStream();

function gen() {
return 'abc';
}

var manualAccum = [];
tester.createRandomStream(gen, 1000) //1k 3char strings
.pipe(master)
.pipe(tester.createUnpauseStream())
.pipe(astream)
.pipe(tester.createPauseStream())
.pipe(master.createSlave())
.on('error', function (err) { done(err); })
.on('data', function (data) { manualAccum.push(new Buffer(data)); })
.on('end', function () {
t.typeOf(result, 'string');
t.equal(result.length, 3000);
var digest = crypto.createHash('sha1').update(result).digest('base64');
var expectedDigest = crypto.createHash('sha1').update(Buffer.concat(manualAccum).toString()).digest('base64');
t.equal(digest, expectedDigest);
done();
});
});
100 changes: 100 additions & 0 deletions test/string.mocha.js
@@ -0,0 +1,100 @@
/*global suite:false test:false */
'use strict';

var chai = require('chai-stack');
var crypto = require('crypto');
var accum = require('..'); // require('accum');
var passStream = require('pass-stream');

var t = chai.assert;

suite('string');

test('accum.string(cb) with string data, results with concatenated string to cb before end', function (done) {
var DATA = 'abcdefghi';
var stream = passStream();
stream
.pipe(accum.string(function (err, alldata) {
if (err) return done(err);
t.typeOf(alldata, 'string');
t.equal(alldata.length, DATA.length);
t.equal(alldata, DATA);
done();
}));
process.nextTick(function () {
stream.write('abc');
stream.write('def');
stream.end('ghi');
});
});

test('accum.string(cb) with Buffer data, results with concatenated string to cb before end', function (done) {
var DATA = 'abcdefghi';
var stream = passStream();
stream
.pipe(accum.string(function (err, alldata) {
if (err) return done(err);
t.typeOf(alldata, 'string');
t.equal(alldata.length, DATA.length);
t.equal(alldata, DATA);
done();
}));
process.nextTick(function () {
stream.write(new Buffer('abc'));
stream.write(new Buffer('def'));
stream.end(new Buffer('ghi'));
});
});

test('accum.string("utf8", cb) with Buffer data, results with concatenated string to cb before end', function (done) {
var DATA = 'abcdefghi';
var stream = passStream();
stream
.pipe(accum.string('utf8', function (err, alldata) {
if (err) return done(err);
t.typeOf(alldata, 'string');
t.equal(alldata.length, DATA.length);
t.equal(alldata, DATA);
done();
}));
process.nextTick(function () {
stream.write(new Buffer('abc'));
stream.write(new Buffer('def'));
stream.end(new Buffer('ghi'));
});
});


test('accum.string(cb) with err, calls cb with err', function (done) {
var stream = accum.string(function (err, alldata) {
t.equal(err.message, 'my error');
done();
});
process.nextTick(function () {
stream.emit('error', new Error('my error'));
});
});

// currently pipe does not forward error but I have put in
// pull request to fix node.js. Also pause-stream will have
// to be modified as well.
// test('accum.string(cb) with err piped, calls cb with err', function (done) {
// var stream = passStream();
// stream
// .pipe(accum.string(function (err, alldata) {
// t.equal(err.message, 'my error');
// done();
// }));
// process.nextTick(function () {
// stream.emit('error', new Error('my error'));
// });
// });


test('accum.string() missing cb throws error', function () {
function throwsErr() {
var stream = accum.string();
}
t.throws(throwsErr, /accum requires a cb function/);
});

0 comments on commit 5bdb45c

Please sign in to comment.