Skip to content
Permalink
Browse files

Really Great commit!

~100 times speed up for screen redraw and speed recalculation thread.

i7 3820 could process /11 (2 097 150 hosts!) network without any issues:

Screen updated in:              0 sec 265136 microseconds
Traffic calculated in:          0 sec 571686 microseconds

We switched from std::map to statically allocated vectors.
We optimized speed of allocation of this vectors.
We added exception handling in case of memory allocation error.
We do not process zero speed counters at all (huge speedup!)
We use partial_port instead sort (huge speedup!)
We switched to average data from Graphite
Disable connection tracking code overhead when it's disabled.

Related #290
Related #111
  • Loading branch information...
pavel-odintsov committed Jun 16, 2015
1 parent 4b428a1 commit eae33ce39f2128b68c3d2e8340ca2e197c12126d
Showing with 108 additions and 101 deletions.
  1. +108 −101 src/fastnetmon.cpp
@@ -6,6 +6,7 @@
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <new>
#include <signal.h>
#include <time.h>
#include <math.h>
@@ -213,24 +214,18 @@ map_for_subnet_counters PerSubnetAverageSpeedMap;
map_of_vector_counters_for_flow SubnetVectorMapFlow;

/* End of our data structs */

boost::mutex data_counters_mutex;
boost::mutex speed_counters_mutex;
boost::mutex total_counters_mutex;

boost::mutex ban_list_details_mutex;

boost::mutex ban_list_mutex;
boost::mutex flow_counter;

// map for flows
std::map<uint64_t, int> FlowCounter;

// Struct for string speed per IP
map_for_counters SpeedCounter;
map_of_vector_counters SubnetVectorMapSpeed;

// Struct for storing average speed per IP for specified interval
map_for_counters SpeedCounterAverage;
map_of_vector_counters SubnetVectorMapSpeedAverage;

#ifdef GEOIP
map_for_counters GeoIpCounter;
@@ -457,21 +452,51 @@ void store_data_in_redis(std::string key_name, std::string attack_details) {
}
#endif

