Skip to content

Commit

Permalink
DRILL-256 revised patch
Browse files Browse the repository at this point in the history
  • Loading branch information
mehant authored and StevenMPhillips committed Oct 31, 2013
1 parent 964f014 commit 6c78890
Show file tree
Hide file tree
Showing 22 changed files with 993 additions and 27 deletions.
3 changes: 3 additions & 0 deletions distribution/src/resources/drill-override.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,7 @@ drill.exec: {
global.max.width: 100,
executor.threads: 4
}
trace: {
directory: "/var/log/drill"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ public interface ExecConstants {
public static final String CLIENT_RPC_THREADS = "drill.exec.rpc.user.client.threads";
public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads";
public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory";
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.compile.ClassTransformer;
import org.apache.drill.exec.compile.QueryClassLoader;
import org.apache.drill.exec.exception.ClassTransformationException;
Expand Down Expand Up @@ -141,4 +142,8 @@ public FunctionImplementationRegistry getFunctionRegistry(){
public QueryClassLoader getClassLoader(){
return loader;
}

public DrillConfig getConfig() {
return context.getConfig();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public T visitProject(Project project, X value) throws E{
return visitOp(project, value);
}

@Override
public T visitTrace(Trace trace, X value) throws E{
return visitOp(trace, value);
}

@Override
public T visitSort(Sort sort, X value) throws E{
return visitOp(sort, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
public RETURN visitTrace(Trace trace, EXTRA value) throws EXCEP;
public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP;
public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.physical.config;

import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("trace")
public class Trace extends AbstractSingle {

static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Trace.class);

/* Tag associated with each trace operator
* Printed along with trace output to distinguish
* between multiple trace operators within same plan
*/
public final String traceTag;

public Trace(@JsonProperty("child") PhysicalOperator child, @JsonProperty("tag") String traceTag) {
super(child);
this.traceTag = traceTag;
}

@Override
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
return physicalVisitor.visitTrace(this, value);
}

@Override
public OperatorCost getCost() {

/* Compute the total size (row count * row size) */
Size size = child.getSize();
long diskSize = size.getRecordCount() * size.getRecordSize();

return new OperatorCost(0, diskSize, 0, child.getSize().getRecordCount());
}

@Override
public Size getSize() {
return child.getSize();
}

@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
return new Trace(child, traceTag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
import org.apache.drill.exec.physical.impl.trace.TraceBatchCreator;
import org.apache.drill.exec.physical.impl.union.UnionBatchCreator;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.json.JSONScanBatchCreator;
Expand Down Expand Up @@ -73,9 +74,10 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private AggBatchCreator abc = new AggBatchCreator();
private MergeJoinCreator mjc = new MergeJoinCreator();
private RootExec root = null;

private TraceBatchCreator tbc = new TraceBatchCreator();

private ImplCreator(){}

public RootExec getRoot(){
return root;
}
Expand All @@ -85,6 +87,10 @@ public RecordBatch visitProject(Project op, FragmentContext context) throws Exec
return pbc.getBatch(context, op, getChildren(op, context));
}

@Override
public RecordBatch visitTrace(Trace op, FragmentContext context) throws ExecutionSetupException {
return tbc.getBatch(context, op, getChildren(op, context));
}
@Override
public RecordBatch visitSubScan(SubScan subScan, FragmentContext context) throws ExecutionSetupException {
Preconditions.checkNotNull(subScan);
Expand Down Expand Up @@ -153,7 +159,7 @@ public RecordBatch visitFilter(Filter filter, FragmentContext context) throws Ex
return fbc.getBatch(context, filter, getChildren(filter, context));
}


@Override
public RecordBatch visitStreamingAggregate(StreamingAggregate config, FragmentContext context)
throws ExecutionSetupException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.physical.impl;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Trace;

import java.util.List;

import com.google.common.collect.Lists;


public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, FragmentContext, ExecutionSetupException> {

static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceInjector.class);

static int traceTagCount = 0;


RootExec root = null;
private ScreenCreator sc = new ScreenCreator();

public static PhysicalOperator getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
TraceInjector tI = new TraceInjector();
PhysicalOperator newOp = root.accept(tI, context);

return newOp;
}

/**
* Traverse the physical plan and inject the trace operator after
* every operator.
* @param op Physical operator under which the trace operator will be injected
* @param context Fragment context
* @return same physical operator as passed in, but its child will be a trace operator
* whose child will be the original child of this operator
* @throws ExecutionSetupException
*/
@Override
public PhysicalOperator visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException{

List<PhysicalOperator> newChildren = Lists.newArrayList();
List<PhysicalOperator> list = null;
PhysicalOperator newOp = op;

/* Get the list of child operators */
for (PhysicalOperator child : op)
{
newChildren.add(child.accept(this, context));
}

list = Lists.newArrayList();

/* For every child operator create a trace operator as its parent */
for (int i = 0; i < newChildren.size(); i++)
{
String traceTag = newChildren.toString() + Integer.toString(traceTagCount++);

/* Trace operator */
Trace traceOp = new Trace(newChildren.get(i), traceTag);
list.add(traceOp);
}

/* Inject trace operator */
if (list.size() > 0)
newOp = op.getNewWithChildren(list);

return newOp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.physical.impl.trace;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Trace;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;

import com.google.common.base.Preconditions;

import java.util.List;

public class TraceBatchCreator implements BatchCreator<Trace> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);

@Override
public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException {
//Preconditions.checkArgument(children.size() == 1);
return new TraceRecordBatch(config, children.iterator().next(), context);
}


}
Loading

0 comments on commit 6c78890

Please sign in to comment.