Skip to content

Commit

Permalink
Debug-validation: track thread id in flow
Browse files Browse the repository at this point in the history
In workers mode, all packets of a single flow should be handled by
a single thread. In autofp mode, all packets in a flow should be
handled by the same stream/detect thread.

This patch stores the id in the flow and aborts if more than one
thread handling a flow.

The check and store are done from the stream engine, so the autofp
case works as well.
  • Loading branch information
victorjulien committed Mar 8, 2014
1 parent 3cb039c commit 939e3e3
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 4 deletions.
10 changes: 10 additions & 0 deletions src/flow-util.h
Expand Up @@ -39,6 +39,14 @@
#define RESET_COUNTERS(f)
#endif

#ifdef DEBUG_VALIDATION
#define RESET_THREAD_ID(f) do { \
(f)->thread_id = 0; \
} while (0)
#else
#define RESET_THREAD_ID(f)
#endif

#define FLOW_INITIALIZE(f) do { \
(f)->sp = 0; \
(f)->dp = 0; \
Expand Down Expand Up @@ -70,6 +78,7 @@
SC_ATOMIC_INIT((f)->autofp_tmqh_flow_qid); \
(void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \
RESET_COUNTERS((f)); \
RESET_THREAD_ID((f)); \
} while (0)

/** \brief macro to recycle a flow before it goes into the spare queue for reuse.
Expand Down Expand Up @@ -107,6 +116,7 @@
(void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \
} \
RESET_COUNTERS((f)); \
RESET_THREAD_ID((f)); \
} while(0)

#define FLOW_DESTROY(f) do { \
Expand Down
3 changes: 3 additions & 0 deletions src/flow.h
Expand Up @@ -382,6 +382,9 @@ typedef struct Flow_
uint32_t tosrcpktcnt;
uint64_t bytecnt;
#endif
#ifdef DEBUG_VALIDATION
u_long thread_id;
#endif
} Flow;

enum {
Expand Down
11 changes: 11 additions & 0 deletions src/stream-tcp.c
Expand Up @@ -4171,6 +4171,17 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,

DEBUG_ASSERT_FLOW_LOCKED(p->flow);

#ifdef DEBUG_VALIDATION
if (p->pkt_src == PKT_SRC_WIRE) {
if (p->flow->thread_id == 0)
p->flow->thread_id = tv->thread_id;
else {
SCLogDebug("tv->id %d", (int)tv->thread_id);
BUG_ON(p->flow->thread_id != tv->thread_id);
}
}
#endif

SCLogDebug("p->pcap_cnt %"PRIu64, p->pcap_cnt);

TcpSession *ssn = (TcpSession *)p->flow->protoctx;
Expand Down
3 changes: 1 addition & 2 deletions src/threadvars.h
Expand Up @@ -69,8 +69,7 @@ typedef struct ThreadVars_ {
/** no of times the thread has been restarted on failure */
uint8_t restarted;

/** local id */
int id;
u_long thread_id;

/** queue's */
Tmq *inq;
Expand Down
8 changes: 6 additions & 2 deletions src/tm-threads.c
Expand Up @@ -646,7 +646,9 @@ void *TmThreadsSlotPktAcqLoop(void *td) {

if (tv->thread_setup_flags != 0)
TmThreadSetupOptions(tv);

#ifdef DEBUG_VALIDATION
tv->thread_id = SCGetThreadIdLong();
#endif
/* Drop the capabilities for this thread */
SCDropCaps(tv);

Expand Down Expand Up @@ -759,7 +761,9 @@ void *TmThreadsSlotVar(void *td)

if (tv->thread_setup_flags != 0)
TmThreadSetupOptions(tv);

#ifdef DEBUG_VALIDATION
tv->thread_id = SCGetThreadIdLong();
#endif
/* Drop the capabilities for this thread */
SCDropCaps(tv);

Expand Down

0 comments on commit 939e3e3

Please sign in to comment.