Skip to content

Commit

Permalink
Support for Humio with Elasticsearch Bulk API
Browse files Browse the repository at this point in the history
  • Loading branch information
norrs committed Nov 18, 2021
1 parent ad85f73 commit 7e836f7
Show file tree
Hide file tree
Showing 6 changed files with 407 additions and 24 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ LOGEVENTS_LOGGER_COM_EXAMPLE=DEBUG,TRACE@mdc:user=superuser|admin|tester
* [Display logs on a web dashboard](https://jhannes.github.io/logevents/apidocs/org/logevents/observers/WebLogEventObserver.html)
* [Elasticsearch](https://jhannes.github.io/logevents/apidocs/org/logevents/observers/ElasticsearchLogEventObserver.html)
. Logging directly to Elastic search Index API avoids edge cases when writing and parsing log files
* [Humio](https://jhannes.github.io/logevents/apidocs/org/logevents/observers/HumioLogEventObserver.html)
. Logging directly to Humio via their [Elastic search Bulk API](https://library.humio.com/stable/docs/ingesting-data/log-shippers/other-log-shippers/#elasticsearch-bulk-api).
* [Azure Application Insights](https://jhannes.github.io/logevents/apidocs/org/logevents/extend/azure/ApplicationInsightsLogEventObserver.html) (
requires optional com.microsoft.azure:applicationinsights-core dependency)
* [JMX integration](https://jhannes.github.io/logevents/apidocs/org/logevents/jmx/LogEventsMBeanFactory.html) to view
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.logevents.observers;

import static org.logevents.util.NetUtils.NO_AUTHORIZATION_HEADER;

import org.logevents.LogEvent;
import org.logevents.config.Configuration;
import org.logevents.formatting.JsonLogEventFormatter;
Expand All @@ -12,6 +14,8 @@

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -27,37 +31,74 @@
* <pre>
* observer.elastic=ElasticSearchLogEventObserver
* observer.elastic.elasticsearchUrl=http://localhost:9200
* observer.elastic.elasticsearchUrlPath=_bulk
* observer.elastic.elasticsearchAuthorizationHeader=repositoryId injectApiToken
* observer.elastic.index=my-test-index
* observer.elastic.idleThreshold=PT2S
* observer.elastic.cooldownTime=PT1S
* observer.elastic.maximumWaitTime=PT30S
* observer.elastic.suppressMarkers=PERSONAL_DATA
* observer.elastic.formatter.excludedMdcKeys=secret
* </pre>
*
* <h3>Elasticsearch configuration</h3>
*
* <ul>
* <li><code>elasticsearchUrl</code> should point to where the elasticsearch API lives. It should contain an URI scheme and authority.</li>
* <li><code>elasticsearchUrlPath</code> should point to the http path where the Elasticsearch Bulk API lives.</li>
* </ul>
*
* <h4>Authorization</h4>
*
* The configurable <code>elasitcsearchAuthorizationHeader</code> is the value the client will include as
* Authorization header
* when communicating with <code>elasticsearchUrl</code>. It is not to be confused by Basic authentication. If you
* need basic authentication you need to remember to provide its configuration value as '<code>Basic
* base64encodedValueHere</code>'.
*
* @see <a href="https://datatracker.ietf.org/doc/html/rfc3986#section-3">RFC3986 #section3 about URI syntax</a>
*/
public class ElasticsearchLogEventObserver extends AbstractBatchingLogEventObserver {

private static final String DEFAULT_ELASTICSEARCH_BULK_API_PATH = "_bulk";
private static final String APPLICATION_X_NDJSON = "application/x-ndjson";
private final URL elasticsearchUrl;
private final String elasticsearchUrlPath;
private final String elasticsearchAuthorizationHeaderValue;
private final String index;
private final JsonLogEventFormatter formatter;

private Proxy proxy = Proxy.NO_PROXY;

public ElasticsearchLogEventObserver(Map<String, String> properties, String prefix) {
this(new Configuration(properties, prefix));
}

public ElasticsearchLogEventObserver(Configuration configuration) {
this.elasticsearchUrl = configuration.getUrl("elasticsearchUrl");
this.elasticsearchUrlPath = configuration.optionalString("elasticsearchUrlPath").orElse(getDefaultPath());
this.elasticsearchAuthorizationHeaderValue = configuration.optionalString("elasticsearchAuthorizationHeader").orElse(NO_AUTHORIZATION_HEADER);
this.index = configuration.getString("index");
this.formatter = configuration.createInstanceWithDefault("formatter", JsonLogEventFormatter.class);
this.configureBatching(configuration);
this.configureFilter(configuration, Level.TRACE);
this.configureMarkers(configuration);
this.configureProxy(configuration);
configuration.checkForUnknownFields();
}

public ElasticsearchLogEventObserver(URL elasticsearchUrl, String index) {
this.elasticsearchUrl = elasticsearchUrl;
this.index = index;
this.formatter = new JsonLogEventFormatter();
protected String getDefaultPath() {
return DEFAULT_ELASTICSEARCH_BULK_API_PATH;
}


public void configureProxy(Configuration configuration) {
configuration.optionalString("proxy").ifPresent(proxyHost -> {
int colonPos = proxyHost.lastIndexOf(':');
String hostname = colonPos != -1 ? proxyHost.substring(0, colonPos) : proxyHost;
int proxyPort = colonPos != -1 ? Integer.parseInt(proxyHost.substring(colonPos+1)) : 80;
this.proxy = new Proxy(Proxy.Type.HTTP, InetSocketAddress.createUnresolved(hostname, proxyPort));
});
}

@Override
Expand Down Expand Up @@ -95,12 +136,18 @@ List<String> indexDocuments(Iterable<LogEvent> logEvents) throws IOException {
}
jsons.add("");

URL url = new URL(this.elasticsearchUrl, "_bulk");
HttpURLConnection connection = NetUtils.post(url,
String.join("\n", jsons), "application/x-ndjson");
URL url = new URL(elasticsearchUrl, elasticsearchUrlPath);
HttpURLConnection connection = NetUtils.post(
url,
String.join("\n", jsons),
APPLICATION_X_NDJSON,
proxy,
elasticsearchAuthorizationHeaderValue);

Map<String, Object> response = JsonParser.parseObject(connection);
return parseBulkApiResponse(JsonParser.parseObject(connection));
}

protected List<String> parseBulkApiResponse(Map<String, Object> response) throws IOException {
List<Map<String, Object>> items = JsonUtil.getObjectList(response, "items");
return items.stream()
.map(o -> JsonUtil.getObject(o, "index"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.logevents.observers;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.logevents.util.JsonUtil;

/**
* Publishes asynchronously to Humio
* <a href="https://library.humio.com/stable/docs/ingesting-data/log-shippers/other-log-shippers/#elasticsearch-bulk-api">Elasticsearch Bulk API</a>
*
* <h3>Sample configuration</h3>
* <pre>
* observer.humio=HumioLogEventObserver
* observer.humio.elasticsearchUrl=http://localhost:9200
* observer.humio.elasticsearchUrlPath=api/v1/ingest/elastic-bulk
* observer.humio.elasticsearchAuthorizationHeader=repositoryId injectApiToken
* observer.humio.index=my-test-index
* observer.humio.idleThreshold=PT2S
* observer.humio.cooldownTime=PT1S
* observer.humio.maximumWaitTime=PT30S
* observer.humio.suppressMarkers=PERSONAL_DATA
* observer.humio.formatter.excludedMdcKeys=secret
* </pre>
*
* <h3>Elasticsearch configuration</h3>
*
* <ul>
* <li><code>elasticsearchUrl</code> should point to where the elasticsearch API lives. It should contain an URI scheme and authority.</li>
* <li><code>elasticsearchUrlPath</code> should point to the http path where the Elasticsearch Bulk API lives.</li>
* </ul>
*
* <h4>Authorization</h4>
*
* The configurable <code>elasitcsearchAuthorizationHeader</code> is the value the client will include as
* Authorization header
* when communicating with <code>elasticsearchUrl</code>. It is not to be confused by Basic authentication. If you
* need basic authentication you need to remember to provide its configuration value as '<code>Basic
* base64encodedValueHere</code>'.
*
* @see <a href="https://datatracker.ietf.org/doc/html/rfc3986#section-3">RFC3986 #section3 about URI syntax</a>
*/
public class HumioLogEventObserver extends ElasticsearchLogEventObserver {

private static final String DEFAULT_HUMIO_BULK_API_PATH = "api/v1/ingest/elastic-bulk";
private static final List<String> EMPTY_LIST_SINCE_HUMIO_API_HAS_NO_SANE_BULK_API_RESPONSE_CONTAINING_DOCUMENT_IDS_FROM_INSERT =
Collections.emptyList();

public HumioLogEventObserver(Map<String, String> properties, String prefix) {
super(properties, prefix);
}

@Override
protected String getDefaultPath() {
return DEFAULT_HUMIO_BULK_API_PATH;
}

@Override
protected List<String> parseBulkApiResponse(Map<String, Object> response) throws IOException {
boolean isAnyApiErrors = Boolean.parseBoolean(String.valueOf(JsonUtil.getField(response, "errors")));
if (isAnyApiErrors) {
List<Map<String, Object>> items = JsonUtil.getObjectList(response, "items");
long numberOfFailedMessages = items.stream()
.filter(o -> !String.valueOf(JsonUtil.getField(JsonUtil.getObject(o, "create"), "status")).equals("200"))
.count();

throw new IOException("Failed sending "+ numberOfFailedMessages + " out of " + items.size() + " entries");
}
return EMPTY_LIST_SINCE_HUMIO_API_HAS_NO_SANE_BULK_API_RESPONSE_CONTAINING_DOCUMENT_IDS_FROM_INSERT;
}
}
15 changes: 12 additions & 3 deletions logevents/src/main/java/org/logevents/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,33 @@
import java.util.stream.Collectors;

public class NetUtils {
public static final String NO_AUTHORIZATION_HEADER = null;


public static HttpURLConnection post(URL url, String contentBody, String contentType) throws IOException {
return post(url, contentBody, contentType, Proxy.NO_PROXY);
return post(url, contentBody, contentType, Proxy.NO_PROXY, NO_AUTHORIZATION_HEADER);
}

public static HttpURLConnection post(URL url, String contentBody, String contentType, Proxy proxy) throws IOException {
public static HttpURLConnection post(URL url, String contentBody, String contentType, Proxy proxy, String authorizationHeaderValue) throws IOException {
HttpURLConnection connection = (HttpURLConnection) url.openConnection(proxy);
connection.setRequestMethod("POST");
connection.setDoInput(true);
connection.setDoOutput(true);
connection.setRequestProperty("Content-Type", contentType);
if (authorizationHeaderValue != null && !authorizationHeaderValue.trim().isEmpty()) {
connection.setRequestProperty("Authorization", authorizationHeaderValue);
}
connection.getOutputStream().write(contentBody.getBytes());
connection.getOutputStream().flush();
return connection;
}

public static String postJson(URL url, String json, Proxy proxy) throws IOException {
HttpURLConnection connection = post(url, json, "application/json", proxy);
return postJson(url, json, proxy, NO_AUTHORIZATION_HEADER);
}

public static String postJson(URL url, String json, Proxy proxy, String authorizationHeaderValue) throws IOException {
HttpURLConnection connection = post(url, json, "application/json", proxy, authorizationHeaderValue);

int statusCode = connection.getResponseCode();
if (statusCode >= 400) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.logevents.observers;

import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -19,7 +17,6 @@
import java.io.IOException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Duration;
import java.time.ZonedDateTime;
Expand All @@ -29,20 +26,18 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ElasticsearchLogEventObserverTest {

private ElasticsearchLogEventObserver observer = new ElasticsearchLogEventObserver(toURL("http://localhost:9200"), "logevents-unit-test");
private ElasticsearchLogEventObserver observer = new ElasticsearchLogEventObserver(defaultConfigurationMap(), "observer.elasticsearch");

private URL toURL(String s) {
try {
return new URL(s);
} catch (MalformedURLException e) {
throw ExceptionUtil.softenException(e);
}
private Map<String, String> defaultConfigurationMap() {
Map<String, String> config = new HashMap<>();
config.put("observer.elasticsearch.elasticsearchUrl", "http://localhost:9200");
config.put("observer.elasticsearch.index", "logevents-unit-test");
return config;
}

@Test
Expand Down Expand Up @@ -82,8 +77,8 @@ public void shouldWriteExceptions() {

assertEquals(payload.get("exception.class"), event.getThrowable().getClass().getName());
assertEquals(payload.get("exception.message"), event.getThrowable().getMessage());
MatcherAssert.assertThat(payload.get("exception.stackTrace").toString(),
containsString("at org.logeventsdemo.internal.MyClassName.internalMethod(MyClassName.java:311)"));
assertContains("at org.logeventsdemo.internal.MyClassName.internalMethod(MyClassName.java:311)",
payload.get("exception.stackTrace").toString());
}

@Test
Expand Down Expand Up @@ -148,4 +143,9 @@ void verifyElasticsearchConnection() throws IOException {
}
}

private void assertContains(String expected, String actual) {
assertTrue("Expected <" + actual + "> to contain <" + expected + ">",
actual.contains(expected));
}

}

0 comments on commit 7e836f7

Please sign in to comment.