Skip to content

Commit

Permalink
Convert Exceptions to Throwables
Browse files Browse the repository at this point in the history
Throwables are required because we cannot let anything get past us. In single
threaded applications you only have to deal with Exceptions. Any error will
simply propagate upwards and terminate the application. In multi-threaded
applications you cannot have threads die willy-nilly, instead you want the owner
(ultimately the main thread) to know it errored.
and rethrow.

If we only do it for Exceptions, any triggered assert will kill the thread while
the rest of the application hangs waiting until the thread is done.

That is why we catch throwables, move them to the owner, and rethrow there.
  • Loading branch information
skoppe committed Oct 19, 2021
1 parent 6b83214 commit 6ba3c1b
Show file tree
Hide file tree
Showing 27 changed files with 179 additions and 106 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ A Sender is a lazy Task (in the general sense of the word). It needs to be conne

It can be used to model many asynchronous operations: Futures, Fiber, Coroutines, Threads, etc. It enforces structured concurrency because a Sender cannot start without it being awaited on.

`setValue` is the only one allowed to throw exceptions, and if it does, `setError` is called with the Exception. `setDone` is called when the operation has been cancelled.
`setValue` is the only one allowed to throw exceptions, and if it does, `setError` is called with the Throwable. `setDone` is called when the operation has been cancelled.

See http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p0443r14.html for the C++ proposal for introducing Senders/Receivers.

Expand Down
55 changes: 55 additions & 0 deletions source/concurrency/error.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
module concurrency.error;

// In order to handle Errors that get thrown on non-main threads we have to make a clone. There are cases where the Error is constructed in TLS, which either might be overwritten with another Error (if the thread continues work and hits another Error), or might point to invalid memory once the thread is cleaned up. In order to be safe we have to clone the Error.

class ThrowableClone(T : Throwable) : T {
this(Args...)(Throwable.TraceInfo info, Args args) @safe nothrow {
super(args);
if (info)
this.info = new ClonedTraceInfo(info);
}
}

// The reason it accepts a Throwable is because there might be classes that derive directly from Throwable but aren't Exceptions. We treat them as errors here.
Throwable clone(Throwable t) nothrow @safe {
import core.exception;
if (auto a = cast(AssertError)t)
return new ThrowableClone!AssertError(t.info, a.msg, a.file, a.line, a.next);
if (auto r = cast(RangeError)t)
return new ThrowableClone!RangeError(t.info, r.file, r.line, r.next);
if (auto e = cast(Error)t)
return new ThrowableClone!Error(t.info, t.msg, t.file, t.line, t.next);
return new ThrowableClone!Throwable(t.info, t.msg, t.file, t.line, t.next);
}

