Skip to content

Commit

Permalink
SAMZA-1204: Visualize StreamGraph and ExecutionPlan
Browse files Browse the repository at this point in the history
Once a Samza application (using fluent API) is deployed, an execution plan will be generated by the ExecutionPlanner. The plan JSON will be written to a file (plan.json) under the ./plan directory, which also contains the plan.html and javscripts (js folder).

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Author: xinyuiscool <xinyuliu.us@gmail.com>

Reviewers: Jake Maes <jmakes@apache.org>

Closes apache#127 from xinyuiscool/SAMZA-1204
  • Loading branch information
Xinyu Liu committed May 2, 2017
1 parent ad1f161 commit b71b253
Show file tree
Hide file tree
Showing 29 changed files with 651 additions and 194 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ rat {
'**/non-responsive.less',
'**/ropa-sans.css',
'**/syntax.css',
'**/d3.v3.min.js',
'**/dagre-d3.min.js',
'.idea/**',
'.reviewboardrc',
'docs/_site/**',
Expand Down Expand Up @@ -396,6 +398,7 @@ project(":samza-shell") {
classifier = 'dist'
from 'src/main/bash'
from 'src/main/resources'
from 'src/main/visualizer'
}

artifacts {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.samza.execution;

import java.util.List;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.JobConfig;
import org.apache.samza.system.StreamSpec;

Expand All @@ -28,10 +29,11 @@
* This interface represents Samza {@link org.apache.samza.application.StreamApplication}
* plans for physical execution.
*/
@InterfaceStability.Unstable
public interface ExecutionPlan {

/**
* Returns the configs for single stage job, in the order of topologically sort.
* Returns the configs for single stage job, in topological sort order.
* @return list of job configs
*/
List<JobConfig> getJobConfigs();
Expand All @@ -43,9 +45,10 @@ public interface ExecutionPlan {
List<StreamSpec> getIntermediateStreams();

/**
* Returns the JSON representation of the plan for visualization
* @return json string
* @throws Exception exception
* Returns the JSON representation of the plan.
* @return JSON string
* @throws Exception exception during JSON serialization, including {@link java.io.IOException}
* and {@link org.codehaus.jackson.JsonGenerationException}
*/
String getPlanAsJson() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
// create physical job graph based on stream graph
JobGraph jobGraph = createJobGraph(streamGraph);

// fetch the external streams partition info
updateExistingPartitions(jobGraph, streamManager);

if (!jobGraph.getIntermediateStreamEdges().isEmpty()) {
// figure out the partitions for internal streams
calculatePartitions(streamGraph, jobGraph);
Expand All @@ -84,7 +87,7 @@ public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
// For this phase, we have a single job node for the whole dag
String jobName = config.get(JobConfig.JOB_NAME());
String jobId = config.get(JobConfig.JOB_ID(), "1");
JobNode node = jobGraph.getOrCreateNode(jobName, jobId, streamGraph);
JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId, streamGraph);

// add sources
sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));
Expand All @@ -104,9 +107,6 @@ public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
* Figure out the number of partitions of all streams
*/
/* package private */ void calculatePartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
// fetch the external streams partition info
updateExistingPartitions(jobGraph, streamManager);

// calculate the partitions for the input streams of join operators
calculateJoinInputPartitions(streamGraph, jobGraph);

Expand Down Expand Up @@ -167,7 +167,7 @@ public ExecutionPlan plan(StreamGraphImpl streamGraph) throws Exception {
Set<OperatorSpec> visited = new HashSet<>();

streamGraph.getInputStreams().entrySet().forEach(entry -> {
StreamEdge streamEdge = jobGraph.getOrCreateEdge(entry.getKey());
StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
// Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
outputStreamToJoinSpec, joinQ, visited);
Expand Down
45 changes: 26 additions & 19 deletions samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.StreamGraphImpl;
Expand Down Expand Up @@ -65,40 +66,46 @@
this.config = config;
}

/**
* Returns the configs for single stage job, in the order of topologically sort.
* @return list of job configs
*/
@Override
public List<JobConfig> getJobConfigs() {
return getJobNodes().stream().map(JobNode::generateConfig).collect(Collectors.toList());
String json = "";
try {
json = getPlanAsJson();
} catch (Exception e) {
log.warn("Failed to generate plan JSON", e);
}

final String planJson = json;
return getJobNodes().stream().map(n -> n.generateConfig(planJson)).collect(Collectors.toList());
}

/**
* Returns the intermediate streams that need to be created.
* @return intermediate {@link StreamSpec}s
*/
@Override
public List<StreamSpec> getIntermediateStreams() {
return getIntermediateStreamEdges().stream()
.map(streamEdge -> streamEdge.getStreamSpec())
.collect(Collectors.toList());
}

/**
* Returns the JSON representation of the plan for visualization
* @return json string
* @throws Exception
*/
@Override
public String getPlanAsJson() throws Exception {
return jsonGenerator.toJson(this);
}

/**
* Returns the config for this application
* @return {@link ApplicationConfig}
*/
public ApplicationConfig getApplicationConfig() {
return new ApplicationConfig(config);
}

/**
* Add a source stream to a {@link JobNode}
* @param input source stream
* @param node the job node that consumes from the source
*/
void addSource(StreamSpec input, JobNode node) {
StreamEdge edge = getOrCreateEdge(input);
StreamEdge edge = getOrCreateStreamEdge(input);
edge.addTargetNode(node);
node.addInEdge(edge);
sources.add(edge);
Expand All @@ -110,7 +117,7 @@ void addSource(StreamSpec input, JobNode node) {
* @param node the job node that outputs to the sink
*/
void addSink(StreamSpec output, JobNode node) {
StreamEdge edge = getOrCreateEdge(output);
StreamEdge edge = getOrCreateStreamEdge(output);
edge.addSourceNode(node);
node.addOutEdge(edge);
sinks.add(edge);
Expand All @@ -123,7 +130,7 @@ void addSink(StreamSpec output, JobNode node) {
* @param to the target node
*/
void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
StreamEdge edge = getOrCreateEdge(streamSpec);
StreamEdge edge = getOrCreateStreamEdge(streamSpec);
edge.addSourceNode(from);
edge.addTargetNode(to);
from.addOutEdge(edge);
Expand All @@ -137,7 +144,7 @@ void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) {
* @param jobId id of the job
* @return
*/
JobNode getOrCreateNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
JobNode getOrCreateJobNode(String jobName, String jobId, StreamGraphImpl streamGraph) {
String nodeId = JobNode.createId(jobName, jobId);
JobNode node = nodes.get(nodeId);
if (node == null) {
Expand All @@ -152,7 +159,7 @@ JobNode getOrCreateNode(String jobName, String jobId, StreamGraphImpl streamGrap
* @param streamSpec spec of the StreamEdge
* @return stream edge
*/
StreamEdge getOrCreateEdge(StreamSpec streamSpec) {
StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec) {
String streamId = streamSpec.getId();
StreamEdge edge = edges.get(streamId);
if (edge == null) {
Expand Down

0 comments on commit b71b253

Please sign in to comment.