New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async Writer to support pub&mpub #25

Merged
merged 2 commits into from Apr 30, 2013

Conversation

Projects
None yet
2 participants
@felinx
Copy link
Contributor

felinx commented Apr 26, 2013

Add async Writer to support pub&mpub and fix JSONDecodeError


conn.id = conn_id
conn.last_recv_timestamp = time.time()
conn.last_msg_timestamp = time.time()

This comment has been minimized.

@mreiferson

mreiferson Apr 26, 2013

Member

i dont think we need this for writing

This comment has been minimized.

@mreiferson

mreiferson Apr 27, 2013

Member

Thanks for adding the tests, this stray line is the last item

This comment has been minimized.

@felinx

felinx Apr 27, 2013

Author Contributor
  •    conn.id = conn_id
    
  •    conn.last_recv_timestamp = time.time()
    
  •    conn.last_msg_timestamp = time.time()
    

Do you mean check_last_recv_timestamps etc?

This comment has been minimized.

@mreiferson

mreiferson Apr 27, 2013

Member

I meant last_msg_timestamp, it's not used here.

This comment has been minimized.

@felinx

felinx Apr 27, 2013

Author Contributor

Updated, please review it, thanks!

@mreiferson

This comment has been minimized.

Copy link
Member

mreiferson commented Apr 26, 2013

I had started writing some code in #11, can you port over the low-level tests I added there to this pull request (and I'll close #11).

I think eventually (not now) this can benefit from some re-structuring for the shared code in Reader and Writer for managing connections, data flow, and liveness to reduce some of the duplication, but I'm fine with this for now.

Appreciate the contribution, nice work.

@felinx

This comment has been minimized.

Copy link
Contributor Author

felinx commented Apr 26, 2013

Yes, there are some duplicated code now, I just want to implement this functional at first and I will do my best to do more contribution later.

@mreiferson

This comment has been minimized.

Copy link
Member

mreiferson commented Apr 27, 2013

Looks good, you mind squashing down to two commits (one for JSON fix, one for everything else)?

@felinx

This comment has been minimized.

Copy link
Contributor Author

felinx commented Apr 27, 2013

It's ok for me,thanks!

@felinx

This comment has been minimized.

Copy link
Contributor Author

felinx commented Apr 27, 2013

Squashing is done, it's ready for merging, thanks!

def pub_failed_callback(self, conn):
logging.error('[%s] failed to send pub (%s, %s)' % (conn.id,
conn.last_pub[0],
conn.last_pub(1)))

This comment has been minimized.

@mreiferson

mreiferson Apr 27, 2013

Member

I just noticed this... conn.last_pub(1) should be conn.last_pub[1], but I have an alternative suggestion.

Even though we have async heartbeat messages that the Writer has to handle... every PUB or MPUB command can expect exactly one FRAME_TYPE_RESPONSE or FRAME_TYPE_ERROR _in the order they were sent_.

So, if instead we maintained a callback_queue instead of just a response_callback_queue, we can actually respond to each specific PUB or MPUB command rather than a generic "this failed callback" like you've currently implemented.

This callback can be optional, but the API would look like this:

def handler(self):
    self.writer.pub("log", "Hello world", callback=self._finish_pub)

def _finish_pub(self, data):
    if data.frame_type_id == nsq.FRAME_TYPE_ERROR:
        logging.error("blablabla")

what do you think?

@mreiferson mreiferson referenced this pull request Apr 27, 2013

Closed

add Writer API #11



class Writer(object):
def __init__(self, nsqds, heartbeat_interval=30):

This comment has been minimized.

@mreiferson

mreiferson Apr 28, 2013

Member

rename the nsqds variable to nsqd_tcp_addresses for consistency with Reader

command_ = getattr(nsq, command)
conn.send(command_(topic, data))

conn.callback_queue.append((callback, (topic, data)))

This comment has been minimized.

@mreiferson

mreiferson Apr 28, 2013

Member

I'd prefer that if the user wants to pack data into the callback that they would do something like this:

callback = functools.partial(_finish_pub, msg=msg)
nsq.writer.pub("topic", msg, callback)

def _finish_pub(msg, response):
    if response.frame_type_id == nsq.FRAME_TYPE_ERROR:
        logging.error(response.data)

this way we dont have to auto-pack the data that was passed in, ie the line above just becomes:

conn.callback_queue.append(callback)

