Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Exceptions to Throwables #28

Merged
merged 3 commits into from Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -9,7 +9,7 @@ A Sender is a lazy Task (in the general sense of the word). It needs to be conne

It can be used to model many asynchronous operations: Futures, Fiber, Coroutines, Threads, etc. It enforces structured concurrency because a Sender cannot start without it being awaited on.

`setValue` is the only one allowed to throw exceptions, and if it does, `setError` is called with the Exception. `setDone` is called when the operation has been cancelled.
`setValue` is the only one allowed to throw exceptions, and if it does, `setError` is called with the Throwable. `setDone` is called when the operation has been cancelled.

See http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2020/p0443r14.html for the C++ proposal for introducing Senders/Receivers.

Expand Down
55 changes: 55 additions & 0 deletions source/concurrency/error.d
@@ -0,0 +1,55 @@
module concurrency.error;

// In order to handle Errors that get thrown on non-main threads we have to make a clone. There are cases where the Error is constructed in TLS, which either might be overwritten with another Error (if the thread continues work and hits another Error), or might point to invalid memory once the thread is cleaned up. In order to be safe we have to clone the Error.

class ThrowableClone(T : Throwable) : T {
this(Args...)(Throwable.TraceInfo info, Args args) @safe nothrow {
super(args);
if (info)
this.info = new ClonedTraceInfo(info);
}
}

// The reason it accepts a Throwable is because there might be classes that derive directly from Throwable but aren't Exceptions. We treat them as errors here.
Throwable clone(Throwable t) nothrow @safe {
import core.exception;
if (auto a = cast(AssertError)t)
return new ThrowableClone!AssertError(t.info, a.msg, a.file, a.line, a.next);
if (auto r = cast(RangeError)t)
return new ThrowableClone!RangeError(t.info, r.file, r.line, r.next);
if (auto e = cast(Error)t)
Copy link
Contributor

@nordlow nordlow Aug 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why specialize handling of AssertError and RangeError only when there are sub-classes of Error such as FinalizeError? Is it because those are the only ones with constructor parameters different from Error's?

Wouldn't it have been useful (and likely faster instead of successive calls to dynamic-cast) for Error (and Object in the general case) to have an abstract dup member that preserves typeinfo? Explicitly implemented for each sub-class, though. Just a thought.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why specialize handling of AssertError and RangeError only when there are sub-classes of Error such as FinalizeError?

I had a whole list. Then realized a lot of those where actually added in latest master only. Which made be realize I will never be able to specialize for all of them. So then I did the most useful ones only.

Wouldn't it have been useful (and likely faster instead of successive calls to dynamic-cast) for Error (and Object in the general case) to have an abstract dup member that preserves typeinfo?

It would be immensely useful. But there isn't one. I don't think one will be added as well.

Copy link
Contributor

@nordlow nordlow Aug 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, cool. Why don't you think such a member will be added?

Copy link
Contributor

@nordlow nordlow Aug 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@nordlow nordlow Oct 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also the question of dup vs deep dup. Maybe a deep dup be partially inferred using .tupleof.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok with me to merge this as is, @skoppe and maybe add an issue about handling of more sub-classes of Error.

return new ThrowableClone!Error(t.info, t.msg, t.file, t.line, t.next);
return new ThrowableClone!Throwable(t.info, t.msg, t.file, t.line, t.next);
}

