-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Add datagram raw socket type #1986
Conversation
@Bitiquinho |
@hitstergtd |
Add the new public API in src/precompiled.hpp too, together with include/zmq.h, so that it works when DRAFT APIs are disabled |
Interesting idea, I will take a look later today... Is it working now or need more tweaking? |
Please rebase your branch on top of master rather than merging it, otherwise the git tree will be very messy and the diff is incomprehensible :-) git rebase --onto master 50685d^ |
@bluca
And git gives me:
So I needed to do a "git pull" befor pushing changes. Now I tried to push with --force after the rebase to skip the merge. |
No worries! Yes, when you rebase your branch push -f is the right thing. |
@@ -396,7 +398,8 @@ test_apps = \ | |||
tests/test_sockopt_hwm \ | |||
tests/test_heartbeats \ | |||
tests/test_stream_exceeds_buffer \ | |||
tests/test_pub_invert_matching | |||
tests/test_pub_invert_matching \ | |||
tests/test_dgram |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add this in the "if ENABLE_DRAFTS" section below, otherwise it will break the build
@somdoron |
I was thinking... In msg.hpp I can see that messages have a union of structs to handle different types of messages, and all of them contain the fields struct {
...
char group [16];
uint32_t routing_id;
}; that wouldn't be used for ZMQ_STREAM and ZMQ_DGRAM messages. Why don't raw stream sockets use this available memory to store a peer identifier, avoiding the need for manually sending two messages ? In the case of datagrams, wouldn't be a good idea to add a new struct to this union with a field unsigned char address[20] to store the address info ? According to the Beej's guide, the structure struct sockaddr_in {
short int sin_family; // Address family, AF_INET
unsigned short int sin_port; // Port number
struct in_addr sin_addr; // Internet address (32 bits)
unsigned char sin_zero[8]; // Same size as struct sockaddr
}; has 2 + 2 + 4 + 8 = 16 bytes of data, which fits the available memory. The downside of it is that we couldn't store IPv6 addresses struct sockaddr_in6 {
u_int16_t sin6_family; // address family, AF_INET6
u_int16_t sin6_port; // port number, Network Byte Order
u_int32_t sin6_flowinfo; // IPv6 flow information
struct in6_addr sin6_addr; // IPv6 address
u_int32_t sin6_scope_id; // Scope ID
};
struct in6_addr {
unsigned char s6_addr[16]; // IPv6 address
}; as they need 2 + 2 + 4 + 16 + 4 = 28 bytes. But maybe we could create a common structure to store both addresses info, ignore the sin6_flowinfo and sin6_scope_id fields (storing exactly 20 bytes), and rebuild the sockaddr_in* structures on demand: typedef struct socket_address {
uint16_t family; // stores sin_family or sin6_family
uint16_t port; // stores sin_port or sin6_port
unsigned char host[16]; // stores sin_addr (first 4 bytes) or sin6_addr
} socket_address_t; Obviously, as there is zmq_msg_set_routing_id and zmq_msg_set_group functions, we would need zmq_msg_set_address as well. I suppose that's a topic for a different pull request. But what do you think ? |
@Bitiquinho They also feel more natural. |
@hitstergtd Aside from that, does the overall idea make sense ? |
Something I don't understand well: at tests/test_udp.cpp, the function for sending radio messages looks like: int msg_send (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_)
{
int rc = zmq_msg_init_size (msg_, strlen (body_));
if (rc != 0)
return rc;
memcpy (zmq_msg_data (msg_), body_, strlen (body_));
rc = zmq_msg_set_group (msg_, group_);
if (rc != 0) {
zmq_msg_close (msg_);
return rc;
}
rc = zmq_msg_send (msg_, s_, 0);
zmq_msg_close (msg_);
return rc;
} So I guessed that the content and the group goes in the same message. But inside the original udp_engine.cpp void zmq::udp_engine_t::out_event()
{
msg_t group_msg;
int rc = session->pull_msg (&group_msg);
errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
if (rc == 0) {
msg_t body_msg;
rc = session->pull_msg (&body_msg);
size_t group_size = group_msg.size ();
size_t body_size = body_msg.size ();
size_t size = group_size + body_size + 1;
// TODO: check if larger than maximum size
out_buffer[0] = (unsigned char) group_size;
memcpy (out_buffer + 1, group_msg.data (), group_size);
memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);
rc = group_msg.close ();
errno_assert (rc == 0);
body_msg.close ();
errno_assert (rc == 0);
rc = sendto (fd, out_buffer, size, 0,
address->resolved.udp_addr->dest_addr (),
address->resolved.udp_addr->dest_addrlen ());
errno_assert (rc != -1);
}
else
reset_pollout (handle);
} There are 2 messages being pulled to be combined in the output buffer. Am I interpreting right ? If so, why is that ? (the same applies to receiving dish messages) (Further investigation showed that this separation only happens on the push_msg() and pull_msg() overrides of radio_session_t and dish_session_t. Why the message is not simply forwarded, like in session_base_t, and we extract/fill group info from the u.base.group field of msg_t ? Obviously I don't understand all the code, but, for now, I don't see a reason for it.) |
@Bitiquinho
Apologies, I have not looked at your PR closely enough. I am not subscribed to the zeromq-dev list, but it would be good to ask there too, although I doubt you will get any nay-sayers. |
I would also ensure that It allows API users to easily and intuitively use the same functions for sending to TCP and raw UDP sockets without any special conditional logic. An appropriate |
@hitstergtd |
So few thoughts:
I think you are pretty close to make it work, the main problem is in the dgram file, try to copy pair instead, all incoming and outgoing is from one pipe. Each message must be 2 frames, first for the ip address with more set and second is the message it self. You already have the logic right in the udp_engine, it seems. |
@Bitiquinho, regarding the two messages in the udp_engine vs one send in the API, the radio_session convert the one message to two multi-part message for the transport. |
Because transports doesn't handle the group field, transport only knows how to send messages. Therefore before pushing to the transport we send the group as a multipart message. |
There is a problem with adding new fields to msg structure as the space is very expensive. Maybe we can a new union for routing_id, group and address, as those are never used together and save some space. But I suggest not to start with it, as this new dgram is draft we can make minimal thing that work (with two part message) and later tweak it to one part. Lets get this PR ready for merge and we can tweak and improve later. |
I'm not sure about this, as it is complicating the API of libzmq, we use the routing id for server and we can also call this sendto, so I think we should stay with msg structure. Think of all the binding that now need to handle a new set of methods |
This is great stuff, but 👍 to keep the public API simpler from me too :-) |
@Bitiquinho - heads up, a change just got merged so you'll have a merge conflict - the draft API got moved to a new file, src/zmq_draft.h. Sorry for the churn! |
@somdoron
Maybe I'm not getting things right, but couldn't we have multiple pipes indexed by address data (rather than what stream uses, looks like it's a random number) for dgram_t. For incoming data, we really just have one underlying (native) socket listening for messages, but the (native) recvfrom call (at int_event) provides us the address data to find the right incoming pipe.
What I mean is to [fill the output buffer from/read the input buffer to] the group field of the msg_t object. Like this (considering only IPv4): void zmq::udp_engine_t::in_event()
{
socket_address_t in_msg_address; // As previously defined
struct sockaddr_in in_address;
socklen_t in_addrlen;
int nbytes = recvfrom(fd, in_buffer, MAX_UDP_MSG, 0, (sockaddr*) &in_address, &in_addrlen);
// [...]
void *identifier_buffer, *body_buffer;
int identifier_size, body_size;
msg_t msg;
if (options.raw_socket) { // Handle ZMQ_DGRAM
in_msg_address.family = in_address.sin_family; // Assuming AF_INET
in_msg_address.port = in_address.sin_port;
memcpy( &(in_msg_address.host), &(in_address.sin_addr.s_addr), sizeof(uint32_t) );
identifier_buffer = (void*) &(in_msg_address);
identifier_size = sizeof(in_msg_address);
body_buffer = in_buffer;
body_size = nbytes;
}
else { // Handle ZMQ_DISH
identifier_buffer = in_buffer + 1;
identifier_size = in_buffer[0];
body_buffer = in_buffer + 1 + identifier_size;
body_size = nbytes - 1 - identifier_size;
}
int rc = msg.init_size (body_size);
if( options.raw_socket ) msg.set_address( identifier_buffer, identifier_size ); // New method for datagram
else msg.set_group( identifier_buffer, identifier_size ); // Set dish received message group
memcpy (msg.data (), body_buffer, body_size); // Set message content
rc = session->push_msg (&msg); // Push single message with group/address and content data to session_base
// [...]
} void zmq::udp_engine_t::out_event()
{
msg_t msg;
int rc = session->pull_msg (&msg); // Pull single message with group/address and content data from session_base
errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
if (rc == 0) {
size_t body_size = msg.size ();
size_t out_buffer_size;
struct sockaddr_in out_address_data;
struct sockaddr* out_address = (struct sockaddr*) address->resolved.udp_addr->dest_addr ();
socklen_t out_addrlen = address->resolved.udp_addr->dest_addrlen ();
if (options.raw_socket) { // Handle ZMQ_DGRAM
socket_address_t* out_msg_address = msg.get_address(); // New method for datagram
// Some way to determine if default address should be used
if (out_msg_address.family == AF_INET) {
out_address_data.sin_family = out_msg_address->family;
out_address_data.sin_port = out_msg_address->port;
memcpy( &(out_address_data.sin_addr.s_addr), &(out_msg_address->host), sizeof(uint32_t) );
out_address = (struct sockaddr*) out_address_data;
out_addrlen = sizeof(out_address_data);
}
memcpy (out_buffer, msg.data (), body_size);
out_buffer_size = body_size;
}
else { // Handle ZMQ_RADIO
const char* group_buffer = msg.group();
size_t group_size = strlen (group_buffer);
out_buffer[0] = (unsigned char) group_size;
memcpy (out_buffer + 1, group_buffer, group_size);
memcpy (out_buffer + 1 + group_size, msg.data (), body_size);
out_buffer_size = group_size + body_size + 1;
}
//[...]
rc = sendto (fd, out_buffer, out_buffer_size, 0, out_address, out_addrlen);
// [...]
}
// [...]
}
That's exactly what I mean, to create a new option in the message union that replaces routing_id and group for address: union {
struct {
// [...]
char group [16];
uint32_t routing_id;
} delimiter;
struct {
// [...]
unsigned char address [20];
} addressable;
} u; |
@bluca |
One pipe can match multiple destination addressed, for example, when you bind all incoming communication will be arrived from one pipe, we can try and manage multiple address to one pipe, but now you won't be able to send without receiving first, kind of deadlock. So if you only support one connect or one bind that solve this problem. We can try and solve this later, lets leave it for now. One problem at a time.
My opinion, which is back to solve one problem at a time, for now when using raw sockets push two messages upstream the IP address and then the message. Leave the group field out of this for a while. Again we can change that later. Also I think you should put the address as string IP:Port, the socket_addr structure is just too complicated for users, IP and Port is the notion all around zeromq, I think we should continue and do this. Later we can think of making address (string) as part of msg structure, maybe as union for group, routing_id and address. But for now, let's leave it, we can make the datagram perfect over time... |
I see what you mean now (guess so). I was thinking about sending to a default address (the one defined on the connect() call) when the user does not define it manually (with the hypothetical set_address() call). But when we do multiple bind/connect call, there will be no default address, so it does really make sense to restrict it like in a ZMQ_PAIR socket. Still, it would be nice that, once binded to an address and listening, a ZMQ_DGRAM socket could reply to whoever sent it a message, by aquiring remote address information.
I do agree to keep things separate and leave these ideas for a second iteration. That said, I was not intending to let the users manipulate the address structure directly, but only copying the received structure data from one message to the other. The IP address on a string format would be provided only in the unique connect() call. |
@somdoron, Moreover, it also gives us the opportunity to make the return type a ssize_t for those functions without breaking ABI and also since the current send/recv API uses Ints. It's a Win-Win all around, as far as I can see, unless I am missing something here. |
Moreover, for a connection less protocol, of any sort, I have always thought the paradigm of setting the destination/recipient address as a "packaged" proposition. Maybe that's just my affliction of having used BSD sockets for a while! ;-) |
@somdoron, apologies I didn't understand the above; maybe that's one of the points that's missing in my suggestion. However, if you mean API Bindings, then I would have thought that they would need to handle a new set of methods anyway when they have to catch up with changes in 4.2; but at least by using ssize_t for sendto/recvfrom return values and giving them a set of methods to deal with connection-less protocols, we would be setting them up for the future. It's a one-off cost. |
@hitstergtd the problem is that a lot is around the send/recv methods, it is not just new methods, take CZMQ for example, we have zstring,zsock,zframe,zmessage and more all using regular send/recv methods. Think of zproxy for example as well... Other binding are even more complicated and build rich api for sending, now need to duplicate everything. Also I don't see much difference between server, router or datagram in all you have to specify the address when sending. Having different API for some of the socket types just making the library complicated. We can if we want make the sendto/recvfrom as syntactic-sugar for specifying routing id/ address. |
I tried to make the ZMQ_DGRAM socket more like ZMQ_PAIR now. Still need to make it pass the tests though. |
@somdoron, See, I knew you had perfectly valid reasons! Thanks for providing a thorough perspective. |
@Bitiquinho if you are fine with it I can try to send PR based on your work so the test pass... |
@somdoron I'm ok with that |
closing, continue here #1994 |
I'm trying to add a raw datagram (UDP) socket type to ZeroMQ, for compatibility with applications that use BSD sockets directly.
I'm trying to reproduce (a.k.a. copy) the same behaviour of the ZMQ_STREAM socket type (2 part message with id and content), which has more or less the same goal, using the recently added UDP engine with some adaptions.
The idea is to fill the first part of the received message (the group for the DISH socket or the id for the STREAM socket) with the address (struct sockaddr_in structure) of the incoming message (aquired with recvfrom), so that we are able to send a reply back to the same peer (with sendto).
If I set the group/id message to NULL (with size 0), the message is sent to the own socket address, which could be useful if it is multicast.
I guess it's all pretty hacky and incomplete (I basically copied the stream_t class), but I'm still trying to figure out everything I should change for this to work, as I'm not used to the code base. Suggestions (even if it's "forget about it !") are really welcome.