Skip to content

Commit

Permalink
redis: Updating fpmsyncd and orchagent to use redis pipeline (sonic-n…
Browse files Browse the repository at this point in the history
  • Loading branch information
qiluo-msft authored and Shuotian Cheng committed Jan 5, 2017
1 parent 9be6620 commit 3681cd2
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 18 deletions.
21 changes: 11 additions & 10 deletions fpmsyncd/fpmlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ using namespace swss;
using namespace std;

FpmLink::FpmLink(int port) :
m_bufSize(FPM_MAX_MSG_LEN * 2),
MSG_BATCH_SIZE(256),
m_bufSize(FPM_MAX_MSG_LEN * MSG_BATCH_SIZE),
m_messageBuffer(NULL),
m_pos(0),
m_connected(false),
Expand All @@ -21,20 +22,20 @@ FpmLink::FpmLink(int port) :

m_server_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (m_server_socket < 0)
throw std::system_error(errno, std::system_category());
throw system_error(errno, system_category());

if (setsockopt(m_server_socket, SOL_SOCKET, SO_REUSEADDR, &true_val,
sizeof(true_val)) < 0)
{
close(m_server_socket);
throw std::system_error(errno, std::system_category());
throw system_error(errno, system_category());
}

if (setsockopt(m_server_socket, SOL_SOCKET, SO_KEEPALIVE, &true_val,
sizeof(true_val)) < 0)
{
close(m_server_socket);
throw std::system_error(errno, std::system_category());
throw system_error(errno, system_category());
}

memset (&addr, 0, sizeof (addr));
Expand All @@ -45,13 +46,13 @@ FpmLink::FpmLink(int port) :
if (bind(m_server_socket, (struct sockaddr *)&addr, sizeof(addr)) < 0)
{
close(m_server_socket);
throw std::system_error(errno, std::system_category());
throw system_error(errno, system_category());
}

if (listen(m_server_socket, 2) != 0)
{
close(m_server_socket);
throw std::system_error(errno, std::system_category());
throw system_error(errno, system_category());
}

m_server_up = true;
Expand All @@ -75,7 +76,7 @@ void FpmLink::accept()
m_connection_socket = ::accept(m_server_socket, (struct sockaddr *)&client_addr,
&client_len);
if (m_connection_socket < 0)
throw std::system_error(errno, std::system_category());
throw system_error(errno, system_category());

SWSS_LOG_INFO("New connection accepted from: %s\n", inet_ntoa(client_addr.sin_addr));
}
Expand Down Expand Up @@ -107,7 +108,7 @@ void FpmLink::readMe()
if (read == 0)
throw FpmConnectionClosedException();
if (read < 0)
throw std::system_error(errno, std::system_category());
throw system_error(errno, system_category());
m_pos+= read;

/* Check for complete messages */
Expand All @@ -123,13 +124,13 @@ void FpmLink::readMe()
break;

if (!fpm_msg_ok(hdr, left))
throw std::system_error(make_error_code(errc::bad_message), "Malformed FPM message received");
throw system_error(make_error_code(errc::bad_message), "Malformed FPM message received");

