Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【node 源码学习笔记】stream 可写流 #11

Open
xiaoxiaojx opened this issue Jul 17, 2021 · 0 comments
Open

【node 源码学习笔记】stream 可写流 #11

xiaoxiaojx opened this issue Jul 17, 2021 · 0 comments
Labels
Node.js Node.js® is a JavaScript runtime built on Chrome's V8 JavaScript engine.

Comments

@xiaoxiaojx
Copy link
Owner

xiaoxiaojx commented Jul 17, 2021

Node.js

Table of Contents

1. 前言

stream 流是许多 nodejs 核心模块的基类, 在讲解它们之前还是要认真说一下 nodejs stream 的实现。其实在 【libuv 源码学习笔记】网络与流BSD 套接字 中就开始提到 c 中的流, nodejs 的 c++ 代码的实现更多的是作为一个胶水层, 实际调用的 js 层面的 stream 实例的方法。

流是用于在 Node.js 中处理流数据的抽象接口。 stream 模块提供了用于实现流接口的 API。
Node.js 提供了许多流对象。 例如,对 HTTP 服务器的请求和 process.stdout 都是流的实例。
流可以是可读的、可写的、或两者兼而有之。 所有的流都是 EventEmitter 的实例。

涉及的知识点

2. 可写流

可写流的例子包括:

  • 客户端上的 HTTP 请求
  • 服务器上的 HTTP 响应
  • 文件系统写流
  • 压缩流
  • 加密流
  • TCP 套接字
  • 子进程标准输入
  • process.stdout、process.stderr

所有的 Writable 流都实现了 stream.Writable 类定义的接口。

2.1. Writable

实现一个可写流的核心是继承 Writable, 并至少实现一个 _write 或者 _writev 方法。

  • isDuplex: 判断此时是不是双工流, 双工流(Duplex)是同时实现了 Readable 和 Writable 接口的流。这个我们后面在单独讲一下
  • this._writableState : 用于保存流状态变化及一些其他属性的数据
    options: 传入一些配置参数, 因为 Writable 是不能直接使用的, 你可以继承于 Writable 传入 options 实现自己的可写流
  • destroyImpl.construct: 流开始前的准备工作, 如 fs 的可写流就需要先调用 open 方法获取到 fd, 流才算准备就绪。
// lib/internal/streams/writable.js

function Writable(options) {
  const isDuplex = (this instanceof Stream.Duplex);

  if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this))
    return new Writable(options);

  this._writableState = new WritableState(options, this, isDuplex);

  if (options) {
    if (typeof options.write === 'function')
      this._write = options.write;

    if (typeof options.writev === 'function')
      this._writev = options.writev;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;

    if (typeof options.final === 'function')
      this._final = options.final;

    if (typeof options.construct === 'function')
      this._construct = options.construct;
    if (options.signal)
      addAbortSignalNoValidate(options.signal, this);
  }

  Stream.call(this, options);

  destroyImpl.construct(this, () => {
    const state = this._writableState;

    if (!state.writing) {
      clearBuffer(this, state);
    }

    finishMaybe(this, state);
  });
}

2.2. 可写流的实现之 myWritable

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

2.3. 可写流的实现之 fs.WriteStream

完整的实现在 lib/internal/fs/streams.js 文件中

总体上和上一篇 【node 源码学习笔记】stream 可读流 类似, 其中的 _destroy, construct 参数就不在这篇重复讲了。

WriteStream 是继承于 Writable, 其中的 options 参数可以传入, 也可以在 WriteStream 中自己实现 options 需要的 _write, _writev, _construct, _final, _destroy 方法

2.3.1. _write

  • _write 方法主要是消费可读流产生的数据

可以看见对于一个文件的可读流的 _write 方法, 当有数据传入时, 会把当前数据就是不断写入 fd, 每写入一次 this.pos += data.length 偏移量加上本次写入的数据的长度, 保证数据都被写入到了正确的位置。

// lib/internal/fs/streams.js

WriteStream.prototype._write = function(data, encoding, cb) {
  this[kIsPerformingIO] = true;
  this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
    this[kIsPerformingIO] = false;
    if (this.destroyed) {
      // Tell ._destroy() that it's safe to close the fd now.
      cb(er);
      return this.emit(kIoDone, er);
    }

    if (er) {
      return cb(er);
    }

    this.bytesWritten += bytes;
    cb();
  });

  if (this.pos !== undefined)
    this.pos += data.length;
};

2.3.2. _writev

与 _write 方法不同的是每次写的是一组数据(如 chunk[], _write 仅为一个 chunk), 其来源为内存中 state.buffered 的数据

// lib/internal/fs/streams.js

