Skip to content

Commit

Permalink
Release v0.14.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jcass77 committed Nov 15, 2019
2 parents 5d04d47 + 2ceab9a commit 7086148
Show file tree
Hide file tree
Showing 29 changed files with 732 additions and 630 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ a Redis Pub/Sub channel for immediate delivery.

class SecretAlgoTradingRecipe(MessageTypeHandlerApp):

@on(settings.protocol.MsgType.Logon) # Only invoked when 'Logon (type A)' messages are received.
@on(context.protocol.MsgType.Logon) # Only invoked when 'Logon (type A)' messages are received.
def on_logon(self, message):
self.send_security_definition_request()
return message
Expand Down Expand Up @@ -100,7 +100,7 @@ used message attributes.
>>> logon_msg[108] # Using old school tag number
Field(108, '30')

>>> logon_msg[settings.protocol.Tag.HeartBtInt] # Using the tag name as per the FIX specification
>>> logon_msg[context.protocol.Tag.HeartBtInt] # Using the tag name as per the FIX specification
Field(108, '30')

>>> logon_msg.HeartBtInt # Using tag name shortcut
Expand Down
9 changes: 9 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

This changelog is used to track all major changes to WTFIX.


## v0.14.0 (2019-11-15)

**Enhancements**

- Fix Python 3.8 compatibility issues.
- Switch to using a context manager for managing the active FIX connection / protocol.


## v0.13.0 (2019-11-14)

**Enhancements**
Expand Down
90 changes: 44 additions & 46 deletions run_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,16 @@
from wtfix.conf import settings
from wtfix.core.exceptions import ImproperlyConfigured
from wtfix.pipeline import BasePipeline
from wtfix.protocol.contextlib import connection_manager

logger = settings.logger

parser = argparse.ArgumentParser(description="Start a FIX connection")

try:
# If only one connection has been configured then we have a safe default to fall back to.
default_connection_name = settings.default_connection_name
except ImproperlyConfigured:
default_connection_name = None

parser.add_argument(
"--connection",
default=default_connection_name,
help=f"the configuration settings to use for the connection (default: '{default_connection_name}')",
default="default",
help="the configuration settings to use for the connection (default: 'default')",
)

