Skip to content

Commit

Permalink
No more Defect on stream close. (#213)
Browse files Browse the repository at this point in the history
Fix async streams issue with replacing state.
Add `closing` states to HTTP's server connection, request and bodyrw.
Fix Http server cancellation leaks.
  • Loading branch information
cheatfate committed Aug 6, 2021
1 parent ef2430d commit b14f66c
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 177 deletions.
45 changes: 29 additions & 16 deletions chronos/apps/http/httpbodyrw.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# MIT license (LICENSE-MIT)
import ../../asyncloop, ../../asyncsync
import ../../streams/[asyncstream, boundstream]
import httpcommon

const
HttpBodyReaderTrackerName* = "http.body.reader"
Expand All @@ -17,9 +18,11 @@ const

type
HttpBodyReader* = ref object of AsyncStreamReader
bstate*: HttpState
streams*: seq[AsyncStreamReader]

HttpBodyWriter* = ref object of AsyncStreamWriter
bstate*: HttpState
streams*: seq[AsyncStreamWriter]

HttpBodyTracker* = ref object of TrackerBase
Expand Down Expand Up @@ -93,41 +96,47 @@ proc newHttpBodyReader*(streams: varargs[AsyncStreamReader]): HttpBodyReader =
##
## First stream in sequence will be used as a source.
doAssert(len(streams) > 0, "At least one stream must be added")
var res = HttpBodyReader(streams: @streams)
var res = HttpBodyReader(bstate: HttpState.Alive, streams: @streams)
res.init(streams[0])
trackHttpBodyReader(res)
res

proc closeWait*(bstream: HttpBodyReader) {.async.} =
## Close and free resource allocated by body reader.
var res = newSeq[Future[void]]()
# We closing streams in reversed order because stream at position [0], uses
# data from stream at position [1].
for index in countdown((len(bstream.streams) - 1), 0):
res.add(bstream.streams[index].closeWait())
await allFutures(res)
await procCall(closeWait(AsyncStreamReader(bstream)))
untrackHttpBodyReader(bstream)
if bstream.bstate == HttpState.Alive:
bstream.bstate = HttpState.Closing
var res = newSeq[Future[void]]()
# We closing streams in reversed order because stream at position [0], uses
# data from stream at position [1].
for index in countdown((len(bstream.streams) - 1), 0):
res.add(bstream.streams[index].closeWait())
await allFutures(res)
await procCall(closeWait(AsyncStreamReader(bstream)))
bstream.bstate = HttpState.Closed
untrackHttpBodyReader(bstream)

proc newHttpBodyWriter*(streams: varargs[AsyncStreamWriter]): HttpBodyWriter =
## HttpBodyWriter is AsyncStreamWriter which holds references to all the
## ``streams``. Also on close it will close all the ``streams``.
##
## First stream in sequence will be used as a destination.
doAssert(len(streams) > 0, "At least one stream must be added")
var res = HttpBodyWriter(streams: @streams)
var res = HttpBodyWriter(bstate: HttpState.Alive, streams: @streams)
res.init(streams[0])
trackHttpBodyWriter(res)
res

proc closeWait*(bstream: HttpBodyWriter) {.async.} =
## Close and free all the resources allocated by body writer.
var res = newSeq[Future[void]]()
for index in countdown(len(bstream.streams) - 1, 0):
res.add(bstream.streams[index].closeWait())
await allFutures(res)
await procCall(closeWait(AsyncStreamWriter(bstream)))
untrackHttpBodyWriter(bstream)
if bstream.bstate == HttpState.Alive:
bstream.bstate = HttpState.Closing
var res = newSeq[Future[void]]()
for index in countdown(len(bstream.streams) - 1, 0):
res.add(bstream.streams[index].closeWait())
await allFutures(res)
await procCall(closeWait(AsyncStreamWriter(bstream)))
bstream.bstate = HttpState.Closed
untrackHttpBodyWriter(bstream)

proc hasOverflow*(bstream: HttpBodyReader): bool {.raises: [Defect].} =
if len(bstream.streams) == 1:
Expand All @@ -144,3 +153,7 @@ proc hasOverflow*(bstream: HttpBodyReader): bool {.raises: [Defect].} =
false
else:
false

proc closed*(bstream: HttpBodyReader | HttpBodyWriter): bool {.
raises: [Defect].} =
bstream.bstate != HttpState.Alive
21 changes: 16 additions & 5 deletions chronos/apps/http/httpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1183,11 +1183,22 @@ proc redirect*(request: HttpClientRequestRef,

proc fetch*(request: HttpClientRequestRef): Future[HttpResponseTuple] {.
async.} =
let response = await request.send()
let data = await response.getBodyBytes()
let code = response.status
await response.closeWait()
return (code, data)
var response: HttpClientResponseRef
try:
response = await request.send()
let buffer = await response.getBodyBytes()
let status = response.status
await response.closeWait()
response = nil
return (status, buffer)
except HttpError as exc:
if not(isNil(response)):
await response.closeWait()
raise exc
except CancelledError as exc:
if not(isNil(response)):
await response.closeWait()
raise exc

proc fetch*(session: HttpSessionRef, url: Uri): Future[HttpResponseTuple] {.
async.} =
Expand Down
3 changes: 3 additions & 0 deletions chronos/apps/http/httpcommon.nim
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type
CommaSeparatedArray ## Enable usage of comma symbol as separator of array
## items

HttpState* {.pure.} = enum
Alive, Closing, Closed

proc raiseHttpCriticalError*(msg: string,
code = Http400) {.noinline, noreturn.} =
raise (ref HttpCriticalError)(code: code, msg: msg)
Expand Down
168 changes: 111 additions & 57 deletions chronos/apps/http/httpserver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ type
RequestFence* = Result[HttpRequestRef, HttpProcessError]

HttpRequestFlags* {.pure.} = enum
BoundBody, UnboundBody, MultipartForm, UrlencodedForm,
ClientExpect
BoundBody, UnboundBody, MultipartForm, UrlencodedForm, ClientExpect

HttpResponseFlags* {.pure.} = enum
KeepAlive, Chunked
Expand Down Expand Up @@ -83,6 +82,7 @@ type
HttpServerRef* = ref HttpServer

HttpRequest* = object of RootObj
state*: HttpState
headers*: HttpTable
query*: HttpTable
postTable: Option[HttpTable]
Expand Down Expand Up @@ -113,6 +113,7 @@ type
HttpResponseRef* = ref HttpResponse

HttpConnection* = object of RootObj
state*: HttpState
server*: HttpServerRef
transp: StreamTransport
mainReader*: AsyncStreamReader
Expand Down Expand Up @@ -250,7 +251,7 @@ proc hasBody*(request: HttpRequestRef): bool {.raises: [Defect].} =
proc prepareRequest(conn: HttpConnectionRef,
req: HttpRequestHeader): HttpResultCode[HttpRequestRef] {.
raises: [Defect].}=
var request = HttpRequestRef(connection: conn)
var request = HttpRequestRef(connection: conn, state: HttpState.Alive)

if req.version notin {HttpVersion10, HttpVersion11}:
return err(Http505)
Expand Down Expand Up @@ -402,38 +403,57 @@ proc handleExpect*(request: HttpRequestRef) {.async.} =

proc getBody*(request: HttpRequestRef): Future[seq[byte]] {.async.} =
## Obtain request's body as sequence of bytes.
let res = request.getBodyReader()
if res.isErr():
let bodyReader = request.getBodyReader()
if bodyReader.isErr():
return @[]
else:
let reader = res.get()
var reader = bodyReader.get()
try:
await request.handleExpect()
var res = await reader.read()
let res = await reader.read()
if reader.hasOverflow():
await reader.closeWait()
reader = nil
raiseHttpCriticalError(MaximumBodySizeError, Http413)
return res
else:
await reader.closeWait()
reader = nil
return res
except CancelledError as exc:
if not(isNil(reader)):
await reader.closeWait()
raise exc
except AsyncStreamError:
if not(isNil(reader)):
await reader.closeWait()
raiseHttpCriticalError("Unable to read request's body")
finally:
await closeWait(res.get())

proc consumeBody*(request: HttpRequestRef): Future[void] {.async.} =
## Consume/discard request's body.
let res = request.getBodyReader()
if res.isErr():
let bodyReader = request.getBodyReader()
if bodyReader.isErr():
return
else:
let reader = res.get()
var reader = bodyReader.get()
try:
await request.handleExpect()
discard await reader.consume()
if reader.hasOverflow():
await reader.closeWait()
reader = nil
raiseHttpCriticalError(MaximumBodySizeError, Http413)
else:
await reader.closeWait()
reader = nil
return
except CancelledError as exc:
if not(isNil(reader)):
await reader.closeWait()
raise exc
except AsyncStreamError:
if not(isNil(reader)):
await reader.closeWait()
raiseHttpCriticalError("Unable to read request's body")
finally:
await closeWait(res.get())

proc getAcceptInfo*(request: HttpRequestRef): Result[AcceptInfo, cstring] =
## Returns value of `Accept` header as `AcceptInfo` object.
Expand Down Expand Up @@ -574,6 +594,7 @@ proc getRequest(conn: HttpConnectionRef): Future[HttpRequestRef] {.async.} =
proc init*(value: var HttpConnection, server: HttpServerRef,
transp: StreamTransport) =
value = HttpConnection(
state: HttpState.Alive,
server: server,
transp: transp,
buffer: newSeq[byte](server.maxHeadersSize),
Expand All @@ -590,23 +611,32 @@ proc new(ht: typedesc[HttpConnectionRef], server: HttpServerRef,
res

proc closeWait*(conn: HttpConnectionRef) {.async.} =
var pending: seq[Future[void]]
if conn.reader != conn.mainReader:
pending.add(conn.reader.closeWait())
if conn.writer != conn.mainWriter:
pending.add(conn.writer.closeWait())
if len(pending) > 0:
if conn.state == HttpState.Alive:
conn.state = HttpState.Closing
var pending: seq[Future[void]]
if conn.reader != conn.mainReader:
pending.add(conn.reader.closeWait())
if conn.writer != conn.mainWriter:
pending.add(conn.writer.closeWait())
if len(pending) > 0:
await allFutures(pending)
# After we going to close everything else.
pending.setLen(3)
pending[0] = conn.mainReader.closeWait()
pending[1] = conn.mainWriter.closeWait()
pending[2] = conn.transp.closeWait()
await allFutures(pending)
# After we going to close everything else.
await allFutures(conn.mainReader.closeWait(), conn.mainWriter.closeWait(),
conn.transp.closeWait())
conn.state = HttpState.Closed

proc closeWait(req: HttpRequestRef) {.async.} =
if req.response.isSome():
let resp = req.response.get()
if (HttpResponseFlags.Chunked in resp.flags) and
not(isNil(resp.chunkedWriter)):
await resp.chunkedWriter.closeWait()
if req.state == HttpState.Alive:
if req.response.isSome():
req.state = HttpState.Closing
let resp = req.response.get()
if (HttpResponseFlags.Chunked in resp.flags) and
not(isNil(resp.chunkedWriter)):
await resp.chunkedWriter.closeWait()
req.state = HttpState.Closed

proc createConnection(server: HttpServerRef,
transp: StreamTransport): Future[HttpConnectionRef] {.
Expand Down Expand Up @@ -700,16 +730,21 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =

if arg.isErr():
let code = arg.error().code
case arg.error().error
of HTTPServerError.TimeoutError:
discard await conn.sendErrorResponse(HttpVersion11, code, false)
of HTTPServerError.RecoverableError:
discard await conn.sendErrorResponse(HttpVersion11, code, false)
of HTTPServerError.CriticalError:
discard await conn.sendErrorResponse(HttpVersion11, code, false)
of HTTPServerError.CatchableError:
discard await conn.sendErrorResponse(HttpVersion11, code, false)
of HttpServerError.DisconnectError:
try:
case arg.error().error
of HTTPServerError.TimeoutError:
discard await conn.sendErrorResponse(HttpVersion11, code, false)
of HTTPServerError.RecoverableError:
discard await conn.sendErrorResponse(HttpVersion11, code, false)
of HTTPServerError.CriticalError:
discard await conn.sendErrorResponse(HttpVersion11, code, false)
of HTTPServerError.CatchableError:
discard await conn.sendErrorResponse(HttpVersion11, code, false)
of HttpServerError.DisconnectError:
discard
except CancelledError:
# We swallowing `CancelledError` in a loop, but we going to exit
# loop ASAP.
discard
break
else:
Expand All @@ -718,33 +753,52 @@ proc processLoop(server: HttpServerRef, transp: StreamTransport) {.async.} =
if lastErrorCode.isNone():
if isNil(resp):
# Response was `nil`.
discard await conn.sendErrorResponse(HttpVersion11, Http404,
false)
try:
discard await conn.sendErrorResponse(HttpVersion11, Http404, false)
except CancelledError:
keepConn = false
else:
case resp.state
of HttpResponseState.Empty:
# Response was ignored
discard await conn.sendErrorResponse(HttpVersion11, Http404,
keepConn)
of HttpResponseState.Prepared:
# Response was prepared but not sent.
discard await conn.sendErrorResponse(HttpVersion11, Http409,
keepConn)
else:
# some data was already sent to the client.
discard
try:
case resp.state
of HttpResponseState.Empty:
# Response was ignored
discard await conn.sendErrorResponse(HttpVersion11, Http404,
keepConn)
of HttpResponseState.Prepared:
# Response was prepared but not sent.
discard await conn.sendErrorResponse(HttpVersion11, Http409,
keepConn)
else:
# some data was already sent to the client.
discard
except CancelledError:
keepConn = false
else:
discard await conn.sendErrorResponse(HttpVersion11, lastErrorCode.get(),
false)
try:
discard await conn.sendErrorResponse(HttpVersion11,
lastErrorCode.get(), false)
except CancelledError:
keepConn = false

# Closing and releasing all the request resources.
await request.closeWait()
try:
await request.closeWait()
except CancelledError:
# We swallowing `CancelledError` in a loop, but we still need to close
# `request` before exiting.
await request.closeWait()

if not(keepConn):
break

# Connection could be `nil` only when secure handshake is failed.
if not(isNil(conn)):
await conn.closeWait()
try:
await conn.closeWait()
except CancelledError:
# Cancellation could be happened while we closing `conn`. But we still
# need to close it.
await conn.closeWait()

server.connections.del(transp.getId())
# if server.maxConnections > 0:
Expand Down
Loading

0 comments on commit b14f66c

Please sign in to comment.