Skip to content
Merged
24 changes: 24 additions & 0 deletions lib/fluent/plugin/netflow_fields.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,30 @@ option:
95:
- 4
- :app_id
150:
- :uint32
- :flowStartSeconds
151:
- :uint32
- :flowEndSeconds
152:
- :uint64
- :flowStartMilliseconds
153:
- :uint64
- :flowEndMilliseconds
154:
- :uint64
- :flowStartMicroseconds
155:
- :uint64
- :flowEndMicroseconds
156:
- :uint64
- :flowStartNanoseconds
157:
- :uint64
- :flowEndNanoseconds
234:
- :uint32
- :ingress_vrf_id
Expand Down
67 changes: 63 additions & 4 deletions lib/fluent/plugin/parser_netflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,22 @@ def format_for_switched(time)
time.utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ")
end

def format_for_flowSeconds(time)
time.utc.strftime("%Y-%m-%dT%H:%M:%S")
end

def format_for_flowMilliSeconds(time)
time.utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ")
end

def format_for_flowMicroSeconds(time)
time.utc.strftime("%Y-%m-%dT%H:%M:%S.%6NZ")
end

def format_for_flowNanoSeconds(time)
time.utc.strftime("%Y-%m-%dT%H:%M:%S.%9NZ")
end

NETFLOW_V5_HEADER_FORMAT = 'nnNNNNnn'
NETFLOW_V5_HEADER_BYTES = 24
NETFLOW_V5_RECORD_FORMAT = 'NNNnnNNNNnnnnnnnxx'
Expand Down Expand Up @@ -283,10 +299,53 @@ def handle_v9_flowset_data(host, pdu, flowset, block)

event['flowset_id'] = flowset.flowset_id

r.each_pair {|k,v| event[k.to_s] = v }
unless @switched_times_from_uptime
event['first_switched'] = format_for_switched(msec_from_boot_to_time(event['first_switched'], pdu.uptime, time, 0)) if event['first_switched']
event['last_switched'] = format_for_switched(msec_from_boot_to_time(event['last_switched'], pdu.uptime, time, 0)) if event['last_switched']
r.each_pair do |k, v|
case k
when :first_switched
unless @switched_times_from_uptime
event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

end
when :last_switched
unless @switched_times_from_uptime
event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0))
end
when :flowStartSeconds
event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0))
when :flowEndSeconds
event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0))
when :flowStartMilliseconds
divisor = 1_000
microseconds = (v.snapshot % 1_000) * 1_000
event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds))
when :flowEndMilliseconds
divisor = 1_000
microseconds = (v.snapshot % 1_000) * 1_000
event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds))
when :flowStartMicroseconds
divisor = 1_000_000
microseconds = (v.snapshot % 1_000_000)
event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds))
when :flowEndMicroseconds
divisor = 1_000_000
microseconds = (v.snapshot % 1_000_000)
event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds))
when :flowStartNanoseconds
divisor = 1_000_000_000
microseconds = (v.snapshot % 1_000_000_000) / 1_000
nanoseconds = v.snapshot % 1_000_000_000
time_with_nano = Time.at(v.snapshot / divisor, microseconds)
time_with_nano.nsec = nanoseconds
event[k.to_s] = format_for_flowNanoSeconds(time_with_nano)
when :flowEndNanoseconds
divisor = 1_000_000_000
microseconds = (v.snapshot % 1_000_000_000) / 1_000
nanoseconds = v.snapshot % 1_000_000_000
time_with_nano = Time.at(v.snapshot / divisor, microseconds)
time_with_nano.nsec = nanoseconds
event[k.to_s] = format_for_flowNanoSeconds(time_with_nano)
else
event[k.to_s] = v.snapshot
end
end

if sampler_id = r['flow_sampler_id']
Expand Down
Binary file added test/dump/netflow.v9.flowStartMilliseconds.dump
Binary file not shown.
Binary file not shown.
53 changes: 53 additions & 0 deletions test/test_parser_netflow9.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ def raw_template
@raw_template ||= File.read(File.expand_path('../dump/netflow.v9.template.dump', __FILE__))
end

def raw_flowStartMilliseconds_template
@raw_flowStartMilliseconds_template ||= File.read(File.expand_path('../dump/netflow.v9.template.flowStartMilliseconds.dump', __FILE__))
end

def raw_mpls_template
@raw_mpls_template ||= File.read(File.expand_path('../dump/netflow.v9.mpls-template.dump', __FILE__))
end
Expand All @@ -23,6 +27,10 @@ def raw_data
@raw_data ||= File.read(File.expand_path('../dump/netflow.v9.dump', __FILE__))
end

def raw_flowStartMilliseconds_data
@raw_flowStartMilliseconds_data ||= File.read(File.expand_path('../dump/netflow.v9.flowStartMilliseconds.dump', __FILE__))
end

def raw_mpls_data
@raw_mpls_data ||= File.read(File.expand_path('../dump/netflow.v9.mpls-data.dump', __FILE__))
end
Expand Down Expand Up @@ -95,6 +103,51 @@ def raw_2byte_as_template
assert_equal expected_record, parsed.first[1]
end

test 'parse netflow v9 binary data (flowStartMilliseconds)' do
parser = create_parser

parsed = []
parser.call raw_flowStartMilliseconds_template, DEFAULT_HOST
parser.call(raw_flowStartMilliseconds_data, DEFAULT_HOST) do |time, record|
parsed << [time, record]
end

assert_equal 1, parsed.size
assert_equal Time.parse('2016-02-12T04:02:25Z').to_i, parsed.first[0]
expected_record = {
# header
'version' => 9,
'flow_seq_num' => 4645895,
'flowset_id' => 261,

# flowset
'in_pkts' => 1,
'in_bytes' => 60,
'ipv4_src_addr' => '192.168.0.1',
'ipv4_dst_addr' => '192.168.0.2',
'input_snmp' => 54,
'output_snmp' => 29,
'flowEndMilliseconds' => '2016-02-12T04:02:09.053Z',
'flowStartMilliseconds' => '2016-02-12T04:02:09.053Z',
'l4_src_port' => 80,
'l4_dst_port' => 32822,
'src_as' => 0,
'dst_as' => 65000,
'bgp_ipv4_next_hop' => '192.168.0.3',
'src_mask' => 24,
'dst_mask' => 24,
'protocol' => 6,
'tcp_flags' => 0x12,
'src_tos' => 0x0,
'direction' => 0,
'forwarding_status' => 0b01000000,
'flow_sampler_id' => 1,
'ingress_vrf_id' => 1610612736,
'egress_vrf_id' => 1610612736
}
assert_equal expected_record, parsed.first[1]
end

test 'parse netflow v9 binary data after sampler data is cached' do
parser = create_parser

Expand Down