Permalink
Browse files

initial import

  • Loading branch information...
0 parents commit 265e45d640fdccc6012699af4c35ed08ec66de9d @rep committed May 3, 2011
12 README.md
@@ -0,0 +1,12 @@
+pwrcall
+===========
+A new framework for secure distributed function calls.
+
+## Getting Started
+
+See examples/.
+
+## Learn More
+
+ - [Repository at github](https://github.com/rep/pwrcall)
+
125 c/client.c
@@ -0,0 +1,125 @@
+#include <openssl/bio.h>
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+#include <stdio.h>
+#include <msgpack.h>
+
+int RPC_REQUEST = 0;
+int RPC_RESPONSE = 1;
+
+void write_msgpackobj(BIO* bio, msgpack_sbuffer* buffer) {
+ BIO_write(bio, buffer->data, buffer->size);
+ msgpack_sbuffer_clear(buffer);
+}
+
+void gen_request(msgpack_packer* pk, int msgid, char* object_ref, int a, int b) {
+ /* serializes [RPC_REQUEST, 1, <object_ref>, "add", [15, 13]] */
+ msgpack_pack_array(pk, 5);
+ msgpack_pack_uint64(pk, RPC_REQUEST);
+ msgpack_pack_uint64(pk, msgid);
+ msgpack_pack_raw(pk, strlen(object_ref));
+ msgpack_pack_raw_body(pk, object_ref, strlen(object_ref));
+ msgpack_pack_raw(pk, 3);
+ msgpack_pack_raw_body(pk, "add", 3);
+ msgpack_pack_array(pk, 2);
+ msgpack_pack_uint64(pk, a);
+ msgpack_pack_uint64(pk, b);
+}
+void gen_errormsg(msgpack_packer* pk, int msgid) {
+ /* serializes [RPC_RESPONSE, <msgid>, <errormsg>, null] */
+ msgpack_pack_array(pk, 4);
+ msgpack_pack_uint64(pk, 1);
+ msgpack_pack_uint64(pk, msgid);
+ msgpack_pack_raw(pk, 20);
+ msgpack_pack_raw_body(pk, "No requests allowed.", 20);
+ msgpack_pack_nil(pk);
+}
+
+void pwrcall_client(char * arg1) {
+ SSL_CTX* ctx = SSL_CTX_new(SSLv23_client_method());
+ SSL* ssl;
+ BIO* bio = BIO_new_ssl_connect(ctx);
+ if (bio == NULL) {
+ printf("Error creating BIO!\n");
+ ERR_print_errors_fp(stderr);
+ return;
+ }
+
+ BIO_get_ssl(bio, &ssl);
+ SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);
+ SSL_use_certificate_file(ssl, "cert.pem", SSL_FILETYPE_PEM);
+ SSL_use_PrivateKey_file(ssl, "cert.pem", SSL_FILETYPE_PEM);
+ BIO_set_conn_hostname(bio, "127.0.0.1:10000");
+
+ if (BIO_do_connect(bio) <= 0) {
+ printf("Failed to connect!\n");
+ return;
+ }
+
+ if (BIO_do_handshake(bio) <= 0) {
+ printf("Failed to do SSL handshake!\n");
+ return;
+ }
+
+ // buffer for received data
+ char buf[16384];
+ memset(buf, 0, sizeof(buf));
+
+ // message structure for unpacking
+ msgpack_unpacked msg;
+ msgpack_unpacked_init(&msg);
+ msgpack_sbuffer* buffer = msgpack_sbuffer_new();
+ msgpack_packer* pk = msgpack_packer_new(buffer, msgpack_sbuffer_write);
+
+ // send request
+ gen_request(pk, 1, arg1, 15, 17);
+ write_msgpackobj(bio, buffer);
+
+ while (1) {
+ int x = BIO_read(bio, buf, sizeof(buf) - 1);
+ if (x == 0) break;
+ else if (x < 0) {
+ if (!BIO_should_retry(bio)) {
+ printf("Read Failed!\n");
+ return;
+ }
+ } else {
+ bool success = msgpack_unpack_next(&msg, buf, x, NULL);
+ if (!success) {
+ printf("msgpack_unpack failed!\n");
+ return;
+ }
+ msgpack_object o = msg.data;
+ if (o.via.array.ptr->via.u64 == 0) {
+ // we don't handle requests
+ gen_errormsg(pk, (o.via.array.ptr+1)->via.u64);
+ write_msgpackobj(bio, buffer);
+ } else if (o.via.array.ptr->via.u64 == 1) {
+ // only the add call response should be received
+ printf("call response: %lu\n", (o.via.array.ptr+3)->via.u64);
+ break;
+ }
+ }
+ fflush(stdout);
+ }
+ fflush(stdout);
+
+ // cleanup
+ msgpack_sbuffer_free(buffer);
+ msgpack_packer_free(pk);
+ msgpack_unpacked_destroy(&msg);
+ BIO_free_all(bio);
+ SSL_CTX_free(ctx);
+ return;
+}
+
+int main(int argc, char** argv) {
+ CRYPTO_malloc_init();
+ SSL_library_init();
+ SSL_load_error_strings();
+ ERR_load_BIO_strings();
+ OpenSSL_add_all_algorithms();
+
+ pwrcall_client(argv[1]);
+ return 0;
+}
25 java/examples/itsecnetback/Frontend.java
@@ -0,0 +1,25 @@
+
+import java.security.cert.X509Certificate;
+import org.jboss.netty.channel.Channel;
+
+import itsecnetback.NetFront;
+import itsecnetback.MessageHandler;
+
+public class Frontend implements NetFront {
+
+ public boolean verify(X509Certificate cert, String digest) {
+ try {
+ cert.checkValidity();
+ } catch (Exception e) {
+ System.err.println("checkValidity fails");
+ return false;
+ }
+ System.out.println("verify called with digest " + digest);
+ return true;
+ }
+
+ public MessageHandler genHandler(Channel chan) {
+ return new TestHandler(chan);
+ }
+}
+
15 java/examples/itsecnetback/Srvtest.java
@@ -0,0 +1,15 @@
+import itsecnetback.Server;
+import itsecnetback.Client;
+
+/**
+ *
+ * @author Mark Schloesser
+ */
+public class Srvtest {
+ public static void main(String[] gs) {
+ Frontend front = new Frontend();
+ Frontend front2 = new Frontend();
+ Server foo = new Server(20001, front, "cert_t1.jks", "secret", "secret");
+ Client foo2 = new Client("127.0.0.1", 20001, front2, "cert_t2.jks", "secret", "secret");
+ }
+}
60 java/examples/itsecnetback/TestHandler.java
@@ -0,0 +1,60 @@
+import java.io.IOException;
+import java.security.cert.Certificate;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.msgpack.MessagePackObject;
+import org.msgpack.MessagePackable;
+import org.msgpack.Packer;
+
+import itsecnetback.NetBack;
+import itsecnetback.MessageHandler;
+
+public class TestHandler implements MessageHandler {
+ private final Channel chan;
+ private Certificate cert;
+ private String certdigest;
+
+ private class DebugMessage implements MessagePackable {
+ private int opcode; private String arg1; private Object arg2;
+ public DebugMessage(int opcode, String arg1, Object arg2) {
+ this.opcode = opcode;
+ this.arg1 = arg1;
+ this.arg2 = arg2;
+ }
+ @Override
+ public void messagePack(Packer pk) throws IOException {
+ pk.packArray(3);
+ pk.packInt(opcode);
+ pk.packString(arg1);
+ pk.pack(arg2);
+ }
+ }
+
+ public TestHandler(Channel chan){
+ this.chan = chan;
+ this.chan.write(new DebugMessage(5, "foobarstr", (Object)null));
+ this.chan.write(new DebugMessage(5, "foobarstr", (Object)null));
+ try {
+ this.cert = ((SslHandler)(chan.getPipeline().get("ssl"))).getEngine().getSession().getPeerCertificates()[0];
+ this.certdigest = NetBack.calcDigest(this.cert.getEncoded());
+ } catch (Exception e) {
+ System.out.println("Could not get peer's certificate from channel. Closing link.");
+ chan.close();
+ }
+ }
+
+ public void handle(MessagePackObject msg) {
+ System.out.println("handle called " + msg.toString());
+ System.out.println("this client peer digest " + this.certdigest);
+ if (msg.isArrayType()) {
+ MessagePackObject[] ma = msg.asArray();
+ for (int i=0; i<ma.length; i++)
+ System.out.println("item " + ma[i].toString());
+ } else
+ System.out.println("item not arraytype :(");
+ }
+
+ public void closed(String cause) {
+ System.out.println("closed, cause: " + cause);
+ }
+}
53 java/examples/jpwrcall/jpwrtestcli1.java
@@ -0,0 +1,53 @@
+import jpwrcall.Node;
+import jpwrcall.Util;
+import jpwrcall.Promise;
+import jpwrcall.RPCConnection;
+
+import org.msgpack.MessagePackObject;
+
+/**
+ *
+ * @author Mark Schloesser
+ */
+public class jpwrtestcli1 {
+ private static class OnResult implements Util.Callback {
+ public void cb(Object r) {
+ System.out.println("on_result " + r.toString());
+ MessagePackObject rm = (MessagePackObject)r;
+ if (rm.isIntegerType())
+ System.out.println("received result: " + rm.asInt());
+ System.exit(0);
+ }
+ }
+
+ private static class OnConnected implements Util.Callback {
+ private String ref;
+
+ public OnConnected(String ref) {
+ this.ref = ref;
+ }
+
+ public void cb(Object r) {
+ System.out.println("on_connected " + r.toString());
+
+ RPCConnection rc = (RPCConnection) r;
+ Promise p = rc.call(this.ref, "add", 25, 75);
+ p.when(new OnResult());
+ p.except(new OnError());
+ }
+ }
+
+ private static class OnError implements Util.Callback {
+ public void cb(Object r) {
+ System.out.println("on_error " + r.toString());
+ }
+ }
+
+ public static void main(String[] args) {
+ Node n = new Node("cert_t1.jks");
+ Promise p = n.connect("127.0.0.1", 10000);
+ p.when(new OnConnected(args[0]));
+ p.except(new OnError());
+ }
+
+}
26 java/examples/jpwrcall/jpwrtestsrv1.java
@@ -0,0 +1,26 @@
+import jpwrcall.Node;
+
+/**
+ *
+ * @author Mark Schloesser
+ */
+public class jpwrtestsrv1 {
+ private static Math m;
+
+ public static class Math {
+ public int add(int a, int b) {
+ return a+b;
+ }
+ public int mul(int a, int b) {
+ return a*b;
+ }
+ }
+
+ public static void main(String[] args) {
+ Node n = new Node("cert_t1.jks");
+ Testsrv1.m = new Math();
+ String ref = n.register_object(m, "mathobj");
+ n.listen("0.0.0.0", 10000);
+ System.out.println("Math srv ready at " + n.refurl(ref));
+ }
+}
26 java/src/itsecnetback/Client.java
@@ -0,0 +1,26 @@
+package itsecnetback;
+
+import javax.net.ssl.SSLEngine;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+public class Client extends NetBack {
+ protected void configureEngine(SSLEngine engine) {
+ engine.setUseClientMode(true);
+ }
+
+ public Client(String hostname, int port, NetFront frontend, String keystorepath, String keystorepassword, String keystoreintegrity) {
+ super(frontend, keystorepath, keystorepassword, keystoreintegrity);
+
+ ClientBootstrap bootstrap = new ClientBootstrap(
+ new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()
+ )
+ );
+ bootstrap.setPipelineFactory(new PipeLineFactory());
+ bootstrap.connect(new InetSocketAddress(hostname, port));
+ }
+}
9 java/src/itsecnetback/MessageHandler.java
@@ -0,0 +1,9 @@
+package itsecnetback;
+
+import org.msgpack.MessagePackObject;
+
+public interface MessageHandler {
+ void handle(MessagePackObject msg);
+ void closed(String cause);
+}
+
39 java/src/itsecnetback/MessagePackEncoder.java
@@ -0,0 +1,39 @@
+package itsecnetback;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+import org.msgpack.MessagePack;
+
+public class MessagePackEncoder extends OneToOneEncoder {
+ private final int estimatedLength;
+
+ public MessagePackEncoder() {
+ this(1024);
+ }
+
+ public MessagePackEncoder(int estimatedLength) {
+ this.estimatedLength = estimatedLength;
+ }
+
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+ if(msg instanceof ChannelBuffer)
+ return msg;
+
+ ChannelBufferOutputStream out = new ChannelBufferOutputStream(
+ ChannelBuffers.dynamicBuffer(
+ estimatedLength,
+ ctx.getChannel().getConfig().getBufferFactory()
+ ));
+
+ MessagePack.pack(out, msg);
+
+ ChannelBuffer result = out.buffer();
+ return result;
+ }
+}
41 java/src/itsecnetback/MessagePackStreamDecoder.java
@@ -0,0 +1,41 @@
+package itsecnetback;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+import org.msgpack.Unpacker;
+import org.msgpack.MessagePackObject;
+
+public class MessagePackStreamDecoder extends FrameDecoder {
+ protected Unpacker pac = new Unpacker();
+
+ public MessagePackStreamDecoder() {
+ super();
+ }
+
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer source) throws Exception {
+ ByteBuffer buffer = source.toByteBuffer();
+ if(!buffer.hasRemaining())
+ return null;
+
+ byte[] bytes = buffer.array();
+ int offset = buffer.arrayOffset() + buffer.position();
+ int length = buffer.arrayOffset() + buffer.limit();
+
+ int noffset = pac.execute(bytes, offset, length);
+ if(noffset > offset)
+ source.skipBytes(noffset - offset);
+
+ if(pac.isFinished()) {
+ MessagePackObject msg = pac.getData();
+ pac.reset();
+ return msg;
+ } else
+ return null;
+ }
+}
175 java/src/itsecnetback/NetBack.java
@@ -0,0 +1,175 @@
+package itsecnetback;
+
+import java.io.FileInputStream;
+import java.util.logging.Logger;
+import java.security.KeyStore;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateEncodingException;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+import org.msgpack.MessagePackObject;
+
+public abstract class NetBack {
+
+ protected final Logger logger = Logger.getLogger("itsecnetback");
+ protected SSLContext sslctx = null;
+ protected String certpath;
+ protected String kspw;
+ protected String ksipw;
+ protected NetFront frontend;
+
+ public static String getHex( final byte [] raw ) {
+ if ( raw == null ) return null;
+ final StringBuilder hex = new StringBuilder( 2 * raw.length );
+
+ for ( final byte b : raw ) {
+ int v = b & 0xff;
+ if (v<16) hex.append('0');
+ hex.append(Integer.toHexString(v));
+ }
+ return hex.toString();
+ }
+
+ public static String calcDigest(byte[] data) throws NoSuchAlgorithmException {
+ MessageDigest md = MessageDigest.getInstance("SHA-1");
+ md.update(data);
+ return getHex(md.digest());
+ }
+
+ private final TrustManager NetBackTrustMan = new X509TrustManager() {
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+
+ public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+ checkTrusted(chain, authType);
+ }
+
+ public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+ checkTrusted(chain, authType);
+ }
+
+ private void checkTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+ try {
+ if (!frontend.verify(chain[0], calcDigest(chain[0].getEncoded())))
+ throw new CertificateException("Frontend verify() = false");
+ } catch (Exception e) {
+ throw new CertificateException("exception " + e.toString());
+ }
+ }
+ };
+
+ private SSLContext getContext() {
+ if (sslctx == null) {
+ try {
+ KeyStore ks = KeyStore.getInstance("JKS");
+ ks.load(new FileInputStream(certpath), ksipw.toCharArray());
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+ kmf.init(ks, kspw.toCharArray());
+ sslctx = SSLContext.getInstance("TLSv1");
+ sslctx.init(kmf.getKeyManagers(), new TrustManager[]{NetBackTrustMan}, null);
+ } catch (Exception e) {
+ throw new Error("Failed to initialize the SSLContext", e);
+ }
+ }
+ return sslctx;
+ }
+
+ private class NetBackHandler extends SimpleChannelUpstreamHandler {
+ private MessageHandler handler = null;
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ final SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
+ ChannelFuture handshakeFuture = sslHandler.handshake();
+ handshakeFuture.addListener(new ReadyWaiter());
+ }
+
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ handler.closed("channelClosed");
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ System.out.println("messageReveiced " + e.toString());
+ Object m = e.getMessage();
+ if(!(m instanceof MessagePackObject)) {
+ ctx.sendUpstream(e);
+ return;
+ }
+
+ MessagePackObject msg = (MessagePackObject)m;
+ System.out.println(" -> is MessagePackObject " + msg.toString());
+ if (handler == null)
+ e.getChannel().close();
+ else
+ handler.handle(msg);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ logger.warning("Unexpected exception from downstream. " + e.getCause());
+ e.getChannel().close();
+ if (handler != null) {
+ handler.closed(e.getCause().toString());
+ e.getCause().printStackTrace();
+ }
+ }
+
+ private final class ReadyWaiter implements ChannelFutureListener {
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ handler = frontend.genHandler(future.getChannel());
+ } else {
+ future.getChannel().close();
+ }
+ }
+ }
+ }
+
+ protected class PipeLineFactory implements ChannelPipelineFactory {
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+
+ SSLEngine engine = getContext().createSSLEngine();
+ configureEngine(engine);
+
+ pipeline.addLast("ssl", new SslHandler(engine));
+ pipeline.addLast("msgpack-decode-stream", new MessagePackStreamDecoder());
+ pipeline.addLast("msgpack-encode", new MessagePackEncoder());
+ pipeline.addLast("message", new NetBackHandler());
+ return pipeline;
+ }
+ }
+
+ protected abstract void configureEngine(SSLEngine engine);
+
+ public NetBack(NetFront frontend, String keystorepath, String keystorepassword, String keystoreintegrity) {
+ this.frontend = frontend;
+ this.certpath = keystorepath;
+ this.kspw = keystorepassword;
+ this.ksipw = keystoreintegrity;
+ }
+}
10 java/src/itsecnetback/NetFront.java
@@ -0,0 +1,10 @@
+package itsecnetback;
+
+import java.security.cert.X509Certificate;
+import org.jboss.netty.channel.Channel;
+
+public interface NetFront {
+ boolean verify(X509Certificate cert, String digest);
+ MessageHandler genHandler(Channel chan);
+}
+
28 java/src/itsecnetback/Server.java
@@ -0,0 +1,28 @@
+package itsecnetback;
+
+import javax.net.ssl.SSLEngine;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+public class Server extends NetBack {
+ @Override
+ protected void configureEngine(SSLEngine engine) {
+ engine.setUseClientMode(false);
+ engine.setNeedClientAuth(true);
+ }
+
+ public Server(int port, NetFront frontend, String keystorepath, String keystorepassword, String keystoreintegrity) {
+ super(frontend, keystorepath, keystorepassword, keystoreintegrity);
+
+ ServerBootstrap bootstrap = new ServerBootstrap(
+ new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()
+ )
+ );
+ bootstrap.setPipelineFactory(new PipeLineFactory());
+ bootstrap.bind(new InetSocketAddress(port));
+ }
+}
34 java/src/jpwrcall/Frontend.java
@@ -0,0 +1,34 @@
+package jpwrcall;
+
+import java.security.cert.X509Certificate;
+import org.jboss.netty.channel.Channel;
+
+import itsecnetback.NetFront;
+import itsecnetback.MessageHandler;
+
+
+public class Frontend implements NetFront {
+ private Node n;
+
+ public Frontend(Node n) {
+ this.n = n;
+ }
+
+ public boolean verify(X509Certificate cert, String digest) {
+ try {
+ cert.checkValidity();
+ } catch (Exception e) {
+ System.err.println("checkValidity fails");
+ return false;
+ }
+ System.out.println("verify called with digest " + digest);
+ return true;
+ }
+
+ public MessageHandler genHandler(Channel chan) {
+ RPCConnection rc = new RPCConnection(chan, this.n);
+ this.n.conn_cb(rc, chan);
+ return rc;
+ }
+}
+
65 java/src/jpwrcall/Node.java
@@ -0,0 +1,65 @@
+package jpwrcall;
+
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.AbstractMap;
+import org.jboss.netty.channel.Channel;
+import java.net.InetSocketAddress;
+
+import itsecnetback.Server;
+import itsecnetback.Client;
+
+import itsecnetback.NetFront;
+
+/**
+ *
+ * @author Mark Schloesser
+ */
+public class Node {
+ private Frontend f;
+ private String certpath;
+ public AbstractMap<String, WeakReference> exports = new HashMap<String, WeakReference>();
+ public AbstractMap<String, Promise> connection_promises = new HashMap<String, Promise>();
+
+ public Node(String cert) {
+ this.certpath = cert;
+ this.f = new Frontend(this);
+ }
+
+ public void conn_cb(RPCConnection rc, Channel chan) {
+ InetSocketAddress addr = (InetSocketAddress) chan.getRemoteAddress();
+ String addrstr = addr.getAddress().getHostAddress() + ":" + addr.getPort();
+ Promise p = connection_promises.get(addrstr);
+ if (p == null)
+ System.out.println("conn_cb, but can't find promise to fulfill... :(");
+ else
+ p.resolve(rc);
+ }
+
+ public Promise connect(String host, int port) {
+ Promise p = new Promise();
+ Client foo2 = new Client(host, port, this.f, this.certpath, "secret", "secret");
+ connection_promises.put(host + ":" + port, p);
+ return p;
+ }
+
+ public void listen(String host, int port) {
+ Server foo = new Server(port, this.f, this.certpath, "secret", "secret");
+ }
+
+ public String register_object(Object o) {
+ String ref = Util.randstr();
+ return register_object(o, ref);
+ }
+
+ public String register_object(Object o, String ref) {
+ exports.put(ref, new WeakReference(o));
+ return ref;
+ }
+
+
+ public String refurl(String ref) {
+ return "pwrcall://certhash@host:port/" + ref;
+ }
+}
+
22 java/src/jpwrcall/Promise.java
@@ -0,0 +1,22 @@
+package jpwrcall;
+
+public class Promise {
+ private Util.Callback resultcb;
+ private Util.Callback errorcb;
+ private Object result;
+
+ public void when(Util.Callback cb) {
+ this.resultcb = cb;
+ }
+ public void except(Util.Callback cb) {
+ this.errorcb = cb;
+ }
+ public void resolve(Object result) {
+ if (this.resultcb != null)
+ this.resultcb.cb(result);
+ }
+ public void smash(String error) {
+ if (this.errorcb != null)
+ this.errorcb.cb(error);
+ }
+}
288 java/src/jpwrcall/RPCConnection.java
@@ -0,0 +1,288 @@
+package jpwrcall;
+
+import java.lang.Math;
+import java.lang.Class;
+import java.lang.reflect.Type;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.AbstractMap;
+import java.lang.ref.WeakReference;
+
+import java.io.IOException;
+import java.security.cert.Certificate;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.msgpack.MessagePackObject;
+import org.msgpack.MessagePackable;
+import org.msgpack.Packer;
+
+import itsecnetback.NetBack;
+import itsecnetback.MessageHandler;
+
+public class RPCConnection implements MessageHandler {
+ private final Channel chan;
+ private Certificate cert;
+ private String certdigest;
+ private Node n;
+ private AbstractMap<Integer, Promise> out_requests = new HashMap<Integer, Promise>();
+ private int last_msgid = 0;
+
+ public static final int RPC_REQUEST = 0;
+ public static final int RPC_RESPONSE = 1;
+ public static final int RPC_NOTIFY = 2;
+ public static final int RPC_BOOTSTRAP = 3;
+
+ private class BootstrapMessage implements MessagePackable {
+ private String arg1;
+ public BootstrapMessage(String arg1) {
+ this.arg1 = arg1;
+ }
+ @Override
+ public void messagePack(Packer pk) throws IOException {
+ pk.packArray(2);
+ pk.packInt(RPC_BOOTSTRAP);
+ pk.packString(arg1);
+ }
+ }
+
+ private class ResponseMessage implements MessagePackable {
+ private Object arg1;
+ private Object arg2;
+ private Integer msgid;
+ public ResponseMessage(Integer msgid, Object arg1, Object arg2) {
+ this.msgid = msgid;
+ this.arg1 = arg1;
+ this.arg2 = arg2;
+ }
+ @Override
+ public void messagePack(Packer pk) throws IOException {
+ pk.packArray(4);
+ pk.packInt(RPC_RESPONSE);
+ pk.packInt(msgid);
+ pk.pack(arg1);
+ pk.pack(arg2);
+ }
+ }
+
+ private class RequestMessage implements MessagePackable {
+ private String arg1;
+ private String arg2;
+ private Integer msgid;
+ private Object[] params;
+ public RequestMessage(Integer msgid, String arg1, String arg2, Object[] params) {
+ this.msgid = msgid;
+ this.arg1 = arg1;
+ this.arg2 = arg2;
+ this.params = params;
+ }
+ @Override
+ public void messagePack(Packer pk) throws IOException {
+ pk.packArray(5);
+ pk.packInt(RPC_REQUEST);
+ pk.packInt(msgid);
+ pk.packString(arg1);
+ pk.packString(arg2);
+ pk.packArray(params.length);
+ for(Object p : params)
+ pk.pack(p);
+ }
+ }
+
+ public RPCConnection(Channel chan, Node n) {
+ this.chan = chan;
+ this.n = n;
+ //System.out.println("sending msg");
+ //this.chan.write(new BootstrapMessage("foobarstr"));
+
+ try {
+ this.cert = ((SslHandler)(chan.getPipeline().get("ssl"))).getEngine().getSession().getPeerCertificates()[0];
+ this.certdigest = NetBack.calcDigest(this.cert.getEncoded());
+ } catch (Exception e) {
+ System.out.println("Could not get peer's certificate from channel. Closing link.");
+ chan.close();
+ }
+ }
+
+ public void handle(MessagePackObject msg) {
+ if (!msg.isArrayType()) { logclose("message not array"); return; }
+ MessagePackObject[] array = msg.asArray();
+ if (array.length < 1 || !array[0].isIntegerType()) { logclose("message opcode not int"); return; }
+
+ int opcode = array[0].asInt();
+ System.out.println("handle called " + array.length + " opcode " + opcode);
+ if (1*1 == 1) return;
+
+ switch (opcode) {
+ case RPC_REQUEST: {
+ int msgid = array[1].asInt();
+ String ref = array[2].asString();
+ String method = array[3].asString();
+ MessagePackObject[] params = array[4].asArray();
+ handle_request(msgid, ref, method, params);
+ break;
+ }
+ case RPC_RESPONSE: {
+ int msgid = array[1].asInt();
+ MessagePackObject error = array[2];
+ MessagePackObject result = array[3];
+ handle_response(msgid, error, result);
+ break;
+ }
+ case RPC_NOTIFY: {
+ String ref = array[1].asString();
+ String method = array[2].asString();
+ MessagePackObject[] params = array[3].asArray();
+ handle_notify(ref, method, params);
+ break;
+ }
+ case RPC_BOOTSTRAP: {
+ MessagePackObject obj = array[1];
+ handle_bootstrap(obj);
+ break;
+ }
+ default: {
+ logclose("unknown opcode: " + opcode);
+ }
+ }
+ }
+
+ Object lookup(String reference) throws Util.NodeException {
+ WeakReference wr = this.n.exports.get(reference);
+ if (wr == null) throw new Util.NodeException("Invalid object reference used.");
+ Object obj = wr.get();
+ if (obj == null) throw new Util.NodeException("Object has gone away.");
+ return obj;
+ }
+
+ Object invoke(Object obj, String method, MessagePackObject[] params) throws Util.NodeException {
+ Class cls = obj.getClass();
+ Method m = null; Object result;
+ try {
+ //do it in an ugly loop cause getDeclaredMethod somehow screws up
+ Method[] ms = cls.getDeclaredMethods();
+ for(Method cm : ms) {
+ //System.out.println("object has method: " + cm.getName() + ". we want " + method + "." + cm.getName().getClass() + method.getClass());
+ if (cm.getName().equals(method)) {
+ m = cm; break;
+ }
+ }
+ if (m == null) throw new NoSuchMethodException();
+ //m = cls.getDeclaredMethod(method, (Class[])null);
+ } catch (NoSuchMethodException e) {
+ throw new Util.NodeException("Object has no such method: " + method);
+ }
+
+ Type[] types = m.getGenericParameterTypes();
+ Object[] converted = new Object[types.length];
+
+ for (int i=0; i<Math.min(types.length, params.length); i++) {
+ MessagePackObject tmp = params[i];
+ Type type = types[i];
+
+ if (type.equals(boolean.class)) {
+ converted[i] = tmp.asBoolean();
+ } else if(type.equals(byte.class)) {
+ converted[i] = tmp.asByte();
+ } else if(type.equals(short.class)) {
+ converted[i] = tmp.asShort();
+ } else if(type.equals(int.class)) {
+ converted[i] = tmp.asInt();
+ } else if(type.equals(long.class)) {
+ converted[i] = tmp.asLong();
+ } else if(type.equals(float.class)) {
+ converted[i] = tmp.asFloat();
+ } else if(type.equals(double.class)) {
+ converted[i] = tmp.asDouble();
+ } else {
+ // TODO
+ //Template tmpl = TemplateRegistry.lookup(e.getGenericType(), true);
+ //res[i] = new ObjectArgumentEntry(e, tmpl);
+ converted[i] = null;
+ }
+ }
+
+ //System.out.println("params " + params.toString() + ((Object[])params).toString());
+ //if (params.length > 0)
+ // System.out.println("params " + params[0].toString() + ((Object)params[0]).toString());
+
+
+ try {
+ result = m.invoke(obj, converted);
+ } catch (Exception e) {
+ throw new Util.NodeException("Exception: " + e.toString());
+ }
+
+ return result;
+ }
+
+ private void handle_request(int msgid, String ref, String method, MessagePackObject[] params) {
+ //System.out.println("handle_request " + msgid + " " + ref + " " + method);
+ Object obj; Object result;
+
+ try {
+ obj = lookup(ref);
+ } catch (Util.NodeException e) {
+ send_response(msgid, e.toString(), null);
+ return;
+ }
+
+ try {
+ result = invoke(obj, method, params);
+ } catch (Util.NodeException e) {
+ send_response(msgid, e.toString(), null);
+ return;
+ }
+
+ send_response(msgid, null, result);
+ }
+
+ private void handle_notify(String ref, String method, MessagePackObject[] params) {
+ //System.out.println("handle_notify " + ref + " " + method);
+ }
+
+ private void handle_response(int msgid, MessagePackObject error, MessagePackObject result) {
+ //System.out.println("handle_response " + error + " " + result.toString());
+ Promise p = out_requests.get(msgid);
+ if (p == null)
+ System.out.println("could not find promise for this response... :(");
+ else {
+ if (error.isNil())
+ p.resolve(result);
+ else
+ p.smash(error.asString());
+ }
+ }
+
+ private void handle_bootstrap(MessagePackObject obj) {
+ //System.out.println("handle_bootstrap " + obj.asString());
+ }
+
+ private void send_response(int msgid, String error, Object obj) {
+ //System.out.println("send_response " + msgid + " " + error + " " + obj);
+ this.chan.write(new ResponseMessage(msgid, error, obj));
+ }
+
+ public void closed(String cause) {
+ System.out.println("rpc conn closed, cause: " + cause);
+ }
+
+ public void logclose(String cause) {
+ System.out.println("rpc conn will be closed, cause: " + cause);
+ this.chan.close();
+ }
+
+ String gen_cap(Object obj) {
+ return Util.randstr();
+ }
+
+ public Promise call(String ref, String method, Object ... params) {
+ //System.out.println("call " + ref + " " + method + " " + params[0] +" , " + params[1]);
+ this.last_msgid += 1;
+ int callid = this.last_msgid;
+ Promise p = new Promise();
+ out_requests.put(callid, p);
+ this.chan.write(new RequestMessage(callid, ref, method, params));
+ return p;
+ }
+}
+
29 java/src/jpwrcall/Util.java
@@ -0,0 +1,29 @@
+package jpwrcall;
+
+import java.math.BigInteger;
+import java.security.SecureRandom;
+
+public class Util {
+ private static SecureRandom random = new SecureRandom();
+
+ public static String conv_cert(String certpath) {
+ return "foo";
+ }
+
+ public static String randstr() {
+ return new BigInteger(130, random).toString(32);
+ }
+
+ public static interface Callback {
+ void cb(Object result);
+ }
+
+ public static class NodeException extends Exception {
+ public NodeException() {
+ }
+
+ public NodeException(String msg) {
+ super(msg);
+ }
+ }
+}
12 py/README.md
@@ -0,0 +1,12 @@
+pwrcall
+===========
+A new framework for secure distributed function calls.
+
+## Getting Started
+
+See examples/.
+
+## Learn More
+
+ - [Repository at github](https://github.com/rep/pwrcall)
+
485 py/distribute_setup.py
@@ -0,0 +1,485 @@
+#!python
+"""Bootstrap distribute installation
+
+If you want to use setuptools in your package's setup.py, just include this
+file in the same directory with it, and add this to the top of your setup.py::
+
+ from distribute_setup import use_setuptools
+ use_setuptools()
+
+If you want to require a specific version of setuptools, set a download
+mirror, or use an alternate download directory, you can do so by supplying
+the appropriate options to ``use_setuptools()``.
+
+This file can also be run as a script to install or upgrade setuptools.
+"""
+import os
+import sys
+import time
+import fnmatch
+import tempfile
+import tarfile
+from distutils import log
+
+try:
+ from site import USER_SITE
+except ImportError:
+ USER_SITE = None
+
+try:
+ import subprocess
+
+ def _python_cmd(*args):
+ args = (sys.executable,) + args
+ return subprocess.call(args) == 0
+
+except ImportError:
+ # will be used for python 2.3
+ def _python_cmd(*args):
+ args = (sys.executable,) + args
+ # quoting arguments if windows
+ if sys.platform == 'win32':
+ def quote(arg):
+ if ' ' in arg:
+ return '"%s"' % arg
+ return arg
+ args = [quote(arg) for arg in args]
+ return os.spawnl(os.P_WAIT, sys.executable, *args) == 0
+
+DEFAULT_VERSION = "0.6.15"
+DEFAULT_URL = "http://pypi.python.org/packages/source/d/distribute/"
+SETUPTOOLS_FAKED_VERSION = "0.6c11"
+
+SETUPTOOLS_PKG_INFO = """\
+Metadata-Version: 1.0
+Name: setuptools
+Version: %s
+Summary: xxxx
+Home-page: xxx
+Author: xxx
+Author-email: xxx
+License: xxx
+Description: xxx
+""" % SETUPTOOLS_FAKED_VERSION
+
+
+def _install(tarball):
+ # extracting the tarball
+ tmpdir = tempfile.mkdtemp()
+ log.warn('Extracting in %s', tmpdir)
+ old_wd = os.getcwd()
+ try:
+ os.chdir(tmpdir)
+ tar = tarfile.open(tarball)
+ _extractall(tar)
+ tar.close()
+
+ # going in the directory
+ subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
+ os.chdir(subdir)
+ log.warn('Now working in %s', subdir)
+
+ # installing
+ log.warn('Installing Distribute')
+ if not _python_cmd('setup.py', 'install'):
+ log.warn('Something went wrong during the installation.')
+ log.warn('See the error message above.')
+ finally:
+ os.chdir(old_wd)
+
+
+def _build_egg(egg, tarball, to_dir):
+ # extracting the tarball
+ tmpdir = tempfile.mkdtemp()
+ log.warn('Extracting in %s', tmpdir)
+ old_wd = os.getcwd()
+ try:
+ os.chdir(tmpdir)
+ tar = tarfile.open(tarball)
+ _extractall(tar)
+ tar.close()
+
+ # going in the directory
+ subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
+ os.chdir(subdir)
+ log.warn('Now working in %s', subdir)
+
+ # building an egg
+ log.warn('Building a Distribute egg in %s', to_dir)
+ _python_cmd('setup.py', '-q', 'bdist_egg', '--dist-dir', to_dir)
+
+ finally:
+ os.chdir(old_wd)
+ # returning the result
+ log.warn(egg)
+ if not os.path.exists(egg):
+ raise IOError('Could not build the egg.')
+
+
+def _do_download(version, download_base, to_dir, download_delay):
+ egg = os.path.join(to_dir, 'distribute-%s-py%d.%d.egg'
+ % (version, sys.version_info[0], sys.version_info[1]))
+ if not os.path.exists(egg):
+ tarball = download_setuptools(version, download_base,
+ to_dir, download_delay)
+ _build_egg(egg, tarball, to_dir)
+ sys.path.insert(0, egg)
+ import setuptools
+ setuptools.bootstrap_install_from = egg
+
+
+def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
+ to_dir=os.curdir, download_delay=15, no_fake=True):
+ # making sure we use the absolute path
+ to_dir = os.path.abspath(to_dir)
+ was_imported = 'pkg_resources' in sys.modules or \
+ 'setuptools' in sys.modules
+ try:
+ try:
+ import pkg_resources
+ if not hasattr(pkg_resources, '_distribute'):
+ if not no_fake:
+ _fake_setuptools()
+ raise ImportError
+ except ImportError:
+ return _do_download(version, download_base, to_dir, download_delay)
+ try:
+ pkg_resources.require("distribute>="+version)
+ return
+ except pkg_resources.VersionConflict:
+ e = sys.exc_info()[1]
+ if was_imported:
+ sys.stderr.write(
+ "The required version of distribute (>=%s) is not available,\n"
+ "and can't be installed while this script is running. Please\n"
+ "install a more recent version first, using\n"
+ "'easy_install -U distribute'."
+ "\n\n(Currently using %r)\n" % (version, e.args[0]))
+ sys.exit(2)
+ else:
+ del pkg_resources, sys.modules['pkg_resources'] # reload ok
+ return _do_download(version, download_base, to_dir,
+ download_delay)
+ except pkg_resources.DistributionNotFound:
+ return _do_download(version, download_base, to_dir,
+ download_delay)
+ finally:
+ if not no_fake:
+ _create_fake_setuptools_pkg_info(to_dir)
+
+def download_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
+ to_dir=os.curdir, delay=15):
+ """Download distribute from a specified location and return its filename
+
+ `version` should be a valid distribute version number that is available
+ as an egg for download under the `download_base` URL (which should end
+ with a '/'). `to_dir` is the directory where the egg will be downloaded.
+ `delay` is the number of seconds to pause before an actual download
+ attempt.
+ """
+ # making sure we use the absolute path
+ to_dir = os.path.abspath(to_dir)
+ try:
+ from urllib.request import urlopen
+ except ImportError:
+ from urllib2 import urlopen
+ tgz_name = "distribute-%s.tar.gz" % version
+ url = download_base + tgz_name
+ saveto = os.path.join(to_dir, tgz_name)
+ src = dst = None
+ if not os.path.exists(saveto): # Avoid repeated downloads
+ try:
+ log.warn("Downloading %s", url)
+ src = urlopen(url)
+ # Read/write all in one block, so we don't create a corrupt file
+ # if the download is interrupted.
+ data = src.read()
+ dst = open(saveto, "wb")
+ dst.write(data)
+ finally:
+ if src:
+ src.close()
+ if dst:
+ dst.close()
+ return os.path.realpath(saveto)
+
+def _no_sandbox(function):
+ def __no_sandbox(*args, **kw):
+ try:
+ from setuptools.sandbox import DirectorySandbox
+ if not hasattr(DirectorySandbox, '_old'):
+ def violation(*args):
+ pass
+ DirectorySandbox._old = DirectorySandbox._violation
+ DirectorySandbox._violation = violation
+ patched = True
+ else:
+ patched = False
+ except ImportError:
+ patched = False
+
+ try:
+ return function(*args, **kw)
+ finally:
+ if patched:
+ DirectorySandbox._violation = DirectorySandbox._old
+ del DirectorySandbox._old
+
+ return __no_sandbox
+
+def _patch_file(path, content):
+ """Will backup the file then patch it"""
+ existing_content = open(path).read()
+ if existing_content == content:
+ # already patched
+ log.warn('Already patched.')
+ return False
+ log.warn('Patching...')
+ _rename_path(path)
+ f = open(path, 'w')
+ try:
+ f.write(content)
+ finally:
+ f.close()
+ return True
+
+_patch_file = _no_sandbox(_patch_file)
+
+def _same_content(path, content):
+ return open(path).read() == content
+
+def _rename_path(path):
+ new_name = path + '.OLD.%s' % time.time()
+ log.warn('Renaming %s into %s', path, new_name)
+ os.rename(path, new_name)
+ return new_name
+
+def _remove_flat_installation(placeholder):
+ if not os.path.isdir(placeholder):
+ log.warn('Unkown installation at %s', placeholder)
+ return False
+ found = False
+ for file in os.listdir(placeholder):
+ if fnmatch.fnmatch(file, 'setuptools*.egg-info'):
+ found = True
+ break
+ if not found:
+ log.warn('Could not locate setuptools*.egg-info')
+ return
+
+ log.warn('Removing elements out of the way...')
+ pkg_info = os.path.join(placeholder, file)
+ if os.path.isdir(pkg_info):
+ patched = _patch_egg_dir(pkg_info)
+ else:
+ patched = _patch_file(pkg_info, SETUPTOOLS_PKG_INFO)
+
+ if not patched:
+ log.warn('%s already patched.', pkg_info)
+ return False
+ # now let's move the files out of the way
+ for element in ('setuptools', 'pkg_resources.py', 'site.py'):
+ element = os.path.join(placeholder, element)
+ if os.path.exists(element):
+ _rename_path(element)
+ else:
+ log.warn('Could not find the %s element of the '
+ 'Setuptools distribution', element)
+ return True
+
+_remove_flat_installation = _no_sandbox(_remove_flat_installation)
+
+def _after_install(dist):
+ log.warn('After install bootstrap.')
+ placeholder = dist.get_command_obj('install').install_purelib
+ _create_fake_setuptools_pkg_info(placeholder)
+
+def _create_fake_setuptools_pkg_info(placeholder):
+ if not placeholder or not os.path.exists(placeholder):
+ log.warn('Could not find the install location')
+ return
+ pyver = '%s.%s' % (sys.version_info[0], sys.version_info[1])
+ setuptools_file = 'setuptools-%s-py%s.egg-info' % \
+ (SETUPTOOLS_FAKED_VERSION, pyver)
+ pkg_info = os.path.join(placeholder, setuptools_file)
+ if os.path.exists(pkg_info):
+ log.warn('%s already exists', pkg_info)
+ return
+
+ log.warn('Creating %s', pkg_info)
+ f = open(pkg_info, 'w')
+ try:
+ f.write(SETUPTOOLS_PKG_INFO)
+ finally:
+ f.close()
+
+ pth_file = os.path.join(placeholder, 'setuptools.pth')
+ log.warn('Creating %s', pth_file)
+ f = open(pth_file, 'w')
+ try:
+ f.write(os.path.join(os.curdir, setuptools_file))
+ finally:
+ f.close()
+
+_create_fake_setuptools_pkg_info = _no_sandbox(_create_fake_setuptools_pkg_info)
+
+def _patch_egg_dir(path):
+ # let's check if it's already patched
+ pkg_info = os.path.join(path, 'EGG-INFO', 'PKG-INFO')
+ if os.path.exists(pkg_info):
+ if _same_content(pkg_info, SETUPTOOLS_PKG_INFO):
+ log.warn('%s already patched.', pkg_info)
+ return False
+ _rename_path(path)
+ os.mkdir(path)
+ os.mkdir(os.path.join(path, 'EGG-INFO'))
+ pkg_info = os.path.join(path, 'EGG-INFO', 'PKG-INFO')
+ f = open(pkg_info, 'w')
+ try:
+ f.write(SETUPTOOLS_PKG_INFO)
+ finally:
+ f.close()
+ return True
+
+_patch_egg_dir = _no_sandbox(_patch_egg_dir)
+
+def _before_install():
+ log.warn('Before install bootstrap.')
+ _fake_setuptools()
+
+
+def _under_prefix(location):
+ if 'install' not in sys.argv:
+ return True
+ args = sys.argv[sys.argv.index('install')+1:]
+ for index, arg in enumerate(args):
+ for option in ('--root', '--prefix'):
+ if arg.startswith('%s=' % option):
+ top_dir = arg.split('root=')[-1]
+ return location.startswith(top_dir)
+ elif arg == option:
+ if len(args) > index:
+ top_dir = args[index+1]
+ return location.startswith(top_dir)
+ if arg == '--user' and USER_SITE is not None:
+ return location.startswith(USER_SITE)
+ return True
+
+
+def _fake_setuptools():
+ log.warn('Scanning installed packages')
+ try:
+ import pkg_resources
+ except ImportError:
+ # we're cool
+ log.warn('Setuptools or Distribute does not seem to be installed.')
+ return
+ ws = pkg_resources.working_set
+ try:
+ setuptools_dist = ws.find(pkg_resources.Requirement.parse('setuptools',
+ replacement=False))
+ except TypeError:
+ # old distribute API
+ setuptools_dist = ws.find(pkg_resources.Requirement.parse('setuptools'))
+
+ if setuptools_dist is None:
+ log.warn('No setuptools distribution found')
+ return
+ # detecting if it was already faked
+ setuptools_location = setuptools_dist.location
+ log.warn('Setuptools installation detected at %s', setuptools_location)
+
+ # if --root or --preix was provided, and if
+ # setuptools is not located in them, we don't patch it
+ if not _under_prefix(setuptools_location):
+ log.warn('Not patching, --root or --prefix is installing Distribute'
+ ' in another location')
+ return
+
+ # let's see if its an egg
+ if not setuptools_location.endswith('.egg'):
+ log.warn('Non-egg installation')
+ res = _remove_flat_installation(setuptools_location)
+ if not res:
+ return
+ else:
+ log.warn('Egg installation')
+ pkg_info = os.path.join(setuptools_location, 'EGG-INFO', 'PKG-INFO')
+ if (os.path.exists(pkg_info) and
+ _same_content(pkg_info, SETUPTOOLS_PKG_INFO)):
+ log.warn('Already patched.')
+ return
+ log.warn('Patching...')
+ # let's create a fake egg replacing setuptools one
+ res = _patch_egg_dir(setuptools_location)
+ if not res:
+ return
+ log.warn('Patched done.')
+ _relaunch()
+
+
+def _relaunch():
+ log.warn('Relaunching...')
+ # we have to relaunch the process
+ # pip marker to avoid a relaunch bug
+ if sys.argv[:3] == ['-c', 'install', '--single-version-externally-managed']:
+ sys.argv[0] = 'setup.py'
+ args = [sys.executable] + sys.argv
+ sys.exit(subprocess.call(args))
+
+
+def _extractall(self, path=".", members=None):
+ """Extract all members from the archive to the current working
+ directory and set owner, modification time and permissions on
+ directories afterwards. `path' specifies a different directory
+ to extract to. `members' is optional and must be a subset of the
+ list returned by getmembers().
+ """
+ import copy
+ import operator
+ from tarfile import ExtractError
+ directories = []
+
+ if members is None:
+ members = self
+
+ for tarinfo in members:
+ if tarinfo.isdir():
+ # Extract directories with a safe mode.
+ directories.append(tarinfo)
+ tarinfo = copy.copy(tarinfo)
+ tarinfo.mode = 448 # decimal for oct 0700
+ self.extract(tarinfo, path)
+
+ # Reverse sort directories.
+ if sys.version_info < (2, 4):
+ def sorter(dir1, dir2):
+ return cmp(dir1.name, dir2.name)
+ directories.sort(sorter)
+ directories.reverse()
+ else:
+ directories.sort(key=operator.attrgetter('name'), reverse=True)
+
+ # Set correct owner, mtime and filemode on directories.
+ for tarinfo in directories:
+ dirpath = os.path.join(path, tarinfo.name)
+ try:
+ self.chown(tarinfo, dirpath)
+ self.utime(tarinfo, dirpath)
+ self.chmod(tarinfo, dirpath)
+ except ExtractError:
+ e = sys.exc_info()[1]
+ if self.errorlevel > 1:
+ raise
+ else:
+ self._dbg(1, "tarfile: %s" % e)
+
+
+def main(argv, version=DEFAULT_VERSION):
+ """Install or upgrade setuptools and EasyInstall"""
+ tarball = download_setuptools()
+ _install(tarball)
+
+
+if __name__ == '__main__':
+ main(sys.argv[1:])
24 py/examples/dasample1c.py
@@ -0,0 +1,24 @@
+import sys
+from pwrcall import loop, unloop, Node, expose
+
+n = Node(cert='cert.pem')
+math = n.establish(sys.argv[1])
+#math2 = n.connect('127.0.0.1', 10000).rootobj().call('get', sys.argv[1].split('/')[-1])
+
+def printexception(r):
+ print 'exc:', r
+ n.shutdown()
+ unloop()
+
+math._except(printexception)
+p = math.call('add', 11, 17)
+
+def printresult(result):
+ print 'printresult:', result
+ n.shutdown()
+ unloop()
+
+p._when(printresult)
+
+loop()
+
21 py/examples/dasample1s.py
@@ -0,0 +1,21 @@
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+from pwrcall import loop, unloop, Node, expose
+
+class Math(object):
+ @expose
+ def add(self, a, b):
+ return a+b
+
+ @expose
+ def mul(self, a, b):
+ return a*b
+
+n = Node(cert='cert2.pem')
+ref = n.register_object(Math())
+n.listen(port=10000)
+print 'math obj ready at', n.refurl(ref)
+
+loop()
+
27 py/examples/dasample2c.py
@@ -0,0 +1,27 @@
+import sys
+import random
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+from pwrcall import loop, unloop, Node, expose, Promise
+from evnet import later
+
+class Slave(object):
+ @expose
+ def procfunc(self, arg):
+ print 'procfunc called with array length', len(arg)
+ p = Promise()
+ def fulfill():
+ r = sum(arg)
+ print 'procfunc sending result', r
+ p._resolve(r)
+ later(2.0, fulfill)
+ return p
+
+n = Node(cert='cert2.pem')
+s = Slave()
+m = n.establish(sys.argv[1])
+m.call('addslave', s)
+
+loop()
+
83 py/examples/dasample2s.py
@@ -0,0 +1,83 @@
+import sys
+import random
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+from pwrcall import loop, unloop, Node, expose, Promise
+from evnet import later
+
+class Processround(object):
+ def __init__(self, slaves, data, funcname, combinefunc):
+ self.slaves = slaves
+ self.data = data
+ self.funcname = funcname
+ self.combinefunc = combinefunc
+ self.p = Promise()
+ self.results = []
+
+ def run(self):
+ amount = len(self.data) / len(self.slaves)
+ for i in range(len(self.slaves)):
+ rp = self.slaves[i].call(self.funcname, self.data[i*amount:(i*amount)+amount])
+ rp._when(self.resultcb)
+ rp._except(self.exceptcb)
+
+ return self.p
+
+ def exceptcb(self, e):
+ self.p._smash(e)
+
+ def resultcb(self, result):
+ self.results.append(result)
+ if len(self.results) == len(self.slaves):
+ self.p._resolve(self.combinefunc(self.results))
+
+
+class Master(object):
+ def __init__(self):
+ self.n = Node(cert='cert.pem')
+ self.n.listen(port=10000)
+ ref = self.n.register_object(self, 'master')
+ print 'Master at', self.n.refurl(ref)
+
+ self.slaves = {}
+ later(5.0, self.process)
+
+ def process(self):
+ slaves = self.slaves.values()
+ if not slaves:
+ later(5.0, self.process)
+ print 'process, but no slaves'
+ return
+
+ somerand = random.randint(10,200)
+ data = range(somerand * 720, (somerand+1) * 720)
+
+ pr = Processround(slaves, data, 'procfunc', sum)
+ p = pr.run()
+ p._except(self.roundfailed)
+ p._when(self.roundresult, sum(data))
+
+ def roundresult(self, r, should):
+ print 'roundresult, shouldbe:', r, should, r==should
+ later(5.0, self.process)
+
+ def roundfailed(self, e):
+ print 'processround failed:', e
+ later(5.0, self.process)
+
+ @expose
+ def addslave(self, slave, conn):
+ self.slaves[conn] = slave
+
+ def connclosed(reason):
+ sl = self.slaves.pop(conn, None)
+ print 'slave {0} gone, as conn {1} is closed.'. format(sl, conn)
+
+ conn._on('close', connclosed)
+ return True
+
+m = Master()
+
+loop()
+
213 py/examples/dasample3c.py
@@ -0,0 +1,213 @@
+import sys
+import os
+import hashlib
+import tempfile
+import optparse
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+from pwrcall import loop, unloop, Node, expose, Promise
+from pwrcall.util import NodeException, gen_selfsigned_cert
+from evnet import schedule
+
+import gtkdialog
+
+MASTERURL = 'pwrcall://c6db6fac64dd58e7a7a8228e9bc11116397ba58d@137.226.161.210:20001/im'
+MASTERURL = 'pwrcall://e7bcae69e9c79aad2f4b8fe1f14bcd52beb4faae@137.226.161.211:20001/im'
+DOTPATH = os.path.expanduser('~/.instashare/')
+
+class File(object):
+ def __init__(self, fname):
+ self.fobj = None
+ self.fname = fname
+
+ @expose
+ def write(self, data):
+ if not self.fobj:
+ self.fobj = tempfile.NamedTemporaryFile(delete=False, prefix='instasharetmp--{0}--'.format(self.fname), dir=DOTPATH)
+ return self.fobj.write(data)
+
+ @expose
+ def close(self):
+ self.fobj.close()
+ print 'sink with path {0}, fname {1} was closed.'.format(self.fobj.name, self.fname)
+
+class Instaclient(object):
+ def __init__(self, node):
+ self.node = node
+ self.ref = node.register_object(self)
+ self.pwrurl = node.refurl(self.ref)
+ print 'Instaclient exported', self.pwrurl
+
+ gtkdialog.t.create_staticon('./instaicon.svg')
+ self.call_master()
+
+ def masterclose(self, r):
+ if not self.node._closing and ('timeout' in str(r) or 'ZeroReturn' in str(r)):
+ print 'Connection to Instamaster closed. Reconnecting...'
+ self.call_master()
+
+ def call_master(self):
+ im = self.node.establish(MASTERURL)
+
+ def master_cb(r):
+ r.conn._on('close', self.masterclose)
+ print 'Successfully connected to Instamaster. Waiting...'
+ def master_exc(r):
+ print 'Exception on Instamaster promise:', e
+ unloop()
+
+ im._when(master_cb)
+ im._except(master_exc)
+
+ @expose
+ def open(self, filename, conn):
+ print 'request to open a file', filename
+ sink = File(filename)
+ return sink
+
+class Filestreamer(object):
+ def __init__(self, src, dst, conn):
+ self.src, self.dst = src, dst
+ self.conn = conn
+ self.done = False
+ self.p = Promise()
+ self.count = 0
+ self.burst = True
+
+ conn.conn._on('writable', self.copy)
+ conn.conn._on('close', self.closed)
+ schedule(self.copy)
+
+ def copyagain(self, r):
+ self.burst = True
+ schedule(self.copy)
+
+ def copy(self):
+ if self.done or self.conn.conn._closed: return
+ if not self.burst: return
+ self.count += 1
+
+ d = self.src.read(16384)
+ if not d:
+ print 'EOF on src, sending close, local close, done'
+ p2 = self.dst.call('close')
+ def done(r):
+ self.p._resolve(True)
+ self.conn.close()
+ p2._when(done)
+
+ self.done = True
+ self.src.close()
+ return
+
+ if self.count == 10:
+ self.burst = False
+ self.count = 0
+ p = self.dst.call('write', d)
+ p._when(self.copyagain)
+ else:
+ p = self.dst.notify('write', d)
+
+ def closed(self, e):
+ print 'connection closed', e
+ if not self.done:
+ self.p._smash('Closed, but not done, yet.')
+
+
+def opts():
+ usage = """usage:
+ %prog listen
+ - registers with master and waits for incoming files
+ %prog send <target> <filepath>
+ - attempts to send <filepath> to <target>
+"""
+ parser = optparse.OptionParser(usage=usage)
+
+ options, args = parser.parse_args()
+
+ if len(args) < 1:
+ parser.error('Please read usage hints on how to INSTASHARE.')
+
+ action, args = args[0], args[1:]
+ if action not in ['listen', 'send']:
+ parser.error('Please read usage hints on how to INSTASHARE.')
+
+ if action == 'send':
+ if len(args) != 2:
+ parser.error('Please read usage hints on how to INSTASHARE.')
+
+ if not setup_dotdir():
+ parser.error('Could not create dotdir {0}, exiting.'.format(DOTDIR))
+
+ return options, action, args
+
+def main(options, action, args):
+ n = Node(cert=os.path.join(DOTPATH, 'cert.pem'))
+
+ if action == 'listen':
+ ic = Instaclient(n)
+ loop()
+ elif action == 'send':
+ targetcap, fp = args
+
+ if not (os.path.exists(fp) and os.path.isfile(fp)):
+ print fp, 'not a file'
+ return 1
+
+ im = n.establish(MASTERURL)
+
+ def established(rim):
+ p = rim.call('open', targetcap)
+
+ def donecb(r):
+ print 'done:', r
+ n.shutdown()
+ unloop()
+
+ def prsink(r):
+ print 'sink:' , r
+ r.notify('init', os.path.basename(fp))
+ fs = Filestreamer(open(fp, 'rb'), r, rim.conn)
+ fs.p._when(donecb)
+ fs.p._except(donecb)
+
+ def prexcept(e):
+ print 'Error:', e
+ n.shutdown()
+ unloop()
+
+ p._when(prsink)
+ p._except(prexcept)
+
+ im._when(established)
+ loop()
+
+ return 0
+
+def setup_dotdir():
+ if not (os.path.exists(DOTPATH) and os.path.isdir(DOTPATH)):
+ print 'Creating directory', DOTPATH, '...'
+ try:
+ os.mkdir(DOTPATH, 0700)
+ except:
+ return False
+
+ certpath = os.path.join(DOTPATH, 'cert.pem')
+ if not (os.path.exists(certpath) and os.path.isfile(certpath)):
+ print 'Generating self-signed certificate...'
+ try:
+ crt = gen_selfsigned_cert()
+ open(certpath, 'w').write(crt)
+ except:
+ return False
+
+ return True
+
+if __name__ == '__main__':
+ options, action, args = opts()
+ try:
+ sys.exit(main(options, action, args))
+ except KeyboardInterrupt:
+ sys.exit(0)
+
52 py/examples/dasample3s.py
@@ -0,0 +1,52 @@
+import sys
+import logging
+logging.basicConfig(level=logging.DEBUG)
+
+from pwrcall import loop, unloop, Node, expose
+from pwrcall.util import NodeException, parse_url
+
+class Instamaster(object):
+ def __init__(self):
+ self.clients = {}
+
+ @expose
+ def open(self, pwrurl, filename, conn):
+ try:
+ fp, hints, cap = parse_url(pwrurl)
+ except:
+ raise NodeException('Error in pwrcall URL.')
+
+ cli = self.clients.get(fp, None)
+ if not cli: raise NodeException('Target client unavailable.')
+ return cli.call(cap, 'open', filename)
+
+ def on_new_conn(self, rc, addr):
+ rc.onready()._when(self.register_conn)
+
+ def register_conn(self, rc):
+ cli = self.clients.get(rc.conn.peerfp, None)
+ if cli:
+ logging.warn('New conn {0} kicking old {1}.'.format(rc.conn.peerfp, cli.conn.peerfp))
+ cli.close()
+
+ self.clients[rc.conn.peerfp] = rc
+ def connclosed(reason):
+ self.clients.pop(rc.conn.peerfp, None)
+
+ rc._on('close', connclosed)
+
+def main():
+ n = Node(cert='instamaster.pem')
+ im = Instamaster()
+ ref = n.register_object(im, 'im')
+ n._on('connection', im.on_new_conn)
+ n.listen(port=20001)
+ print 'Instamaster at', n.refurl(ref)
+
+ loop()
+
+ return 0
+
+if __name__ == '__main__':
+ sys.exit(main())
+
6 py/pwrcall/__init__.py
@@ -0,0 +1,6 @@
+
+from .rpcnode import Node, expose
+from .promise import Promise
+from .util import Referenced
+
+from .pyevloop import loop, unloop, schedule
118 py/pwrcall/promise.py
@@ -0,0 +1,118 @@
+
+import logging
+
+from .util import NodeException
+from .pyevloop import schedule
+
+EVENTUAL = 1
+FULFILLED = 2
+BROKEN = 3
+
+class Promise(object):
+ def __init__(self):
+ self.__state = EVENTUAL
+ self._callbacks = []
+ self._errbacks = []
+ self._calls = []
+ self._result = None
+
+ def _state(self):
+ return self.__state
+
+ def _call_errbacks(self):
+ if self.__state == BROKEN:
+ for fn, args, kwargs in self._errbacks:
+ #schedule(fn, self._result, *args, **kwargs)
+ fn(self._result, *args, **kwargs)
+ self._errbacks = []
+
+ def _call_callbacks(self):
+ if self.__state == FULFILLED:
+ # call all callbacks and invoke all methods
+ for fn, args, kwargs in self._callbacks:
+ #schedule(fn, self._result, *args, **kwargs)
+ fn(self._result, *args, **kwargs)
+ self._callbacks = []
+
+ for p, m, args, kwargs in self._calls:
+ self._exec_call(p, m, args, kwargs)
+ self._calls = []
+
+ def _exec_call(self, p, m, args, kwargs):
+ f = getattr(self._result, m)
+ #r = f(*args, **kwargs)
+ #p._resolve(r)
+ #return
+ def tmpcaller():
+ r = f(*args, **kwargs)
+ p._resolve(r)
+ schedule(tmpcaller)
+
+ def _chain(self, cbs, calls, ebs):
+ self._callbacks += cbs
+ self._calls += calls
+ self._errbacks += ebs
+ self._call_callbacks()
+ self._call_errbacks()
+
+ def _resolve(self, result):
+ if self.__state != EVENTUAL:
+ raise NodeException('Promises may only be fulfilled once.')
+
+ self.__state = FULFILLED
+ self._result = result
+
+ if isinstance(result, Promise):
+ result._chain(self._callbacks, self._calls, self._errbacks)
+ self._when = result._when
+ self._except = result._except
+ self._state = result._state
+ self._call = result._call
+ self._chain = result._chain
+ self.__getattr__ = result.__getattr__
+ else:
+ self._call_callbacks()
+
+ def _smash(self, exc):
+ if self.__state == BROKEN:
+ raise NodeException('Promises may only be broken once.')
+ if self.__state == FULFILLED:
+ raise NodeException('Promise was fulfilled and now broken?')
+ self._result = exc
+ self.__state = BROKEN
+ # now call all errbacks, ignoring method calls
+ if not self._errbacks:
+ logging.warn('Promise without errbacks smashed with exception: {0}'.format(exc))
+ self._call_errbacks()
+
+ def _when(self, cb, *args, **kwargs):
+ if self.__state == EVENTUAL:
+ self._callbacks.append((cb, args, kwargs))
+ elif self.__state == FULFILLED:
+ #schedule(cb, self._result, *args, **kwargs)
+ cb(self._result, *args, **kwargs)
+
+ def _except(self, cb, *args, **kwargs):
+ if self.__state == EVENTUAL:
+ self._errbacks.append((cb, args, kwargs))
+ elif self.__state == BROKEN:
+ #schedule(cb, self._result, *args, **kwargs)
+ cb(self._result, *args, **kwargs)
+
+ def _call(self, method, *args, **kwargs):
+ p = Promise()
+ if self.__state == EVENTUAL:
+ self._calls.append((p, method, args, kwargs))
+ elif self.__state == BROKEN:
+ p._smash(self._result)
+ else:
+ self._exec_call(p, method, args, kwargs)
+ return p
+
+ def __getattr__(self, name):
+ #logging.debug('promise __getattr__: {0}'.format(name))
+ def promisingFunc(*args, **kwargs):
+ return self._call(name, *args, **kwargs)
+ return promisingFunc
+
+
2 py/pwrcall/pyevloop.py
@@ -0,0 +1,2 @@
+
+from evnet import *
400 py/pwrcall/rpcnode.py
@@ -0,0 +1,400 @@
+
+import os
+import logging
+import socket
+import weakref
+import inspect
+import itertools
+import time
+import collections
+
+from . import util
+from . import pyevloop
+from . import serialize
+from .promise import Promise
+from .util import NodeException, expose, Referenced, EventGen
+
+RPC_REQUEST = 0
+RPC_RESPONSE = 1
+RPC_NOTIFY = 2
+
+class nodeFunctions(object):
+ """pwrcall node functionality"""
+ def __init__(self, node):
+ self.node = node
+
+ @expose
+ def get(self, cap):
+ """Retrieve objects registered with the Node"""
+ return self.node.lookup(cap)
+
+ @expose
+ def clone(self, cap, options={}, conn):
+ """Clone a capability"""
+ o = self.node.lookup(cap)
+ # cloning always includes the caller fingerprint
+ # this restricts revocation
+ options.update({'clonefp': conn.conn.peerfp})
+ cap = util.gen_forwarder(self.node.secret, o, self.node.nonce, options=options)
+ return cap
+
+ @expose
+ def revoke(self, cap, conn):
+ """Revoke a previously cloned capability."""
+ objid, opts = self.node.decode_cap(cap)
+ cfp = opts.get('clonefp', None)
+ autofp = opts.get('fp', None)
+ if not cfp: raise NodeException('Not a cloned cap.')
+ if cfp != conn.conn.peerfp: raise NodeException('Denied.')
+ self.node.revoked.add(cap)
+ return True
+
+ @expose
+ def revoke_option(self, opt, conn):
+ self.node.revoked_opts[conn.conn.peerfp].add(opt)
+ return True
+
+class Node(EventGen):
+ def __init__(self, cert=None, eventloop=pyevloop):
+ EventGen.__init__(self)
+ self.eventloop = eventloop
+ self.eventloop.shutdown_callback(self._shutdown_request)
+ self.cert = cert
+ self.x509, self.fp = util.load_cert(self.cert)
+ logging.debug('Node fingerprint {0}'.format(self.fp))
+ self.verify_hook = None
+
+ self.connections = set()
+ self.peers = {}
+ self.listeners = set()
+ self._closing = False
+ self._shutdown = False
+ self.exports = {}
+ self.revoked = set()
+ self.revoked_opts = collections.defaultdict(set)
+
+ self.secret = util.filehash(self.cert)[:16]
+ self.nonce = (util.rand32()<<32) | util.rand32()
+
+ self.register_object(nodeFunctions(self), '$node')
+ #pyevloop.later(5, self.connstats)
+
+ def connstats(self):
+ print '---connstats---'
+ for i in self.connections:
+ print i, i.exports, i.out_requests, i.last_msgid, i.conn.buf.size, i.conn.readbytes, i.conn.writebytes
+ pyevloop.later(5, self.connstats)
+
+ def verify_peer(self, ok, store, *args, **kwargs):
+ if self.verify_hook: return self.verify_hook(ok, store, *args, **kwargs)
+ return True
+
+ def register(self, obj, options=None, cap=None):
+ if not cap: cap = util.gen_forwarder(self.secret, obj, self.nonce, options=options)
+ self.exports[cap] = obj
+ return cap
+
+ def refurl(self, ref):
+ ports = [i.sock.getsockname()[1] for i in self.listeners]
+ hints = ','.join(['{0}:{1}'.format(i[0], i[1]) for i in itertools.product(self.eventloop.hints, ports)])
+ return 'pwrcall://{0}@{1}/{2}'.format(self.fp, hints, ref)
+
+ def option_revoked(self, opts):
+ cfp = opts.pop('clonefp', None)
+ if not cfp: return False
+ for i in opts.items():
+ if i in self.revoked_opts[cfp]: return True
+ return False
+
+ def lookup(self, ref):
+ if ref in revoked: raise NodeException('Invalid object reference used.')
+ try:
+ objid, options = self.decode_cap(ref)
+ if self.option_revoked(options): raise NodeException('Invalid object reference used.')
+ owref = self.exports.get(objid, None)
+ if not owref: raise NodeException('Invalid object reference used.')
+ o = owref()
+ if not o: raise NodeException('Object has gone away.')
+ except NodeException:
+ raise
+ except:
+ o = self.exports.get(ref, None)
+ if not o: raise NodeException('Invalid object reference used.')
+ else: return o
+ else:
+ return o
+
+ def decode_cap(self, cap):
+ objid, nonce, opts = util.cap_from_forwarder(self.secret, cap)
+ if nonce != self.nonce: raise NodeException('Nonce from message incorrect.')
+ # TODO: somehow give options to user
+ return objid, opts
+
+ def shutdown(self, reason=NodeException('Server shutdown.')):
+ if self._shutdown:
+ return
+
+ self._closing = True
+ for c in self.connections: c.close()
+ for l in self.listeners: l.close()
+
+ self.connections = set()
+ self.listeners = set()
+ self._shutdown = True
+
+ logging.info('Node shutdown, {0}'.format(reason))
+
+ # this method is given to eventloop as callback
+ # so eventloop can tell us if it goes down
+ def _shutdown_request(self):
+ self.shutdown()
+
+ def _remove_connection(self, c):
+ if not self._closing:
+ self.connections.remove(c)
+ if c.conn.peerfp:
+ self.peers.pop(c.conn.peerfp, None)
+ logging.info('Disconnect by {0}'.format(c.addr))