fluent-plugin-splunk-hec

[Fluentd](https://fluentd.org/) output plugin to send events to [Splunk](https://www.splunk.com) over the HEC (HTTP Event Collector) API.

## Installation

### RubyGems

```
$ gem install fluent-plugin-splunk-hec
```

### Bundler

Add following line to your Gemfile:

```ruby
gem "fluent-plugin-splunk-hec"
```

And then execute:

```
$ bundle
```

## Configuration

* See also: [Output Plugin Overview](https://docs.fluentd.org/v1.0/articles/output-plugin-overview)

### protocol (enum) (optional)

Which protocol to use to call HEC api, "http" or "https", default "https".

Available values: http, https

Default value: `https`.

### hec_host (string) (required)

The hostname/IP of the Splunk instance which has HTTP input enabled, or a HEC load balancer.

### hec_port (integer) (optional)

The port number of the HTTP input, or the HEC load balancer.

Default value: `8088`.

### hec_token (string) (required)

The HEC token.

### index (string) (optional)

The Splunk index indexs events, by default it is not set, and will use what is configured in the HTTP input. Liquid template is supported.

### host (string) (optional)

Set the host field for events, by default it's the hostname of the machine that runnning fluentd. Liquid template is supported.

### source (string) (optional)

The source will be applied to the events, by default it uses the event's tag. Liquid template is supported.

### sourcetype (string) (optional)

The sourcetype will be applied to the events, by default it is not set, and leave it to Splunk to figure it out. Liquid template is supported.

### disable_template (bool) (optional)

Disable Liquid template support. Once disabled, it cannot use Liquid templates in the `host`, `index`, `source`, `sourcetype` fields.

### coerce_to_utf8 (bool) (optional)



Default value: `true`.

### non_utf8_replacement_string (string) (optional)



Default value: ` `.


### \ section (optional) (single)

#### client_cert (string) (optional)

The path to a file containing a PEM-format CA certificate for this client.

#### ca_file (string) (optional)

The path to a file containing a PEM-format CA certificate.

#### ca_path (string) (optional)

The path to a directory containing CA certificates in PEM format.

#### ciphers (array) (optional)

List of SSl ciphers allowed.

#### client_pkey (string) (optional)

The client's SSL private key.

#### insecure (bool) (optional)

If `insecure` is set to true, it will not verify the server's certificate. If `ca_file` or `ca_path` is set, `insecure` will be ignored.



### \ section (optional) (single)

#### @type (string) (required)



## Copyright

* Copyright(c) 2018- Gimi Liang @ Splunk Inc.
* License
  * Apache License, Version 2.0 To allow pushes either set the 'allowed_push_host' + # to allow pushing to a single host or delete this section to allow pushing to any host. + if spec.respond_to?(:metadata) + spec.metadata["allowed_push_host"] = "TODO: Set to 'http://mygemserver.com'" + else + raise "RubyGems 2.0 or newer is required to protect against " \ + "public gem pushes." + end + + spec.require_paths = ["lib"] + spec.test_files = Dir.glob('test/**/**.rb') + spec.files = %w[ + CODE_OF_CONDUCT.md README.md LICENSE.txt + fluent-plugin-splunk-hec.gemspec + Gemfile Gemfile.lock + Rakefile + ] + Dir.glob('lib/**/**').reject(&File.method(:directory?)) + + spec.required_ruby_version = '>= 2.3.0' + + spec.add_runtime_dependency "fluentd", "~> 1.0" + spec.add_runtime_dependency "net-http-persistent", "~> 3.0" + + spec.add_development_dependency "bundler", "~> 1.16" + spec.add_development_dependency "rake", "~> 10.0" + spec.add_development_dependency "minitest", "~> 5.0" + spec.add_development_dependency "webmock", "~> 3.2" +end diff --git a/lib/fluent/plugin/formatter_nil.rb b/lib/fluent/plugin/formatter_nil.rb new file mode 100644 index 0000000..560a1a0 --- /dev/null +++ b/lib/fluent/plugin/formatter_nil.rb @@ -0,0 +1,28 @@ +# +# Copyright 2018- Zhimin (Gimi) Liang @ Splunk Inc. (https://github.com/Gimi) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require "fluent/plugin/formatter" + +module Fluent + module Plugin + class NilFormatter < Fluent::Plugin::Formatter + Fluent::Plugin.register_formatter("nil", self) + + def format(tag, time, record) + '' + end + end + end +end diff --git a/lib/fluent/plugin/out_splunk_hec.rb b/lib/fluent/plugin/out_splunk_hec.rb new file mode 100644 index 0000000..778cc4c --- /dev/null +++ b/lib/fluent/plugin/out_splunk_hec.rb @@ -0,0 +1,246 @@ +require "fluent/plugin/output" + +require 'openssl' +require 'net/http/persistent' + +module Fluent::Plugin + class SplunkHecOutput < Fluent::Plugin::Output + Fluent::Plugin.register_output('splunk_hec', self) + + helpers :formatter + + autoload :VERSION, "fluent/plugin/out_splunk_hec/version" + + desc 'Which protocol to use to call HEC api, "http" or "https", default "https".' + config_param :protocol, :enum, list: %i[http https], default: :https + + desc 'The hostname/IP of the Splunk instance which has HTTP input enabled, or a HEC load balancer.' + config_param :hec_host, :string + + desc 'The port number of the HTTP input, or the HEC load balancer.' + config_param :hec_port, :integer, default: 8088 + + desc 'The HEC token.' + config_param :hec_token, :string + + desc 'SSL configurations.' + config_section :ssl, param_name: 'ssl', required: false, multi: false, init: true do + desc "The path to a file containing a PEM-format CA certificate for this client." + config_param :client_cert, :string, default: nil + + desc 'The path to a file containing a PEM-format CA certificate.' + config_param :ca_file, :string, default: nil + + desc 'The path to a directory containing CA certificates in PEM format.' + config_param :ca_path, :string, default: nil + + desc 'List of SSl ciphers allowed.' + config_param :ciphers, :array, default: nil + + desc "The client's SSL private key." + config_param :client_pkey, :string, default: nil + + desc "If `insecure` is set to true, it will not verify the server's certificate. If `ca_file` or `ca_path` is set, `insecure` will be ignored." + config_param :insecure, :bool, default: false + end + + desc 'The Splunk index indexs events, by default it is not set, and will use what is configured in the HTTP input. Liquid template is supported.' + config_param :index, :string, default: nil + + desc "Set the host field for events, by default it's the hostname of the machine that runnning fluentd. Liquid template is supported." + config_param :host, :string, default: nil + + desc "The source will be applied to the events, by default it uses the event's tag. Liquid template is supported." + config_param :source, :string, default: nil + + desc 'The sourcetype will be applied to the events, by default it is not set, and leave it to Splunk to figure it out. Liquid template is supported.' + config_param :sourcetype, :string, default: nil + + desc 'Disable Liquid template support. Once disabled, it cannot use Liquid templates in the `host`, `index`, `source`, `sourcetype` fields.' + config_param :disable_template, :bool, default: false + + # Whether to allow non-UTF-8 characters in user logs. If set to true, any + # non-UTF-8 character would be replaced by the string specified by + # 'non_utf8_replacement_string'. If set to false, any non-UTF-8 character + # would trigger the plugin to error out. + config_param :coerce_to_utf8, :bool, :default => true + + # If 'coerce_to_utf8' is set to true, any non-UTF-8 character would be + # replaced by the string specified here. + config_param :non_utf8_replacement_string, :string, :default => ' ' + + config_section :format do + # the format section defined in formatter plugin help requires init. + # just defined a useless formatter as a placeholder. + config_param :@type, :string, default: 'nil' + end + + def initialize + super + @default_host = Socket.gethostname + @chunk_queue = SizedQueue.new 1 + end + + def configure(conf) + super + prepare_templates + construct_api + + @formatter = formatter_create + @formatter = nil if @formatter.is_a?(::Fluent::Plugin::NilFormatter) + end + + def start + super + start_worker_threads + end + + def format(tag, time, record) + values = { + 'tag' => tag, + 'record' => record + } + event = @formatter ? @formatter.format(tag, time, record) : record + + { + host: @host ? @host.render(values) : @default_host, + source: @source ? @source.render(values) : tag, + event: convert_to_utf8(event), + time: time.to_i + }.tap { |payload| + payload.update sourcetype: @sourcetype.render(values) if @sourcetype + payload.update index: @index.render(values) if @index + }.to_json + end + + def try_write(chunk) + log.debug { "Received new chunk, size=#{chunk.read.bytesize}" } + @chunk_queue << chunk + end + + def stop + @chunk_queue.close + super + end + + def multi_workers_ready? + true + end + + private + + def prepare_templates + template_fields = %w[@index @host @source @sourcetype] + + if @disable_template + # provides `render` method when template is diabled, so that + # we can handle the fields in the same ways no matter if templating + # is enabled or not. + self_render = Module.new { + def render(*args) self end + } + template_fields.each { |field| + v = instance_variable_get field + v.extend self_render if v + } + else + require 'liquid' + template_fields.each { |field| + v = instance_variable_get field + instance_variable_set field, Liquid::Template.parse(v) if v + } + end + end + + def construct_api + @hec_api = URI("#{@protocol}://#{@hec_host}:#{@hec_port}/services/collector") + rescue + raise Fluent::ConfigError, "hec_host (#{@hec_host}) and/or hec_port (#{@hec_port}) are invalid." + end + + def start_worker_threads + thread_create :"hec_worker_#{@hec_api}" do + http = new_connection + while chunk = get_next_chunk + send_to_hec http, chunk + end + end + end + + def get_next_chunk + @chunk_queue.pop @chunk_queue.closed? + rescue ThreadError # see SizedQueue#pop doc + nil + end + + def new_connection + Net::HTTP::Persistent.new.tap do |c| + c.verify_mode = @ssl.insecure ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER + c.cert = OpenSSL::X509::Certificate.new File.read(@ssl.client_cert) if @ssl.client_cert + c.key = OpenSSL::PKey::RSA.new File.read(@ssl.client_pkey) if @ssl.client_pkey + c.ca_file = @ssl.ca_file + c.ca_path = @ssl.ca_path + c.ciphers = @ssl.ciphers + + c.override_headers['Content-Type'] = 'application/json' + c.override_headers['User-Agent'] = "fluent-plugin-splunk_hec_out/#{VERSION}" + c.override_headers['Authorization'] = "Splunk #{@hec_token}" + end + end + + def send_to_hec(http, chunk) + post = Net::HTTP::Post.new @hec_api.request_uri + post.body = chunk.read + log.debug { "Sending #{post.body.bytesize} bytes to Splunk." } + + log.trace { "POST #{@hec_api} body=#{post.body}" } + response = http.request @hec_api, post + log.debug { "[Response] POST #{@hec_api}: #{response.inspect}" } + + # raise Exception to utilize Fluentd output plugin retry machanism + raise "Server error for POST #{@hec_api}, response: #{response.body}" if response.code.start_with?('5') + + # For both success response (2xx) and client errors (4xx), we will consume the chunk. + # Because there probably a bug in the code if we get 4xx errors, retry won't do any good. + commit_write(chunk.unique_id) + log.error "Failed POST to #{@hec_api}, response: #{response.body}" if not response.code.start_with?('2') + end + + # Encode as UTF-8. If 'coerce_to_utf8' is set to true in the config, any + # non-UTF-8 character would be replaced by the string specified by + # 'non_utf8_replacement_string'. If 'coerce_to_utf8' is set to false, any + # non-UTF-8 character would trigger the plugin to error out. + # Thanks to + # https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/dbc28575/lib/fluent/plugin/out_google_cloud.rb#L1284 + def convert_to_utf8(input) + if input.is_a?(Hash) + record = {} + input.each do |key, value| + record[convert_to_utf8(key)] = convert_to_utf8(value) + end + + return record + end + return input.map { |value| convert_to_utf8(value) } if input.is_a?(Array) + return input unless input.respond_to?(:encode) + + if @coerce_to_utf8 + input.encode( + 'utf-8', + invalid: :replace, + undef: :replace, + replace: @non_utf8_replacement_string) + else + begin + input.encode('utf-8') + rescue EncodingError + log.error { 'Encountered encoding issues potentially due to non ' \ + 'UTF-8 characters. To allow non-UTF-8 characters and ' \ + 'replace them with spaces, please set "coerce_to_utf8" ' \ + 'to true.' } + raise + end + end + end + end +end diff --git a/lib/fluent/plugin/out_splunk_hec/version.rb b/lib/fluent/plugin/out_splunk_hec/version.rb new file mode 100644 index 0000000..f03c9d8 --- /dev/null +++ b/lib/fluent/plugin/out_splunk_hec/version.rb @@ -0,0 +1 @@ +Fluent::Plugin::SplunkHecOutput::VERSION = File.read(File.expand_path('../../../../VERSION', File.dirname(__FILE__))).chomp.strip diff --git a/test/fluent/plugin/out_splunk_hec_test.rb b/test/fluent/plugin/out_splunk_hec_test.rb new file mode 100644 index 0000000..78869fd --- /dev/null +++ b/test/fluent/plugin/out_splunk_hec_test.rb @@ -0,0 +1,128 @@ +require "test_helper" + +describe Fluent::Plugin::SplunkHecOutput do + include Fluent::Test::Helpers + include PluginTestHelper + + before { Fluent::Test.setup } # setup router and others + + it { expect(::Fluent::Plugin::SplunkHecOutput::VERSION).wont_be_nil } + + describe "hec_host validation" do + describe "invalid host" do + it "should require hec_host" do + expect{ create_output_driver }.must_raise Fluent::ConfigError + end + + it { expect{ create_output_driver('hec_host %bad-host%') }.must_raise Fluent::ConfigError } + end + + describe "good host" do + it { + expect(create_output_driver('hec_host splunk.com').instance.hec_host).must_equal "splunk.com" + } + end + end + + it "should send request to Splunk" do + req = verify_sent_events { |r| + expect(r.body.scan(/test message/).size).must_equal 2 + } + expect(req).must_be_requested times: 1 + end + + describe "source" do + it "should use event tags by default" do + verify_sent_events() { |r| + expect(r.body).must_match(/"source"\s*:\s*"tag.event1"/) + expect(r.body).must_match(/"source"\s*:\s*"tag.event2"/) + } + end + + describe "use liquid templates" do + it "can use tag" do + verify_sent_events(%q) { |r| + expect(r.body).must_match(/"source"\s*:\s*"tag-event1"/) + expect(r.body).must_match(/"source"\s*:\s*"tag-event2"/) + } + end + + it "can use record" do + verify_sent_events('source "{{ record.id }}"') { |r| + expect(r.body).must_match(/"source"\s*:\s*"1st"/) + expect(r.body).must_match(/"source"\s*:\s*"2nd"/) + } + end + end + end + + describe "host" do + it "should use host machine's hostname by default" do + verify_sent_events() { |r| + expect(r.body).must_match(/"host"\s*:\s*"#{Socket.gethostname}"/) + } + end + + it "should understand liquid tempaltes" do + verify_sent_events(%q) { |r| + expect(r.body).must_match(/"host"\s*:\s*"tag-event1"/) + expect(r.body).must_match(/"host"\s*:\s*"tag-event2"/) + } + end + end + + describe "sourcetype" do + it "should not be set by default" do + verify_sent_events() { |r| + expect(r.body).wont_match(/"sourcetype"\s*:\s*"/) + true # `wont_match` returns `false` which will make webmock think it fails + } + end + + it "should understand liquid tempaltes" do + verify_sent_events(%q) { |r| + expect(r.body).must_match(/"sourcetype"\s*:\s*"tag-event1"/) + expect(r.body).must_match(/"sourcetype"\s*:\s*"tag-event2"/) + } + end + end + + it "should be able to disable liquid tempalte" do + verify_sent_events(<<~CONF) { |r| + disable_template true + host "{{ host }}" + source "{{ source }}" + sourcetype "{{ sourcetype }}" + CONF + expect(r.body.scan(/"host"\s*:\s*"{{ host }}"/).size).must_equal 2 + expect(r.body.scan(/"source"\s*:\s*"{{ source }}"/).size).must_equal 2 + expect(r.body.scan(/"sourcetype"\s*:\s*"{{ sourcetype }}"/).size).must_equal 2 + } + end + + it "should support use a formatter" do + verify_sent_events(<<~CONF) { |r| + + @type single_value + message_key message + add_newline false + + CONF + expect(r.body.scan(/"event"\s*:\s*"test message"/).size).must_equal 2 + } + end + + def verify_sent_events(conf = '', &blk) + host = "hec.splunk.com" + d = create_output_driver("hec_host #{host}", conf) + + hec_req = stub_hec_request("https://#{host}:8088").with &blk + + d.run do + d.feed("tag.event1", event_time, {"message" => "test message", "id" => "1st"}) + d.feed("tag.event2", event_time, {"message" => "test message", "id" => "2nd"}) + end + + hec_req + end +end diff --git a/test/lib/webmock/README.md b/test/lib/webmock/README.md new file mode 100644 index 0000000..9b68eed --- /dev/null +++ b/test/lib/webmock/README.md @@ -0,0 +1,3 @@ +There are two reasons why we stub out all these webmock adapter: +* Requiring 'http' (by the http_rb_adapter) will trigger a circle require warning (http/client <-> http/connection) +* We only need mocking the standard library `net/http`, and we don't want to load a bunch of not used libraries. diff --git a/test/lib/webmock/http_lib_adapters/curb_adapter.rb b/test/lib/webmock/http_lib_adapters/curb_adapter.rb new file mode 100644 index 0000000..e69de29 diff --git a/test/lib/webmock/http_lib_adapters/em_http_request_adapter.rb b/test/lib/webmock/http_lib_adapters/em_http_request_adapter.rb new file mode 100644 index 0000000..e69de29 diff --git a/test/lib/webmock/http_lib_adapters/excon_adapter.rb b/test/lib/webmock/http_lib_adapters/excon_adapter.rb new file mode 100644 index 0000000..e69de29 diff --git a/test/lib/webmock/http_lib_adapters/http_rb_adapter.rb b/test/lib/webmock/http_lib_adapters/http_rb_adapter.rb new file mode 100644 index 0000000..e69de29 diff --git a/test/lib/webmock/http_lib_adapters/httpclient_adapter.rb b/test/lib/webmock/http_lib_adapters/httpclient_adapter.rb new file mode 100644 index 0000000..e69de29 diff --git a/test/lib/webmock/http_lib_adapters/manticore_adapter.rb b/test/lib/webmock/http_lib_adapters/manticore_adapter.rb new file mode 100644 index 0000000..e69de29 diff --git a/test/lib/webmock/http_lib_adapters/patron_adapter.rb b/test/lib/webmock/http_lib_adapters/patron_adapter.rb new file mode 100644 index 0000000..e69de29 diff --git a/test/lib/webmock/http_lib_adapters/typhoeus_hydra_adapter.rb b/test/lib/webmock/http_lib_adapters/typhoeus_hydra_adapter.rb new file mode 100644 index 0000000..e69de29 diff --git a/test/test_helper.rb b/test/test_helper.rb new file mode 100644 index 0000000..30d9fed --- /dev/null +++ b/test/test_helper.rb @@ -0,0 +1,38 @@ +$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__) +$LOAD_PATH.unshift File.expand_path("../lib", __FILE__) +require "fluent/plugin/out_splunk_hec" + +require "fluent/test" +require "fluent/test/driver/output" +require "fluent/test/helpers" +require "minitest/autorun" +require "webmock/minitest" + +# make assertions from webmock available in minitest/spec +module Minitest::Expectations + infect_an_assertion :assert_requested, :must_be_requested, :reverse + infect_an_assertion :assert_not_requested, :wont_be_requested, :reverse +end + +TEST_HEC_TOKEN = "some-token".freeze + +module PluginTestHelper + def fluentd_conf_for(*lines) + basic_config = [ + "hec_token #{TEST_HEC_TOKEN}" + ] + (basic_config + lines).join("\n") + end + + def create_output_driver(*configs) + Fluent::Test::Driver::Output.new(Fluent::Plugin::SplunkHecOutput).tap { |d| + d.configure(fluentd_conf_for(*configs)) + } + end + + def stub_hec_request(endpoint) + stub_request(:post, "#{endpoint}/services/collector"). + with(headers: {"Authorization" => "Splunk #{TEST_HEC_TOKEN}", "User-Agent" => "fluent-plugin-splunk_hec_out/#{Fluent::Plugin::SplunkHecOutput::VERSION}"}). + to_return(body: '{"text":"Success","code":0}') + end +end