Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OSX: no data received #1

Closed
advancedlogic opened this issue Jul 17, 2013 · 13 comments
Closed

OSX: no data received #1

advancedlogic opened this issue Jul 17, 2013 · 13 comments
Labels

Comments

@advancedlogic
Copy link

Hi,
trying the test file you wrote. I successfully receive the heartbeat but when I try to publish something on a topic, the test receives data but it's empty and every attempt to publish again on the same topic the test fails to receive any data.
Any suggestion?
Thanks
Antonio

@mreiferson
Copy link
Member

hi @advancedlogic, thanks for the report. I've been meaning to get back to fleshing this library out so I'm not surprised there are bugs :)

I'll poke around, it's probably something rather trivial.

@mreiferson
Copy link
Member

alright, so there was a bug in libevbuffsock that I fixed in mreiferson/libevbuffsock@69d306f

but, the second problem is just due to missing functionality. this library needs to decode NSQ message data when frame_type == 2. I suspect it's running into NULLs in the string and thus not printing the data in the test app.

If you're interested in contributing it should be relatively straightforward to add a message type that can decode data. The reader type should be modified to take a message handler rather than a low-level data callback... and only when the reader receives an NSQ message should it decode and pass that to the handler.

@mreiferson
Copy link
Member

resolved in mreiferson@6ee10d8

@advancedlogic
Copy link
Author

I still have the issue (sorry to bother you) ... I have solved the second issue parsing the char* data arriving from on_data (it's timestamp[8Bytes]+attempts[2Bytes]+id[16Bytes]+message[NBytes]) ...

Here is the code (is a mix of C and Objective C, but it works)

#import "MLNSQ.h"
#include <stdio.h>
#include "nsq.h"


#ifdef DEBUG
#define _DEBUG(...) fprintf(stdout, __VA_ARGS__)
#else
#define _DEBUG(...) do {;} while (0)
#endif

static void on_data(struct NSQReader *rdr, struct NSQDConnection *conn, uint32_t frame_type, uint32_t msg_size, char *data) {
    NSData * nsData = [NSData dataWithBytes:data length:msg_size];
    if (msg_size > 28) {
        int position = 0;
        char *timestamp = malloc(8);
        [nsData getBytes:timestamp range:NSMakeRange(position, 8)];
        char *n = malloc(2);
        position += 8;
        [nsData getBytes:n range:NSMakeRange(position, 2)];
        char *ID = malloc(17);

        position += 2;
        [nsData getBytes:ID range:NSMakeRange(position, 16)];
        ID[16] = '\0';
        NSString *nsID = [NSString stringWithUTF8String:ID];
        position += 16;
        char *content = malloc(msg_size - position + 1);
        [nsData getBytes:content range:NSMakeRange(position, msg_size - position)];

        content[msg_size - position]='\0';
        NSString *nsContent = [NSString stringWithUTF8String:content];

        NSLog(@"%@ - %@\n", nsID, nsContent);
        //nsq_finish(conn->command_buf, [nsID UTF8String]);
        //nsq_ready(conn->command_buf, rdr->max_in_flight);
    }
    NSLog(@"%@",nsData);
}

static void on_connect(struct NSQReader *rdr, struct NSQDConnection *conn) {
    NSLog(@"Connected");
}

static void on_close(struct NSQReader *rdr, struct NSQDConnection *conn) {
    NSLog(@"Closed");
}

@implementation MLNSQ
@synthesize host;
@synthesize topic;
@synthesize channel;
@synthesize port;

- (id)initWithHost:(NSString*)_host andPort:(int)_port {
    if((self = [super init])) {
        self.host = _host;
        self.port = _port;
    }
    return self;
}

-(void)subscribe {
    struct NSQReader *rdr;

    rdr = new_nsq_reader([self.topic UTF8String], [self.channel UTF8String], on_connect, on_close, on_data);
    nsq_reader_connect_to_nsqd(rdr, [self.host UTF8String], self.port);
    nsq_run(ev_default_loop(0));
    NSLog(@"Exit");
}

- (void)subscribeToTopic:(NSString*)_topic andChannel:(NSString*)_channel {
    self.topic = _topic;
    self.channel = _channel;
    NSLog(@"Subscribing to channel %@ of topic %@", channel, topic);
    [NSThread detachNewThreadSelector:@selector(subscribe) toTarget:self withObject:nil];
}
@end

@mreiferson
Copy link
Member

take a look at the changes I made to the reader API and test code in mreiferson@6ee10d8 - it no longer takes a data callback, instead it takes a message callback which receives the (correctly) decoded msg with easy access to the various properties.

@advancedlogic
Copy link
Author

Yes I added all your code and the message parser works very well (thanks) but I still have issues when I send a message twice (I just see it the first time) … no problem with the heartbeat (and patched all the libs).
Do I have to send a finish command (void nsq_finish(struct Buffer *buf, const char *id) and how can I have the *buf now that I do not have the conn?
Thanks
Antonio

On Jul 17, 2013, at 8:33 PM, Matt Reiferson notifications@github.com wrote:

take a look at the changes I made to the reader API and test code in 6ee10d8 - it no longer takes a data callback, instead it takes a message callback which receives the (correctly) decoded msg with easy access to the various properties.


Reply to this email directly or view it on GitHub.

@mreiferson
Copy link
Member

Ahh, got it.

This library doesn't yet have code to maintain a RDY count with the nsqd it is connected to. Relatedly, like you pointed out, it needs a better abstraction to finish or requeue a message (which is related to maintaining your RDY count). Perhaps the message handler function should also be passed the connection as well so you can do that yourself.

So, what's happening is that after you receive your first message you're in a RDY state of 0, which means you wont be pushed any more messages.

@advancedlogic
Copy link
Author

So I have modified your test and this is the solution (if you like to update your lib adding the connection to the on_data callback:) ) …. and now it works with more than one message. My error was that I did not wrap the command with buffer_reset and buffer_socket_write_buffer … Now I have a library to manage NSQ on OSX and iOS … many thanks.

static void on_data(struct NSQReader _rdr, struct NSQDConnection *conn, struct NSQMessage *msg) {
_DEBUG("%s: %lld, %d, %s, %lu, %._s\n", FUNCTION, msg->timestamp, msg->attempts, msg->id,
msg->body_length, (int)msg->body_length, msg->body);
buffer_reset(conn->command_buf);
nsq_finish(conn->command_buf, msg->id);
buffered_socket_write_buffer(conn->bs, conn->command_buf);

buffer_reset(conn->command_buf);
nsq_ready(conn->command_buf, rdr->max_in_flight);
buffered_socket_write_buffer(conn->bs, conn->command_buf);

}

@mreiferson
Copy link
Member

yea, that should work... good to hear :)

