Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion json_rpc.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ requires "nim >= 1.6.0",
"chronos >= 4.0.3 & < 5.0.0",
"httputils >= 0.3.0",
"chronicles",
"websock >= 0.2.0 & < 0.3.0",
"websock >= 0.2.1 & < 0.3.0",
"serialization >= 0.4.4",
"json_serialization >= 0.4.2",
"unittest2"
Expand Down
37 changes: 23 additions & 14 deletions json_rpc/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,28 @@

{.push raises: [], gcsafe.}

import chronos/futures

# json_rpc seems to frequently trigger this bug so add a workaround here
when (NimMajor, NimMinor, NimPatch) < (2, 2, 6):
proc json_rpc_workaround_24844_future_string*() {.exportc.} =
# TODO https://github.com/nim-lang/Nim/issues/24844
discard Future[string]().value()

import
std/[deques, json, tables, macros],
chronos,
chronicles,
stew/byteutils,
chronos,
results,
./private/[client_handler_wrapper, jrpc_sys, shared_wrapper],
./[errors, jsonmarshal, router]

from strutils import replace

export
chronos, chronicles, deques, tables, jsonmarshal, RequestParamsTx, RequestBatchTx,
ResponseBatchRx, RequestIdKind, RequestId, RequestTx, RequestParamKind, results
chronos, deques, tables, jsonmarshal, RequestParamsTx, ResponseBatchRx, RequestIdKind,
RequestId, RequestTx, RequestParamKind, results

