Skip to content

Commit

Permalink
TEIID-3119 updating dup removal to use tree logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Sep 8, 2014
1 parent 91b2030 commit 015857e
Show file tree
Hide file tree
Showing 14 changed files with 219 additions and 223 deletions.
Expand Up @@ -450,18 +450,20 @@ protected RelationalNode convertNode(PlanNode node)

case NodeConstants.Types.SORT:
case NodeConstants.Types.DUP_REMOVE:
SortNode sortNode = new SortNode(getID());
OrderBy orderBy = (OrderBy) node.getProperty(NodeConstants.Info.SORT_ORDER);
if (orderBy != null) {
sortNode.setSortElements(orderBy.getOrderByItems());
}
if (node.getType() == NodeConstants.Types.DUP_REMOVE) {
sortNode.setMode(Mode.DUP_REMOVE);
} else if (node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL)) {
sortNode.setMode(Mode.DUP_REMOVE_SORT);
processNode = new DupRemoveNode(getID());
} else {
SortNode sortNode = new SortNode(getID());
OrderBy orderBy = (OrderBy) node.getProperty(NodeConstants.Info.SORT_ORDER);
if (orderBy != null) {
sortNode.setSortElements(orderBy.getOrderByItems());
}
if (node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL)) {
sortNode.setMode(Mode.DUP_REMOVE_SORT);
}

processNode = sortNode;
}

