From a8a0c6cdba2d294e3714943c712a3ebfa74abd61 Mon Sep 17 00:00:00 2001
From: SteNicholas
Date: Wed, 8 Jun 2022 14:19:58 +0800
Subject: [PATCH] [FLINK-24865][CEP] Support MATCH_RECOGNIZE in Batch mode

This closes #18408
---
 .../plan/nodes/exec/batch/BatchExecMatch.java |  57 ++
 .../nodes/exec/common/CommonExecMatch.java    | 432 ++++++++++++
 .../nodes/exec/stream/StreamExecMatch.java    | 387 +----------
 .../physical/batch/BatchPhysicalMatch.java    |  65 ++
 .../physical/common/CommonPhysicalMatch.java  | 101 +++
 .../batch/BatchPhysicalMatchRule.java         |  63 ++
 .../common/CommonPhysicalMatchRule.java       | 176 +++++
 .../physical/stream/StreamPhysicalMatch.scala |  50 +-
 .../plan/rules/FlinkBatchRuleSets.scala       |   3 +
 .../stream/StreamPhysicalMatchRule.scala      | 132 +---
 .../MatchRecognizeValidationTest.java         | 354 ++++++++++
 .../plan/batch/sql/MatchRecognizeTest.java    |  87 +++
 .../exec/operator/BatchOperatorNameTest.java  |  20 +
 .../batch/sql/MatchRecognizeITCase.java       | 645 ++++++++++++++++++
 .../plan/batch/sql/MatchRecognizeTest.xml     |  74 ++
 .../exec/operator/BatchOperatorNameTest.xml   | 108 +++
 .../MatchRecognizeValidationTest.scala        | 347 ----------
 .../match/PatternTranslatorTestBase.scala     |   6 +-
 18 files changed, 2217 insertions(+), 890 deletions(-)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
 create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java
 create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java
 create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
 create mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml
 delete mode 100644 flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala
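[Editor's note -- illustrative, not part of the patch. The sketch below shows the kind of
program this change enables: running a MATCH_RECOGNIZE query under a batch
TableEnvironment, which was not supported before this commit. The table name, columns,
and rows are invented for the example; only standard Table API calls are used.]

    import static org.apache.flink.table.api.Expressions.row;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    import java.time.LocalDateTime;

    public class BatchMatchRecognizeExample {
        public static void main(String[] args) {
            // Batch mode is the point of this patch; in streaming mode this already worked.
            TableEnvironment tEnv =
                    TableEnvironment.create(
                            EnvironmentSettings.newInstance().inBatchMode().build());

            // A small bounded table; name, schema, and values are made up for this sketch.
            Table ticker =
                    tEnv.fromValues(
                            DataTypes.ROW(
                                    DataTypes.FIELD("symbol", DataTypes.STRING()),
                                    DataTypes.FIELD("tstamp", DataTypes.TIMESTAMP(3)),
                                    DataTypes.FIELD("price", DataTypes.INT())),
                            row("ACME", LocalDateTime.parse("2022-06-08T10:00:00"), 12),
                            row("ACME", LocalDateTime.parse("2022-06-08T10:00:01"), 17),
                            row("ACME", LocalDateTime.parse("2022-06-08T10:00:02"), 13),
                            row("ACME", LocalDateTime.parse("2022-06-08T10:00:03"), 16));
            tEnv.createTemporaryView("Ticker", ticker);

            tEnv.executeSql(
                            "SELECT * FROM Ticker MATCH_RECOGNIZE ("
                                    + " PARTITION BY symbol"
                                    + " ORDER BY tstamp"
                                    + " MEASURES A.price AS startPrice, LAST(B.price) AS lastPrice"
                                    + " AFTER MATCH SKIP PAST LAST ROW"
                                    + " PATTERN (A B)"
                                    + " DEFINE B AS B.price > A.price"
                                    + ")")
                    .print();
        }
    }

With the new BatchPhysicalMatchRule registered in FlinkBatchRuleSets, such a query plans to
BatchExecMatch; its isProcTime(...) returns true, so the CepOperator treats the bounded
input as processing-time ordered.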
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
new file mode 100644
index 00000000000000..4f02be5f91c204
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+
+/** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+public class BatchExecMatch extends CommonExecMatch
+        implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> {
+
+    public BatchExecMatch(
+            ReadableConfig tableConfig,
+            MatchSpec matchSpec,
+            InputProperty inputProperty,
+            RowType outputType,
+            String description) {
+        super(
+                ExecNodeContext.newNodeId(),
+                ExecNodeContext.newContext(BatchExecMatch.class),
+                ExecNodeContext.newPersistedConfig(BatchExecMatch.class, tableConfig),
+                matchSpec,
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+    }
+
+    @Override
+    public boolean isProcTime(RowType inputRowType) {
+        return true;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
new file mode 100644
index 00000000000000..d961b5a4c96725
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
+import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.MathUtils;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Common {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
+public abstract class CommonExecMatch extends ExecNodeBase<RowData>
+        implements ExecNode<RowData>, MultipleTransformationTranslator<RowData> {
+
+    public static final String MATCH_TRANSFORMATION = "match";
+
+    private final MatchSpec matchSpec;
+
+    public CommonExecMatch(
+            int id,
+            ExecNodeContext context,
+            ReadableConfig persistedConfig,
+            MatchSpec matchSpec,
+            List<InputProperty> inputProperties,
+            LogicalType outputType,
+            String description) {
+        super(id, context, persistedConfig, inputProperties, outputType, description);
+        checkArgument(inputProperties.size() == 1);
+        this.matchSpec = checkNotNull(matchSpec);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+
+        checkOrderKeys(inputRowType);
+        final EventComparator<RowData> eventComparator =
+                createEventComparator(
+                        config, planner.getFlinkContext().getClassLoader(), inputRowType);
+        final Transformation<RowData> timestampedInputTransform =
+                translateOrder(inputTransform, inputRowType, config);
+
+        final Tuple2<Pattern<RowData, RowData>, List<String>> cepPatternAndNames =
+                translatePattern(
+                        matchSpec,
+                        config,
+                        planner.getFlinkContext().getClassLoader(),
+                        planner.createRelBuilder(),
+                        inputRowType);
+        final Pattern<RowData, RowData> cepPattern = cepPatternAndNames.f0;
+
+        // TODO remove this once it is supported in CEP library
+        if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
+            throw new TableException(
+                    "Patterns that can produce empty matches are not supported. There must be at least one non-optional state.");
+        }
+
+        // TODO remove this once it is supported in CEP library
+        if (cepPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
+            throw new TableException(
+                    "Greedy quantifiers are not allowed as the last element of a Pattern yet. "
+                            + "Finish your pattern with either a simple variable or reluctant quantifier.");
+        }
+
+        if (matchSpec.isAllRows()) {
+            throw new TableException("All rows per match mode is not supported yet.");
+        }
+
+        final int[] partitionKeys = matchSpec.getPartition().getFieldIndices();
+        final InternalTypeInfo<RowData> inputTypeInfo =
+                (InternalTypeInfo<RowData>) inputTransform.getOutputType();
+        final TypeSerializer<RowData> inputSerializer =
+                inputTypeInfo.createSerializer(planner.getExecEnv().getConfig());
+        final NFACompiler.NFAFactory<RowData> nfaFactory =
+                NFACompiler.compileFactory(cepPattern, false);
+        final MatchCodeGenerator generator =
+                new MatchCodeGenerator(
+                        new CodeGeneratorContext(
+                                config, planner.getFlinkContext().getClassLoader()),
+                        planner.createRelBuilder(),
+                        false, // nullableInput
+                        JavaScalaConversionUtil.toScala(cepPatternAndNames.f1),
+                        JavaScalaConversionUtil.toScala(Optional.empty()),
+                        CodeGenUtils.DEFAULT_COLLECTOR_TERM());
+        generator.bindInput(
+                inputRowType,
+                CodeGenUtils.DEFAULT_INPUT1_TERM(),
+                JavaScalaConversionUtil.toScala(Optional.empty()));
+        final PatternProcessFunctionRunner patternProcessFunction =
+                generator.generateOneRowPerMatchExpression(
+                        (RowType) getOutputType(), partitionKeys, matchSpec.getMeasures());
+        final CepOperator<RowData, RowData, RowData> operator =
+                new CepOperator<>(
+                        inputSerializer,
+                        isProcTime(inputRowType),
+                        nfaFactory,
+                        eventComparator,
+                        cepPattern.getAfterMatchSkipStrategy(),
+                        patternProcessFunction,
+                        null);
+        final OneInputTransformation<RowData, RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        timestampedInputTransform,
+                        createTransformationMeta(MATCH_TRANSFORMATION, config),
+                        operator,
+                        InternalTypeInfo.of(getOutputType()),
+                        timestampedInputTransform.getParallelism());
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(
+                        planner.getFlinkContext().getClassLoader(), partitionKeys, inputTypeInfo);
+        transform.setStateKeySelector(selector);
+        transform.setStateKeyType(selector.getProducedType());
+
+        if (inputsContainSingleton()) {
+            transform.setParallelism(1);
+            transform.setMaxParallelism(1);
+        }
+        return transform;
+    }
+
+    protected void checkOrderKeys(RowType inputRowType) {}
+
+    private EventComparator<RowData> createEventComparator(
+            ExecNodeConfig config, ClassLoader classLoader, RowType inputRowType) {
+        SortSpec orderKeys = matchSpec.getOrderKeys();
+        if (orderKeys.getFieldIndices().length > 1) {
+            GeneratedRecordComparator rowComparator =
+                    ComparatorCodeGenerator.gen(
+                            config, classLoader, "RowDataComparator", inputRowType, orderKeys);
+            return new RowDataEventComparator(rowComparator);
+        } else {
+            return null;
+        }
+    }
+
+    protected Transformation<RowData> translateOrder(
+            Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
+        return inputTransform;
+    }
+
+    @VisibleForTesting
+    public static Tuple2<Pattern<RowData, RowData>, List<String>> translatePattern(
+            MatchSpec matchSpec,
+            ReadableConfig config,
+            ClassLoader classLoader,
+            RelBuilder relBuilder,
+            RowType inputRowType) {
+        final PatternVisitor patternVisitor =
+                new PatternVisitor(config, classLoader, relBuilder, inputRowType, matchSpec);
+
+        final Pattern<RowData, RowData> cepPattern;
+        if (matchSpec.getInterval().isPresent()) {
+            Time interval = translateTimeBound(matchSpec.getInterval().get());
+            cepPattern = matchSpec.getPattern().accept(patternVisitor).within(interval);
+        } else {
+            cepPattern = matchSpec.getPattern().accept(patternVisitor);
+        }
+        return new Tuple2<>(cepPattern, new ArrayList<>(patternVisitor.names));
+    }
+
+    private static Time translateTimeBound(RexNode interval) {
+        if (interval instanceof RexLiteral) {
+            final RexLiteral l = (RexLiteral) interval;
+            if (l.getTypeName().getFamily() == SqlTypeFamily.INTERVAL_DAY_TIME) {
+                return Time.milliseconds(l.getValueAs(Long.class));
+            }
+        }
+        throw new TableException(
+                "Only constant intervals with millisecond resolution are supported as time constraints of patterns.");
+    }
+
+    public abstract boolean isProcTime(RowType inputRowType);
+
+    /** The visitor to traverse the pattern RexNode. */
+    private static class PatternVisitor extends RexDefaultVisitor<Pattern<RowData, RowData>> {
+        private final ReadableConfig config;
+        private final ClassLoader classLoader;
+        private final RelBuilder relBuilder;
+        private final RowType inputRowType;
+        private final MatchSpec matchSpec;
+        private final LinkedHashSet<String> names;
+        private Pattern<RowData, RowData> pattern;
+
+        public PatternVisitor(
+                ReadableConfig config,
+                ClassLoader classLoader,
+                RelBuilder relBuilder,
+                RowType inputRowType,
+                MatchSpec matchSpec) {
+            this.config = config;
+            this.classLoader = classLoader;
+            this.relBuilder = relBuilder;
+            this.inputRowType = inputRowType;
+            this.matchSpec = matchSpec;
+            this.names = new LinkedHashSet<>();
+        }
+
+        @Override
+        public Pattern<RowData, RowData> visitLiteral(RexLiteral literal) {
+            String patternName = literal.getValueAs(String.class);
+            pattern = translateSingleVariable(pattern, patternName);
+
+            RexNode patternDefinition = matchSpec.getPatternDefinitions().get(patternName);
+            if (patternDefinition != null) {
+                MatchCodeGenerator generator =
+                        new MatchCodeGenerator(
+                                new CodeGeneratorContext(config, classLoader),
+                                relBuilder,
+                                false, // nullableInput
+                                JavaScalaConversionUtil.toScala(new ArrayList<>(names)),
+                                JavaScalaConversionUtil.toScala(Optional.of(patternName)),
+                                CodeGenUtils.DEFAULT_COLLECTOR_TERM());
+                generator.bindInput(
+                        inputRowType,
+                        CodeGenUtils.DEFAULT_INPUT1_TERM(),
+                        JavaScalaConversionUtil.toScala(Optional.empty()));
+                IterativeCondition<RowData> condition =
+                        generator.generateIterativeCondition(patternDefinition);
+                return pattern.where(condition);
+            } else {
+                return pattern.where(BooleanConditions.trueFunction());
+            }
+        }
+
+        @Override
+        public Pattern<RowData, RowData> visitCall(RexCall call) {
+            SqlOperator operator = call.getOperator();
+            if (operator == SqlStdOperatorTable.PATTERN_CONCAT) {
+                pattern = call.operands.get(0).accept(this);
+                pattern = call.operands.get(1).accept(this);
+                return pattern;
+            } else if (operator == SqlStdOperatorTable.PATTERN_QUANTIFIER) {
+                final RexLiteral name;
+                if (call.operands.get(0) instanceof RexLiteral) {
+                    name = (RexLiteral) call.operands.get(0);
+                } else {
+                    throw new TableException(
+                            String.format(
+                                    "Expression not supported: %s Group patterns are not supported yet.",
+                                    call.operands.get(0)));
+                }
+
+                pattern = name.accept(this);
+                int startNum =
+                        MathUtils.checkedDownCast(
+                                ((RexLiteral) call.operands.get(1)).getValueAs(Long.class));
+                int endNum =
+                        MathUtils.checkedDownCast(
+                                ((RexLiteral) call.operands.get(2)).getValueAs(Long.class));
+                boolean isGreedy = !((RexLiteral) call.operands.get(3)).getValueAs(Boolean.class);
+
+                return applyQuantifier(pattern, startNum, endNum, isGreedy);
+            } else if (operator == SqlStdOperatorTable.PATTERN_ALTER) {
+                throw new TableException(
+                        String.format(
+                                "Expression not supported: %s. Currently, CEP doesn't support branching patterns.",
+                                call));
+            } else if (operator == SqlStdOperatorTable.PATTERN_PERMUTE) {
+                throw new TableException(
+                        String.format(
+                                "Expression not supported: %s. Currently, CEP doesn't support PERMUTE patterns.",
+                                call));
+            } else if (operator == SqlStdOperatorTable.PATTERN_EXCLUDE) {
+                throw new TableException(
+                        String.format(
+                                "Expression not supported: %s. Currently, CEP doesn't support '{-' '-}' patterns.",
+                                call));
+            } else {
+                throw new TableException("This should not happen.");
+            }
+        }
+
+        @Override
+        public Pattern<RowData, RowData> visitNode(RexNode rexNode) {
+            throw new TableException(
+                    String.format("Unsupported expression within Pattern: [%s]", rexNode));
+        }
+
+        private Pattern<RowData, RowData> translateSingleVariable(
+                Pattern<RowData, RowData> previousPattern, String patternName) {
+            if (names.contains(patternName)) {
+                throw new TableException(
+                        "Pattern variables must be unique. That might change in the future.");
+            } else {
+                names.add(patternName);
+            }
+
+            if (previousPattern != null) {
+                return previousPattern.next(patternName);
+            } else {
+                return Pattern.begin(patternName, translateSkipStrategy());
+            }
+        }
+
+        private AfterMatchSkipStrategy translateSkipStrategy() {
+            switch (matchSpec.getAfter().getKind()) {
+                case LITERAL:
+                    SqlMatchRecognize.AfterOption afterOption =
+                            ((RexLiteral) matchSpec.getAfter())
+                                    .getValueAs(SqlMatchRecognize.AfterOption.class);
+                    switch (afterOption) {
+                        case SKIP_PAST_LAST_ROW:
+                            return AfterMatchSkipStrategy.skipPastLastEvent();
+                        case SKIP_TO_NEXT_ROW:
+                            return AfterMatchSkipStrategy.skipToNext();
+                        default:
+                            throw new TableException("This should not happen.");
+                    }
+                case SKIP_TO_FIRST:
+                    return AfterMatchSkipStrategy.skipToFirst(getPatternTarget())
+                            .throwExceptionOnMiss();
+                case SKIP_TO_LAST:
+                    return AfterMatchSkipStrategy.skipToLast(getPatternTarget())
+                            .throwExceptionOnMiss();
+                default:
+                    throw new TableException(
+                            String.format(
+                                    "Corrupted query tree. Unexpected %s for after match strategy.",
+                                    matchSpec.getAfter()));
+            }
+        }
+
+        private String getPatternTarget() {
+            return ((RexLiteral) ((RexCall) matchSpec.getAfter()).getOperands().get(0))
+                    .getValueAs(String.class);
+        }
+
+        private Pattern<RowData, RowData> applyQuantifier(
+                Pattern<RowData, RowData> pattern, int startNum, int endNum, boolean greedy) {
+            boolean isOptional = startNum == 0 && endNum == 1;
+
+            final Pattern<RowData, RowData> newPattern;
+            if (startNum == 0 && endNum == -1) { // zero or more
+                newPattern = pattern.oneOrMore().optional().consecutive();
+            } else if (startNum == 1 && endNum == -1) { // one or more
+                newPattern = pattern.oneOrMore().consecutive();
+            } else if (isOptional) { // optional
+                newPattern = pattern.optional();
+            } else if (endNum != -1) { // times
+                newPattern = pattern.times(startNum, endNum).consecutive();
+            } else { // times or more
+                newPattern = pattern.timesOrMore(startNum).consecutive();
+            }
+
+            if (greedy && (isOptional || startNum == endNum)) {
+                return newPattern;
+            } else if (greedy) {
+                return newPattern.greedy();
+            } else if (isOptional) {
+                throw new TableException("Reluctant optional variables are not supported yet.");
+            } else {
+                return newPattern;
+            }
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 6d1dbad480fb32..6bc018528d8266 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -19,74 +19,31 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.flink.FlinkVersion;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.operator.CepOperator;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.Quantifier;
-import org.apache.flink.cep.pattern.conditions.BooleanConditions;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.codegen.CodeGenUtils;
-import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
-import org.apache.flink.table.planner.codegen.MatchCodeGenerator;
-import org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator;
-import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
-import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
-import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
-import org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
-import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
 import org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.MathUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlMatchRecognize;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.type.SqlTypeFamily;
-import org.apache.calcite.tools.RelBuilder;
-
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Optional;
 
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Stream {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
 @ExecNodeMetadata(
@@ -98,11 +55,10 @@
         },
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecMatch extends ExecNodeBase<RowData>
+public class StreamExecMatch extends CommonExecMatch
         implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
 
     public static final String TIMESTAMP_INSERTER_TRANSFORMATION = "timestamp-inserter";
-    public static final String MATCH_TRANSFORMATION = "match";
 
     public static final String FIELD_NAME_MATCH_SPEC = "matchSpec";
 
@@ -134,111 +90,12 @@ public StreamExecMatch(
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
             @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
-        super(id, context, persistedConfig, inputProperties, outputType, description);
-        checkArgument(inputProperties.size() == 1);
-        this.matchSpec = checkNotNull(matchSpec);
+        super(id, context, persistedConfig, matchSpec, inputProperties, outputType, description);
+        this.matchSpec = matchSpec;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    protected Transformation<RowData> translateToPlanInternal(
-            PlannerBase planner, ExecNodeConfig config) {
-        final ExecEdge inputEdge = getInputEdges().get(0);
-        final Transformation<RowData> inputTransform =
-                (Transformation<RowData>) inputEdge.translateToPlan(planner);
-        final RowType inputRowType = (RowType) inputEdge.getOutputType();
-
-        checkOrderKeys(inputRowType);
-        final EventComparator<RowData> eventComparator =
-                createEventComparator(
-                        config, planner.getFlinkContext().getClassLoader(), inputRowType);
-        final Transformation<RowData> timestampedInputTransform =
-                translateOrder(inputTransform, inputRowType, config);
-
-        final Tuple2<Pattern<RowData, RowData>, List<String>> cepPatternAndNames =
-                translatePattern(
-                        matchSpec,
-                        config,
-                        planner.getFlinkContext().getClassLoader(),
-                        planner.createRelBuilder(),
-                        inputRowType);
-        final Pattern<RowData, RowData> cepPattern = cepPatternAndNames.f0;
-
-        // TODO remove this once it is supported in CEP library
-        if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
-            throw new TableException(
-                    "Patterns that can produce empty matches are not supported. There must be at least one non-optional state.");
-        }
-
-        // TODO remove this once it is supported in CEP library
-        if (cepPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
-            throw new TableException(
-                    "Greedy quantifiers are not allowed as the last element of a Pattern yet. "
-                            + "Finish your pattern with either a simple variable or reluctant quantifier.");
-        }
-
-        if (matchSpec.isAllRows()) {
-            throw new TableException("All rows per match mode is not supported yet.");
-        }
-
-        final int[] partitionKeys = matchSpec.getPartition().getFieldIndices();
-        final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
-        final LogicalType timeOrderFieldType =
-                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
-
-        final boolean isProctime = TypeCheckUtils.isProcTime(timeOrderFieldType);
-        final InternalTypeInfo<RowData> inputTypeInfo =
-                (InternalTypeInfo<RowData>) inputTransform.getOutputType();
-        final TypeSerializer<RowData> inputSerializer =
-                inputTypeInfo.createSerializer(planner.getExecEnv().getConfig());
-        final NFACompiler.NFAFactory<RowData> nfaFactory =
-                NFACompiler.compileFactory(cepPattern, false);
-        final MatchCodeGenerator generator =
-                new MatchCodeGenerator(
-                        new CodeGeneratorContext(
-                                config, planner.getFlinkContext().getClassLoader()),
-                        planner.createRelBuilder(),
-                        false, // nullableInput
-                        JavaScalaConversionUtil.toScala(cepPatternAndNames.f1),
-                        JavaScalaConversionUtil.toScala(Optional.empty()),
-                        CodeGenUtils.DEFAULT_COLLECTOR_TERM());
-        generator.bindInput(
-                inputRowType,
-                CodeGenUtils.DEFAULT_INPUT1_TERM(),
-                JavaScalaConversionUtil.toScala(Optional.empty()));
-        final PatternProcessFunctionRunner patternProcessFunction =
-                generator.generateOneRowPerMatchExpression(
-                        (RowType) getOutputType(), partitionKeys, matchSpec.getMeasures());
-        final CepOperator<RowData, RowData, RowData> operator =
-                new CepOperator<>(
-                        inputSerializer,
-                        isProctime,
-                        nfaFactory,
-                        eventComparator,
-                        cepPattern.getAfterMatchSkipStrategy(),
-                        patternProcessFunction,
-                        null);
-        final OneInputTransformation<RowData, RowData> transform =
-                ExecNodeUtil.createOneInputTransformation(
-                        timestampedInputTransform,
-                        createTransformationMeta(MATCH_TRANSFORMATION, config),
-                        operator,
-                        InternalTypeInfo.of(getOutputType()),
-                        timestampedInputTransform.getParallelism());
-        final RowDataKeySelector selector =
-                KeySelectorUtil.getRowDataSelector(
-                        planner.getFlinkContext().getClassLoader(), partitionKeys, inputTypeInfo);
-        transform.setStateKeySelector(selector);
-        transform.setStateKeyType(selector.getProducedType());
-
-        if (inputsContainSingleton()) {
-            transform.setParallelism(1);
-            transform.setMaxParallelism(1);
-        }
-        return transform;
-    }
-
-    private void checkOrderKeys(RowType inputRowType) {
+    public void checkOrderKeys(RowType inputRowType) {
         SortSpec orderKeys = matchSpec.getOrderKeys();
         if (orderKeys.getFieldSize() == 0) {
             throw new TableException("You must specify either rowtime or proctime for order by.");
@@ -261,20 +118,8 @@ private void checkOrderKeys(RowType inputRowType) {
         }
     }
 
-    private EventComparator<RowData> createEventComparator(
-            ExecNodeConfig config, ClassLoader classLoader, RowType inputRowType) {
-        SortSpec orderKeys = matchSpec.getOrderKeys();
-        if (orderKeys.getFieldIndices().length > 1) {
-            GeneratedRecordComparator rowComparator =
-                    ComparatorCodeGenerator.gen(
-                            config, classLoader, "RowDataComparator", inputRowType, orderKeys);
-            return new RowDataEventComparator(rowComparator);
-        } else {
-            return null;
-        }
-    }
-
-    private Transformation<RowData> translateOrder(
+    @Override
+    public Transformation<RowData> translateOrder(
             Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
         SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
         int timeOrderFieldIdx = timeOrderField.getFieldIndex();
@@ -306,217 +151,11 @@ private Transformation<RowData> translateOrder(
         }
     }
 
-    @VisibleForTesting
-    public static Tuple2<Pattern<RowData, RowData>, List<String>> translatePattern(
-            MatchSpec matchSpec,
-            ReadableConfig config,
-            ClassLoader classLoader,
-            RelBuilder relBuilder,
-            RowType inputRowType) {
-        final PatternVisitor patternVisitor =
-                new PatternVisitor(config, classLoader, relBuilder, inputRowType, matchSpec);
-
-        final Pattern<RowData, RowData> cepPattern;
-        if (matchSpec.getInterval().isPresent()) {
-            Time interval = translateTimeBound(matchSpec.getInterval().get());
-            cepPattern = matchSpec.getPattern().accept(patternVisitor).within(interval);
-        } else {
-            cepPattern = matchSpec.getPattern().accept(patternVisitor);
-        }
-        return new Tuple2<>(cepPattern, new ArrayList<>(patternVisitor.names));
-    }
-
-    private static Time translateTimeBound(RexNode interval) {
-        if (interval instanceof RexLiteral) {
-            final RexLiteral l = (RexLiteral) interval;
-            if (l.getTypeName().getFamily() == SqlTypeFamily.INTERVAL_DAY_TIME) {
-                return Time.milliseconds(l.getValueAs(Long.class));
-            }
-        }
-        throw new TableException(
-                "Only constant intervals with millisecond resolution are supported as time constraints of patterns.");
-    }
-
-    /** The visitor to traverse the pattern RexNode. */
-    private static class PatternVisitor extends RexDefaultVisitor<Pattern<RowData, RowData>> {
-        private final ReadableConfig config;
-        private final ClassLoader classLoader;
-        private final RelBuilder relBuilder;
-        private final RowType inputRowType;
-        private final MatchSpec matchSpec;
-        private final LinkedHashSet<String> names;
-        private Pattern<RowData, RowData> pattern;
-
-        public PatternVisitor(
-                ReadableConfig config,
-                ClassLoader classLoader,
-                RelBuilder relBuilder,
-                RowType inputRowType,
-                MatchSpec matchSpec) {
-            this.config = config;
-            this.classLoader = classLoader;
-            this.relBuilder = relBuilder;
-            this.inputRowType = inputRowType;
-            this.matchSpec = matchSpec;
-            this.names = new LinkedHashSet<>();
-        }
-
-        @Override
-        public Pattern<RowData, RowData> visitLiteral(RexLiteral literal) {
-            String patternName = literal.getValueAs(String.class);
-            pattern = translateSingleVariable(pattern, patternName);
-
-            RexNode patternDefinition = matchSpec.getPatternDefinitions().get(patternName);
-            if (patternDefinition != null) {
-                MatchCodeGenerator generator =
-                        new MatchCodeGenerator(
-                                new CodeGeneratorContext(config, classLoader),
-                                relBuilder,
-                                false, // nullableInput
-                                JavaScalaConversionUtil.toScala(new ArrayList<>(names)),
-                                JavaScalaConversionUtil.toScala(Optional.of(patternName)),
-                                CodeGenUtils.DEFAULT_COLLECTOR_TERM());
-                generator.bindInput(
-                        inputRowType,
-                        CodeGenUtils.DEFAULT_INPUT1_TERM(),
-                        JavaScalaConversionUtil.toScala(Optional.empty()));
-                IterativeCondition<RowData> condition =
-                        generator.generateIterativeCondition(patternDefinition);
-                return pattern.where(condition);
-            } else {
-                return pattern.where(BooleanConditions.trueFunction());
-            }
-        }
-
-        @Override
-        public Pattern<RowData, RowData> visitCall(RexCall call) {
-            SqlOperator operator = call.getOperator();
-            if (operator == SqlStdOperatorTable.PATTERN_CONCAT) {
-                pattern = call.operands.get(0).accept(this);
-                pattern = call.operands.get(1).accept(this);
-                return pattern;
-            } else if (operator == SqlStdOperatorTable.PATTERN_QUANTIFIER) {
-                final RexLiteral name;
-                if (call.operands.get(0) instanceof RexLiteral) {
-                    name = (RexLiteral) call.operands.get(0);
-                } else {
-                    throw new TableException(
-                            String.format(
-                                    "Expression not supported: %s Group patterns are not supported yet.",
-                                    call.operands.get(0)));
-                }
-
-                pattern = name.accept(this);
-                int startNum =
-                        MathUtils.checkedDownCast(
-                                ((RexLiteral) call.operands.get(1)).getValueAs(Long.class));
-                int endNum =
-                        MathUtils.checkedDownCast(
-                                ((RexLiteral) call.operands.get(2)).getValueAs(Long.class));
-                boolean isGreedy = !((RexLiteral) call.operands.get(3)).getValueAs(Boolean.class);
-
-                return applyQuantifier(pattern, startNum, endNum, isGreedy);
-            } else if (operator == SqlStdOperatorTable.PATTERN_ALTER) {
-                throw new TableException(
-                        String.format(
-                                "Expression not supported: %s. Currently, CEP doesn't support branching patterns.",
-                                call));
-            } else if (operator == SqlStdOperatorTable.PATTERN_PERMUTE) {
-                throw new TableException(
-                        String.format(
-                                "Expression not supported: %s. Currently, CEP doesn't support PERMUTE patterns.",
-                                call));
-            } else if (operator == SqlStdOperatorTable.PATTERN_EXCLUDE) {
-                throw new TableException(
-                        String.format(
-                                "Expression not supported: %s. Currently, CEP doesn't support '{-' '-}' patterns.",
-                                call));
-            } else {
-                throw new TableException("This should not happen.");
-            }
-        }
-
-        @Override
-        public Pattern<RowData, RowData> visitNode(RexNode rexNode) {
-            throw new TableException(
-                    String.format("Unsupported expression within Pattern: [%s]", rexNode));
-        }
-
-        private Pattern<RowData, RowData> translateSingleVariable(
-                Pattern<RowData, RowData> previousPattern, String patternName) {
-            if (names.contains(patternName)) {
-                throw new TableException(
-                        "Pattern variables must be unique. That might change in the future.");
-            } else {
-                names.add(patternName);
-            }
-
-            if (previousPattern != null) {
-                return previousPattern.next(patternName);
-            } else {
-                return Pattern.begin(patternName, translateSkipStrategy());
-            }
-        }
-
-        private AfterMatchSkipStrategy translateSkipStrategy() {
-            switch (matchSpec.getAfter().getKind()) {
-                case LITERAL:
-                    SqlMatchRecognize.AfterOption afterOption =
-                            ((RexLiteral) matchSpec.getAfter())
-                                    .getValueAs(SqlMatchRecognize.AfterOption.class);
-                    switch (afterOption) {
-                        case SKIP_PAST_LAST_ROW:
-                            return AfterMatchSkipStrategy.skipPastLastEvent();
-                        case SKIP_TO_NEXT_ROW:
-                            return AfterMatchSkipStrategy.skipToNext();
-                        default:
-                            throw new TableException("This should not happen.");
-                    }
-                case SKIP_TO_FIRST:
-                    return AfterMatchSkipStrategy.skipToFirst(getPatternTarget())
-                            .throwExceptionOnMiss();
-                case SKIP_TO_LAST:
-                    return AfterMatchSkipStrategy.skipToLast(getPatternTarget())
-                            .throwExceptionOnMiss();
-                default:
-                    throw new TableException(
-                            String.format(
-                                    "Corrupted query tree. Unexpected %s for after match strategy.",
-                                    matchSpec.getAfter()));
-            }
-        }
-
-        private String getPatternTarget() {
-            return ((RexLiteral) ((RexCall) matchSpec.getAfter()).getOperands().get(0))
-                    .getValueAs(String.class);
-        }
-
-        private Pattern<RowData, RowData> applyQuantifier(
-                Pattern<RowData, RowData> pattern, int startNum, int endNum, boolean greedy) {
-            boolean isOptional = startNum == 0 && endNum == 1;
-
-            final Pattern<RowData, RowData> newPattern;
-            if (startNum == 0 && endNum == -1) { // zero or more
-                newPattern = pattern.oneOrMore().optional().consecutive();
-            } else if (startNum == 1 && endNum == -1) { // one or more
-                newPattern = pattern.oneOrMore().consecutive();
-            } else if (isOptional) { // optional
-                newPattern = pattern.optional();
-            } else if (endNum != -1) { // times
-                newPattern = pattern.times(startNum, endNum).consecutive();
-            } else { // times or more
-                newPattern = pattern.timesOrMore(startNum).consecutive();
-            }
-
-            if (greedy && (isOptional || startNum == endNum)) {
-                return newPattern;
-            } else if (greedy) {
-                return newPattern.greedy();
-            } else if (isOptional) {
-                throw new TableException("Reluctant optional variables are not supported yet.");
-            } else {
-                return newPattern;
-            }
-        }
+    @Override
+    public boolean isProcTime(RowType inputRowType) {
+        final SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
+        final LogicalType timeOrderFieldType =
+                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
+        return TypeCheckUtils.isProcTime(timeOrderFieldType);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java
new file mode 100644
index 00000000000000..2f57564ef8a4e2
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalMatch.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.batch;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMatch;
+import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalMatch;
+import org.apache.flink.table.planner.plan.utils.MatchUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+
+import java.util.List;
+
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+
+/** Batch physical RelNode which matches along with MATCH_RECOGNIZE. */
+public class BatchPhysicalMatch extends CommonPhysicalMatch implements BatchPhysicalRel {
+
+    public BatchPhysicalMatch(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode inputNode,
+            MatchRecognize logicalMatch,
+            RelDataType outputRowType) {
+        super(cluster, traitSet, inputNode, logicalMatch, outputRowType);
+    }
+
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new BatchPhysicalMatch(
+                getCluster(), traitSet, inputs.get(0), getLogicalMatch(), deriveRowType());
+    }
+
+    @Override
+    public ExecNode<?> translateToExecNode() {
+        return new BatchExecMatch(
+                unwrapTableConfig(this),
+                MatchUtil.createMatchSpec(getLogicalMatch()),
+                InputProperty.DEFAULT,
+                FlinkTypeFactory.toLogicalRowType(getRowType()),
+                getRelDetailedDescription());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java
new file mode 100644
index 00000000000000..fc95986df402fa
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalMatch.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.common;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.type.RelDataType;
+
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+/** Base physical RelNode which matches along with MATCH_RECOGNIZE. */
+public abstract class CommonPhysicalMatch extends SingleRel implements FlinkPhysicalRel {
+
+    private final MatchRecognize logicalMatch;
+    private final RelDataType outputRowType;
+
+    public CommonPhysicalMatch(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode inputNode,
+            MatchRecognize logicalMatch,
+            RelDataType outputRowType) {
+        super(cluster, traitSet, inputNode);
+        if (logicalMatch.measures().values().stream()
+                        .anyMatch(m -> PythonUtil.containsPythonCall(m, null))
+                || logicalMatch.patternDefinitions().values().stream()
+                        .anyMatch(p -> PythonUtil.containsPythonCall(p, null))) {
+            throw new TableException("Python Function can not be used in MATCH_RECOGNIZE for now.");
+        }
+        this.logicalMatch = logicalMatch;
+        this.outputRowType = outputRowType;
+    }
+
+    @Override
+    protected RelDataType deriveRowType() {
+        return outputRowType;
+    }
+
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        RelDataType inputRowType = getInput().getRowType();
+        Seq<String> fieldNames =
+                JavaConverters.asScalaBufferConverter(inputRowType.getFieldNames()).asScala();
+        return super.explainTerms(pw)
+                .itemIf(
+                        "partitionBy",
+                        RelExplainUtil.fieldToString(
+                                logicalMatch.partitionKeys().toArray(), inputRowType),
+                        !logicalMatch.partitionKeys().isEmpty())
+                .itemIf(
+                        "orderBy",
+                        RelExplainUtil.collationToString(logicalMatch.orderKeys(), inputRowType),
+                        !logicalMatch.orderKeys().getFieldCollations().isEmpty())
+                .itemIf(
+                        "measures",
+                        RelExplainUtil.measuresDefineToString(
+                                logicalMatch.measures(),
+                                fieldNames.toList(),
+                                this::getExpressionString,
+                                convertToExpressionDetail(pw.getDetailLevel())),
+                        !logicalMatch.measures().isEmpty())
+                .item("rowsPerMatch", RelExplainUtil.rowsPerMatchToString(logicalMatch.allRows()))
+                .item("after", RelExplainUtil.afterMatchToString(logicalMatch.after(), fieldNames))
+                .item("pattern", logicalMatch.pattern().toString())
+                .itemIf(
+                        "subset",
+                        RelExplainUtil.subsetToString(logicalMatch.subsets()),
+                        !logicalMatch.subsets().isEmpty())
+                .item("define", logicalMatch.patternDefinitions());
+    }
+
+    public MatchRecognize getLogicalMatch() {
+        return logicalMatch;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java
new file mode 100644
index 00000000000000..84a74407e44612
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalMatchRule.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalMatch;
+import org.apache.flink.table.planner.plan.rules.physical.common.CommonPhysicalMatchRule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalMatch} to {@link
+ * BatchPhysicalMatch}.
+ */
+public class BatchPhysicalMatchRule extends CommonPhysicalMatchRule {
+
+    public static final RelOptRule INSTANCE = new BatchPhysicalMatchRule();
+
+    private BatchPhysicalMatchRule() {
+        super(
+                FlinkLogicalMatch.class,
+                FlinkConventions.LOGICAL(),
+                FlinkConventions.BATCH_PHYSICAL(),
+                "BatchPhysicalMatchRule");
+    }
+
+    @Override
+    public RelNode convert(RelNode rel) {
+        return super.convert(rel, FlinkConventions.BATCH_PHYSICAL());
+    }
+
+    @Override
+    protected RelNode convertToPhysicalMatch(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode convertInput,
+            MatchRecognize matchRecognize,
+            RelDataType rowType) {
+        return new BatchPhysicalMatch(cluster, traitSet, convertInput, matchRecognize, rowType);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
new file mode 100644
index 00000000000000..397476b0ebb1ef
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.common;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.plan.logical.MatchRecognize;
+import org.apache.flink.table.planner.plan.nodes.FlinkConvention;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.MatchUtil.AggregationPatternVariableFinder;
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalMatch} to physical Match rel.
+ */
+public abstract class CommonPhysicalMatchRule extends ConverterRule {
+
+    public CommonPhysicalMatchRule(
+            Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String descriptionPrefix) {
+        super(clazz, in, out, descriptionPrefix);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalMatch logicalMatch = call.rel(0);
+
+        validateAggregations(logicalMatch.getMeasures().values());
+        validateAggregations(logicalMatch.getPatternDefinitions().values());
+        // This check might be obsolete once CALCITE-2747 is resolved
+        validateAmbiguousColumns(logicalMatch);
+        return true;
+    }
+
+    public RelNode convert(RelNode rel, FlinkConvention convention) {
+        FlinkLogicalMatch logicalMatch = (FlinkLogicalMatch) rel;
+        RelTraitSet traitSet = rel.getTraitSet().replace(convention);
+        ImmutableBitSet partitionKeys = logicalMatch.getPartitionKeys();
+
+        FlinkRelDistribution requiredDistribution =
+                partitionKeys.isEmpty()
+                        ? FlinkRelDistribution.SINGLETON()
+                        : FlinkRelDistribution.hash(logicalMatch.getPartitionKeys().asList(), true);
+        RelTraitSet requiredTraitSet =
+                rel.getCluster()
+                        .getPlanner()
+                        .emptyTraitSet()
+                        .replace(requiredDistribution)
+                        .replace(convention);
+
+        RelNode convertInput = RelOptRule.convert(logicalMatch.getInput(), requiredTraitSet);
+
+        try {
+            Class.forName(
+                    "org.apache.flink.cep.pattern.Pattern",
+                    false,
+                    Thread.currentThread().getContextClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw new TableException(
+                    "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.",
+                    e);
+        }
+        return convertToPhysicalMatch(
+                rel.getCluster(),
+                traitSet,
+                convertInput,
+                new MatchRecognize(
+                        logicalMatch.getPattern(),
+                        logicalMatch.getPatternDefinitions(),
+                        logicalMatch.getMeasures(),
+                        logicalMatch.getAfter(),
+                        logicalMatch.getSubsets(),
+                        logicalMatch.isAllRows(),
+                        logicalMatch.getPartitionKeys(),
+                        logicalMatch.getOrderKeys(),
+                        logicalMatch.getInterval()),
+                logicalMatch.getRowType());
+    }
+
+    protected abstract RelNode convertToPhysicalMatch(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode convertInput,
+            MatchRecognize matchRecognize,
+            RelDataType rowType);
+
+    private void validateAggregations(Iterable<RexNode> expr) {
+        AggregationsValidator validator = new AggregationsValidator();
+        expr.forEach(e -> e.accept(validator));
+    }
+
+    private void validateAmbiguousColumns(FlinkLogicalMatch logicalMatch) {
+        if (logicalMatch.isAllRows()) {
+            throw new TableException("All rows per match mode is not supported yet.");
+        } else {
+            validateAmbiguousColumnsOnRowPerMatch(
+                    logicalMatch.getPartitionKeys(),
+                    logicalMatch.getMeasures().keySet(),
+                    logicalMatch.getInput().getRowType(),
+                    logicalMatch.getRowType());
+        }
+    }
+
+    private void validateAmbiguousColumnsOnRowPerMatch(
+            ImmutableBitSet partitionKeys,
+            Set<String> measuresNames,
+            RelDataType inputSchema,
+            RelDataType expectedSchema) {
+        int actualSize = partitionKeys.toArray().length + measuresNames.size();
+        int expectedSize = expectedSchema.getFieldCount();
+        if (actualSize != expectedSize) {
+            // try to find ambiguous column
+
+            String ambiguousColumns =
+                    Arrays.stream(partitionKeys.toArray())
+                            .mapToObj(k -> inputSchema.getFieldList().get(k).getName())
+                            .filter(measuresNames::contains)
+                            .collect(Collectors.joining(", ", "{", "}"));
+
+            throw new ValidationException(
+                    String.format("Columns ambiguously defined: %s", ambiguousColumns));
+        }
+    }
+
+    private static class AggregationsValidator extends RexDefaultVisitor<Object> {
+
+        @Override
+        public Object visitCall(RexCall call) {
+            SqlOperator operator = call.getOperator();
+            if (operator instanceof SqlAggFunction) {
+                call.accept(new AggregationPatternVariableFinder());
+            } else {
+                call.getOperands().forEach(o -> o.accept(this));
+            }
+            return null;
+        }
+
+        @Override
+        public Object visitNode(RexNode rexNode) {
+            return null;
+        }
+    }
+}
@@ */ package org.apache.flink.table.planner.plan.nodes.physical.stream -import org.apache.flink.table.api.TableException import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.logical.MatchRecognize import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch +import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalMatch import org.apache.flink.table.planner.plan.utils.MatchUtil -import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall -import org.apache.flink.table.planner.plan.utils.RelExplainUtil._ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import _root_.java.util @@ -38,59 +36,21 @@ class StreamPhysicalMatch( cluster: RelOptCluster, traitSet: RelTraitSet, inputNode: RelNode, - val logicalMatch: MatchRecognize, + logicalMatch: MatchRecognize, outputRowType: RelDataType) - extends SingleRel(cluster, traitSet, inputNode) + extends CommonPhysicalMatch(cluster, traitSet, inputNode, logicalMatch, outputRowType) with StreamPhysicalRel { - if ( - logicalMatch.measures.values().exists(containsPythonCall(_)) || - logicalMatch.patternDefinitions.values().exists(containsPythonCall(_)) - ) { - throw new TableException("Python Function can not be used in MATCH_RECOGNIZE for now.") - } - override def requireWatermark: Boolean = { - val rowtimeFields = getInput.getRowType.getFieldList + val rowTimeFields = getInput.getRowType.getFieldList .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) - rowtimeFields.nonEmpty + rowTimeFields.nonEmpty } - override def deriveRowType(): RelDataType = outputRowType - override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { new StreamPhysicalMatch(cluster, traitSet, inputs.get(0), logicalMatch, outputRowType) } - override def explainTerms(pw: RelWriter): RelWriter = { - val inputRowType = getInput.getRowType - val fieldNames = inputRowType.getFieldNames.toList - super - .explainTerms(pw) - .itemIf( - "partitionBy", - fieldToString(logicalMatch.partitionKeys.toArray, inputRowType), - !logicalMatch.partitionKeys.isEmpty) - .itemIf( - "orderBy", - collationToString(logicalMatch.orderKeys, inputRowType), - !logicalMatch.orderKeys.getFieldCollations.isEmpty) - .itemIf( - "measures", - measuresDefineToString( - logicalMatch.measures, - fieldNames, - getExpressionString, - convertToExpressionDetail(pw.getDetailLevel)), - !logicalMatch.measures.isEmpty - ) - .item("rowsPerMatch", rowsPerMatchToString(logicalMatch.allRows)) - .item("after", afterMatchToString(logicalMatch.after, fieldNames)) - .item("pattern", logicalMatch.pattern.toString) - .itemIf("subset", subsetToString(logicalMatch.subsets), !logicalMatch.subsets.isEmpty) - .item("define", logicalMatch.patternDefinitions) - } - override def translateToExecNode(): ExecNode[_] = { new StreamExecMatch( unwrapTableConfig(this), diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index 22b30f63e4c8a1..30bc66961549fe 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -321,6 +321,7 @@ 
object FlinkBatchRuleSets { FlinkLogicalRank.CONVERTER, FlinkLogicalWindowAggregate.CONVERTER, FlinkLogicalSnapshot.CONVERTER, + FlinkLogicalMatch.CONVERTER, FlinkLogicalSink.CONVERTER, FlinkLogicalLegacySink.CONVERTER, FlinkLogicalDistribution.BATCH_CONVERTER @@ -411,6 +412,8 @@ object FlinkBatchRuleSets { BatchPhysicalSingleRowJoinRule.INSTANCE, BatchPhysicalLookupJoinRule.SNAPSHOT_ON_TABLESCAN, BatchPhysicalLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN, + // CEP + BatchPhysicalMatchRule.INSTANCE, // correlate BatchPhysicalConstantTableFunctionScanRule.INSTANCE, BatchPhysicalCorrelateRule.INSTANCE, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala index 3142fb22bef78e..baecd116bd9f18 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalMatchRule.scala @@ -17,145 +17,35 @@ */ package org.apache.flink.table.planner.plan.rules.physical.stream -import org.apache.flink.table.api.{TableException, ValidationException} -import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution import org.apache.flink.table.planner.plan.logical.MatchRecognize import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMatch -import org.apache.flink.table.planner.plan.utils.{MatchUtil, RexDefaultVisitor} +import org.apache.flink.table.planner.plan.rules.physical.common.CommonPhysicalMatchRule -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.plan.{RelOptCluster, RelOptRule, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.convert.ConverterRule -import org.apache.calcite.rex.{RexCall, RexNode} -import org.apache.calcite.sql.SqlAggFunction -import org.apache.calcite.util.ImmutableBitSet - -import scala.collection.JavaConverters._ -import scala.collection.mutable class StreamPhysicalMatchRule - extends ConverterRule( + extends CommonPhysicalMatchRule( classOf[FlinkLogicalMatch], FlinkConventions.LOGICAL, FlinkConventions.STREAM_PHYSICAL, "StreamPhysicalMatchRule") { - override def matches(call: RelOptRuleCall): Boolean = { - val logicalMatch: FlinkLogicalMatch = call.rel(0) - - validateAggregations(logicalMatch.getMeasures.values().asScala) - validateAggregations(logicalMatch.getPatternDefinitions.values().asScala) - // This check might be obsolete once CALCITE-2747 is resolved - validateAmbiguousColumns(logicalMatch) - true - } - override def convert(rel: RelNode): RelNode = { - val logicalMatch: FlinkLogicalMatch = rel.asInstanceOf[FlinkLogicalMatch] - val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL) - val partitionKeys = logicalMatch.getPartitionKeys - - val requiredDistribution = if (!partitionKeys.isEmpty) { - FlinkRelDistribution.hash(logicalMatch.getPartitionKeys.asList()) - } else { - FlinkRelDistribution.SINGLETON - } - val requiredTraitSet = rel.getCluster.getPlanner - .emptyTraitSet() - .replace(requiredDistribution) - 
.replace(FlinkConventions.STREAM_PHYSICAL) - - val convertInput: RelNode = - RelOptRule.convert(logicalMatch.getInput, requiredTraitSet) - - try { - Class - .forName( - "org.apache.flink.cep.pattern.Pattern", - false, - Thread.currentThread().getContextClassLoader) - } catch { - case ex: ClassNotFoundException => - throw new TableException( - "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.", - ex) - } - - new StreamPhysicalMatch( - rel.getCluster, - traitSet, - convertInput, - MatchRecognize( - logicalMatch.getPattern, - logicalMatch.getPatternDefinitions, - logicalMatch.getMeasures, - logicalMatch.getAfter, - logicalMatch.getSubsets, - logicalMatch.isAllRows, - logicalMatch.getPartitionKeys, - logicalMatch.getOrderKeys, - logicalMatch.getInterval - ), - logicalMatch.getRowType) - } - - private def validateAggregations(expr: Iterable[RexNode]): Unit = { - val validator = new AggregationsValidator - expr.foreach(_.accept(validator)) + super.convert(rel, FlinkConventions.STREAM_PHYSICAL) } - private def validateAmbiguousColumns(logicalMatch: FlinkLogicalMatch): Unit = { - if (logicalMatch.isAllRows) { - throw new TableException("All rows per match mode is not supported yet.") - } else { - validateAmbiguousColumnsOnRowPerMatch( - logicalMatch.getPartitionKeys, - logicalMatch.getMeasures.keySet().asScala, - logicalMatch.getInput.getRowType, - logicalMatch.getRowType) - } + override def convertToPhysicalMatch( + cluster: RelOptCluster, + traitSet: RelTraitSet, + convertInput: RelNode, + matchRecognize: MatchRecognize, + rowType: RelDataType): RelNode = { + new StreamPhysicalMatch(cluster, traitSet, convertInput, matchRecognize, rowType) } - - private def validateAmbiguousColumnsOnRowPerMatch( - partitionKeys: ImmutableBitSet, - measuresNames: mutable.Set[String], - inputSchema: RelDataType, - expectedSchema: RelDataType): Unit = { - val actualSize = partitionKeys.toArray.length + measuresNames.size - val expectedSize = expectedSchema.getFieldCount - if (actualSize != expectedSize) { - // try to find ambiguous column - - val ambiguousColumns = partitionKeys.toArray - .map(inputSchema.getFieldList.get(_).getName) - .filter(measuresNames.contains) - .mkString("{", ", ", "}") - - throw new ValidationException(s"Columns ambiguously defined: $ambiguousColumns") - } - } - - private class AggregationsValidator extends RexDefaultVisitor[Object] { - - override def visitCall(call: RexCall): AnyRef = { - call.getOperator match { - case _: SqlAggFunction => - call.accept(new MatchUtil.AggregationPatternVariableFinder) - case _ => - call.getOperands.asScala.foreach(_.accept(this)) - } - - null - } - - override def visitNode(rexNode: RexNode): AnyRef = { - null - } - } - } object StreamPhysicalMatchRule { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java new file mode 100644 index 00000000000000..cb40481db8d467 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/batch/sql/validation/MatchRecognizeValidationTest.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.batch.sql.validation; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg; +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.table.planner.utils.TableTestUtil; + +import org.apache.calcite.sql.SqlMatchRecognize; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; + +/** Validation test for {@link SqlMatchRecognize}. */ +@RunWith(Parameterized.class) +public class MatchRecognizeValidationTest extends TableTestBase { + + private static final String STREAM = "stream"; + private static final String BATCH = "batch"; + + @Parameterized.Parameter public String mode; + + @Parameterized.Parameters(name = "mode = {0}") + public static Collection parameters() { + return Arrays.asList(STREAM, BATCH); + } + + @Rule public ExpectedException expectedException = ExpectedException.none(); + + private TableTestUtil util; + private TableEnvironment tEnv; + + @Before + public void setup() { + util = + STREAM.equals(mode) + ? streamTestUtil(TableConfig.getDefault()) + : batchTestUtil(TableConfig.getDefault()); + tEnv = util.getTableEnv(); + tEnv.executeSql( + "CREATE TABLE Ticker (\n" + + " `symbol` VARCHAR,\n" + + " `price` INT,\n" + + " `tax` INT,\n" + + " `proctime` as PROCTIME()\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"); + tEnv.executeSql( + "CREATE TABLE MyTable (\n" + + " a BIGINT,\n" + + " b INT,\n" + + " proctime as PROCTIME()\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"); + } + + @After + public void after() { + util.getTableEnv().executeSql("DROP TABLE Ticker"); + util.getTableEnv().executeSql("DROP TABLE MyTable"); + } + + /** Function 'MATCH_ROWTIME()' can only be used in MATCH_RECOGNIZE. */ + @Test(expected = ValidationException.class) + public void testMatchRowTimeInSelect() { + String sql = "SELECT MATCH_ROWTIME() FROM MyTable"; + util.verifyExplain(sql); + } + + /** Function 'MATCH_PROCTIME()' can only be used in MATCH_RECOGNIZE. 
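Using it in a plain SELECT, as below, is expected to fail with a {@link ValidationException}.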
*/ + @Test(expected = ValidationException.class) + public void testMatchProcTimeInSelect() { + String sql = "SELECT MATCH_PROCTIME() FROM MyTable"; + util.verifyExplain(sql); + } + + @Test + public void testSortProcessingTimeDesc() { + if (STREAM.equals(mode)) { + expectedException.expect(TableException.class); + expectedException.expectMessage( + "Primary sort order of a streaming table must be ascending on time."); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime DESC\n" + + " MEASURES\n" + + " A.symbol AS aSymbol\n" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + } + + @Test + public void testSortProcessingTimeSecondaryField() { + if (STREAM.equals(mode)) { + expectedException.expect(TableException.class); + expectedException.expectMessage( + "You must specify either rowtime or proctime for order by as the first one."); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY price, proctime\n" + + " MEASURES\n" + + " A.symbol AS aSymbol\n" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + } + + @Test + public void testSortNoOrder() { + if (STREAM.equals(mode)) { + expectedException.expect(TableException.class); + expectedException.expectMessage( + "You must specify either rowtime or proctime for order by."); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " MEASURES\n" + + " A.symbol AS aSymbol\n" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + } + + @Test + public void testUpdatesInUpstreamOperatorNotSupported() { + if (STREAM.equals(mode)) { + expectedException.expect(TableException.class); + expectedException.expectMessage( + "Match Recognize doesn't support consuming update changes which is produced by node GroupAggregate("); + String sqlQuery = + "SELECT *\n" + + "FROM (SELECT DISTINCT * FROM Ticker)\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.symbol AS aSymbol\n" + + " ONE ROW PER MATCH" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + } + + @Test + public void testAggregatesOnMultiplePatternVariablesNotSupported() { + expectedException.expect(ValidationException.class); + expectedException.expectMessage("SQL validation failed."); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " SUM(A.price + B.tax) AS taxedPrice\n" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + + @Test + public void testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs() { + expectedException.expect(ValidationException.class); + expectedException.expectMessage("Aggregation must be applied to a single pattern variable"); + util.addTemporarySystemFunction("weightedAvg", new WeightedAvg()); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " weightedAvg(A.price, B.tax) AS weightedAvg\n" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + + @Test + public void testValidatingAmbiguousColumns() { + expectedException.expect(ValidationException.class); + 
expectedException.expectMessage("Columns ambiguously defined: {symbol, price}"); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " PARTITION BY symbol, price\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.symbol AS symbol,\n" + + " A.price AS price\n" + + " PATTERN (A)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + + // *************************************************************************************** + // * Those validations are temporary. We should remove those tests once we support those * + // * features. * + // *************************************************************************************** + + /** Python Function can not be used in MATCH_RECOGNIZE for now. */ + @Test + public void testMatchPythonFunction() { + expectedException.expect(TableException.class); + expectedException.expectMessage( + "Python Function can not be used in MATCH_RECOGNIZE for now."); + util.addTemporarySystemFunction("pyFunc", new PythonScalarFunction("pyFunc")); + String sql = + "SELECT T.aa as ta\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.a as aa,\n" + + " pyFunc(1,2) as bb\n" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " A AS a = 1,\n" + + " B AS b = 'b'\n" + + ") AS T"; + util.verifyExplain(sql); + } + + @Test + public void testAllRowsPerMatch() { + expectedException.expect(TableException.class); + expectedException.expectMessage("All rows per match mode is not supported yet."); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.symbol AS aSymbol\n" + + " ALL ROWS PER MATCH\n" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + + @Test + public void testGreedyQuantifierAtTheEndIsNotSupported() { + expectedException.expect(TableException.class); + expectedException.expectMessage( + "Greedy quantifiers are not allowed as the last element of a " + + "Pattern yet. Finish your pattern with either a simple variable or reluctant quantifier."); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.symbol AS aSymbol\n" + + " PATTERN (A B+)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + + @Test + public void testPatternsProducingEmptyMatchesAreNotSupported() { + expectedException.expect(TableException.class); + expectedException.expectMessage( + "Patterns that can produce empty matches are not supported. 
" + + "There must be at least one non-optional state."); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.symbol AS aSymbol\n" + + " PATTERN (A*)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } + + @Test + public void testDistinctAggregationsNotSupported() { + expectedException.expect(ValidationException.class); + expectedException.expectMessage("SQL validation failed."); + String sqlQuery = + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " COUNT(DISTINCT A.price) AS price\n" + + " PATTERN (A B)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ") AS T"; + tEnv.executeSql(sqlQuery); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java new file mode 100644 index 00000000000000..eca0bb0cfcfb2d --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * imitations under the License. + */ + +package org.apache.flink.table.planner.plan.batch.sql; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.apache.calcite.sql.SqlMatchRecognize; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** Tests for {@link SqlMatchRecognize}. 
*/ +public class MatchRecognizeTest extends TableTestBase { + + private BatchTableTestUtil util; + + @Before + public void before() { + util = batchTestUtil(TableConfig.getDefault()); + util.getTableEnv() + .executeSql( + "CREATE TABLE Ticker (\n" + + " `symbol` VARCHAR,\n" + + " `price` INT,\n" + + " `tax` INT,\n" + + " `ts_ltz` as PROCTIME()\n" + + ") with (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true'\n" + + ")"); + } + + @After + public void after() { + util.getTableEnv().executeSql("DROP TABLE Ticker"); + } + + @Test + public void testCascadeMatch() { + String sqlQuery = + "SELECT *\n" + + "FROM (\n" + + " SELECT\n" + + " symbol,\n" + + " price\n" + + " FROM Ticker\n" + + " MATCH_RECOGNIZE (\n" + + " PARTITION BY symbol\n" + + " ORDER BY ts_ltz" + + " MEASURES\n" + + " A.price as price,\n" + + " A.tax as tax\n" + + " ONE ROW PER MATCH\n" + + " PATTERN (A)\n" + + " DEFINE\n" + + " A AS A.price > 0\n" + + " ) AS T\n" + + " GROUP BY symbol, price\n" + + ")\n" + + "MATCH_RECOGNIZE (\n" + + " PARTITION BY symbol\n" + + " MEASURES\n" + + " A.price as dPrice\n" + + " PATTERN (A)\n" + + " DEFINE\n" + + " A AS A.symbol = 'a'\n" + + ")"; + util.verifyExecPlan(sqlQuery); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java index a336a42a2f630d..80c8baa66e0d2a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.java @@ -152,4 +152,24 @@ public void testLegacySourceSink() { util.testingTableEnv().registerTableSinkInternal("MySink", sink); verifyInsert("insert into MySink select * from MySource"); } + + @Test + public void testMatch() { + createSourceWithTimeAttribute(); + String sql = + "SELECT T.aid, T.bid, T.cid\n" + + " FROM MyTable MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " `A\"`.a AS aid,\n" + + " \u006C.a AS bid,\n" + + " C.a AS cid\n" + + " PATTERN (`A\"` \u006C C)\n" + + " DEFINE\n" + + " `A\"` AS a = 1,\n" + + " \u006C AS b = 2,\n" + + " C AS c = 'c'\n" + + " ) AS T"; + verifyQuery(sql); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java new file mode 100644 index 00000000000000..31860779478bdb --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java @@ -0,0 +1,645 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * imitations under the License. + */ + +package org.apache.flink.table.planner.runtime.batch.sql; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.apache.calcite.sql.SqlMatchRecognize; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import static org.apache.flink.api.common.typeinfo.Types.DOUBLE; +import static org.apache.flink.api.common.typeinfo.Types.INT; +import static org.apache.flink.api.common.typeinfo.Types.LONG; +import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED; +import static org.apache.flink.api.common.typeinfo.Types.STRING; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** IT Case for testing {@link SqlMatchRecognize}. 
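Runs MATCH_RECOGNIZE queries end-to-end in batch mode and asserts the collected result rows.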
*/ +public class MatchRecognizeITCase { + + private StreamExecutionEnvironment env; + private StreamTableEnvironment tEnv; + + @Before + public void setup() { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); + } + + @Test + public void testSimplePattern() { + tEnv.createTemporaryView( + "MyTable", + tEnv.fromDataStream( + env.fromElements( + Row.of(1, "a"), + Row.of(2, "z"), + Row.of(3, "b"), + Row.of(4, "c"), + Row.of(5, "d"), + Row.of(6, "a"), + Row.of(7, "b"), + Row.of(8, "c"), + Row.of(9, "h")) + .returns(ROW_NAMED(new String[] {"id", "name"}, INT, STRING)), + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + TableResult tableResult = + tEnv.executeSql( + "SELECT T.aid, T.bid, T.cid\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " `A\"`.id AS aid,\n" + + " \u006C.id AS bid,\n" + + " C.id AS cid\n" + + " PATTERN (`A\"` \u006C C)\n" + + " DEFINE\n" + + " `A\"` AS name = 'a',\n" + + " \u006C AS name = 'b',\n" + + " C AS name = 'c'\n" + + ") AS T"); + assertEquals( + Collections.singletonList(Row.of(6, 7, 8)), + CollectionUtil.iteratorToList(tableResult.collect())); + } + + @Test + public void testSimplePatternWithNulls() { + tEnv.createTemporaryView( + "MyTable", + tEnv.fromDataStream( + env.fromElements( + Row.of(1, "a", null), + Row.of(2, "b", null), + Row.of(3, "c", null), + Row.of(4, "d", null), + Row.of(5, null, null), + Row.of(6, "a", null), + Row.of(7, "b", null), + Row.of(8, "c", null), + Row.of(9, null, null)) + .returns( + ROW_NAMED( + new String[] {"id", "name", "nullField"}, + INT, + STRING, + STRING)), + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("nullField", DataTypes.STRING()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + TableResult tableResult = + tEnv.executeSql( + "SELECT T.aid, T.bNull, T.cid, T.aNull\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.id AS aid,\n" + + " A.nullField AS aNull,\n" + + " LAST(B.nullField) AS bNull,\n" + + " C.id AS cid\n" + + " PATTERN (A B C)\n" + + " DEFINE\n" + + " A AS name = 'a' AND nullField IS NULL,\n" + + " B AS name = 'b' AND LAST(A.nullField) IS NULL,\n" + + " C AS name = 'c'\n" + + ") AS T"); + assertEquals( + Arrays.asList(Row.of(1, null, 3, null), Row.of(6, null, 8, null)), + CollectionUtil.iteratorToList(tableResult.collect())); + } + + @Test + public void testCodeSplitsAreProperlyGenerated() { + tEnv.getConfig().setMaxGeneratedCodeLength(1); + tEnv.createTemporaryView( + "MyTable", + tEnv.fromDataStream( + env.fromElements( + Row.of(1, "a", "key1", "second_key3"), + Row.of(2, "b", "key1", "second_key3"), + Row.of(3, "c", "key1", "second_key3"), + Row.of(4, "d", "key", "second_key"), + Row.of(5, "e", "key", "second_key"), + Row.of(6, "a", "key2", "second_key4"), + Row.of(7, "b", "key2", "second_key4"), + Row.of(8, "c", "key2", "second_key4"), + Row.of(9, "f", "key", "second_key")) + .returns( + ROW_NAMED( + new String[] {"id", "name", "key1", "key2"}, + INT, + STRING, + STRING, + STRING)), + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("key1", DataTypes.STRING()) + .column("key2", DataTypes.STRING()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + TableResult tableResult = + 
tEnv.executeSql( + "SELECT *\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " PARTITION BY key1, key2\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.id AS aid,\n" + + " A.key1 AS akey1,\n" + + " LAST(B.id) AS bid,\n" + + " C.id AS cid,\n" + + " C.key2 AS ckey2\n" + + " PATTERN (A B C)\n" + + " DEFINE\n" + + " A AS name = 'a' AND key1 LIKE '%key%' AND id > 0,\n" + + " B AS name = 'b' AND LAST(A.name, 2) IS NULL,\n" + + " C AS name = 'c' AND LAST(A.name) = 'a'\n" + + ") AS T"); + List actual = CollectionUtil.iteratorToList(tableResult.collect()); + actual.sort(Comparator.comparing(o -> String.valueOf(o.getField(0)))); + assertEquals( + Arrays.asList( + Row.of("key1", "second_key3", 1, "key1", 2, 3, "second_key3"), + Row.of("key2", "second_key4", 6, "key2", 7, 8, "second_key4")), + actual); + } + + @Test + public void testLogicalOffsets() { + tEnv.createTemporaryView( + "Ticker", + tEnv.fromDataStream( + env.fromElements( + Row.of("ACME", 1L, 19, 1), + Row.of("ACME", 2L, 17, 2), + Row.of("ACME", 3L, 13, 3), + Row.of("ACME", 4L, 20, 4), + Row.of("ACME", 5L, 20, 5), + Row.of("ACME", 6L, 26, 6), + Row.of("ACME", 7L, 20, 7), + Row.of("ACME", 8L, 25, 8)) + .returns( + ROW_NAMED( + new String[] {"symbol", "tstamp", "price", "tax"}, + STRING, + LONG, + INT, + INT)), + Schema.newBuilder() + .column("symbol", DataTypes.STRING()) + .column("tstamp", DataTypes.BIGINT()) + .column("price", DataTypes.INT()) + .column("tax", DataTypes.INT()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + TableResult tableResult = + tEnv.executeSql( + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " FIRST(DOWN.tstamp) AS start_tstamp,\n" + + " LAST(DOWN.tstamp) AS bottom_tstamp,\n" + + " UP.tstamp AS end_tstamp,\n" + + " FIRST(DOWN.price + DOWN.tax + 1) AS bottom_total,\n" + + " UP.price + UP.tax AS end_total\n" + + " ONE ROW PER MATCH\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (DOWN{2,} UP)\n" + + " DEFINE\n" + + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n" + + " UP AS price < FIRST(DOWN.price)\n" + + ") AS T"); + assertEquals( + Collections.singletonList(Row.of(6L, 7L, 8L, 33, 33)), + CollectionUtil.iteratorToList(tableResult.collect())); + } + + @Test + public void testLogicalOffsetsWithStarVariable() { + tEnv.createTemporaryView( + "Ticker", + tEnv.fromDataStream( + env.fromElements( + Row.of(1, "ACME", 1L, 20), + Row.of(2, "ACME", 2L, 19), + Row.of(3, "ACME", 3L, 18), + Row.of(4, "ACME", 4L, 17), + Row.of(5, "ACME", 5L, 16), + Row.of(6, "ACME", 6L, 15), + Row.of(7, "ACME", 7L, 14), + Row.of(8, "ACME", 8L, 20)) + .returns( + ROW_NAMED( + new String[] {"id", "symbol", "tstamp", "price"}, + INT, + STRING, + LONG, + INT)), + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("symbol", DataTypes.STRING()) + .column("tstamp", DataTypes.BIGINT()) + .column("price", DataTypes.INT()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + TableResult tableResult = + tEnv.executeSql( + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " FIRST(id, 0) as id0,\n" + + " FIRST(id, 1) as id1,\n" + + " FIRST(id, 2) as id2,\n" + + " FIRST(id, 3) as id3,\n" + + " FIRST(id, 4) as id4,\n" + + " FIRST(id, 5) as id5,\n" + + " FIRST(id, 6) as id6,\n" + + " FIRST(id, 7) as id7,\n" + + " LAST(id, 0) as id8,\n" + + " LAST(id, 1) as id9,\n" + + " LAST(id, 2) as id10,\n" + + " LAST(id, 3) as id11,\n" + + " LAST(id, 4) as id12,\n" + + " 
LAST(id, 5) as id13,\n" + + " LAST(id, 6) as id14,\n" + + " LAST(id, 7) as id15\n" + + " ONE ROW PER MATCH\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (`DOWN\"`{2,} UP)\n" + + " DEFINE\n" + + " `DOWN\"` AS price < LAST(price, 1) OR LAST(price, 1) IS NULL,\n" + + " UP AS price = FIRST(price) AND price > FIRST(price, 3) AND price = LAST(price, 7)\n" + + ") AS T"); + assertEquals( + Collections.singletonList(Row.of(1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1)), + CollectionUtil.iteratorToList(tableResult.collect())); + } + + @Test + public void testLogicalOffsetOutsideOfRangeInMeasures() { + tEnv.createTemporaryView( + "Ticker", + tEnv.fromDataStream( + env.fromElements( + Row.of("ACME", 1L, 19, 1), + Row.of("ACME", 2L, 17, 2), + Row.of("ACME", 3L, 13, 3), + Row.of("ACME", 4L, 20, 4)) + .returns( + ROW_NAMED( + new String[] {"symbol", "tstamp", "price", "tax"}, + STRING, + LONG, + INT, + INT)), + Schema.newBuilder() + .column("symbol", DataTypes.STRING()) + .column("tstamp", DataTypes.BIGINT()) + .column("price", DataTypes.INT()) + .column("tax", DataTypes.INT()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + TableResult tableResult = + tEnv.executeSql( + "SELECT *\n" + + "FROM Ticker\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " FIRST(DOWN.price) as first,\n" + + " LAST(DOWN.price) as last,\n" + + " FIRST(DOWN.price, 5) as nullPrice\n" + + " ONE ROW PER MATCH\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (DOWN{2,} UP)\n" + + " DEFINE\n" + + " DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n" + + " UP AS price > LAST(DOWN.price)\n" + + ") AS T"); + assertEquals( + Collections.singletonList(Row.of(19, 13, null)), + CollectionUtil.iteratorToList(tableResult.collect())); + } + + /** + * This query checks: + * + *
<p>
1. count(D.price) produces 0, because no rows matched to D 2. sum(D.price) produces null, + * because no rows matched to D 3. aggregates that take multiple parameters work 4. aggregates + * with expressions work + */ + @Test + public void testAggregates() { + tEnv.getConfig().setMaxGeneratedCodeLength(1); + tEnv.createTemporaryView( + "MyTable", + tEnv.fromDataStream( + env.fromElements( + Row.of(1, "a", 1, 0.8, 1), + Row.of(2, "z", 2, 0.8, 3), + Row.of(3, "b", 1, 0.8, 2), + Row.of(4, "c", 1, 0.8, 5), + Row.of(5, "d", 4, 0.1, 5), + Row.of(6, "a", 2, 1.5, 2), + Row.of(7, "b", 2, 0.8, 3), + Row.of(8, "c", 1, 0.8, 2), + Row.of(9, "h", 4, 0.8, 3), + Row.of(10, "h", 4, 0.8, 3), + Row.of(11, "h", 2, 0.8, 3), + Row.of(12, "h", 2, 0.8, 3)) + .returns( + ROW_NAMED( + new String[] { + "id", "name", "price", "rate", "weight" + }, + INT, + STRING, + INT, + DOUBLE, + INT)), + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("price", DataTypes.INT()) + .column("rate", DataTypes.DOUBLE()) + .column("weight", DataTypes.INT()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg()); + TableResult tableResult = + tEnv.executeSql( + "SELECT *\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " FIRST(id) as startId,\n" + + " SUM(A.price) AS sumA,\n" + + " COUNT(D.price) AS countD,\n" + + " SUM(D.price) as sumD,\n" + + " weightedAvg(price, weight) as wAvg,\n" + + " AVG(B.price) AS avgB,\n" + + " SUM(B.price * B.rate) as sumExprB,\n" + + " LAST(id) as endId\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (A+ B+ C D? E)\n" + + " DEFINE\n" + + " A AS SUM(A.price) < 6,\n" + + " B AS SUM(B.price * B.rate) < SUM(A.price) AND\n" + + " SUM(B.price * B.rate) > 0.2 AND\n" + + " SUM(B.price) >= 1 AND\n" + + " AVG(B.price) >= 1 AND\n" + + " weightedAvg(price, weight) > 1\n" + + ") AS T"); + assertEquals( + Arrays.asList( + Row.of(1, 5, 0L, null, 2L, 3, 3.4D, 8), + Row.of(9, 4, 0L, null, 3L, 4, 3.2D, 12)), + CollectionUtil.iteratorToList(tableResult.collect())); + } + + @Test + public void testAggregatesWithNullInputs() { + tEnv.getConfig().setMaxGeneratedCodeLength(1); + tEnv.createTemporaryView( + "MyTable", + tEnv.fromDataStream( + env.fromElements( + Row.of(1, "a", 10), + Row.of(2, "z", 10), + Row.of(3, "b", null), + Row.of(4, "c", null), + Row.of(5, "d", 3), + Row.of(6, "c", 3), + Row.of(7, "c", 3), + Row.of(8, "c", 3), + Row.of(9, "c", 2)) + .returns( + ROW_NAMED( + new String[] {"id", "name", "price"}, + INT, + STRING, + INT)), + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("price", DataTypes.INT()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg()); + TableResult tableResult = + tEnv.executeSql( + "SELECT *\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " SUM(A.price) as sumA,\n" + + " COUNT(A.id) as countAId,\n" + + " COUNT(A.price) as countAPrice,\n" + + " COUNT(*) as countAll,\n" + + " COUNT(price) as countAllPrice,\n" + + " LAST(id) as endId\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (A+ C)\n" + + " DEFINE\n" + + " A AS SUM(A.price) < 30,\n" + + " C AS C.name = 'c'\n" + + ") AS T"); + assertEquals( + Collections.singletonList(Row.of(29, 7L, 5L, 8L, 6L, 8)), + CollectionUtil.iteratorToList(tableResult.collect())); + } + + @Test 
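// DEFINE conditions may read the proctime attribute and compare it with CURRENT_TIMESTAMP.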
+ public void testAccessingCurrentTime() { + tEnv.createTemporaryView( + "MyTable", + tEnv.fromDataStream( + env.fromElements(Row.of(1, "a")) + .returns(ROW_NAMED(new String[] {"id", "name"}, INT, STRING)), + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + TableResult tableResult = + tEnv.executeSql( + "SELECT T.aid\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " A.id AS aid,\n" + + " A.proctime AS aProctime,\n" + + " LAST(A.proctime + INTERVAL '1' second) as calculatedField\n" + + " PATTERN (A)\n" + + " DEFINE\n" + + " A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)\n" + + ") AS T"); + assertEquals( + Collections.singletonList(Row.of(1)), + CollectionUtil.iteratorToList(tableResult.collect())); + } + + @Test + public void testUserDefinedFunctions() { + tEnv.getConfig().setMaxGeneratedCodeLength(1); + tEnv.createTemporaryView( + "MyTable", + tEnv.fromDataStream( + env.fromElements( + Row.of(1, "a", 1), + Row.of(2, "a", 1), + Row.of(3, "a", 1), + Row.of(4, "a", 1), + Row.of(5, "a", 1), + Row.of(6, "b", 1), + Row.of(7, "a", 1), + Row.of(8, "a", 1), + Row.of(9, "f", 1)) + .returns( + ROW_NAMED( + new String[] {"id", "name", "price"}, + INT, + STRING, + INT)), + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("price", DataTypes.INT()) + .columnByExpression("proctime", "PROCTIME()") + .build())); + tEnv.createTemporarySystemFunction("prefix", new PrefixingScalarFunc()); + tEnv.createTemporarySystemFunction("countFrom", new RichAggFunc()); + String prefix = "PREF"; + int startFrom = 4; + Configuration jobParameters = new Configuration(); + jobParameters.setString("prefix", prefix); + jobParameters.setString("start", Integer.toString(startFrom)); + env.getConfig().setGlobalJobParameters(jobParameters); + TableResult tableResult = + tEnv.executeSql( + String.format( + "SELECT *\n" + + "FROM MyTable\n" + + "MATCH_RECOGNIZE (\n" + + " ORDER BY proctime\n" + + " MEASURES\n" + + " FIRST(id) as firstId,\n" + + " prefix(A.name) as prefixedNameA,\n" + + " countFrom(A.price) as countFromA,\n" + + " LAST(id) as lastId\n" + + " AFTER MATCH SKIP PAST LAST ROW\n" + + " PATTERN (A+ C)\n" + + " DEFINE\n" + + " A AS prefix(A.name) = '%s:a' AND countFrom(A.price) <= %d\n" + + ") AS T", + prefix, 4 + 4)); + assertEquals( + Arrays.asList(Row.of(1, "PREF:a", 8, 5), Row.of(7, "PREF:a", 6, 9)), + CollectionUtil.iteratorToList(tableResult.collect())); + } + + /** Test prefixing function.. */ + public static class PrefixingScalarFunc extends ScalarFunction { + + private String prefix = "ERROR_VALUE"; + + @Override + public void open(FunctionContext context) throws Exception { + prefix = context.getJobParameter("prefix", ""); + } + + public String eval(String value) { + return String.format("%s:%s", prefix, value); + } + } + + /** Test count accumulator. */ + public static class CountAcc { + public Integer count; + + public CountAcc(Integer count) { + this.count = count; + } + } + + /** Test rich aggregate function. 
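The initial accumulator value is read from the 'start' job parameter in {@code open()}.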
*/ + public static class RichAggFunc extends AggregateFunction { + + private Integer start = 0; + + @Override + public void open(FunctionContext context) throws Exception { + start = Integer.valueOf(context.getJobParameter("start", "0")); + } + + @Override + public void close() throws Exception { + start = 0; + } + + @Override + public CountAcc createAccumulator() { + return new CountAcc(start); + } + + @Override + public Integer getValue(CountAcc accumulator) { + return accumulator.count; + } + + public void accumulate(CountAcc countAcc, Integer value) { + countAcc.count += value; + } + } +} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml new file mode 100644 index 00000000000000..ba1da9ec9872b6 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/MatchRecognizeTest.xml @@ -0,0 +1,74 @@ + + + + + + 0 + ) AS T + GROUP BY symbol, price +) +MATCH_RECOGNIZE ( + PARTITION BY symbol + MEASURES + A.price as dPrice + PATTERN (A) + DEFINE + A AS A.symbol = 'a' +)]]> + + + (PREV(A.$1, 0), 0)]], inputFields=[[symbol, price, tax, ts_ltz]]) + +- LogicalProject(symbol=[$0], price=[$1], tax=[$2], ts_ltz=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, Ticker]]) +]]> + + + (PREV(A.$1, 0), 0)}]) + +- Calc(select=[symbol, price, tax, PROCTIME() AS ts_ltz]) + +- Exchange(distribution=[hash[symbol]]) + +- TableSourceScan(table=[[default_catalog, default_database, Ticker]], fields=[symbol, price, tax]) +]]> + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml index 9e8ac59965deef..5552fb5ba706cf 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/operator/BatchOperatorNameTest.xml @@ -2093,6 +2093,114 @@ Calc(select=[b, w$end AS window_end, EXPR$2]) "side" : "second" } ] } ] +}]]> + + + + + + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala deleted file mode 100644 index 6897fb8735cf93..00000000000000 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/validation/MatchRecognizeValidationTest.scala +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.api.stream.sql.validation - -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.table.api._ -import org.apache.flink.table.api.bridge.scala._ -import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.WeightedAvg -import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.PythonScalarFunction -import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils.ToMillis -import org.apache.flink.table.planner.utils.TableTestBase -import org.apache.flink.types.Row - -import org.junit.Test - -import java.sql.Timestamp - -class MatchRecognizeValidationTest extends TableTestBase { - - private val streamUtil = scalaStreamTestUtil() - streamUtil.addDataStream[(Int, String, Timestamp)]( - "MyTable", - 'a, - 'b, - 'rowtime.rowtime, - 'proctime.proctime) - streamUtil.addDataStream[(String, Long, Int, Int)]( - "Ticker", - 'symbol, - 'tstamp, - 'price, - 'tax, - 'proctime.proctime) - streamUtil.addFunction("ToMillis", new ToMillis) - - /** Function 'MATCH_ROWTIME()' can only be used in MATCH_RECOGNIZE * */ - @Test(expected = classOf[ValidationException]) - def testMatchRowtimeInSelect(): Unit = { - val sql = "SELECT MATCH_ROWTIME() FROM MyTable" - streamUtil.verifyExplain(sql) - } - - /** Function 'MATCH_PROCTIME()' can only be used in MATCH_RECOGNIZE * */ - @Test(expected = classOf[ValidationException]) - def testMatchProctimeInSelect(): Unit = { - val sql = "SELECT MATCH_PROCTIME() FROM MyTable" - streamUtil.verifyExplain(sql) - } - - @Test - def testSortProcessingTimeDesc(): Unit = { - thrown.expectMessage("Primary sort order of a streaming table must be ascending on time.") - thrown.expect(classOf[TableException]) - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | ORDER BY proctime DESC - | MEASURES - | A.symbol AS aSymbol - | PATTERN (A B) - | DEFINE - | A AS symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } - - @Test - def testSortProcessingTimeSecondaryField(): Unit = { - thrown.expectMessage( - "You must specify either rowtime or proctime for order by as " + - "the first one.") - thrown.expect(classOf[TableException]) - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | ORDER BY price, proctime - | MEASURES - | A.symbol AS aSymbol - | PATTERN (A B) - | DEFINE - | A AS symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } - - @Test - def testSortNoOrder(): Unit = { - thrown.expectMessage("You must specify either rowtime or proctime for order by.") - thrown.expect(classOf[TableException]) - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | MEASURES - | A.symbol AS aSymbol - | PATTERN (A B) - | DEFINE - | A AS symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } - - @Test - def testUpdatesInUpstreamOperatorNotSupported(): Unit = { - thrown.expectMessage( - "Match Recognize doesn't 
support consuming update changes " + - "which is produced by node GroupAggregate(") - thrown.expect(classOf[TableException]) - - val sqlQuery = - s""" - |SELECT * - |FROM (SELECT DISTINCT * FROM Ticker) - |MATCH_RECOGNIZE ( - | ORDER BY proctime - | MEASURES - | A.symbol AS aSymbol - | ONE ROW PER MATCH - | PATTERN (A B) - | DEFINE - | A AS symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toRetractStream[Row] - } - - @Test - def testAggregatesOnMultiplePatternVariablesNotSupported(): Unit = { - thrown.expect(classOf[ValidationException]) - thrown.expectMessage("SQL validation failed.") - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | ORDER BY proctime - | MEASURES - | SUM(A.price + B.tax) AS taxedPrice - | PATTERN (A B) - | DEFINE - | A AS A.symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } - - @Test - def testAggregatesOnMultiplePatternVariablesNotSupportedInUDAGs(): Unit = { - thrown.expect(classOf[ValidationException]) - thrown.expectMessage("Aggregation must be applied to a single pattern variable") - - streamUtil.addFunction("weightedAvg", new WeightedAvg) - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | ORDER BY proctime - | MEASURES - | weightedAvg(A.price, B.tax) AS weightedAvg - | PATTERN (A B) - | DEFINE - | A AS A.symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } - - @Test - def testValidatingAmbiguousColumns(): Unit = { - thrown.expectMessage("Columns ambiguously defined: {symbol, price}") - thrown.expect(classOf[ValidationException]) - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | PARTITION BY symbol, price - | ORDER BY proctime - | MEASURES - | A.symbol AS symbol, - | A.price AS price - | PATTERN (A) - | DEFINE - | A AS symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } - - // *************************************************************************************** - // * Those validations are temporary. We should remove those tests once we support those * - // * features. * - // *************************************************************************************** - - /** Python Function can not be used in MATCH_RECOGNIZE for now * */ - @Test - def testMatchPythonFunction() = { - thrown.expectMessage("Python Function can not be used in MATCH_RECOGNIZE for now.") - thrown.expect(classOf[TableException]) - - streamUtil.addFunction("pyFunc", new PythonScalarFunction("pyFunc")) - val sql = - """SELECT T.aa as ta - |FROM MyTable - |MATCH_RECOGNIZE ( - | ORDER BY proctime - | MEASURES - | A.a as aa, - | pyFunc(1,2) as bb - | PATTERN (A B) - | DEFINE - | A AS a = 1, - | B AS b = 'b' - |) AS T""".stripMargin - streamUtil.verifyExplain(sql) - } - - @Test - def testAllRowsPerMatch(): Unit = { - thrown.expectMessage("All rows per match mode is not supported yet.") - thrown.expect(classOf[TableException]) - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | ORDER BY proctime - | MEASURES - | A.symbol AS aSymbol - | ALL ROWS PER MATCH - | PATTERN (A B) - | DEFINE - | A AS symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } - - @Test - def testGreedyQuantifierAtTheEndIsNotSupported(): Unit = { - thrown.expectMessage( - "Greedy quantifiers are not allowed as the last element of a " + - "Pattern yet. 
Finish your pattern with either a simple variable or reluctant quantifier.") - thrown.expect(classOf[TableException]) - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | ORDER BY proctime - | MEASURES - | A.symbol AS aSymbol - | PATTERN (A B+) - | DEFINE - | A AS symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } - - @Test - def testPatternsProducingEmptyMatchesAreNotSupported(): Unit = { - thrown.expectMessage( - "Patterns that can produce empty matches are not supported. " + - "There must be at least one non-optional state.") - thrown.expect(classOf[TableException]) - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | ORDER BY proctime - | MEASURES - | A.symbol AS aSymbol - | PATTERN (A*) - | DEFINE - | A AS symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } - - @Test - def testDistinctAggregationsNotSupported(): Unit = { - thrown.expect(classOf[ValidationException]) - - val sqlQuery = - s""" - |SELECT * - |FROM Ticker - |MATCH_RECOGNIZE ( - | ORDER BY proctime - | MEASURES - | COUNT(DISTINCT A.price) AS price - | PATTERN (A B) - | DEFINE - | A AS A.symbol = 'a' - |) AS T - |""".stripMargin - - streamUtil.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row] - } -} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala index e62f9fb6b5be45..ac4db52febc77f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala @@ -30,7 +30,7 @@ import org.apache.flink.table.data.RowData import org.apache.flink.table.expressions.Expression import org.apache.flink.table.planner.calcite.FlinkPlannerImpl import org.apache.flink.table.planner.delegation.PlannerBase -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalDataStreamScan, StreamPhysicalMatch} import org.apache.flink.table.planner.plan.utils.MatchUtil import org.apache.flink.table.planner.utils.TableTestUtil @@ -105,9 +105,9 @@ abstract class PatternTranslatorTestBase extends TestLogger { } val dataMatch = optimized.asInstanceOf[StreamPhysicalMatch] - val p = StreamExecMatch + val p = CommonExecMatch .translatePattern( - MatchUtil.createMatchSpec(dataMatch.logicalMatch), + MatchUtil.createMatchSpec(dataMatch.getLogicalMatch), new Configuration, Thread.currentThread().getContextClassLoader, context._1,