Reduce Websocket copies / accept memoryviews #2102

Closed
bryevdv opened this issue Jun 30, 2017 · 25 comments

@bryevdv commented Jun 30, 2017

Since masking of inbound (client->server) message is mandated by RFC, a copy in that case is unavoidable. However outbound masking (server->client) is not mandated, and appears to be turned off by default:

https://github.com/tornadoweb/tornado/blob/master/tornado/websocket.py#L587
https://github.com/tornadoweb/tornado/blob/master/tornado/websocket.py#L461-L462

(It is set to True in the WS client connection class, as expected)

This outbound case is the most relevant and important one for Bokeh, so any improvements to reduce copies on outbound messages would be beneficial for Bokeh users.

Below are some ideas from tracing through the code. I am sure there are many details I am not familiar with, but perhaps this can start a discussion.


Allow write_message to accept a memoryview. Then in _write_frame, instead of doing all these concatenations:

https://github.com/tornadoweb/tornado/blob/master/tornado/websocket.py#L762-L767

Place the message chunks on the stream write buffer individually. I am not sure if multiple calls to self.stream.write(chunk) would suffice (I'm guessing not), or if iostream.write would have to be modified to accept multiple ordered chunks. However, it seems that iostream.write is already capable of storing a list of pending writes when the write buffer is "frozen". Currently all of these buffers get concatenated:

https://github.com/tornadoweb/tornado/blob/master/tornado/iostream.py#L840

But perhaps instead of concatenating before clearing pending writes, the list of buffers could be copied instead, then _handle_write could loop over these, instead of expecting one concatenated array.
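The idea could be sketched roughly as follows. This is a hypothetical illustration, not Tornado's actual IOStream API: each frame part is handed to the stream separately, so a memoryview payload is never copied into a concatenated frame.

```python
def write_frame_parts(stream, opcode_byte: bytes, length_bytes: bytes, data) -> None:
    # Hypothetical sketch: hand each chunk to the stream individually instead
    # of concatenating opcode + length + data into one new bytes object.
    # `data` may be a memoryview, so no copy of the payload is made here.
    for chunk in (opcode_byte, length_bytes, data):
        stream.write(chunk)
```

Whether the stream then issues one system call per chunk, or gathers pending chunks into a single writev-style call, is exactly the design question raised above.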

@bdarnell (Member) commented Jul 5, 2017

I am not sure if multiple calls to self.stream.write(chunk) would suffice (I'm guessing not)

This will work, but each one will do a system call. _write_frame concatenates three strings: one byte for opcode and flags, 1-9 bytes for the length, then the data. Some refactoring could combine the opcode/flag bytes with the length so you'd get it down to two writes (header, data).
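That refactoring could look like the sketch below: the opcode/flags byte and the RFC 6455 length encoding are built into a single header, so a frame needs only two writes (header, then data). The function name is illustrative, not Tornado's.

```python
import struct

def ws_frame_header(opcode: int, payload_len: int, fin: bool = True) -> bytes:
    # Combine the opcode/flags byte with the 1-9 byte length encoding
    # (RFC 6455, unmasked server-to-client frame) into one header.
    first = (0x80 if fin else 0) | opcode
    if payload_len < 126:
        return struct.pack("!BB", first, payload_len)
    elif payload_len < (1 << 16):
        return struct.pack("!BBH", first, 126, payload_len)
    else:
        return struct.pack("!BBQ", first, 127, payload_len)
```

With this, `_write_frame` could do `stream.write(header)` followed by `stream.write(data)` without touching the payload.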

For a small write, it's probably going to be better to do the copy than to do two system calls. But this is a question that can be verified empirically: how big does a copy need to be to be more expensive than a system call?
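That empirical question can be probed with a quick micro-benchmark; this is an illustrative, machine-dependent sketch, not a rigorous measurement:

```python
import os
import timeit

payload = b"x" * 4096          # try different sizes to find the crossover point
devnull = os.open(os.devnull, os.O_WRONLY)

# bytearray(payload) forces a real copy of the data;
# os.write() to /dev/null isolates the system-call overhead.
copy_t = timeit.timeit(lambda: bytearray(payload), number=10000)
sysc_t = timeit.timeit(lambda: os.write(devnull, payload), number=10000)
os.close(devnull)

print(f"copy: {copy_t:.4f}s  syscall: {sysc_t:.4f}s")
```

Sweeping the payload size would show roughly where a copy becomes more expensive than the extra write().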

There are also interactions with TCP_NODELAY to consider: If that flag has been set, you may not want to send a tiny header-only packet.

if iostream.write would have to be modified to accept multiple ordered chunks

Python 3 has os.writev, so it might be worth exploring this too (although it won't help for TLS).
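For illustration, os.writev (POSIX-only, Python 3.3+) gathers multiple buffers into a single system call; a pipe-based sketch:

```python
import os

header = b"\x82\x05"                 # example 2-byte frame header
payload = b"hello"

r, w = os.pipe()
n = os.writev(w, [header, payload])  # one syscall, no user-space concatenation
os.close(w)

data = os.read(r, 16)
os.close(r)

assert n == len(header) + len(payload)
assert data == b"\x82\x05hello"
```

As noted, this does not help for TLS, where the data must pass through the SSL layer's own buffering.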

But perhaps instead of concatenating before clearing pending writes, the list of buffers could be copied instead, then _handle_write could loop over these, instead of expecting one concatenated array.

Yeah, when the buffered writes are big enough (or when writev is available), we could write these without additional copies. Maybe we could even expose an explicit freeze/unfreeze API on IOStream so that client code could freeze, perform multiple writes, and unfreeze to avoid TCP_NODELAY effects.

@socketpair commented:

The first thing that must be done is benchmarking and profiling. I would like to fix this problem.

@socketpair commented Aug 25, 2019

```python
#!/usr/bin/python3

import time

import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado import gen

MSG = b'x' * 65536


class ChatSocketHandler(tornado.websocket.WebSocketHandler):
    def get_compression_options(self):
        # None disables compression; a non-None dict would enable it
        # with default options.
        return None

    @gen.coroutine
    def on_message(self, message):
        iterations = 10000
        start = time.monotonic()
        for _ in range(iterations):
            yield self.write_message(MSG)  # yielding here changes from 319 to 265 MB/s
        stop = time.monotonic()
        print('Speed: {:0.02f} MB/s'.format(
            iterations * len(MSG) / ((stop - start) * 1000000)))
        self.close()


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.set_header('content-type', 'text/html')
        self.write('''
        <body>
            <script>
                var ws = new WebSocket("ws://localhost:1234/test");
                ws.onopen = function() {
                   ws.send("start!");
                };
                ws.onmessage = function (evt) {
                   // alert(evt.data);
                };
            </script>
        </body>
        ''')


def main():
    app = tornado.web.Application(handlers=[
        (r'/', MainHandler),
        (r"/test", ChatSocketHandler),
    ])
    app.listen(1234)
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()
```

Gives about 290 MB/s on my laptop.

@socketpair commented Aug 25, 2019

@bryevdv please provide typical usage patterns. I've profiled the write-intensive case shown above. Here is the distribution:

[profiling screenshot omitted]

I'll investigate further to find the bottleneck. First, I need to measure how slow copying really is. Stay tuned.

@bryevdv (Author) commented Aug 25, 2019

@socketpair here is a Bokeh server example that transfers dense 2d arrays every time a slider updates:

https://github.com/bokeh/bokeh/blob/master/examples/app/image_blur.py

That's probably the simplest example of the kind of usage Bokeh specifically would benefit from (but anyone transmitting dense arrays, I imagine). That example uses a 512x512 array and it performs acceptably on my laptop. Going to 1024x1024 is somewhat laggier. To run it:

`bokeh serve --show image_blur.py`

If you need to do any instrumentation and have questions I'd suggest pinging me on the gitter dev channel: https://gitter.im/bokeh/bokeh-dev

@socketpair commented Aug 28, 2019

@bryevdv Please describe the usage pattern that is used during websocket operations. As far as I understand from previous messages, it sends about 3 MB of data in one big websocket message. Right?

If yes, is this message constructed from small pieces before transferring it to the Tornado websocket API?

If no, what is the typical size of the messages? Does the websocket client acknowledge each piece after reception?

I need to construct a realistic test application in order to distinguish between Tornado websocket implementation inefficiency and Bokeh problems.

Also, does compression make sense? I mean, additional delay may be caused by compression.

Furthermore, I did not find any usage of set_nodelay() in Bokeh. By default, in Tornado it is switched OFF.

Do you really think that the main reason for the delays is in Tornado? Yes, I can try to dive into the Bokeh sources to make performance better, but that should be discussed in a different GitHub project.

Please answer all the questions in this message.

@bryevdv (Author) commented Aug 28, 2019

@socketpair As a precursor, here is the Bokeh message protocol:

    [
        # these are required
        b'{header}',        # serialized header dict (see below)
        b'{metadata}',      # serialized metadata dict
        b'{content}',       # serialized content dict

        # these are optional, and come in pairs; header contains num_buffers
        b'{buf_header}',    # serialized buffer header dict
        b'array'            # raw buffer payload data
        ...
    ]
    
    header = {
        # these are required
        'msgid'       : <str> # a unique id for the message
        'msgtype'     : <str> # a message type, e.g. 'ACK', 'PATCH-DOC', etc

        # these are optional
        'num_buffers' : <int> # the number of additional buffers, if any
    }
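As a rough sketch, assembling a message in this protocol might look like the following (helper names are illustrative, not Bokeh's actual API):

```python
import json
import uuid

def make_message(msgtype, content, buffers=()):
    # Assemble the message parts in the order the protocol above describes:
    # header, metadata, content, then optional (buffer header, payload) pairs.
    header = {"msgid": uuid.uuid4().hex, "msgtype": msgtype}
    if buffers:
        header["num_buffers"] = len(buffers)
    parts = [
        json.dumps(header).encode(),    # serialized header dict
        b"{}",                          # serialized metadata dict (empty here)
        json.dumps(content).encode(),   # serialized content dict
    ]
    for buf_header, payload in buffers:
        parts.append(json.dumps(buf_header).encode())
        parts.append(payload)           # raw buffer payload, passed through as-is
    return parts

parts = make_message("PATCH-DOC", {"events": []}, [({"id": "b0"}, b"\x00" * 16)])
```

Note that the raw payload is appended untouched; the whole point of this issue is keeping it that way all the way down to the socket.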

As far as I understand from previous messages, it sends about 3 MB of data in one big websocket message. Right?

If yes, is this message constructed from small pieces before transferring it to the Tornado websocket API?

Not entirely sure how to answer. Each NumPy array is sent as a single buffer, in the protocol above. The entire overall message is an assembly, but the buffers are not. They are basically just pointers to NumPy array memory that are passed to the write calls; they are not encoded, chunked, serialized, or assembled in any way before passing to Tornado.

If no, what is the typical size of the messages? Does the websocket client acknowledge each piece after reception?

There is no typical size. Users will send whatever they send. We would just like to make the ceiling for reasonable use as high as possible. E.g. a 2000x2000 RGBA image array updating in response to a slider drag would be a great goal to achieve (I'm not sure if that is possible or not, but that's what I'd aim for). The protocol has an initial ACK, but subsequently there is not any ACK after every message.

Also, does compression make sense ? I mean, additional delay may be caused by compression.

It's possible, though offhand I expect it would depend on the payload sizes.

Furthermore, I did not find any usage of set_nodelay() in Bokeh. By default, in Tornado it is switched OFF.

I don't recall that we use it, we can certainly enable it if it makes sense.

Do you really think that the main reason for the delays is in Tornado? Yes, I can try to dive into the Bokeh sources to make performance better, but that should be discussed in a different GitHub project.

I think there are definitely avoidable copies in the Tornado code paths, as described above (e.g. the frame += data concatenation that happens in _write_frame). Removing avoidable copies has been useful for boosting performance in other contexts (HTTP requests), so it is worth investigating whether it might do the same here. I am sure there are improvements to be had on the Bokeh side too, and I would certainly be interested to discuss those elsewhere. I don't know for certain which dominates, though I would defer to any conclusions you reach.

@bryevdv (Author) commented Aug 28, 2019

Here is the important codepath for Bokeh:

https://github.com/bokeh/bokeh/blob/master/bokeh/protocol/messages/patch_doc.py

Specifically:

        patch_json, buffers = process_document_events(events, use_buffers)
        content = loads(patch_json)

        msg = cls(header, metadata, content)

        for (header, payload) in buffers:
            msg.add_buffer(header, payload)

As events are serialized, any NumPy arrays that are encountered are represented in the JSON with a small identifier, and the actual NumPy data is returned separately in a (header, payload) buffer. (Buffers are tuples, so the small text header for the buffer can be sent separately from the large binary payload. It's avoidable copies of the potentially large payload that this issue is asking about.)

Edit: Also if it would help to have a call, I'm happy to do a screenshare and go through the Bokeh sides of things.

@bryevdv (Author) commented Aug 28, 2019

On Aug 28, 2019, at 11:50 AM, Коренберг Марк notifications@github.com wrote:

OK, now I understand. Messages are concatenated in Bokeh before sending to Tornado, and you want to teach Tornado to accept a message as a list of parts. Sorry for the misunderstanding. Now I know what to benchmark.

I don't think I agree with this, unless I am misunderstanding it. In the protocol message description above, Bokeh sends all of the parts listed individually; it does not concatenate them. The possibility I was wondering about is the (potentially large) payload buffer for arrays (which Bokeh sends individually) being copied internally in Tornado.

Edit: Maybe I was confusing in stating "the entire message is an assembly". I did not mean that the entire message is concatenated. Every sub-piece of our messages is sent individually:

    with (yield conn.write_lock.acquire()):
        sent = 0

        yield conn.write_message(self.header_json, locked=False)
        sent += len(self.header_json)

        yield conn.write_message(self.metadata_json, locked=False)
        sent += len(self.metadata_json)

        yield conn.write_message(self.content_json, locked=False)
        sent += len(self.content_json)

        sent += yield self.write_buffers(conn, locked=False)

        raise gen.Return(sent)

where write_buffers is basically:

    sent = 0
    for header, payload in self._buffers:
        yield conn.write_message(header, locked=locked)
        yield conn.write_message(payload, binary=True, locked=locked)
        sent += (len(header) + len(payload))

@bryevdv (Author) commented Aug 28, 2019

@socketpair actually I did forget one important thing. Bokeh has to do the following, because Tornado's write_message does not accept memoryviews:

    buffer_id = make_id()
    buf = (dict(id=buffer_id), array.tobytes())
    buffers.append(buf)

The array.tobytes() call makes a copy, but that copy could be avoided if Tornado accepted memoryviews directly. That, along with the internal concatenation in _write_frame, means at least two avoidable data copies.
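The copy-vs-view distinction can be demonstrated with the stdlib array module (a stand-in here for a NumPy array, which supports the same buffer protocol):

```python
from array import array

a = array("B", bytes(1 << 20))   # ~1 MB writable buffer

copied = a.tobytes()             # allocates and copies 1 MB
view = memoryview(a)             # zero-copy view over the same memory

view[0] = 99
assert a[0] == 99                # the view shares memory with the array
assert copied[0] == 0            # tobytes() made an independent copy
```

If write_message accepted the view, the serialization step would touch no payload bytes at all.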

@socketpair commented Aug 28, 2019

@bryevdv I've managed to make a dirty (but working) patch to Tornado that does everything you need: memoryviews, and the ability to pass an array of items to write_message(). Please take a look. The work took only about 3 hours :), so I have not actually done any good benchmarks.

I've provided a test of the new API in the PR.

strace shows:

[strace screenshot omitted]

As you can see, no concatenation happened.

Also, I know that Python's implementation of IOV allocates an array every time you call socket.sendmsg(). In other words, we eliminate copying but add allocation and filling of this array, which could be slower than concatenation if we pass many small buffers.
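For reference, socket.sendmsg with multiple buffers (scatter/gather I/O) looks like this; as noted above, Python builds the iovec array on each call:

```python
import socket

header = b"\x82\x05"
payload = b"hello"

a, b = socket.socketpair()
# One system call sends both buffers; no user-space concatenation,
# but Python allocates and fills an iovec array per call.
n = a.sendmsg([header, payload])
data = b.recv(16)
a.close()
b.close()

assert n == 7
assert data == b"\x82\x05hello"
```

For many tiny buffers, the per-call iovec setup could outweigh the saved copy, which is the trade-off described above.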

@socketpair commented Aug 28, 2019

You may already try this patched version of Tornado with slight changes in Bokeh. If such a change is appropriate for you, I can work on cleaning up the patch.

@socketpair commented:

Also, TCP_NODELAY would greatly help in Bokeh.

@bryevdv (Author) commented Aug 28, 2019

You may already try this patched version of Tornado with slight changes in Bokeh. If such a change is appropriate for you, I can work on cleaning up the patch.

@socketpair That would be great. I have a three-day weekend coming up in a few days when I plan to catch up on a lot of Bokeh work; I will check things out in detail then.

Also, TCP_NODELAY would greatly help in Bokeh.

Where would this be set?

@socketpair commented:

@bryevdv TCP_NODELAY should be set once on every connection, somewhere right after connection establishment. This is especially important since you split a big message into smaller ones: without setting it, you get a delay after each submessage. The delays are OS-specific and, by my measurements, can reach 0.5 seconds.
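A minimal sketch of enabling TCP_NODELAY on a raw socket (disabling Nagle's algorithm, so small writes go out immediately instead of being coalesced while waiting for ACKs):

```python
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Disable Nagle's algorithm on this connection.
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
assert s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0
s.close()
```

In a Tornado websocket handler, calling self.set_nodelay(True) (e.g. from open()) does the equivalent on the connection's stream.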

@socketpair commented:

Because of websocket framing overhead, I would discourage you from splitting a big message into small parts.

@ploxiln (Contributor) commented Aug 29, 2019

... looks like this is done automatically for the tornado websocket client (which uses SimpleAsyncHTTPClient)
https://github.com/tornadoweb/tornado/blob/v6.0.3/tornado/simple_httpclient.py#L494

@bryevdv (Author) commented Aug 31, 2019

ability to pass an array of items to write_message()

FYI @socketpair I don't have an opinion on this; it's not something I was thinking of previously, and I don't think it has any bearing on performance. I was mostly interested in:

  • allowing write_message to take a memoryview so that the array.tobytes in Bokeh could be removed. Did this already happen at some point since last year?

  • seeing if the concatenations in _write_frame might be better off avoided when the message is large (e.g. by using writev to send the parts as separate buffers). It seems like you think the answer to this is no?

In any case I will see about changing the Bokeh code to send a memoryview this weekend.

@socketpair commented Sep 1, 2019

  1. Yes, you can now pass a memoryview as the source of a websocket message. I did not check, but I kept that requirement in mind while making the patch.
  2. Yes, I've removed all concatenations in Tornado's websocket code.
  3. Yes, it now uses sendmsg(), which naturally takes an IOV-style list of buffers just like writev() (see https://docs.python.org/3/library/socket.html#socket.socket.sendmsg : buffers). Also see my strace screenshot above. I could change it to writev(); it would work, but writev() is not really intended for sockets and will not work for SSL, for example.

@socketpair commented:

I've fixed the code again (two lines) and now you can pass a memoryview without wrapping it in a list, like yield self.write_message(memoryview(b'xxx'))

@bdarnell mentioned this issue Sep 2, 2019
@socketpair commented Sep 2, 2019

@bryevdv could you please test performance using Bokeh? Please remove any concatenations and conversions to bytes.

Yes, my patch is quite dirty, but it should work. If it is OK, I will write my own benchmark and tests and clean up everything in the patch.

@bryevdv (Author) commented Sep 2, 2019

@socketpair We don't currently do any concatenations on the Bokeh side. I am about to try passing the memoryview with your branch presently.

@bryevdv (Author) commented Sep 2, 2019

@socketpair if I make this change to pass a memoryview to write_message:

```diff
diff --git a/bokeh/util/serialization.py b/bokeh/util/serialization.py
index 79650827d..8b9c4488e 100644
--- a/bokeh/util/serialization.py
+++ b/bokeh/util/serialization.py
@@ -519,7 +519,7 @@ def encode_binary_dict(array, buffers):
 
     '''
     buffer_id = make_id()
-    buf = (dict(id=buffer_id), array.tobytes())
+    buf = (dict(id=buffer_id), memoryview(array))
     buffers.append(buf)
 
     return {
```

Then I see an assertion error in Tornado:

```
  File "/Users/bryan/work/bokeh/bokeh/server/views/ws.py", line 269, in write_message
    yield super(WSHandler, self).write_message(message, binary)
  File "/Users/bryan/work/tornado/tornado/websocket.py", line 342, in write_message
    return self.ws_connection.write_message(message, binary=binary)
  File "/Users/bryan/work/tornado/tornado/websocket.py", line 1116, in write_message
    fut = self._write_frame(True, opcode, message, flags=flags)
  File "/Users/bryan/work/tornado/tornado/websocket.py", line 1086, in _write_frame
    return self.stream.write([frame] + data)
  File "/Users/bryan/work/tornado/tornado/iostream.py", line 568, in write
    self._handle_write()
  File "/Users/bryan/work/tornado/tornado/iostream.py", line 974, in _handle_write
    self._write_buffer.advance(num_bytes)
  File "/Users/bryan/work/tornado/tornado/iostream.py", line 200, in advance
    assert 0 < size <= self._size
```

when I run `bokeh serve --show image_blur.py` and subsequently scrub the slider.
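One possible cause of the size mismatch (a guess on my part, not verified against the patch): len() of a memoryview over a multi-byte or multi-dimensional buffer counts elements along the first axis, not bytes, so length-based buffer accounting like advance() can disagree with the byte count. Casting to a flat byte view sidesteps this, shown here with the stdlib array module:

```python
from array import array

a = array("d", [0.0] * 1024)   # 1024 eight-byte items
mv = memoryview(a)

# len() counts items, not bytes, so it understates the buffer size.
assert len(mv) == 1024
assert mv.nbytes == 1024 * mv.itemsize

# cast("B") yields a flat one-byte-per-item view of the same memory,
# so len() and the byte count agree.
flat = mv.cast("B")
assert len(flat) == mv.nbytes
```

For a 2D NumPy array the mismatch is even larger, since len() would only count rows; passing memoryview(array).cast('B') (or array.data.cast('B')) might avoid the assertion.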

@bryevdv (Author) commented Sep 10, 2019

@socketpair do you have any thoughts on the error above when passing a memoryview to write_message?

@bryevdv bryevdv closed this as completed Nov 30, 2021