From 2eb348ede74f9764ae635b979cd4d6f26c458751 Mon Sep 17 00:00:00 2001 From: Karim Malik <79924100+km-64@users.noreply.github.com> Date: Thu, 4 Sep 2025 18:57:57 +0200 Subject: [PATCH 1/5] Use python do-while loop equivalent in stream id allocation --- rsocket/stream_control.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/rsocket/stream_control.py b/rsocket/stream_control.py index f444e1b7..e8573c7e 100644 --- a/rsocket/stream_control.py +++ b/rsocket/stream_control.py @@ -21,15 +21,20 @@ def __init__(self, first_stream_id: int): def allocate_stream(self) -> int: attempt_counter = 0 - while (self._current_stream_id == CONNECTION_STREAM_ID - or self._current_stream_id in self._streams): - + # python do-while loop equivalent + repeat = True + while repeat: if attempt_counter > self._maximum_stream_id / 2: raise RSocketStreamAllocationFailure() self._increment_stream_id() attempt_counter += 1 + repeat = ( + self._current_stream_id == CONNECTION_STREAM_ID + or self._current_stream_id in self._streams + ) + return self._current_stream_id def _increment_stream_id(self): From dd7b4143f3a1e1025cb8908e9c1ddc66fc85c8a0 Mon Sep 17 00:00:00 2001 From: Karim Malik <79924100+km-64@users.noreply.github.com> Date: Thu, 4 Sep 2025 19:36:19 +0200 Subject: [PATCH 2/5] Decrement _first_stream_id by 2 to retain behaviour of first_stream_id --- rsocket/stream_control.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket/stream_control.py b/rsocket/stream_control.py index e8573c7e..e08646c3 100644 --- a/rsocket/stream_control.py +++ b/rsocket/stream_control.py @@ -13,7 +13,7 @@ class StreamControl: def __init__(self, first_stream_id: int): - self._first_stream_id = first_stream_id + self._first_stream_id = first_stream_id - 2 self._current_stream_id = self._first_stream_id self._streams: Dict[int, StreamHandler] = {} self._maximum_stream_id = MAX_STREAM_ID From a2d3deff7e10435cf7c5ace0254e921515c4688e Mon Sep 17 00:00:00 2001 From: Karim Malik <79924100+km-64@users.noreply.github.com> Date: Thu, 4 Sep 2025 20:20:48 +0200 Subject: [PATCH 3/5] Mask _self_stream_id after decrementing in __init__ --- rsocket/stream_control.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rsocket/stream_control.py b/rsocket/stream_control.py index e08646c3..adb238f7 100644 --- a/rsocket/stream_control.py +++ b/rsocket/stream_control.py @@ -13,7 +13,7 @@ class StreamControl: def __init__(self, first_stream_id: int): - self._first_stream_id = first_stream_id - 2 + self._first_stream_id = (first_stream_id - 2) & MAX_STREAM_ID self._current_stream_id = self._first_stream_id self._streams: Dict[int, StreamHandler] = {} self._maximum_stream_id = MAX_STREAM_ID From 026911508611c36a63a8e91f35b210f044112d29 Mon Sep 17 00:00:00 2001 From: Karim Malik <79924100+km-64@users.noreply.github.com> Date: Fri, 5 Sep 2025 15:58:56 +0200 Subject: [PATCH 4/5] Remove comment and invert logic for do-while behaviour --- rsocket/stream_control.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rsocket/stream_control.py b/rsocket/stream_control.py index adb238f7..53263195 100644 --- a/rsocket/stream_control.py +++ b/rsocket/stream_control.py @@ -21,16 +21,15 @@ def __init__(self, first_stream_id: int): def allocate_stream(self) -> int: attempt_counter = 0 - # python do-while loop equivalent - repeat = True - while repeat: + available_stream_id_found = False + while not available_stream_id_found: if attempt_counter > self._maximum_stream_id / 2: raise RSocketStreamAllocationFailure() self._increment_stream_id() attempt_counter += 1 - repeat = ( + available_stream_id_found = not ( self._current_stream_id == CONNECTION_STREAM_ID or self._current_stream_id in self._streams ) From 42a8f5fdb4a4469b2967f00f42c72bb689b4a53a Mon Sep 17 00:00:00 2001 From: Karim Malik <79924100+km-64@users.noreply.github.com> Date: Fri, 5 Sep 2025 16:06:06 +0200 Subject: [PATCH 5/5] Add test case for issue #330 --- tests/rsocket/test_stream_control.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/rsocket/test_stream_control.py b/tests/rsocket/test_stream_control.py index b4b93b4d..66a51965 100644 --- a/tests/rsocket/test_stream_control.py +++ b/tests/rsocket/test_stream_control.py @@ -66,6 +66,20 @@ def test_stream_control_reuse_old_stream_ids(): assert next_stream == 5 +@pytest.mark.parametrize('first_stream_id', (1, 2)) +def test_stream_id_increments_after_allocation_and_registration_followed_by_finishing(first_stream_id: int): + control = StreamControl(first_stream_id) + dummy_stream = object() + + allocated_id = control.allocate_stream() + control.register_stream(allocated_id, dummy_stream) + + control.finish_stream(allocated_id) + new_allocated_id = control.allocate_stream() + + assert new_allocated_id != allocated_id + + def test_stream_in_use(): control = StreamControl(1)