reader: incorrect total_rdy update logic when sending new RDY count #175

Closed
alpaker opened this issue Apr 28, 2017 · 4 comments

@alpaker
Contributor

alpaker commented Apr 28, 2017

In Reader._send_rdy() the following block is responsible for updating the reader's total_rdy attribute:

if conn.send_rdy(value):
    self.total_rdy = max(self.total_rdy - conn.rdy + value, 0)

Here value represents the desired new RDY count; thus if conn.send_rdy(value) succeeds we'll have conn.rdy == value. As a result the above will leave total_rdy unchanged. Since there's no other spot in which total_rdy is increased, it never gets above its initial value of 0.
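
The arithmetic can be reproduced without a running nsqd. The sketch below is only an illustration: `FakeConn` and `buggy_update` are hypothetical names, not pynsq classes or methods. It assumes, as described above, that `send_rdy()` has already set the connection's `rdy` attribute by the time it returns.

```python
class FakeConn:
    """Hypothetical stand-in for a connection; not the pynsq class."""

    def __init__(self):
        self.rdy = 0

    def send_rdy(self, value):
        # Assumption taken from the description above: rdy is updated
        # as part of sending, before the caller regains control.
        self.rdy = value
        return True


def buggy_update(total_rdy, conn, value):
    """Apply the total_rdy update in the order quoted in this issue."""
    if conn.send_rdy(value):
        # conn.rdy already equals value here, so the two terms cancel.
        total_rdy = max(total_rdy - conn.rdy + value, 0)
    return total_rdy


conn = FakeConn()
total_rdy = buggy_update(0, conn, 1)
print(conn.rdy, total_rdy)  # prints "1 0"
```

The connection ends up with RDY 1 while the tracked total stays at 0.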

One behavioral consequence of this: Even if max_in_flight is less than the nsqd connection count, every connection will get an initial RDY count of 1, and so the max_in_flight constraint can be violated immediately after a reader begins consuming messages.

For comparison, the go client does this:

atomic.AddInt64(&r.totalRdyCount, -c.RDY()+count)
err := c.WriteCommand(Ready(int(count)))
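
One way to get the same effect in Python is to read the connection's old RDY count before `send_rdy()` overwrites it. This is a sketch under assumptions, not necessarily the fix that was eventually merged: `FakeConn` and `send_rdy_fixed` are hypothetical names, and unlike the Go snippet (which adjusts the total before writing the command) it adjusts the total only after the send succeeds.

```python
class FakeConn:
    """Hypothetical stand-in for a connection; not the pynsq class."""

    def __init__(self):
        self.rdy = 0

    def send_rdy(self, value):
        # Assumed behavior: the connection records its new RDY count itself.
        self.rdy = value
        return True


def send_rdy_fixed(total_rdy, conn, value):
    """Update the running total using the RDY count from before the send."""
    old_rdy = conn.rdy  # capture before send_rdy() overwrites it
    if conn.send_rdy(value):
        total_rdy = max(total_rdy - old_rdy + value, 0)
    return total_rdy


conn = FakeConn()
after_first = send_rdy_fixed(0, conn, 1)             # 0 - 0 + 1
after_raise = send_rdy_fixed(after_first, conn, 5)   # 1 - 1 + 5
after_zero = send_rdy_fixed(after_raise, conn, 0)    # 5 - 5 + 0
print(after_first, after_raise, after_zero)  # prints "1 5 0"
```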
@alpaker alpaker changed the title incorrect total_rdy update logic when sending new RDYcount incorrect total_rdy update logic when sending new RDY count Apr 28, 2017
@mreiferson
Member

@alpaker thanks for the report, but I think https://github.com/nsqio/pynsq/blob/master/nsq/reader.py#L455 addresses that edge case?

@alpaker
Contributor Author

alpaker commented Apr 28, 2017

That doesn't appear to prevent the issue, unfortunately.

Suppose we have max_in_flight == 2 and 4 initial connections.

  1. For each connection, the reader initializes the RDY count in the on-ready callback with a call to Reader._send_rdy(), passing value == 1.

  2. The first such call sends a RDY count of 1 for the connection with Async.send_rdy(). Note that this method sets the connection's rdy attribute; after it returns we have conn.rdy == 1. And only after the send_rdy() call does the reader attempt to update its total_rdy from the initial value of 0 to 1:

self.total_rdy = max(self.total_rdy - conn.rdy + value, 0)

But at this point value == conn.rdy == 1. Thus the reader returns from _send_rdy() with total_rdy == 0.

  3. The same happens on each of the 3 subsequent initializing calls to _send_rdy(). Each time, the condition you pointed to:
(self.total_rdy + value) > self.max_in_flight

is (0 + 1) > 2 and so control continues to conn.send_rdy(value). Thus each of the 4 connections is initialized to RDY == 1.
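
The three steps above can also be traced in isolation. The following is a simplified model, not pynsq code: `FakeConn`, `MiniReader`, and `init_connections` are hypothetical, and the model simply skips the send when the max_in_flight guard trips, which may differ from what the real reader does in that branch. The `capture_old_rdy` flag switches between the update order described in this issue and an order that reads the old RDY count first.

```python
class FakeConn:
    """Hypothetical stand-in for a connection; not the pynsq class."""

    def __init__(self):
        self.rdy = 0

    def send_rdy(self, value):
        self.rdy = value  # assumed: rdy is set as part of sending
        return True


class MiniReader:
    """Simplified model of the RDY bookkeeping discussed in this thread."""

    def __init__(self, max_in_flight, capture_old_rdy):
        self.max_in_flight = max_in_flight
        self.capture_old_rdy = capture_old_rdy
        self.total_rdy = 0

    def send_rdy(self, conn, value):
        # The guard quoted above; skipping the send is a simplification.
        if (self.total_rdy + value) > self.max_in_flight:
            return False
        old_rdy = conn.rdy
        if conn.send_rdy(value):
            # Buggy order subtracts the already-updated conn.rdy.
            subtract = old_rdy if self.capture_old_rdy else conn.rdy
            self.total_rdy = max(self.total_rdy - subtract + value, 0)
        return True


def init_connections(reader, count):
    """Give each of `count` fresh connections an initial RDY of 1."""
    conns = [FakeConn() for _ in range(count)]
    for conn in conns:
        reader.send_rdy(conn, 1)
    return [conn.rdy for conn in conns]


buggy = MiniReader(max_in_flight=2, capture_old_rdy=False)
buggy_rdys = init_connections(buggy, 4)
print(buggy_rdys, buggy.total_rdy)  # prints "[1, 1, 1, 1] 0"

fixed = MiniReader(max_in_flight=2, capture_old_rdy=True)
fixed_rdys = init_connections(fixed, 4)
print(fixed_rdys, fixed.total_rdy)  # prints "[1, 1, 0, 0] 2"
```

With the buggy order the guard never trips because total_rdy stays at 0, so all 4 connections receive RDY 1. With the old count captured first, only 2 connections are given RDY 1 in this model.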

Here's a demonstration.

Run 4 local nsqd instances:

$ ps -ocommand | grep nsqd | grep -v grep
nsqd -lookupd-tcp-address=127.0.0.1:4160 -node-id 0 -tcp-address 0.0.0.0:9000 -http-address 0.0.0.0:9001 -data-path=/tmp/data0
nsqd -lookupd-tcp-address=127.0.0.1:4160 -node-id 1 -tcp-address 0.0.0.0:9002 -http-address 0.0.0.0:9003 -data-path=/tmp/data1
nsqd -lookupd-tcp-address=127.0.0.1:4160 -node-id 2 -tcp-address 0.0.0.0:9004 -http-address 0.0.0.0:9005 -data-path=/tmp/data2
nsqd -lookupd-tcp-address=127.0.0.1:4160 -node-id 3 -tcp-address 0.0.0.0:9006 -http-address 0.0.0.0:9007 -data-path=/tmp/data3

Send a couple of msgs to each instance:

$ for i in {0..7}; do \
  echo foo | to_nsq -nsqd-tcp-address localhost:$((9000 + 2*($i % 4))) \
                    -topic foo-topic \
                    -rate 100 2>/dev/null; \
done

Client code:

$ cat test.py
#!/usr/local/bin/python

from nsq import Reader, run
from tornado.ioloop import IOLoop

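def handler(msg):
    # Take over responsibility for finishing the message ourselves.
    msg.enable_async()
    # Hold the message for 20 seconds so it stays in flight long enough to observe.
    IOLoop.current().add_timeout(IOLoop.current().time() + 20, msg.finish)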

if __name__ == '__main__':
    Reader(
        topic='foo-topic',
        channel='foo-channel',
        lookupd_http_addresses='localhost:4161',
        max_in_flight=2,
        message_handler=handler
    )
    run()

Start client and observe that we have 4 msgs in flight, despite a configured max_in_flight value of 2:

$ ./test.py & nsq_stat -topic foo-topic \
                       -channel foo-channel \
                       -lookupd-http-address localhost:4161 \
                       -count 1
[1] 9979
------rate------+----------------depth----------------+--------------metadata---------------
ingress  egress |   total     mem    disk inflt   def |     req     t-o         msgs clients
      0       2 |       4       4       0     4     0 |       0       0           16       0

@mreiferson
Member

I stand corrected! Thanks for the detailed debugging info, are you interested in working up a fix?

@mreiferson mreiferson added the bug label Apr 28, 2017
@mreiferson mreiferson changed the title incorrect total_rdy update logic when sending new RDY count reader: incorrect total_rdy update logic when sending new RDY count Apr 28, 2017
@alpaker
Contributor Author

alpaker commented Apr 28, 2017

Will do!
