Skip to content

Commit

Permalink
Address #329. (#330)
Browse files Browse the repository at this point in the history
* Address #329.

* Refactor AsyncStreamReader/Writer `state` helper functions.
Fix some compilation warnings in `debugutils`.
Add multiple writes into test to check consistency.

* Fix Linux issue.
Fix warnings.
  • Loading branch information
cheatfate committed Jan 10, 2023
1 parent e9f8baa commit 945c304
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 104 deletions.
4 changes: 2 additions & 2 deletions chronos/debugutils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ proc dumpPendingFutures*(filter = AllFutureStates): string =
## not yet finished).
## 2. Future[T] objects with ``FutureState.Finished/Cancelled/Failed`` state
## which callbacks are scheduled, but not yet fully processed.
var count = 0'u
var res = ""
when defined(chronosFutureTracking):
var count = 0'u
var res = ""
for item in pendingFutures():
if item.state in filter:
inc(count)
Expand Down
182 changes: 90 additions & 92 deletions chronos/streams/asyncstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type
par*: ref CatchableError
AsyncStreamWriteError* = object of AsyncStreamError
par*: ref CatchableError
AsyncStreamWriteEOFError* = object of AsyncStreamWriteError

AsyncBuffer* = object
offset*: int
Expand Down Expand Up @@ -218,24 +219,25 @@ proc newAsyncStreamUseClosedError*(): ref AsyncStreamUseClosedError {.
newException(AsyncStreamUseClosedError, "Stream is already closed")

proc raiseAsyncStreamUseClosedError*() {.
noinline, noreturn, raises: [Defect, AsyncStreamUseClosedError].} =
noinline, noreturn, raises: [Defect, AsyncStreamUseClosedError].} =
raise newAsyncStreamUseClosedError()

proc raiseAsyncStreamLimitError*() {.
noinline, noreturn, raises: [Defect, AsyncStreamLimitError].} =
noinline, noreturn, raises: [Defect, AsyncStreamLimitError].} =
raise newAsyncStreamLimitError()

proc raiseAsyncStreamIncompleteError*() {.
noinline, noreturn, raises: [Defect, AsyncStreamIncompleteError].} =
noinline, noreturn, raises: [Defect, AsyncStreamIncompleteError].} =
raise newAsyncStreamIncompleteError()

proc raiseEmptyMessageDefect*() {.noinline, noreturn.} =
raise newException(AsyncStreamIncorrectDefect,
"Could not write empty message")

template checkStreamClosed*(t: untyped) =
if t.state == AsyncStreamState.Closed:
raiseAsyncStreamUseClosedError()
proc raiseAsyncStreamWriteEOFError*() {.
noinline, noreturn, raises: [Defect, AsyncStreamWriteEOFError].} =
raise newException(AsyncStreamWriteEOFError,
"Stream finished or remote side dropped connection")

proc atEof*(rstream: AsyncStreamReader): bool =
## Returns ``true`` is reading stream is closed or finished and internal
Expand All @@ -257,93 +259,81 @@ proc atEof*(wstream: AsyncStreamWriter): bool =
else:
wstream.wsource.atEof()
else:
wstream.state != AsyncStreamState.Running
# `wstream.future` holds `rstream.writerLoop()` call's result.
# Return `true` if `writerLoop()` is not yet started or already stopped.
if isNil(wstream.future) or wstream.future.finished():
true
else:
wstream.state != AsyncStreamState.Running

proc closed*(reader: AsyncStreamReader): bool =
proc closed*(rw: AsyncStreamRW): bool =
## Returns ``true`` is reading/writing stream is closed.
reader.state in {AsyncStreamState.Closing, Closed}
rw.state in {AsyncStreamState.Closing, Closed}

proc finished*(reader: AsyncStreamReader): bool =
## Returns ``true`` is reading/writing stream is finished (completed).
if isNil(reader.readerLoop):
if isNil(reader.rsource):
reader.tsource.finished()
else:
reader.rsource.finished()
else:
(reader.state == AsyncStreamState.Finished)
proc finished*(rw: AsyncStreamRW): bool =
## Returns ``true`` if reading/writing stream is finished (completed).
rw.atEof() and rw.state == AsyncStreamState.Finished

proc stopped*(reader: AsyncStreamReader): bool =
## Returns ``true`` is reading/writing stream is stopped (interrupted).
if isNil(reader.readerLoop):
if isNil(reader.rsource):
false
proc stopped*(rw: AsyncStreamRW): bool =
## Returns ``true`` if reading/writing stream is stopped (interrupted).
let loopIsNil =
when rw is AsyncStreamReader:
isNil(rw.readerLoop)
else:
reader.rsource.stopped()
else:
(reader.state == AsyncStreamState.Stopped)
isNil(rw.writerLoop)

