Skip to content

Commit

Permalink
evapi: implemented receive buffering per connected client
Browse files Browse the repository at this point in the history
- incomplete data may be received on tcp, needing to wait to get more
  (only for netstring format)
  • Loading branch information
miconda committed Sep 18, 2015
1 parent dd0fd70 commit 4eb7656
Showing 1 changed file with 68 additions and 27 deletions.
95 changes: 68 additions & 27 deletions modules/evapi/evapi_dispatch.c
Expand Up @@ -45,12 +45,15 @@ static int _evapi_notify_sockets[2];
static int _evapi_netstring_format = 1;

#define EVAPI_IPADDR_SIZE 64
#define CLIENT_BUFFER_SIZE 32768
typedef struct _evapi_client {
int connected;
int sock;
unsigned short af;
unsigned short src_port;
char src_addr[EVAPI_IPADDR_SIZE];
char rbuffer[CLIENT_BUFFER_SIZE];
unsigned int rpos;
} evapi_client_t;

typedef struct _evapi_env {
Expand All @@ -60,7 +63,8 @@ typedef struct _evapi_env {
} evapi_env_t;

#define EVAPI_MAX_CLIENTS 8
static evapi_client_t _evapi_clients[EVAPI_MAX_CLIENTS];
/* last one used for error handling, not a real connected client */
static evapi_client_t _evapi_clients[EVAPI_MAX_CLIENTS+1];

typedef struct _evapi_evroutes {
int con_new;
Expand Down Expand Up @@ -225,36 +229,41 @@ int evapi_dispatch_notify(char *obuf, int olen)
*/
void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
#define CLIENT_BUFFER_SIZE 32768
char rbuffer[CLIENT_BUFFER_SIZE];
ssize_t rlen;
int i, k;
evapi_env_t evenv;
str frame;
char *sfp;
char *efp;

if(EV_ERROR & revents) {
perror("received invalid event\n");
return;
}

/* read message from client */
rlen = recv(watcher->fd, rbuffer, CLIENT_BUFFER_SIZE-1, 0);

if(rlen < 0) {
LM_ERR("cannot read the client message\n");
return;
}

for(i=0; i<EVAPI_MAX_CLIENTS; i++) {
if(_evapi_clients[i].connected==1 && _evapi_clients[i].sock==watcher->fd) {
break;
}
}
if(i==EVAPI_MAX_CLIENTS) {
LM_ERR("cannot lookup client socket %d\n", watcher->fd);
/* try to empty the socket anyhow */
rlen = recv(watcher->fd, _evapi_clients[i].rbuffer, CLIENT_BUFFER_SIZE-1, 0);
return;
}

/* read message from client */
rlen = recv(watcher->fd, _evapi_clients[i].rbuffer + _evapi_clients[i].rpos,
CLIENT_BUFFER_SIZE - 1 - _evapi_clients[i].rpos, 0);

if(rlen < 0) {
LM_ERR("cannot read the client message\n");
_evapi_clients[i].rpos = 0;
return;
}


cfg_update();

evapi_env_reset(&evenv);
Expand All @@ -265,48 +274,80 @@ void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
evapi_run_cfg_route(&evenv, _evapi_rts.con_closed);
_evapi_clients[i].connected = 0;
_evapi_clients[i].sock = 0;
_evapi_clients[i].rpos = 0;
ev_io_stop(loop, watcher);
free(watcher);
LM_INFO("client closing connection - pos [%d] addr [%s:%d]\n",
i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port);
return;
}

rbuffer[rlen] = '\0';
_evapi_clients[i].rbuffer[_evapi_clients[i].rpos+rlen] = '\0';

LM_NOTICE("{%d} [%s:%d] - received [%.*s]\n",
i, _evapi_clients[i].src_addr, _evapi_clients[i].src_port,
(int)rlen, rbuffer);
(int)rlen, _evapi_clients[i].rbuffer+_evapi_clients[i].rpos);
evenv.conidx = i;
evenv.eset = 1;
if(_evapi_netstring_format) {
/* netstring decapsulation */
k = 0;
while(k<rlen) {
while(k<_evapi_clients[i].rpos+rlen) {
frame.len = 0;
while(k<rlen) {
if(rbuffer[k]==' ' || rbuffer[k]=='\t'
|| rbuffer[k]=='\r' || rbuffer[k]=='\n')
while(k<_evapi_clients[i].rpos+rlen) {
if(_evapi_clients[i].rbuffer[k]==' '
|| _evapi_clients[i].rbuffer[k]=='\t'
|| _evapi_clients[i].rbuffer[k]=='\r'
|| _evapi_clients[i].rbuffer[k]=='\n')
k++;
else break;
}
if(k==rlen) return;
while(k<rlen) {
if(rbuffer[k]>='0' && rbuffer[k]<='9') {
frame.len = frame.len*10 + rbuffer[k] - '0';
if(k==_evapi_clients[i].rpos+rlen) {
_evapi_clients[i].rpos = 0;
return;
}
/* pointer to start of whole frame */
sfp = _evapi_clients[i].rbuffer + k;
while(k<_evapi_clients[i].rpos+rlen) {
if(_evapi_clients[i].rbuffer[k]>='0' && _evapi_clients[i].rbuffer[k]<='9') {
frame.len = frame.len*10 + _evapi_clients[i].rbuffer[k] - '0';
} else {
if(rbuffer[k]==':')
if(_evapi_clients[i].rbuffer[k]==':')
break;
/* invalid character - discard the rest */
_evapi_clients[i].rpos = 0;
return;
}
k++;
}
if(k==rlen || frame.len<=0) return;
if(frame.len + k>=rlen) return;
if(k==_evapi_clients[i].rpos+rlen || frame.len<=0) {
_evapi_clients[i].rpos = 0;
return;
}
if(frame.len + k>=_evapi_clients[i].rpos + rlen) {
/* partial data - shift back in buffer and wait to read more */
efp = _evapi_clients[i].rbuffer + _evapi_clients[i].rpos + rlen;
if(efp<=sfp) {
_evapi_clients[i].rpos = 0;
return;
}
_evapi_clients[i].rpos = (unsigned int)(efp-sfp);
if(efp-sfp > sfp-_evapi_clients[i].rbuffer) {
memcpy(_evapi_clients[i].rbuffer, sfp, _evapi_clients[i].rpos);
} else {
for(k=0; k<_evapi_clients[i].rpos; k++) {
_evapi_clients[i].rbuffer[k] = sfp[k];
}
}
return;
}
k++;
frame.s = rbuffer + k;
if(frame.s[frame.len]!=',') return;
frame.s = _evapi_clients[i].rbuffer + k;
if(frame.s[frame.len]!=',') {
/* invalid data - discard and reset buffer */
_evapi_clients[i].rpos = 0 ;
return;
}
frame.s[frame.len] = '\0';
k += frame.len ;
evenv.msg.s = frame.s;
Expand All @@ -315,7 +356,7 @@ void evapi_recv_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
k++;
}
} else {
evenv.msg.s = rbuffer;
evenv.msg.s = _evapi_clients[i].rbuffer;
evenv.msg.len = rlen;
evapi_run_cfg_route(&evenv, _evapi_rts.msg_received);
}
Expand Down

0 comments on commit 4eb7656

Please sign in to comment.