and below we do not need to special case if pub_data: (https://github.com/bitly/pynsq/pull/25/files#L1R124)

return

logging.error('[%s] failed to publish (%s, %s), response is %s' % \
(conn.id, pub_data(0), pub_data(1), data))

This comment has been minimized.

@mreiferson

mreiferson Apr 28, 2013

Member

is this another parentheses typo or are these actually callable?

@felinx

This comment has been minimized.

Copy link
Contributor Author

felinx commented Apr 28, 2013

Updated code follow your comments, and I also renamed variable data to msg in pub&mpub func because it is a little confused with response's data. Squashed.

return self.application.nsq
def get(self):
self.nsq.pub("log", "Hello world")

This comment has been minimized.

@mreiferson

mreiferson Apr 28, 2013

Member

should we update the example w/ the new (optional) API?

logging.error("[%s] ERROR: %s", conn.id, data)
do_callback = True
else:
pass

This comment has been minimized.

@mreiferson

mreiferson Apr 28, 2013

Member

this else/pass isn't doing anything 😄

logging.exception('[%s] failed to send %s' % (conn.id, command))
conn.close()

callback(conn, None)

This comment has been minimized.

@mreiferson

mreiferson Apr 28, 2013

Member

by passing None callers are going to need to disambiguate... I'm trying to think through minor API changes that would make this a bit more straightforward.

One idea is to require that callbacks always accept response and error params (and in the case above response = None and error = <exception>? (same for the lines below in _data_callback)


logging.info('[%s] IDENTIFY received %r', conn.id, data)

def _close_callback(self, conn):

This comment has been minimized.

@mreiferson

mreiferson Apr 28, 2013

Member

we need to flush the callback_queue in here so callers are guaranteed that their callback will be executed

This comment has been minimized.

@felinx

felinx Apr 29, 2013

Author Contributor

Is there any case _identify_response_callback has a json response data? I did not find that from nsq spec.


conn = random.choice(self.conns.values())
try:
command_ = getattr(nsq, command)

This comment has been minimized.

@mreiferson

mreiferson Apr 28, 2013

Member

s/command_/command please

This comment has been minimized.

@mreiferson

mreiferson Apr 28, 2013

Member

actually s/command_/cmd

@felinx

This comment has been minimized.

Copy link
Contributor Author

felinx commented Apr 28, 2013

Refined callback and updated example.


if do_callback and conn.callback_queue:
callback = conn.callback_queue.pop(0)
callback(conn, response, None)

This comment has been minimized.

@mreiferson

mreiferson Apr 29, 2013

Member

this should send response when frame == nsq.FRAME_TYPE_RESPONSE xor error when frame == nsq.FRAME_TYPE_ERROR, in the latter case perhaps as an exception we define?


if do_callback and conn.callback_queue:
callback = conn.callback_queue.pop(0)
callback(conn, response)

This comment has been minimized.

@mreiferson

mreiferson Apr 29, 2013

Member

Thanks for your work and apologies for all the back and forth, we're close! The last commit wasn't quite what I had in mind, here's what I was thinking:

def _data_callback(self, conn, raw_data):
    conn.last_recv_timestamp = time.time()
    response = None
    error = None
    frame, data = nsq.unpack_response(raw_data)
    if frame == nsq.FRAME_TYPE_RESPONSE and data == "_heartbeat_":
        logging.info("[%s] received heartbeat", conn.id)
        self.heartbeat(conn)
        conn.send(nsq.nop())
    elif frame == nsq.FRAME_TYPE_RESPONSE:
        response = data
    elif frame == nsq.FRAME_TYPE_ERROR:
        logging.error("[%s] ERROR: %s", conn.id, data)
        error = NSQError(data)

    if (response or error) and conn.callback_queue:
        callback = conn.callback_queue.pop(0)
        callback(conn, response, error)

This way there is only ever one parameter set on the callback. Callbacks would look like this:

def callback(conn, response, error):
    if error:
        logging.error("blablabla")

(this implies changes in the few other spots where you're handling callbacks)

This comment has been minimized.

@felinx

felinx Apr 29, 2013

Author Contributor

The response just can be a string or an Exception instance now, an error means a special response, it is easy to distinguish them, ex:

def callback(conn, response):
    if isinstance(response, NSQError):
        logging.error("blablabla")

or there will be some confused None response like below:

callback(conn, response, error) #  _data_callback with error
callback(conn, response, None) #  _data_callback without error
callback(conn, None, error) # _pub
callback(conn, None, ConnectionClosedError()) # _close_callback

This comment has been minimized.

@mreiferson

mreiferson Apr 29, 2013

Member

my intention is to avoid confusing "response" with "error" (they are implemented as two different frame types)

This comment has been minimized.

@mreiferson

mreiferson Apr 29, 2013

Member

I have come to realize that "response" was terribly named, but we are where we are :)

If you like having just one parameter we need to come up with a better name than "response" because it is ambiguous

This comment has been minimized.

@mreiferson

mreiferson Apr 29, 2013

Member

also, I did not mean for NSQError to actually be used (if would just be nsq.Error, the "NSQ" doesn't add any value)

This comment has been minimized.

@felinx

felinx Apr 29, 2013

Author Contributor

Rollback "response" to data for consistency with Reader?

This comment has been minimized.

@mreiferson

mreiferson Apr 29, 2013

Member

yea, that sounds reasonable


try:
if isinstance(data, DataError):
data = data.data

This comment has been minimized.

@mreiferson

mreiferson Apr 29, 2013

Member

s/nsq.NSQError/nsq.Error

also, these two lines should move out of the try but should be combined with line 191:

if data == 'OK' or isinstance(data, nsq.Error):
    return

it already logs the error so we dont need to log it again

I think thats the last comment 😄

@felinx

This comment has been minimized.

Copy link
Contributor Author

felinx commented Apr 30, 2013

Committed this change and squashed :)

@mreiferson

This comment has been minimized.

Copy link
Member

mreiferson commented Apr 30, 2013

thanks

mreiferson added a commit that referenced this pull request Apr 30, 2013

Merge pull request #25 from felinx/master
Add async Writer to support pub&mpub

@mreiferson mreiferson merged commit 9ad4f30 into nsqio:master Apr 30, 2013

1 check passed

default The Travis build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment