diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/common/EndpointHandler.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/common/EndpointHandler.java index 39dcd5ea4..0c8c4e288 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/common/EndpointHandler.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/common/EndpointHandler.java @@ -1,7 +1,7 @@ package io.odpf.dagger.core.processors.common; import io.odpf.dagger.core.processors.ColumnNameManager; -import io.odpf.dagger.core.processors.types.SourceConfig; +import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.types.Row; @@ -29,7 +29,6 @@ */ public class EndpointHandler { private static final Logger LOGGER = LoggerFactory.getLogger(EndpointHandler.class.getName()); - private SourceConfig sourceConfig; private MeterStatsManager meterStatsManager; private ErrorReporter errorReporter; private String[] inputProtoClasses; @@ -48,13 +47,11 @@ public class EndpointHandler { * @param columnNameManager the column name manager * @param descriptorManager the descriptor manager */ - public EndpointHandler(SourceConfig sourceConfig, - MeterStatsManager meterStatsManager, + public EndpointHandler(MeterStatsManager meterStatsManager, ErrorReporter errorReporter, String[] inputProtoClasses, ColumnNameManager columnNameManager, DescriptorManager descriptorManager) { - this.sourceConfig = sourceConfig; this.meterStatsManager = meterStatsManager; this.errorReporter = errorReporter; this.inputProtoClasses = inputProtoClasses; @@ -63,19 +60,20 @@ public EndpointHandler(SourceConfig sourceConfig, } /** - * Get endpoint or query variables values. + * Get external post processor variables values. * * @param rowManager the row manager + * @param variableType the variable type + * @parm variables the variable list * @param resultFuture the result future * @return the array object */ - public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultFuture resultFuture) { - String queryVariables = sourceConfig.getVariables(); - if (StringUtils.isEmpty(queryVariables)) { + public Object[] getVariablesValue(RowManager rowManager, ExternalPostProcessorVariableType variableType, String variables, ResultFuture resultFuture) { + if (StringUtils.isEmpty(variables)) { return new Object[0]; } - String[] requiredInputColumns = queryVariables.split(","); + String[] requiredInputColumns = variables.split(","); ArrayList inputColumnValues = new ArrayList<>(); if (descriptorMap == null) { descriptorMap = createDescriptorMap(requiredInputColumns, inputProtoClasses, resultFuture); @@ -84,7 +82,7 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF for (String inputColumnName : requiredInputColumns) { int inputColumnIndex = columnNameManager.getInputIndex(inputColumnName); if (inputColumnIndex == -1) { - throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the endpoint/query variable", inputColumnName)); + throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the '%s' variable", inputColumnName, variableType)); } Descriptors.FieldDescriptor fieldDescriptor = descriptorMap.get(inputColumnName); @@ -105,11 +103,12 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF * * @param resultFuture the result future * @param rowManager the row manager - * @param endpointVariablesValues the endpoint variables values + * @param variables the request/header variables + * @param variablesValue the variables value * @return the boolean */ - public boolean isQueryInvalid(ResultFuture resultFuture, RowManager rowManager, Object[] endpointVariablesValues) { - if (!StringUtils.isEmpty(sourceConfig.getVariables()) && (Arrays.asList(endpointVariablesValues).isEmpty() || Arrays.stream(endpointVariablesValues).allMatch(""::equals))) { + public boolean isQueryInvalid(ResultFuture resultFuture, RowManager rowManager, String variables, Object[] variablesValue) { + if (!StringUtils.isEmpty(variables) && (Arrays.asList(variablesValue).isEmpty() || Arrays.stream(variablesValue).allMatch(""::equals))) { LOGGER.warn("Could not populate any request variable. Skipping external calls"); meterStatsManager.markEvent(ExternalSourceAspects.EMPTY_INPUT); resultFuture.complete(singleton(rowManager.getAll())); diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/AsyncConnector.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/AsyncConnector.java index eb5622e79..50144d98d 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/AsyncConnector.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/AsyncConnector.java @@ -161,7 +161,7 @@ public void open(Configuration configuration) throws Exception { meterStatsManager = new MeterStatsManager(getRuntimeContext().getMetricGroup(), true); } if (endpointHandler == null) { - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, schemaConfig.getInputProtoClasses(), schemaConfig.getColumnNameManager(), descriptorManager); } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/es/EsAsyncConnector.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/es/EsAsyncConnector.java index 93e76a4bb..97c3a3c3f 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/es/EsAsyncConnector.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/es/EsAsyncConnector.java @@ -9,8 +9,8 @@ import io.odpf.dagger.core.processors.external.AsyncConnector; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.types.Row; - import io.odpf.dagger.core.utils.Constants; +import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -79,8 +79,8 @@ protected void createClient() { protected void process(Row input, ResultFuture resultFuture) { RowManager rowManager = new RowManager(input); Object[] endpointVariablesValues = getEndpointHandler() - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); - if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, endpointVariablesValues)) { + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.ENDPOINT_VARIABLE, esSourceConfig.getVariables(), resultFuture); + if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, esSourceConfig.getVariables(), endpointVariablesValues)) { return; } String esEndpoint = String.format(esSourceConfig.getPattern(), endpointVariablesValues); diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/grpc/GrpcAsyncConnector.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/grpc/GrpcAsyncConnector.java index 173be65f2..db28e5b23 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/grpc/GrpcAsyncConnector.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/grpc/GrpcAsyncConnector.java @@ -20,6 +20,7 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import io.odpf.dagger.core.utils.Constants; +import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,8 +91,8 @@ protected void process(Row input, ResultFuture resultFuture) throws Excepti RowManager rowManager = new RowManager(input); Object[] requestVariablesValues = getEndpointHandler() - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); - if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) { + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, grpcSourceConfig.getVariables(), resultFuture); + if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, grpcSourceConfig.getVariables(), requestVariablesValues)) { return; } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnector.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnector.java index fcb401904..5e6b786a3 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnector.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnector.java @@ -15,6 +15,7 @@ import org.apache.flink.types.Row; import io.odpf.dagger.core.utils.Constants; +import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.BoundRequestBuilder; import org.slf4j.Logger; @@ -94,12 +95,14 @@ protected void process(Row input, ResultFuture resultFuture) { RowManager rowManager = new RowManager(input); Object[] requestVariablesValues = getEndpointHandler() - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); - if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) { + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, httpSourceConfig.getRequestVariables(), resultFuture); + Object[] dynamicHeaderVariablesValues = getEndpointHandler() + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.HEADER_VARIABLES, httpSourceConfig.getHeaderVariables(), resultFuture); + if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) { return; } - BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues); + BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(), rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry()); httpResponseHandler.startTimer(); diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpSourceConfig.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpSourceConfig.java index 60ac14d82..d30efc2ad 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpSourceConfig.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpSourceConfig.java @@ -20,6 +20,8 @@ public class HttpSourceConfig implements Serializable, SourceConfig { private String verb; private String requestPattern; private String requestVariables; + private String headerPattern; + private String headerVariables; private String streamTimeout; private String connectTimeout; private boolean failOnErrors; @@ -40,21 +42,25 @@ public class HttpSourceConfig implements Serializable, SourceConfig { * @param verb the verb * @param requestPattern the request pattern * @param requestVariables the request variables + * @param headerPattern the dynamic header pattern + * @param headerVariables the header variables * @param streamTimeout the stream timeout * @param connectTimeout the connect timeout * @param failOnErrors the fail on errors * @param type the type * @param capacity the capacity - * @param headers the headers + * @param headers the static headers * @param outputMapping the output mapping * @param metricId the metric id * @param retainResponseType the retain response type */ - public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map headers, Map outputMapping, String metricId, boolean retainResponseType) { + public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map headers, Map outputMapping, String metricId, boolean retainResponseType) { this.endpoint = endpoint; this.verb = verb; this.requestPattern = requestPattern; this.requestVariables = requestVariables; + this.headerPattern = headerPattern; + this.headerVariables = headerVariables; this.streamTimeout = streamTimeout; this.connectTimeout = connectTimeout; this.failOnErrors = failOnErrors; @@ -102,6 +108,24 @@ public String getRequestVariables() { return requestVariables; } + /** + * Gets header pattern. + * + * @return the header pattern + */ + public String getHeaderPattern() { + return headerPattern; + } + + /** + * Gets header Variable. + * + * @return the header Variable + */ + public String getHeaderVariables() { + return headerVariables; + } + @Override public String getPattern() { return requestPattern; @@ -208,11 +232,11 @@ public boolean equals(Object o) { return false; } HttpSourceConfig that = (HttpSourceConfig) o; - return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId); + return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId); } @Override public int hashCode() { - return Objects.hash(endpoint, verb, requestPattern, requestVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType); + return Objects.hash(endpoint, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType); } } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpGetRequestHandler.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpGetRequestHandler.java index 58c033d78..c4e99d3ff 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpGetRequestHandler.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpGetRequestHandler.java @@ -1,9 +1,14 @@ package io.odpf.dagger.core.processors.external.http.request; +import com.google.gson.Gson; +import io.netty.util.internal.StringUtil; import io.odpf.dagger.core.processors.external.http.HttpSourceConfig; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.BoundRequestBuilder; +import java.util.HashMap; +import java.util.Map; + /** * The Http get request handler. */ @@ -11,6 +16,7 @@ public class HttpGetRequestHandler implements HttpRequestHandler { private HttpSourceConfig httpSourceConfig; private AsyncHttpClient httpClient; private Object[] requestVariablesValues; + private Object[] dynamicHeaderVariablesValues; /** * Instantiates a new Http get request handler. @@ -19,10 +25,11 @@ public class HttpGetRequestHandler implements HttpRequestHandler { * @param httpClient the http client * @param requestVariablesValues the request variables values */ - public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) { + public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) { this.httpSourceConfig = httpSourceConfig; this.httpClient = httpClient; this.requestVariablesValues = requestVariablesValues; + this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues; } @Override @@ -31,7 +38,12 @@ public BoundRequestBuilder create() { String endpoint = httpSourceConfig.getEndpoint(); String requestEndpoint = endpoint + endpointPath; BoundRequestBuilder getRequest = httpClient.prepareGet(requestEndpoint); - return addHeaders(getRequest, httpSourceConfig.getHeaders()); + Map headers = httpSourceConfig.getHeaders(); + if (!StringUtil.isNullOrEmpty(httpSourceConfig.getHeaderPattern())) { + String dynamicHeader = String.format(httpSourceConfig.getHeaderPattern(), dynamicHeaderVariablesValues); + headers.putAll(new Gson().fromJson(dynamicHeader, HashMap.class)); + } + return addHeaders(getRequest, headers); } @Override diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpPostRequestHandler.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpPostRequestHandler.java index 5dc7b9540..a51cefbd8 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpPostRequestHandler.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpPostRequestHandler.java @@ -1,9 +1,14 @@ package io.odpf.dagger.core.processors.external.http.request; +import com.google.gson.Gson; +import io.netty.util.internal.StringUtil; import io.odpf.dagger.core.processors.external.http.HttpSourceConfig; import org.asynchttpclient.AsyncHttpClient; import org.asynchttpclient.BoundRequestBuilder; +import java.util.HashMap; +import java.util.Map; + /** * The Http post request handler. */ @@ -11,7 +16,7 @@ public class HttpPostRequestHandler implements HttpRequestHandler { private HttpSourceConfig httpSourceConfig; private AsyncHttpClient httpClient; private Object[] requestVariablesValues; - + private Object[] dynamicHeaderVariablesValues; /** * Instantiates a new Http post request handler. * @@ -19,10 +24,11 @@ public class HttpPostRequestHandler implements HttpRequestHandler { * @param httpClient the http client * @param requestVariablesValues the request variables values */ - public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) { + public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) { this.httpSourceConfig = httpSourceConfig; this.httpClient = httpClient; this.requestVariablesValues = requestVariablesValues; + this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues; } @Override @@ -32,7 +38,12 @@ public BoundRequestBuilder create() { BoundRequestBuilder postRequest = httpClient .preparePost(endpoint) .setBody(requestBody); - return addHeaders(postRequest, httpSourceConfig.getHeaders()); + Map headers = httpSourceConfig.getHeaders(); + if (!StringUtil.isNullOrEmpty(httpSourceConfig.getHeaderPattern())) { + String dynamicHeader = String.format(httpSourceConfig.getHeaderPattern(), dynamicHeaderVariablesValues); + headers.putAll(new Gson().fromJson(dynamicHeader, HashMap.class)); + } + return addHeaders(postRequest, headers); } @Override diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpRequestFactory.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpRequestFactory.java index d45023a4c..1e61129ea 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpRequestFactory.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/request/HttpRequestFactory.java @@ -19,11 +19,11 @@ public class HttpRequestFactory { * @param requestVariablesValues the request variables values * @return the bound request builder */ - public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) { + public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] headerVariablesValues) { ArrayList httpRequestHandlers = new ArrayList<>(); - httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues)); - httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues)); + httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues)); + httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues)); HttpRequestHandler httpRequestHandler = httpRequestHandlers .stream() diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/pg/PgAsyncConnector.java b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/pg/PgAsyncConnector.java index 46595164e..c1b83d502 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/pg/PgAsyncConnector.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/processors/external/pg/PgAsyncConnector.java @@ -10,6 +10,7 @@ import io.odpf.dagger.core.processors.external.AsyncConnector; import io.odpf.dagger.core.metrics.aspects.ExternalSourceAspects; import io.odpf.dagger.core.utils.Constants; +import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.pgclient.PgConnectOptions; @@ -89,8 +90,8 @@ public void process(Row input, ResultFuture resultFuture) { RowManager rowManager = new RowManager(input); Object[] queryVariablesValues = getEndpointHandler() - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); - if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, queryVariablesValues)) { + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.QUERY_VARIABLES, pgSourceConfig.getVariables(), resultFuture); + if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, pgSourceConfig.getVariables(), queryVariablesValues)) { return; } diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/utils/Constants.java b/dagger-core/src/main/java/io/odpf/dagger/core/utils/Constants.java index 43dba0945..c4d84a481 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/utils/Constants.java @@ -147,4 +147,5 @@ public class Constants { public static final long MAX_EVENT_LOOP_EXECUTE_TIME_DEFAULT = 10000; public static final int LONGBOW_OUTPUT_ADDITIONAL_ARITY = 3; + public enum ExternalPostProcessorVariableType { REQUEST_VARIABLES, HEADER_VARIABLES, QUERY_VARIABLES, ENDPOINT_VARIABLE }; } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorConfigTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorConfigTest.java index a34e757d5..8c5ab8fa8 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorConfigTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/PostProcessorConfigTest.java @@ -64,7 +64,7 @@ public void shouldReturnHttpExternalSourceConfig() { outputMapping = new OutputMapping("$.data.tensor.values[0]"); outputMappings.put("surge_factor", outputMapping); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "post", null, null, "5000", "5000", true, null, null, headerMap, outputMappings, null, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8000", "post", null, null, null, null, "5000", "5000", true, null, null, headerMap, outputMappings, null, false); assertEquals(httpSourceConfig, defaultPostProcessorConfig.getExternalSource().getHttpConfig().get(0)); } @@ -120,7 +120,7 @@ public void shouldBeEmptyWhenNoneOfTheConfigsExist() { @Test public void shouldNotBeEmptyWhenExternalSourceHasHttpConfigExist() { ArrayList http = new ArrayList<>(); - http.add(new HttpSourceConfig("", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false)); + http.add(new HttpSourceConfig("", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), "metricId_01", false)); ArrayList es = new ArrayList<>(); ArrayList pg = new ArrayList<>(); ExternalSourceConfig externalSourceConfig = new ExternalSourceConfig(http, es, pg, new ArrayList<>()); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/common/EndpointHandlerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/common/EndpointHandlerTest.java index e103815c6..7cb27945e 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/common/EndpointHandlerTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/common/EndpointHandlerTest.java @@ -1,5 +1,6 @@ package io.odpf.dagger.core.processors.common; +import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType; import io.odpf.stencil.StencilClientFactory; import io.odpf.stencil.client.StencilClient; import io.odpf.dagger.common.core.StencilClientOrchestrator; @@ -69,10 +70,10 @@ public void shouldReturnEndpointQueryVariableValuesForPrimitiveDataFromDescripto row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "customer_id"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); assertArrayEquals(endpointOrQueryVariablesValues, new Object[] {"123456"}); } @@ -88,10 +89,10 @@ public void shouldReturnEndpointQueryVariableValuesForPrimitiveDataIfInputColumn row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "id"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); assertArrayEquals(endpointOrQueryVariablesValues, new Object[] {"123456"}); } @@ -116,10 +117,10 @@ public void shouldReturnJsonValueOfEndpointQueryValuesInCaseOfArray() { row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "test_enums"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); assertArrayEquals(new Object[] {"[\"+I[UNKNOWN]\",\"+I[TYPE1]\"]"}, endpointOrQueryVariablesValues); } @@ -141,10 +142,10 @@ public void shouldReturnJsonValueOfEndpointQueryValuesIncaseOfComplexDatatype() row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "driver_pickup_location"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, "driver_pickup_location", resultFuture); assertArrayEquals(endpointOrQueryVariablesValues, new Object[] {"{\"name\":\"test_driver\",\"address\":null,\"latitude\":172.5,\"longitude\":175.5,\"type\":null,\"note\":null,\"place_id\":null,\"accuracy_meter\":null,\"gate_id\":null}"}); } @@ -161,10 +162,10 @@ public void shouldReturnEndpointQueryVariableValuesForPrimitiveDataFromDescripto row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "customer_id"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); assertArrayEquals(endpointOrQueryVariablesValues, new Object[] {"123456"}); } @@ -182,10 +183,10 @@ public void shouldInferEndpointVariablesFromTheCorrectStreams() { row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "customer_url"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); assertArrayEquals(endpointOrQueryVariablesValues, new Object[] {"test_order_number", "customer_url_test"}); } @@ -203,16 +204,16 @@ public void shouldReturnEmptyObjectIfNoQueryVariables() { row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "customer_url"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); assertArrayEquals(endpointOrQueryVariablesValues, new Object[] {}); } @Test - public void shouldThrowErrorIfVariablesAreNotProperlyConfigures() { + public void shouldThrowErrorIfRequestVariablesAreNotProperlyConfigures() { when(sourceConfig.getVariables()).thenReturn("czx"); inputProtoClasses = new String[] {"io.odpf.dagger.consumer.TestBookingLogMessage", "io.odpf.dagger.consumer.TestBookingLogMessage"}; @@ -224,11 +225,11 @@ public void shouldThrowErrorIfVariablesAreNotProperlyConfigures() { row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "customer_url"}), descriptorManager); InvalidConfigurationException exception = assertThrows(InvalidConfigurationException.class, () -> endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture)); - assertEquals("Column 'czx' not found as configured in the endpoint/query variable", exception.getMessage()); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture)); + assertEquals("Column 'czx' not found as configured in the 'REQUEST_VARIABLES' variable", exception.getMessage()); } @Test @@ -249,11 +250,11 @@ public void shouldThrowErrorIfInputProtoNotFound() { row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "driver_pickup_location"}), descriptorManager); assertThrows(NullPointerException.class, - () -> endpointHandler.getEndpointOrQueryVariablesValues(rowManager, resultFuture)); + () -> endpointHandler.getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture)); verify(errorReporter, times(1)).reportFatalException(any(DescriptorNotFoundException.class)); verify(resultFuture, times(1)).completeExceptionally(any(DescriptorNotFoundException.class)); } @@ -270,12 +271,12 @@ public void shouldCheckIfQueryIsValid() { row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "customer_id"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); - boolean queryInvalid = endpointHandler.isQueryInvalid(resultFuture, rowManager, endpointOrQueryVariablesValues); + boolean queryInvalid = endpointHandler.isQueryInvalid(resultFuture, rowManager, sourceConfig.getVariables(), endpointOrQueryVariablesValues); assertFalse(queryInvalid); } @@ -291,12 +292,12 @@ public void shouldCheckIfQueryIsInValidInCaseOfSingeEmptyVariableValueForSingleF row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "customer_id"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); - boolean queryInvalid = endpointHandler.isQueryInvalid(resultFuture, rowManager, endpointOrQueryVariablesValues); + boolean queryInvalid = endpointHandler.isQueryInvalid(resultFuture, rowManager, sourceConfig.getVariables(), endpointOrQueryVariablesValues); assertTrue(queryInvalid); verify(resultFuture, times(1)).complete(any()); verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.EMPTY_INPUT); @@ -315,12 +316,12 @@ public void shouldCheckIfQueryIsValidInCaseOfSomeVariableValue() { row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "customer_id"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); - boolean queryInvalid = endpointHandler.isQueryInvalid(resultFuture, rowManager, endpointOrQueryVariablesValues); + boolean queryInvalid = endpointHandler.isQueryInvalid(resultFuture, rowManager, sourceConfig.getVariables(), endpointOrQueryVariablesValues); assertFalse(queryInvalid); } @@ -337,12 +338,12 @@ public void shouldCheckIfQueryIsInvalidInCaseOfAllVariableValues() { row.setField(1, new Row(1)); RowManager rowManager = new RowManager(row); - endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter, + endpointHandler = new EndpointHandler(meterStatsManager, errorReporter, inputProtoClasses, getColumnNameManager(new String[] {"order_number", "customer_id"}), descriptorManager); Object[] endpointOrQueryVariablesValues = endpointHandler - .getEndpointOrQueryVariablesValues(rowManager, resultFuture); + .getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, sourceConfig.getVariables(), resultFuture); - boolean queryInvalid = endpointHandler.isQueryInvalid(resultFuture, rowManager, endpointOrQueryVariablesValues); + boolean queryInvalid = endpointHandler.isQueryInvalid(resultFuture, rowManager, sourceConfig.getVariables(), endpointOrQueryVariablesValues); assertTrue(queryInvalid); verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.EMPTY_INPUT); } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalPostProcessorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalPostProcessorTest.java index 0205252ed..f31f3c568 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalPostProcessorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalPostProcessorTest.java @@ -72,7 +72,7 @@ public void setup() { HashMap httpColumnNames = new HashMap<>(); httpColumnNames.put("http_field_1", new OutputMapping("")); httpColumnNames.put("http_field_2", new OutputMapping("")); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "POST", "/some/patttern/%s", "variable", "123", "234", false, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpColumnNames, "metricId_01", false); HashMap esOutputMapping = new HashMap<>(); esOutputMapping.put("es_field_1", new OutputMapping("")); EsSourceConfig esSourceConfig = new EsSourceConfig("host", "port", "", "", "endpointPattern", @@ -135,7 +135,7 @@ public void shouldProcessWithRightConfiguration() { outputMapping.put("order_id", new OutputMapping("path")); List httpSourceConfigs = new ArrayList<>(); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "POST", "/some/patttern/%s", "variable", "123", "234", false, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), outputMapping, "metricId_01", false); httpSourceConfigs.add(httpSourceConfig); List esSourceConfigs = new ArrayList<>(); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalSourceConfigTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalSourceConfigTest.java index c3adf236f..177859b60 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalSourceConfigTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/ExternalSourceConfigTest.java @@ -29,7 +29,7 @@ public void setUp() { HashMap httpOutputMapping = new HashMap<>(); httpOutputMapping.put("http_field_1", new OutputMapping("")); httpOutputMapping.put("http_field_2", new OutputMapping("")); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "POST", "/some/patttern/%s", "variable", "123", "234", false, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("endpoint", "POST", "/some/patttern/%s", "variable", "", "", "123", "234", false, "type", "20", new HashMap<>(), httpOutputMapping, "metricId_01", false); http = new ArrayList<>(); http.add(httpSourceConfig); es = new ArrayList<>(); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/es/EsAsyncConnectorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/es/EsAsyncConnectorTest.java index ade3dd94c..e8360543d 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/es/EsAsyncConnectorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/es/EsAsyncConnectorTest.java @@ -184,7 +184,7 @@ public void shouldNotEnrichOutputWhenEndpointVariableIsInvalid() throws Exceptio ArgumentCaptor invalidConfigurationExceptionCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); ArgumentCaptor reportExceptionCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); - String expectedExceptionMessage = "Column 'invalid_variable' not found as configured in the endpoint/query variable"; + String expectedExceptionMessage = "Column 'invalid_variable' not found as configured in the 'ENDPOINT_VARIABLE' variable"; verify(resultFuture, times(1)).completeExceptionally(invalidConfigurationExceptionCaptor.capture()); assertEquals(expectedExceptionMessage, invalidConfigurationExceptionCaptor.getValue().getMessage()); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/grpc/GrpcAsyncConnectorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/grpc/GrpcAsyncConnectorTest.java index 7b9c04857..032c2c34f 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/grpc/GrpcAsyncConnectorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/grpc/GrpcAsyncConnectorTest.java @@ -206,12 +206,12 @@ public void shouldCompleteExceptionallyWhenEndpointVariableIsInvalid() throws Ex verify(meterStatsManager, times(1)).markEvent(INVALID_CONFIGURATION); ArgumentCaptor reportInvalidConfigCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); verify(errorReporter, times(1)).reportFatalException(reportInvalidConfigCaptor.capture()); - assertEquals("Column 'invalid_variable' not found as configured in the endpoint/query variable", + assertEquals("Column 'invalid_variable' not found as configured in the 'REQUEST_VARIABLES' variable", reportInvalidConfigCaptor.getValue().getMessage()); ArgumentCaptor invalidConfigurationExceptionArgumentCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); verify(resultFuture, times(1)).completeExceptionally(invalidConfigurationExceptionArgumentCaptor.capture()); - assertEquals("Column 'invalid_variable' not found as configured in the endpoint/query variable", + assertEquals("Column 'invalid_variable' not found as configured in the 'REQUEST_VARIABLES' variable", invalidConfigurationExceptionArgumentCaptor.getValue().getMessage()); } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/grpc/GrpcSourceConfigTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/grpc/GrpcSourceConfigTest.java index fbed065c6..8b7331ecf 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/grpc/GrpcSourceConfigTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/grpc/GrpcSourceConfigTest.java @@ -119,13 +119,13 @@ public void hasTypeShouldBeTrueWhenTypeIsPresent() { @Test public void hasTypeShouldBeFalseWhenTypeIsNull() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", null, "", false, null, "", new HashMap<>(), new HashMap<>(), metricId, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", null, "", false, null, "", new HashMap<>(), new HashMap<>(), metricId, false); assertFalse(httpSourceConfig.hasType()); } @Test public void hasTypeShouldBeFalseWhenTypeIsEmpty() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), metricId, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), metricId, false); assertFalse(httpSourceConfig.hasType()); } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnectorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnectorTest.java index 2f401b59d..0d049484c 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnectorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnectorTest.java @@ -97,7 +97,7 @@ public void setUp() { externalMetricConfig = new ExternalMetricConfig("metricId-http-01", shutDownPeriod, telemetryEnabled); defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", - "customer_id", "123", "234", false, httpConfigType, "345", + "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); } @@ -184,10 +184,10 @@ public void shouldCompleteExceptionallyIfOutputDescriptorNotFound() throws Excep } @Test - public void shouldCompleteExceptionallyWhenEndpointVariableIsInvalid() throws Exception { + public void shouldCompleteExceptionallyWhenRequestVariableIsInvalid() throws Exception { when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); String invalidRequestVariable = "invalid_variable"; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", invalidRequestVariable, "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", invalidRequestVariable, "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); @@ -198,19 +198,19 @@ public void shouldCompleteExceptionallyWhenEndpointVariableIsInvalid() throws Ex verify(meterStatsManager, times(1)).markEvent(INVALID_CONFIGURATION); ArgumentCaptor exceptionArgumentCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); verify(errorReporter, times(1)).reportFatalException(exceptionArgumentCaptor.capture()); - assertEquals("Column 'invalid_variable' not found as configured in the endpoint/query variable", exceptionArgumentCaptor.getValue().getMessage()); + assertEquals("Column 'invalid_variable' not found as configured in the 'REQUEST_VARIABLES' variable", exceptionArgumentCaptor.getValue().getMessage()); ArgumentCaptor futureCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); verify(resultFuture, times(1)).completeExceptionally(futureCaptor.capture()); - assertEquals("Column 'invalid_variable' not found as configured in the endpoint/query variable", + assertEquals("Column 'invalid_variable' not found as configured in the 'REQUEST_VARIABLES' variable", futureCaptor.getValue().getMessage()); } @Test public void shouldCompleteExceptionallyWhenEndpointVariableIsEmptyAndRequiredInPattern() throws Exception { String emptyRequestVariable = ""; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", emptyRequestVariable, "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", emptyRequestVariable, "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); @@ -227,7 +227,7 @@ public void shouldCompleteExceptionallyWhenEndpointVariableIsEmptyAndRequiredInP @Test public void shouldEnrichWhenEndpointVariableIsEmptyAndNotRequiredInPattern() throws Exception { String emptyRequestVariable = ""; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"static\"}", emptyRequestVariable, "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"static\"}", emptyRequestVariable, "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); @@ -247,7 +247,7 @@ public void shouldEnrichWhenEndpointVariableIsEmptyAndNotRequiredInPattern() thr @Test public void shouldCompleteExceptionallyWhenEndpointPatternIsInvalid() throws Exception { String invalidRequestPattern = "{\"key\": \"%\"}"; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", invalidRequestPattern, "customer_id", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", invalidRequestPattern, "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); @@ -262,7 +262,7 @@ public void shouldCompleteExceptionallyWhenEndpointPatternIsInvalid() throws Exc @Test public void shouldGetDescriptorFromOutputProtoIfTypeNotGiven() throws Exception { - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", false, null, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, "345", headers, outputMapping, "metricId_02", false); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient); @@ -277,7 +277,7 @@ public void shouldGetDescriptorFromOutputProtoIfTypeNotGiven() throws Exception @Test public void shouldGetDescriptorFromTypeIfGiven() throws Exception { - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", false, "TestBookingLogMessage", "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, "TestBookingLogMessage", "345", headers, outputMapping, "metricId_02", false); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient); @@ -293,7 +293,7 @@ public void shouldGetDescriptorFromTypeIfGiven() throws Exception { @Test public void shouldCompleteExceptionallyWhenEndpointPatternIsIncompatible() throws Exception { String invalidRequestPattern = "{\"key\": \"%d\"}"; - defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", invalidRequestPattern, "customer_id", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", invalidRequestPattern, "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); @@ -337,9 +337,85 @@ public void shouldAddCustomHeaders() throws Exception { verify(boundRequestBuilder, times(1)).addHeader("content-type", "application/json"); } + @Test + public void shouldAddDynamicHeaders() throws Exception { + when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); + when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); + when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient); + when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); + HttpSourceConfig dynamicHeaderHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", + "customer_id", "{\"X_KEY\": \"%s\"}", "customer_id", "123", "234", false, httpConfigType, "345", + headers, outputMapping, "metricId_02", false); + HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(dynamicHeaderHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); + + httpAsyncConnector.open(flinkConfiguration); + httpAsyncConnector.asyncInvoke(streamData, resultFuture); + verify(boundRequestBuilder, times(2)).addHeader(anyString(), anyString()); + verify(boundRequestBuilder, times(1)).addHeader("content-type", "application/json"); + verify(boundRequestBuilder, times(1)).addHeader("X_KEY", "123456"); + } + + @Test + public void shouldNotAddDynamicHeaders() throws Exception { + when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); + when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); + when(stencilClientOrchestrator.getStencilClient()).thenReturn(stencilClient); + when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); + HttpSourceConfig dynamicHeaderHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", + "customer_id", "{\"X_KEY\": \"%s\"}", "customer_ids", "123", "234", false, httpConfigType, "345", + headers, outputMapping, "metricId_02", false); + HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(dynamicHeaderHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); + httpAsyncConnector.open(flinkConfiguration); + httpAsyncConnector.asyncInvoke(streamData, resultFuture); + + verify(meterStatsManager, times(1)).markEvent(INVALID_CONFIGURATION); + verify(errorReporter, times(1)).reportFatalException(any(InvalidConfigurationException.class)); + verify(resultFuture, times(1)).completeExceptionally(any(InvalidConfigurationException.class)); + } + + @Test + public void shouldCompleteExceptionallyWhenHeaderVariableIsInvalid() throws Exception { + when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); + String invalidHeaderVariable = "invalid_variable"; + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "{\"key\": \"%s\"}", invalidHeaderVariable, "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); + when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); + HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); + + httpAsyncConnector.open(flinkConfiguration); + httpAsyncConnector.asyncInvoke(streamData, resultFuture); + + verify(meterStatsManager, times(1)).markEvent(INVALID_CONFIGURATION); + ArgumentCaptor exceptionArgumentCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); + verify(errorReporter, times(1)).reportFatalException(exceptionArgumentCaptor.capture()); + assertEquals("Column 'invalid_variable' not found as configured in the 'HEADER_VARIABLES' variable", exceptionArgumentCaptor.getValue().getMessage()); + + ArgumentCaptor futureCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); + verify(resultFuture, times(1)).completeExceptionally(futureCaptor.capture()); + + assertEquals("Column 'invalid_variable' not found as configured in the 'HEADER_VARIABLES' variable", + futureCaptor.getValue().getMessage()); + } + + @Test + public void shouldCompleteExceptionallyWhenHeaderPatternIsIncompatible() throws Exception { + String invalidHeaderPattern = "{\"key\": \"%d\"}"; + defaultHttpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", invalidHeaderPattern, "customer_id", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); + when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(boundRequestBuilder); + when(boundRequestBuilder.setBody("{\"key\": \"123456\"}")).thenReturn(boundRequestBuilder); + HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); + httpAsyncConnector.open(flinkConfiguration); + httpAsyncConnector.asyncInvoke(streamData, resultFuture); + + verify(meterStatsManager, times(1)).markEvent(INVALID_CONFIGURATION); + verify(errorReporter, times(1)).reportFatalException(any(InvalidConfigurationException.class)); + verify(resultFuture, times(1)).completeExceptionally(any(InvalidConfigurationException.class)); + } + @Test public void shouldThrowExceptionInTimeoutIfFailOnErrorIsTrue() throws Exception { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.timeout(streamData, resultFuture); @@ -349,7 +425,7 @@ public void shouldThrowExceptionInTimeoutIfFailOnErrorIsTrue() throws Exception @Test public void shouldReportFatalInTimeoutIfFailOnErrorIsTrue() throws Exception { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.timeout(streamData, resultFuture); @@ -359,7 +435,7 @@ public void shouldReportFatalInTimeoutIfFailOnErrorIsTrue() throws Exception { @Test public void shouldReportNonFatalInTimeoutIfFailOnErrorIsFalse() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.timeout(streamData, resultFuture); @@ -379,7 +455,7 @@ public void shouldPassTheInputWithRowSizeCorrespondingToColumnNamesInTimeoutIfFa public void shouldThrowExceptionIfUnsupportedHttpVerbProvided() throws Exception { when(defaultDescriptorManager.getDescriptor(inputProtoClasses[0])).thenReturn(TestBookingLogMessage.getDescriptor()); - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "PATCH", "{\"key\": \"%s\"}", "customer_id", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "PATCH", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(httpSourceConfig, externalMetricConfig, schemaConfig, httpClient, errorReporter, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.open(flinkConfiguration); @@ -413,4 +489,5 @@ public void shouldThrowIfRuntimeContextNotInitialized() throws Exception { HttpAsyncConnector httpAsyncConnector = new HttpAsyncConnector(defaultHttpSourceConfig, externalMetricConfig, schemaConfig, httpClient, null, meterStatsManager, defaultDescriptorManager); httpAsyncConnector.open(flinkConfiguration); } + } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandlerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandlerTest.java index c5fc30f84..7fa9548a9 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandlerTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpResponseHandlerTest.java @@ -74,7 +74,7 @@ public void setup() { streamData.setField(1, new Row(2)); rowManager = new RowManager(streamData); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); } @Test @@ -148,7 +148,7 @@ public void shouldPassInputIfFailOnErrorFalseAndStatusCodeIsOtherThan5XXAnd4XX() @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs404() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(404); @@ -167,7 +167,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs404() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs4XXOtherThan404() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(400); @@ -185,7 +185,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs4XXOtherThan404() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs5XX() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(502); @@ -203,7 +203,7 @@ public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIs5XX() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndStatusCodeIsOtherThan5XXAnd4XX() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); when(response.getStatusCode()).thenReturn(302); @@ -237,7 +237,7 @@ public void shouldPassInputIfFailOnErrorFalseAndOnThrowable() { @Test public void shouldThrowErrorIfFailOnErrorTrueAndOnThrowable() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Throwable throwable = new Throwable("throwable message"); @@ -258,7 +258,7 @@ public void shouldPopulateSingleResultFromHttpCallInInputRow() { outputMapping.put("surge_factor", new OutputMapping("$.surge")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); @@ -284,7 +284,7 @@ public void shouldPopulateMultipleResultsFromHttpCallInInputRow() { outputMapping.put("s2_id_level", new OutputMapping("$.prediction")); outputColumnNames = Arrays.asList("surge_factor", "s2_id_level"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); @@ -313,7 +313,7 @@ public void shouldThrowExceptionIfFieldNotFoundInFieldDescriptorWhenTypeIsPassed outputMapping.put("surge_factor", new OutputMapping("$.surge")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, httpConfigType, "345", headers, outputMapping, "metricId_02", false); when(response.getStatusCode()).thenReturn(200); when(response.getResponseBody()).thenReturn("{\n" + " \"surge\": 0.732\n" @@ -331,7 +331,7 @@ public void shouldThrowExceptionIfPathIsWrongIfFailOnErrorsTrue() { outputMapping.put("surge_factor", new OutputMapping("invalidPath")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", true, httpConfigType, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); @@ -356,7 +356,7 @@ public void shouldPopulateResultAsObjectIfTypeIsNotPassedAndRetainResponseTypeIs outputMapping.put("surge_factor", new OutputMapping("$.surge")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", false, null, "345", headers, outputMapping, "metricId_02", true); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, "345", headers, outputMapping, "metricId_02", true); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); @@ -381,7 +381,7 @@ public void shouldNotPopulateResultAsObjectIfTypeIsNotPassedAndRetainResponseTyp outputMapping.put("surge_factor", new OutputMapping("$.surge")); outputColumnNames = Collections.singletonList("surge_factor"); columnNameManager = new ColumnNameManager(inputColumnNames, outputColumnNames); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "123", "234", false, null, "345", headers, outputMapping, "metricId_02", false); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "customer_id", "", "", "123", "234", false, null, "345", headers, outputMapping, "metricId_02", false); HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, meterStatsManager, rowManager, columnNameManager, descriptor, resultFuture, errorReporter, new PostResponseTelemetry()); Row resultStreamData = new Row(2); Row outputData = new Row(2); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpSourceConfigTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpSourceConfigTest.java index 96220922b..6ee87cc1f 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpSourceConfigTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/HttpSourceConfigTest.java @@ -22,6 +22,8 @@ public class HttpSourceConfigTest { private String verb; private String requestPattern; private String requestVariables; + private String headerPattern; + private String headerVariables; private String connectTimeout; private boolean failOnErrors; private String type; @@ -41,13 +43,15 @@ public void setup() { verb = "POST"; requestPattern = "/customers/customer/%s"; requestVariables = "customer_id"; + headerPattern = "{\"X_KEY\":\"%s\"}"; + headerVariables = "customer_id"; connectTimeout = "234"; failOnErrors = false; type = "InputProtoMessage"; capacity = "345"; metricId = "metricId-http-01"; retainResponseType = false; - defaultHttpSourceConfig = new HttpSourceConfig(endpoint, verb, requestPattern, requestVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headerMap, outputMappings, metricId, retainResponseType); + defaultHttpSourceConfig = new HttpSourceConfig(endpoint, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headerMap, outputMappings, metricId, retainResponseType); } @Test @@ -107,13 +111,13 @@ public void hasTypeShouldBeTrueWhenTypeIsPresent() { @Test public void hasTypeShouldBeFalseWhenTypeIsNull() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", null, "", false, null, "", new HashMap<>(), new HashMap<>(), metricId, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", null, "", false, null, "", new HashMap<>(), new HashMap<>(), metricId, false); assertFalse(httpSourceConfig.hasType()); } @Test public void hasTypeShouldBeFalseWhenTypeIsEmpty() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), metricId, false); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("", "", "", "", "", "", "", "", false, "", "", new HashMap<>(), new HashMap<>(), metricId, false); assertFalse(httpSourceConfig.hasType()); } @@ -147,7 +151,7 @@ public void shouldValidate() { @Test public void shouldThrowExceptionIfAllFieldsMissing() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig(null, null, null, requestVariables, null, null, false, null, capacity, null, null, metricId, retainResponseType); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig(null, null, null, requestVariables, null, null, null, null, false, null, capacity, null, null, metricId, retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> httpSourceConfig.validateFields()); assertEquals("Missing required fields: [endpoint, streamTimeout, requestPattern, verb, connectTimeout, outputMapping]", exception.getMessage()); @@ -156,7 +160,7 @@ public void shouldThrowExceptionIfAllFieldsMissing() { @Test public void shouldThrowExceptionIfSomeFieldsMissing() { - HttpSourceConfig httpSourceConfig = new HttpSourceConfig("localhost", "post", "body", requestVariables, null, null, false, null, capacity, null, null, "metricId_01", retainResponseType); + HttpSourceConfig httpSourceConfig = new HttpSourceConfig("localhost", "post", "body", requestVariables, null, null, null, null, false, null, capacity, null, null, "metricId_01", retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> httpSourceConfig.validateFields()); assertEquals("Missing required fields: [streamTimeout, connectTimeout, outputMapping]", exception.getMessage()); @@ -170,7 +174,7 @@ public void shouldThrowExceptionIfFieldsOfNestedObjectsAreMissing() { outputMappings.put("field", outputMappingWithNullField); defaultHttpSourceConfig = new HttpSourceConfig("http://localhost", - "post", "request_body", requestVariables, "4000", "1000", false, "", capacity, headerMap, outputMappings, "metricId_01", retainResponseType); + "post", "request_body", requestVariables, "", "", "4000", "1000", false, "", capacity, headerMap, outputMappings, "metricId_01", retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> defaultHttpSourceConfig.validateFields()); assertEquals("Missing required fields: [path]", exception.getMessage()); @@ -185,7 +189,7 @@ public void shouldThrowExceptionIfRequestPatternIsEmpty() { outputMappings.put("field", outputMappingWithNullField); defaultHttpSourceConfig = new HttpSourceConfig("http://localhost", - "post", "", requestVariables, "4000", "1000", false, "", capacity, headerMap, outputMappings, "metricId_01", retainResponseType); + "post", "", requestVariables, "", "", "4000", "1000", false, "", capacity, headerMap, outputMappings, "metricId_01", retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> defaultHttpSourceConfig.validateFields()); assertEquals("Missing required fields: [requestPattern]", exception.getMessage()); @@ -220,7 +224,7 @@ public void shouldReturnMandatoryFields() { @Test public void shouldValidateWhenOutputMappingIsEmpty() { - defaultHttpSourceConfig = new HttpSourceConfig(endpoint, verb, requestPattern, requestVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headerMap, new HashMap<>(), "metricId_01", retainResponseType); + defaultHttpSourceConfig = new HttpSourceConfig(endpoint, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headerMap, new HashMap<>(), "metricId_01", retainResponseType); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> defaultHttpSourceConfig.validateFields()); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpGetRequestHandlerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpGetRequestHandlerTest.java index b2479d306..dbe61e24c 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpGetRequestHandlerTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpGetRequestHandlerTest.java @@ -11,7 +11,7 @@ import java.util.HashMap; import static org.junit.Assert.*; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; public class HttpGetRequestHandlerTest { @@ -23,33 +23,76 @@ public class HttpGetRequestHandlerTest { private HttpSourceConfig httpSourceConfig; private ArrayList requestVariablesValues; + private ArrayList headerVariablesValues; @Before public void setup() { initMocks(this); requestVariablesValues = new ArrayList<>(); requestVariablesValues.add(1); + headerVariablesValues = new ArrayList<>(); + headerVariablesValues.add("1"); + headerVariablesValues.add("2"); } @Test public void shouldReturnTrueForGetVerbOnCanCreate() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "{\"key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); - HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray()); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray()); assertTrue(httpGetRequestBuilder.canCreate()); } @Test public void shouldReturnFalseForVerbOtherThanGetOnCanBuild() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); - HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray()); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray()); assertFalse(httpGetRequestBuilder.canCreate()); } @Test public void shouldBuildGetRequest() { when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "/key/%s", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); - HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray()); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "/key/%s", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray()); assertEquals(request, httpGetRequestBuilder.create()); } + + @Test + public void shouldBuildGetRequestWithOnlyDynamicHeader() { + when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); + when(request.addHeader("header_key", "1")).thenReturn(request); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "/key/%s", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray()); + httpGetRequestBuilder.create(); + verify(request, times(1)).addHeader(anyString(), anyString()); + verify(request, times(1)).addHeader("header_key", "1"); + } + + @Test + public void shouldBuildGetRequestWithDynamicAndStaticHeader() { + when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); + when(request.addHeader("header_key", "1")).thenReturn(request); + HashMap staticHeader = new HashMap(); + staticHeader.put("static", "2"); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "/key/%s", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray()); + httpGetRequestBuilder.create(); + verify(request, times(2)).addHeader(anyString(), anyString()); + verify(request, times(1)).addHeader("header_key", "1"); + verify(request, times(1)).addHeader("static", "2"); + } + + @Test + public void shouldBuildGetRequestWithMultipleDynamicAndStaticHeaders() { + when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); + HashMap staticHeader = new HashMap(); + staticHeader.put("static", "3"); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "/key/%s", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%s\"}", "1,2", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + HttpGetRequestHandler httpGetRequestBuilder = new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray()); + httpGetRequestBuilder.create(); + verify(request, times(3)).addHeader(anyString(), anyString()); + verify(request, times(1)).addHeader("header_key_1", "1"); + verify(request, times(1)).addHeader("header_key_2", "2"); + verify(request, times(1)).addHeader("static", "3"); + } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpPostRequestHandlerTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpPostRequestHandlerTest.java index 99a4493ba..5010b228c 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpPostRequestHandlerTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpPostRequestHandlerTest.java @@ -11,7 +11,7 @@ import java.util.HashMap; import static org.junit.Assert.*; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; public class HttpPostRequestHandlerTest { @@ -24,35 +24,78 @@ public class HttpPostRequestHandlerTest { private HttpSourceConfig httpSourceConfig; private ArrayList requestVariablesValues; + private ArrayList dynamicHeaderVariablesValues; @Before public void setup() { initMocks(this); requestVariablesValues = new ArrayList<>(); requestVariablesValues.add(1); + dynamicHeaderVariablesValues = new ArrayList<>(); + dynamicHeaderVariablesValues.add("1"); + dynamicHeaderVariablesValues.add("2"); } @Test public void shouldReturnTrueForPostVerbOnCanCreate() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); - HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray()); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray()); assertTrue(httpPostRequestBuilder.canCreate()); } @Test public void shouldReturnFalseForVerbOtherThanPostOnCanBuild() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "{\"key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); - HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray()); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray()); assertFalse(httpPostRequestBuilder.canCreate()); } @Test - public void shouldBuildPostRequest() { + public void shouldBuildPostRequestWithoutHeader() { when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(request); when(request.setBody("{\"key\": \"1\"}")).thenReturn(request); - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); - HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray()); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray()); assertEquals(request, httpPostRequestBuilder.create()); } + @Test + public void shouldBuildPostRequestWithOnlyDynamicHeader() { + when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(request); + when(request.setBody("{\"key\": \"1\"}")).thenReturn(request); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", false); + HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray()); + httpPostRequestBuilder.create(); + verify(request, times(1)).addHeader(anyString(), anyString()); + verify(request, times(1)).addHeader("header_key", "1"); + } + + @Test + public void shouldBuildPostRequestWithDynamicAndStaticHeader() { + when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(request); + when(request.setBody("{\"key\": \"1\"}")).thenReturn(request); + HashMap staticHeader = new HashMap(); + staticHeader.put("static", "2"); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key\": \"%s\"}", "1", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray()); + httpPostRequestBuilder.create(); + verify(request, times(2)).addHeader(anyString(), anyString()); + verify(request, times(1)).addHeader("header_key", "1"); + verify(request, times(1)).addHeader("static", "2"); + } + + @Test + public void shouldBuildPostRequestWithMultipleDynamicAndStaticHeaders() { + when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(request); + when(request.setBody("{\"key\": \"1\"}")).thenReturn(request); + HashMap staticHeader = new HashMap(); + staticHeader.put("static", "3"); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "{\"header_key_1\": \"%s\",\"header_key_2\": \"%s\"}", "1,2", "123", "234", false, "type", "345", staticHeader, null, "metricId_01", false); + HttpPostRequestHandler httpPostRequestBuilder = new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues.toArray(), dynamicHeaderVariablesValues.toArray()); + httpPostRequestBuilder.create(); + verify(request, times(3)).addHeader(anyString(), anyString()); + verify(request, times(1)).addHeader("header_key_1", "1"); + verify(request, times(1)).addHeader("header_key_2", "2"); + verify(request, times(1)).addHeader("static", "3"); + } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpRequestFactoryTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpRequestFactoryTest.java index 9e76146f6..0a6144954 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpRequestFactoryTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/http/request/HttpRequestFactoryTest.java @@ -25,22 +25,24 @@ public class HttpRequestFactoryTest { private HttpSourceConfig httpSourceConfig; private ArrayList requestVariablesValues; + private ArrayList headerVariablesValues; private boolean retainResponseType; @Before public void setup() { initMocks(this); requestVariablesValues = new ArrayList<>(); + headerVariablesValues = new ArrayList<>(); requestVariablesValues.add(1); retainResponseType = false; } @Test public void shouldReturnPostRequestOnTheBasisOfConfiguration() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "POST", "{\"key\": \"%s\"}", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); when(httpClient.preparePost("http://localhost:8080/test")).thenReturn(request); when(request.setBody("{\"key\": \"123456\"}")).thenReturn(request); - HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues.toArray()); + HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray()); verify(httpClient, times(1)).preparePost("http://localhost:8080/test"); verify(httpClient, times(0)).prepareGet(any(String.class)); @@ -48,9 +50,9 @@ public void shouldReturnPostRequestOnTheBasisOfConfiguration() { @Test public void shouldReturnGetRequestOnTheBasisOfConfiguration() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "/key/%s", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "GET", "/key/%s", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); when(httpClient.prepareGet("http://localhost:8080/test/key/1")).thenReturn(request); - HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues.toArray()); + HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray()); verify(httpClient, times(1)).prepareGet("http://localhost:8080/test/key/1"); verify(httpClient, times(0)).preparePost(any(String.class)); @@ -58,8 +60,8 @@ public void shouldReturnGetRequestOnTheBasisOfConfiguration() { @Test public void shouldThrowExceptionForUnsupportedHttpVerb() { - httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "PATCH", "/key/%s", "1", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); - assertThrows(InvalidHttpVerbException.class, () -> HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues.toArray())); + httpSourceConfig = new HttpSourceConfig("http://localhost:8080/test", "PATCH", "/key/%s", "1", "", "", "123", "234", false, "type", "345", new HashMap<>(), null, "metricId_01", retainResponseType); + assertThrows(InvalidHttpVerbException.class, () -> HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues.toArray(), headerVariablesValues.toArray())); } } diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/pg/PgAsyncConnectorTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/pg/PgAsyncConnectorTest.java index ec71872d5..21909f98e 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/pg/PgAsyncConnectorTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/processors/external/pg/PgAsyncConnectorTest.java @@ -158,12 +158,12 @@ public void shouldNotEnrichOutputWhenQueryVariableIsInvalid() throws Exception { ArgumentCaptor invalidConfigCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); verify(resultFuture, times(1)).completeExceptionally(invalidConfigCaptor.capture()); - assertEquals("Column 'invalid_variable' not found as configured in the endpoint/query variable", invalidConfigCaptor.getValue().getMessage()); + assertEquals("Column 'invalid_variable' not found as configured in the 'QUERY_VARIABLES' variable", invalidConfigCaptor.getValue().getMessage()); verify(meterStatsManager, times(1)).markEvent(ExternalSourceAspects.INVALID_CONFIGURATION); ArgumentCaptor reportExceptionCaptor = ArgumentCaptor.forClass(InvalidConfigurationException.class); verify(errorReporter, times(1)).reportFatalException(reportExceptionCaptor.capture()); - assertEquals("Column 'invalid_variable' not found as configured in the endpoint/query variable", reportExceptionCaptor.getValue().getMessage()); + assertEquals("Column 'invalid_variable' not found as configured in the 'QUERY_VARIABLES' variable", reportExceptionCaptor.getValue().getMessage()); verify(pgClient, never()).query(any(String.class)); } diff --git a/docs/docs/advance/post_processor.md b/docs/docs/advance/post_processor.md index f8f02cb24..564da0a7a 100644 --- a/docs/docs/advance/post_processor.md +++ b/docs/docs/advance/post_processor.md @@ -247,6 +247,20 @@ List of comma-separated parameters to be replaced in request_pattern, these vari - Example value: `customer_id` - Type: `optional` +##### `header_pattern` + +Template for the dynamic headers. + +- Example value: `{"key": "%s"}` +- Type: `optional` + +##### `header_variables` + +List of comma-separated parameters to be replaced in header_pattern, these variables must be present in the input proto and selected via the SQL query. + +- Example value: `customer_id` +- Type: `optional` + ##### `stream_timeout` The timeout value for the stream in ms. @@ -325,6 +339,8 @@ PROCESSOR_POSTPROCESSOR_CONFIG = { "verb": "get", "request_pattern": "/customers/customer/%s", "request_variables": "customer_id", + "header_pattern": "{\"Header_Key\": \"%s\"}", + "header_variables": "wallet_id", "stream_timeout": "5000", "connect_timeout": "5000", "fail_on_errors": "false", diff --git a/version.txt b/version.txt index 3a4036fb4..53a75d673 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.2.5 +0.2.6