Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node源码粗读(10):通过fs.write来看异步I/O和回调执行的整体流程 #23

Open
xtx1130 opened this issue Apr 4, 2018 · 0 comments

Comments

@xtx1130
Copy link
Owner

xtx1130 commented Apr 4, 2018

这篇文章主要从fs.write入手,简单讲述node中写文件同步与异步的实现以及详细解释异步I/O的回调如何通过AsyncWrap串起来nextTickMicroTasks

昨天有个朋友问我:

读源码什么都不懂,无从入手该怎么办?

我感觉透过现象来看本质是一个很好的入手方向。这篇文章就从我们熟悉的fs.writefs.writeSync入手,透过这些简单的API,来看看node究竟在里面做了什么。

fs.write和fs.writeSync

相信有一些node基础的开发者或者之前读过我的文章的读者都会知道console.log是基于process.stdout.write实现的,意即console.log是异步操作(可能有人会提出疑问:既然是异步,如何保证输出是正确的?请自己移步./lib/console.js查看)。所以如果我们想要调试node源码异步回调的时候,如果使用console.log会造成递归,这种情况下一般都会使用fs.writeSync。在js层面的源码中,fs.writefs.writeSync在调用的时候其实只差了一个参数:

// ./lib/fs.js
fs.write = function(fd, buffer, offset, length, position, callback) {
  function wrapper(err, written) {
    // Retain a reference to buffer so that it can't be GC'ed too soon.
    callback(err, written || 0, buffer);
  }
  // ...
  const req = new FSReqWrap();
  req.oncomplete = wrapper;

  if (isUint8Array(buffer)) {
   // ...
    return binding.writeBuffer(fd, buffer, offset, length, position, req); //注意这里
  }
  // ...
  return binding.writeString(fd, buffer, offset, length, req); //注意这里
};
fs.writeSync = function(fd, buffer, offset, length, position) {
  validateUint32(fd, 'fd');
  const ctx = {};
  let result;
  if (isUint8Array(buffer)) {
    // ...
    result = binding.writeBuffer(fd, buffer, offset, length, position,
                                 undefined, ctx); //注意这里
  } else {
    // ...
    result = binding.writeString(fd, buffer, offset, length,
                                 undefined, ctx); //注意这里
  }
  handleErrorFromBinding(ctx);
  return result;
};

通过对比可以发现,在调用writeBuffer或者writeString的时候,fs.write多了一个req的参数,而fs.writeSync拥有ctx参数。接下来我们去node_file.cc看看这两个到底区别在哪里。在这里我们以writeBuffer为例:

static void WriteString(const FunctionCallbackInfo<Value>& args) {
  // ...
  FSReqBase* req_wrap = GetReqWrap(env, args[4]);
  const bool is_async = req_wrap != nullptr;
  // ...
  if (is_async) {  // write(fd, string, pos, enc, req)
    CHECK_NE(req_wrap, nullptr);
    len = StringBytes::StorageSize(env->isolate(), value, enc);
    FSReqBase::FSReqBuffer& stack_buffer =
        req_wrap->Init("write", len, enc);
    // ...
    int err = uv_fs_write(env->event_loop(), req_wrap->req(),
                          fd, &uvbuf, 1, pos, AfterInteger); //注意这里
    req_wrap->Dispatched();
    if (err < 0) {
     // ...
    } else {
      req_wrap->SetReturnValue(args);
    }
  } else { 
    CHECK_EQ(argc, 6);
    fs_req_wrap req_wrap;
    // ...
    uv_buf_t uvbuf = uv_buf_init(buf, len);
    int bytesWritten = SyncCall(env, args[5], &req_wrap, "write",
                                uv_fs_write, fd, &uvbuf, 1, pos); //注意这里
    args.GetReturnValue().Set(bytesWritten);
  }
}

在这里我们可以很明显的开出来其中用一个if...else把同步和异步的逻辑区分开了。async直接调用uv_fs_write而sync则调用SyncCall()。当时我看到这里的时候还有些许错愕,因为node_file.cc中提供了AsyncCall()方法,单独这个API没有使用,于是我翻到了这个pr,是为了防止内存泄漏,所以才进行的单独的处理,具体信息可以去pr中了解。
对比一下uv_fs_write和下面的SyncCall,可以发现uv_fs_write多出了AfterInteger这个参数,AfterInteger定义如下:

void AfterInteger(uv_fs_t* req) {
  FSReqBase* req_wrap = static_cast<FSReqBase*>(req->data);
  FSReqAfterScope after(req_wrap, req);

  if (after.Proceed())
    req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));
}

其中after.Processd()定义如下:

