Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trying to set ZMQ_CONFLATE within rzmq #56

Open
modernresearch opened this issue Aug 27, 2021 · 4 comments
Open

Trying to set ZMQ_CONFLATE within rzmq #56

modernresearch opened this issue Aug 27, 2021 · 4 comments

Comments

@modernresearch
Copy link

I'm trying to set option ZMQ_CONFLATE to 1 (http://api.zeromq.org/master:zmq-setsockopt) for a ZMQ_SUB socket.

I didn't see a built-in function to do that, so I tried to add my own. Here's what I did:

  1. Added function to R/zmq.R, following pattern of set.affinity:
set.conflate <- function(socket, option.value) {
    .Call("set_conflate",socket, option.value, PACKAGE="rzmq")
}
  1. Added set.conflate to export list in NAMESPACE file.

  2. Added set_conflate function in src/interface.cpp, following same pattern as
    set_affinity:

SEXP set_conflate(SEXP socket_, SEXP option_value_) {

  zmq::socket_t* socket = reinterpret_cast<zmq::socket_t*>(checkExternalPointer(socket_,"zmq::socket_t*"));
  if(!socket) { REprintf("bad socket object.\n");return R_NilValue; }
  if(TYPEOF(option_value_)!=INTSXP) { REprintf("option value must be an int.\n");return R_NilValue; }
  SEXP ans; PROTECT(ans = allocVector(LGLSXP,1)); LOGICAL(ans)[0] = 1;

  uint64_t option_value(INTEGER(option_value_)[0]);
  try {
    socket->setsockopt(ZMQ_CONFLATE, &option_value, sizeof(uint64_t));
  } catch(std::exception& e) {
    REprintf("%s\n",e.what());
    LOGICAL(ans)[0] = 0;
  }
  UNPROTECT(1);
  return ans;
}
  1. Added a line to src/interface.h in the extern "C" block:

SEXP set_conflate(SEXP socket_, SEXP option_value_);

  1. Deleted src/rzmq.so from my prior installation.

  2. Re-installed package from source, verified src/rzmq.so was re-built.

  3. When I tried to use it, it definitely finds the function and tries, but it says the argument is invalid:

> library(rzmq)
> sock = init.socket(init.context(), "ZMQ_SUB")
> connect.socket(sock, "tcp://host:port")
> subscribe(sock, "")
> set.conflate(sock, 1L)
Invalid argument
[1] FALSE

Two questions:

  1. Why would it say 1L is an invalid argument? The docs say it's expecting an integer.
  2. Is there a better way to accomplish setting ZMQ_CONFLATE to 1?
@armstrtw
Copy link
Contributor

Can you push your branch, and drop a link to it here?

Alternatively if you make a PR w/ your changes, we can test.

@modernresearch
Copy link
Author

Thanks for the quick response - here's the PR: #57

Let me know if you need anything else!

@armstrtw
Copy link
Contributor

Your PR seems correct.

I think the issue you are seeing w/ 'Invalid argument' is related to your sock object being invalid.

Please check that before calling set.conflate.

Additionally, you can add a simple check after the setsocketopt call to see if your cmd was successful.

something along these lines (untested):
uint64_t option_value_check; // before try block

// add after the call to setsocketopt
socket->getsockopt(ZMQ_CONFLATE, &option_value_check, sizeof(uint64_t));
cout << option_value_check << endl;

I don't have time to test locally at this moment, but I can try later this weekend.

@modernresearch
Copy link
Author

Thanks, will check it out and follow up

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants