Skip to content

Commit

Permalink
Release v0.15.3
Browse files Browse the repository at this point in the history
  • Loading branch information
jcass77 committed Aug 11, 2020
2 parents 730e983 + 7c69750 commit 96b14ef
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 144 deletions.
12 changes: 12 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@
This changelog is used to track all major changes to WTFIX.


## v0.15.3 (2020-08-11)

**Fixes**

- Avoid `AttributeError` when a `ConnectionError` occurs in the `client_session` app.
- Refactor task cancellation: client should take responsibility for final task cancellation / cleanup instead of the
pipeline. This ensures that the client itself is not also cancelled as part of a pipeline shutdown.
- Only call `super().stop()` after an app has completed all of its own shutdown routines.
- Prevent misbehaving apps from interrupting a pipeline shutdown. This ensures that the pipeline can always be
  shut down, and all outstanding asyncio tasks cancelled, even if one or more apps raise exceptions while stopping.


## v0.15.2 (2020-08-05)

**Fixes**
Expand Down
18 changes: 18 additions & 0 deletions run_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,24 @@ async def main():

finally:
await graceful_shutdown(fix_pipeline)

# Report tasks that are still running after shutdown.
tasks = [
task
for task in asyncio.all_tasks()
if task is not asyncio.current_task() and not task.cancelled()
]

if tasks:
task_output = "\n".join(str(task) for task in tasks)
logger.warning(
f"There are still {len(tasks)} tasks running that have not been cancelled! Cancelling them now...\n"
f"{task_output}."
)

for task in tasks:
task.cancel()

sys.exit(exit_code)


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

