[FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic.
SuDewei authored and libenchao committed Mar 4, 2024
1 parent e168cd4 commit e0b6c12
Showing 7 changed files with 81 additions and 99 deletions.
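
The migration these hunks perform: the removed verifyTransformation path asserted a test-only text dump of the Transformation DAG, while the updated test asserts the JSON execution plan through the public explain API. A minimal runnable sketch of that API (the table DDL and query are illustrative placeholders, not taken from this commit):

import org.apache.flink.table.api.{EnvironmentSettings, ExplainDetail, TableEnvironment}

// Sketch: explainSql with JSON_EXECUTION_PLAN returns a single string holding
// the AST, the optimized plans, and the JSON physical execution plan -- the
// same sections stored in the regenerated XML resource later in this diff.
object ExplainSketch {
  def main(args: Array[String]): Unit = {
    val env = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    // Placeholder source table; any table definition works for an explain.
    env.executeSql(
      "CREATE TABLE src (id INT, b STRING, c INT) WITH ('connector' = 'datagen')")
    println(env.explainSql(
      "SELECT * FROM src WHERE c > 1", ExplainDetail.JSON_EXECUTION_PLAN))
  }
}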
@@ -19,7 +19,6 @@
package org.apache.flink.api.dag;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.ResourceSpec;
@@ -603,20 +602,6 @@ public String toString() {
+ '}';
}

@VisibleForTesting
public String toStringWithoutId() {
return getClass().getSimpleName()
+ "{"
+ "name='"
+ name
+ '\''
+ ", outputType="
+ outputType
+ ", parallelism="
+ parallelism
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -84,7 +84,7 @@ class BatchPlanner(
processors
}

override def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
beforeTranslation()
val planner = createDummyPlanner()

@@ -367,8 +367,7 @@ abstract class PlannerBase(
* @return
* The [[Transformation]] DAG that corresponds to the node DAG.
*/
@VisibleForTesting
def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]

def addExtraTransformation(transformation: Transformation[_]): Unit = {
if (!extraTransformations.contains(transformation)) {
@@ -78,7 +78,7 @@ class StreamPlanner(

override protected def getExecNodeGraphProcessors: Seq[ExecNodeGraphProcessor] = Seq()

override def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
beforeTranslation()
val planner = createDummyPlanner()
val transformations = execGraph.getRootNodes.map {
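
Across PlannerBase, BatchPlanner, and StreamPlanner, translateToPlan drops its @VisibleForTesting public surface and becomes protected both in the abstract declaration and in the overrides, so only the planner hierarchy can reach it. A self-contained sketch of the pattern (not Flink code; names are illustrative):

// The public entry point plays the role that explain/verifyExplain now plays
// for tests: callers outside the hierarchy never invoke translateToPlan.
object VisibilitySketch {
  abstract class Base {
    protected def translateToPlan(): List[String]
    def explain(): String = translateToPlan().mkString("\n")
  }

  final class Impl extends Base {
    // `override protected` matches the narrowed base-class visibility.
    override protected def translateToPlan(): List[String] = List("plan")
  }

  def main(args: Array[String]): Unit = println(new Impl().explain())
}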
@@ -729,20 +729,25 @@ Calc(select=[ts, a, b], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
</TestCase>

<TestCase name="testSetParallelismForSource">
<Resource name="sql">
<![CDATA[SELECT * FROM src LEFT JOIN changelog_src on src.id = changelog_src.id WHERE src.c > 1]]>
</Resource>
<Resource name="ast">
<![CDATA[
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
LogicalProject(id=[$0], b=[$1], c=[$2], id0=[$3], a=[$4])
+- LogicalFilter(condition=[>($2, 1)])
+- LogicalJoin(condition=[=($0, $3)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, src]])
+- LogicalTableScan(table=[[default_catalog, default_database, changelog_src]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[=(id, id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[id]])
: +- Calc(select=[id, b, c], where=[>(c, 1)])
: +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[id, b, c])
+- Exchange(distribution=[hash[id]])
+- ChangelogNormalize(key=[id])
+- Exchange(distribution=[hash[id]])
+- TableSourceScan(table=[[default_catalog, default_database, changelog_src]], fields=[id, a])
== Optimized Execution Plan ==
Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[id]])
: +- Calc(select=[id, b, c], where=[(c > 1)])
@@ -751,22 +756,60 @@ Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, b, c, id0, a], le
+- ChangelogNormalize(key=[id])
+- Exchange(distribution=[hash[id]])
+- TableSourceScan(table=[[default_catalog, default_database, changelog_src]], fields=[id, a])
]]>
</Resource>
<Resource name="transformation">
<![CDATA[
TwoInputTransformation{name='Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])', outputType=ROW<`id` INT, `b` STRING, `c` INT, `id0` INT, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+- PartitionTransformation{name='Exchange(distribution=[hash[id]])', outputType=ROW<`id` INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+- OneInputTransformation{name='Calc(select=[id, b, c], where=[(c > 1)])', outputType=ROW<`id` INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+- SourceTransformationWrapper{name='ChangeToDefaultParallel', outputType=ROW<`id` INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+- LegacySourceTransformation{name='TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[id, b, c])', outputType=ROW<`id` INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=3}
+- PartitionTransformation{name='Exchange(distribution=[hash[id]])', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+- OneInputTransformation{name='ChangelogNormalize(key=[id])', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+- PartitionTransformation{name='Exchange(distribution=[hash[id]])', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+- PartitionTransformation{name='Partitioner', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+- SourceTransformationWrapper{name='ChangeToDefaultParallel', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
+- LegacySourceTransformation{name='TableSourceScan(table=[[default_catalog, default_database, changelog_src]], fields=[id, a])', outputType=ROW<`id` INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=5}
]]>
</Resource>
== Physical Execution Plan ==
{
"nodes" : [ {
"id" : ,
"type" : "Source: src[]",
"pact" : "Data Source",
"contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[id, b, c])",
"parallelism" : 3
}, {
"id" : ,
"type" : "Calc[]",
"pact" : "Operator",
"contents" : "[]:Calc(select=[id, b, c], where=[(c > 1)])",
"parallelism" : 10,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "Source: changelog_src[]",
"pact" : "Data Source",
"contents" : "[]:TableSourceScan(table=[[default_catalog, default_database, changelog_src]], fields=[id, a])",
"parallelism" : 5
}, {
"id" : ,
"type" : "ChangelogNormalize[]",
"pact" : "Operator",
"contents" : "[]:ChangelogNormalize(key=[id])",
"parallelism" : 10,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : ,
"type" : "Join[]",
"pact" : "Operator",
"contents" : "[]:Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])",
"parallelism" : 10,
"predecessors" : [ {
"id" : ,
"ship_strategy" : "HASH",
"side" : "second"
}, {
"id" : ,
"ship_strategy" : "HASH",
"side" : "second"
} ]
} ]
}]]>
</Resource>
</TestCase>
</Root>
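
The regenerated resource above now carries one explain string ending in a JSON execution plan; the test harness scrubs node ids, which is why the stored JSON shows bare "id" : , entries. For inspecting such a plan programmatically, a hedged sketch using Jackson with a small, hypothetical JSON literal (the stored resource itself is not directly parseable because of the scrubbed ids):

import com.fasterxml.jackson.databind.ObjectMapper

// Sketch: pull operator parallelism out of a JSON execution plan. The literal
// below is abbreviated and hypothetical, not copied from the test resource.
object PlanInspectionSketch {
  def main(args: Array[String]): Unit = {
    val json =
      """{ "nodes" : [ { "id" : 1, "type" : "Calc[]", "pact" : "Operator",
        |  "contents" : "Calc(select=[id, b, c])", "parallelism" : 10 } ] }""".stripMargin
    val nodes = new ObjectMapper().readTree(json).get("nodes")
    nodes.forEach(n =>
      println(s"${n.get("type").asText} -> parallelism ${n.get("parallelism").asInt}"))
  }
}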
@@ -779,7 +779,7 @@ class TableScanTest extends TableTestBase {
@Test
def testSetParallelismForSource(): Unit = {
val config = TableConfig.getDefault
config.set(ExecutionConfigOptions.TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED, Boolean.box(false))
config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, Int.box(10))
val util = streamTestUtil(config)

util.addTable("""
@@ -809,8 +809,12 @@
| 'enable-projection-push-down' = 'false'
|)
""".stripMargin)
util.verifyTransformation(
"SELECT * FROM src LEFT JOIN changelog_src " +
"on src.id = changelog_src.id WHERE src.c > 1")
val query =
"""
|SELECT *
|FROM src LEFT JOIN changelog_src
|ON src.id = changelog_src.id WHERE src.c > 1
|""".stripMargin
util.verifyExplain(query, ExplainDetail.JSON_EXECUTION_PLAN)
}
}
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.utils

import org.apache.flink.FlinkVersion
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.dag.Transformation
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.configuration.BatchExecutionOptions
@@ -88,7 +87,7 @@ import org.junit.jupiter.api.extension.{BeforeEachCallback, ExtendWith, Extensio
import org.junit.jupiter.api.io.TempDir
import org.junit.platform.commons.support.AnnotationSupport

import java.io.{File, IOException, PrintWriter, StringWriter}
import java.io.{File, IOException}
import java.net.URL
import java.nio.file.{Files, Path, Paths}
import java.time.Duration
@@ -703,20 +702,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
withQueryBlockAlias = false)
}

/**
* Verify the AST (abstract syntax tree), the optimized exec plan and transformation for the given
* SELECT query. Note: An exception will be thrown if the given SQL can't be translated to an exec
* plan or the transformation result is wrong.
*/
def verifyTransformation(query: String): Unit = {
doVerifyPlan(
query,
Array.empty[ExplainDetail],
withRowType = false,
Array(PlanKind.AST, PlanKind.OPT_EXEC, PlanKind.TRANSFORM),
withQueryBlockAlias = false)
}

/** Verify the explain result for the given SELECT query. See more about [[Table#explain()]]. */
def verifyExplain(query: String): Unit = verifyExplain(getTableEnv.sqlQuery(query))

@@ -1055,14 +1040,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
""
}

// build transformation graph if `expectedPlans` contains TRANSFORM
val transformation = if (expectedPlans.contains(PlanKind.TRANSFORM)) {
val optimizedNodes = getPlanner.translateToExecNodeGraph(optimizedRels, true)
System.lineSeparator + getTransformations(getPlanner.translateToPlan(optimizedNodes))
} else {
""
}

// check whether the sql equals to the expected if the `relNodes` are translated from sql
assertSqlEqualsOrExpandFunc()
// check ast plan
@@ -1081,10 +1058,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
if (expectedPlans.contains(PlanKind.OPT_EXEC)) {
assertEqualsOrExpand("optimized exec plan", optimizedExecPlan, expand = false)
}
// check transformation graph
if (expectedPlans.contains(PlanKind.TRANSFORM)) {
assertEqualsOrExpand("transformation", transformation, expand = false)
}
}

private def doVerifyExplain(explainResult: String, extraDetails: ExplainDetail*): Unit = {
Expand Down Expand Up @@ -1144,25 +1117,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
replaceEstimatedCost(optimizedPlan)
}

private def getTransformations(transformations: java.util.List[Transformation[_]]): String = {
val stringWriter = new StringWriter()
val printWriter = new PrintWriter(stringWriter)
transformations.foreach(transformation => getTransformation(printWriter, transformation, 0))
stringWriter.toString
}

private def getTransformation(
printWriter: PrintWriter,
transformation: Transformation[_],
level: Int): Unit = {
if (level == 0) {
printWriter.println(transformation.toStringWithoutId)
} else {
printWriter.println(("\t" * level) + "+- " + transformation.toStringWithoutId)
}
transformation.getInputs.foreach(child => getTransformation(printWriter, child, level + 1))
}

/** Replace the estimated costs for the given plan, because it may be unstable. */
protected def replaceEstimatedCost(s: String): String = {
var str = s.replaceAll("\\r\\n", "\n")
@@ -1670,9 +1624,6 @@ object PlanKind extends Enumeration {

/** Optimized Execution Plan */
val OPT_EXEC: Value = Value("OPT_EXEC")

/** Transformation */
val TRANSFORM: Value = Value("TRANSFORM")
}

object TableTestUtil {