Skip to content

Commit

Permalink
[MirrorOrch]: Mirror Session Retention across Warm Reboot (#1054)
Browse files Browse the repository at this point in the history
* [MirrorOrch]: Mirror Session Retention across Warm Reboot

After warm reboot, it is expected that the monitor port of
the mirror session is retained - no changing on the monitor
port withint the ECMP group members and the LAG members. This
is due to the general of the sairedis comparison logic and
the minimalization of SAI function calls during reconciliation.

Changes:
1. Add bake() and postBake() functions in MirrorOrch
   bake() function retrieves the state database information
   and get the VLAN + monitor port information.
   postBake() function leverages the information and recovers
   the active mirror sessions the same as before warm reboot.
2. state database format change
   Instead of storing the object ID of the monitor port, store
   the alias of the monitor port.
   Instead of storing true/false of VLAN header, store the VLAN
   ID.

Update: Freeze doTask() function instead of update() function

With this update, we could fix potential orchagent issues before
the warm reboot when the monitor port was wrongly calculated.

Signed-off-by: Shu0T1an ChenG <shuche@microsoft.com>
  • Loading branch information
stcheng committed Sep 20, 2019
1 parent a5b6e7c commit d823dd1
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 33 deletions.
196 changes: 172 additions & 24 deletions orchagent/mirrororch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
#include "swssnet.h"
#include "converter.h"
#include "mirrororch.h"
#include "tokenize.h"

#define MIRROR_SESSION_STATUS "status"
#define MIRROR_SESSION_STATUS_ACTIVE "active"
#define MIRROR_SESSION_STATUS_INACTIVE "inactive"
#define MIRROR_SESSION_NEXT_HOP_IP "next_hop_ip"
#define MIRROR_SESSION_SRC_IP "src_ip"
#define MIRROR_SESSION_DST_IP "dst_ip"
#define MIRROR_SESSION_GRE_TYPE "gre_type"
Expand All @@ -23,7 +25,7 @@
#define MIRROR_SESSION_DST_MAC_ADDRESS "dst_mac"
#define MIRROR_SESSION_MONITOR_PORT "monitor_port"
#define MIRROR_SESSION_ROUTE_PREFIX "route_prefix"
#define MIRROR_SESSION_VLAN_HEADER_VALID "vlan_header_valid"
#define MIRROR_SESSION_VLAN_ID "vlan_id"
#define MIRROR_SESSION_POLICER "policer"

#define MIRROR_SESSION_DEFAULT_VLAN_PRI 0
Expand Down Expand Up @@ -58,6 +60,7 @@ MirrorEntry::MirrorEntry(const string& platform) :
}

nexthopInfo.prefix = IpPrefix("0.0.0.0/0");
nexthopInfo.nexthop = IpAddress("0.0.0.0");
}

