[FLINK-4599] [table] Add 'explain()' also to StreamTableEnvironment
This closes apache#2485.
chobeat authored and yuzhongliu committed Dec 5, 2016
1 parent d5fefab commit 897f647
Showing 5 changed files with 140 additions and 0 deletions.
53 changes: 53 additions & 0 deletions docs/dev/table_api.md
@@ -2501,3 +2501,56 @@ The Table API provides a configuration (the so-called `TableConfig`) to modify r
By default, the Table API supports `null` values. Null handling can be disabled to improve performance by setting the `nullCheck` property in the `TableConfig` to `false`.
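
The following is a minimal sketch of disabling null checking. It assumes that the `TableEnvironment` exposes its configuration via `getConfig` and that `TableConfig` provides a `setNullCheck` setter; check the API of your Flink version:

{% highlight scala %}
// Sketch: disable null checking on the environment's TableConfig.
// getConfig and setNullCheck are assumed here; verify them for your Flink version.
// tEnv is a TableEnvironment obtained via TableEnvironment.getTableEnvironment(env).
val tableConfig = tEnv.getConfig
tableConfig.setNullCheck(false)
{% endhighlight %}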

{% top %}

Explaining a Table
----
The Table API provides a mechanism to describe the graph of operations that leads to the resulting output. This is done through the `TableEnvironment#explain(table)` method, which returns a string describing two plans: the Abstract Syntax Tree of the relational algebra query and Flink's execution plan of the job.

`explain` is supported by both `BatchTableEnvironment` and `StreamTableEnvironment`. Currently, `StreamTableEnvironment` does not support explaining the execution plan, so for streaming queries only the Abstract Syntax Tree is returned.

The following code shows an example and the corresponding output:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1.unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1.unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)
{% endhighlight %}
</div>
</div>

{% highlight text %}
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalTableScan(table=[[_DataStreamTable_1]])
{% endhighlight %}
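
For comparison, the following is a minimal sketch of explaining the same query on a `BatchTableEnvironment`, whose output also contains the execution plan in addition to the AST (the exact section headers of the batch output depend on the Flink version):

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment

// Batch environment: explain() returns the AST and, in addition, the execution plan.
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)

println(tEnv.explain(table1.unionAll(table2)))
{% endhighlight %}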

{% top %}



@@ -272,4 +272,20 @@ abstract class StreamTableEnvironment(

}

/**
 * Returns the AST of the specified Table API and SQL queries to compute the result of the
 * given [[Table]]. Explaining the execution plan is not yet supported for streaming queries.
 *
 * @param table The table for which the AST will be returned.
 */
def explain(table: Table): String = {

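// Render the Table's relational expression tree as a textual AST via Calcite's RelOptUtil.
// Note: unlike the batch environment, no execution plan section is appended for streaming yet.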
val ast = RelOptUtil.toString(table.getRelNode)

s"== Abstract Syntax Tree ==" +
System.lineSeparator +
ast

}

}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala.stream

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.TableEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.junit.Assert.assertEquals
import org.junit._

class ExplainStreamTest
extends StreamingMultipleProgramsTestBase {

val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile

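// Each test compares the line-ending-normalized output of explain() against a checked-in
// .out resource file under src/test/scala/resources.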
@Test
def testFilter(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table = env.fromElements((1, "hello"))
.toTable(tEnv, 'a, 'b)
.filter("a % 2 = 0")

val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testFilterStream0.out").mkString.replaceAll("\\r\\n", "\n")
assertEquals(result, source)
}

@Test
def testUnion(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1.unionAll(table2)

val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
val source = scala.io.Source.fromFile(testFilePath +
"../../src/test/scala/resources/testUnionStream0.out").mkString.replaceAll("\\r\\n", "\n")
assertEquals(result, source)
}

}
@@ -0,0 +1,3 @@
== Abstract Syntax Tree ==
LogicalFilter(condition=[=(MOD($0, 2), 0)])
  LogicalTableScan(table=[[_DataStreamTable_0]])
@@ -0,0 +1,4 @@
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalTableScan(table=[[_DataStreamTable_1]])