class ClonedTraceInfo : Throwable.TraceInfo {
string[] buf;
this(Throwable.TraceInfo t) @trusted nothrow {
if (t) {
try {
foreach (i, line; t)
buf ~= line.idup();
} catch (Throwable t) {
// alas...
}
}
}
override int opApply(scope int delegate(ref const(char[])) dg) const {
return opApply((ref size_t, ref const(char[]) buf) => dg(buf));
}

override int opApply(scope int delegate(ref size_t, ref const(char[])) dg) const {
foreach(i, line; buf) {
if (dg(i,line))
return 1;
}
return 0;
}

override string toString() const {
string buf;
foreach ( i, line; this )
buf ~= i ? "\n" ~ line : line;
return buf;
}
}
20 changes: 10 additions & 10 deletions source/concurrency/nursery.d
Expand Up @@ -31,7 +31,7 @@ class Nursery : StopSource {
Mutex mutex;
shared size_t busy = 0;
shared size_t counter = 0;
Exception exception; // first exception from sender, if any
Throwable throwable; // first throwable from sender, if any
ReceiverObject receiver;
StopCallback stopCallback;
Nursery assumeThreadSafe() @trusted shared nothrow {
Expand All @@ -53,9 +53,9 @@ class Nursery : StopSource {
return (cast()receiver).getScheduler();
}

private void setError(Exception e, size_t id) nothrow @safe shared {
private void setError(Throwable e, size_t id) nothrow @safe shared {
import core.atomic : cas;
with(assumeThreadSafe) cas(&exception, cast(Exception)null, e); // store exception if not already
with(assumeThreadSafe) cas(&throwable, cast(Throwable)null, e); // store throwable if not already
done(id);
stop();
}
Expand All @@ -71,18 +71,18 @@ class Nursery : StopSource {
operations = operations.remove(idx);
bool isDone = atomicOp!"-="(busy,1) == 0;
auto localReceiver = receiver;
auto localException = exception;
auto localThrowable = throwable;
if (isDone) {
exception = null;
throwable = null;
receiver = null;
stopCallback.dispose();
stopCallback = null;
}
mutex.unlock_nothrow();

if (isDone && localReceiver !is null) {
if (localException !is null) {
localReceiver.setError(localException);
if (localThrowable !is null) {
localReceiver.setError(localThrowable);
} else if (isStopRequested()) {
localReceiver.setDone();
} else {
Expand Down Expand Up @@ -135,7 +135,7 @@ class Nursery : StopSource {
this(Receiver receiver) { this.receiver = receiver; }
void setValue() @safe { receiver.setValue(); }
void setDone() nothrow @safe { receiver.setDone(); }
void setError(Exception e) nothrow @safe { receiver.setError(e); }
void setError(Throwable e) nothrow @safe { receiver.setError(e); }
SchedulerObjectBase getScheduler() nothrow @safe {
import concurrency.scheduler : toSchedulerObject;
return receiver.getScheduler().toSchedulerObject();
Expand Down Expand Up @@ -180,7 +180,7 @@ class Nursery : StopSource {
private interface ReceiverObject {
void setValue() @safe;
void setDone() nothrow @safe;
void setError(Exception e) nothrow @safe;
void setError(Throwable e) nothrow @safe;
SchedulerObjectBase getScheduler() nothrow @safe;
}

Expand Down Expand Up @@ -212,7 +212,7 @@ private struct NurseryReceiver(Value) {
nursery.done(id);
}

void setError(Exception e) nothrow @safe {
void setError(Throwable e) nothrow @safe {
nursery.setError(e, id);
}

Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/completewithcancellation.d
Expand Up @@ -18,7 +18,7 @@ private struct CompleteWithCancellationReceiver(Receiver) {
void setDone() nothrow @safe {
receiver.setDone();
}
void setError(Exception e) nothrow @safe {
void setError(Throwable e) nothrow @safe {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/finally_.d
Expand Up @@ -31,7 +31,7 @@ private struct FinallyReceiver(Value, Result, Receiver) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setValue(getResult());
}
mixin ForwardExtensionPoints!receiver;
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/forwardon.d
Expand Up @@ -28,7 +28,7 @@ private struct ForwardOnReceiver(Receiver, Value, Scheduler) {
void setDone() @safe nothrow {
DoneSender().via(scheduler.schedule()).connectHeap(receiver).start();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
ErrorSender(e).via(scheduler.schedule()).connectHeap(receiver).start();
}
mixin ForwardExtensionPoints!receiver;
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/ignoreerror.d
Expand Up @@ -36,7 +36,7 @@ private struct IEReceiver(Value, Receiver) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setDone();
}
mixin ForwardExtensionPoints!receiver;
Expand Down
4 changes: 2 additions & 2 deletions source/concurrency/operations/race.d
Expand Up @@ -88,7 +88,7 @@ private class State(Value) : StopSource {
shared SharedBitField!Flags bitfield;
static if (!is(Value == void))
Value value;
Exception exception;
Throwable exception;
bool noDropouts;
this(bool noDropouts) {
this.noDropouts = noDropouts;
Expand Down Expand Up @@ -162,7 +162,7 @@ private struct RaceReceiver(Receiver, InnerValue, Value) {
process(newState);
}
}
void setError(Exception exception) @safe nothrow {
void setError(Throwable exception) @safe nothrow {
with (state.bitfield.lock(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isDoneOrErrorProduced(oldState)) {
Expand Down
4 changes: 2 additions & 2 deletions source/concurrency/operations/retry.d
Expand Up @@ -10,7 +10,7 @@ import std.traits;
struct Times {
int max = 5;
int n = 0;
bool failure(Exception e) @safe nothrow {
bool failure(Throwable e) @safe nothrow {
n++;
return n >= max;
}
Expand Down Expand Up @@ -39,7 +39,7 @@ private struct RetryReceiver(Receiver, Sender, Logic) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
if (logic.failure(e))
receiver.setError(e);
else {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/stopon.d
Expand Up @@ -28,7 +28,7 @@ private struct StopOnReceiver(Receiver, Value) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
auto getStopToken() nothrow @trusted {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/then.d
Expand Up @@ -46,7 +46,7 @@ private struct ThenReceiver(Receiver, Value, Fun) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand Down
7 changes: 3 additions & 4 deletions source/concurrency/operations/toshared.d
Expand Up @@ -38,7 +38,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
alias Value = Sender.Value;
static if (!is(Value == void))
alias ValueRep = Value;
alias InternalValue = Algebraic!(Exception, ValueRep, Done);
alias InternalValue = Algebraic!(Throwable, ValueRep, Done);
alias DG = void delegate(InternalValue) nothrow @safe shared;
static struct SharedSenderOp(Receiver) {
SharedSender parent;
Expand Down Expand Up @@ -74,7 +74,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
cb.dispose();
receiver.setError(e);
}
}, (Exception e){
}, (Throwable e){
receiver.setError(e);
}, (Done d){
receiver.setDone();
Expand Down Expand Up @@ -114,7 +114,7 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
state.value = InternalValue(Done());
process();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
state.value = InternalValue(e);
process();
}
Expand Down Expand Up @@ -159,7 +159,6 @@ class SharedSender(Sender, Scheduler, ResetLogic resetLogic) if (models!(Sender,
localState.op.start();
} else {
auto localState = state;
release(); // release early
localState.dgs.pushBack(dg);
}
}
Expand Down
4 changes: 2 additions & 2 deletions source/concurrency/operations/via.d
Expand Up @@ -28,7 +28,7 @@ private struct ViaAReceiver(ValueB, ValueA, Receiver) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand All @@ -55,7 +55,7 @@ private struct ViaBReceiver(SenderA, ValueB, Receiver) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand Down
4 changes: 2 additions & 2 deletions source/concurrency/operations/whenall.d
Expand Up @@ -126,7 +126,7 @@ private class WhenAllState(Value) : StopSource {
StopCallback cb;
static if (is(typeof(Value.values)))
Value value;
Exception exception;
Throwable exception;
shared SharedBitField!Flags bitfield;
}

Expand Down Expand Up @@ -175,7 +175,7 @@ private struct WhenAllReceiver(Receiver, InnerValue, Value) {
process(newState);
}
}
void setError(Exception exception) @safe nothrow {
void setError(Throwable exception) @safe nothrow {
with (state.bitfield.lock(Flags.doneOrError_produced, Counter.tick)) {
bool last = isLast(newState);
if (!isDoneOrErrorProduced(oldState)) {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/withscheduler.d
Expand Up @@ -26,7 +26,7 @@ private struct WithSchedulerReceiver(Receiver, Value, Scheduler) {
void setDone() @safe nothrow {
receiver.setDone();
}
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
receiver.setError(e);
}
auto getScheduler() @safe nothrow {
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/withstopsource.d
Expand Up @@ -39,7 +39,7 @@ private struct SSReceiver(Receiver, Value) {
receiver.setDone();
}
// TODO: would be good if we only emit this function in the Sender actually could call it
void setError(Exception e) @safe nothrow {
void setError(Throwable e) @safe nothrow {
resetStopCallback();
receiver.setError(e);
}
Expand Down
2 changes: 1 addition & 1 deletion source/concurrency/operations/withstoptoken.d
Expand Up @@ -50,7 +50,7 @@ private struct STReceiver(Receiver, Value, Fun) {
void setDone() nothrow @safe {
receiver.setDone();
}
void setError(Exception e) nothrow @safe {
void setError(Throwable e) nothrow @safe {
receiver.setError(e);
}
mixin ForwardExtensionPoints!receiver;
Expand Down