class ClonedTraceInfo : Throwable.TraceInfo {
string[] buf;
this(Throwable.TraceInfo t) @trusted nothrow {
if (t) {
try {
foreach (i, line; t)
buf ~= line.idup();
} catch (Throwable t) {
// alas...
}
}
}
override int opApply(scope int delegate(ref const(char[])) dg) const {
return opApply((ref size_t, ref const(char[]) buf) => dg(buf));
}

override int opApply(scope int delegate(ref size_t, ref const(char[])) dg) const {
foreach(i, line; buf) {
if (dg(i,line))
return 1;
}
return 0;
}

override string toString() const {
string buf;
foreach ( i, line; this )
buf ~= i ? "\n" ~ line : line;
return buf;
}
}
20 changes: 10 additions & 10 deletions source/concurrency/nursery.d
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Nursery : StopSource {
Mutex mutex;
shared size_t busy = 0;
shared size_t counter = 0;
Exception exception; // first exception from sender, if any
Throwable throwable; // first throwable from sender, if any
ReceiverObject receiver;
StopCallback stopCallback;
Nursery assumeThreadSafe() @trusted shared nothrow {
Expand All @@ -53,9 +53,9 @@ class Nursery : StopSource {
return (cast()receiver).getScheduler();
}

private void setError(Exception e, size_t id) nothrow @safe shared {
private void setError(Throwable e, size_t id) nothrow @safe shared {
import core.atomic : cas;
with(assumeThreadSafe) cas(&exception, cast(Exception)null, e); // store exception if not already
with(assumeThreadSafe) cas(&throwable, cast(Throwable)null, e); // store throwable if not already
done(id);
stop();
}
Expand All @@ -71,18 +71,18 @@ class Nursery : StopSource {
operations = operations.remove(idx);
bool isDone = atomicOp!"-="(busy,1) == 0;
auto localReceiver = receiver;
auto localException = exception;
auto localThrowable = throwable;
if (isDone) {
exception = null;
throwable = null;
receiver = null;
stopCallback.dispose();
stopCallback = null;
}
mutex.unlock_nothrow();

if (isDone && localReceiver !is null) {
if (localException !is null) {
localReceiver.setError(localException);
if (localThrowable !is null) {
localReceiver.setError(localThrowable);
} else if (isStopRequested()) {
localReceiver.setDone();
} else {
Expand Down Expand Up @@ -135,7 +135,7 @@ class Nursery : StopSource {
this(Receiver receiver) { this.receiver = receiver; }
void setValue() @safe { receiver.setValue(); }
void setDone() nothrow @safe { receiver.setDone(); }
void setError(Exception e) nothrow @safe { receiver.setError(e); }
void setError(Throwable e) nothrow @safe { receiver.setError(e); }
SchedulerObjectBase getScheduler() nothrow @safe {
import concurrency.scheduler : toSchedulerObject;
return receiver.getScheduler().toSchedulerObject();
Expand Down Expand Up @@ -180,7 +180,7 @@ class Nursery : StopSource {
private interface ReceiverObject {
void setValue() @safe;
void setDone() nothrow @safe;
void setError(Exception e) nothrow @safe;
void setError(Throwable e) nothrow @safe;
SchedulerObjectBase getScheduler() nothrow @safe;
}

Expand Down Expand Up @@ -212,7 +212,7 @@ private struct NurseryReceiver(Value) {
nursery.done(id);
}

void setError(Exception e) nothrow @safe {
void setError(Throwable e) nothrow @safe {
nursery.setError(e, id);
}

Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/completewithcancellation.d
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ private struct CompleteWithCancellationReceiver(Receiver) {
void setDone() nothrow @safe {
receiver.setDone();
}
void setError(Exception e) nothrow @safe {
void setError(Throwable e) nothrow @safe {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/finally_.d
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private struct FinallyReceiver(Value, Result, Receiver) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setValue(getResult());
}
mixin ForwardExtensionPoints!receiver;
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/forwardon.d
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private struct ForwardOnReceiver(Receiver, Value, Scheduler) {
void setDone() @safe nothrow {
DoneSender().via(scheduler.schedule()).connectHeap(receiver).start();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
ErrorSender(e).via(scheduler.schedule()).connectHeap(receiver).start();
}
mixin ForwardExtensionPoints!receiver;
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/ignoreerror.d
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private struct IEReceiver(Value, Receiver) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setDone();
}
mixin ForwardExtensionPoints!receiver;
Expand Down
4 changes: 2 additions & 2 deletions source/concurrency/operations/race.d
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private class State(Value) : StopSource {
shared SharedBitField!Flags bitfield;
static if (!is(Value == void))
Value value;
Exception exception;
Throwable exception;
bool noDropouts;
this(bool noDropouts) {
this.noDropouts = noDropouts;
Expand Down Expand Up @@ -162,7 +162,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
process(newState);
}
}
void setError(Exception exception) @safe nothrow {
void setError(Throwable exception) @safe nothrow {
with (state.bitfield.lock(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isDoneOrErrorProduced(oldState)) {
Expand Down
4 changes: 2 additions & 2 deletions source/concurrency/operations/retry.d
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import std.traits;
struct Times {
int max = 5;
int n = 0;
bool failure(Exception e) @safe nothrow {
bool failure(Throwable e) @safe nothrow {
n++;
return n >= max;
}
Expand Down Expand Up @@ -39,7 +39,7 @@ private struct RetryReceiver(Receiver, Sender, Logic) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
if (logic.failure(e))
receiver.setError(e);
else {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/stopon.d
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private struct StopOnReceiver(Receiver, Value) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
auto getStopToken() nothrow @trusted {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/then.d
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private struct ThenReceiver(Receiver, Value, Fun) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand Down
7 changes: 4 additions & 3 deletions source/concurrency/operations/toshared.d
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
alias Value = Sender.Value;
static if (!is(Value == void))
alias ValueRep = Value;
alias InternalValue = Algebraic!(Exception, ValueRep, Done);
alias InternalValue = Algebraic!(Throwable, ValueRep, Done);
alias DG = void delegate(InternalValue) nothrow @safe shared;
static struct SharedSenderOp(Receiver) {
SharedSender parent;
Expand Down Expand Up @@ -74,7 +74,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
cb.dispose();
receiver.setError(e);
}
}, (Exception e){
}, (Throwable e){
receiver.setError(e);
}, (Done d){
receiver.setDone();
Expand Down Expand Up @@ -114,7 +114,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
state.value = InternalValue(Done());
process();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
state.value = InternalValue(e);
process();
}
Expand Down Expand Up @@ -160,6 +160,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
} else {
auto localState = state;
release(); // release early
// TODO: what happens if the sender completed after release, but before pushBack?
localState.dgs.pushBack(dg);
}
}
Expand Down
4 changes: 2 additions & 2 deletions source/concurrency/operations/via.d
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private struct ViaAReceiver(ValueB, ValueA, Receiver) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand All @@ -55,7 +55,7 @@ private struct ViaBReceiver(SenderA, ValueB, Receiver) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand Down
4 changes: 2 additions & 2 deletions source/concurrency/operations/whenall.d
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private class WhenAllState(Value) : StopSource {
StopCallback cb;
static if (is(typeof(Value.values)))
Value value;
Exception exception;
Throwable exception;
shared SharedBitField!Flags bitfield;
}

Expand Down Expand Up @@ -175,7 +175,7 @@ private struct WhenAllReceiver(Receiver, InnerValue, Value) {
process(newState);
}
}
void setError(Exception exception) @safe nothrow {
void setError(Throwable exception) @safe nothrow {
with (state.bitfield.lock(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isDoneOrErrorProduced(oldState)) {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/withscheduler.d
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private struct WithSchedulerReceiver(Receiver, Value, Scheduler) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
auto getScheduler() @safe nothrow {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/withstopsource.d
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private struct SSReceiver(Receiver, Value) {
receiver.setDone();
}
// TODO: would be good if we only emit this function in the Sender actually could call it
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
resetStopCallback();
receiver.setError(e);
}
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/withstoptoken.d
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private struct STReceiver(Receiver, Value, Fun) {
void setDone() nothrow @safe {
receiver.setDone();
}
void setError(Exception e) nothrow @safe {
void setError(Throwable e) nothrow @safe {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand Down

0 comments on commit 6ba3c1b

Please sign in to comment.