Skip to content

Commit

Permalink
Merge pull request #23 from status-im/all
Browse files Browse the repository at this point in the history
Fixed all() implementation.
  • Loading branch information
cheatfate authored Mar 15, 2019
2 parents 685665a + c05c012 commit df8d0da
Show file tree
Hide file tree
Showing 4 changed files with 464 additions and 60 deletions.
6 changes: 3 additions & 3 deletions chronos.nimble
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
packageName = "chronos"
version = "2.2.0"
version = "2.2.1"
author = "Status Research & Development GmbH"
description = "Chronos"
license = "Apache License 2.0 or MIT"
Expand All @@ -25,8 +25,8 @@ task test, "Run all tests":
for cmd in @[
"nim c -r -d:useSysAssert -d:useGcAssert tests/" & tfile,
"nim c -r tests/" & tfile,
"nim c -r --gc:markAndSweep tests/" & tfile,
"nim c -r -d:release tests/" & tfile,
#"nim c -r --gc:markAndSweep tests/" & tfile,
"nim c -r -d:release tests/" & tfile
]:
echo "\n" & cmd
exec cmd
Expand Down
110 changes: 54 additions & 56 deletions chronos/asyncfutures2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ type
FutureError* = object of Exception
cause*: FutureBase

{.deprecated: [PFutureBase: FutureBase, PFuture: Future].}

when not defined(release):
var currentID = 0
var currentID* {.threadvar.}: int
currentID = 0

# ZAH: This seems unnecessary. Isn't it easy to introduce a seperate
# module for the dispatcher type, so it can be directly referenced here?
Expand Down Expand Up @@ -145,7 +144,7 @@ proc complete*[T](future: Future[T], val: T) =
## Completes ``future`` with value ``val``.
#doAssert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future)
doAssert(future.error == nil)
doAssert(isNil(future.error))
future.value = val
future.finished = true
future.callbacks.call()
Expand All @@ -154,15 +153,15 @@ proc complete*(future: Future[void]) =
## Completes a void ``future``.
#doAssert(not future.finished, "Future already finished, cannot finish twice.")
checkFinished(future)
doAssert(future.error == nil)
doAssert(isNil(future.error))
future.finished = true
future.callbacks.call()

proc complete*[T](future: FutureVar[T]) =
## Completes a ``FutureVar``.
template fut: untyped = Future[T](future)
checkFinished(fut)
doAssert(fut.error == nil)
doAssert(isNil(fut.error))
fut.finished = true
fut.callbacks.call()

Expand All @@ -172,7 +171,7 @@ proc complete*[T](future: FutureVar[T], val: T) =
## Any previously stored value will be overwritten.
template fut: untyped = Future[T](future)
checkFinished(fut)
doAssert(fut.error.isNil())
doAssert(isNil(fut.error))
fut.finished = true
fut.value = val
fut.callbacks.call()
Expand All @@ -198,7 +197,7 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
## Adds the callbacks proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
doAssert cb != nil
doAssert(not isNil(cb))
if future.finished:
# ZAH: it seems that the Future needs to know its associated Dispatcher
callSoon(cb, udata)
Expand All @@ -214,7 +213,7 @@ proc addCallback*[T](future: Future[T], cb: CallbackFunc) =

proc removeCallback*(future: FutureBase, cb: CallbackFunc,
udata: pointer = nil) =
doAssert cb != nil
doAssert(not isNil(cb))
let acb = AsyncCallback(function: cb, udata: udata)
future.callbacks.remove acb

Expand Down Expand Up @@ -258,7 +257,7 @@ proc `$`*(entries: seq[StackTraceEntry]): string =
# Find longest filename & line number combo for alignment purposes.
var longestLeft = 0
for entry in entries:
if entry.procName.isNil: continue
if isNil(entry.procName): continue

let left = $entry.filename & $entry.line
if left.len > longestLeft:
Expand All @@ -267,7 +266,7 @@ proc `$`*(entries: seq[StackTraceEntry]): string =
var indent = 2
# Format the entries.
for entry in entries:
if entry.procName.isNil:
if isNil(entry.procName):
if entry.line == -10:
result.add(spaces(indent) & "#[\n")
indent.inc(2)
Expand Down Expand Up @@ -320,7 +319,7 @@ proc read*[T](future: Future[T] | FutureVar[T]): T =
let fut = Future[T](future)
{.pop.}
if fut.finished:
if fut.error != nil:
if not isNil(fut.error):
injectStacktrace(fut)
raise fut.error
when T isnot void:
Expand All @@ -334,7 +333,7 @@ proc readError*[T](future: Future[T]): ref Exception =
##
## An ``ValueError`` exception will be thrown if no exception exists
## in the specified Future.
if future.error != nil: return future.error
if not isNil(future.error): return future.error
else:
raise newException(ValueError, "No error in future.")

Expand All @@ -356,20 +355,23 @@ proc finished*(future: FutureBase | FutureVar): bool =

proc failed*(future: FutureBase): bool =
## Determines whether ``future`` completed with an error.
return future.error != nil
return (not isNil(future.error))

proc asyncCheck*[T](future: Future[T]) =
## Sets a callback on ``future`` which raises an exception if the future
## finished with an error.
##
## This should be used instead of ``discard`` to discard void futures.
doAssert(not future.isNil, "Future is nil")
doAssert(not isNil(future), "Future is nil")
proc cb(data: pointer) =
if future.failed:
injectStacktrace(future)
raise future.error
future.callback = cb

proc asyncDiscard*[T](future: Future[T]) = discard
## This is async workaround for discard ``Future[T]``.

# ZAH: The return type here could be a Future[(T, Y)]
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once both ``fut1`` and ``fut2``
Expand Down Expand Up @@ -413,64 +415,60 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
fut2.callback = cb
return retFuture

# ZAH: The return type here could be a tuple
# This will enable waiting a heterogenous collection of futures.
proc all*[T](futs: varargs[Future[T]]): auto =
## Returns a future which will complete once
## all futures in ``futs`` complete.
## Returns a future which will complete once all futures in ``futs`` complete.
## If the argument is empty, the returned future completes immediately.
##
## If the awaited futures are not ``Future[void]``, the returned future
## will hold the values of all awaited futures in a sequence.
##
## If the awaited futures *are* ``Future[void]``,
## this proc returns ``Future[void]``.

when T is void:
var
retFuture = newFuture[void]("asyncdispatch.all")
completedFutures = 0
## If the awaited futures *are* ``Future[void]``, this proc returns
## ``Future[void]``.
##
## Note, that if one of the futures in ``futs`` will fail, result of ``all()``
## will also be failed with error from failed future.
let totalFutures = len(futs)
var completedFutures = 0

let totalFutures = len(futs)
# Because we can't capture varargs[T] in closures we need to create copy.
var nfuts = @futs

for fut in futs:
when T is void:
var retFuture = newFuture[void]("asyncdispatch.all(void)")
for fut in nfuts:
fut.addCallback proc (data: pointer) =
var fut = cast[FutureBase](data)
inc(completedFutures)
if not retFuture.finished:
if fut.failed:
retFuture.fail(fut.error)
else:
if completedFutures == totalFutures:
if completedFutures == totalFutures:
for nfut in nfuts:
if nfut.failed:
retFuture.fail(nfut.error)
break
if not retFuture.failed:
retFuture.complete()

if totalFutures == 0:
if len(nfuts) == 0:
retFuture.complete()

return retFuture

else:
var
retFuture = newFuture[seq[T]]("asyncdispatch.all")
retValues = newSeq[T](len(futs))
completedFutures = 0

for i, fut in futs:
proc setCallback(i: int) =
fut.addCallback proc (data: pointer) =
var fut = cast[Future[T]](data)
inc(completedFutures)
if not retFuture.finished:
if fut.failed:
retFuture.fail(fut.error)
else:
retValues[i] = fut.read()
if completedFutures == len(retValues):
retFuture.complete(retValues)

setCallback(i)

if retValues.len == 0:
var retFuture = newFuture[seq[T]]("asyncdispatch.all(T)")
var retValues = newSeq[T](totalFutures)
for fut in nfuts:
fut.addCallback proc (data: pointer) =
inc(completedFutures)
if not retFuture.finished:
if completedFutures == totalFutures:
for k, nfut in nfuts:
if nfut.failed:
retFuture.fail(nfut.error)
break
else:
retValues[k] = nfut.read()
if not retFuture.failed:
retFuture.complete(retValues)

if len(nfuts) == 0:
retFuture.complete(retValues)

return retFuture
Loading

0 comments on commit df8d0da

Please sign in to comment.