Trouble with EPGM/multicast pub/sub #2227

Open
elockman opened this issue Jun 20, 2022 · 4 comments

Comments

@elockman

Issue description

I have a sub/pub program working over tcp on separate nodes.
I am having trouble with the epgm protocol similar to thread (https://lists.zeromq.org/mailman/private/zeromq-dev/2013-July/021878.html), but the thread does not show the solution.

I'm currently working with 2 nodes and my ip addresses are 10.1.100.10 and 10.1.100.20.
The examples I have found use an endpoint like this with zmq_connect:
rc = zmq_connect(socket, "epgm://mesh0;239.192.1.1:5555");
assert (rc == 0);

I am not seeing any examples of binding with epgm for zmq_bind
Can someone provide an example, or point me to one?

Finally, I also have ipv6 addresses configured on the nodes.
I would like to avoid a dhcp server on my mesh network.
Is it possible to use ipv6 addresses with epgm in zmq?

Environment

Minimal test code / Steps to reproduce the issue

Full test code provided below.
Note: You may need to change the ipv4 or ipv6 address in the code below.
The protocol is selected via the TEST_SELECTED #define.

#include <pthread.h>
#include <czmq.h>

void *context = NULL;
void *publisher = NULL;

#define TEST_IPV4   0   //unicast endpoint using ipv4
#define TEST_IPV6   1   //unicast endpoint using ipv6
#define TEST_MCAST6 2   //publish to a multicast ipv6 address
#define TEST_EPGM4  3   //encapsulated PGM endpoint with ipv4
#define TEST_EPGM6  4   //encapsulated PGM endpoint with ipv6

// *** Set protocol here ***
#define TEST_SELECTED           TEST_EPGM4


void *sub_thread(void *data)
{
    (void)data;   //unused parameter

    int rc=0;
    char endpoint[128]={0};
    char msg[256];

    void *subscriber = zmq_socket(context, ZMQ_SUB);

    int t_len=0;
    char topic[64]={0};

    int ipv6=1;
    rc = zmq_setsockopt(subscriber, ZMQ_IPV6, &ipv6, sizeof(int));
    assert(rc == 0);

    rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", 0);
    assert(rc == 0);

#if (TEST_SELECTED == TEST_EPGM4 || TEST_SELECTED == TEST_EPGM6)
    rc = zmq_bind(subscriber, "epgm://mesh0;*:5555");
#else
    rc = zmq_bind(subscriber, "tcp://*:5555");
#endif
    assert(rc == 0);

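    while(1) {
        // leave room for the terminator; zmq_recv does not null-terminate
        rc = zmq_recv(subscriber, msg, sizeof(msg) - 1, 0);
        if(rc > 0){
          // rc is the full message size and may exceed the buffer if truncated
          if(rc > (int)sizeof(msg) - 1) rc = (int)sizeof(msg) - 1;
          msg[rc] = '\0';
          //printf("RX VAL: %d\n", rc);
          printf("RX MSG: %s\n", msg);
          //prov_parse_msg(msg);
        }
    }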

    zmq_close(subscriber);
    zmq_ctx_destroy(context);

    return NULL;
}

void mesh_rx_test(void)
{
  printf("**********************\n");
  printf("STARTING MESH RX TEST:\n");
  printf("**********************\n");

  //start subscription thread
  pthread_t thread_id_sub;
  pthread_create(&thread_id_sub, NULL, sub_thread, NULL);
}

void _zmq_publish(char* msg, int len)
{
  int rc = zmq_send(publisher, msg, len, 0);
  while(rc != len && rc >= 0){
      printf("_zmq_publish: %d != %d\n",rc,len);
      rc = zmq_send(publisher, msg, len, 0);
  }
  return;    
}

void mesh_tx_test(void)
{
  static int i=0;
  int rc=0;
  int zlen=0;
  char msg[64]={0};

  printf("**********************\n");
  printf("STARTING MESH TX TEST:\n");
  printf("**********************\n");

  publisher = zmq_socket (context, ZMQ_PUB);

  int ipv6=1;
  rc = zmq_setsockopt(publisher, ZMQ_IPV6, &ipv6, sizeof(int));
  assert(rc == 0);

#if (TEST_SELECTED == TEST_IPV4)
  rc = zmq_connect(publisher, "tcp://10.1.100.10:5555");
#endif
#if (TEST_SELECTED == TEST_IPV6)
  rc = zmq_connect(publisher, "tcp://[fe80::20f:ff:fefa:852b%mesh0]:5555");
#endif
#if (TEST_SELECTED == TEST_MCAST6)
  rc = zmq_connect(publisher, "tcp://[ff02::1%mesh0]:5555");
#endif
#if (TEST_SELECTED == TEST_EPGM4)
  rc = zmq_connect(publisher, "epgm://mesh0;239.192.1.1:5555");
#endif
#if (TEST_SELECTED == TEST_EPGM6)
  rc = zmq_connect(publisher, "epgm://mesh0;[ff02::1%mesh0]:5555");
#endif
  assert(rc == 0);

  while(1){
    i++;
    zlen = snprintf(msg, sizeof(msg), "{'topic':'test','seq':'%d'}", i);
    printf("TX %d: %s\n", i, msg);
    _zmq_publish(msg, zlen);
    zclock_sleep(1000);
  }
}

int main(void)
{
    context = zmq_ctx_new();

    mesh_rx_test();
    mesh_tx_test();
    while(1);

    zmq_ctx_destroy(context);

    return 0;
}

What's the actual result? (include assertion message & call stack if applicable)

The unicast versions (TEST_IPV4 and TEST_IPV6) work, but the multicast versions do not: only the TX debug output is printed.

**********************
STARTING MESH RX TEST:
**********************
**********************
STARTING MESH TX TEST:
**********************
TX 1: {'topic':'test','seq':'1'}
TX 2: {'topic':'test','seq':'2'}
TX 3: {'topic':'test','seq':'3'}
TX 4: {'topic':'test','seq':'4'}
TX 5: {'topic':'test','seq':'5'}

What's the expected result?

Expect both TX and RX debug, not just the TX debug.

**********************
STARTING MESH RX TEST:
**********************
**********************
STARTING MESH TX TEST:
**********************
TX 1: {'topic':'test','seq':'1'}
TX 2: {'topic':'test','seq':'2'}
RX MSG: {'topic':'test','seq':'2'}
TX 3: {'topic':'test','seq':'3'}
RX MSG: {'topic':'test','seq':'3'}
TX 4: {'topic':'test','seq':'4'}
RX MSG: {'topic':'test','seq':'4'}
TX 5: {'topic':'test','seq':'5'}
RX MSG: {'topic':'test','seq':'5'}

@sphaero
Contributor

sphaero commented Jun 21, 2022

You are familiar with these docs, I assume? https://github.com/zeromq/libzmq/blob/master/doc/zmq_pgm.txt

@elockman
Author

Yes, thank you for the response.

But again, the docs give an example for connect, not for bind.
rc = zmq_connect(socket, "epgm://eth0;239.192.1.1:5555");
zmq_bind is still relevant when using epgm, correct? If so, what should the endpoint format look like?

Secondly, can I use ipv6 instead of ipv4 with epgm?
I believe ff02::1 is an ipv6 multicast address (I can ping other ipv6 devices through it).
Would something like this be a valid zmq_connect call?
rc = zmq_connect(publisher, "epgm://[ff02::1%mesh0]:5555");
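
A note on the endpoint format asked about above: per the zmq_pgm documentation linked earlier in the thread, a pgm/epgm address has three parts, interface;multicast-address:port, and publisher and subscriber both name the same multicast group. As far as I can tell from the libzmq source, zmq_bind on these transports is handled the same way as zmq_connect, so there may be no separate bind syntax; treat that as an assumption to verify against your libzmq version. The sketch below only builds and sanity-checks such a string. make_epgm_endpoint is a hypothetical helper, not part of libzmq, and it handles IPv4 groups only.

```c
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not a libzmq function): writes
 * "epgm://<iface>;<group>:<port>" into out.
 * Returns 0 on success, -1 if the arguments do not describe an
 * IPv4 multicast endpoint or the buffer is too small. */
static int make_epgm_endpoint(char *out, size_t out_len,
                              const char *iface, const char *group,
                              unsigned port)
{
    struct in_addr addr;

    if (iface == NULL || strlen(iface) == 0)
        return -1;                    /* an interface name is required here */
    if (inet_pton(AF_INET, group, &addr) != 1)
        return -1;                    /* not a dotted-quad address, e.g. "*" */
    if ((ntohl(addr.s_addr) >> 28) != 0xE)
        return -1;                    /* outside 224.0.0.0/4: not multicast */
    if (port == 0 || port > 65535)
        return -1;

    int n = snprintf(out, out_len, "epgm://%s;%s:%u", iface, group, port);
    return (n > 0 && (size_t)n < out_len) ? 0 : -1;
}
```

With the values from this thread, make_epgm_endpoint(buf, sizeof buf, "mesh0", "239.192.1.1", 5555) fills buf with epgm://mesh0;239.192.1.1:5555, which would be passed unchanged to zmq_connect on the publisher and to zmq_connect (or zmq_bind) on the subscriber. The helper rejects a wildcard such as "*" because it is not a multicast group.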

@elockman
Author

@sphaero, thanks again for the response.

This is still an outstanding issue for me. I am getting the following error:

**********************
STARTING MESH RX TEST:
**********************
**********************
STARTING MESH TX TEST:
**********************
xsite_provision: xsite_provision.c:112: mesh_tx_test: Assertion `rc == 0' failed.
Aborted

When printed out, rc = -1.

Can anyone point me to a working epgm example?

@sphaero
Contributor

sphaero commented Sep 21, 2022

I don't know if it's any help but I've been testing multicast through dgram sockets. Here's an example to send:

#include "czmq.h"

int main() {
    // mimic natnet actor
    zsock_t *data_socket = zsock_new(ZMQ_DGRAM);
    assert(data_socket);
    //int rc = zsock_connect(data_socket, "udp://239.0.0.1:5555");
    int rc = zsock_bind(data_socket, "udp://*:*");
    assert(rc == 0);
    int i = 0;
    while (i<10)
    {
        zstr_sendm(data_socket, "239.0.0.1:5555");
        zstr_send(data_socket, "hello");
        zsys_warning("hello %i", i);
        i++;
        zclock_sleep(1000);
    }
    zsock_destroy(&data_socket);
    return 0;
}
