Skip to content

Commit

Permalink
omfwd imudp: Add support for bind-to-device
Browse files Browse the repository at this point in the history
Add support for bind-to-device option to omfwd and imudp modules.
Configured using device="name". Only new syntax format is supported.
e.g.,

input(type="imudp" port=["10514"] device="eth0" name="udp")
action(type="omfwd" Target="192.168.1.23" Port="10514" Device="eth0")

Signed-off-by: David Ahern <dsa@cumulusnetworks.com>
  • Loading branch information
David Ahern committed Nov 16, 2016
1 parent 7e99440 commit 19e5d06
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 16 deletions.
18 changes: 17 additions & 1 deletion plugins/imudp/imudp.c
Expand Up @@ -100,6 +100,7 @@ static time_t ttLastDiscard = 0; /* timestamp when a message from a non-permitte
/* config vars for legacy config system */
static struct configSettings_s {
uchar *pszBindAddr; /* IP to bind socket to */
char *pszBindDevice; /* Device to bind socket to */
uchar *pszSchedPolicy; /* scheduling policy string */
uchar *pszBindRuleset; /* name of Ruleset to bind to */
int iSchedPrio; /* scheduling priority */
Expand All @@ -108,6 +109,7 @@ static struct configSettings_s {

struct instanceConf_s {
uchar *pszBindAddr; /* IP to bind socket to */
char *pszBindDevice; /* Device to bind socket to */
uchar *pszBindPort; /* Port to bind socket to */
uchar *pszBindRuleset; /* name of ruleset to bind to */
uchar *inputname;
Expand Down Expand Up @@ -180,6 +182,7 @@ static struct cnfparamdescr inppdescr[] = {
{ "name", eCmdHdlrGetWord, 0 },
{ "name.appendport", eCmdHdlrBinary, 0 },
{ "address", eCmdHdlrString, 0 },
{ "device", eCmdHdlrString, 0 },
{ "ratelimit.interval", eCmdHdlrInt, 0 },
{ "ratelimit.burst", eCmdHdlrInt, 0 },
{ "rcvbufsize", eCmdHdlrSize, 0 },
Expand Down Expand Up @@ -208,6 +211,7 @@ createInstance(instanceConf_t **pinst)

inst->pszBindPort = NULL;
inst->pszBindAddr = NULL;
inst->pszBindDevice = NULL;
inst->pszBindRuleset = NULL;
inst->inputname = NULL;
inst->bAppendPortToInpname = 0;
Expand Down Expand Up @@ -248,6 +252,11 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal)
} else {
CHKmalloc(inst->pszBindAddr = ustrdup(cs.pszBindAddr));
}
if((cs.pszBindDevice == NULL) || (cs.pszBindDevice[0] == '\0')) {
inst->pszBindDevice= NULL;
} else {
CHKmalloc(inst->pszBindDevice = strdup(cs.pszBindDevice));
}
if((cs.pszBindRuleset == NULL) || (cs.pszBindRuleset[0] == '\0')) {
inst->pszBindRuleset = NULL;
} else {
Expand Down Expand Up @@ -291,7 +300,7 @@ addListner(instanceConf_t *inst)

DBGPRINTF("Trying to open syslog UDP ports at %s:%s.\n", bindName, inst->pszBindPort);

newSocks = net.create_udp_socket(bindAddr, port, 1, inst->rcvbuf, inst->ipfreebind);
newSocks = net.create_udp_socket(bindAddr, port, 1, inst->rcvbuf, inst->ipfreebind, inst->pszBindDevice);
if(newSocks != NULL) {
/* we now need to add the new sockets to the existing set */
/* ready to copy */
Expand Down Expand Up @@ -932,6 +941,8 @@ createListner(es_str_t *port, struct cnfparamvals *pvals)
inst->dfltTZ = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "address")) {
inst->pszBindAddr = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "device")) {
inst->pszBindDevice = (char*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "ruleset")) {
inst->pszBindRuleset = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(inppblk.descr[i].name, "ratelimit.burst")) {
Expand Down Expand Up @@ -995,6 +1006,7 @@ CODESTARTbeginCnfLoad
cs.pszBindRuleset = NULL;
cs.pszSchedPolicy = NULL;
cs.pszBindAddr = NULL;
cs.pszBindDevice = NULL;
cs.iSchedPrio = SCHED_PRIO_UNSET;
cs.iTimeRequery = TIME_REQUERY_DFLT;
ENDbeginCnfLoad
Expand Down Expand Up @@ -1072,6 +1084,7 @@ CODESTARTendCnfLoad
free(cs.pszBindRuleset);
free(cs.pszSchedPolicy);
free(cs.pszBindAddr);
free(cs.pszBindDevice);
ENDendCnfLoad


Expand Down Expand Up @@ -1138,6 +1151,7 @@ CODESTARTfreeCnf
for(inst = pModConf->root ; inst != NULL ; ) {
free(inst->pszBindPort);
free(inst->pszBindAddr);
free(inst->pszBindDevice);
free(inst->inputname);
free(inst->dfltTZ);
del = inst;
Expand Down Expand Up @@ -1295,6 +1309,8 @@ static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __a
{
free(cs.pszBindAddr);
cs.pszBindAddr = NULL;
free(cs.pszBindDevice);
cs.pszBindDevice = NULL;
free(cs.pszSchedPolicy);
cs.pszSchedPolicy = NULL;
free(cs.pszBindRuleset);
Expand Down
2 changes: 1 addition & 1 deletion plugins/omudpspoof/omudpspoof.c
Expand Up @@ -576,7 +576,7 @@ static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData)
}
DBGPRINTF("%s found, resuming.\n", pData->host);
pWrkrData->f_addr = res;
pWrkrData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0, 0);
pWrkrData->pSockArray = net.create_udp_socket((uchar*)pData->host, NULL, 0, 0, 0, NULL);

finalize_it:
if(iRet != RS_RET_OK) {
Expand Down
14 changes: 13 additions & 1 deletion runtime/net.c
Expand Up @@ -1223,7 +1223,7 @@ closeUDPListenSockets(int *pSockArr)
* param rcvbuf indicates desired rcvbuf size; 0 means OS default
*/
static int *
create_udp_socket(uchar *hostname, uchar *pszPort, int bIsServer, int rcvbuf, int ipfreebind)
create_udp_socket(uchar *hostname, uchar *pszPort, int bIsServer, int rcvbuf, int ipfreebind, char *device)
{
struct addrinfo hints, *res, *r;
int error, maxs, *s, *socks, on = 1;
Expand Down Expand Up @@ -1283,6 +1283,18 @@ create_udp_socket(uchar *hostname, uchar *pszPort, int bIsServer, int rcvbuf, in
}
# endif

if(device) {
# if defined(SO_BINDTODEVICE)
if(setsockopt(*s, SOL_SOCKET, SO_BINDTODEVICE, device, strlen(device) + 1) < 0)
# endif
{
errmsg.LogError(errno, NO_ERRCODE, "setsockopt(SO_BINDTODEVICE)");
close(*s);
*s = -1;
continue;
}
}

/* if we have an error, we "just" suspend that socket. Eventually
* other sockets will work. At the end of this function, we check
* if we managed to open at least one socket. If not, we'll write
Expand Down
5 changes: 3 additions & 2 deletions runtime/net.h
Expand Up @@ -148,7 +148,7 @@ BEGINinterface(net) /* name must also be changed in ENDinterface macro! */
void (*PrintAllowedSenders)(int iListToPrint);
void (*clearAllowedSenders)(uchar*);
void (*debugListenInfo)(int fd, char *type);
int *(*create_udp_socket)(uchar *hostname, uchar *LogPort, int bIsServer, int rcvbuf, int ipfreebind);
int *(*create_udp_socket)(uchar *hostname, uchar *LogPort, int bIsServer, int rcvbuf, int ipfreebind, char *device);
void (*closeUDPListenSockets)(int *finet);
int (*isAllowedSender)(uchar *pszType, struct sockaddr *pFrom, const char *pszFromHost); /* deprecated! */
rsRetVal (*getLocalHostname)(uchar**);
Expand All @@ -168,8 +168,9 @@ BEGINinterface(net) /* name must also be changed in ENDinterface macro! */
int *pACLAddHostnameOnFail; /* add hostname to acl when DNS resolving has failed */
int *pACLDontResolve; /* add hostname to acl instead of resolving it to IP(s) */
/* v8 cvthname() signature change -- rgerhards, 2013-01-18 */
/* v9 create_udp_socket() signature change -- dsahern, 2016-11-11 */
ENDinterface(net)
#define netCURR_IF_VERSION 8 /* increment whenever you change the interface structure! */
#define netCURR_IF_VERSION 9 /* increment whenever you change the interface structure! */

/* prototypes */
PROTOTYPEObj(net);
Expand Down
4 changes: 2 additions & 2 deletions runtime/netstrm.c
Expand Up @@ -326,13 +326,13 @@ GetRemAddr(netstrm_t *pThis, struct sockaddr_storage **ppAddr)
* rgerhards, 2008-03-19
*/
static rsRetVal
Connect(netstrm_t *pThis, int family, uchar *port, uchar *host)
Connect(netstrm_t *pThis, int family, uchar *port, uchar *host, char *device)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, netstrm);
assert(port != NULL);
assert(host != NULL);
iRet = pThis->Drvr.Connect(pThis->pDrvrData, family, port, host);
iRet = pThis->Drvr.Connect(pThis->pDrvrData, family, port, host, device);
RETiRet;
}

Expand Down
5 changes: 3 additions & 2 deletions runtime/netstrm.h
Expand Up @@ -47,7 +47,7 @@ BEGINinterface(netstrm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*AcceptConnReq)(netstrm_t *pThis, netstrm_t **ppNew);
rsRetVal (*Rcv)(netstrm_t *pThis, uchar *pRcvBuf, ssize_t *pLenBuf);
rsRetVal (*Send)(netstrm_t *pThis, uchar *pBuf, ssize_t *pLenBuf);
rsRetVal (*Connect)(netstrm_t *pThis, int family, unsigned char *port, unsigned char *host);
rsRetVal (*Connect)(netstrm_t *pThis, int family, unsigned char *port, unsigned char *host, char *device);
rsRetVal (*GetRemoteHName)(netstrm_t *pThis, uchar **pszName);
rsRetVal (*GetRemoteIP)(netstrm_t *pThis, prop_t **ip);
rsRetVal (*SetDrvrMode)(netstrm_t *pThis, int iMode);
Expand Down Expand Up @@ -76,12 +76,13 @@ BEGINinterface(netstrm) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetKeepAliveTime)(netstrm_t *pThis, int keepAliveTime);
rsRetVal (*SetKeepAliveIntvl)(netstrm_t *pThis, int keepAliveIntvl);
ENDinterface(netstrm)
#define netstrmCURR_IF_VERSION 7 /* increment whenever you change the interface structure! */
#define netstrmCURR_IF_VERSION 8 /* increment whenever you change the interface structure! */
/* interface version 3 added GetRemAddr()
* interface version 4 added EnableKeepAlive() -- rgerhards, 2009-06-02
* interface version 5 changed return of CheckConnection from void to rsRetVal -- alorbach, 2012-09-06
* interface version 6 changed signature of GetRemoteIP() -- rgerhards, 2013-01-21
* interface version 7 added KeepAlive parameter set functions
* interface version 8 changed signature of Connect() -- dsa, 2016-11-14
* */

/* prototypes */
Expand Down
5 changes: 3 additions & 2 deletions runtime/nsd.h
Expand Up @@ -54,7 +54,7 @@ BEGINinterface(nsd) /* name must also be changed in ENDinterface macro! */
rsRetVal (*Abort)(nsd_t *pThis);
rsRetVal (*Rcv)(nsd_t *pThis, uchar *pRcvBuf, ssize_t *pLenBuf);
rsRetVal (*Send)(nsd_t *pThis, uchar *pBuf, ssize_t *pLenBuf);
rsRetVal (*Connect)(nsd_t *pThis, int family, unsigned char *port, unsigned char *host);
rsRetVal (*Connect)(nsd_t *pThis, int family, unsigned char *port, unsigned char *host, char *device);
rsRetVal (*LstnInit)(netstrms_t *pNS, void *pUsr, rsRetVal(*fAddLstn)(void*,netstrm_t*),
uchar *pLstnPort, uchar *pLstnIP, int iSessMax);
rsRetVal (*AcceptConnReq)(nsd_t *pThis, nsd_t **ppThis);
Expand Down Expand Up @@ -84,12 +84,13 @@ BEGINinterface(nsd) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetKeepAliveProbes)(nsd_t *pThis, int keepAliveProbes);
rsRetVal (*SetKeepAliveTime)(nsd_t *pThis, int keepAliveTime);
ENDinterface(nsd)
#define nsdCURR_IF_VERSION 8 /* increment whenever you change the interface structure! */
#define nsdCURR_IF_VERSION 9 /* increment whenever you change the interface structure! */
/* interface version 4 added GetRemAddr()
* interface version 5 added EnableKeepAlive() -- rgerhards, 2009-06-02
* interface version 6 changed return of CheckConnection from void to rsRetVal -- alorbach, 2012-09-06
* interface version 7 changed signature ofGetRempoteIP() -- rgerhards, 2013-01-21
* interface version 8 added keep alive parameter set functions
* interface version 9 changed signature of Connect() -- dsa, 2016-11-14
*/

