Skip to content

Commit

Permalink
Added SSL support and update clients
Browse files Browse the repository at this point in the history
  • Loading branch information
edmocosta committed Oct 26, 2023
1 parent 41d51a6 commit 57f5fa4
Show file tree
Hide file tree
Showing 25 changed files with 1,146 additions and 395 deletions.
2 changes: 2 additions & 0 deletions .ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ ENV DISTRIBUTION=$DISTRIBUTION
# INTEGRATION="true" while integration testing (false-y by default)
ARG INTEGRATION
ENV INTEGRATION=$INTEGRATION
ARG SECURE_INTEGRATION
ENV SECURE_INTEGRATION=$SECURE_INTEGRATION
RUN gem install bundler -v '< 2'
WORKDIR /usr/share/plugins/plugin
RUN bundle install --with test ci
Expand Down
15 changes: 11 additions & 4 deletions .ci/docker-compose.override.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ services:
- DISTRIBUTION_SUFFIX=${DISTRIBUTION_SUFFIX}
- INTEGRATION=${INTEGRATION:-false}
- SECURE_INTEGRATION=${SECURE_INTEGRATION:-false}
depends_on:
- enterprise_search

elasticsearch:
build:
Expand All @@ -37,8 +39,7 @@ services:
soft: -1
hard: -1
ports:
- 9200:9200
# networks: ['stack']
- "9200:9200"

enterprise_search:
build:
Expand All @@ -54,6 +55,12 @@ services:
- ENT_SEARCH_DEFAULT_PASSWORD=password
- secret_management.encryption_keys=[changeme]
- allow_es_settings_modification=true
- ent_search.ssl.enabled=${SECURE_INTEGRATION:-false}
- ent_search.ssl.keystore.path=/certificates/root_keystore.jks
- ent_search.ssl.keystore.password=changeme
volumes:
- ../spec/fixtures/certificates:/certificates
ports:
- 3002:3002

- "3002:3002"
depends_on:
- elasticsearch
11 changes: 10 additions & 1 deletion .ci/logstash-run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@
set -ex

export USER='logstash'

source .ci/retrieve_app_search_credentials.sh

bundle exec rspec spec && bundle exec rspec spec --tag integration
bundle exec rspec --format=documentation spec/unit --tag ~integration:true --tag ~secure_integration:true

if [[ "$SECURE_INTEGRATION" == "true" ]]; then
extra_tag_args=" --tag secure_integration:true"
else
extra_tag_args="--tag ~secure_integration:true"
fi

bundle exec rspec --format=documentation --tag integration $extra_tag_args spec/integration
16 changes: 12 additions & 4 deletions .ci/retrieve_app_search_credentials.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#!/bin/bash

