Skip to content

Commit

Permalink
Rip out ExpressionFieldRange
Browse files Browse the repository at this point in the history
Most of its logic wasn't actually hooked up so it doesn't actually offer much
advantage over ExpressionCompare. When we go to actually implement these
optimizations, it should be generic, not just for FieldPaths.
  • Loading branch information
RedBeard0531 committed Oct 8, 2013
1 parent 62d754f commit 4f3813f
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 621 deletions.
339 changes: 0 additions & 339 deletions src/mongo/db/pipeline/expression.cpp
Expand Up @@ -619,62 +619,6 @@ namespace {
/* CMP */ { { false, false, false }, Expression::CMP, "$cmp" },
};

intrusive_ptr<Expression> ExpressionCompare::optimize() {
/* first optimize the comparison operands */
intrusive_ptr<Expression> pE(ExpressionNary::optimize());

/*
If the result of optimization is no longer a comparison, there's
nothing more we can do.
*/
ExpressionCompare *pCmp = dynamic_cast<ExpressionCompare *>(pE.get());
if (!pCmp)
return pE;

/* check to see if optimizing comparison operator is supported */
CmpOp newOp = pCmp->cmpOp;
// CMP and NE cannot use ExpressionFieldRange which is what this optimization uses
if (newOp == CMP || newOp == NE)
return pE;

/*
There's one localized optimization we recognize: a comparison
between a field and a constant. If we recognize that pattern,
replace it with an ExpressionFieldRange.
When looking for this pattern, note that the operands could appear
in any order. If we need to reverse the sense of the comparison to
put it into the required canonical form, do so.
*/
intrusive_ptr<Expression> pLeft(pCmp->vpOperand[0]);
intrusive_ptr<Expression> pRight(pCmp->vpOperand[1]);
intrusive_ptr<ExpressionFieldPath> pFieldPath(
dynamic_pointer_cast<ExpressionFieldPath>(pLeft));
intrusive_ptr<ExpressionConstant> pConstant;
if (pFieldPath.get()) {
pConstant = dynamic_pointer_cast<ExpressionConstant>(pRight);
if (!pConstant.get())
return pE; // there's nothing more we can do
}
else {
/* if the first operand wasn't a path, see if it's a constant */
pConstant = dynamic_pointer_cast<ExpressionConstant>(pLeft);
if (!pConstant.get())
return pE; // there's nothing more we can do

/* the left operand was a constant; see if the right is a path */
pFieldPath = dynamic_pointer_cast<ExpressionFieldPath>(pRight);
if (!pFieldPath.get())
return pE; // there's nothing more we can do

/* these were not in canonical order, so reverse the sense */
newOp = cmpLookup[newOp].reverse;
}

return ExpressionFieldRange::create(
pFieldPath, newOp, pConstant->getValue());
}

Value ExpressionCompare::evaluateInternal(const Variables& vars) const {
Value pLeft(vpOperand[0]->evaluateInternal(vars));
Value pRight(vpOperand[1]->evaluateInternal(vars));
Expand Down Expand Up @@ -1295,289 +1239,6 @@ namespace {
}
}

/* --------------------- ExpressionFieldRange -------------------------- */

intrusive_ptr<Expression> ExpressionFieldRange::optimize() {
/* if there is no range to match, this will never evaluate true */
if (!pRange.get())
return ExpressionConstant::create(Value(false));

/*
If we ended up with a double un-ended range, anything matches. I
don't know how that can happen, given intersect()'s interface, but
here it is, just in case.
*/
if (pRange->pBottom.missing() && pRange->pTop.missing())
return ExpressionConstant::create(Value(true));

/*
In all other cases, we have to test candidate values. The
intersect() method has already optimized those tests, so there
aren't any more optimizations to look for here.
*/
return intrusive_ptr<Expression>(this);
}

void ExpressionFieldRange::addDependencies(set<string>& deps, vector<string>* path) const {
pFieldPath->addDependencies(deps);
}

Value ExpressionFieldRange::evaluateInternal(const Variables& vars) const {
/* if there's no range, there can't be a match */
if (!pRange.get())
return Value(false);

/* get the value of the specified field */
Value pValue(pFieldPath->evaluateInternal(vars));

/* see if it fits within any of the ranges */
if (pRange->contains(pValue))
return Value(true);

return Value(false);
}

Value ExpressionFieldRange::serialize() const {
// serializing results in an unoptimized form that will be reoptimized at parse time

if (!pRange.get()) {
/* nothing will satisfy this predicate */
return serializeConstant(Value(false));
}

if (pRange->pTop.missing() && pRange->pBottom.missing()) {
/* any value will satisfy this predicate */
return serializeConstant(Value(true));
}

// FIXME Append constant values using the $const operator. SERVER-6769

// FIXME This checks pointer equality not value equality.
if (pRange->pTop == pRange->pBottom) {
return Value(DOC("$eq" << DOC_ARRAY(pFieldPath->serialize()
<< serializeConstant(pRange->pTop)
)));
}

Document gtDoc;
if (!pRange->pBottom.missing()) {
const StringData& op = (pRange->bottomOpen ? "$gt" : "$gte");
gtDoc = DOC(op << DOC_ARRAY(pFieldPath->serialize()
<< serializeConstant(pRange->pBottom)
));

if (pRange->pTop.missing()) {
return Value(gtDoc);
}
}

Document ltDoc;
if (!pRange->pTop.missing()) {
const StringData& op = (pRange->topOpen ? "$lt" : "$lte");
ltDoc = DOC(op << DOC_ARRAY(pFieldPath->serialize()
<< serializeConstant(pRange->pTop)
));

if (pRange->pBottom.missing()) {
return Value(ltDoc);
}
}

return Value(DOC("$and" << DOC_ARRAY(gtDoc << ltDoc)));
}

