Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/python_release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: Python Package
on:
release:
types: [created]

jobs:
publishPythonZip:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Zip Python Udf
run: |
cd dagger-py-functions
zip -r python_udfs.zip udfs -x "*/__init__.py"
zip -jr data.zip data
zip -r dagger-py-functions.zip requirements.txt data.zip python_udfs.zip
- name: Upload Release
uses: ncipollo/release-action@v1
with:
artifacts: dagger-py-functions/dagger-py-functions.zip
allowUpdates: true
omitNameDuringUpdate: true
omitBodyDuringUpdate: true
omitPrereleaseDuringUpdate: true
token: ${{ secrets.GITHUB_TOKEN }}
27 changes: 27 additions & 0 deletions .github/workflows/python_validation.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Python Validation

on: push

jobs:
pythonValidation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.8
uses: actions/setup-python@v3
with:
python-version: '3.8'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install apache-flink==1.14.3
cd dagger-py-functions
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
- name: Test with pytest
run: |
cd dagger-py-functions
pytest --disable-warnings
3 changes: 2 additions & 1 deletion dagger-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ dependencies {

dependenciesCommonJar ('org.apache.hadoop:hadoop-client:2.8.3') {
exclude module:"commons-cli"
exclude module:"commons-compress"
}
dependenciesCommonJar 'com.google.cloud.bigdataoss:gcs-connector:1.9.0-hadoop2'
dependenciesCommonJar 'org.apache.flink:flink-metrics-dropwizard:' + flinkVersion
dependenciesCommonJar 'org.apache.flink:flink-json:' + flinkVersion
dependenciesCommonJar 'com.jayway.jsonpath:json-path:2.4.0'
dependenciesCommonJar 'io.odpf:stencil:0.1.6'
dependenciesCommonJar 'io.odpf:stencil:0.2.1'
dependenciesCommonJar 'com.google.code.gson:gson:2.8.2'
dependenciesCommonJar 'org.apache.parquet:parquet-column:1.12.2'

Expand Down
4 changes: 4 additions & 0 deletions dagger-core/env/local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ METRIC_TELEMETRY_ENABLE=true
# == Others ==
FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.FunctionFactory
FLINK_ROWTIME_ATTRIBUTE_NAME=rowtime

# == Python Udf ==
PYTHON_UDF_ENABLE=false
PYTHON_UDF_CONFIG={"PYTHON_FILES":"/path/to/files.zip", "PYTHON_REQUIREMENTS": "requirements.txt", "PYTHON_FN_EXECUTION_BUNDLE_SIZE": "1000"}
13 changes: 12 additions & 1 deletion dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@
import io.odpf.dagger.core.sink.SinkOrchestrator;
import io.odpf.dagger.core.source.StreamsFactory;
import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.functions.udfs.python.PythonUdfConfig;
import io.odpf.dagger.functions.udfs.python.PythonUdfManager;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.List;

import static io.odpf.dagger.core.utils.Constants.*;
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_DEFAULT;
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_KEY;
import static org.apache.flink.table.api.Expressions.$;

/**
Expand Down Expand Up @@ -138,7 +143,13 @@ private ApiExpression[] getApiExpressions(StreamInfo streamInfo) {
*
* @return the stream manager
*/
public StreamManager registerFunctions() {
public StreamManager registerFunctions() throws IOException {
if (configuration.getBoolean(PYTHON_UDF_ENABLE_KEY, PYTHON_UDF_ENABLE_DEFAULT)) {
PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration);
PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig);
pythonUdfManager.registerPythonFunctions();
}

String[] functionFactoryClasses = configuration
.getString(Constants.FUNCTION_FACTORY_CLASSES_KEY, Constants.FUNCTION_FACTORY_CLASSES_DEFAULT)
.split(",");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.odpf.dagger.core.processors.common;

import io.odpf.dagger.core.processors.ColumnNameManager;
import io.odpf.dagger.core.processors.types.SourceConfig;
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

Expand Down Expand Up @@ -29,7 +29,6 @@
*/
public class EndpointHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(EndpointHandler.class.getName());
private SourceConfig sourceConfig;
private MeterStatsManager meterStatsManager;
private ErrorReporter errorReporter;
private String[] inputProtoClasses;
Expand All @@ -48,13 +47,11 @@ public class EndpointHandler {
* @param columnNameManager the column name manager
* @param descriptorManager the descriptor manager
*/
public EndpointHandler(SourceConfig sourceConfig,
MeterStatsManager meterStatsManager,
public EndpointHandler(MeterStatsManager meterStatsManager,
ErrorReporter errorReporter,
String[] inputProtoClasses,
ColumnNameManager columnNameManager,
DescriptorManager descriptorManager) {
this.sourceConfig = sourceConfig;
this.meterStatsManager = meterStatsManager;
this.errorReporter = errorReporter;
this.inputProtoClasses = inputProtoClasses;
Expand All @@ -63,19 +60,20 @@ public EndpointHandler(SourceConfig sourceConfig,
}

/**
* Get endpoint or query variables values.
* Get external post processor variables values.
*
* @param rowManager the row manager
* @param variableType the variable type
* @parm variables the variable list
* @param resultFuture the result future
* @return the array object
*/
public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultFuture<Row> resultFuture) {
String queryVariables = sourceConfig.getVariables();
if (StringUtils.isEmpty(queryVariables)) {
public Object[] getVariablesValue(RowManager rowManager, ExternalPostProcessorVariableType variableType, String variables, ResultFuture<Row> resultFuture) {
if (StringUtils.isEmpty(variables)) {
return new Object[0];
}

String[] requiredInputColumns = queryVariables.split(",");
String[] requiredInputColumns = variables.split(",");
ArrayList<Object> inputColumnValues = new ArrayList<>();
if (descriptorMap == null) {
descriptorMap = createDescriptorMap(requiredInputColumns, inputProtoClasses, resultFuture);
Expand All @@ -84,7 +82,7 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF
for (String inputColumnName : requiredInputColumns) {
int inputColumnIndex = columnNameManager.getInputIndex(inputColumnName);
if (inputColumnIndex == -1) {
throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the endpoint/query variable", inputColumnName));
throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the '%s' variable", inputColumnName, variableType));
}

Descriptors.FieldDescriptor fieldDescriptor = descriptorMap.get(inputColumnName);
Expand All @@ -105,11 +103,12 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF
*
* @param resultFuture the result future
* @param rowManager the row manager
* @param endpointVariablesValues the endpoint variables values
* @param variables the request/header variables
* @param variablesValue the variables value
* @return the boolean
*/
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, Object[] endpointVariablesValues) {
if (!StringUtils.isEmpty(sourceConfig.getVariables()) && (Arrays.asList(endpointVariablesValues).isEmpty() || Arrays.stream(endpointVariablesValues).allMatch(""::equals))) {
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, String variables, Object[] variablesValue) {
if (!StringUtils.isEmpty(variables) && (Arrays.asList(variablesValue).isEmpty() || Arrays.stream(variablesValue).allMatch(""::equals))) {
LOGGER.warn("Could not populate any request variable. Skipping external calls");
meterStatsManager.markEvent(ExternalSourceAspects.EMPTY_INPUT);
resultFuture.complete(singleton(rowManager.getAll()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void open(Configuration configuration) throws Exception {
meterStatsManager = new MeterStatsManager(getRuntimeContext().getMetricGroup(), true);
}
if (endpointHandler == null) {
endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter,
endpointHandler = new EndpointHandler(meterStatsManager, errorReporter,
schemaConfig.getInputProtoClasses(), schemaConfig.getColumnNameManager(), descriptorManager);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import io.odpf.dagger.core.processors.external.AsyncConnector;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.types.Row;

import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
Expand Down Expand Up @@ -79,8 +79,8 @@ protected void createClient() {
protected void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);
Object[] endpointVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, endpointVariablesValues)) {
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.ENDPOINT_VARIABLE, esSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, esSourceConfig.getVariables(), endpointVariablesValues)) {
return;
}
String esEndpoint = String.format(esSourceConfig.getPattern(), endpointVariablesValues);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,8 +91,8 @@ protected void process(Row input, ResultFuture<Row> resultFuture) throws Excepti
RowManager rowManager = new RowManager(input);

Object[] requestVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, grpcSourceConfig.getVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, grpcSourceConfig.getVariables(), requestVariablesValues)) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.flink.types.Row;

import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -94,12 +95,14 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
RowManager rowManager = new RowManager(input);

Object[] requestVariablesValues = getEndpointHandler()
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, httpSourceConfig.getRequestVariables(), resultFuture);
Object[] dynamicHeaderVariablesValues = getEndpointHandler()
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.HEADER_VARIABLES, httpSourceConfig.getHeaderVariables(), resultFuture);
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) {
return;
}

BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues);
BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues);
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(),
rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry());
httpResponseHandler.startTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
private String verb;
private String requestPattern;
private String requestVariables;
private String headerPattern;
private String headerVariables;
private String streamTimeout;
private String connectTimeout;
private boolean failOnErrors;
Expand All @@ -40,21 +42,25 @@ public class HttpSourceConfig implements Serializable, SourceConfig {
* @param verb the verb
* @param requestPattern the request pattern
* @param requestVariables the request variables
* @param headerPattern the dynamic header pattern
* @param headerVariables the header variables
* @param streamTimeout the stream timeout
* @param connectTimeout the connect timeout
* @param failOnErrors the fail on errors
* @param type the type
* @param capacity the capacity
* @param headers the headers
* @param headers the static headers
* @param outputMapping the output mapping
* @param metricId the metric id
* @param retainResponseType the retain response type
*/
public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
public HttpSourceConfig(String endpoint, String verb, String requestPattern, String requestVariables, String headerPattern, String headerVariables, String streamTimeout, String connectTimeout, boolean failOnErrors, String type, String capacity, Map<String, String> headers, Map<String, OutputMapping> outputMapping, String metricId, boolean retainResponseType) {
this.endpoint = endpoint;
this.verb = verb;
this.requestPattern = requestPattern;
this.requestVariables = requestVariables;
this.headerPattern = headerPattern;
this.headerVariables = headerVariables;
this.streamTimeout = streamTimeout;
this.connectTimeout = connectTimeout;
this.failOnErrors = failOnErrors;
Expand Down Expand Up @@ -102,6 +108,24 @@ public String getRequestVariables() {
return requestVariables;
}

/**
* Gets header pattern.
*
* @return the header pattern
*/
public String getHeaderPattern() {
return headerPattern;
}

/**
* Gets header Variable.
*
* @return the header Variable
*/
public String getHeaderVariables() {
return headerVariables;
}

@Override
public String getPattern() {
return requestPattern;
Expand Down Expand Up @@ -208,11 +232,11 @@ public boolean equals(Object o) {
return false;
}
HttpSourceConfig that = (HttpSourceConfig) o;
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
return failOnErrors == that.failOnErrors && retainResponseType == that.retainResponseType && Objects.equals(endpoint, that.endpoint) && Objects.equals(verb, that.verb) && Objects.equals(requestPattern, that.requestPattern) && Objects.equals(requestVariables, that.requestVariables) && Objects.equals(headerPattern, that.headerPattern) && Objects.equals(headerVariables, that.headerVariables) && Objects.equals(streamTimeout, that.streamTimeout) && Objects.equals(connectTimeout, that.connectTimeout) && Objects.equals(type, that.type) && Objects.equals(capacity, that.capacity) && Objects.equals(headers, that.headers) && Objects.equals(outputMapping, that.outputMapping) && Objects.equals(metricId, that.metricId);
}

@Override
public int hashCode() {
return Objects.hash(endpoint, verb, requestPattern, requestVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
return Objects.hash(endpoint, verb, requestPattern, requestVariables, headerPattern, headerVariables, streamTimeout, connectTimeout, failOnErrors, type, capacity, headers, outputMapping, metricId, retainResponseType);
}
}
Loading