Skip to content

Commit

Permalink
Backporting fixes from master
Browse files Browse the repository at this point in the history
#297
#305
#310
#312

Signed-off-by: Gautam Venkataramanan <gautam.chennai@gmail.com>
  • Loading branch information
gautvenk committed Dec 15, 2020
1 parent e462e6c commit 50c1c3b
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 70 deletions.
9 changes: 7 additions & 2 deletions agent-ovs/lib/EndpointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1368,9 +1368,13 @@ void EndpointManager::getEndpointsByIface(const std::string& ifaceName,
getEps(ifaceName, iface_ep_map, eps);
}

const ip_ep_map_t& EndpointManager::getIPLocalEpMap (void) {
std::shared_ptr<const Endpoint> EndpointManager::getEpFromLocalMap (const std::string& ip) {
unique_lock<mutex> guard(ep_mutex);
return ip_local_ep_map;
const auto& itr = ip_local_ep_map.find(ip);
if (itr != ip_local_ep_map.end()) {
return itr->second;
}
return nullptr;
}

void EndpointManager::getEndpointUUIDs( /* out */ str_uset_t& eps) {
Expand Down Expand Up @@ -1444,6 +1448,7 @@ void EndpointManager::updateEndpointCounters(const std::string& uuid,

mutator.commit();
#ifdef HAVE_PROMETHEUS_SUPPORT
lock_guard<mutex> guard(ep_mutex);
ep_map_t::iterator it = ep_map.find(uuid);
if (it != ep_map.end()) {
EndpointState& es = it->second;
Expand Down
1 change: 1 addition & 0 deletions agent-ovs/lib/IdGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ void IdGenerator::initNamespace(const std::string& nmspc,

void IdGenerator::collectGarbage(const std::string& ns,
garbage_cb_t cb) {
lock_guard<mutex> guard(id_mutex);
NamespaceMap::iterator nitr = namespaces.find(ns);
if (nitr == namespaces.end()) {
return;
Expand Down
7 changes: 4 additions & 3 deletions agent-ovs/lib/include/opflexagent/EndpointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,12 @@ class EndpointManager : public PolicyListener,
void getEndpointUUIDs( /* out */ std::unordered_set<std::string>& eps);

/**
* Get IP to local EP map
* Get Endpoint from Local map based on IP
*
* @return IP to local EP map
* @param ip IP address of the endpoint
* @return shared ptr to the endpoint if available
*/
const ip_ep_map_t& getIPLocalEpMap(void);
std::shared_ptr<const Endpoint> getEpFromLocalMap(const std::string& ip);

/**
* Get the endpoints that are on a particular access interface
Expand Down
3 changes: 2 additions & 1 deletion agent-ovs/ovs/ContractStatsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ void ContractStatsManager::on_timer(const error_code& ec) {
TableState::cookie_callback_t cb_func;
cb_func = [this](uint64_t cookie, uint16_t priority,
const struct match& match) {
const std::lock_guard<std::mutex> lock(pstatMtx);
updateFlowEntryMap(contractState, cookie, priority, match);
};

// Request Switch Manager to provide flow entries
{
std::lock_guard<std::mutex> lock(pstatMtx);
switchManager.forEachCookieMatch(IntFlowManager::POL_TABLE_ID,
cb_func);
const std::lock_guard<std::mutex> lock(pstatMtx);
PolicyCounterMap_t newClassCountersMap;
on_timer_base(ec, contractState, newClassCountersMap);
generatePolicyStatsObjects(&newClassCountersMap);
Expand Down
151 changes: 106 additions & 45 deletions agent-ovs/ovs/IntFlowManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
#include <cstdlib>
#include <cstring>
#include <sstream>
#include <sys/resource.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <boost/system/error_code.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
Expand Down Expand Up @@ -149,8 +152,9 @@ IntFlowManager::IntFlowManager(Agent& agent_,
floodScope(FLOOD_DOMAIN), tunnelPortStr("4789"),
virtualRouterEnabled(false), routerAdv(false),
virtualDHCPEnabled(false), conntrackEnabled(false), dropLogRemotePort(0),
serviceStatsFlowDisabled(true),
advertManager(agent, *this), isSyncing(false), stopping(false) {
serviceStatsFlowDisabled(false),
advertManager(agent, *this), isSyncing(false), stopping(false),
svcStatsTaskQueue(svcStatsIOService) {
// set up flow tables
switchManager.setMaxFlowTables(NUM_FLOW_TABLES);
SwitchManager::TableDescriptionMap fwdTblDescr;
Expand Down Expand Up @@ -183,6 +187,30 @@ void IntFlowManager::start(bool serviceStatsFlowDisabled_) {

initPlatformConfig();
createStaticFlows();

svcStatsIOWork.reset(new boost::asio::io_service::work(svcStatsIOService));
svcStatsThread.reset(new std::thread([this]() {
LOG(DEBUG) << "svcStatsThread start IO run";
const pid_t tid = syscall(SYS_gettid);
// By default sched policy is SCHED_OTHER for all threads in linux.
// Default priority is 0. SCHED_FIFO/RR will make threads with
// min prio of 1 and max prio of 99, and these policy threads will
// always preempt threads with SCHED_OTHER. Instead of changing all
// the existing threads to SCHED_FIFO/RR and innfluence priorities,
// there is a way to influence NICE values of SCHED_OTHER threads.
// Nice values vary from -20(high) to +19(low)
// Refer:
// 1. https://man7.org/linux/man-pages/man7/sched.7.html
// 2. https://linux.die.net/man/2/setpriority <-- this sets the nice
// value of SCHED_OTHER threads
if (setpriority(PRIO_PROCESS, tid, 19)) {
LOG(ERROR) << "Unable to set low priority for svcStatsThread";
return;
}
svcStatsIOService.run();
LOG(DEBUG) << "svcStatsThread no more IO";
}));
LOG(DEBUG) << "Starting svcStatsIOWork and svcStatsThread";
}

void IntFlowManager::registerModbListeners() {
Expand All @@ -200,6 +228,16 @@ void IntFlowManager::stop() {
LOG(DEBUG) << "Stopping IntFlowManager";
stopping = true;

if (svcStatsIOWork) {
LOG(DEBUG) << "Stopping svcStatsIOWork";
svcStatsIOWork.reset();
}
if (svcStatsThread) {
LOG(DEBUG) << "Stopping svcStatsThread";
svcStatsThread->join();
svcStatsThread.reset();
}

agent.getEndpointManager().unregisterListener(this);
agent.getServiceManager().unregisterListener(this);
agent.getExtraConfigManager().unregisterListener(this);
Expand Down Expand Up @@ -2402,8 +2440,6 @@ updateSvcStatsCounters (const uint64_t &cookie,
const uint64_t &newPktCount,
const uint64_t &newByteCount)
{
const std::lock_guard<mutex> lock(svcStatMutex);

// Additional safety for stats flows:
// Nothing must be reported from ServiceStatsManager
// if serviceStatsFlowDisabled=true, since the stats flows wont be created
Expand Down Expand Up @@ -2972,6 +3008,48 @@ void IntFlowManager::clearSvcStatsCounters (const std::string& uuid,
mutator.commit();
}

void IntFlowManager::handleUpdateSvcStatsFlows (const string& task_id)
{
bool is_svc = strcmp(task_id.substr(0,1).c_str(),"1") == 0;
bool is_add = strcmp(task_id.substr(1,1).c_str(),"1") == 0;
const string& uuid = task_id.substr(2);

LOG(DEBUG) << "##### Updating service stats flows:"
<< " uuid: " << uuid
<< " is_svc: " << is_svc
<< " is_add: " << is_add << "#######";

updatePodSvcStatsFlows(uuid, is_svc, is_add);
updateSvcTgtStatsFlows(uuid, is_svc, is_add);
updateSvcNodeStatsFlows(uuid, is_svc, is_add);
updateSvcExtStatsFlows(uuid, is_svc, is_add);

if (is_svc && is_add) {
// Svc Stats flow programming happens in this low prio thread.
// SNAT/DNAT flows will be programmed initially from a different thread.
// It will get updated here if needed for stats to work.
ServiceManager& srvMgr = agent.getServiceManager();
shared_ptr<const Service> asWrapper = srvMgr.getService(uuid);
if (!asWrapper || !asWrapper->getDomainURI()) {
LOG(DEBUG) << "unable to get service from uuid";
return;
}
programServiceSnatDnatFlows(uuid);
} else {
unordered_set<string> svcUuids;
ServiceManager& svcMgr = agent.getServiceManager();
svcMgr.getServiceUUIDs(svcUuids);
for (const string& svcUuid : svcUuids) {
// If EP IP is added, and happens to be NH of this service, then cookie needs to be updated
// If EP IP is deleted, and happens to be NH of this service, then cookie needs to be removed
// If EP IP is modified:
// - if it became NH of this service, then cookie needs to be updated
// - if it moved away from being NH of this service, then cookie needs to be removed
programServiceSnatDnatFlows(svcUuid);
}
}
}

/**
* Add/del stats flows:
* Cluster/E-W:
Expand All @@ -2982,20 +3060,20 @@ void IntFlowManager::updateSvcStatsFlows (const string& uuid,
const bool& is_svc,
const bool& is_add)
{
const std::lock_guard<mutex> lock(svcStatMutex);

if (serviceStatsFlowDisabled)
return;

LOG(DEBUG) << "##### Updating service stats flows:"
<< " uuid: " << uuid
<< " is_svc: " << is_svc
<< " is_add: " << is_add << "#######";

updatePodSvcStatsFlows(uuid, is_svc, is_add);
updateSvcTgtStatsFlows(uuid, is_svc, is_add);
updateSvcNodeStatsFlows(uuid, is_svc, is_add);
updateSvcExtStatsFlows(uuid, is_svc, is_add);
std::string task_id;
if (is_svc)
task_id += "1";
else
task_id += "0";
if (is_add)
task_id += "1";
else
task_id += "0";
task_id += uuid;
svcStatsTaskQueue.dispatch(task_id, [=]() { handleUpdateSvcStatsFlows(task_id); });
}

void IntFlowManager::updateSvcExtStatsFlows (const string &uuid,
Expand Down Expand Up @@ -3152,14 +3230,13 @@ void IntFlowManager::updateSvcExtStatsFlows (const string &uuid,

for (auto const& sm : as.getServiceMappings()) {
for (const string& nhipstr : sm.getNextHopIPs()) {
const ip_ep_map_t& ip_ep_map = agent.getEndpointManager().getIPLocalEpMap();
const auto& itr = ip_ep_map.find(nhipstr);
if (itr != ip_ep_map.end()) {
const auto& pEp = agent.getEndpointManager().getEpFromLocalMap(nhipstr);
if (pEp) {
svcTgtCkAddExpr(flow_uuid,
uuid,
nhipstr, sm, as,
as.getAttributes(),
itr->second->getAttributes());
pEp->getAttributes());
}
}
}
Expand Down Expand Up @@ -3202,12 +3279,6 @@ void IntFlowManager::updateSvcExtStatsFlows (const string &uuid,
// If an IP is deleted, then cookie will be deleted
for (const string& svcUuid : svcUuids) {
updateSvcExtStatsFlows(svcUuid, true, true);
// If EP IP is modified:
// - if it became NH of this service, then SNAT/DNAT flows should be
// updated with stats cookie
// - if it moved away from being NH of this service, then SNAT/DNAT
// flows shouldnt match on stats cookie
programServiceSnatDnatFlows(svcUuid);
}
}
}
Expand Down Expand Up @@ -3440,14 +3511,13 @@ void IntFlowManager::updateSvcNodeStatsFlows (const string &uuid,

for (auto const& sm : as.getServiceMappings()) {
for (const string& nhipstr : sm.getNextHopIPs()) {
const ip_ep_map_t& ip_ep_map = agent.getEndpointManager().getIPLocalEpMap();
const auto& itr = ip_ep_map.find(nhipstr);
if (itr != ip_ep_map.end()) {
const auto& pEp = agent.getEndpointManager().getEpFromLocalMap(nhipstr);
if (pEp) {
svcNodeFlowAddExpr(flow_uuid,
uuid,
nhipstr, sm,
as.getAttributes(),
itr->second->getAttributes());
pEp->getAttributes());
}
}
}
Expand Down Expand Up @@ -3694,14 +3764,13 @@ void IntFlowManager::updateSvcTgtStatsFlows (const string &uuid,

for (auto const& sm : as.getServiceMappings()) {
for (const string& nhipstr : sm.getNextHopIPs()) {
const ip_ep_map_t& ip_ep_map = agent.getEndpointManager().getIPLocalEpMap();
const auto& itr = ip_ep_map.find(nhipstr);
if (itr != ip_ep_map.end()) {
const auto& pEp = agent.getEndpointManager().getEpFromLocalMap(nhipstr);
if (pEp) {
svcTgtFlowAddExpr(flow_uuid,
uuid,
nhipstr, sm,
as.getAttributes(),
itr->second->getAttributes());
pEp->getAttributes());
}
}
}
Expand Down Expand Up @@ -3743,12 +3812,6 @@ void IntFlowManager::updateSvcTgtStatsFlows (const string &uuid,
// If an IP is deleted, then stats flows will be deleted
for (const string& svcUuid : svcUuids) {
updateSvcTgtStatsFlows(svcUuid, true, true);
// If EP IP is added, and happens to be NH of this service, then cookie needs to be updated
// If EP IP is deleted, and happens to be NH of this service, then cookie needs to be removed
// If EP IP is modified:
// - if it became NH of this service, then cookie needs to be updated
// - if it moved away from being NH of this service, then cookie needs to be removed
programServiceSnatDnatFlows(svcUuid);
}
}

Expand Down Expand Up @@ -3782,6 +3845,7 @@ void IntFlowManager::updatePodSvcStatsFlows (const string &uuid,
clearPodSvcStatsCounters("svctoep:"+uuid);
idGen.erase(ID_NMSPC_SVCSTATS, "eptosvc:"+uuid);
idGen.erase(ID_NMSPC_SVCSTATS, "svctoep:"+uuid);
const std::lock_guard<mutex> lock(svcStatMutex);
podSvcUuidCkMap.erase(uuid);
};

Expand Down Expand Up @@ -3833,6 +3897,7 @@ void IntFlowManager::updatePodSvcStatsFlows (const string &uuid,
}
}

const std::lock_guard<mutex> lock(svcStatMutex);
const string& ingStr = "eptosvc:"+uuid;
const string& egrStr = "svctoep:"+uuid;
auto itr = podSvcUuidCkMap.find(uuid);
Expand Down Expand Up @@ -4133,17 +4198,14 @@ void IntFlowManager::updateServiceSnatDnatFlows(const string& uuid,
}

vector<address> nextHopAddrs;
const ip_ep_map_t& ip_ep_map =
agent.getEndpointManager().getIPLocalEpMap();
for (const string& ipstr : sm.getNextHopIPs()) {
auto nextHopAddr = address::from_string(ipstr, ec);
if (ec) {
LOG(WARNING) << "Invalid service next hop IP: "
<< ipstr << ": " << ec.message();
} else {
if (loopback) {
const auto& it = ip_ep_map.find(ipstr);
if (it == ip_ep_map.end())
if (!agent.getEndpointManager().getEpFromLocalMap(ipstr))
continue;
}
nextHopAddrs.push_back(nextHopAddr);
Expand Down Expand Up @@ -6243,8 +6305,7 @@ static bool svcStatsIdGarbageCb(EndpointManager& epManager,
}

// ensure the pod is still local
const ip_ep_map_t& ip_ep_map = epManager.getIPLocalEpMap();
if (ip_ep_map.find(nhipStr) != ip_ep_map.end())
if (epManager.getEpFromLocalMap(nhipStr))
return true;
}
return false;
Expand Down
2 changes: 1 addition & 1 deletion agent-ovs/ovs/OVSRenderer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ void OVSRenderer::setProperties(const ptree& properties) {

ifaceStatsEnabled = properties.get<bool>(STATS_INTERFACE_ENABLED, true);
contractStatsEnabled = properties.get<bool>(STATS_CONTRACT_ENABLED, true);
serviceStatsFlowDisabled = properties.get<bool>(STATS_SERVICE_FLOWDISABLED, true);
serviceStatsFlowDisabled = properties.get<bool>(STATS_SERVICE_FLOWDISABLED, false);
serviceStatsEnabled = properties.get<bool>(STATS_SERVICE_ENABLED, true);
secGroupStatsEnabled = properties.get<bool>(STATS_SECGROUP_ENABLED, true);
ifaceStatsInterval = properties.get<long>(STATS_INTERFACE_INTERVAL, 30000);
Expand Down
9 changes: 7 additions & 2 deletions agent-ovs/ovs/PolicyStatsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ void PolicyStatsManager::registerConnection(SwitchConnection* connection) {
this->connection = connection;
}

void PolicyStatsManager::start(bool register_listener) {
void PolicyStatsManager::start(bool register_listener,
boost::optional<boost::asio::io_service&> io_service) {
stopping = false;

LOG(DEBUG) << "Starting policy stats manager " << this;
Expand All @@ -73,7 +74,11 @@ void PolicyStatsManager::start(bool register_listener) {
connection->RegisterMessageHandler(OFPTYPE_FLOW_REMOVED, this);
{
std::lock_guard<std::mutex> lock(timer_mutex);
timer.reset(new deadline_timer(agent->getAgentIOService(),
if (io_service)
timer.reset(new deadline_timer(io_service.get(),
milliseconds(timer_interval)));
else
timer.reset(new deadline_timer(agent->getAgentIOService(),
milliseconds(timer_interval)));
}
}
Expand Down
Loading

0 comments on commit 50c1c3b

Please sign in to comment.