WriteStream.prototype._writev = function(data, cb) {
  const len = data.length;
  const chunks = new Array(len);
  let size = 0;

  for (let i = 0; i < len; i++) {
    const chunk = data[i].chunk;

    chunks[i] = chunk;
    size += chunk.length;
  }

  this[kIsPerformingIO] = true;
  this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
    this[kIsPerformingIO] = false;
    if (this.destroyed) {
      // Tell ._destroy() that it's safe to close the fd now.
      cb(er);
      return this.emit(kIoDone, er);
    }

    if (er) {
      return cb(er);
    }

    this.bytesWritten += bytes;
    cb();
  });

  if (this.pos !== undefined)
    this.pos += size;
};

doWrite 方法中可以看见如果同时实现了 _write 与 _writev 方法, 会调用 _writev 方法。

// lib/internal/streams/writable.js

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  state.sync = true;
  if (state.destroyed)
    state.onwrite(new ERR_STREAM_DESTROYED('write'));
  else if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}

writev 在 c 中的使用如下, writev以顺序iov[0],iov[1]至iov[iovcnt-1]从缓冲区中聚集输出数据。writev返回输出的字节总数,通常,它应等于所有缓冲区长度之和。

char *str0 = "hello ";
char *str1 = "world\n";
struct iovec iov[2];
ssize_t nwritten;

iov[0].iov_base = str0;
iov[0].iov_len = strlen(str0);
iov[1].iov_base = str1;
iov[1].iov_len = strlen(str1);

nwritten = writev(STDOUT_FILENO, iov, 2);

在 writeOrBuffer 方法中可以看见, 满足如下条件数据讲会先写入内存 state.buffered 中

  • state.writing: 上一个数据正在写入过程中, 在【libuv 源码学习笔记】线程池与i/o 详细说到过 fs 等操作是通过线程池中取出一个线程同步去完成, 为了不出现预期外的错误, 都需要等待上一次任务完成后再进行
  • state.corked: 这一般出现在手动调用了 Writable.prototype.cork 方法, 强制让当前产生的数据都写入内存中, 比如此时流还未准备就绪可以主动调用一次 cork, 在流准备好后再调用一次 uncork 解除锁定即可
  • state.errored: 发生错误的时候
  • state.constructed: _construct 构造函数还未执行完成的时候, 即流还未准备就绪
// lib/internal/streams/writable.js

function writeOrBuffer(stream, state, chunk, encoding, callback) {
  const len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  // stream._write resets state.length
  const ret = state.length < state.highWaterMark;
  // We must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked || state.errored || !state.constructed) {
    state.buffered.push({ chunk, encoding, callback });
    if (state.allBuffers && encoding !== 'buffer') {
      state.allBuffers = false;
    }
    if (state.allNoop && callback !== nop) {
      state.allNoop = false;
    }
  } else {
    state.writelen = len;
    state.writecb = callback;
    state.writing = true;
    state.sync = true;
    stream._write(chunk, encoding, state.onwrite);
    state.sync = false;
  }

  // Return false if errored or destroyed in order to break
  // any synchronous while(stream.write(data)) loops.
  return ret && !state.errored && !state.destroyed;
}

实现自己的可写流时一定要注意 writeOrBuffer 的返回值, 因为此时可能出现了积压的问题, 详见上一篇 【node 源码学习笔记】stream 可读流

从下面的 afterWrite 函数发现,在每次 write 后,如果当前流没有结束 & 没有摧毁 & 内存中的数据清空后才会触发 drain 事件,即自从积压问题出现后第一次释放出的可以继续开始流动的信号。其实也好理解,如果一出现积压,内存中的数据刚下降一点就触发 drain 事件的话,短时间内会不断触发积压机制。

// lib/internal/streams/writable.js

function afterWrite(stream, state, count, cb) {
  const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
    state.needDrain;
  if (needDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }

  while (count-- > 0) {
    state.pendingcb--;
    cb();
  }

  if (state.destroyed) {
    errorBuffer(state);
  }

  finishMaybe(stream, state);
}

2.3.3. _final

fs.WriteStream 没有实现该方法, 在 lib/internal/streams/writable.js 在 write, end 等方法后, 会在 finishMaybe > needFinish > prefinish 中调用 _final 方法。

从 state 的属性可知道只会被调用一次, 并且是 destroyed 之前。

// lib/internal/streams/writable.js

function prefinish(stream, state) {
  if (!state.prefinished && !state.finalCalled) {
    if (typeof stream._final === 'function' && !state.destroyed) {
      state.finalCalled = true;
      callFinal(stream, state);
    } else {
      state.prefinished = true;
      stream.emit('prefinish');
    }
  }
}

从 callFinal 方法中发现, 实现的 _final 会传入一个 callback, 在 _final 方法的最后必须调用一次 callback, 因为该 callback 调用 finish 方法开始走接下来的结束流程。

// lib/internal/streams/writable.js

