Skip to content

Commit

Permalink
Release v0.15.2
Browse files Browse the repository at this point in the history
  • Loading branch information
jcass77 committed Aug 5, 2020
2 parents c732833 + df7ce54 commit 730e983
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 93 deletions.
9 changes: 9 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
This changelog is used to track all major changes to WTFIX.


## v0.15.2 (2020-08-05)

**Fixes**

- `client_session`: Don't wait for `writer` to close when shutting down in order to avoid hangs due to network errors.
- Use the recommended `asyncio.create_task` to create new Tasks, which is preferred to `asyncio.ensure_future`.
- Fix issue that caused the `client_session` listener task to hang during shutdown.


## v0.15.1 (2020-07-28)

**Fixes**
Expand Down
27 changes: 15 additions & 12 deletions run_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,23 @@
help=f"reset sequence numbers and start a new session",
)

_shutting_down = asyncio.Event()


async def graceful_shutdown(pipeline, sig_name=None):
if _shutting_down.is_set():
# Only try to shut down once
logger.warning(f"Shutdown already in progress! Ignoring signal '{sig_name}'.")
return

_shutting_down.set()

if sig_name is not None:
logger.info(f"Received signal {sig_name}! Initiating graceful shutdown...")
else:
logger.info(f"Initiating graceful shutdown...")

try:
await pipeline.stop()

except asyncio.exceptions.CancelledError as e:
logger.error(f"Cancelled: connection terminated abnormally! ({e})")
sys.exit(os.EX_UNAVAILABLE)
await pipeline.stop()


async def main():
Expand All @@ -65,6 +69,7 @@ async def main():
)

args = parser.parse_args()
exit_code = os.EX_UNAVAILABLE

