Skip to content

Commit

Permalink
Merge pull request #512 from nats-io/js-fixes
Browse files Browse the repository at this point in the history
JetStream updates and fixes
  • Loading branch information
wallyqs committed Oct 27, 2023
2 parents 12fe022 + 47b5838 commit 8d3f053
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 23 deletions.
16 changes: 9 additions & 7 deletions nats/aio/client.py
Expand Up @@ -50,6 +50,7 @@
from nats.protocol.parser import (
AUTHORIZATION_VIOLATION,
PERMISSIONS_ERR,
PING,
PONG,
STALE_CONNECTION,
Parser,
Expand All @@ -64,7 +65,7 @@
)
from .transport import TcpTransport, Transport, WebSocketTransport

__version__ = '2.5.0'
__version__ = '2.6.0'
__lang__ = 'python3'
_logger = logging.getLogger(__name__)
PROTOCOL = 1
Expand Down Expand Up @@ -1705,22 +1706,23 @@ async def _process_msg(

# Process flow control messages in case of using a JetStream context.
ctrl_msg = None
fcReply = None
fc_reply = None
if sub._jsi:
#########################################
# #
# JetStream Control Messages Processing #
# #
#########################################
jsi = sub._jsi
jsi._active = True
if hdr:
ctrl_msg = self._is_control_message(data, hdr)

# Check if the heartbeat has a "Consumer Stalled" header, if
# so, the value is the FC reply to send a nil message to.
# We will send it at the end of this function.
if ctrl_msg and ctrl_msg.startswith("Idle"):
fcReply = hdr.get(nats.js.api.Header.CONSUMER_STALLED)
fc_reply = hdr.get(nats.js.api.Header.CONSUMER_STALLED)

# OrderedConsumer: checkOrderedMsgs
if not ctrl_msg and jsi._ordered and msg.reply:
Expand Down Expand Up @@ -1788,15 +1790,15 @@ async def _process_msg(
# DATA message that was received before this flow control message, which
# has sequence `jsi.fciseq`. However, it is possible that this message
# has already been delivered, in that case, we need to send the FC reply now.
if sub.delivered >= sub._jsi._fciseq:
fcReply = msg.reply
if sub._jsi.get_js_delivered() >= sub._jsi._fciseq:
fc_reply = msg.reply
else:
# Schedule a reply after the previous message is delivered.
sub._jsi.schedule_flow_control_response(msg.reply)

# Handle flow control response.
if fcReply:
await self.publish(fcReply)
if fc_reply:
await self.publish(fc_reply)

if ctrl_msg and not msg.reply and ctrl_msg.startswith("Idle"):
if sub._jsi:
Expand Down
134 changes: 127 additions & 7 deletions nats/js/client.py
Expand Up @@ -358,6 +358,15 @@ async def subscribe_bind(
sub=sub,
ccreq=config,
)

if config.idle_heartbeat:
sub._jsi._hbtask = asyncio.create_task(sub._jsi.activity_check())

if ordered_consumer:
sub._jsi._fctask = asyncio.create_task(
sub._jsi.check_flow_control_response()
)

return psub

@staticmethod
Expand All @@ -375,7 +384,7 @@ async def new_callback(msg: Msg) -> None:
async def pull_subscribe(
self,
subject: str,
durable: str,
durable: Optional[str] = None,
stream: Optional[str] = None,
config: Optional[api.ConsumerConfig] = None,
pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
Expand Down Expand Up @@ -415,16 +424,30 @@ async def main():
if stream is None:
stream = await self._jsm.find_stream_name_by_subject(subject)

should_create = True
try:
# TODO: Detect configuration drift with the consumer.
await self._jsm.consumer_info(stream, durable)
if durable:
await self._jsm.consumer_info(stream, durable)
should_create = False
except nats.js.errors.NotFoundError:
pass

consumer_name = durable
if should_create:
# If not found then attempt to create with the defaults.
if config is None:
config = api.ConsumerConfig()

# Auto created consumers use the filter subject.
# config.name = durable
config.filter_subject = subject
config.durable_name = durable
if durable:
config.name = durable
config.durable_name = durable
else:
consumer_name = self._nc._nuid.next().decode()
config.name = consumer_name

await self._jsm.add_consumer(stream, config=config)

return await self.pull_subscribe_bind(
Expand All @@ -433,6 +456,7 @@ async def main():
inbox_prefix=inbox_prefix,
pending_bytes_limit=pending_bytes_limit,
pending_msgs_limit=pending_msgs_limit,
name=consumer_name,
)

async def pull_subscribe_bind(
Expand All @@ -442,6 +466,7 @@ async def pull_subscribe_bind(
inbox_prefix: bytes = api.INBOX_PREFIX,
pending_msgs_limit: int = DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
pending_bytes_limit: int = DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
name: Optional[str] = None,
) -> JetStreamContext.PullSubscription:
"""
pull_subscribe returns a `PullSubscription` that can be delivered messages
Expand Down Expand Up @@ -475,11 +500,16 @@ async def main():
pending_msgs_limit=pending_msgs_limit,
pending_bytes_limit=pending_bytes_limit
)
consumer = None
if durable:
consumer = durable
else:
consumer = name
return JetStreamContext.PullSubscription(
js=self,
sub=sub,
stream=stream,
consumer=durable,
consumer=consumer,
deliver=deliver,
)

Expand Down Expand Up @@ -533,18 +563,76 @@ def __init__(
self._sub = sub
self._ccreq = ccreq

# Heartbeat
self._hbtask = None
self._hbi = None
if ccreq and ccreq.idle_heartbeat:
self._hbi = ccreq.idle_heartbeat

# Ordered Consumer implementation.
self._dseq = 1
self._sseq = 0
self._cmeta: Optional[str] = None
self._fcr: Optional[str] = None
self._fcd = 0
self._fciseq = 0
self._active: Optional[bool] = None
self._active: Optional[bool] = True
self._fctask = None

def track_sequences(self, reply: str) -> None:
self._fciseq += 1
self._cmeta = reply

def schedule_flow_control_response(self, reply: str) -> None:
pass
self._active = True
self._fcr = reply
self._fcd = self._fciseq

def get_js_delivered(self):
if self._sub._cb:
return self._sub.delivered
return self._fciseq - self._sub._pending_queue.qsize()

async def activity_check(self):
# Can at most miss two heartbeats.
hbc_threshold = 2
while True:
try:
if self._conn.is_closed:
break

# Wait for all idle heartbeats to be received,
# one of them would have toggled the state of the
# consumer back to being active.
await asyncio.sleep(self._hbi * hbc_threshold)
active = self._active
self._active = False
if not active:
if self._ordered:
await self.reset_ordered_consumer(
self._sseq + 1
)
except asyncio.CancelledError:
break

async def check_flow_control_response(self):
while True:
try:
if self._conn.is_closed:
break

if (self._fciseq - self._psub._pending_queue.qsize()) >= self._fcd:
fc_reply = self._fcr
try:
if fc_reply:
await self._conn.publish(fc_reply)
except Exception:
pass
self._fcr = None
self._fcd = 0
await asyncio.sleep(0.25)
except asyncio.CancelledError:
break

async def check_for_sequence_mismatch(self,
msg: Msg) -> Optional[bool]:
Expand Down Expand Up @@ -684,6 +772,38 @@ def delivered(self) -> int:
"""
return self._sub._received

@delivered.setter
def delivered(self, value):
self._sub._received = value

@property
def _pending_size(self):
return self._sub._pending_size

@_pending_size.setter
def _pending_size(self, value):
self._sub._pending_size = value

async def next_msg(self, timeout: Optional[float] = 1.0) -> Msg:
"""
:params timeout: Time in seconds to wait for next message before timing out.
:raises nats.errors.TimeoutError:
next_msg can be used to retrieve the next message from a stream of messages using
await syntax, this only works when not passing a callback on `subscribe`::
"""
msg = await self._sub.next_msg(timeout)

# In case there is a flow control reply present need to handle here.
if self._sub and self._sub._jsi:
self._sub._jsi._active = True
if self._sub._jsi.get_js_delivered() >= self._sub._jsi._fciseq:
fc_reply = self._sub._jsi._fcr
if fc_reply:
await self._conn.publish(fc_reply)
self._sub._jsi._fcr = None
return msg

class PullSubscription:
"""
PullSubscription is a subscription that can fetch messages.
Expand Down
2 changes: 2 additions & 0 deletions nats/js/manager.py
Expand Up @@ -64,6 +64,8 @@ async def find_stream_name_by_subject(self, subject: str) -> str:
info = await self._api_request(
req_sub, req_data.encode(), timeout=self._timeout
)
if not info['streams']:
raise NotFoundError
return info['streams'][0]

async def stream_info(self, name: str) -> api.StreamInfo:
Expand Down
9 changes: 4 additions & 5 deletions nats/protocol/parser.py
@@ -1,4 +1,4 @@
# Copyright 2016-2021 The NATS Authors
# Copyright 2016-2023 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -63,7 +63,7 @@
# States
AWAITING_CONTROL_LINE = 1
AWAITING_MSG_PAYLOAD = 2
MAX_CONTROL_LINE_SIZE = 1024
MAX_CONTROL_LINE_SIZE = 4096

# Protocol Errors
STALE_CONNECTION = "stale connection"
Expand Down Expand Up @@ -165,10 +165,9 @@ async def parse(self, data: bytes = b''):
del self.buf[:info.end()]
continue

if len(self.buf
) < MAX_CONTROL_LINE_SIZE and _CRLF_ in self.buf:
if len(self.buf) < MAX_CONTROL_LINE_SIZE and _CRLF_ in self.buf:
# FIXME: By default server uses a max protocol
# line of 1024 bytes but it can be tuned in latest
# line of 4096 bytes but it can be tuned in latest
# releases, in that case we won't reach here but
# client ping/pong interval would disconnect
# eventually.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -4,7 +4,7 @@
# These are here for GitHub's dependency graph and help with setuptools support in some environments.
setup(
name="nats-py",
version='2.5.0',
version='2.6.0',
license='Apache 2 License',
extras_require={
'nkeys': ['nkeys'],
Expand Down

0 comments on commit 8d3f053

Please sign in to comment.