Skip to content

Commit

Permalink
Introduce sFLOW collector
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-odintsov committed Dec 1, 2014
1 parent 84a9b86 commit bbbd14d
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 144 deletions.
5 changes: 4 additions & 1 deletion CMakeLists.txt
Expand Up @@ -22,7 +22,7 @@ add_definitions(-DPF_RING)
add_library(libpatricia STATIC libpatricia/patricia.c)

# Our sFLOW plugin
add_library(sflow_plugin STATIC sflow_collector.cpp)
add_library(sflow_plugin STATIC sflow_plugin/sflow_collector.cpp)

# Main tool
add_executable(fastnetmon fastnetmon.cpp)
Expand Down Expand Up @@ -59,6 +59,9 @@ target_link_libraries(fastnetmon pcap)
# Our libs
target_link_libraries(fastnetmon libpatricia)

# Our plugins
target_link_libraries(fastnetmon sflow_plugin)

# Custom PF_RING
include_directories(${PFRING_INCLUDE_DIRS})
target_link_libraries(fastnetmon ${PFRING_LIBRARIES})
37 changes: 25 additions & 12 deletions fastnetmon.cpp
Expand Up @@ -22,6 +22,13 @@
#include <netdb.h>

#include "libpatricia/patricia.h"
#include "fastnetmon_types.h"

// #define sFLOW

#ifdef sFLOW
#include "sflow_plugin/sflow_collector.h"
#endif

#include <ncurses.h>

