Skip to content

Commit

Permalink
Complete futures in closure finally (fix #415) (#449)
Browse files Browse the repository at this point in the history
* Complete in closure finally

* cleanup tests, add comment

* handle defects

* don't complete future on defect

* complete future in test to avoid failure

* fix with strict exceptions

* fix regressions

* fix nim 1.6
  • Loading branch information
Menduist committed Oct 16, 2023
1 parent 2e8551b commit 253bc3c
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 114 deletions.
69 changes: 21 additions & 48 deletions chronos/asyncfutures2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -311,57 +311,30 @@ proc internalContinue(fut: pointer) {.raises: [], gcsafe.} =
proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} =
# This function is responsible for calling the closure iterator generated by
# the `{.async.}` transformation either until it has completed its iteration
# or raised and error / been cancelled.
#
# Every call to an `{.async.}` proc is redirected to call this function
# instead with its original body captured in `fut.closure`.
var next: FutureBase
template iterate =
while true:
# Call closure to make progress on `fut` until it reaches `yield` (inside
# `await` typically) or completes / fails / is cancelled
next = fut.internalClosure(fut)
if fut.internalClosure.finished(): # Reached the end of the transformed proc
break

if next == nil:
raiseAssert "Async procedure (" & ($fut.location[LocationKind.Create]) &
") yielded `nil`, are you await'ing a `nil` Future?"

if not next.finished():
# We cannot make progress on `fut` until `next` has finished - schedule
# `fut` to continue running when that happens
GC_ref(fut)
next.addCallback(CallbackFunc(internalContinue), cast[pointer](fut))

# return here so that we don't remove the closure below
return

# Continue while the yielded future is already finished.

when chronosStrictException:
try:
iterate
except CancelledError:
fut.cancelAndSchedule()
except CatchableError as exc:
fut.fail(exc)
finally:
next = nil # GC hygiene
else:
try:
iterate
except CancelledError:
fut.cancelAndSchedule()
except CatchableError as exc:
fut.fail(exc)
except Exception as exc:
if exc of Defect:
raise (ref Defect)(exc)

fut.fail((ref ValueError)(msg: exc.msg, parent: exc))
finally:
next = nil # GC hygiene
while true:
# Call closure to make progress on `fut` until it reaches `yield` (inside
# `await` typically) or completes / fails / is cancelled
let next: FutureBase = fut.internalClosure(fut)
if fut.internalClosure.finished(): # Reached the end of the transformed proc
break

if next == nil:
raiseAssert "Async procedure (" & ($fut.location[LocationKind.Create]) &
") yielded `nil`, are you await'ing a `nil` Future?"

if not next.finished():
# We cannot make progress on `fut` until `next` has finished - schedule
# `fut` to continue running when that happens
GC_ref(fut)
next.addCallback(CallbackFunc(internalContinue), cast[pointer](fut))

# return here so that we don't remove the closure below
return

# Continue while the yielded future is already finished.

# `futureContinue` will not be called any more for this future so we can
# clean it up
Expand Down
183 changes: 123 additions & 60 deletions chronos/asyncmacro2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,60 +9,14 @@

import std/[macros]

# `quote do` will ruin line numbers so we avoid it using these helpers
proc completeWithResult(fut, baseType: NimNode): NimNode {.compileTime.} =
# when `baseType` is void:
# complete(`fut`)
# else:
# complete(`fut`, result)
if baseType.eqIdent("void"):
# Shortcut if we know baseType at macro expansion time
newCall(ident "complete", fut)
else:
# `baseType` might be generic and resolve to `void`
nnkWhenStmt.newTree(
nnkElifExpr.newTree(
nnkInfix.newTree(ident "is", baseType, ident "void"),
newCall(ident "complete", fut)
),
nnkElseExpr.newTree(
newCall(ident "complete", fut, ident "result")
)
)