bool FSReqAfterScope::Proceed() {
  if (req_->result < 0) {
    Reject(req_);
    return false;
  }
  return true;
}

根据函数中的if判断以及req_wrap->Resolve很明显可以看出来这是一个回调函数,经过比对可以发现uv_fs_write多出的AfterInteger其实是一个函数。req_wrap会在下面进行介绍,我们现在重点先关注一下uv_fs_write

uv_fs_write

视线转移到libuv中的fs.c文件中,其中定义了uv_fs_write,这里只摘抄重点的部分进行解读:

#define INIT(subtype)                                                         \
  do {                                                                        \
    if (req == NULL)                                                          \
      return UV_EINVAL;                                                       \
    UV_REQ_INIT(req, UV_FS);                                                  \
    req->fs_type = UV_FS_ ## subtype;                                         \
    req->result = 0;                                                          \
    req->ptr = NULL;                                                          \
    req->loop = loop;                                                         \
    req->path = NULL;                                                         \
    req->new_path = NULL;                                                     \
    req->bufs = NULL;                                                         \
    req->cb = cb;                                                             \
  }                                                                           \
  while (0)

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__req_register(loop, req);                                            \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)
int uv_fs_write(uv_loop_t* loop,
                uv_fs_t* req,
                uv_file file,
                const uv_buf_t bufs[],
                unsigned int nbufs,
                int64_t off,
                uv_fs_cb cb) {
  INIT(WRITE);
  // ...
  POST;
}

根据定义可以发现,上文中的AfterInteger其实是作为cb参数传入到了uv_fs_write中。接下来注意一下POST宏:

 if (cb != NULL) {                                                         \
      uv__req_register(loop, req);                                            \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }    

在这里对cb进行了判断,如果无cb则直接调用uv__fs_work,如果有cb则会把uv__fs_work放到thread_pool中调用以形成异步I/O。libuv通过这种方式,实现了同步和异步。

异步的回调和同步的返回

同步fs.writeSync的返回

首先我们先关注一下同步的fs.writeSync的返回,视线回到node_file.cc中,关于同步的返回在这里:

int bytesWritten = SyncCall(env, args[5], &req_wrap, "write",
                                uv_fs_write, fd, &uvbuf, 1, pos);
    args.GetReturnValue().Set(bytesWritten);

可以很容易的看到,fs.writeSync的返回为写入的字节数。

异步fs.write的回调

在刚才已经介绍过fs.write会把AfterInteger作为cb传入到uv_fs_write中,在事件循环开始之后会在poll阶段执行AfterInteger回调。而AfterIntenger中最终执行的是:

req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));

Resolve代码如下:

void FSReqWrap::Resolve(Local<Value> value) {
  Local<Value> argv[2] {
    Null(env()->isolate()),
    value
  };
  MakeCallback(env()->oncomplete_string(), arraysize(argv), argv);
}

可以看到最终执行的是MakeCallback,如果读过我之前文章的读者,很容易联想到之前timers API的MakeCallback。这两个MakeCallback其实还是有区别的,区别就在于--FSReqWrap继承自AsyncWrap,而这个MakeCallback的声明在async_wrap-inl.h:

inline v8::MaybeLocal<v8::Value> AsyncWrap::MakeCallback(
    const v8::Local<v8::String> symbol,
    int argc,
    v8::Local<v8::Value>* argv) {
  v8::Local<v8::Value> cb_v = object()->Get(symbol);
  CHECK(cb_v->IsFunction());
  return MakeCallback(cb_v.As<v8::Function>(), argc, argv);
}

最终调用的 return MakeCallback(cb_v.As<v8::Function>(), argc, argv);则在async_wrap.cc中:

MaybeLocal<Value> AsyncWrap::MakeCallback(const Local<Function> cb,
                                          int argc,
                                          Local<Value>* argv) {
  EmitTraceEventBefore();
  async_context context { get_async_id(), get_trigger_async_id() };
  MaybeLocal<Value> ret = InternalMakeCallback(
      env(), object(), cb, argc, argv, context);
  EmitTraceEventAfter();
  return ret;
}

InternalMakeCallback不知道大家还有没有印象,之前的文章中曾经介绍过,在node源码粗读(9):nextTick、timers API、MicroTasks注册到执行全阶段解读event-loop阶段章节,有这样一句话:

也正是InternalMakeCallbackInternalCallbackScope::Close使得libuv和v8紧紧的联系在了一起

没错,AsyncWrap::MakeCallback最终调用的还是node::InternalMakeCallback。殊途同归,最终使得整体形成了一个闭环(ps: InternalCallbackScope::Close会继续调用nextTick以及RunMicrotasks)。

by 小菜

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant