Skip to content

Commit

Permalink
Adding limit operator and sql rules
Browse files Browse the repository at this point in the history
  • Loading branch information
tnachen committed Oct 14, 2013
1 parent 251022f commit b7d41eb
Show file tree
Hide file tree
Showing 19 changed files with 450 additions and 62 deletions.
Expand Up @@ -28,21 +28,21 @@
@JsonTypeName("limit")
public class Limit extends SingleInputOperator{

private final int first;
private final int last;
private final Integer first;
private final Integer last;

@JsonCreator
public Limit(@JsonProperty("first") int first, @JsonProperty("last") int last) {
/**
 * Creates a logical limit over a window of rows.
 *
 * @param first start bound of the window; may be null when unspecified
 *              (the physical operator substitutes a default — see
 *              LimitRecordBatch's firstNonNull handling)
 * @param last  end bound of the window; may be null when unspecified
 */
@JsonCreator
public Limit(@JsonProperty("first") Integer first, @JsonProperty("last") Integer last) {
super();
this.first = first;
this.last = last;
}

public int getFirst() {
/** Returns the start bound of the limit window; may be null when unspecified. */
public Integer getFirst() {
return first;
}

public int getLast() {
/** Returns the end bound of the limit window; may be null when unspecified. */
public Integer getLast() {
return last;
}

Expand Down
Expand Up @@ -32,19 +32,9 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.common.logical.data.CollapsingAggregate;
import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.Join;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.common.logical.data.*;
import org.apache.drill.common.logical.data.Order.Direction;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.logical.data.Project;
import org.apache.drill.common.logical.data.Scan;
import org.apache.drill.common.logical.data.Segment;
import org.apache.drill.common.logical.data.SinkOperator;
import org.apache.drill.common.logical.data.Store;
import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
Expand All @@ -57,6 +47,7 @@
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.store.StorageEngine;

Expand Down Expand Up @@ -139,7 +130,11 @@ public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerEx
return new SelectionVectorRemover(new Sort(input, ods, false));
}


@Override
public PhysicalOperator visitLimit(org.apache.drill.common.logical.data.Limit limit, Object value) throws OptimizerException {
  // Convert the child subtree first, then wrap it. The physical Limit emits
  // a selection vector, so a SelectionVectorRemover is layered on top to
  // materialize only the surviving rows for downstream operators.
  final PhysicalOperator child = limit.getInput().accept(this, value);
  final Limit physicalLimit = new Limit(child, limit.getFirst(), limit.getLast());
  return new SelectionVectorRemover(physicalLimit);
}

@Override
public PhysicalOperator visitCollapsingAggregate(CollapsingAggregate agg, Object value)
Expand Down
Expand Up @@ -17,19 +17,7 @@
*/
package org.apache.drill.exec.physical.base;

import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.config.HashToRandomExchange;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.physical.config.RangeSender;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.physical.config.*;

public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
Expand Down Expand Up @@ -58,7 +46,11 @@ public T visitProject(Project project, X value) throws E{
/**
 * Default handling for Sort: routes through the generic {@code visitOp}
 * hook; subclasses override to treat Sort specially.
 */
public T visitSort(Sort sort, X value) throws E{
return visitOp(sort, value);
}


/**
 * Default handling for Limit: routes through the generic {@code visitOp}
 * hook; subclasses override to treat Limit specially.
 */
@Override
public T visitLimit(Limit limit, X value) throws E {
return visitOp(limit, value);
}

@Override
public T visitStreamingAggregate(StreamingAggregate agg, X value) throws E {
Expand Down
Expand Up @@ -17,19 +17,7 @@
*/
package org.apache.drill.exec.physical.base;

import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.config.HashToRandomExchange;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.physical.config.RangeSender;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.physical.config.*;

/**
* Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization.
Expand All @@ -50,6 +38,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
/** Visits a Limit physical operator node. */
public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP;
public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
public RETURN visitSender(Sender sender, EXTRA value) throws EXCEP;
public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP;
Expand Down
@@ -0,0 +1,63 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.drill.exec.physical.config;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;

@JsonTypeName("limit")
public class Limit extends AbstractSingle {
private final Integer first;
private final Integer last;

@JsonCreator
public Limit(@JsonProperty("child") PhysicalOperator child, @JsonProperty("first") Integer first, @JsonProperty("last") Integer last) {
super(child);
this.first = first;
this.last = last;
}

public Integer getFirst() {
return first;
}

public Integer getLast() {
return last;
}

@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
return new Limit(child, first, last);
}

@Override
public OperatorCost getCost() {
return new OperatorCost(0, 0, 0, 0.25f);
}

@Override
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
return physicalVisitor.visitLimit(this, value);
}
}
Expand Up @@ -26,21 +26,12 @@
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.config.*;
import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator;
import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
Expand Down Expand Up @@ -72,6 +63,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private SingleSenderCreator ssc = new SingleSenderCreator();
private ProjectBatchCreator pbc = new ProjectBatchCreator();
private FilterBatchCreator fbc = new FilterBatchCreator();
private LimitBatchCreator lbc = new LimitBatchCreator();
private UnionBatchCreator unionbc = new UnionBatchCreator();
private SVRemoverCreator svc = new SVRemoverCreator();
private SortBatchCreator sbc = new SortBatchCreator();
Expand Down Expand Up @@ -121,6 +113,11 @@ public RecordBatch visitSort(Sort sort, FragmentContext context) throws Executio
return sbc.getBatch(context, sort, getChildren(sort, context));
}

/**
 * Builds the executable record batch for a Limit operator by delegating to
 * the LimitBatchCreator, passing the already-created child batches.
 */
@Override
public RecordBatch visitLimit(Limit limit, FragmentContext context) throws ExecutionSetupException {
return lbc.getBatch(context, limit, getChildren(limit, context));
}

@Override
public RecordBatch visitMergeJoin(MergeJoinPOP op, FragmentContext context) throws ExecutionSetupException {
return mjc.getBatch(context, op, getChildren(op, context));
Expand Down
@@ -0,0 +1,17 @@
/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ******************************************************************************/
package org.apache.drill.exec.physical.impl.limit;

import com.google.common.collect.Iterables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;

import java.util.List;

/**
 * Factory that wires a physical {@link Limit} operator to its runtime
 * implementation, {@link LimitRecordBatch}.
 */
public class LimitBatchCreator implements BatchCreator<Limit> {
  @Override
  public RecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children) throws ExecutionSetupException {
    // Limit is a single-input operator; getOnlyElement enforces exactly one child.
    return new LimitRecordBatch(config, context, Iterables.getOnlyElement(children));
  }
}
@@ -0,0 +1,107 @@
/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ******************************************************************************/
package org.apache.drill.exec.physical.impl.limit;

// Fixed: was com.beust.jcommander.internal.Lists — an accidental IDE import of
// JCommander's internal package; Guava (already used here via Objects) is the
// intended collection helper.
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.ValueVector;

import java.util.List;

/**
 * Runtime implementation of the Limit operator. Emits a two-byte selection
 * vector referencing only the rows in the window [first, last) of each
 * incoming batch; no row data is copied.
 *
 * NOTE(review): the window is applied per incoming batch, not across the
 * whole stream — confirm whether a running record count is intended.
 */
public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {

  // Selection vector handed to downstream operators; lists surviving rows.
  private SelectionVector2 outgoingSv;
  // Incoming selection vector when the upstream batch is already filtered;
  // null when the incoming batch carries no selection vector.
  private SelectionVector2 incomingSv;

  public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) {
    super(popConfig, context, incoming);
    outgoingSv = new SelectionVector2(context.getAllocator());
  }

  @Override
  protected void setupNewSchema() throws SchemaChangeException {
    container.clear();

    // Transfer the incoming value vectors into our container unchanged; the
    // limit itself is expressed purely through the outgoing selection vector.
    List<TransferPair> transfers = Lists.newArrayList();
    for (VectorWrapper<?> v : incoming) {
      TransferPair pair = v.getValueVector().getTransferPair();
      container.add(pair.getTo());
      transfers.add(pair);
    }

    BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();

    switch (svMode) {
      case NONE:
        // Reset any SV left over from a previous TWO_BYTE schema so we do not
        // read through a stale selection vector after a schema change.
        this.incomingSv = null;
        break;
      case TWO_BYTE:
        this.incomingSv = incoming.getSelectionVector2();
        break;
      default:
        // Other modes (e.g. four-byte) are not supported by this operator.
        throw new UnsupportedOperationException();
    }

    container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);

    for (TransferPair tp : transfers) {
      tp.transfer();
    }
  }

  @Override
  public SelectionVector2 getSelectionVector2() {
    return outgoingSv;
  }

  @Override
  protected void doWork() {
    int recordCount = incoming.getRecordCount();
    outgoingSv.allocateNew(recordCount);

    // Clamp the configured window to this batch: a null "first" means start
    // at row 0; a null "last" means take everything available. Computed once
    // here instead of duplicating the logic in both helpers.
    int offset = Math.max(0, Objects.firstNonNull(popConfig.getFirst(), 0));
    int fetch = Math.min(recordCount, Objects.firstNonNull(popConfig.getLast(), recordCount));

    if (incomingSv != null) {
      limitWithSV(offset, fetch);
    } else {
      limitWithNoSV(offset, fetch);
    }
  }

  // Populates the outgoing SV directly from physical row positions
  // [offset, fetch). Uses an int counter with an explicit narrowing cast
  // rather than a char loop variable.
  private void limitWithNoSV(int offset, int fetch) {
    int svIndex = 0;
    for (int i = offset; i < fetch; i++) {
      outgoingSv.setIndex(svIndex, (char) i);
      svIndex++;
    }
    outgoingSv.setRecordCount(svIndex);
  }

  // Populates the outgoing SV by re-mapping entries [offset, fetch) of the
  // incoming selection vector.
  private void limitWithSV(int offset, int fetch) {
    int svIndex = 0;
    for (int i = offset; i < fetch; i++) {
      char index = incomingSv.getIndex(i);
      outgoingSv.setIndex(svIndex, index);
      svIndex++;
    }
    outgoingSv.setRecordCount(svIndex);
  }

  @Override
  public int getRecordCount() {
    return outgoingSv.getCount();
  }

  @Override
  protected void cleanup() {
    super.cleanup();
    // Release the outgoing selection vector's buffer.
    outgoingSv.clear();
  }
}
Expand Up @@ -19,6 +19,7 @@

import org.apache.drill.exec.physical.base.*;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;

Expand Down Expand Up @@ -91,6 +92,12 @@ public Void visitStore(Store store, Wrapper wrapper) {
return super.visitStore(store, wrapper);
}

/**
 * Stats collection for a Limit operator.
 * No limit-specific stats are gathered yet; this falls through to the
 * generic handling in {@code visitOp}.
 */
@Override
public Void visitLimit(Limit limit, Wrapper value) throws RuntimeException {
// TODO: Implement limit-specific stats; for now delegate to the generic visitOp path.
return visitOp(limit, value);
}

@Override
public Void visitOp(PhysicalOperator op, Wrapper wrapper) {
if(op instanceof HasAffinity){
Expand Down

0 comments on commit b7d41eb

Please sign in to comment.