Permalink
Fetching contributors…
Cannot retrieve contributors at this time
652 lines (578 sloc) 23.4 KB
/* =========================================================================
zbeacon - LAN discovery and presence
Copyright (c) the Contributors as noted in the AUTHORS file.
This file is part of CZMQ, the high-level C binding for 0MQ:
http://czmq.zeromq.org.
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, You can obtain one at http://mozilla.org/MPL/2.0/.
=========================================================================
*/
/*
@header
The zbeacon class implements a peer-to-peer discovery service for local
networks. A beacon can broadcast and/or capture service announcements
using UDP messages on the local area network. This implementation uses
IPv4 UDP broadcasts. You can define the format of your outgoing beacons,
and set a filter that validates incoming beacons. Beacons are sent and
received asynchronously in the background.
@discuss
This class replaces zbeacon_v2, and is meant for applications that use
the CZMQ v3 API (meaning, zsock).
@end
*/
#include "czmq_classes.h"
// Constants
#define INTERVAL_DFLT 1000 // Default interval = 1 second
// --------------------------------------------------------------------------
// The self_t structure holds the state for one actor instance
typedef struct {
zsock_t *pipe; // Actor command pipe
SOCKET udpsock; // UDP socket for send/recv
SOCKET udpsock_send; // UDP socket for IPv6 send
char port_nbr [7]; // UDP port number we work on
int interval; // Beacon broadcast interval
int64_t ping_at; // Next broadcast time
zframe_t *transmit; // Beacon transmit data
zframe_t *filter; // Beacon filter data
inaddr_storage_t broadcast; // Our broadcast address
bool terminated; // Did caller ask us to quit?
bool verbose; // Verbose logging enabled?
char hostname [NI_MAXHOST]; // Saved host name
} self_t;
static void
s_self_destroy (self_t **self_p)
{
assert (self_p);
if (*self_p) {
self_t *self = *self_p;
zframe_destroy (&self->transmit);
zframe_destroy (&self->filter);
if (self->udpsock) // don't close STDIN
zsys_udp_close (self->udpsock);
freen (self);
*self_p = NULL;
}
}
static self_t *
s_self_new (zsock_t *pipe)
{
self_t *self = (self_t *) zmalloc (sizeof (self_t));
assert (self);
self->pipe = pipe;
return self;
}
// --------------------------------------------------------------------------
// Prepare beacon to work on specified UPD port.
static void
s_self_prepare_udp (self_t *self)
{
// Create our UDP socket
if (self->udpsock)
zsys_udp_close (self->udpsock);
if (self->udpsock_send)
zsys_udp_close (self->udpsock_send);
self->hostname [0] = 0;
// For IPv6 we need two sockets. At least on Linux, IPv6 multicast packets
// are NOT received despite joining the group and setting the interface
// option UNLESS the socket is bound to in6_addrany, which means the kernel
// will select an arbitrary IP address as the source when sending beacons
// out. This breaks zbeacon as the protocol uses the source address of a
// beacon to find the endpoint of a peer, which is then random and
// useless (could even be associated with a different interface, eg: a
// virtual bridge).
// As a workaround, use a different socket to send packets. So the socket
// that receives can be bound to in6_addrany, and the socket that sends
// can be bound to the actual intended host address.
self->udpsock = zsys_udp_new (false);
if (self->udpsock == INVALID_SOCKET) {
self->udpsock_send = INVALID_SOCKET;
return;
}
self->udpsock_send = zsys_udp_new (false);
if (self->udpsock_send == INVALID_SOCKET) {
zsys_udp_close (self->udpsock);
self->udpsock = INVALID_SOCKET;
return;
}
// Get the network interface fro ZSYS_INTERFACE or else use first
// broadcast interface defined on system. ZSYS_INTERFACE=* means
// use INADDR_ANY + INADDR_BROADCAST.
const char *iface = zsys_interface ();
struct addrinfo *bind_to = NULL;
struct addrinfo *send_to = NULL;
struct addrinfo hint;
memset (&hint, 0, sizeof(struct addrinfo));
hint.ai_flags = AI_NUMERICHOST;
#if !defined (CZMQ_HAVE_ANDROID) && !defined (CZMQ_HAVE_FREEBSD)
hint.ai_flags |= AI_V4MAPPED;
#endif
hint.ai_socktype = SOCK_DGRAM;
hint.ai_protocol = IPPROTO_UDP;
hint.ai_family = zsys_ipv6 () ? AF_INET6 : AF_INET;
int rc;
int found_iface = 0;
unsigned int if_index = 0;
if (streq (iface, "*")) {
// Wildcard means bind to INADDR_ANY and send to INADDR_BROADCAST
// IE - getaddrinfo with NULL as first parameter or 255.255.255.255
// (or IPv6 multicast link-local all-node group ff02::1
hint.ai_flags = hint.ai_flags | AI_PASSIVE;
rc = getaddrinfo (NULL, self->port_nbr, &hint, &bind_to);
assert (rc == 0);
if (zsys_ipv6()) {
// Default is link-local all-node multicast group
rc = getaddrinfo (zsys_ipv6_mcast_address (), self->port_nbr, &hint,
&send_to);
assert (rc == 0);
}
else {
rc = getaddrinfo ("255.255.255.255", self->port_nbr, &hint,
&send_to);
assert (rc == 0);
}
found_iface = 1;
}
// if ZSYS_INTERFACE is a single digit, use the corresponding interface in
// the interface list
else if (strlen (iface) == 1 && iface[0] >= '0' && iface[0] <= '9')
{
int if_number = atoi (iface);
ziflist_t *iflist = ziflist_new_ipv6 ();
assert (iflist);
const char *name = ziflist_first (iflist);
int idx = -1;
while (name) {
idx++;
if (idx == if_number &&
((ziflist_is_ipv6 (iflist) && zsys_ipv6 ()) ||
(!ziflist_is_ipv6 (iflist) && !zsys_ipv6 ()))) {
// Using inet_addr instead of inet_aton or inet_atop
// because these are not supported in Win XP
rc = getaddrinfo (ziflist_address (iflist), self->port_nbr,
&hint, &bind_to);
assert (rc == 0);
rc = getaddrinfo (ziflist_broadcast (iflist), self->port_nbr,
&hint, &send_to);
assert (rc == 0);
if_index = if_nametoindex (name);
if (self->verbose)
zsys_info ("zbeacon: interface=%s address=%s broadcast=%s",
name, ziflist_address (iflist), ziflist_broadcast (iflist));
found_iface = 1;
break; // iface is known, so allow it
}
name = ziflist_next (iflist);
}
ziflist_destroy (&iflist);
}
else if (zsys_ipv6 () && strneq("", zsys_ipv6_address ()) && strneq (iface, "")) {
rc = getaddrinfo (zsys_ipv6_address (), self->port_nbr,
&hint, &bind_to);
assert (rc == 0);
// A user might set a link-local address without appending %iface
if (IN6_IS_ADDR_LINKLOCAL (&((in6addr_t *)bind_to->ai_addr)->sin6_addr) &&
!strchr (zsys_ipv6_address (), '%')) {
char address_and_iface [NI_MAXHOST] = {0};
strcat (address_and_iface, zsys_ipv6_address ());
strcat (address_and_iface, "%");
strcat (address_and_iface, iface);
rc = getaddrinfo (address_and_iface, self->port_nbr, &hint, &bind_to);
assert (rc == 0);
}
rc = getaddrinfo (zsys_ipv6_mcast_address (), self->port_nbr,
&hint, &send_to);
assert (rc == 0);
if_index = if_nametoindex (iface);
if (self->verbose)
zsys_info ("zbeacon: interface=%s address=%s broadcast=%s",
iface, zsys_ipv6_address (), zsys_ipv6_mcast_address ());
found_iface = 1;
}
else {
// Look for matching interface, or first ziflist item
ziflist_t *iflist = ziflist_new_ipv6 ();
assert (iflist);
const char *name = ziflist_first (iflist);
while (name) {
// If IPv6 is not enabled ignore IPv6 interfaces.
if ((streq (iface, name) || streq (iface, "")) &&
((ziflist_is_ipv6 (iflist) && zsys_ipv6 ()) ||
(!ziflist_is_ipv6 (iflist) && !zsys_ipv6 ()))) {
rc = getaddrinfo (ziflist_address (iflist), self->port_nbr,
&hint, &bind_to);
assert (rc == 0);
rc = getaddrinfo (ziflist_broadcast (iflist), self->port_nbr,
&hint, &send_to);
assert (rc == 0);
if_index = if_nametoindex (name);
if (self->verbose)
zsys_info ("zbeacon: interface=%s address=%s broadcast=%s",
name, ziflist_address (iflist), ziflist_broadcast (iflist));
found_iface = 1;
break; // iface is known, so allow it
}
name = ziflist_next (iflist);
}
ziflist_destroy (&iflist);
}
if (found_iface) {
inaddr_storage_t bind_address;
// On Windows we bind to the host address
// On *NIX we bind to INADDR_ANY or in6addr_any, otherwise multicast
// packets will be filtered out despite joining the group
#if (defined (__WINDOWS__))
memcpy (&bind_address, bind_to->ai_addr, bind_to->ai_addrlen);
#else
memcpy (&bind_address, send_to->ai_addr, send_to->ai_addrlen);
if (zsys_ipv6 ())
bind_address.__inaddr_u.__addr6.sin6_addr = in6addr_any;
else
bind_address.__inaddr_u.__addr.sin_addr.s_addr = htonl (INADDR_ANY);
#endif
memcpy (&self->broadcast, send_to->ai_addr, send_to->ai_addrlen);
if (zsys_ipv6()) {
struct ipv6_mreq mreq;
mreq.ipv6mr_interface = if_index;
memcpy (&mreq.ipv6mr_multiaddr,
&(((in6addr_t *)(send_to->ai_addr))->sin6_addr),
sizeof (struct in6_addr));
if (setsockopt (self->udpsock, IPPROTO_IPV6, IPV6_JOIN_GROUP,
(char *)&mreq, sizeof (mreq)))
zsys_socket_error ("zbeacon: setsockopt IPV6_JOIN_GROUP failed");
if (setsockopt (self->udpsock, IPPROTO_IPV6, IPV6_MULTICAST_IF,
(char *)&if_index, sizeof (if_index)))
zsys_socket_error ("zbeacon: setsockopt IPV6_MULTICAST_IF failed");
if (setsockopt (self->udpsock_send, IPPROTO_IPV6, IPV6_JOIN_GROUP,
(char *)&mreq, sizeof (mreq)))
zsys_socket_error ("zbeacon: setsockopt IPV6_JOIN_GROUP failed");
if (setsockopt (self->udpsock_send, IPPROTO_IPV6, IPV6_MULTICAST_IF,
(char *)&if_index, sizeof (if_index)))
zsys_socket_error ("zbeacon: setsockopt IPV6_MULTICAST_IF failed");
}
// If bind fails, we close the socket for opening again later (next poll interval)
if (bind (self->udpsock_send, bind_to->ai_addr, bind_to->ai_addrlen) ||
bind (self->udpsock, (struct sockaddr *)&bind_address,
zsys_ipv6 () ? sizeof (in6addr_t) : sizeof (inaddr_t))) {
zsys_debug ("zbeacon: Unable to bind to broadcast address, reason=%s", strerror (errno));
zsys_udp_close (self->udpsock);
self->udpsock = INVALID_SOCKET;
zsys_udp_close (self->udpsock_send);
self->udpsock_send = INVALID_SOCKET;
}
else if (streq (iface, "*")) {
strcpy(self->hostname, "*");
if (self->verbose)
zsys_info ("zbeacon: configured, hostname=%s", self->hostname);
}
else if (getnameinfo (bind_to->ai_addr, bind_to->ai_addrlen,
self->hostname, NI_MAXHOST, NULL, 0, NI_NUMERICHOST) == 0) {
if (self->verbose)
zsys_info ("zbeacon: configured, hostname=%s", self->hostname);
}
}
else
{
// No valid interface. Close the socket so that we can try again later
zsys_udp_close(self->udpsock);
self->udpsock = INVALID_SOCKET;
zsys_udp_close (self->udpsock_send);
self->udpsock_send = INVALID_SOCKET;
}
freeaddrinfo (bind_to);
freeaddrinfo (send_to);
}
// --------------------------------------------------------------------------
// Prepare beacon to work on specified UPD port, reply hostname to
// pipe (or "" if this failed)
static void
s_self_configure (self_t *self, int port_nbr)
{
assert (port_nbr);
snprintf (self->port_nbr, 7, "%d", port_nbr);
s_self_prepare_udp (self);
zstr_send (self->pipe, self->hostname);
if (streq (self->hostname, ""))
zsys_error ("No broadcast interface found, (ZSYS_INTERFACE=%s)", zsys_interface ());
}
// --------------------------------------------------------------------------
// Handle a command from calling application
static int
s_self_handle_pipe (self_t *self)
{
// Get just the command off the pipe
char *command = zstr_recv (self->pipe);
if (!command)
return -1; // Interrupted
if (self->verbose)
zsys_info ("zbeacon: API command=%s", command);
if (streq (command, "VERBOSE"))
self->verbose = true;
else
if (streq (command, "CONFIGURE")) {
int port;
int rc = zsock_recv (self->pipe, "i", &port);
assert (rc == 0);
s_self_configure (self, port);
}
else
if (streq (command, "PUBLISH")) {
zframe_destroy (&self->transmit);
zsock_recv (self->pipe, "fi", &self->transmit, &self->interval);
assert (zframe_size (self->transmit) <= UDP_FRAME_MAX);
if (self->interval == 0)
self->interval = INTERVAL_DFLT;
// Start broadcasting immediately
self->ping_at = zclock_mono ();
}
else
if (streq (command, "SILENCE"))
zframe_destroy (&self->transmit);
else
if (streq (command, "SUBSCRIBE")) {
zframe_destroy (&self->filter);
self->filter = zframe_recv (self->pipe);
assert (zframe_size (self->filter) <= UDP_FRAME_MAX);
}
else
if (streq (command, "UNSUBSCRIBE"))
zframe_destroy (&self->filter);
else
if (streq (command, "$TERM"))
self->terminated = true;
else {
zsys_error ("zbeacon: - invalid command: %s", command);
assert (false);
}
zstr_free (&command);
return 0;
}
// --------------------------------------------------------------------------
// Receive and filter the waiting beacon
static void
s_self_handle_udp (self_t *self)
{
assert (self);
char peername [NI_MAXHOST];
zframe_t *frame = zsys_udp_recv (self->udpsock, peername, NI_MAXHOST);
if (!frame)
return;
// If filter is set, check that beacon matches it
bool is_valid = false;
if (self->filter) {
byte *filter_data = zframe_data (self->filter);
size_t filter_size = zframe_size (self->filter);
if (zframe_size (frame) >= filter_size
&& memcmp (zframe_data (frame), filter_data, filter_size) == 0)
is_valid = true;
}
// If valid, discard our own broadcasts, which UDP echoes to us
if (is_valid && self->transmit) {
byte *transmit_data = zframe_data (self->transmit);
size_t transmit_size = zframe_size (self->transmit);
if (zframe_size (frame) == transmit_size
&& memcmp (zframe_data (frame), transmit_data, transmit_size) == 0)
is_valid = false;
}
// If still a valid beacon, send on to the API
if (is_valid) {
zmsg_t *msg = zmsg_new ();
assert (msg);
zmsg_addstr (msg, peername);
zmsg_append (msg, &frame);
if (zmsg_send (&msg, self->pipe) < 0)
zmsg_destroy (&msg);
}
else
zframe_destroy (&frame);
}
// --------------------------------------------------------------------------
// Send the beacon over UDP
static int
s_emit_beacon (self_t *self)
{
#if defined (__WINDOWS__)
// Windows doesn't broadcast on all interfaces when using INADDR_BROADCAST
// only the interface with the highest metric (as seen in `route print`)
// so send a packet per interface to each broadcast address
if (streq (zsys_interface (), "*") && !zsys_ipv6 ()) {
INTERFACE_INFO interface_list [64];
DWORD bytes_received = 0;
int rc = WSAIoctl (self->udpsock, SIO_GET_INTERFACE_LIST, 0, 0,
&interface_list, sizeof (interface_list), &bytes_received, NULL, NULL);
assert (rc != SOCKET_ERROR);
int num_interfaces = bytes_received / sizeof (INTERFACE_INFO);
// iiBroadcastAddress is always 255.255.255.255 need to calculate the specific broadcast address using the netmask
// keep the same parameters as self->broadcast but just replace the address for each interface
inaddr_t addr;
memcpy(&addr, &self->broadcast, sizeof (inaddr_t));
for (int i = 0; i < num_interfaces; ++i)
{
addr.sin_addr.S_un.S_addr = (interface_list[i].iiAddress.AddressIn.sin_addr.S_un.S_addr
| ~(interface_list[i].iiNetmask.AddressIn.sin_addr.S_un.S_addr));
if (zsys_udp_send (self->udpsock_send, self->transmit,
(inaddr_t *)&addr, sizeof (inaddr_t))) {
// Send failed, cause zbeacon to re-init socket
return -1;
}
}
return 0;
}
#endif
return zsys_udp_send (self->udpsock_send, self->transmit,
(inaddr_t *)&self->broadcast,
zsys_ipv6 () ? sizeof (in6addr_t) : sizeof (inaddr_t));
}
// --------------------------------------------------------------------------
// zbeacon() implements the zbeacon actor interface
void
zbeacon (zsock_t *pipe, void *args)
{
self_t *self = s_self_new (pipe);
assert (self);
// Signal successful initialization
zsock_signal (pipe, 0);
while (!self->terminated) {
// Poll on API pipe and on UDP socket
zmq_pollitem_t pollitems [] = {
{ zsock_resolve (self->pipe), 0, ZMQ_POLLIN, 0 },
{ NULL, self->udpsock, ZMQ_POLLIN, 0 }
};
long timeout = -1;
if (self->transmit) {
timeout = (long) (self->ping_at - zclock_mono ());
if (timeout < 0)
timeout = 0;
}
int pollset_size = (self->udpsock && self->udpsock != INVALID_SOCKET) ? 2: 1;
if (zmq_poll (pollitems, pollset_size, timeout * ZMQ_POLL_MSEC) == -1)
break; // Interrupted
if (pollitems [0].revents & ZMQ_POLLIN)
s_self_handle_pipe (self);
if (pollitems [1].revents & ZMQ_POLLIN)
s_self_handle_udp (self);
if (self->transmit
&& zclock_mono () >= self->ping_at) {
// Send beacon to any listening peers
if (!self->udpsock_send || self->udpsock_send == INVALID_SOCKET || s_emit_beacon(self))
{
const char *reason = (!self->udpsock_send || self->udpsock_send == INVALID_SOCKET) ? "invalid socket" : strerror (errno);
zsys_debug ("zbeacon: failed to transmit, attempting reconnection. reason=%s", reason);
// Try to recreate UDP socket on interface
s_self_prepare_udp (self);
}
self->ping_at = zclock_mono () + self->interval;
}
}
s_self_destroy (&self);
}
// --------------------------------------------------------------------------
// Selftest
void
zbeacon_test (bool verbose)
{
printf (" * zbeacon: ");
if (verbose)
printf ("\n");
// @selftest
// Test 1 - two beacons, one speaking, one listening
// Create speaker beacon to broadcast our service
zactor_t *speaker = zactor_new (zbeacon, NULL);
assert (speaker);
if (verbose)
zstr_sendx (speaker, "VERBOSE", NULL);
zsock_send (speaker, "si", "CONFIGURE", 9999);
char *hostname = zstr_recv (speaker);
if (!*hostname) {
printf ("OK (skipping test, no UDP broadcasting)\n");
zactor_destroy (&speaker);
freen (hostname);
return;
}
freen (hostname);
// Create listener beacon on port 9999 to lookup service
zactor_t *listener = zactor_new (zbeacon, NULL);
assert (listener);
if (verbose)
zstr_sendx (listener, "VERBOSE", NULL);
zsock_send (listener, "si", "CONFIGURE", 9999);
hostname = zstr_recv (listener);
assert (*hostname);
freen (hostname);
// We will broadcast the magic value 0xCAFE
byte announcement [2] = { 0xCA, 0xFE };
zsock_send (speaker, "sbi", "PUBLISH", announcement, 2, 100);
// We will listen to anything (empty subscription)
zsock_send (listener, "sb", "SUBSCRIBE", "", 0);
// Wait for at most 1/2 second if there's no broadcasting
zsock_set_rcvtimeo (listener, 500);
char *ipaddress = zstr_recv (listener);
if (ipaddress) {
zframe_t *content = zframe_recv (listener);
assert (zframe_size (content) == 2);
assert (zframe_data (content) [0] == 0xCA);
assert (zframe_data (content) [1] == 0xFE);
zframe_destroy (&content);
zstr_free (&ipaddress);
zstr_sendx (speaker, "SILENCE", NULL);
}
zactor_destroy (&listener);
zactor_destroy (&speaker);
// Test subscription filter using a 3-node setup
zactor_t *node1 = zactor_new (zbeacon, NULL);
assert (node1);
zsock_send (node1, "si", "CONFIGURE", 5670);
hostname = zstr_recv (node1);
assert (*hostname);
freen (hostname);
zactor_t *node2 = zactor_new (zbeacon, NULL);
assert (node2);
zsock_send (node2, "si", "CONFIGURE", 5670);
hostname = zstr_recv (node2);
assert (*hostname);
freen (hostname);
zactor_t *node3 = zactor_new (zbeacon, NULL);
assert (node3);
zsock_send (node3, "si", "CONFIGURE", 5670);
hostname = zstr_recv (node3);
assert (*hostname);
freen (hostname);
zsock_send (node1, "sbi", "PUBLISH", "NODE/1", 6, 250);
zsock_send (node2, "sbi", "PUBLISH", "NODE/2", 6, 250);
zsock_send (node3, "sbi", "PUBLISH", "RANDOM", 6, 250);
zsock_send (node1, "sb", "SUBSCRIBE", "NODE", 4);
// Poll on three API sockets at once
zpoller_t *poller = zpoller_new (node1, node2, node3, NULL);
assert (poller);
int64_t stop_at = zclock_mono () + 1000;
while (zclock_mono () < stop_at) {
long timeout = (long) (stop_at - zclock_mono ());
if (timeout < 0)
timeout = 0;
void *which = zpoller_wait (poller, timeout);
if (which) {
assert (which == node1);
char *ipaddress, *received;
zstr_recvx (node1, &ipaddress, &received, NULL);
assert (streq (received, "NODE/2"));
zstr_free (&ipaddress);
zstr_free (&received);
}
}
zpoller_destroy (&poller);
// Stop listening
zstr_sendx (node1, "UNSUBSCRIBE", NULL);
// Stop all node broadcasts
zstr_sendx (node1, "SILENCE", NULL);
zstr_sendx (node2, "SILENCE", NULL);
zstr_sendx (node3, "SILENCE", NULL);
// Destroy the test nodes
zactor_destroy (&node1);
zactor_destroy (&node2);
zactor_destroy (&node3);
#if defined (__WINDOWS__)
zsys_shutdown();
#endif
// @end
printf ("OK\n");
}