Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Base runtime for streaming applications

Christian Kreutzfeldt edited this page Apr 26, 2016 · 10 revisions

The StreamingAppRuntime class provides a base runtime to serve as foundation for Apache Flink based streaming applications. The intention is to speed up development of pipelines simply by providing implementations for features that are required but do not directly contribute to the event processing itself. To achieve this the class follows the approach of dropwizard.io.

The runtime currently provides features for

  • parsing the command-line input
  • reading, parsing and validating configurations
  • configuring an execution environment

To leverage the runtime foundation the custom class must extend StreamingAppRuntime and provide a configuration which is based on StreamingAppConfiguration.

The latter may be annotated with constraints that are defined inside javax.validation.constraints. These instructions are validated by the underlying XML/JSON library and may produce error messages accordingly.

The application itself is required to provide an implementation for

protected abstract void run(final T configuration) throws Exception;

which sets up and executes the flink pipeline.

To execute the application it is required to provide a JSON formatted configuration file which follows the structure as defined by the configuration class structure described above.

Example

The following example shows how to implement a processing pipeline based on the runtime foundation

public class LogProcessing extends StreamingAppRuntime<LogProcessingConfiguration> {

  protected void run(LogProcessingConfiguration cfg) throws Exception {
    StreamExecutionEnvironment env = getExecutionEnvironment();
    env.enableCheckpointing();
    DataStream<JSONObject> kafkaStream = env.addSource(kafkaSource);
    DataStream<JSONObject> eventStream = kafkaStream.filter(new JsonContentFilter(cfg.getContentFilters()));
    eventStream.addSink(elasticsearchSink);
    env.execute(cfg.getApplicationName());
  }

  public static void main(String[] args) throws Exception {
    new LogProcessing().run(args, System.out, LogProcessingConfiguration.class);
  }