From b3c80c7148401eb3c11cfbf27ddf65d5878b9860 Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Wed, 24 Aug 2016 09:09:04 +0900 Subject: [PATCH 01/13] add: fields flowStartSeconds flowEndSeconds flowStartMilliseconds flowEndMilliseconds flowStartMicroseconds flowEndMicroseconds flowStartNanoseconds flowEndNanoseconds Signed-off-by: Takashi Umeno --- lib/fluent/plugin/netflow_fields.yaml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/lib/fluent/plugin/netflow_fields.yaml b/lib/fluent/plugin/netflow_fields.yaml index 492b82d..b4f8060 100644 --- a/lib/fluent/plugin/netflow_fields.yaml +++ b/lib/fluent/plugin/netflow_fields.yaml @@ -254,6 +254,30 @@ option: 95: - 4 - :app_id + 150: + - :uint32 + - :flowStartSeconds + 151: + - :uint32 + - :flowEndSeconds + 152: + - :uint64 + - :flowStartMilliseconds + 153: + - :uint64 + - :flowEndMilliseconds + 154: + - :uint64 + - :flowStartMicroseconds + 155: + - :uint64 + - :flowEndMicroseconds + 156: + - :uint64 + - :flowStartNanoseconds + 157: + - :uint64 + - :flowEndNanoseconds 234: - :uint32 - :ingress_vrf_id From 70fbd8521a422c5e07c0a64a69234bb8b5de9852 Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Wed, 24 Aug 2016 11:44:12 +0900 Subject: [PATCH 02/13] remove: .to_i Signed-off-by: Takashi Umeno --- lib/fluent/plugin/parser_netflow.rb | 34 +++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index 17a5dad..aedffc3 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -185,6 +185,7 @@ def forV5(payload, block) def handle_v9(host, pdu, block) pdu.records.each do |flowset| + $log.warn 'flowset', flowset_id: flowset.flowset_id case flowset.flowset_id when 0 handle_v9_flowset_template(host, pdu, flowset) @@ -289,6 +290,39 @@ def handle_v9_flowset_data(host, pdu, flowset, block) event['last_switched'] = format_for_switched(msec_from_boot_to_time(event['last_switched'], pdu.uptime, time, 0)) if event['last_switched'] end + r.each_pair do |k, v| + $log.warn 'k.to_s', k.to_s + case k.to_s + when /^flow(?:Start|End)Seconds$/ + # event[@target][k.to_s] = LogStash::Timestamp.at(v.snapshot).to_iso8601 + event[k.to_s] = format_for_switched(Time.at(v.snapshot, 0)) + when /^flow(?:Start|End)(Milli|Micro|Nano)seconds$/ + divisor = + case $1 + when 'Milli' + 1_000 + when 'Micro' + 1_000_000 + when 'Nano' + 1_000_000_000 + end + microseconds = + case $1 + when 'Milli' + (v.snapshot % 1_000) * 1_000 + when 'Micro' + (v.snapshot % 1_000_000) + when 'Nano' + (v.snapshot % 1_000_000_000) / 1_000 + end + + # event[@target][k.to_s] = LogStash::Timestamp.at(v.snapshot.to_f / divisor).to_iso8601 + event[k.to_s] = format_for_switched(Time.at(v.snapshot / divisor, microseconds)) + else + event[k.to_s] = v.snapshot + end + end + if sampler_id = r['flow_sampler_id'] sampler_key = "#{host}|#{pdu.source_id}|#{sampler_id}" if sampler = @samplers_v9[sampler_key] From def7a8c7b326a80d80169979d031ab6f594d9c1b Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Wed, 24 Aug 2016 22:30:38 +0900 Subject: [PATCH 03/13] add: function format_for_flowSeconds(time) format_for_flowMilliSeconds(time) format_for_flowMicroSeconds(time) format_for_flowNanoSeconds(time) Signed-off-by: Takashi Umeno --- lib/fluent/plugin/parser_netflow.rb | 34 ++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index aedffc3..3c6a710 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -82,6 +82,22 @@ def format_for_switched(time) time.utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ") end + def format_for_flowSeconds(time) + time.utc.strftime("%Y-%m-%dT%H:%M:%S") + end + + def format_for_flowMilliSeconds(time) + time.utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ") + end + + def format_for_flowMicroSeconds(time) + time.utc.strftime("%Y-%m-%dT%H:%M:%S.%6NZ") + end + + def format_for_flowNanoSeconds(time) + time.utc.strftime("%Y-%m-%dT%H:%M:%S.%9NZ") + end + NETFLOW_V5_HEADER_FORMAT = 'nnNNNNnn' NETFLOW_V5_HEADER_BYTES = 24 NETFLOW_V5_RECORD_FORMAT = 'NNNnnNNNNnnnnnnnxx' @@ -291,11 +307,9 @@ def handle_v9_flowset_data(host, pdu, flowset, block) end r.each_pair do |k, v| - $log.warn 'k.to_s', k.to_s case k.to_s when /^flow(?:Start|End)Seconds$/ - # event[@target][k.to_s] = LogStash::Timestamp.at(v.snapshot).to_iso8601 - event[k.to_s] = format_for_switched(Time.at(v.snapshot, 0)) + event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) when /^flow(?:Start|End)(Milli|Micro|Nano)seconds$/ divisor = case $1 @@ -316,8 +330,18 @@ def handle_v9_flowset_data(host, pdu, flowset, block) (v.snapshot % 1_000_000_000) / 1_000 end - # event[@target][k.to_s] = LogStash::Timestamp.at(v.snapshot.to_f / divisor).to_iso8601 - event[k.to_s] = format_for_switched(Time.at(v.snapshot / divisor, microseconds)) + case $1 + when 'Milli' + event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) + when 'Micro' + event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) + when 'Nano' + nanoseconds = v.snapshot % 1_000_000_000 + time_with_nano = Time.at(v.snapshot / divisor, microseconds) + time_with_nano.nsec = nanoseconds + event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) + end + else event[k.to_s] = v.snapshot end From a4d9478680458ffd42db94f1ce756ff23cc50d56 Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Wed, 24 Aug 2016 22:46:00 +0900 Subject: [PATCH 04/13] remove: $log.warn 'flowset', flowset_id: flowset.flowset_id Signed-off-by: Takashi Umeno --- lib/fluent/plugin/parser_netflow.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index 3c6a710..375491a 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -201,7 +201,6 @@ def forV5(payload, block) def handle_v9(host, pdu, block) pdu.records.each do |flowset| - $log.warn 'flowset', flowset_id: flowset.flowset_id case flowset.flowset_id when 0 handle_v9_flowset_template(host, pdu, flowset) From 6a1c3c563b02dd1482ae6cce0dde566c85f618dd Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Thu, 25 Aug 2016 18:12:05 +0900 Subject: [PATCH 05/13] repair: first_switched and last_switched Signed-off-by: Takashi Umeno --- lib/fluent/plugin/parser_netflow.rb | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index 375491a..7f9a1b8 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -299,14 +299,17 @@ def handle_v9_flowset_data(host, pdu, flowset, block) event['flowset_id'] = flowset.flowset_id - r.each_pair {|k,v| event[k.to_s] = v } - unless @switched_times_from_uptime - event['first_switched'] = format_for_switched(msec_from_boot_to_time(event['first_switched'], pdu.uptime, time, 0)) if event['first_switched'] - event['last_switched'] = format_for_switched(msec_from_boot_to_time(event['last_switched'], pdu.uptime, time, 0)) if event['last_switched'] - end - r.each_pair do |k, v| case k.to_s + when /^first_switched$/ + unless @switched_times_from_uptime + event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) + end + when /^last_switched$/ + unless @switched_times_from_uptime + event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) + end + when /^flow(?:Start|End)Seconds$/ event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) when /^flow(?:Start|End)(Milli|Micro|Nano)seconds$/ From 7bdb0fa906ee9a4377236efb9d5260b931444e5c Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Thu, 25 Aug 2016 20:23:23 +0900 Subject: [PATCH 06/13] change: when /^(?:first|last)_switched$/ Signed-off-by: Takashi Umeno --- lib/fluent/plugin/parser_netflow.rb | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index 7f9a1b8..28d4a06 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -301,14 +301,10 @@ def handle_v9_flowset_data(host, pdu, flowset, block) r.each_pair do |k, v| case k.to_s - when /^first_switched$/ + when /^(?:first|last)_switched$/ unless @switched_times_from_uptime event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) end - when /^last_switched$/ - unless @switched_times_from_uptime - event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) - end when /^flow(?:Start|End)Seconds$/ event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) From 0d855cddf25b06ccaf7cc1b3e9a18c1b31271e32 Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Sat, 3 Sep 2016 11:38:07 +0900 Subject: [PATCH 07/13] change: rewrite with string Signed-off-by: Takashi Umeno --- lib/fluent/plugin/parser_netflow.rb | 73 +++++++++++++++-------------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index 28d4a06..b0f2e27 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -301,45 +301,48 @@ def handle_v9_flowset_data(host, pdu, flowset, block) r.each_pair do |k, v| case k.to_s - when /^(?:first|last)_switched$/ + when 'first_switched'.freeze unless @switched_times_from_uptime event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) end - - when /^flow(?:Start|End)Seconds$/ - event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) - when /^flow(?:Start|End)(Milli|Micro|Nano)seconds$/ - divisor = - case $1 - when 'Milli' - 1_000 - when 'Micro' - 1_000_000 - when 'Nano' - 1_000_000_000 - end - microseconds = - case $1 - when 'Milli' - (v.snapshot % 1_000) * 1_000 - when 'Micro' - (v.snapshot % 1_000_000) - when 'Nano' - (v.snapshot % 1_000_000_000) / 1_000 - end - - case $1 - when 'Milli' - event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) - when 'Micro' - event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) - when 'Nano' - nanoseconds = v.snapshot % 1_000_000_000 - time_with_nano = Time.at(v.snapshot / divisor, microseconds) - time_with_nano.nsec = nanoseconds - event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) + when 'last_switched'.freeze + unless @switched_times_from_uptime + event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) end - + when 'flowStartSeconds'.freeze + event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) + when 'flowEndSeconds'.freeze + event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) + when 'flowStartMilliseconds'.freeze + divisor = 1_000 + microseconds = (v.snapshot % 1_000) * 1_000 + event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) + when 'flowEndMilliseconds'.freeze + divisor = 1_000 + microseconds = (v.snapshot % 1_000) * 1_000 + event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) + when 'flowStartMicroseconds'.freeze + divisor = 1_000_000 + microseconds = (v.snapshot % 1_000_000) + event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) + when 'flowEndMicroseconds'.freeze + divisor = 1_000_000 + microseconds = (v.snapshot % 1_000_000) + event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) + when 'flowStartNanoseconds'.freeze + divisor = 1_000_000_000 + microseconds = (v.snapshot % 1_000_000_000) / 1_000 + nanoseconds = v.snapshot % 1_000_000_000 + time_with_nano = Time.at(v.snapshot / divisor, microseconds) + time_with_nano.nsec = nanoseconds + event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) + when 'flowEndNanoseconds'.freeze + divisor = 1_000_000_000 + microseconds = (v.snapshot % 1_000_000_000) / 1_000 + nanoseconds = v.snapshot % 1_000_000_000 + time_with_nano = Time.at(v.snapshot / divisor, microseconds) + time_with_nano.nsec = nanoseconds + event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) else event[k.to_s] = v.snapshot end From fee137fc866eee3bb625e7c1a48a1471958c5194 Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Thu, 6 Oct 2016 23:28:42 +0900 Subject: [PATCH 08/13] change: from k.to_s to key. --- lib/fluent/plugin/parser_netflow.rb | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index b0f2e27..8e21476 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -300,51 +300,52 @@ def handle_v9_flowset_data(host, pdu, flowset, block) event['flowset_id'] = flowset.flowset_id r.each_pair do |k, v| - case k.to_s + key = k.to_s + case key when 'first_switched'.freeze unless @switched_times_from_uptime - event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) + event[key] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) end when 'last_switched'.freeze unless @switched_times_from_uptime - event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) + event[key] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) end when 'flowStartSeconds'.freeze - event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) + event[key] = format_for_flowSeconds(Time.at(v.snapshot, 0)) when 'flowEndSeconds'.freeze - event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) + event[key] = format_for_flowSeconds(Time.at(v.snapshot, 0)) when 'flowStartMilliseconds'.freeze divisor = 1_000 microseconds = (v.snapshot % 1_000) * 1_000 - event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) + event[key] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) when 'flowEndMilliseconds'.freeze divisor = 1_000 microseconds = (v.snapshot % 1_000) * 1_000 - event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) + event[key] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) when 'flowStartMicroseconds'.freeze divisor = 1_000_000 microseconds = (v.snapshot % 1_000_000) - event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) + event[key] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) when 'flowEndMicroseconds'.freeze divisor = 1_000_000 microseconds = (v.snapshot % 1_000_000) - event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) + event[key] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) when 'flowStartNanoseconds'.freeze divisor = 1_000_000_000 microseconds = (v.snapshot % 1_000_000_000) / 1_000 nanoseconds = v.snapshot % 1_000_000_000 time_with_nano = Time.at(v.snapshot / divisor, microseconds) time_with_nano.nsec = nanoseconds - event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) + event[key] = format_for_flowNanoSeconds(time_with_nano) when 'flowEndNanoseconds'.freeze divisor = 1_000_000_000 microseconds = (v.snapshot % 1_000_000_000) / 1_000 nanoseconds = v.snapshot % 1_000_000_000 time_with_nano = Time.at(v.snapshot / divisor, microseconds) time_with_nano.nsec = nanoseconds - event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) + event[key] = format_for_flowNanoSeconds(time_with_nano) else - event[k.to_s] = v.snapshot + event[key] = v.snapshot end end From 3f274e74f273fe2bfe6b26dec5abb203a4dca753 Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Fri, 7 Oct 2016 11:08:06 +0900 Subject: [PATCH 09/13] undo: use k.to_s --- lib/fluent/plugin/parser_netflow.rb | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index 8e21476..b0f2e27 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -300,52 +300,51 @@ def handle_v9_flowset_data(host, pdu, flowset, block) event['flowset_id'] = flowset.flowset_id r.each_pair do |k, v| - key = k.to_s - case key + case k.to_s when 'first_switched'.freeze unless @switched_times_from_uptime - event[key] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) + event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) end when 'last_switched'.freeze unless @switched_times_from_uptime - event[key] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) + event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) end when 'flowStartSeconds'.freeze - event[key] = format_for_flowSeconds(Time.at(v.snapshot, 0)) + event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) when 'flowEndSeconds'.freeze - event[key] = format_for_flowSeconds(Time.at(v.snapshot, 0)) + event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) when 'flowStartMilliseconds'.freeze divisor = 1_000 microseconds = (v.snapshot % 1_000) * 1_000 - event[key] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) + event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) when 'flowEndMilliseconds'.freeze divisor = 1_000 microseconds = (v.snapshot % 1_000) * 1_000 - event[key] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) + event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) when 'flowStartMicroseconds'.freeze divisor = 1_000_000 microseconds = (v.snapshot % 1_000_000) - event[key] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) + event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) when 'flowEndMicroseconds'.freeze divisor = 1_000_000 microseconds = (v.snapshot % 1_000_000) - event[key] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) + event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) when 'flowStartNanoseconds'.freeze divisor = 1_000_000_000 microseconds = (v.snapshot % 1_000_000_000) / 1_000 nanoseconds = v.snapshot % 1_000_000_000 time_with_nano = Time.at(v.snapshot / divisor, microseconds) time_with_nano.nsec = nanoseconds - event[key] = format_for_flowNanoSeconds(time_with_nano) + event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) when 'flowEndNanoseconds'.freeze divisor = 1_000_000_000 microseconds = (v.snapshot % 1_000_000_000) / 1_000 nanoseconds = v.snapshot % 1_000_000_000 time_with_nano = Time.at(v.snapshot / divisor, microseconds) time_with_nano.nsec = nanoseconds - event[key] = format_for_flowNanoSeconds(time_with_nano) + event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) else - event[key] = v.snapshot + event[k.to_s] = v.snapshot end end From 050179fc3d89f49c86acc389824a01c933d02700 Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Sat, 29 Oct 2016 18:06:57 +0900 Subject: [PATCH 10/13] change: using symbol for case and use k.to_s in [] --- lib/fluent/plugin/parser_netflow.rb | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/fluent/plugin/parser_netflow.rb b/lib/fluent/plugin/parser_netflow.rb index b0f2e27..8b58a44 100644 --- a/lib/fluent/plugin/parser_netflow.rb +++ b/lib/fluent/plugin/parser_netflow.rb @@ -300,43 +300,43 @@ def handle_v9_flowset_data(host, pdu, flowset, block) event['flowset_id'] = flowset.flowset_id r.each_pair do |k, v| - case k.to_s - when 'first_switched'.freeze + case k + when :first_switched unless @switched_times_from_uptime event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) end - when 'last_switched'.freeze + when :last_switched unless @switched_times_from_uptime event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0)) end - when 'flowStartSeconds'.freeze + when :flowStartSeconds event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) - when 'flowEndSeconds'.freeze + when :flowEndSeconds event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0)) - when 'flowStartMilliseconds'.freeze + when :flowStartMilliseconds divisor = 1_000 microseconds = (v.snapshot % 1_000) * 1_000 event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) - when 'flowEndMilliseconds'.freeze + when :flowEndMilliseconds divisor = 1_000 microseconds = (v.snapshot % 1_000) * 1_000 event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds)) - when 'flowStartMicroseconds'.freeze + when :flowStartMicroseconds divisor = 1_000_000 microseconds = (v.snapshot % 1_000_000) event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) - when 'flowEndMicroseconds'.freeze + when :flowEndMicroseconds divisor = 1_000_000 microseconds = (v.snapshot % 1_000_000) event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds)) - when 'flowStartNanoseconds'.freeze + when :flowStartNanoseconds divisor = 1_000_000_000 microseconds = (v.snapshot % 1_000_000_000) / 1_000 nanoseconds = v.snapshot % 1_000_000_000 time_with_nano = Time.at(v.snapshot / divisor, microseconds) time_with_nano.nsec = nanoseconds event[k.to_s] = format_for_flowNanoSeconds(time_with_nano) - when 'flowEndNanoseconds'.freeze + when :flowEndNanoseconds divisor = 1_000_000_000 microseconds = (v.snapshot % 1_000_000_000) / 1_000 nanoseconds = v.snapshot % 1_000_000_000 From 2c31d491210c68606cf5390b4958b195a406ac9e Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Sun, 6 Nov 2016 18:23:30 +0900 Subject: [PATCH 11/13] Add tests for netflow v9 (flowStartMilliseconds) --- test/dump/netflow.v9.flowStartMilliseconds.dump | Bin 0 -> 100 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 test/dump/netflow.v9.flowStartMilliseconds.dump diff --git a/test/dump/netflow.v9.flowStartMilliseconds.dump b/test/dump/netflow.v9.flowStartMilliseconds.dump new file mode 100644 index 0000000000000000000000000000000000000000..56822d270d0a1849dd913b9534836180e490837d GIT binary patch literal 100 zcmZSJWMKT;FhM13Z)6~Y+Y@#M1`b6=R)zou1_nkTwmGnZff0zAfNV1$mIVq2U0$*b fK?gK|#DU=N3!pk?2?;hK1_lQP#sr`c2m=8CJUtV| literal 0 HcmV?d00001 From f166773a0acfbaa0b9d8a64184bf6571ee19d503 Mon Sep 17 00:00:00 2001 From: Takashi Umeno Date: Sun, 6 Nov 2016 18:43:19 +0900 Subject: [PATCH 12/13] Add tests for netflow v9 (flowStartMilliseconds) --- ...netflow.v9.template.flowStartMilliseconds.dump | Bin 0 -> 120 bytes 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 test/dump/netflow.v9.template.flowStartMilliseconds.dump diff --git a/test/dump/netflow.v9.template.flowStartMilliseconds.dump b/test/dump/netflow.v9.template.flowStartMilliseconds.dump new file mode 100644 index 0000000000000000000000000000000000000000..a2c4208907f02536375224167a00c1b3a9341dce GIT binary patch literal 120 zcmW;EI}Sil6olcq!gW1^Mr{W+pw(+6dg~A?P_r1jQLzSzKZ-9WnamtTGVi)A<2eoL uFDto5(rg0N^5o0FAw`TF2g#0*MuLK@L Date: Sun, 6 Nov 2016 19:36:22 +0900 Subject: [PATCH 13/13] Add tests for netflow v9 (flowStartMilliseconds) --- test/test_parser_netflow9.rb | 53 ++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/test/test_parser_netflow9.rb b/test/test_parser_netflow9.rb index 571b181..97448f8 100644 --- a/test/test_parser_netflow9.rb +++ b/test/test_parser_netflow9.rb @@ -15,6 +15,10 @@ def raw_template @raw_template ||= File.read(File.expand_path('../dump/netflow.v9.template.dump', __FILE__)) end + def raw_flowStartMilliseconds_template + @raw_flowStartMilliseconds_template ||= File.read(File.expand_path('../dump/netflow.v9.template.flowStartMilliseconds.dump', __FILE__)) + end + def raw_mpls_template @raw_mpls_template ||= File.read(File.expand_path('../dump/netflow.v9.mpls-template.dump', __FILE__)) end @@ -23,6 +27,10 @@ def raw_data @raw_data ||= File.read(File.expand_path('../dump/netflow.v9.dump', __FILE__)) end + def raw_flowStartMilliseconds_data + @raw_flowStartMilliseconds_data ||= File.read(File.expand_path('../dump/netflow.v9.flowStartMilliseconds.dump', __FILE__)) + end + def raw_mpls_data @raw_mpls_data ||= File.read(File.expand_path('../dump/netflow.v9.mpls-data.dump', __FILE__)) end @@ -95,6 +103,51 @@ def raw_2byte_as_template assert_equal expected_record, parsed.first[1] end + test 'parse netflow v9 binary data (flowStartMilliseconds)' do + parser = create_parser + + parsed = [] + parser.call raw_flowStartMilliseconds_template, DEFAULT_HOST + parser.call(raw_flowStartMilliseconds_data, DEFAULT_HOST) do |time, record| + parsed << [time, record] + end + + assert_equal 1, parsed.size + assert_equal Time.parse('2016-02-12T04:02:25Z').to_i, parsed.first[0] + expected_record = { + # header + 'version' => 9, + 'flow_seq_num' => 4645895, + 'flowset_id' => 261, + + # flowset + 'in_pkts' => 1, + 'in_bytes' => 60, + 'ipv4_src_addr' => '192.168.0.1', + 'ipv4_dst_addr' => '192.168.0.2', + 'input_snmp' => 54, + 'output_snmp' => 29, + 'flowEndMilliseconds' => '2016-02-12T04:02:09.053Z', + 'flowStartMilliseconds' => '2016-02-12T04:02:09.053Z', + 'l4_src_port' => 80, + 'l4_dst_port' => 32822, + 'src_as' => 0, + 'dst_as' => 65000, + 'bgp_ipv4_next_hop' => '192.168.0.3', + 'src_mask' => 24, + 'dst_mask' => 24, + 'protocol' => 6, + 'tcp_flags' => 0x12, + 'src_tos' => 0x0, + 'direction' => 0, + 'forwarding_status' => 0b01000000, + 'flow_sampler_id' => 1, + 'ingress_vrf_id' => 1610612736, + 'egress_vrf_id' => 1610612736 + } + assert_equal expected_record, parsed.first[1] + end + test 'parse netflow v9 binary data after sampler data is cached' do parser = create_parser