Skip to content

Commit

Permalink
Merge pull request #1606 from MonsieurNicolas/peerFix
Browse files Browse the repository at this point in the history
Outbound connectivity fixes

Reviewed-by: vogel
  • Loading branch information
latobarita committed Mar 16, 2018
2 parents a340312 + 899d7bc commit b0923f1
Show file tree
Hide file tree
Showing 16 changed files with 97 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Builds/VisualStudio/stellar-core.vcxproj
Expand Up @@ -425,6 +425,7 @@ exit /b 0
<ClCompile Include="..\..\src\overlay\LoadManager.cpp" />
<ClCompile Include="..\..\src\overlay\OverlayManagerTests.cpp" />
<ClCompile Include="..\..\src\overlay\PeerAuth.cpp" />
<ClCompile Include="..\..\src\overlay\PeerBareAddress.cpp" />
<ClCompile Include="..\..\src\overlay\PeerRecord.cpp" />
<ClCompile Include="..\..\src\overlay\PeerRecordTests.cpp" />
<ClCompile Include="..\..\src\overlay\TCPPeerTests.cpp" />
Expand Down Expand Up @@ -608,6 +609,7 @@ exit /b 0
<ClInclude Include="..\..\src\overlay\BanManagerImpl.h" />
<ClInclude Include="..\..\src\overlay\LoadManager.h" />
<ClInclude Include="..\..\src\overlay\PeerAuth.h" />
<ClInclude Include="..\..\src\overlay\PeerBareAddress.h" />
<ClInclude Include="..\..\src\overlay\StellarXDR.h" />
<ClInclude Include="..\..\src\herder\HerderImpl.h" />
<ClInclude Include="..\..\src\herder\Herder.h" />
Expand Down
6 changes: 6 additions & 0 deletions Builds/VisualStudio/stellar-core.vcxproj.filters
Expand Up @@ -852,6 +852,9 @@
<ClCompile Include="..\..\src\main\StellarCoreVersion.cpp">
<Filter>main\generated</Filter>
</ClCompile>
<ClCompile Include="..\..\src\overlay\PeerBareAddress.cpp">
<Filter>overlay</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ledger\LedgerManager.h">
Expand Down Expand Up @@ -1466,6 +1469,9 @@
<ClInclude Include="..\..\src\main\StellarCoreVersion.h">
<Filter>main</Filter>
</ClInclude>
<ClInclude Include="..\..\src\overlay\PeerBareAddress.h">
<Filter>overlay</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="..\..\AUTHORS" />
Expand Down
14 changes: 13 additions & 1 deletion src/main/Config.cpp
Expand Up @@ -11,6 +11,7 @@
#include "main/ExternalQueue.h"
#include "main/StellarCoreVersion.h"
#include "scp/LocalNode.h"
#include "util/Fs.h"
#include "util/Logging.h"
#include "util/types.h"

Expand Down Expand Up @@ -67,7 +68,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
TARGET_PEER_CONNECTIONS = 8;
MAX_ADDITIONAL_PEER_CONNECTIONS = -1;
MAX_PEER_CONNECTIONS = 12;
MAX_PENDING_CONNECTIONS = 5000;
MAX_PENDING_CONNECTIONS = 500;
PEER_AUTHENTICATION_TIMEOUT = 2;
PEER_TIMEOUT = 30;
PREFERRED_PEERS_ONLY = false;
Expand Down Expand Up @@ -527,6 +528,17 @@ Config::load(std::string const& filename)
static_cast<unsigned short>(MAX_ADDITIONAL_PEER_CONNECTIONS +
TARGET_PEER_CONNECTIONS));

// ensure that max pending connections is not above what the system
// supports
MAX_PENDING_CONNECTIONS = static_cast<unsigned short>(
std::min<int>(MAX_PENDING_CONNECTIONS, fs::getMaxConnections()));

// enforce TARGET_PEER_CONNECTIONS <= MAX_PEER_CONNECTIONS <=
// MAX_PENDING_CONNECTIONS
MAX_PEER_CONNECTIONS =
std::min(MAX_PEER_CONNECTIONS, MAX_PENDING_CONNECTIONS);
TARGET_PEER_CONNECTIONS =
std::min(TARGET_PEER_CONNECTIONS, MAX_PEER_CONNECTIONS);
validateConfig();
}
catch (cpptoml::toml_parse_exception& ex)
Expand Down
5 changes: 3 additions & 2 deletions src/overlay/LoopbackPeer.cpp
Expand Up @@ -28,15 +28,16 @@ LoopbackPeer::LoopbackPeer(Application& app, PeerRole role) : Peer(app, role)
}

