Skip to content

Commit

Permalink
Add ZBeacon implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Constantin Rack committed Oct 31, 2014
1 parent 6a780a0 commit a572bb5
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 0 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Expand Up @@ -20,6 +20,7 @@ Chris Wong
Christian Gudrian
Chuck Remes
Conrad D. Steenberg
Constantin Rack
Dhammika Pathirana
Dhruva Krishnamurthy
Dirk O. Kaar
Expand Down
235 changes: 235 additions & 0 deletions src/main/java/org/zeromq/ZBeacon.java
@@ -0,0 +1,235 @@
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.zeromq;

import java.io.IOException;
import java.net.DatagramSocket;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;

public class ZBeacon {

public static final long DEFAULT_BROADCAST_INTERVAL = 1000L;
public static final String DEFAULT_BROADCAST_HOST = "255.255.255.255";

private final int port;
private InetAddress broadcastInetAddress;
private BroadcastClient broadcastClient;
private BroadcastServer broadcastServer;
private final byte[] beacon;
private byte[] prefix = {};
private long broadcastInterval = DEFAULT_BROADCAST_INTERVAL;
private Listener listener = null;

public ZBeacon(int port, byte[] beacon) {
this(DEFAULT_BROADCAST_HOST, port, beacon);
}

public ZBeacon(String host, int port, byte[] beacon) {
this.port = port;
this.beacon = beacon;
try {
broadcastInetAddress = InetAddress.getByName(host);
} catch (UnknownHostException unknownHostException) {
throw new RuntimeException(unknownHostException);
}
}

public void start() {
if (listener != null) {
broadcastServer = new BroadcastServer();
broadcastServer.start();
}
broadcastClient = new BroadcastClient();
broadcastClient.start();
}

public void stop() throws InterruptedException {
if (broadcastClient != null) {
broadcastClient.interrupt();
broadcastClient.join();
broadcastClient = null;
}
if (broadcastServer != null) {
broadcastServer.interrupt();
broadcastServer.join();
broadcastServer = null;
}
}

public void setPrefix(byte[] prefix) {
this.prefix = prefix;
}

public byte[] getPrefix() {
return prefix;
}

public void setListener(Listener listener) {
this.listener = listener;
}

public Listener getListener() {
return listener;
}

/**
* All beacons with matching prefix are passed to a listener.
*/
public interface Listener {

void onBeacon(InetAddress sender, byte[] beacon);

}

/**
* The broadcast client periodically sends beacons via UDP to the network.
*/
private class BroadcastClient extends Thread {

private DatagramChannel broadcastChannel;
private final InetSocketAddress broadcastInetSocketAddress;

public BroadcastClient() {
broadcastInetSocketAddress = new InetSocketAddress(broadcastInetAddress, port);
}

@Override
public void run() {
try {
broadcastChannel = DatagramChannel.open();
broadcastChannel.socket().setBroadcast(true);
while (!interrupted()) {
try {
broadcastChannel.send(ByteBuffer.wrap(beacon), broadcastInetSocketAddress);
Thread.sleep(broadcastInterval);
} catch (InterruptedException interruptedException) {
break;
} catch (Exception exception) {
throw new RuntimeException(exception);
}
}
} catch (IOException ioException) {
throw new RuntimeException(ioException);
} finally {
try {
broadcastChannel.close();
} catch (IOException ioException) {
throw new RuntimeException(ioException);
}
}
}

}

/**
* The broadcast server receives beacons.
*/
private class BroadcastServer extends Thread {

private DatagramChannel handle; // Socket for send/recv
private InetAddress address; // Own address

public BroadcastServer() {
try {
// Create UDP socket
handle = DatagramChannel.open();
handle.configureBlocking(false);
DatagramSocket sock = handle.socket();
sock.setReuseAddress(true);
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
for (NetworkInterface netint : Collections.list(interfaces)) {
if (netint.isLoopback()) {
continue;
}
Enumeration<InetAddress> inetAddresses = netint.getInetAddresses();
for (InetAddress addr : Collections.list(inetAddresses)) {
if (addr instanceof Inet4Address) {
address = addr;
}
}
}
sock.bind(new InetSocketAddress(InetAddress.getByAddress(new byte[] { 0, 0, 0, 0 }), port));
} catch (IOException ioException) {
throw new RuntimeException(ioException);
}
}

@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(65535);
SocketAddress sender;
int size;
while (!interrupted()) {
buffer.clear();
try {
int read = buffer.remaining();
sender = handle.receive(buffer);
if (sender == null) {
continue;
}

InetAddress senderAddress = ((InetSocketAddress) sender).getAddress();

if (address.equals(senderAddress)) {
continue;
}
size = read - buffer.remaining();
handleMessage(buffer, size, senderAddress);
} catch (IOException ioException) {
throw new RuntimeException(ioException);
}
}
handle.socket().close();
}

private void handleMessage(ByteBuffer buffer, int size, InetAddress from) {
if (size < prefix.length) {
return;
}
byte[] bytes = buffer.array();
// Compare prefix
for (int i = 0; i < prefix.length; i++) {
if (bytes[i] != prefix[i]) {
return;
}
}
listener.onBeacon(from, Arrays.copyOf(bytes, size));
}
}

public long getBroadcastInterval() {
return broadcastInterval;
}

public void setBroadcastInterval(long broadcastInterval) {
this.broadcastInterval = broadcastInterval;
}

}
47 changes: 47 additions & 0 deletions src/test/java/org/zeromq/ZBeaconTest.java
@@ -0,0 +1,47 @@
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.zeromq;

import java.net.InetAddress;

import org.junit.Test;
import org.zeromq.ZBeacon.Listener;

public class ZBeaconTest {

@Test
public void test() throws InterruptedException {
byte[] beacon = new byte[] { 'H', 'Y', 'D', 'R', 'A', 0x01, 0x12, 0x34 };
byte[] prefix = new byte[] { 'H', 'Y', 'D', 'R', 'A', 0x01 };
ZBeacon zbeacon = new ZBeacon(5670, beacon);
zbeacon.setPrefix(prefix);
zbeacon.setListener(new Listener() {

public void onBeacon(InetAddress sender, byte[] beacon) {
System.out.println("Got beacon from: " + sender.getHostAddress());
}

});
zbeacon.start();
Thread.sleep(2000);
zbeacon.stop();
}

}

0 comments on commit a572bb5

Please sign in to comment.