processNode = sortNode;
break;
case NodeConstants.Types.GROUP:
GroupingNode gnode = new GroupingNode(getID());
Expand All @@ -470,7 +472,7 @@ protected RelationalNode convertNode(PlanNode node)
gnode.setOutputMapping(groupingMap);
gnode.setRemoveDuplicates(node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL));
List<Expression> gCols = (List) node.getProperty(NodeConstants.Info.GROUP_COLS);
orderBy = (OrderBy) node.getProperty(Info.SORT_ORDER);
OrderBy orderBy = (OrderBy) node.getProperty(Info.SORT_ORDER);
if (orderBy == null) {
if (gCols != null) {
orderBy = new OrderBy(RuleChooseJoinStrategy.createExpressionSymbols(gCols));
Expand Down Expand Up @@ -571,10 +573,14 @@ protected RelationalNode convertNode(PlanNode node)
if(useAll) {
processNode = unionAllNode;
} else {
SortNode sNode = new SortNode(getID());
boolean onlyDupRemoval = node.hasBooleanProperty(NodeConstants.Info.IS_DUP_REMOVAL);
sNode.setMode(onlyDupRemoval?Mode.DUP_REMOVE:Mode.DUP_REMOVE_SORT);
processNode = sNode;
if (onlyDupRemoval) {
processNode = new DupRemoveNode(getID());
} else {
SortNode sNode = new SortNode(getID());
sNode.setMode(Mode.DUP_REMOVE_SORT);
processNode = sNode;
}

unionAllNode.setElements( (List) node.getProperty(NodeConstants.Info.OUTPUT_COLS) );
processNode.addChild(unionAllNode);
Expand Down
Expand Up @@ -106,6 +106,9 @@ public void sort() throws BlockedException,
if (originalVs.isDistinct() && sortSymbols.size() == originalVs.getTupleBuffer().getSchema().size()) {
dvs = originalVs;
} else {
//TODO: should not use the full buffer as it still contains the full source tuples
//alternatively if we're already sorted by the join node then processing distinct
//does not require a full pass
List<Boolean> sortDirection = Collections.nCopies(sortSymbols.size(), OrderBy.ASC);
this.sortUtility = new SortUtility(null, sortSymbols, sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(), dependentNode.getConnectionID(), originalVs.getTupleBuffer().getSchema());
this.sortUtility.setWorkingBuffer(originalVs.getTupleBuffer());
Expand Down
@@ -0,0 +1,98 @@
/*
* JBoss, Home of Professional Open Source.
* See the COPYRIGHT.txt file distributed with this work for information
* regarding copyright ownership. Some portions may be licensed
* to Red Hat, Inc. under one or more contributor license agreements.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA.
*/

package org.teiid.query.processor.relational;

import java.util.List;

import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.STree;
import org.teiid.common.buffer.STree.InsertMode;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;

public class DupRemoveNode extends RelationalNode {

private STree stree = null;
private TupleBatch batch;
private int counter;

public DupRemoveNode(int nodeID) {
super(nodeID);
}

public void reset() {
super.reset();
stree = null;
counter = 0;
batch = null;
}

@Override
public void open() throws TeiidComponentException, TeiidProcessingException {
super.open();

stree = getBufferManager().createSTree(this.getElements(), this.getConnectionID(), this.getElements().size());
}

public TupleBatch nextBatchDirect()
throws BlockedException, TeiidComponentException, TeiidProcessingException {
while (true) {
if (batch == null) {
batch = this.getChildren()[0].nextBatch();
}

List<List<?>> tuples = batch.getTuples();
for (;counter < tuples.size(); counter++) {
List<?> tuple = tuples.get(counter);
List<?> existing = stree.insert(tuple, InsertMode.NEW, -1);
if (existing != null) {
continue;
}
this.addBatchRow(tuple);
if (this.isBatchFull()) {
return pullBatch();
}
}
if (batch.getTerminationFlag()) {
terminateBatches();
return pullBatch();
}
batch = null;
counter = 0;
}
}

public void closeDirect() {
if (stree != null) {
stree.remove();
}
}

public Object clone(){
DupRemoveNode clonedNode = new DupRemoveNode(super.getID());
copyTo(clonedNode);
return clonedNode;
}

}
Expand Up @@ -272,10 +272,10 @@ protected void loadRight() throws TeiidComponentException,
break;
}
if (parent instanceof SortNode) {
SortNode sort = (SortNode)parent;
if (sort.getMode() != Mode.DUP_REMOVE) {
break;
}
break;
}
if (parent instanceof DupRemoveNode) {
break;
}
parent = parent.getParent();
}
Expand Down
Expand Up @@ -325,7 +325,41 @@ public TupleBatch nextBatchDirect()
if(this.phase == COLLECTION) {
collectionPhase();
}


/* if !removeDuplicates && !rollup && aggs don't need a full sort) {
* ...
* }
*
*/

/*if (true) {
*
*
List<Expression> schema = new ArrayList<Expression>();
schema.add(this.orderBy.get(0).getSymbol());
ElementSymbol es = new ElementSymbol("x");
es.setType(Object[].class);
schema.add(es);
STree tree = this.getBufferManager().createSTree(schema, this.getConnectionID(), 1);
TupleSource ts = getCollectionTupleSource();
List<?> tuple = null;
while ((tuple = ts.nextTuple()) != null) {
List<?> current = tree.find(tuple);
if (current != null) {
//updateAggregates(tuple);
} else {
List<Object> updated = new ArrayList<Object>(tuple);
updated.add(new Object[]{1}); //actually this should be the modified form
tree.insert(updated, InsertMode.NEW, -1);
}
}
//throw new AssertionError();
this.addBatchRow(Arrays.asList("1", 500000));
this.terminateBatches();
return pullBatch();
}*/

// If necessary, sort to determine groups (if no group cols, no need to sort)
if(this.phase == SORT) {
sortPhase();
Expand Down
Expand Up @@ -167,9 +167,7 @@ public void closeDirect() {
protected void getNodeString(StringBuffer str) {
super.getNodeString(str);
str.append("[").append(mode).append("] "); //$NON-NLS-1$ //$NON-NLS-2$
if (this.mode != Mode.DUP_REMOVE) {
str.append(this.items);
}
str.append(this.items);
}

protected void copyTo(SortNode target){
Expand All @@ -188,7 +186,7 @@ public Object clone(){
public PlanNode getDescriptionProperties() {
PlanNode props = super.getDescriptionProperties();

if(this.mode != Mode.DUP_REMOVE && this.items != null) {
if(this.items != null) {
props.addProperty(PROP_SORT_COLS, this.items.toString());
}

Expand Down Expand Up @@ -216,7 +214,7 @@ public TupleBuffer getBuffer(int maxRows) throws BlockedException, TeiidComponen
@Override
public boolean hasBuffer(boolean requireFinal) {
if (this.getElements().size() == this.getChildren()[0].getElements().size()) {
return !requireFinal || mode != Mode.DUP_REMOVE;
return true;
}
return false;
}
Expand Down

0 comments on commit 015857e

Please sign in to comment.