@advancedlogic
Copy link
Author

Another suggestion, to avoid buffer overflow and malicious code injection, I hope this helps :

struct NSQMessage {
int64_t timestamp;
uint16_t attempts;
char id[17];
size_t body_length;
char *body;
};

struct NSQMessage *nsq_decode_message(const char *data, size_t data_length) {
struct NSQMessage *msg;
size_t body_length;

msg = malloc(sizeof(struct NSQMessage));
msg->timestamp = (int64_t)ntoh64((uint64_t *)data);
msg->attempts = ntohs(*(uint16_t *)data+8);
memcpy(&msg->id, data+10, 17);
msg->id[16] = '\0';
body_length = data_length - 26;
msg->body = malloc(body_length + 1);
memcpy(msg->body, data+26, body_length);
msg->body[body_length] = '\0';
msg->body_length = body_length;

return msg;

}

On Jul 17, 2013, at 8:57 PM, Matt Reiferson notifications@github.com wrote:

Ahh, got it.

This library doesn't yet have code to maintain a RDY count with the nsqd it is connected to. Relatedly, like you pointed out, it needs a better abstraction to finish or requeue a message (which is related to maintaining your RDY count). Perhaps the message handler function should also be passed the connection as well so you can do that yourself.

So, what's happening is that after you receive your first message you're in a RDY state of 0, which means you wont be pushed any more messages.


Reply to this email directly or view it on GitHub.

@mreiferson
Copy link
Member

I pushed another commit fixing a memory leak and an issue decoding message attempts eb869f6

You don't need to allocate 17 bytes for the message ID to include a NULL byte because it's a constant size and it's safer to always use pointer+size.

@advancedlogic
Copy link
Author

Maybe it's a bug of Xcode but without the 17Bytes sometimes I have some extra chars on the tail of the id

On Jul 17, 2013, at 10:29 PM, Matt Reiferson notifications@github.com wrote:

I pushed another commit fixing a memory leak and an issue decoding message attempts eb869f6

You don't need to allocate 17 bytes for the message ID to include a NULL byte because it's a constant size and it's safer to always use pointer+size.


Reply to this email directly or view it on GitHub.

@mreiferson
Copy link
Member

you mean in the Xcode UI? Yea, it doesn't know the size in that context so you're probably seeing the bytes that come after (in this case body_length).

In your actual program code this won't matter as long as you always manipulate msg->id as 16 bytes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants