Following the introduction of Protobuf (de)serialization in #20331, the deserialization process makes two invalid assumptions:
- DataBuffer behaves like InputStream (returns -1 when no data is left).
- The bytes that represent messageBytesToRead are never split across two separate DataBuffer instances. This can happen in practice: I saw it the first time I decoded a delimited stream bigger than a few items.
To solve the issue, I think it would be necessary to:
- Introduce an additional DataBuffer as a member of MessageDecoderFunction to keep the data from the previous input (the DataBuffer from the previous item in the Flux) that did not form a complete integer.
- Stop using -1 as the signal for a parsing failure: -1 as a byte value can be part of a valid varint32 representation. Instead, I would propose attempting readVarint32 on the data remaining in the current input; if that fails, restore the read position to where it was before the read and store the remaining input bytes in the additional buffer proposed in the point above.
- On the next Flux item, check whether the additional buffer holds any data. If it does, append enough data from the new input to the additional buffer to make sure a varint32 can be read (a varint32 occupies at most 5 bytes). Try to read the value and, if that succeeds, set the input read position to just after the last byte that was needed to read the varint32.
- Set messageBytesToRead from the varint32 that was read and proceed with the normal flow.
This is obviously just one proposed solution; there are many that would work equally well.
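As an illustration of one such alternative, here is a minimal sketch of a decoder that survives a length prefix split across chunks. It is not the proposal above verbatim: instead of an extra buffer and a rewound read position, it keeps the partially decoded value and bit shift between calls. `DelimitedFrameDecoder` is a hypothetical name, and `java.nio.ByteBuffer` stands in for Spring's DataBuffer so that the example runs without any dependencies. Running out of bytes is the only "incomplete" signal; no byte value is treated as a sentinel.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for MessageDecoderFunction; ByteBuffer replaces DataBuffer. */
class DelimitedFrameDecoder {

    private int partialSize;     // bits of the length prefix decoded so far
    private int shift;           // number of prefix bits already consumed
    private ByteBuffer payload;  // message being assembled, null while reading a prefix

    /** Consumes one chunk and returns the payload of every message it completes. */
    List<byte[]> decode(ByteBuffer chunk) {
        List<byte[]> messages = new ArrayList<>();
        while (true) {
            if (payload == null) {
                Integer size = readVarint32(chunk);
                if (size == null) {
                    return messages;  // prefix incomplete, resume on the next chunk
                }
                payload = ByteBuffer.allocate(size);
            }
            // Copy as much of the payload as this chunk provides.
            byte[] part = new byte[Math.min(chunk.remaining(), payload.remaining())];
            chunk.get(part);
            payload.put(part);
            if (payload.hasRemaining()) {
                return messages;      // payload incomplete, resume on the next chunk
            }
            messages.add(payload.array());
            payload = null;
        }
    }

    /** Returns the decoded length, or null if the chunk ended inside the prefix. */
    private Integer readVarint32(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            byte b = chunk.get();
            partialSize |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {    // high bit clear: last byte of the varint
                int size = partialSize;
                partialSize = 0;
                shift = 0;
                return size;
            }
            shift += 7;
            if (shift >= 35) {        // more than 5 bytes cannot be a varint32
                throw new IllegalStateException("Malformed varint32");
            }
        }
        return null;
    }
}
```

Each call to `decode` corresponds to one item in the Flux; the decoder instance must be kept per stream because it carries state between items.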
A reproducing test would need input that splits the initial and/or subsequent messageBytesToRead varint32 across separate DataBuffers. For that, the sample message sizes may have to be increased significantly. It would also be good to add a test that verifies that a messageBytesToRead whose varint32 representation starts with a byte value of -1 (as int) does not throw an exception.
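Input for such a test could be built along the following lines. This is only a sketch using plain Java: `SplitPrefixFixture` and its helpers are hypothetical names, and the hand-written varint encoder mimics the length prefix that Protobuf's `writeDelimitedTo` puts in front of each message. A payload of 255 bytes is convenient because its prefix is the two bytes 0xFF 0x01, so the first byte reads as -1 and the prefix can be cut in half.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** Hypothetical test fixture: builds a length-delimited frame and splits it. */
class SplitPrefixFixture {

    /** Writes value as a base-128 varint, least significant group first. */
    static void writeVarint32(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);  // 7 data bits plus continuation bit
            value >>>= 7;
        }
        out.write(value);
    }

    /** Builds one frame: varint32 length prefix followed by a zero-filled payload. */
    static byte[] frame(int payloadSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint32(out, payloadSize);
        out.write(new byte[payloadSize], 0, payloadSize);
        return out.toByteArray();
    }

    /** Splits the stream into two chunks at the given offset. */
    static byte[][] splitAt(byte[] stream, int offset) {
        return new byte[][] {
            Arrays.copyOfRange(stream, 0, offset),
            Arrays.copyOfRange(stream, offset, stream.length)
        };
    }

    public static void main(String[] args) {
        byte[] stream = frame(255);
        System.out.println(stream[0]);         // prints -1 (0xFF as a signed byte)
        byte[][] chunks = splitAt(stream, 1);  // cut between the two prefix bytes
        System.out.println(chunks[0].length + " + " + chunks[1].length);  // prints 1 + 256
    }
}
```

Each chunk would then be wrapped in its own DataBuffer and emitted as a separate Flux item; looping `splitAt` over every offset covers splits inside the prefix as well as inside the payload.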
Please let me know if you need any further details or help preparing a reproducing test and/or a solution.
Grzegorz Ligas, the fix is merged in master. If you can find the time to test 5.1.3 snapshots with your use case before the release expected on Monday, that would be great. Thanks for this detailed and accurate bug report.