Skip to content

Commit

Permalink
Finishes all futures when one throws an exception
Browse files Browse the repository at this point in the history
  • Loading branch information
sjanel committed Oct 24, 2023
1 parent a0d8b12 commit 0c50407
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 100 deletions.
4 changes: 2 additions & 2 deletions src/api/common/include/exchangeprivateapi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ class ExchangePrivate : public ExchangeBase {
/// Returns the amounts actually traded with the final amount balance on this currency
TradedAmountsVectorWithFinalAmount queryDustSweeper(CurrencyCode currencyCode);

/// Builds en ExchangeName wrapping the exchange and the key name
ExchangeName exchangeName() const { return ExchangeName(_exchangePublic.name(), _apiKey.name()); }
/// Builds an ExchangeName wrapping the exchange and the key name
ExchangeName exchangeName() const { return {_exchangePublic.name(), _apiKey.name()}; }

const ExchangeInfo &exchangeInfo() const { return _exchangePublic.exchangeInfo(); }

Expand Down
15 changes: 8 additions & 7 deletions src/api/exchanges/src/binanceprivateapi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "recentdeposit.hpp"
#include "ssl_sha.hpp"
#include "stringhelpers.hpp"
#include "timedef.hpp"
#include "timestring.hpp"
#include "tradeinfo.hpp"

Expand Down Expand Up @@ -61,7 +62,7 @@ void SetNonceAndSignature(const APIKey& apiKey, CurlPostData& postData, Duration

bool CheckErrorDoRetry(int statusCode, const json& ret, QueryDelayDir& queryDelayDir, Duration& sleepingTime,
Duration& queryDelay) {
static constexpr Duration kInitialDurationQueryDelay = std::chrono::milliseconds(200);
static constexpr Duration kInitialDurationQueryDelay = TimeInMs(200);
switch (statusCode) {
case kInvalidTimestamp: {
auto msgIt = ret.find("msg");
Expand All @@ -80,7 +81,7 @@ bool CheckErrorDoRetry(int statusCode, const json& ret, QueryDelayDir& queryDela
}
queryDelay -= sleepingTime;
log::warn("Our local time is ahead of Binance server's time. Query delay modified to {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(queryDelay).count());
std::chrono::duration_cast<TimeInMs>(queryDelay).count());
// Ensure Nonce is increasing while modifying the query delay
std::this_thread::sleep_for(sleepingTime);
return true;
Expand All @@ -96,7 +97,7 @@ bool CheckErrorDoRetry(int statusCode, const json& ret, QueryDelayDir& queryDela
}
queryDelay += sleepingTime;
log::warn("Our local time is behind of Binance server's time. Query delay modified to {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(queryDelay).count());
std::chrono::duration_cast<TimeInMs>(queryDelay).count());
return true;
}
}
Expand Down Expand Up @@ -131,7 +132,7 @@ json PrivateQuery(CurlHandle& curlHandle, const APIKey& apiKey, HttpRequestType
json ret;
for (int retryPos = 0; retryPos < kNbOrderRequestsRetries; ++retryPos) {
if (retryPos != 0) {
log::trace("Wait {} ms...", std::chrono::duration_cast<std::chrono::milliseconds>(sleepingTime).count());
log::trace("Wait {} ms...", std::chrono::duration_cast<TimeInMs>(sleepingTime).count());
std::this_thread::sleep_for(sleepingTime);
sleepingTime = (3 * sleepingTime) / 2;
}
Expand All @@ -153,7 +154,7 @@ json PrivateQuery(CurlHandle& curlHandle, const APIKey& apiKey, HttpRequestType
break;
}
if (throwIfError) {
log::error("Full Binance json error: '{}'", ret.dump());
log::error("Full Binance json error for {}: '{}'", apiKey.name(), ret.dump());
throw exception("Error: {}, msg: {}", MonetaryAmount(statusCode), ret["msg"].get<std::string_view>());
}
return ret;
Expand Down Expand Up @@ -267,7 +268,7 @@ Orders BinancePrivate::queryOpenedOrders(const OrdersConstraints& openedOrdersCo
}
int64_t millisecondsSinceEpoch = orderDetails["time"].get<int64_t>();

TimePoint placedTime{std::chrono::milliseconds(millisecondsSinceEpoch)};
TimePoint placedTime{TimeInMs(millisecondsSinceEpoch)};
if (!openedOrdersConstraints.validatePlacedTime(placedTime)) {
continue;
}
Expand Down Expand Up @@ -697,7 +698,7 @@ MonetaryAmount BinancePrivate::queryWithdrawDelivery(const InitiatedWithdrawInfo
MonetaryAmount amountReceived(depositDetail["amount"].get<double>(), currencyCode);
int64_t millisecondsSinceEpoch = depositDetail["insertTime"].get<int64_t>();

TimePoint timestamp{std::chrono::milliseconds(millisecondsSinceEpoch)};
TimePoint timestamp{TimeInMs(millisecondsSinceEpoch)};

closestRecentDepositPicker.addDeposit(RecentDeposit(amountReceived, timestamp));
}
Expand Down
71 changes: 35 additions & 36 deletions src/engine/src/exchangesorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ ExchangeHealthCheckStatus ExchangesOrchestrator::healthCheck(ExchangeNameSpan ex

ExchangeHealthCheckStatus ret(selectedExchanges.size());

_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), ret.begin(),
[](Exchange *exchange) { return std::make_pair(exchange, exchange->healthCheck()); });
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), ret.begin(),
[](Exchange *exchange) { return std::make_pair(exchange, exchange->healthCheck()); });

return ret;
}
Expand All @@ -109,7 +109,7 @@ ExchangeTickerMaps ExchangesOrchestrator::getTickerInformation(ExchangeNameSpan
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);

ExchangeTickerMaps ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(),
[](Exchange *exchange) { return std::make_pair(exchange, exchange->queryAllApproximatedOrderBooks(1)); });

