Skip to content

Commit

Permalink
initial commit for nsq netty java client
Browse files Browse the repository at this point in the history
  • Loading branch information
dustismo committed Jan 15, 2013
1 parent 8bd6a10 commit 11c5f35
Show file tree
Hide file tree
Showing 23 changed files with 641 additions and 0 deletions.
14 changes: 14 additions & 0 deletions .classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="lib/async-http-client-1.7.6.jar"/>
<classpathentry kind="lib" path="lib/jcl-over-slf4j-1.7.2.jar"/>
<classpathentry kind="lib" path="lib/log4j-over-slf4j-1.7.2.jar"/>
<classpathentry kind="lib" path="lib/logback-classic-1.0.7.jar"/>
<classpathentry kind="lib" path="lib/logback-core-1.0.7.jar"/>
<classpathentry kind="lib" path="lib/netty-3.6.1.Final.jar"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.6.4.jar"/>
<classpathentry kind="lib" path="lib/trendrr-oss.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
17 changes: 17 additions & 0 deletions .project
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>JavaNSQCLient</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
Binary file added lib/async-http-client-1.7.6.jar
Binary file not shown.
Binary file added lib/jcl-over-slf4j-1.7.2.jar
Binary file not shown.
Binary file added lib/log4j-over-slf4j-1.7.2.jar
Binary file not shown.
Binary file added lib/logback-classic-1.0.7.jar
Binary file not shown.
Binary file added lib/logback-core-1.0.7.jar
Binary file not shown.
Binary file added lib/netty-3.6.1.Final.jar
Binary file not shown.
Binary file added lib/slf4j-api-1.6.4.jar
Binary file not shown.
Binary file added lib/trendrr-oss.jar
Binary file not shown.
53 changes: 53 additions & 0 deletions src/Connection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
*
*/

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;

import commands.NSQCommand;

import frames.NSQFrame;


/**
* @author Dustin Norlander
* @created Jan 14, 2013
*
*/
public class Connection {

protected static Log log = LogFactory.getLog(Connection.class);

Channel channel;

public Connection(Channel channel) {
this.channel = channel;
this.channel.setAttachment(this);
}


public void incoming(NSQFrame frame) {
//incoming message, give back to whoever needs it.
System.out.println("GOT FRAME! " + frame);
}

void disconnected() {
//clean up anything that needs cleaning up.
}

public void close() {
try {
channel.close().await(10000);
} catch (Exception x) {
log.error("Caught", x);
}
this.disconnected();
}

public ChannelFuture command(NSQCommand command) {
return this.channel.write(command);
}
}
88 changes: 88 additions & 0 deletions src/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
*
*/

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

import commands.NSQCommand;


/**
* @author Dustin Norlander
* @created Jan 14, 2013
*
*/
public class Main {

protected static Log log = LogFactory.getLog(Main.class);

/**
* @param args
*/
public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {




//example lookup (works)
// NSQLookup lookup = new NSQLookup();
// lookup.addAddr("localhost", 4161);
// System.out.println(lookup.lookup("test"));


//netty connection

// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));

// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new NSQPipeline());

// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 4150));

// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
future.getCause().printStackTrace();
bootstrap.releaseExternalResources();
return;
}


Connection conn = new Connection(channel);

//identify with protocol version.
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();

//uhh, WTF is this?
// docs say protocol = v2, but this is what the official clients send.
buf.writeInt(538990130);
channel.write(buf);



NSQCommand command = new NSQCommand();
command.setLine("IDENTIFY");
command.addBytes("{\"short_id\":\"dustin-box\", \"long_id\":\"dustin-long-id\"}".getBytes("utf8"));
conn.command(command);
//

}

}
20 changes: 20 additions & 0 deletions src/NSQClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
*
*/

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


/**
* @author Dustin Norlander
* @created Jan 14, 2013
*
*/
public class NSQClient {

protected static Log log = LogFactory.getLog(NSQClient.class);



}
83 changes: 83 additions & 0 deletions src/NSQDecoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
*
*/

