Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to Serverless Elasticsearch #1145

Merged
merged 10 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.16.0
- Added support to Serverless Elasticsearch [#1445](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1145)

## 11.15.9
- allow dlq_ settings when using data streams [#1144](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1144)

Expand Down
4 changes: 4 additions & 0 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ def maximum_seen_major_version
@pool.maximum_seen_major_version
end

def serverless?
@pool.serverless?
end

def alive_urls_count
@pool.alive_urls_count
end
Expand Down
19 changes: 17 additions & 2 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def initialize(original_error, url)
:sniffer_delay => 10,
}.freeze

BUILD_FLAVOUR_SERVERLESS = 'serverless'.freeze

def initialize(logger, adapter, initial_urls=[], options={})
@logger = logger
@adapter = adapter
Expand Down Expand Up @@ -75,6 +77,7 @@ def initialize(logger, adapter, initial_urls=[], options={})
@license_checker = options[:license_checker] || LogStash::PluginMixins::ElasticSearch::NoopLicenseChecker::INSTANCE

@last_es_version = Concurrent::AtomicReference.new
@build_flavour = Concurrent::AtomicReference.new
end

def start
Expand Down Expand Up @@ -250,14 +253,18 @@ def healthcheck!(register_phase = true)
# If no exception was raised it must have succeeded!
logger.warn("Restored connection to ES instance", url: url.sanitized.to_s)
# We reconnected to this node, check its ES version
es_version = get_es_version(url)
version_info = get_es_version(url)
es_version = version_info.fetch('number', nil)
build_flavour = version_info.fetch('build_flavor', nil)

if es_version.nil?
logger.warn("Failed to retrieve Elasticsearch version data from connected endpoint, connection aborted", :url => url.sanitized.to_s)
next
end
@state_mutex.synchronize do
meta[:version] = es_version
set_last_es_version(es_version, url)
set_build_flavour(build_flavour)

alive = @license_checker.appropriate_license?(self, url)
meta[:state] = alive ? :alive : :dead
Expand Down Expand Up @@ -475,7 +482,7 @@ def get_es_version(url)

response = LogStash::Json.load(response.body)

response.fetch('version', {}).fetch('number', nil)
response.fetch('version', {})
end

def last_es_version
Expand All @@ -486,6 +493,10 @@ def maximum_seen_major_version
@state_mutex.synchronize { @maximum_seen_major_version }
end

def serverless?
@build_flavour.get == BUILD_FLAVOUR_SERVERLESS
end

private

# @private executing within @state_mutex
Expand Down Expand Up @@ -515,5 +526,9 @@ def warn_on_higher_major_version(major, url)
previous_major: @maximum_seen_major_version, new_major: major, node_url: url.sanitized.to_s)
end

def set_build_flavour(flavour)
@build_flavour.set(flavour)
end

end
end; end; end; end;
11 changes: 9 additions & 2 deletions lib/logstash/outputs/elasticsearch/ilm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,18 @@ def ilm_in_use?
return @ilm_actually_enabled if defined?(@ilm_actually_enabled)
@ilm_actually_enabled =
begin
if @ilm_enabled == 'auto'
if serverless?
raise LogStash::ConfigurationError, "Invalid ILM configuration `ilm_enabled => true`. " +
jsvd marked this conversation as resolved.
Show resolved Hide resolved
"Serverless Elasticsearch cluster does not support Index Lifecycle Management." if @ilm_enabled.to_s == 'true'
@logger.info("ILM auto configuration (`ilm_enabled => auto` or unset) resolved to `false`. "\
"Serverless Elasticsearch cluster does not support Index Lifecycle Management.") if @ilm_enabled == 'auto'
false
elsif @ilm_enabled == 'auto'
if ilm_on_by_default?
ilm_alias_set?
else
@logger.info("Index Lifecycle Management is set to 'auto', but will be disabled - Your Elasticsearch cluster is before 7.0.0, which is the minimum version required to automatically run Index Lifecycle Management")
@logger.info("ILM auto configuration (`ilm_enabled => auto` or unset) resolved to `false`."\
" Elasticsearch cluster is before 7.0.0, which is the minimum version required to automatically run Index Lifecycle Management")
false
end
elsif @ilm_enabled.to_s == 'true'
Expand Down
2 changes: 2 additions & 0 deletions lib/logstash/outputs/elasticsearch/license_checker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ def initialize(logger)
# @param url [LogStash::Util::SafeURI] ES node URL
# @return [Boolean] true if provided license is deemed appropriate
def appropriate_license?(pool, url)
return true if pool.serverless?