Expand All @@ -124,8 +124,8 @@ MarketOrderBookConversionRates ExchangesOrchestrator::getMarketOrderBooks(Market
equiCurrencyCode.isNeutral() ? "" : equiCurrencyCode);
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);
std::array<bool, kNbSupportedExchanges> isMarketTradable;
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), isMarketTradable.begin(),
[mk](Exchange *exchange) { return exchange->queryTradableMarkets().contains(mk); });
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), isMarketTradable.begin(),
[mk](Exchange *exchange) { return exchange->queryTradableMarkets().contains(mk); });

FilterVector(selectedExchanges, isMarketTradable);

Expand All @@ -141,7 +141,7 @@ MarketOrderBookConversionRates ExchangesOrchestrator::getMarketOrderBooks(Market
}
return std::make_tuple(exchange->name(), std::move(marketOrderBook), optConversionRate);
};
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), marketOrderBooksFunc);
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), marketOrderBooksFunc);
return ret;
}

Expand All @@ -157,9 +157,9 @@ BalancePerExchange ExchangesOrchestrator::getBalance(std::span<const ExchangeNam
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

SmallVector<BalancePortfolio, kTypicalNbPrivateAccounts> balancePortfolios(balanceExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
balanceExchanges.begin(), balanceExchanges.end(), balancePortfolios.begin(),
[&](Exchange *exchange) { return exchange->apiPrivate().getAccountBalance(balanceOptions); });
[&balanceOptions](Exchange *exchange) { return exchange->apiPrivate().getAccountBalance(balanceOptions); });

BalancePerExchange ret;
ret.reserve(balanceExchanges.size());
Expand Down Expand Up @@ -202,7 +202,7 @@ WalletPerExchange ExchangesOrchestrator::getDepositInfo(std::span<const Exchange
FilterVector(depositInfoExchanges, canDepositCurrency);

SmallVector<Wallet, kTypicalNbPrivateAccounts> walletPerExchange(depositInfoExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
depositInfoExchanges.begin(), depositInfoExchanges.end(), walletPerExchange.begin(),
[depositCurrency](Exchange *exchange) { return exchange->apiPrivate().queryDepositWallet(depositCurrency); });
WalletPerExchange ret;
Expand All @@ -221,7 +221,7 @@ OpenedOrdersPerExchange ExchangesOrchestrator::getOpenedOrders(std::span<const E
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

OpenedOrdersPerExchange ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), [&](Exchange *exchange) {
return std::make_pair(exchange, OrdersSet(exchange->apiPrivate().queryOpenedOrders(openedOrdersConstraints)));
});
Expand All @@ -236,7 +236,7 @@ NbCancelledOrdersPerExchange ExchangesOrchestrator::cancelOrders(std::span<const
ExchangeRetriever::SelectedExchanges selectedExchanges =
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);
NbCancelledOrdersPerExchange nbOrdersCancelled(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), nbOrdersCancelled.begin(), [&](Exchange *exchange) {
return std::make_pair(exchange, exchange->apiPrivate().cancelOpenedOrders(ordersConstraints));
});
Expand All @@ -252,7 +252,7 @@ DepositsPerExchange ExchangesOrchestrator::getRecentDeposits(std::span<const Exc
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

DepositsPerExchange ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), [&](Exchange *exchange) {
return std::make_pair(exchange, exchange->apiPrivate().queryRecentDeposits(depositsConstraints));
});
Expand All @@ -268,7 +268,7 @@ WithdrawsPerExchange ExchangesOrchestrator::getRecentWithdraws(std::span<const E
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

WithdrawsPerExchange ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), [&](Exchange *exchange) {
return std::make_pair(exchange, exchange->apiPrivate().queryRecentWithdraws(withdrawsConstraints));
});
Expand All @@ -280,7 +280,7 @@ ConversionPathPerExchange ExchangesOrchestrator::getConversionPaths(Market mk, E
log::info("Query {} conversion path from {}", mk, ConstructAccumulatedExchangeNames(exchangeNames));
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);
ConversionPathPerExchange conversionPathPerExchange(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), conversionPathPerExchange.begin(), [mk](Exchange *exchange) {
return std::make_pair(exchange, exchange->apiPublic().findMarketsPath(mk.base(), mk.quote()));
});
Expand All @@ -305,8 +305,8 @@ MarketsPerExchange ExchangesOrchestrator::getMarketsPerExchange(CurrencyCode cur
[cur1, cur2](Market mk) { return mk.canTrade(cur1) && (cur2.isNeutral() || mk.canTrade(cur2)); });
return std::make_pair(exchange, std::move(ret));
};
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), marketsPerExchange.begin(),
marketsWithCur);
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), marketsPerExchange.begin(),
marketsWithCur);
return marketsPerExchange;
}

