
For discussion - Ideas for outlet optimizations #144

Open
cboulay opened this issue Sep 29, 2021 · 5 comments

cboulay (Collaborator) commented Sep 29, 2021

push_chunk -- memcpy the entire chunk instead of enqueuing sample-by-sample

I'm guessing we would need to transform the sample class so the data_ member is a pointer to an address within a larger chunk-sized buffer somewhere; then adjacent (in time) samples' data would occupy contiguous space in memory. sample::save_streambuf (in client_session::transfer_samples_thread) appears to be compatible with this, and we could get even more benefit if we augmented save_streambuf with an n_samples argument (defaulting to 1), so that save_raw's third argument becomes datasize() * n_samples.

This actually looks a little more plausible than I originally thought. It's still a pretty big endeavour.
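
To make the first idea concrete, here is a rough, hypothetical sketch of the layout I have in mind; chunk_buffer and sample_view are made-up names for illustration, not actual liblsl classes:

#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical layout: one contiguous allocation per chunk, with per-sample
// data_ pointers aliasing slices of it instead of owning their own memory.
struct chunk_buffer {
    std::vector<uint8_t> storage; // n_samples * sample_bytes, contiguous in time
    std::size_t sample_bytes;     // datasize() of a single sample
};

struct sample_view {
    double timestamp;
    uint8_t *data_; // points into chunk_buffer::storage
};

// A save_streambuf(n_samples) variant could then write the whole chunk at once:
//   save_raw(sb, chunk.storage.data(), chunk.sample_bytes * n_samples);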

boost::asio::socket_base::send_buffer_size -- configurable via lsl_api.cfg

This isn't really an optimization so much as a parameter that lets very-high-throughput outlets avoid choking on bursty inlets (whether the burstiness comes from the network or from the inlet process).
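
For illustration, this is roughly what that parameter maps to at the asio level (sketch only; the actual lsl_api.cfg key name and where it gets applied are up for discussion):

#include <boost/asio.hpp>

// Sketch: apply a configured send buffer size to an open data socket.
// The 4 MiB default here is arbitrary, and the OS may clamp the value.
void apply_send_buffer_size(boost::asio::ip::tcp::socket &sock, int bytes = 4 * 1024 * 1024) {
    sock.set_option(boost::asio::socket_base::send_buffer_size(bytes));

    boost::asio::socket_base::send_buffer_size actual;
    sock.get_option(actual); // actual.value() may differ from the requested size
}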

Thoughts?
@tstenner @chkothe

cboulay (Collaborator, Author) commented Sep 29, 2021

The first idea above is a dud. send_buffer::push_sample copies a p_sample, i.e. a pointer, not the actual data. That is cheap and not worth optimizing. I've removed it.

cboulay (Collaborator, Author) commented Sep 30, 2021

send_buffer_size merged in #145

cboulay (Collaborator, Author) commented Oct 2, 2021

I understand the over-the-wire protocol a little better now. We can't push a chunk of data through directly unless it's been a priori formatted as 1|2 (ts) ch0 ch1 ... chx 1|2 (ts) ch0 ch1 ... chx etc, where 1|2 is a uint8_t flag to indicate whether or not a timestamp follows, ts is an optional double timestamp depending on the flag, and chx are sample values of any type.

So if the data provider encodes their data that way, it can be pushed all the way to asio with no copies. This also assumes endianness compatibility between sender and receiver!
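
For illustration, pre-formatting a sample into that layout might look like the sketch below (the flag values and float data type are assumptions for the example):

#include <cstdint>
#include <vector>

// Sketch: append one sample to a pre-formatted wire buffer.
// Per-sample layout: [uint8_t flag][optional double timestamp][nchan values].
// Assumes the sender and receiver already agree on endianness.
void append_sample(std::vector<uint8_t> &wire, const float *chans, int nchan,
    bool has_timestamp, double timestamp) {
    wire.push_back(has_timestamp ? 2 : 1); // flag: does a timestamp follow?
    if (has_timestamp) {
        const auto *t = reinterpret_cast<const uint8_t *>(&timestamp);
        wire.insert(wire.end(), t, t + sizeof(double));
    }
    const auto *d = reinterpret_cast<const uint8_t *>(chans);
    wire.insert(wire.end(), d, d + sizeof(float) * nchan);
}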

It's probably pretty rare that the data comes off the device formatted that way (with flags and timestamps interleaved in each sample), so instead we can use a gather-write vector of buffers. Thank you @tstenner for cluing me into this.

std::vector<boost::asio::const_buffer> bufs;
for (samp_idx...) {
    // 1-byte flag: does a timestamp follow this sample?
    bufs.push_back(boost::asio::buffer(&flags[TAG_XXX_TIMESTAMP], 1));
    // optional 8-byte timestamp
    if (...) bufs.push_back(boost::asio::buffer(&ts[samp_idx], sizeof(double)));
    // this sample's channel data, contiguous within the chunk
    bufs.push_back(boost::asio::buffer(&data[samp_idx * nchan], sizeof(data[0]) * nchan));
}
for (consumer...) {
    // gather-write: a single send covers all the buffers for the chunk
    bytes_transferred = consumer_->socket.send(bufs);
}

I just need access to the socket...

cboulay (Collaborator, Author) commented Oct 4, 2021

I figured out a way to do the blocking synchronous write with zero copies.

With 2,000 channels at 30,000 samples per second, CPU utilization on the outlet process decreases from 18.3% to 5% on my desktop (i5-8400). Pretty good!

Unfortunately I can't test what this means in practical terms of "what does this now enable us to do that we couldn't before?" because I'm bottle-necked either by the network or CPU usage of the receiver application. I guess that's a good problem to have. After I clean this up, I think the next step will be to work on optimizing the inlet.

@tstenner provided me with a one-liner that can mimic an inlet with almost no overhead for testing purposes. This works well in the new synchronous-write mode. But for some reason, in the original asynchronous mode, the outlet process consumes much more CPU using this one-liner than it would with a normal inlet application. (For reference, it's spending most of its time in lsl::sample::serialize_channels.) Unfortunately I can't draw any conclusions about gains here because I don't know what the limits are in asynchronous mode.

One-liner for posterity:
nc -t localhost 16573 < handshake.dmp > /dev/null

Where handshake.dmp is a file with the following contents:

LSL:streamfeed/110 ce67fa29-595e-43e2-adce-b10283840b4c
Native-Byte-Order: 1234
Endian-Performance: 6.14477e+06
Has-IEEE754-Floats: 1
Supports-Subnormals: 0
Value-Size: 8
Data-Protocol-Version: 110
Max-Buffer-Length: 300
Max-Chunk-Length: 0
Hostname: chad-ubuntu-18
Source-Id: example-SendDataInChunks
Session-Id: default

The long hex code in the first line has to be replaced with the stream UID, which can be obtained from outlet.info().uid(). Source-Id and Hostname should also be changed.
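
For reference, one way to grab that UID on the outlet side (C++ API; the stream parameters here are placeholders):

#include <iostream>
#include <lsl_cpp.h>

int main() {
    lsl::stream_info info("MyStream", "EEG", 2000, 30000, lsl::cf_int16, "example-SendDataInChunks");
    lsl::stream_outlet outlet(info);
    // outlet.info() returns the fully-populated info, including the UID
    // that goes into the first line of handshake.dmp
    std::cout << outlet.info().uid() << std::endl;
}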

tstenner (Collaborator) commented Oct 5, 2021

But for some reason in the original asynchronous mode, the outlet process consumes much more CPU using this one-liner than it would if I were using a normal inlet application.

That's because the Value-Size doesn't match the stream's value size (2 in your case), so the data protocol silently gets downgraded to 100. This can be circumvented with a small script from the you-shouldn't-do-that-unless-you-have-to department:

import pylsl

# Generate a handshake packet to stress-test or debug an outlet
# Usage: Generate_Handshake.py my_stream_name | nc -t $outlet_hostname $port -W 1
# $port is usually 16573
# omit -W 1 to keep receiving the stream data
# add > /dev/null to discard the received data immediately

def lsl_handshake(streamname, endian=1234, version=110, maxbuflen=300, maxchunklen=0):
    stream = pylsl.resolve_byprop('name', streamname, 1, 5)[0]
    value_sizes = [0, 4, 8, 32, 4, 2, 1, 8]
    fields = {
        'Native-Byte-Order': str(endian),
        'Endian-Performance': '0',
        'Has-IEEE754-Floats': '1',
        'Supports-Subnormals': '0',
        'Value-Size': str(value_sizes[stream.channel_format()]),
        'Data-Protocol-Version': str(version),
        'Max-Buffer-Length': str(maxbuflen),
        'Max-Chunk-Length': str(maxchunklen),
        'Hostname': stream.hostname(),
        'Session-Id': 'default'
        }
    print('LSL:streamfeed/110 ' + stream.uid())
    for key, value in fields.items():
        print(key, value, sep=': ')
    print('\r\n\r\n')

if __name__ == '__main__':
    import sys
    lsl_handshake('1' if len(sys.argv) < 2 else sys.argv[1])

For the first run, you should check with the -W 1 option that the parameters are interpreted correctly.
