Skip to content

Commit

Permalink
Added support for Memory Model Aware Atomic Operations. Closes #703
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-odintsov committed May 4, 2022
1 parent 15788f8 commit dad1c7b
Showing 1 changed file with 105 additions and 4 deletions.
109 changes: 105 additions & 4 deletions src/fastnetmon_logic.cpp
Expand Up @@ -2671,6 +2671,19 @@ void process_packet(simple_packet_t& current_packet) {
}

// Increment counter about total number of packets processes here
#ifdef USE_NEW_ATOMIC_BUILTINS
__atomic_add_fetch(&total_simple_packets_processed, 1, __ATOMIC_RELAXED);

if (current_packet.ip_protocol_version == 4) {
__atomic_add_fetch(&total_ipv4_packets, 1, __ATOMIC_RELAXED);
} else if (current_packet.ip_protocol_version == 6) {
__atomic_add_fetch(&total_ipv6_packets, 1, __ATOMIC_RELAXED);
} else {
// Non IP packets
__atomic_add_fetch(&non_ip_packets, 1, __ATOMIC_RELAXED);
return;
}
#else
__sync_fetch_and_add(&total_simple_packets_processed, 1);

if (current_packet.ip_protocol_version == 4) {
Expand All @@ -2682,6 +2695,7 @@ void process_packet(simple_packet_t& current_packet) {
__sync_fetch_and_add(&non_ip_packets, 1);
return;
}
#endif

uint64_t sampled_number_of_packets = current_packet.number_of_packets * current_packet.sample_ratio;
uint64_t sampled_number_of_bytes = current_packet.length * current_packet.sample_ratio;
Expand All @@ -2692,13 +2706,15 @@ void process_packet(simple_packet_t& current_packet) {
current_packet.packet_direction =
get_packet_direction_ipv6(lookup_tree_ipv6, current_packet.src_ipv6, current_packet.dst_ipv6, ipv6_cidr_subnet);

__sync_fetch_and_add(&total_counters_ipv6[current_packet.packet_direction].packets, sampled_number_of_packets);
__sync_fetch_and_add(&total_counters_ipv6[current_packet.packet_direction].bytes, sampled_number_of_bytes);


#ifdef USE_NEW_ATOMIC_BUILTINS
__atomic_add_fetch(&total_counters_ipv6[current_packet.packet_direction].packets, sampled_number_of_packets, __ATOMIC_RELAXED)
__atomic_add_fetch(&total_counters_ipv6[current_packet.packet_direction].bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);

__atomic_add_fetch(&total_ipv6_packets, 1, __ATOMIC_RELAXED);
#else
__sync_fetch_and_add(&total_counters_ipv6[current_packet.packet_direction].packets, sampled_number_of_packets);
__sync_fetch_and_add(&total_counters_ipv6[current_packet.packet_direction].bytes, sampled_number_of_bytes);

__sync_fetch_and_add(&total_ipv6_packets, 1);
#endif

Expand Down Expand Up @@ -2930,6 +2946,45 @@ void process_packet(simple_packet_t& current_packet) {
}
}

#ifdef USE_NEW_ATOMIC_BUILTINS
// Increment fields using data from specified packet
void increment_outgoing_counters(map_element_t* current_element,
simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {

// Update last update time
current_element->last_update_time = current_inaccurate_time;

// Main packet/bytes counter
__atomic_add_fetch(&current_element->out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);

// Fragmented IP packets
if (current_packet.ip_fragmented) {
__atomic_add_fetch(&current_element->fragmented_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->fragmented_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
}

if (current_packet.protocol == IPPROTO_TCP) {
__atomic_add_fetch(&current_element->tcp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->tcp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);

if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
__atomic_add_fetch(&current_element->tcp_syn_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->tcp_syn_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
}
} else if (current_packet.protocol == IPPROTO_UDP) {
__atomic_add_fetch(&current_element->udp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->udp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
} else if (current_packet.protocol == IPPROTO_ICMP) {
__atomic_add_fetch(&current_element->icmp_out_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->icmp_out_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
// no flow tracking for icmp
} else {
}
}
#else
// Increment fields using data from specified packet
void increment_outgoing_counters(map_element_t* current_element,
simple_packet_t& current_packet,
Expand Down Expand Up @@ -2967,6 +3022,50 @@ void increment_outgoing_counters(map_element_t* current_element,
} else {
}
}
#endif

#ifdef USE_NEW_ATOMIC_BUILTINS

// This function increments all our accumulators according to data from packet
void increment_incoming_counters(map_element_t* current_element,
simple_packet_t& current_packet,
uint64_t sampled_number_of_packets,
uint64_t sampled_number_of_bytes) {

// Uodate last update time
current_element->last_update_time = current_inaccurate_time;

// Main packet/bytes counter
__atomic_add_fetch(&current_element->in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);

// Count fragmented IP packets
if (current_packet.ip_fragmented) {
__atomic_add_fetch(&current_element->fragmented_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->fragmented_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
}

// Count per protocol packets
if (current_packet.protocol == IPPROTO_TCP) {
__atomic_add_fetch(&current_element->tcp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->tcp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);

if (extract_bit_value(current_packet.flags, TCP_SYN_FLAG_SHIFT)) {
__atomic_add_fetch(&current_element->tcp_syn_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->tcp_syn_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
}
} else if (current_packet.protocol == IPPROTO_UDP) {
__atomic_add_fetch(&current_element->udp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->udp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
} else if (current_packet.protocol == IPPROTO_ICMP) {
__atomic_add_fetch(&current_element->icmp_in_packets, sampled_number_of_packets, __ATOMIC_RELAXED);
__atomic_add_fetch(&current_element->icmp_in_bytes, sampled_number_of_bytes, __ATOMIC_RELAXED);
} else {
// TBD
}
}

#else

// This function increments all our accumulators according to data from packet
void increment_incoming_counters(map_element_t* current_element,
Expand Down Expand Up @@ -3007,6 +3106,8 @@ void increment_incoming_counters(map_element_t* current_element,
}
}

#endif

void system_counters_speed_thread_handler() {
while (true) {
// We recalculate it each second to avoid confusion
Expand Down

0 comments on commit dad1c7b

Please sign in to comment.