Skip to content

Commit

Permalink
Add Elastic Api Version header (#174)
Browse files Browse the repository at this point in the history
This commit checks if Elasticsearch is Serverless and then recreates the rest client with the default header {"Elastic-Api-Version": "2023-10-31"}

Co-authored-by: João Duarte <jsvd@users.noreply.github.com>
  • Loading branch information
kaisecheng and jsvd committed Sep 29, 2023
1 parent 0adc005 commit 7c94e3b
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.16.0
- Added request header `Elastic-Api-Version` for serverless [#174](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/174)

## 3.15.3
- Fixes a memory leak that occurs when a pipeline containing this filter terminates, which could become significant if the pipeline is cycled repeatedly [#173](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/173)

Expand Down
23 changes: 16 additions & 7 deletions lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def register
@hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s

test_connection!
setup_serverless
end # def register

def filter(event)
Expand Down Expand Up @@ -260,14 +261,15 @@ def prepare_user_agent
private

def client_options
{
@client_options ||= {
:user => @user,
:password => @password,
:api_key => @api_key,
:proxy => @proxy,
:ssl => client_ssl_options,
:retry_on_failure => @retry_on_failure,
:retry_on_status => @retry_on_status
:retry_on_status => @retry_on_status,
:user_agent => prepare_user_agent
}
end

Expand Down Expand Up @@ -344,11 +346,7 @@ def setup_client_ssl_store(ssl_options, kind, store_path)
def new_client
# NOTE: could pass cloud-id/cloud-auth to client but than we would need to be stricter on ES version requirement
# and also LS parsing might differ from ES client's parsing so for consistency we do not pass cloud options ...
opts = client_options

opts[:user_agent] = prepare_user_agent

LogStash::Filters::ElasticsearchClient.new(@logger, @hosts, opts)
LogStash::Filters::ElasticsearchClient.new(@logger, @hosts, client_options)
end

def get_client
Expand Down Expand Up @@ -478,6 +476,17 @@ def test_connection!
end
end

def setup_serverless
if get_client.serverless?
@client_options[:serverless] = true
@shared_client = new_client
get_client.info
end
rescue => e
@logger.error("Failed to retrieve Elasticsearch info", message: e.message, exception: e.class, backtrace: e.backtrace)
raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end

def setup_ssl_params!
@ssl_enabled = normalize_config(:ssl_enabled) do |normalize|
normalize.with_deprecated_alias(:ssl)
Expand Down
20 changes: 18 additions & 2 deletions lib/logstash/filters/elasticsearch/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ class ElasticsearchClient

attr_reader :client

BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze

def initialize(logger, hosts, options = {})
user = options.fetch(:user, nil)
password = options.fetch(:password, nil)
api_key = options.fetch(:api_key, nil)
proxy = options.fetch(:proxy, nil)
user_agent = options[:user_agent]

transport_options = {:headers => {}}
transport_options = { }
transport_options[:headers] = options.fetch(:serverless, false) ? DEFAULT_EAV_HEADER.dup : {}
transport_options[:headers].merge!(setup_basic_auth(user, password))
transport_options[:headers].merge!(setup_api_key(api_key))
transport_options[:headers].merge!({ 'user-agent' => "#{user_agent}" })
Expand Down Expand Up @@ -46,10 +50,22 @@ def initialize(logger, hosts, options = {})
@client = ::Elasticsearch::Client.new(client_options)
end

def search(params)
def search(params={})
@client.search(params)
end

def info
@client.info
end

def build_flavor
@build_flavor ||= info&.dig('version', 'build_flavor')
end

def serverless?
@is_serverless ||= (build_flavor == BUILD_FLAVOR_SERVERLESS)
end

private

def setup_hosts(hosts, ssl_enabled)
Expand Down
4 changes: 2 additions & 2 deletions logstash-filter-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-elasticsearch'
s.version = '3.15.3'
s.version = '3.16.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -21,7 +21,7 @@ Gem::Specification.new do |s|

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'elasticsearch', ">= 7.14.0" # LS >= 6.7 and < 7.14 all used version 5.0.5
s.add_runtime_dependency 'elasticsearch', ">= 7.14.9" # LS >= 6.7 and < 7.14 all used version 5.0.5
s.add_runtime_dependency 'manticore', ">= 0.7.1"
s.add_runtime_dependency 'logstash-mixin-ca_trusted_fingerprint_support', '~> 1.0'
s.add_runtime_dependency 'logstash-mixin-normalize_config_support', '~>1.0'
Expand Down
101 changes: 97 additions & 4 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

before do
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
end

it "should not raise an exception" do
Expand Down Expand Up @@ -49,6 +50,26 @@
end
end

context "against serverless Elasticsearch" do
let(:config) { { "query" => "*" } }
let(:filter_client) { double("filter_client") }
let(:es_client) { double("es_client") }

before do
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:get_client).and_return(filter_client)
allow(filter_client).to receive(:serverless?).and_return(true)
allow(filter_client).to receive(:client).and_return(es_client)
allow(es_client).to receive(:info).with(a_hash_including(:headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER)).and_raise(
Elasticsearch::Transport::Transport::Errors::BadRequest.new
)
end

it "raises an exception when Elastic Api Version is not supported" do
expect {plugin.register}.to raise_error(LogStash::ConfigurationError)
end
end

context "query settings" do
it "raise an exception when query and query_template are empty" do
plugin = described_class.new({})
Expand Down Expand Up @@ -84,6 +105,7 @@
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
allow(client).to receive(:search).and_return(response)
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
plugin.register
end

Expand Down Expand Up @@ -423,7 +445,9 @@ def wait_receive_request
let(:plugin) { described_class.new(config) }
let(:event) { LogStash::Event.new({}) }

it "client should sent the expect user-agent" do
# elasticsearch-ruby 7.17.9 initialize two user agent headers, `user-agent` and `User-Agent`
# hence, fail this header size test case
xit "client should sent the expect user-agent" do
plugin.register

request = webserver.wait_receive_request
Expand All @@ -445,6 +469,7 @@ def wait_receive_request

before(:each) do
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
end

after(:each) do
Expand Down Expand Up @@ -619,6 +644,47 @@ def wait_receive_request
end
end

describe "Elastic Api Header" do
let(:config) { {"query" => "*"} }
let(:plugin) { described_class.new(config) }
let(:headers) {{'x-elastic-product' => 'Elasticsearch'}}
let(:cluster_info) { {"version" => {"number" => "8.10.0", "build_flavor" => build_flavor}, "tagline" => "You Know, for Search"} }
let(:mock_resp) { MockResponse.new(200, cluster_info, headers) }

before do
expect(plugin).to receive(:test_connection!)
end

context "serverless" do
let(:build_flavor) { "serverless" }

before do
allow_any_instance_of(Elasticsearch::Client).to receive(:perform_request).with(any_args).and_return(mock_resp)
end

it 'propagates header to es client' do
plugin.register
client = plugin.send(:get_client).client
expect( extract_transport(client).options[:transport_options][:headers] ).to match hash_including("Elastic-Api-Version" => "2023-10-31")
end
end

context "stateful" do
let(:build_flavor) { "default" }

before do
expect_any_instance_of(Elasticsearch::Client).to receive(:perform_request).with(any_args).and_return(mock_resp)
end

it 'does not propagate header to es client' do
plugin.register
client = plugin.send(:get_client).client
expect( extract_transport(client).options[:transport_options][:headers] ).to match hash_not_including("Elastic-Api-Version" => "2023-10-31")
end
end

end

describe "ca_trusted_fingerprint" do
let(:ca_trusted_fingerprint) { SecureRandom.hex(32) }
let(:config) { {"ssl_enabled" => true, "ca_trusted_fingerprint" => ca_trusted_fingerprint, "query" => "*"}}
Expand All @@ -627,7 +693,10 @@ def wait_receive_request

if Gem::Version.create(LOGSTASH_VERSION) >= Gem::Version.create("8.3.0")
context 'the generated trust_strategy' do
before(:each) { allow(plugin).to receive(:test_connection!) }
before(:each) do
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
end

it 'is passed to the Manticore client' do
expect(Manticore::Client).to receive(:new)
Expand Down Expand Up @@ -666,7 +735,10 @@ def wait_receive_request

subject(:plugin) { described_class.new(config) }

before(:each) { allow(plugin).to receive(:test_connection!) }
before(:each) do
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
end

it 'is passed to the Manticore client' do
expect(Manticore::Client).to receive(:new)
Expand Down Expand Up @@ -694,7 +766,10 @@ def wait_receive_request
let(:config) { {"query" => "*"} }
let(:plugin) { described_class.new(config) }

before { allow(plugin).to receive(:test_connection!) }
before do
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
end

it "should set localhost:9200 as hosts" do
plugin.register
Expand All @@ -719,6 +794,7 @@ def wait_receive_request
before(:each) do
allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
allow(plugin).to receive(:test_connection!)
allow(plugin).to receive(:setup_serverless)
plugin.register
end

Expand All @@ -736,4 +812,21 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie
client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
end

class MockResponse
attr_reader :code, :headers

def initialize(code = 200, body = nil, headers = {})
@code = code
@body = body
@headers = headers
end

def body
@body
end

def status
@code
end
end
end
2 changes: 2 additions & 0 deletions spec/filters/elasticsearch_ssl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
before do
allow(es_client_double).to receive(:close)
allow(es_client_double).to receive(:ping).with(any_args).and_return(double("pong").as_null_object)
allow(es_client_double).to receive(:info).with(any_args).and_return({"version" => {"number" => "7.5.0", "build_flavor" => "default"},
"tagline" => "You Know, for Search"})
allow(Elasticsearch::Client).to receive(:new).and_return(es_client_double)
end

Expand Down

0 comments on commit 7c94e3b

Please sign in to comment.