Permalink
Browse files

Adds Elasticsearch 5.x support via storage type: elasticsearch-http (#…

…1403)

This allows use of Elasticsearch 5.x and its notable ingest pipeline
feature when using `zipkin-storage-elasticsearch-http`.

It is important to note that `zipkin-storage-elasticsearch` remains
pinned to ES 2.x libraries as they are compile incompatible with 5.x.
In other words, you must use http if you want to use ES 5 (for now).

Version detection is implemented in order to choose the correct index
template format for the major version number. It is implemented with
string manipulation, as that was less work than making a new type.

This adds a new parameter `ES_PIPELINE` which allows you to manipulate
the json sent by Zipkin collector before it is indexed. This could be
used for many things including cleaning service names or adding ingest
timestamps.

Integration tests run version 2.x on CircleCI and 5.x on Travis
  • Loading branch information...
1 parent 43a0396 commit 1c39f51ede84a2895449a396a15eb460152e106f @adriancole adriancole committed on GitHub Nov 15, 2016
Showing with 461 additions and 83 deletions.
  1. +3 −3 .gitignore
  2. +2 −2 .travis.yml
  3. +2 −0 README.md
  4. +2 −2 pom.xml
  5. +4 −2 ...kin/autoconfigure/storage/elasticsearch/http/ZipkinElasticsearchHttpStorageAutoConfiguration.java
  6. +21 −6 ...sticsearch → storage/elasticsearch/http}/ZipkinElasticsearchHttpStorageAutoConfigurationTest.java
  7. +3 −1 zipkin-server/README.md
  8. +1 −0 zipkin-server/src/main/resources/zipkin-server.yml
  9. +26 −1 zipkin-storage/elasticsearch-http/README.md
  10. +16 −1 .../elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/CallbackListenableFuture.java
  11. +8 −4 ...n-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkIndexer.java
  12. +4 −2 ...orage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpBulkSpanIndexer.java
  13. +18 −0 zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpClient.java
  14. +10 −5 ...storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/HttpClientBuilder.java
  15. +17 −0 zipkin-storage/elasticsearch-http/src/test/java/zipkin/moshi/JsonReadersTest.java
  16. +6 −11 .../elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/ElasticsearchSpanConsumerTest.java
  17. +94 −0 ...sticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/CallbackListenableFutureTest.java
  18. +80 −1 ...in-storage/elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpClientTest.java
  19. +1 −1 ...lasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/HttpElasticsearchTestGraph.java
  20. +6 −2 ...storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/InternalElasticsearchClient.java
  21. +18 −5 zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/LazyClient.java
  22. +25 −3 zipkin-storage/elasticsearch/src/main/java/zipkin/storage/elasticsearch/NativeClient.java
  23. +2 −4 zipkin-storage/elasticsearch/src/main/resources/zipkin/storage/elasticsearch/zipkin_template.json
  24. +6 −23 ...orage/elasticsearch/src/test/java/zipkin/storage/elasticsearch/ElasticsearchSpanConsumerTest.java
  25. +1 −1 zipkin-storage/elasticsearch/src/test/java/zipkin/storage/elasticsearch/ElasticsearchTestGraph.java
  26. +49 −3 zipkin-storage/elasticsearch/src/test/java/zipkin/storage/elasticsearch/LazyClientTest.java
  27. +36 −0 zipkin-storage/elasticsearch/src/test/java/zipkin/storage/elasticsearch/NativeClientTest.java
