Skip to content

Commit

Permalink
所有的网络读写操作都由Nio-Event-Loop线程负责
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 10, 2018
1 parent b048334 commit 4d02022
Show file tree
Hide file tree
Showing 8 changed files with 417 additions and 157 deletions.
Expand Up @@ -94,6 +94,7 @@ private ResultSet executeQuery(String sql, AsyncHandler<AsyncResult<ResultSet>>
AsyncHandler<AsyncResult<Result>> h = new AsyncHandler<AsyncResult<Result>>() {
@Override
public void handle(AsyncResult<Result> ar) {
JdbcResultSet resultSet = null;
if (ar.isSucceeded()) {
Result r = ar.getResult();
resultSet = new JdbcResultSet(conn, JdbcStatement.this, r, id, closedByResultSet,
Expand Down
Expand Up @@ -44,8 +44,13 @@ public void write(byte b[], int off, int len) {

@Override
public void flush() throws IOException {
writableChannel.write(buffer);
NetBuffer old = buffer;
reset();
writableChannel.write(old);
// 警告: 不能像下面这样用,调用write后会很快写数据到接收端,然后另一个线程很快又收到响应,
// 在调用reset前又继续用原来的buffer写,从而导致产生非常难找的协议与并发问题,我就为这个问题痛苦排查过大半天。
// writableChannel.write(buffer);
// reset();
}

protected void reset() {
Expand Down
44 changes: 44 additions & 0 deletions lealone-net/src/main/java/org/lealone/net/nio/NioEventLoop.java
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.net.nio;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public interface NioEventLoop {

void select(long timeout) throws IOException;

void register(SocketChannel channel, int ops, Object att) throws ClosedChannelException;

void wakeup();

void addSocketChannel(SocketChannel channel);

void addNioBuffer(SocketChannel channel, NioBuffer nioBuffer);

void tryRegisterWriteOperation(Selector selector);

void write(SelectionKey key);

void closeChannel(SocketChannel channel);

}
165 changes: 165 additions & 0 deletions lealone-net/src/main/java/org/lealone/net/nio/NioEventLoopAdapter.java
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.net.nio;

import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class NioEventLoopAdapter implements NioEventLoop {

private final ConcurrentHashMap<SocketChannel, ConcurrentLinkedQueue<ByteBuffer>> channels = new ConcurrentHashMap<>();

private final AtomicBoolean selecting = new AtomicBoolean(false);
private Selector selector;

public NioEventLoopAdapter() {
}

public NioEventLoopAdapter(Selector selector) {
this.selector = selector;
}

public void setSelector(Selector selector) {
this.selector = selector;
}

@Override
public void select(long timeout) throws IOException {
tryRegisterWriteOperation(selector);
if (selecting.compareAndSet(false, true)) {
selector.select(timeout);
selecting.set(false);
}
}

@Override
public void register(SocketChannel channel, int ops, Object att) throws ClosedChannelException {
// 当nio-event-loop线程执行selector.select被阻塞时,代码内部依然会占用publicKeys锁,
// 而另一个线程执行channel.register时,内部也会去要publicKeys锁,从而导致也被阻塞,
// 所以下面这段代码的用处是:
// 只要发现nio-event-loop线程正在进行select,那么就唤醒它,并释放publicKeys锁。
while (true) {
if (selecting.compareAndSet(false, true)) {
channel.register(selector, SelectionKey.OP_CONNECT, att);
selecting.set(false);
selector.wakeup();
break;
} else {
selector.wakeup();
}
}
}

@Override
public void wakeup() {
if (selecting.compareAndSet(true, false)) {
selector.wakeup();
}
}

@Override
public void addSocketChannel(SocketChannel channel) {
ConcurrentLinkedQueue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
channels.putIfAbsent(channel, queue);
}

@Override
public void addNioBuffer(SocketChannel channel, NioBuffer nioBuffer) {
ConcurrentLinkedQueue<ByteBuffer> queue = channels.get(channel);
if (queue != null) {
ByteBuffer buffer = nioBuffer.getBuffer();
buffer.flip();
queue.add(buffer);
wakeup();
}
}

@Override
public void tryRegisterWriteOperation(Selector selector) {
for (Entry<SocketChannel, ConcurrentLinkedQueue<ByteBuffer>> entry : channels.entrySet()) {
if (!entry.getValue().isEmpty()) {
for (SelectionKey key : selector.keys()) {
if (key.channel() == entry.getKey() && key.isValid()) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
break;
}
}
}
}
}

@Override
public void write(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
try {
Queue<ByteBuffer> queue = channels.get(channel);
outer: for (ByteBuffer buffer : queue) {
// 一定要用while循环来写,否则会丢数据!
while (buffer.hasRemaining()) {
if (channel.write(buffer) <= 0)
break outer;
}
if (!buffer.hasRemaining()) {
queue.remove(buffer);
}
}
if (queue.isEmpty()) {
int ops = key.interestOps();
ops &= ~SelectionKey.OP_WRITE;
key.interestOps(ops);
}
} catch (IOException e) {
closeChannel(channel);
}
}

@Override
public void closeChannel(SocketChannel channel) {
if (channel == null) {
return;
}
for (SelectionKey key : selector.keys()) {
if (key.channel() == channel && key.isValid()) {
key.cancel();
break;
}
}
channels.remove(channel);
Socket socket = channel.socket();
if (socket != null) {
try {
socket.close();
} catch (Exception e) {
}
}
try {
channel.close();
} catch (Exception e) {
}
}
}

0 comments on commit 4d02022

Please sign in to comment.