function callFinal(stream, state) {
  state.sync = true;
  state.pendingcb++;
  const result = stream._final((err) => {
    state.pendingcb--;
    if (err) {
      const onfinishCallbacks = state[kOnFinished].splice(0);
      for (let i = 0; i < onfinishCallbacks.length; i++) {
        onfinishCallbacks[i](err);
      }
      errorOrDestroy(stream, err, state.sync);
    } else if (needFinish(state)) {
      state.prefinished = true;
      stream.emit('prefinish');
      // Backwards compat. Don't check state.sync here.
      // Some streams assume 'finish' will be emitted
      // asynchronously relative to _final callback.
      state.pendingcb++;
      process.nextTick(finish, stream, state);
    }
  });
  if (result !== undefined && result !== null) {
    // ...
  }
  state.sync = false;
}

总体看上去 _final 更像是一个结束前的勾子, 没有像 _destroy 直接关闭 fd 那么"沉重", 其实现该接口的流有 net 模块

这里 js 里面 Socket 对象实际操作的是 【libuv 源码学习笔记】网络与流 中提到到 accept 返回的一个连接的 acceptFd, 如下 _final 方法主要是调用了 shutdown 方法。

// lib/net.js

Socket.prototype._final = function(cb) {
  // If still connecting - defer handling `_final` until 'connect' will happen
  if (this.pending) {
    debug('_final: not yet connected');
    return this.once('connect', () => this._final(cb));
  }

  if (!this._handle)
    return cb();

  debug('_final: not ended, call shutdown()');

  const req = new ShutdownWrap();
  req.oncomplete = afterShutdown;
  req.handle = this._handle;
  req.callback = cb;
  const err = this._handle.shutdown(req);

  if (err === 1 || err === UV_ENOTCONN)  // synchronous finish
    return cb();
  else if (err !== 0)
    return cb(errnoException(err, 'shutdown'));
};

shutdown 追溯下去是的调用是在 libuv 中的流的 i/o 观察者回调函数 uv__stream_io 中, 如下当写入队列未空时调用 uv__drain 方法

// deps/uv/src/unix/stream.c

static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  // ...

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}

uv__drain 主要是调用了 shutdown(2) - Linux man page 方法, 用于关闭部分全双工连接, 如当前传入 SHUT_WR 即关闭写端, 表示服务端降不会再发送数据。

shutdown() 调用导致全部或部分全双工 与要关闭的 sockfd 关联的套接字上的连接。 如果如何是 SHUT_RD,将不允许进一步接收。如果如何 是 SHUT_WR,将不允许进一步传输。如果怎么样 SHUT_RDWR,进一步的接收和传输将是 不允许。

// deps/uv/src/unix/stream.c

static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  /* Shutdown? */
  if ((stream->flags & UV_HANDLE_SHUTTING) &&
      !(stream->flags & UV_HANDLE_CLOSING) &&
      !(stream->flags & UV_HANDLE_SHUT)) {
    assert(stream->shutdown_req);

    req = stream->shutdown_req;
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);

    if (err == 0)
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}

以及实现 _final 接口的 【node 源码学习笔记】stream 双工流、转换流、eos、pipeline 中提到的 Transform 流,其 _final 方法是主要调用了 flush 方法将缓冲区中的数据强制写出

2.4. end 流结束

通常可写流会调用 end 方法表示流写入工作完成, 如 http server 的 res.end() 调用,end 方法可以传入数据进行最后一次的数据写入工作,后开始结束流程。如果你只有一份数据,其实也可仅调用一次 end 方法,即不用单独调用 write 方法

Writable.prototype.end 方法的结束流程主要是调用了如下的 finish 方法

  • state.errorEmitted || state.closeEmitted: 如果此前调用过 emit('error') 或者 emit('close'), 直接返回,因为流已经非正常状态结束了
  • state.finished: finished 状态标示为 true
  • stream.emit('finish'): 发布 finish 事件
  • stream.destroy(): 调用 destroy 方法, 进行关闭 fd 或者内存回收等操作
// lib/internal/streams/writable.js

function finish(stream, state) {
  state.pendingcb--;
  // TODO (ronag): Unify with needFinish.
  if (state.errorEmitted || state.closeEmitted)
    return;

  state.finished = true;

  const onfinishCallbacks = state[kOnFinished].splice(0);
  for (let i = 0; i < onfinishCallbacks.length; i++) {
    onfinishCallbacks[i]();
  }

  stream.emit('finish');

  if (state.autoDestroy) {
    // In case of duplex streams we need a way to detect
    // if the readable side is ready for autoDestroy as well.
    const rState = stream._readableState;
    const autoDestroy = !rState || (
      rState.autoDestroy &&
      // We don't expect the readable to ever 'end'
      // if readable is explicitly set to false.
      (rState.endEmitted || rState.readable === false)
    );
    if (autoDestroy) {
      stream.destroy();
    }
  }
}

3. 小结

本文主要讲了可写流基类 Writable 的实现以及 fs.WriteStream 可写流的实现。

@xiaoxiaojx xiaoxiaojx added the Node.js Node.js® is a JavaScript runtime built on Chrome's V8 JavaScript engine. label Jul 17, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Node.js Node.js® is a JavaScript runtime built on Chrome's V8 JavaScript engine.
Projects
None yet
Development

No branches or pull requests

1 participant