proc running*(reader: AsyncStreamReader): bool =
## Returns ``true`` is reading/writing stream is still pending.
if isNil(reader.readerLoop):
if isNil(reader.rsource):
reader.tsource.running()
if loopIsNil:
when rw is AsyncStreamReader:
if isNil(rw.rsource): false else: rw.rsource.stopped()
else:
reader.rsource.running()
if isNil(rw.wsource): false else: rw.wsource.stopped()
else:
(reader.state == AsyncStreamState.Running)

proc failed*(reader: AsyncStreamReader): bool =
if isNil(reader.readerLoop):
if isNil(reader.rsource):
reader.tsource.failed()
if isNil(rw.future) or rw.future.finished():
false
else:
reader.rsource.failed()
else:
(reader.state == AsyncStreamState.Error)
rw.state == AsyncStreamState.Stopped

proc closed*(writer: AsyncStreamWriter): bool =
## Returns ``true`` is reading/writing stream is closed.
writer.state in {AsyncStreamState.Closing, Closed}

proc finished*(writer: AsyncStreamWriter): bool =
## Returns ``true`` is reading/writing stream is finished (completed).
if isNil(writer.writerLoop):
if isNil(writer.wsource):
writer.tsource.finished()
proc running*(rw: AsyncStreamRW): bool =
## Returns ``true`` if reading/writing stream is still pending.
let loopIsNil =
when rw is AsyncStreamReader:
isNil(rw.readerLoop)
else:
writer.wsource.finished()
isNil(rw.writerLoop)
if loopIsNil:
when rw is AsyncStreamReader:
if isNil(rw.rsource): rw.tsource.running() else: rw.rsource.running()
else:
if isNil(rw.wsource): rw.tsource.running() else: rw.wsource.running()
else:
(writer.state == AsyncStreamState.Finished)

proc stopped*(writer: AsyncStreamWriter): bool =
## Returns ``true`` is reading/writing stream is stopped (interrupted).
if isNil(writer.writerLoop):
if isNil(writer.wsource):
if isNil(rw.future) or rw.future.finished():
false
else:
writer.wsource.stopped()
else:
(writer.state == AsyncStreamState.Stopped)
rw.state == AsyncStreamState.Running

proc running*(writer: AsyncStreamWriter): bool =
## Returns ``true`` is reading/writing stream is still pending.
if isNil(writer.writerLoop):
if isNil(writer.wsource):
writer.tsource.running()
proc failed*(rw: AsyncStreamRW): bool =
## Returns ``true`` if reading/writing stream is in failed state.
let loopIsNil =
when rw is AsyncStreamReader:
isNil(rw.readerLoop)
else:
writer.wsource.running()
else:
(writer.state == AsyncStreamState.Running)

proc failed*(writer: AsyncStreamWriter): bool =
if isNil(writer.writerLoop):
if isNil(writer.wsource):
writer.tsource.failed()
isNil(rw.writerLoop)
if loopIsNil:
when rw is AsyncStreamReader:
if isNil(rw.rsource): rw.tsource.failed() else: rw.rsource.failed()
else:
writer.wsource.failed()
if isNil(rw.wsource): rw.tsource.failed() else: rw.wsource.failed()
else:
(writer.state == AsyncStreamState.Error)
if isNil(rw.future) or rw.future.finished():
false
else:
rw.state == AsyncStreamState.Error

template checkStreamClosed*(t: untyped) =
if t.closed(): raiseAsyncStreamUseClosedError()

template checkStreamFinished*(t: untyped) =
if t.atEof(): raiseAsyncStreamWriteEOFError()

proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.
gcsafe, raises: [Defect].}
Expand Down Expand Up @@ -787,6 +777,8 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
##
## ``nbytes`` must be more then zero.
checkStreamClosed(wstream)
checkStreamFinished(wstream)

if nbytes <= 0:
raiseEmptyMessageDefect()

Expand Down Expand Up @@ -834,6 +826,8 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte],
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
## stream.
checkStreamClosed(wstream)
checkStreamFinished(wstream)

let length = if msglen <= 0: len(sbytes) else: min(msglen, len(sbytes))
if length <= 0:
raiseEmptyMessageDefect()
Expand Down Expand Up @@ -885,6 +879,8 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink string,
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
## stream.
checkStreamClosed(wstream)
checkStreamFinished(wstream)