proc completeWithNode(fut, baseType, node: NimNode): NimNode {.compileTime.} =
# when typeof(`node`) is void:
# `node` # statement / explicit return
# -> completeWithResult(fut, baseType)
# else: # expression / implicit return
# complete(`fut`, `node`)
if node.kind == nnkEmpty: # shortcut when known at macro expanstion time
completeWithResult(fut, baseType)
else:
# Handle both expressions and statements - since the type is not know at
# macro expansion time, we delegate this choice to a later compilation stage
# with `when`.
nnkWhenStmt.newTree(
nnkElifExpr.newTree(
nnkInfix.newTree(
ident "is", nnkTypeOfExpr.newTree(node), ident "void"),
newStmtList(
node,
completeWithResult(fut, baseType)
)
),
nnkElseExpr.newTree(
newCall(ident "complete", fut, node)
)
)

proc processBody(node, fut, baseType: NimNode): NimNode {.compileTime.} =
proc processBody(node, setResultSym, baseType: NimNode): NimNode {.compileTime.} =
#echo(node.treeRepr)
case node.kind
of nnkReturnStmt:
let
res = newNimNode(nnkStmtList, node)
res.add completeWithNode(fut, baseType, processBody(node[0], fut, baseType))
if node[0].kind != nnkEmpty:
res.add newCall(setResultSym, processBody(node[0], setResultSym, baseType))
res.add newNimNode(nnkReturnStmt, node).add(newNilLit())

res
Expand All @@ -71,12 +25,89 @@ proc processBody(node, fut, baseType: NimNode): NimNode {.compileTime.} =
node
else:
for i in 0 ..< node.len:
# We must not transform nested procedures of any form, otherwise
# `fut` will be used for all nested procedures as their own
# `retFuture`.
node[i] = processBody(node[i], fut, baseType)
# We must not transform nested procedures of any form, since their
# returns are not meant for our futures
node[i] = processBody(node[i], setResultSym, baseType)
node

proc wrapInTryFinally(fut, baseType, body: NimNode): NimNode {.compileTime.} =
# creates:
# var closureSucceeded = true
# try: `body`
# except CancelledError: closureSucceeded = false; `castFutureSym`.cancelAndSchedule()
# except CatchableError as exc: closureSucceeded = false; `castFutureSym`.fail(exc)
# except Defect as exc:
# closureSucceeded = false
# raise exc
# finally:
# if closureSucceeded:
# `castFutureSym`.complete(result)

# we are completing inside finally to make sure the completion happens even
# after a `return`
let closureSucceeded = genSym(nskVar, "closureSucceeded")
var nTry = nnkTryStmt.newTree(body)
nTry.add nnkExceptBranch.newTree(
ident"CancelledError",
nnkStmtList.newTree(
nnkAsgn.newTree(closureSucceeded, ident"false"),
newCall(ident "cancelAndSchedule", fut)
)
)

nTry.add nnkExceptBranch.newTree(
nnkInfix.newTree(ident"as", ident"CatchableError", ident"exc"),
nnkStmtList.newTree(
nnkAsgn.newTree(closureSucceeded, ident"false"),
newCall(ident "fail", fut, ident"exc")
)
)

nTry.add nnkExceptBranch.newTree(
nnkInfix.newTree(ident"as", ident"Defect", ident"exc"),
nnkStmtList.newTree(
nnkAsgn.newTree(closureSucceeded, ident"false"),
nnkRaiseStmt.newTree(ident"exc")
)
)

when not chronosStrictException:
# adds
# except Exception as exc:
# closureSucceeded = false
# fut.fail((ref ValueError)(msg: exc.msg, parent: exc))
let excName = ident"exc"

nTry.add nnkExceptBranch.newTree(
nnkInfix.newTree(ident"as", ident"Exception", ident"exc"),
nnkStmtList.newTree(
nnkAsgn.newTree(closureSucceeded, ident"false"),
newCall(ident "fail", fut,
quote do: (ref ValueError)(msg: `excName`.msg, parent: `excName`)),
)
)

