RuntimeError: Second simultaneous read on fileno #13

Closed
NotSureAbout opened this issue Jan 11, 2016 · 12 comments

@NotSureAbout

app = Flask(__name__)
mgr = socketio.KombuManager('amqp://')
sio = socketio.Server(client_manager=mgr, async_mode='eventlet')

This does not seem to work for me. I get:

RuntimeError: Second simultaneous read on fileno ....

I launch the app with:

if __name__ == '__main__':
    # wrap Flask application with socketio's middleware
    app = socketio.Middleware(sio, app)

    # deploy as an eventlet WSGI server
    eventlet.wsgi.server(eventlet.listen(('', 8000)), app)

Any pointers to where I have messed up would be more than welcome.

@miguelgrinberg (Owner)

Include the full stack trace of the error please, and make sure you have monkey patched the standard library.
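
For reference, the usual guidance is to run the patch as early as possible, before the other imports; the top of the module would look roughly like this (a sketch, with the imports your snippets already use):

# Call monkey_patch() first, before importing anything that uses sockets
# or threads, so those imports pick up the green versions.
import eventlet
eventlet.monkey_patch()

import socketio            # imported after the patch on purpose
from flask import Flask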

@NotSureAbout (Author)

Traceback (most recent call last):
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/eventlet/wsgi.py", line 454, in handle_one_response
    result = self.application(self.environ, start_response)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/middleware.py", line 47, in __call__
    return self.engineio_app.handle_request(environ, start_response)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 303, in handle_request
    return self.eio.handle_request(environ, start_response)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/server.py", line 241, in handle_request
    socket.handle_post_request(environ)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/socket.py", line 92, in handle_post_request
    self.receive(pkt)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/socket.py", line 45, in receive
    self.server._trigger_event('message', self.sid, pkt.data)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/server.py", line 327, in _trigger_event
    return self.handlers[event](*args)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 420, in _handle_eio_message
    self._handle_connect(sid, pkt.namespace)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 334, in _handle_connect
    self.environ[sid]) is False:
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 392, in _trigger_event
    return self.handlers[namespace][event](*args)
  File "labcoreapp.py", line 68, in test_connect
    namespace='/test')
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 181, in emit
    self.manager.emit(event, data, namespace, room, skip_sid, callback)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/pubsub_manager.py", line 62, in emit
    'skip_sid': skip_sid, 'callback': callback})
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/kombu_manager.py", line 53, in _publish
    with self.kombu.SimpleQueue(self.queue) as queue:
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/connection.py", line 677, in SimpleQueue
    exchange_opts, **kwargs)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/simple.py", line 125, in __init__
    compression=compression)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/messaging.py", line 85, in __init__
    self.revive(self._channel)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/messaging.py", line 222, in revive
    self.declare()
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/messaging.py", line 105, in declare
    self.exchange.declare()
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/channel.py", line 622, in exchange_declare
    (40, 11),  # Channel.exchange_declare_ok
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods, timeout)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/connection.py", line 241, in _wait_method
    channel, method_sig, args, content = read_timeout(timeout)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/connection.py", line 330, in read_timeout
    return self.method_reader.read_method()
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/method_framing.py", line 189, in read_method
    raise m
RuntimeError: Second simultaneous read on fileno 4 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of greenlet.greenlet object at 0x7febb1ec8370>; THAT THREAD=FdListener('read', 4, <built-in method switch of greenlet.greenlet object at 0x7febb1ec8050>, <built-in method throw of greenlet.greenlet object at 0x7febb1ec8050>)

I did the monkey_patch dance.

import socketio
from flask import Flask, render_template
import eventlet
eventlet.monkey_patch()

@miguelgrinberg (Owner)

Do you know how many active clients you had when this happened?

@NotSureAbout (Author)

I opened 1 browser window.

The problem might come from the changes to _listen. If I make the following changes:

    def _listen(self):
        # open a connection dedicated to the listener, instead of reusing
        # the one the publisher writes on
        with kombu.Connection('amqp://guest:guest@localhost:5672//') as conn:
            with conn.SimpleQueue(self.queue) as queue:
                while True:
                    message = queue.get(block=True)
                    message.ack()
                    yield message.payload

The problem goes away.

@miguelgrinberg (Owner)

Okay, so your solution is basically to read and write to the queue using different connections. I can't find any references in the documentation regarding the thread safety of a kombu connection, and none of the examples seem to have the producer and the consumer working concurrently in the same process.

Thanks for your analysis; I'll probably go with your solution.
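
If I go that route, the change would look roughly like this sketch (the class, attribute names and queue name are illustrative, not the real KombuManager):

import pickle
import kombu

class KombuManagerSketch:  # illustrative stand-in for the real manager
    def __init__(self, url='amqp://'):
        self.url = url
        # one connection reserved for publishing...
        self.writer_conn = kombu.Connection(url)
        # ...and a separate one that _listen() blocks on
        self.reader_conn = kombu.Connection(url)

    def _publish(self, data):
        # writes never touch the socket the listener is reading
        with self.writer_conn.SimpleQueue('socketio') as queue:
            queue.put(pickle.dumps(data))

    def _listen(self):
        with self.reader_conn.SimpleQueue('socketio') as queue:
            while True:
                message = queue.get(block=True)
                message.ack()
                yield message.payload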

@miguelgrinberg (Owner)

@NotSureAbout Unfortunately your fix does not work for me. The error goes away, but messages that the server sends do not make it to the client because the _listen function never wakes up to deliver them.

I'll continue investigating. I haven't really found any similar working examples; pretty much all the Kombu examples I have found have the producer and the consumer in different processes, so I'm not sure how well tested the multithreaded approach I'm using is.

@miguelgrinberg (Owner)

@NotSureAbout Can I ask you to retest with the current master branch to confirm the problem is addressed for you?

@NotSureAbout (Author)

Works for me.

@phillip-martin commented Jun 1, 2016

I am using the latest release of the great flask-socketio library, along with eventlet. I came across an issue in my code that I can't quite figure out, but it seems that it may be related to this one. I have the following code:

import eventlet
eventlet.monkey_patch()

from flask import Flask
from flask_socketio import SocketIO

app = Flask("test_app")
socketio = SocketIO(app, message_queue='amqp://')

@socketio.on('content update', namespace='/test')
def content_update(message):
    pass

In a separate process (a celery task; celery is run with eventlet enabled) I call:

socketio.emit('content update', {'data': 'foo'}, namespace='/test')

This task gets called quite frequently, so it is not surprising that multiple threads end up calling the above line simultaneously. When that happens I get the following stack trace:

Traceback (most recent call last):
  File "proj/venv/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "proj/venv/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "proj/tasks.py", line 109, in submit_new_content
    socketio.emit('content update', {'data': 'foo'}, namespace='/test')
  File "proj/venv/lib/python2.7/site-packages/flask_socketio/__init__.py", line 285, in emit
    callback=kwargs.get('callback'))
  File "proj/venv/lib/python2.7/site-packages/socketio/server.py", line 181, in emit
    self.manager.emit(event, data, namespace, room, skip_sid, callback)
  File "proj/venv/lib/python2.7/site-packages/socketio/pubsub_manager.py", line 62, in emit
    'skip_sid': skip_sid, 'callback': callback})
  File "proj/venv/lib/python2.7/site-packages/socketio/kombu_manager.py", line 57, in _publish
    with self.writer_conn.SimpleQueue(self.writer_queue) as queue:
  File "proj/venv/lib/python2.7/site-packages/kombu/connection.py", line 678, in SimpleQueue
    exchange_opts, **kwargs)
  File "proj/venv/lib/python2.7/site-packages/kombu/simple.py", line 126, in __init__
    consumer = messaging.Consumer(channel, queue)
  File "proj/venv/lib/python2.7/site-packages/kombu/messaging.py", line 364, in __init__
    self.revive(self.channel)
  File "proj/venv/lib/python2.7/site-packages/kombu/messaging.py", line 376, in revive
    self.declare()
  File "proj/venv/lib/python2.7/site-packages/kombu/messaging.py", line 386, in declare
    queue.declare()
  File "proj/venv/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
    self.exchange.declare(nowait)
  File "proj/venv/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "proj/venv/lib/python2.7/site-packages/amqp/channel.py", line 622, in exchange_declare
    (40, 11),  # Channel.exchange_declare_ok
  File "proj/venv/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods, timeout)
  File "proj/venv/lib/python2.7/site-packages/amqp/connection.py", line 241, in _wait_method
    channel, method_sig, args, content = read_timeout(timeout)
  File "proj/venv/lib/python2.7/site-packages/amqp/connection.py", line 330, in read_timeout
    return self.method_reader.read_method()
  File "proj/venv/lib/python2.7/site-packages/amqp/method_framing.py", line 189, in read_method
    raise m
RuntimeError: Second simultaneous read on fileno 18 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of greenlet.greenlet object at 0x10998d0f0>; THAT THREAD=FdListener('read', 18, <built-in method switch of greenlet.greenlet object at 0x10998d190>, <built-in method throw of greenlet.greenlet object at 0x10998d190>)

This error message seems fairly similar to the one reported by @NotSureAbout. I am very interested to know if you have any insight into what I might be doing wrong. Thanks in advance.

@phillip-martin

As an update, I have tried two things.

  1. I protected the socketio.emit method with an eventlet Semaphore. This made the error go away, but it made the behavior sequential (see the sketch after this list).
  2. I made the following changes in kombu_manager.py, emulating how the kombu connection is made in the _listen method. This worked completely as desired.
    def _publish(self, data):
        # open a fresh connection for each publish, mirroring how the kombu
        # connection is made in _listen
        writer_conn = kombu.Connection(self.url)
        writer_queue = self._queue(writer_conn)
        with writer_conn.SimpleQueue(writer_queue) as queue:
            queue.put(pickle.dumps(data))
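
For completeness, attempt (1) looked roughly like this; safe_emit and emit_lock are my own names, not part of flask-socketio:

import eventlet
eventlet.monkey_patch()

from flask import Flask
from flask_socketio import SocketIO

app = Flask("test_app")
socketio = SocketIO(app, message_queue='amqp://')

# one semaphore shared by every green thread that wants to emit
emit_lock = eventlet.semaphore.Semaphore()

def safe_emit(event, data, namespace=None):
    # only one green thread at a time may use the shared connection
    with emit_lock:
        socketio.emit(event, data, namespace=namespace)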

@miguelgrinberg (Owner)

@phillip-martin the error is the same, but your problem is different, I think.

In your case there actually is a legitimate simultaneous access of a handle by two or more threads: you are emitting with the same socketio object from multiple green threads, and this object keeps the file handle that the writes go out on as part of its state.

Your solution removes the stored file handle, and creates a new one each time a write is made. Clearly this addresses the problem, but I'm concerned about the performance cost of constantly having to open new handles.
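
If that cost turned out to matter, connection pooling could bound it. With kombu's pool registry it would look roughly like this (a sketch only, not what the library does today; the queue name and function are illustrative):

import kombu
from kombu.pools import connections

base_conn = kombu.Connection('amqp://')

def publish(data, queue_name='socketio'):
    # borrow a pooled connection instead of opening a new one per emit
    with connections[base_conn].acquire(block=True) as conn:
        with conn.SimpleQueue(queue_name) as queue:
            queue.put(data)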

As a short term solution I think your change is fine, but I'm not sure how I feel about penalizing the majority of users who do not have a setup like yours.

An alternative solution that I have used is to not run Celery under eventlet. In my opinion there is no reason to switch Celery to eventlet; in general Celery works better with a fixed-size pool of process workers. Running Celery without eventlet will address this problem, because each worker process will have a separate instance of socketio. If you try this, in addition to disabling eventlet support in the Celery config, make sure you create the socketio instance with async_mode='threading', which effectively disables eventlet in the socketio subsystem.
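
The Celery side would then look roughly like this (a sketch; celery_app and the task are placeholders for your own):

# Celery worker process started with the default prefork pool (no -P eventlet)
from celery import Celery
from flask_socketio import SocketIO

celery_app = Celery('tasks', broker='amqp://')   # placeholder app

# external process: connect only to the message queue, no eventlet involved
socketio = SocketIO(message_queue='amqp://', async_mode='threading')

@celery_app.task
def submit_new_content():
    socketio.emit('content update', {'data': 'foo'}, namespace='/test')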

As a long term solution for this problem, I think something that works similarly to context variables in Flask would make sense. That would allow a different file handle to be kept for each thread, all done transparently for you.
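
As a rough illustration of that idea (not a committed design), a connection kept in a threading.local would behave much like Flask's context locals; the class and attribute names below are illustrative:

import threading
import kombu

class ConnectionPerThread:
    # with eventlet.monkey_patch(), threading.local is per green thread,
    # so concurrent emits never share a file descriptor
    def __init__(self, url='amqp://'):
        self.url = url
        self._local = threading.local()

    @property
    def connection(self):
        if not hasattr(self._local, 'conn'):
            self._local.conn = kombu.Connection(self.url)
        return self._local.conn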

@phillip-martin

@miguelgrinberg thanks for the thorough explanation. I will look into switching Celery away from eventlet.

For the long term solution, I will come back here if I think of anything that might be useful.