license = extract_license(pool.get_license(url))
case license_status(license)
when 'active'
Expand Down
36 changes: 27 additions & 9 deletions lib/logstash/outputs/elasticsearch/template_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ def self.install_template(plugin)
"We recommend either setting `template_api => legacy` to continue providing legacy-style templates, " +
"or migrating your template to the composable style and setting `template_api => composable`. " +
"The legacy template API is slated for removal in Elasticsearch 9.")
elsif plugin.template_api == 'legacy' && plugin.serverless?
raise LogStash::ConfigurationError, "Invalid template configuration `template_api => legacy`. Serverless Elasticsearch does not support legacy template API."
end


if plugin.template
plugin.logger.info("Using mapping template from", :path => plugin.template)
template = read_template_file(plugin.template)
Expand Down Expand Up @@ -61,11 +64,13 @@ def self.resolve_template_settings(plugin, template)
plugin.logger.trace("Resolving ILM template settings: under 'settings' key", :template => template, :template_api => plugin.template_api, :es_version => plugin.maximum_seen_major_version)
legacy_index_template_settings(template)
else
template_endpoint = template_endpoint(plugin)
plugin.logger.trace("Resolving ILM template settings: template doesn't have 'settings' or 'template' fields, falling back to auto detection", :template => template, :template_api => plugin.template_api, :es_version => plugin.maximum_seen_major_version, :template_endpoint => template_endpoint)
template_endpoint == INDEX_TEMPLATE_ENDPOINT ?
composable_index_template_settings(template) :
use_index_template = index_template?(plugin)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're not really using templates here, but figuring out which API for templating to use, maybe a better name:

Suggested change
use_index_template = index_template?(plugin)
use_index_template_api = index_template_api?(plugin)

plugin.logger.trace("Resolving ILM template settings: template doesn't have 'settings' or 'template' fields, falling back to auto detection", :template => template, :template_api => plugin.template_api, :es_version => plugin.maximum_seen_major_version, :index_template => use_index_template)
if use_index_template
composable_index_template_settings(template)
else
legacy_index_template_settings(template)
end
end
end

Expand Down Expand Up @@ -100,12 +105,25 @@ def self.read_template_file(template_path)
end

def self.template_endpoint(plugin)
if plugin.template_api == 'auto'
plugin.maximum_seen_major_version < 8 ? LEGACY_TEMPLATE_ENDPOINT : INDEX_TEMPLATE_ENDPOINT
elsif plugin.template_api.to_s == 'legacy'
LEGACY_TEMPLATE_ENDPOINT
index_template?(plugin) ? INDEX_TEMPLATE_ENDPOINT : LEGACY_TEMPLATE_ENDPOINT
end

def self.index_template?(plugin)
case plugin.serverless?
when true
true
else
INDEX_TEMPLATE_ENDPOINT
case plugin.template_api
when 'auto'
plugin.maximum_seen_major_version >= 8
when 'composable'
true
when 'legacy'
false
else
plugin.logger.warn("Invalid template_api value #{plugin.template_api}")
true
end
end
end

Expand Down
4 changes: 4 additions & 0 deletions lib/logstash/plugin_mixins/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ def maximum_seen_major_version
client.maximum_seen_major_version
end

def serverless?
client.serverless?
end

def alive_urls_count
client.alive_urls_count
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.15.9'
s.version = '11.16.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
13 changes: 8 additions & 5 deletions spec/es_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,14 @@ def routing_field_name
end

