Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add couple of extra unique token characters to requests #335

Merged
merged 1 commit into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions nats/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import ssl
import sys
import time
from secrets import token_hex
from dataclasses import dataclass
from email.parser import BytesParser
from random import shuffle
Expand Down Expand Up @@ -56,7 +57,7 @@
Subscription,
)

__version__ = '2.1.6'
__version__ = '2.1.7'
__lang__ = 'python3'
_logger = logging.getLogger(__name__)
PROTOCOL = 1
Expand Down Expand Up @@ -168,12 +169,8 @@ def __init__(self) -> None:
self._reconnection_task: Union[asyncio.Task[None], None] = None
self._reconnection_task_future: Optional[asyncio.Future] = None
self._max_payload: int = DEFAULT_MAX_PAYLOAD_SIZE
# This is the client id that the NATS server knows
# about. Useful in debugging application errors
# when logged with this identifier along
# with nats server log.
# This would make more sense if we log the server
# connected to as well in case of cluster setup.

# client id that the NATS server knows about.
self._client_id: Optional[str] = None
self._sid: int = 0
self._subs: Dict[int, Subscription] = {}
Expand Down Expand Up @@ -929,8 +926,10 @@ async def _request_new_style(
await self._init_request_sub()
assert self._resp_sub_prefix

# Use a new NUID for the token inbox and then use the future.
# Use a new NUID + couple of unique token bytes to identify the request,
# then use the future to get the response.
token = self._nuid.next()
token.extend(token_hex(2).encode())
inbox = self._resp_sub_prefix[:]
inbox.extend(token)
future: asyncio.Future = asyncio.Future()
Expand Down
10 changes: 4 additions & 6 deletions nats/nuid.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# limitations under the License.
#

from random import Random, SystemRandom
from random import Random
from sys import maxsize as MaxInt
from secrets import token_bytes, randbelow

DIGITS = b'0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
BASE = 62
Expand All @@ -33,8 +34,7 @@ class NUID:
"""

def __init__(self) -> None:
self._srand = SystemRandom()
self._prand = Random(self._srand.randint(0, MaxInt))
self._prand = Random(randbelow(MaxInt))
self._seq = self._prand.randint(0, MAX_SEQ)
self._inc = MIN_INC + self._prand.randint(BASE + 1, INC)
self._prefix = bytearray()
Expand All @@ -60,9 +60,7 @@ def next(self) -> bytearray:
return prefix

def randomize_prefix(self) -> None:
random_bytes = (
self._srand.getrandbits(8) for i in range(PREFIX_LENGTH)
)
random_bytes = token_bytes(PREFIX_LENGTH)
self._prefix = bytearray(DIGITS[c % BASE] for c in random_bytes)

def reset_sequential(self) -> None:
Expand Down
16 changes: 16 additions & 0 deletions tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,22 @@ async def error_cb(err):
assert len(msgs) <= 100
assert sub.pending_msgs == 0
assert sub.pending_bytes == 0

# Consumer has a single message pending but none in buffer.
await js.publish("a3", b'last message')
info = await sub.consumer_info()
assert info.num_pending == 1
assert sub.pending_msgs == 0

# Remove interest
await sub.unsubscribe()
with pytest.raises(TimeoutError):
await sub.fetch(1, timeout=1)

# The pending message is still there, but not possible to consume.
info = await sub.consumer_info()
assert info.num_pending == 1

await nc.close()


Expand Down
6 changes: 3 additions & 3 deletions todo.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
- [X] io_loop becomes loop parameter
- [X] Drain Mode
- [X] Connect timeout
- [X] Adopt async/await in client
- [X] Subscription object on subscribe
- [X] Error handler yields the subscription
- [ ] Use asyncio.Protocol
- [ ] Adopt async/await in client
- [ ] Subscription object on subscribe
- [ ] Error handler yields the subscription