PeerBareAddress
LoopbackPeer::makeAddress(unsigned short remoteListeningPort) const
LoopbackPeer::makeAddress(int remoteListeningPort) const
{
if (remoteListeningPort <= 0 || remoteListeningPort > UINT16_MAX)
{
return PeerBareAddress{};
}
else
{
return PeerBareAddress{"127.0.0.1", remoteListeningPort};
return PeerBareAddress{
"127.0.0.1", static_cast<unsigned short>(remoteListeningPort)};
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/overlay/LoopbackPeer.h
Expand Up @@ -53,8 +53,7 @@ class LoopbackPeer : public Peer
Stats mStats;

void sendMessage(xdr::msg_ptr&& xdrBytes) override;
PeerBareAddress
makeAddress(unsigned short remoteListeningPort) const override;
PeerBareAddress makeAddress(int remoteListeningPort) const override;
AuthCert getAuthCert() override;

void processInQueue();
Expand Down
17 changes: 15 additions & 2 deletions src/overlay/OverlayManagerImpl.cpp
Expand Up @@ -135,7 +135,16 @@ OverlayManagerImpl::connectTo(PeerRecord& pr)
pr.backOff(mApp.getClock());
pr.storePeerRecord(mApp.getDatabase());

addPendingPeer(TCPPeer::initiate(mApp, pr.getAddress()));
if (getPendingPeersCount() < mApp.getConfig().MAX_PENDING_CONNECTIONS)
{
addPendingPeer(TCPPeer::initiate(mApp, pr.getAddress()));
}
else
{
CLOG(DEBUG, "Overlay")
<< "reached maximum number of pending connections, backing off "
<< pr.toString();
}
}
else
{
Expand Down Expand Up @@ -222,7 +231,11 @@ OverlayManagerImpl::getPreferredPeersFromConfig()
std::vector<PeerRecord>
OverlayManagerImpl::getPeersToConnectTo(int maxNum)
{
const int batchSize = std::max(20, maxNum);
// don't connect to too many peers at once
maxNum = std::min(maxNum, 50);

// batch is how many peers to load from the database every time
const int batchSize = std::max(50, maxNum);

std::vector<PeerRecord> peers;

Expand Down
2 changes: 1 addition & 1 deletion src/overlay/OverlayManagerTests.cpp
Expand Up @@ -40,7 +40,7 @@ class PeerStub : public Peer
mAddress = addres;
}
virtual PeerBareAddress
makeAddress(unsigned short) const override
makeAddress(int) const override
{
REQUIRE(false); // should not be called
return {};
Expand Down
20 changes: 13 additions & 7 deletions src/overlay/Peer.cpp
Expand Up @@ -378,19 +378,25 @@ Peer::sendGetScpState(uint32 ledgerSeq)
void
Peer::sendPeers()
{
// send top 50 peers we know about
StellarMessage newMsg;
newMsg.type(PEERS);
uint32 maxPeerCount = std::min<uint32>(50, newMsg.peers().max_size());

// send top peers we know about
vector<PeerRecord> peerList;
PeerRecord::loadPeerRecords(mApp.getDatabase(), 50, mApp.getClock().now(),
[&](PeerRecord const& pr) {
if (!pr.getAddress().isPrivate() &&
pr.getAddress() != mAddress)
bool r = peerList.size() < maxPeerCount;
if (r)
{
peerList.emplace_back(pr);
if (!pr.getAddress().isPrivate() &&
pr.getAddress() != mAddress)
{
peerList.emplace_back(pr);
}
}
return peerList.size() < 50;
return r;
});
StellarMessage newMsg;
newMsg.type(PEERS);
newMsg.peers().reserve(peerList.size());
for (auto const& pr : peerList)
{
Expand Down
3 changes: 1 addition & 2 deletions src/overlay/Peer.h
Expand Up @@ -178,8 +178,7 @@ class Peer : public std::enable_shared_from_this<Peer>,
}

virtual AuthCert getAuthCert();
virtual PeerBareAddress
makeAddress(unsigned short remoteListeningPort) const = 0;
virtual PeerBareAddress makeAddress(int remoteListeningPort) const = 0;

void startIdleTimer();
void idleTimerExpired(asio::error_code const& error);
Expand Down
4 changes: 2 additions & 2 deletions src/overlay/PeerBareAddress.cpp
Expand Up @@ -58,7 +58,7 @@ PeerBareAddress::PeerBareAddress(PeerAddress const& pa) : mType{Type::IPv4}
ip << (int)pa.ip.ipv4()[0] << "." << (int)pa.ip.ipv4()[1] << "."
<< (int)pa.ip.ipv4()[2] << "." << (int)pa.ip.ipv4()[3];
mIP = ip.str();
mPort = pa.port;
mPort = static_cast<unsigned short>(pa.port);
}

PeerBareAddress
Expand Down Expand Up @@ -150,7 +150,7 @@ PeerBareAddress::toString() const
return mIP + ":" + std::to_string(mPort);
}
default:
assert(false);
abort();
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/overlay/PeerRecord.cpp
Expand Up @@ -125,7 +125,7 @@ PeerRecord::loadPeerRecords(Database& db, int batchSize,
try
{
int offset = 0;
bool didSomething;
bool lastRes;
do
{
tm nextAttemptMax = VirtualClock::pointToTm(nextAttemptCutoff);
Expand All @@ -141,14 +141,14 @@ PeerRecord::loadPeerRecords(Database& db, int batchSize,
st.exchange(use(batchSize));
st.exchange(use(offset));

didSomething = false;
lastRes = false;

loadPeerRecords(db, prep, [&](PeerRecord const& pr) {
offset++;
didSomething = true;
return pred(pr);
lastRes = pred(pr);
return lastRes;
});
} while (didSomething);
} while (lastRes);
}
catch (soci_error& err)
{
Expand Down
6 changes: 4 additions & 2 deletions src/overlay/TCPPeer.cpp
Expand Up @@ -113,7 +113,7 @@ TCPPeer::~TCPPeer()
}

PeerBareAddress
TCPPeer::makeAddress(unsigned short remoteListeningPort) const
TCPPeer::makeAddress(int remoteListeningPort) const
{
asio::error_code ec;
auto ep = mSocket->next_layer().remote_endpoint(ec);
Expand All @@ -123,7 +123,9 @@ TCPPeer::makeAddress(unsigned short remoteListeningPort) const
}
else
{
return PeerBareAddress{ep.address().to_string(), remoteListeningPort};
return PeerBareAddress{
ep.address().to_string(),
static_cast<unsigned short>(remoteListeningPort)};
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/overlay/TCPPeer.h
Expand Up @@ -35,8 +35,7 @@ class TCPPeer : public Peer
bool mDelayedShutdown{false};
bool mShutdownScheduled{false};

PeerBareAddress
makeAddress(unsigned short remoteListeningPort) const override;
PeerBareAddress makeAddress(int remoteListeningPort) const override;

void recvMessage();
void sendMessage(xdr::msg_ptr&& xdrBytes) override;
Expand Down
26 changes: 26 additions & 0 deletions src/util/Fs.cpp
Expand Up @@ -15,6 +15,7 @@
#include <filesystem>
#else
#include <dirent.h>
#include <sys/resource.h>
#include <sys/stat.h>
#endif

Expand Down Expand Up @@ -440,5 +441,30 @@ checkNoGzipSuffix(std::string const& filename)
throw std::runtime_error("filename ends in .gz");
}
}

#ifdef _WIN32

int
getMaxConnections()
{
// on Windows, there is no limit on handles
// only limits based on ephemeral ports, etc
return 32000;
}

#else
int
getMaxConnections()
{
struct rlimit rl;
if (getrlimit(RLIMIT_NOFILE, &rl) == 0)
{
// leave some buffer
return (rl.rlim_cur * 3) / 4;
}
// could not query the limit, default to a value that should work
return 64;
}
#endif
}
}
3 changes: 3 additions & 0 deletions src/util/Fs.h
Expand Up @@ -85,5 +85,8 @@ std::string remoteName(std::string const& type, std::string const& hexStr,
void checkGzipSuffix(std::string const& filename);

void checkNoGzipSuffix(std::string const& filename);

// returns the maximum number of connections that can be done at the same time
int getMaxConnections();
}
}
2 changes: 1 addition & 1 deletion src/xdr/Stellar-overlay.x
Expand Up @@ -112,7 +112,7 @@ case DONT_HAVE:
case GET_PEERS:
void;
case PEERS:
PeerAddress peers<>;
PeerAddress peers<100>;

case GET_TX_SET:
uint256 txSetHash;
Expand Down

0 comments on commit b0923f1

Please sign in to comment.