void ExpressionFieldRange::toMatcherBson(
BSONObjBuilder *pBuilder) const {
verify(pRange.get()); // otherwise, we can't do anything

/* if there are no endpoints, then every value is accepted */
if (pRange->pBottom.missing() && pRange->pTop.missing())
return; // nothing to add to the predicate

/* we're going to need the field path */
//TODO Fix for $$vars. This method isn't currently used so low-priority
string fieldPath(pFieldPath->getFieldPath().getPath(false));

BSONObjBuilder range;
if (!pRange->pBottom.missing()) {
/* the test for equality doesn't generate a subobject */
if (pRange->pBottom == pRange->pTop) {
pRange->pBottom.addToBsonObj(pBuilder, fieldPath);
return;
}

pRange->pBottom.addToBsonObj(
pBuilder, (pRange->bottomOpen ? "$gt" : "$gte"));
}

if (!pRange->pTop.missing()) {
pRange->pTop.addToBsonObj(
pBuilder, (pRange->topOpen ? "$lt" : "$lte"));
}

pBuilder->append(fieldPath, range.done());
}

intrusive_ptr<ExpressionFieldRange> ExpressionFieldRange::create(
const intrusive_ptr<ExpressionFieldPath> &pFieldPath, CmpOp cmpOp,
const Value& pValue) {
intrusive_ptr<ExpressionFieldRange> pE(
new ExpressionFieldRange(pFieldPath, cmpOp, pValue));
return pE;
}

ExpressionFieldRange::ExpressionFieldRange(
const intrusive_ptr<ExpressionFieldPath> &pTheFieldPath, CmpOp cmpOp,
const Value& pValue):
pFieldPath(pTheFieldPath),
pRange(new Range(cmpOp, pValue)) {
}

void ExpressionFieldRange::intersect(CmpOp cmpOp, const Value& pValue) {

/* create the new range */
scoped_ptr<Range> pNew(new Range(cmpOp, pValue));

/*
Go through the range list. For every range, either add the
intersection of that to the range list, or if there is none, the
original range. This has the effect of restricting overlapping
ranges, but leaving non-overlapping ones as-is.
*/
pRange.reset(pRange->intersect(pNew.get()));
}

ExpressionFieldRange::Range::Range(CmpOp cmpOp, const Value& pValue):
bottomOpen(false),
topOpen(false),
pBottom(),
pTop() {
switch(cmpOp) {

case EQ:
pBottom = pTop = pValue;
break;

case GT:
bottomOpen = true;
/* FALLTHROUGH */
case GTE:
topOpen = true;
pBottom = pValue;
break;

case LT:
topOpen = true;
/* FALLTHROUGH */
case LTE:
bottomOpen = true;
pTop = pValue;
break;

case NE:
case CMP:
verify(false); // not allowed
break;
}
}

ExpressionFieldRange::Range::Range(const Range &rRange):
bottomOpen(rRange.bottomOpen),
topOpen(rRange.topOpen),
pBottom(rRange.pBottom),
pTop(rRange.pTop) {
}

ExpressionFieldRange::Range::Range(
const Value& pTheBottom, bool theBottomOpen,
const Value& pTheTop, bool theTopOpen):
bottomOpen(theBottomOpen),
topOpen(theTopOpen),
pBottom(pTheBottom),
pTop(pTheTop) {
}

ExpressionFieldRange::Range *ExpressionFieldRange::Range::intersect(
const Range *pRange) const {
/*
Find the max of the bottom end of the ranges.
Start by assuming the maximum is from pRange. Then, if we have
values of our own, see if they're greater.
*/
Value pMaxBottom(pRange->pBottom);
bool maxBottomOpen = pRange->bottomOpen;
if (!pBottom.missing()) {
if (pRange->pBottom.missing()) {
pMaxBottom = pBottom;
maxBottomOpen = bottomOpen;
}
else {
const int cmp = Value::compare(pBottom, pRange->pBottom);
if (cmp == 0)
maxBottomOpen = bottomOpen || pRange->bottomOpen;
else if (cmp > 0) {
pMaxBottom = pBottom;
maxBottomOpen = bottomOpen;
}
}
}

/*
Find the minimum of the tops of the ranges.
Start by assuming the minimum is from pRange. Then, if we have
values of our own, see if they are less.
*/
Value pMinTop(pRange->pTop);
bool minTopOpen = pRange->topOpen;
if (!pTop.missing()) {
if (pRange->pTop.missing()) {
pMinTop = pTop;
minTopOpen = topOpen;
}
else {
const int cmp = Value::compare(pTop, pRange->pTop);
if (cmp == 0)
minTopOpen = topOpen || pRange->topOpen;
else if (cmp < 0) {
pMinTop = pTop;
minTopOpen = topOpen;
}
}
}

/*
If the intersections didn't create a disjoint set, create the
new range.
*/
if (Value::compare(pMaxBottom, pMinTop) <= 0)
return new Range(pMaxBottom, maxBottomOpen, pMinTop, minTopOpen);

/* if we got here, the intersection is empty */
return NULL;
}

bool ExpressionFieldRange::Range::contains(const Value& pValue) const {
if (!pBottom.missing()) {
const int cmp = Value::compare(pValue, pBottom);
if (cmp < 0)
return false;
if (bottomOpen && (cmp == 0))
return false;
}

if (!pTop.missing()) {
const int cmp = Value::compare(pValue, pTop);
if (cmp > 0)
return false;
if (topOpen && (cmp == 0))
return false;
}

return true;
}


/* ------------------------- ExpressionLet ----------------------------- */

Expand Down

0 comments on commit 4f3813f

Please sign in to comment.