let length = if msglen <= 0: len(sbytes) else: min(msglen, len(sbytes))
if length <= 0:
raiseEmptyMessageDefect()
Expand Down Expand Up @@ -929,23 +925,25 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink string,
proc finish*(wstream: AsyncStreamWriter) {.async.} =
## Finish write stream ``wstream``.
checkStreamClosed(wstream)

if not isNil(wstream.wsource):
if isNil(wstream.writerLoop):
await wstream.wsource.finish()
else:
var item = WriteItem(kind: Pointer)
item.size = 0
item.future = newFuture[void]("async.stream.finish")
try:
await wstream.queue.put(item)
await item.future
except CancelledError as exc:
raise exc
except AsyncStreamError as exc:
raise exc
except CatchableError as exc:
raise newAsyncStreamWriteError(exc)
# For AsyncStreamWriter Finished state could be set manually or by stream's
# writeLoop, so we not going to raise exception here.
if not(wstream.atEof()):
if not isNil(wstream.wsource):
if isNil(wstream.writerLoop):
await wstream.wsource.finish()
else:
var item = WriteItem(kind: Pointer)
item.size = 0
item.future = newFuture[void]("async.stream.finish")
try:
await wstream.queue.put(item)
await item.future
except CancelledError as exc:
raise exc
except AsyncStreamError as exc:
raise exc
except CatchableError as exc:
raise newAsyncStreamWriteError(exc)

proc join*(rw: AsyncStreamRW): Future[void] =
## Get Future[void] which will be completed when stream become finished or
Expand Down
13 changes: 3 additions & 10 deletions chronos/streams/tlsstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
## uses sources of BearSSL <https://www.bearssl.org> by Thomas Pornin.
import
bearssl/[brssl, ec, errors, pem, rsa, ssl, x509],
bearssl/abi/cacert
bearssl/certs/cacert
import ../asyncloop, ../timer, ../asyncsync
import asyncstream, ../transports/stream, ../transports/common
export asyncloop, asyncsync, timer, asyncstream
Expand Down Expand Up @@ -102,13 +102,6 @@ type
TLSStreamProtocolError* = object of TLSStreamError
errCode*: int

proc newTLSStreamReadError(p: ref AsyncStreamError): ref TLSStreamReadError {.
noinline.} =
var w = newException(TLSStreamReadError, "Read stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p
w

proc newTLSStreamWriteError(p: ref AsyncStreamError): ref TLSStreamWriteError {.
noinline.} =
var w = newException(TLSStreamWriteError, "Write stream failed")
Expand Down Expand Up @@ -375,10 +368,10 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} =

# Syncing state for reader and writer
stream.writer.state = loopState
stream.reader.state = loopState
if loopState == AsyncStreamState.Error:
if isNil(stream.reader.error):
stream.reader.error = newTLSStreamReadError(error)
stream.reader.state = loopState
stream.reader.state = AsyncStreamState.Finished

if not(isNil(error)):
# Completing all pending writes
Expand Down
42 changes: 42 additions & 0 deletions tests/testasyncstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,48 @@ suite "AsyncStream test suite":
await server.join()
result = true
check waitFor(testConsume2(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(AsyncStream) write(eof) test":
proc testWriteEof(address: TransportAddress): Future[bool] {.async.} =
let
size = 10240
message = createBigMessage("ABCDEFGHIJKLMNOP", size)

proc processClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wbstream = newBoundedStreamWriter(wstream, uint64(size))
try:
check wbstream.atEof() == false
await wbstream.write(message)
check wbstream.atEof() == false
await wbstream.finish()
check wbstream.atEof() == true
expect AsyncStreamWriteEOFError:
await wbstream.write(message)
expect AsyncStreamWriteEOFError:
await wbstream.write(message)
expect AsyncStreamWriteEOFError:
await wbstream.write(message)
check wbstream.atEof() == true
await wbstream.closeWait()
check wbstream.atEof() == true
finally:
await wstream.closeWait()
await transp.closeWait()

let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}
var server = createStreamServer(address, processClient, flags = flags)
server.start()
var conn = await connect(server.localAddress())
try:
discard await conn.consume()
finally:
await conn.closeWait()
server.stop()
await server.closeWait()
return true

check waitFor(testWriteEof(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(AsyncStream) leaks test":
check:
getTracker("async.stream.reader").isLeaked() == false
Expand Down

0 comments on commit 945c304

Please sign in to comment.