Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node源码粗读(12):通过net.createConnection来看socket的Event Emitter的实现 #26

Open
xtx1130 opened this issue Jun 21, 2018 · 1 comment

Comments

@xtx1130
Copy link
Owner

xtx1130 commented Jun 21, 2018

这篇文章主要从net.createConnection入手,详细讲解socket常用的connet、data、error、close等事件是如何实现的。

socket的创建

相信用过net.createConnection都比较了解了。这个API会创建一个客户端的socket链接:

net.createConnection(options[, connectListener])

其中connectListener将被添加为返回 socket 上的 'connect' 事件上的监听器。
简单了解了API,我们直奔./lib/net.js来看一下是如何实现的:

// ./lib/net.js connect构造函数
function connect(...args) {
  var normalized = normalizeArgs(args);
  var options = normalized[0];
  // ...
  var socket = new Socket(options);
  // ...
  return Socket.prototype.connect.call(socket, normalized);
}

首先我们关注一下new Socket(options)的实现:

const {
  TCP,
  TCPConnectWrap,
  constants: TCPConstants
} = process.binding('tcp_wrap');
// ... socket构造函数
function Socket(options) {
  // ...
  this._hadError = false;
  this._handle = null;
  this._parent = null;
  this._host = null;
  this[kLastWriteQueueSize] = 0;
  this[kTimeout] = null;

  // ...
  stream.Duplex.call(this, options);
  this.allowHalfOpen = Boolean(allowHalfOpen);

  if (options.handle) {
    this._handle = options.handle; // private
    this[async_id_symbol] = getNewAsyncId(this._handle);
  } else if (options.fd !== undefined) {
    const { fd } = options;
    this._handle = createHandle(fd, false);
    this._handle.open(fd);
    this[async_id_symbol] = this._handle.getAsyncId();
    // ...
  }
  initSocketHandle(this);
  // ...
  }
 // ...
}
util.inherits(Socket, stream.Duplex);
// ... createHandle 函数
function createHandle(fd, is_server) {
  const type = TTYWrap.guessHandleType(fd);
  // ...
  if (type === 'TCP') {
    return new TCP(
      is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
    );
  }
  throw new ERR_INVALID_FD_TYPE(type);
}

通过上面这一段代码我们可以得到两个关键的信息:

  • this._handle = createHandle(fd, false);TCP的实例
  • stream.Duplex.call(this, options); socket的实例继承了stream.Duplex的属性和方法
    接下来是connect:
Socket.prototype.connect = function(...args) {
  if (pipe) {
    // ...
  } else {
    lookupAndConnect(this, options);
  }
}
/// connect调用的核心函数
function internalConnect(
  self, address, port, addressType, localAddress, localPort) {
   if (addressType === 6 || addressType === 4) {
    const req = new TCPConnectWrap();
    req.oncomplete = afterConnect;
    req.address = address;
    req.port = port;
    req.localAddress = localAddress;
    req.localPort = localPort;
    if (addressType === 4)
      err = self._handle.connect(req, address, port);
    // ...
  } else {
    // ...
  }
}

里面需要注意的地方:

  • self._handle.connect(req, address, port); 这一段是真正的实现socket连接的地方
  • req.oncomplete = afterConnect; 这一段是连接成功之后的回调

this._handle和'connect'事件

在上一节我们知道this._handleTCP的实例,进而我们长驱直入tcp_wrap.cc

void TCPWrap::Initialize(Local<Object> target,
                         Local<Value> unused,
                         Local<Context> context) {
  // ...
  Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
  Local<String> tcpString = FIXED_ONE_BYTE_STRING(env->isolate(), "TCP");
  t->SetClassName(tcpString);
  t->InstanceTemplate()->SetInternalFieldCount(1);
  t->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "reading"),
                             Boolean::New(env->isolate(), false));
  t->InstanceTemplate()->Set(env->owner_string(), Null(env->isolate()));
  t->InstanceTemplate()->Set(env->onread_string(), Null(env->isolate()));
  t->InstanceTemplate()->Set(env->onconnection_string(), Null(env->isolate()));
   // ...
  env->SetProtoMethod(t, "connect", Connect);
  // ...
}
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
  // ...
  Environment* env = Environment::GetCurrent(args);

  int type_value = args[0].As<Int32>()->Value();
  TCPWrap::SocketType type = static_cast<TCPWrap::SocketType>(type_value);

  ProviderType provider;
  switch (type) {
    case SOCKET:
      provider = PROVIDER_TCPWRAP;
      break;
    case SERVER:
      provider = PROVIDER_TCPSERVERWRAP;
      break;
    default:
      UNREACHABLE();
  }

  new TCPWrap(env, args.This(), provider);
}

TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
    : ConnectionWrap(env, object, provider) {
  int r = uv_tcp_init(env->event_loop(), &handle_);
  CHECK_EQ(r, 0);  // How do we proxy this error up to javascript?
                   // Suggestion: uv_tcp_init() returns void.
}

通过上面的代码,我们可以清晰地看到TCPWrap向js层抛出的构造函数TCP,重点注意下

env->SetProtoMethod(t, "connect", Connect);

这里对TCP的实例添加了connect方法,意即this._handle.connect调用的为TCPWrap::Connect

void TCPWrap::Connect(const FunctionCallbackInfo<Value>& args) {
  // ...
  if (err == 0) {
    AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
    ConnectWrap* req_wrap =
        new ConnectWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_TCPCONNECTWRAP);
    err = req_wrap->Dispatch(uv_tcp_connect,
                             &wrap->handle_,
                             reinterpret_cast<const sockaddr*>(&addr),
                             AfterConnect);
    // ...
  }
  args.GetReturnValue().Set(err);
}

通过这段代码可以清晰地看到,uv_tcp_connect的cb为AfterConnect,libuv中的逻辑就不做详细介绍了,有兴趣的可以观光一下tcp.cuv__tcp_connect函数,最终会走入到uv__io_poll 的轮回中。我们重点看一下AfterConnect,代码在connection_wrap.cc中:

void ConnectionWrap<WrapType, UVType>::AfterConnect(uv_connect_t* req,
                                                    int status) {
  // ...
  req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
  delete req_wrap;
}

可以看到最终调用的函数为oncomplete,这时又回到了net.js中:

const req = new TCPConnectWrap();
req.oncomplete = afterConnect;

afterConnect的定义中有这样一句:

function afterConnect(status, handle, req, readable, writable) {
  // ...
  if (status === 0) {
    // ...
    self.emit('connect');
    self.emit('ready');
  }
}

最终在这里emit出了connect方法,即连接成功的回调,也就是net.createConnection(options[, connectListener])connectListener函数最终被emit触发的地方。

stream和'data'事件

不知道读者还有没有记得,文中的第一节曾分析过:

stream.Duplex.call(this, options); socket的实例继承了stream.Duplex的属性和方法

没错,'data'事件就是在stream.Duplex中emit的,在上一篇文章中我曾介绍过stream的写入过程,而在这里值stream的读取过程,刚才在分析TCPWrap的时候,在c++中构造js的TCP构造函数时候有这样一句:

t->InstanceTemplate()->Set(env->onread_string(), Null(env->isolate()));

由于socket继承了stream.Duplex,整个的溯源过程就不详细查找了,最终onread的触发在stream_base.cc中:

void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
  Local<Value> argv[] = {
    Integer::New(env->isolate(), nread),
    buf
  };
  // ...
  wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
}

而在js中的触发则在net.js:

function onread(nread, buffer) {
  // ...
  if (nread > 0) {
    // ...
    var ret = self.push(buffer);
    // ...
  }
}

push方法则是从stream继承来的,视线转移到_stream_readable.js中:

Readable.prototype.push = function(chunk, encoding) {
  // ...
  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
}

readableAddChunk函数的作用是对流不断进行拼接并在过程中进行容错处理:

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
  // ...
  if (addToFront) {
        if (state.endEmitted)
          stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
        else
          addChunk(stream, state, chunk, true);
      } else if (state.ended) {
        stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
      } else if (state.destroyed) {
        return false;
      } else {
        state.reading = false;
        if (state.decoder && !encoding) {
          chunk = state.decoder.write(chunk);
          if (state.objectMode || chunk.length !== 0)
            addChunk(stream, state, chunk, false);
          else
            maybeReadMore(stream, state);
        } else {
          addChunk(stream, state, chunk, false);
        }
}

在这里我只截取了关键代码,**可以看到在addChunk的同时,如果出错会立刻emit出'error'事件。在上游net.js中还有一些情况会emit 'error'事件,就不做详尽分析了。**接下来我们看下addChunk:

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    state.awaitDrain = 0;
    stream.emit('data', chunk);
  } else {
    // update the buffer info.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);

    if (state.needReadable)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}

通过 stream.emit('data', chunk);会emit出'data'事件,并且把chunk传入到参数中。

总结

至此,socket的'connect'、'data'、'error'事件便分析完毕了。有意思的是,这三个事件的来源不尽相同:

  • 'connect'是通过TCPWrap类及一系列函数触发的emit
  • 'data'则是和StreamBase类相关
  • 'error'则相当于全程的兜底,不管在哪里出了问题,总会emit出'error'事件。

by 小菜

@xtx1130
Copy link
Owner Author

xtx1130 commented Jun 21, 2018

已更完,最近由于单位很忙,每天都要加班到很晚,所以这篇源码分析的文章来的稍微迟了一些。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant