Skip to content

Commit

Permalink
Start [CALCITE-1935] Implement MATCH_RECOGNIZE
Browse files Browse the repository at this point in the history
  • Loading branch information
julianhyde committed Jul 22, 2018
1 parent 8a32357 commit 4dfaf1b
Show file tree
Hide file tree
Showing 3 changed files with 503 additions and 0 deletions.
146 changes: 146 additions & 0 deletions core/src/main/java/org/apache/calcite/runtime/Enumerables.java
Expand Up @@ -17,9 +17,19 @@
package org.apache.calcite.runtime; package org.apache.calcite.runtime;


import org.apache.calcite.interpreter.Row; import org.apache.calcite.interpreter.Row;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.function.Function1; import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.util.CircularArrayList;


import com.google.common.collect.ImmutableList;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier; import java.util.function.Supplier;


/** /**
Expand Down Expand Up @@ -60,6 +70,142 @@ public static com.google.common.base.Supplier<Enumerable<Row>> toRow(
return () -> toRow(supplier.get()); return () -> toRow(supplier.get());
} }


/**
 * Runs an automaton over the rows of an enumerable and returns the rows
 * that an emitter produces each time the automaton accepts.
 *
 * <p>Every input row is offered to each match that is still in progress and
 * is also tried as the first row of a new match, so several matches may be
 * in flight at once.
 *
 * <p>This is a work in progress: when a match that spans more than one row
 * is accepted, the emitter currently receives empty placeholder lists of
 * rows and states (see the TODO markers in the body).
 *
 * @param enumerable Input rows
 * @param stateNames Presumably the names of the automaton's states; not
 *     referenced by the current implementation
 * @param automaton Transition table that drives matching
 * @param emitter Converts each accepted match into output rows
 * @param <E> input element type
 * @param <TResult> output element type
 * @return an enumerable over the rows produced by {@code emitter}
 */
public static <E, TResult> Enumerable<TResult> match(
    Enumerable<E> enumerable,
    List<String> stateNames,
    Automaton<E> automaton,
    Emitter<E, TResult> emitter) {
  return new AbstractEnumerable<TResult>() {
    @Override public Enumerator<TResult> enumerator() {
      return new Enumerator<TResult>() {
        final Enumerator<E> inputEnumerator = enumerable.enumerator();

        // When we implement partitions there will be one of these per
        // partition.
        final PartitionState partitionState = new PartitionState();

        /** Zero-based ordinal of the most recently read input row. */
        int inputRow = -1;

        // Every input row read so far.
        // TODO: nothing is ever removed from this list; once matched rows
        // are taken from it, trim it to the earliest firstRow among the
        // incomplete matches.
        final CircularArrayList<E> recentRows = new CircularArrayList<>();

        /** Rows that have been emitted but not yet returned to the caller. */
        final Deque<TResult> emitRows = new ArrayDeque<>();

        /** Current result row. Null if no row is ready. */
        TResult resultRow;

        @Override public TResult current() {
          // Throws NullPointerException if no row is ready.
          return Objects.requireNonNull(resultRow);
        }

        @Override public boolean moveNext() {
          for (;;) {
            // pollFirst returns null when the buffer is empty.
            resultRow = emitRows.pollFirst();
            if (resultRow != null) {
              return true;
            }
            // No rows are currently ready to emit. Read the next input row,
            // see whether it completes a match (or matches), and if so, add
            // the resulting rows to the buffer.
            if (!inputEnumerator.moveNext()) {
              return false;
            }
            ++inputRow;
            final E e = inputEnumerator.current();
            recentRows.add(e);
            // Advance every match in progress by the current row.
            for (int i = 0; i < partitionState.incompleteMatches.size(); i++) {
              MatchState match = partitionState.incompleteMatches.get(i);
              final int state = automaton.nextState(match.state, e);
              switch (state) {
              case Automaton.ACCEPT:
                final List<E> matchedRows =
                    recentRows.subList(0, 0); // TODO:
                final List<Integer> rowStates = ImmutableList.of(); // TODO:
                emitRows.addAll(
                    emitter.emit(matchedRows, rowStates,
                        partitionState.matchCount++));
                // fall through
              case Automaton.FAIL:
                // The match is finished either way. Step the index back so
                // that the element shifted into slot i is not skipped.
                partitionState.incompleteMatches.remove(i--);
                break;
              default:
                match.state = state;
              }
            }
            // Try to start a match based on the current row
            final int state = automaton.nextState(Automaton.START_STATE, e);
            switch (state) {
            case Automaton.ACCEPT:
              final List<E> matchedRows = ImmutableList.of(e);
              final List<Integer> rowStates = ImmutableList.of(state);
              emitRows.addAll(
                  emitter.emit(matchedRows, rowStates,
                      partitionState.matchCount++));
              // fall through
            case Automaton.FAIL:
              // since it immediately succeeded or failed, don't add
              // it to the queue
              break;
            default:
              partitionState.incompleteMatches.add(
                  new MatchState(inputRow, state));
            }
          }
        }

        @Override public void reset() {
          throw new UnsupportedOperationException();
        }

        @Override public void close() {
          inputEnumerator.close();
        }
      };
    }
  };
}

/** Transition table of a finite-state machine. Given a state and an
* element, tells the next state.
*
* @param <E> element type */
public interface Automaton<E> {
int START_STATE = 0;
int ACCEPT = -1;
int FAIL = -2;

int nextState(int state, E e);
}

/** Given a match (a list of rows, and their states) produces a list
 * of rows to be output.
 *
 * <p>The interface has a single abstract method, so it can be implemented
 * with a lambda.
 *
 * @param <E> element type
 * @param <TResult> result type */
@FunctionalInterface
public interface Emitter<E, TResult> {
  /** Converts one match into output rows.
   *
   * @param rows Rows that make up the match
   * @param rowStates State associated with each row of the match
   * @param match Ordinal of the match
   * @return the rows to output for this match
   */
  List<TResult> emit(List<E> rows, List<Integer> rowStates, int match);
}

/** Mutable bookkeeping for one partition of the input.
 *
 * <p>(Only a single partition exists at present.) */
private static class PartitionState {
  /** Number of matches accepted so far; its value before each increment is
   * passed to the emitter as the ordinal of the match. */
  int matchCount = 0;

  /** Matches that have started but have been neither accepted nor
   * failed. */
  final List<MatchState> incompleteMatches = new ArrayList<>();
}

/** Progress of a match that has started but has not yet been accepted or
 * failed. */
private static class MatchState {
  /** State that the automaton has reached for this match. */
  int state;

  /** Ordinal of the input row at which this match started. */
  final int firstRow;

  private MatchState(int startRow, int initialState) {
    firstRow = startRow;
    state = initialState;
  }
}
} }


// End Enumerables.java // End Enumerables.java

0 comments on commit 4dfaf1b

Please sign in to comment.