Expand All @@ -315,7 +315,7 @@ UniquePublicSelectedExchanges ExchangesOrchestrator::getExchangesTradingCurrency
bool shouldBeWithdrawable) {
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);
std::array<bool, kNbSupportedExchanges> isCurrencyTradablePerExchange;
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), isCurrencyTradablePerExchange.begin(),
[currencyCode, shouldBeWithdrawable](Exchange *exchange) {
CurrencyExchangeFlatSet currencies = exchange->queryTradableCurrencies();
Expand All @@ -332,9 +332,8 @@ UniquePublicSelectedExchanges ExchangesOrchestrator::getExchangesTradingMarket(M
ExchangeNameSpan exchangeNames) {
UniquePublicSelectedExchanges selectedExchanges = _exchangeRetriever.selectOneAccount(exchangeNames);
std::array<bool, kNbSupportedExchanges> isMarketTradablePerExchange;
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(),
isMarketTradablePerExchange.begin(),
[mk](Exchange *exchange) { return exchange->queryTradableMarkets().contains(mk); });
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), isMarketTradablePerExchange.begin(),
[mk](Exchange *exchange) { return exchange->queryTradableMarkets().contains(mk); });

// Erases Exchanges which do not propose asked market
FilterVector(selectedExchanges, isMarketTradablePerExchange);
Expand Down Expand Up @@ -426,7 +425,7 @@ TradedAmountsPerExchange LaunchAndCollectTrades(ThreadPool &threadPool, Exchange
ExchangeAmountMarketsPathVector::iterator last, CurrencyCode toCurrency,
const TradeOptions &tradeOptions) {
TradedAmountsPerExchange tradeAmountsPerExchange(std::distance(first, last));
threadPool.parallel_transform(first, last, tradeAmountsPerExchange.begin(), [toCurrency, &tradeOptions](auto &tuple) {
threadPool.parallelTransform(first, last, tradeAmountsPerExchange.begin(), [toCurrency, &tradeOptions](auto &tuple) {
Exchange *exchange = std::get<0>(tuple);
return std::make_pair(
exchange, exchange->apiPrivate().trade(std::get<1>(tuple), toCurrency, tradeOptions, std::get<2>(tuple)));
Expand All @@ -438,7 +437,7 @@ template <class Iterator>
TradedAmountsPerExchange LaunchAndCollectTrades(ThreadPool &threadPool, Iterator first, Iterator last,
const TradeOptions &tradeOptions) {
TradedAmountsPerExchange tradeAmountsPerExchange(std::distance(first, last));
threadPool.parallel_transform(first, last, tradeAmountsPerExchange.begin(), [&tradeOptions](auto &tuple) {
threadPool.parallelTransform(first, last, tradeAmountsPerExchange.begin(), [&tradeOptions](auto &tuple) {
Exchange *exchange = std::get<0>(tuple);
return std::make_pair(exchange, exchange->apiPrivate().trade(std::get<1>(tuple), std::get<2>(tuple), tradeOptions,
std::get<3>(tuple)));
Expand Down Expand Up @@ -718,11 +717,11 @@ TradedAmountsVectorWithFinalAmountPerExchange ExchangesOrchestrator::dustSweeper
_exchangeRetriever.select(ExchangeRetriever::Order::kInitial, privateExchangeNames);

TradedAmountsVectorWithFinalAmountPerExchange ret(selExchanges.size());
_threadPool.parallel_transform(selExchanges.begin(), selExchanges.end(), ret.begin(),
[currencyCode](Exchange *exchange) {
return std::make_pair(static_cast<const Exchange *>(exchange),
exchange->apiPrivate().queryDustSweeper(currencyCode));
});
_threadPool.parallelTransform(selExchanges.begin(), selExchanges.end(), ret.begin(),
[currencyCode](Exchange *exchange) {
return std::make_pair(static_cast<const Exchange *>(exchange),
exchange->apiPrivate().queryDustSweeper(currencyCode));
});

return ret;
}
Expand All @@ -747,8 +746,8 @@ DeliveredWithdrawInfoWithExchanges ExchangesOrchestrator::withdraw(MonetaryAmoun
throw exception("Cannot withdraw to the same account");
}
std::array<CurrencyExchangeFlatSet, 2> currencyExchangeSets;
_threadPool.parallel_transform(exchangePair.begin(), exchangePair.end(), currencyExchangeSets.begin(),
[](Exchange *exchange) { return exchange->queryTradableCurrencies(); });
_threadPool.parallelTransform(exchangePair.begin(), exchangePair.end(), currencyExchangeSets.begin(),
[](Exchange *exchange) { return exchange->queryTradableCurrencies(); });

DeliveredWithdrawInfoWithExchanges ret{{&fromExchange, &toExchange}, DeliveredWithdrawInfo{}};

Expand Down Expand Up @@ -783,10 +782,10 @@ MonetaryAmountPerExchange ExchangesOrchestrator::getWithdrawFees(CurrencyCode cu
UniquePublicSelectedExchanges selectedExchanges = getExchangesTradingCurrency(currencyCode, exchangeNames, true);

MonetaryAmountPerExchange withdrawFeePerExchange(selectedExchanges.size());
_threadPool.parallel_transform(selectedExchanges.begin(), selectedExchanges.end(), withdrawFeePerExchange.begin(),
[currencyCode](Exchange *exchange) {
return std::make_pair(exchange, exchange->queryWithdrawalFee(currencyCode));
});
_threadPool.parallelTransform(selectedExchanges.begin(), selectedExchanges.end(), withdrawFeePerExchange.begin(),
[currencyCode](Exchange *exchange) {
return std::make_pair(exchange, exchange->queryWithdrawalFee(currencyCode));
});
return withdrawFeePerExchange;
}

Expand All @@ -796,7 +795,7 @@ MonetaryAmountPerExchange ExchangesOrchestrator::getLast24hTradedVolumePerExchan
UniquePublicSelectedExchanges selectedExchanges = getExchangesTradingMarket(mk, exchangeNames);

MonetaryAmountPerExchange tradedVolumePerExchange(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), tradedVolumePerExchange.begin(),
[mk](Exchange *exchange) { return std::make_pair(exchange, exchange->queryLast24hVolume(mk)); });
return tradedVolumePerExchange;
Expand All @@ -809,7 +808,7 @@ LastTradesPerExchange ExchangesOrchestrator::getLastTradesPerExchange(Market mk,
UniquePublicSelectedExchanges selectedExchanges = getExchangesTradingMarket(mk, exchangeNames);

LastTradesPerExchange ret(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), ret.begin(), [mk, nbLastTrades](Exchange *exchange) {
return std::make_pair(static_cast<const Exchange *>(exchange), exchange->queryLastTrades(mk, nbLastTrades));
});
Expand All @@ -822,7 +821,7 @@ MonetaryAmountPerExchange ExchangesOrchestrator::getLastPricePerExchange(Market
UniquePublicSelectedExchanges selectedExchanges = getExchangesTradingMarket(mk, exchangeNames);

MonetaryAmountPerExchange lastPricePerExchange(selectedExchanges.size());
_threadPool.parallel_transform(
_threadPool.parallelTransform(
selectedExchanges.begin(), selectedExchanges.end(), lastPricePerExchange.begin(),
[mk](Exchange *exchange) { return std::make_pair(exchange, exchange->queryLastPrice(mk)); });
return lastPricePerExchange;
Expand Down
2 changes: 2 additions & 0 deletions src/tech/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ add_unit_test(
add_unit_test(
threadpool_test
test/threadpool_test.cpp
DEFINITIONS
CCT_DISABLE_SPDLOG
)

add_unit_test(
Expand Down
Loading

0 comments on commit 0c50407

Please sign in to comment.