Skip to content

Commit

Permalink
Fixes missing flows on mysql after shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
cardigliano committed Dec 20, 2018
1 parent e3e82c6 commit 706bbf8
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 13 deletions.
2 changes: 1 addition & 1 deletion include/Flow.h
Expand Up @@ -96,7 +96,7 @@ class Flow : public GenericHashEntry {
u_int32_t last_conntrack_update;
u_int32_t marker;
#endif

union {
struct {
char *last_url, *last_method;
Expand Down
7 changes: 4 additions & 3 deletions include/MySQLDB.h
Expand Up @@ -29,7 +29,8 @@ class MySQLDB : public DB {
MYSQL mysql;
bool db_operational;
struct timeval lastUpdateTime;
u_int32_t mysqlDroppedFlows;
u_int32_t mysqlDroppedFlows, mysqlConsumerDroppedFlows;
u_int32_t mysqlEnqueuedFlows;
u_int64_t mysqlExportedFlows, mysqlLastExportedFlows;
float mysqlExportRate;

Expand All @@ -55,9 +56,9 @@ class MySQLDB : public DB {
void checkPointCounters(bool drops_only) {
if(!drops_only)
checkpointExportedFlows = mysqlExportedFlows;
checkpointDroppedFlows = mysqlDroppedFlows;
checkpointDroppedFlows = mysqlDroppedFlows + mysqlConsumerDroppedFlows;
};
inline u_int32_t numDroppedFlows() const { return mysqlDroppedFlows; };
inline u_int32_t numDroppedFlows() const { return mysqlDroppedFlows + mysqlConsumerDroppedFlows; };
inline float exportRate() const { return mysqlExportRate; };
static char *escapeAphostrophes(const char *unescaped);
int flow2InsertValues(Flow *f, char *json, char *values_buf, size_t values_buf_len) const;
Expand Down
13 changes: 13 additions & 0 deletions include/SPSCQueue.h
Expand Up @@ -32,6 +32,7 @@ class SPSCQueue {
Mutex *m;

public:

SPSCQueue(bool multi_consumer = false) {
q = (spsc_queue_t *) calloc(1, sizeof(spsc_queue_t));
if(q == NULL) throw 1;
Expand All @@ -50,6 +51,18 @@ class SPSCQueue {

/* ************************************** */

inline bool isNotEmpty() {
u_int32_t next_tail;
bool rc;
if(m) m->lock(__FILE__, __LINE__);
next_tail = (q->shadow_tail + 1) & QUEUE_ITEMS_MASK;
rc = (next_tail != q->head);
if(m) m->unlock(__FILE__, __LINE__);
return(rc);
}

/* ************************************** */

inline bool dequeue(void** item) {
u_int32_t next_tail;
bool rc;
Expand Down
6 changes: 4 additions & 2 deletions src/Flow.cpp
Expand Up @@ -859,9 +859,11 @@ bool Flow::dumpFlow(bool dump_alert) {
}

if(!idle()) {
if((now - get_first_seen()) < CONST_DB_DUMP_FREQUENCY
|| (now - last_db_dump.last_dump) < CONST_DB_DUMP_FREQUENCY)
if(iface->getIfType() == interface_type_PCAP_DUMP
|| (now - get_first_seen()) < CONST_DB_DUMP_FREQUENCY
|| (now - last_db_dump.last_dump) < CONST_DB_DUMP_FREQUENCY) {
return(rc);
}
} else {
/* flows idle, i.e., ready to be purged, are always dumped */
}
Expand Down
5 changes: 3 additions & 2 deletions src/MySQLDB.cpp
Expand Up @@ -473,7 +473,8 @@ bool MySQLDB::createNprobeDBView() {
/* ******************************************* */

MySQLDB::MySQLDB(NetworkInterface *_iface) : DB(_iface) {
mysqlDroppedFlows = 0;
mysqlDroppedFlows = mysqlConsumerDroppedFlows = 0;
mysqlEnqueuedFlows = 0;
mysqlExportedFlows = 0, mysqlLastExportedFlows = 0;
mysqlExportRate = 0;
checkpointDroppedFlows = checkpointExportedFlows = 0;
Expand Down Expand Up @@ -531,7 +532,7 @@ void MySQLDB::lua(lua_State *vm, bool since_last_checkpoint) const {
lua_push_uint64_table_entry(vm, "flow_export_count",
mysqlExportedFlows - (since_last_checkpoint ? checkpointExportedFlows : 0));
lua_push_int32_table_entry(vm, "flow_export_drops",
mysqlDroppedFlows - (since_last_checkpoint ? checkpointDroppedFlows : 0));
(mysqlDroppedFlows + mysqlConsumerDroppedFlows) - (since_last_checkpoint ? checkpointDroppedFlows : 0));
lua_push_float_table_entry(vm, "flow_export_rate",
mysqlExportRate >= 0 ? mysqlExportRate : 0);
}
Expand Down
9 changes: 6 additions & 3 deletions src/NetworkInterface.cpp
Expand Up @@ -586,8 +586,6 @@ NetworkInterface::~NetworkInterface() {
}
#endif

if(db) db->shutdown();

if(getNumPackets() > 0) {
ntop->getTrace()->traceEvent(TRACE_NORMAL,
"Flushing host contacts for interface %s",
Expand All @@ -597,7 +595,11 @@ NetworkInterface::~NetworkInterface() {

deleteDataStructures();

if(db) delete db;
if(db) {
/* note: keep this after deleteDataStructures to flush aggregated flows */
db->shutdown();
delete db;
}
if(host_pools) delete host_pools; /* note: this requires ndpi_struct */
if(ifDescription) free(ifDescription);
if(discovery) delete discovery;
Expand Down Expand Up @@ -5339,6 +5341,7 @@ void NetworkInterface::runShutdownTasks() {
if (ntop->getPrefs()->flushFlowsOnShutdown()) {
/* Setting all flows as "ready to purge" (see isReadyToPurge) and dump them to the DB */
periodicStatsUpdate();
flushFlowDump();
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/Prefs.cpp
Expand Up @@ -28,7 +28,7 @@ Prefs::Prefs(Ntop *_ntop) {
ntop = _ntop, sticky_hosts = location_none,
ignore_vlans = false, simulate_vlans = false;
local_networks = strdup(CONST_DEFAULT_HOME_NET "," CONST_DEFAULT_LOCAL_NETS);
local_networks_set = false, shutdown_when_done = false, flush_flows_on_shutdown = false;
local_networks_set = false, shutdown_when_done = false, flush_flows_on_shutdown = true;
enable_users_login = true, disable_localhost_login = false;
enable_dns_resolution = sniff_dns_responses = true, use_promiscuous_mode = true;
resolve_all_host_ip = false, online_license_check = false, service_license_check = false;
Expand Down Expand Up @@ -1316,7 +1316,6 @@ int Prefs::setOption(int optkey, char *optarg) {

case 213:
shutdown_when_done = true;
flush_flows_on_shutdown = true;
break;

case 214:
Expand Down

0 comments on commit 706bbf8

Please sign in to comment.