Skip to content

Commit

Permalink
[FLINK-24865][CEP] Support MATCH_RECOGNIZE in Batch mode
Browse files Browse the repository at this point in the history
This closes apache#18408
  • Loading branch information
SteNicholas authored and liujia10 committed Jul 22, 2022
1 parent c0e47cb commit a8a0c6c
Show file tree
Hide file tree
Showing 18 changed files with 2,217 additions and 890 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.batch;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
import org.apache.flink.table.types.logical.RowType;

import java.util.Collections;

/**
 * Batch {@link ExecNode} for the MATCH_RECOGNIZE clause.
 *
 * <p>Builds on {@link CommonExecMatch}, handing it a freshly generated node id, context and
 * persisted configuration for this class.
 */
public class BatchExecMatch extends CommonExecMatch
        implements BatchExecNode<RowData>, MultipleTransformationTranslator<RowData> {

    /**
     * Creates the batch exec node.
     *
     * @param tableConfig configuration from which the persisted node configuration is derived
     * @param matchSpec specification of the MATCH_RECOGNIZE clause
     * @param inputProperty property of the single input of this node
     * @param outputType row type produced by this node
     * @param description description of this node
     */
    public BatchExecMatch(
            ReadableConfig tableConfig,
            MatchSpec matchSpec,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        super(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(BatchExecMatch.class),
                ExecNodeContext.newPersistedConfig(BatchExecMatch.class, tableConfig),
                matchSpec,
                // This node has exactly one input.
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Reports processing-time semantics unconditionally.
     *
     * @param inputRowType row type of the input; not inspected by this implementation
     * @return always {@code true}
     */
    @Override
    public boolean isProcTime(RowType inputRowType) {
        return true;
    }
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.physical.batch;

import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.logical.MatchRecognize;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecMatch;
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalMatch;
import org.apache.flink.table.planner.plan.utils.MatchUtil;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;

import java.util.List;

import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;

/**
 * Batch physical RelNode for the MATCH_RECOGNIZE clause.
 *
 * <p>Translates itself into a {@link BatchExecMatch} exec node.
 */
public class BatchPhysicalMatch extends CommonPhysicalMatch implements BatchPhysicalRel {

    /**
     * Creates the batch physical node.
     *
     * @param cluster cluster this node belongs to
     * @param traitSet traits of this node
     * @param inputNode the single input of this node
     * @param logicalMatch logical description of the MATCH_RECOGNIZE clause
     * @param outputRowType row type produced by this node
     */
    public BatchPhysicalMatch(
            RelOptCluster cluster,
            RelTraitSet traitSet,
            RelNode inputNode,
            MatchRecognize logicalMatch,
            RelDataType outputRowType) {
        super(cluster, traitSet, inputNode, logicalMatch, outputRowType);
    }

    /**
     * Creates a copy of this node with the given traits, using the first element of {@code
     * inputs} as the new input. The logical match and the output row type are carried over.
     */
    @Override
    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
        final RelNode newInput = inputs.get(0);
        return new BatchPhysicalMatch(
                getCluster(), traitSet, newInput, getLogicalMatch(), deriveRowType());
    }

    /**
     * Builds the {@link BatchExecMatch} for this node from the table config, the match spec
     * derived from the logical match, the default input property, the logical row type and the
     * detailed description of this node.
     */
    @Override
    public ExecNode<?> translateToExecNode() {
        return new BatchExecMatch(
                unwrapTableConfig(this),
                MatchUtil.createMatchSpec(getLogicalMatch()),
                InputProperty.DEFAULT,
                FlinkTypeFactory.toLogicalRowType(getRowType()),
                getRelDetailedDescription());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.physical.common;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.plan.logical.MatchRecognize;
import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
import org.apache.flink.table.planner.plan.utils.PythonUtil;
import org.apache.flink.table.planner.plan.utils.RelExplainUtil;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.type.RelDataType;

import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Base physical RelNode for the MATCH_RECOGNIZE clause.
 *
 * <p>Holds the logical match description and the output row type, rejects Python calls in
 * measures and pattern definitions at construction time, and renders the explain terms.
 */
public abstract class CommonPhysicalMatch extends SingleRel implements FlinkPhysicalRel {

    private final MatchRecognize logicalMatch;
    private final RelDataType outputRowType;

    /**
     * Creates the physical match node.
     *
     * @param cluster cluster this node belongs to
     * @param traitSet traits of this node
     * @param inputNode the single input of this node
     * @param logicalMatch logical description of the MATCH_RECOGNIZE clause
     * @param outputRowType row type produced by this node
     * @throws TableException if a measure or a pattern definition contains a Python call
     */
    public CommonPhysicalMatch(
            RelOptCluster cluster,
            RelTraitSet traitSet,
            RelNode inputNode,
            MatchRecognize logicalMatch,
            RelDataType outputRowType) {
        super(cluster, traitSet, inputNode);
        if (usesPythonFunction(logicalMatch)) {
            throw new TableException("Python Function can not be used in MATCH_RECOGNIZE for now.");
        }
        this.logicalMatch = logicalMatch;
        this.outputRowType = outputRowType;
    }

    /**
     * Checks the measures first and then the pattern definitions for a Python call; the
     * definitions are only inspected when no measure contains one.
     */
    private static boolean usesPythonFunction(MatchRecognize match) {
        return match.measures().values().stream()
                        .anyMatch(measure -> PythonUtil.containsPythonCall(measure, null))
                || match.patternDefinitions().values().stream()
                        .anyMatch(definition -> PythonUtil.containsPythonCall(definition, null));
    }

    /** Returns the output row type that was passed to the constructor. */
    @Override
    protected RelDataType deriveRowType() {
        return outputRowType;
    }

    /**
     * Appends the MATCH_RECOGNIZE terms to the explain output.
     *
     * <p>{@code partitionBy}, {@code orderBy}, {@code measures} and {@code subset} are written
     * only when the corresponding part of the clause is non-empty; {@code rowsPerMatch}, {@code
     * after}, {@code pattern} and {@code define} are always written.
     */
    @Override
    public RelWriter explainTerms(RelWriter pw) {
        final RelDataType inputRowType = getInput().getRowType();
        final Seq<String> fieldNames =
                JavaConverters.asScalaBufferConverter(inputRowType.getFieldNames()).asScala();

        // Terms are appended one statement at a time, in the order they appear in the output.
        RelWriter writer = super.explainTerms(pw);
        writer =
                writer.itemIf(
                        "partitionBy",
                        RelExplainUtil.fieldToString(
                                logicalMatch.partitionKeys().toArray(), inputRowType),
                        !logicalMatch.partitionKeys().isEmpty());
        writer =
                writer.itemIf(
                        "orderBy",
                        RelExplainUtil.collationToString(logicalMatch.orderKeys(), inputRowType),
                        !logicalMatch.orderKeys().getFieldCollations().isEmpty());
        writer =
                writer.itemIf(
                        "measures",
                        RelExplainUtil.measuresDefineToString(
                                logicalMatch.measures(),
                                fieldNames.toList(),
                                this::getExpressionString,
                                convertToExpressionDetail(pw.getDetailLevel())),
                        !logicalMatch.measures().isEmpty());
        writer =
                writer.item(
                        "rowsPerMatch", RelExplainUtil.rowsPerMatchToString(logicalMatch.allRows()));
        writer =
                writer.item(
                        "after", RelExplainUtil.afterMatchToString(logicalMatch.after(), fieldNames));
        writer = writer.item("pattern", logicalMatch.pattern().toString());
        writer =
                writer.itemIf(
                        "subset",
                        RelExplainUtil.subsetToString(logicalMatch.subsets()),
                        !logicalMatch.subsets().isEmpty());
        return writer.item("define", logicalMatch.patternDefinitions());
    }

    /** Returns the logical description of the MATCH_RECOGNIZE clause held by this node. */
    public MatchRecognize getLogicalMatch() {
        return logicalMatch;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.physical.batch;

import org.apache.flink.table.planner.plan.logical.MatchRecognize;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalMatch;
import org.apache.flink.table.planner.plan.rules.physical.common.CommonPhysicalMatchRule;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;

/**
 * Physical rule that converts a {@link FlinkLogicalMatch} into a {@link BatchPhysicalMatch}.
 *
 * <p>Use the shared {@link #INSTANCE}; the constructor is private.
 */
public class BatchPhysicalMatchRule extends CommonPhysicalMatchRule {

    /** The only instance of this rule. */
    public static final RelOptRule INSTANCE = new BatchPhysicalMatchRule();

    /** Registers the rule for the logical-to-batch-physical convention transition. */
    private BatchPhysicalMatchRule() {
        super(
                FlinkLogicalMatch.class,
                FlinkConventions.LOGICAL(),
                FlinkConventions.BATCH_PHYSICAL(),
                "BatchPhysicalMatchRule");
    }

    /** Delegates to the common conversion, targeting the batch physical convention. */
    @Override
    public RelNode convert(RelNode rel) {
        return super.convert(rel, FlinkConventions.BATCH_PHYSICAL());
    }

    /**
     * Instantiates the batch flavour of the physical match node.
     *
     * @param cluster cluster the new node belongs to
     * @param traitSet traits of the new node
     * @param convertInput input of the new node
     * @param matchRecognize logical description of the MATCH_RECOGNIZE clause
     * @param rowType row type produced by the new node
     * @return a new {@link BatchPhysicalMatch} built from the given arguments
     */
    @Override
    protected RelNode convertToPhysicalMatch(
            RelOptCluster cluster,
            RelTraitSet traitSet,
            RelNode convertInput,
            MatchRecognize matchRecognize,
            RelDataType rowType) {
        return new BatchPhysicalMatch(cluster, traitSet, convertInput, matchRecognize, rowType);
    }
}

0 comments on commit a8a0c6c

Please sign in to comment.