nsqd i/o timeout 60 seconds after gnsq connects causes first message to be lost instead of raising exception #12

Closed
algrebe opened this issue Oct 28, 2016 · 2 comments

algrebe commented Oct 28, 2016

When I connect to nsqd using gnsq.Nsqd, nsqd logs the following 60 seconds after the connection:

[nsqd] 2016/10/28 12:01:08.655200 PROTOCOL(V2): [127.0.0.1:53326] exiting ioloop
[nsqd] 2016/10/28 12:01:08.655677 ERROR: client(127.0.0.1:53326) - failed to read command - read tcp 127.0.0.1:4150->127.0.0.1:53326: i/o timeout
[nsqd] 2016/10/28 12:01:08.655987 PROTOCOL(V2): [127.0.0.1:53326] exiting messagePump

The first message sent with publish_tcp after this timeout doesn't reach nsqd, and no error is raised. Only the second message sent through publish_tcp raises gnsq.errors.NSQSocketError: [Errno 107] Socket is not connected. I think this error should be raised on the first message, not the second.
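At the plain TCP level, a first write that "succeeds" after the peer has closed the connection is expected behavior, which may explain what is seen here. This is an assumption: the issue does not confirm that this is the mechanism inside gnsq. When nsqd closes its side, the client's next send only copies bytes into the local kernel buffer and returns; the closed peer answers with a RST, and only the send after that fails. The Python 3 sketch below reproduces this with two local sockets and no nsqd; `first_two_sends_after_peer_close` is a hypothetical helper and the payloads are placeholder bytes, not NSQ commands.

```python
import socket
import time


def first_two_sends_after_peer_close(delay=0.2):
    """Send twice on a TCP connection whose peer has already closed it.

    Returns one entry per send: the byte count if send() succeeded, or the
    name of the exception it raised.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # ephemeral loopback port
    server.listen(1)
    client = socket.create_connection(server.getsockname())
    peer, _ = server.accept()

    # Simulate the server giving up on an idle client and closing its side.
    peer.close()
    time.sleep(delay)

    results = []
    for payload in (b"message 1", b"message 2"):
        try:
            # The first send just queues bytes in the local kernel buffer, so
            # it returns normally; the closed peer then replies with a RST.
            results.append(client.send(payload))
        except OSError as exc:
            # Once the RST has been processed, further sends fail.
            results.append(type(exc).__name__)
        time.sleep(delay)  # give the RST time to arrive

    client.close()
    server.close()
    return results


if __name__ == "__main__":
    print(first_two_sends_after_peer_close())
```

On Linux the second entry is typically `BrokenPipeError`; other platforms may report a different `OSError` subclass.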

System Info

gnsq version

0.3.3

operating system and version

$ uname -a
Linux ant-Dell-n411z 3.13.0-93-generic #140-Ubuntu SMP Mon Jul 18 21:21:05 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
$ lsb_release -a
Distributor ID: Ubuntu
Description:    Ubuntu 14.04.4 LTS
Release:    14.04
Codename:   trusty

nsqd versions

I've noticed this happening on nsqd v0.3.8 (built w/go1.6.2) and nsqd v0.2.30 (built w/go1.3)

Scripts to reproduce behaviour

test_gnsq.py

from gevent import monkey; monkey.patch_all()

import gnsq
import gevent

def on_error(reader, error):
    print "got error %s" % error

def on_close(reader):
    print "closed..."

conn = gnsq.Nsqd(address="127.0.0.1", tcp_port=4150, http_port=4151)
conn.ping()
conn.connect()
conn.on_error.connect(on_error)
conn.on_close.connect(on_close)

try:
    print "publising message 1"
    conn.publish_tcp("TestTopic", "Did you get message 1?")
    print "successfully published message 1"

    # sleep for 62 seconds in the main thread, just past nsqd's 60-second timeout
    print "sleeping for 1 minute and 2 seconds"
    gevent.sleep(62)
    print "awake!"

    print "publishing message 2"
    conn.publish_tcp("TestTopic", "Did you get message 2?")
    print "successfully published message 2"

    print "publishing message 3"
    conn.publish_tcp("TestTopic", "Did you get message 3?")
    print "successfully published message 3"

finally:
    conn.close()

run.sh

$1/nsqd -verbose -verbose &
sleep 2
$1/nsq_tail -channel default -nsqd-tcp-address 127.0.0.1:4150 -topic TestTopic &
sleep 2
python test_gnsq.py
curl http://localhost:4151/stats?format=json | python -m json.tool
echo ""
sleep 3

jobs -p | xargs kill -9

commands and output

run.sh takes the path to the directory containing the nsq binaries as its only argument (I tested it with different nsqd versions).

wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
tar -xvzf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz
bash run.sh nsq-0.3.8.linux-amd64.go1.6.2/bin/

Alternatively, you could run bash run.sh /usr/bin if nsq binaries are present there.

output

The entire output is available here: https://gist.github.com/algrebe/5d715eb031f110905b9b2335e0ca52fe
Here are the relevant portions:

$ bash run.sh nsq-0.3.8.linux-amd64.go1.6.2/bin/

# nsq_tail connects
[nsqd] 2016/10/28 12:00:06.256774 TCP: new client(127.0.0.1:53323)
[nsqd] 2016/10/28 12:00:06.256930 CLIENT(127.0.0.1:53323): desired protocol magic '  V2'
[nsqd] 2016/10/28 12:00:06.259701 PROTOCOL(V2): [127.0.0.1:53323] [SUB TestTopic default]
[nsqd] 2016/10/28 12:00:06.260200 TOPIC(TestTopic): created
[nsqd] 2016/10/28 12:00:06.260692 TOPIC(TestTopic): new channel(default)
[nsqd] 2016/10/28 12:00:06.261209 PROTOCOL(V2): [127.0.0.1:53323] [RDY 200]

# python gnsq.Nsqd connects
[nsqd] 2016/10/28 12:00:08.652703 TCP: new client(127.0.0.1:53326)
[nsqd] 2016/10/28 12:00:08.653426 CLIENT(127.0.0.1:53326): desired protocol magic '  V2'

# python script - gnsq.Nsqd sends message and then sleeps
publising message 1
successfully published message 1
sleeping for 1 minute and 2 seconds

# nsqd sends this message to nsq_tail
[nsqd] 2016/10/28 12:00:08.654558 PROTOCOL(V2): [127.0.0.1:53326] [PUB TestTopic]
[nsqd] 2016/10/28 12:00:08.655044 PROTOCOL(V2): writing msg(070155f1855b1000) to client(127.0.0.1:53323) - Did you get message 1?
[nsqd] 2016/10/28 12:00:08.655153 [127.0.0.1:53323] state rdy:  200 inflt:    1
[nsqd] 2016/10/28 12:00:08.655323 [127.0.0.1:53323] state rdy:  200 inflt:    1

# nsq_tail receives message
Did you get message 1?
[nsqd] 2016/10/28 12:00:08.655820 PROTOCOL(V2): [127.0.0.1:53323] [FIN 070155f1855b1000]

# after one minute, nsqd logs this for the gnsq.Nsqd client
[nsqd] 2016/10/28 12:01:08.655200 PROTOCOL(V2): [127.0.0.1:53326] exiting ioloop
[nsqd] 2016/10/28 12:01:08.655677 ERROR: client(127.0.0.1:53326) - failed to read command - read tcp 127.0.0.1:4150->127.0.0.1:53326: i/o timeout
[nsqd] 2016/10/28 12:01:08.655987 PROTOCOL(V2): [127.0.0.1:53326] exiting messagePump

# python script - out of sleep
awake!
publishing message 2
successfully published message 2
# NOTE: no error was raised here; the client thinks the message was sent successfully when it wasn't!

publishing message 3
# NOTE: only when this message is published does it raise the exception and close the socket
closed...
Traceback (most recent call last):
  File "test_gnsq.py", line 37, in <module>
    conn.close()
  File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/nsqd.py", line 470, in close
    self.send(nsq.close())
  File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/nsqd.py", line 239, in send
    return self.stream.send(data, async)
  File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/stream/stream.py", line 90, in send
    self.ensure_connection()
  File ".virtualenvs/gnsq/local/lib/python2.7/site-packages/gnsq/stream/stream.py", line 49, in ensure_connection
    raise NSQSocketError(ENOTCONN, 'Socket is not connected')
gnsq.errors.NSQSocketError: [Errno 107] Socket is not connected

# examining the curl output shows that nsqd only received one message
{
    "data": {
        "start_time": 1477636204,
        "topics": [
            {
                "channels": [
                    {
                        "channel_name": "default",
                        "clients": [
                            {
                                "deflate": false,
                                "finish_count": 1,
                                "in_flight_count": 0,
                                "message_count": 1,
                                "remote_address": "127.0.0.1:53323",
                                "state": 3,
                                "user_agent": "nsq_tail/0.3.8 go-nsq/1.0.5",
                                "version": "V2"
                            }
                        ],
                        "deferred_count": 0,
                        "depth": 0,
                        "e2e_processing_latency": {
                            "count": 0,
                            "percentiles": null
                        },
                        "in_flight_count": 0,
                        "message_count": 1,
                        "paused": false,
                        "requeue_count": 0,
                        "timeout_count": 0
                    }
                ],
                "depth": 0,
                "e2e_processing_latency": {
                    "count": 0,
                    "percentiles": null
                },
                "message_count": 1,
                "paused": false,
                "topic_name": "TestTopic"
            }
        ],
        "version": "0.3.8"
    },
    "status_code": 200,
    "status_txt": "OK"
}

algrebe (author) commented Oct 31, 2016

@wtolson after some digging, I found out that gnsq.Nsqd doesn't read from the underlying socket on its own; it only writes to it.

After looking at https://github.com/wtolson/gnsq/blob/master/gnsq/reader.py, I found that I have to spawn a gevent greenlet that runs conn.listen. That makes sense, because the connection has to read heartbeats while idle and respond to them with NOP.
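A minimal sketch of what such a listener has to do, written directly against the NSQ TCP protocol's frame format (4-byte big-endian size covering frame type plus data, 4-byte frame type, then data). nsqd sends a heartbeat as a response frame whose body is `_heartbeat_`, and the client is expected to answer with `NOP\n`. The helpers `recv_exactly`, `encode_frame`, `read_frame` and `listen` are hypothetical and are not gnsq's implementation; a real client would run its listen loop in its own greenlet, as reader.py does with `conn.listen`.

```python
import socket
import struct

FRAME_TYPE_RESPONSE = 0
FRAME_TYPE_ERROR = 1
FRAME_TYPE_MESSAGE = 2
HEARTBEAT = b"_heartbeat_"
NOP = b"NOP\n"


def recv_exactly(sock, n):
    """Read exactly n bytes, or raise ConnectionError if the peer closes first."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed by peer")
        buf += chunk
    return buf


def encode_frame(frame_type, data):
    """Build a frame as nsqd would send it: size counts frame type + data."""
    return struct.pack(">ii", len(data) + 4, frame_type) + data


def read_frame(sock):
    """Read one frame and return (frame_type, data)."""
    size, frame_type = struct.unpack(">ii", recv_exactly(sock, 8))
    return frame_type, recv_exactly(sock, size - 4)


def listen(sock, on_frame=None):
    """Read frames until the connection ends, answering heartbeats with NOP."""
    while True:
        try:
            frame_type, data = read_frame(sock)
        except ConnectionError:
            return  # peer closed (or reset) the connection
        if frame_type == FRAME_TYPE_RESPONSE and data == HEARTBEAT:
            sock.sendall(NOP)  # keeps nsqd from timing out the idle client
        elif on_frame is not None:
            on_frame(frame_type, data)
```

Without a loop like this running, nothing reads the heartbeat, no NOP goes back, and nsqd drops the idle connection, which matches the i/o timeout in the logs above.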

  1. Is it worth making Nsqd spawn conn.listen by itself?
  2. Should there be a writer abstraction, like Reader, that uses Nsqd and spawns the conn.listen greenlet? On close, it would gracefully kill the greenlet, close the stream, etc.
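Question 2 could look roughly like the following sketch. It assumes a connection object exposing the methods used in this issue (`connect`, `listen`, `publish_tcp`, `close`) and assumes that `listen` returns once the connection is closed. `Writer` is a hypothetical name, and it uses `threading.Thread` to stay self-contained; under gevent monkey-patching, `gevent.spawn(conn.listen)` would play the same role.

```python
import threading


class Writer(object):
    """Hypothetical writer that owns one connection and its listener."""

    def __init__(self, conn):
        self.conn = conn
        self._listener = None

    def start(self):
        self.conn.connect()
        # Keep reading in the background so heartbeats get answered with NOP.
        self._listener = threading.Thread(target=self.conn.listen, daemon=True)
        self._listener.start()

    def publish(self, topic, data):
        # Fail loudly if the listener has exited (assumed to mean the
        # connection is gone) instead of writing into a dead socket.
        if self._listener is None or not self._listener.is_alive():
            raise RuntimeError("writer is not running")
        self.conn.publish_tcp(topic, data)

    def close(self, timeout=5.0):
        # Closing the connection is assumed to make listen() return.
        self.conn.close()
        if self._listener is not None:
            self._listener.join(timeout)
            self._listener = None

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *exc_info):
        self.close()
```

Checking the listener before publishing would turn the silent loss described above into an immediate error, assuming `listen` exits when nsqd drops the connection; a small race between the check and the write remains.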

wtolson (owner) commented Apr 25, 2019

The latest version provides a Producer class that manages the low-level nsqd connection.

Fixed in #24

wtolson closed this as completed Apr 25, 2019