logScope:
topics = "JSONRPC-CLIENT"
Expand Down Expand Up @@ -60,16 +68,16 @@ func parseResponse*(payload: openArray[byte], T: type): T {.raises: [JsonRpcErro
try:
JrpcSys.decode(payload, T)
except SerializationError as exc:
raise (ref RequestDecodeError)(
raise (ref InvalidResponse)(
msg: exc.formatMsg("msg"), payload: @payload, parent: exc
)

proc processsSingleResponse(
response: sink ResponseRx, id: int
response: sink ResponseRx2, id: int
): JsonString {.raises: [JsonRpcError].} =
if response.id.kind != RequestIdKind.riNumber or response.id.num != id:
raise
(ref RequestDecodeError)(msg: "Expected `id` " & $id & ", got " & $response.id)
(ref InvalidResponse)(msg: "Expected `id` " & $id & ", got " & $response.id)

case response.kind
of ResponseKind.rkError:
Expand All @@ -80,7 +88,7 @@ proc processsSingleResponse(
proc processsSingleResponse*(
body: openArray[byte], id: int
): JsonString {.raises: [JsonRpcError].} =
processsSingleResponse(parseResponse(body, ResponseRx), id)
processsSingleResponse(parseResponse(body, ResponseRx2), id)

template withPendingFut*(client, fut, body: untyped): untyped =
let fut = ResponseFut.init("jsonrpc.client.pending")
Expand Down Expand Up @@ -145,10 +153,11 @@ proc call*(
# helps debugging, if nothing else
id = client.getNextId()
requestData = JrpcSys.withWriter(writer):
writer.requestTxEncode(name, params, id)
writer.writeRequest(name, params, id)

debug "Sending JSON-RPC request",
name, len = requestData.len, id, remote = client.remote
trace "Parameters", params

# Release params memory earlier by using a raw proc for the initial
# processing
Expand All @@ -175,14 +184,14 @@ proc call*(

proc callBatch*(
client: RpcClient, calls: seq[RequestTx]
): Future[seq[ResponseRx]] {.
): Future[seq[ResponseRx2]] {.
async: (raises: [CancelledError, JsonRpcError], raw: true)
.} =
if calls.len == 0:
let res = Future[seq[ResponseRx]].Raising([CancelledError, JsonRpcError]).init(
let res = Future[seq[ResponseRx2]].Raising([CancelledError, JsonRpcError]).init(
"empty batch"
)
res.complete(default(seq[ResponseRx]))
res.complete(default(seq[ResponseRx2]))
return res

let requestData = JrpcSys.withWriter(writer):
Expand All @@ -194,12 +203,12 @@ proc callBatch*(

proc complete(
client: RpcClient, request: auto
): Future[seq[ResponseRx]] {.async: (raises: [CancelledError, JsonRpcError]).} =
): Future[seq[ResponseRx2]] {.async: (raises: [CancelledError, JsonRpcError]).} =
try:
let resData = await request
debug "Processing JSON-RPC batch response",
len = resData.len, remote = client.remote
parseResponse(resData, seq[ResponseRx])
parseResponse(resData, seq[ResponseRx2])
except JsonRpcError as exc:
debug "JSON-RPC batch request failed", err = exc.msg, remote = client.remote
raise exc
Expand Down Expand Up @@ -248,7 +257,7 @@ proc send*(
debug "Processing JSON-RPC batch response",
len = resData.len, lastId, remote = client.remote

parseResponse(resData, seq[ResponseRx])
parseResponse(resData, seq[ResponseRx2])
except JsonRpcError as exc:
debug "JSON-RPC batch request failed", err = exc.msg, remote = client.remote

Expand Down
17 changes: 2 additions & 15 deletions json_rpc/clients/httpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,11 @@ method request(
if res.status < 200 or res.status >= 300: # res.status is not 2xx (success)
raise (ref ErrorResponse)(status: res.status, msg: res.reason)

let
resData = await res.getBodyBytes(client.maxMessageSize)
# TODO remove this processMessage hook when subscriptions / pubsub is
# properly supported
fallback = client.callOnProcessMessage(resData).valueOr:
raise (ref RequestDecodeError)(msg: error, payload: resData)

if not fallback:
# TODO http channels are unidirectional, so it doesn't really make sense
# to call onProcessMessage from http - this should be deprecated
# as soon as bidirectionality is supported
raise (ref InvalidResponse)(msg: "onProcessMessage handled response")

resData
await res.getBodyBytes(client.maxMessageSize)
except HttpError as exc:
raise (ref RpcTransportError)(msg: exc.msg, parent: exc)
finally:
await req.closeWait()
await res.closeWait()

proc newRpcHttpClient*(
maxBodySize = defaultMaxMessageSize,
Expand Down
6 changes: 3 additions & 3 deletions json_rpc/clients/socketclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ method request(
client: RpcSocketClient, reqData: seq[byte]
): Future[seq[byte]] {.async: (raises: [CancelledError, JsonRpcError]).} =
## Remotely calls the specified RPC method.
if client.transport.isNil:
let transport = client.transport
if transport.isNil:
raise newException(
RpcTransportError, "Transport is not initialised (missing a call to connect?)"
)
let transport = client.transport

client.withPendingFut(fut):
try:
discard await transport.write(reqData & "\r\n".toBytes())
except TransportError as exc:
except CatchableError as exc:
# If there's an error sending, the "next messages" facility will be
# broken since we don't know if the server observed the message or not
transport.close()
Expand Down
20 changes: 10 additions & 10 deletions json_rpc/clients/websocketclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,24 @@ method request*(
client: RpcWebSocketClient, reqData: seq[byte]
): Future[seq[byte]] {.async: (raises: [CancelledError, JsonRpcError]).} =
## Remotely calls the specified RPC method.
if client.transport.isNil:
let transport = client.transport
if transport.isNil:
raise newException(
RpcTransportError, "Transport is not initialised (missing a call to connect?)"
)
let transport = client.transport

client.withPendingFut(fut):
try:
await transport.send(reqData, Opcode.Binary)
except CancelledError as exc:
raise exc
except CatchableError as exc:
# If there's an error sending, the "next messages" facility will be
# broken since we don't know if the server observed the message or not
# broken since we don't know if the server observed the message or not -
# the same goes for cancellation during write
try:
await noCancel transport.close()
except CatchableError:
except CatchableError as exc:
# TODO https://github.com/status-im/nim-websock/pull/178
raiseAssert "Doesn't actually raise"
raiseAssert exc.msg
raise (ref RpcPostError)(msg: exc.msg, parent: exc)

await fut
Expand All @@ -86,10 +85,11 @@ proc processData(client: RpcWebSocketClient) {.async: (raises: []).} =
client.clearPending(lastError)

try:
await client.transport.close()
await noCancel client.transport.close()
client.transport = nil
except CatchableError:
raiseAssert "Doesn't actually raise"
except CatchableError as exc:
# TODO https://github.com/status-im/nim-websock/pull/178
raiseAssert exc.msg

if not client.onDisconnect.isNil:
client.onDisconnect()
Expand Down
7 changes: 3 additions & 4 deletions json_rpc/errors.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,11 @@ type

InvalidResponse* = object of JsonRpcError
## raised when the server response violates the JSON-RPC protocol
payload*: seq[byte]

RpcBindError* = object of JsonRpcError
RpcAddressUnresolvableError* = object of JsonRpcError

InvalidRequest* = object of JsonRpcError
## raised when the server recieves an invalid JSON request object
code*: int

RequestDecodeError* = object of JsonRpcError
## raised when fail to decode RequestRx
payload*: seq[byte]
Expand All @@ -52,3 +49,5 @@ type
## be provided.
code*: int
data*: results.Opt[JsonString]

InvalidRequest* {.deprecated: "ApplicationError".} = ApplicationError
13 changes: 7 additions & 6 deletions json_rpc/private/client_handler_wrapper.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ func createRpcProc(procName, parameters, callBody: NimNode): NimNode =
# build proc
result = newProc(procName, paramList, callBody)

# make proc async
result.addPragma ident"async"
# export this proc
result[0] = nnkPostfix.newTree(ident"*", newIdentNode($procName))

Expand Down Expand Up @@ -53,11 +51,14 @@ func setupConversion(reqParams, params: NimNode): NimNode =

template maybeUnwrapClientResult*(client, meth, reqParams, returnType): auto =
## Don't decode e.g. JsonString, return as is
when noWrap(typeof returnType):
await client.call(meth, reqParams)
when noWrap(returnType):
client.call(meth, reqParams)
else:
let res = await client.call(meth, reqParams)
decode(JrpcConv, res.string, typeof returnType)
proc complete(f: auto): Future[returnType] {.async.} =
let res = await f
decode(JrpcConv, res.string, returnType)
let fut = client.call(meth, reqParams)
complete(fut)

func createRpcFromSig*(clientType, rpcDecl: NimNode, alias = NimNode(nil)): NimNode =
## This procedure will generate something like this:
Expand Down
Loading