with connection_manager(args.connection) as conn:
fix_pipeline = BasePipeline(
Expand All @@ -78,7 +83,7 @@ async def main():
for sig_name in {"SIGINT", "SIGTERM"}:
loop.add_signal_handler(
getattr(signal, sig_name),
lambda: asyncio.ensure_future(
lambda: asyncio.create_task(
graceful_shutdown(fix_pipeline, sig_name=sig_name)
),
)
Expand All @@ -89,26 +94,24 @@ async def main():
logger.error(e)
# User needs to fix config issue before restart is attempted. Set os.EX_OK so that system process
# monitors like Supervisor do not attempt a restart immediately.
sys.exit(os.EX_OK)
exit_code = os.EX_OK

except KeyboardInterrupt:
logger.info("Received keyboard interrupt! Initiating shutdown...")
sys.exit(os.EX_OK)
exit_code = os.EX_OK

except asyncio.exceptions.TimeoutError as e:
logger.error(e)
sys.exit(os.EX_UNAVAILABLE)

except asyncio.exceptions.CancelledError as e:
logger.error(f"Cancelled: connection terminated abnormally! ({e})")
sys.exit(os.EX_UNAVAILABLE)

except Exception as e:
logger.exception(e)
sys.exit(os.EX_UNAVAILABLE)

finally:
await graceful_shutdown(fix_pipeline)
sys.exit(exit_code)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

setup(
name="wtfix",
version="0.15.1",
version="0.15.2",
author="John Cass",
author_email="john.cass77@gmail.com",
description="The Pythonic Financial Information eXchange (FIX) client for humans.",
Expand Down
38 changes: 18 additions & 20 deletions wtfix/apps/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ async def stop(self, *args, **kwargs):

loop = asyncio.get_running_loop()
for cancel_task in cancel_tasks:
if cancel_task.cancelled():
logger.info(f"{self.name}: {cancel_task.get_name()} cancelled!")
continue
if cancel_task.exception() is not None:
loop.call_exception_handler(
{
Expand All @@ -166,27 +169,22 @@ async def heartbeat_monitor(
:timer: The timer to use as reference against the heartbeat interval
:interval_exceeded_response: The response to take if the interval is exceeded. Must be an awaitable.
"""
try:
while not self._server_not_responding.is_set():
# Keep sending heartbeats until the server stops responding.
await asyncio.sleep(
self.seconds_to_next_check(timer)
) # Wait until the next scheduled heartbeat check.

if self.seconds_to_next_check(timer) == 0:
# Heartbeat exceeded, send response
await interval_exceeded_response()

# No response received, force logout!
logger.error(
f"{self.name}: No response received for test request '{self._test_request_id}', "
f"initiating shutdown..."
)
asyncio.create_task(self.pipeline.stop())
while not self._server_not_responding.is_set():
# Keep sending heartbeats until the server stops responding.
await asyncio.sleep(
self.seconds_to_next_check(timer)
) # Wait until the next scheduled heartbeat check.

except asyncio.exceptions.CancelledError:
# Cancellation request received
logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!")
if self.seconds_to_next_check(timer) == 0:
# Heartbeat exceeded, send response
await interval_exceeded_response()

# No response received, force logout!
logger.error(
f"{self.name}: No response received for test request '{self._test_request_id}', "
f"initiating shutdown..."
)
asyncio.create_task(self.pipeline.stop())

async def send_test_request(self):
"""
Expand Down
3 changes: 1 addition & 2 deletions wtfix/apps/api/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ def post(self):
args = self.parser.parse_args()
message = decoders.from_json(args["message"])

loop = asyncio.get_event_loop()
asyncio.ensure_future(self.app.send(message), loop=loop)
asyncio.create_task(self.app.send(message))

return JsonResultResponse(
True,
Expand Down
12 changes: 7 additions & 5 deletions wtfix/apps/brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ async def _send_channel_reader(self):
self.send(message)
) # Pass message on to pipeline

except asyncio.exceptions.CancelledError:
# Cancellation request received - close connections....
logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!")

except aioredis.ChannelClosedError:
# Shutting down...
logger.info(f"{self.name}: Unsubscribed from {send_channel.name}.")
Expand All @@ -79,7 +75,13 @@ async def stop(self, *args, **kwargs):

if self._channel_reader_task is not None:
self._channel_reader_task.cancel()
await self._channel_reader_task
try:
await self._channel_reader_task
except asyncio.exceptions.CancelledError:
# Cancellation request received - close connections....
logger.info(
f"{self.name}: {self._channel_reader_task.get_name()} cancelled!"
)

with await self.redis_pool as conn:
await conn.unsubscribe(self.SEND_CHANNEL)
Expand Down
102 changes: 49 additions & 53 deletions wtfix/apps/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,11 @@ async def stop(self, *args, **kwargs):
logger.info(f"{self.name}: Cancelling listener task...")

self._listener_task.cancel()
await self._listener_task
try:
await self._listener_task
except asyncio.exceptions.CancelledError:
# Cancellation request received - close writer
logger.info(f"{self.name}: {self._listener_task.get_name()} cancelled!")

if self.writer is not None:
logger.info(
Expand All @@ -177,7 +181,6 @@ async def stop(self, *args, **kwargs):
)

self.writer.close()
await self.writer.wait_closed()
logger.info(f"{self.name}: Session closed!")

async def listen(self):
Expand All @@ -194,63 +197,56 @@ async def listen(self):

data = []

try:
while not self.writer.is_closing(): # Listen forever for new messages
try:
# Try to read a complete message.
data = await self.reader.readuntil(
begin_string
) # Detect beginning of message.
# TODO: should there be a timeout for reading an entire message?
data += await self.reader.readuntil(
checksum_start
) # Detect start of checksum field.
data += await self.reader.readuntil(
settings.SOH
) # Detect final message delimiter.

await self.pipeline.receive(data)
data = None

except IncompleteReadError as e:
# Connection was closed before a complete message could be received.
if (
while not self.writer.is_closing(): # Listen forever for new messages
try:
# Try to read a complete message.
data = await self.reader.readuntil(
begin_string
) # Detect beginning of message.
# TODO: should there be a timeout for reading an entire message?
data += await self.reader.readuntil(
checksum_start
) # Detect start of checksum field.
data += await self.reader.readuntil(
settings.SOH
) # Detect final message delimiter.

await self.pipeline.receive(data)
data = None

except IncompleteReadError as e:
# Connection was closed before a complete message could be received.
if (
data
and utils.encode(
f"{connection.protocol.Tag.MsgType}={connection.protocol.MsgType.Logout}"
)
+ settings.SOH
in data
):
await self.pipeline.receive(
data
and utils.encode(
f"{connection.protocol.Tag.MsgType}={connection.protocol.MsgType.Logout}"
)
+ settings.SOH
in data
):
await self.pipeline.receive(
data
) # Process logout message in the pipeline as per normal

raise e

else:
logger.error(
f"{self.name}: Unexpected EOF waiting for next chunk of partial data "
f"'{utils.decode(e.partial)}' ({e})."
)

raise e

except LimitOverrunError as e:
# Buffer limit reached before a complete message could be read - abort!
) # Process logout message in the pipeline as per normal

else:
logger.error(
f"{self.name}: Stream reader buffer limit exceeded! ({e})."
f"{self.name}: Unexpected EOF waiting for next chunk of partial data "
f"'{utils.decode(e.partial)}' ({e})."
)

raise e
# Stop listening for messages
asyncio.create_task(self.pipeline.stop())
break

except asyncio.exceptions.CancelledError:
# Cancellation request received - close writer
logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!")
except LimitOverrunError as e:
# Buffer limit reached before a complete message could be read - abort!
logger.error(
f"{self.name}: Stream reader buffer limit exceeded! ({e})."
)

except Exception as e:
logger.error(f"{self.name}: Unexpected error {e}. Initiating shutdown...")
asyncio.create_task(self.pipeline.stop())
# Stop listening for messages
asyncio.create_task(self.pipeline.stop())
break

async def on_send(self, message):
"""
Expand Down

0 comments on commit 730e983

Please sign in to comment.