Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.odpf.dagger.core.processors.common;

import io.odpf.dagger.core.processors.ColumnNameManager;
import io.odpf.dagger.core.processors.types.SourceConfig;
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

Expand Down Expand Up @@ -29,7 +29,6 @@
*/
public class EndpointHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(EndpointHandler.class.getName());
private SourceConfig sourceConfig;
private MeterStatsManager meterStatsManager;
private ErrorReporter errorReporter;
private String[] inputProtoClasses;
Expand All @@ -48,13 +47,11 @@ public class EndpointHandler {
* @param columnNameManager the column name manager
* @param descriptorManager the descriptor manager
*/
public EndpointHandler(SourceConfig sourceConfig,
MeterStatsManager meterStatsManager,
public EndpointHandler(MeterStatsManager meterStatsManager,
ErrorReporter errorReporter,
String[] inputProtoClasses,
ColumnNameManager columnNameManager,
DescriptorManager descriptorManager) {
this.sourceConfig = sourceConfig;
this.meterStatsManager = meterStatsManager;
this.errorReporter = errorReporter;
this.inputProtoClasses = inputProtoClasses;
Expand All @@ -63,19 +60,20 @@ public EndpointHandler(SourceConfig sourceConfig,
}

/**
* Get endpoint or query variables values.
* Get external post processor variables values.
*
* @param rowManager the row manager
* @param variableType the variable type
* @parm variables the variable list
* @param resultFuture the result future
* @return the array object
*/
public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultFuture<Row> resultFuture) {
String queryVariables = sourceConfig.getVariables();
if (StringUtils.isEmpty(queryVariables)) {
public Object[] getVariablesValue(RowManager rowManager, ExternalPostProcessorVariableType variableType, String variables, ResultFuture<Row> resultFuture) {
if (StringUtils.isEmpty(variables)) {
return new Object[0];
}

String[] requiredInputColumns = queryVariables.split(",");
String[] requiredInputColumns = variables.split(",");
ArrayList<Object> inputColumnValues = new ArrayList<>();
if (descriptorMap == null) {
descriptorMap = createDescriptorMap(requiredInputColumns, inputProtoClasses, resultFuture);
Expand All @@ -84,7 +82,7 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF
for (String inputColumnName : requiredInputColumns) {
int inputColumnIndex = columnNameManager.getInputIndex(inputColumnName);
if (inputColumnIndex == -1) {
throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the endpoint/query variable", inputColumnName));
throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the '%s' variable", inputColumnName, variableType));
}

Descriptors.FieldDescriptor fieldDescriptor = descriptorMap.get(inputColumnName);
Expand All @@ -105,11 +103,12 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF
*
* @param resultFuture the result future
* @param rowManager the row manager
* @param endpointVariablesValues the endpoint variables values
* @param variables the request/header variables
* @param variablesValue the variables value
* @return the boolean
*/
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, Object[] endpointVariablesValues) {
if (!StringUtils.isEmpty(sourceConfig.getVariables()) && (Arrays.asList(endpointVariablesValues).isEmpty() || Arrays.stream(endpointVariablesValues).allMatch(""::equals))) {
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, String variables, Object[] variablesValue) {
if (!StringUtils.isEmpty(variables) && (Arrays.asList(variablesValue).isEmpty() || Arrays.stream(variablesValue).allMatch(""::equals))) {
LOGGER.warn("Could not populate any request variable. Skipping external calls");
meterStatsManager.markEvent(ExternalSourceAspects.EMPTY_INPUT);
resultFuture.complete(singleton(rowManager.getAll()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void open(Configuration configuration) throws Exception {
meterStatsManager = new MeterStatsManager(getRuntimeContext().getMetricGroup(), true);
}
if (endpointHandler == null) {
endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter,
endpointHandler = new EndpointHandler(meterStatsManager, errorReporter,
schemaConfig.getInputProtoClasses(), schemaConfig.getColumnNameManager(), descriptorManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import io.odpf.dagger.core.processors.external.AsyncConnector;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
Expand Down Expand Up @@ -79,8 +79,8 @@ protected void createClient() {
protected void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);
Object[] endpointVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, endpointVariablesValues)) {
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.ENDPOINT_VARIABLE, esSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, esSourceConfig.getVariables(), endpointVariablesValues)) {
return;
}
String esEndpoint = String.format(esSourceConfig.getPattern(), endpointVariablesValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,8 +91,8 @@ protected void process(Row input, ResultFuture<Row> resultFuture) throws Excepti
RowManager rowManager = new RowManager(input);

Object[] requestVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, grpcSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, grpcSourceConfig.getVariables(), requestVariablesValues)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.flink.types.Row;

import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -94,12 +95,14 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);

Object[] requestVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, httpSourceConfig.getRequestVariables(), resultFuture);
Object[] dynamicHeaderVariablesValues = getEndpointHandler()
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.HEADER_VARIABLES, httpSourceConfig.getHeaderVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) {
return;
}

BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues);
BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues);
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(),
rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry());
httpResponseHandler.startTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
private String verb;
private String requestPattern;
private String requestVariables;
private String headerPattern;
private String headerVariables;
private String streamTimeout;
private String connectTimeout;
private boolean failOnErrors;
Expand All @@ -40,21 +42,25 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
* @param verb the verb
* @param requestPattern the request pattern
* @param requestVariables the request variables
* @param headerPattern the dynamic header pattern
* @param headerVariables the header variables
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param type the type
* @param capacity the capacity
* @param headers the headers
* @param headers the static headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param retainResponseType the retain response type
*/
public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
this.endpoint = endpoint;
this.verb = verb;
this.requestPattern = requestPattern;
this.requestVariables = requestVariables;
this.headerPattern = headerPattern;
this.headerVariables = headerVariables;
this.streamTimeout = streamTimeout;
this.connectTimeout = connectTimeout;
this.failOnErrors = failOnErrors;
Expand Down Expand Up @@ -102,6 +108,24 @@ public String getRequestVariables() {
return requestVariables;
}

/**
* Gets header pattern.
*
* @return the header pattern
*/
public String getHeaderPattern() {
return headerPattern;
}

/**
* Gets header Variable.
*
* @return the header Variable
*/
public String getHeaderVariables() {
return headerVariables;
}

@Override
public String getPattern() {
return requestPattern;
Expand Down Expand Up @@ -208,11 +232,11 @@ public boolean equals(Object o) {
return false;
}
HttpSourceConfig that = (HttpSourceConfig) o;
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
}

@Override
public int hashCode() {
return Objects.hash(endpoint, verb, requestPattern, requestVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
return Objects.hash(endpoint, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package io.odpf.dagger.core.processors.external.http.request;

import com.google.gson.Gson;
import io.netty.util.internal.StringUtil;
import io.odpf.dagger.core.processors.external.http.HttpSourceConfig;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;

import java.util.HashMap;
import java.util.Map;

/**
* The Http get request handler.
*/
public class HttpGetRequestHandler implements HttpRequestHandler {
private HttpSourceConfig httpSourceConfig;
private AsyncHttpClient httpClient;
private Object[] requestVariablesValues;
private Object[] dynamicHeaderVariablesValues;

/**
* Instantiates a new Http get request handler.
Expand All @@ -19,10 +25,11 @@ public class HttpGetRequestHandler implements HttpRequestHandler {
* @param httpClient the http client
* @param requestVariablesValues the request variables values
*/
public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) {
public HttpGetRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) {
this.httpSourceConfig = httpSourceConfig;
this.httpClient = httpClient;
this.requestVariablesValues = requestVariablesValues;
this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues;
}

@Override
Expand All @@ -31,7 +38,12 @@ public BoundRequestBuilder create() {
String endpoint = httpSourceConfig.getEndpoint();
String requestEndpoint = endpoint + endpointPath;
BoundRequestBuilder getRequest = httpClient.prepareGet(requestEndpoint);
return addHeaders(getRequest, httpSourceConfig.getHeaders());
Map<String, String> headers = httpSourceConfig.getHeaders();
if (!StringUtil.isNullOrEmpty(httpSourceConfig.getHeaderPattern())) {
String dynamicHeader = String.format(httpSourceConfig.getHeaderPattern(), dynamicHeaderVariablesValues);
headers.putAll(new Gson().fromJson(dynamicHeader, HashMap.class));
}
return addHeaders(getRequest, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
package io.odpf.dagger.core.processors.external.http.request;

import com.google.gson.Gson;
import io.netty.util.internal.StringUtil;
import io.odpf.dagger.core.processors.external.http.HttpSourceConfig;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;

import java.util.HashMap;
import java.util.Map;

/**
* The Http post request handler.
*/
public class HttpPostRequestHandler implements HttpRequestHandler {
private HttpSourceConfig httpSourceConfig;
private AsyncHttpClient httpClient;
private Object[] requestVariablesValues;

private Object[] dynamicHeaderVariablesValues;
/**
* Instantiates a new Http post request handler.
*
* @param httpSourceConfig the http source config
* @param httpClient the http client
* @param requestVariablesValues the request variables values
*/
public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) {
public HttpPostRequestHandler(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] dynamicHeaderVariablesValues) {
this.httpSourceConfig = httpSourceConfig;
this.httpClient = httpClient;
this.requestVariablesValues = requestVariablesValues;
this.dynamicHeaderVariablesValues = dynamicHeaderVariablesValues;
}

@Override
Expand All @@ -32,7 +38,12 @@ public BoundRequestBuilder create() {
BoundRequestBuilder postRequest = httpClient
.preparePost(endpoint)
.setBody(requestBody);
return addHeaders(postRequest, httpSourceConfig.getHeaders());
Map<String, String> headers = httpSourceConfig.getHeaders();
if (!StringUtil.isNullOrEmpty(httpSourceConfig.getHeaderPattern())) {
String dynamicHeader = String.format(httpSourceConfig.getHeaderPattern(), dynamicHeaderVariablesValues);
headers.putAll(new Gson().fromJson(dynamicHeader, HashMap.class));
}
return addHeaders(postRequest, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public class HttpRequestFactory {
* @param requestVariablesValues the request variables values
* @return the bound request builder
*/
public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues) {
public static BoundRequestBuilder createRequest(HttpSourceConfig httpSourceConfig, AsyncHttpClient httpClient, Object[] requestVariablesValues, Object[] headerVariablesValues) {

ArrayList<HttpRequestHandler> httpRequestHandlers = new ArrayList<>();
httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues));
httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues));
httpRequestHandlers.add(new HttpPostRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues));
httpRequestHandlers.add(new HttpGetRequestHandler(httpSourceConfig, httpClient, requestVariablesValues, headerVariablesValues));

HttpRequestHandler httpRequestHandler = httpRequestHandlers
.stream()
Expand Down
Loading