Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into remove_default_emp…
Browse files Browse the repository at this point in the history
…ty_list
  • Loading branch information
azriel1rf committed Oct 29, 2021
2 parents c64f82e + 253bba4 commit f03543c
Show file tree
Hide file tree
Showing 6 changed files with 332 additions and 4 deletions.
2 changes: 2 additions & 0 deletions pybotters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .models import experimental
from .models.binance import BinanceDataStore
from .models.bitbank import bitbankDataStore
from .models.bitflyer import bitFlyerDataStore
from .models.bitmex import BitMEXDataStore
from .models.bybit import BybitDataStore
from .models.ftx import FTXDataStore
Expand All @@ -28,6 +29,7 @@
'FTXDataStore',
'BinanceDataStore',
'bitbankDataStore',
'bitFlyerDataStore',
'BitMEXDataStore',
'GMOCoinDataStore',
'experimental',
Expand Down
280 changes: 280 additions & 0 deletions pybotters/models/bitflyer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
import asyncio
import logging
import operator
from decimal import Decimal
from typing import Awaitable, Dict, List

import aiohttp

from ..store import DataStore, DataStoreManager
from ..typedefs import Item
from ..ws import ClientWebSocketResponse

logger = logging.getLogger(__name__)


class bitFlyerDataStore(DataStoreManager):
def _init(self) -> None:
self.create('board', datastore_class=Board)
self.create('ticker', datastore_class=Ticker)
self.create('executions', datastore_class=Executions)
self.create('childorderevents', datastore_class=ChildOrderEvents)
self.create('childorders', datastore_class=ChildOrders)
self.create('parentorderevents', datastore_class=ParentOrderEvents)
self.create('parentorders', datastore_class=ParentOrders)
self.create('positions', datastore_class=Positions)
self._snapshots = set()

async def initialize(self, *aws: Awaitable[aiohttp.ClientResponse]) -> None:
for f in asyncio.as_completed(aws):
resp = await f
data = await resp.json()
if resp.url.path == '/v1/me/getchildorders':
self.childorders._onresponse(data)
elif resp.url.path == '/v1/me/getparentorders':
self.parentorders._onresponse(data)
elif resp.url.path == '/v1/me/getpositions':
self.positions._onresponse(data)

def _onmessage(self, msg: Item, ws: ClientWebSocketResponse) -> None:
if 'error' in msg:
logger.warning(msg)
if 'params' in msg:
channel: str = msg['params']['channel']
message = msg['params']['message']
if channel.startswith('lightning_board_'):
if channel.startswith('lightning_board_snapshot_'):
asyncio.create_task(
ws.send_json(
{
'method': 'unsubscribe',
'params': {'channel': channel},
}
)
)
product_code = channel.replace('lightning_board_snapshot_', '')
self.board._delete(self.board.find({'product_code': product_code}))
self._snapshots.add(product_code)
else:
product_code = channel.replace('lightning_board_', '')
if product_code in self._snapshots:
self.board._onmessage(product_code, message)
elif channel.startswith('lightning_ticker_'):
self.ticker._onmessage(message)
elif channel.startswith('lightning_executions_'):
self.executions._onmessage(message)
elif channel == 'child_order_events':
self.childorderevents._onmessage(message)
self.childorders._onmessage(message)
self.positions._onmessage(message)
elif channel == 'parent_order_events':
self.parentorderevents._onmessage(message)
self.parentorders._onmessage(message)

@property
def board(self) -> 'Board':
return self.get('board', Board)

@property
def ticker(self) -> 'Ticker':
return self.get('ticker', Ticker)

@property
def executions(self) -> 'Executions':
return self.get('executions', Executions)

@property
def childorderevents(self) -> 'ChildOrderEvents':
return self.get('childorderevents', ChildOrderEvents)

@property
def childorders(self) -> 'ChildOrders':
return self.get('childorders', ChildOrders)

@property
def parentorderevents(self) -> 'ParentOrderEvents':
return self.get('parentorderevents', ParentOrderEvents)