function wait_for_enterprise_search {
url="${1?url is required}"
local continue=1
set +e
while [ $continue -gt 0 ]; do
curl --connect-timeout 5 --max-time 10 --retry 10 --retry-delay 30 --retry-max-time 120 -s -o /dev/null ${ENTERPRISE_SEARCH_URL}/login
curl --connect-timeout 5 --max-time 10 --retry 10 --retry-delay 30 --retry-max-time 120 -s -o /dev/null --insecure ${url}/login
continue=$?
if [ $continue -gt 0 ]; then
sleep 1
Expand All @@ -13,14 +15,20 @@ function wait_for_enterprise_search {

function load_api_keys {
local CREDENTIALS_URL="${ENTERPRISE_SEARCH_URL}/as/credentials/collection?page%5Bcurrent%5D=1"
echo $(curl -u${ENTERPRISE_SEARCH_USERNAME}:${ENTERPRISE_SEARCH_PASSWORD} -s ${CREDENTIALS_URL} | sed -E "s/.*(${1}-[[:alnum:]]{24}).*/\1/")
echo $(curl -u${ENTERPRISE_SEARCH_USERNAME}:${ENTERPRISE_SEARCH_PASSWORD} -s ${CREDENTIALS_URL} --insecure | sed -E "s/.*(${1}-[[:alnum:]]{24}).*/\1/")
}

export ENTERPRISE_SEARCH_USERNAME=${ENTERPRISE_SEARCH_USERNAME:-"enterprise_search"}
export ENTERPRISE_SEARCH_PASSWORD=${ENTERPRISE_SEARCH_PASSWORD:-"password"}
export ENTERPRISE_SEARCH_URL=${ENTERPRISE_SEARCH_URL:-"http://enterprise_search:3002"}
export SECURE_INTEGRATION=${SECURE_INTEGRATION:-false}

if [[ "$SECURE_INTEGRATION" == "true" ]]; then
export ENTERPRISE_SEARCH_URL=${ENTERPRISE_SEARCH_URL:-"https://enterprise_search:3002"}
else
export ENTERPRISE_SEARCH_URL=${ENTERPRISE_SEARCH_URL:-"http://enterprise_search:3002"}
fi

wait_for_enterprise_search
wait_for_enterprise_search $ENTERPRISE_SEARCH_URL

export APP_SEARCH_PRIVATE_KEY=`load_api_keys private`
export APP_SEARCH_SEARCH_KEY=`load_api_keys search`
Expand Down
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,10 @@ import:

env:
- DISTRIBUTION=default ELASTIC_STACK_VERSION=7.x
- DISTRIBUTION=default ELASTIC_STACK_VERSION=7.x SECURE_INTEGRATION=true
- DISTRIBUTION=default ELASTIC_STACK_VERSION=7.x SNAPSHOT=true
- DISTRIBUTION=default ELASTIC_STACK_VERSION=8.x SNAPSHOT=true
- DISTRIBUTION=default ELASTIC_STACK_VERSION=7.x SNAPSHOT=true SECURE_INTEGRATION=true
- DISTRIBUTION=default ELASTIC_STACK_VERSION=8.x
- DISTRIBUTION=default ELASTIC_STACK_VERSION=8.x SECURE_INTEGRATION=true
- DISTRIBUTION=default ELASTIC_STACK_VERSION=8.x SNAPSHOT=true
- DISTRIBUTION=default ELASTIC_STACK_VERSION=8.x SECURE_INTEGRATION=true
123 changes: 44 additions & 79 deletions lib/logstash/outputs/elastic_app_search.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# encoding: utf-8
require "logstash/outputs/base"
require "elastic-app-search"
require "elastic-enterprise-search"
require 'logstash/plugin_mixins/deprecation_logger_support'
require 'logstash/outputs/base'

class LogStash::Outputs::ElasticAppSearch < LogStash::Outputs::Base
include LogStash::PluginMixins::DeprecationLoggerSupport

config_name "elastic_app_search"
require 'logstash/plugin_mixins/enterprise_search/ssl_configs'
require 'logstash/plugin_mixins/enterprise_search/client'

include LogStash::PluginMixins::EnterpriseSearch::SSLConfigs

config_name 'elastic_app_search'

# The name of the search engine you created in App Search, an information
# repository that includes the indexed document records.
Expand All @@ -21,16 +22,11 @@ class LogStash::Outputs::ElasticAppSearch < LogStash::Outputs::Base
# or, if the field is missing from the event and cannot be resolved at all.
config :engine, :validate => :string, :required => true

# The hostname of the App Search API that is associated with your App Search account.
# Set this when using the https://www.elastic.co/cloud/app-search-service
config :host, :validate => :string

# The value of the API endpoint in the form of a URL. Note: The value of the of the `path` setting will be will be appended to this URL.
# Set this when using the https://www.elastic.co/downloads/app-search
config :url, :validate => :string
# The value of the API endpoint in the form of a URL.
config :url, :validate => :string, :required => true, :default => Elastic::EnterpriseSearch::Utils::DEFAULT_HOST

# The private API Key with write permissions. Visit the https://app.swiftype.com/as/credentials
# in the App Search dashboard to find the key associated with your account.
# The private API Key with write permissions.
# https://www.elastic.co/guide/en/app-search/current/authentication.html#authentication-api-keys
config :api_key, :validate => :password, :required => true

# Where to move the value from the `@timestamp` field.
Expand All @@ -46,76 +42,61 @@ class LogStash::Outputs::ElasticAppSearch < LogStash::Outputs::Base
# like `myapp-%{sequence_id}`. Reusing ids will cause documents to be rewritten.
config :document_id, :validate => :string

# The path that is appended to the `url` parameter when connecting to a https://www.elastic.co/downloads/app-search
config :path, :validate => :string, :default => "/api/as/v1/"
ENGINE_WITH_SPRINTF_REGEX = /^.*%\{.+\}.*$/.freeze

ENGINE_WITH_SPRINTF_REGEX = /^.*%\{.+\}.*$/

public
def register
@use_old_client = false
if @host.nil? && @url.nil?
raise ::LogStash::ConfigurationError.new("Please specify either \"url\" (for self-managed) or \"host\" (for SaaS).")
elsif @host && @url
raise ::LogStash::ConfigurationError.new("Both \"url\" or \"host\" can't be set simultaneously. Please specify either \"url\" (for self-managed ot Elastic Enterprise Search) or \"host\" (for SaaS).")
elsif @host && path_is_set? # because path has a default value we need extra work to if the user set it
raise ::LogStash::ConfigurationError.new("The setting \"path\" is not compatible with \"host\". Use \"path\" only with \"url\".")
elsif @host
@deprecation_logger.deprecated("Deprecated service usage, the `host` setting will be removed when Swiftype AppSearch service is shutdown")
@use_old_client = true
@client = Elastic::AppSearch::Client.new(:host_identifier => @host, :api_key => @api_key.value)
elsif @url
if path_is_set?
@deprecation_logger.deprecated("Deprecated service usage, the `path` setting will be removed when Swiftype AppSearch service is shutdown")
@use_old_client = true
@client = Elastic::AppSearch::Client.new(:api_endpoint => @url + @path, :api_key => @api_key.value)
else
@client = Elastic::EnterpriseSearch::AppSearch::Client.new(:host => @url, :http_auth => @api_key.value, :external_url => @url)
end
end
check_connection! unless @engine =~ ENGINE_WITH_SPRINTF_REGEX
@retry_disabled = false
@client = LogStash::PluginMixins::EnterpriseSearch::AppSearch::Client.new(client_options, params: params)
check_connection!
rescue => e
if e.message =~ /401/
raise ::LogStash::ConfigurationError.new("Failed to connect to App Search. Error: 401. Please check your credentials")
raise LogStash::ConfigurationError, "Failed to connect to App Search. Please check your credentials. Error: #{e.message}"
elsif e.message =~ /404/
raise ::LogStash::ConfigurationError.new("Failed to connect to App Search. Error: 404. Please check if host '#{@host}' is correct and you've created an engine with name '#{@engine}'")
raise LogStash::ConfigurationError, "Failed to connect to App Search. Please check if url '#{@url}' is correct and you've created an engine with name '#{@engine}'. Error: #{e.message}"
else
raise ::LogStash::ConfigurationError.new("Failed to connect to App Search. #{e.message}")
raise LogStash::ConfigurationError, "Failed to connect to App Search. Error: #{e.message}"
end
end

public
def multi_receive(events)
# because App Search has a limit of 100 documents per bulk
events.each_slice(100) do |events|
batch = format_batch(events)
if @logger.trace?
@logger.trace("Sending bulk to App Search", :size => batch.size, :data => batch.inspect)
@logger.trace('Sending bulk to App Search', :size => batch.size, :data => batch.inspect)
end
index(batch)
end
end

private

def client_options
options = { :host => @url, :http_auth => @api_key.value }
options[:logger] = @logger if @logger.debug?
options[:tracer] = @logger if @logger.trace?
options
end

def format_batch(events)
docs_for_engine = {}
events.each do |event|
doc = event.to_hash
# we need to remove default fields that start with "@"
# since Elastic App Search doesn't accept them
if @timestamp_destination
doc[@timestamp_destination] = doc.delete("@timestamp")
doc[@timestamp_destination] = doc.delete('@timestamp')
else # delete it
doc.delete("@timestamp")
doc.delete('@timestamp')
end
if @document_id
doc["id"] = event.sprintf(@document_id)
doc['id'] = event.sprintf(@document_id)
end
doc.delete("@version")
doc.delete('@version')
resolved_engine = event.sprintf(@engine)
unless docs_for_engine[resolved_engine]
if @logger.debug?
@logger.debug("Creating new engine segment in batch to send", :resolved_engine => resolved_engine)
@logger.debug('Creating new engine segment in batch to send', :resolved_engine => resolved_engine)
end
docs_for_engine[resolved_engine] = []
end
Expand All @@ -130,15 +111,15 @@ def index(batch)
if resolved_engine =~ ENGINE_WITH_SPRINTF_REGEX || resolved_engine =~ /^\s*$/
raise "Cannot resolve engine field name #{@engine} from event"
end
if connected_to_swiftype?
response = @client.index_documents(resolved_engine, documents)
else
response = @client.index_documents(resolved_engine, {:documents => documents})
end

response = @client.index_documents(resolved_engine, { :documents => documents })
report(documents, response)
rescue => e
@logger.error("Failed to execute index operation. Retrying..", :exception => e.class, :reason => e.message,
:resolved_engine => resolved_engine, :backtrace => e.backtrace)
@logger.error('Failed to execute index operation.', :exception => e.class, :reason => e.message,
:resolved_engine => resolved_engine, :backtrace => e.backtrace, :retry => !@retry_disabled)

raise e if @retry_disabled

sleep(1)
retry
end
Expand All @@ -147,33 +128,17 @@ def index(batch)

def report(documents, response)
documents.each_with_index do |document, i|
if connected_to_swiftype?
errors = response[i]["errors"]
else
errors = response.body[i]["errors"]
end
errors = response.body[i]['errors']
if errors.empty?
@logger.trace? && @logger.trace("Document was indexed with no errors", :document => document)
@logger.trace? && @logger.trace('Document was indexed with no errors', :document => document)
else
@logger.warn("Document failed to index. Dropping..", :document => document, :errors => errors.to_a)
@logger.warn('Document failed to index. Dropping..', :document => document, :errors => errors.to_a)
end
end
end

def check_connection!
if connected_to_swiftype?
@client.get_engine(@engine)
else
res = @client.list_engines({:page_size => 1})
raise "Received HTTP error code #{res.status}" unless res.status == 200
end
end

def path_is_set?
original_params.key?("path")
end

def connected_to_swiftype?
@use_old_client
res = @client.list_engines({ 'page[size]': 1 })
raise "Received HTTP error code #{res.status}" unless res.status == 200
end
end

0 comments on commit 57f5fa4

Please sign in to comment.