Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Elastic Api Version header #1147

Merged
merged 9 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.18.0
- Added request header `Elastic-Api-Version` for serverless [#1147](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1147)

## 11.17.0
- Added support to http compression level. Deprecated `http_compression` in favour of `compression_level` and enabled compression level 1 by default. [#1148](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1148)

Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def emulate_batch_error_response(actions, http_code, reason)
end

def get(path)
response = @pool.get(path, nil)
response = @pool.get(path)
LogStash::Json.load(response.body)
end

Expand Down
16 changes: 9 additions & 7 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def initialize(original_error, url)
:sniffer_delay => 10,
}.freeze

BUILD_FLAVOUR_SERVERLESS = 'serverless'.freeze
BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze

def initialize(logger, adapter, initial_urls=[], options={})
@logger = logger
Expand Down Expand Up @@ -77,7 +78,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
@license_checker = options[:license_checker] || LogStash::PluginMixins::ElasticSearch::NoopLicenseChecker::INSTANCE

@last_es_version = Concurrent::AtomicReference.new
@build_flavour = Concurrent::AtomicReference.new
@build_flavor = Concurrent::AtomicReference.new
end

def start
Expand Down Expand Up @@ -255,7 +256,7 @@ def healthcheck!(register_phase = true)
# We reconnected to this node, check its ES version
version_info = get_es_version(url)
es_version = version_info.fetch('number', nil)
build_flavour = version_info.fetch('build_flavor', nil)
build_flavor = version_info.fetch('build_flavor', nil)

if es_version.nil?
logger.warn("Failed to retrieve Elasticsearch version data from connected endpoint, connection aborted", :url => url.sanitized.to_s)
Expand All @@ -264,7 +265,7 @@ def healthcheck!(register_phase = true)
@state_mutex.synchronize do
meta[:version] = es_version
set_last_es_version(es_version, url)
set_build_flavour(build_flavour)
set_build_flavor(build_flavor)

alive = @license_checker.appropriate_license?(self, url)
meta[:state] = alive ? :alive : :dead
Expand Down Expand Up @@ -334,6 +335,7 @@ def perform_request(method, path, params={}, body=nil)
end

def perform_request_to_url(url, method, path, params={}, body=nil)
params[:headers] = DEFAULT_EAV_HEADER.merge(params[:headers] || {}) if serverless?
@adapter.perform_request(url, method, path, params, body)
end

Expand Down Expand Up @@ -494,7 +496,7 @@ def maximum_seen_major_version
end

def serverless?
@build_flavour.get == BUILD_FLAVOUR_SERVERLESS
@build_flavor.get == BUILD_FLAVOR_SERVERLESS
end

private
Expand Down Expand Up @@ -526,8 +528,8 @@ def warn_on_higher_major_version(major, url)
previous_major: @maximum_seen_major_version, new_major: major, node_url: url.sanitized.to_s)
end

def set_build_flavour(flavour)
@build_flavour.set(flavour)
def set_build_flavor(flavor)
@build_flavor.set(flavor)
end

end
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.17.0'
s.version = '11.18.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
36 changes: 33 additions & 3 deletions spec/unit/outputs/elasticsearch/http_client/pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def body
end


describe "build flavour tracking" do
describe "build flavor tracking" do
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://somehost:9200")] }

let(:es_version_info) { [ { "number" => '8.9.0', "build_flavor" => "serverless" } ] }
Expand All @@ -284,7 +284,7 @@ def body
{"tagline" => "You Know, for Search",
"version" => {
"number" => '8.9.0',
"build_flavor" => LogStash::Outputs::ElasticSearch::HttpClient::Pool::BUILD_FLAVOUR_SERVERLESS} },
"build_flavor" => LogStash::Outputs::ElasticSearch::HttpClient::Pool::BUILD_FLAVOR_SERVERLESS} },
{ "X-Elastic-Product" => "Elasticsearch" }
) }

Expand All @@ -293,7 +293,7 @@ def body
subject.start
end

it "picks the build flavour" do
it "picks the build flavor" do
expect(subject.serverless?).to be_truthy
end
end
Expand Down Expand Up @@ -327,6 +327,36 @@ def body
end
end

describe "elastic api version header" do
let(:eav) { "Elastic-Api-Version" }

context "when it is serverless" do
before(:each) do
expect(subject).to receive(:serverless?).and_return(true)
end

it "add the default header" do
expect(adapter).to receive(:perform_request).with(anything, :get, "/", anything, anything) do |_, _, _, params, _|
expect(params[:headers]).to eq({ "User-Agent" => "chromium", "Elastic-Api-Version" => "2023-10-31"})
end
subject.perform_request_to_url(initial_urls, :get, "/", { :headers => { "User-Agent" => "chromium" }} )
end
end

context "when it is stateful" do
before(:each) do
expect(subject).to receive(:serverless?).and_return(false)
end

it "add the default header" do
expect(adapter).to receive(:perform_request).with(anything, :get, "/", anything, anything) do |_, _, _, params, _|
expect(params[:headers]).to be_nil
end
subject.perform_request_to_url(initial_urls, :get, "/" )
end
end
end

# TODO: extract to ElasticSearchOutputLicenseChecker unit spec
describe "license checking with ElasticSearchOutputLicenseChecker" do
let(:options) do
Expand Down
2 changes: 1 addition & 1 deletion spec/unit/outputs/elasticsearch/http_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@
}

it "returns the hash response" do
expect(subject.pool).to receive(:get).with(path, nil).and_return(get_response)
expect(subject.pool).to receive(:get).with(path).and_return(get_response)
expect(subject.get(path)["body"]).to eq(body)
end
end
Expand Down