Skip to content

Commit

Permalink
Fix for large values of policy unresolved counts
Browse files Browse the repository at this point in the history
Whenever a connection is destroyed, we need to clear the pending resolutions for that connection. Without this, unresolved counts and pending resolutions can get out of sync.

Signed-off-by: Gautam Venkataramanan <gautam.chennai@gmail.com>
  • Loading branch information
gautvenk committed Jun 29, 2020
1 parent f0854f9 commit bfc1098
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
1 change: 1 addition & 0 deletions libopflex/engine/OpflexClientConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ OpflexClientConnection::OpflexClientConnection(HandlerFactory& handlerFactory,
}

OpflexClientConnection::~OpflexClientConnection() {
pool->clearPendingItems(this);
}

const string& OpflexClientConnection::getName() {
Expand Down
27 changes: 23 additions & 4 deletions libopflex/engine/OpflexPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,39 @@ OpflexPool::~OpflexPool() {
}

void OpflexPool::addPendingItem(OpflexClientConnection* conn, const std::string& uri) {
std::string hostName = conn->getHostname();
const std::string& hostName = conn->getHostname();
std::unique_lock<std::mutex> lock(modify_uri_mutex);
if(pendingResolution[hostName].insert(uri).second == true) {
conn->getOpflexStats()->incrPolUnresolvedCount();
if (pendingResolution[hostName].insert(uri).second == true) {
conn->getOpflexStats()->incrPolUnresolvedCount();
LOG(TRACE) << "add pending UC: conn: " << hostName
<< " size: " << pendingResolution[hostName].size()
<< " UC: " << conn->getOpflexStats()->getPolUnresolvedCount()
<< " URI: " << uri;
}
}

void OpflexPool::clearPendingItems (OpflexClientConnection* conn)
{
const std::string& hostName = conn->getHostname();
std::unique_lock<std::mutex> lock(modify_uri_mutex);
if (pendingResolution.find(hostName) != pendingResolution.end()) {
pendingResolution[hostName].clear();
pendingResolution.erase(hostName);
LOG(TRACE) << "clearing pending UCs: conn: " << hostName;
}
}

void OpflexPool::removePendingItem(OpflexClientConnection* conn, const std::string& uri) {
std::string hostName = conn->getHostname();
const std::string& hostName = conn->getHostname();
std::unique_lock<std::mutex> lock(modify_uri_mutex);
std::set<std::string>::iterator rem = pendingResolution[hostName].find(uri);
if (rem != pendingResolution[hostName].end()) {
pendingResolution[hostName].erase(rem);
conn->getOpflexStats()->decrPolUnresolvedCount();
LOG(TRACE) << "del pending UC: conn: " << hostName
<< " size: " << pendingResolution[hostName].size()
<< " UC: " << conn->getOpflexStats()->getPolUnresolvedCount()
<< " URI: " << uri;
}
}

Expand Down
2 changes: 1 addition & 1 deletion libopflex/engine/include/opflex/engine/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class Processor : public internal::AbstractObjectListener,
NEW,
/** a local item that's been updated */
UPDATED,
/** a local item that's been send to the server */
/** a local item that's been sent to the server */
IN_SYNC,
/** A remote item that does not get resolved */
REMOTE,
Expand Down
5 changes: 5 additions & 0 deletions libopflex/engine/include/opflex/engine/internal/OpflexPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,11 @@ class OpflexPool : private boost::noncopyable {
*/
void removePendingItem(OpflexClientConnection* conn, const std::string& uri);

/**
* Clear the pending policies per connection
*/
void clearPendingItems(OpflexClientConnection* conn);

/**
* Register the given peer status listener to get updates on the
* health of the connection pool and on individual connections.
Expand Down

0 comments on commit bfc1098

Please sign in to comment.