Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
remove unnecessary casting in Flink-flux
Browse files Browse the repository at this point in the history
  • Loading branch information
Rong Rong committed May 9, 2019
1 parent efed7cd commit a93df6d
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public CompilationGraph(StreamExecutionEnvironment senv, FluxContext fluxContext
*
* @return the topology
*/
public FluxTopology compile() {
public FluxTopologyImpl compile() {
constructCompilationGraph(fluxContext);
compileVertexQueue(senv, fluxContext);
JobGraph jobGraph = senv.getStreamGraph().getJobGraph();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.uber.athena.flux.flink.compiler;

import com.uber.athena.flux.api.topology.FluxTopology;
import com.uber.athena.flux.flink.runtime.FluxTopologyImpl;
import com.uber.athena.flux.model.TopologyDef;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -54,13 +55,13 @@ public FluxCompilerSuite(
*
* @return flux topology.
*/
public FluxTopology compile() {
public FluxTopologyImpl compile() {
Preconditions.checkNotNull(topologyDef, "topology cannot be null!");
Preconditions.checkNotNull(streamExecutionEnvironment, "execution environment cannot be null!");
return this.compileInternal();
}

private FluxTopology compileInternal() {
private FluxTopologyImpl compileInternal() {
return this.compilationGraph.compile();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static FluxTopologyBuilderImpl createFluxBuilder() {
* @param conf extra global configuration
* @return a {@code FluxTopologyImpl} class that contains all required components.
*/
private FluxTopology compileTopologyDef(
private FluxTopologyImpl compileTopologyDef(
StreamExecutionEnvironment senv,
TopologyDef topologyDef,
Map<String, Object> conf) {
Expand Down Expand Up @@ -93,7 +93,7 @@ private static Configuration generateFlinkConfiguration(Map<String, Object> conf
* @throws IOException when compilation fails
*/
@Override
public FluxTopology getTopology(
public FluxTopologyImpl getTopology(
TopologyDef topologyDef,
Map<String, Object> config) throws IOException {

Expand All @@ -118,7 +118,7 @@ public FluxTopology getTopology(
* @throws Exception when execution fails.
*/
@Override
public FluxTopologyExecutionResult execute(FluxTopology fluxTopology) throws Exception {
public FluxTopologyExecutionResultImpl execute(FluxTopology fluxTopology) throws Exception {
FluxTopologyExecutionResultImpl executionResult = new FluxTopologyExecutionResultImpl();
executionResult.setExecutionResult(this.senv.execute());
return executionResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.uber.athena.flux.api.topology.FluxTopology;
import com.uber.athena.flux.model.TopologyDef;
import com.uber.athena.flux.parser.FluxParser;
import org.junit.Ignore;
import org.junit.Test;

public class TopologyRuntimeITCase {
Expand All @@ -44,6 +45,7 @@ public void testRepartitionTopologyRuntime() throws Exception {
}

@Test
@Ignore("Need to implement kafka test base to launch mini-kafka")
public void testKafkaTopologyRuntime() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_topology.yaml", false, true, null, false);
topologyDef.validate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void testBasicTopologyCompilation() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/basic_topology.yaml", false, true, null, false);
topologyDef.validate();
FluxTopologyBuilderImpl fluxBuilder = FluxTopologyBuilderImpl.createFluxBuilder();
FluxTopologyImpl topology = (FluxTopologyImpl) fluxBuilder.getTopology(topologyDef, null);
FluxTopologyImpl topology = fluxBuilder.getTopology(topologyDef, null);
assertNotNull(topology.getJobGraph());
}

Expand All @@ -42,7 +42,7 @@ public void testRepartitionTopologyCompilation() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/repartition_topology.yaml", false, true, null, false);
topologyDef.validate();
FluxTopologyBuilderImpl fluxBuilder = FluxTopologyBuilderImpl.createFluxBuilder();
FluxTopologyImpl topology = (FluxTopologyImpl) fluxBuilder.getTopology(topologyDef, null);
FluxTopologyImpl topology = fluxBuilder.getTopology(topologyDef, null);
assertNotNull(topology.getJobGraph());
}

Expand All @@ -51,7 +51,7 @@ public void testKafkaTopologyCompilation() throws Exception {
TopologyDef topologyDef = FluxParser.parseResource("/configs/kafka_topology.yaml", false, true, null, false);
topologyDef.validate();
FluxTopologyBuilderImpl fluxBuilder = FluxTopologyBuilderImpl.createFluxBuilder();
FluxTopologyImpl topology = (FluxTopologyImpl) fluxBuilder.getTopology(topologyDef, null);
FluxTopologyImpl topology = fluxBuilder.getTopology(topologyDef, null);
assertNotNull(topology.getJobGraph());
}
}

0 comments on commit a93df6d

Please sign in to comment.