/* interface for the select call */
Expand Down
4 changes: 2 additions & 2 deletions runtime/nsd_gtls.c
Expand Up @@ -1680,7 +1680,7 @@ EnableKeepAlive(nsd_t *pNsd)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations" /* TODO: FIX Warnings! */
static rsRetVal
Connect(nsd_t *pNsd, int family, uchar *port, uchar *host)
Connect(nsd_t *pNsd, int family, uchar *port, uchar *host, char *device)
{
nsd_gtls_t *pThis = (nsd_gtls_t*) pNsd;
int sock;
Expand All @@ -1694,7 +1694,7 @@ Connect(nsd_t *pNsd, int family, uchar *port, uchar *host)
assert(port != NULL);
assert(host != NULL);

CHKiRet(nsd_ptcp.Connect(pThis->pTcp, family, port, host));
CHKiRet(nsd_ptcp.Connect(pThis->pTcp, family, port, host, device));

if(pThis->iMode == 0)
FINALIZE;
Expand Down
12 changes: 11 additions & 1 deletion runtime/nsd_ptcp.c
Expand Up @@ -709,7 +709,7 @@ EnableKeepAlive(nsd_t *pNsd)
* rgerhards, 2008-03-19
*/
static rsRetVal
Connect(nsd_t *pNsd, int family, uchar *port, uchar *host)
Connect(nsd_t *pNsd, int family, uchar *port, uchar *host, char *device)
{
nsd_ptcp_t *pThis = (nsd_ptcp_t*) pNsd;
struct addrinfo *res = NULL;
Expand All @@ -733,6 +733,16 @@ Connect(nsd_t *pNsd, int family, uchar *port, uchar *host)
ABORT_FINALIZE(RS_RET_IO_ERROR);
}

if(device) {
# if defined(SO_BINDTODEVICE)
if(setsockopt(pThis->sock, SOL_SOCKET, SO_BINDTODEVICE, device, strlen(device) + 1) < 0)
# endif
{
dbgprintf("setsockopt(SO_BINDTODEVICE) failed\n");
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
}

if(connect(pThis->sock, res->ai_addr, res->ai_addrlen) != 0) {
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
Expand Down
17 changes: 15 additions & 2 deletions tools/omfwd.c
Expand Up @@ -82,6 +82,7 @@ typedef struct _instanceData {
permittedPeers_t *pPermPeers;
int iStrmDrvrMode;
char *target;
char *device;
int compressionLevel; /* 0 - no compression, else level for zlib */
char *port;
int protocol;
Expand Down Expand Up @@ -155,6 +156,7 @@ static struct cnfparamblk modpblk =
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "target", eCmdHdlrGetWord, 0 },
{ "device", eCmdHdlrGetWord, 0 },
{ "port", eCmdHdlrGetWord, 0 },
{ "protocol", eCmdHdlrGetWord, 0 },
{ "tcp_framing", eCmdHdlrGetWord, 0 },
Expand Down Expand Up @@ -382,6 +384,7 @@ CODESTARTfreeInstance
free(pData->pszStrmDrvrAuthMode);
free(pData->port);
free(pData->target);
free(pData->device);
net.DestructPermittedPeers(&pData->pPermPeers);
ENDfreeInstance

Expand Down Expand Up @@ -415,6 +418,7 @@ static rsRetVal UDPSend(wrkrInstanceData_t *__restrict__ const pWrkrData,
int i;
unsigned lsent = 0;
sbool bSendSuccess;
sbool reInit = RSFALSE;
int lasterrno = ENOENT;
char errStr[1024];

Expand Down Expand Up @@ -444,6 +448,7 @@ static rsRetVal UDPSend(wrkrInstanceData_t *__restrict__ const pWrkrData,
bSendSuccess = RSTRUE;
break;
} else {
reInit = RSTRUE;
lasterrno = errno;
DBGPRINTF("sendto() error: %d = %s.\n",
lasterrno,
Expand All @@ -453,6 +458,12 @@ static rsRetVal UDPSend(wrkrInstanceData_t *__restrict__ const pWrkrData,
if (lsent == len && !pWrkrData->pData->bSendToAll)
break;
}

/* one or more send failures; close sockets and re-init */
if (reInit == RSTRUE) {
CHKiRet(closeUDPSockets(pWrkrData));
}

/* finished looping */
if(bSendSuccess == RSTRUE) {
if(pWrkrData->pData->iUDPSendDelay > 0) {
Expand Down Expand Up @@ -707,7 +718,7 @@ static rsRetVal TCPSendInit(void *pvData)
}
/* params set, now connect */
CHKiRet(netstrm.Connect(pWrkrData->pNetstrm, glbl.GetDefPFFamily(),
(uchar*)pData->port, (uchar*)pData->target));
(uchar*)pData->port, (uchar*)pData->target, pData->device));

/* set keep-alive if enabled */
if(pData->bKeepAlive) {
Expand Down Expand Up @@ -758,7 +769,7 @@ static rsRetVal doTryResume(wrkrInstanceData_t *pWrkrData)
dbgprintf("%s found, resuming.\n", pData->target);
pWrkrData->f_addr = res;
if(pWrkrData->pSockArray == NULL) {
pWrkrData->pSockArray = net.create_udp_socket((uchar*)pData->target, NULL, 0, 0, 0);
pWrkrData->pSockArray = net.create_udp_socket((uchar*)pData->target, NULL, 0, 0, 0, pData->device);
}
if(pWrkrData->pSockArray != NULL) {
pWrkrData->bIsConnected = 1;
Expand Down Expand Up @@ -987,6 +998,8 @@ CODESTARTnewActInst
continue;
if(!strcmp(actpblk.descr[i].name, "target")) {
pData->target = es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "device")) {
pData->device = es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "port")) {
pData->port = es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "protocol")) {
Expand Down

0 comments on commit 19e5d06

Please sign in to comment.