Skip to content

StreamControl does not increment the current stream id after removing a stream id and repeats the same id #330

@km-64

Description

@km-64

The RSocket protocol specification states that stream ids must increment by two:

https://github.com/rsocket/rsocket/blob/master/Protocol.md#generation

Stream IDs on the client MUST start at 1 and increment by 2 sequentially, such as 1, 3, 5, 7, etc.

Stream IDs on the server MUST start at 2 and increment by 2 sequentially, such as 2, 4, 6, 8, etc.

Looking at the implementation in rsocket-py, it doesn't use a do-while loop as is done in rsocket-java rsocket-core/src/main/java/io/rsocket/core/StreamIdSupplier.java and instead uses a while loop.

Expected behaviour

The rsocket-java implementation StreamIdSupplier seems to implement the correct behaviour consistent with the spec.

StreamIdSupplier supplier = StreamIdSupplier.clientSupplier();
IntObjectMap<Object> streamIds = new IntObjectHashMap<>();

// Create a stream id
int id1 = supplier.nextStreamId(streamIds);
System.out.println(id1);
// 1

// Remove the stream id
map.remove(id1);

// Generate a new stream id
int id2 = supplier.nextStreamId(streamIds);
System.out.println(id2); 
// 3

Actual behaviour

The python implementation will repeat the same value after the stream has been finished.

from rsocket.stream_control import StreamControl

control = StreamControl(1) 
dummy_stream = object()

# Register a stream with a new id
stream_id = control.allocate_stream()
print(stream_id)
# 1
stream.register_stream(stream_id, dummy_stream)

# Finish that stream
control.finish_stream(stream_id)

# Generate a new stream id
new_stream_id = control.allocate_stream()
print(new_stream_id)
#  1  (should be 3)

Comparison between implementations

  int nextStreamId(IntObjectMap<?> streamIds) {
    int streamId;
    do {
      this.streamId += 2;
      streamId = (int) (this.streamId & MASK);
    } while (streamId == 0 || streamIds.containsKey(streamId));
    return streamId;
  }

https://github.com/rsocket/rsocket-java/blob/fc64b43ac4000772df7fc8334684b715f63cdd68/rsocket-core/src/main/java/io/rsocket/core/StreamIdSupplier.java#L46-L53

def allocate_stream(self) -> int:
attempt_counter = 0
while (self._current_stream_id == CONNECTION_STREAM_ID
or self._current_stream_id in self._streams):
if attempt_counter > self._maximum_stream_id / 2:
raise RSocketStreamAllocationFailure()
self._increment_stream_id()
attempt_counter += 1
return self._current_stream_id

Metadata

Metadata

Assignees

Labels

Type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions