Producer writes to dead connection and crashes #31

Open
Phaeilo opened this issue May 22, 2020 · 1 comment

Comments

@Phaeilo

Phaeilo commented May 22, 2020

Hello, I have some long-running services that use gnsq to read messages from one topic, apply a transformation, and publish the results to another topic. Occasionally, the TCP connections to the various nsqd instances die. This is not a problem for the Consumer, which reconnects right away and resumes fetching messages. The Producer, however, sometimes tries to write to a closed or dead connection, and its greenlet crashes. This stalls my service until it is restarted. I think gnsq should handle this error so that it cannot happen. I would expect my message either to be published via an active connection, or the publish call to raise an exception that I can catch myself.
I have attached a log of such a crash.
_gnsq.txt

@canhlinh

@Phaeilo You need to ping the nsqd server yourself.
I don't know why the HEARTBEAT function is not working as expected.

I used the following workaround, and it works:

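from multiprocessing import Process, Queue
from queue import Empty  # raised by Queue.get() when the timeout expires

HEARTBEAT_INTERVAL = 60


def producer_func(q: Queue):
    # Import inside the worker so gnsq is loaded in the child process.
    import gnsq

    producer = gnsq.NsqdTCPClient('localhost', 4150, user_agent='IAM', heartbeat_interval=HEARTBEAT_INTERVAL)
    producer.connect()
    producer.identify()

    while True:
        try:
            # Wait for the next (topic, data) pair, but no longer than one heartbeat interval.
            topic, data = q.get(block=True, timeout=HEARTBEAT_INTERVAL)
            print(topic, data)
            producer.publish(topic, data.encode())
        except Empty:
            # Nothing to publish: send a NOP so the connection does not sit idle.
            print('ping')
            producer.nop()


if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer_func, args=(q,))
    p.start()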
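
The loop above keeps an idle connection alive, but it still stops if `publish()` or `nop()` hits a connection that is already dead. Here is a rough sketch of how the same loop could rebuild the client and retry instead. Everything in it is hypothetical and not part of gnsq: `make_client` is a factory you would write yourself (for example, one that creates an `NsqdTCPClient` and calls `connect()` and `identify()` as above), `STOP` is a sentinel for ending the loop, and catching `OSError` is only an assumption about how a dead socket shows up, so check it against the exception in the attached log.

```python
import queue
import time

STOP = None  # hypothetical sentinel: put None on the queue to end the loop


def send_with_reconnect(client, make_client, action, max_retries=3, retry_delay=1.0):
    """Run action(client); on a connection error, build a new client and retry.

    Returns the client that finally worked so the caller can keep using it.
    Re-raises the last error if every attempt fails.
    """
    last_exc = None
    for attempt in range(max_retries + 1):
        try:
            if client is None:
                client = make_client()
            action(client)
            return client
        except OSError as exc:  # assumption: dead sockets surface as OSError subclasses
            last_exc = exc
            client = None  # drop the broken client; the next attempt makes a new one
            if attempt < max_retries:
                time.sleep(retry_delay)
    raise last_exc


def run_producer(q, make_client, idle_timeout=60, **retry_opts):
    """Publish (topic, data) pairs from q, sending a NOP whenever the queue is idle."""
    client = None
    while True:
        try:
            item = q.get(block=True, timeout=idle_timeout)
        except queue.Empty:
            # Idle: a NOP both keeps the connection busy and detects a dead one.
            client = send_with_reconnect(client, make_client, lambda c: c.nop(), **retry_opts)
            continue
        if item is STOP:
            break
        topic, data = item
        client = send_with_reconnect(
            client, make_client, lambda c: c.publish(topic, data.encode()), **retry_opts
        )
```

If all retries fail, the error propagates out of `run_producer`, so the caller can catch it and decide what to do rather than having the worker stall silently.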
