Skip to content
This repository has been archived by the owner on Mar 9, 2019. It is now read-only.

Commit

Permalink
- code generator support for merge-join's copyLeft(), copyRight(), co…
Browse files Browse the repository at this point in the history
…mpare() and compareNextLeftKey()

- add line prefixes to generated code log
- support VectorContainers in declareVectorValueSetupAndMember()
- merge master
  • Loading branch information
vrtx committed Aug 27, 2013
1 parent 10db087 commit aa8f082
Show file tree
Hide file tree
Showing 13 changed files with 339 additions and 244 deletions.
Expand Up @@ -44,7 +44,7 @@ public JaninoClassCompiler(ClassLoader parentClassLoader) {
}

public byte[] getClassByteCode(final String className, final String code) throws CompileException, IOException, ClassNotFoundException, ClassTransformationException {
logger.debug("Compiling:\n {}", code);
logger.debug("Compiling:\n {}", prefixLineNumbers(code));
StringReader reader = new StringReader(code);
Scanner scanner = new Scanner((String) null, reader);
Java.CompilationUnit compilationUnit = new Parser(scanner).parseCompilationUnit();
Expand All @@ -55,6 +55,21 @@ public byte[] getClassByteCode(final String className, final String code) throws
return classFiles[0].toByteArray();
}


private String prefixLineNumbers(String code) {
if (!debugLines) return code;
String out = new String();
int i = 1;
for (String line : code.split("\n")) {
String lineNum = "" + i++;
out += lineNum + ":";
for (int spaces = 0; spaces < 7 - lineNum.length(); ++spaces)
out += " ";
out += line + "\n";
}
return out;
}

public void setDebuggingInformation(boolean debugSource, boolean debugLines, boolean debugVars) {
this.debugSource = debugSource;
this.debugLines = debugLines;
Expand Down
Expand Up @@ -108,11 +108,16 @@ public JBlock getEvalBlock(){
// public JBlock getCleanupBlock(){
// return getBlock(getCurrentMapping().getCleanup());
// }

public JVar declareVectorValueSetupAndMember(String batchName, TypedFieldId fieldId){

public JVar declareVectorValueSetupAndMember(String batchName, TypedFieldId fieldId) {
return declareVectorValueSetupAndMember(batchName, fieldId, false);
}

public JVar declareVectorValueSetupAndMember(String batchName, TypedFieldId fieldId, boolean isVectorContainer) {
Class<?> valueVectorClass = TypeHelper.getValueVectorClass(fieldId.getType().getMinorType(), fieldId.getType().getMode());
JClass vvClass = model.ref(valueVectorClass);
JClass retClass = vvClass;
String accessorMethod = isVectorContainer ? "getVectorAccessor" : "getValueAccessorById";
String vectorAccess = "getValueVector";
if(fieldId.isHyperReader()){
retClass = retClass.array();
Expand All @@ -127,7 +132,7 @@ public JVar declareVectorValueSetupAndMember(String batchName, TypedFieldId fiel
objClass, //
getNextVar("tmp"), //
JExpr.direct(batchName)
.invoke("getValueAccessorById") //
.invoke(accessorMethod) //
.arg(JExpr.lit(fieldId.getFieldId())) //
.arg( vvClass.dotclass())
.invoke(vectorAccess)//
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.config.UnionExchange;

public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
Expand All @@ -38,7 +39,7 @@ public T visitExchange(Exchange exchange, X value) throws E{
}

@Override
public T visitUnion(UnionExchange union, X value) throws E {
public T visitUnion(Union union, X value) throws E {
return visitOp(union, value);
}

Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.config.UnionExchange;

/**
Expand All @@ -45,7 +46,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitStore(Store store, EXTRA value) throws EXCEP;

public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
public RETURN visitUnion(UnionExchange union, EXTRA value) throws EXCEP;
public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
Expand Down
Expand Up @@ -28,12 +28,14 @@
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator;
import org.apache.drill.exec.physical.impl.join.MergeJoinCreator;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreator;
Expand Down
@@ -1,9 +1,10 @@
package org.apache.drill.exec.physical.impl.join;

import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorContainer;

public interface JoinEvaluator {
public abstract void setup(RecordBatch left, RecordBatch right, RecordBatch outgoing);
public abstract boolean copy(int leftPosition, int rightPosition, int outputPosition);
public abstract int compare(int leftPosition, int rightPosition);
public abstract void doSetup(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;

}
@@ -0,0 +1,78 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/

package org.apache.drill.exec.physical.impl.join;

import org.apache.drill.exec.compile.sig.CodeGeneratorSignature;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorContainer;

import javax.inject.Named;

import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;


public interface JoinInnerSignature extends CodeGeneratorSignature {

public static final MappingSet SETUP_MAPPING =
new MappingSet("null", "null",
GM("doSetup", "doSetup", null, null),
GM("doSetup", "doSetup", null, null));
public static final MappingSet COPY_LEFT_MAPPING =
new MappingSet("leftIndex", "outIndex",
GM("doSetup", "doCopyLeft", null, null),
GM("doSetup", "doCopyLeft", null, null));
public static final MappingSet COPY_RIGHT_MAPPING =
new MappingSet("rightIndex", "outIndex",
GM("doSetup", "doCopyRight", null, null),
GM("doSetup", "doCopyRight", null, null));
public static final MappingSet COMPARE_MAPPING =
new MappingSet("leftIndex", "rightIndex",
GM("doSetup", "doCompare", null, null),
GM("doSetup", "doCompare", null, null));
public static final MappingSet COMPARE_RIGHT_MAPPING =
new MappingSet("rightIndex", "null",
GM("doSetup", "doCompare", null, null),
GM("doSetup", "doCompare", null, null));
public static final MappingSet COMPARE_LEFT_MAPPING =
new MappingSet("leftIndex", "null",
GM("doSetup", "doCompareNextLeftKey", null, null),
GM("doSetup", "doCompareNextLeftKey", null, null));
public static final MappingSet COMPARE_NEXT_LEFT_MAPPING =
new MappingSet("nextLeftIndex", "null",
GM("doSetup", "doCompareNextLeftKey", null, null),
GM("doSetup", "doCompareNextLeftKey", null, null));

public void doSetup(@Named("context") FragmentContext context,
@Named("status") JoinStatus status,
@Named("outgoing") VectorContainer outgoing) throws SchemaChangeException;

public int doCompare(@Named("leftIndex") int leftIndex,
@Named("rightIndex") int rightIndex);

public int doCompareNextLeftKey(@Named("leftIndex") int leftIndex);

public boolean doCopyLeft(@Named("leftIndex") int leftIndex,
@Named("outIndex") int outIndex);

public boolean doCopyRight(@Named("rightIndex") int rightIndex,
@Named("outIndex") int outIndex);

}
Expand Up @@ -15,13 +15,13 @@ public static enum RightSourceMode {
INCOMING_BATCHES, QUEUED_BATCHES;
}

public final RecordBatch left;
private int leftPosition;
private final RecordBatch left;
private IterOutcome lastLeft;

public final RecordBatch right;
private int rightPosition;
private int svRightPosition;
private final RecordBatch right;
private IterOutcome lastRight;

private int outputPosition;
Expand Down Expand Up @@ -84,14 +84,6 @@ public final void resetOutputPos() {
outputPosition = 0;
}

public final RecordBatch getLeftBatch() {
return left;
}

public final RecordBatch getRightBatch() {
return right;
}

public final void notifyLeftRepeating() {
leftRepeating = true;
// outputBatch.resetBatchBuilder();
Expand Down

0 comments on commit aa8f082

Please sign in to comment.