Expand Down Expand Up @@ -182,21 +189,10 @@ typedef struct {
total_counter_element total_counters[4];
total_counter_element total_speed_counters[4];


unsigned int incoming_total_flows_speed = 0;
unsigned int outgoing_total_flows_speed = 0;

// simplified packet struct for lightweight save into memory
struct simple_packet {
uint32_t src_ip;
uint32_t dst_ip;
uint16_t source_port;
uint16_t destination_port;
unsigned int protocol;
unsigned int length;
uint8_t flags; /* tcp flags */
struct timeval ts;
};

// structure with attack details
struct attack_details {
direction attack_direction;
Expand Down Expand Up @@ -1618,6 +1614,13 @@ void init_logging() {
logger.info("Logger initialized!");
}


#ifdef sFLOW
void sflow_collection_task() {
start_sflow_collection(process_packet);
}
#endif

int main(int argc,char **argv) {
lookup_tree = New_Patricia(32);
whitelist_tree = New_Patricia(32);
Expand Down Expand Up @@ -1712,7 +1715,13 @@ int main(int argc,char **argv) {
// Run banlist cleaner thread
boost::thread cleanup_ban_list_thread(cleanup_ban_list);

// pf_ring processing
boost::thread main_packet_process_thread(main_packet_process_task);

#ifdef sFLOW
// sFLOW
boost::thread sflow_process_collector_thread(sflow_collection_task);
#endif

// Init ncurses screen
initscr();
Expand Down Expand Up @@ -1743,6 +1752,10 @@ int main(int argc,char **argv) {
}
}

#ifdef sFLOW
sflow_process_collector_thread.join();
#endif

// wait threads
main_packet_process_thread.join();
recalculate_speed_thread.join();
Expand Down
File renamed without changes.
139 changes: 8 additions & 131 deletions sflow_collector.cpp → sflow_plugin/sflow_collector.cpp
Expand Up @@ -6,7 +6,7 @@
#include <vector>
#include <ostream>

using namespace std;
#include "sflow_collector.h"

// sflowtool-3.32
#include "sflow.h"
Expand All @@ -23,20 +23,6 @@ using namespace std;
#include <setjmp.h>
#include <stdlib.h>

// simplified packet struct for lightweight save into memory
struct simple_packet {
uint32_t src_ip;
uint32_t dst_ip;
uint16_t source_port;
uint16_t destination_port;
unsigned int protocol;
unsigned int length;
uint8_t flags; /* tcp flags */
struct timeval ts;
};

typedef void (*process_packet_pointer)(simple_packet&);

/* same for tcp */
struct mytcphdr {
uint16_t th_sport; /* source port */
Expand Down Expand Up @@ -250,17 +236,18 @@ void skipTLVRecord(SFSample *sample, uint32_t tag, uint32_t len);
void readFlowSample(SFSample *sample, int expanded);
void readFlowSample_header(SFSample *sample);
void decodeIPV4(SFSample *sample);
int start_sflow_collection();
void print_simple_packet(struct simple_packet& packet);

process_packet_pointer process_func_ptr = NULL;
int main() {
process_func_ptr = print_simple_packet;
//int main() {
//process_func_ptr = print_simple_packet;

start_sflow_collection();
}
// start_sflow_collection();
//}

void start_sflow_collection(process_packet_pointer func_ptr) {
process_func_ptr = func_ptr;

int start_sflow_collection() {
unsigned int udp_buffer_size = 65536;
char udp_buffer[udp_buffer_size];

Expand Down Expand Up @@ -306,116 +293,6 @@ int start_sflow_collection() {
}
}

// copy && paste
int extract_bit_value(uint8_t num, int bit) {
if (bit > 0 && bit <= 8) {
return ( (num >> (bit-1)) & 1 );
} else {
return 0;
}
}


// copy && paste
string print_tcp_flags(uint8_t flag_value) {
vector<string> all_flags;

if (extract_bit_value(flag_value, 1)) {
all_flags.push_back("fin");
}

if (extract_bit_value(flag_value, 2)) {
all_flags.push_back("syn");
}

if (extract_bit_value(flag_value, 3)) {
all_flags.push_back("rst");
}

if (extract_bit_value(flag_value, 4)) {
all_flags.push_back("psh");
}

if (extract_bit_value(flag_value, 5)) {
all_flags.push_back("ack");
}

if (extract_bit_value(flag_value, 6)) {
all_flags.push_back("urg");
}


ostringstream flags_as_string;

if (all_flags.empty()) {
return "-";
}

// concatenate all vector elements with comma
std::copy(all_flags.begin(), all_flags.end() - 1, std::ostream_iterator<string>(flags_as_string, ","));

// add last element
flags_as_string << all_flags.back();

return flags_as_string.str();
}

// copy and paste
string convert_ip_as_uint_to_string(uint32_t ip_as_integer) {
struct in_addr ip_addr;
ip_addr.s_addr = ip_as_integer;
return (string)inet_ntoa(ip_addr);
}

string convert_timeval_to_date(struct timeval tv) {
time_t nowtime = tv.tv_sec;
struct tm *nowtm = localtime(&nowtime);

char tmbuf[64];
char buf[64];

strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);

snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, tv.tv_usec);

return string(buf);
}


// copy and paste
void print_simple_packet(struct simple_packet& packet) {
std::stringstream buffer;

string proto_name;
switch (packet.protocol) {
case IPPROTO_TCP:
proto_name = "tcp";
break;
case IPPROTO_UDP:
proto_name = "udp";
break;
case IPPROTO_ICMP:
proto_name = "icmp";
break;
default:
proto_name = "unknown";
break;
}

buffer<<convert_timeval_to_date(packet.ts)<<" ";

buffer
<<convert_ip_as_uint_to_string(packet.src_ip)<<":"<<packet.source_port
<<" > "
<<convert_ip_as_uint_to_string(packet.dst_ip)<<":"<<packet.destination_port
<<" protocol: "<<proto_name
<<" flags: "<<print_tcp_flags(packet.flags)
<<" size: "<<packet.length<<" bytes"<<"\n";

std::cout<<buffer.str();
}


uint32_t getData32_nobswap(SFSample *sample) {
uint32_t ans = *(sample->datap)++;
// make sure we didn't run off the end of the datagram. Thanks to
Expand Down

0 comments on commit bbbd14d

Please sign in to comment.