Skip to content

Commit

Permalink
Added check for updating function, other error hanlding enhacements
Browse files Browse the repository at this point in the history
  • Loading branch information
james-jw committed Jan 19, 2016
1 parent ad476ab commit c18ec61
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 47 deletions.
99 changes: 82 additions & 17 deletions src/main/java/org/jw/basex/async/XqDeferred.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

import static org.basex.query.QueryError.*;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.basex.query.*;
import org.basex.query.ann.Annotation;
import org.basex.query.expr.*;
import org.basex.query.util.list.*;
import org.basex.query.value.*;
Expand Down Expand Up @@ -55,6 +59,7 @@ public class XqDeferred extends FItem implements XQFunction {
*/
public XqDeferred(final FItem work, final Value args, final Map callbacksIn) throws QueryException {
super(SeqType.ANY_FUN, new AnnList());
XqPromise.ensureNotUpdatingFunction(work);
_work = new FItem[] { work };
_arguments = args;
addCallbacks(callbacksIn);
Expand All @@ -66,15 +71,11 @@ public XqDeferred(final Value deferreds, final Map callbacksIn) throws QueryExce
_work = new FItem[(int) deferreds.size()];
int i = 0;
for(Item item : deferreds) {
XqPromise.ensureNotUpdatingFunction((FItem) item);
_work[i++] = (FItem) item;
}
}

public XqDeferred(final FItem[] deferreds) {
super(SeqType.ANY_FUN, new AnnList());
_work = deferreds;
}

public XqDeferred(final List<Future<Value>> futures) {
super(SeqType.ANY_FUN, new AnnList());
_futures = futures;
Expand All @@ -98,6 +99,10 @@ public void addCallbacks(Map callbacksIn) throws QueryException {

public void addCallbacks(String name, Value... callbacksIn) throws QueryException {
if(callbacksIn == null) { return; }
if(_futures != null) {
throw new QueryException("Deferred busy due to an earlier call to 'fork'. Queue the callbacks before forking the work.");
}

if(name.matches("^(done|fail|always|then)$") == false) {
throw new QueryException("Invalid callback name provided: " + name);
}
Expand All @@ -113,6 +118,8 @@ public void addCallbacks(String name, Value... callbacksIn) throws QueryExceptio

for(Value callback : callbacksIn) {
if(callback instanceof FItem) {
XqPromise.ensureNotUpdatingFunction((FItem)callback);

existing.add((FItem)callback);
} else if(callback.size() > 0) {
addCallbacks(name, valueToArray(callback));
Expand Down Expand Up @@ -165,21 +172,49 @@ private Value processInvocation(QueryContext qc, InputInfo ii, Value... args) th
}
} catch (QueryException e) {
failed = true;
String msg = e.getMessage();
notifyCallbacks(getCallbacks(XqPromise.always, ii), qc, ii, Str.get(msg == null ? "" : msg));
out = notifyCallbacks(getCallbacks(XqPromise.fail, ii), qc, ii, args);
Value err = this.mapError(e, qc, ii, args);
notifyCallbacks(getCallbacks(XqPromise.always, ii), qc, ii, err);
out = notifyFailCallback(err, e, qc, ii, args);
} catch (Throwable e) {
try {
File writer = new File("C:\\xq-promise-deferred.log");
PrintStream ps = new PrintStream(writer);
e.printStackTrace(ps);
} catch (IOException e1) {
e1.printStackTrace();
}

throw e;
}

if(!failed) {
out = processThen(out, qc, ii);
out = notifyThenCallback(out, qc, ii);
notifyCallbacks(getCallbacks(XqPromise.done, ii), qc, ii, out);
notifyCallbacks(getCallbacks(XqPromise.always, ii), qc, ii, out);
}

return out;
}

private Value processNormalInvocation(QueryContext qc, InputInfo ii, Value... args) throws QueryException {

private Value mapError(QueryException e, QueryContext qc, InputInfo ii, Value... args) throws QueryException {
ValueBuilder eb = new ValueBuilder();

eb.add(Str.get(e.error() == null ? e.qname().toString(): e.error().code));
eb.add(Str.get(e.getLocalizedMessage()));
eb.add(Str.get(e.file()));
eb.add(Int.get(e.line()));
eb.add(Int.get(e.column()));
eb.add(Array.from(this._work));
eb.add(Array.from(args));

if (e.value() != null) {
eb.add(e.value());
}

return XqPromise.get_errorMapFunction().invokeValue(qc, ii, eb.value());
}

private Value processNormalInvocation(QueryContext qc, InputInfo ii, Value... args) throws QueryException {
ValueBuilder vb = new ValueBuilder();
for(FItem work : _work) {
vb.add(invokeFunctionItem(work, qc, ii, args));
Expand Down Expand Up @@ -243,7 +278,7 @@ private Value invokeFunctionItem(FItem funcItem, QueryContext qc, InputInfo ii,
* @throws QueryException
*/
@SuppressWarnings("javadoc")
private Value processThen(Value in, QueryContext qc, InputInfo ii) throws QueryException {
private Value notifyThenCallback(Value in, QueryContext qc, InputInfo ii) throws QueryException {
List<FItem> handlers = getCallbacks(XqPromise.then, ii);
Value lastResult = in;
if(handlers != null) {
Expand All @@ -253,6 +288,39 @@ private Value processThen(Value in, QueryContext qc, InputInfo ii) throws QueryE
}
return lastResult;
}

/**
* Calls the first error handler, passing its result to the pipeline.
* If the error handler throws an exception and a second error handler exists,
* that handler will be called with the error.
*
* This continues until either a value or empty sequence is returned or an error is thrown.
* If an error is thrown. The entire process is stopped.
*/
@SuppressWarnings("javadoc")
private Value notifyFailCallback(Value err, QueryException rawError, QueryContext qc, InputInfo ii, Value... args) throws QueryException {
List<FItem> handlers = getCallbacks(XqPromise.fail, ii);
if(handlers != null && handlers.size() > 0) {
Value lastResult = err;
for(final FItem item : handlers) {
try {
lastResult = invokeFunctionItem(item, qc, ii, lastResult);
break;
} catch (QueryException ex) {
if(handlers.get(handlers.size() - 1) == item) {
// Error was thrown from the last error handler, oh well
throw ex;
}

lastResult = mapError(ex, qc, ii, lastResult);
}
}

return lastResult;
} else {
throw new QueryException(rawError);
}
}

/**
* @param handlers - Callbacks to notify
Expand Down Expand Up @@ -311,8 +379,7 @@ public FItem coerceTo(FuncType ft, QueryContext qc, InputInfo ii, boolean opt)

@Override
public Object toJava() throws QueryException {
// TODO Auto-generated method stub
return null;
return this;
}

@Override
Expand All @@ -321,8 +388,6 @@ public String toString() {
}

@Override
public void plan(FElem root) {
// TODO
}
public void plan(FElem root) { }

}
37 changes: 23 additions & 14 deletions src/main/java/org/jw/basex/async/XqForkJoinTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,20 @@ public class XqForkJoinTask<T extends Value> extends RecursiveTask<Value>impleme
* @param qc - QueryContext
* @param ii - Input Information
* @param value - Arguments.
* @throws QueryException
*/
public XqForkJoinTask(Value deferreds, int computeSize, long start, long end, QueryContext qcIn, InputInfo iiIn, Value... value) {
public XqForkJoinTask(Value deferreds, int computeSize, long start, long end, QueryContext qcIn, InputInfo iiIn, Value... value) throws QueryException {
work = deferreds;

for(Value deferred : deferreds) {
if(deferred instanceof FItem && !(deferred instanceof XqDeferred)) {
XqPromise.ensureNotUpdatingFunction((FItem) deferred);
if (((FItem)deferred).arity() != 0) {
throw new QueryException("Invalid input: fork-join can only accept deferred objects, or zero arity functions.");
}
}
}

qc = qcIn;
ii = iiIn;
_start = start;
Expand All @@ -50,7 +61,7 @@ public XqForkJoinTask(Value deferreds, int computeSize, long start, long end, Qu

}

public XqForkJoinTask(Value deferreds, long start, long end, QueryContext qcIn, InputInfo iiIn, Value... args) {
public XqForkJoinTask(Value deferreds, long start, long end, QueryContext qcIn, InputInfo iiIn, Value... args) throws QueryException {
this(deferreds, 2, start, end, qcIn, iiIn, args);
}

Expand All @@ -69,11 +80,7 @@ protected Value compute() {
try {
for (long i = _start, j = 0; i < _end && j < _computeSize; i++, j++) {
FItem deferred = (FItem) work.itemAt(i);
if (deferred.arity() == 0) {
vb.add(deferred.invokeValue(qc, ii));
} else {
throw new QueryException("Invalid input: fork-join can only accept deferred objects or functions with an arity of 0.");
}
vb.add(deferred.invokeValue(qc, ii));
}
} catch (QueryException ex) {
this.completeExceptionally(ex);
Expand All @@ -82,13 +89,15 @@ protected Value compute() {
} else {
// Split the work
long split = length / 2;
XqForkJoinTask<Value> second = new XqForkJoinTask<Value>(work, _start + split, _end, new QueryContext(qc), ii, Empty.SEQ);

_end = _start + split;

second.fork();
vb.add(this.compute());
vb.add((Value)second.join());
XqForkJoinTask<Value> second;
try {
second = new XqForkJoinTask<Value>(work, _start + split, _end, new QueryContext(qc), ii, Empty.SEQ);
_end = _start + split;

second.fork();
vb.add(this.compute());
vb.add((Value)second.join());
} catch (QueryException e) {}
}

return vb.value();
Expand Down
23 changes: 17 additions & 6 deletions src/main/java/org/jw/basex/async/XqPromise.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.util.logging.Logger;

import org.basex.query.*;
import org.basex.query.ann.Annotation;
import org.basex.query.util.list.AnnList;
import org.basex.query.value.*;
import org.basex.query.value.item.*;
import org.basex.query.value.map.*;
Expand Down Expand Up @@ -160,8 +162,9 @@ public Value fail(final Value deferred, final Value callbacks) throws QueryExcep
* Forks a piece of work or an unexecuted promise.
* @param - Work or promise chaine to execute
* @return - A promise to retrieve the result from, if required.
* @throws QueryException
*/
public Value fork(final Value promises) {
public Value fork(final Value promises) throws QueryException {
List<Future<Value>> out = new ArrayList<Future<Value>>((int) promises.size());

if (executor.isShutdown() || executor.isTerminated()) {
Expand Down Expand Up @@ -244,11 +247,11 @@ private Throwable findQueryException(Throwable e) {
* @throws QueryException
*/
public Value forkJoin(final Value deferreds, Int workSplit, Int threadsIn) throws QueryException {
ForkJoinPool customPool = new ForkJoinPool(Integer.parseInt(threadsIn + ""));
XqForkJoinTask<Value> task = new XqForkJoinTask<Value>(deferreds, Integer.parseInt(workSplit.toString() + ""), 0l, deferreds.size(), new QueryContext(queryContext), null);
Value out = customPool.invoke(task);
customPool.shutdown();
return out;
ForkJoinPool customPool = new ForkJoinPool(Integer.parseInt(threadsIn + ""));
XqForkJoinTask<Value> task = new XqForkJoinTask<Value>(deferreds, Integer.parseInt(workSplit.toString() + ""), 0l, deferreds.size(), new QueryContext(queryContext), null);
Value out = customPool.invoke(task);
customPool.shutdown();
return out;
}

@Override
Expand All @@ -262,6 +265,14 @@ public void close() {

pool.shutdown();
}

public static void ensureNotUpdatingFunction(FItem item) throws QueryException {
FItem cb = (FItem) item;
AnnList anns = cb.annotations();
if(anns != null && anns.contains(Annotation.UPDATING)) {
throw new QueryException("Error: Updating expressions are not allowed in 'xq-promise' callbacks.");
}
}

public static FItem get_errorMapFunction() {
return _errorMapFunction;
Expand Down
40 changes: 40 additions & 0 deletions src/main/xquery/xq-promise.xqm
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ declare function p:when($deferreds as function(*)*) as function (*) {
promise:when($deferreds)
};

(:~
: Combines a set of promises into a single promise and applies a set of callbacks
:)
declare function p:when($deferreds as function(*)*, $callbacks as map(*)) {
promise:when($deferreds, $callbacks)
};

(:~
: Adds the callbacks provided to the appropriate chains on the provided deferreds
:)
Expand All @@ -94,6 +101,14 @@ declare function p:fork($work as function(*)) as function(*) {
promise:fork($work)
};

(:~
: Forks the provided functions or deferred work in a new thread. Returns a sealed promise
: which can no long accept callbacks.
:)
declare function p:fork($work as function(*), $arguments as item()*) as function(*) {
promise:fork($work, $arguments)
};

(:~
: Forks the provided functions or deferred work in a fork join fashion, returning the results once all
: forked computation is complete.
Expand All @@ -102,3 +117,28 @@ declare function p:fork-join($work as function(*)*) as item()* {
promise:fork-join($work)
};

(:~
: Forks the provided functions or deferred work in a fork join fashion, returning the results once all
: forked computation is complete.
:)
declare function p:fork-join($work as function(*)*, $compute-size as xs:integer) as item()* {
promise:fork-join($work, $compute-size)
};

(:~
: Forks the provided functions or deferred work in a fork join fashion, returning the results once all
: forked computation is complete.
:)
declare function p:fork-join($work as function(*)*, $compute-size as xs:integer, $max-forks as xs:integer) as item()* {
promise:fork-join($work, $compute-size, $max-forks)
};

(:~
: Denotes is a function is a deferred promise
:)
declare function p:is-promise($function as function(*)) as xs:boolean {
promise:is-promise($function)
};



Loading

0 comments on commit c18ec61

Please sign in to comment.