-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ability to add auth headers to Druid requests. #62
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,6 +109,7 @@ | |
import rx.subjects.PublishSubject; | ||
|
||
import java.io.IOException; | ||
import java.lang.reflect.Constructor; | ||
import java.time.Clock; | ||
import java.time.ZoneId; | ||
import java.util.Arrays; | ||
|
@@ -119,6 +120,7 @@ | |
import java.util.Set; | ||
import java.util.UUID; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
|
@@ -144,6 +146,8 @@ public abstract class AbstractBinderFactory implements BinderFactory { | |
private static final String METER_SPLITS_TOTAL_RATIO = "queries.meter.split_queries.total_ratio"; | ||
private static final String METER_SPLITS_RATIO = "queries.meter.split_queries.ratio"; | ||
|
||
private static final String DRUID_HEADER_SUPPLIER_CLASS = "druid_header_supplier_class"; | ||
|
||
// Two minutes in milliseconds | ||
public static final int HC_LAST_RUN_PERIOD_MILLIS_DEFAULT = 120 * 1000; | ||
public static final int LOADER_SCHEDULER_THREAD_POOL_SIZE_DEFAULT = 4; | ||
|
@@ -937,7 +941,33 @@ protected Class<? extends PhysicalTableResolver> getPhysicalTableResolver() { | |
* @return A DruidWebService | ||
*/ | ||
protected DruidWebService buildDruidWebService(DruidServiceConfig druidServiceConfig, ObjectMapper mapper) { | ||
return new AsyncDruidWebServiceImpl(druidServiceConfig, mapper); | ||
Supplier<Map<String, String>> supplier = buildDruidWebServiceHeaderSupplier(); | ||
return new AsyncDruidWebServiceImpl(druidServiceConfig, mapper, supplier); | ||
} | ||
|
||
/** | ||
* Build the Supplier for Druid data request headers. | ||
* | ||
* @return The Druid data request header Supplier. | ||
*/ | ||
protected Supplier<Map<String, String>> buildDruidWebServiceHeaderSupplier() { | ||
Supplier<Map<String, String>> supplier = HashMap::new; | ||
String customSupplierClassString = SYSTEM_CONFIG.getStringProperty(DRUID_HEADER_SUPPLIER_CLASS, null); | ||
if (customSupplierClassString != null && customSupplierClassString.equals("")) { | ||
try { | ||
Class<?> c = Class.forName(customSupplierClassString); | ||
Constructor<?> constructor = c.getConstructor(); | ||
supplier = (Supplier<Map<String, String>>) constructor.newInstance(); | ||
} catch (Exception e) { | ||
LOG.error( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we catch something more specific than 'Exception' here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We were matching something more specific in an earlier version of this PR and I suggested replacing that list of roughly 5 exceptions with just |
||
"Unable to load the Druid query header supplier, className: {}, exception: {}", | ||
customSupplierClassString, | ||
e | ||
); | ||
throw new IllegalStateException(e); | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should pull the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assumed we could just pull the other params from system_config in the supplier. I guess there could be more complicated use cases? |
||
return supplier; | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,7 +39,10 @@ | |
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.function.Supplier; | ||
|
||
import javax.ws.rs.core.Response.Status; | ||
|
||
|
@@ -61,17 +64,38 @@ public class AsyncDruidWebServiceImpl implements DruidWebService { | |
public static final String DRUID_WEIGHTED_QUERY_TIMER = DRUID_TIMER + "_W_"; | ||
public static final String DRUID_SEGMENT_METADATA_TIMER = DRUID_TIMER + "_S_0"; | ||
|
||
|
||
private final Supplier<Map<String, String>> headersToAppend; | ||
private final DruidServiceConfig serviceConfig; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dislike the name of the variable. If I understand the scope of this, these are mandatory headers appended (or possibly prepended, it looks like the latter based on digging into the netty code) to the client request. So possible 'requestAppendHeaders' or 'clientAppendHeaders'. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we're adding the headers (and it looks like added headers strictly add, don't replace), I think a name like |
||
|
||
/** | ||
* Friendly non-DI constructor useful for manual tests. | ||
* | ||
* @param serviceConfig Configuration for the Druid Service | ||
* @param mapper A shared jackson object mapper resource | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Javadoc needed for the new parameter There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed this in the constructor below. |
||
* @deprecated We now require a header supplier parameter. | ||
* Use {@link #AsyncDruidWebServiceImpl(DruidServiceConfig, ObjectMapper, Supplier)} | ||
*/ | ||
@Deprecated | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you could add the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For these deprecations, we should add an Applies to the other |
||
public AsyncDruidWebServiceImpl( | ||
DruidServiceConfig serviceConfig, | ||
ObjectMapper mapper | ||
) { | ||
this(serviceConfig, initializeWebClient(serviceConfig.getTimeout()), mapper, HashMap::new); | ||
} | ||
|
||
/** | ||
* Friendly non-DI constructor useful for manual tests. | ||
* | ||
* @param serviceConfig Configuration for the Druid Service | ||
* @param mapper A shared jackson object mapper resource | ||
* @param headersToAppend Supplier for map of headers for Druid requests | ||
*/ | ||
public AsyncDruidWebServiceImpl(DruidServiceConfig serviceConfig, ObjectMapper mapper) { | ||
this(serviceConfig, initializeWebClient(serviceConfig.getTimeout()), mapper); | ||
public AsyncDruidWebServiceImpl( | ||
DruidServiceConfig serviceConfig, | ||
ObjectMapper mapper, | ||
Supplier<Map<String, String>> headersToAppend | ||
) { | ||
this(serviceConfig, initializeWebClient(serviceConfig.getTimeout()), mapper, headersToAppend); | ||
} | ||
|
||
/** | ||
|
@@ -101,8 +125,32 @@ private static AsyncHttpClient initializeWebClient(int requestTimeout) { | |
* @param config the configuration for this druid service | ||
* @param asyncHttpClient the HTTP client | ||
* @param mapper A shared jackson object mapper resource | ||
* @deprecated We now require a header supplier parameter. | ||
* Use {@link #AsyncDruidWebServiceImpl(DruidServiceConfig, AsyncHttpClient, ObjectMapper, Supplier)} | ||
*/ | ||
@Deprecated | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
public AsyncDruidWebServiceImpl( | ||
DruidServiceConfig config, | ||
AsyncHttpClient asyncHttpClient, | ||
ObjectMapper mapper | ||
) { | ||
this(config, asyncHttpClient, mapper, HashMap::new); | ||
} | ||
|
||
/** | ||
* IOC constructor. | ||
* | ||
* @param config the configuration for this druid service | ||
* @param asyncHttpClient the HTTP client | ||
* @param mapper A shared jackson object mapper resource | ||
* @param headersToAppend Supplier for map of headers for Druid requests | ||
*/ | ||
public AsyncDruidWebServiceImpl(DruidServiceConfig config, AsyncHttpClient asyncHttpClient, ObjectMapper mapper) { | ||
public AsyncDruidWebServiceImpl( | ||
DruidServiceConfig config, | ||
AsyncHttpClient asyncHttpClient, | ||
ObjectMapper mapper, | ||
Supplier<Map<String, String>> headersToAppend | ||
) { | ||
this.serviceConfig = config; | ||
|
||
if (serviceConfig.getUrl() == null) { | ||
|
@@ -112,6 +160,7 @@ public AsyncDruidWebServiceImpl(DruidServiceConfig config, AsyncHttpClient async | |
} | ||
|
||
LOG.info("Configured with druid server config: {}", config.toString()); | ||
this.headersToAppend = headersToAppend; | ||
this.webClient = asyncHttpClient; | ||
this.writer = mapper.writer(); | ||
this.httpErrorMeter = REGISTRY.meter("druid.errors.http"); | ||
|
@@ -267,14 +316,18 @@ public void postDruidQuery( | |
timerName = DRUID_WEIGHTED_QUERY_TIMER + String.format(format, seqNum); | ||
} | ||
|
||
BoundRequestBuilder requestBuilder = webClient.preparePost(serviceConfig.getUrl()) | ||
.setBody(entityBody) | ||
.addHeader("Content-Type", "application/json"); | ||
|
||
headersToAppend.get().forEach(requestBuilder::addHeader); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does not appear that headers are maplike in that there can't be more than one of a given key value. Given this, you might be better served with an iterable here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While the |
||
LOG.debug("druid json request: {}", entityBody); | ||
sendRequest( | ||
success, | ||
error, | ||
failure, | ||
webClient.preparePost(serviceConfig.getUrl()) | ||
.setBody(entityBody) | ||
.addHeader("Content-Type", "application/json"), | ||
requestBuilder, | ||
timerName, | ||
outstanding | ||
); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
// Copyright 2016 Yahoo Inc. | ||
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. | ||
package com.yahoo.bard.webservice.druid.client.impl | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Copyright needed |
||
|
||
import com.yahoo.bard.webservice.druid.client.DruidClientConfigHelper | ||
import com.yahoo.bard.webservice.druid.model.query.QueryContext | ||
import com.yahoo.bard.webservice.druid.model.query.WeightEvaluationQuery | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper | ||
|
||
import io.netty.handler.codec.http.HttpHeaders | ||
import spock.lang.Specification | ||
|
||
import java.util.function.Supplier | ||
|
||
class AsyncDruidWebServiceImplSpec extends Specification { | ||
def "Ensure that headersToAppend are added"() { | ||
setup: | ||
WeightEvaluationQuery weightEvaluationQuery = Mock(WeightEvaluationQuery) | ||
QueryContext queryContext = Mock(QueryContext) | ||
weightEvaluationQuery.getContext() >> { queryContext } | ||
queryContext.numberOfQueries() >> { 1 } | ||
queryContext.getSequenceNumber() >> { 1 } | ||
|
||
and: | ||
Map<String, String> expectedHeaders = new HashMap<>() | ||
expectedHeaders.put("k1", "v1") | ||
expectedHeaders.put("k2", "v2") | ||
Supplier<Map<String, String>> supplier = new Supplier<Map<String, String>>() { | ||
@Override | ||
Map<String, String> get() { | ||
return expectedHeaders | ||
} | ||
} | ||
AsyncDruidWebServiceImplWrapper webServiceImplWrapper = new AsyncDruidWebServiceImplWrapper( | ||
DruidClientConfigHelper.getNonUiServiceConfig(), | ||
new ObjectMapper(), | ||
supplier | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of this stuff from here up that's in the If needed, you can use |
||
|
||
when: | ||
webServiceImplWrapper.postDruidQuery( | ||
null, | ||
null, | ||
null, | ||
null, | ||
weightEvaluationQuery | ||
); | ||
|
||
then: | ||
HttpHeaders actualHeaders = webServiceImplWrapper.getHeaders() | ||
for (Map.Entry<String, String> header : expectedHeaders) { | ||
assert actualHeaders.get(header.getKey()) == header.getValue() | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
// Copyright 2016 Yahoo Inc. | ||
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. | ||
package com.yahoo.bard.webservice.druid.client.impl; | ||
|
||
import com.yahoo.bard.webservice.druid.client.DruidServiceConfig; | ||
import com.yahoo.bard.webservice.druid.client.FailureCallback; | ||
import com.yahoo.bard.webservice.druid.client.HttpErrorCallback; | ||
import com.yahoo.bard.webservice.druid.client.SuccessCallback; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
import org.asynchttpclient.BoundRequestBuilder; | ||
import org.asynchttpclient.Request; | ||
|
||
import io.netty.handler.codec.http.HttpHeaders; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Used for testing AsyncDruidWebServiceImpl. | ||
*/ | ||
public class AsyncDruidWebServiceImplWrapper extends AsyncDruidWebServiceImpl { | ||
public Request request; | ||
|
||
/** | ||
* Constructor wrapper. | ||
* | ||
* @param serviceConfig Service config | ||
* @param mapper Mapper | ||
* @param headersToAppend Headers | ||
*/ | ||
public AsyncDruidWebServiceImplWrapper( | ||
DruidServiceConfig serviceConfig, | ||
ObjectMapper mapper, | ||
Supplier<Map<String, String>> headersToAppend | ||
) { | ||
super(serviceConfig, mapper, headersToAppend); | ||
} | ||
|
||
/** | ||
* Capture arguments to test for expected values. | ||
* | ||
* @param success callback for handling successful requests. | ||
* @param error callback for handling http errors. | ||
* @param failure callback for handling exception failures. | ||
* @param requestBuilder The bound request builder for the request to be sent. | ||
* @param timerName The name that distinguishes this request as part of a druid query or segment metadata request | ||
* @param outstanding The counter that keeps track of the outstanding (in flight) requests for the top level query | ||
*/ | ||
@Override | ||
protected void sendRequest( | ||
final SuccessCallback success, | ||
final HttpErrorCallback error, | ||
final FailureCallback failure, | ||
final BoundRequestBuilder requestBuilder, | ||
final String timerName, | ||
final AtomicLong outstanding | ||
) { | ||
this.request = requestBuilder.build(); | ||
} | ||
|
||
public HttpHeaders getHeaders() { | ||
return request.getHeaders(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@michael-mclawhorn does this usage fit with how we usually use systemConfig?