feat: Add statsdreporter to bq sink (#196)
* feat: Bigquery sink using depot (#154)

* chore: fix checkstyle

* feat: Checkpointing on bq sink (#187)

* feat: prepare for commit

* fix: clear the messages for pushing to bq

* docs: add documentation for BQ sink in Dagger

- [#188]

* chore: version bump of depot

* feat: add statsd reporter for bq sink

* chore: review comments

Co-authored-by: Sumit Aich <aich.1998@gmail.com>
lavkesh and sumitaich1998 committed Sep 27, 2022
1 parent b93574c commit cdf61cb
Showing 14 changed files with 128 additions and 132 deletions.
2 changes: 1 addition & 1 deletion dagger-core/build.gradle
@@ -61,7 +61,7 @@ configurations {
dependencies {
minimalJar project(path: ':dagger-common', configuration: 'minimalCommonJar')
minimalJar project(path: ':dagger-functions', configuration: 'minimalFunctionsJar')
-minimalJar('io.odpf:depot:0.2.0') {
+minimalJar('io.odpf:depot:0.3.1') {
exclude group: 'org.apache.httpcomponents'
exclude module: 'stencil', group: 'io.odpf'
exclude group: 'com.google.protobuf'
@@ -1,14 +1,13 @@
package io.odpf.dagger.core;

+import io.odpf.dagger.common.configuration.Configuration;
+import io.odpf.dagger.core.config.ConfigurationProvider;
+import io.odpf.dagger.core.config.ConfigurationProviderFactory;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

-import io.odpf.dagger.common.configuration.Configuration;
-import io.odpf.dagger.core.config.ConfigurationProvider;
-import io.odpf.dagger.core.config.ConfigurationProviderFactory;
-
import java.util.TimeZone;

/**
70 changes: 33 additions & 37 deletions dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
@@ -1,16 +1,5 @@
package io.odpf.dagger.core;

-import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
-import io.odpf.dagger.core.source.Stream;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.ApiExpression;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-
import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.core.StreamInfo;
@@ -20,6 +9,7 @@
import io.odpf.dagger.common.watermark.StreamWatermarkAssigner;
import io.odpf.dagger.common.watermark.WatermarkStrategyDefinition;
import io.odpf.dagger.core.exception.UDFFactoryClassNotDefinedException;
+import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import io.odpf.dagger.core.processors.PostProcessorFactory;
import io.odpf.dagger.core.processors.PreProcessorConfig;
import io.odpf.dagger.core.processors.PreProcessorFactory;
@@ -31,6 +21,14 @@
import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.functions.udfs.python.PythonUdfConfig;
import io.odpf.dagger.functions.udfs.python.PythonUdfManager;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.ApiExpression;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;

import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -48,11 +46,12 @@
*/
public class StreamManager {

-private StencilClientOrchestrator stencilClientOrchestrator;
private final Configuration configuration;
private final StreamExecutionEnvironment executionEnvironment;
-private StreamTableEnvironment tableEnvironment;
-private MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter();
+private final StreamTableEnvironment tableEnvironment;
+private final MetricsTelemetryExporter telemetryExporter = new MetricsTelemetryExporter();
+private StencilClientOrchestrator stencilClientOrchestrator;
+private DaggerStatsDReporter daggerStatsDReporter;

/**
* Instantiates a new Stream manager.
@@ -74,6 +73,8 @@ public StreamManager(Configuration configuration, StreamExecutionEnvironment exe
*/
public StreamManager registerConfigs() {
stencilClientOrchestrator = new StencilClientOrchestrator(configuration);
+org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration();
+daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration);

executionEnvironment.setMaxParallelism(configuration.getInteger(Constants.FLINK_PARALLELISM_MAX_KEY, Constants.FLINK_PARALLELISM_MAX_DEFAULT));
executionEnvironment.setParallelism(configuration.getInteger(FLINK_PARALLELISM_KEY, FLINK_PARALLELISM_DEFAULT));
@@ -86,7 +87,6 @@ public StreamManager registerConfigs() {
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(configuration.getInteger(FLINK_CHECKPOINT_MAX_CONCURRENT_KEY, FLINK_CHECKPOINT_MAX_CONCURRENT_DEFAULT));
executionEnvironment.getConfig().setGlobalJobParameters(configuration.getParam());


tableEnvironment.getConfig().setIdleStateRetention(Duration.ofMinutes(configuration.getInteger(FLINK_RETENTION_IDLE_STATE_MINUTE_KEY, FLINK_RETENTION_IDLE_STATE_MINUTE_DEFAULT)));
return this;
}
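The reporter is now built once inside registerConfigs(), from the job's underlying Flink runtime configuration, and stored in the daggerStatsDReporter field for reuse by both sources and the sink. A minimal sketch of what such a provider could look like, assuming DaggerStatsDReporter exposes a matching constructor — the real Provider in this commit may differ:

import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;

// Hypothetical provider sketch: build one reporter per job from the merged
// Flink + Dagger configuration and hand the same instance to every caller.
// The DaggerStatsDReporter constructor used here is an assumption.
public final class StatsDReporterProvider {
    private static volatile DaggerStatsDReporter instance;

    private StatsDReporterProvider() {
    }

    public static DaggerStatsDReporter provide(
            org.apache.flink.configuration.Configuration flinkConfiguration,
            Configuration daggerConfiguration) {
        if (instance == null) {
            synchronized (StatsDReporterProvider.class) {
                if (instance == null) {
                    // Assumed constructor; the actual factory may differ.
                    instance = new DaggerStatsDReporter(flinkConfiguration, daggerConfiguration);
                }
            }
        }
        return instance;
    }
}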
@@ -100,22 +100,23 @@ public StreamManager registerSourceWithPreProcessors() {
long watermarkDelay = configuration.getLong(FLINK_WATERMARK_DELAY_MS_KEY, FLINK_WATERMARK_DELAY_MS_DEFAULT);
Boolean enablePerPartitionWatermark = configuration.getBoolean(FLINK_WATERMARK_PER_PARTITION_ENABLE_KEY, FLINK_WATERMARK_PER_PARTITION_ENABLE_DEFAULT);
PreProcessorConfig preProcessorConfig = PreProcessorFactory.parseConfig(configuration);
-getStreams().forEach(stream -> {
-String tableName = stream.getStreamName();
-WatermarkStrategyDefinition watermarkStrategyDefinition = getSourceWatermarkDefinition(enablePerPartitionWatermark);
-DataStream<Row> dataStream = stream.registerSource(executionEnvironment, watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelay));
-StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new LastColumnWatermark());
-
-DataStream<Row> rowSingleOutputStreamOperator = streamWatermarkAssigner
-.assignTimeStampAndWatermark(dataStream, watermarkDelay, enablePerPartitionWatermark);
-
-TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType());
-StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames());
-streamInfo = addPreProcessor(streamInfo, tableName, preProcessorConfig);
-
-Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo));
-tableEnvironment.createTemporaryView(tableName, table);
-});
+StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter)
+.forEach(stream -> {
+String tableName = stream.getStreamName();
+WatermarkStrategyDefinition watermarkStrategyDefinition = getSourceWatermarkDefinition(enablePerPartitionWatermark);
+DataStream<Row> dataStream = stream.registerSource(executionEnvironment, watermarkStrategyDefinition.getWatermarkStrategy(watermarkDelay));
+StreamWatermarkAssigner streamWatermarkAssigner = new StreamWatermarkAssigner(new LastColumnWatermark());
+
+DataStream<Row> rowSingleOutputStreamOperator = streamWatermarkAssigner
+.assignTimeStampAndWatermark(dataStream, watermarkDelay, enablePerPartitionWatermark);
+
+TableSchema tableSchema = TableSchema.fromTypeInfo(dataStream.getType());
+StreamInfo streamInfo = new StreamInfo(rowSingleOutputStreamOperator, tableSchema.getFieldNames());
+streamInfo = addPreProcessor(streamInfo, tableName, preProcessorConfig);
+
+Table table = tableEnvironment.fromDataStream(streamInfo.getDataStream(), getApiExpressions(streamInfo));
+tableEnvironment.createTemporaryView(tableName, table);
+});
return this;
}

@@ -228,12 +229,7 @@ private StreamInfo addPreProcessor(StreamInfo streamInfo, String tableName, PreP
private void addSink(StreamInfo streamInfo) {
SinkOrchestrator sinkOrchestrator = new SinkOrchestrator(telemetryExporter);
sinkOrchestrator.addSubscriber(telemetryExporter);
-streamInfo.getDataStream().sinkTo(sinkOrchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator));
+streamInfo.getDataStream().sinkTo(sinkOrchestrator.getSink(configuration, streamInfo.getColumnNames(), stencilClientOrchestrator, daggerStatsDReporter));
}

-List<Stream> getStreams() {
-org.apache.flink.configuration.Configuration flinkConfiguration = (org.apache.flink.configuration.Configuration) this.executionEnvironment.getConfiguration();
-DaggerStatsDReporter daggerStatsDReporter = DaggerStatsDReporter.Provider.provide(flinkConfiguration, configuration);
-return StreamsFactory.getStreams(configuration, stencilClientOrchestrator, daggerStatsDReporter);
-}
}
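With the private getStreams() helper deleted, both source registration and sink wiring read the daggerStatsDReporter field, so registerConfigs() has to run first. A hypothetical driver fragment showing that assumed ordering; the constructor arguments are inferred from the truncated signature and the final fields above, and later pipeline steps are omitted:

// Assumed call order: registerConfigs() initializes daggerStatsDReporter
// before sources and the sink are registered. Only methods visible in this
// diff are chained here.
StreamManager streamManager =
        new StreamManager(configuration, executionEnvironment, tableEnvironment);
streamManager
        .registerConfigs()
        .registerSourceWithPreProcessors();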
dagger-core/src/main/java/io/odpf/dagger/core/exception/BigQueryWriterException.java
@@ -0,0 +1,14 @@
+package io.odpf.dagger.core.exception;
+
+import java.io.IOException;
+
+public class BigQueryWriterException extends IOException {
+
+public BigQueryWriterException(String message, Throwable cause) {
+super(message, cause);
+}
+
+public BigQueryWriterException(String message) {
+super(message);
+}
+}
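A hedged example of how a writer might use the new exception type; writeBatchToBigQuery and records are hypothetical helpers, not part of this commit:

// Hypothetical call site: wrap low-level I/O failures so callers can tell
// BigQuery write errors apart from other IOExceptions.
try {
    writeBatchToBigQuery(records); // hypothetical helper, assumed in scope
} catch (java.io.IOException e) {
    throw new BigQueryWriterException("Failed to push batch to BigQuery", e);
}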

This file was deleted.

dagger-core/src/main/java/io/odpf/dagger/core/sink/SinkOrchestrator.java
@@ -1,6 +1,7 @@
package io.odpf.dagger.core.sink;

-import io.odpf.dagger.core.sink.bigquery.BigquerySinkBuilder;
+import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
+import io.odpf.dagger.core.sink.bigquery.BigQuerySinkBuilder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -48,7 +49,7 @@ public SinkOrchestrator(MetricsTelemetryExporter telemetryExporter) {
* @columnNames columnNames the column names
* @StencilClientOrchestrator stencilClientOrchestrator the stencil client orchestrator
*/
-public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator) {
+public Sink getSink(Configuration configuration, String[] columnNames, StencilClientOrchestrator stencilClientOrchestrator, DaggerStatsDReporter daggerStatsDReporter) {
String sinkType = configuration.getString("SINK_TYPE", "influx");
addMetric(TelemetryTypes.SINK_TYPE.getValue(), sinkType);
Sink sink;
@@ -73,8 +74,9 @@ public Sink getSink(Configuration configuration, String[] columnNames, StencilCl
sink = new LogSink(columnNames);
break;
case "bigquery":
-sink = BigquerySinkBuilder.create()
+sink = BigQuerySinkBuilder.create()
.setColumnNames(columnNames)
+.setDaggerStatsDReporter(daggerStatsDReporter)
.setConfiguration(configuration)
.setStencilClientOrchestrator(stencilClientOrchestrator)
.build();
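For context, the sink implementation is selected by the SINK_TYPE property, which defaults to "influx" above. A small sketch of a configuration that routes rows to the new BigQuery sink; the proto class value is a placeholder, and the Configuration(ParameterTool.fromMap(...)) constructor mirrors the one used in BigQuerySinkBuilder below:

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.utils.ParameterTool;
import io.odpf.dagger.common.configuration.Configuration;

// Sketch: choose the BigQuery sink via SINK_TYPE. The schema class is a
// placeholder value, not something taken from this commit.
Map<String, String> props = new HashMap<>();
props.put("SINK_TYPE", "bigquery");
props.put("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", "com.example.SampleLogMessage");
Configuration configuration = new Configuration(ParameterTool.fromMap(props));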
dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigQuerySink.java
@@ -5,6 +5,7 @@
import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer;
import io.odpf.dagger.core.metrics.reporters.ErrorReporter;
import io.odpf.dagger.core.metrics.reporters.ErrorReporterFactory;
+import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import io.odpf.dagger.core.utils.Constants;
import io.odpf.depot.OdpfSink;
import io.odpf.depot.bigquery.BigQuerySinkFactory;
@@ -24,30 +25,32 @@
import java.util.Optional;
import java.util.Set;

-public class BigquerySink implements Sink<Row, Void, Void, Void> {
+public class BigQuerySink implements Sink<Row, Void, Void, Void> {
private final ProtoSerializer protoSerializer;
private final Configuration configuration;
+private final DaggerStatsDReporter daggerStatsDReporter;
private transient BigQuerySinkFactory sinkFactory;

-protected BigquerySink(Configuration configuration, ProtoSerializer protoSerializer) {
-this(configuration, protoSerializer, null);
+protected BigQuerySink(Configuration configuration, ProtoSerializer protoSerializer, DaggerStatsDReporter daggerStatsDReporter) {
+this(configuration, protoSerializer, null, daggerStatsDReporter);
}

/**
* Constructor for testing.
*/
-protected BigquerySink(Configuration configuration, ProtoSerializer protoSerializer, BigQuerySinkFactory sinkFactory) {
+protected BigQuerySink(Configuration configuration, ProtoSerializer protoSerializer, BigQuerySinkFactory sinkFactory, DaggerStatsDReporter daggerStatsDReporter) {
this.configuration = configuration;
this.protoSerializer = protoSerializer;
this.sinkFactory = sinkFactory;
+this.daggerStatsDReporter = daggerStatsDReporter;
}

@Override
public SinkWriter<Row, Void, Void> createWriter(InitContext context, List<Void> states) {
ErrorReporter errorReporter = ErrorReporterFactory.getErrorReporter(context.metricGroup(), configuration);
if (sinkFactory == null) {
BigQuerySinkConfig sinkConfig = ConfigFactory.create(BigQuerySinkConfig.class, configuration.getParam().toMap());
-sinkFactory = new BigQuerySinkFactory(sinkConfig);
+sinkFactory = new BigQuerySinkFactory(sinkConfig, daggerStatsDReporter.buildStatsDReporter());
try {
sinkFactory.init();
} catch (Exception e) {
@@ -66,7 +69,7 @@ public SinkWriter<Row, Void, Void> createWriter(InitContext context, List<Void>
for (String s : Splitter.on(",").omitEmptyStrings().split(errorsForFailing)) {
errorTypesForFailing.add(ErrorType.valueOf(s.trim()));
}
-return new BigquerySinkWriter(protoSerializer, odpfSink, batchSize, errorReporter, errorTypesForFailing);
+return new BigQuerySinkWriter(protoSerializer, odpfSink, batchSize, errorReporter, errorTypesForFailing);
}

@Override
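The createWriter() hunk above also parses SINK_ERROR_TYPES_FOR_FAILURE into the set of depot ErrorTypes that should fail the writer. A worked example of that parsing, assuming DESERIALIZATION_ERROR and SINK_UNKNOWN_ERROR are valid constants in depot's ErrorType enum (import path assumed):

import com.google.common.base.Splitter;
import io.odpf.depot.error.ErrorType;
import java.util.HashSet;
import java.util.Set;

// Same parsing as createWriter(), with a literal input:
// "DESERIALIZATION_ERROR, SINK_UNKNOWN_ERROR" yields a two-element set.
String errorsForFailing = "DESERIALIZATION_ERROR, SINK_UNKNOWN_ERROR";
Set<ErrorType> errorTypesForFailing = new HashSet<>();
for (String s : Splitter.on(",").omitEmptyStrings().split(errorsForFailing)) {
    errorTypesForFailing.add(ErrorType.valueOf(s.trim()));
}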
dagger-core/src/main/java/io/odpf/dagger/core/sink/bigquery/BigQuerySinkBuilder.java
@@ -3,32 +3,34 @@
import io.odpf.dagger.common.configuration.Configuration;
import io.odpf.dagger.common.core.StencilClientOrchestrator;
import io.odpf.dagger.common.serde.proto.serialization.ProtoSerializer;
+import io.odpf.dagger.core.metrics.reporters.statsd.DaggerStatsDReporter;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.HashMap;
import java.util.Map;

-public class BigquerySinkBuilder {
+public class BigQuerySinkBuilder {

private String[] columnNames;
private StencilClientOrchestrator stencilClientOrchestrator;
private Configuration configuration;
+private DaggerStatsDReporter daggerStatsDReporter;

-private BigquerySinkBuilder() {
+private BigQuerySinkBuilder() {
}

-public static BigquerySinkBuilder create() {
-return new BigquerySinkBuilder();
+public static BigQuerySinkBuilder create() {
+return new BigQuerySinkBuilder();
}

-public BigquerySink build() {
+public BigQuerySink build() {
ProtoSerializer protoSerializer = new ProtoSerializer(
configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS", ""),
configuration.getString("SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS", ""),
columnNames,
stencilClientOrchestrator);
Configuration conf = setDefaultValues(configuration);
-return new BigquerySink(conf, protoSerializer);
+return new BigQuerySink(conf, protoSerializer, daggerStatsDReporter);
}

private Configuration setDefaultValues(Configuration inputConf) {
@@ -40,22 +42,28 @@ private Configuration setDefaultValues(Configuration inputConf) {
configMap.put("SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY", "LONG_POLLING");
configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS", "60000");
configMap.put("SCHEMA_REGISTRY_STENCIL_FETCH_HEADERS", "");
configMap.put("SINK_METRICS_APPLICATION_PREFIX", "dagger_");
configMap.put("SINK_BIGQUERY_ROW_INSERT_ID_ENABLE", "false");
return new Configuration(ParameterTool.fromMap(configMap));
}

-public BigquerySinkBuilder setConfiguration(Configuration configuration) {
+public BigQuerySinkBuilder setConfiguration(Configuration configuration) {
this.configuration = configuration;
return this;
}

-public BigquerySinkBuilder setColumnNames(String[] columnNames) {
+public BigQuerySinkBuilder setColumnNames(String[] columnNames) {
this.columnNames = columnNames;
return this;
}

-public BigquerySinkBuilder setStencilClientOrchestrator(StencilClientOrchestrator stencilClientOrchestrator) {
+public BigQuerySinkBuilder setStencilClientOrchestrator(StencilClientOrchestrator stencilClientOrchestrator) {
this.stencilClientOrchestrator = stencilClientOrchestrator;
return this;
}

+public BigQuerySinkBuilder setDaggerStatsDReporter(DaggerStatsDReporter daggerStatsDReporter) {
+this.daggerStatsDReporter = daggerStatsDReporter;
+return this;
+}
}
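Putting the builder together, mirroring the "bigquery" case in SinkOrchestrator.getSink(...) earlier in this commit; the four inputs are assumed to be in scope:

// Mirrors the SinkOrchestrator call site above; whether the reporter may be
// null is not shown in this diff.
BigQuerySink sink = BigQuerySinkBuilder.create()
        .setColumnNames(columnNames)
        .setDaggerStatsDReporter(daggerStatsDReporter)
        .setConfiguration(configuration)
        .setStencilClientOrchestrator(stencilClientOrchestrator)
        .build();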
