Skip to content

Commit

Permalink
Create extensible service discovery protocol, with initial basic impl…
Browse files Browse the repository at this point in the history
…ementation
  • Loading branch information
lakeman committed May 9, 2014
1 parent 4204973 commit 672104b
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 39 deletions.
2 changes: 1 addition & 1 deletion commandline.c
Expand Up @@ -3235,7 +3235,7 @@ struct cli_schema command_line_options[]={
"Run memory speed test"},
{app_byteorder_test,{"test","byteorder",NULL}, 0,
"Run byte order handling test"},
{app_msp_connection,{"msp", "listen", "[--once]", "[--forward=<local_port>]", "<port>", NULL}, 0,
{app_msp_connection,{"msp", "listen", "[--once]", "[--forward=<local_port>]", "[--service=<service_name>]", "<port>", NULL}, 0,
"Listen for incoming connections"},
{app_msp_connection,{"msp", "connect", "[--once]", "[--forward=<local_port>]", "<sid>", "<port>", NULL}, 0,
"Connect to a remote party"},
Expand Down
1 change: 1 addition & 0 deletions constants.h
Expand Up @@ -117,6 +117,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#define MDP_PORT_ECHO 7
#define MDP_PORT_TRACE 8
#define MDP_PORT_DNALOOKUP 10
#define MDP_PORT_SERVICE_DISCOVERY 11
#define MDP_PORT_VOMP 12
#define MDP_PORT_RHIZOME_REQUEST 13
#define MDP_PORT_RHIZOME_RESPONSE 14
Expand Down
54 changes: 54 additions & 0 deletions java/org/servalproject/servaldna/AbstractMdpProtocol.java
@@ -0,0 +1,54 @@
package org.servalproject.servaldna;

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;

/**
* Created by jeremy on 8/05/14.
*/
public abstract class AbstractMdpProtocol<T> extends ChannelSelector.Handler {
private final ChannelSelector selector;
protected final MdpSocket socket;
protected final AsyncResult<T> results;

public AbstractMdpProtocol(ChannelSelector selector, AsyncResult<T> results) throws IOException {
socket = new MdpSocket();
socket.bind();
this.selector = selector;
this.results = results;
selector.register(this);
}

public void close(){
try {
selector.unregister(this);
} catch (IOException e) {
e.printStackTrace();
}
socket.close();
}

protected abstract void parse(MdpPacket response);

@Override
public void read() {
try {
MdpPacket response = new MdpPacket();
socket.receive(response);
parse(response);
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public SelectableChannel getChannel() throws IOException {
return socket.getChannel();
}

@Override
public int getInterest() {
return SelectionKey.OP_READ;
}
}
37 changes: 3 additions & 34 deletions java/org/servalproject/servaldna/MdpDnaLookup.java
@@ -1,23 +1,14 @@
package org.servalproject.servaldna;

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;

/**
* Created by jeremy on 21/02/14.
*/
public class MdpDnaLookup extends ChannelSelector.Handler{
private final ChannelSelector selector;
private final MdpSocket socket;
private final AsyncResult<ServalDCommand.LookupResult> results;
public class MdpDnaLookup extends AbstractMdpProtocol<ServalDCommand.LookupResult> {

public MdpDnaLookup(ChannelSelector selector, AsyncResult<ServalDCommand.LookupResult> results) throws IOException {
socket = new MdpSocket();
socket.bind();
this.selector = selector;
this.results = results;
selector.register(this);
super(selector, results);
}

public void sendRequest(SubscriberId destination, String did) throws IOException {
Expand All @@ -33,10 +24,8 @@ public void sendRequest(SubscriberId destination, String did) throws IOException
}

@Override
public void read() {
protected void parse(MdpPacket response) {
try {
MdpPacket response = new MdpPacket();
socket.receive(response);
byte bytes[] = new byte[response.payload.remaining()];
response.payload.get(bytes);
String resultString = new String(bytes);
Expand All @@ -54,25 +43,5 @@ public void read() {
} catch (AbstractId.InvalidHexException e) {
e.printStackTrace();
}

}

public void close(){
try {
selector.unregister(this);
} catch (IOException e) {
e.printStackTrace();
}
socket.close();
}

@Override
public SelectableChannel getChannel() throws IOException {
return socket.getChannel();
}

@Override
public int getInterest() {
return SelectionKey.OP_READ;
}
}
1 change: 1 addition & 0 deletions java/org/servalproject/servaldna/MdpPacket.java
Expand Up @@ -23,6 +23,7 @@ public class MdpPacket {

public static final int MDP_PORT_ECHO = 7;
public static final int MDP_PORT_DNALOOKUP = 10;
public static final int MDP_PORT_SERVICE_DISCOVERY = 11;

public MdpPacket(){
buff = ByteBuffer.allocate(MDP_MTU);
Expand Down
96 changes: 96 additions & 0 deletions java/org/servalproject/servaldna/MdpServiceLookup.java
@@ -0,0 +1,96 @@
package org.servalproject.servaldna;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Properties;

/**
* Created by jeremy on 8/05/14.
*/
public class MdpServiceLookup extends AbstractMdpProtocol<MdpServiceLookup.ServiceResult> {

public static class ServiceResult extends Properties {
public final SubscriberId subscriberId;
public ServiceResult(SubscriberId subscriberId){
this.subscriberId = subscriberId;
}

public String toString(){
return "ServiceResult{subscriberId="+subscriberId+", "+super.toString()+"}";
}
}

public MdpServiceLookup(ChannelSelector selector, AsyncResult<ServiceResult> results) throws IOException {
super(selector, results);
}

public void sendRequest(SubscriberId destination, String pattern) throws IOException {
MdpPacket request = new MdpPacket();
if (destination.isBroadcast())
request.setFlags(MdpPacket.MDP_FLAG_NO_CRYPT);
request.setRemoteSid(destination);
request.setRemotePort(MdpPacket.MDP_PORT_SERVICE_DISCOVERY);
request.payload.put(pattern.getBytes());
request.payload.put((byte)0);
request.payload.flip();
socket.send(request);
}

public static class BuffStream extends InputStream{
private final ByteBuffer buff;
public BuffStream(ByteBuffer buff){
this.buff = buff;
}

@Override
public boolean markSupported() {
return true;
}

@Override
public int read() throws IOException {
if (!buff.hasRemaining())
return -1;
return buff.get()&0xFF;
}

@Override
public void mark(int readLimit){
buff.mark();
}

@Override
public void reset() throws IOException {
buff.rewind();
}

@Override
public void close() throws IOException {
// noop
}

@Override
public int read(byte[] dst, int dstOffset, int charCount) throws IOException {
if (!buff.hasRemaining())
return -1;
if (charCount > buff.remaining())
charCount = buff.remaining();
buff.get(dst, dstOffset, charCount);
return charCount;
}
}

@Override
protected void parse(MdpPacket response) {
try {
ServiceResult result = new ServiceResult(response.getRemoteSid());
result.load(new BuffStream(response.payload));
results.result(result);
} catch (IOException e) {
e.printStackTrace();
} catch (AbstractId.InvalidBinaryException e) {
e.printStackTrace();
}
}
}
24 changes: 22 additions & 2 deletions java/org/servalproject/test/CommandLine.java
Expand Up @@ -3,6 +3,7 @@
import org.servalproject.servaldna.AsyncResult;
import org.servalproject.servaldna.ChannelSelector;
import org.servalproject.servaldna.MdpDnaLookup;
import org.servalproject.servaldna.MdpServiceLookup;
import org.servalproject.servaldna.MdpSocket;
import org.servalproject.servaldna.ResultList;
import org.servalproject.servaldna.ServalDCommand;
Expand Down Expand Up @@ -38,7 +39,6 @@ static void lookup(String did) throws IOException, InterruptedException, ServalD
System.out.println(s);
if (s.getResult()!=0)
throw new ServalDFailureException("Serval daemon isn't running");
System.out.println(s);
MdpSocket.loopbackMdpPort = s.mdpInetPort;
ChannelSelector selector = new ChannelSelector();
MdpDnaLookup lookup = new MdpDnaLookup(selector, new AsyncResult<ServalDCommand.LookupResult>() {
Expand All @@ -52,6 +52,24 @@ public void result(ServalDCommand.LookupResult nextResult) {
lookup.close();
}

static void service(String pattern) throws IOException, InterruptedException, ServalDFailureException {
ServalDCommand.Status s = ServalDCommand.serverStatus();
System.out.println(s);
if (s.getResult()!=0)
throw new ServalDFailureException("Serval daemon isn't running");
MdpSocket.loopbackMdpPort = s.mdpInetPort;
ChannelSelector selector = new ChannelSelector();
MdpServiceLookup lookup = new MdpServiceLookup(selector, new AsyncResult<MdpServiceLookup.ServiceResult>() {
@Override
public void result(MdpServiceLookup.ServiceResult nextResult) {
System.out.println(nextResult.toString());
}
});
lookup.sendRequest(SubscriberId.broadcastSid, pattern);
Thread.sleep(3000);
lookup.close();
}

public static void main(String... args){
if (args.length<1)
return;
Expand All @@ -66,7 +84,9 @@ public static void main(String... args){
if (methodName.equals("peers"))
getPeers();
if (methodName.equals("lookup"))
lookup(args.length>=2 ? args[1] : "");
lookup(args.length >= 2 ? args[1] : "");
if (methodName.equals("service"))
service(args.length >= 2 ? args[1] : "");

if (result!=null)
System.out.println(result.toString());
Expand Down
1 change: 1 addition & 0 deletions mdp_client.h
Expand Up @@ -32,6 +32,7 @@ struct mdp_sockaddr {
#define MDP_FLAG_NO_CRYPT (1<<0)
#define MDP_FLAG_NO_SIGN (1<<1)
#define MDP_FLAG_BIND (1<<2)
#define MDP_FLAG_REUSE (1<<5)

#define MDP_FLAG_CLOSE (1<<3)
#define MDP_FLAG_ERROR (1<<4)
Expand Down

0 comments on commit 672104b

Please sign in to comment.