Skip to content

Commit

Permalink
Added exception module
Browse files Browse the repository at this point in the history
Added logging to errors
  • Loading branch information
jamespeterschinner committed Dec 31, 2017
1 parent b40f2a3 commit 7aefda8
Show file tree
Hide file tree
Showing 21 changed files with 355 additions and 207 deletions.
6 changes: 5 additions & 1 deletion async_v20/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import logging

from async_v20.client import OandaClient
from async_v20.client import __version__
from async_v20.definitions import *
from async_v20.endpoints.annotations import *
from async_v20.client import __version__

logging.getLogger(__name__).addHandler(logging.NullHandler())

__version__ = __version__
125 changes: 84 additions & 41 deletions async_v20/client.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import asyncio
import logging
import os
import ujson as json
import warnings
from functools import partial
from time import time

import aiohttp
from aiohttp.client_exceptions import ClientConnectionError
from yarl import URL

from .definitions.types import AcceptDatetimeFormat
from .definitions.types import AccountID
from .definitions.types import ArrayTransaction
from .endpoints.annotations import Authorization, SinceTransactionID, LastTransactionID
from .exceptions import InitializationFailure, ResponseTimeout, CloseAllTradesFailure
from .interface import *

logger = logging.getLogger(__name__)


async def sleep(s=0.0):
await asyncio.sleep(s)
Expand All @@ -40,14 +43,22 @@ class OandaClient(AccountInterface, InstrumentInterface, OrderInterface, Positio
rest_port: -- The port of the v20 REST server
stream_host: -- The hostname of the v20 REST server
stream_port: -- The port of the v20 REST server
rest_scheme: -- The scheme of the connection. Defaults to 'https'
stream_scheme: -- The scheme of the connection. Defaults to 'https'
rest_scheme: -- The scheme of the connection to rest server.
Defaults to 'https'
stream_scheme: -- The scheme of the connection to the stream server.
Defaults to 'https'
health_host: -- The hostname of the health API server
health_port: -- The port of the health server
health_scheme: -- The scheme of the connection for the health server.
Defaults to 'http'
datetime_format: -- The format to request when dealing with times
rest_timeout: -- The timeout to use when making a polling request with
the v20 REST server
stream_timeout: -- period to wait for an new json object during streaming
max_requests_per_second: -- Maximum HTTP requests sent per second
max_simultaneous_connections: -- Maximum concurrent HTTP requests
debug: -- Default=False. Set to True to log debug messages.
Disabled by default to reduce overhead
"""
headers = {'Content-Type': 'application/json', 'Connection': 'keep-alive',
Expand All @@ -59,7 +70,7 @@ class OandaClient(AccountInterface, InstrumentInterface, OrderInterface, Positio

initializing = False

expected_step = None # The first step to be called during initialization
_initialization_step = None # The first step to be called during initialization

initialization_sleep = 0.5 # Time to poll initialized when waiting for initialization

Expand All @@ -71,6 +82,8 @@ class OandaClient(AccountInterface, InstrumentInterface, OrderInterface, Positio

session = None # http session will be created during initialization

_rest_timeout = None # seconds

@property
def max_requests_per_second(self):
return self._max_requests_per_second
Expand All @@ -94,16 +107,26 @@ def max_simultaneous_connections(self, value):
def datetime_format(self):
return self._datetime_format

def __init__(self, token=None, account_id=None, format_order_requests=False,
def __init__(self,
token=None,
account_id=None,
format_order_requests=False,
max_transaction_history=100,
rest_host='api-fxpractice.oanda.com', rest_port=443,
rest_scheme='https', stream_host='stream-fxpractice.oanda.com', stream_port=None,
stream_scheme='https', health_host='api-status.oanda.com', health_port=80, health_scheme='http',
datetime_format='UNIX', rest_timeout=10, stream_timeout=60,
max_requests_per_second=99, max_simultaneous_connections=10, loop=None):

if loop is None:
self._loop = asyncio.get_event_loop()
rest_host='api-fxpractice.oanda.com',
rest_port=443,
rest_scheme='https',
stream_host='stream-fxpractice.oanda.com',
stream_port=None,
stream_scheme='https',
health_host='api-status.oanda.com',
health_port=80,
health_scheme='http',
datetime_format='UNIX',
rest_timeout=10,
stream_timeout=60,
max_requests_per_second=99,
max_simultaneous_connections=10,
debug=False):

self.version = __version__

Expand Down Expand Up @@ -147,13 +170,16 @@ def __init__(self, token=None, account_id=None, format_order_requests=False,
AcceptDatetimeFormat: datetime_format}
)

self.debug = debug

async def account(self):
"""Get updated account
Returns:
:class:`~async_v20.definitions.types.Account`
"""
logger.info('account()')
await self.account_changes()
return self._account

Expand All @@ -171,22 +197,28 @@ async def close_all_trades(self):
# - attempt to close all open trades
# - get all open trades again and check there there are None
# - return close trade responses and successful/unsuccessful

logger.info('close_all_trades()')
all_trades_closed = False
response = await self.list_open_trades()
if response:
close_trade_responses = await asyncio.gather(*[self.close_trade(trade.id)
for trade in response.trades])
else:
raise ConnectionError(f'Could not get open trades. '
f'Server returned status {response.status}')
msg = f'Could not get open trades. ' \
f'Server returned status {response.status}'
logger.error(msg)
raise CloseAllTradesFailure(msg)
# After closing all trades check that all trades have indeed been closed
response = await self.list_open_trades()
if response:
if len(response.trades) == 0:
all_trades_closed = True
else:
raise ConnectionError(f'Unable to confirm all trades have been closed! '
f'Server returned status {response.status}')
msg = f'Unable to confirm all trades have been closed! ' \
f'Server returned status {response.status}'
logger.error(msg)
raise CloseAllTradesFailure(msg)

return all_trades_closed, close_trade_responses

Expand All @@ -199,7 +231,10 @@ async def _request_limiter(self):
return

if self._next_request_time - time() > 0:
await sleep(self._next_request_time - time())
wait_time = self._next_request_time - time()
if self.debug:
logger.debug('Request waiting for %s seconds', wait_time)
await sleep(wait_time)
return

async def __aenter__(self):
Expand All @@ -210,7 +245,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
self.close()

def __enter__(self):
print('Warning: <with> used rather than <async with>')
logger.warning('<with> used rather than <async with>')
return self

def __exit__(self, exc_type, exc_val, exc_tb):
Expand All @@ -225,16 +260,18 @@ def close(self):

async def initialize_session(self):
# Create http session this client will use to sent all requests
logger.info('Initializing session')
conn = aiohttp.TCPConnector(limit=self.max_simultaneous_connections)

self.session = aiohttp.ClientSession(
json_serialize=json.dumps,
headers=self.headers,
connector=conn,
read_timeout=self.rest_timeout
read_timeout=0 # async_v20 will handle timeouts to allow dynamic changing of timeout.
# after client initialization
)

async def initialize(self, initialization_step=False):
async def initialize(self, initialization_method=False):
"""Initialize client instance
Args:
Expand All @@ -243,7 +280,7 @@ async def initialize(self, initialization_step=False):
Returns: True when complete
"""
if self.initialized or self.expected_step == initialization_step:
if self.initialized or self._initialization_step == initialization_method:
# Do not initialize or wait for initialization to complete.
# If it did, due to circular logic, initialization would never
# complete.
Expand All @@ -256,19 +293,22 @@ async def initialize(self, initialization_step=False):
await sleep(self.initialization_sleep)

else: # If it gets this far. An initialization if required.

msg = '' # msg is used to create a useful Error msg if Initialization fails
try:
logger.info('Initializing client')
self.initializing = True # immediately set initializing to make sure
# Upcoming requests wait for this initialization to complete.

self._initialization_step = self.list_services.__name__
response = await self.list_services()
if response:
for service in response.services:
if service.current_event.status.name != 'Up':
warnings.warn(f'{service.name} {service.current_event.message}')
else:
print(response)
warnings.warn(f'Server did not return available services')

logging.warning('Server did not return available services')
print(response.json())
if not self.session:
await self.initialize_session()

Expand All @@ -278,58 +318,61 @@ async def initialize(self, initialization_step=False):

if self.account_id: # Allow manual assignment of AccountID
self.default_parameters.update({AccountID: self.account_id})
self.account_id = self.account_id

else: # Get the corresponding AccountID for the provided token

self.expected_step = 1 # Setting this prevents the request from
# waiting for initialization to complete.

self._initialization_step = self.list_accounts.__name__
response = await self.list_accounts()
if response: # Checks is the response status was the expected status as
# defined by OANDA spec.
self.default_parameters.update({AccountID: response['accounts'][0].id})
else:
self.initializing = False
raise ConnectionError(f'Server did not return AccountID during '
f'initialization. {response} {response.dict()}')
msg = f'Server did not return AccountID during initialization'
raise InitializationFailure()

# Get Account snapshot and last transaction id
# last transaction is automatically updated when the
# response is parsed

self.expected_step = 2
self._initialization_step = self.get_account_details.__name__
response = await self.get_account_details()
if response:
self._account = response['account']
else:
self.initializing = False
raise ConnectionError(f'Server did not return Account Details during '
f'initialization. {response} {response.dict()}')
msg = f'Server did not return Account Details during initialization.'
raise InitializationFailure()

self.expected_step = 3
self._initialization_step = self.account_instruments.__name__
response = await self.account_instruments()
if response:
self.instruments = response['instruments']
else:
self.initializing = False
raise ConnectionError(f'Server did not return Account Instruments during '
f'initialization. {response} {response.dict()}')
msg = f'Server did not return Account Instruments during initialization'
raise InitializationFailure()

# On initialization the SinceTransactionID needs updated to reflect LastTransactionID
self.default_parameters.update({SinceTransactionID: self.default_parameters[LastTransactionID]})

self.initializing = False
self.initialized = True

except TimeoutError:
except ResponseTimeout:
self.initializing = False
self.initialized = False
raise TimeoutError(f'Initialization step {self.expected_step} '
f'took longer than {self.rest_timeout} seconds')
except (ConnectionError, ClientConnectionError) as e:
msg = f'Initialization step {self._initialization_step} ' \
f'took longer than {self.rest_timeout} seconds'
logger.exception(msg)
raise InitializationFailure(msg)

except InitializationFailure:
self.initializing = False
self.initialized = False
raise ConnectionError(e)
logging.exception(msg)
raise InitializationFailure(msg)

# Always return True when initialization has complete
return True

0 comments on commit 7aefda8

Please sign in to comment.