MirrorOrch::MirrorOrch(TableConnector stateDbConnector, TableConnector confDbConnector,
Expand All @@ -75,6 +78,74 @@ MirrorOrch::MirrorOrch(TableConnector stateDbConnector, TableConnector confDbCon
m_fdbOrch->attach(this);
}

bool MirrorOrch::bake()
{
SWSS_LOG_ENTER();

// Freeze the route update during orchagent restoration
m_freeze = true;

deque<KeyOpFieldsValuesTuple> entries;
vector<string> keys;
m_mirrorTable.getKeys(keys);
for (const auto &key : keys)
{
vector<FieldValueTuple> tuples;
m_mirrorTable.get(key, tuples);

bool active = false;
string monitor_port;
string next_hop_ip;

for (const auto &tuple : tuples)
{
if (fvField(tuple) == MIRROR_SESSION_STATUS)
{
active = fvValue(tuple) == MIRROR_SESSION_STATUS_ACTIVE;
}
else if (fvField(tuple) == MIRROR_SESSION_MONITOR_PORT)
{
monitor_port = fvValue(tuple);
}
else if (fvField(tuple) == MIRROR_SESSION_NEXT_HOP_IP)
{
next_hop_ip = fvValue(tuple);
}
}

if (!active)
{
continue;
}

SWSS_LOG_NOTICE("Found mirror session %s active before warm reboot",
key.c_str());

// Recover saved active session's monitor port
m_recoverySessionMap.emplace(
key, monitor_port + state_db_key_delimiter + next_hop_ip);
}

return Orch::bake();
}

bool MirrorOrch::postBake()
{
SWSS_LOG_ENTER();

SWSS_LOG_NOTICE("Start MirrorOrch post-baking");

// Unfreeze the route update
m_freeze = false;

Orch::doTask();

// Clean up the recovery cache
m_recoverySessionMap.clear();

return Orch::postBake();
}

void MirrorOrch::update(SubjectType type, void *cntx)
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -320,10 +391,11 @@ void MirrorOrch::setSessionState(const string& name, const MirrorEntry& session,
{
SWSS_LOG_ENTER();

SWSS_LOG_INFO("Setting mirroring sessions %s state\n", name.c_str());
SWSS_LOG_INFO("Update mirroring sessions %s state", name.c_str());

vector<FieldValueTuple> fvVector;
string value;

if (attr.empty() || attr == MIRROR_SESSION_STATUS)
{
value = session.status ? MIRROR_SESSION_STATUS_ACTIVE : MIRROR_SESSION_STATUS_INACTIVE;
Expand All @@ -332,8 +404,9 @@ void MirrorOrch::setSessionState(const string& name, const MirrorEntry& session,

if (attr.empty() || attr == MIRROR_SESSION_MONITOR_PORT)
{
value = sai_serialize_object_id(session.neighborInfo.portId);
fvVector.emplace_back(MIRROR_SESSION_MONITOR_PORT, value);
Port port;
m_portsOrch->getPort(session.neighborInfo.portId, port);
fvVector.emplace_back(MIRROR_SESSION_MONITOR_PORT, port.m_alias);
}

if (attr.empty() || attr == MIRROR_SESSION_DST_MAC_ADDRESS)
Expand All @@ -348,10 +421,16 @@ void MirrorOrch::setSessionState(const string& name, const MirrorEntry& session,
fvVector.emplace_back(MIRROR_SESSION_ROUTE_PREFIX, value);
}

if (attr.empty() || attr == MIRROR_SESSION_VLAN_HEADER_VALID)
if (attr.empty() || attr == MIRROR_SESSION_VLAN_ID)
{
value = to_string(session.neighborInfo.port.m_vlan_info.vlan_id);
fvVector.emplace_back(MIRROR_SESSION_VLAN_ID, value);
}

if (attr.empty() || attr == MIRROR_SESSION_NEXT_HOP_IP)
{
value = to_string(session.neighborInfo.port.m_type == Port::VLAN);
fvVector.emplace_back(MIRROR_SESSION_VLAN_HEADER_VALID, value);
value = session.nexthopInfo.nexthop.to_string();
fvVector.emplace_back(MIRROR_SESSION_NEXT_HOP_IP, value);
}

m_mirrorTable.set(name, fvVector);
Expand Down Expand Up @@ -396,32 +475,68 @@ bool MirrorOrch::getNeighborInfo(const string& name, MirrorEntry& session)
return false;
}

// Get the firt member of the LAG
Port member;
const auto& first_member_alias = *session.neighborInfo.port.m_members.begin();
m_portsOrch->getPort(first_member_alias, member);
// Recover the LAG member monitor port picked before warm reboot
// to minimalize the data plane changes across warm reboot.
if (m_recoverySessionMap.find(name) != m_recoverySessionMap.end())
{
string alias = tokenize(m_recoverySessionMap[name],
state_db_key_delimiter, 1)[0];
Port member;
m_portsOrch->getPort(alias, member);

SWSS_LOG_NOTICE("Recover mirror session %s with LAG member port %s",
name.c_str(), alias.c_str());
session.neighborInfo.portId = member.m_port_id;
}
else
{
// Get the firt member of the LAG
Port member;
string first_member_alias = *session.neighborInfo.port.m_members.begin();
m_portsOrch->getPort(first_member_alias, member);

session.neighborInfo.portId = member.m_port_id;
}

session.neighborInfo.portId = member.m_port_id;
return true;
}
case Port::VLAN:
{
SWSS_LOG_NOTICE("Get mirror session destination IP neighbor VLAN %d",
session.neighborInfo.port.m_vlan_info.vlan_id);
Port member;
if (!m_fdbOrch->getPort(session.neighborInfo.mac, session.neighborInfo.port.m_vlan_info.vlan_id, member))

// Recover the VLAN member monitor port picked before warm reboot
// since the FDB entries are not yet learned on the hardware
if (m_recoverySessionMap.find(name) != m_recoverySessionMap.end())
{
SWSS_LOG_NOTICE("Waiting to get FDB entry MAC %s under VLAN %s",
session.neighborInfo.mac.to_string().c_str(),
session.neighborInfo.port.m_alias.c_str());
return false;
string alias = tokenize(m_recoverySessionMap[name],
state_db_key_delimiter, 1)[0];
Port member;
m_portsOrch->getPort(alias, member);

SWSS_LOG_NOTICE("Recover mirror session %s with VLAN member port %s",
name.c_str(), alias.c_str());
session.neighborInfo.portId = member.m_port_id;
}
else
{
// Update monitor port
session.neighborInfo.portId = member.m_port_id;
return true;
Port member;
if (!m_fdbOrch->getPort(session.neighborInfo.mac,
session.neighborInfo.port.m_vlan_info.vlan_id, member))
{
SWSS_LOG_NOTICE("Waiting to get FDB entry MAC %s under VLAN %s",
session.neighborInfo.mac.to_string().c_str(),
session.neighborInfo.port.m_alias.c_str());
return false;
}
else
{
// Update monitor port
session.neighborInfo.portId = member.m_port_id;
}
}

return true;
}
default:
{
Expand Down Expand Up @@ -741,7 +856,7 @@ bool MirrorOrch::updateSessionType(const string& name, MirrorEntry& session)
SWSS_LOG_NOTICE("Update mirror session %s VLAN to %s",
name.c_str(), session.neighborInfo.port.m_alias.c_str());

setSessionState(name, session, MIRROR_SESSION_VLAN_HEADER_VALID);
setSessionState(name, session, MIRROR_SESSION_VLAN_ID);

return true;
}
Expand Down Expand Up @@ -782,7 +897,35 @@ void MirrorOrch::updateNextHop(const NextHopUpdate& update)

if (update.nexthopGroup != IpAddresses())
{
session.nexthopInfo.nexthop = *update.nexthopGroup.getIpAddresses().begin();
SWSS_LOG_NOTICE(" next hop IPs: %s", update.nexthopGroup.to_string().c_str());

// Recover the session based on the state database information
if (m_recoverySessionMap.find(name) != m_recoverySessionMap.end())
{
IpAddress nexthop = IpAddress(tokenize(m_recoverySessionMap[name],
state_db_key_delimiter, 1)[1]);

// Check if recovered next hop IP is within the update's next hop IPs
if (update.nexthopGroup.getIpAddresses().count(nexthop))
{
SWSS_LOG_NOTICE("Recover mirror session %s with next hop %s",
name.c_str(), nexthop.to_string().c_str());
session.nexthopInfo.nexthop = nexthop;
}
else
{
// Correct the next hop IP
SWSS_LOG_NOTICE("Correct mirror session %s next hop from %s to %s",
name.c_str(), session.nexthopInfo.nexthop.to_string().c_str(),
nexthop.to_string().c_str());
session.nexthopInfo.nexthop = *update.nexthopGroup.getIpAddresses().begin();
}
}
else
{
// Pick the first one from the next hop group
session.nexthopInfo.nexthop = *update.nexthopGroup.getIpAddresses().begin();
}
}
else
{
Expand Down Expand Up @@ -968,6 +1111,11 @@ void MirrorOrch::doTask(Consumer& consumer)
{
SWSS_LOG_ENTER();

if (m_freeze)
{
return;
}

if (!gPortsOrch->allPortsReady())
{
return;
Expand All @@ -991,7 +1139,7 @@ void MirrorOrch::doTask(Consumer& consumer)
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s\n", op.c_str());
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
}

consumer.m_toSync.erase(it++);
Expand Down
6 changes: 6 additions & 0 deletions orchagent/mirrororch.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class MirrorOrch : public Orch, public Observer, public Subject
MirrorOrch(TableConnector appDbConnector, TableConnector confDbConnector,
PortsOrch *portOrch, RouteOrch *routeOrch, NeighOrch *neighOrch, FdbOrch *fdbOrch, PolicerOrch *policerOrch);

bool bake() override;
bool postBake() override;
void update(SubjectType, void *);
bool sessionExists(const string&);
bool getSessionStatus(const string&, bool&);
Expand All @@ -86,6 +88,10 @@ class MirrorOrch : public Orch, public Observer, public Subject
Table m_mirrorTable;

MirrorTable m_syncdMirrors;
// session_name -> VLAN | monitor_port_alias | next_hop_ip
map<string, string> m_recoverySessionMap;

bool m_freeze = false;

void createEntry(const string&, const vector<FieldValueTuple>&);
void deleteEntry(const string&);
Expand Down
19 changes: 13 additions & 6 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ Orch::Orch(DBConnector *db, const string tableName, int pri)

Orch::Orch(DBConnector *db, const vector<string> &tableNames)
{
for(auto it : tableNames)
for (auto it : tableNames)
{
addConsumer(db, it, default_orch_pri);
}
}

Orch::Orch(DBConnector *db, const vector<table_name_with_pri_t> &tableNames_with_pri)
{
for(const auto& it : tableNames_with_pri)
for (const auto& it : tableNames_with_pri)
{
addConsumer(db, it.first, it.second);
}
Expand All @@ -60,7 +60,7 @@ Orch::~Orch()
vector<Selectable *> Orch::getSelectables()
{
vector<Selectable *> selectables;
for(auto& it : m_consumerMap)
for (auto& it : m_consumerMap)
{
selectables.push_back(it.second.get());
}
Expand Down Expand Up @@ -240,7 +240,7 @@ bool Orch::bake()
{
SWSS_LOG_ENTER();

for(auto &it : m_consumerMap)
for (auto &it : m_consumerMap)
{
string executorName = it.first;
auto executor = it.second;
Expand All @@ -257,6 +257,13 @@ bool Orch::bake()
return true;
}

bool Orch::postBake()
{
SWSS_LOG_ENTER();

return true;
}

/*
- Validates reference has proper format which is [table_name:object_name]
- validates table_name exists
Expand Down Expand Up @@ -365,15 +372,15 @@ ref_resolve_status Orch::resolveFieldRefValue(

void Orch::doTask()
{
for(auto &it : m_consumerMap)
for (auto &it : m_consumerMap)
{
it.second->drain();
}
}

void Orch::dumpPendingTasks(vector<string> &ts)
{
for(auto &it : m_consumerMap)
for (auto &it : m_consumerMap)
{
Consumer* consumer = dynamic_cast<Consumer *>(it.second.get());
if (consumer == NULL)
Expand Down
2 changes: 2 additions & 0 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ class Orch
// Prepare for warm start if Redis contains valid input data
// otherwise fallback to cold start
virtual bool bake();
// Clean up the state set in bake()
virtual bool postBake();

/* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
virtual void doTask();
Expand Down
Loading

0 comments on commit d823dd1

Please sign in to comment.