@property
def parentorders(self) -> 'ParentOrders':
return self.get('parentorders', ParentOrders)

@property
def positions(self) -> 'Positions':
return self.get('positions', Positions)


class Board(DataStore):
_KEYS = ['product_code', 'side', 'price']

def _init(self) -> None:
self.mid_price: Dict[str, float] = {}

def sorted(self, query: Item = {}) -> Dict[str, List[Item]]:
result = {'SELL': [], 'BUY': []}
for item in self:
if all(k in item and query[k] == item[k] for k in query):
result[item['side']].append(item)
result['SELL'].sort(key=lambda x: x['price'])
result['BUY'].sort(key=lambda x: x['price'], reverse=True)
return result

def _onmessage(self, product_code: str, message: Item) -> None:
self.mid_price[product_code] = message['mid_price']
for key, side in (('bids', 'BUY'), ('asks', 'SELL')):
for item in message[key]:
if item['size']:
self._insert([{'product_code': product_code, 'side': side, **item}])
else:
self._delete([{'product_code': product_code, 'side': side, **item}])
board = self.sorted({'product_code': product_code})
targets = []
for side, ope in (('BUY', operator.le), ('SELL', operator.gt)):
for item in board[side]:
if ope(item['price'], message['mid_price']):
break
else:
targets.append(item)
self._delete(targets)


class Ticker(DataStore):
_KEYS = ['product_code']

def _onmessage(self, message: Item) -> None:
self._update([message])


class Executions(DataStore):
_MAXLEN = 99999

def _onmessage(self, message: List[Item]) -> None:
self._insert(message)


class ChildOrderEvents(DataStore):
def _onmessage(self, message: List[Item]) -> None:
self._insert(message)


class ParentOrderEvents(DataStore):
def _onmessage(self, message: List[Item]) -> None:
self._insert(message)


class ChildOrders(DataStore):
_KEYS = ['child_order_acceptance_id']

def _onresponse(self, data: List[Item]) -> None:
if data:
self._delete(self.find({'product_code': data[0]['product_code']}))
for item in data:
if item['child_order_state'] == 'ACTIVE':
self._insert([item])

def _onmessage(self, message: List[Item]) -> None:
for item in message:
if item['event_type'] == 'ORDER':
self._insert([item])
elif item['event_type'] in ('CANCEL', 'EXPIRE'):
self._delete([item])
elif item['event_type'] == 'EXECUTION':
if item['outstanding_size']:
childorder = self.get(item)
if childorder:
if isinstance(childorder['size'], int) and isinstance(
item['size'], int
):
childorder['size'] -= item['size']
else:
childorder['size'] = float(
Decimal(str(childorder['size']))
- Decimal(str(item['size']))
)
else:
self._delete([item])


class ParentOrders(DataStore):
_KEYS = ['parent_order_acceptance_id']

def _onresponse(self, data: List[Item]) -> None:
if data:
self._delete(self.find({'product_code': data[0]['product_code']}))
for item in data:
if item['parent_order_state'] == 'ACTIVE':
self._insert([item])

def _onmessage(self, message: List[Item]) -> None:
for item in message:
if item['event_type'] == 'ORDER':
self._insert([item])
elif item['event_type'] in ('CANCEL', 'EXPIRE'):
self._delete([item])
elif item['event_type'] == 'COMPLETE':
parentorder = self.get(item)
if parentorder:
if parentorder['parent_order_type'] in ('IFD', 'IFDOCO'):
if item['parameter_index'] >= 2:
self._delete([item])
else:
self._delete([item])


class Positions(DataStore):
_COMMON_KEYS = [
'product_code',
'side',
'price',
'size',
'commission',
'sfd',
]

def _common_keys(self, item: Item) -> Item:
return {key: item[key] for key in self._COMMON_KEYS}

def _onresponse(self, data: List[Item]) -> None:
if data:
self._delete(self.find({'product_code': data[0]['product_code']}))
for item in data:
self._insert([self._common_keys(item)])