def self.es_version
[
nilify(RSpec.configuration.filter[:es_version]),
nilify(ENV['ES_VERSION']),
nilify(ENV['ELASTIC_STACK_VERSION']),
].compact.first
{
"number" => [
nilify(RSpec.configuration.filter[:es_version]),
nilify(ENV['ES_VERSION']),
nilify(ENV['ELASTIC_STACK_VERSION']),
].compact.first,
"build_flavor" => 'default'
}
end

RSpec::Matchers.define :have_hits do |expected|
Expand Down
16 changes: 16 additions & 0 deletions spec/fixtures/license_check/active.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"license": {
"status": "active",
"uid": "d85d2c6a-b96d-3cc6-96db-5571a789b156",
"type": "enterprise",
"issue_date": "1970-01-01T00:00:00.000Z",
"issue_date_in_millis": 0,
"expiry_date": "2100-01-01T00:00:00.000Z",
"expiry_date_in_millis": 4102444800000,
"max_nodes": null,
"max_resource_units": 100000,
"issued_to": "Elastic Cloud",
"issuer": "API",
"start_date_in_millis": 0
}
}
5 changes: 5 additions & 0 deletions spec/fixtures/license_check/inactive.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"license": {
"status": "inactive"
}
}
34 changes: 29 additions & 5 deletions spec/unit/outputs/elasticsearch/http_client/pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
let(:logger) { Cabin::Channel.get }
let(:adapter) { LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter.new(logger, {}) }
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] }
let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests
let(:es_node_versions) { [ "0.0.0" ] }
let(:options) { {:resurrect_delay => 3, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests
let(:es_version_info) { [ { "number" => '0.0.0', "build_flavor" => 'default'} ] }
let(:license_status) { 'active' }

subject { described_class.new(logger, adapter, initial_urls, options) }
Expand All @@ -22,7 +22,7 @@

allow(::Manticore::Client).to receive(:new).and_return(manticore_double)

allow(subject).to receive(:get_es_version).with(any_args).and_return(*es_node_versions)
allow(subject).to receive(:get_es_version).with(any_args).and_return(*es_version_info)
allow(subject.license_checker).to receive(:license_status).and_return(license_status)
end

Expand Down Expand Up @@ -267,13 +267,37 @@ def body
end

context "if there are nodes with multiple major versions" do
let(:es_node_versions) { [ "0.0.0", "6.0.0" ] }
let(:es_version_info) { [ { "number" => '0.0.0', "build_flavor" => 'default'}, { "number" => '6.0.0', "build_flavor" => 'default'} ] }
it "picks the largest major version" do
expect(subject.maximum_seen_major_version).to eq(6)
end
end
end


describe "build flavour tracking" do
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://somehost:9200")] }

let(:es_version_info) { [ { "number" => '8.9.0', "build_flavor" => LogStash::Outputs::ElasticSearch::HttpClient::Pool::BUILD_FLAVOUR_SERVERLESS } ] }
kaisecheng marked this conversation as resolved.
Show resolved Hide resolved

let(:valid_response) { MockResponse.new(200,
{"tagline" => "You Know, for Search",
"version" => {
"number" => '8.9.0',
"build_flavor" => LogStash::Outputs::ElasticSearch::HttpClient::Pool::BUILD_FLAVOUR_SERVERLESS} },
{ "X-Elastic-Product" => "Elasticsearch" }
) }

before(:each) do
allow(subject).to receive(:perform_request_to_url).and_return(valid_response)
subject.start
end

it "picks the build flavour" do
expect(subject.serverless?).to be_truthy
end
end

describe "license checking" do
before(:each) do
allow(subject).to receive(:health_check_request)
Expand Down Expand Up @@ -364,7 +388,7 @@ def body
let(:adapter) { double("Manticore Adapter") }
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] }
let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests
let(:es_node_versions) { [ "0.0.0" ] }
let(:es_version_info) { [{ "number" => '0.0.0', "build_flavor" => 'default'}] }
let(:license_status) { 'active' }

subject { LogStash::Outputs::ElasticSearch::HttpClient::Pool.new(logger, adapter, initial_urls, options) }
Expand Down
24 changes: 23 additions & 1 deletion spec/unit/outputs/elasticsearch/template_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
let(:template_api) { "composable" }

