Skip to content

Commit

Permalink
Make peer keepalive timeout configurable (#390)
Browse files Browse the repository at this point in the history
Reduce default to 2 mins from 8 mins

Signed-off-by: Tom Flynn <tom.flynn@gmail.com>
  • Loading branch information
tomflynn committed Apr 28, 2021
1 parent 5deb5d4 commit f9ff4ed
Show file tree
Hide file tree
Showing 15 changed files with 72 additions and 18 deletions.
8 changes: 8 additions & 0 deletions agent-ovs/lib/Agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ void Agent::setProperties(const boost::property_tree::ptree& properties) {
static const std::string OPFLEX_STATS_SYSTEM_INTERVAL("opflex.statistics.system.interval");
static const std::string OPFLEX_PRR_INTERVAL("opflex.timers.prr");
static const std::string OPFLEX_HANDSHAKE("opflex.timers.handshake-timeout");
static const std::string OPFLEX_KEEPALIVE("opflex.timers.keepalive-timeout");
static const std::string DISABLED_FEATURES("feature.disabled");
static const std::string BEHAVIOR_L34FLOWS_WITHOUT_SUBNET("behavior.l34flows-without-subnet");

Expand Down Expand Up @@ -455,6 +456,12 @@ void Agent::setProperties(const boost::property_tree::ptree& properties) {
LOG(INFO) << "peer handshake timeout set to " << peerHandshakeTimeout << " ms";
}

optional<uint32_t> keepaliveOpt = properties.get_optional<uint32_t>(OPFLEX_KEEPALIVE);
if (keepaliveOpt) {
keepaliveTimeout = keepaliveOpt.get();
LOG(INFO) << "keepalive timeout set to " << keepaliveTimeout << " ms";
}

LOG(INFO) << "Agent mode set to " <<
((this->rendererFwdMode == opflex::ofcore::OFConstants::TRANSPORT_MODE)?
"transport-mode" : "stitched-mode");
Expand Down Expand Up @@ -520,6 +527,7 @@ void Agent::applyProperties() {

framework.setPrrTimerDuration(prr_timer);
framework.setHandshakeTimeout(peerHandshakeTimeout);
framework.setKeepaliveTimeout(keepaliveTimeout);
}

void Agent::start() {
Expand Down
2 changes: 2 additions & 0 deletions agent-ovs/lib/include/opflexagent/Agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,8 @@ typedef opflex::ofcore::OFConstants::OpflexElementMode opflex_elem_t;
boost::uint_t<64>::fast prr_timer = 7200; /* seconds */
/* handshake timeout */
uint32_t peerHandshakeTimeout = 45000;
/* keepalive timeout */
uint32_t keepaliveTimeout = 120000;

std::set<std::string> endpointSourceFSPaths;
std::set<std::string> disabledFeaturesSet;
Expand Down
6 changes: 5 additions & 1 deletion agent-ovs/opflex-agent-ovs.conf.in
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@
//
// How long to wait for the initial peer
// handshake to complete (in ms)
// "handshake-timeout" : 45000
// "handshake-timeout" : 45000,
//
// How long to wait (in ms) for keepalive echo to
// be ack'd before timing out connection
// "keepalive-timeout" : 120000
},
// Statistics. Counters for various artifacts.
// mode: can be either
Expand Down
2 changes: 1 addition & 1 deletion agent-ovs/ovs/OvsdbConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void OvsdbConnection::on_state_change(yajr::Peer * p, void * data,
switch (stateChange) {
case yajr::StateChange::CONNECT: {
conn->setConnected(true);
p->startKeepAlive(0, 5000, 60000);
p->startKeepAlive(0, 5000, 180000);
// OVSDB monitor call
conn->sendMonitorRequests();
}
Expand Down
12 changes: 4 additions & 8 deletions libopflex/comms/CommunicationPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ namespace yajr {
void CommunicationPeer::startKeepAlive(
uint64_t begin,
uint64_t repeat,
uint64_t interval) {
LOG(DEBUG) << this << " interval=" << interval << " begin=" << begin << " repeat=" << repeat;
uint64_t timeoutAfter) {
LOG(DEBUG) << this << " timeoutAfter=" << timeoutAfter << " begin=" << begin << " repeat=" << repeat;

sendEchoReq();
bumpLastHeard();

keepAliveInterval_ = interval;
keepAliveInterval_ = timeoutAfter;
uv_timer_start(&keepAliveTimer_, on_timeout, begin, repeat);
}

Expand Down Expand Up @@ -339,11 +339,7 @@ void CommunicationPeer::timeout() {
return;
}

if (rtt <= (keepAliveInterval_ >> 3u) ) {
return;
}

if (rtt > (keepAliveInterval_ << 3u) ) {
if (rtt > keepAliveInterval_) {
LOG(WARNING) << this << " tearing down the connection upon timeout";
/* close the connection and hope for the best */
this->onDisconnect();
Expand Down
2 changes: 1 addition & 1 deletion libopflex/engine/OpflexClientConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void OpflexClientConnection::on_state_change(Peer * p, void * data,

if (conn->pool->clientCtx.get())
ZeroCopyOpenSSL::attachTransport(p, conn->pool->clientCtx.get());
p->startKeepAlive(10000, 15000, 60000);
p->startKeepAlive(10000, 15000, conn->getKeepaliveTimeout());

conn->pool->updatePeerStatus(conn->hostname, conn->port,
PeerStatusListener::CONNECTED);
Expand Down
1 change: 1 addition & 0 deletions libopflex/engine/OpflexPEHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class SendIdentityReq : public OpflexMessage {
OpflexPEHandler::OpflexPEHandler(OpflexConnection* conn, Processor* processor_)
: OpflexHandler(conn), processor(processor_) {
conn->setHandshakeTimeout(processor_->getHandshakeTimeout());
conn->setKeepaliveTimeout(processor_->getKeepaliveTimeout());
}

void OpflexPEHandler::connected() {
Expand Down
2 changes: 1 addition & 1 deletion libopflex/engine/OpflexServerConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void OpflexServerConnection::on_state_change(yajr::Peer * p, void * data,
ZeroCopyOpenSSL::Ctx* serverCtx = conn->listener->serverCtx.get();
if (serverCtx)
ZeroCopyOpenSSL::attachTransport(p, serverCtx);
p->startKeepAlive(10000, 15000, 45000);
p->startKeepAlive(10000, 15000, 120000);

conn->handler->connected();
}
Expand Down
15 changes: 15 additions & 0 deletions libopflex/engine/include/opflex/engine/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,20 @@ class Processor : public internal::AbstractObjectListener,
peerHandshakeTimeout = timeout;
}

/**
* Get the keepalive timeout
*/
uint32_t getKeepaliveTimeout() const {
return keepaliveTimeout;
}

/**
* Set the keepalive timeout
*/
void setKeepaliveTimeout(const uint32_t timeout) {
keepaliveTimeout = timeout;
}
\
/**
* Set the prr timer duration in secs
*/
Expand Down Expand Up @@ -524,6 +538,7 @@ class Processor : public internal::AbstractObjectListener,
uint64_t prrTimerDuration = DEFAULT_PRR_TIMER_DURATION;

uint32_t peerHandshakeTimeout = 45000;
uint32_t keepaliveTimeout = 120000;

/**
* policy refresh timer duration in msecs
Expand Down
17 changes: 17 additions & 0 deletions libopflex/engine/include/opflex/engine/internal/OpflexConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,22 @@ class OpflexConnection : public opflex::jsonrpc::RpcConnection {
handshakeTimeout = timeout;
}

/**
* Get the keepalive timeout (in ms)
* @return timeout
*/
uint32_t getKeepaliveTimeout() const {
return keepaliveTimeout;
}

/**
* Set the keepalive timeout (in ms)
* @param timeout timeout
*/
void setKeepaliveTimeout(uint32_t timeout) {
keepaliveTimeout = timeout;
}

protected:
/**
* The handler for the connection
Expand All @@ -133,6 +149,7 @@ class OpflexConnection : public opflex::jsonrpc::RpcConnection {

private:
uint32_t handshakeTimeout;
uint32_t keepaliveTimeout;

virtual void notifyReady();
virtual void notifyFailed() {}
Expand Down
6 changes: 6 additions & 0 deletions libopflex/include/opflex/ofcore/OFFramework.h
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,12 @@ class OFFramework : private boost::noncopyable {
*/
void setHandshakeTimeout(const uint32_t timeout);

/**
* Set the keepalive timeout
* @param timeout keepalive timeout in milliseconds
*/
void setKeepaliveTimeout(const uint32_t timeout);

/**
* Start the framework. This will start all the framework threads
* and attempt to connect to configured OpFlex peers.
Expand Down
8 changes: 4 additions & 4 deletions libopflex/include/opflex/yajr/internal/comms.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,12 @@ class CommunicationPeer : public Peer, virtual public ::yajr::Peer {
* Start TCP keepalive to peer
* @param begin delay before starting
* @param repeat repeat
* @param interval interval
* @param timeoutAfter timeout after
*/
virtual void startKeepAlive(
uint64_t begin = 100,
uint64_t repeat = 1250,
uint64_t interval = 9000);
uint64_t begin = 100,
uint64_t repeat = 1250,
uint64_t timeoutAfter = 9000);

/**
* Stop TCP keepalive
Expand Down
4 changes: 2 additions & 2 deletions libopflex/include/opflex/yajr/yajr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ class Peer {
*
* @param begin delay before starting
* @param repeat repeat
* @param interval interval
* @param timeoutAfter timeoutAfter
*/
virtual void startKeepAlive(
uint64_t begin = 100,
uint64_t repeat = 1250,
uint64_t interval = 9000
uint64_t timeoutAfter = 9000
) = 0;

/**
Expand Down
4 changes: 4 additions & 0 deletions libopflex/ofcore/OFFramework.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ void OFFramework::setHandshakeTimeout(const uint32_t timeout) {
pimpl->processor.setHandshakeTimeout(timeout);
}

void OFFramework::setKeepaliveTimeout(const uint32_t timeout) {
pimpl->processor.setKeepaliveTimeout(timeout);
}

void OFFramework::start() {
LOG(DEBUG) << "Starting OpFlex Framework";
pimpl->started = true;
Expand Down
1 change: 1 addition & 0 deletions libopflex/ofcore/test/OFFramework_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ BOOST_AUTO_TEST_CASE( test_misc ) {
BOOST_CHECK_EQUAL(opflex::ofcore::OFConstants::TRANSPORT_MODE, fw.getElementMode());
fw.setPrrTimerDuration(12345);
fw.setHandshakeTimeout(54321);
fw.setKeepaliveTimeout(123456);
boost::asio::ip::address_v4 proxy;
fw.getV4Proxy(proxy);
fw.getV6Proxy(proxy);
Expand Down

0 comments on commit f9ff4ed

Please sign in to comment.