Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Modify the API structure for Flink-Flux compilation framework (#6)
Browse files Browse the repository at this point in the history
Initial commit to separate the Flink Flux compiler and execution / runtime / service / config framework. changes to APIs

1. Compiler API level contains the compiling vertex encap; the compilation graph algorithm: BFS; the compilation context that stores intermediate compilation data
2. Compiler Impl level has been moved to a datastream specific impl. later on can support operator specific impl.
3. Separated out specific utilities and API level independent components such as the compiler suite.
  • Loading branch information
walterddr committed May 12, 2019
1 parent 7602027 commit 46a946c
Show file tree
Hide file tree
Showing 22 changed files with 324 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package com.uber.athena.flux.flink.compiler;
package com.uber.athena.flux.flink.compiler.api;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Expand All @@ -34,8 +34,8 @@ public interface Compiler {
* Compile the thing.
*
* @param senv stream execution environment
* @param fluxContext flux context
* @param compilerContext flux context
* @param vertex compilation vertex.
*/
void compile(StreamExecutionEnvironment senv, FluxContext fluxContext, CompilationVertex vertex);
void compile(StreamExecutionEnvironment senv, CompilerContext compilerContext, CompilerVertex vertex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,42 @@
* limitations under the License.
*/

package com.uber.athena.flux.flink.compiler;
package com.uber.athena.flux.flink.compiler.api;

import com.uber.athena.flux.model.TopologyDef;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.operators.StreamOperator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FluxContext {
public class CompilerContext {
// parsed Topology definition
private TopologyDef topologyDef;

// Storm config
// Flink config
private Configuration config;

// components required to be instantiated from classpath JARs
private List<Object> additionalComponents;

/**
* The following are materialized objects from the {@link TopologyDef}.
* The following are materialized objects from the {@code TopologyDef}.
*/
private Map<String, Object> componentMap = new HashMap<String, Object>();

private Map<String, DataStreamSource<?>> dataStreamSourceMap = new HashMap<String, DataStreamSource<?>>();
private Map<String, CompilerVertex<?>> dataStreamSourceMap = new HashMap<>();

private Map<String, StreamOperator<?>> operatorMap = new HashMap<String, StreamOperator<?>>();
private Map<String, CompilerVertex<?>> operatorMap = new HashMap<>();

private Map<String, DataStreamSink<?>> dataStreamSinkMap = new HashMap<String, DataStreamSink<?>>();
private Map<String, CompilerVertex<?>> dataStreamSinkMap = new HashMap<>();

/**
* The following is used by {@link CompilationGraph}.
* The following is used by {@code CompilerGraph}.
*/
private Map<String, CompilationVertex> compilationVertexMap = new HashMap<>();
private Map<String, CompilerVertex<?>> compilationVertexMap = new HashMap<>();

public FluxContext(TopologyDef topologyDef, Configuration config) {
public CompilerContext(TopologyDef topologyDef, Configuration config) {
this.topologyDef = topologyDef;
this.config = config;
}
Expand Down Expand Up @@ -85,7 +82,7 @@ public void setConfig(Configuration config) {
* @param id source id
* @param source source object
*/
public void addSource(String id, DataStreamSource<?> source) {
public void addSource(String id, CompilerVertex<?> source) {
this.dataStreamSourceMap.put(id, source);
}

Expand All @@ -95,14 +92,14 @@ public void addSource(String id, DataStreamSource<?> source) {
* @param id sink ID
* @param sink sink object
*/
public void addSink(String id, DataStreamSink<?> sink) {
public void addSink(String id, CompilerVertex<?> sink) {
this.dataStreamSinkMap.put(id, sink);
}

/**
* add operator.
*/
public void addOperator(String id, StreamOperator<?> op) {
public void addOperator(String id, CompilerVertex<?> op) {
this.operatorMap.put(id, op);
}

Expand Down Expand Up @@ -131,7 +128,7 @@ public Object getComponent(String id) {
* @param key vertex id, identical to the ComponentDef ID
* @param value compilation vertex.
*/
public void putCompilationVertex(String key, CompilationVertex value) {
public void putCompilationVertex(String key, CompilerVertex<?> value) {
compilationVertexMap.put(key, value);
}

Expand All @@ -141,7 +138,7 @@ public void putCompilationVertex(String key, CompilationVertex value) {
* @param key vertex id, identical to the ComponentDef ID
* @return compilation vertex.
*/
public CompilationVertex getCompilationVertex(String key) {
public CompilerVertex<?> getCompilationVertex(String key) {
return compilationVertexMap.get(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
* limitations under the License.
*/

package com.uber.athena.flux.flink.compiler;
package com.uber.athena.flux.flink.compiler.api;

import com.uber.athena.flux.flink.compiler.impl.datastream.DataStreamCompilerImpl;
import com.uber.athena.flux.flink.runtime.FluxTopologyImpl;
import com.uber.athena.flux.model.EdgeDef;
import com.uber.athena.flux.model.OperatorDef;
Expand All @@ -36,50 +37,47 @@
/**
* Object holder for compilation procedure.
*/
public class CompilationGraph {
private final FluxContext fluxContext;
private final StreamExecutionEnvironment senv;
private Queue<CompilationVertex> compilationQueue = new PriorityQueue<>();

public CompilationGraph(StreamExecutionEnvironment senv, FluxContext fluxContext) {
this.senv = senv;
this.fluxContext = fluxContext;
}
public abstract class CompilerGraph {
protected CompilerContext compilerContext;
protected StreamExecutionEnvironment senv;
protected Queue<CompilerVertex<?>> compilationQueue = new PriorityQueue<>();

/**
* Compile current graph into a {@code FluxTopology}.
*
* @return the topology
*/
public FluxTopologyImpl compile() {
constructCompilationGraph(fluxContext);
compileVertexQueue(senv, fluxContext);
constructCompilationGraph(compilerContext);
compileVertexQueue(senv, compilerContext);
JobGraph jobGraph = senv.getStreamGraph().getJobGraph();
FluxTopologyImpl fluxTopology = new FluxTopologyImpl();
FluxTopologyImpl fluxTopology = new FluxTopologyImpl(senv);
fluxTopology.setJobGraph(jobGraph);
return fluxTopology;
}

private void constructCompilationGraph(FluxContext fluxContext) {
Map<String, CompilationVertex.Builder> compilationVertexBuilders = new HashMap<>();
TopologyDef topologyDef = fluxContext.getTopologyDef();
public abstract CompilerVertex<?> constructCompilerVertex(CompilerVertex.Builder vertexBuilder);

private void constructCompilationGraph(CompilerContext compilerContext) {
Map<String, CompilerVertex.Builder> compilationVertexBuilders = new HashMap<>();
TopologyDef topologyDef = compilerContext.getTopologyDef();

// Build the Compilation Graph
// Add all vertices
for (SourceDef sourceDef : topologyDef.getSources()) {
compilationVertexBuilders.put(
sourceDef.getId(),
new CompilationVertex.Builder().setVertex(sourceDef));
new CompilerVertex.Builder().setVertex(sourceDef));
}
for (SinkDef sinkDef : topologyDef.getSinks()) {
compilationVertexBuilders.put(
sinkDef.getId(),
new CompilationVertex.Builder().setVertex(sinkDef));
new CompilerVertex.Builder().setVertex(sinkDef));
}
for (OperatorDef operatorDef : topologyDef.getOperators()) {
compilationVertexBuilders.put(
operatorDef.getId(),
new CompilationVertex.Builder().setVertex(operatorDef));
new CompilerVertex.Builder().setVertex(operatorDef));
}

// Add all edges
Expand All @@ -90,24 +88,24 @@ private void constructCompilationGraph(FluxContext fluxContext) {
.addIncomingEdge(streamDef);
}

for (Map.Entry<String, CompilationVertex.Builder> entry : compilationVertexBuilders.entrySet()) {
CompilationVertex vertex = entry.getValue().build();
this.fluxContext.putCompilationVertex(entry.getKey(), vertex);
for (Map.Entry<String, CompilerVertex.Builder> entry : compilationVertexBuilders.entrySet()) {
CompilerVertex vertex = constructCompilerVertex(entry.getValue());
this.compilerContext.putCompilationVertex(entry.getKey(), vertex);
if (vertex.readyToCompile()) {
this.compilationQueue.add(vertex);
}
}
}

private void compileVertexQueue(StreamExecutionEnvironment senv, FluxContext fluxContext) {
CompilerImpl compilerImpl = new CompilerImpl();
private void compileVertexQueue(StreamExecutionEnvironment senv, CompilerContext compilerContext) {
DataStreamCompilerImpl dataStreamCompilerImpl = new DataStreamCompilerImpl();
while (this.compilationQueue.size() > 0) {
CompilationVertex vertex = this.compilationQueue.poll();
compilerImpl.compile(senv, fluxContext, vertex);
CompilerVertex<?> vertex = this.compilationQueue.poll();
dataStreamCompilerImpl.compile(senv, compilerContext, vertex);

// set downstream vertex compilation flags.
for (EdgeDef downstreamEdge : vertex.getOutgoingEdge()) {
CompilationVertex toVertex = this.fluxContext.getCompilationVertex(downstreamEdge.getTo());
CompilerVertex toVertex = this.compilerContext.getCompilationVertex(downstreamEdge.getTo());
toVertex.addCompiledSourceCount();
if (toVertex.readyToCompile()) {
this.compilationQueue.add(toVertex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,26 @@
* limitations under the License.
*/

package com.uber.athena.flux.flink.compiler;
package com.uber.athena.flux.flink.compiler.api;

import com.uber.athena.flux.model.EdgeDef;
import com.uber.athena.flux.model.VertexDef;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.ArrayList;
import java.util.List;

public class CompilationVertex {

private final VertexDef vertex;
private final List<EdgeDef> incomingEdge;
private final List<EdgeDef> outgoingEdge;
/**
* Compiler vertex used within a Flux compilation context.
*
* @param <T> type of the compilation results generated.
*/
public abstract class CompilerVertex<T> {

private int compiledSourceCount;
private DataStream dataStream = null;
protected VertexDef vertex;
protected List<EdgeDef> incomingEdge;
protected List<EdgeDef> outgoingEdge;

CompilationVertex(VertexDef vertex, List<EdgeDef> incomingEdge, List<EdgeDef> outgoingEdge) {
this.vertex = vertex;
this.incomingEdge = incomingEdge;
this.outgoingEdge = outgoingEdge;
this.compiledSourceCount = 0;
}
protected int compiledSourceCount;

/**
* Increase compilation flag by one. Used after an upstream vertex has been compiled.
Expand All @@ -57,6 +53,20 @@ public boolean readyToCompile() {
return this.compiledSourceCount == incomingEdge.size();
}

/**
* Setting the result of the compilation.
*
* @param compilationResult the compilation result.
*/
public abstract void setCompilationResult(T compilationResult);

/**
* Getting the result of the compilation, return null if not compiled.
*
* @return the compilation result.
*/
public abstract T getCompilationResult();

//-------------------------------------------------------------------------
// Getters
//-------------------------------------------------------------------------
Expand All @@ -73,15 +83,6 @@ public List<EdgeDef> getOutgoingEdge() {
return outgoingEdge;
}

public DataStream getDataStream() {
return dataStream;
}

public void setDataStream(DataStream dataStream) {
this.dataStream = dataStream;
}


// ------------------------------------------------------------------------
// Builder pattern
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -112,11 +113,16 @@ public Builder addOutgoingEdge(EdgeDef edgeDef) {
return this;
}

public CompilationVertex build() {
return new CompilationVertex(
vertex,
incomingEdge,
outgoingEdge);
public VertexDef getVertex() {
return vertex;
}

public List<EdgeDef> getIncomingEdge() {
return incomingEdge;
}

public List<EdgeDef> getOutgoingEdge() {
return outgoingEdge;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
* limitations under the License.
*/

package com.uber.athena.flux.flink.compiler;
package com.uber.athena.flux.flink.compiler.impl;

import com.uber.athena.flux.flink.compiler.api.CompilerContext;
import com.uber.athena.flux.flink.compiler.api.CompilerGraph;
import com.uber.athena.flux.flink.compiler.impl.datastream.DataStreamCompilerGraph;
import com.uber.athena.flux.flink.runtime.FluxTopologyImpl;
import com.uber.athena.flux.model.TopologyDef;
import org.apache.flink.configuration.Configuration;
Expand All @@ -26,15 +29,19 @@

/**
* Compilation framework for the flux topology.
*
* <p>Based on the topology definition type, and the supported compiler,
* this compiler suite will find the appropriate compilation framework
* to construct the Flink topology job graph.
*/
public class FluxCompilerSuite {

private final TopologyDef topologyDef;
private final Configuration config;
private final StreamExecutionEnvironment streamExecutionEnvironment;
private final FluxContext fluxContext;
private final CompilerContext compilerContext;

private CompilationGraph compilationGraph;
private CompilerGraph compilerGraph;

public FluxCompilerSuite(
TopologyDef topologyDef,
Expand All @@ -43,16 +50,21 @@ public FluxCompilerSuite(
this.streamExecutionEnvironment = streamExecutionEnvironment;
this.topologyDef = topologyDef;
this.config = new Configuration(config);
this.fluxContext = new FluxContext(topologyDef, config);
this.compilationGraph = new CompilationGraph(
this.compilerContext = new CompilerContext(topologyDef, config);
// TODO: determine compilation graph impl based on API level.
this.compilerGraph = new DataStreamCompilerGraph(
this.streamExecutionEnvironment,
this.fluxContext);
this.compilerContext) {
};
}

/**
* compile topology definition to {@code FluxTopology}.
*
* @return flux topology.
* <p>The compilation should invoke the compilation framework based on
* constructed settings.
*
* @return a flux topology, different compilation suits might return different implementations.
*/
public FluxTopologyImpl compile() {
Preconditions.checkNotNull(topologyDef, "topology cannot be null!");
Expand All @@ -61,6 +73,6 @@ public FluxTopologyImpl compile() {
}

private FluxTopologyImpl compileInternal() {
return this.compilationGraph.compile();
return this.compilerGraph.compile();
}
}
Loading

0 comments on commit 46a946c

Please sign in to comment.