parser.add_argument(
Expand Down Expand Up @@ -69,51 +64,54 @@ async def main():

args = parser.parse_args()

fix_pipeline = BasePipeline(
connection_name=args.connection, new_session=args.new_session
)

try:
# Graceful shutdown on termination signals.
# See: https://docs.python.org/3.7/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm
loop = asyncio.get_running_loop()
for sig_name in {"SIGINT", "SIGTERM"}:
loop.add_signal_handler(
getattr(signal, sig_name),
lambda: asyncio.ensure_future(
graceful_shutdown(sig_name, fix_pipeline)
),
)

await fix_pipeline.start()

except futures.TimeoutError as e:
logger.error(e)
sys.exit(os.EX_UNAVAILABLE)

except KeyboardInterrupt:
logger.info("Received keyboard interrupt! Initiating shutdown...")
sys.exit(os.EX_OK)

except futures.CancelledError as e:
logger.error(f"Cancelled: connection terminated abnormally! ({e})")
sys.exit(os.EX_UNAVAILABLE)
with connection_manager(args.connection) as conn:
fix_pipeline = BasePipeline(
connection_name=conn.name, new_session=args.new_session
)

except ImproperlyConfigured as e:
logger.error(e)
sys.exit(os.EX_OK) # User needs to fix config issue before restart is attempted
try:
# Graceful shutdown on termination signals.
# See: https://docs.python.org/3.7/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm
loop = asyncio.get_running_loop()
for sig_name in {"SIGINT", "SIGTERM"}:
loop.add_signal_handler(
getattr(signal, sig_name),
lambda: asyncio.ensure_future(
graceful_shutdown(sig_name, fix_pipeline)
),
)

await fix_pipeline.start()

except asyncio.TimeoutError as e:
logger.error(e)
sys.exit(os.EX_UNAVAILABLE)

except Exception as e:
logger.exception(e)
sys.exit(os.EX_UNAVAILABLE)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt! Initiating shutdown...")
sys.exit(os.EX_OK)

finally:
try:
await fix_pipeline.stop()
except futures.CancelledError as e:
logger.error(f"Cancelled: connection terminated abnormally! ({e})")
sys.exit(os.EX_UNAVAILABLE)

except ImproperlyConfigured as e:
logger.error(e)
sys.exit(
os.EX_OK
) # User needs to fix config issue before restart is attempted

except Exception as e:
logger.exception(e)
sys.exit(os.EX_UNAVAILABLE)

finally:
try:
await fix_pipeline.stop()
except futures.CancelledError as e:
logger.error(f"Cancelled: connection terminated abnormally! ({e})")
sys.exit(os.EX_UNAVAILABLE)


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

setup(
name="wtfix",
version="0.13.0",
version="0.14.0",
author="John Cass",
author_email="john.cass77@gmail.com",
description="The Pythonic Financial Information eXchange (FIX) client for humans.",
Expand Down
27 changes: 14 additions & 13 deletions wtfix/apps/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from wtfix.message import admin
from wtfix.core import utils
from wtfix.message.message import FIXMessage
from wtfix.protocol.contextlib import connection

logger = settings.logger

Expand Down Expand Up @@ -196,7 +197,7 @@ async def send_heartbeat(self):
self.send(admin.HeartbeatMessage())
) # Don't need to block while heartbeat is sent

@on(settings.protocol.MsgType.Logon)
@on(connection.protocol.MsgType.Logon)
async def on_logon(self, message: FIXMessage) -> FIXMessage:
"""
Start the heartbeat monitor as soon as a logon response is received from the server.
Expand All @@ -208,7 +209,7 @@ async def on_logon(self, message: FIXMessage) -> FIXMessage:

return message

@on(settings.protocol.MsgType.TestRequest)
@on(connection.protocol.MsgType.TestRequest)
async def on_test_request(self, message: FIXMessage) -> FIXMessage:
"""
Send a HeartBeat message in response to a TestRequest received from the server.
Expand All @@ -223,7 +224,7 @@ async def on_test_request(self, message: FIXMessage) -> FIXMessage:

return message

@on(settings.protocol.MsgType.Heartbeat)
@on(connection.protocol.MsgType.Heartbeat)
async def on_heartbeat(self, message: FIXMessage) -> FIXMessage:
"""
Handle a TestRequest response from the server.
Expand Down Expand Up @@ -311,7 +312,7 @@ async def stop(self, *args, **kwargs):

await self.logout()

@on(settings.protocol.MsgType.Logon)
@on(connection.protocol.MsgType.Logon)
async def on_logon(self, message):
"""
Confirms all of the session parameters that we sent when logging on.
Expand Down Expand Up @@ -351,7 +352,7 @@ async def on_logon(self, message):

return message

@on(settings.protocol.MsgType.Logout)
@on(connection.protocol.MsgType.Logout)
async def on_logout(self, message):
self.logged_out_event.set() # FIX server has logged us out.

Expand Down Expand Up @@ -438,12 +439,12 @@ class SeqNumManagerApp(MessageTypeHandlerApp):
name = "seq_num_manager"

ADMIN_MESSAGES = [
settings.protocol.MsgType.Logon,
settings.protocol.MsgType.Logout,
settings.protocol.MsgType.ResendRequest,
settings.protocol.MsgType.Heartbeat,
settings.protocol.MsgType.TestRequest,
settings.protocol.MsgType.SequenceReset,
connection.protocol.MsgType.Logon,
connection.protocol.MsgType.Logout,
connection.protocol.MsgType.ResendRequest,
connection.protocol.MsgType.Heartbeat,
connection.protocol.MsgType.TestRequest,
connection.protocol.MsgType.SequenceReset,
]

# How long to wait (in seconds) for resend requests from target before sending our own resend requests.
Expand Down Expand Up @@ -531,7 +532,7 @@ async def _check_sequence_number(self, message):

else:
# Message received in correct order.
if message.type == settings.protocol.MsgType.SequenceReset:
if message.type == connection.protocol.MsgType.SequenceReset:
# Special handling for SequenceReset admin message
message = self._handle_sequence_reset(message)
else:
Expand Down Expand Up @@ -762,7 +763,7 @@ async def on_receive(self, message: FIXMessage) -> FIXMessage:
Check the sequence number for every message received
"""
# Special handling for ResendRequest admin message: should be responded to even if received out of order
if message.type == settings.protocol.MsgType.ResendRequest:
if message.type == connection.protocol.MsgType.ResendRequest:
message = await self._handle_resend_request(
message
) # Handle resend request immediately.
Expand Down
7 changes: 4 additions & 3 deletions wtfix/apps/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from wtfix.apps.base import BaseApp
from wtfix.conf import settings
from wtfix.core import utils
from wtfix.protocol.contextlib import connection


logger = settings.logger
Expand Down Expand Up @@ -170,11 +171,11 @@ async def listen(self):
Listen for new messages that are sent by the server.
"""
begin_string = utils.encode(
f"{settings.protocol.Tag.BeginString}="
f"{connection.protocol.Tag.BeginString}="
) + utils.encode(settings.BEGIN_STRING)

checksum_start = settings.SOH + utils.encode(
f"{settings.protocol.Tag.CheckSum}="
f"{connection.protocol.Tag.CheckSum}="
)

data = []
Expand All @@ -200,7 +201,7 @@ async def listen(self):
# Connection was closed before a complete message could be received.
if (
utils.encode(
f"{settings.protocol.Tag.MsgType}={settings.protocol.MsgType.Logout}"
f"{connection.protocol.Tag.MsgType}={connection.protocol.MsgType.Logout}"
)
+ settings.SOH
in data
Expand Down
70 changes: 39 additions & 31 deletions wtfix/apps/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
from wtfix.apps.parsers import RawMessageParserApp
from wtfix.apps.sessions import ClientSessionApp
from wtfix.apps.wire import EncoderApp, DecoderApp
from wtfix.conf import settings
from wtfix.conf import settings, ConnectionSettings
from wtfix.message.admin import HeartbeatMessage
from wtfix.message.message import generic_message_factory
from wtfix.pipeline import BasePipeline
from wtfix.protocol.contextlib import connection, connection_manager


@asyncio.coroutine
Expand All @@ -29,33 +30,34 @@ def base_pipeline():
:return: A pipeline mock with a client session initialized.
"""
pipeline = MagicMock(BasePipeline)
pipeline.settings = settings.default_connection
with connection_manager() as conn:
pipeline = MagicMock(BasePipeline)
pipeline.settings = ConnectionSettings(conn.name)

client_session = ClientSessionApp(pipeline, new_session=True)
client_session.sender = settings.default_connection.SENDER
client_session.target = settings.default_connection.TARGET
client_session = ClientSessionApp(pipeline, new_session=True)
client_session.sender = pipeline.settings.SENDER
client_session.target = pipeline.settings.TARGET

pipeline.apps = {ClientSessionApp.name: client_session}
pipeline.apps = {ClientSessionApp.name: client_session}

# Mock a future message that will allow us to await pipeline.send and pipeline.receive.
# Only useful in situations where we are not interested in the actual message result :(
mock_future_message = MagicMock(return_value=Future())
mock_future_message.return_value.set_result({})
# Mock a future message that will allow us to await pipeline.send and pipeline.receive.
# Only useful in situations where we are not interested in the actual message result :(
mock_future_message = MagicMock(return_value=Future())
mock_future_message.return_value.set_result({})

pipeline.send = mock_future_message
pipeline.receive = mock_future_message
pipeline.send = mock_future_message
pipeline.receive = mock_future_message

# Simulate the pipeline shutting down
pipeline.stop = MagicMock(return_value=mock_stop())
# Simulate the pipeline shutting down
pipeline.stop = MagicMock(return_value=mock_stop())

yield pipeline
yield pipeline

try:
os.remove(client_session._sid_path)
except FileNotFoundError:
# File does not exist - skip deletion
pass
try:
os.remove(client_session._sid_path)
except FileNotFoundError:
# File does not exist - skip deletion
pass


@pytest.fixture
Expand Down Expand Up @@ -119,16 +121,22 @@ def user_notification_message():
faker = Faker()

return generic_message_factory(
(settings.protocol.Tag.MsgType, settings.protocol.MsgType.UserNotification),
(settings.protocol.Tag.MsgSeqNum, 1),
(settings.protocol.Tag.SenderCompID, settings.default_connection.SENDER),
(settings.protocol.Tag.SendingTime, "20181206-10:24:27.018"),
(settings.protocol.Tag.TargetCompID, settings.default_connection.TARGET),
(settings.protocol.Tag.NoLinesOfText, 1),
(settings.protocol.Tag.Text, "abc"),
(settings.protocol.Tag.EmailType, 0),
(settings.protocol.Tag.Subject, "Test message"),
(settings.protocol.Tag.EmailThreadID, faker.pyint()),
(connection.protocol.Tag.MsgType, connection.protocol.MsgType.UserNotification),
(connection.protocol.Tag.MsgSeqNum, 1),
(
connection.protocol.Tag.SenderCompID,
settings.CONNECTIONS[connection.name]["SENDER"],
),
(connection.protocol.Tag.SendingTime, "20181206-10:24:27.018"),
(
connection.protocol.Tag.TargetCompID,
settings.CONNECTIONS[connection.name]["TARGET"],
),
(connection.protocol.Tag.NoLinesOfText, 1),
(connection.protocol.Tag.Text, "abc"),
(connection.protocol.Tag.EmailType, 0),
(connection.protocol.Tag.Subject, "Test message"),
(connection.protocol.Tag.EmailThreadID, faker.pyint()),
)


Expand Down

0 comments on commit 7086148

Please sign in to comment.