Skip to content

Commit

Permalink
fix: add Reactor single
Browse files Browse the repository at this point in the history
  • Loading branch information
renfakai committed Aug 6, 2021
1 parent e811ea6 commit 9bf11da
Showing 1 changed file with 121 additions and 0 deletions.
121 changes: 121 additions & 0 deletions src/main/java/reactor/Reactor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {

final Selector selector;
final ServerSocketChannel serverSocket;

public Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
serverSocket.socket().bind(new InetSocketAddress(800));
SelectionKey register = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
register.attach(new Acceptor());
}

@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
dispatcher(iterator.next());
}
selectionKeys.clear();
}
} catch (IOException ignore) {
}
}

private void dispatcher(SelectionKey next) {
Runnable attachment = (Runnable) next.attachment();
if (null != attachment) {
attachment.run();
}
}

class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel connection = serverSocket.accept();
if (null != connection) {
new Handler(selector, connection);
}
} catch (IOException ig) {

}
}
}

final class Handler implements Runnable {
final SocketChannel socketChannel;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(4096);
ByteBuffer output = ByteBuffer.allocate(4096);

static final int READING = 0, SENDING = 1;
int state = READING;

public Handler(Selector selector, SocketChannel connection) throws IOException {
socketChannel = connection;
connection.configureBlocking(false);
sk = socketChannel.register(selector, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}

boolean inputIsComplete() {
return false;
}

boolean outputIsComplete() {
return false;
}

void process() {
}

@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ig) {
}
}

void read() throws IOException {
socketChannel.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
}
}

void send() throws IOException {
socketChannel.write(output);
if (outputIsComplete()) {
sk.channel();
}
}
}
}

0 comments on commit 9bf11da

Please sign in to comment.