import java.nio.channels.Channel;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import org.jboss.netty.handler.codec.replay.VoidEnum;

import frames.NSQFrame;


/**
* @author Dustin Norlander
* @created Jan 14, 2013
*
*/
public class NSQDecoder extends ReplayingDecoder<NSQDecoder.MyDecoderState>{

protected static Log log = LogFactory.getLog(NSQDecoder.class);

public static enum MyDecoderState {
READ_SIZE,
READ_FRAME_ID,
READ_DATA;
};

private int size;
private NSQFrame frame;

public NSQDecoder() {
// Set the initial state.
super(MyDecoderState.READ_SIZE);
}

/* (non-Javadoc)
* @see org.jboss.netty.handler.codec.replay.ReplayingDecoder#decode(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.Channel, org.jboss.netty.buffer.ChannelBuffer, java.lang.Enum)
*/
@Override
protected Object decode(ChannelHandlerContext ctx,
org.jboss.netty.channel.Channel channel, ChannelBuffer buf,
MyDecoderState state) throws Exception {
switch (state) {
case READ_SIZE:
size = buf.readInt();
System.out.println("Got size: " + size);
checkpoint(MyDecoderState.READ_FRAME_ID);
break;
case READ_FRAME_ID:
int id = buf.readInt();
System.out.println("GOT ID: " + id);
this.frame = NSQFrame.instance(id);
if (this.frame == null) {
//uhh, bad response from server.. what should we do?
throw new Exception("Bad frame id from server (" + id + "). disconnect!");
}
this.frame.setSize(size);
checkpoint(MyDecoderState.READ_DATA);
break;
case READ_DATA:
ChannelBuffer bytes = buf.readBytes(frame.getSize());
this.frame.setData(bytes.array());
checkpoint(MyDecoderState.READ_SIZE);
return this.frame;
default:
throw new Error("Shouldn't reach here.");
}
return null;
}

// @Override
// protected Object decode(ChannelHandlerContext ctx,
// Channel channel,
// ChannelBuffer buf,
// NSQDecoder.MyDecoderState state) throws Exception {
//
// }

}
46 changes: 46 additions & 0 deletions src/NSQEncoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
*
*/

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

import commands.NSQCommand;


/**
* @author Dustin Norlander
* @created Jan 14, 2013
*
*/
public class NSQEncoder extends OneToOneEncoder {

protected static Log log = LogFactory.getLog(NSQEncoder.class);

/* (non-Javadoc)
* @see org.jboss.netty.handler.codec.oneone.OneToOneEncoder#encode(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.Channel, java.lang.Object)
*/
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object message) throws Exception {
if (!(message instanceof NSQCommand)) {
return message;
}

NSQCommand com = (NSQCommand)message;

ChannelBuffer buf = ChannelBuffers.dynamicBuffer();

buf.writeBytes(com.getLine().getBytes("utf8"));
for (byte[] data : com.getData()) {
buf.writeInt(data.length);
buf.writeBytes(data);
}
return buf;
}
}
62 changes: 62 additions & 0 deletions src/NSQHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
*
*/

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

import frames.NSQFrame;


/**
* @author Dustin Norlander
* @created Jan 14, 2013
*
*/
public class NSQHandler extends SimpleChannelUpstreamHandler {

protected static Log log = LogFactory.getLog(NSQHandler.class);

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NSQFrame frame = (NSQFrame)e.getMessage();

Connection con = (Connection)e.getChannel().getAttachment();
if (con != null) {
con.incoming(frame);
} else {
log.warn("No connection set for : " + e.getChannel());
//TODO: should we kill the channel?
}
}

@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
Connection con = (Connection)e.getChannel().getAttachment();
if (con != null) {
con.disconnected();
} else {
log.warn("No connection set for : " + e.getChannel());
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
log.warn("Caught", e.getCause());
e.getChannel().close();

Connection con = (Connection)e.getChannel().getAttachment();
if (con != null) {
con.disconnected();
} else {
log.warn("No connection set for : " + e.getChannel());
}
}

}
Loading

0 comments on commit 11c5f35

Please sign in to comment.