-
Notifications
You must be signed in to change notification settings - Fork 1
/
marketmaker.py
321 lines (260 loc) · 14.9 KB
/
marketmaker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
import concurrent.futures
import json
import logging
import math
import multiprocessing
import operator
import time
import traceback
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from btc38 import btc38exchange
from data import marketmakerexchange
from dex import dexexchange
from notifications import emailsender
CNY_CURRENCY_CODE = marketmakerexchange.CNY
BTS_CURRENCY_CODE = marketmakerexchange.BTS
BIDS = marketmakerexchange.BIDS
ASKS = marketmakerexchange.ASKS
UPDATE_TIME = "updateTime"
PROFIT_THRESHOLD = 0.00
MINIMUM_PURCHASE_VOLUME = 500
# Leave 100 shares in the listing as buffer
MIN_LISTING_VOLUME_BUFFER = 500
ACCOUNT_CNY_RESERVE = 50
ACCOUNT_BTS_RESERVE = 100
EXCHANGE_SYNC_TOLERANCE = 2
UPDATE_LAG_TOLERANCE = 10
# Order book information is valid for 1 second.
ORDER_BOOK_VALID_WINDOW = 1
# logger
log = logging.getLogger(__name__)
class TradeExchange(object):
def __init__(self):
with open("configurations/config.json") as client_config:
config = json.load(client_config)
self.exchanges = {}
for exchange_client in config:
if exchange_client['client'] == dexexchange.EXCHANGE_NAME:
dex_exchange = dexexchange.DexExchange(exchange_client['WITNESS_URL'],
exchange_client['ACCOUNT'],
exchange_client['SECRET_KEY'])
self.exchanges[dex_exchange.get_exchange_name()] = dex_exchange
if exchange_client['client'] == btc38exchange.EXCHANGE_NAME:
btc38_exchange = btc38exchange.BTC38Exchange(exchange_client['ACCESS_KEY'],
exchange_client['SECRET_KEY'],
exchange_client['ACCOUNT_ID'])
self.exchanges[btc38_exchange.get_exchange_name()] = btc38_exchange
# Daemon to update order books. Each exchange requires one daemon. Daemon should not terminate in any cases.
def order_book_fetcher_daemon(exchange, order_book):
while True:
exchange_name = exchange.get_exchange_name()
# Multiprocessing manager cannot update nested items in the dictionary, therefore, I have to create a copy of
# the nested dictionary, update it, and then put it back into the main dictionary.
exchange_order_book = order_book[exchange_name]
time_since_last_update = 0
try:
top_offers = exchange.get_top_offers()
current_time = datetime.now()
time_since_last_update = (current_time - exchange_order_book[UPDATE_TIME]).total_seconds()
exchange_order_book[BIDS] = top_offers[0]
exchange_order_book[ASKS] = top_offers[1]
exchange_order_book[UPDATE_TIME] = current_time
order_book[exchange_name] = exchange_order_book
time.sleep(0.3)
except Exception as e:
if time_since_last_update > UPDATE_LAG_TOLERANCE:
log.warning("Exchange: {} receives no update for {} seconds. (Last error: {})"
.format(exchange_name, time_since_last_update, e))
def round_up(value, decimal=0):
return math.ceil(value * (10 ** decimal)) / (10 ** decimal)
def round_down(value, decimal=0):
return math.floor(value * (10 ** decimal)) / (10 ** decimal)
# Send notification email using the template.
def send_notification_email(message):
with open("configurations/email_header.json") as email_header:
header = json.load(email_header)
with open("configurations/email_credential.json") as email_credential:
credential = json.load(email_credential)
email_sender = emailsender.EmailSender(credential["login"], credential["password"])
email_process = multiprocessing.Process(target=email_sender.send_email_with_message,
args=(message, header["subject"], header["from"], header["to"]))
email_process.start()
class MarketMaker(object):
def __init__(self):
trade_exchanges = TradeExchange()
self.exchanges_dict = trade_exchanges.exchanges
self.account_balance = {exchange_name: {CNY_CURRENCY_CODE: 0, BTS_CURRENCY_CODE: 0} for
exchange_name, exchange in self.exchanges_dict.items()}
current_time = datetime.now()
manager = multiprocessing.Manager()
self.order_book = manager.dict({exchange_name: {BIDS: [0, 0], ASKS: [0, 0], UPDATE_TIME: current_time} for
exchange_name, exchange in self.exchanges_dict.items()})
self.last_transaction_time = {exchange_name: current_time for
exchange_name, exchange in self.exchanges_dict.items()}
# Check balance only when transactions were made.
self.need_balance_check = True
def run(self):
scheduler = BackgroundScheduler()
# Update account balance every 5 minutes in case external transfer happened.
scheduler.add_job(self.__request_account_balance_checking, 'interval', minutes=5)
scheduler.start()
order_book_fetchers = []
for exchange_name, exchange in self.exchanges_dict.items():
p = multiprocessing.Process(target=order_book_fetcher_daemon, args=(exchange, self.order_book))
p.daemon = True
order_book_fetchers.append(p)
p.start()
log.info("Started updating order book process for exchange: {}, pid: {}"
.format(exchange.get_exchange_name(), p.pid))
while all(map(multiprocessing.Process.is_alive, order_book_fetchers)):
try:
self.__speculate()
time.sleep(0.3)
except Exception as e:
traceback.print_exc()
log.error("Unexpected exception caught in main execution. (Error: {})".format(e))
log.fatal("Order book daemon terminated! Exit the market maker.")
send_notification_email("Market Maker terminated!")
def __speculate(self):
if self.need_balance_check:
try:
self.__update_account_balance()
except Exception as e:
log.error("Failed to update account balance.(Error: {})".format(e))
return
for buyer_name, buyer_exchange in self.exchanges_dict.items():
if self.__is_order_book_valid(buyer_name):
profitable_exchange_name = self.__find_profitable_exchange(buyer_exchange)
if profitable_exchange_name:
seller_exchange = self.exchanges_dict[profitable_exchange_name]
# BTC38 can only accept price with 5 decimal places, and volume up to 6 decimal places.
# Round up the purchase price, and round down the sell price to guarantee profit.
purchase_price = round_up(self.order_book[buyer_name][ASKS][0], 4)
sell_price = round_down(self.order_book[profitable_exchange_name][BIDS][0], 4)
purchase_volume = round(self.__calculate_purchase_volume(buyer_exchange, seller_exchange), 6)
sell_volume = round(self.__calculate_sell_volume(buyer_exchange, purchase_volume), 6)
if sell_volume < MINIMUM_PURCHASE_VOLUME:
log.info("Under minimum arbitrage volume: {}".format(sell_volume))
else:
log.info("Placing arbitrage order...")
order_placed = self.__place_arbitrage_orders(buyer_exchange, purchase_price, purchase_volume,
seller_exchange, sell_price, sell_volume)
if order_placed:
send_notification_email("Arbitrage: purchase from {} at {}, volume: {}. Total: {}\n"
"Arbitrage: sell to {} at {}, volume: {}. Total: {}"
.format(buyer_name, purchase_price, purchase_volume,
purchase_price * purchase_volume,
profitable_exchange_name, sell_price, sell_volume,
sell_price * sell_volume))
else:
send_notification_email("Failed to place arbitrage order!")
"""
Find the highest bidder in the order book. If the profit is higher than the threshold,
return profitable exchange name.
"""
def __find_profitable_exchange(self, buyer_exchange):
buyer_name = buyer_exchange.get_exchange_name()
target_order_book = {exchange_name: order_book[BIDS][0] for exchange_name, order_book in self.order_book.items()
if self.__is_order_book_valid(exchange_name) and exchange_name != buyer_name}
if len(target_order_book) == 0:
return None
best_offer = max(target_order_book.items(), key=operator.itemgetter(1))
purchase_price = self.order_book[buyer_name][ASKS][0]
profit = (best_offer[1] - purchase_price) / purchase_price
if profit - buyer_exchange.get_profit_deduction() > PROFIT_THRESHOLD:
log.info("Found profitable exchange {}! Profit: {:.2f}%."
.format(buyer_name, profit * 100))
return best_offer[0]
def __order_books_in_sync(self, base_exchange_name, target_exchange_name):
base_ex_update_time = self.order_book[base_exchange_name][UPDATE_TIME]
compare_ex_update_time = self.order_book[target_exchange_name][UPDATE_TIME]
if abs((base_ex_update_time - compare_ex_update_time).total_seconds()) > EXCHANGE_SYNC_TOLERANCE:
return False
else:
return True
def __is_order_book_valid(self, exchange_name):
current_time = datetime.now()
last_update_time = self.order_book[exchange_name][UPDATE_TIME]
updated_after_transactions = last_update_time > self.last_transaction_time[exchange_name]
order_book_valid = (current_time - last_update_time).total_seconds() < ORDER_BOOK_VALID_WINDOW
return updated_after_transactions and order_book_valid
def __request_account_balance_checking(self):
self.need_balance_check = True
def __update_account_balance(self):
for exchange_name, exchange in self.exchanges_dict.items():
exchange_name = exchange.get_exchange_name()
balance = exchange.get_maker_account_balance()
self.account_balance[exchange_name][CNY_CURRENCY_CODE] = balance[CNY_CURRENCY_CODE]
self.account_balance[exchange_name][BTS_CURRENCY_CODE] = balance[BTS_CURRENCY_CODE]
log.info("Account balance updated. New account balance: {}".format(self.account_balance))
self.need_balance_check = False
# Price = BTS price in terms of CNY. Volume = number of BTS shares.
def __calculate_purchase_volume(self, buyer_exchange, seller_exchange):
buyer_exchange_name = buyer_exchange.get_exchange_name()
seller_exchange_name = seller_exchange.get_exchange_name()
buyer_vol = self.order_book[buyer_exchange_name][ASKS][1]
seller_vol = self.order_book[seller_exchange_name][BIDS][1]
volume_available = min(buyer_vol, seller_vol)
if volume_available < MINIMUM_PURCHASE_VOLUME + MIN_LISTING_VOLUME_BUFFER:
return 0
buyer_price = self.order_book[buyer_exchange_name][ASKS][0]
usable_cny = self.account_balance[buyer_exchange_name][CNY_CURRENCY_CODE] - ACCOUNT_CNY_RESERVE
usable_bts = self.account_balance[seller_exchange_name][BTS_CURRENCY_CODE] - ACCOUNT_BTS_RESERVE
if usable_cny <= 0:
log.info("Insufficient fund on buyer account: {}".format(buyer_exchange_name))
return 0
elif usable_bts <= 0:
log.info("Insufficient fund on seller account: {}".format(seller_exchange_name))
return 0
else:
safe_volume = volume_available - max(volume_available * 0.2, MIN_LISTING_VOLUME_BUFFER)
return min(usable_cny / buyer_price, usable_bts, safe_volume)
@staticmethod
def __calculate_sell_volume(buyer_exchange, purchase_volume):
buyer_name = buyer_exchange.get_exchange_name()
# Withdrawal fee from btc38 is 1%, therefore, sell_vol = purchase_vol * 0.99 - 1
if buyer_name == btc38exchange.EXCHANGE_NAME:
return purchase_volume * 0.99 - 1
else:
return purchase_volume - 1
def __open_order_exists(self):
for exchange_name, exchange in self.exchanges_dict.items():
orders = exchange.list_my_orders()
if orders:
log.info("{} order still open: {}".format(exchange.get_exchange_name(), orders))
return True
return False
"""
Place arbitrage order, return True if both orders have been placed, false otherwise.
After placing arbitrage order, send out email notification.
Price = BTS price in terms of CNY. Volume = number of BTS shares.
"""
def __place_arbitrage_orders(self, buyer_exchange, purchase_price, purchase_volume,
seller_exchange, sell_price, sell_volume):
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
seller_thread_future = executor.submit(self.__place_orders_thread,
seller_exchange, 2, sell_price, sell_volume)
buyer_thread_future = executor.submit(self.__place_orders_thread,
buyer_exchange, 1, purchase_price, purchase_volume)
seller_exception = seller_thread_future.exception()
buyer_exception = buyer_thread_future.exception()
# If this method is called, successful or not, we need to recheck the account balance.
self.__request_account_balance_checking()
if seller_exception is not None or buyer_exception is not None:
error_email = "Seller exception: {}\nBuyer exception: {}".format(seller_exception, buyer_exception)
send_notification_email(error_email)
return False
return True
def __place_orders_thread(self, exchange, order_type, price, volume):
exchange_name = exchange.get_exchange_name()
order_message = "Place order at: {} - order type {}, at {}, volume: {}".format(
exchange_name, order_type, price, volume)
try:
exchange.submit_arbitrage_order(order_type, price, volume)
current_time = datetime.now()
self.last_transaction_time[exchange_name] = current_time
log.info("Arbitrage order placed successfully" + order_message)
except Exception as e:
log.error("Failed to place order - " + order_message + ". Error: {}.".format(e))
raise e