it 'resolves composable index template API compatible setting' do
expect(plugin).to receive(:serverless?).and_return(false)
expect(plugin).to receive(:maximum_seen_major_version).at_least(:once).and_return(8) # required to log
template = {}
described_class.resolve_template_settings(plugin, template)
Expand All @@ -84,6 +85,7 @@
let(:template_api) { "legacy" }

it 'resolves legacy index template API compatible setting' do
expect(plugin).to receive(:serverless?).and_return(false)
expect(plugin).to receive(:maximum_seen_major_version).at_least(:once).and_return(7) # required to log
template = {}
described_class.resolve_template_settings(plugin, template)
Expand All @@ -97,6 +99,7 @@
describe "with ES < 8 versions" do

it 'resolves legacy index template API compatible setting' do
expect(plugin).to receive(:serverless?).and_return(false)
expect(plugin).to receive(:maximum_seen_major_version).at_least(:once).and_return(7)
template = {}
described_class.resolve_template_settings(plugin, template)
Expand All @@ -106,6 +109,7 @@

describe "with ES >= 8 versions" do
it 'resolves composable index template API compatible setting' do
expect(plugin).to receive(:serverless?).and_return(false)
expect(plugin).to receive(:maximum_seen_major_version).at_least(:once).and_return(8)
template = {}
described_class.resolve_template_settings(plugin, template)
Expand All @@ -123,6 +127,7 @@

describe "in version 8+" do
it "should use index template API" do
expect(plugin).to receive(:serverless?).and_return(false)
expect(plugin).to receive(:maximum_seen_major_version).at_least(:once).and_return(8)
endpoint = described_class.template_endpoint(plugin)
expect(endpoint).to be_equal(LogStash::Outputs::ElasticSearch::TemplateManager::INDEX_TEMPLATE_ENDPOINT)
Expand All @@ -131,6 +136,7 @@

describe "in version < 8" do
it "should use legacy template API" do
expect(plugin).to receive(:serverless?).and_return(false)
expect(plugin).to receive(:maximum_seen_major_version).at_least(:once).and_return(7)
endpoint = described_class.template_endpoint(plugin)
expect(endpoint).to be_equal(LogStash::Outputs::ElasticSearch::TemplateManager::LEGACY_TEMPLATE_ENDPOINT)
Expand All @@ -144,6 +150,7 @@

describe "in version 8+" do
it "should use legacy template API" do
expect(plugin).to receive(:serverless?).and_return(false)
expect(plugin).to receive(:maximum_seen_major_version).never
endpoint = described_class.template_endpoint(plugin)
expect(endpoint).to be_equal(LogStash::Outputs::ElasticSearch::TemplateManager::LEGACY_TEMPLATE_ENDPOINT)
Expand All @@ -157,11 +164,26 @@

describe "in version 8+" do
it "should use legacy template API" do
expect(plugin).to receive(:serverless?).and_return(false)
expect(plugin).to receive(:maximum_seen_major_version).never
endpoint = described_class.template_endpoint(plugin)
expect(endpoint).to be_equal(LogStash::Outputs::ElasticSearch::TemplateManager:: INDEX_TEMPLATE_ENDPOINT)
expect(endpoint).to be_equal(LogStash::Outputs::ElasticSearch::TemplateManager::INDEX_TEMPLATE_ENDPOINT)
end
end
end

describe "in serverless" do
[:auto, :composable, :legacy].each do |api|
let(:plugin_settings) { {"manage_template" => true, "template_api" => api.to_s} }
let(:plugin) { LogStash::Outputs::ElasticSearch.new(plugin_settings) }

it "use index template API when template_api set to #{api}" do
expect(plugin).to receive(:serverless?).and_return(true)
endpoint = described_class.template_endpoint(plugin)
expect(endpoint).to be_equal(LogStash::Outputs::ElasticSearch::TemplateManager::INDEX_TEMPLATE_ENDPOINT)
end
end

end
end
end