nTry.add nnkFinally.newTree(
nnkIfStmt.newTree(
nnkElifBranch.newTree(
closureSucceeded,
nnkWhenStmt.newTree(
nnkElifExpr.newTree(
nnkInfix.newTree(ident "is", baseType, ident "void"),
newCall(ident "complete", fut)
),
nnkElseExpr.newTree(
newCall(ident "complete", fut, ident "result")
)
)
)
)
)
return nnkStmtList.newTree(
newVarStmt(closureSucceeded, ident"true"),
nTry
)

proc getName(node: NimNode): string {.compileTime.} =
case node.kind
of nnkSym:
Expand Down Expand Up @@ -153,8 +184,9 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
if baseTypeIsVoid: futureVoidType
else: returnType
castFutureSym = nnkCast.newTree(internalFutureType, internalFutureSym)
setResultSym = ident"setResult"

procBody = prc.body.processBody(castFutureSym, baseType)
procBody = prc.body.processBody(setResultSym, baseType)

# don't do anything with forward bodies (empty)
if procBody.kind != nnkEmpty:
Expand Down Expand Up @@ -199,9 +231,44 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
)
)

completeDecl = completeWithNode(castFutureSym, baseType, procBodyBlck)
# generates:
# template `setResultSym`(code: untyped) {.used.} =
# when typeof(code) is void: code
# else: result = code
#
# this is useful to handle implicit returns, but also
# to bind the `result` to the one we declare here
setResultDecl =
nnkTemplateDef.newTree(
setResultSym,
newEmptyNode(), newEmptyNode(),
nnkFormalParams.newTree(
newEmptyNode(),
nnkIdentDefs.newTree(
ident"code",
ident"untyped",
newEmptyNode(),
)
),
nnkPragma.newTree(ident"used"),
newEmptyNode(),
nnkWhenStmt.newTree(
nnkElifBranch.newTree(
nnkInfix.newTree(ident"is", nnkTypeOfExpr.newTree(ident"code"), ident"void"),
ident"code"
),
nnkElse.newTree(
newAssignment(ident"result", ident"code")
)
)
)

completeDecl = wrapInTryFinally(
castFutureSym, baseType,
newCall(setResultSym, procBodyBlck)
)

closureBody = newStmtList(resultDecl, completeDecl)
closureBody = newStmtList(resultDecl, setResultDecl, completeDecl)

internalFutureParameter = nnkIdentDefs.newTree(
internalFutureSym, newIdentNode("FutureBase"), newEmptyNode())
Expand All @@ -225,10 +292,6 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} =
# here the possibility of transporting more specific error types here
# for example by casting exceptions coming out of `await`..
let raises = nnkBracket.newTree()
when chronosStrictException:
raises.add(newIdentNode("CatchableError"))
else:
raises.add(newIdentNode("Exception"))

closureIterator.addPragma(nnkExprColonExpr.newTree(
newIdentNode("raises"),
Expand Down
7 changes: 1 addition & 6 deletions chronos/futures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ export srcloc
when chronosStackTrace:
type StackTrace = string

when chronosStrictException:
{.pragma: closureIter, raises: [CatchableError], gcsafe.}
else:
{.pragma: closureIter, raises: [Exception], gcsafe.}

type
LocationKind* {.pure.} = enum
Create
Expand Down Expand Up @@ -54,7 +49,7 @@ type
internalState*: FutureState
internalFlags*: FutureFlags
internalError*: ref CatchableError ## Stored exception
internalClosure*: iterator(f: FutureBase): FutureBase {.closureIter.}
internalClosure*: iterator(f: FutureBase): FutureBase {.raises: [], gcsafe.}

when chronosFutureId:
internalId*: uint
Expand Down
Loading

0 comments on commit 253bc3c

Please sign in to comment.