Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Elastic Api Version header #1147

Merged
merged 9 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.18.0
- Added request header `Elastic-Api-Version` for serverless [#1147](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1147)

## 11.17.0
- Added support to http compression level. Deprecated `http_compression` in favour of `compression_level` and enabled compression level 1 by default. [#1148](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1148)

Expand Down
4 changes: 3 additions & 1 deletion lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,9 @@ def use_event_type?(noop_required_client)
def install_template
TemplateManager.install_template(self)
rescue => e
@logger.error("Failed to install template", message: e.message, exception: e.class, backtrace: e.backtrace)
details = { message: e.message, exception: e.class, backtrace: e.backtrace }
details[:body] = e.response_body if e.respond_to?(:response_body)
@logger.error("Failed to install template", details)
end

def setup_ecs_compatibility_related_defaults
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def emulate_batch_error_response(actions, http_code, reason)
end

def get(path)
response = @pool.get(path, nil)
response = @pool.get(path)
LogStash::Json.load(response.body)
end

Expand Down
167 changes: 106 additions & 61 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ def initialize(response_code, url, request_body, response_body)
@response_body = response_body
end

def invalid_eav_header?
@response_code == 400 && @response_body&.include?(ELASTIC_API_VERSION)
end

def invalid_credentials?
@response_code == 401
end

def forbidden?
@response_code == 403
end

end
class HostUnreachableError < Error;
attr_reader :original_error, :url
Expand Down Expand Up @@ -48,7 +60,9 @@ def initialize(original_error, url)
:sniffer_delay => 10,
}.freeze

BUILD_FLAVOUR_SERVERLESS = 'serverless'.freeze
BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze
ELASTIC_API_VERSION = "Elastic-Api-Version".freeze
DEFAULT_EAV_HEADER = { ELASTIC_API_VERSION => "2023-10-31" }.freeze

def initialize(logger, adapter, initial_urls=[], options={})
@logger = logger
Expand Down Expand Up @@ -77,7 +91,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
@license_checker = options[:license_checker] || LogStash::PluginMixins::ElasticSearch::NoopLicenseChecker::INSTANCE

@last_es_version = Concurrent::AtomicReference.new
@build_flavour = Concurrent::AtomicReference.new
@build_flavor = Concurrent::AtomicReference.new
end

def start
Expand Down Expand Up @@ -232,39 +246,56 @@ def get_license(url)
end

def health_check_request(url)
response = perform_request_to_url(url, :head, @healthcheck_path)
raise BadResponseCodeError.new(response.code, url, nil, response.body) unless (200..299).cover?(response.code)
logger.debug("Running health check to see if an Elasticsearch connection is working",
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
begin
response = perform_request_to_url(url, :head, @healthcheck_path)
return response, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
return nil, e
end
end

def healthcheck!(register_phase = true)
# Try to keep locking granularity low such that we don't affect IO...
@state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta|
begin
logger.debug("Running health check to see if an Elasticsearch connection is working",
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
health_check_request(url)
_, health_bad_code_err = health_check_request(url)
root_response, root_bad_code_err = get_root_path(url) if health_bad_code_err.nil? || register_phase

# when called from resurrectionist skip the product check done during register phase
if register_phase
if !elasticsearch?(url)
raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end
raise LogStash::ConfigurationError,
"Could not read Elasticsearch. Please check the credentials" if root_bad_code_err&.invalid_credentials?
raise LogStash::ConfigurationError,
"Could not read Elasticsearch. Please check the privileges" if root_bad_code_err&.forbidden?
# when customer_headers is invalid
raise LogStash::ConfigurationError,
"The Elastic-Api-Version header is not valid" if root_bad_code_err&.invalid_eav_header?
# when it is not Elasticserach
raise LogStash::ConfigurationError,
"Could not connect to a compatible version of Elasticsearch" if root_bad_code_err.nil? && !elasticsearch?(root_response)

test_serverless_connection(url, root_response)
end

raise health_bad_code_err if health_bad_code_err
raise root_bad_code_err if root_bad_code_err

# If no exception was raised it must have succeeded!
logger.warn("Restored connection to ES instance", url: url.sanitized.to_s)
# We reconnected to this node, check its ES version
version_info = get_es_version(url)
es_version = version_info.fetch('number', nil)
build_flavour = version_info.fetch('build_flavor', nil)

if es_version.nil?
logger.warn("Failed to retrieve Elasticsearch version data from connected endpoint, connection aborted", :url => url.sanitized.to_s)
next
end

# We check its ES version
es_version, build_flavor = parse_es_version(root_response)
logger.warn("Failed to retrieve Elasticsearch build flavor") if build_flavor.nil?
logger.warn("Failed to retrieve Elasticsearch version data from connected endpoint, connection aborted", :url => url.sanitized.to_s) if es_version.nil?
next if es_version.nil?

@state_mutex.synchronize do
meta[:version] = es_version
set_last_es_version(es_version, url)
set_build_flavour(build_flavour)
set_build_flavor(build_flavor)

alive = @license_checker.appropriate_license?(self, url)
meta[:state] = alive ? :alive : :dead
Expand All @@ -275,40 +306,21 @@ def healthcheck!(register_phase = true)
end
end

def elasticsearch?(url)
def get_root_path(url, params={})
begin
response = perform_request_to_url(url, :get, ROOT_URI_PATH)
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
return resp, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return false if response.code == 401 || response.code == 403
raise e
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
return nil, e
end

version_info = LogStash::Json.load(response.body)
return false if version_info['version'].nil?

version = ::Gem::Version.new(version_info["version"]['number'])
return false if version < ::Gem::Version.new('6.0.0')

if VERSION_6_TO_7.satisfied_by?(version)
return valid_tagline?(version_info)
elsif VERSION_7_TO_7_14.satisfied_by?(version)
build_flavor = version_info["version"]['build_flavor']
return false if build_flavor.nil? || build_flavor != 'default' || !valid_tagline?(version_info)
else
# case >= 7.14
lower_headers = response.headers.transform_keys {|key| key.to_s.downcase }
product_header = lower_headers['x-elastic-product']
return false if product_header != 'Elasticsearch'
end
return true
rescue => e
logger.error("Unable to retrieve Elasticsearch version", url: url.sanitized.to_s, exception: e.class, message: e.message)
false
end

def valid_tagline?(version_info)
tagline = version_info['tagline']
tagline == "You Know, for Search"
def test_serverless_connection(url, root_response)
_, build_flavor = parse_es_version(root_response)
params = { :headers => DEFAULT_EAV_HEADER }
_, bad_code_err = get_root_path(url, params) if build_flavor == BUILD_FLAVOR_SERVERLESS
raise LogStash::ConfigurationError, "The Elastic-Api-Version header is not valid" if bad_code_err&.invalid_eav_header?
end

def stop_resurrectionist
Expand All @@ -334,6 +346,7 @@ def perform_request(method, path, params={}, body=nil)
end

def perform_request_to_url(url, method, path, params={}, body=nil)
params[:headers] = DEFAULT_EAV_HEADER.merge(params[:headers] || {}) if serverless?
@adapter.perform_request(url, method, path, params, body)
end

Expand Down Expand Up @@ -476,15 +489,6 @@ def return_connection(url)
end
end

def get_es_version(url)
response = perform_request_to_url(url, :get, ROOT_URI_PATH)
return nil unless (200..299).cover?(response.code)

response = LogStash::Json.load(response.body)

response.fetch('version', {})
end

def last_es_version
@last_es_version.get
end
Expand All @@ -494,7 +498,7 @@ def maximum_seen_major_version
end

def serverless?
@build_flavour.get == BUILD_FLAVOUR_SERVERLESS
@build_flavor.get == BUILD_FLAVOR_SERVERLESS
end

private
Expand Down Expand Up @@ -526,9 +530,50 @@ def warn_on_higher_major_version(major, url)
previous_major: @maximum_seen_major_version, new_major: major, node_url: url.sanitized.to_s)
end

def set_build_flavour(flavour)
@build_flavour.set(flavour)
def set_build_flavor(flavor)
@build_flavor.set(flavor)
end

def parse_es_version(response)
return nil, nil unless (200..299).cover?(response&.code)

response = LogStash::Json.load(response&.body)
version_info = response.fetch('version', {})
es_version = version_info.fetch('number', nil)
build_flavor = version_info.fetch('build_flavor', nil)

return es_version, build_flavor
end

def elasticsearch?(response)
return false if response.nil?

version_info = LogStash::Json.load(response.body)
return false if version_info['version'].nil?

version = ::Gem::Version.new(version_info["version"]['number'])
return false if version < ::Gem::Version.new('6.0.0')

if VERSION_6_TO_7.satisfied_by?(version)
return valid_tagline?(version_info)
elsif VERSION_7_TO_7_14.satisfied_by?(version)
build_flavor = version_info["version"]['build_flavor']
return false if build_flavor.nil? || build_flavor != 'default' || !valid_tagline?(version_info)
else
# case >= 7.14
lower_headers = response.headers.transform_keys {|key| key.to_s.downcase }
product_header = lower_headers['x-elastic-product']
return false if product_header != 'Elasticsearch'
end
return true
rescue => e
logger.error("Unable to retrieve Elasticsearch version", exception: e.class, message: e.message)
false
end

def valid_tagline?(version_info)
tagline = version_info['tagline']
tagline == "You Know, for Search"
end
end
end; end; end; end;
4 changes: 3 additions & 1 deletion lib/logstash/plugin_mixins/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ def discover_cluster_uuid
cluster_info = client.get('/')
plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid'])
rescue => e
@logger.error("Unable to retrieve Elasticsearch cluster uuid", message: e.message, exception: e.class, backtrace: e.backtrace)
details = { message: e.message, exception: e.class, backtrace: e.backtrace }
details[:body] = e.response_body if e.respond_to?(:response_body)
@logger.error("Unable to retrieve Elasticsearch cluster uuid", details)
end

def retrying_submit(actions)
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.17.0'
s.version = '11.18.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down