def _onmessage(self, message: List[Item]) -> None:
for item in message:
if item['event_type'] == 'EXECUTION':
positions = self._find_with_uuid({'product_code': item['product_code']})
if positions:
if positions[next(iter(positions))]['side'] == item['side']:
self._insert([self._common_keys(item)])
else:
for uid, pos in positions.items():
if pos['size'] > item['size']:
if isinstance(pos['size'], int) and isinstance(
item['size'], int
):
pos['size'] -= item['size']
else:
pos['size'] = float(
Decimal(str(pos['size']))
- Decimal(str(item['size']))
)
break
else:
if isinstance(pos['size'], int) and isinstance(
item['size'], int
):
item['size'] -= pos['size']
else:
item['size'] = float(
Decimal(str(item['size']))
- Decimal(str(pos['size']))
)
self._remove([uid])
if not pos['size']:
break
else:
try:
self._insert([self._common_keys(item)])
except KeyError:
pass
7 changes: 7 additions & 0 deletions pybotters/models/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from typing import Tuple
from .bybit import BybitInverseDataStore, BybitUSDTDataStore

__all__: Tuple[str, ...] = (
'BybitInverseDataStore',
'BybitUSDTDataStore',
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

import aiohttp

from ..store import DataStore, DataStoreManager
from ..typedefs import Item
from ..ws import ClientWebSocketResponse
from ...store import DataStore, DataStoreManager
from ...typedefs import Item
from ...ws import ClientWebSocketResponse

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -543,7 +543,17 @@ class PositionUSDT(PositionInverse):
],
}

<<<<<<< HEAD:pybotters/models/experimental.py
def both(self, symbol: str) -> dict[str, Optional[Item]]:
=======
def one(self, symbol: str) -> Dict[str, Optional[Item]]:
return {
"Sell": self.get({"symbol": symbol, "side": "Sell"}),
"Buy": self.get({"symbol": symbol, "side": "Buy"}),
}

def both(self, symbol: str) -> Dict[str, Optional[Item]]:
>>>>>>> origin/develop:pybotters/models/experimental/bybit.py
return {
"Sell": self.get({"symbol": symbol, "side": "Sell"}),
"Buy": self.get({"symbol": symbol, "side": "Buy"}),
Expand Down
25 changes: 25 additions & 0 deletions pybotters/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,21 @@ def _delete(self, data: list[Item]) -> None:
# !TODO! This behaviour might be undesirable.
self._set(data)

def _remove(self, uuids: List[uuid.UUID]) -> None:
if self._keys:
for _id in uuids:
if _id in self._data:
item = self._data[_id]
keyhash = self._hash({k: item[k] for k in self._keys})
del self._data[_id]
del self._index[keyhash]
else:
for _id in uuids:
if _id in self._data:
del self._data[_id]
# !TODO! This behaviour might be undesirable.
self._set([])

def _clear(self) -> None:
self._data.clear()
self._index.clear()
Expand Down Expand Up @@ -155,6 +170,16 @@ def find(self, query: Optional[Item] = None) -> list[Item]:
else:
return list(self)

def _find_with_uuid(self, query: Item = None) -> dict[uuid.UUID, Item]:
if query:
return {
_id: item
for _id, item in self._data.items()
if all(k in item and query[k] == item[k] for k in query)
}
else:
return self._data

def _find_and_delete(self, query: Optional[Item] = None) -> list[Item]:
if query:
ret = [
Expand Down
6 changes: 5 additions & 1 deletion pybotters/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ async def ws_run_forever(
logger.error(repr(e))
elif msg.type == aiohttp.WSMsgType.ERROR:
break
except (aiohttp.WSServerHandshakeError, aiohttp.ClientOSError) as e:
except (
aiohttp.WSServerHandshakeError,
aiohttp.ClientOSError,
ConnectionResetError,
) as e:
logger.warning(repr(e))
await cooldown

Expand Down

0 comments on commit f03543c

Please sign in to comment.