
Connection reset while transferring stages #509

Closed
Weiming-Hu opened this issue Nov 20, 2020 · 17 comments · Fixed by #539

@Weiming-Hu

I'm testing a small toy example: one executable that simply waits for 10 minutes. The workflow requests 1 core and 1 hour of resources. When the executable finished waiting, EnTK got stuck and never made any further progress.

The error message below can be found in the client-side logs:

1605825753.056 : radical.entk.task_manager.0000 : 9506  : 139943724709632 : DEBUG    : Unit unit.000000 in state DONE
1605825753.057 : radical.entk.task_manager.0000 : 9506  : 139943724709632 : INFO     : Transition task.0000 to EXECUTED
1605825753.057 : radical.entk.task_manager.0000 : 9506  : 139943724709632 : DEBUG    : task.0000 (EXECUTED) to sync with amgr
1605825753.058 : radical.entk.task_manager.0000 : 9506  : 139943724709632 : ERROR    : Transition task.0000 to state EXECUTED failed, error: (-1, "ConnectionResetError(104, 'Connection reset by peer')")
Traceback (most recent call last):
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/radical/entk/execman/base/task_manager.py", line 206, in _advance
    self._sync_with_master(obj, obj_type, channel, queue)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/radical/entk/execman/base/task_manager.py", line 153, in _sync_with_master
    properties=pika.BasicProperties(correlation_id=corr_id))
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish
    mandatory, immediate)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2207, in publish
    self._flush_output()
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1292, in _flush_output
    *waiters)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
    result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")
1605825753.097 : radical.entk.task_manager.0000 : 9506  : 139943724709632 : DEBUG    : task.0000 (DESCRIBED) to sync with amgr
1605825753.097 : radical.entk.task_manager.0000 : 9506  : 139943724709632 : ERROR    : Error in RP callback thread: 
Traceback (most recent call last):
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/radical/entk/execman/base/task_manager.py", line 206, in _advance
    self._sync_with_master(obj, obj_type, channel, queue)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/radical/entk/execman/base/task_manager.py", line 153, in _sync_with_master
    properties=pika.BasicProperties(correlation_id=corr_id))
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish
    mandatory, immediate)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2207, in publish
    self._flush_output()
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1292, in _flush_output
    *waiters)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
    result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/radical/entk/execman/rp/task_manager.py", line 252, in unit_state_cb
    mq_channel, '%s-cb-to-sync' % self._sid)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/radical/entk/execman/base/task_manager.py", line 213, in _advance
    self._sync_with_master(obj, obj_type, channel, queue)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/radical/entk/execman/base/task_manager.py", line 153, in _sync_with_master
    properties=pika.BasicProperties(correlation_id=corr_id))
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish
    mandatory, immediate)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2206, in publish
    immediate=immediate)
  File "/glade/u/home/wuh20/venv/lib/python3.7/site-packages/pika/channel.py", line 415, in basic_publish
    raise exceptions.ChannelClosed()
pika.exceptions.ChannelClosed
1605825770.766 : radical.entk.task_manager.0000 : 70237 : 139932584638208 : INFO     : Received heartbeat response
1605825770.802 : radical.entk.task_manager.0000 : 70237 : 139932584638208 : INFO     : Sent heartbeat request
1605825770.853 : radical.entk.task_manager.0000 : 9506  : 139947319789312 : INFO     : Received heartbeat request
1605825770.853 : radical.entk.task_manager.0000 : 9506  : 139947319789312 : INFO     : Sent heartbeat response
1605825800.866 : radical.entk.task_manager.0000 : 70237 : 139932584638208 : INFO     : Received heartbeat response

I have done a bit of study on this issue and found a workaround.

I made the following change to radical/entk/appman/appmanager.py:

        credentials = pika.PlainCredentials(self._username, self._password)
        self._rmq_conn_params = pika.connection.ConnectionParameters(
                                        heartbeat_interval=0,
                                        host=self._hostname,
                                        port=self._port,
                                        credentials=credentials)

Note the heartbeat_interval=0, which disables active termination of the connection from the server side.

Once I do this, the program runs correctly.

However, I'm not sure about the impact of this. Please advise.

Thank you very much.

@Weiming-Hu (Author) commented Nov 20, 2020

A quick update: the behavior with and without heartbeat_interval=0 is consistent across several tests. Whenever I comment out the line, the connection is reset after sleeping for 10 minutes; whenever the line is used, the connection stays alive and EnTK finishes with no problems.

Thanks

@Weiming-Hu (Author)

Another update: the following change to the default parameters used by pika is sufficient, so I don't have to modify the EnTK source code.

import pika

# Make sure server does not close any connection unless the client does so
pika.connection.Parameters.DEFAULT_HEARTBEAT_INTERVAL = 0
pika.connection.Parameters.DEFAULT_HEARTBEAT_TIMEOUT = 0

I have set both DEFAULT_HEARTBEAT_INTERVAL and DEFAULT_HEARTBEAT_TIMEOUT because which attribute is used depends on the pika version; to be safe, I set both.

Then, when I test the toy case, the connection-reset problem no longer occurs.
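
For context, here is a minimal sketch of how this workaround might sit in a user script; the AppManager arguments follow the standard EnTK examples and are an assumption, and the key point is that the defaults must be overridden before EnTK opens its first RMQ connection.

# Sketch only: override pika's defaults *before* creating the AppManager,
# so every connection EnTK opens inherits a disabled heartbeat.
import os
import pika

pika.connection.Parameters.DEFAULT_HEARTBEAT_INTERVAL = 0   # older pika versions
pika.connection.Parameters.DEFAULT_HEARTBEAT_TIMEOUT  = 0   # newer pika versions

from radical.entk import AppManager

# Hypothetical RMQ coordinates, taken from the environment here
amgr = AppManager(hostname=os.environ.get('RMQ_HOSTNAME', 'localhost'),
                  port=int(os.environ.get('RMQ_PORT', '5672')))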

@mturilli added this to the Dec 2020 release milestone Nov 23, 2020
@iparask (Contributor) commented Nov 23, 2020

Hello @Weiming-Hu, let me try to reproduce it and I will get back to you. Can you let me know where your RMQ server is relative to EnTK, and which resource you are using for this toy example?

Can you also let me know if your workaround is okay for now?

Thank you

@Weiming-Hu (Author)

Thank you for the response.

My workaround is currently working.

I'm using NCAR Cheyenne. Below is my stack information:

(venv) wuh20@cheyenne3:~> radical-stack 

  python               : 3.7.5
  pythonpath           : 
  virtualenv           : /glade/u/home/wuh20/venv

  radical.entk         : 1.5.5
  radical.gtod         : 1.5.0
  radical.pilot        : 1.5.7-v1.5.7-70-g03956fe3b@fix-mpiexec_mpt
  radical.saga         : 1.5.7
  radical.utils        : 1.5.7

I'm connecting to an RMQ server prepared for me by @mturilli. I'm sorry, I don't know the details of the RMQ setup though.

@iparask (Contributor) commented Nov 24, 2020

I see two things that have to do with RabbitMQ heartbeats and connections closing. The first is that RMQ changed the default heartbeat timeout from 580 secs to 60 secs in version 3.5.5; our RMQ version is 3.8.9. The second is that the RMQ server disconnects when a connection has been idle for several minutes (here is a Stack Overflow discussion).

They also suggest setting the heartbeat to 0, as that disables it completely and keeps the connection active. Thank you.
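
For reference, the same setting can also be applied per connection rather than through the class-level defaults. A sketch, assuming a pika release where the keyword is heartbeat (older releases use heartbeat_interval, as in the appmanager workaround above):

import os
import pika

credentials = pika.PlainCredentials(os.environ['RMQ_USERNAME'],
                                    os.environ['RMQ_PASSWORD'])
params = pika.ConnectionParameters(host=os.environ['RMQ_HOSTNAME'],
                                   port=int(os.environ['RMQ_PORT']),
                                   credentials=credentials,
                                   heartbeat=0)  # 0 disables heartbeats on this connection
connection = pika.BlockingConnection(params)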

@Weiming-Hu (Author)

Thank you. Is there a way to set the heartbeat of the EnTK task manager in user code? Currently I'm changing the source code at radical/entk/appman/appmanager.py, but I'm wondering whether there is a better solution. Thanks

@iparask (Contributor) commented Nov 30, 2020

I believe overriding the two default values should be enough. In a simple example, I changed the default values of heartbeat and timeout, as you show above. I logged the heartbeat value that the appmanager picked, and it was 0.

Please remove the change you made in the appmanager, and try to run a small example of your workflow by only setting the two default values in the user script. I think it will work, but can you let me know as well?

I'm still investigating how to solve it. I'll let you know when I have more.

@Weiming-Hu (Author)

Thanks for the reply. I'm afraid I'm not sure how to set the heartbeat in my user code. Could you give me an example without changing the source code? Thank you!

@iparask (Contributor) commented Dec 1, 2020

This is how:

import pika

pika.connection.Parameters.DEFAULT_HEARTBEAT_TIMEOUT = 0

@Weiming-Hu (Author)

Ah. My bad. I misunderstood you. Thank you very much for the solution. I'm closing this.

@iparask (Contributor) commented Dec 2, 2020

I am reopening this issue as it is not resolved cleanly. A workaround exists.

@iparask reopened this Dec 2, 2020
@iparask removed this from the Jan 2021 Release milestone Jan 4, 2021
@mturilli (Contributor) commented Jan 8, 2021

@iparask do we know how we want to solve this beyond the initial workaround? Is there anything to discuss before you work on the PR?

@lee212 (Contributor) commented Jan 8, 2021

FYI,

I experienced a similar error:

1609800934.648 : radical.entk.task_manager.0000 : 96290 : 140735659504048 : INFO     : Transition task.0013 to EXECUTED
1609800934.648 : radical.entk.task_manager.0000 : 96290 : 140735659504048 : DEBUG    : task.0013 (EXECUTED) to sync with amgr
1609800934.650 : radical.entk.task_manager.0000 : 96290 : 140735659504048 : ERROR    : Transition task.0013 to state EXECUTED failed, error: (-1, "ConnectionResetError(104, 'Connection reset by peer')")
Traceback (most recent call last):
  File "/ccs/home/hrlee/venv3/multi_dvm_32/lib/python3.6/site-packages/radical/entk/execman/base/task_manager.py", line 206, in _advance
    self._sync_with_master(obj, obj_type, channel, queue)
  File "/ccs/home/hrlee/venv3/multi_dvm_32/lib/python3.6/site-packages/radical/entk/execman/base/task_manager.py", line 153, in _sync_with_master
    properties=pika.BasicProperties(correlation_id=corr_id))
  File "/ccs/home/hrlee/venv3/multi_dvm_32/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2120, in basic_publish
    mandatory, immediate)
  File "/ccs/home/hrlee/venv3/multi_dvm_32/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2207, in publish
    self._flush_output()
  File "/ccs/home/hrlee/venv3/multi_dvm_32/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1292, in _flush_output
    *waiters)
  File "/ccs/home/hrlee/venv3/multi_dvm_32/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
    result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")

This was a large run requesting 6,120 concurrent tasks.

@iparask (Contributor) commented Jan 21, 2021

Srinivas also experiences this issue.

I think we should disable pika's heartbeat timeout completely. There are several things that can affect this issue. The one that I think causes it is that we stopped acknowledging messages, but I am not very sure. I think I have a reproducer somewhere.

@lee212 how long did the tasks in your run execute for?

It might make sense to send an ack every now and then to make sure that the connection stays open, but not so often as to nullify the performance gain that @lee212 introduced.
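
One hedged sketch of that idea, not EnTK code: let pika service I/O periodically while a thread is otherwise idle, so heartbeat frames keep flowing without extra acks. The helper name and the 30-second interval are illustrative assumptions.

import os
import pika

credentials = pika.PlainCredentials(os.environ['RMQ_USERNAME'],
                                    os.environ['RMQ_PASSWORD'])
rmq_conn_params = pika.ConnectionParameters(host=os.environ['RMQ_HOSTNAME'],
                                            port=int(os.environ['RMQ_PORT']),
                                            credentials=credentials)
connection = pika.BlockingConnection(rmq_conn_params)

def wait_with_keepalive(connection, seconds, interval=30):
    # Sleep in short slices; BlockingConnection.sleep() processes I/O
    # (including heartbeats) while waiting, so the broker keeps seeing
    # traffic on the connection.
    remaining = seconds
    while remaining > 0:
        step = min(interval, remaining)
        connection.sleep(step)
        remaining -= step

wait_with_keepalive(connection, 600)   # e.g. the 10-minute wait from the toy example
connection.close()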

@iparask iparask added this to the Feb 2021 release milestone Jan 21, 2021
@iparask (Contributor) commented Jan 21, 2021

I found the reason: if the channel stays idle for long enough, the connection is dropped by RMQ. This is also discussed here. I created a producer and a worker: the producer sleeps for x seconds and then sends a message; the worker prints the message.

Producer:

#!/usr/bin/env python
import pika
import os
import time

hostname = os.environ['RMQ_HOSTNAME']
password = os.environ['RMQ_PASSWORD']
port = os.environ['RMQ_PORT']
username = os.environ['RMQ_USERNAME']


credentials = pika.PlainCredentials(username, password)
rmq_conn_params = pika.connection.ConnectionParameters(host=hostname, port=port, credentials=credentials)
connection = pika.BlockingConnection(rmq_conn_params)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

for i in range(0,1800,100):
    message = str(i)
    print(" [x] Sending %r" % message)
    time.sleep(i)
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))

connection.close()

Worker:

#!/usr/bin/env python
import pika
import time
import os

hostname = os.environ['RMQ_HOSTNAME']
password = os.environ['RMQ_PASSWORD']
port = os.environ['RMQ_PORT']
username = os.environ['RMQ_USERNAME']

credentials = pika.PlainCredentials(username, password)
rmq_conn_params = pika.connection.ConnectionParameters(host=hostname, port=port, credentials=credentials)
connection = pika.BlockingConnection(rmq_conn_params)
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    sleep = body.decode()
    print(" [x] Received %r" % sleep)
    #time.sleep(int(sleep))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', consumer_callback=callback)

channel.start_consuming()

This is the output I got:

(rct) iparask@ubuntu-20-04-devel-and-docker:~/Pika$ python producer.py
 [x] Sending '0'
 [x] Sending '100'
 [x] Sending '200'
Traceback (most recent call last):
  File "/home/iparask/Pika/producer.py", line 22, in <module>
    channel.basic_publish(
  File "/home/iparask/miniconda3/envs/rct/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2119, in basic_publish
    self.publish(exchange, routing_key, body, properties,
  File "/home/iparask/miniconda3/envs/rct/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2207, in publish
    self._flush_output()
  File "/home/iparask/miniconda3/envs/rct/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1290, in _flush_output
    self._connection._flush_output(
  File "/home/iparask/miniconda3/envs/rct/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 476, in _flush_output
    raise exceptions.ConnectionClosed(result.reason_code,
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')")

There are several places where we share a pika channel; an example is here. There are three possible solutions.

  1. Disable RMQ heartbeat. Essentially, introduce the workaround as part of EnTK's codebase, but I am not sure what problems it may cause.
  2. Instead of sharing the channel, we share the connection parameters, and new connections are created from different threads. It may have some performance impact for short-running tasks, but it will ensure that new channels are opened and messages are actually transmitted.
  3. Share the connection parameters as well and handle this exception. If the basic_publish fails because of this exception, we open a new channel and send the message.

I prefer solution 2. It may be the slowest one, but it is the safest.
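
A rough sketch of what solution 2 could look like; the threads and queue name are illustrative, not EnTK's actual task-manager code. Only the connection parameters are shared, and every thread opens and closes its own connection and channel:

import os
import threading
import pika

credentials = pika.PlainCredentials(os.environ['RMQ_USERNAME'],
                                    os.environ['RMQ_PASSWORD'])
shared_params = pika.ConnectionParameters(host=os.environ['RMQ_HOSTNAME'],
                                          port=int(os.environ['RMQ_PORT']),
                                          credentials=credentials)

def publisher(params, body):
    # Fresh connection per thread: pika connections and channels are not
    # thread-safe, and a freshly opened channel cannot have gone stale.
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    channel.basic_publish(exchange='', routing_key='task_queue', body=body,
                          properties=pika.BasicProperties(delivery_mode=2))
    connection.close()

threads = [threading.Thread(target=publisher, args=(shared_params, 'msg-%d' % i))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()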

@mturilli (Contributor)

Thanks Giannis, great job! Would solution 3 be as safe as 2 but without the potential performance issue?

@iparask (Contributor) commented Jan 25, 2021

Update: The whole connection closes.

I changed the producer to handle the specific exception.

#!/usr/bin/env python
import pika
import os
import time

hostname = os.environ['RMQ_HOSTNAME']
password = os.environ['RMQ_PASSWORD']
port = os.environ['RMQ_PORT']
username = os.environ['RMQ_USERNAME']


credentials = pika.PlainCredentials(username, password)
rmq_conn_params = pika.connection.ConnectionParameters(host=hostname, port=port, credentials=credentials)
connection = pika.BlockingConnection(rmq_conn_params)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

for i in range(0,1800,100):
    message = str(i)
    print(" [x] Sending %r" % message)
    time.sleep(i)
    try:
        channel.basic_publish(exchange='',routing_key='task_queue',body=message,properties=pika.BasicProperties(delivery_mode=2))
    except pika.exceptions.ConnectionClosed:
        print('Connection closed? %s' % connection.is_closed)
        print('Channel Status: %s' % channel.is_open)
        connection = pika.BlockingConnection(rmq_conn_params)
        channel = connection.channel()
        channel.basic_publish(exchange='',routing_key='task_queue',body=message,properties=pika.BasicProperties(delivery_mode=2))

channel.queue_delete(queue='new_task')
connection.close()

and the output looks like this (the channel status is True when the channel is open):

(rct) iparask@ubuntu-20-04-devel-and-docker:~/Pika$ python new_task.py
 [x] Sending '0'
 [x] Sending '100'
 [x] Sending '200'
Connection closed? True
Channel Status: False
 [x] Sending '300'
Connection closed? True
Channel Status: False
 [x] Sending '400'
Connection closed? True
Channel Status: False
 [x] Sending '500'
Connection closed? True
Channel Status: False
 [x] Sending '600'

I'll apply the change wherever we have a publish and open a PR.
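
As a rough sketch of that change (the helper name and structure are illustrative, not the actual PR), each publish site could be wrapped so that a reset connection is reopened and the message retried:

import pika

def publish_with_reconnect(params, channel, queue, body, properties=None):
    # Try to publish; if the broker reset the connection while the channel
    # was idle, open a new connection/channel and retry once. Returns the
    # (possibly new) channel so the caller can keep using it.
    try:
        channel.basic_publish(exchange='', routing_key=queue,
                              body=body, properties=properties)
    except (pika.exceptions.ConnectionClosed, pika.exceptions.ChannelClosed):
        connection = pika.BlockingConnection(params)
        channel = connection.channel()
        channel.basic_publish(exchange='', routing_key=queue,
                              body=body, properties=properties)
    return channel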
