Skip to content

Commit

Permalink
Merge pull request #155 from spring-cloud/issues_#124_asyncresttemplate
Browse files Browse the repository at this point in the history
[#124] Added support for async rest template
  • Loading branch information
marcingrzejszczak committed Feb 11, 2016
2 parents ee8d7a5 + 8d8fb39 commit a362aa7
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.sleuth.instrument.web.client;

import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.SpanAccessor;
import org.springframework.cloud.sleuth.event.ClientReceivedEvent;
import org.springframework.cloud.sleuth.event.ClientSentEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.http.HttpRequest;
import org.springframework.util.StringUtils;

/**
* Abstraction over classes that interact with Http requests. Allows you
* to enrich the request headers with trace related information.
*
* @author Marcin Grzejszczak
*/
abstract class AbstractTraceHttpRequestInterceptor
implements ApplicationEventPublisherAware {

private ApplicationEventPublisher publisher;
private final SpanAccessor accessor;

protected AbstractTraceHttpRequestInterceptor(SpanAccessor accessor) {
this.accessor = accessor;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}

private void enrichWithTraceHeaders(HttpRequest request, Span span) {
setIdHeader(request, Span.TRACE_ID_NAME, span.getTraceId());
setIdHeader(request, Span.SPAN_ID_NAME, span.getSpanId());
if (!span.isExportable()) {
setHeader(request, Span.NOT_SAMPLED_NAME, "true");
}
setHeader(request, Span.SPAN_NAME_NAME, span.getName().toString());
setIdHeader(request, Span.PARENT_ID_NAME, getParentId(span));
setHeader(request, Span.PROCESS_ID_NAME, span.getProcessId());
}

private Long getParentId(Span span) {
return !span.getParents().isEmpty() ? span.getParents().get(0) : null;
}

protected void doNotSampleThisSpan(HttpRequest request) {
setHeader(request, Span.NOT_SAMPLED_NAME, "true");
}

private void setHeader(HttpRequest request, String name, String value) {
if (StringUtils.hasText(value) && !request.getHeaders().containsKey(name) &&
this.accessor.isTracing()) {
request.getHeaders().add(name, value);
}
}

private void setIdHeader(HttpRequest request, String name, Long value) {
if (value != null) {
setHeader(request, name, Span.toHex(value));
}
}

/**
* Enriches the request with proper headers and publishes
* the client sent event
*/
protected void publishStartEvent(HttpRequest request) {
Span span = currentSpan();
enrichWithTraceHeaders(request, span);
publish(new ClientSentEvent(this, span));
}

/**
* Close the current span and emit the ClientReceivedEvent
*/
public void finish() {
if (!isTracing()) {
return;
}
publish(new ClientReceivedEvent(this, currentSpan()));
}

private void publish(ApplicationEvent event) {
if (this.publisher != null) {
this.publisher.publishEvent(event);
}
}

private Span currentSpan() {
return this.accessor.getCurrentSpan();
}

protected boolean isTracing() {
return this.accessor.isTracing();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.sleuth.instrument.web.client;

import java.io.IOException;
import java.net.URI;

import org.springframework.cloud.sleuth.SpanAccessor;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.AsyncClientHttpRequest;
import org.springframework.http.client.AsyncClientHttpRequestFactory;

/**
* Wrapper that adds trace related headers to the created AsyncClientHttpRequest
*
* @see org.springframework.web.client.RestTemplate
* @see SpanAccessor
*
* @author Marcin Grzejszczak
* @author Spencer Gibb
*/
public class TraceAsyncClientHttpRequestFactoryWrapper extends AbstractTraceHttpRequestInterceptor
implements AsyncClientHttpRequestFactory {

private final AsyncClientHttpRequestFactory delegate;

public TraceAsyncClientHttpRequestFactoryWrapper(SpanAccessor accessor,
AsyncClientHttpRequestFactory delegate) {
super(accessor);
this.delegate = delegate;
}

@Override
public AsyncClientHttpRequest createAsyncRequest(URI uri, HttpMethod httpMethod)
throws IOException {
AsyncClientHttpRequest request = this.delegate.createAsyncRequest(uri, httpMethod);
if (!isTracing()) {
doNotSampleThisSpan(request);
return request;
}
publishStartEvent(request);
return request;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.sleuth.instrument.web.client;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.springframework.cloud.sleuth.Tracer;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

/**
* AsyncListenableTaskExecutor that wraps all Runnable / Callable tasks into
* their trace related representation
*/
public class TraceAsyncListenableTaskExecutor implements AsyncListenableTaskExecutor {

private final AsyncListenableTaskExecutor delegate;
private final Tracer tracer;

TraceAsyncListenableTaskExecutor(AsyncListenableTaskExecutor delegate,
Tracer tracer) {
this.delegate = delegate;
this.tracer = tracer;
}

@Override
public ListenableFuture<?> submitListenable(Runnable task) {
return this.delegate.submitListenable(this.tracer.wrap(task));
}

@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
return this.delegate.submitListenable(this.tracer.wrap(task));
}

@Override
public void execute(Runnable task, long startTimeout) {
this.delegate.execute(this.tracer.wrap(task), startTimeout);
}

@Override
public Future<?> submit(Runnable task) {
return this.delegate.submit(this.tracer.wrap(task));
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return this.delegate.submit(this.tracer.wrap(task));
}

@Override
public void execute(Runnable task) {
this.delegate.execute(this.tracer.wrap(task));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void close() {
this.delegate.close();
}
finally {
this.interceptor.close();
this.interceptor.finish();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2015 the original author or authors.
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,18 +17,11 @@

import java.io.IOException;

import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.SpanAccessor;
import org.springframework.cloud.sleuth.event.ClientReceivedEvent;
import org.springframework.cloud.sleuth.event.ClientSentEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.util.StringUtils;

/**
* Interceptor that verifies whether the trance and span id has been set on the request
Expand All @@ -37,76 +30,26 @@
* @see org.springframework.web.client.RestTemplate
* @see SpanAccessor
*
* @author Marcin Grzejszczak, 4financeIT
* @author Marcin Grzejszczak
* @author Spencer Gibb
*/
public class TraceRestTemplateInterceptor
implements ClientHttpRequestInterceptor, ApplicationEventPublisherAware {

private ApplicationEventPublisher publisher;

private SpanAccessor accessor;
public class TraceRestTemplateInterceptor extends AbstractTraceHttpRequestInterceptor
implements ClientHttpRequestInterceptor {

public TraceRestTemplateInterceptor(SpanAccessor accessor) {
this.accessor = accessor;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
super(accessor);
}

@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body,
ClientHttpRequestExecution execution) throws IOException {
Span span = getCurrentSpan();
if (span == null) {
setHeader(request, Span.NOT_SAMPLED_NAME, "true");
if (!isTracing()) {
doNotSampleThisSpan(request);
return execution.execute(request, body);
}
setHeader(request, Span.TRACE_ID_NAME, span.getTraceId());
setHeader(request, Span.SPAN_ID_NAME, span.getSpanId());
if (!span.isExportable()) {
setHeader(request, Span.NOT_SAMPLED_NAME, "true");
}
setHeader(request, Span.SPAN_NAME_NAME, span.getName().toString());
setHeader(request, Span.PARENT_ID_NAME, getParentId(span));
setHeader(request, Span.PROCESS_ID_NAME, span.getProcessId());
publish(new ClientSentEvent(this, span));
publishStartEvent(request);
return new TraceHttpResponse(this, execution.execute(request, body));
}

public void close() {
if (getCurrentSpan() == null) {
return;
}
publish(new ClientReceivedEvent(this, getCurrentSpan()));
}

private void publish(ApplicationEvent event) {
if (this.publisher != null) {
this.publisher.publishEvent(event);
}
}

private Long getParentId(Span span) {
return !span.getParents().isEmpty() ? span.getParents().get(0) : null;
}

public void setHeader(HttpRequest request, String name, String value) {
if (StringUtils.hasText(value) && !request.getHeaders().containsKey(name) && this.accessor.isTracing()) {
request.getHeaders().add(name, value);
}
}

public void setHeader(HttpRequest request, String name, Long value) {
if (value != null) {
setHeader(request, name, Span.toHex(value));
}
}

private Span getCurrentSpan() {
return this.accessor.getCurrentSpan();
}

}
Loading

0 comments on commit a362aa7

Please sign in to comment.