Skip to content

Commit

Permalink
Merge branch 'main' of github.com:logstash-plugins/logstash-filter-el…
Browse files Browse the repository at this point in the history
…asticsearch into add_eav_header

# Conflicts:
#	CHANGELOG.md
#	logstash-filter-elasticsearch.gemspec
  • Loading branch information
kaisecheng committed Sep 26, 2023
2 parents b2ae3d1 + 0adc005 commit 6fbd357
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 21 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## 3.16.0
- Added request header `Elastic-Api-Version` for serverless [#174](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/174)

## 3.15.3
- Fixes a memory leak that occurs when a pipeline containing this filter terminates, which could become significant if the pipeline is cycled repeatedly [#173](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/173)

## 3.15.2
- Added checking for `query` and `query_template`. [#171](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/171)

Expand Down
10 changes: 6 additions & 4 deletions lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "logstash/json"
require 'logstash/plugin_mixins/ca_trusted_fingerprint_support'
require "logstash/plugin_mixins/normalize_config_support"
require "monitor"

require_relative "elasticsearch/client"
require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore"
Expand Down Expand Up @@ -139,7 +140,8 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base

include LogStash::PluginMixins::NormalizeConfigSupport

attr_reader :clients_pool
include MonitorMixin
attr_reader :shared_client

BUILD_FLAVOR_SERVERLESS = 'serverless'.freeze
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
Expand All @@ -162,8 +164,6 @@ def self.validate_value(value, validator)
end

def register
@clients_pool = java.util.concurrent.ConcurrentHashMap.new

#Load query if it exists
if @query_template
if File.zero?(@query_template)
Expand Down Expand Up @@ -356,7 +356,9 @@ def new_client
end

def get_client
@clients_pool.computeIfAbsent(Thread.current, lambda { |x| new_client })
@shared_client || synchronize do
@shared_client ||= new_client
end
end

# get an array of path elements from a path reference
Expand Down
3 changes: 3 additions & 0 deletions lib/logstash/filters/elasticsearch/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ def initialize(logger, hosts, options = {})
transport_options[:headers].merge!(setup_api_key(api_key))
transport_options[:headers].merge!({ 'user-agent' => "#{user_agent}" })

transport_options[:pool_max] = 1000
transport_options[:pool_max_per_route] = 100

logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')
transport_options[:proxy] = proxy.to_s if proxy && !proxy.eql?('')

Expand Down
1 change: 1 addition & 0 deletions logstash-filter-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-elasticsearch'
s.version = '3.15.3'
s.version = '3.16.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Copies fields from previous log events in Elasticsearch to current events "
Expand Down
45 changes: 28 additions & 17 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,6 @@
Thread.current[:filter_elasticsearch_client] = nil
end

# Since the Elasticsearch Ruby client is not thread safe
# and under high load we can get error with the connection pool
# we have decided to create a new instance per worker thread which
# will be lazy created on the first call to `#filter`
#
# I am adding a simple test case for future changes
it "uses a different connection object per thread wait" do
expect(plugin.clients_pool.size).to eq(0)

Thread.new { plugin.filter(event) }.join
Thread.new { plugin.filter(event) }.join

expect(plugin.clients_pool.size).to eq(2)
end

it "should enhance the current event with new data" do
plugin.filter(event)
expect(event.get("code")).to eq(404)
Expand Down Expand Up @@ -471,6 +456,32 @@ def wait_receive_request
Thread.current[:filter_elasticsearch_client] = nil
end

it 'uses a threadsafe transport adapter' do
client = plugin.send(:get_client).client
# we currently rely on the threadsafety guarantees provided by Manticore
# this spec is a safeguard to trigger an assessment of thread-safety should
# we choose a different transport adapter in the future.
transport_class = extract_transport(client).options.fetch(:transport_class)
expect(transport_class).to equal ::Elasticsearch::Transport::Transport::HTTP::Manticore
end

it 'uses a client with sufficient connection pool size' do
client = plugin.send(:get_client).client
transport_options = extract_transport(client).options.fetch(:transport_options)
# pool_max and pool_max_per_route are manticore-specific transport options
expect(transport_options).to include(:pool_max => 1000, :pool_max_per_route => 100)
end

it 'uses a single shared client across threads' do
q = Queue.new
10.times.map do
Thread.new(plugin) { |instance| q.push instance.send(:get_client) }
end.map(&:join)

first = q.pop
expect(q.pop).to be(first) until q.empty?
end

describe "cloud.id" do
let(:valid_cloud_id) do
'sample:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlvJGFjMzFlYmI5MDI0MTc3MzE1NzA0M2MzNGZkMjZmZDQ2OjkyNDMkYTRjMDYyMzBlNDhjOGZjZTdiZTg4YTA3NGEzYmIzZTA6OTI0NA=='
Expand Down Expand Up @@ -773,9 +784,9 @@ def wait_receive_request
end

it "should read and send non-ascii query" do
expect(client).to receive(:search).with(
expect(client).to receive(:search).with({
:body => { "query" => { "terms" => { "lock" => [ "잠금", "uzávěr" ] } } },
:index => "")
:index => ""})

plugin.filter(LogStash::Event.new)
end
Expand Down

0 comments on commit 6fbd357

Please sign in to comment.