View
@@ -14,8 +14,8 @@ _site/
*.iml
*.swp
# .travis.yml installs things
-apache-cassandra-*
-elasticsearch-2*
-kafka_*
+/apache-cassandra-*
+/elasticsearch-*
+/kafka_*
# temporary directory used by travis/publish.sh for building gh-pages
/javadoc-builddir
View
@@ -40,8 +40,8 @@ before_install:
- git config credential.helper "store --file=.git/credentials"
- echo "https://$GH_TOKEN:@github.com" > .git/credentials
- # Manually install elasticsearch until https://github.com/travis-ci/apt-source-whitelist/issues/190
- - curl -SL https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.1/elasticsearch-2.2.1.tar.gz | tar xz
+ # Manually install elasticsearch 5 (since 2.x is default in travis)
+ - curl -SL https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.0.0.tar.gz | tar xz
- elasticsearch-*/bin/elasticsearch -d > /dev/null
# Manually install and run zk+kafka as it isn't an available service
View
@@ -73,6 +73,8 @@ The [CassandraStorage](zipkin-storage/cassandra) component is tested against Cas
### Elasticsearch
The [ElasticsearchStorage](zipkin-storage/elasticsearch) component is tested against Elasticsearch 2.3. It stores spans as json and has been designed for larger scale. This store requires a [spark job](https://github.com/openzipkin/zipkin-dependencies) to aggregate dependency links.
+Note: The storage type `elasticsearch-http` supports both 2.x and 5.x versions of Elasticsearch.
+
## Running the server from source
The [zipkin server](zipkin-server)
receives spans via HTTP POST and responds to queries from its UI. It can also run collectors, such as Scribe or Kafka.
View
@@ -61,7 +61,7 @@
-->
<mariadb-java-client.version>1.4.6</mariadb-java-client.version>
<HikariCP.version>2.4.7</HikariCP.version>
- <elasticsearch.version>2.4.0</elasticsearch.version>
+ <elasticsearch.version>2.4.1</elasticsearch.version>
<slf4j.version>1.7.21</slf4j.version>
<logback.version>1.1.7</logback.version>
<!-- be careful to not eagerly update as we can break other storage or transports!
@@ -539,7 +539,7 @@
<exclude>LICENSE</exclude>
<exclude>**/*.md</exclude>
<exclude>apache-cassandra-*/**</exclude>
- <exclude>elasticsearch-2*/**</exclude>
+ <exclude>elasticsearch-*/**</exclude>
<exclude>kafka_*/**</exclude>
<exclude>src/test/resources/**</exclude>
<exclude>src/main/resources/**</exclude>
@@ -15,6 +15,7 @@
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
@@ -34,8 +35,9 @@
@Bean
@ConditionalOnMissingBean
InternalElasticsearchClient.Builder clientBuilder(
- @Qualifier("zipkinElasticsearchHttp") OkHttpClient client) {
- return HttpClientBuilder.create(client);
+ @Qualifier("zipkinElasticsearchHttp") OkHttpClient client,
+ @Value("${zipkin.storage.elasticsearch.pipeline:}") String pipeline) {
+ return HttpClientBuilder.create(client).pipeline(pipeline.isEmpty() ? null : pipeline);
}
/** cheap check to see if we are likely to include urls */
@@ -11,7 +11,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-package zipkin.autoconfigure.storage.elasticsearch;
+package zipkin.storage.elasticsearch.http;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
@@ -27,7 +27,6 @@
import org.springframework.context.annotation.Configuration;
import zipkin.autoconfigure.storage.elasticsearch.http.ZipkinElasticsearchHttpStorageAutoConfiguration;
import zipkin.autoconfigure.storage.elasticsearch.http.ZipkinElasticsearchOkHttpAutoConfiguration;
-import zipkin.storage.elasticsearch.InternalElasticsearchClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.util.EnvironmentTestUtils.addEnvironment;
@@ -55,7 +54,7 @@ public void doesntProvideClientBuilder_whenStorageTypeNotElasticsearch() {
context.refresh();
thrown.expect(NoSuchBeanDefinitionException.class);
- context.getBean(InternalElasticsearchClient.Builder.class);
+ context.getBean(HttpClientBuilder.class);
}
@Test
@@ -70,7 +69,24 @@ public void providesClientBuilder_whenStorageTypeElasticsearchAndHostsAreUrls()
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();
- assertThat(context.getBean(InternalElasticsearchClient.Builder.class)).isNotNull();
+ assertThat(context.getBean(HttpClientBuilder.class)).isNotNull();
+ }
+
+ @Test
+ public void configuresPipeline() {
+ context = new AnnotationConfigApplicationContext();
+ addEnvironment(context,
+ "zipkin.storage.type:elasticsearch",
+ "zipkin.storage.elasticsearch.hosts:http://host1:9200",
+ "zipkin.storage.elasticsearch.pipeline:zipkin"
+ );
+ context.register(PropertyPlaceholderAutoConfiguration.class,
+ ZipkinElasticsearchOkHttpAutoConfiguration.class,
+ ZipkinElasticsearchHttpStorageAutoConfiguration.class);
+ context.refresh();
+
+ assertThat(context.getBean(HttpClientBuilder.class).pipeline)
+ .isEqualTo("zipkin");
}
@Test
@@ -82,7 +98,7 @@ public void doesntProvideClientBuilder_whenStorageTypeElasticsearchAndHostsNotUr
context.refresh();
thrown.expect(NoSuchBeanDefinitionException.class);
- context.getBean(InternalElasticsearchClient.Builder.class);
+ context.getBean(HttpClientBuilder.class);
}
@Configuration
@@ -111,7 +127,6 @@ public void usesInterceptorsQualifiedWith_zipkinElasticsearchHttp() {
);
context.register(PropertyPlaceholderAutoConfiguration.class,
ZipkinElasticsearchOkHttpAutoConfiguration.class,
- ZipkinElasticsearchHttpStorageAutoConfiguration.class,
InterceptorConfiguration.class);
context.refresh();
@@ -168,7 +168,9 @@ The following apply when `STORAGE_TYPE` is set to `elasticsearch`:
https://search-domain-xyzzy.us-west-2.es.amazonaws.com) then Zipkin will attempt to
use the default AWS credential provider (env variables, system properties, config
files, or ec2 profiles) to sign outbound requests to the cluster.
- * `ES_AWS_DOMAIN`: The name of the AWS-hosted elasticsearch domain to use. Supersedes any set
+ * `ES_PIPELINE`: Only valid when the destination is Elasticsearch 5.x. Indicates the ingest
+ pipeline used before spans are indexed. No default.
+ * `ES_AWS_DOMAIN`: The name of the AWS-hosted elasticsearch domain to use. Supersedes any set
`ES_HOSTS`. Triggers the same request signing behavior as with `ES_HOSTS`, but
requires the additional IAM permission to describe the given domain.
* `ES_AWS_REGION`: An optional override to the default region lookup to search for the domain
@@ -81,6 +81,7 @@ zipkin:
cluster: ${ES_CLUSTER:elasticsearch}
# host is left unset intentionally, to defer the decision
hosts: ${ES_HOSTS:}
+ pipeline: ${ES_PIPELINE:}
aws:
domain: ${ES_AWS_DOMAIN:}
region: ${ES_AWS_REGION:}
@@ -2,6 +2,31 @@
This is a plugin to the Elasticsearch storage component, which uses
HTTP by way of [OkHttp 3](https://github.com/square/okhttp) and
-[Moshi](https://github.com/square/moshi).
+[Moshi](https://github.com/square/moshi). This currently supports both
+2.x and 5.x version families.
See [storage-elasticsearch](../elasticsearch) for more details.
+
+## Customizing the ingest pipeline
+
+When using Elasticsearch 5.x, you can set up an [ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html)
+to perform custom processing.
+
+Here's an example, which you'd set up prior to configuring Zipkin to use
+it via `zipkin.storage.elasticsearch.http.HttpClientBuilder.pipeline`
+
+
+```
+PUT _ingest/pipeline/zipkin
+{
+ "description" : "add collector_timestamp_millis",
+ "processors" : [
+ {
+ "set" : {
+ "field": "collector_timestamp_millis",
+ "value": "{{_ingest.timestamp}}"
+ }
+ }
+ ]
+}
+```
@@ -40,13 +40,28 @@
call.cancel();
}
- @Override public void onResponse(Call call, Response response) throws IOException {
+ /** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */
+ @Override public void onResponse(Call call, Response response) {
try (ResponseBody responseBody = response.body()) {
if (response.isSuccessful()) {
set(convert(responseBody));
} else {
setException(new IllegalStateException("response failed: " + response));
}
+ } catch (Throwable t) {
+ propagateIfFatal(t);
+ setException(t);
+ }
+ }
+
+ // Taken from RxJava, which was taken from scala
+ static void propagateIfFatal(Throwable t) {
+ if (t instanceof VirtualMachineError) {
+ throw (VirtualMachineError) t;
+ } else if (t instanceof ThreadDeath) {
+ throw (ThreadDeath) t;
+ } else if (t instanceof LinkageError) {
+ throw (LinkageError) t;
}
}
@@ -20,6 +20,7 @@
import java.util.LinkedHashSet;
import java.util.Set;
import okhttp3.Call;
+import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
@@ -72,11 +73,14 @@ void writeDocument(T object) throws IOException {
/** Creates a bulk request when there is more than one object to store */
public ListenableFuture<Void> execute() throws IOException { // public to allow interface retrofit
- Request.Builder request = new Request.Builder().url(client.baseUrl.resolve("/_bulk")).tag(tag);
- request.post(RequestBody.create(APPLICATION_JSON, body.readByteString()));
- Call post = client.http.newCall(request.build());
+ HttpUrl url = client.pipeline != null
+ ? client.baseUrl.newBuilder("_bulk").addQueryParameter("pipeline", client.pipeline).build()
+ : client.baseUrl.resolve("_bulk");
- return new CallbackListenableFuture<Void>(post) {
+ Request request = new Request.Builder().url(url).tag(tag)
+ .post(RequestBody.create(APPLICATION_JSON, body.readByteString())).build();
+
+ return new CallbackListenableFuture<Void>(client.http.newCall(request)) {
@Override Void convert(ResponseBody responseBody) throws IOException {
if (!indices.isEmpty()) {
client.flush(Joiner.on(',').join(indices));
@@ -27,17 +27,19 @@
super(delegate, spanType);
}
- @Override public void add(String index, Span span, Long timestampMillis) throws IOException {
+ @Override
+ public HttpBulkSpanIndexer add(String index, Span span, Long timestampMillis) throws IOException {
String id = null; // Allow ES to choose an ID
if (timestampMillis == null) {
super.add(index, span, id);
- return;
+ return this;
}
writeIndexMetadata(index, id);
body.write(toSpanBytes(span, timestampMillis));
body.writeByte('\n');
if (client.flushOnWrites) indices.add(index);
+ return this;
}
@Override byte[] toJsonBytes(Span span) {
@@ -59,11 +59,13 @@
final Lazy<List<String>> hosts;
final OkHttpClient client;
final boolean flushOnWrites;
+ final String pipeline;
Factory(HttpClientBuilder builder) {
this.hosts = builder.hosts;
this.client = builder.client;
this.flushOnWrites = builder.flushOnWrites;
+ this.pipeline = builder.pipeline;
}
@Override public InternalElasticsearchClient create(String allIndices) {
@@ -79,6 +81,7 @@
final OkHttpClient http;
final HttpUrl baseUrl;
final boolean flushOnWrites;
+ final String pipeline;
final String[] allIndices;
@@ -90,9 +93,24 @@
this.baseUrl = HttpUrl.parse(hosts.get(0));
this.http = f.client;
this.flushOnWrites = f.flushOnWrites;
+ this.pipeline = f.pipeline;
this.allIndices = new String[] {allIndices};
}
+ @Override protected String getVersion() throws IOException {
+ Request getNode = new Request.Builder().url(baseUrl).tag("get-node").build();
+
+ try (Response response = http.newCall(getNode).execute()) {
+ if (!response.isSuccessful()) {
+ throw new IllegalStateException(response.body().string());
+ }
+
+ JsonReader version = enterPath(JsonReader.of(response.body().source()), "version", "number");
+ if (version == null) throw new IllegalStateException(".version.number not in response");
+ return version.nextString();
+ }
+ }
+
/**
* This is a blocking call, used inside a lazy. That's because no writes should occur until the
* template is available.
@@ -19,12 +19,12 @@
import zipkin.internal.Lazy;
import zipkin.storage.elasticsearch.InternalElasticsearchClient;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static zipkin.internal.Util.checkNotNull;
public final class HttpClientBuilder extends InternalElasticsearchClient.Builder {
final OkHttpClient client;
Lazy<List<String>> hosts;
- boolean compressionEnabled = true;
+ String pipeline;
boolean flushOnWrites;
public static HttpClientBuilder create(OkHttpClient client) {
@@ -53,9 +53,14 @@ public static HttpClientBuilder create(OkHttpClient client) {
return this;
}
- /** Default true. true implies that spans will be gzipped before transport. */
- public HttpClientBuilder compressionEnabled(boolean compressionEnabled) {
- this.compressionEnabled = compressionEnabled;
+ /**
+ * Only valid when the destination is Elasticsearch 5.x. Indicates the ingest pipeline used before
+ * spans are indexed. No default.
+ *
+ * <p>See https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html
+ */
+ public HttpClientBuilder pipeline(String pipeline) {
+ this.pipeline = pipeline;
return this;
}
@@ -24,6 +24,23 @@
public class JsonReadersTest {
@Test
+ public void enterPath_nested() throws IOException {
+ assertThat(JsonReaders.enterPath(JsonReader.of(new Buffer().writeUtf8("{\n"
+ + " \"name\" : \"Kamal\",\n"
+ + " \"cluster_name\" : \"elasticsearch\",\n"
+ + " \"version\" : {\n"
+ + " \"number\" : \"2.4.0\",\n"
+ + " \"build_hash\" : \"ce9f0c7394dee074091dd1bc4e9469251181fc55\",\n"
+ + " \"build_timestamp\" : \"2016-08-29T09:14:17Z\",\n"
+ + " \"build_snapshot\" : false,\n"
+ + " \"lucene_version\" : \"5.5.2\"\n"
+ + " },\n"
+ + " \"tagline\" : \"You Know, for Search\"\n"
+ + "}")), "version", "number").nextString())
+ .isEqualTo("2.4.0");
+ }
+
+ @Test
public void enterPath_nullOnNoInput() throws IOException {
assertThat(JsonReaders.enterPath(JsonReader.of(new Buffer()), "message"))
.isNull();
Oops, something went wrong.

0 comments on commit 1c39f51

Please sign in to comment.