if (hdr->msg_type == FPM_MSG_TYPE_NETLINK)
{
nl_msg *msg = nlmsg_convert((nlmsghdr *)fpm_msg_data(hdr));
if (msg == NULL)
throw std::system_error(make_error_code(errc::bad_message), "Unable to convert nlmsg");
throw system_error(make_error_code(errc::bad_message), "Unable to convert nlmsg");

nlmsg_set_proto(msg, NETLINK_ROUTE);
NetDispatcher::getInstance().onNetlinkMessage(msg);
Expand Down
1 change: 1 addition & 0 deletions fpmsyncd/fpmlink.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace swss {

class FpmLink : public Selectable {
public:
const int MSG_BATCH_SIZE;
FpmLink(int port = FPM_DEFAULT_PORT);
virtual ~FpmLink();

Expand Down
7 changes: 5 additions & 2 deletions fpmsyncd/fpmsyncd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ using namespace swss;
int main(int argc, char **argv)
{
DBConnector db(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0);
RouteSync sync(&db);
RedisPipeline pipeline(&db);
RouteSync sync(&pipeline);

NetDispatcher::getInstance().registerMessageHandler(RTM_NEWROUTE, &sync);
NetDispatcher::getInstance().registerMessageHandler(RTM_DELROUTE, &sync);
Expand All @@ -34,13 +35,15 @@ int main(int argc, char **argv)
int tempfd;
/* Reading FPM messages forever (and calling "readMe" to read them) */
s.select(&temps, &tempfd);
pipeline.flush();
SWSS_LOG_DEBUG("Pipeline flushed");
}
}
catch (FpmLink::FpmConnectionClosedException &e)
{
cout << "Connection lost, reconnecting..." << endl;
}
catch (const std::exception& e)
catch (const exception& e)
{
cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl;
return 0;
Expand Down
4 changes: 2 additions & 2 deletions fpmsyncd/routesync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
using namespace std;
using namespace swss;

RouteSync::RouteSync(DBConnector *db) :
m_routeTable(db, APP_ROUTE_TABLE_NAME)
RouteSync::RouteSync(RedisPipeline *pipeline) :
m_routeTable(pipeline, APP_ROUTE_TABLE_NAME, true)
{
m_nl_sock = nl_socket_alloc();
nl_connect(m_nl_sock, NETLINK_ROUTE);
Expand Down
2 changes: 1 addition & 1 deletion fpmsyncd/routesync.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class RouteSync : public NetMsg
public:
enum { MAX_ADDR_SIZE = 64 };

RouteSync(DBConnector *db);
RouteSync(RedisPipeline *pipeline);

virtual void onMsg(int nlmsg_type, struct nl_object *obj);

Expand Down
12 changes: 12 additions & 0 deletions orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE);
}

SWSS_LOG_NOTICE("Enable redis pipeline");

attr.id = SAI_REDIS_SWITCH_ATTR_USE_PIPELINE;
attr.value.booldata = true;

sai_switch_api->set_switch_attribute(&attr);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Failed to enable redis pipeline %d", status);
exit(EXIT_FAILURE);
}

attr.id = SAI_SWITCH_ATTR_SRC_MAC_ADDRESS;
if (!gMacAddress)
{
Expand Down
28 changes: 25 additions & 3 deletions orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
#include <unistd.h>
#include "orchdaemon.h"

#include "logger.h"

#include <unistd.h>
#include "sairedis.h"

using namespace std;
using namespace swss;

/* select() function timeout retry time */
#define SELECT_TIMEOUT 1000

extern sai_switch_api_t* sai_switch_api;

/* Global variable gPortsOrch declared */
PortsOrch *gPortsOrch;
/* Global variable gFdbOrch declared */
Expand Down Expand Up @@ -77,6 +78,21 @@ bool OrchDaemon::init()
return true;
}

/* Flush redis through sairedis interface */
void OrchDaemon::flush()
{
SWSS_LOG_ENTER();

sai_attribute_t attr;
attr.id = SAI_REDIS_SWITCH_ATTR_FLUSH;
sai_status_t status = sai_switch_api->set_switch_attribute(&attr);
if (status != SAI_STATUS_SUCCESS)
{
SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
exit(EXIT_FAILURE);
}
}

void OrchDaemon::start()
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -106,6 +122,12 @@ void OrchDaemon::start()
for (Orch *o : m_orchList)
o->doTask();

/* Let sairedis to flush all SAI function call to ASIC DB.
* Normally the redis pipeline will flush when enough request
* accumulated. Still it is possible that small amount of
* requests live in it. When the daemon has nothing to do, it
* is a good chance to flush the pipeline */
flush();
continue;
}

Expand Down
1 change: 1 addition & 0 deletions orchagent/orchdaemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class OrchDaemon
Select *m_select;

Orch *getOrchByConsumer(ConsumerStateTable *c);
void flush();
};

#endif /* SWSS_ORCHDAEMON_H */

0 comments on commit 3681cd2

Please sign in to comment.