diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/Compiler.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/Compiler.java similarity index 87% rename from flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/Compiler.java rename to flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/Compiler.java index f882306..9487b25 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/Compiler.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/Compiler.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.uber.athena.flux.flink.compiler; +package com.uber.athena.flux.flink.compiler.api; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -34,8 +34,8 @@ public interface Compiler { * Compile the thing. * * @param senv stream execution environment - * @param fluxContext flux context + * @param compilerContext flux context * @param vertex compilation vertex. */ - void compile(StreamExecutionEnvironment senv, FluxContext fluxContext, CompilationVertex vertex); + void compile(StreamExecutionEnvironment senv, CompilerContext compilerContext, CompilerVertex vertex); } diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/FluxContext.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/CompilerContext.java similarity index 72% rename from flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/FluxContext.java rename to flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/CompilerContext.java index ecb7b2c..24d74bb 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/FluxContext.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/CompilerContext.java @@ -16,45 +16,42 @@ * limitations under the License. */ -package com.uber.athena.flux.flink.compiler; +package com.uber.athena.flux.flink.compiler.api; import com.uber.athena.flux.model.TopologyDef; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.operators.StreamOperator; import java.util.HashMap; import java.util.List; import java.util.Map; -public class FluxContext { +public class CompilerContext { // parsed Topology definition private TopologyDef topologyDef; - // Storm config + // Flink config private Configuration config; // components required to be instantiated from classpath JARs private List additionalComponents; /** - * The following are materialized objects from the {@link TopologyDef}. + * The following are materialized objects from the {@code TopologyDef}. */ private Map componentMap = new HashMap(); - private Map> dataStreamSourceMap = new HashMap>(); + private Map> dataStreamSourceMap = new HashMap<>(); - private Map> operatorMap = new HashMap>(); + private Map> operatorMap = new HashMap<>(); - private Map> dataStreamSinkMap = new HashMap>(); + private Map> dataStreamSinkMap = new HashMap<>(); /** - * The following is used by {@link CompilationGraph}. + * The following is used by {@code CompilerGraph}. */ - private Map compilationVertexMap = new HashMap<>(); + private Map> compilationVertexMap = new HashMap<>(); - public FluxContext(TopologyDef topologyDef, Configuration config) { + public CompilerContext(TopologyDef topologyDef, Configuration config) { this.topologyDef = topologyDef; this.config = config; } @@ -85,7 +82,7 @@ public void setConfig(Configuration config) { * @param id source id * @param source source object */ - public void addSource(String id, DataStreamSource source) { + public void addSource(String id, CompilerVertex source) { this.dataStreamSourceMap.put(id, source); } @@ -95,14 +92,14 @@ public void addSource(String id, DataStreamSource source) { * @param id sink ID * @param sink sink object */ - public void addSink(String id, DataStreamSink sink) { + public void addSink(String id, CompilerVertex sink) { this.dataStreamSinkMap.put(id, sink); } /** * add operator. */ - public void addOperator(String id, StreamOperator op) { + public void addOperator(String id, CompilerVertex op) { this.operatorMap.put(id, op); } @@ -131,7 +128,7 @@ public Object getComponent(String id) { * @param key vertex id, identical to the ComponentDef ID * @param value compilation vertex. */ - public void putCompilationVertex(String key, CompilationVertex value) { + public void putCompilationVertex(String key, CompilerVertex value) { compilationVertexMap.put(key, value); } @@ -141,7 +138,7 @@ public void putCompilationVertex(String key, CompilationVertex value) { * @param key vertex id, identical to the ComponentDef ID * @return compilation vertex. */ - public CompilationVertex getCompilationVertex(String key) { + public CompilerVertex getCompilationVertex(String key) { return compilationVertexMap.get(key); } } diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/CompilationGraph.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/CompilerGraph.java similarity index 64% rename from flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/CompilationGraph.java rename to flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/CompilerGraph.java index 19a7271..549371e 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/CompilationGraph.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/CompilerGraph.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package com.uber.athena.flux.flink.compiler; +package com.uber.athena.flux.flink.compiler.api; +import com.uber.athena.flux.flink.compiler.impl.datastream.DataStreamCompilerImpl; import com.uber.athena.flux.flink.runtime.FluxTopologyImpl; import com.uber.athena.flux.model.EdgeDef; import com.uber.athena.flux.model.OperatorDef; @@ -36,15 +37,10 @@ /** * Object holder for compilation procedure. */ -public class CompilationGraph { - private final FluxContext fluxContext; - private final StreamExecutionEnvironment senv; - private Queue compilationQueue = new PriorityQueue<>(); - - public CompilationGraph(StreamExecutionEnvironment senv, FluxContext fluxContext) { - this.senv = senv; - this.fluxContext = fluxContext; - } +public abstract class CompilerGraph { + protected CompilerContext compilerContext; + protected StreamExecutionEnvironment senv; + protected Queue> compilationQueue = new PriorityQueue<>(); /** * Compile current graph into a {@code FluxTopology}. @@ -52,34 +48,36 @@ public CompilationGraph(StreamExecutionEnvironment senv, FluxContext fluxContext * @return the topology */ public FluxTopologyImpl compile() { - constructCompilationGraph(fluxContext); - compileVertexQueue(senv, fluxContext); + constructCompilationGraph(compilerContext); + compileVertexQueue(senv, compilerContext); JobGraph jobGraph = senv.getStreamGraph().getJobGraph(); - FluxTopologyImpl fluxTopology = new FluxTopologyImpl(); + FluxTopologyImpl fluxTopology = new FluxTopologyImpl(senv); fluxTopology.setJobGraph(jobGraph); return fluxTopology; } - private void constructCompilationGraph(FluxContext fluxContext) { - Map compilationVertexBuilders = new HashMap<>(); - TopologyDef topologyDef = fluxContext.getTopologyDef(); + public abstract CompilerVertex constructCompilerVertex(CompilerVertex.Builder vertexBuilder); + + private void constructCompilationGraph(CompilerContext compilerContext) { + Map compilationVertexBuilders = new HashMap<>(); + TopologyDef topologyDef = compilerContext.getTopologyDef(); // Build the Compilation Graph // Add all vertices for (SourceDef sourceDef : topologyDef.getSources()) { compilationVertexBuilders.put( sourceDef.getId(), - new CompilationVertex.Builder().setVertex(sourceDef)); + new CompilerVertex.Builder().setVertex(sourceDef)); } for (SinkDef sinkDef : topologyDef.getSinks()) { compilationVertexBuilders.put( sinkDef.getId(), - new CompilationVertex.Builder().setVertex(sinkDef)); + new CompilerVertex.Builder().setVertex(sinkDef)); } for (OperatorDef operatorDef : topologyDef.getOperators()) { compilationVertexBuilders.put( operatorDef.getId(), - new CompilationVertex.Builder().setVertex(operatorDef)); + new CompilerVertex.Builder().setVertex(operatorDef)); } // Add all edges @@ -90,24 +88,24 @@ private void constructCompilationGraph(FluxContext fluxContext) { .addIncomingEdge(streamDef); } - for (Map.Entry entry : compilationVertexBuilders.entrySet()) { - CompilationVertex vertex = entry.getValue().build(); - this.fluxContext.putCompilationVertex(entry.getKey(), vertex); + for (Map.Entry entry : compilationVertexBuilders.entrySet()) { + CompilerVertex vertex = constructCompilerVertex(entry.getValue()); + this.compilerContext.putCompilationVertex(entry.getKey(), vertex); if (vertex.readyToCompile()) { this.compilationQueue.add(vertex); } } } - private void compileVertexQueue(StreamExecutionEnvironment senv, FluxContext fluxContext) { - CompilerImpl compilerImpl = new CompilerImpl(); + private void compileVertexQueue(StreamExecutionEnvironment senv, CompilerContext compilerContext) { + DataStreamCompilerImpl dataStreamCompilerImpl = new DataStreamCompilerImpl(); while (this.compilationQueue.size() > 0) { - CompilationVertex vertex = this.compilationQueue.poll(); - compilerImpl.compile(senv, fluxContext, vertex); + CompilerVertex vertex = this.compilationQueue.poll(); + dataStreamCompilerImpl.compile(senv, compilerContext, vertex); // set downstream vertex compilation flags. for (EdgeDef downstreamEdge : vertex.getOutgoingEdge()) { - CompilationVertex toVertex = this.fluxContext.getCompilationVertex(downstreamEdge.getTo()); + CompilerVertex toVertex = this.compilerContext.getCompilationVertex(downstreamEdge.getTo()); toVertex.addCompiledSourceCount(); if (toVertex.readyToCompile()) { this.compilationQueue.add(toVertex); diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/CompilationVertex.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/CompilerVertex.java similarity index 73% rename from flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/CompilationVertex.java rename to flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/CompilerVertex.java index d5e31c6..b020916 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/CompilationVertex.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/api/CompilerVertex.java @@ -16,30 +16,26 @@ * limitations under the License. */ -package com.uber.athena.flux.flink.compiler; +package com.uber.athena.flux.flink.compiler.api; import com.uber.athena.flux.model.EdgeDef; import com.uber.athena.flux.model.VertexDef; -import org.apache.flink.streaming.api.datastream.DataStream; import java.util.ArrayList; import java.util.List; -public class CompilationVertex { - - private final VertexDef vertex; - private final List incomingEdge; - private final List outgoingEdge; +/** + * Compiler vertex used within a Flux compilation context. + * + * @param type of the compilation results generated. + */ +public abstract class CompilerVertex { - private int compiledSourceCount; - private DataStream dataStream = null; + protected VertexDef vertex; + protected List incomingEdge; + protected List outgoingEdge; - CompilationVertex(VertexDef vertex, List incomingEdge, List outgoingEdge) { - this.vertex = vertex; - this.incomingEdge = incomingEdge; - this.outgoingEdge = outgoingEdge; - this.compiledSourceCount = 0; - } + protected int compiledSourceCount; /** * Increase compilation flag by one. Used after an upstream vertex has been compiled. @@ -57,6 +53,20 @@ public boolean readyToCompile() { return this.compiledSourceCount == incomingEdge.size(); } + /** + * Setting the result of the compilation. + * + * @param compilationResult the compilation result. + */ + public abstract void setCompilationResult(T compilationResult); + + /** + * Getting the result of the compilation, return null if not compiled. + * + * @return the compilation result. + */ + public abstract T getCompilationResult(); + //------------------------------------------------------------------------- // Getters //------------------------------------------------------------------------- @@ -73,15 +83,6 @@ public List getOutgoingEdge() { return outgoingEdge; } - public DataStream getDataStream() { - return dataStream; - } - - public void setDataStream(DataStream dataStream) { - this.dataStream = dataStream; - } - - // ------------------------------------------------------------------------ // Builder pattern // ------------------------------------------------------------------------ @@ -112,11 +113,16 @@ public Builder addOutgoingEdge(EdgeDef edgeDef) { return this; } - public CompilationVertex build() { - return new CompilationVertex( - vertex, - incomingEdge, - outgoingEdge); + public VertexDef getVertex() { + return vertex; + } + + public List getIncomingEdge() { + return incomingEdge; + } + + public List getOutgoingEdge() { + return outgoingEdge; } } } diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/FluxCompilerSuite.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/FluxCompilerSuite.java similarity index 66% rename from flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/FluxCompilerSuite.java rename to flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/FluxCompilerSuite.java index 67dd4fd..6613350 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/FluxCompilerSuite.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/FluxCompilerSuite.java @@ -16,8 +16,11 @@ * limitations under the License. */ -package com.uber.athena.flux.flink.compiler; +package com.uber.athena.flux.flink.compiler.impl; +import com.uber.athena.flux.flink.compiler.api.CompilerContext; +import com.uber.athena.flux.flink.compiler.api.CompilerGraph; +import com.uber.athena.flux.flink.compiler.impl.datastream.DataStreamCompilerGraph; import com.uber.athena.flux.flink.runtime.FluxTopologyImpl; import com.uber.athena.flux.model.TopologyDef; import org.apache.flink.configuration.Configuration; @@ -26,15 +29,19 @@ /** * Compilation framework for the flux topology. + * + *

Based on the topology definition type, and the supported compiler, + * this compiler suite will find the appropriate compilation framework + * to construct the Flink topology job graph. */ public class FluxCompilerSuite { private final TopologyDef topologyDef; private final Configuration config; private final StreamExecutionEnvironment streamExecutionEnvironment; - private final FluxContext fluxContext; + private final CompilerContext compilerContext; - private CompilationGraph compilationGraph; + private CompilerGraph compilerGraph; public FluxCompilerSuite( TopologyDef topologyDef, @@ -43,16 +50,21 @@ public FluxCompilerSuite( this.streamExecutionEnvironment = streamExecutionEnvironment; this.topologyDef = topologyDef; this.config = new Configuration(config); - this.fluxContext = new FluxContext(topologyDef, config); - this.compilationGraph = new CompilationGraph( + this.compilerContext = new CompilerContext(topologyDef, config); + // TODO: determine compilation graph impl based on API level. + this.compilerGraph = new DataStreamCompilerGraph( this.streamExecutionEnvironment, - this.fluxContext); + this.compilerContext) { + }; } /** * compile topology definition to {@code FluxTopology}. * - * @return flux topology. + *

The compilation should invoke the compilation framework based on + * constructed settings. + * + * @return a flux topology, different compilation suits might return different implementations. */ public FluxTopologyImpl compile() { Preconditions.checkNotNull(topologyDef, "topology cannot be null!"); @@ -61,6 +73,6 @@ public FluxTopologyImpl compile() { } private FluxTopologyImpl compileInternal() { - return this.compilationGraph.compile(); + return this.compilerGraph.compile(); } } diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/DataStreamCompilerGraph.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/DataStreamCompilerGraph.java new file mode 100644 index 0000000..e8d1c45 --- /dev/null +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/DataStreamCompilerGraph.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.athena.flux.flink.compiler.impl.datastream; + +import com.uber.athena.flux.flink.compiler.api.CompilerContext; +import com.uber.athena.flux.flink.compiler.api.CompilerGraph; +import com.uber.athena.flux.flink.compiler.api.CompilerVertex; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Object holder for compilation procedure specific for DataStream. + */ +public abstract class DataStreamCompilerGraph extends CompilerGraph { + + public DataStreamCompilerGraph(StreamExecutionEnvironment senv, CompilerContext compilerContext) { + super.senv = senv; + super.compilerContext = compilerContext; + } + + public CompilerVertex constructCompilerVertex(CompilerVertex.Builder vertexBuilder) { + return new DataStreamCompilerVertex( + vertexBuilder.getVertex(), + vertexBuilder.getIncomingEdge(), + vertexBuilder.getOutgoingEdge() + ); + } +} diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/CompilerImpl.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/DataStreamCompilerImpl.java similarity index 61% rename from flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/CompilerImpl.java rename to flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/DataStreamCompilerImpl.java index 786f2f1..8d90308 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/CompilerImpl.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/DataStreamCompilerImpl.java @@ -16,42 +16,45 @@ * limitations under the License. */ -package com.uber.athena.flux.flink.compiler; +package com.uber.athena.flux.flink.compiler.impl.datastream; +import com.uber.athena.flux.flink.compiler.api.Compiler; +import com.uber.athena.flux.flink.compiler.api.CompilerContext; +import com.uber.athena.flux.flink.compiler.api.CompilerVertex; import com.uber.athena.flux.model.OperatorDef; import com.uber.athena.flux.model.SinkDef; import com.uber.athena.flux.model.SourceDef; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Preconditions; -import static com.uber.athena.flux.flink.compiler.utils.CompilationUtils.compileOperator; -import static com.uber.athena.flux.flink.compiler.utils.CompilationUtils.compileSink; -import static com.uber.athena.flux.flink.compiler.utils.CompilationUtils.compileSource; +import static com.uber.athena.flux.flink.compiler.impl.datastream.utils.DataStreamCompilationUtils.compileOperator; +import static com.uber.athena.flux.flink.compiler.impl.datastream.utils.DataStreamCompilationUtils.compileSink; +import static com.uber.athena.flux.flink.compiler.impl.datastream.utils.DataStreamCompilationUtils.compileSource; /** * Compiler implementation for operator-level Flux compilation. */ -public class CompilerImpl implements Compiler { +public class DataStreamCompilerImpl implements Compiler { - public CompilerImpl() { + public DataStreamCompilerImpl() { } /** * Compile a single vertex into chaining datastream. * @param senv stream execution environment - * @param fluxContext flux context + * @param compilerContext flux context * @param vertex compilation vertex. */ @Override - public void compile(StreamExecutionEnvironment senv, FluxContext fluxContext, CompilationVertex vertex) { + public void compile(StreamExecutionEnvironment senv, CompilerContext compilerContext, CompilerVertex vertex) { Preconditions.checkArgument(vertex.readyToCompile()); try { if (vertex.getVertex() instanceof SourceDef) { - compileSource(fluxContext, senv, vertex); + compileSource(compilerContext, senv, (DataStreamCompilerVertex) vertex); } else if (vertex.getVertex() instanceof OperatorDef) { - compileOperator(fluxContext, vertex); + compileOperator(compilerContext, (DataStreamCompilerVertex) vertex); } else if (vertex.getVertex() instanceof SinkDef) { - compileSink(fluxContext, vertex); + compileSink(compilerContext, (DataStreamCompilerVertex) vertex); } } catch (Exception e) { throw new RuntimeException("Cannot compile vertex: " + vertex.getVertex().getId(), e); diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/DataStreamCompilerVertex.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/DataStreamCompilerVertex.java new file mode 100644 index 0000000..9f5d0b6 --- /dev/null +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/DataStreamCompilerVertex.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.athena.flux.flink.compiler.impl.datastream; + +import com.uber.athena.flux.flink.compiler.api.CompilerVertex; +import com.uber.athena.flux.model.EdgeDef; +import com.uber.athena.flux.model.VertexDef; +import org.apache.flink.streaming.api.datastream.DataStream; + +import java.util.List; + +public class DataStreamCompilerVertex extends CompilerVertex { + + private DataStream dataStream; + + DataStreamCompilerVertex(VertexDef vertex, List incomingEdge, List outgoingEdge) { + this.vertex = vertex; + this.incomingEdge = incomingEdge; + this.outgoingEdge = outgoingEdge; + this.compiledSourceCount = 0; + } + + @Override + public void setCompilationResult(DataStream compilationResult) { + this.dataStream = compilationResult; + } + + @Override + public DataStream getCompilationResult() { + return this.dataStream; + } +} diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/utils/CompilationUtils.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/utils/DataStreamCompilationUtils.java similarity index 80% rename from flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/utils/CompilationUtils.java rename to flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/utils/DataStreamCompilationUtils.java index b3b3e3f..1832caf 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/utils/CompilationUtils.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/impl/datastream/utils/DataStreamCompilationUtils.java @@ -16,10 +16,12 @@ * limitations under the License. */ -package com.uber.athena.flux.flink.compiler.utils; +package com.uber.athena.flux.flink.compiler.impl.datastream.utils; -import com.uber.athena.flux.flink.compiler.CompilationVertex; -import com.uber.athena.flux.flink.compiler.FluxContext; +import com.uber.athena.flux.flink.compiler.api.CompilerContext; +import com.uber.athena.flux.flink.compiler.api.CompilerVertex; +import com.uber.athena.flux.flink.compiler.impl.datastream.DataStreamCompilerVertex; +import com.uber.athena.flux.flink.compiler.utils.ReflectiveInvokeUtils; import com.uber.athena.flux.model.ConfigMethodDef; import com.uber.athena.flux.model.ObjectDef; import com.uber.athena.flux.model.OperatorDef; @@ -29,7 +31,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -46,45 +47,45 @@ import java.util.List; @SuppressWarnings("unchecked") -public final class CompilationUtils { - private static final Logger LOG = LoggerFactory.getLogger(CompilationUtils.class); +public final class DataStreamCompilationUtils { + private static final Logger LOG = LoggerFactory.getLogger(DataStreamCompilationUtils.class); - private CompilationUtils() { + private DataStreamCompilationUtils() { } /** * compile source. * - * @param fluxContext flux context + * @param compilerContext flux context * @param senv stream execution environment to start the source definition * @param vertex compilation vertex * @throws Exception when compilation fails. */ public static void compileSource( - FluxContext fluxContext, + CompilerContext compilerContext, StreamExecutionEnvironment senv, - CompilationVertex vertex) throws Exception { + DataStreamCompilerVertex vertex) throws Exception { // Compile vertex SourceDef sourceDef = (SourceDef) vertex.getVertex(); - SourceFunction sourceFunction = (SourceFunction) buildObject(sourceDef, fluxContext); + SourceFunction sourceFunction = (SourceFunction) buildObject(sourceDef, compilerContext); DataStreamSource dataStreamSource = senv.addSource(sourceFunction, sourceDef.getId()); // set compilation results - vertex.setDataStream(dataStreamSource); - fluxContext.addSource(sourceDef.getId(), dataStreamSource); + vertex.setCompilationResult(dataStreamSource); + compilerContext.addSource(sourceDef.getId(), vertex); } /** * compile operator. * - * @param fluxContext flux context + * @param compilerContext flux context * @param vertex compilation vertex * @throws Exception when compilation fails. */ public static void compileOperator( - FluxContext fluxContext, - CompilationVertex vertex) throws Exception { + CompilerContext compilerContext, + DataStreamCompilerVertex vertex) throws Exception { if (vertex.getIncomingEdge().size() != 1) { throw new UnsupportedOperationException( "Cannot compile zero input or multiple input operators as this moment"); @@ -92,31 +93,31 @@ public static void compileOperator( // Fetch upstream OperatorDef operatorDef = (OperatorDef) vertex.getVertex(); String sourceId = vertex.getIncomingEdge().get(0).getFrom(); - CompilationVertex source = fluxContext.getCompilationVertex(sourceId); - DataStream sourceStream = source.getDataStream(); + CompilerVertex source = compilerContext.getCompilationVertex(sourceId); + DataStream sourceStream = ((DataStreamCompilerVertex) source).getCompilationResult(); // Compile vertex - OneInputStreamOperator operator = (OneInputStreamOperator) buildObject(operatorDef, fluxContext); + OneInputStreamOperator operator = (OneInputStreamOperator) buildObject(operatorDef, compilerContext); DataStream stream = sourceStream.transform( operatorDef.getId(), resolveTypeInformation(operatorDef.getTypeInformation()), operator); // set compilation results - vertex.setDataStream(stream); - fluxContext.addOperator(operatorDef.getId(), operator); + vertex.setCompilationResult(stream); + compilerContext.addOperator(operatorDef.getId(), vertex); } /** * compile sink. * - * @param fluxContext flux context + * @param compilerContext flux context * @param vertex compilation vertex * @throws Exception when compilation fails. */ public static void compileSink( - FluxContext fluxContext, - CompilationVertex vertex) throws Exception { + CompilerContext compilerContext, + DataStreamCompilerVertex vertex) throws Exception { if (vertex.getIncomingEdge().size() != 1) { throw new UnsupportedOperationException( "Cannot compile zero input or multiple input sink as this moment"); @@ -124,18 +125,19 @@ public static void compileSink( // Fetch upstream SinkDef sinkDef = (SinkDef) vertex.getVertex(); String sourceId = vertex.getIncomingEdge().get(0).getFrom(); - CompilationVertex source = fluxContext.getCompilationVertex(sourceId); - DataStream sourceStream = source.getDataStream(); + CompilerVertex source = compilerContext.getCompilationVertex(sourceId); + DataStream sourceStream = ((DataStreamCompilerVertex) source).getCompilationResult(); // Compile vertex - SinkFunction sink = (SinkFunction) buildObject(sinkDef, fluxContext); - DataStreamSink streamSink = sourceStream.addSink(sink); + SinkFunction sink = (SinkFunction) buildObject(sinkDef, compilerContext); + // returned DataStreamSink is ignored + sourceStream.addSink(sink); // set compilation results - fluxContext.addSink(sinkDef.getId(), streamSink); + compilerContext.addSink(sinkDef.getId(), vertex); } - private static Object buildObject(ObjectDef def, FluxContext fluxContext) throws Exception { + private static Object buildObject(ObjectDef def, CompilerContext compilerContext) throws Exception { Class clazz = Class.forName(def.getClassName()); Object obj = null; if (def.hasConstructorArgs()) { @@ -143,7 +145,7 @@ private static Object buildObject(ObjectDef def, FluxContext fluxContext) throws List cArgs = def.getConstructorArgs(); if (def.hasReferences()) { - cArgs = ReflectiveInvokeUtils.resolveReferences(cArgs, fluxContext); + cArgs = ReflectiveInvokeUtils.resolveReferences(cArgs, compilerContext); } Constructor con = ReflectiveInvokeUtils.findCompatibleConstructor(cArgs, clazz); @@ -161,12 +163,12 @@ private static Object buildObject(ObjectDef def, FluxContext fluxContext) throws } else { obj = clazz.newInstance(); } - applyProperties(def, obj, fluxContext); - invokeConfigMethods(def, obj, fluxContext); + applyProperties(def, obj, compilerContext); + invokeConfigMethods(def, obj, compilerContext); return obj; } - private static void applyProperties(ObjectDef bean, Object instance, FluxContext context) + private static void applyProperties(ObjectDef bean, Object instance, CompilerContext context) throws Exception { List props = bean.getProperties(); Class clazz = instance.getClass(); @@ -190,7 +192,7 @@ private static void applyProperties(ObjectDef bean, Object instance, FluxContext } } - private static void invokeConfigMethods(ObjectDef bean, Object instance, FluxContext context) + private static void invokeConfigMethods(ObjectDef bean, Object instance, CompilerContext context) throws InvocationTargetException, IllegalAccessException { List methodDefs = bean.getConfigMethods(); diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/utils/ReflectiveInvokeUtils.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/utils/ReflectiveInvokeUtils.java index 82b3aba..57c2d99 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/utils/ReflectiveInvokeUtils.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/compiler/utils/ReflectiveInvokeUtils.java @@ -18,7 +18,7 @@ package com.uber.athena.flux.flink.compiler.utils; -import com.uber.athena.flux.flink.compiler.FluxContext; +import com.uber.athena.flux.flink.compiler.api.CompilerContext; import com.uber.athena.flux.model.ComponentReferenceDef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +30,7 @@ import java.util.List; @SuppressWarnings("unchecked") -final class ReflectiveInvokeUtils { +public final class ReflectiveInvokeUtils { private static final Logger LOG = LoggerFactory.getLogger(ReflectiveInvokeUtils.class); private ReflectiveInvokeUtils() { @@ -44,7 +44,7 @@ private ReflectiveInvokeUtils() { * @return constructor method * @throws NoSuchMethodException cannot be found */ - static Constructor findCompatibleConstructor(List args, Class target) + public static Constructor findCompatibleConstructor(List args, Class target) throws NoSuchMethodException { Constructor retval = null; int eligibleCount = 0; @@ -80,7 +80,7 @@ static Constructor findCompatibleConstructor(List args, Class target) * @param context the flux compilation context used to search for reference objects. * @return java.lang.Method */ - static List resolveReferences(List args, FluxContext context) { + public static List resolveReferences(List args, CompilerContext context) { LOG.debug("Checking arguments for references."); List cArgs = new ArrayList(); // resolve references @@ -102,7 +102,7 @@ static List resolveReferences(List args, FluxContext context) { * @param methodName method name * @return java.lang.Method */ - static Method findCompatibleMethod(List args, Class target, String methodName) { + public static Method findCompatibleMethod(List args, Class target, String methodName) { Method retval = null; int eligibleCount = 0; @@ -194,7 +194,7 @@ private static boolean isPrimitiveBoolean(Class clazz) { * @param parameterTypes list of parameter types * @return argument object list. */ - static Object[] getArgsWithListCoercian(List args, Class[] parameterTypes) { + public static Object[] getArgsWithListCoercian(List args, Class[] parameterTypes) { if (parameterTypes.length != args.size()) { throw new IllegalArgumentException("Contructor parameter count does not egual argument size."); } diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyExecutionResultImpl.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxExecutionResultImpl.java similarity index 82% rename from flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyExecutionResultImpl.java rename to flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxExecutionResultImpl.java index 49f3f07..f2dbd6a 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyExecutionResultImpl.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxExecutionResultImpl.java @@ -18,13 +18,14 @@ package com.uber.athena.flux.flink.runtime; -import com.uber.athena.flux.api.topology.FluxTopologyExecutionResult; +import com.uber.athena.flux.api.topology.FluxExecutionResult; import org.apache.flink.api.common.JobExecutionResult; -public class FluxTopologyExecutionResultImpl implements FluxTopologyExecutionResult { +public class FluxExecutionResultImpl implements FluxExecutionResult { private transient JobExecutionResult executionResult; - public FluxTopologyExecutionResultImpl() { + public FluxExecutionResultImpl(JobExecutionResult jobExecutionResult) { + this.executionResult = jobExecutionResult; } public JobExecutionResult getExecutionResult() { diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyBuilderImpl.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyBuilderImpl.java index c18d8e8..5d098a3 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyBuilderImpl.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyBuilderImpl.java @@ -18,9 +18,8 @@ package com.uber.athena.flux.flink.runtime; -import com.uber.athena.flux.api.topology.FluxTopology; import com.uber.athena.flux.api.topology.FluxTopologyBuilder; -import com.uber.athena.flux.flink.compiler.FluxCompilerSuite; +import com.uber.athena.flux.flink.compiler.impl.FluxCompilerSuite; import com.uber.athena.flux.model.TopologyDef; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -52,7 +51,7 @@ public static FluxTopologyBuilderImpl createFluxBuilder() { } /** - * Create {@link FluxTopology} that is used by Flink to execute in runtime. + * Create {@code FluxTopology} that is used by Flink to execute in runtime. * * @param topologyDef YAML compiled topology definition * @param conf extra global configuration @@ -84,7 +83,7 @@ private static Configuration generateFlinkConfiguration(Map conf } /** - * Compile into a {@link FluxTopology}. + * Compile into a {@code FluxTopology}. * * @param topologyDef topology def * @param config global config @@ -92,7 +91,7 @@ private static Configuration generateFlinkConfiguration(Map conf * @throws IOException when compilation fails */ @Override - public FluxTopologyImpl getTopology( + public FluxTopologyImpl createTopology( TopologyDef topologyDef, Map config) throws IOException { @@ -105,22 +104,5 @@ public FluxTopologyImpl getTopology( throw wrapAsIOException(e); } } - - /** - * Execute the topology, must compile first. - *

- * Execution will happen in the current defined stream execution environment. - *

- * - * @param fluxTopology the flux topology - * @return execution results. - * @throws Exception when execution fails. - */ - @Override - public FluxTopologyExecutionResultImpl execute(FluxTopology fluxTopology) throws Exception { - FluxTopologyExecutionResultImpl executionResult = new FluxTopologyExecutionResultImpl(); - executionResult.setExecutionResult(this.senv.execute()); - return executionResult; - } } diff --git a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyImpl.java b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyImpl.java index 1079368..b176598 100644 --- a/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyImpl.java +++ b/flink-flux-core/src/main/java/com/uber/athena/flux/flink/runtime/FluxTopologyImpl.java @@ -18,21 +18,30 @@ package com.uber.athena.flux.flink.runtime; +import com.uber.athena.flux.api.topology.FluxExecutionResult; import com.uber.athena.flux.api.topology.FluxTopology; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.List; /** - * implementation of the Flux Topology, encloses the job graph and the classpath jar files. + * implementation of the Flux Topology that is represented by Flink job graph. + * + *

This implementation encloses the {@link JobGraph} and the additional classpath jar files + * in the construction: job graph represents the compiled Flink topology logic; + * and the jar path(s) represents the additional user jar dependencies required for + * execution. */ public class FluxTopologyImpl implements FluxTopology { + private transient StreamExecutionEnvironment senv; private transient JobGraph jobGraph; private transient List additionalJars; - public FluxTopologyImpl() { + public FluxTopologyImpl(StreamExecutionEnvironment senv) { + this.senv = senv; } public List getAdditionalJars() { @@ -50,6 +59,21 @@ public JobGraph getJobGraph() { public void setJobGraph(JobGraph jobGraph) { this.jobGraph = jobGraph; } + + + + /** + * Execute the topology. + * + *

Execution will happen in the current defined stream execution environment. + * + * @return execution results. + * @throws Exception when execution fails. + */ + @Override + public FluxExecutionResult execute() throws Exception { + return new FluxExecutionResultImpl(senv.execute()); + } } diff --git a/flink-flux-core/src/test/java/com/uber/athena/flux/flink/runtime/TopologyRuntimeITCase.java b/flink-flux-core/src/test/java/com/uber/athena/flux/flink/runtime/TopologyRuntimeITCase.java index 906c0fe..5aeb781 100644 --- a/flink-flux-core/src/test/java/com/uber/athena/flux/flink/runtime/TopologyRuntimeITCase.java +++ b/flink-flux-core/src/test/java/com/uber/athena/flux/flink/runtime/TopologyRuntimeITCase.java @@ -31,8 +31,8 @@ public void testBasicTopologyRuntime() throws Exception { TopologyDef topologyDef = FluxParser.parseResource("/configs/basic_topology.yaml", false, true, null, false); topologyDef.validate(); FluxTopologyBuilderImpl fluxBuilder = FluxTopologyBuilderImpl.createFluxBuilder(); - FluxTopology topology = fluxBuilder.getTopology(topologyDef, null); - fluxBuilder.execute(topology); + FluxTopology topology = fluxBuilder.createTopology(topologyDef, null); + topology.execute(); } @Test @@ -40,8 +40,8 @@ public void testRepartitionTopologyRuntime() throws Exception { TopologyDef topologyDef = FluxParser.parseResource("/configs/repartition_topology.yaml", false, true, null, false); topologyDef.validate(); FluxTopologyBuilderImpl fluxBuilder = FluxTopologyBuilderImpl.createFluxBuilder(); - FluxTopology topology = fluxBuilder.getTopology(topologyDef, null); - fluxBuilder.execute(topology); + FluxTopology topology = fluxBuilder.createTopology(topologyDef, null); + topology.execute(); } @Test @@ -50,7 +50,7 @@ public void testKafkaTopologyRuntime() throws Exception { TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_topology.yaml", false, true, null, false); topologyDef.validate(); FluxTopologyBuilderImpl fluxBuilder = FluxTopologyBuilderImpl.createFluxBuilder(); - FluxTopology topology = fluxBuilder.getTopology(topologyDef, null); - fluxBuilder.execute(topology); + FluxTopology topology = fluxBuilder.createTopology(topologyDef, null); + topology.execute(); } } diff --git a/flink-flux-core/src/test/java/com/uber/athena/flux/flink/topology/TopologyCompilationTest.java b/flink-flux-core/src/test/java/com/uber/athena/flux/flink/topology/TopologyCompilationTest.java index 06b4137..7d23336 100644 --- a/flink-flux-core/src/test/java/com/uber/athena/flux/flink/topology/TopologyCompilationTest.java +++ b/flink-flux-core/src/test/java/com/uber/athena/flux/flink/topology/TopologyCompilationTest.java @@ -33,7 +33,7 @@ public void testBasicTopologyCompilation() throws Exception { TopologyDef topologyDef = FluxParser.parseResource("/configs/basic_topology.yaml", false, true, null, false); topologyDef.validate(); FluxTopologyBuilderImpl fluxBuilder = FluxTopologyBuilderImpl.createFluxBuilder(); - FluxTopologyImpl topology = fluxBuilder.getTopology(topologyDef, null); + FluxTopologyImpl topology = fluxBuilder.createTopology(topologyDef, null); assertNotNull(topology.getJobGraph()); } @@ -42,7 +42,7 @@ public void testRepartitionTopologyCompilation() throws Exception { TopologyDef topologyDef = FluxParser.parseResource("/configs/repartition_topology.yaml", false, true, null, false); topologyDef.validate(); FluxTopologyBuilderImpl fluxBuilder = FluxTopologyBuilderImpl.createFluxBuilder(); - FluxTopologyImpl topology = fluxBuilder.getTopology(topologyDef, null); + FluxTopologyImpl topology = fluxBuilder.createTopology(topologyDef, null); assertNotNull(topology.getJobGraph()); } @@ -51,7 +51,7 @@ public void testKafkaTopologyCompilation() throws Exception { TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_topology.yaml", false, true, null, false); topologyDef.validate(); FluxTopologyBuilderImpl fluxBuilder = FluxTopologyBuilderImpl.createFluxBuilder(); - FluxTopologyImpl topology = fluxBuilder.getTopology(topologyDef, null); + FluxTopologyImpl topology = fluxBuilder.createTopology(topologyDef, null); assertNotNull(topology.getJobGraph()); } } diff --git a/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopologyExecutionResult.java b/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxExecutionResult.java similarity index 81% rename from flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopologyExecutionResult.java rename to flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxExecutionResult.java index c3cfd1a..a888c85 100644 --- a/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopologyExecutionResult.java +++ b/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxExecutionResult.java @@ -18,5 +18,11 @@ package com.uber.athena.flux.api.topology; -public interface FluxTopologyExecutionResult { +/** + * Execution results from a Flux topology execution. + * + *

Specific Flux Topology execution framework should implement this to construct + * result. + */ +public interface FluxExecutionResult { } diff --git a/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopology.java b/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopology.java index 1cec683..3b7f13e 100644 --- a/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopology.java +++ b/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopology.java @@ -21,6 +21,15 @@ /** * Flux topology that can be self convert into Execution job graph. * + *

Specific Flux Topology execution framework should implement this interface + * and the concrete implementation should be executable within the framework. */ public interface FluxTopology { + + /** + * Execute the flux topology and generate {@code FluxExecutionResult}. + * + * @return the execution result. + */ + FluxExecutionResult execute() throws Exception; } diff --git a/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopologyBuilder.java b/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopologyBuilder.java index 8ad25ab..d1f20e8 100644 --- a/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopologyBuilder.java +++ b/flux-core/src/main/java/com/uber/athena/flux/api/topology/FluxTopologyBuilder.java @@ -26,13 +26,11 @@ /** * Marker interface for objects that can produce `StormTopology` objects. * - *

If a `topology-source` class implements the `getTopology()` method, Flux will - * call that method. Otherwise, it will introspect the given class and look for a - * similar method that produces a `StormTopology` instance. + *

If a `topology-source` class implements the `createTopology()` method, Flux will + * call that method. * - *

Note that it is not strictly necessary for a class to implement this interface. - * If a class defines a method with a similar signature, Flux should be able to find - * and invoke it. + *

A specific Flux execution framework should implement this interface and produce + * concrete {@code FluxTopology} implementations that can be executed within the framework. */ public interface FluxTopologyBuilder { @@ -43,13 +41,5 @@ public interface FluxTopologyBuilder { * @param config configuration global map * @return topology */ - FluxTopology getTopology(TopologyDef topologyDef, Map config) throws IOException; - - /** - * Execute the topology in current Builder environment. - * - * @param fluxTopology the flux topology - * @return execution result - */ - FluxTopologyExecutionResult execute(FluxTopology fluxTopology) throws Exception; + FluxTopology createTopology(TopologyDef topologyDef, Map config) throws IOException; } diff --git a/flux-core/src/main/java/com/uber/athena/flux/model/SinkDef.java b/flux-core/src/main/java/com/uber/athena/flux/model/SinkDef.java index 2a307a9..817cbb5 100644 --- a/flux-core/src/main/java/com/uber/athena/flux/model/SinkDef.java +++ b/flux-core/src/main/java/com/uber/athena/flux/model/SinkDef.java @@ -19,7 +19,7 @@ package com.uber.athena.flux.model; /** - * Bean representation of a Storm spout. + * Bean representation of a Flink sink. */ public class SinkDef extends VertexDef { } diff --git a/flux-core/src/main/java/com/uber/athena/flux/model/SourceDef.java b/flux-core/src/main/java/com/uber/athena/flux/model/SourceDef.java index 5cd645b..7100328 100644 --- a/flux-core/src/main/java/com/uber/athena/flux/model/SourceDef.java +++ b/flux-core/src/main/java/com/uber/athena/flux/model/SourceDef.java @@ -19,7 +19,7 @@ package com.uber.athena.flux.model; /** - * Bean representation of a Storm spout. + * Bean representation of a Flink Source. */ public class SourceDef extends VertexDef { } diff --git a/flux-core/src/main/java/com/uber/athena/flux/model/TopologyDef.java b/flux-core/src/main/java/com/uber/athena/flux/model/TopologyDef.java index bad61dc..8b32b82 100644 --- a/flux-core/src/main/java/com/uber/athena/flux/model/TopologyDef.java +++ b/flux-core/src/main/java/com/uber/athena/flux/model/TopologyDef.java @@ -32,10 +32,11 @@ * *

It consists of the following: * 1. The topology name - * 2. A `java.util.Map` representing the `org.apache.storm.config` for the topology - * 3. A list of spout definitions - * 4. A list of bolt definitions - * 5. A list of stream definitions that define the flow between spouts and bolts. + * 2. A `java.util.Map` representing the Flink {@code Configuration} for the topology + * 3. A list of source definitions + * 4. A list of operator definitions + * 5. A list of sink definitions + * 6. A list of stream definitions that define the flow between components. */ public class TopologyDef { private static final Logger LOG = LoggerFactory.getLogger(TopologyDef.class); @@ -45,7 +46,7 @@ public class TopologyDef { private List includes; private Map config = new HashMap(); - // the following are required if we're defining a core storm topology DAG in YAML, etc. + // the following are required if we're defining a core Flink topology DAG in YAML, etc. private Map operatorMap = new LinkedHashMap(); private Map sourceMap = new LinkedHashMap(); private Map sinkMap = new LinkedHashMap(); diff --git a/pom.xml b/pom.xml index 33038bd..ee67dd5 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,12 @@ under the License. ${project.version} + + com.uber.athena + flink-flux-core + ${project.version} + + org.yaml