Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

java changes pending for months

  • Loading branch information...
commit 8bf6b82e34809d9ee93dfedfa5fc008151a53a50 1 parent c1d80df
@rep authored
View
4 java/examples/jpwrcall/jpwrtestcli1.java
@@ -54,8 +54,8 @@ public void cb(Object r) {
}
public static void main(String[] args) {
- Node n = new Node("cert_t1.jks");
- Promise p = n.connect("127.0.0.1", 10001);
+ Node n = new Node("client_keystore.jks");
+ Promise p = n.connect("127.0.0.1", 10003);
p.when(new OnConnected(args[0]));
p.except(new OnError());
}
View
2  java/examples/jpwrcall/jpwrtestsrv1.java
@@ -17,7 +17,7 @@ public int mul(int a, int b) {
}
public static void main(String[] args) {
- Node n = new Node("cert_t1.jks");
+ Node n = new Node("server_keystore.jks");
jpwrtestsrv1.m = new Math();
String ref = n.register_object(m, "mathobj");
n.listen("0.0.0.0", 10003);
View
4 java/src/itsecnetback/MessageHandler.java
@@ -1,9 +1,9 @@
package itsecnetback;
-import org.msgpack.MessagePackObject;
+import org.jboss.netty.buffer.ChannelBuffer;
public interface MessageHandler {
- void handle(MessagePackObject msg);
+ void handle(ChannelBuffer msg);
void closed(String cause);
}
View
4 java/src/itsecnetback/MessagePackEncoder.java
@@ -24,6 +24,10 @@ public MessagePackEncoder(int estimatedLength) {
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if(msg instanceof ChannelBuffer)
return msg;
+ if(msg instanceof String) {
+ ChannelBuffer out = ChannelBuffers.copiedBuffer((String)msg, "UTF-8");
+ return out;
+ }
ChannelBufferOutputStream out = new ChannelBufferOutputStream(
ChannelBuffers.dynamicBuffer(
View
19 java/src/itsecnetback/NetBack.java
@@ -14,6 +14,7 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -113,19 +114,9 @@ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- System.out.println("messageReveiced " + e.toString());
- Object m = e.getMessage();
- if(!(m instanceof MessagePackObject)) {
- ctx.sendUpstream(e);
- return;
- }
-
- MessagePackObject msg = (MessagePackObject)m;
- System.out.println(" -> is MessagePackObject " + msg.toString());
- if (handler == null)
- e.getChannel().close();
- else
- handler.handle(msg);
+ ChannelBuffer m = (ChannelBuffer)e.getMessage();
+ if (handler == null) e.getChannel().close();
+ else handler.handle(m);
}
@Override
@@ -157,7 +148,7 @@ public ChannelPipeline getPipeline() throws Exception {
configureEngine(engine);
pipeline.addLast("ssl", new SslHandler(engine));
- pipeline.addLast("msgpack-decode-stream", new MessagePackStreamDecoder());
+ //pipeline.addLast("msgpack-decode-stream", new MessagePackStreamDecoder());
pipeline.addLast("msgpack-encode", new MessagePackEncoder());
pipeline.addLast("message", new NetBackHandler());
return pipeline;
View
100 java/src/jpwrcall/RPCConnection.java
@@ -7,18 +7,23 @@
import java.util.HashMap;
import java.util.AbstractMap;
import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.io.IOException;
import java.security.cert.Certificate;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.handler.ssl.SslHandler;
import org.msgpack.MessagePackObject;
import org.msgpack.MessagePackable;
import org.msgpack.Packer;
+import org.msgpack.Unpacker;
import itsecnetback.NetBack;
import itsecnetback.MessageHandler;
+
public class RPCConnection implements MessageHandler {
private final Channel chan;
private Certificate cert;
@@ -26,7 +31,11 @@
private Node n;
private AbstractMap<Integer, Promise> out_requests = new HashMap<Integer, Promise>();
private int last_msgid = 0;
+ private boolean negotiated = false;
+ private String remote_info;
+ private Unpacker pac = new Unpacker();
+ public static Charset utf8charset = Charset.forName("UTF-8");
public static final int RPC_REQUEST = 0;
public static final int RPC_RESPONSE = 1;
public static final int RPC_NOTIFY = 2;
@@ -92,7 +101,7 @@ public RPCConnection(Channel chan, Node n) {
this.chan = chan;
this.n = n;
//System.out.println("sending msg");
- //this.chan.write(new BootstrapMessage("foobarstr"));
+ this.chan.write("pwrcall java - caps: msgpack\n");
try {
this.cert = ((SslHandler)(chan.getPipeline().get("ssl"))).getEngine().getSession().getPeerCertificates()[0];
@@ -103,44 +112,57 @@ public RPCConnection(Channel chan, Node n) {
}
}
- public void handle(MessagePackObject msg) {
- if (!msg.isArrayType()) { logclose("message not array"); return; }
- MessagePackObject[] array = msg.asArray();
- if (array.length < 1 || !array[0].isIntegerType()) { logclose("message opcode not int"); return; }
-
- int opcode = array[0].asInt();
- System.out.println("handle called " + array.length + " opcode " + opcode);
-
- switch (opcode) {
- case RPC_REQUEST: {
- int msgid = array[1].asInt();
- String ref = array[2].asString();
- String method = array[3].asString();
- MessagePackObject[] params = array[4].asArray();
- handle_request(msgid, ref, method, params);
- break;
- }
- case RPC_RESPONSE: {
- int msgid = array[1].asInt();
- MessagePackObject error = array[2];
- MessagePackObject result = array[3];
- handle_response(msgid, error, result);
- break;
- }
- case RPC_NOTIFY: {
- String ref = array[1].asString();
- String method = array[2].asString();
- MessagePackObject[] params = array[3].asArray();
- handle_notify(ref, method, params);
- break;
- }
- case RPC_BOOTSTRAP: {
- MessagePackObject obj = array[1];
- handle_bootstrap(obj);
- break;
- }
- default: {
- logclose("unknown opcode: " + opcode);
+ public void handle(ChannelBuffer msg) {
+ if (!negotiated) {
+ remote_info = msg.toString(utf8charset);
+ System.out.println("remote banner: " + remote_info);
+ negotiated = true;
+ return;
+ }
+
+ ByteBuffer buffer = msg.toByteBuffer();
+ pac.feed(buffer);
+
+ for(MessagePackObject obj : pac) {
+ System.out.println("found msgpack object " + obj.toString());
+ if (!obj.isArrayType()) { logclose("message not array"); return; }
+ MessagePackObject[] array = obj.asArray();
+ if (array.length < 1 || !array[0].isIntegerType()) { logclose("message opcode not int"); return; }
+
+ int opcode = array[0].asInt();
+ System.out.println("handle called " + array.length + " opcode " + opcode);
+
+ switch (opcode) {
+ case RPC_REQUEST: {
+ int msgid = array[1].asInt();
+ String ref = array[2].asString();
+ String method = array[3].asString();
+ MessagePackObject[] params = array[4].asArray();
+ handle_request(msgid, ref, method, params);
+ break;
+ }
+ case RPC_RESPONSE: {
+ int msgid = array[1].asInt();
+ MessagePackObject error = array[2];
+ MessagePackObject result = array[3];
+ handle_response(msgid, error, result);
+ break;
+ }
+ case RPC_NOTIFY: {
+ String ref = array[1].asString();
+ String method = array[2].asString();
+ MessagePackObject[] params = array[3].asArray();
+ handle_notify(ref, method, params);
+ break;
+ }
+ case RPC_BOOTSTRAP: {
+ MessagePackObject obj2 = array[1];
+ handle_bootstrap(obj2);
+ break;
+ }
+ default: {
+ logclose("unknown opcode: " + opcode);
+ }
}
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.