diff --git a/lib/fluent/plugin/netflow_fields.yaml b/lib/fluent/plugin/netflow_fields.yaml index 492b82d..b4f8060 100644 --- a/lib/fluent/plugin/netflow_fields.yaml +++ b/lib/fluent/plugin/netflow_fields.yaml @@ -254,6 +254,30 @@ option: 95: - 4 - :app_id + 150: + - :uint32 + - :flowStartSeconds + 151: + - :uint32 + - :flowEndSeconds + 152: + - :uint64 + - :flowStartMilliseconds + 153: + - :uint64 + - :flowEndMilliseconds + 154: + - :uint64 + - :flowStartMicroseconds + 155: + - :uint64 + - :flowEndMicroseconds + 156: + - :uint64 + - :flowStartNanoseconds + 157: + - :uint64 + - :flowEndNanoseconds 234: - :uint32 - :ingress_vrf_id diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index 17a5dad..8b58a44 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -82,6 +82,22 @@ def format_for_switched(time) time.utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ") end + def format_for_flowSeconds(time) + time.utc.strftime("%Y-%m-%dT%H:%M:%S") + end + + def format_for_flowMilliSeconds(time) + time.utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ") + end + + def format_for_flowMicroSeconds(time) + time.utc.strftime("%Y-%m-%dT%H:%M:%S.%6NZ") + end + + def format_for_flowNanoSeconds(time) + time.utc.strftime("%Y-%m-%dT%H:%M:%S.%9NZ") + end + NETFLOW_V5_HEADER_FORMAT = 'nnNNNNnn' NETFLOW_V5_HEADER_BYTES = 24 NETFLOW_V5_RECORD_FORMAT = 'NNNnnNNNNnnnnnnnxx' @@ -283,10 +299,53 @@ def handle_v9_flowset_data(host, pdu, flowset, block) event['flowset_id'] = flowset.flowset_id - r.each_pair {|k,v| event[k.to_s] = v } - unless @switched_times_from_uptime - event['first_switched'] = format_for_switched(msec_from_boot_to_time(event['first_switched'], pdu.uptime, time, 0)) if event['first_switched'] - event['last_switched'] = format_for_switched(msec_from_boot_to_time(event['last_switched'], pdu.uptime, time, 0)) if event['last_switched'] + r.each_pair do |k, v| + case k + when :first_switched + unless @switched_times_from_uptime + event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) + end + when :last_switched + unless @switched_times_from_uptime + event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) + end + when :flowStartSeconds + event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) + when :flowEndSeconds + event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) + when :flowStartMilliseconds + divisor = 1_000 + microseconds = (v.snapshot % 1_000) * 1_000 + event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) + when :flowEndMilliseconds + divisor = 1_000 + microseconds = (v.snapshot % 1_000) * 1_000 + event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) + when :flowStartMicroseconds + divisor = 1_000_000 + microseconds = (v.snapshot % 1_000_000) + event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) + when :flowEndMicroseconds + divisor = 1_000_000 + microseconds = (v.snapshot % 1_000_000) + event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) + when :flowStartNanoseconds + divisor = 1_000_000_000 + microseconds = (v.snapshot % 1_000_000_000) / 1_000 + nanoseconds = v.snapshot % 1_000_000_000 + time_with_nano = Time.at(v.snapshot / divisor, microseconds) + time_with_nano.nsec = nanoseconds + event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) + when :flowEndNanoseconds + divisor = 1_000_000_000 + microseconds = (v.snapshot % 1_000_000_000) / 1_000 + nanoseconds = v.snapshot % 1_000_000_000 + time_with_nano = Time.at(v.snapshot / divisor, microseconds) + time_with_nano.nsec = nanoseconds + event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) + else + event[k.to_s] = v.snapshot + end end if sampler_id = r['flow_sampler_id'] diff --git a/test/dump/netflow.v9.flowStartMilliseconds.dump b/test/dump/netflow.v9.flowStartMilliseconds.dump new file mode 100644 index 0000000..56822d2 Binary files /dev/null and b/test/dump/netflow.v9.flowStartMilliseconds.dump differ diff --git a/test/dump/netflow.v9.template.flowStartMilliseconds.dump b/test/dump/netflow.v9.template.flowStartMilliseconds.dump new file mode 100644 index 0000000..a2c4208 Binary files /dev/null and b/test/dump/netflow.v9.template.flowStartMilliseconds.dump differ diff --git a/test/test_parser_netflow9.rb b/test/test_parser_netflow9.rb index 571b181..97448f8 100644 --- a/test/test_parser_netflow9.rb +++ b/test/test_parser_netflow9.rb @@ -15,6 +15,10 @@ def raw_template @raw_template ||= File.read(File.expand_path('../dump/netflow.v9.template.dump', __FILE__)) end + def raw_flowStartMilliseconds_template + @raw_flowStartMilliseconds_template ||= File.read(File.expand_path('../dump/netflow.v9.template.flowStartMilliseconds.dump', __FILE__)) + end + def raw_mpls_template @raw_mpls_template ||= File.read(File.expand_path('../dump/netflow.v9.mpls-template.dump', __FILE__)) end @@ -23,6 +27,10 @@ def raw_data @raw_data ||= File.read(File.expand_path('../dump/netflow.v9.dump', __FILE__)) end + def raw_flowStartMilliseconds_data + @raw_flowStartMilliseconds_data ||= File.read(File.expand_path('../dump/netflow.v9.flowStartMilliseconds.dump', __FILE__)) + end + def raw_mpls_data @raw_mpls_data ||= File.read(File.expand_path('../dump/netflow.v9.mpls-data.dump', __FILE__)) end @@ -95,6 +103,51 @@ def raw_2byte_as_template assert_equal expected_record, parsed.first[1] end + test 'parse netflow v9 binary data (flowStartMilliseconds)' do + parser = create_parser + + parsed = [] + parser.call raw_flowStartMilliseconds_template, DEFAULT_HOST + parser.call(raw_flowStartMilliseconds_data, DEFAULT_HOST) do |time, record| + parsed << [time, record] + end + + assert_equal 1, parsed.size + assert_equal Time.parse('2016-02-12T04:02:25Z').to_i, parsed.first[0] + expected_record = { + # header + 'version' => 9, + 'flow_seq_num' => 4645895, + 'flowset_id' => 261, + + # flowset + 'in_pkts' => 1, + 'in_bytes' => 60, + 'ipv4_src_addr' => '192.168.0.1', + 'ipv4_dst_addr' => '192.168.0.2', + 'input_snmp' => 54, + 'output_snmp' => 29, + 'flowEndMilliseconds' => '2016-02-12T04:02:09.053Z', + 'flowStartMilliseconds' => '2016-02-12T04:02:09.053Z', + 'l4_src_port' => 80, + 'l4_dst_port' => 32822, + 'src_as' => 0, + 'dst_as' => 65000, + 'bgp_ipv4_next_hop' => '192.168.0.3', + 'src_mask' => 24, + 'dst_mask' => 24, + 'protocol' => 6, + 'tcp_flags' => 0x12, + 'src_tos' => 0x0, + 'direction' => 0, + 'forwarding_status' => 0b01000000, + 'flow_sampler_id' => 1, + 'ingress_vrf_id' => 1610612736, + 'egress_vrf_id' => 1610612736 + } + assert_equal expected_record, parsed.first[1] + end + test 'parse netflow v9 binary data after sampler data is cached' do parser = create_parser