Skip to content

Commit

Permalink
Merge pull request hpcc-systems#2780 from jakesmith/rollupthor-2390
Browse files Browse the repository at this point in the history
gh-2390 rollup matching transform result, not left

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information
richardkchapman committed Jul 12, 2012
2 parents 4fb9f4d + 5cedae3 commit a21b40c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 26 deletions.
3 changes: 3 additions & 0 deletions testing/ecl/key/rollup1.xml
Expand Up @@ -10,3 +10,6 @@
<Row><r>3</r></Row>
<Row><r>9</r></Row>
</Dataset>
<Dataset name='Result 3'>
<Row><r>1</r><res>112389</res></Row>
</Dataset>
16 changes: 14 additions & 2 deletions testing/ecl/rollup1.ecl
@@ -1,8 +1,20 @@
import std.system.thorlib;

d := dataset([{1},{1},{2},{3},{4},{8},{9}], { unsigned r; });

d t(d L, d r) := transform
SELF.r := IF (L.r=3,SKIP,L.r + 1);
SELF.r := IF (L.r=3,SKIP,L.r + 1);
END;

rollup(d, t(LEFT, RIGHT), LEFT.r = RIGHT.r);
rollup(d, t(LEFT, RIGHT), LEFT.r = RIGHT.r, LOCAL);
rollup(d, t(LEFT, RIGHT), LEFT.r >= RIGHT.r-1, LOCAL);

// spread across nodes and perform global rollup to string
distd := DISTRIBUTE(d, r / (thorlib.nodes()+1));
p := PROJECT(distd, TRANSFORM({ d, string res:=''; }, SELF := LEFT));
p t2(p l, p r) := transform
SELF.res := IF(L.res='',(string)l.r+(string)r.r,(string)l.res + (string)r.r);
SELF.r := IF(R.r=4, SKIP, L.r);
SELF := l;
END;
rollup(p, true, t2(LEFT, RIGHT));
70 changes: 46 additions & 24 deletions thorlcr/activities/rollup/throllupslave.cpp
Expand Up @@ -156,6 +156,7 @@ class CDedupRollupBaseActivity : public CSlaveActivity, implements IStopInput
bool global;
bool groupOp;
OwnedConstThorRow kept;
OwnedConstThorRow keptTransformed; // only used by rollup
Owned<IThorDataLink> input;
bool needFirstRow;

Expand Down Expand Up @@ -221,19 +222,26 @@ class CDedupRollupBaseActivity : public CSlaveActivity, implements IStopInput
if (!receiveMsg(msg, container.queryJob().queryMyRank()-1, mpTag)) // from previous node
return;
msg.read(numKept);
bool isKept;
msg.read(isKept);
if (isKept)
size32_t rowLen;
msg.read(rowLen);
if (rowLen)
kept.deserialize(rowif, rowLen, msg.readDirect(rowLen));
if (rollup)
{
size32_t r = msg.remaining();
kept.deserialize(rowif, r, msg.readDirect(r));
if (kept)
return;
msg.read(rowLen);
if (rowLen)
keptTransformed.deserialize(rowif, rowLen, msg.readDirect(rowLen));
else
keptTransformed.set(kept);
}
if (kept)
return;
}
kept.setown(input->nextRow());
if (!kept && global)
putNextKept(); // pass on now
if (rollup)
keptTransformed.set(kept);
}
}
bool putNextKept()
Expand All @@ -244,12 +252,29 @@ class CDedupRollupBaseActivity : public CSlaveActivity, implements IStopInput
return false;
CMessageBuffer msg;
msg.append(numKept);
msg.append(NULL != kept.get());
unsigned msgPos = msg.length();
msg.append((size32_t)0);
if (kept.get())
{
CMemoryRowSerializer msz(msg);
rowif->queryRowSerializer()->serialize(msz,(const byte *)kept.get());
size32_t sz = msg.length()-(msgPos+sizeof(sz));
msg.writeDirect(msgPos, sizeof(sz), &sz);
if (rollup)
{
msgPos = msg.length();
msg.append((size32_t)0);
if (kept.get()!=keptTransformed.get())
{
sz = msg.length();
rowif->queryRowSerializer()->serialize(msz,(const byte *)keptTransformed.get());
sz = msg.length()-(msgPos+sizeof(sz));
msg.writeDirect(msgPos, sizeof(sz), &sz);
}
}
}
else if (rollup)
msg.append((size32_t)0);
container.queryJob().queryJobComm().send(msg, container.queryJob().queryMyRank()+1, mpTag); // to next node
return true;
}
Expand Down Expand Up @@ -415,18 +440,12 @@ class CRollupSlaveActivity : public CDedupRollupBaseActivity, public CThorDataLi
: CDedupRollupBaseActivity(_container, true, global, groupOp), CThorDataLink(this)
{
}

~CRollupSlaveActivity()
{
}

void init(MemoryBuffer &data, MemoryBuffer &slaveData)
{
CDedupRollupBaseActivity::init(data, slaveData);
appendOutputLinked(this); // adding 'me' to outputs array
ruhelper = static_cast <IHThorRollupArg *> (queryHelper());
}

inline bool eog()
{
if (abortSoon)
Expand All @@ -443,20 +462,17 @@ class CRollupSlaveActivity : public CDedupRollupBaseActivity, public CThorDataLi
return true;
return false;
}

virtual void start()
{
ActivityTimer s(totalCycles, timeActivities, NULL);
CDedupRollupBaseActivity::start();
dataLinkStart(id, container.queryId());
}

virtual void stop()
{
CDedupRollupBaseActivity::stop();
dataLinkStop();
}

CATCH_NEXTROW()
{
ActivityTimer t(totalCycles, timeActivities, NULL);
Expand All @@ -465,12 +481,15 @@ class CRollupSlaveActivity : public CDedupRollupBaseActivity, public CThorDataLi
if (eog())
return NULL;
OwnedConstThorRow next;
loop {
loop
{
next.setown(input->nextRow());
if (!next) {
if (!next)
{
if (!groupOp)
next.setown(input->nextRow());
if (!next) {
if (!next)
{
if (global&&putNextKept()) // send kept to next node
return NULL;
break;
Expand All @@ -481,13 +500,16 @@ class CRollupSlaveActivity : public CDedupRollupBaseActivity, public CThorDataLi
if (!ruhelper->matches(kept, next))
break;
RtlDynamicRowBuilder ret(queryRowAllocator());
unsigned thisSize = ruhelper->transform(ret, kept, next);
unsigned thisSize = ruhelper->transform(ret, keptTransformed, next);
kept.setown(next.getClear());
if (thisSize)
kept.setown(ret.finalizeRowClear(thisSize));
keptTransformed.setown(ret.finalizeRowClear(thisSize));
}
OwnedConstThorRow row = kept.getClear();
OwnedConstThorRow row = keptTransformed.getClear();
kept.setown(next.getClear());
if (row) {
keptTransformed.set(kept);
if (row)
{
dataLinkIncrement();
return row.getClear();
}
Expand Down

0 comments on commit a21b40c

Please sign in to comment.