setup(
name="wtfix",
version="0.15.2",
version="0.15.3",
author="John Cass",
author_email="john.cass77@gmail.com",
description="The Pythonic Financial Information eXchange (FIX) client for humans.",
Expand Down
48 changes: 30 additions & 18 deletions wtfix/apps/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ async def stop(self, *args, **kwargs):
"""
Cancel the heartbeat monitor on the next iteration of the event loop.
"""
await super().stop(*args, **kwargs)
# Stop heartbeat monitors
cancel_tasks = [
task
Expand All @@ -149,7 +148,6 @@ async def stop(self, *args, **kwargs):
loop = asyncio.get_running_loop()
for cancel_task in cancel_tasks:
if cancel_task.cancelled():
logger.info(f"{self.name}: {cancel_task.get_name()} cancelled!")
continue
if cancel_task.exception() is not None:
loop.call_exception_handler(
Expand All @@ -160,6 +158,8 @@ async def stop(self, *args, **kwargs):
}
)

await super().stop(*args, **kwargs)

async def heartbeat_monitor(
self, timer: HeartbeatTimers, interval_exceeded_response: Callable
):
Expand All @@ -169,22 +169,34 @@ async def heartbeat_monitor(
:timer: The timer to use as reference against the heartbeat interval
:interval_exceeded_response: The response to take if the interval is exceeded. Must be an awaitable.
"""
while not self._server_not_responding.is_set():
# Keep sending heartbeats until the server stops responding.
await asyncio.sleep(
self.seconds_to_next_check(timer)
) # Wait until the next scheduled heartbeat check.
try:
while not self._server_not_responding.is_set():
# Keep sending heartbeats until the server stops responding.
await asyncio.sleep(
self.seconds_to_next_check(timer)
) # Wait until the next scheduled heartbeat check.

if self.seconds_to_next_check(timer) == 0:
# Heartbeat exceeded, send response
await interval_exceeded_response()

# No response received, force logout!
logger.error(
f"{self.name}: No response received for test request '{self._test_request_id}', "
f"initiating shutdown..."
)
asyncio.create_task(self.pipeline.stop())

if self.seconds_to_next_check(timer) == 0:
# Heartbeat exceeded, send response
await interval_exceeded_response()
except asyncio.exceptions.CancelledError:
logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!")

# No response received, force logout!
logger.error(
f"{self.name}: No response received for test request '{self._test_request_id}', "
f"initiating shutdown..."
)
asyncio.create_task(self.pipeline.stop())
except Exception:
# Stop monitoring heartbeat
logger.exception(
f"{self.name}: Unhandled exception while monitoring heartbeat! Shutting down pipeline..."
)
asyncio.create_task(self.pipeline.stop())
raise

async def send_test_request(self):
"""
Expand Down Expand Up @@ -330,10 +342,10 @@ async def start(self, *args, **kwargs):
await self.logon()

async def stop(self, *args, **kwargs):
await super().stop(*args, **kwargs)

await self.logout()

await super().stop(*args, **kwargs)

@on(connection.protocol.MsgType.Logon)
async def on_logon(self, message):
"""
Expand Down
4 changes: 2 additions & 2 deletions wtfix/apps/api/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ def _run_flask_dev_server(self, flask_app):
) # debug=False: disable automatic restarting of the Flask server

async def stop(self, *args, **kwargs):
await super().stop(*args, **kwargs)

result = requests.post(
"http://127.0.0.1:5000/shutdown", data={"token": self.secret_key}
)

if self._flask_process is not None:
self._flask_process.kill()

await super().stop(*args, **kwargs)

return result
4 changes: 2 additions & 2 deletions wtfix/apps/brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ async def start(self, *args, **kwargs):
)

async def stop(self, *args, **kwargs):
await super().stop(*args, **kwargs)

if self._channel_reader_task is not None:
self._channel_reader_task.cancel()
try:
Expand All @@ -88,3 +86,5 @@ async def stop(self, *args, **kwargs):

self.redis_pool.close()
await self.redis_pool.wait_closed() # Closing all open connections

await super().stop(*args, **kwargs)
130 changes: 66 additions & 64 deletions wtfix/apps/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import asyncio
import os
import uuid
from asyncio import IncompleteReadError, LimitOverrunError
from asyncio import IncompleteReadError
from pathlib import Path

from wtfix.apps.base import BaseApp
Expand Down Expand Up @@ -162,18 +162,6 @@ async def stop(self, *args, **kwargs):
"""
Close the writer.
"""
await super().stop(*args, **kwargs)

if self._listener_task is not None:
logger.info(f"{self.name}: Cancelling listener task...")

self._listener_task.cancel()
try:
await self._listener_task
except asyncio.exceptions.CancelledError:
# Cancellation request received - close writer
logger.info(f"{self.name}: {self._listener_task.get_name()} cancelled!")

if self.writer is not None:
logger.info(
f"{self.name}: Initiating disconnect from "
Expand All @@ -183,6 +171,14 @@ async def stop(self, *args, **kwargs):
self.writer.close()
logger.info(f"{self.name}: Session closed!")

if self._listener_task is not None:
logger.info(f"{self.name}: Cancelling listener task...")

self._listener_task.cancel()
await self._listener_task

await super().stop(*args, **kwargs)

async def listen(self):
"""
Listen for new messages that are sent by the server.
Expand All @@ -197,64 +193,70 @@ async def listen(self):

data = []

while not self.writer.is_closing(): # Listen forever for new messages
try:
# Try to read a complete message.
data = await self.reader.readuntil(
begin_string
) # Detect beginning of message.
# TODO: should there be a timeout for reading an entire message?
data += await self.reader.readuntil(
checksum_start
) # Detect start of checksum field.
data += await self.reader.readuntil(
settings.SOH
) # Detect final message delimiter.

await self.pipeline.receive(data)
data = None

except IncompleteReadError as e:
# Connection was closed before a complete message could be received.
if (
data
and utils.encode(
f"{connection.protocol.Tag.MsgType}={connection.protocol.MsgType.Logout}"
)
+ settings.SOH
in data
):
await self.pipeline.receive(
try:
while not self.writer.is_closing(): # Listen forever for new messages
try:
# Try to read a complete message.
data = await self.reader.readuntil(
begin_string
) # Detect beginning of message.
# TODO: should there be a timeout for reading an entire message?
data += await self.reader.readuntil(
checksum_start
) # Detect start of checksum field.
data += await self.reader.readuntil(
settings.SOH
) # Detect final message delimiter.

await self.pipeline.receive(data)
data = None

except IncompleteReadError:
if (
data
) # Process logout message in the pipeline as per normal

else:
logger.error(
f"{self.name}: Unexpected EOF waiting for next chunk of partial data "
f"'{utils.decode(e.partial)}' ({e})."
)

# Stop listening for messages
asyncio.create_task(self.pipeline.stop())
break

except LimitOverrunError as e:
# Buffer limit reached before a complete message could be read - abort!
logger.error(
f"{self.name}: Stream reader buffer limit exceeded! ({e})."
)

# Stop listening for messages
asyncio.create_task(self.pipeline.stop())
break
and utils.encode(
f"{connection.protocol.Tag.MsgType}={connection.protocol.MsgType.Logout}"
)
+ settings.SOH
in data
):
# Connection was closed before a complete message could be received.
await self.pipeline.receive(
data
) # Process logout message in the pipeline as per normal
break

else:
# Something else went wrong, re-raise
raise

except asyncio.exceptions.CancelledError:
logger.info(f"{self.name}: {asyncio.current_task().get_name()} cancelled!")

except Exception:
# Stop monitoring heartbeat
logger.exception(
f"{self.name}: Unhandled exception while listening for messages! Shutting down pipeline..."
)
asyncio.create_task(self.pipeline.stop())
raise

async def on_send(self, message):
"""
Writes an encoded message to the StreamWriter.
:param message: A valid, encoded, FIX message.
"""
self.writer.write(message)
await self.writer.drain()
try:
self.writer.write(message)
await self.writer.drain()
except AttributeError:
# Ignore send failures if pipeline is already shutting down.
if not self.writer and self.pipeline.stopping_event.is_set():
logger.warning(
f"{self.name}: No connection established, cannot send message {message}."
)
else:
raise

del message # Encourage garbage collection of message once it has been sent
26 changes: 6 additions & 20 deletions wtfix/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,27 +150,14 @@ async def stop(self):
except asyncio.exceptions.TimeoutError:
logger.error(f"Timeout waiting for app '{app}' to stop!")
continue # Continue trying to stop next app.
except Exception:
# Don't allow misbehaving apps to interrupt pipeline shutdown.
logger.exception(f"Error trying to stop app '{app}'.")
continue

self.stopped_event.set()
logger.info("Pipeline stopped.")

# Report tasks that are still running after shutdown.
tasks = [
task
for task in asyncio.all_tasks()
if task is not asyncio.current_task() and not task.cancelled()
]

if tasks:
task_output = "\n".join(str(task) for task in tasks)
logger.warning(
f"There are still {len(tasks)} tasks running that have not been cancelled! Cancelling them now...\n"
f"{task_output}."
)

for task in tasks:
task.cancel()

def _setup_message_handling(self, direction):
if direction is self.INBOUND_PROCESSING:
return "on_receive", reversed(self.apps.values())
Expand Down Expand Up @@ -215,6 +202,7 @@ async def _process_message(self, message, direction):
except Exception as e:
if (
isinstance(e, ConnectionError)
and hasattr(message, "type")
and message.type == connection.protocol.MsgType.Logout
and self.stopping_event.is_set()
):
Expand All @@ -231,9 +219,7 @@ async def _process_message(self, message, direction):
logger.exception(
f"Unhandled exception while doing {method_name}: {e} ({message})."
)
await asyncio.wait_for(
self.stop(), None
) # Block while we try to stop the pipeline
await self.stop() # Block while we try to stop the pipeline
raise e

return message
Expand Down

0 comments on commit 96b14ef

Please sign in to comment.