Skip to content

Commit

Permalink
runtime-core: Add comments and optimize FSSOptimiser
Browse files Browse the repository at this point in the history
  • Loading branch information
minborg committed Jun 1, 2017
1 parent edd5f52 commit 8cba8b6
Showing 1 changed file with 23 additions and 4 deletions.
Expand Up @@ -74,13 +74,13 @@ public final class FilterSortedSkipOptimizer<ENTITY> implements SqlStreamOptimiz
private final SkipOperation SKIP_OPERATION = new SkipOperation();
private final LimitOperation LIMIT_OPERATION = new LimitOperation();

private final List<Operation<ENTITY>> FILTER_SORTED_SKIP_PATH = Arrays.asList(
private final List<Operation<ENTITY>> FILTER_SORTED_SKIP_LIMIT_PATH = Arrays.asList(
FILTER_OPERATION,
SORTED_OPERATION,
SKIP_OPERATION,
LIMIT_OPERATION
);
private final List<Operation<ENTITY>> SORTED_FILTER_SKIP_PATH = Arrays.asList(
private final List<Operation<ENTITY>> SORTED_FILTER_SKIP_LIMIT_PATH = Arrays.asList(
SORTED_OPERATION,
FILTER_OPERATION,
SKIP_OPERATION,
Expand Down Expand Up @@ -244,22 +244,37 @@ private void traverse(Pipeline pipeline,
final Consumers<ENTITY> consumers = new Consumers<>(filterConsumer, sortedConsumer, skipConsumer, limitConsumer);

final Action<?, ?> firstAction = pipeline.getFirst();

// The path is the way we can walk the stream pipeline
// and still satisfy the requirement on this optimizer
// There are two paths:
// Sorted*,Filter*,Skip*,Limit*
// Filter*,Sorted*,Skip*,Limit*
// If there are other operations types in between, the optimizer will not kick in
final List<Operation<ENTITY>> path;
if (firstAction instanceof SortedComparatorAction) {
path = SORTED_FILTER_SKIP_PATH;
path = SORTED_FILTER_SKIP_LIMIT_PATH;
} else {
path = FILTER_SORTED_SKIP_PATH;
path = FILTER_SORTED_SKIP_LIMIT_PATH;
}

// Keeps track on where we are in the path
// Start with the first operation type (i.e. either SORTED or FILTER)
Operation<ENTITY> operation = path.get(0);

for (Action<?, ?> action : pipeline) {

// Are we on the first operation type in the path
if (operation == path.get(0)) {
// Check if the current stream action is of the first operational type (e.g. SORTED)
if (operation.is(action)) {
// If so, consume the stream action (e.g. increase a counter or put it in a list)
operation.consume(action, consumers);
continue;
} else {
// Check if the current stream action is of the second operational type (e.g. FILTER)
if (path.get(1).is(action)) {
// Move the operation state to the second operational type
operation = path.get(1);
} else {
if (path.get(2).is(action)) {
Expand All @@ -275,9 +290,11 @@ private void traverse(Pipeline pipeline,
}
}

// The same principle as above but starting at the second operation type in the path
if (operation == path.get(1)) {
if (operation.is(action)) {
operation.consume(action, consumers);
continue;
} else {
if (path.get(2).is(action)) {
operation = path.get(2);
Expand All @@ -294,6 +311,7 @@ private void traverse(Pipeline pipeline,
if (operation == path.get(2)) {
if (operation.is(action)) {
operation.consume(action, consumers);
continue;
} else {
if (path.get(3).is(action)) {
operation = path.get(3);
Expand All @@ -306,6 +324,7 @@ private void traverse(Pipeline pipeline,
if (operation == path.get(3)) {
if (operation.is(action)) {
operation.consume(action, consumers);
continue;
} else {
return;
}
Expand Down

0 comments on commit 8cba8b6

Please sign in to comment.