Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

initial checkin

  • Loading branch information...
commit 84374f132d1edd640648561b82f3f4655eacc263 1 parent 691010f
@tempredirect authored
View
35 pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.logicalpractice.selectorfun</groupId>
+ <artifactId>SelectorFun</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <name>SelectorFun</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>1.6.3</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.10</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
View
239 src/main/java/com/logicalpractice/selectorfun/BroadcastServer.java
@@ -0,0 +1,239 @@
+package com.logicalpractice.selectorfun;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ *
+ */
+public class BroadcastServer {
+
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+
+ private final Logger logger = LoggerFactory.getLogger(BroadcastServer.class);
+
+ private final Selector selector;
+
+ private final BlockingQueue<byte[]> messages = new ArrayBlockingQueue<byte[]>(16);
+
+ private volatile boolean stopping ;
+
+ private Thread thread;
+
+ public BroadcastServer() {
+ try {
+ selector = Selector.open();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void go() throws IOException {
+
+ ServerSocketChannel ssc = ServerSocketChannel.open();
+
+ ssc.configureBlocking(false);
+ ssc.bind(new InetSocketAddress(2001));
+
+ SelectionKey serverKey = ssc.register(selector, SelectionKey.OP_ACCEPT );
+
+ logger.info("listening on port 2001");
+ while(!stopping){
+ selector.select();
+
+ Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+ while(it.hasNext()){
+ SelectionKey selectionKey = it.next();
+ it.remove();
+ if( selectionKey.isAcceptable() ){
+ SocketChannel newClient = ssc.accept();
+ newClient.configureBlocking(false);
+ newClient.register(selector, SelectionKey.OP_READ, new ConnectionState());
+ logger.info("New client connected");
+ }
+ if( selectionKey.isValid() && selectionKey.isWritable() ){
+// logger.info("Writable()");
+ writeAttached(selectionKey);
+ }
+ if( selectionKey.isValid() && selectionKey.isReadable() ){
+ readAttached(selectionKey);
+ }
+ }
+ byte[] message ;
+ while( (message = messages.poll()) != null) {
+ for (SelectionKey selectionKey : selector.keys()) {
+ if( selectionKey != serverKey ){
+ ConnectionState state = (ConnectionState) selectionKey.attachment();
+ state.writeQueue.add(ByteBuffer.wrap(message));
+ selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE); // switch on writes
+ }
+ }
+ }
+ }
+ selector.close();
+ ssc.close();
+ }
+
+ private void readAttached(SelectionKey key) {
+ ConnectionState state = (ConnectionState) key.attachment();
+
+ SocketChannel socket = (SocketChannel) key.channel();
+
+ ByteBuffer readBuffer = state.readBuffer;
+
+ try {
+ int read = socket.read(readBuffer);
+ if( read == -1 ){
+ logger.info("EOF - disconnecting");
+ key.cancel();
+ return ;
+ }
+
+ if( read == 0 ){
+ return;
+ }
+
+ readBuffer.flip();
+ int limit = readBuffer.limit();
+ logger.info("Scanning {} bytes", limit);
+
+ ByteBuffer command = ByteBuffer.allocate(limit);
+
+ int endOfLastCommand = 0;
+ for( int i = 0; i < limit; i ++ ){
+ byte b = readBuffer.get();
+ if( b == '\n' ){
+ // end of command
+ command.flip();
+ logger.info("Complete message from client, [{}]", new String(command.array(),Charset.defaultCharset()));
+ command.clear(); // put it back
+ endOfLastCommand = i;
+ } else {
+ command.put(b);
+ }
+ }
+ readBuffer.clear(); // have to clear cos we flipped it
+
+ if( endOfLastCommand > 0 && endOfLastCommand != (limit - 1) ) {
+ readBuffer.position(endOfLastCommand + 1);
+ readBuffer.compact();
+ readBuffer.position(limit - endOfLastCommand);
+ }
+
+ } catch (IOException e) {
+ logger.info("IOException [{}] - disconnecting", e.getMessage());
+ key.cancel();
+ }
+ }
+
+ private void writeAttached(SelectionKey key) throws IOException {
+ ConnectionState state = (ConnectionState) key.attachment();
+
+ ByteBuffer buff = state.writeQueue.peek();
+
+
+ if( buff == null ){
+ // not interesting in writing anymore
+ key.interestOps(key.interestOps() & ~ SelectionKey.OP_WRITE);
+ return ;
+ }
+
+ if ( buff.hasRemaining() ){
+ try {
+ SocketChannel socket = (SocketChannel) key.channel();
+ socket.write(buff);
+ } catch (IOException e) {
+ logger.info("IOException [{}] - disconnecting", e.getMessage());
+ key.cancel();
+ return ;
+ }
+ }
+ if( buff.remaining() == 0 ){
+ state.writeQueue.remove();
+ }
+ }
+
+ public synchronized void start(){
+ if( this.thread != null ){
+ throw new IllegalStateException("Already started");
+ }
+
+ this.thread = new Thread(){
+ @Override
+ public void run() {
+ try {
+ go();
+ } catch (Exception e) {
+ logger.error("BroadcastServer ending with exception", e);
+ }
+ }
+ };
+ thread.start();
+ }
+
+ public synchronized void stop() throws InterruptedException {
+ stopping = true;
+ selector.wakeup();
+ this.thread.join();
+ }
+
+ public void sendMessage(String message) throws InterruptedException {
+ byte [] bytes = (message + "\n").getBytes(UTF8);
+ messages.put(bytes);
+ selector.wakeup();
+ }
+
+ public static void main(String[] args) throws IOException, InterruptedException {
+ BroadcastServer server = new BroadcastServer();
+
+ server.start();
+
+ BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+
+ String line;
+ while( ( line = br.readLine() ) != null ){
+ switch(line){
+ case "q":
+ server.stop();
+ System.out.println("Server stopped");
+ return;
+ case "s":
+ server.printSummary();
+ default:
+ try {
+ int count = Integer.parseInt(line);
+ for( int i = count; i > 0; i-- ){
+ server.sendMessage("message " + i);
+ }
+ System.out.println("Sent "+ count + " messages");
+ } catch (NumberFormatException e) {
+ System.out.println("Input number of messages");
+ }
+ }
+ }
+ }
+
+ private void printSummary() {
+ System.out.println("number of clients : " + (selector.keys().size() - 1));
+ }
+}
+
+final class ConnectionState {
+ final Queue<ByteBuffer> writeQueue = new LinkedList<ByteBuffer>();
+ ByteBuffer readBuffer = ByteBuffer.allocate(1024) ;
+}
View
31 src/main/python/sendlines.py
@@ -0,0 +1,31 @@
+__author__ = 'gareth'
+import socket
+import time
+
+class MyClient:
+ def __init__(self):
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+ def connect(self):
+ self.sock.connect(("localhost", 2001))
+
+ def send(self, message):
+ totalsent = 0
+ msglen = len(message)
+ while totalsent < msglen:
+ sent = self.sock.send(message[totalsent:])
+ if sent == 0:
+ raise RuntimeError("socket connection broken")
+ totalsent = totalsent + sent
+
+cli = MyClient()
+
+cli.connect()
+
+cli.send("Wibble Wobble\n")
+cli.send("Hello world\n")
+cli.send("""This is a multi
+line request""")
+
+time.sleep(1)
+cli.send("Hello world\n")
Please sign in to comment.
Something went wrong with that request. Please try again.