Skip to content

Commit

Permalink
Remove streaming interface
Browse files Browse the repository at this point in the history
Use `unified-stream` instead.

Closes GH-26.
  • Loading branch information
wooorm committed Feb 11, 2017
1 parent e566fe6 commit 111d3c6
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 682 deletions.
169 changes: 1 addition & 168 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
'use strict';

/* Dependencies. */
var events = require('events');
var has = require('has');
var once = require('once');
var extend = require('extend');
var bail = require('bail');
var vfile = require('vfile');
Expand Down Expand Up @@ -42,24 +40,7 @@ function unified() {
var attachers = [];
var transformers = trough();
var namespace = {};
var chunks = [];
var emitter = new events.EventEmitter();
var ended = false;
var concrete = true;
var settings;
var key;

/* Mix in methods. */
for (key in emitter) {
processor[key] = emitter[key];
}

/* Throw as early as possible.
* As events are triggered synchroneously, the stack
* is preserved. */
processor.on('pipe', function () {
assertConcrete();
});

/* Data management. */
processor.data = data;
Expand All @@ -71,13 +52,6 @@ function unified() {
processor.attachers = attachers;
processor.use = use;

/* Streaming. */
processor.writable = true;
processor.readable = true;
processor.write = write;
processor.end = end;
processor.pipe = pipe;

/* API. */
processor.parse = parse;
processor.stringify = stringify;
Expand Down Expand Up @@ -123,9 +97,7 @@ function unified() {
function assertConcrete(name) {
if (!concrete) {
throw new Error(
'Cannot ' +
(name ? 'invoke `' + name + '` on' : 'pipe into') +
' abstract processor.\n' +
'Cannot invoke `' + name + '` on abstract processor.\n' +
'To make the processor concrete, invoke it: ' +
'use `processor()` instead of `processor`.'
);
Expand Down Expand Up @@ -360,145 +332,6 @@ function unified() {

return file;
}

/* Streams. */

/* Write a chunk into memory. */
function write(chunk, encoding, callback) {
assertConcrete('write');

if (isFunction(encoding)) {
callback = encoding;
encoding = null;
}

if (ended) {
throw new Error('Did not expect `write` after `end`');
}

chunks.push((chunk || '').toString(encoding || 'utf8'));

if (callback) {
callback();
}

/* Signal succesful write. */
return true;
}

/* End the writing. Passes all arguments to a final
* `write`. Starts the process, which will trigger
* `error`, with a fatal error, if any; `data`, with
* the generated document in `string` form, if
* succesful. If messages are triggered during the
* process, those are triggerd as `warning`s. */
function end() {
assertConcrete('end');
assertParser('end');
assertCompiler('end');

write.apply(null, arguments);

ended = true;

process(chunks.join(''), settings, function (err, file) {
var messages = file.messages;
var length = messages.length;
var index = -1;

chunks = settings = null;

/* Trigger messages as warnings, except for fatal error. */
while (++index < length) {
if (messages[index] !== err) {
processor.emit('warning', messages[index]);
}
}

if (err) {
/* Don’t enter an infinite error throwing loop. */
global.setTimeout(function () {
processor.emit('error', err);
}, 4);
} else {
processor.emit('data', file.contents);
processor.emit('end');
}
});

return true;
}

/* Pipe the processor into a writable stream.
*
* Basically `Stream#pipe`, but inlined and
* simplified to keep the bundled size down.
*
* See https://github.com/nodejs/node/blob/master/lib/stream.js#L26. */
function pipe(dest, options) {
var onend = once(onended);

assertConcrete('pipe');

settings = options || {};

processor.on('data', ondata);
processor.on('error', onerror);
processor.on('end', cleanup);
processor.on('close', cleanup);

/* If the 'end' option is not supplied, dest.end() will be
* called when the 'end' or 'close' events are received.
* Only dest.end() once. */
if (!dest._isStdio && settings.end !== false) {
processor.on('end', onend);
}

dest.on('error', onerror);
dest.on('close', cleanup);

dest.emit('pipe', processor);

return dest;

/* End destination. */
function onended() {
if (dest.end) {
dest.end();
}
}

/* Handle data. */
function ondata(chunk) {
if (dest.writable) {
dest.write(chunk);
}
}

/* Clean listeners. */
function cleanup() {
processor.removeListener('data', ondata);
processor.removeListener('end', onend);
processor.removeListener('error', onerror);
processor.removeListener('end', cleanup);
processor.removeListener('close', cleanup);

dest.removeListener('error', onerror);
dest.removeListener('close', cleanup);
}

/* Close dangling pipes and handle unheard errors. */
function onerror(err) {
var handlers = processor._events.error;

cleanup();

/* Cannot use `listenerCount` in node <= 0.12. */
if (!handlers || handlers.length === 0 || handlers === onerror) {
throw err; /* Unhandled stream error in pipe. */
}
}
}
}

/* Check if `node` is a Unist node. */
Expand Down

0 comments on commit 111d3c6

Please sign in to comment.