diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/PartialFinalType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/PartialFinalType.java new file mode 100644 index 00000000000000..5c0550e1cc132f --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/PartialFinalType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan; + +/** + * Enumeration of partial-final aggregate types. + * + * @see org.apache.flink.table.plan.rules.physical.stream.SplitAggregateRule + */ +public enum PartialFinalType { + /** + * The partial aggregate type represents partial-aggregation, + * which produces a partial distinct aggregated result based on group key and bucket number. + */ + PARTIAL, + /** + * The final aggregate type represents final-aggregation, + * which produces the final result based on the partially distinct aggregated result. + */ + FINAL, + /** + * The aggregate has not been split. + */ + NONE } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala index 2e8bc8a9985c18..0b7429d350d530 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableConfig.scala @@ -17,6 +17,8 @@ */ package org.apache.flink.table.api +import org.apache.flink.configuration.{Configuration, GlobalConfiguration} + import _root_.java.util.TimeZone import _root_.java.math.MathContext @@ -47,6 +49,11 @@ class TableConfig { */ private var maxGeneratedCodeLength: Int = 64000 // just an estimate + /** + * Defines the user-defined configuration. + */ + private var conf = GlobalConfiguration.loadConfiguration() + /** * Sets the timezone for date/time/timestamp conversions.
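 * For example (a minimal sketch; the chosen time zone is illustrative):
 * {{{
 *   val config = new TableConfig
 *   config.setTimeZone(TimeZone.getTimeZone("UTC"))
 * }}}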
*/ @@ -104,6 +111,19 @@ class TableConfig { } this.maxGeneratedCodeLength = maxGeneratedCodeLength } + + /** + * Returns the user-defined configuration. + */ + def getConf: Configuration = conf + + /** + * Sets the user-defined configuration; the given entries are merged over the loaded global configuration. + */ + def setConf(conf: Configuration): Unit = { + this.conf = GlobalConfiguration.loadConfiguration() + this.conf.addAll(conf) + } } object TableConfig { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 0d525c7359fb4d..43826c0e580298 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -355,6 +355,11 @@ object FlinkTypeFactory { case _ => false } + def isRowtimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match { + case ti: TimeIndicatorRelDataType if ti.isEventTime => true + case _ => false + } + def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match { case BOOLEAN => InternalTypes.BOOLEAN case TINYINT => InternalTypes.BYTE diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala index ab92a161a50ed3..8a026c7188a51d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/LogicalWatermarkAssigner.scala @@ -32,12 +32,12 @@ final class LogicalWatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, input: RelNode, - rowtimeField: String, - watermarkOffset: Long) - extends WatermarkAssigner(cluster, traits, input, rowtimeField, watermarkOffset) { + rowtimeFieldIndex: Option[Int], + watermarkDelay: Option[Long]) + extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkDelay) { override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { - new LogicalWatermarkAssigner(cluster, traits, inputs.get(0), rowtimeField, watermarkOffset) + new LogicalWatermarkAssigner(cluster, traits, inputs.get(0), rowtimeFieldIndex, watermarkDelay) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala index 42ad60c9c5b94d..76dd99a5fda57d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/Rank.scala @@ -109,9 +109,9 @@ abstract class Rank( }.mkString(", ") super.explainTerms(pw) .item("rankFunction", rankFunction) - .item("partitionBy", partitionKey.map(i => s"$$$i").mkString(",")) - .item("orderBy", Rank.sortFieldsToString(sortCollation)) .item("rankRange", rankRange.toString()) + .item("partitionBy", partitionKey.map(i => s"$$$i").mkString(",")) + .item("orderBy", RelExplainUtil.collationToString(sortCollation)) .item("select", select) } @@ -168,24 +168,3 @@ case class 
VariableRankRange(rankEndIndex: Int) extends RankRange { s"rankEnd=$$$rankEndIndex" } } - -object Rank { - def sortFieldsToString(collationSort: RelCollation): String = { - val fieldCollations = collationSort.getFieldCollations - .map(c => (c.getFieldIndex, FlinkRelOptUtil.directionToOrder(c.getDirection))) - - fieldCollations.map { - case (index, order) => s"$$$index ${order.getShortName}" - }.mkString(", ") - } - - def sortFieldsToString(collationSort: RelCollation, inputType: RelDataType): String = { - val fieldCollations = collationSort.getFieldCollations - .map(c => (c.getFieldIndex, FlinkRelOptUtil.directionToOrder(c.getDirection))) - val inputFieldNames = inputType.getFieldNames - - fieldCollations.map { - case (index, order) => s"${inputFieldNames.get(index)} ${order.getShortName}" - }.mkString(", ") - } -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala index 5976648ea846c4..2ac9e349b32acd 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/calcite/WatermarkAssigner.scala @@ -32,21 +32,21 @@ import scala.collection.JavaConversions._ abstract class WatermarkAssigner( cluster: RelOptCluster, traits: RelTraitSet, - inputNode: RelNode, - val rowtimeField: String, - val watermarkOffset: Long) - extends SingleRel(cluster, traits, inputNode) { + inputRel: RelNode, + val rowtimeFieldIndex: Option[Int], + val watermarkDelay: Option[Long]) + extends SingleRel(cluster, traits, inputRel) { override def deriveRowType(): RelDataType = { - val inputRowType = inputNode.getRowType + val inputRowType = inputRel.getRowType val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] val newFieldList = inputRowType.getFieldList.map { f => - if (f.getName.equals(rowtimeField)) { - val rowtimeIndicatorType = typeFactory.createRowtimeIndicatorType() - new RelDataTypeFieldImpl(rowtimeField, f.getIndex, rowtimeIndicatorType) - } else { - f + rowtimeFieldIndex match { + case Some(index) if f.getIndex == index => + val rowtimeIndicatorType = typeFactory.createRowtimeIndicatorType() + new RelDataTypeFieldImpl(f.getName, f.getIndex, rowtimeIndicatorType) + case _ => f } } @@ -57,8 +57,9 @@ abstract class WatermarkAssigner( override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) - .item("fields", getRowType.getFieldNames) - .item("rowtimeField", rowtimeField) - .item("watermarkOffset", watermarkOffset) + .item("fields", getRowType.getFieldNames.mkString(", ")) + .itemIf("rowtimeField", getRowType.getFieldNames.get(rowtimeFieldIndex.getOrElse(0)), + rowtimeFieldIndex.isDefined) + .itemIf("watermarkDelay", watermarkDelay.getOrElse(0L), watermarkDelay.isDefined) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalExchange.scala similarity index 97% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonExchange.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalExchange.scala index 
2d0cbe771caef3..3625676dcab44f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonExchange.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalExchange.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.common import org.apache.flink.table.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.plan.cost.FlinkCost._ import org.apache.flink.table.plan.cost.FlinkCostFactory -import org.apache.flink.table.plan.nodes.FlinkRelNode +import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.core.Exchange @@ -33,13 +33,13 @@ import scala.collection.JavaConverters._ /** * Base class for flink [[Exchange]]. */ -abstract class CommonExchange( +abstract class CommonPhysicalExchange( cluster: RelOptCluster, traitSet: RelTraitSet, relNode: RelNode, relDistribution: RelDistribution) extends Exchange(cluster, traitSet, relNode, relDistribution) - with FlinkRelNode { + with FlinkPhysicalRel { override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { val inputRows = mq.getRowCount(input) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala new file mode 100644 index 00000000000000..ade75d8a9e1541 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonPhysicalJoin.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.common + +import org.apache.flink.table.plan.FlinkJoinRelType +import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel +import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil} + +import org.apache.calcite.rel.RelWriter +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} +import org.apache.calcite.rel.core.{Join, SemiJoin} +import org.apache.calcite.sql.validate.SqlValidatorUtil +import org.apache.calcite.util.mapping.IntPair + +import java.util +import java.util.Collections + +import scala.collection.JavaConversions._ + +/** + * Base physical class for flink [[Join]]. 
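+ *
+ * A minimal sketch of how the join condition is decomposed here (the condition and
+ * field names are illustrative, not taken from this code):
+ * {{{
+ *   // condition: l.a = r.b AND l.c IS NOT DISTINCT FROM r.d
+ *   // keyPairs    = [IntPair(a, b), IntPair(c, d)]
+ *   // filterNulls = [true, false]  // only the plain equality filters null keys
+ * }}}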
+ */ +trait CommonPhysicalJoin extends Join with FlinkPhysicalRel { + + lazy val (joinInfo, filterNulls) = { + val filterNulls = new util.ArrayList[java.lang.Boolean] + val joinInfo = FlinkRelOptUtil.createJoinInfo(getLeft, getRight, getCondition, filterNulls) + (joinInfo, filterNulls.map(_.booleanValue()).toArray) + } + + lazy val keyPairs: List[IntPair] = joinInfo.pairs.toList + + // TODO supports FlinkJoinRelType.ANTI + lazy val flinkJoinType: FlinkJoinRelType = this match { + case sj: SemiJoin => FlinkJoinRelType.SEMI + case j: Join => FlinkJoinRelType.toFlinkJoinRelType(getJoinType) + case _ => throw new IllegalArgumentException(s"Illegal join node: ${this.getRelTypeName}") + } + + lazy val inputRowType: RelDataType = this match { + case sj: SemiJoin => + // Combines inputs' RowType, the result is different from SemiJoin's RowType. + SqlValidatorUtil.deriveJoinRowType( + sj.getLeft.getRowType, + sj.getRight.getRowType, + getJoinType, + sj.getCluster.getTypeFactory, + null, + Collections.emptyList[RelDataTypeField] + ) + case j: Join => getRowType + case _ => throw new IllegalArgumentException(s"Illegal join node: ${this.getRelTypeName}") + } + + override def explainTerms(pw: RelWriter): RelWriter = { + pw.input("left", getLeft).input("right", getRight) + .item("joinType", RelExplainUtil.joinTypeToString(flinkJoinType)) + .item("where", + RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString)) + .item("select", getRowType.getFieldNames.mkString(", ")) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala index c93fd299048b38..84750ea808b493 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalRank.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.plan.nodes.logical import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.calcite.{LogicalRank, Rank, RankRange} +import org.apache.flink.table.plan.util.RelExplainUtil import org.apache.calcite.plan._ import org.apache.calcite.rel.`type`.RelDataType @@ -60,7 +61,7 @@ class FlinkLogicalRank( pw.item("input", getInput) .item("rankFunction", rankFunction) .item("partitionBy", partitionKey.map(inputFieldNames.get(_)).mkString(",")) - .item("orderBy", Rank.sortFieldsToString(sortCollation, input.getRowType)) + .item("orderBy", RelExplainUtil.collationToString(sortCollation, input.getRowType)) .item("rankRange", rankRange.toString(inputFieldNames)) .item("select", getRowType.getFieldNames.mkString(", ")) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala index 5703315153fe1a..5d17fd4476e59d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWatermarkAssigner.scala @@ -35,16 +35,16 @@ class FlinkLogicalWatermarkAssigner( cluster: RelOptCluster, traits: 
RelTraitSet, input: RelNode, - rowtimeField: String, - watermarkOffset: Long) - extends WatermarkAssigner(cluster, traits, input, rowtimeField, watermarkOffset) + rowtimeFieldIndex: Option[Int], + watermarkDelay: Option[Long]) + extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkDelay) with FlinkLogicalRel { override def copy( traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { new FlinkLogicalWatermarkAssigner( - cluster, traitSet, inputs.get(0), rowtimeField, watermarkOffset) + cluster, traitSet, inputs.get(0), rowtimeFieldIndex, watermarkDelay) } } @@ -60,8 +60,8 @@ class FlinkLogicalWatermarkAssignerConverter extends ConverterRule( val newInput = RelOptRule.convert(watermark.getInput, FlinkConventions.LOGICAL) FlinkLogicalWatermarkAssigner.create( newInput, - watermark.rowtimeField, - watermark.watermarkOffset) + watermark.rowtimeFieldIndex, + watermark.watermarkDelay) } } @@ -70,12 +70,10 @@ object FlinkLogicalWatermarkAssigner { def create( input: RelNode, - rowtimeField: String, - watermarkOffset: Long): FlinkLogicalWatermarkAssigner = { + rowtimeFieldIndex: Option[Int], + watermarkDelay: Option[Long]): FlinkLogicalWatermarkAssigner = { val cluster = input.getCluster val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify() - new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeField, watermarkOffset) + new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeFieldIndex, watermarkDelay) } } - - diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala index 0624e7a341946b..1402b243c39445 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecExchange.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.plan.nodes.physical.batch -import org.apache.flink.table.plan.nodes.common.CommonExchange +import org.apache.flink.table.plan.nodes.common.CommonPhysicalExchange import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.{RelDistribution, RelNode} @@ -76,7 +76,7 @@ class BatchExecExchange( traitSet: RelTraitSet, inputRel: RelNode, relDistribution: RelDistribution) - extends CommonExchange(cluster, traitSet, inputRel, relDistribution) + extends CommonPhysicalExchange(cluster, traitSet, inputRel, relDistribution) with BatchPhysicalRel { override def copy( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala index d3ebe353c73814..2f59ba502dada5 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala @@ -31,12 +31,13 @@ import org.apache.calcite.tools.RelBuilder * Batch physical RelNode for aggregate. * *

There are two differences between this node and [[Aggregate]]: - * 1. This node supports two-stage aggregation to reduce shuffle data: - * local-aggregation (or named partial-aggregation in other engines) and - * global-aggregation (or named final-aggregation). - * local-aggregation produces a partial result for each group before shuffle, - * and then global-aggregation produces final result based on shuffled partial result. - * Two-stage aggregation is enabled only if all aggregate calls are mergeable. (e.g. SUM, AVG, MAX) + * 1. This node supports two-stage aggregation to reduce data-shuffling: + * local-aggregation and global-aggregation. + * local-aggregation produces a partial result for each group before shuffle in stage 1, + * and then the partially aggregated results are shuffled to global-aggregation + * which produces the final result in stage 2. + * Two-stage aggregation is enabled only if all aggregate functions are mergeable. + * (e.g. SUM, AVG, MAX) * 2. This node supports auxiliary group keys which will not be computed as key and * does not also affect the correctness of the final result. [[Aggregate]] does not distinguish * group keys and auxiliary group keys, and combines them as a complete `groupSet`. diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecJoinBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecJoinBase.scala index e1609bdee6c2cc..17b963ca52e28f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecJoinBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecJoinBase.scala @@ -17,56 +17,13 @@ */ package org.apache.flink.table.plan.nodes.physical.batch -import org.apache.flink.table.plan.FlinkJoinRelType -import org.apache.flink.table.plan.util.RelExplainUtil +import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin -import org.apache.calcite.rel.RelWriter -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} -import org.apache.calcite.rel.core.{Join, JoinInfo, SemiJoin} -import org.apache.calcite.sql.validate.SqlValidatorUtil -import org.apache.calcite.util.mapping.IntPair - -import java.util.Collections - -import scala.collection.JavaConversions._ +import org.apache.calcite.rel.core.Join /** * Batch physical RelNode for [[Join]] */ -trait BatchExecJoinBase extends Join with BatchPhysicalRel { - - lazy val joinInfo: JoinInfo = JoinInfo.of(getLeft, getRight, getCondition) - - lazy val keyPairs: List[IntPair] = joinInfo.pairs.toList - - // TODO supports FlinkJoinRelType.ANTI - lazy val flinkJoinType: FlinkJoinRelType = this match { - case sj: SemiJoin => FlinkJoinRelType.SEMI - case j: Join => FlinkJoinRelType.toFlinkJoinRelType(getJoinType) - case _ => throw new IllegalArgumentException(s"Illegal join node: ${this.getRelTypeName}") - } - - lazy val inputRowType: RelDataType = this match { - case sj: SemiJoin => - // Combines inputs' RowType, the result is different from SemiJoin's RowType. 
- SqlValidatorUtil.deriveJoinRowType( - sj.getLeft.getRowType, - sj.getRight.getRowType, - getJoinType, - sj.getCluster.getTypeFactory, - null, - Collections.emptyList[RelDataTypeField] - ) - case j: Join => getRowType - case _ => throw new IllegalArgumentException(s"Illegal join node: ${this.getRelTypeName}") - } - - override def explainTerms(pw: RelWriter): RelWriter = { - pw.input("left", getLeft).input("right", getRight) - .item("where", - RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString)) - .item("join", getRowType.getFieldNames.mkString(", ")) - .item("joinType", RelExplainUtil.joinTypeToString(flinkJoinType)) - } +trait BatchExecJoinBase extends CommonPhysicalJoin with BatchPhysicalRel { } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala index 85f0ce8418e160..8df1a7a918a7a1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala @@ -104,10 +104,10 @@ class BatchExecOverAggregate( val constants: Seq[RexLiteral] = logicWindow.constants val writer = super.explainTerms(pw) - .itemIf("partitionBy", RelExplainUtil.fieldToString(partitionKeys, outputRowType), + .itemIf("partitionBy", RelExplainUtil.fieldToString(partitionKeys, inputRowType), partitionKeys.nonEmpty) .itemIf("orderBy", - RelExplainUtil.orderingToString(groups.head.orderKeys.getFieldCollations, outputRowType), + RelExplainUtil.collationToString(groups.head.orderKeys, inputRowType), orderKeyIndices.nonEmpty) var offset = inputRowType.getFieldCount diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala index 2429f04401f2c4..ad63540770f130 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecRank.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.plan.nodes.physical.batch import org.apache.flink.table.api.TableException import org.apache.flink.table.plan.cost.{FlinkCost, FlinkCostFactory} import org.apache.flink.table.plan.nodes.calcite.{ConstantRankRange, Rank, RankRange} +import org.apache.flink.table.plan.util.RelExplainUtil import org.apache.calcite.plan._ import org.apache.calcite.rel._ @@ -85,12 +86,12 @@ class BatchExecRank( } override def explainTerms(pw: RelWriter): RelWriter = { - val inputFieldNames = inputRel.getRowType.getFieldNames + val inputRowType = inputRel.getRowType pw.item("input", getInput) .item("rankFunction", rankFunction) - .item("partitionBy", partitionKey.map(inputFieldNames.get(_)).mkString(",")) - .item("orderBy", Rank.sortFieldsToString(sortCollation, inputRel.getRowType)) - .item("rankRange", rankRange.toString(inputFieldNames)) + .item("rankRange", rankRange.toString(inputRowType.getFieldNames)) + .item("partitionBy", RelExplainUtil.fieldToString(partitionKey.toArray, inputRowType)) + .item("orderBy", 
RelExplainUtil.collationToString(sortCollation, inputRowType)) .item("global", isGlobal) .item("select", getRowType.getFieldNames.mkString(", ")) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala index a3ae35f663ba79..2868ed3a48cc80 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSort.scala @@ -35,11 +35,11 @@ class BatchExecSort( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, - collations: RelCollation) - extends Sort(cluster, traitSet, inputRel, collations) + sortCollation: RelCollation) + extends Sort(cluster, traitSet, inputRel, sortCollation) with BatchPhysicalRel { - require(collations.getFieldCollations.size() > 0) + require(sortCollation.getFieldCollations.size() > 0) override def copy( traitSet: RelTraitSet, @@ -52,8 +52,7 @@ class BatchExecSort( override def explainTerms(pw: RelWriter): RelWriter = { pw.input("input", getInput) - .item("orderBy", - RelExplainUtil.orderingToString(collations.getFieldCollations, getRowType)) + .item("orderBy", RelExplainUtil.collationToString(sortCollation, getRowType)) } override def estimateRowCount(mq: RelMetadataQuery): Double = mq.getRowCount(getInput) @@ -63,7 +62,7 @@ class BatchExecSort( if (rowCount == null) { return null } - val numOfSortKeys = collations.getFieldCollations.size() + val numOfSortKeys = sortCollation.getFieldCollations.size() val cpuCost = FlinkCost.COMPARE_CPU_COST * numOfSortKeys * rowCount * Math.max(Math.log(rowCount), 1.0) val memCost = FlinkRelMdUtil.computeSortMemory(mq, getInput) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala index 452a554697d5d0..6e097a7eca9c7c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSortLimit.scala @@ -39,11 +39,11 @@ class BatchExecSortLimit( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, - collations: RelCollation, + sortCollation: RelCollation, offset: RexNode, fetch: RexNode, isGlobal: Boolean) - extends Sort(cluster, traitSet, inputRel, collations, offset, fetch) + extends Sort(cluster, traitSet, inputRel, sortCollation, offset, fetch) with BatchPhysicalRel { private val limitStart: Long = FlinkRelOptUtil.getLimitStart(offset) @@ -60,8 +60,7 @@ class BatchExecSortLimit( override def explainTerms(pw: RelWriter): RelWriter = { pw.input("input", getInput) - .item("orderBy", - RelExplainUtil.orderingToString(collations.getFieldCollations, getRowType)) + .item("orderBy", RelExplainUtil.collationToString(sortCollation, getRowType)) .item("offset", limitStart) .item("fetch", RelExplainUtil.fetchToString(fetch)) .item("global", isGlobal) @@ -84,7 +83,7 @@ class BatchExecSortLimit( override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { val inputRowCnt = mq.getRowCount(getInput()) val 
heapLen = Math.min(inputRowCnt, limitEnd) - val numOfSort = collations.getFieldCollations.size() + val numOfSort = sortCollation.getFieldCollations.size() val cpuCost = FlinkCost.COMPARE_CPU_COST * numOfSort * inputRowCnt * Math.log(heapLen) // assume memory is big enough to simplify the estimation. val memCost = heapLen * mq.getAverageRowSize(this) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCalc.scala new file mode 100644 index 00000000000000..652c9c8bdd1d1a --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecCalc.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.nodes.common.CommonCalc + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Calc +import org.apache.calcite.rex.RexProgram + +/** + * Stream physical RelNode for [[Calc]]. 
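+ *
+ * A Calc combines a projection and a filter into one node. A minimal sketch of the
+ * mapping (the query and field names are illustrative):
+ * {{{
+ *   // SELECT a, b + 1 FROM T WHERE b > 0
+ *   // => StreamExecCalc(select: [a, +(b, 1)], where: >(b, 0))
+ * }}}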
+ */ +class StreamExecCalc( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + calcProgram: RexProgram, + outputRowType: RelDataType) + extends CommonCalc(cluster, traitSet, inputRel, calcProgram) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = outputRowType + + override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = { + new StreamExecCalc(cluster, traitSet, child, program, outputRowType) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDataStreamScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDataStreamScan.scala index 481b7b583a38a2..4f1910f6be5a54 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDataStreamScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDataStreamScan.scala @@ -47,8 +47,14 @@ class StreamExecDataStreamScan( override def producesUpdates: Boolean = dataStreamTable.producesUpdates + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + override def producesRetractions: Boolean = producesUpdates && isAccRetract + override def requireWatermark: Boolean = false + override def deriveRowType(): RelDataType = relDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { @@ -65,4 +71,5 @@ class StreamExecDataStreamScan( super.explainTerms(pw) .item("fields", getRowType.getFieldNames.asScala.mkString(", ")) } + } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExchange.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExchange.scala new file mode 100644 index 00000000000000..b503fb2d9bdedd --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExchange.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.nodes.common.CommonPhysicalExchange + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelDistribution, RelNode} + +/** + * Stream physical RelNode for [[org.apache.calcite.rel.core.Exchange]]. + */ +class StreamExecExchange( + cluster: RelOptCluster, + traitSet: RelTraitSet, + relNode: RelNode, + relDistribution: RelDistribution) + extends CommonPhysicalExchange(cluster, traitSet, relNode, relDistribution) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def copy( + traitSet: RelTraitSet, + newInput: RelNode, + newDistribution: RelDistribution): StreamExecExchange = { + new StreamExecExchange(cluster, traitSet, newInput, newDistribution) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExpand.scala new file mode 100644 index 00000000000000..edbedb994744ce --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExpand.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.nodes.calcite.Expand + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex.RexNode + +import java.util + +/** + * Stream physical RelNode for [[Expand]]. 
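+ *
+ * Expand emits one output row per entry of `projects` for every input row; the field
+ * at `expandIdIndex` identifies which projection produced the row. A minimal sketch
+ * (the field names are illustrative):
+ * {{{
+ *   // projects = [[a, null, 0], [null, b, 1]], expandIdIndex = 2
+ *   // input row (a1, b1) => output rows (a1, null, 0) and (null, b1, 1)
+ * }}}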
+ */ +class StreamExecExpand( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + outputRowType: RelDataType, + projects: util.List[util.List[RexNode]], + expandIdIndex: Int) + extends Expand(cluster, traitSet, inputRel, outputRowType, projects, expandIdIndex) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecExpand(cluster, traitSet, inputs.get(0), outputRowType, projects, expandIdIndex) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecFirstLastRow.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecFirstLastRow.scala new file mode 100644 index 00000000000000..c819704ba3a53c --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecFirstLastRow.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} + +import java.util + +import scala.collection.JavaConversions._ + +/** + * Stream physical RelNode which deduplicates on keys and keeps only the first or the last row. + *
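+ * Such a node can be expressed with a ROW_NUMBER-style query; a sketch of the
+ * pattern (the table and field names are illustrative):
+ * {{{
+ *   -- keep the last row per key, ordered on proctime
+ *   SELECT * FROM (
+ *     SELECT *, ROW_NUMBER() OVER (PARTITION BY key ORDER BY proctime DESC) AS rn
+ *     FROM T) WHERE rn = 1
+ * }}}
+ *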

NOTES: only sorting on proctime is supported for now. + */ +class StreamExecFirstLastRow( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + uniqueKeys: Array[Int], + isRowtime: Boolean, + isLastRowMode: Boolean) + extends SingleRel(cluster, traitSet, inputRel) + with StreamPhysicalRel { + + def getUniqueKeys: Array[Int] = uniqueKeys + + override def producesUpdates: Boolean = isLastRowMode + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = true + + override def consumesRetractions: Boolean = true + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = getInput.getRowType + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecFirstLastRow( + cluster, + traitSet, + inputs.get(0), + uniqueKeys, + isRowtime, + isLastRowMode) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val fieldNames = getRowType.getFieldNames + val orderString = if (isRowtime) "ROWTIME" else "PROCTIME" + super.explainTerms(pw) + .item("mode", if (isLastRowMode) "LastRow" else "FirstRow") + .item("key", uniqueKeys.map(fieldNames.get).mkString(", ")) + .item("order", orderString) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala new file mode 100644 index 00000000000000..fbb81149f0e6eb --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.PartialFinalType +import org.apache.flink.table.plan.util.{AggregateInfoList, RelExplainUtil} + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.{RelNode, RelWriter} + +/** + * Stream physical RelNode for unbounded global group aggregate. + * + * @see [[StreamExecGroupAggregateBase]] for more info. 
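+ *
+ * A minimal sketch of the local/global split for a mergeable function (the names are
+ * illustrative):
+ * {{{
+ *   // AVG(x): the local aggregate accumulates (sum, count) per group and emits the
+ *   // accumulator; this global aggregate merges accumulators and emits sum / count.
+ * }}}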
+ */ +class StreamExecGlobalGroupAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + inputRowType: RelDataType, + outputRowType: RelDataType, + val localAggInfoList: AggregateInfoList, + val globalAggInfoList: AggregateInfoList, + val grouping: Array[Int], + val partialFinalType: PartialFinalType) + extends StreamExecGroupAggregateBase(cluster, traitSet, inputRel) { + + override def producesUpdates = true + + override def needsUpdatesAsRetraction(input: RelNode) = true + + override def consumesRetractions = true + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = outputRowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new StreamExecGlobalGroupAggregate( + cluster, + traitSet, + inputs.get(0), + inputRowType, + outputRowType, + localAggInfoList, + globalAggInfoList, + grouping, + partialFinalType) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw) + .itemIf("groupBy", + RelExplainUtil.fieldToString(grouping, inputRel.getRowType), grouping.nonEmpty) + .itemIf("partialFinalType", partialFinalType, partialFinalType != PartialFinalType.NONE) + .item("select", RelExplainUtil.streamGroupAggregationToString( + inputRel.getRowType, + getRowType, + globalAggInfoList, + grouping, + isGlobal = true)) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala new file mode 100644 index 00000000000000..e28678f0b3990d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregate.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.PartialFinalType +import org.apache.flink.table.plan.util.RelExplainUtil + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter} + +import java.util + +/** + * Stream physical RelNode for unbounded group aggregate. + * + * This node also supports un-splittable aggregate functions (e.g. STDDEV_POP). + * + * @see [[StreamExecGroupAggregateBase]] for more info. 
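+ *
+ * A sketch of what this node emits in retraction mode (the query is illustrative):
+ * {{{
+ *   // SELECT key, COUNT(*) FROM T GROUP BY key
+ *   // input +(k1) => emit +(k1, 1)
+ *   // input +(k1) => emit -(k1, 1), +(k1, 2)
+ * }}}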
+ */ +class StreamExecGroupAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + outputRowType: RelDataType, + val aggCalls: Seq[AggregateCall], + val grouping: Array[Int], + var partialFinalType: PartialFinalType = PartialFinalType.NONE) + extends StreamExecGroupAggregateBase(cluster, traitSet, inputRel) { + + override def producesUpdates = true + + override def needsUpdatesAsRetraction(input: RelNode) = true + + override def consumesRetractions = true + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = outputRowType + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecGroupAggregate( + cluster, + traitSet, + inputs.get(0), + outputRowType, + aggCalls, + grouping) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val inputRowType = getInput.getRowType + super.explainTerms(pw) + .itemIf("groupBy", + RelExplainUtil.fieldToString(grouping, inputRowType), grouping.nonEmpty) + .itemIf("partialFinalType", partialFinalType, partialFinalType != PartialFinalType.NONE) + // TODO print aggInfoList + .item("select", RelExplainUtil.streamGroupAggregationToString( + inputRowType, + getRowType, + aggCalls, + grouping)) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregateBase.scala new file mode 100644 index 00000000000000..7dd2c6bc26c74b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecGroupAggregateBase.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, SingleRel} +import org.apache.calcite.rel.core.Aggregate + +/** + * Base stream physical RelNode for unbounded group aggregate. + * + *

There are two differences between stream group aggregate and [[Aggregate]]: + * 1. This node supports two-stage aggregation to reduce data-shuffling: + * local-aggregation and global-aggregation. + * local-aggregation produces a partial result for each group before shuffle in stage 1, + * and then the partially aggregated results are shuffled by group key to global-aggregation + * which produces the final result in stage 2. + * local-global aggregation is enabled only if all aggregate functions are mergeable. + * (e.g. SUM, AVG, MAX) + * 2. Stream group aggregate supports partial-final aggregation to resolve data skew for + * distinct aggregates. + * partial-aggregation produces a partial distinct aggregated result based on + * group key and bucket number (which means the data is shuffled by group key and bucket number), + * and then the partially distinct aggregated results are shuffled by group key only + * to final-aggregation which produces the final result. + * partial-final aggregation is enabled only if all distinct aggregate functions are splittable. + * (e.g. DISTINCT SUM, DISTINCT AVG, DISTINCT MAX) + * + *
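+ * A sketch of the partial-final rewrite for a skewed distinct aggregate (the bucket
+ * expression and bucket count are illustrative):
+ * {{{
+ *   -- original: SELECT key, COUNT(DISTINCT c) FROM T GROUP BY key
+ *   -- partial:  COUNT(DISTINCT c) AS pc ... GROUP BY key, MOD(HASH_CODE(c), 1024)
+ *   -- final:    SUM(pc) ... GROUP BY key
+ * }}}
+ *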

NOTES: both partial-aggregation and final-aggregation support local-global mode. + */ +abstract class StreamExecGroupAggregateBase( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode) + extends SingleRel(cluster, traitSet, inputRel) + with StreamPhysicalRel { + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala new file mode 100644 index 00000000000000..c43c62d0be896d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.util.{AggregateInfoList, RelExplainUtil} + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter} + +import java.util + +/** + * Stream physical RelNode for unbounded incremental group aggregate. + * + *

Consider the following sub-plan: + * {{{ + * StreamExecGlobalGroupAggregate (final-global-aggregate) + * +- StreamExecExchange + * +- StreamExecLocalGroupAggregate (final-local-aggregate) + * +- StreamExecGlobalGroupAggregate (partial-global-aggregate) + * +- StreamExecExchange + * +- StreamExecLocalGroupAggregate (partial-local-aggregate) + * }}} + * + * partial-global-aggregate and final-local-aggregate can be combined into + * this node to share [[org.apache.flink.api.common.state.State]]. + * The sub-plan then becomes + * {{{ + * StreamExecGlobalGroupAggregate (final-global-aggregate) + * +- StreamExecExchange + * +- StreamExecIncrementalGroupAggregate + * +- StreamExecExchange + * +- StreamExecLocalGroupAggregate (partial-local-aggregate) + * }}} + * + * @see [[StreamExecGroupAggregateBase]] for more info. + */ +class StreamExecIncrementalGroupAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + inputRowType: RelDataType, + outputRowType: RelDataType, + val partialAggInfoList: AggregateInfoList, + finalAggInfoList: AggregateInfoList, + val finalAggCalls: Seq[AggregateCall], + val finalAggGrouping: Array[Int], + val partialAggGrouping: Array[Int]) + extends StreamExecGroupAggregateBase(cluster, traitSet, inputRel) { + + override def deriveRowType(): RelDataType = outputRowType + + override def producesUpdates = false + + override def needsUpdatesAsRetraction(input: RelNode) = true + + override def consumesRetractions = true + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecIncrementalGroupAggregate( + cluster, + traitSet, + inputs.get(0), + inputRowType, + outputRowType, + partialAggInfoList, + finalAggInfoList, + finalAggCalls, + finalAggGrouping, + partialAggGrouping) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val inputRowType = getInput.getRowType + super.explainTerms(pw) + .item("finalAggGrouping", RelExplainUtil.fieldToString(finalAggGrouping, inputRowType)) + .item("partialAggGrouping", + RelExplainUtil.fieldToString(partialAggGrouping, inputRel.getRowType)) + .item("select", RelExplainUtil.streamGroupAggregationToString( + inputRel.getRowType, + getRowType, + finalAggInfoList, + partialAggGrouping, + shuffleKey = Some(finalAggGrouping))) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoinBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoinBase.scala new file mode 100644 index 00000000000000..d5accfacc0903b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecJoinBase.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.FlinkJoinRelType +import org.apache.flink.table.plan.nodes.common.CommonPhysicalJoin + +import org.apache.calcite.plan._ +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.{CorrelationId, Join, JoinRelType} +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.RexNode + +import scala.collection.JavaConversions._ + +/** + * Stream physical RelNode for [[Join]]. + */ +trait StreamExecJoinBase extends CommonPhysicalJoin with StreamPhysicalRel { + + override def producesUpdates: Boolean = { + flinkJoinType != FlinkJoinRelType.INNER && flinkJoinType != FlinkJoinRelType.SEMI + } + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = { + def getCurrentRel(rel: RelNode): RelNode = { + rel match { + case _: HepRelVertex => rel.asInstanceOf[HepRelVertex].getCurrentRel + case _ => rel + } + } + + val realInput = getCurrentRel(input) + val inputUniqueKeys = getCluster.getMetadataQuery.getUniqueKeys(realInput) + if (inputUniqueKeys != null) { + val joinKeys = if (input == getCurrentRel(getLeft)) { + keyPairs.map(_.source).toArray + } else { + keyPairs.map(_.target).toArray + } + val pkContainJoinKey = inputUniqueKeys.exists { + uniqueKey => joinKeys.forall(uniqueKey.toArray.contains(_)) + } + !pkContainJoinKey + } else { + true + } + } + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = { + flinkJoinType match { + case FlinkJoinRelType.FULL | FlinkJoinRelType.RIGHT | FlinkJoinRelType.LEFT => true + case FlinkJoinRelType.ANTI => true + case _ => false + } + } + + override def requireWatermark: Boolean = false + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + val elementRate = 100.0d * 2 // two input streams + planner.getCostFactory.makeCost(elementRate, elementRate, 0) + } + +} + +class StreamExecJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftRel: RelNode, + rightRel: RelNode, + condition: RexNode, + joinType: JoinRelType) + extends Join(cluster, traitSet, leftRel, rightRel, condition, Set.empty[CorrelationId], joinType) + with StreamExecJoinBase { + + override def copy( + traitSet: RelTraitSet, + conditionExpr: RexNode, + left: RelNode, + right: RelNode, + joinType: JoinRelType, + semiJoinDone: Boolean): Join = { + new StreamExecJoin(cluster, traitSet, left, right, conditionExpr, joinType) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala new file mode 100644 index 00000000000000..0b16192d4e4e71 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala @@ -0,0 +1,84 @@ +/* + * Licensed to
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.PartialFinalType +import org.apache.flink.table.plan.util.{AggregateInfoList, RelExplainUtil} + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter} + +import java.util + +/** + * Stream physical RelNode for unbounded local group aggregate. + * + * @see [[StreamExecGroupAggregateBase]] for more info. + */ +class StreamExecLocalGroupAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + outputRowType: RelDataType, + val aggInfoList: AggregateInfoList, + val grouping: Array[Int], + val aggCalls: Seq[AggregateCall], + val partialFinalType: PartialFinalType) + extends StreamExecGroupAggregateBase(cluster, traitSet, inputRel) { + + override def producesUpdates = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions = true + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = outputRowType + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecLocalGroupAggregate( + cluster, + traitSet, + inputs.get(0), + outputRowType, + aggInfoList, + grouping, + aggCalls, + partialFinalType) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val inputRowType = getInput.getRowType + super.explainTerms(pw) + .itemIf("groupBy", RelExplainUtil.fieldToString(grouping, inputRowType), + grouping.nonEmpty) + .itemIf("partialFinalType", partialFinalType, partialFinalType != PartialFinalType.NONE) + .item("select", RelExplainUtil.streamGroupAggregationToString( + inputRowType, + getRowType, + aggInfoList, + grouping, + isLocal = true)) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala new file mode 100644 index 00000000000000..3a085aac07fd85 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecOverAggregate.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.CalcitePair +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.plan.util.RelExplainUtil + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Window.Group +import org.apache.calcite.rel.core.{AggregateCall, Window} +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.calcite.rex.RexLiteral + +import java.util + +import scala.collection.JavaConverters._ + +/** + * Stream physical RelNode for time-based over [[Window]]. + */ +class StreamExecOverAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + outputRowType: RelDataType, + inputRowType: RelDataType, + logicWindow: Window) + extends SingleRel(cluster, traitSet, inputRel) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode) = true + + override def consumesRetractions = true + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = { + if (logicWindow.groups.size() != 1 + || logicWindow.groups.get(0).orderKeys.getFieldCollations.size() != 1) { + return false + } + val orderKey = logicWindow.groups.get(0).orderKeys.getFieldCollations.get(0) + val timeType = outputRowType.getFieldList.get(orderKey.getFieldIndex).getType + FlinkTypeFactory.isRowtimeIndicatorType(timeType) + } + + override def deriveRowType(): RelDataType = outputRowType + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecOverAggregate( + cluster, + traitSet, + inputs.get(0), + outputRowType, + inputRowType, + logicWindow + ) + } + + override def estimateRowCount(mq: RelMetadataQuery): Double = { + // over window: emits at least one output row per input row + // (does not introduce retract amplification) + mq.getRowCount(getInput) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { + // by default, assume cost is proportional to number of rows + val rowCnt: Double = mq.getRowCount(this) + val count = (getRowType.getFieldCount - 1) * 1.0 / inputRel.getRowType.getFieldCount + planner.getCostFactory.makeCost(rowCnt, rowCnt * count, 0) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val overWindow: Group = logicWindow.groups.get(0) + val constants: Seq[RexLiteral] = logicWindow.constants.asScala + val partitionKeys: Array[Int] = overWindow.keys.toArray + val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + + super.explainTerms(pw) + .itemIf("partitionBy", RelExplainUtil.fieldToString(partitionKeys, inputRowType), + partitionKeys.nonEmpty) + .item("orderBy",
RelExplainUtil.collationToString(overWindow.orderKeys, inputRowType)) + .item("window", RelExplainUtil.windowRangeToString(logicWindow, overWindow)) + .item("select", RelExplainUtil.overAggregationToString( + inputRowType, + outputRowType, + constants, + namedAggregates)) + } + + private def generateNamedAggregates: Seq[CalcitePair[AggregateCall, String]] = { + val overWindow: Group = logicWindow.groups.get(0) + + val aggregateCalls = overWindow.getAggregateCalls(logicWindow) + for (i <- 0 until aggregateCalls.size()) + yield new CalcitePair[AggregateCall, String](aggregateCalls.get(i), "w0$o" + i) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecRank.scala new file mode 100644 index 00000000000000..eb6b9e98d2d061 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecRank.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.`trait`.TraitUtil +import org.apache.flink.table.plan.nodes.calcite.{Rank, RankRange} +import org.apache.flink.table.plan.util.{RelExplainUtil, UpdatingPlanChecker} + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql.SqlRankFunction +import org.apache.calcite.util.ImmutableBitSet + +import java.util + +import scala.collection.JavaConversions._ + +/** + * Stream physical RelNode for [[Rank]]. + * + * @see [[StreamExecTemporalSort]] which must be a time-ascending-order sort without `limit`. + * @see [[StreamExecSort]] which is currently only used for testing; its sort key can be of any type.
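+ * + * A Top-N query is the typical pattern that translates to this node, e.g. (table and + * field names are illustrative only): + * ''SELECT * FROM (SELECT a, b, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn + * FROM MyTable) WHERE rn <= 10''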
+ */ +class StreamExecRank( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + rankFunction: SqlRankFunction, + partitionKey: ImmutableBitSet, + sortCollation: RelCollation, + rankRange: RankRange, + val outputRankFunColumn: Boolean) + extends Rank( + cluster, + traitSet, + inputRel, + rankFunction, + partitionKey, + sortCollation, + rankRange) + with StreamPhysicalRel { + + /** Please use [[getStrategy]] instead of accessing this field directly. */ + private var strategy: RankStrategy = _ + + def getStrategy(forceRecompute: Boolean = false): RankStrategy = { + if (strategy == null || forceRecompute) { + strategy = analyzeRankStrategy + } + strategy + } + + override def producesUpdates = true + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = { + getStrategy(forceRecompute = true) == RetractRank + } + + override def consumesRetractions = true + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = { + if (outputRankFunColumn) { + super.deriveRowType() + } else { + inputRel.getRowType + } + } + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecRank( + cluster, + traitSet, + inputs.get(0), + rankFunction, + partitionKey, + sortCollation, + rankRange, + outputRankFunColumn) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val inputRowType = inputRel.getRowType + pw.input("input", getInput) + .item("strategy", getStrategy()) + .item("rankFunction", rankFunction.getKind) + .item("rankRange", rankRange.toString(inputRowType.getFieldNames)) + .item("partitionBy", RelExplainUtil.fieldToString(partitionKey.toArray, inputRowType)) + .item("orderBy", RelExplainUtil.collationToString(sortCollation, inputRowType)) + .item("select", getRowType.getFieldNames.mkString(", ")) + } + + private def analyzeRankStrategy: RankStrategy = { + val rankInput = getInput + val mq = cluster.getMetadataQuery + val isUpdateStream = !UpdatingPlanChecker.isAppendOnly(rankInput) + + if (isUpdateStream) { + val inputIsAccRetract = TraitUtil.isAccRetract(rankInput) + val uniqueKeys = mq.getUniqueKeys(rankInput) + if (inputIsAccRetract || uniqueKeys == null || uniqueKeys.isEmpty + // the unique key should contain the partition key + || !uniqueKeys.exists(k => k.contains(partitionKey))) { + // the input is AccRetract or the unique keys could not be extracted, + // so we fall back to retract rank + RetractRank + } else { + // TODO get `isMonotonic` value by RelModifiedMonotonicity handler + val isMonotonic = false + + if (isMonotonic) { + // FIXME choose a set of primary keys + UpdateFastRank(uniqueKeys.iterator().next().toArray) + } else { + val fieldCollations = sortCollation.getFieldCollations + if (fieldCollations.length == 1) { + // a single sort key in an update stream scenario (not monotonic): + // we can utilize the unary rank function to speed up processing + UnaryUpdateRank(uniqueKeys.iterator().next().toArray) + } else { + // no other choice, we have to use retract rank + RetractRank + } + } + } + } else { + AppendFastRank + } + } +} + +/** + * Base class of the strategies used to choose the rank process function.
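+ * + * In short, derived from `analyzeRankStrategy` above (a summary, not new behavior): + * - [[AppendFastRank]]: the input is append-only; + * - [[UpdateFastRank]]: an update stream whose unique key contains the partition key and + * whose sort fields are monotonic (currently never chosen, see the TODO above); + * - [[UnaryUpdateRank]]: an update stream with a unique key and a single sort key; + * - [[RetractRank]]: the fallback when the input is AccRetract or no suitable unique key + * can be extracted.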
+ */ +sealed trait RankStrategy + +case object AppendFastRank extends RankStrategy + +case object RetractRank extends RankStrategy + +case class UpdateFastRank(primaryKeys: Array[Int]) extends RankStrategy { + override def toString: String = "UpdateFastRank" + primaryKeys.mkString("[", ",", "]") +} + +case class UnaryUpdateRank(primaryKeys: Array[Int]) extends RankStrategy { + override def toString: String = "UnaryUpdateRank" + primaryKeys.mkString("[", ",", "]") +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala new file mode 100644 index 00000000000000..7b74b67df3aa80 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.nodes.calcite.Sink +import org.apache.flink.table.sinks._ + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.RelNode + +import java.util + +/** + * Stream physical RelNode to write data into an external sink defined by a [[TableSink]]. + */ +class StreamExecSink[T]( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + sink: TableSink[T], + sinkName: String) + extends Sink(cluster, traitSet, inputRel, sink, sinkName) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = + sink.isInstanceOf[BaseRetractStreamTableSink[_]] + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecSink(cluster, traitSet, inputs.get(0), sink, sinkName) + } + + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSort.scala new file mode 100644 index 00000000000000..2a366c3707cc67 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSort.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.annotation.Experimental +import org.apache.flink.table.plan.util.RelExplainUtil + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.core.Sort +import org.apache.calcite.rex.RexNode + +/** + * Stream physical RelNode for [[Sort]]. + * + *

NOTES: This class is currently only used for testing. + * + * @see [[StreamExecRank]] which must be an order by with `limit`. + * @see [[StreamExecTemporalSort]] which must be a time-ascending-order sort without `limit`. + * + *

e.g. + *

''SELECT * FROM TABLE ORDER BY ROWTIME, a'' will be converted to [[StreamExecTemporalSort]] + *

''SELECT * FROM TABLE ORDER BY a LIMIT 2'' will be converted to [[StreamExecRank]] + *

''SELECT * FROM TABLE ORDER BY a, ROWTIME'' will be converted to [[StreamExecSort]] + */ +@Experimental +class StreamExecSort( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + sortCollation: RelCollation) + extends Sort(cluster, traitSet, inputRel, sortCollation) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def copy( + traitSet: RelTraitSet, + input: RelNode, + newCollation: RelCollation, + offset: RexNode, + fetch: RexNode): Sort = { + new StreamExecSort(cluster, traitSet, input, newCollation) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + pw.input("input", getInput()) + .item("orderBy", RelExplainUtil.collationToString(sortCollation, getRowType)) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala new file mode 100644 index 00000000000000..03081f66be5742 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTableSourceScan.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan +import org.apache.flink.table.plan.schema.FlinkRelOptTable +import org.apache.flink.table.sources.StreamTableSource + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.metadata.RelMetadataQuery + +/** + * Stream physical RelNode to read data from an external source defined by a [[StreamTableSource]]. 
+ */ +class StreamExecTableSourceScan( + cluster: RelOptCluster, + traitSet: RelTraitSet, + relOptTable: FlinkRelOptTable) + extends PhysicalTableSourceScan(cluster, traitSet, relOptTable) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new StreamExecTableSourceScan(cluster, traitSet, relOptTable) + } + + override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { + val rowCnt = mq.getRowCount(this) + val rowSize = mq.getAverageRowSize(this) + planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalSort.scala new file mode 100644 index 00000000000000..5a20acb4dd7ac9 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecTemporalSort.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.util.RelExplainUtil + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel._ +import org.apache.calcite.rel.core.Sort +import org.apache.calcite.rex.RexNode + +/** + * Stream physical RelNode for time-ascending-order [[Sort]] without `limit`. + * + * @see [[StreamExecRank]] which must be an order by with `limit`. + * @see [[StreamExecSort]] which is currently only used for testing; its sort key can be of any type.
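+ * + * e.g. ''SELECT * FROM MyTable ORDER BY rowtime'' is converted to this node (the table + * and field names are illustrative only).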
+ */ +class StreamExecTemporalSort( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRel: RelNode, + sortCollation: RelCollation) + extends Sort(cluster, traitSet, inputRel, sortCollation) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def copy( + traitSet: RelTraitSet, + input: RelNode, + newCollation: RelCollation, + offset: RexNode, + fetch: RexNode): Sort = { + new StreamExecTemporalSort(cluster, traitSet, input, newCollation) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + pw.input("input", getInput()) + .item("orderBy", RelExplainUtil.collationToString(sortCollation, getRowType)) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecUnion.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecUnion.scala new file mode 100644 index 00000000000000..93b22e4f54535d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecUnion.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.{SetOp, Union} +import org.apache.calcite.rel.{RelNode, RelWriter} + +import java.util + +import scala.collection.JavaConversions._ + +/** + * Stream physical RelNode for [[Union]]. 
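+ * + * Note that only UNION ALL is supported (enforced by the `require` in the class body), + * e.g. ''SELECT * FROM A UNION ALL SELECT * FROM B'' (table names illustrative).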
+ */ +class StreamExecUnion( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputRels: util.List[RelNode], + all: Boolean, + outputRowType: RelDataType) + extends Union(cluster, traitSet, inputRels, all) + with StreamPhysicalRel { + + require(all, "Only UNION ALL is supported") + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = outputRowType + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode], all: Boolean): SetOp = { + new StreamExecUnion(cluster, traitSet, inputs, all, outputRowType) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("union", outputRowType.getFieldNames.mkString(", ")) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala new file mode 100644 index 00000000000000..b87080a525125b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import com.google.common.collect.ImmutableList +import org.apache.calcite.plan._ +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Values +import org.apache.calcite.rex.RexLiteral + +/** + * Stream physical RelNode for [[Values]].
+ */ +class StreamExecValues( + cluster: RelOptCluster, + traitSet: RelTraitSet, + tuples: ImmutableList[ImmutableList[RexLiteral]], + outputRowType: RelDataType) + extends Values(cluster, outputRowType, tuples, traitSet) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def deriveRowType(): RelDataType = outputRowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new StreamExecValues(cluster, traitSet, getTuples, outputRowType) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala new file mode 100644 index 00000000000000..facd7126ebdad1 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.api.{TableConfigOptions, TableException} +import org.apache.flink.table.plan.`trait`.{MiniBatchIntervalTraitDef, MiniBatchMode} +import org.apache.flink.table.plan.nodes.calcite.WatermarkAssigner +import org.apache.flink.table.plan.optimize.program.FlinkOptimizeContext +import org.apache.flink.util.Preconditions + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.{RelNode, RelWriter} + +import java.util + +/** + * Stream physical RelNode for [[WatermarkAssigner]]. 
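+ * + * e.g. a rowtime watermark assigner emitting watermarks with a 5 second delay can be + * created via the companion object below (a sketch: `cluster`, `traits` and `input` come + * from the planner, and the field index and delay are illustrative values): + * {{{ + * StreamExecWatermarkAssigner.createRowTimeWatermarkAssigner( + * cluster, traits, input, rowtimeFieldIndex = 2, watermarkDelay = 5000L) + * }}}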
+ */ +class StreamExecWatermarkAssigner( + cluster: RelOptCluster, + traits: RelTraitSet, + inputRel: RelNode, + rowtimeFieldIndex: Option[Int], + watermarkDelay: Option[Long]) + extends WatermarkAssigner(cluster, traits, inputRel, rowtimeFieldIndex, watermarkDelay) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = false + + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { + new StreamExecWatermarkAssigner( + cluster, + traitSet, + inputs.get(0), + rowtimeFieldIndex, + watermarkDelay) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val miniBatchInterval = traits.getTrait(MiniBatchIntervalTraitDef.INSTANCE).getMiniBatchInterval + + val value = miniBatchInterval.mode match { + case MiniBatchMode.None => + // 1. the operator requires watermarks, but mini-batch is not enabled + // 2. redundant watermark definition in DDL + // 3. a window exists, but window mini-batch is disabled + "None" + case MiniBatchMode.ProcTime => + val config = cluster.getPlanner.getContext.asInstanceOf[FlinkOptimizeContext].getTableConfig + val miniBatchLatency = config.getConf.getLong( + TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY) + Preconditions.checkArgument(miniBatchLatency > 0, + "MiniBatch latency must be greater than 0.") + s"Proctime, ${miniBatchLatency}ms" + case MiniBatchMode.RowTime => + s"Rowtime, ${miniBatchInterval.interval}ms" + case o => throw new TableException(s"Unsupported mode: $o") + } + super.explainTerms(pw).item("miniBatchInterval", value) + } + +} + +object StreamExecWatermarkAssigner { + + def createRowTimeWatermarkAssigner( + cluster: RelOptCluster, + traits: RelTraitSet, + inputRel: RelNode, + rowtimeFieldIndex: Int, + watermarkDelay: Long): StreamExecWatermarkAssigner = { + new StreamExecWatermarkAssigner( + cluster, + traits, + inputRel, + Some(rowtimeFieldIndex), + Some(watermarkDelay)) + } + + def createIngestionTimeWatermarkAssigner( + cluster: RelOptCluster, + traits: RelTraitSet, + inputRel: RelNode): StreamExecWatermarkAssigner = { + new StreamExecWatermarkAssigner(cluster, traits, inputRel, None, None) + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWindowJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWindowJoin.scala new file mode 100644 index 00000000000000..96c6e74eb798e6 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecWindowJoin.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.physical.stream + +import org.apache.flink.table.plan.FlinkJoinRelType +import org.apache.flink.table.plan.util.RelExplainUtil + +import org.apache.calcite.plan._ +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter} +import org.apache.calcite.rex.RexNode + +import scala.collection.JavaConversions._ + +/** + * Stream physical RelNode for a time windowed stream join. + */ +class StreamExecWindowJoin( + cluster: RelOptCluster, + traitSet: RelTraitSet, + leftRel: RelNode, + rightRel: RelNode, + val joinCondition: RexNode, + val joinType: JoinRelType, + leftInputRowType: RelDataType, + rightInputRowType: RelDataType, + outputRowType: RelDataType, + val isRowTime: Boolean, + leftLowerBound: Long, + leftUpperBound: Long, + leftTimeIndex: Int, + rightTimeIndex: Int, + remainCondition: Option[RexNode]) + extends BiRel(cluster, traitSet, leftRel, rightRel) + with StreamPhysicalRel { + + override def producesUpdates: Boolean = false + + override def needsUpdatesAsRetraction(input: RelNode): Boolean = false + + override def consumesRetractions: Boolean = false + + override def producesRetractions: Boolean = false + + override def requireWatermark: Boolean = isRowTime + + override def deriveRowType(): RelDataType = outputRowType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new StreamExecWindowJoin( + cluster, + traitSet, + inputs.get(0), + inputs.get(1), + joinCondition, + joinType, + leftInputRowType, + rightInputRowType, + outputRowType, + isRowTime, + leftLowerBound, + leftUpperBound, + leftTimeIndex, + rightTimeIndex, + remainCondition) + } + + override def explainTerms(pw: RelWriter): RelWriter = { + val windowBounds = s"isRowTime=$isRowTime, leftLowerBound=$leftLowerBound, " + + s"leftUpperBound=$leftUpperBound, leftTimeIndex=$leftTimeIndex, " + + s"rightTimeIndex=$rightTimeIndex" + super.explainTerms(pw) + .item("where", + RelExplainUtil.expressionToString(joinCondition, outputRowType, getExpressionString)) + .item("join", getRowType.getFieldNames.mkString(", ")) + .item("joinType", + RelExplainUtil.joinTypeToString(FlinkJoinRelType.toFlinkJoinRelType(joinType))) + .item("windowBounds", windowBounds) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala index b402d1f75d844c..0a7d03445ab399 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamPhysicalRel.scala @@ -30,22 +30,26 @@ trait StreamPhysicalRel extends FlinkPhysicalRel { /** * Whether the [[StreamPhysicalRel]] produces update and delete changes. 
*/ - def producesUpdates: Boolean = false + def producesUpdates: Boolean /** * Whether the [[StreamPhysicalRel]] requires retraction messages or not. */ - def needsUpdatesAsRetraction(input: RelNode): Boolean = false + def needsUpdatesAsRetraction(input: RelNode): Boolean /** * Whether the [[StreamPhysicalRel]] consumes retraction messages instead of forwarding them. * The node might or might not produce new retraction messages. */ - def consumesRetractions: Boolean = false + def consumesRetractions: Boolean /** * Whether the [[StreamPhysicalRel]] produces retraction messages. */ - def producesRetractions: Boolean = false + def producesRetractions: Boolean + /** + * Whether the [[StreamPhysicalRel]] requires a rowtime watermark in its processing logic. + */ + def requireWatermark: Boolean } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala index 4739ae4ca77bf2..59f887d3e18b74 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkOptimizeContext.scala @@ -18,6 +18,8 @@ package org.apache.flink.table.plan.optimize.program +import org.apache.flink.table.api.TableConfig + import org.apache.calcite.plan.Context import org.apache.calcite.plan.volcano.VolcanoPlanner @@ -26,6 +28,11 @@ import org.apache.calcite.plan.volcano.VolcanoPlanner */ trait FlinkOptimizeContext extends Context { + /** + * Gets [[TableConfig]] instance defined in [[org.apache.flink.table.api.TableEnvironment]]. + */ + def getTableConfig: TableConfig + /** * Gets [[VolcanoPlanner]] instance defined in [[org.apache.flink.table.api.TableEnvironment]]. */ diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala new file mode 100644 index 00000000000000..49fcc7fb76d688 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTrait.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.plan.`trait` + +import org.apache.flink.table.plan.`trait`.MiniBatchMode.MiniBatchMode + +import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef} + +/** + * The MiniBatchIntervalTrait is used to describe how the elements are divided into batches + * when flowing out from a [[org.apache.calcite.rel.RelNode]], + * e.g. MiniBatchIntervalTrait(1000L, ProcTime) + * means elements are divided into 1000ms proctime mini-batches. + */ +class MiniBatchIntervalTrait(miniBatchInterval: MiniBatchInterval) extends RelTrait { + + def getMiniBatchInterval: MiniBatchInterval = miniBatchInterval + + override def getTraitDef: RelTraitDef[_ <: RelTrait] = MiniBatchIntervalTraitDef.INSTANCE + + override def satisfies(`trait`: RelTrait): Boolean = this.equals(`trait`) + + override def register(planner: RelOptPlanner): Unit = {} + + override def hashCode(): Int = { + miniBatchInterval + .interval + .hashCode() + } + + override def equals(obj: Any): Boolean = { + obj match { + case eTrait: MiniBatchIntervalTrait => + this.getMiniBatchInterval == eTrait.getMiniBatchInterval + case _ => false + } + } + + override def toString: String = miniBatchInterval.mode + ": " + miniBatchInterval.interval +} + +/** + * @param interval interval of minibatch + * @param mode type of minibatch: rowtime/proctime + */ +case class MiniBatchInterval(interval: Long, mode: MiniBatchMode) + +object MiniBatchInterval { + val NONE = MiniBatchInterval(0L, MiniBatchMode.None) +} + +object MiniBatchIntervalTrait { + val NONE = new MiniBatchIntervalTrait(MiniBatchInterval.NONE) +} + +/** + * The type of minibatch interval: rowtime or proctime. + */ +object MiniBatchMode extends Enumeration { + type MiniBatchMode = Value + /** + * An operator in [[ProcTime]] mode requires watermarks emitted in proctime interval, + * e.g., an unbounded group aggregate with minibatch enabled. + */ + val ProcTime = Value + + /** + * An operator in [[RowTime]] mode requires watermarks extracted from elements, + * and emitted in rowtime interval, e.g., window, window join... + */ + val RowTime = Value + + /** + * Default value, meaning no minibatch interval is required. + */ + val None = Value +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTraitDef.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTraitDef.scala new file mode 100644 index 00000000000000..5de31780c496c7 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/MiniBatchIntervalTraitDef.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.plan.`trait` + +import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef} +import org.apache.calcite.rel.RelNode + +class MiniBatchIntervalTraitDef extends RelTraitDef[MiniBatchIntervalTrait] { + + override def getTraitClass: Class[MiniBatchIntervalTrait] = classOf[MiniBatchIntervalTrait] + + override def getSimpleName: String = this.getClass.getSimpleName + + override def convert( + planner: RelOptPlanner, + rel: RelNode, + toTrait: MiniBatchIntervalTrait, + allowInfiniteCostConverters: Boolean): RelNode = { + rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs) + } + + override def canConvert( + planner: RelOptPlanner, + fromTrait: MiniBatchIntervalTrait, + toTrait: MiniBatchIntervalTrait): Boolean = true + + override def getDefault: MiniBatchIntervalTrait = MiniBatchIntervalTrait.NONE +} + +object MiniBatchIntervalTraitDef { + val INSTANCE = new MiniBatchIntervalTraitDef +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/TraitUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/TraitUtil.scala new file mode 100644 index 00000000000000..1dbb54ae536367 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/TraitUtil.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.`trait` + +import org.apache.calcite.plan.RelTrait +import org.apache.calcite.rel.{RelCollation, RelCollations, RelFieldCollation, RelNode} +import org.apache.calcite.util.mapping.Mappings + +import scala.collection.JavaConversions._ +import scala.collection.mutable + +/** + * Utility for [[RelTrait]]. + */ +object TraitUtil { + + /** + * Applies the given mapping to a collation. Returns RelCollations.EMPTY if any collation + * field has no target value in the given mapping. + * + * @param collation the collation to which the mapping is applied + * @param mapping the mapping from source columns to target columns + * @return a new collation with the mapping applied, or RelCollations.EMPTY if any + * collation field has no target value in the given mapping.
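+ * + * A minimal usage sketch (the mapping is illustrative: a projection that keeps only + * input field 2, at output position 0; `ImmutableMap` is Guava's): + * {{{ + * val mapping = Mappings.target( + * ImmutableMap.of(Integer.valueOf(2), Integer.valueOf(0)), 3, 1) + * TraitUtil.apply(RelCollations.of(new RelFieldCollation(2)), mapping) // collation on field 0 + * TraitUtil.apply(RelCollations.of(new RelFieldCollation(1)), mapping) // RelCollations.EMPTY + * }}}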
+ */ + def apply(collation: RelCollation, mapping: Mappings.TargetMapping): RelCollation = { + val fieldCollations = collation.getFieldCollations + if (fieldCollations.isEmpty) collation + else { + val newFieldCollations = mutable.ArrayBuffer[RelFieldCollation]() + fieldCollations.foreach { fieldCollation => + try { + val i = mapping.getTargetOpt(fieldCollation.getFieldIndex) + if (i >= 0) newFieldCollations.add(fieldCollation.copy(i)) else return RelCollations.EMPTY + } catch { + case _: IndexOutOfBoundsException => return RelCollations.EMPTY + } + } + RelCollations.of(newFieldCollations: _*) + } + } + + /** + * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode. + */ + def isAccRetract(rel: RelNode): Boolean = { + val accModeTrait = rel.getTraitSet.getTrait(AccModeTraitDef.INSTANCE) + null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/retractionTraitDefs.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/retractionTraitDefs.scala new file mode 100644 index 00000000000000..9cfae7e843d688 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/retractionTraitDefs.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.`trait` + +import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef} +import org.apache.calcite.rel.RelNode + +/** + * Definition of the [[UpdateAsRetractionTrait]]. + */ +class UpdateAsRetractionTraitDef extends RelTraitDef[UpdateAsRetractionTrait] { + override def convert( + planner: RelOptPlanner, + rel: RelNode, + toTrait: UpdateAsRetractionTrait, + allowInfiniteCostConverters: Boolean): RelNode = { + + rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs) + } + + override def canConvert( + planner: RelOptPlanner, + fromTrait: UpdateAsRetractionTrait, + toTrait: UpdateAsRetractionTrait): Boolean = true + + override def getTraitClass: Class[UpdateAsRetractionTrait] = classOf[UpdateAsRetractionTrait] + + override def getSimpleName: String = this.getClass.getSimpleName + + override def getDefault: UpdateAsRetractionTrait = UpdateAsRetractionTrait.DEFAULT +} + +object UpdateAsRetractionTraitDef { + val INSTANCE = new UpdateAsRetractionTraitDef +} + +/** + * Definition of the [[AccModeTrait]]. 
+ */ +class AccModeTraitDef extends RelTraitDef[AccModeTrait] { + + override def convert( + planner: RelOptPlanner, + rel: RelNode, + toTrait: AccModeTrait, + allowInfiniteCostConverters: Boolean): RelNode = { + + rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs) + } + + override def canConvert( + planner: RelOptPlanner, + fromTrait: AccModeTrait, + toTrait: AccModeTrait): Boolean = true + + override def getTraitClass: Class[AccModeTrait] = classOf[AccModeTrait] + + override def getSimpleName: String = this.getClass.getSimpleName + + override def getDefault: AccModeTrait = AccModeTrait.DEFAULT +} + +object AccModeTraitDef { + val INSTANCE = new AccModeTraitDef +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/retractionTraits.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/retractionTraits.scala new file mode 100644 index 00000000000000..49a9a919ead04c --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/trait/retractionTraits.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.`trait` + +import org.apache.flink.table.plan.`trait`.AccMode.AccMode + +import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef} +import org.apache.calcite.rel.RelNode + +/** + * Tracks if a [[RelNode]] needs to send update and delete changes as + * retraction messages. + */ +class UpdateAsRetractionTrait extends RelTrait { + + /** + * Defines whether the [[RelNode]] needs to send update and delete + * changes as retraction messages. + */ + private var updateAsRetraction: Boolean = false + + def this(updateAsRetraction: Boolean) { + this() + this.updateAsRetraction = updateAsRetraction + } + + def sendsUpdatesAsRetractions: Boolean = updateAsRetraction + + override def register(planner: RelOptPlanner): Unit = {} + + override def getTraitDef: RelTraitDef[_ <: RelTrait] = UpdateAsRetractionTraitDef.INSTANCE + + override def satisfies(`trait`: RelTrait): Boolean = this.equals(`trait`) + + override def toString: String = updateAsRetraction.toString + +} + +object UpdateAsRetractionTrait { + val DEFAULT = new UpdateAsRetractionTrait(false) +} + +/** + * Tracks the AccMode of a [[RelNode]]. + */ +class AccModeTrait extends RelTrait { + + /** Defines the accumulating mode for an operator.
*/ + private var accMode = AccMode.Acc + + def this(accMode: AccMode) { + this() + this.accMode = accMode + } + + def getAccMode: AccMode = accMode + + override def register(planner: RelOptPlanner): Unit = {} + + override def getTraitDef: RelTraitDef[_ <: RelTrait] = AccModeTraitDef.INSTANCE + + override def satisfies(`trait`: RelTrait): Boolean = this.equals(`trait`) + + override def toString: String = accMode.toString +} + +object AccModeTrait { + val DEFAULT = new AccModeTrait(AccMode.Acc) +} + +/** + * The [[AccMode]] determines how insert, update, and delete changes of tables are encoded + * by the messages that an operator emits. + */ +object AccMode extends Enumeration { + type AccMode = Value + + /** + * An operator in [[Acc]] mode emits change messages as + * [[org.apache.flink.table.dataformat.BaseRow]] which encode a data row with header info, + * logically equivalent to (boolean, row). + * + * An operator in [[Acc]] mode may only produce update and delete messages if the table has + * a unique key and all key attributes are contained in the Row. + * + * Changes are encoded as follows: + * - insert: (true, NewRow) + * - update: (true, NewRow) // the Row includes the full unique key to identify the row to update + * - delete: (false, OldRow) // the Row includes the full unique key to identify the row to delete + * + */ + val Acc = Value + + /** + * An operator in [[AccRetract]] mode emits change messages as + * [[org.apache.flink.table.dataformat.BaseRow]] which encode a data row with header info, + * logically equivalent to (boolean, row). + * + * Changes are encoded as follows: + * - insert: (true, NewRow) + * - update: (false, OldRow), (true, NewRow) // updates are encoded in two messages! + * - delete: (false, OldRow) + * + */ + val AccRetract = Value +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala index 5068e19e8b2c6e..dfdb1f0b503c90 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/FlinkRelOptUtil.scala @@ -20,17 +20,18 @@ package org.apache.flink.table.plan.util import org.apache.flink.api.common.operators.Order import org.apache.flink.table.api.TableException +import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.RelFieldCollation.Direction -import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.core.{AggregateCall, JoinInfo} import org.apache.calcite.rel.{RelFieldCollation, RelNode} import org.apache.calcite.rex.{RexLiteral, RexNode} import org.apache.calcite.sql.SqlKind +import org.apache.calcite.util.ImmutableIntList import org.apache.calcite.util.mapping.IntPair import java.util import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer object FlinkRelOptUtil { @@ -149,8 +150,8 @@ object FlinkRelOptUtil { right: RelNode, allowEmptyKey: Boolean = false): (Array[Int], Array[Int]) = { // get the equality keys - val leftKeys = ArrayBuffer.empty[Int] - val rightKeys = ArrayBuffer.empty[Int] + val leftKeys = mutable.ArrayBuffer.empty[Int] + val rightKeys = mutable.ArrayBuffer.empty[Int] if (keyPairs.isEmpty) { if (allowEmptyKey) { (leftKeys.toArray, rightKeys.toArray) @@ -185,4 +186,29 @@ (leftKeys.toArray, rightKeys.toArray) } } + + /** + *
Creates a [[JoinInfo]] by analyzing a condition. + * + *
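For example (hedged, per Calcite's `splitJoinCondition` contract): splitting `a = b` + * records `true` in `filterNulls` (the equality filters null keys), while + * `a IS NOT DISTINCT FROM b` records `false`. + * + *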

NOTE: the functionality of this method is the same as [[JoinInfo#of]]; the only difference + * is that this method also populates the given `filterNulls` list. + */ + def createJoinInfo( + left: RelNode, + right: RelNode, + condition: RexNode, + filterNulls: util.List[java.lang.Boolean]): JoinInfo = { + val leftKeys = new util.ArrayList[Integer] + val rightKeys = new util.ArrayList[Integer] + val remaining = RelOptUtil.splitJoinCondition( + left, right, condition, leftKeys, rightKeys, filterNulls) + + if (remaining.isAlwaysTrue) { + JoinInfo.of(ImmutableIntList.copyOf(leftKeys), ImmutableIntList.copyOf(rightKeys)) + } else { + // TODO create NonEquiJoinInfo directly + JoinInfo.of(left, right, condition) + } + } + } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala index fb2219dc29fa8f..80b3009c943bfa 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelExplainUtil.scala @@ -22,7 +22,7 @@ import org.apache.flink.table.api.TableException import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction} import org.apache.flink.table.plan.FlinkJoinRelType -import org.apache.calcite.rel.RelFieldCollation +import org.apache.calcite.rel.RelCollation import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.Window.Group import org.apache.calcite.rel.core.{AggregateCall, Window} @@ -47,14 +47,27 @@ object RelExplainUtil { } /** - * Converts sort fields to String. + * Converts [[RelCollation]] to String. + * + * Formats sort fields as field names with direction `shortString`. */ - def orderingToString( - orderFields: util.List[RelFieldCollation], - inputType: RelDataType): String = { - val fieldNames = inputType.getFieldNames - orderFields.map { - x => s"${fieldNames(x.getFieldIndex)} ${x.direction.shortString}" + def collationToString( + collation: RelCollation, + inputRowType: RelDataType): String = { + val inputFieldNames = inputRowType.getFieldNames + collation.getFieldCollations.map { c => + s"${inputFieldNames(c.getFieldIndex)} ${c.direction.shortString}" + }.mkString(", ") + } + + /** + * Converts [[RelCollation]] to String. + * + * Formats sort fields as field indexes with direction `shortString`.
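+ * + * e.g. (hedged) a collation on field 0 ascending and field 2 descending renders as + * `$0 ASC, $2 DESC`.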
+ */ + def collationToString(collation: RelCollation): String = { + collation.getFieldCollations.map { c => + s"$$${c.getFieldIndex} ${c.direction.shortString}" }.mkString(", ") } @@ -225,6 +238,217 @@ object RelExplainUtil { }.mkString(", ") } + def streamGroupAggregationToString( + inputRowType: RelDataType, + outputRowType: RelDataType, + aggCalls: Seq[AggregateCall], + grouping: Array[Int]): String = { + val inputFieldNames = inputRowType.getFieldNames + val outputFieldNames = outputRowType.getFieldNames + val aggStrings = aggCalls.map { call => + val distinct = if (call.isDistinct) { + if (call.getArgList.size() == 0) { + "DISTINCT" + } else { + "DISTINCT " + } + } else { + "" + } + val newArgList = call.getArgList.map(_.toInt).toList + val argListNames = if (newArgList.nonEmpty) { + newArgList.map(inputFieldNames(_)).mkString(", ") + } else { + "*" + } + + if (call.filterArg >= 0 && call.filterArg < inputFieldNames.size) { + s"${call.getAggregation}($distinct$argListNames) FILTER " + + s"${inputFieldNames(call.filterArg)}" + } else { + s"${call.getAggregation}($distinct$argListNames)" + } + } + (grouping.map(inputFieldNames(_)) ++ aggStrings).zip( + grouping.indices.map(outputFieldNames(_)) ++ outputFieldNames.drop(grouping.length)).map { + case (f, o) => if (f == o) { + f + } else { + s"$f AS $o" + } + }.mkString(", ") + } + + def streamGroupAggregationToString( + inputRowType: RelDataType, + outputRowType: RelDataType, + aggInfoList: AggregateInfoList, + grouping: Array[Int], + shuffleKey: Option[Array[Int]] = None, + isLocal: Boolean = false, + isGlobal: Boolean = false): String = { + + val aggInfos = aggInfoList.aggInfos + val distinctInfos = aggInfoList.distinctInfos + val distinctFieldNames = distinctInfos.indices.map(index => s"distinct$$$index") + // aggIndex -> distinctFieldName + val distinctAggs = distinctInfos.zip(distinctFieldNames) + .flatMap(f => f._1.aggIndexes.map(i => (i, f._2))) + .toMap + val aggFilters = { + val distinctAggFilters = distinctInfos + .flatMap(d => d.aggIndexes.zip(d.filterArgs)) + .toMap + val otherAggFilters = aggInfos + .map(info => (info.aggIndex, info.agg.filterArg)) + .toMap + otherAggFilters ++ distinctAggFilters + } + + val inFieldNames = inputRowType.getFieldNames.toList.toArray + val outFieldNames = outputRowType.getFieldNames.toList.toArray + val groupingNames = grouping.map(inFieldNames(_)) + val aggOffset = shuffleKey match { + case None => grouping.length + case Some(k) => k.length + } + val isIncremental: Boolean = shuffleKey.isDefined + + val aggStrings = if (isLocal) { + stringifyLocalAggregates(aggInfos, distinctInfos, distinctAggs, aggFilters, inFieldNames) + } else if (isGlobal || isIncremental) { + val accFieldNames = inputRowType.getFieldNames.toList.toArray + val aggOutputFieldNames = localAggOutputFieldNames(aggOffset, aggInfos, accFieldNames) + stringifyGlobalAggregates(aggInfos, distinctAggs, aggOutputFieldNames) + } else { + stringifyAggregates(aggInfos, distinctAggs, aggFilters, inFieldNames) + } + + val outputFieldNames = if (isLocal) { + grouping.map(inFieldNames(_)) ++ localAggOutputFieldNames(aggOffset, aggInfos, outFieldNames) + } else if (isIncremental) { + val accFieldNames = inputRowType.getFieldNames.toList.toArray + grouping.map(inFieldNames(_)) ++ localAggOutputFieldNames(aggOffset, aggInfos, accFieldNames) + } else { + outFieldNames + } + + (groupingNames ++ aggStrings).zip(outputFieldNames).map { + case (f, o) if f == o => f + case (f, o) => s"$f AS $o" + }.mkString(", ") + } + + private def stringifyGlobalAggregates( + aggInfos:
Array[AggregateInfo], + distinctAggs: Map[Int, String], + accFieldNames: Seq[String]): Array[String] = { + aggInfos.zipWithIndex.map { case (aggInfo, index) => + val buf = new mutable.StringBuilder + buf.append(aggInfo.agg.getAggregation) + if (aggInfo.consumeRetraction) { + buf.append("_RETRACT") + } + buf.append("(") + val argNames = accFieldNames(index) + if (distinctAggs.contains(index)) { + buf.append(s"${distinctAggs(index)} ") + } + buf.append(argNames).append(")") + buf.toString + } + } + + private def stringifyLocalAggregates( + aggInfos: Array[AggregateInfo], + distincts: Array[DistinctInfo], + distinctAggs: Map[Int, String], + aggFilters: Map[Int, Int], + inFieldNames: Array[String]): Array[String] = { + val aggStrs = aggInfos.zipWithIndex.map { case (aggInfo, index) => + val buf = new mutable.StringBuilder + buf.append(aggInfo.agg.getAggregation) + if (aggInfo.consumeRetraction) { + buf.append("_RETRACT") + } + buf.append("(") + val argNames = aggInfo.agg.getArgList.map(inFieldNames(_)) + if (distinctAggs.contains(index)) { + buf.append(if (argNames.nonEmpty) s"${distinctAggs(index)} " else distinctAggs(index)) + } + val argNameStr = if (argNames.nonEmpty) { + argNames.mkString(", ") + } else { + "*" + } + buf.append(argNameStr).append(")") + if (aggFilters(index) >= 0) { + val filterName = inFieldNames(aggFilters(index)) + buf.append(" FILTER ").append(filterName) + } + buf.toString + } + val distinctStrs = distincts.map { distinctInfo => + val argNames = distinctInfo.argIndexes.map(inFieldNames(_)).mkString(", ") + s"DISTINCT($argNames)" + } + aggStrs ++ distinctStrs + } + + private def localAggOutputFieldNames( + aggOffset: Int, + aggInfos: Array[AggregateInfo], + accNames: Array[String]): Array[String] = { + var offset = aggOffset + val aggOutputNames = aggInfos.map { info => + info.function match { + case _: AggregateFunction[_, _] => + val name = accNames(offset) + offset = offset + 1 + name + case _ => + // TODO: support DeclarativeAggregateFunction + throw new TableException("Unsupported now") + } + } + val distinctFieldNames = (offset until accNames.length).map(accNames) + aggOutputNames ++ distinctFieldNames + } + + private def stringifyAggregates( + aggInfos: Array[AggregateInfo], + distinctAggs: Map[Int, String], + aggFilters: Map[Int, Int], + inFields: Array[String]): Array[String] = { + // MAX_RETRACT(DISTINCT a) FILTER b + aggInfos.zipWithIndex.map { case (aggInfo, index) => + val buf = new mutable.StringBuilder + buf.append(aggInfo.agg.getAggregation) + if (aggInfo.consumeRetraction) { + buf.append("_RETRACT") + } + buf.append("(") + val argNames = aggInfo.agg.getArgList.map(inFields(_)) + if (distinctAggs.contains(index)) { + buf.append(if (argNames.nonEmpty) "DISTINCT " else "DISTINCT") + } + val argNameStr = if (argNames.nonEmpty) { + argNames.mkString(", ") + } else { + "*" + } + buf.append(argNameStr).append(")") + if (aggFilters(index) >= 0) { + val filterName = inFields(aggFilters(index)) + buf.append(" FILTER ").append(filterName) + } + buf.toString + } + } + + /** + * Converts over aggregate attributes to String.
*/ diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala new file mode 100644 index 00000000000000..e5e32e30a1d13c --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.util + +import org.apache.flink.table.plan.nodes.physical.stream._ + +import org.apache.calcite.plan.hep.HepRelVertex + import org.apache.calcite.plan.volcano.RelSubset +import org.apache.calcite.rel.{RelNode, RelVisitor} +import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode} +import org.apache.calcite.sql.SqlKind + +import scala.collection.JavaConversions._ +import scala.collection.mutable + +object UpdatingPlanChecker { + + /** Validates that the plan produces only append changes. */ + def isAppendOnly(plan: RelNode): Boolean = { + val appendOnlyValidator = new AppendOnlyValidator + appendOnlyValidator.go(plan) + + appendOnlyValidator.isAppendOnly + } + + /** Extracts the unique keys of the table produced by the plan. */ + def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = { + getUniqueKeyGroups(plan).map(_.map(_._1).toArray) + } + + /** Extracts the unique keys and groups of the table produced by the plan. */ + def getUniqueKeyGroups(plan: RelNode): Option[Seq[(String, String)]] = { + val keyExtractor = new UniqueKeyExtractor + keyExtractor.visit(plan) + } + + private class AppendOnlyValidator extends RelVisitor { + + var isAppendOnly = true + + override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = { + node match { + case s: StreamPhysicalRel if s.producesUpdates => + isAppendOnly = false + case hep: HepRelVertex => + visit(hep.getCurrentRel, ordinal, parent) // remove wrapper node + case rs: RelSubset => + visit(rs.getOriginal, ordinal, parent) // remove wrapper node + case _ => + super.visit(node, ordinal, parent) + } + } + } + + /** Identifies unique key fields in the output of a RelNode. */ + private class UniqueKeyExtractor { + + // visit() returns the key fields of a node as (name, group) tuples: the first element is + // the name of a key field, the second is a group name that is shared by all equivalent + // key fields. The group names are used to identify identical keys, for example: for + // select('pk as pk1, 'pk as pk2), both pk1 and pk2 belong to the same group, i.e., pk1. + // Here we use the lexicographically smallest attribute as the common group id. A node can + // have keys if it generates the keys by itself or if it forwards keys from its input(s).
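+ // For example (hedged): for select('pk as pk1, 'pk as pk2), visit() returns + // Some(Seq(("pk1", "pk1"), ("pk2", "pk1"))), i.e. both output fields share the + // lexicographically smallest key name, pk1, as their group.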
+ def visit(node: RelNode): Option[Seq[(String, String)]] = { + node match { + case c: StreamExecCalc => + val inputKeys = visit(node.getInput(0)) + // check if input has keys + if (inputKeys.isDefined) { + // track keys forward + val inNames = c.getInput.getRowType.getFieldNames + val inOutNames = c.getProgram.getNamedProjects.map(p => { + c.getProgram.expandLocalRef(p.left) match { + // output field is forwarded input field + case i: RexInputRef => (i.getIndex, p.right) + // output field is renamed input field + case a: RexCall if a.getKind.equals(SqlKind.AS) => + a.getOperands.get(0) match { + case ref: RexInputRef => + (ref.getIndex, p.right) + case _ => + (-1, p.right) + } + // output field is not forwarded from input + case _: RexNode => (-1, p.right) + } + }) + // filter all non-forwarded fields + .filter(_._1 >= 0) + // resolve names of input fields + .map(io => (inNames.get(io._1), io._2)) + + // filter by input keys + val inputKeysAndOutput = inOutNames + .filter(io => inputKeys.get.map(e => e._1).contains(io._1)) + + val inputKeysMap = inputKeys.get.toMap + val inOutGroups = inputKeysAndOutput.sorted.reverse + .map(e => (inputKeysMap(e._1), e._2)) + .toMap + + // get output keys + val outputKeys = inputKeysAndOutput + .map(io => (io._2, inOutGroups(inputKeysMap(io._1)))) + + // check if all keys have been preserved + if (outputKeys.map(_._2).distinct.length == inputKeys.get.map(_._2).distinct.length) { + // all keys have been preserved (but possibly renamed) + Some(outputKeys) + } else { + // some (or all) keys have been removed; the output no longer contains unique keys + None + } + } else { + None + } + + case _: StreamExecOverAggregate => + // keys are always forwarded by Over aggregate + visit(node.getInput(0)) + case a: StreamExecGroupAggregate => + // get grouping keys + val groupKeys = a.getRowType.getFieldNames.take(a.grouping.length) + Some(groupKeys.map(e => (e, e))) + + // TODO: support StreamExecGroupWindowAggregate + + case j: StreamExecJoin => + // get key(s) for join + val lInKeys = visit(j.getLeft) + val rInKeys = visit(j.getRight) + if (lInKeys.isEmpty || rInKeys.isEmpty) { + None + } else { + // Output of join must have keys if left and right both contain key(s). + // Key groups from both sides will be merged by join equi-predicates + val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames + val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames + val joinNames = j.getRowType.getFieldNames + + // if right field names equal left field names, Calcite renames the right + // field names. For example, T1(pk, a) join T2(pk, b), Calcite will rename T2(pk, b) + // to T2(pk0, b). + val rInNamesToJoinNamesMap = rInNames + .zip(joinNames.subList(lInNames.size, joinNames.length)) + .toMap + + val lJoinKeys: Seq[String] = j.joinInfo.leftKeys + .map(lInNames.get(_)) + val rJoinKeys: Seq[String] = j.joinInfo.rightKeys + .map(rInNames.get(_)) + .map(rInNamesToJoinNamesMap(_)) + + val inKeys: Seq[(String, String)] = lInKeys.get ++ rInKeys.get + .map(e => (rInNamesToJoinNamesMap(e._1), rInNamesToJoinNamesMap(e._2))) + + getOutputKeysForNonWindowJoin( + joinNames, + inKeys, + lJoinKeys.zip(rJoinKeys) + ) + } + case _: StreamPhysicalRel => + // anything else does not forward keys, so we can stop + None + } + } + + /** + * Gets output keys for a non-window join according to its inputs.
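+ * + * A hedged walk-through for `T1(pk, a) JOIN T2(pk0, b) ON pk = pk0`: + * {{{ + * getOutputKeysForNonWindowJoin( + * Seq("pk", "a", "pk0", "b"), // inNames + * Seq(("pk", "pk"), ("pk0", "pk0")), // inKeys + * Seq(("pk", "pk0"))) // joinKeys + * // => Some(Seq(("pk", "pk"), ("pk0", "pk"))): pk and pk0 merge into group pk + * }}}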
+ * + * @param inNames Field names of the join + * @param inKeys Input keys of the join + * @param joinKeys Join keys of the join + * @return The output keys of the join + */ + def getOutputKeysForNonWindowJoin( + inNames: Seq[String], + inKeys: Seq[(String, String)], + joinKeys: Seq[(String, String)]) + : Option[Seq[(String, String)]] = { + + val nameToGroups = mutable.HashMap.empty[String, String] + + // merge two groups + def merge(nameA: String, nameB: String): Unit = { + val ga: String = findGroup(nameA) + val gb: String = findGroup(nameB) + if (!ga.equals(gb)) { + if (ga.compare(gb) < 0) { + nameToGroups += (gb -> ga) + } else { + nameToGroups += (ga -> gb) + } + } + } + + def findGroup(x: String): String = { + // find the group of x + var r: String = x + while (!nameToGroups(r).equals(r)) { + r = nameToGroups(r) + } + + // point all names to the group name directly + var a: String = x + var b: String = null + while (!nameToGroups(a).equals(r)) { + b = nameToGroups(a) + nameToGroups += (a -> r) + a = b + } + r + } + + // init groups + inNames.foreach(e => nameToGroups += (e -> e)) + inKeys.foreach(e => nameToGroups += (e._1 -> e._2)) + // merge groups + joinKeys.foreach(e => merge(e._1, e._2)) + // make sure all names point to the group name directly + inNames.foreach(findGroup) + + val outputGroups = inKeys.map(e => nameToGroups(e._1)).distinct + Some( + inNames + .filter(e => outputGroups.contains(nameToGroups(e))) + .map(e => (e, nameToGroups(e))) + ) + } + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala new file mode 100644 index 00000000000000..82b95901c72725 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} +import org.apache.flink.table.api.Table + +/** + * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete + * changes. + * + * @tparam T Type of records that this [[TableSink]] expects and supports. + */ +trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] { + + /** Emits the DataStream.
*/ + def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_] +} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala new file mode 100644 index 00000000000000..3c4b5566600326 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sinks + +import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} + +/** + * Defines an external stream table and provides write access to its data. + * + * @tparam T Type of the [[DataStream]] created by this [[TableSink]]. + */ +trait StreamTableSink[T] extends TableSink[T] { + + /** Emits the DataStream. */ + def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_] + +} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java index f9987d1d95d9eb..0678e71867af7b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java @@ -65,4 +65,13 @@ public class TableConfigOptions { .withDescription("The buffer is to compress. The larger the buffer," + " the better the compression ratio, but the more memory consumption."); + // ------------------------------------------------------------------------ + // MiniBatch Options + // ------------------------------------------------------------------------ + + public static final ConfigOption<Long> SQL_EXEC_MINIBATCH_ALLOW_LATENCY = + key("sql.exec.mini-batch.allow-latency.ms") + .defaultValue(Long.MIN_VALUE) + .withDescription("The allowed latency (in milliseconds) for MiniBatch. A value greater than 0 enables MiniBatch."); + }
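A hedged usage sketch (not part of the patch): how the new MiniBatch option could be set and read back through a plain Flink Configuration. The object name and the 1000 ms value are illustrative assumptions.

import org.apache.flink.configuration.Configuration
import org.apache.flink.table.api.TableConfigOptions

object MiniBatchOptionExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // per the option's description, a latency > 0 (in ms) enables MiniBatch
    conf.setLong(TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY.key(), 1000L)
    val latency = conf.getLong(
      TableConfigOptions.SQL_EXEC_MINIBATCH_ALLOW_LATENCY.key(), Long.MinValue)
    println(s"mini-batch enabled: ${latency > 0}")
  }
}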