Skip to content

NIO Source Code

javahongxi edited this page Jul 31, 2019 · 1 revision

server

ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
channel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.isOpen()) {
  System.out.println("Server is running, port:" + channel.socket().getLocalPort());
  if (selector.select(TIMEOUT) == 0) {
    System.out.println("select timeout");
    continue;
  }
  Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    if (!key.isValid()) {
      continue;
    }
    if (key.isAcceptable()) {
      accept(key);
    } else if (key.isReadable()) {
      read(key);
    }
  }
}

client

SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
boolean isSuccess = channel.connect(new InetSocketAddress(30008));
if (!isSuccess) {
  while (!channel.finishConnect()) {
    System.out.println("Client is connecting...");
  }
}
System.out.println("Client is connected.");

Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_WRITE);
while (selector.isOpen()) {
  System.out.println("client selector is open");
  selector.select();
  System.out.println("client selector select");
  Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    if (!key.isValid()) {
      continue;
    }
    if (key.isWritable()) {
      write();
    }
  }
}

要点

  • 对于ServerSocketChannel/SocketChannel,Selector是可选的,当Channel+Selector组合使用时,必须设定blocking=false
  • server是nio,client可以是nio/io等,如tomcat默认底层是一个 nio server,浏览器的请求显然是io而不是nio
  • 一般Acceptor不用注册Channel到Selector,且Acceptor设为阻塞模式,将收到的Channel注册到Selector即可(读写事件),最后处理业务逻辑交给线程池
  • 一个Channel支持同时注册到多个Selector,比如读事件注册到一个Selector,写事件注册到另一个Selector
  • ServerSocketChannel/SocketChannel类似于ServerSocket/Socket,但真正实现不是用的ServerSocket/Socket,只是利用Adaptor模式借用了ServerSocket/Socket的方法定义。ServerSocketAdaptor extends ServerSocket, SocketAdaptor extends Socket
  • ServerSocketChannel/SocketChannel和Selector均由SelectorProvider.provider().openXxx()返回,其中provider对象均与操作系统有关,即不同的操作系统返回的provider不同,Mac OS 返回的是KQueueSelectorImpl,Windows返回的是WindowsSelectorImpl
  • Windows下利用两个本地socket模拟的pipe,linux下直接用的操作系统的pipe
    KQueueSelectorImpl(SelectorProvider var1) {
        super(var1);
        long var2 = IOUtil.makePipe(false);
        // 高32位存放的是Pipe管道的读端的文件描述符
        this.fd0 = (int)(var2 >>> 32);
        // 低32位存放的是Pipe管道的写端的文件描述符
        this.fd1 = (int)var2;

        try {
            // KQueueArrayWrapper是selector的核心,里面大都是native方法
            this.kqueueWrapper = new KQueueArrayWrapper();
            this.kqueueWrapper.initInterrupt(this.fd0, this.fd1);
            this.fdMap = new HashMap();
            this.totalChannels = 1;
        } catch (Throwable var8) {
            //
    }

首页

Java核心技术

Netty

RocketMQ深入研究

kafka深入研究

Pulsar深入研究

Dubbo源码导读

微服务架构

Redis

Elasticsearch

其他

杂谈

关于我

Clone this wiki locally