Skip to content
Browse files

- merge from testing-0.8.12-r0:

 - tcp updates (lots)
 - makefile mips support
 - tm timer workarround (present also in stable), t_relay_tls changed
  to t_relay_to_tls
 - udp_flood sleep & throttle support
  • Loading branch information...
1 parent 5cdca24 commit 06aaa54ff3a6609f6bf2f772666e2bbc870691b8 Andrei Pelinescu-Onciul committed
Showing with 404 additions and 130 deletions.
  1. +47 −11 Makefile.defs
  2. +5 −5 etc/ser.cfg
  3. +2 −2 fifo_server.c
  4. +5 −0 modules/tm/timer.c
  5. +1 −1 modules/tm/tm_load.h
  6. +83 −10 pass_fd.c
  7. +2 −0 pass_fd.h
  8. +4 −2 tcp_conn.h
  9. +199 −80 tcp_main.c
  10. +25 −17 tcp_read.c
  11. +1 −1 test/test.cfg
  12. +30 −1 test/udp_flood.c
View
58 Makefile.defs
@@ -29,6 +29,7 @@
# 2003-09-25 added -pthread into LIBS when compiling on FreeBSD/alpha
# and other FreeBSD arches for which no fast locking assembly
# code exists (sobomax)
+# 2003-11-08 mips1 support introduced (andrei)
# check if already included/exported
@@ -43,7 +44,7 @@ export makefile_defs
VERSION = 0
PATCHLEVEL = 8
SUBLEVEL = 12
-EXTRAVERSION = dev-22-tcp_aliases
+EXTRAVERSION = -dev-23-merged
RELEASE=$(VERSION).$(PATCHLEVEL).$(SUBLEVEL)$(EXTRAVERSION)
OS = $(shell uname -s | sed -e s/SunOS/solaris/ | tr "[A-Z]" "[a-z]")
@@ -51,7 +52,7 @@ ARCH = $(shell uname -m |sed -e s/i.86/i386/ -e s/sun4u/sparc64/ \
-e s/armv4l/arm/)
# TLS support
-TLS ?=
+TLS ?=
ifneq ($(TLS),)
RELEASE:=$(RELEASE)-tls
endif
@@ -240,16 +241,16 @@ endif
# -DUSE_POSIX_SEM
# uses posix semaphores for locking (faster than sys v)
# -DBUSY_WAIT
-# uses busy waiting on the lock
+# uses busy waiting on the lock (FAST_LOCK)
# -DADAPTIVE_WAIT
# try busy waiting for a while and if the lock is still held go to
-# force reschedule
+# force reschedule (FAST_LOCK)
# -DADAPTIVE_WAIT_LOOPS=number
# number of loops we busy wait, after "number" loops have elapsed we
-# force a reschedule
+# force a reschedule (FAST_LOCK)
# -DNOSMP
# don't use smp compliant locking (faster but won't work on SMP machines)
-# (not yet enabled)
+# (not yet enabled) (FAST_LOCK)
# -DNO_PINGTEL_TAG_HACK
# if enabled, To-header-field will be less liberal and will not accept
# 'tag=' (tag parameter with equal sign and without value); it is called
@@ -273,12 +274,11 @@ DEFS+= $(extra_defs) \
-DCFG_DIR='"$(cfg-target)"'\
-DPKG_MALLOC \
-DSHM_MEM -DSHM_MMAP \
- -DADAPTIVE_WAIT -DADAPTIVE_WAIT_LOOPS=1024 \
-DDNS_IP_HACK \
-DUSE_IPV6 \
-DUSE_TCP \
-DDISABLE_NAGLE \
- -DDBG_QM_MALLOC \
+ # -DDBG_QM_MALLOC \
# -DF_MALLOC \
# -DDBG_F_MALLOC \
# -DDBG_QM_MALLOC \
@@ -386,8 +386,13 @@ ifeq ($(ARCH), ppc)
use_fast_lock=yes
endif
+ifeq ($(ARCH), mips)
+# mips1 arch. (e.g. R3000) - no hardware locking support
+ use_fast_lock=no
+endif
+
ifeq ($(use_fast_lock), yes)
- DEFS+= -DFAST_LOCK
+ DEFS+= -DFAST_LOCK -DADAPTIVE_WAIT -DADAPTIVE_WAIT_LOOPS=1024
found_lock_method=yes
endif
@@ -519,15 +524,46 @@ else
#really old version
$(warning You are using an old and unsupported gcc \
version ($(CC_SHORTVER)), compile at your own risk!)
-
+
endif # CC_SHORTVER, 2.9x
endif # CC_SHORTVER, 3.0
+
+else # CC_NAME, gcc
+ #other compilers
+$(error Unsupported compiler ($(CC):$(CC_NAME)), try gcc)
+endif #CC_NAME, gcc
+endif #ARCH, arm
+ #if mips (R3000)
+ifeq ($(ARCH), mips)
+ # if gcc
+ifeq ($(CC_NAME), gcc)
+ #common stuff
+ CFLAGS=-O9 -funroll-loops -Wcast-align $(PROFILE) \
+ -Wall \
+ #if gcc 3.0
+ifeq ($(CC_SHORTVER), 3.0)
+ CFLAGS+= -mcpu=r3000
+ #-mcpu=athlon
+else
+ifeq ($(CC_SHORTVER), 2.9x) #older gcc version (2.9[1-5])
+$(warning Old gcc detected ($(CC_SHORTVER)), use gcc 3.0.x \
+ for better results)
+
+ CFLAGS+=-mcpu=r3000
+else
+ #really old version
+$(warning You are using an old and unsupported gcc \
+ version ($(CC_SHORTVER)), compile at your own risk!)
+
+endif # CC_SHORTVER, 2.9x
+endif # CC_SHORTVER, 3.0
+
else # CC_NAME, gcc
#other compilers
$(error Unsupported compiler ($(CC):$(CC_NAME)), try gcc)
endif #CC_NAME, gcc
-endif #ARCH, i386
+endif #ARCH, mips
CFLAGS+= $(CC_EXTRA_OPTS)
View
10 etc/ser.cfg
@@ -6,9 +6,9 @@
# ----------- global configuration parameters ------------------------
-debug=3 # debug level (cmd line: -dddddddddd)
-fork=yes
-log_stderror=no # (cmd line: -E)
+#debug=3 # debug level (cmd line: -dddddddddd)
+#fork=yes
+#log_stderror=no # (cmd line: -E)
/* Uncomment these lines to enter debugging mode
fork=no
@@ -18,8 +18,8 @@ log_stderror=yes
check_via=no # (cmd. line: -v)
dns=no # (cmd. line: -r)
rev_dns=no # (cmd. line: -R)
-port=5060
-children=4
+#port=5060
+#children=4
fifo="/tmp/ser_fifo"
# ------------------ module loading ----------------------------------
View
4 fifo_server.c
@@ -349,8 +349,8 @@ static int fifo_check(int fd, char* fname)
*/
if ((lst.st_dev!=fst.st_dev)||(lst.st_ino!=fst.st_ino)){
LOG(L_ERR, "ERROR: security: fifo_check: inode/dev number differ"
- ": %ld %ld (%s)\n",
- fst.st_ino, lst.st_ino, fname);
+ ": %d %d (%s)\n",
+ (int)fst.st_ino, (int)lst.st_ino, fname);
return -1;
}
/* success */
View
5 modules/tm/timer.c
@@ -326,6 +326,11 @@ inline static void final_response_handler( void *attr)
struct cell *t;
r_buf = (struct retr_buf*)attr;
+ if (r_buf==0){
+ /* or BUG?, ignoring it for now */
+ LOG(L_CRIT, "ERROR: final_response_handler(0) called\n");
+ return;
+ }
t=r_buf->my_T;
# ifdef EXTRA_DEBUG
View
2 modules/tm/tm_load.h
@@ -52,7 +52,7 @@
#define T_RELAY_TO "t_relay_to"
#define T_RELAY_TO_UDP "t_relay_to_udp"
#define T_RELAY_TO_TCP "t_relay_to_tcp"
-#define T_RELAY_TO_TLS "t_relay_tls"
+#define T_RELAY_TO_TLS "t_relay_to_tls"
#define T_RELAY "t_relay"
#define T_REPLY "t_reply"
#define T_REPLY_WB "t_reply_with_body"
View
93 pass_fd.c
@@ -29,6 +29,8 @@
* --------
* 2002-11-29 created by andrei
* 2003-02-20 added solaris support (! HAVE_MSGHDR_MSG_CONTROL) (andrei)
+ * 2003-11-03 added send_all, recv_all and updated send/get_fd
+ * to handle signals (andrei)
*/
#ifdef USE_TCP
@@ -37,10 +39,55 @@
#include <sys/socket.h>
#include <sys/uio.h>
#include <stdlib.h> /* for NULL definition on openbsd */
+#include <errno.h>
+#include <string.h>
#include "dprint.h"
+
+/* receive all the data or returns error (handles EINTR etc.)
+ * returns: bytes read or error (<0)
+ * can return < data_len if EOF */
+int recv_all(int socket, void* data, int data_len)
+{
+ int b_read;
+ int n;
+
+ b_read=0;
+ do{
+ n=recv(socket, data+b_read, data_len-b_read, MSG_WAITALL);
+ if (n<0){
+ /* error */
+ if (errno==EINTR) continue; /* signal, try again */
+ LOG(L_CRIT, "ERROR: recv_all: recv on %d failed: %s\n",
+ socket, strerror(errno));
+ return n;
+ }
+ b_read+=n;
+ }while( (b_read!=data_len) && (n));
+ return b_read;
+}
+
+
+/* sends all data (takes care of signals) (assumes blocking fd)
+ * returns number of bytes sent or < 0 for an error */
+int send_all(int socket, void* data, int data_len)
+{
+ int n;
+
+again:
+ n=send(socket, data, data_len, 0);
+ if (n<0){
+ /* error */
+ if (errno==EINTR) goto again; /* signal, try again */
+ LOG(L_CRIT, "ERROR: send_all: send on %d failed: %s\n",
+ socket, strerror(errno));
+ }
+ return n;
+}
+
+
/* at least 1 byte must be sent! */
int send_fd(int unix_socket, void* data, int data_len, int fd)
{
@@ -76,8 +123,13 @@ int send_fd(int unix_socket, void* data, int data_len, int fd)
msg.msg_iov=iov;
msg.msg_iovlen=1;
-
+again:
ret=sendmsg(unix_socket, &msg, 0);
+ if (ret<0){
+ if (errno==EINTR) goto again;
+ LOG(L_CRIT, "ERROR: send_fd: sendmsg failed on %d: %s\n",
+ unix_socket, strerror(errno));
+ }
return ret;
}
@@ -90,6 +142,7 @@ int receive_fd(int unix_socket, void* data, int data_len, int* fd)
struct iovec iov[1];
int new_fd;
int ret;
+ int n;
#ifdef HAVE_MSGHDR_MSG_CONTROL
struct cmsghdr* cmsg;
union{
@@ -112,26 +165,47 @@ int receive_fd(int unix_socket, void* data, int data_len, int* fd)
msg.msg_iov=iov;
msg.msg_iovlen=1;
- ret=recvmsg(unix_socket, &msg, 0);
- if (ret<=0) goto error;
+again:
+ ret=recvmsg(unix_socket, &msg, MSG_WAITALL);
+ if (ret<0){
+ if (errno==EINTR) goto again;
+ LOG(L_CRIT, "ERROR: receive_fd: recvmsg on %d failed: %s\n",
+ unix_socket, strerror(errno));
+ goto error;
+ }
+ if (ret==0){
+ /* EOF */
+ LOG(L_CRIT, "ERROR: receive_fd: EOF on %d\n", unix_socket);
+ goto error;
+ }
+ if (ret<data_len){
+ LOG(L_WARN, "WARNING: receive_fd: too few bytes read (%d from %d)"
+ "trying to fix...\n", ret, data_len);
+ n=recv_all(unix_socket, (char*)data+ret, data_len-ret);
+ if (n>=0) ret+=n;
+ else{
+ ret=n;
+ goto error;
+ }
+ }
#ifdef HAVE_MSGHDR_MSG_CONTROL
cmsg=CMSG_FIRSTHDR(&msg);
if ((cmsg!=0) && (cmsg->cmsg_len==CMSG_LEN(sizeof(new_fd)))){
if (cmsg->cmsg_type!= SCM_RIGHTS){
- LOG(L_ERR, "receive_fd: msg control type != SCM_RIGHTS\n");
+ LOG(L_ERR, "ERROR: receive_fd: msg control type != SCM_RIGHTS\n");
ret=-1;
goto error;
}
if (cmsg->cmsg_level!= SOL_SOCKET){
- LOG(L_ERR, "receive_fd: msg level != SOL_SOCKET\n");
+ LOG(L_ERR, "ERROR: receive_fd: msg level != SOL_SOCKET\n");
ret=-1;
goto error;
}
*fd=*((int*) CMSG_DATA(cmsg));
}else{
- LOG(L_ERR, "receive_fd: no descriptor passed, cmsg=%p, len=%d\n",
- cmsg, cmsg->cmsg_len);
+ LOG(L_ERR, "ERROR: receive_fd: no descriptor passed, cmsg=%p,"
+ "len=%d\n", cmsg, cmsg->cmsg_len);
*fd=-1;
/* it's not really an error */
}
@@ -139,8 +213,8 @@ int receive_fd(int unix_socket, void* data, int data_len, int* fd)
if (msg.msg_accrightslen==sizeof(int)){
*fd=new_fd;
}else{
- LOG(L_ERR, "receive_fd: no descriptor passed, accrightslen=%d\n",
- msg.msg_accrightslen);
+ LOG(L_ERR, "ERROR: receive_fd: no descriptor passed,"
+ " accrightslen=%d\n", msg.msg_accrightslen);
*fd=-1;
}
#endif
@@ -148,5 +222,4 @@ int receive_fd(int unix_socket, void* data, int data_len, int* fd)
error:
return ret;
}
-
#endif
View
2 pass_fd.h
@@ -32,6 +32,8 @@
int send_fd(int unix_socket, void* data, int data_len, int fd);
int receive_fd(int unix_socket, void* data, int data_len, int* fd);
+int recv_all(int socket, void* data, int data_len);
+int send_all(int socket, void* data, int data_len);
#endif
View
6 tcp_conn.h
@@ -44,8 +44,8 @@
#define TCP_CON_MAX_ALIASES 4 /* maximum number of port aliases */
#define TCP_BUF_SIZE 65535
-#define TCP_CON_TIMEOUT 60 /* in seconds */
-#define TCP_CON_SEND_TIMEOUT 30 /* timeout after a send */
+#define TCP_CON_TIMEOUT 120 /* in seconds */
+#define TCP_CON_SEND_TIMEOUT 120 /* timeout after a send */
#define TCP_CHILD_TIMEOUT 5 /* after 5 seconds, the child "returns"
the connection to the tcp master process */
#define TCP_MAIN_SELECT_TIMEOUT 5 /* how often "tcp main" checks for timeout*/
@@ -73,6 +73,8 @@ enum tcp_conn_states { S_CONN_ERROR=-2, S_CONN_BAD=-1, S_CONN_OK=0,
/* fd communication commands */
enum conn_cmds { CONN_DESTROY=-3, CONN_ERROR=-2, CONN_EOF=-1, CONN_RELEASE,
CONN_GET_FD, CONN_NEW };
+/* CONN_RELEASE, EOF, ERROR, DESTROY can be used by "reader" processes
+ * CONN_GET_FD, NEW, ERROR only by writers */
struct tcp_req{
struct tcp_req* next;
View
279 tcp_main.c
@@ -44,6 +44,10 @@
* 2003-07-09 tls_close called before closing the tcp connection (andrei)
* 2003-10-24 converted to the new socket_info lists (andrei)
* 2003-10-27 tcp port aliases support added (andrei)
+ * 2003-11-04 always lock before manipulating refcnt; sendchild
+ * does not inc refcnt by itself anymore (andrei)
+ * 2003-11-07 different unix sockets are used for fd passing
+ * to/from readers/writers (andrei)
*/
@@ -100,7 +104,7 @@
struct tcp_child{
pid_t pid;
int proc_no; /* ser proc_no, for debugging */
- int unix_sock; /* unix sock fd, copied from pt*/
+ int unix_sock; /* unix "read child" sock fd */
int busy;
int n_reqs; /* number of requests serviced so far */
};
@@ -242,7 +246,13 @@ struct tcp_connection* tcpconn_connect(union sockaddr_union* server, int type)
strerror(errno), errno);
si=0; /* try to go on */
}
- si=find_tcp_si(&my_name);
+#ifdef USE_TLS
+ if (type==PROTO_TLS)
+ si=find_tls_si(&my_name);
+ else
+#endif
+ si=find_tcp_si(&my_name);
+
if (si==0){
LOG(L_ERR, "ERROR: tcp_connect: could not find coresponding"
" listening socket, using default...\n");
@@ -443,9 +453,20 @@ int tcpconn_add_alias(int id, int port, int proto)
+void tcpconn_ref(struct tcp_connection* c)
+{
+ TCPCONN_LOCK;
+ c->refcnt++; /* FIXME: atomic_dec */
+ TCPCONN_UNLOCK;
+}
+
+
+
void tcpconn_put(struct tcp_connection* c)
{
+ TCPCONN_LOCK;
c->refcnt--; /* FIXME: atomic_dec */
+ TCPCONN_UNLOCK;
}
@@ -455,6 +476,7 @@ int tcp_send(int type, char* buf, unsigned len, union sockaddr_union* to,
int id)
{
struct tcp_connection *c;
+ struct tcp_connection *tmp;
struct ip_addr ip;
int port;
int fd;
@@ -494,22 +516,25 @@ int tcp_send(int type, char* buf, unsigned len, union sockaddr_union* to,
LOG(L_ERR, "ERROR: tcp_send: connect failed\n");
return -1;
}
- c->refcnt++;
+ c->refcnt++; /* safe to do it w/o locking, it's not yet
+ available to the rest of the world */
fd=c->s;
/* send the new tcpconn to "tcp main" */
response[0]=(long)c;
response[1]=CONN_NEW;
- n=write(unix_tcp_sock, response, sizeof(response));
- if (n<0){
+ n=send_all(unix_tcp_sock, response, sizeof(response));
+ if (n<=0){
LOG(L_ERR, "BUG: tcp_send: failed write: %s (%d)\n",
strerror(errno), errno);
+ n=-1;
goto end;
}
n=send_fd(unix_tcp_sock, &c, sizeof(c), c->s);
- if (n<0){
+ if (n<=0){
LOG(L_ERR, "BUG: tcp_send: failed send_fd: %s (%d)\n",
strerror(errno), errno);
+ n=-1;
goto end;
}
goto send_it;
@@ -517,21 +542,34 @@ int tcp_send(int type, char* buf, unsigned len, union sockaddr_union* to,
get_fd:
/* todo: see if this is not the same process holding
* c and if so send directly on c->fd */
- DBG("tcp_send: tcp connection found, acquiring fd\n");
+ DBG("tcp_send: tcp connection found (%p), acquiring fd\n", c);
/* get the fd */
response[0]=(long)c;
response[1]=CONN_GET_FD;
- n=write(unix_tcp_sock, response, sizeof(response));
- if (n<0){
+ n=send_all(unix_tcp_sock, response, sizeof(response));
+ if (n<=0){
LOG(L_ERR, "BUG: tcp_send: failed to get fd(write):%s (%d)\n",
strerror(errno), errno);
+ n=-1;
goto release_c;
}
DBG("tcp_send, c= %p, n=%d\n", c, n);
+ tmp=c;
n=receive_fd(unix_tcp_sock, &c, sizeof(c), &fd);
- if (n<0){
+ if (n<=0){
LOG(L_ERR, "BUG: tcp_send: failed to get fd(receive_fd):"
" %s (%d)\n", strerror(errno), errno);
+ n=-1;
+ goto release_c;
+ }
+ if (c!=tmp){
+ LOG(L_CRIT, "BUG: tcp_send: get_fd: got different connection:"
+ " %p (id= %d, refcnt=%d state=%d != "
+ " %p (id= %d, refcnt=%d state=%d (n=%d)\n",
+ c, c->id, c->refcnt, c->state,
+ tmp, tmp->id, tmp->refcnt, tmp->state, n
+ );
+ n=-1; /* fail */
goto release_c;
}
DBG("tcp_send: after receive_fd: c= %p n=%d fd=%d\n",c, n, fd);
@@ -567,11 +605,12 @@ int tcp_send(int type, char* buf, unsigned len, union sockaddr_union* to,
/* tell "main" it should drop this (optional it will t/o anyway?)*/
response[0]=(long)c;
response[1]=CONN_ERROR;
- n=write(unix_tcp_sock, response, sizeof(response));
- /* CONN_ERROR wil auto-dec refcnt => we must not call tcpconn_put !!*/
- if (n<0){
+ n=send_all(unix_tcp_sock, response, sizeof(response));
+ /* CONN_ERROR will auto-dec refcnt => we must not call tcpconn_put !!*/
+ if (n<=0){
LOG(L_ERR, "BUG: tcp_send: error return failed (write):%s (%d)\n",
strerror(errno), errno);
+ n=-1;
}
close(fd);
return n; /* error return, no tcpconn_put */
@@ -737,19 +776,21 @@ static int send2child(struct tcp_connection* tcpconn)
tcp_children[idx].busy++;
tcp_children[idx].n_reqs++;
- tcpconn->refcnt++;
if (min_busy){
- LOG(L_WARN, "WARNING: send2child:no free tcp receiver, "
+ LOG(L_WARN, "WARNING: send2child: no free tcp receiver, "
" connection passed to the least busy one (%d)\n",
min_busy);
}
DBG("send2child: to tcp child %d %d(%d), %p\n", idx,
tcp_children[idx].proc_no,
tcp_children[idx].pid, tcpconn);
- send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
- tcpconn->s);
+ if (send_fd(tcp_children[idx].unix_sock, &tcpconn, sizeof(tcpconn),
+ tcpconn->s)<=0){
+ LOG(L_ERR, "ERROR: send2child: send_fd failed\n");
+ return -1;
+ }
- return 0; /* just to fix a warning*/
+ return 0;
}
@@ -776,6 +817,8 @@ static inline void handle_new_connect(struct socket_info* si,
/* add socket to list */
tcpconn=tcpconn_new(new_sock, &su, si, si->proto, S_CONN_ACCEPT);
if (tcpconn){
+ tcpconn->refcnt++; /* safe, not yet available to the
+ outside world */
tcpconn_add(tcpconn);
DBG("tcp_main_loop: new connection: %p %d\n",
tcpconn, tcpconn->s);
@@ -784,6 +827,7 @@ static inline void handle_new_connect(struct socket_info* si,
LOG(L_ERR,"ERROR: tcp_main_loop: no children "
"available\n");
TCPCONN_LOCK;
+ tcpconn->refcnt--;
if (tcpconn->refcnt==0){
close(tcpconn->s);
_tcpconn_rm(tcpconn);
@@ -795,6 +839,34 @@ static inline void handle_new_connect(struct socket_info* si,
}
+/* used internally by tcp_main_loop() */
+static void tcpconn_destroy(struct tcp_connection* tcpconn)
+{
+ int fd;
+
+ TCPCONN_LOCK; /*avoid races w/ tcp_send*/
+ tcpconn->refcnt--;
+ if (tcpconn->refcnt==0){
+ DBG("tcp_main_loop: destroying connection\n");
+ fd=tcpconn->s;
+#ifdef USE_TLS
+ /*FIXME: lock ->writelock ? */
+ if (tcpconn->type==PROTO_TLS)
+ tls_close(tcpconn, fd);
+#endif
+ _tcpconn_rm(tcpconn);
+ close(fd);
+ }else{
+ /* force timeout */
+ tcpconn->timeout=0;
+ tcpconn->state=S_CONN_BAD;
+ DBG("tcp_main_loop: delaying ...\n");
+
+ }
+ TCPCONN_UNLOCK;
+}
+
+
void tcp_main_loop()
{
int r;
@@ -843,6 +915,14 @@ void tcp_main_loop()
if (pt[r].unix_sock>maxfd) maxfd=pt[r].unix_sock;
}
}
+ for (r=0; r<tcp_children_no; r++){
+ if (tcp_children[r].unix_sock>0){ /* we can't have 0,
+ we never close it!*/
+ FD_SET(tcp_children[r].unix_sock, &master_set);
+ if (tcp_children[r].unix_sock>maxfd)
+ maxfd=tcp_children[r].unix_sock;
+ }
+ }
/* main loop*/
@@ -872,6 +952,7 @@ void tcp_main_loop()
for (h=0; h<TCP_ID_HASH_SIZE; h++){
for(tcpconn=tcpconn_id_hash[h]; tcpconn && n;
tcpconn=tcpconn->id_next){
+ /* FIXME: is refcnt==0 really necessary? */
if ((tcpconn->refcnt==0)&&(FD_ISSET(tcpconn->s, &sel_set))){
/* new data available */
n--;
@@ -879,10 +960,12 @@ void tcp_main_loop()
DBG("tcp_main_loop: data available on %p [h:%d] %d\n",
tcpconn, h, tcpconn->s);
FD_CLR(tcpconn->s, &master_set);
+ tcpconn_ref(tcpconn); /* refcnt ++ */
if (send2child(tcpconn)<0){
LOG(L_ERR,"ERROR: tcp_main_loop: no "
"children available\n");
TCPCONN_LOCK;
+ tcpconn->refcnt--;
if (tcpconn->refcnt==0){
fd=tcpconn->s;
_tcpconn_rm(tcpconn);
@@ -894,109 +977,138 @@ void tcp_main_loop()
}
}
/* check unix sockets & listen | destroy connections */
- /* start from 1, the "main" process does not transmit anything*/
- for (r=1; r<process_no && n; r++){
- if ( (pt[r].unix_sock>0) && FD_ISSET(pt[r].unix_sock, &sel_set)){
+ /* tcp_children readers first */
+ for (r=0; r<tcp_children_no && n; r++){
+ if ( (tcp_children[r].unix_sock>0) &&
+ FD_ISSET(tcp_children[r].unix_sock, &sel_set)){
/* (we can't have a fd==0, 0 is never closed )*/
n--;
- /* errno==EINTR !!! TODO*/
-read_again:
- bytes=read(pt[r].unix_sock, response, sizeof(response));
+ /* read until sizeof(response)
+ * (this is a SOCK_STREAM so read is not atomic */
+ bytes=recv_all(tcp_children[r].unix_sock, response,
+ sizeof(response));
if (bytes==0){
/* EOF -> bad, child has died */
- LOG(L_CRIT, "BUG: tcp_main_loop: dead child %d\n", r);
+ LOG(L_CRIT, "BUG: tcp_main_loop: dead tcp child %d\n", r);
/* don't listen on it any more */
- FD_CLR(pt[r].unix_sock, &master_set);
+ FD_CLR(tcp_children[r].unix_sock, &master_set);
/*exit(-1);*/
- continue;
+ continue; /* skip this and try the next one */
}else if (bytes<0){
- if (errno==EINTR) goto read_again;
- else{
- LOG(L_CRIT, "ERROR: tcp_main_loop: read from child: "
- " %s\n", strerror(errno));
- /* try to continue ? */
- continue;
- }
+ LOG(L_CRIT, "ERROR: tcp_main_loop: read from tcp child %d "
+ "%s\n", r, strerror(errno));
+ /* try to ignore ? */
+ continue; /* skip this and try the next one */
}
- DBG("tcp_main_loop: read response= %lx, %ld from %d (%d)\n",
- response[0], response[1], r, pt[r].pid);
+ DBG("tcp_main_loop: reader response= %lx, %ld from %d \n",
+ response[0], response[1], r);
cmd=response[1];
+ tcpconn=(struct tcp_connection*)response[0];
switch(cmd){
case CONN_RELEASE:
- if (pt[r].idx>=0){
- tcp_children[pt[r].idx].busy--;
- }else{
- LOG(L_CRIT, "BUG: tcp_main_loop: CONN_RELEASE\n");
- }
- tcpconn=(struct tcp_connection*)response[0];
+ tcp_children[r].busy--;
if (tcpconn){
- if (tcpconn->state==S_CONN_BAD)
- goto tcpconn_destroy;
+ if (tcpconn->state==S_CONN_BAD){
+ tcpconn_destroy(tcpconn);
+ break;
+ }
FD_SET(tcpconn->s, &master_set);
if (maxfd<tcpconn->s) maxfd=tcpconn->s;
/* update the timeout*/
tcpconn->timeout=get_ticks()+TCP_CON_TIMEOUT;
tcpconn_put(tcpconn);
- DBG("tcp_main_loop: %p refcnt= %d\n",
- tcpconn, tcpconn->refcnt);
+ DBG("tcp_main_loop: CONN_RELEASE %p"
+ " refcnt= %d\n",
+ tcpconn, tcpconn->refcnt);
}
break;
case CONN_ERROR:
case CONN_DESTROY:
case CONN_EOF:
/* WARNING: this will auto-dec. refcnt! */
- if (pt[r].idx>=0){
- tcp_children[pt[r].idx].busy--;
- }else{
- LOG(L_CRIT, "BUG: tcp_main_loop: CONN_RELEASE\n");
+ tcp_children[pt[r].idx].busy--;
+ if (tcpconn){
+ if (tcpconn->s!=-1)
+ FD_CLR(tcpconn->s, &master_set);
+ tcpconn_destroy(tcpconn);
}
- tcpconn=(struct tcp_connection*)response[0];
+ break;
+ default:
+ LOG(L_CRIT, "BUG: tcp_main_loop: unknown cmd %d"
+ " from tcp reader %d\n",
+ cmd, r);
+ }
+ }
+ }
+ /* check "send" unix sockets & listen | destroy connections */
+ /* start from 1, the "main" process does not transmit anything*/
+ for (r=1; r<process_no && n; r++){
+ if ( (pt[r].unix_sock>0) && FD_ISSET(pt[r].unix_sock, &sel_set)){
+ /* (we can't have a fd==0, 0 is never closed )*/
+ n--;
+ /* read until sizeof(response)
+ * (this is a SOCK_STREAM so read is not atomic */
+ bytes=recv_all(pt[r].unix_sock, response, sizeof(response));
+ if (bytes==0){
+ /* EOF -> bad, child has died */
+ LOG(L_CRIT, "BUG: tcp_main_loop: dead child %d\n", r);
+ /* don't listen on it any more */
+ FD_CLR(pt[r].unix_sock, &master_set);
+ /*exit(-1);*/
+ continue; /* skip this and try the next one */
+ }else if (bytes<0){
+ LOG(L_CRIT, "ERROR: tcp_main_loop: read from child: %s\n",
+ strerror(errno));
+ /* try to ignore ? */
+ continue; /* skip this and try the next one */
+ }
+
+ DBG("tcp_main_loop: read response= %lx, %ld from %d (%d)\n",
+ response[0], response[1], r, pt[r].pid);
+ cmd=response[1];
+ tcpconn=(struct tcp_connection*)response[0];
+ switch(cmd){
+ case CONN_ERROR:
if (tcpconn){
if (tcpconn->s!=-1)
FD_CLR(tcpconn->s, &master_set);
- tcpconn_destroy:
- TCPCONN_LOCK; /*avoid races w/ tcp_send*/
- tcpconn->refcnt--;
- if (tcpconn->refcnt==0){
- DBG("tcp_main_loop: destroying connection\n");
- fd=tcpconn->s;
-#ifdef USE_TLS
- if (tcpconn->type==PROTO_TLS)
- tls_close(tcpconn, fd);
-#endif
- _tcpconn_rm(tcpconn);
- close(fd);
- }else{
- /* force timeout */
- tcpconn->timeout=0;
- tcpconn->state=S_CONN_BAD;
- DBG("tcp_main_loop: delaying ...\n");
-
- }
- TCPCONN_UNLOCK;
+ tcpconn_destroy(tcpconn);
}
break;
case CONN_GET_FD:
/* send the requested FD */
- tcpconn=(struct tcp_connection*)response[0];
/* WARNING: take care of setting refcnt properly to
* avoid race condition */
if (tcpconn){
- send_fd(pt[r].unix_sock, &tcpconn,
- sizeof(tcpconn), tcpconn->s);
+ if (send_fd(pt[r].unix_sock, &tcpconn,
+ sizeof(tcpconn), tcpconn->s)<=0){
+ LOG(L_ERR, "ERROR: tcp_main_loop:"
+ "send_fd failed\n");
+ }
}else{
LOG(L_CRIT, "BUG: tcp_main_loop: null pointer\n");
}
break;
case CONN_NEW:
/* update the fd in the requested tcpconn*/
- tcpconn=(struct tcp_connection*)response[0];
/* WARNING: take care of setting refcnt properly to
* avoid race condition */
if (tcpconn){
- receive_fd(pt[r].unix_sock, &tcpconn,
- sizeof(tcpconn), &tcpconn->s);
+ bytes=receive_fd(pt[r].unix_sock, &tcpconn,
+ sizeof(tcpconn), &tcpconn->s);
+ if (bytes<sizeof(tcpconn)){
+ if (bytes<0){
+ LOG(L_CRIT, "BUG: tcp_main_loop:"
+ " CONN_NEW: receive_fd "
+ "failed\n");
+ }else{
+ LOG(L_CRIT, "BUG: tcp_main_loop:"
+ " CONN_NEW: to few bytes "
+ "received (%d)\n", bytes );
+ }
+ break; /* try to ignore */
+ }
/* add tcpconn to the list*/
tcpconn_add(tcpconn);
FD_SET(tcpconn->s, &master_set);
@@ -1012,7 +1124,7 @@ void tcp_main_loop()
cmd);
}
}
- }
+ } /* for */
/* remove old connections */
tcpconn_timeout(&master_set);
@@ -1114,6 +1226,7 @@ int tcp_init_children()
{
int r;
int sockfd[2];
+ int reader_fd[2]; /* for comm. with the tcp children read */
pid_t pid;
@@ -1127,6 +1240,11 @@ int tcp_init_children()
strerror(errno));
goto error;
}
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, reader_fd)<0){
+ LOG(L_ERR, "ERROR: tcp_main: socketpair failed: %s\n",
+ strerror(errno));
+ goto error;
+ }
process_no++;
pid=fork();
@@ -1137,11 +1255,12 @@ int tcp_init_children()
}else if (pid>0){
/* parent */
close(sockfd[1]);
+ close(reader_fd[1]);
tcp_children[r].pid=pid;
tcp_children[r].proc_no=process_no;
tcp_children[r].busy=0;
tcp_children[r].n_reqs=0;
- tcp_children[r].unix_sock=sockfd[0];
+ tcp_children[r].unix_sock=reader_fd[0];
pt[process_no].pid=pid;
pt[process_no].unix_sock=sockfd[0];
pt[process_no].idx=r;
@@ -1156,7 +1275,7 @@ int tcp_init_children()
LOG(L_ERR, "init_children failed\n");
goto error;
}
- tcp_receive_loop(sockfd[1]);
+ tcp_receive_loop(reader_fd[1]);
}
}
return 0;
View
42 tcp_read.c
@@ -399,20 +399,11 @@ int tcp_read_req(struct tcp_connection* con)
req=&con->req;
#ifdef USE_TLS
if (con->type==PROTO_TLS){
- if (con->state==S_CONN_ACCEPT){
- if (tls_accept(con, 0)!=0){
- resp=CONN_ERROR;
- goto end_req;
- }
- if(con->state!=S_CONN_OK) goto end_req; /* not enough data */
- }
- if(con->state==S_CONN_CONNECT){
- if (tls_connect(con, 0)!=0){
- resp=CONN_ERROR;
- goto end_req;
- }
- if(con->state!=S_CONN_OK) goto end_req; /* not enough data */
+ if (tls_fix_read_conn(con)!=0){
+ resp=CONN_ERROR;
+ goto end_req;
}
+ if(con->state!=S_CONN_OK) goto end_req; /* not enough data */
}
#endif
@@ -547,7 +538,8 @@ void release_tcpconn(struct tcp_connection* c, long state, int unix_sock)
/* errno==EINTR, EWOULDBLOCK a.s.o todo */
response[0]=(long)c;
response[1]=state;
- write(unix_sock, response, sizeof(response));
+ if (send_all(unix_sock, response, sizeof(response))<=0)
+ LOG(L_ERR, "ERROR: release_tcpconn: send_all failed\n");
}
@@ -625,12 +617,18 @@ void tcp_receive_loop(int unix_sock)
release_tcpconn(con, resp, unix_sock);
goto skip;
}
-#ifdef USE_TLS
- if (con->type==PROTO_TLS) tls_tcpconn_update_fd(con, s);
-#endif
con->timeout=get_ticks()+TCP_CHILD_TIMEOUT;
FD_SET(s, &master_set);
if (maxfd<s) maxfd=s;
+ if (con==list){
+ LOG(L_CRIT, "BUG: tcp_receive_loop: duplicate"
+ " connection recevied: %p, id %d, fd %d, refcnt %d"
+ " state %d (n=%d)\n", con, con->id, con->fd,
+ con->refcnt, con->state, n);
+ resp=CONN_ERROR;
+ release_tcpconn(con, resp, unix_sock);
+ goto skip; /* try to recover */
+ }
tcpconn_listadd(list, con, c_next, c_prev);
}
skip:
@@ -641,12 +639,22 @@ void tcp_receive_loop(int unix_sock)
DBG("tcp receive: list fd=%d, id=%d, timeout=%d, refcnt=%d\n",
con->fd, con->id, con->timeout, con->refcnt);
#endif
+ if (con->state<0){
+ /* S_CONN_BAD or S_CONN_ERROR, remove it */
+ resp=CONN_ERROR;
+ FD_CLR(con->fd, &master_set);
+ tcpconn_listrm(list, con, c_next, c_prev);
+ con->state=S_CONN_BAD;
+ release_tcpconn(con, resp, unix_sock);
+ continue;
+ }
if (nfds && FD_ISSET(con->fd, &sel_set)){
#ifdef EXTRA_DEBUG
DBG("tcp receive: match, fd:isset\n");
#endif
nfds--;
resp=tcp_read_req(con);
+
if (resp<0){
FD_CLR(con->fd, &master_set);
tcpconn_listrm(list, con, c_next, c_prev);
View
2 test/test.cfg
@@ -26,7 +26,7 @@ rev_dns=off # (cmd. line: -R)
alias=iptel.org
alias="foo.bar"
fifo="/tmp/ser_fifo"
-listen= tcp:10.0.0.179:5065
+#listen= tcp:10.0.0.179:5065
alias= tcp:all:5065
tcp_accept_aliases=yes
View
31 test/udp_flood.c
@@ -50,6 +50,8 @@ Options:\n\
-d address destination address\n\
-p port destination port\n\
-c count number of packets to be sent\n\
+ -s usec microseconds to sleep before sending \"throttle\" packets\n\
+ -t throttle number of packets to send before sleeping\n\
-v increase verbosity level\n\
-V version number\n\
-h this help message\n\
@@ -74,6 +76,9 @@ int main (int argc, char** argv)
char *fname;
char *dst;
int port;
+ long usec;
+ int throttle;
+ int t;
/* init */
count=0;
@@ -81,9 +86,11 @@ int main (int argc, char** argv)
fname=0;
dst=0;
port=0;
+ usec=0;
+ throttle=0;
opterr=0;
- while ((c=getopt(argc,argv, "f:c:d:p:vhV"))!=-1){
+ while ((c=getopt(argc,argv, "f:c:d:p:s:t:vhV"))!=-1){
switch(c){
case 'f':
fname=optarg;
@@ -108,6 +115,20 @@ int main (int argc, char** argv)
goto error;
}
break;
+ case 's':
+ usec=strtol(optarg, &tmp, 10);
+ if ((tmp==0)||(*tmp)){
+ fprintf(stderr, "bad count: -c %s\n", optarg);
+ goto error;
+ }
+ break;
+ case 't':
+ throttle=strtol(optarg, &tmp, 10);
+ if ((tmp==0)||(*tmp)){
+ fprintf(stderr, "bad count: -c %s\n", optarg);
+ goto error;
+ }
+ break;
case 'V':
printf("version: %s\n", version);
printf("%s\n",id);
@@ -197,12 +218,20 @@ int main (int argc, char** argv)
/* flood loop */
+ t=throttle;
for (r=0; r<count; r++){
if ((verbose>1)&&(r%1000)) putchar('.');
if (send(sock, buf, n, 0)==-1) {
fprintf(stderr, "Error: send: %s\n", strerror(errno));
exit(1);
}
+ if (usec){
+ t--;
+ if (t==0){
+ usleep(usec);
+ t=throttle;
+ }
+ }
}
printf("\n%d packets sent, %d bytes each => total %d bytes\n",
count, n, n*count);

0 comments on commit 06aaa54

Please sign in to comment.
Something went wrong with that request. Please try again.