Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flexible partitioning #10

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions conf/socialite-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
export SOCIALITE_BASE_PORT=50100

# The heap size (in MB) for a worker.
export SOCIALITE_HEAPSIZE=8000
export SOCIALITE_HEAPSIZE=4000

# The number of worker nodes.
export SOCIALITE_WORKER_NUM=2

# The number of threads on a single worker.
export SOCIALITE_WORKER_THREAD_NUM=4
export SOCIALITE_WORKER_THREAD_NUM=2
4 changes: 3 additions & 1 deletion grammar/SociaLite.g
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ tokens {
SORT_BY;
ORDER_BY;
INDEX_BY;
SHARD_BY;
GROUP_BY;
RANGE;
BIT;
Expand Down Expand Up @@ -134,10 +135,11 @@ table_decl: ID '(' decls ')' table_opts? DOT_END -> ^(DECL ID decls ^(OPTION ta
;
table_opts: t_opt (','! t_opt)*
;
t_opt : 'sortby' col=ID (order=SORT_ORDER)? -> ^(SORT_BY $col $order?)
t_opt :'sortby' col=ID (order=SORT_ORDER)? -> ^(SORT_BY $col $order?)
|'orderby' ID -> ^(ORDER_BY ID)
|'indexby' ID -> ^(INDEX_BY ID)
|'groupby' '(' INT ')' -> ^(GROUP_BY INT)
|'shardby' ID -> ^(SHARD_BY ID)
|'predefined' -> PREDEFINED
|'concurrent' -> CONCURRENT
|'multiset' -> MULTISET
Expand Down
1 change: 1 addition & 0 deletions grammar/SociaLiteRule.g
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ t_opt returns [TableOpt result]
|^(ORDER_BY ID) {$result = new OrderBy($ID.text);}
|^(INDEX_BY ID) {$result = new IndexBy($ID.text);}
|^(GROUP_BY INT) { $result = new GroupBy(Integer.parseInt($INT.text));}
|^(SHARD_BY ID) {$result = new ShardBy($ID.text);}
| PREDEFINED {$result = new Predefined(); }
| CONCURRENT {$result = new Concurrent(); }
| MULTISET {$result = new MultiSet();}
Expand Down
80 changes: 36 additions & 44 deletions src/java/socialite/codegen/Analysis.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ public Analysis(Parser _p) {
p = _p;
}

public Rule getCanonicalRule(Rule r) { return p.getCanonRule(r); }
public Object monitor() { return p.monitor(); }

public Query getQuery() { return query; }
public List<Epoch> getEpochs() { return epochs; }
public List<Rule> getRules() { return rules; }
Expand Down Expand Up @@ -237,15 +234,47 @@ public void run() {
computeRuleDeps();
makeEpochs();

optimizeLock();
createDeltaRules();

privatize();
processRemoteRules();

prepareEpochs();
processDropTableStmt();
}

void optimizeLock() {
for (Rule r: rules) {
selectPartitionTable(r);
}
}
void selectPartitionTable(Rule r) {
Table headT = tableMap.get(r.getHead().name());
Param param = r.getHead().inputParams()[headT.getPartitionColumn()];
if (r.getBodyP().size()==0) {
r.setAsyncEval();
return;
}
Literal l = r.getBody().get(0);
if (l instanceof Predicate) {
Predicate p = (Predicate)l;
Table t = tableMap.get(p.name());
r.setPartitionTable(t, p);
if (p.first().equals(param)) {
r.setAsyncEval();
return;
}
}
for (Predicate p:r.getBodyP()) {
Table t = tableMap.get(p.name());
int col = t.getPartitionColumn();
if (p.inputParams()[col].equals(param)) {
r.setAsyncEval();
r.setPartitionTable(t, p);
return;
}
}
}

void processDropTableStmt() {
for (TableStmt s:tableStmts) {
if (s instanceof DropTable) {
Expand Down Expand Up @@ -299,19 +328,6 @@ void setGroupby() {
for (Rule r : rules) {
setGroupby(r);
}
/*for (Table t:newTables) {
if (!t.hasGroupby()) {
if (t.getColumn(t.numColumns()-1).isIndexed()) {
continue;
}
int groupby = t.getColumns().length-1;
if (t.nestingBegins(groupby)) continue;
try { t.setGroupByColNum(groupby); }
catch (InternalException e) {
Assert.impossible();
}
}
}*/
}

void setGroupby(Rule r) {
Expand Down Expand Up @@ -339,24 +355,6 @@ void setGroupby(Rule r, Table t, int col) {
}
}

boolean isLeftRec(Rule r) {
List<Rule> rulesUsingThis = r.getRulesUsingThis();
if (!rulesUsingThis.contains(r)) return false;

Predicate h = r.getHead();
int count = 0;
for (Predicate p : r.getBodyP()) {
if (p.isNegated()) continue;
if (h.name().equals(p.name())) {
count++;
}
}
Predicate f = r.firstP();
if (count == 1 && canMatch(h, f))
return true;
return false;
}

public static boolean canMatch(Predicate p1, Predicate p2,
Map<String, Table> map) {
if (!p1.name().equals(p2.name())) return false;
Expand Down Expand Up @@ -459,13 +457,6 @@ public static boolean isSequentialRule(Rule r, Map<String, Table> tableMap) {
}
}

// XXX: re-implement privatization as rule rewriting, so that JoinerCodeGen does not need to know the details.
// e.g. Triangle(0, $inc(1)) :- Edge(a,b), Edge(b,c), Edge(c,a).
// => _Thread_Local_Triangle(worker, $inc(1)) :- Edge(a,b), Edge(b,c), Edge(c,a), worker=$workerId().
void privatize() {
if (true) return;
}

@SuppressWarnings("rawtypes")
List copyParams(List params) {
List<Object> tmp = new ArrayList<Object>();
Expand Down Expand Up @@ -1664,8 +1655,9 @@ public static boolean isResolved(Rule rule, Predicate p, Variable v) {
return resolved[p.getPos()].contains(v);
}
public static boolean isResolved(Set<Variable>[] resolvedVarsArray, Predicate p, Object param) {
if (param instanceof Variable)
if (param instanceof Variable) {
return isResolved(resolvedVarsArray, p, (Variable)param);
}
assert !(param instanceof Function);
return true;
}
Expand Down
46 changes: 25 additions & 21 deletions src/java/socialite/codegen/ArrayNestedTable.stg
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,22 @@ package socialite.tables;
}
}
}
void wlock(<first(columns).type> a) {
void markFilled(int i) {
if (lockStatus == LockStatus.disabled) {
filled.set(i, (byte)1);
} else {
// wunlock() will mark the slot.
}
}
void wlock(int i) {
if (lockStatus == LockStatus.disabled) { return; }

_lock((int)(a-base), writeMarker);
_lock(i, writeMarker);
}
void wunlock(<first(columns).type> a) {
void wunlock(int i) {
if (lockStatus == LockStatus.disabled) { return; }

if (filled.get((int)(a-base)) != 1) {
filled.set((int)(a-base), (byte)1);
}
filled.set(i, (byte)1);
}

public boolean isEntryEmpty(int i) {
Expand Down Expand Up @@ -199,9 +204,7 @@ package socialite.tables;
int i=(int)(n-base); rangeCheck(i);
if (isEntryEmpty(i)) { return false; }

if (true<rest(columns): {c|&&(col<i>(i)<EqOpMap.(c.type)>(a<i>))}>) {
return true;
} else { return false; }
return true<rest(columns): {c|&&(col<i>(i)<EqOpMap.(c.type)>(a<i>))}>;
}
public boolean contains(int n<rest(columns):{c|, <c.type> a<i>}><nestedColumns:{c|, <c.type> b<i>}>) {
boolean contains_prefix=contains_prefix(n<rest(columns):{c|, a<i>}>);
Expand All @@ -212,11 +215,11 @@ package socialite.tables;
}
public boolean insert(<columns:{c|<c.type> a<i0>};separator=","><nestedColumns:{c|, <c.type> b<i>}>) {
int i=(int)(a0-base); rangeCheck(i);
wlock(a0);
wlock(i);
try {
return insert_really(<columns:{c|a<i0>};separator=","><nestedColumns:{c|,b<i>}>);
} finally {
wunlock(a0);
wunlock(i);
}
}
boolean insert_really(<columns:{c|<c.type> a<i0>};separator=","><nestedColumns:{c|, <c.type> b<i>}>) {
Expand All @@ -241,7 +244,7 @@ package socialite.tables;
if (nestedTables[i]==null) { nested = nestedTables[i] = <nestedTable>.create(); }
nested.insert(<nestedColumns:{c|b<i>};separator=", ">);
<idxCols:{ic|index<ic.relPos>.add(a<ic.relPos>, i);<\n>}>
filled.compareAndSet(i, (byte)0, (byte)1);
markFilled(i);
return true;
}

Expand Down Expand Up @@ -337,15 +340,15 @@ groupbyArrayNested() ::= <<
int i=(int)(a0-base); rangeCheck(i);

boolean updated = false;
wlock(a0);
wlock(i);
if (isEntryEmpty(i)) {
updated = insert_really(<columns:{c|a<i0>};separator=","><nestedColumns:{c|,b<i0>}>);
wunlock(a0);
wunlock(i);
} else if (true<rest(gbColumns):{c|&&(col<i>[i]<EqOpMap.(c.type)>(a<i>))}>) {
wunlock(a0);
wunlock(i);
updated = nestedTables[i].groupby_update(<nestedColumns:{c|b<i0>,}><gbAggrColumns:{c|f<i0>};separator=",">);
} else {
wunlock(a0);
wunlock(i);
throw new AssertionError("Unexpected groupby operation");
}
return updated;
Expand All @@ -355,9 +358,9 @@ groupbyArrayNested() ::= <<
<gbAggrColumns:{c|AbstractAggregation f<i0>};separator=",">) {
int i=(int)(a0-base); rangeCheck(i);

boolean updated = false;
boolean updatedAcc = false;
boolean equals = false;
wlock(a0);
wlock(i);
try {
if (isEntryEmpty(i)) {
return insert_really(<columns:{c|a<i0>};separator=","><nestedColumns:{c|,b<i0>}>);
Expand All @@ -368,15 +371,16 @@ groupbyArrayNested() ::= <<
<c.type> ans<i0> = f<i0>.apply(oldVal<i0>, a<c.relPos>);

equals &= a<c.relPos><EqOpMap.(c.type)>(oldVal<i0>);
updated |= !(ans<i0><EqOpMap.(c.type)>(oldVal<i0>));
boolean updated = !(ans<i0><EqOpMap.(c.type)>(oldVal<i0>));
if (updated) {
a<c.relPos> = ans<i0>;
\} else {
a<c.relPos> = oldVal<i0>;
\}
updatedAcc |= updated;
\}
}>
if (updated) {
if (updatedAcc) {
<rest(columns): {c|setCol<i>(i, a<i>);<\n>}>;
nestedTables[i].clear();
nestedTables[i].insert(offset<nestedColumns:{c|, b<i0>}>);
Expand All @@ -390,7 +394,7 @@ groupbyArrayNested() ::= <<
} else {
throw new AssertionError("Unexpected groupby operation");
}
} finally { wunlock(a0); }
} finally { wunlock(i); }
}
<endif> // // ] gbNestedColumns
>>
34 changes: 20 additions & 14 deletions src/java/socialite/codegen/ArrayTable.stg
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,22 @@ package socialite.tables;
}
}

void wlock(<first(columns).type> a) {
void markFilled(int i) {
if (lockStatus == LockStatus.disabled) {
filled.set(i, (byte)1);
} else {
// wunlock() will mark the slot.
}
}
void wlock(int i) {
if (lockStatus == LockStatus.disabled) { return; }

_lock((int)(a-base), writeMarker);
_lock(i, writeMarker);
}
void wunlock(<first(columns).type> a) {
void wunlock(int i) {
if (lockStatus == LockStatus.disabled) { return; }

if (filled.get((int)(a-base)) != 1) {
filled.set((int)(a-base), (byte)1);
}
filled.set(i, (byte)1);
}

public boolean isEntryEmpty(int i) {
Expand Down Expand Up @@ -219,11 +224,11 @@ package socialite.tables;
}

public boolean insert(<first(columns).type> a0<rest(columns): {c|, <c.type> a<i>}>) {
wlock(a0);
wlock((int)(a0-base));
try {
return insert_really(a0<rest(columns): {c|, a<i>}>);
} finally {
wunlock(a0);
wunlock((int)(a0-base));
}
}
boolean insert_really(<first(columns).type> a0<rest(columns): {c|, <c.type> a<i>}>) {
Expand All @@ -239,7 +244,7 @@ package socialite.tables;

<rest(columns):{c|setCol<i>(i, a<i>);<\n>}>
<idxCols:{ic|index<ic.relPos>.add(a<ic.relPos>, i);<\n>}>
filled.set(i, (byte)1);
markFilled(i);
return true;
}

Expand Down Expand Up @@ -310,8 +315,8 @@ groupbyArray() ::= <<
public boolean groupby_update(<columns:{c|<c.type> a<i0>,}><gbAggrColumns:{c|AbstractAggregation f<i0>};separator=",">) {
int i=(int)(a0-base); rangeCheck(i);

boolean updated = false;
wlock(a0);
boolean updatedAcc = false;
wlock(i);
try {
if (isEntryEmpty(i)) {
return insert_really(<columns:{c|a<i0>}; separator=", ">);
Expand All @@ -320,15 +325,16 @@ groupbyArray() ::= <<
if (f<i0>!=null) {
<c.type> oldVal<i0> = col<c.relPos>(i);
<c.type> ans<i0> = f<i0>.apply(oldVal<i0>, a<c.relPos>);
updated |= !(ans<i0><EqOpMap.(c.type)>(oldVal<i0>));
boolean updated = !(ans<i0><EqOpMap.(c.type)>(oldVal<i0>));
if (updated) {
a<c.relPos> = ans<i0>;
\} else {
a<c.relPos> = oldVal<i0>;
\}
updatedAcc |= updated;
\}
}>
if (updated) {
if (updatedAcc) {
<rest(columns): {c|setCol<i>(i, a<i>);<\n>}>;
return true;
} else {
Expand All @@ -337,6 +343,6 @@ groupbyArray() ::= <<
} else {
throw new AssertionError("Unexpected groupby operation");
}
} finally { wunlock(a0); }
} finally { wunlock(i); }
}
>>
5 changes: 0 additions & 5 deletions src/java/socialite/codegen/CodeGenMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,6 @@ public void generateQuery(QueryVisitor qv) {
queryVisitor = qv;
queryClass = queryClass$.get(query.getP(), tableMap);
if (queryClass!=null) return;
Table queryT = tableMap.get(query.getP().name());
if (!queryT.isCompiled()) {
queryClass=null;
return;
}

QueryCodeGen qgen = new QueryCodeGen(query, tableMap, qv);
Compiler c = new Compiler();
Expand Down