std::string draw_table(map_for_counters& my_map_packets, direction data_direction, bool do_redis_update, sort_type sort_item) {
std::string draw_table(direction data_direction, bool do_redis_update, sort_type sort_item) {
std::vector<pair_of_map_elements> vector_for_sort;

std::stringstream output_buffer;

// Preallocate memory for sort vector
vector_for_sort.reserve(my_map_packets.size());
// We use total networks size for this vector
vector_for_sort.reserve(total_number_of_hosts_in_our_networks);

for (map_for_counters::iterator ii = my_map_packets.begin(); ii != my_map_packets.end(); ++ii) {
// store all elements into vector for sorting
vector_for_sort.push_back(std::make_pair((*ii).first, (*ii).second));
// Switch to Average speed there!!!
map_of_vector_counters* current_speed_map = NULL;

if (print_average_traffic_counts) {
current_speed_map = &SubnetVectorMapSpeedAverage;
} else {
current_speed_map = &SubnetVectorMapSpeed;
}

map_element zero_map_element;
memset(&zero_map_element, 0, sizeof(zero_map_element));

unsigned int count_of_zero_speed_packets = 0;
for (map_of_vector_counters::iterator itr = current_speed_map->begin(); itr != current_speed_map->end(); ++itr) {
for (vector_of_counters::iterator vector_itr = itr->second.begin(); vector_itr != itr->second.end(); ++vector_itr) {
int current_index = vector_itr - itr->second.begin();

// convert to host order for math operations
uint32_t subnet_ip = ntohl(itr->first.first);
uint32_t client_ip_in_host_bytes_order = subnet_ip + current_index;

// covnert to our standard network byte order
uint32_t client_ip = htonl(client_ip_in_host_bytes_order);

// Do not add zero speed packets to sort list
if (memcmp((void*)&zero_map_element, &*vector_itr, sizeof(map_element)) != 0) {
vector_for_sort.push_back(std::make_pair(client_ip, *vector_itr));
} else {
count_of_zero_speed_packets++;
}
}
}

if (data_direction == INCOMING or data_direction == OUTGOING) {
std::sort(vector_for_sort.begin(), vector_for_sort.end(),
// Sort only first max_ips_in_list elements in this vector
std::partial_sort(vector_for_sort.begin(), vector_for_sort.begin() + max_ips_in_list, vector_for_sort.end(),
TrafficComparatorClass(data_direction, sort_item));
} else {
logger << log4cpp::Priority::ERROR << "Unexpected bahaviour on sort function";
@@ -483,8 +508,7 @@ std::string draw_table(map_for_counters& my_map_packets, direction data_directio
unsigned int element_number = 0;

// In this loop we print only top X talkers in our subnet to screen buffer
for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin();
ii != vector_for_sort.end(); ++ii) {
for (std::vector<pair_of_map_elements>::iterator ii = vector_for_sort.begin(); ii != vector_for_sort.end(); ++ii) {
uint32_t client_ip = (*ii).first;
std::string client_ip_as_string = convert_ip_as_uint_to_string((*ii).first);

@@ -496,36 +520,18 @@ std::string draw_table(map_for_counters& my_map_packets, direction data_directio
uint64_t bps_average = 0;
uint64_t flows_average = 0;

map_element* current_speed_element = &SpeedCounter[client_ip];
// Here we could have average or instantaneous speed
map_element* current_speed_element = &ii->second;

// Create polymorphic pps, byte and flow counters
if (print_average_traffic_counts) {
map_element* current_average_speed_element = &SpeedCounterAverage[client_ip];

if (data_direction == INCOMING) {
pps_average = current_average_speed_element->in_packets;
bps_average = current_average_speed_element->in_bytes;
flows_average = current_average_speed_element->in_flows;
} else if (data_direction == OUTGOING) {
pps_average = current_average_speed_element->out_packets;
bps_average = current_average_speed_element->out_bytes;
flows_average = current_average_speed_element->out_flows;
}
}

// If we want absolute counters or we use graphite (it uses absoulute counters)
if (graphite_enabled or !graphite_enabled) {
map_element* current_speed_element = &SpeedCounter[client_ip];

if (data_direction == INCOMING) {
pps = current_speed_element->in_packets;
bps = current_speed_element->in_bytes;
flows = current_speed_element->in_flows;
} else if (data_direction == OUTGOING) {
pps = current_speed_element->out_packets;
bps = current_speed_element->out_bytes;
flows = current_speed_element->out_flows;
}
if (data_direction == INCOMING) {
pps = current_speed_element->in_packets;
bps = current_speed_element->in_bytes;
flows = current_speed_element->in_flows;
} else if (data_direction == OUTGOING) {
pps = current_speed_element->out_packets;
bps = current_speed_element->out_bytes;
flows = current_speed_element->out_flows;
}

uint64_t mbps = convert_speed_to_mbps(bps);
@@ -553,20 +559,19 @@ std::string draw_table(map_for_counters& my_map_packets, direction data_directio
ip_as_string_with_dash_delimiters.end(), '.', '_');

std::string graphite_current_prefix = graphite_prefix + "." + ip_as_string_with_dash_delimiters + "." + direction_as_string;

if (print_average_traffic_counts) {
graphite_current_prefix = graphite_current_prefix + ".average";
}

graphite_data[ graphite_current_prefix + ".pps" ] = pps;
graphite_data[ graphite_current_prefix + ".mbps" ] = mbps;
graphite_data[ graphite_current_prefix + ".flows" ] = flows;
}

if (print_average_traffic_counts) {
output_buffer << std::setw(6) << pps_average << " pps ";
output_buffer << std::setw(6) << mbps_average << " mbps ";
output_buffer << std::setw(6) << flows_average << " flows ";
} else {
output_buffer << std::setw(6) << pps << " pps ";
output_buffer << std::setw(6) << mbps << " mbps ";
output_buffer << std::setw(6) << flows << " flows ";
}
output_buffer << std::setw(6) << pps << " pps ";
output_buffer << std::setw(6) << mbps << " mbps ";
output_buffer << std::setw(6) << flows << " flows ";

output_buffer << is_banned << std::endl;
} else {
@@ -914,13 +919,18 @@ void subnet_vectors_allocator(prefix_t* prefix, void* data) {

subnet_t current_subnet = std::make_pair(subnet_as_integer, bitlen);

// Initialize map element
SubnetVectorMap[current_subnet] = vector_of_counters(network_size_in_ips);

// Zeroify all vector elements
map_element zero_map_element;
memset(&zero_map_element, 0, sizeof(zero_map_element));
std::fill(SubnetVectorMap[current_subnet].begin(), SubnetVectorMap[current_subnet].end(), zero_map_element);

// Initilize our counters with fill constructor
try {
SubnetVectorMap[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element);
SubnetVectorMapSpeed[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element);
SubnetVectorMapSpeedAverage[current_subnet] = vector_of_counters(network_size_in_ips, zero_map_element);
} catch (std::bad_alloc& ba) {
logger << log4cpp::Priority::ERROR << "Can't allocate memory for counters";
exit(1);
}

// Initilize map element
SubnetVectorMapFlow[current_subnet] = vector_of_flow_counters(network_size_in_ips);
@@ -1595,29 +1605,35 @@ void recalculate_speed() {

conntrack_main_struct* flow_counter_ptr = &SubnetVectorMapFlow[itr->first][current_index];

// todo: optimize this operations!
uint64_t total_out_flows =
(uint64_t)flow_counter_ptr->out_tcp.size() + (uint64_t)flow_counter_ptr->out_udp.size() +
(uint64_t)flow_counter_ptr->out_icmp.size() + (uint64_t)flow_counter_ptr->out_other.size();

uint64_t total_in_flows =
(uint64_t)flow_counter_ptr->in_tcp.size() + (uint64_t)flow_counter_ptr->in_udp.size() +
(uint64_t)flow_counter_ptr->in_icmp.size() + (uint64_t)flow_counter_ptr->in_other.size();

new_speed_element.out_flows = uint64_t((double)total_out_flows / speed_calc_period);
new_speed_element.in_flows = uint64_t((double)total_in_flows / speed_calc_period);

// Increment global counter
incoming_total_flows += new_speed_element.in_flows;
outgoing_total_flows += new_speed_element.out_flows;
if (enable_conection_tracking) {
// todo: optimize this operations!
// it's really bad and SLOW CODE
uint64_t total_out_flows =
(uint64_t)flow_counter_ptr->out_tcp.size() + (uint64_t)flow_counter_ptr->out_udp.size() +
(uint64_t)flow_counter_ptr->out_icmp.size() + (uint64_t)flow_counter_ptr->out_other.size();

uint64_t total_in_flows =
(uint64_t)flow_counter_ptr->in_tcp.size() + (uint64_t)flow_counter_ptr->in_udp.size() +
(uint64_t)flow_counter_ptr->in_icmp.size() + (uint64_t)flow_counter_ptr->in_other.size();

new_speed_element.out_flows = uint64_t((double)total_out_flows / speed_calc_period);
new_speed_element.in_flows = uint64_t((double)total_in_flows / speed_calc_period);

// Increment global counter
outgoing_total_flows += new_speed_element.out_flows;
incoming_total_flows += new_speed_element.in_flows;
} else {
new_speed_element.out_flows = 0;
new_speed_element.in_flows = 0;
}

/* Moving average recalculation */
// http://en.wikipedia.org/wiki/Moving_average#Application_to_measuring_computer_performance
// double speed_calc_period = 1;
double exp_power = -speed_calc_period / average_calculation_amount;
double exp_value = exp(exp_power);

map_element* current_average_speed_element = &SpeedCounterAverage[client_ip];
map_element* current_average_speed_element = &SubnetVectorMapSpeedAverage[itr->first][current_index];

current_average_speed_element->in_bytes = uint64_t(
new_speed_element.in_bytes +
@@ -1636,13 +1652,15 @@ void recalculate_speed() {
exp_value * ((double)current_average_speed_element->out_packets -
(double)new_speed_element.out_packets));

current_average_speed_element->out_flows =
uint64_t(new_speed_element.out_flows +
exp_value * ((double)current_average_speed_element->out_flows -
(double)new_speed_element.out_flows));
current_average_speed_element->in_flows = uint64_t(
new_speed_element.in_flows +
exp_value * ((double)current_average_speed_element->in_flows - (double)new_speed_element.in_flows));
if (enable_conection_tracking) {
current_average_speed_element->out_flows = uint64_t(
new_speed_element.out_flows +
exp_value * ((double)current_average_speed_element->out_flows - (double)new_speed_element.out_flows));

current_average_speed_element->in_flows = uint64_t(
new_speed_element.in_flows +
exp_value * ((double)current_average_speed_element->in_flows - (double)new_speed_element.in_flows));
}

/* Moving average recalculation end */

@@ -1658,15 +1676,9 @@ void recalculate_speed() {
execute_ip_ban(client_ip, new_speed_element, *current_average_speed_element, flow_attack_details, itr->first);
}

speed_counters_mutex.lock();
// map_element* current_speed_element = &SpeedCounter[client_ip];
//*current_speed_element = new_speed_element;
SpeedCounter[client_ip] = new_speed_element;
speed_counters_mutex.unlock();
SubnetVectorMapSpeed[itr->first][current_index] = new_speed_element;

data_counters_mutex.lock();
*vector_itr = zero_map_element;
data_counters_mutex.unlock();
}
}

@@ -1688,10 +1700,8 @@ void recalculate_speed() {
uint64_t((double)total_counters[index].packets / (double)speed_calc_period);

// nullify data counters after speed calculation
// total_counters_mutex.lock();
total_counters[index].bytes = 0;
total_counters[index].packets = 0;
// total_counters_mutex.unlock();
}

// Set time of previous startup
@@ -1741,12 +1751,12 @@ void traffic_draw_programm() {
<< "IPs ordered by: " << sort_parameter << "\n";

output_buffer << print_channel_speed("Incoming traffic", INCOMING) << std::endl;
output_buffer << draw_table(SpeedCounter, INCOMING, true, sorter);
output_buffer << draw_table(INCOMING, true, sorter);

output_buffer << std::endl;

output_buffer << print_channel_speed("Outgoing traffic", OUTGOING) << std::endl;
output_buffer << draw_table(SpeedCounter, OUTGOING, false, sorter);
output_buffer << draw_table(OUTGOING, false, sorter);

output_buffer << std::endl;

@@ -2380,16 +2390,13 @@ void execute_ip_ban(uint32_t client_ip, map_element speed_element, map_element a
current_attack.udp_in_bytes = speed_element.udp_in_bytes;
current_attack.icmp_in_bytes = speed_element.icmp_in_bytes;

// Add average counters
map_element* current_average_speed_element = &SpeedCounterAverage[client_ip];

current_attack.average_in_packets = current_average_speed_element->in_packets;
current_attack.average_in_bytes = current_average_speed_element->in_bytes;
current_attack.average_in_flows = current_average_speed_element->in_flows;
current_attack.average_in_packets = average_speed_element.in_packets;
current_attack.average_in_bytes = average_speed_element.in_bytes;
current_attack.average_in_flows = average_speed_element.in_flows;

current_attack.average_out_packets = current_average_speed_element->out_packets;
current_attack.average_out_bytes = current_average_speed_element->out_bytes;
current_attack.average_out_flows = current_average_speed_element->out_flows;
current_attack.average_out_packets = average_speed_element.out_packets;
current_attack.average_out_bytes = average_speed_element.out_bytes;
current_attack.average_out_flows = average_speed_element.out_flows;

ban_list_mutex.lock();
ban_list[client_ip] = current_attack;

0 comments on commit eae33ce

Please sign in to comment.
You can’t perform that action at this time.