From 622528be12e79c21848cce524dd1060e898e656b Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Fri, 14 Nov 2025 19:44:46 +0100 Subject: [PATCH 01/11] clients: reduce mem usage, refactor protocol handling JSON-RPC is modelled as request-response where each response corresponds to one request, sent in the same order. In addition, batching is possible where several requests can be sent as one batch, receiving one batch of responses in return, but still maintaining the general 1:1 messaging structure. This PR exploits this 1:1 relationship between request and response to simplify the client implementation and improve efficieny (reduces mem usage from 1.2gb to 800mb to send a 128mb message - still a lot, but quite a bit better) while at the same time cleaning up error handling and moving the JSON-RPC specifics out of the transports so that they're equal and shared between each transport. The transports themselves now just provide the simple ability to transfer request-response pairs. In doing so, protocol adherence in edge cases is increased to more closely follow the semantics suggested in the spec. * match messages by order, getting rid of unnecessary matching table * move all JSON-RPC protocol implementation details such as encoding/decoding etc to `clients`, avoiding it being spread out and repeated in each transport - this also opens the door to using a different encoding than JSON in the future (ie CBOR) * stream-encode requests and use `seq[byte]` throughout to avoid costly `string` conversions / copies * clean up error raising - in particular, differentiate between transport and protocol errors more clearly and strive to raise similar exceptions in similar situations for each transport * add `maxMessageSize` parameter to each transport, make it work the same * remove socket client reconnect loop to match websockets - possibly it should be re-added to both instead in a future PR * add `raises` to `async`, where relevant * order request/response json fields the way they're ordered in the spec * this makes it more efficient to parse messages following the same order * make parser more spec-compliant, moving some of the validation to an earlier stage in the parsing pipeline * limit the length of string-based `id` fields to avoid having the log spammed * stream-write requests, avoiding an extra copy of the parameters * use raw async procs where applicable to avoid copies and async overhead --- json_rpc/client.nim | 376 +++++++++++--------- json_rpc/clients/httpclient.nim | 231 ++++-------- json_rpc/clients/socketclient.nim | 161 ++++----- json_rpc/clients/websocketclient.nim | 141 +++++++- json_rpc/clients/websocketclientimpl.nim | 162 --------- json_rpc/errors.nim | 29 +- json_rpc/private/client_handler_wrapper.nim | 2 +- json_rpc/private/jrpc_sys.nim | 82 ++++- json_rpc/private/shared_wrapper.nim | 4 +- json_rpc/router.nim | 3 +- json_rpc/rpcproxy.nim | 5 +- json_rpc/server.nim | 13 +- json_rpc/servers/socketserver.nim | 3 + tests/test_client_hook.nim | 2 +- tests/test_jrpc_sys.nim | 32 +- 15 files changed, 583 insertions(+), 663 deletions(-) delete mode 100644 json_rpc/clients/websocketclientimpl.nim diff --git a/json_rpc/client.nim b/json_rpc/client.nim index a198098..057d962 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -10,31 +10,24 @@ {.push raises: [], gcsafe.} import - std/[json, tables, macros], + std/[deques, json, tables, macros], chronicles, + stew/byteutils, chronos, results, - ./jsonmarshal, - ./private/jrpc_sys, - ./private/client_handler_wrapper, - ./private/shared_wrapper, - ./errors + ./private/[client_handler_wrapper, jrpc_sys, shared_wrapper], + ./[errors, jsonmarshal, router] from strutils import replace export - chronos, - tables, - jsonmarshal, - RequestParamsTx, - RequestBatchTx, - ResponseBatchRx, - results + chronos, chronicles, deques, tables, jsonmarshal, RequestParamsTx, RequestBatchTx, + ResponseBatchRx, RequestIdKind, RequestId, RequestTx, RequestParamKind, results logScope: topics = "JSONRPC-CLIENT" -const MaxMessageBodyBytes* = 128 * 1024 * 1024 # 128 MB (JSON encoded) +const defaultMaxMessageSize* = 128 * 1024 * 1024 # 128 MB (JSON encoded) type RpcBatchItem* = object @@ -49,188 +42,245 @@ type error*: Opt[string] result*: JsonString + ResponseFut* = Future[seq[byte]].Raising([CancelledError, JsonRpcError]) RpcClient* = ref object of RootRef - awaiting*: Table[RequestId, Future[JsonString]] lastId: int onDisconnect*: proc() {.gcsafe, raises: [].} + # TODO implement "JSON-RPC pubsub" / bidirectionality and deprecate this onProcessMessage*: proc(client: RpcClient, line: string): Result[bool, string] {.gcsafe, raises: [].} - batchFut*: Future[ResponseBatchRx] + pendingRequests*: Deque[ResponseFut] + remote*: string + # Client identifier, for logging + maxMessageSize*: int GetJsonRpcRequestHeaders* = proc(): seq[(string, string)] {.gcsafe, raises: [].} -# ------------------------------------------------------------------------------ -# Public helpers -# ------------------------------------------------------------------------------ - -func validateResponse(resIndex: int, res: ResponseRx): Result[void, string] = - if res.jsonrpc.isNone: - return err("missing or invalid `jsonrpc` in response " & $resIndex) - - if res.id.isNone: - if res.error.isSome: - let error = JrpcSys.encode(res.error.get) - return err(error) - else: - return err("missing or invalid response id in response " & $resIndex) +func parseResponse*(payload: openArray[byte], T: type): T {.raises: [JsonRpcError].} = + try: + JrpcSys.decode(payload, T) + except SerializationError as exc: + raise (ref RequestDecodeError)( + msg: exc.formatMsg("msg"), payload: @payload, parent: exc + ) - if res.error.isSome: - let error = JrpcSys.encode(res.error.get) - return err(error) +proc processsSingleResponse( + response: sink ResponseRx, id: int +): JsonString {.raises: [JsonRpcError].} = + if response.id.kind != RequestIdKind.riNumber or response.id.num != id: + raise + (ref RequestDecodeError)(msg: "Expected `id` " & $id & ", got " & $response.id) + + case response.kind + of ResponseKind.rkError: + raise (ref JsonRpcError)(msg: JrpcSys.encode(response.error)) + of ResponseKind.rkResult: + move(response.result) + +proc processsSingleResponse*( + body: openArray[byte], id: int +): JsonString {.raises: [JsonRpcError].} = + processsSingleResponse(parseResponse(body, ResponseRx), id) + +template withPendingFut*(client, fut, body: untyped): untyped = + let fut = ResponseFut.init("jsonrpc.client.pending") + client.pendingRequests.addLast fut + body + +proc callOnProcessMessage*( + client: RpcClient, line: openArray[byte] +): Result[bool, string] = + if client.onProcessMessage.isNil.not: + client.onProcessMessage(client, string.fromBytes(line)) + else: + ok(true) - # Up to this point, the result should contains something - if res.result.string.len == 0: - return err("missing or invalid response result in response " & $resIndex) +proc processMessage*(client: RpcClient, line: sink seq[byte]): Result[void, string] = + if not ?client.callOnProcessMessage(line): + return ok() + # Messages are assumed to arrive one by one - even if the future was cancelled, + # we therefore consume one message for every line we don't have to process + if client.pendingRequests.len() == 0: + debug "Received message even though there's nothing queued, dropping", + id = JrpcSys.decode(line, ReqRespHeader).id + return ok() - ok() + let fut = client.pendingRequests.popFirst() + if fut.finished(): # probably cancelled + debug "Future already finished, dropping", state = fut.state() + return ok() -proc processResponse(resIndex: int, - map: var Table[RequestId, int], - responses: var seq[RpcBatchResponse], - response: ResponseRx): Result[void, string] = - let r = validateResponse(resIndex, response) - if r.isErr: - if response.id.isSome: - let id = response.id.get - var index: int - if not map.pop(id, index): - return err("cannot find message id: " & $id & " in response " & $resIndex) - responses[index] = RpcBatchResponse( - error: Opt.some(r.error) - ) - else: - return err(r.error) - else: - let id = response.id.get - var index: int - if not map.pop(id, index): - return err("cannot find message id: " & $id & " in response " & $resIndex) - responses[index] = RpcBatchResponse( - result: response.result - ) + fut.complete(line) ok() -# ------------------------------------------------------------------------------ -# Public helpers -# ------------------------------------------------------------------------------ - -func requestTxEncode*(name: string, params: RequestParamsTx, id: RequestId): string = - let req = requestTx(name, params, id) - JrpcSys.encode(req) - -func requestBatchEncode*(calls: RequestBatchTx): string = - JrpcSys.encode(calls) +proc clearPending*(client: RpcClient, exc: ref JsonRpcError) = + while client.pendingRequests.len > 0: + let fut = client.pendingRequests.popFirst() + if not fut.finished(): + fut.fail(exc) # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ -proc getNextId*(client: RpcClient): RequestId = +proc getNextId(client: RpcClient): int = client.lastId += 1 - RequestId(kind: riNumber, num: client.lastId) - -method call*(client: RpcClient, name: string, - params: RequestParamsTx): Future[JsonString] - {.base, async.} = - raiseAssert("`RpcClient.call` not implemented") + client.lastId -proc call*(client: RpcClient, name: string, - params: JsonNode): Future[JsonString] - {.async: (raw: true).} = - client.call(name, params.paramsTx) +method request( + client: RpcClient, reqData: seq[byte] +): Future[seq[byte]] {.base, async: (raises: [CancelledError, JsonRpcError]).} = + raiseAssert("`RpcClient.request` not implemented") method close*(client: RpcClient): Future[void] {.base, async: (raises: []).} = raiseAssert("`RpcClient.close` not implemented") -method callBatch*(client: RpcClient, - calls: RequestBatchTx): Future[ResponseBatchRx] - {.base, async.} = - raiseAssert("`RpcClient.callBatch` not implemented") - -proc processMessage*(client: RpcClient, line: string): Result[void, string] = - if client.onProcessMessage.isNil.not: - let fallBack = client.onProcessMessage(client, line).valueOr: - return err(error) - if not fallBack: - return ok() - - try: - let batch = JrpcSys.decode(line, ResponseBatchRx) - if batch.kind == rbkMany: - if client.batchFut.isNil or client.batchFut.finished(): - client.batchFut = newFuture[ResponseBatchRx]() - client.batchFut.complete(batch) - return ok() - - let response = batch.single - if response.jsonrpc.isNone: - return err("missing or invalid `jsonrpc`") - - let id = response.id.valueOr: - if response.error.isSome: - let error = JrpcSys.encode(response.error.get) - return err(error) - else: - return err("missing or invalid response id") - - var requestFut: Future[JsonString] - if not client.awaiting.pop(id, requestFut): - return err("Cannot find message id \"" & $id & "\"") - - if response.error.isSome: - let error = JrpcSys.encode(response.error.get) - requestFut.fail(newException(JsonRpcError, error)) - return ok() - - # Up to this point, the result should contains something - if response.result.string.len == 0: - let msg = "missing or invalid response result" - requestFut.fail(newException(JsonRpcError, msg)) - return ok() - - debug "Received JSON-RPC response", - len = string(response.result).len, id - requestFut.complete(response.result) - return ok() +proc call*( + client: RpcClient, name: string, params: RequestParamsTx +): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError], raw: true).} = + ## Perform an RPC call returning the `result` of the call + let + # We don't really need an id since exchanges happen in order but using one + # helps debugging, if nothing else + id = client.getNextId() + requestData = JrpcSys.withWriter(writer): + writer.requestTxEncode(name, params, id) + + debug "Sending JSON-RPC request", + name, len = requestData.len, id, remote = client.remote + + # Release params memory earlier by using a raw proc for the initial + # processing + proc complete( + client: RpcClient, request: auto, id: int + ): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError]).} = + try: + let resData = await request + + debug "Processing JSON-RPC response", + len = resData.len, id, remote = client.remote + processsSingleResponse(resData, id) + except JsonRpcError as exc: + debug "JSON-RPC request failed", err = exc.msg, id, remote = client.remote + raise exc + + let req = client.request(requestData) + client.complete(req, id) + +proc call*( + client: RpcClient, name: string, params: JsonNode +): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError], raw: true).} = + client.call(name, params.paramsTx) - except CatchableError as exc: - return err(exc.msg) +proc callBatch*( + client: RpcClient, calls: seq[RequestTx] +): Future[seq[ResponseRx]] {. + async: (raises: [CancelledError, JsonRpcError], raw: true) +.} = + if calls.len == 0: + let res = Future[seq[ResponseRx]].Raising([CancelledError, JsonRpcError]).init( + "empty batch" + ) + res.complete(default(seq[ResponseRx])) + return res + + let requestData = JrpcSys.withWriter(writer): + writer.writeArray: + for call in calls: + writer.writeValue(call) + + debug "Sending JSON-RPC batch", len = requestData.len, remote = client.remote + + proc complete( + client: RpcClient, request: auto + ): Future[seq[ResponseRx]] {.async: (raises: [CancelledError, JsonRpcError]).} = + try: + let resData = await request + debug "Processing JSON-RPC batch response", + len = resData.len, remote = client.remote + parseResponse(resData, seq[ResponseRx]) + except JsonRpcError as exc: + debug "JSON-RPC batch request failed", err = exc.msg, remote = client.remote + raise exc + + let req = client.request(requestData) + client.complete(req) proc prepareBatch*(client: RpcClient): RpcBatchCallRef = RpcBatchCallRef(client: client) -proc send*(batch: RpcBatchCallRef): - Future[Result[seq[RpcBatchResponse], string]] {. - async: (raises: []).} = - var - calls = RequestBatchTx( - kind: rbkMany, - many: newSeqOfCap[RequestTx](batch.batch.len), +proc send*( + batch: RpcBatchCallRef +): Future[Result[seq[RpcBatchResponse], string]] {. + async: (raises: [CancelledError], raw: true) +.} = + if batch.batch.len == 0: + let res = Future[Result[seq[RpcBatchResponse], string]] + .Raising([CancelledError]) + .init("empty batch") + res.complete( + Result[seq[RpcBatchResponse], string].ok(default(seq[RpcBatchResponse])) ) - responses = newSeq[RpcBatchResponse](batch.batch.len) - map = initTable[RequestId, int]() - - for item in batch.batch: - let id = batch.client.getNextId() - map[id] = calls.many.len - calls.many.add requestTx(item.meth, item.params, id) + return res + + var lastId: int + var map = initTable[int, int]() + + let requestData = JrpcSys.withWriter(writer): + writer.writeArray: + for i, item in batch.batch: + lastId = batch.client.getNextId() + map[lastId] = i + writer.writeValue(requestTx(item.meth, item.params, lastId)) + + debug "Sending JSON-RPC batch", + len = requestData.len, lastId, remote = batch.client.remote + + proc complete( + client: RpcClient, request: auto, map: sink auto, lastId: int + ): Future[Result[seq[RpcBatchResponse], string]] {.async: (raises: [CancelledError]).} = + var + res = + try: + let resData = await request + debug "Processing JSON-RPC batch response", + len = resData.len, lastId, remote = client.remote + + parseResponse(resData, seq[ResponseRx]) + except JsonRpcError as exc: + debug "JSON-RPC batch request failed", err = exc.msg, remote = client.remote + + return err(exc.msg) + responses = newSeq[RpcBatchResponse](map.len) + + for i, response in res.mpairs(): + let id = response.id.num + var index: int + if not map.pop(id, index): + return err("cannot find message id: " & $lastId & " in response " & $i) + + case response.kind + of ResponseKind.rkError: + responses[index] = + RpcBatchResponse(error: Opt.some(JrpcSys.encode(response.error))) + of ResponseKind.rkResult: + responses[index] = RpcBatchResponse(result: move(response.result)) + + # In case the response is incomplete, we should say something about the + # missing requests + for _, index in map: + responses[index] = RpcBatchResponse( + error: Opt.some( + JrpcSys.encode( + ResponseError(code: INTERNAL_ERROR, message: "Missing response from server") + ) + ) + ) - try: - let res = await batch.client.callBatch(calls) - if res.kind == rbkSingle: - let r = processResponse(0, map, responses, res.single) - if r.isErr: - return err(r.error) - else: - for i, z in res.many: - let r = processResponse(i, map, responses, z) - if r.isErr: - return err(r.error) - except CatchableError as exc: - return err(exc.msg) - - return ok(responses) + ok(responses) + let req = batch.client.request(requestData) + batch.client.complete(req, map, lastId) # ------------------------------------------------------------------------------ # Signature processing diff --git a/json_rpc/clients/httpclient.nim b/json_rpc/clients/httpclient.nim index 3c73b6f..78a8914 100644 --- a/json_rpc/clients/httpclient.nim +++ b/json_rpc/clients/httpclient.nim @@ -1,5 +1,5 @@ # json-rpc -# Copyright (c) 2019-2024 Status Research & Development GmbH +# Copyright (c) 2019-2025 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -10,21 +10,12 @@ {.push raises: [], gcsafe.} import - std/[tables, uri], - stew/byteutils, - results, + std/uri, chronos/apps/http/httpclient, - chronicles, httputils, - json_serialization/std/net as jsnet, - ../client, - ../errors, - ../private/jrpc_sys + httputils, + ../[client, errors] -export - client, errors, jsnet, HttpClientFlag, HttpClientFlags - -logScope: - topics = "JSONRPC-HTTP-CLIENT" +export client, errors, HttpClientFlag, HttpClientFlags type HttpClientOptions* = object @@ -33,53 +24,32 @@ type RpcHttpClient* = ref object of RpcClient httpSession: HttpSessionRef httpAddress: HttpAddress - maxBodySize: int getHeaders: GetJsonRpcRequestHeaders -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc `$`(v: HttpAddress): string = - v.id - -proc new( - T: type RpcHttpClient, maxBodySize = MaxMessageBodyBytes, secure = false, - getHeaders: GetJsonRpcRequestHeaders = nil, flags: HttpClientFlags = {}): T = - +proc new*( + T: type RpcHttpClient, + secure = false, + getHeaders: GetJsonRpcRequestHeaders = nil, + flags: HttpClientFlags = {}, + maxMessageSize = defaultMaxMessageSize, +): T = var moreFlags: HttpClientFlags if secure: moreFlags.incl HttpClientFlag.NoVerifyHost moreFlags.incl HttpClientFlag.NoVerifyServerName T( - maxBodySize: maxBodySize, + maxMessageSize: maxMessageSize, httpSession: HttpSessionRef.new(flags = flags + moreFlags), - getHeaders: getHeaders + getHeaders: getHeaders, ) -template closeRefs(req, res: untyped) = - # We can't trust try/finally in async/await in all nim versions, so we - # do it manually instead - if req != nil: - try: - await req.closeWait() - except CatchableError as exc: # shouldn't happen - debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg - discard exc - - if res != nil: - try: - await res.closeWait() - except CatchableError as exc: # shouldn't happen - debug "Error closing JSON-RPC HTTP resuest/response", err = exc.msg - discard exc - -proc callImpl(client: RpcHttpClient, reqBody: string): Future[string] {.async.} = +method request( + client: RpcHttpClient, reqData: seq[byte] +): Future[seq[byte]] {.async: (raises: [CancelledError, JsonRpcError]).} = doAssert client.httpSession != nil if client.httpAddress.addresses.len == 0: - raise newException(RpcPostError, "Not connected") - + raise newException(RpcTransportError, "No remote addresses to connect to") var headers = if not isNil(client.getHeaders): @@ -88,130 +58,60 @@ proc callImpl(client: RpcHttpClient, reqBody: string): Future[string] {.async.} @[] headers.add(("Content-Type", "application/json")) - var req: HttpClientRequestRef - var res: HttpClientResponseRef - - req = HttpClientRequestRef.post(client.httpSession, - client.httpAddress, - body = reqBody.toOpenArrayByte(0, reqBody.len - 1), - headers = headers) - res = - try: - await req.send() - except CancelledError as e: - debug "Cancelled POST Request with JSON-RPC", e = e.msg - closeRefs(req, res) - raise e - except CatchableError as e: - debug "Failed to send POST Request with JSON-RPC", e = e.msg - closeRefs(req, res) - raise (ref RpcPostError)(msg: "Failed to send POST Request with JSON-RPC: " & e.msg, parent: e) - - if res.status < 200 or res.status >= 300: # res.status is not 2xx (success) - debug "Unsuccessful POST Request with JSON-RPC", - status = res.status, reason = res.reason - closeRefs(req, res) - raise (ref ErrorResponse)(status: res.status, msg: res.reason) - - let resBytes = - try: - await res.getBodyBytes(client.maxBodySize) - except CancelledError as e: - debug "Cancelled POST Response for JSON-RPC", e = e.msg - closeRefs(req, res) - raise e - except CatchableError as e: - debug "Failed to read POST Response for JSON-RPC", e = e.msg - closeRefs(req, res) - raise (ref FailedHttpResponse)(msg: "Failed to read POST Response for JSON-RPC: " & e.msg, parent: e) - - result = string.fromBytes(resBytes) - trace "Response", text = result - closeRefs(req, res) - -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ + let + req = HttpClientRequestRef.post( + client.httpSession, client.httpAddress, body = reqData, headers = headers + ) + + res = + try: + await req.send() + except HttpError as exc: + raise (ref RpcPostError)(msg: exc.msg, parent: exc) + finally: + await req.closeWait() + + try: + if res.status < 200 or res.status >= 300: # res.status is not 2xx (success) + raise (ref ErrorResponse)(status: res.status, msg: res.reason) + + let + resData = await res.getBodyBytes(client.maxMessageSize) + # TODO remove this processMessage hook when subscriptions / pubsub is + # properly supported + fallback = client.callOnProcessMessage(resData).valueOr: + raise (ref RequestDecodeError)(msg: error, payload: resData) + + if not fallback: + # TODO http channels are unidirectional, so it doesn't really make sense + # to call onProcessMessage from http - this should be deprecated + # as soon as bidirectionality is supported + raise (ref InvalidResponse)(msg: "onProcessMessage handled response") + + resData + except HttpError as exc: + raise (ref RpcTransportError)(msg: exc.msg, parent: exc) + finally: + await req.closeWait() proc newRpcHttpClient*( - maxBodySize = MaxMessageBodyBytes, secure = false, + maxBodySize = defaultMaxMessageSize, + secure = false, getHeaders: GetJsonRpcRequestHeaders = nil, - flags: HttpClientFlags = {}): RpcHttpClient = - RpcHttpClient.new(maxBodySize, secure, getHeaders, flags) + flags: HttpClientFlags = {}, +): RpcHttpClient = + RpcHttpClient.new(secure, getHeaders, flags, maxBodySize) -method call*(client: RpcHttpClient, name: string, - params: RequestParamsTx): Future[JsonString] - {.async.} = - let - id = client.getNextId() - reqBody = requestTxEncode(name, params, id) - - debug "Sending JSON-RPC request", - address = $client.httpAddress, len = len(reqBody), name, id - trace "Message", msg = reqBody - - let resText = await client.callImpl(reqBody) - - # completed by processMessage - the flow is quite weird here to accomodate - # socket and ws clients, but could use a more thorough refactoring - var newFut = newFuture[JsonString]() - # add to awaiting responses - client.awaiting[id] = newFut - - # Might error for all kinds of reasons - let msgRes = client.processMessage(resText) - if msgRes.isErr: - # Need to clean up in case the answer was invalid - let exc = newException(JsonRpcError, msgRes.error) - newFut.fail(exc) - client.awaiting.del(id) - raise exc - - client.awaiting.del(id) - - # processMessage should have completed this future - if it didn't, `read` will - # raise, which is reasonable - if newFut.finished: - return newFut.read() - else: - # TODO: Provide more clarity regarding the failure here - debug "Invalid POST Response for JSON-RPC" - raise newException(InvalidResponse, "Invalid response") - -method callBatch*(client: RpcHttpClient, - calls: RequestBatchTx): Future[ResponseBatchRx] - {.async.} = - let reqBody = requestBatchEncode(calls) - debug "Sending JSON-RPC batch", - address = $client.httpAddress, len = len(reqBody) - let resText = await client.callImpl(reqBody) - - if client.batchFut.isNil or client.batchFut.finished(): - client.batchFut = newFuture[ResponseBatchRx]() - - # Might error for all kinds of reasons - let msgRes = client.processMessage(resText) - if msgRes.isErr: - # Need to clean up in case the answer was invalid - debug "Failed to process POST Response for JSON-RPC", msg = msgRes.error - let exc = newException(JsonRpcError, msgRes.error) - client.batchFut.fail(exc) - raise exc - - # processMessage should have completed this future - if it didn't, `read` will - # raise, which is reasonable - if client.batchFut.finished: - return client.batchFut.read() - else: - # TODO: Provide more clarity regarding the failure here - debug "Invalid POST Response for JSON-RPC" - raise newException(InvalidResponse, "Invalid response") - -proc connect*(client: RpcHttpClient, url: string) {.async.} = +proc connect*( + client: RpcHttpClient, url: string +) {.async: (raises: [CancelledError, JsonRpcError]).} = client.httpAddress = client.httpSession.getAddress(url).valueOr: raise newException(RpcAddressUnresolvableError, error) + client.remote = client.httpAddress.id -proc connect*(client: RpcHttpClient, address: string, port: Port, secure: bool) {.async.} = +proc connect*( + client: RpcHttpClient, address: string, port: Port, secure: bool +) {.async: (raises: [CancelledError, JsonRpcError]).} = let uri = Uri( scheme: if secure: "https" else: "http", hostname: address, @@ -219,6 +119,7 @@ proc connect*(client: RpcHttpClient, address: string, port: Port, secure: bool) client.httpAddress = getAddress(client.httpSession, uri).valueOr: raise newException(RpcAddressUnresolvableError, error) + client.remote = client.httpAddress.id method close*(client: RpcHttpClient) {.async: (raises: []).} = if not client.httpSession.isNil: diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 0017df4..fa47daf 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -1,5 +1,5 @@ # json-rpc -# Copyright (c) 2019-2024 Status Research & Development GmbH +# Copyright (c) 2019-2025 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -10,19 +10,11 @@ {.push raises: [], gcsafe.} import - std/tables, - chronicles, - results, - chronos, - json_serialization/std/net as jsnet, - ../client, - ../errors, + stew/byteutils, + ../[client, errors], ../private/jrpc_sys -export client, errors, jsnet - -logScope: - topics = "JSONRPC-SOCKET-CLIENT" +export client, errors type RpcSocketClient* = ref object of RpcClient @@ -30,111 +22,88 @@ type address*: TransportAddress loop*: Future[void] -const defaultMaxRequestLength* = 1024 * 128 - -proc new*(T: type RpcSocketClient): T = - T() +proc new*(T: type RpcSocketClient, maxMessageSize = defaultMaxMessageSize): T = + T(maxMessageSize: maxMessageSize) -proc newRpcSocketClient*: RpcSocketClient = +proc newRpcSocketClient*(maxMessageSize = defaultMaxMessageSize): RpcSocketClient = ## Creates a new client instance. - RpcSocketClient.new() + RpcSocketClient.new(maxMessageSize) -method call*(client: RpcSocketClient, name: string, - params: RequestParamsTx): Future[JsonString] {.async.} = +method request( + client: RpcSocketClient, reqData: seq[byte] +): Future[seq[byte]] {.async: (raises: [CancelledError, JsonRpcError]).} = ## Remotely calls the specified RPC method. if client.transport.isNil: - raise newException(JsonRpcError, - "Transport is not initialised (missing a call to connect?)") + raise newException( + RpcTransportError, "Transport is not initialised (missing a call to connect?)" + ) + let transport = client.transport - let - id = client.getNextId() - reqBody = requestTxEncode(name, params, id) & "\r\n" - newFut = newFuture[JsonString]() # completed by processMessage - - # add to awaiting responses - client.awaiting[id] = newFut + client.withPendingFut(fut): + try: + discard await transport.write(reqData) + discard await transport.write("\r\n") + except TransportError as exc: + # If there's an error sending, the "next messages" facility will be + # broken since we don't know if the server observed the message or not + transport.close() + raise (ref RpcPostError)(msg: exc.msg, parent: exc) - debug "Sending JSON-RPC request", - address = $client.address, len = len(reqBody), name, id + await fut - let res = await client.transport.write(reqBody) - # TODO: Add actions when not full packet was send, e.g. disconnect peer. - doAssert(res == reqBody.len) +proc processData(client: RpcSocketClient) {.async: (raises: []).} = + let transport = client.transport + var lastError: ref JsonRpcError + while true: + let data = + try: + await transport.readLine(client.maxMessageSize) + except CatchableError as exc: + lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc) + break - return await newFut + if data == "": + break -method callBatch*(client: RpcSocketClient, - calls: RequestBatchTx): Future[ResponseBatchRx] - {.async.} = - if client.transport.isNil: - raise newException(JsonRpcError, - "Transport is not initialised (missing a call to connect?)") + client.processMessage(data.toBytes()).isOkOr: + lastError = (ref RequestDecodeError)(msg: error, payload: data.toBytes()) + break - if client.batchFut.isNil or client.batchFut.finished(): - client.batchFut = newFuture[ResponseBatchRx]() + if lastError == nil: + lastError = (ref RpcTransportError)(msg: "Connection closed") - let reqBody = requestBatchEncode(calls) & "\r\n" - debug "Sending JSON-RPC batch", - address = $client.address, len = len(reqBody) - let res = await client.transport.write(reqBody) + client.clearPending(lastError) - # TODO: Add actions when not full packet was send, e.g. disconnect peer. - doAssert(res == reqBody.len) + await transport.closeWait() + client.transport = nil + if not client.onDisconnect.isNil: + client.onDisconnect() - return await client.batchFut -proc processData(client: RpcSocketClient) {.async: (raises: []).} = - while true: - var localException: ref JsonRpcError - while true: - try: - var value = await client.transport.readLine(defaultMaxRequestLength) - if value == "": - # transmission ends - await client.transport.closeWait() - break - - let res = client.processMessage(value) - if res.isErr: - localException = newException(JsonRpcError, res.error) - break - except TransportError as exc: - localException = newException(JsonRpcError, exc.msg) - await client.transport.closeWait() - break - except CancelledError as exc: - localException = newException(JsonRpcError, exc.msg) - await client.transport.closeWait() - break - - if localException.isNil.not: - for _,fut in client.awaiting: - fut.fail(localException) - if client.batchFut.isNil.not and not client.batchFut.completed(): - client.batchFut.fail(localException) - - # async loop reconnection and waiting +proc connect*( + client: RpcSocketClient, address: TransportAddress +) {.async: (raises: [CancelledError, JsonRpcError]).} = + client.transport = try: - info "Reconnect to server", address=`$`(client.address) - client.transport = await connect(client.address) + await connect(address) except TransportError as exc: - error "Error when reconnecting to server", msg=exc.msg - break - except CancelledError as exc: - debug "Server connection was cancelled", msg=exc.msg - break + raise (ref RpcTransportError)(msg: exc.msg, parent: exc) -proc connect*(client: RpcSocketClient, address: string, port: Port) {.async.} = - let addresses = resolveTAddress(address, port) - client.transport = await connect(addresses[0]) - client.address = addresses[0] - client.loop = processData(client) - -proc connect*(client: RpcSocketClient, address: TransportAddress) {.async.} = - client.transport = await connect(address) client.address = address + client.remote = $client.address client.loop = processData(client) +proc connect*( + client: RpcSocketClient, address: string, port: Port +) {.async: (raises: [CancelledError, JsonRpcError]).} = + let addresses = + try: + resolveTAddress(address, port) + except TransportError as exc: + raise (ref RpcTransportError)(msg: exc.msg, parent: exc) + + await client.connect(addresses[0]) + method close*(client: RpcSocketClient) {.async: (raises: []).} = await client.loop.cancelAndWait() if not client.transport.isNil: diff --git a/json_rpc/clients/websocketclient.nim b/json_rpc/clients/websocketclient.nim index 20a4612..489d295 100644 --- a/json_rpc/clients/websocketclient.nim +++ b/json_rpc/clients/websocketclient.nim @@ -1,5 +1,5 @@ # json-rpc -# Copyright (c) 2019-2023 Status Research & Development GmbH +# Copyright (c) 2019-2025 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -10,13 +10,136 @@ {.push raises: [], gcsafe.} import - ./websocketclientimpl, - ../client + std/uri, + chronicles, + websock/[websock, extensions/compression/deflate], + chronos/apps/http/httptable, + ../[client, errors], + ../private/jrpc_sys -# this weird arrangement is to avoid clash -# between Json.encode and Base64Pad.encode +export client, errors -export - websocketclientimpl, - client, - errors +type + RpcWebSocketClient* = ref object of RpcClient + transport*: WSSession + uri*: Uri + loop*: Future[void] + getHeaders*: GetJsonRpcRequestHeaders + +proc new*( + T: type RpcWebSocketClient, + getHeaders: GetJsonRpcRequestHeaders = nil, + maxMessageSize = defaultMaxMessageSize, +): T = + T(getHeaders: getHeaders, maxMessageSize: maxMessageSize) + +proc newRpcWebSocketClient*( + getHeaders: GetJsonRpcRequestHeaders = nil, maxMessageSize = defaultMaxMessageSize +): RpcWebSocketClient = + ## Creates a new client instance. + RpcWebSocketClient.new(getHeaders, maxMessageSize) + +method request*( + client: RpcWebSocketClient, reqData: seq[byte] +): Future[seq[byte]] {.async: (raises: [CancelledError, JsonRpcError]).} = + ## Remotely calls the specified RPC method. + if client.transport.isNil: + raise newException( + RpcTransportError, "Transport is not initialised (missing a call to connect?)" + ) + let transport = client.transport + + client.withPendingFut(fut): + try: + await transport.send(reqData, Opcode.Binary) + except CancelledError as exc: + raise exc + except CatchableError as exc: + # If there's an error sending, the "next messages" facility will be + # broken since we don't know if the server observed the message or not + try: + await noCancel transport.close() + except CatchableError as exc: + # TODO https://github.com/status-im/nim-websock/pull/178 + raiseAssert "Doesn't actually raise" + raise (ref RpcPostError)(msg: exc.msg, parent: exc) + + await fut + +proc processData(client: RpcWebSocketClient) {.async: (raises: []).} = + let transport = client.transport + var lastError: ref JsonRpcError + while client.transport.readyState != ReadyState.Closed: + var data = + try: + await client.transport.recvMsg(client.maxMessageSize) + except CatchableError as exc: + lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc) + break + + client.processMessage(data).isOkOr: + lastError = (ref RequestDecodeError)(msg: error, payload: data) + break + + if lastError == nil: + lastError = (ref RpcTransportError)(msg: "Connection closed") + + client.clearPending(lastError) + + try: + await client.transport.close() + client.transport = nil + except CatchableError: + raiseAssert "Doesn't actually raise" + + if not client.onDisconnect.isNil: + client.onDisconnect() + +proc addExtraHeaders( + headers: var HttpTable, + client: RpcWebSocketClient, + extraHeaders: HttpTable) = + # Apply client instance overrides + if client.getHeaders != nil: + for header in client.getHeaders(): + headers.add(header[0], header[1]) + + # Apply call specific overrides + for header in extraHeaders.stringItems: + headers.add(header.key, header.value) + + # Apply default origin + discard headers.hasKeyOrPut("Origin", "http://localhost") + +proc connect*( + client: RpcWebSocketClient, + uri: string, + extraHeaders: HttpTable = default(HttpTable), + compression = false, + hooks: seq[Hook] = @[], + flags: set[TLSFlags] = {}) {.async: (raises: [CancelledError, JsonRpcError]).} = + proc headersHook(ctx: Hook, headers: var HttpTable): Result[void, string] = + headers.addExtraHeaders(client, extraHeaders) + ok() + var ext: seq[ExtFactory] = if compression: @[deflateFactory()] + else: @[] + let uri = parseUri(uri) + let ws = try: + await WebSocket.connect( + uri=uri, + factories=ext, + hooks=hooks & Hook(append: headersHook), + flags=flags) + except CancelledError as exc: + raise exc + except CatchableError as exc: + # TODO https://github.com/status-im/nim-websock/pull/178 + raise (ref RpcTransportError)(msg: exc.msg, parent: exc) + + client.transport = ws + client.uri = uri + client.remote = uri.hostname & ":" & uri.port + client.loop = processData(client) + +method close*(client: RpcWebSocketClient) {.async: (raises: []).} = + await client.loop.cancelAndWait() diff --git a/json_rpc/clients/websocketclientimpl.nim b/json_rpc/clients/websocketclientimpl.nim deleted file mode 100644 index 1619ab0..0000000 --- a/json_rpc/clients/websocketclientimpl.nim +++ /dev/null @@ -1,162 +0,0 @@ -# json-rpc -# Copyright (c) 2019-2024 Status Research & Development GmbH -# Licensed under either of -# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -# * MIT license ([LICENSE-MIT](LICENSE-MIT)) -# at your option. -# This file may not be copied, modified, or distributed except according to -# those terms. - -{.push raises: [], gcsafe.} - -import - std/uri, - pkg/websock/[websock, extensions/compression/deflate], - pkg/[chronos, chronos/apps/http/httptable, chronicles], - stew/byteutils, - ../errors - -# avoid clash between Json.encode and Base64Pad.encode -import ../client except encode - -logScope: - topics = "JSONRPC-WS-CLIENT" - -type - RpcWebSocketClient* = ref object of RpcClient - transport*: WSSession - uri*: Uri - loop*: Future[void] - getHeaders*: GetJsonRpcRequestHeaders - -proc new*( - T: type RpcWebSocketClient, getHeaders: GetJsonRpcRequestHeaders = nil): T = - T(getHeaders: getHeaders) - -proc newRpcWebSocketClient*( - getHeaders: GetJsonRpcRequestHeaders = nil): RpcWebSocketClient = - ## Creates a new client instance. - RpcWebSocketClient.new(getHeaders) - -method call*(client: RpcWebSocketClient, name: string, - params: RequestParamsTx): Future[JsonString] {.async.} = - ## Remotely calls the specified RPC method. - if client.transport.isNil: - raise newException(JsonRpcError, - "Transport is not initialised (missing a call to connect?)") - - let - id = client.getNextId() - reqBody = requestTxEncode(name, params, id) & "\r\n" - newFut = newFuture[JsonString]() # completed by processMessage - # add to awaiting responses - client.awaiting[id] = newFut - - debug "Sending JSON-RPC request", - address = $client.uri, len = len(reqBody), name - - await client.transport.send(reqBody) - return await newFut - -method callBatch*(client: RpcWebSocketClient, - calls: RequestBatchTx): Future[ResponseBatchRx] - {.async.} = - if client.transport.isNil: - raise newException(JsonRpcError, - "Transport is not initialised (missing a call to connect?)") - - if client.batchFut.isNil or client.batchFut.finished(): - client.batchFut = newFuture[ResponseBatchRx]() - - let reqBody = requestBatchEncode(calls) & "\r\n" - debug "Sending JSON-RPC batch", - address = $client.uri, len = len(reqBody) - await client.transport.send(reqBody) - - return await client.batchFut - -proc processData(client: RpcWebSocketClient) {.async.} = - var error: ref CatchableError - - template processError() = - for k, v in client.awaiting: - v.fail(error) - if client.batchFut.isNil.not and not client.batchFut.completed(): - client.batchFut.fail(error) - client.awaiting.clear() - - let ws = client.transport - try: - while ws.readyState != ReadyState.Closed: - var value = await ws.recvMsg(MaxMessageBodyBytes) - - if value.len == 0: - # transmission ends - break - - let res = client.processMessage(string.fromBytes(value)) - if res.isErr: - error = newException(JsonRpcError, res.error) - processError() - - except CatchableError as e: - error = e - - await client.transport.close() - - client.transport = nil - - if client.awaiting.len != 0: - if error.isNil: - error = newException(IOError, "Transport was closed while waiting for response") - processError() - if not client.onDisconnect.isNil: - client.onDisconnect() - -proc addExtraHeaders( - headers: var HttpTable, - client: RpcWebSocketClient, - extraHeaders: HttpTable) = - # Apply client instance overrides - if client.getHeaders != nil: - for header in client.getHeaders(): - headers.add(header[0], header[1]) - - # Apply call specific overrides - for header in extraHeaders.stringItems: - headers.add(header.key, header.value) - - # Apply default origin - discard headers.hasKeyOrPut("Origin", "http://localhost") - -proc connect*( - client: RpcWebSocketClient, - uri: string, - extraHeaders: HttpTable = default(HttpTable), - compression = false, - hooks: seq[Hook] = @[], - flags: set[TLSFlags] = {}) {.async.} = - proc headersHook(ctx: Hook, headers: var HttpTable): Result[void, string] = - headers.addExtraHeaders(client, extraHeaders) - ok() - var ext: seq[ExtFactory] = if compression: @[deflateFactory()] - else: @[] - let uri = parseUri(uri) - let ws = await WebSocket.connect( - uri=uri, - factories=ext, - hooks=hooks & Hook(append: headersHook), - flags=flags) - client.transport = ws - client.uri = uri - client.loop = processData(client) - -method close*(client: RpcWebSocketClient) {.async: (raises: []).} = - await client.loop.cancelAndWait() - if not client.transport.isNil: - try: - # TODO https://github.com/status-im/nim-websock/pull/178 - await noCancel client.transport.close() - except CatchableError as exc: - warn "Unexpected exception while closing transport", err = exc.msg - client.transport = nil diff --git a/json_rpc/errors.nim b/json_rpc/errors.nim index 2b9a519..f8b4128 100644 --- a/json_rpc/errors.nim +++ b/json_rpc/errors.nim @@ -1,5 +1,5 @@ # json-rpc -# Copyright (c) 2019-2024 Status Research & Development GmbH +# Copyright (c) 2019-2025 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) # * MIT license ([LICENSE-MIT](LICENSE-MIT)) @@ -15,19 +15,25 @@ type JsonRpcError* = object of CatchableError ## Base type of all nim-json-rpc errors - ErrorResponse* = object of JsonRpcError + RpcTransportError* = object of JsonRpcError + ## Raised when there is an issue with the underlying transport - the parent + ## exception may be set to provide more information + + FailedHttpResponse* {.deprecated: "RpcTransportError".} = RpcTransportError + ## Obsolete name for RpcTransportError + + ErrorResponse* = object of RpcTransportError status*: int - ## raised when the server responded with an error + ## Raised when the server responds with a HTTP-style error status code + ## indicating that the call was not processed + + RpcPostError* = object of RpcTransportError + ## raised when the underlying transport fails to send the request - the + ## underlying client may or may not have received the request InvalidResponse* = object of JsonRpcError ## raised when the server response violates the JSON-RPC protocol - FailedHttpResponse* = object of JsonRpcError - ## raised when fail to read the underlying HTTP server response - - RpcPostError* = object of JsonRpcError - ## raised when the client fails to send the POST request with JSON-RPC - RpcBindError* = object of JsonRpcError RpcAddressUnresolvableError* = object of JsonRpcError @@ -37,9 +43,7 @@ type RequestDecodeError* = object of JsonRpcError ## raised when fail to decode RequestRx - - ParamsEncodeError* = object of JsonRpcError - ## raised when fail to encode RequestParamsTx + payload*: seq[byte] ApplicationError* = object of JsonRpcError ## Error to be raised by the application request handlers when the server @@ -47,5 +51,4 @@ type ## be outside the range of -32768 to -32000. A custom JSON data object may ## be provided. code*: int - message*: string data*: results.Opt[JsonString] diff --git a/json_rpc/private/client_handler_wrapper.nim b/json_rpc/private/client_handler_wrapper.nim index c0a7f70..c558c51 100644 --- a/json_rpc/private/client_handler_wrapper.nim +++ b/json_rpc/private/client_handler_wrapper.nim @@ -132,7 +132,7 @@ func createRpcFromSig*(clientType, rpcDecl: NimNode, alias = NimNode(nil)): NimN result.add createBatchCallProc(procName, batchParams, batchCallBody) when defined(nimDumpRpcs): - echo pathStr, ":\n", result.repr + debugEcho pathStr, ":\n", result.repr func processRpcSigs*(clientType, parsedCode: NimNode): NimNode = result = newStmtList() diff --git a/json_rpc/private/jrpc_sys.nim b/json_rpc/private/jrpc_sys.nim index 1bd4d42..2e10aea 100644 --- a/json_rpc/private/jrpc_sys.nim +++ b/json_rpc/private/jrpc_sys.nim @@ -59,26 +59,30 @@ type RequestId* = object case kind*: RequestIdKind + of riNull: + discard of riNumber: num*: int of riString: str*: string - of riNull: - discard + + ReqRespHeader* = object + ## Helper type to extract id from message (for example for logging) + id* : results.Opt[RequestId] # Request received by server RequestRx* = object jsonrpc* : results.Opt[JsonRPC2] - id* : RequestId `method`*: results.Opt[string] params* : RequestParamsRx + id* : RequestId # Request sent by client RequestTx* = object jsonrpc* : JsonRPC2 - id* : results.Opt[RequestId] `method`*: string params* : RequestParamsTx + id* : results.Opt[RequestId] ResponseError* = object code* : int @@ -86,25 +90,28 @@ type data* : results.Opt[JsonString] ResponseKind* = enum - rkResult rkError + rkResult # Response sent by server ResponseTx* = object jsonrpc* : JsonRPC2 - id* : RequestId case kind*: ResponseKind of rkResult: result* : JsonString of rkError: error* : ResponseError + id* : RequestId # Response received by client ResponseRx* = object - jsonrpc*: results.Opt[JsonRPC2] - id* : results.Opt[RequestId] - result* : JsonString - error* : results.Opt[ResponseError] + jsonrpc*: JsonRPC2 + case kind*: ResponseKind + of rkResult: + result* : JsonString + of rkError: + error* : ResponseError + id* : RequestId ReBatchKind* = enum rbkSingle @@ -150,9 +157,12 @@ createJsonFlavor JrpcSys, ResponseError.useDefaultSerializationIn JrpcSys RequestTx.useDefaultWriterIn JrpcSys RequestRx.useDefaultReaderIn JrpcSys +ReqRespHeader.useDefaultReaderIn JrpcSys const JsonRPC2Literal = JsonString("\"2.0\"") + MaxIdStringLength = 256 + ## Maximum length of id, when represented as a string (to avoid spam) func hash*(x: RequestId): hashes.Hash = var h = 0.Hash @@ -197,7 +207,7 @@ proc readValue*(r: var JsonReader[JrpcSys], val: var RequestId) of JsonValueKind.Number: val = RequestId(kind: riNumber, num: r.parseInt(int)) of JsonValueKind.String: - val = RequestId(kind: riString, str: r.parseString()) + val = RequestId(kind: riString, str: r.parseString(MaxIdStringLength)) of JsonValueKind.Null: val = RequestId(kind: riNull) r.parseNull() @@ -256,16 +266,38 @@ proc writeValue*(w: var JsonWriter[JrpcSys], val: ResponseTx) proc readValue*(r: var JsonReader[JrpcSys], val: var ResponseRx) {.gcsafe, raises: [IOError, SerializationError].} = - # We need to overload ResponseRx reader because - # we don't want to skip null fields + # https://www.jsonrpc.org/specification#response_object + + var + jsonrpcOpt: Opt[JsonRPC2] + idOpt: Opt[RequestId] + resultOpt: Opt[JsonString] + errorOpt: Opt[ResponseError] + r.parseObjectWithoutSkip(key): case key - of "jsonrpc": r.readValue(val.jsonrpc) - of "id" : r.readValue(val.id) - of "result" : val.result = r.parseAsString() - of "error" : r.readValue(val.error) + of "jsonrpc": r.readValue(jsonrpcOpt) + of "id" : r.readValue(idOpt) + of "result" : resultOpt.ok r.parseAsString() + of "error" : r.readValue(errorOpt) else: discard + if jsonrpcOpt.isNone: + r.raiseIncompleteObject("Missing or invalid `jsonrpc` version") + let id = idOpt.valueOr: + r.raiseIncompleteObject("Missing `id` field") + + if resultOpt.isNone() and errorOpt.isNone(): + r.raiseIncompleteObject("Missing `result` or `error` field") + + if errorOpt.isSome(): + if resultOpt.isSome(): + r.raiseIncompleteObject("Both `result` and `error` fields present") + + val = ResponseRx(id: id, kind: ResponseKind.rkError, error: move(errorOpt[])) + else: + val = ResponseRx(id: id, kind: ResponseKind.rkResult, result: move(resultOpt[])) + proc writeValue*(w: var JsonWriter[JrpcSys], val: RequestBatchTx) {.gcsafe, raises: [IOError].} = if val.kind == rbkMany: @@ -316,4 +348,20 @@ func toTx*(params: RequestParamsRx): RequestParamsTx = result = RequestParamsTx(kind: rpNamed) result.named = params.named +template requestTxEncode*(writer: var JrpcSys.Writer, name: string, params: RequestParamsTx, id: int) = + writer.writeObject: + writer.writeMember("jsonrpc", JsonRPC2()) + writer.writeMember("id", id) + writer.writeMember("method", name) + writer.writeMember("params", params) + +template withWriter*(_: type JrpcSys, writer, body: untyped): seq[byte] = + var stream = memoryOutput() + + {.cast(noSideEffect), cast(raises: []).}: + var writer = JrpcSys.Writer.init(stream) + body + + stream.getOutput(seq[byte]) + {.pop.} diff --git a/json_rpc/private/shared_wrapper.nim b/json_rpc/private/shared_wrapper.nim index b3b4d5c..7ff26ba 100644 --- a/json_rpc/private/shared_wrapper.nim +++ b/json_rpc/private/shared_wrapper.nim @@ -63,9 +63,9 @@ func paramsTx*(params: JsonNode): RequestParamsTx = positional: @[JrpcConv.encode(params).JsonString], ) -func requestTx*(name: string, params: RequestParamsTx, id: RequestId): RequestTx = +func requestTx*(name: string, params: sink RequestParamsTx, id: int): RequestTx = RequestTx( - id: Opt.some(id), + id: Opt.some(RequestId(kind: riNumber, num: id)), `method`: name, params: params, ) diff --git a/json_rpc/router.nim b/json_rpc/router.nim index c96fc24..f3c763a 100644 --- a/json_rpc/router.nim +++ b/json_rpc/router.nim @@ -35,6 +35,7 @@ type procs*: Table[string, RpcProc] const + # https://www.jsonrpc.org/specification#error_object JSON_PARSE_ERROR* = -32700 INVALID_REQUEST* = -32600 METHOD_NOT_FOUND* = -32601 @@ -43,8 +44,6 @@ const SERVER_ERROR* = -32000 JSON_ENCODE_ERROR* = -32001 - defaultMaxRequestLength* = 1024 * 128 - # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ diff --git a/json_rpc/rpcproxy.nim b/json_rpc/rpcproxy.nim index 6bab43a..1d6cd48 100644 --- a/json_rpc/rpcproxy.nim +++ b/json_rpc/rpcproxy.nim @@ -54,9 +54,8 @@ proc getWebSocketClientConfig*( ClientConfig(kind: WebSocket, wsUri: uri, compression: compression, flags: flags) proc proxyCall(client: RpcClient, name: string): RpcProc = - return proc (params: RequestParamsRx): Future[JsonString] {.async.} = - let res = await client.call(name, params.toTx) - return res + return proc(params: RequestParamsRx): Future[JsonString] {.async: (raw: true).} = + client.call(name, params.toTx) proc getClient*(proxy: RpcProxy): RpcClient = case proxy.kind diff --git a/json_rpc/server.nim b/json_rpc/server.nim index 2c08f1f..5d73be0 100644 --- a/json_rpc/server.nim +++ b/json_rpc/server.nim @@ -14,6 +14,7 @@ import chronos, ./router, ./jsonmarshal, + ./client, ./private/jrpc_sys, ./private/shared_wrapper, ./errors @@ -49,17 +50,11 @@ proc executeMethod*(server: RpcServer, params: RequestParamsTx): Future[JsonString] {.async: (raises: [CancelledError, JsonRpcError]).} = let - req = requestTx(methodName, params, RequestId(kind: riNumber, num: 0)) + req = requestTx(methodName, params, 0) reqData = JrpcSys.encode(req) respData = await server.router.route(reqData) - resp = try: - JrpcSys.decode(respData, ResponseRx) - except CatchableError as exc: - raise (ref JsonRpcError)(msg: exc.msg) - - if resp.error.isSome: - raise (ref JsonRpcError)(msg: $resp.error.get) - resp.result + + processsSingleResponse(respData.toOpenArrayByte(0, respData.high()), 0) proc executeMethod*(server: RpcServer, methodName: string, diff --git a/json_rpc/servers/socketserver.nim b/json_rpc/servers/socketserver.nim index 337edb3..403b377 100644 --- a/json_rpc/servers/socketserver.nim +++ b/json_rpc/servers/socketserver.nim @@ -24,6 +24,9 @@ type servers: seq[StreamServer] processClientHook: StreamCallback2 +# TODO replace with configurable value +const defaultMaxRequestLength* = 1024 * 128 + proc processClient(server: StreamServer, transport: StreamTransport) {.async: (raises: []).} = ## Process transport data to the RPC server try: diff --git a/tests/test_client_hook.nim b/tests/test_client_hook.nim index 85e4868..613619b 100644 --- a/tests/test_client_hook.nim +++ b/tests/test_client_hook.nim @@ -89,7 +89,7 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async: (r var rpc = getUserData[TestSocketServer](server) while true: var - value = await transport.readLine(router.defaultMaxRequestLength) + value = await transport.readLine(defaultMaxRequestLength) if value == "": await transport.closeWait() break diff --git a/tests/test_jrpc_sys.nim b/tests/test_jrpc_sys.nim index 0dc6b0d..76928ed 100644 --- a/tests/test_jrpc_sys.nim +++ b/tests/test_jrpc_sys.nim @@ -171,40 +171,32 @@ suite "jrpc_sys conversion": let txBytes = JrpcSys.encode(tx) let rx = JrpcSys.decode(txBytes, ResponseRx) check: - rx.jsonrpc.isSome - rx.id.isSome - rx.id.get.num == 777 + rx.id.num == 777 + rx.kind == ResponseKind.rkResult rx.result.string.len > 0 rx.result == JsonString("true") - rx.error.isNone test "ResponseTx -> ResponseRx: id(string), err: nodata": let tx = res("gum", resErr(999, "fatal")) let txBytes = JrpcSys.encode(tx) let rx = JrpcSys.decode(txBytes, ResponseRx) check: - rx.jsonrpc.isSome - rx.id.isSome - rx.id.get.str == "gum" - rx.result.string.len == 0 - rx.error.isSome - rx.error.get.code == 999 - rx.error.get.message == "fatal" - rx.error.get.data.isNone + rx.id.str == "gum" + rx.kind == ResponseKind.rkError + rx.error.code == 999 + rx.error.message == "fatal" + rx.error.data.isNone test "ResponseTx -> ResponseRx: id(string), err: some data": let tx = res("gum", resErr(999, "fatal", JsonString("888.999"))) let txBytes = JrpcSys.encode(tx) let rx = JrpcSys.decode(txBytes, ResponseRx) check: - rx.jsonrpc.isSome - rx.id.isSome - rx.id.get.str == "gum" - rx.result.string.len == 0 - rx.error.isSome - rx.error.get.code == 999 - rx.error.get.message == "fatal" - rx.error.get.data.get == JsonString("888.999") + rx.id.str == "gum" + rx.kind == ResponseKind.rkError + rx.error.code == 999 + rx.error.message == "fatal" + rx.error.data.get == JsonString("888.999") test "RequestBatchTx -> RequestBatchRx: single": let tx1 = req(123, "int_positional", pp1) From 16047319678ff8e4ce085879c2d20283401612a3 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sat, 15 Nov 2025 13:10:41 +0100 Subject: [PATCH 02/11] compat --- json_rpc/client.nim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/json_rpc/client.nim b/json_rpc/client.nim index 057d962..6cc42f0 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -238,9 +238,10 @@ proc send*( len = requestData.len, lastId, remote = batch.client.remote proc complete( - client: RpcClient, request: auto, map: sink auto, lastId: int + client: RpcClient, request: auto, map: sink Table[int, int], lastId: int ): Future[Result[seq[RpcBatchResponse], string]] {.async: (raises: [CancelledError]).} = var + map = move(map) # 2.0 compat res = try: let resData = await request From 98ca2411758e8675baa7f0f3f33cde80431836dd Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sat, 15 Nov 2025 19:40:02 +0100 Subject: [PATCH 03/11] compat --- json_rpc/client.nim | 3 ++- json_rpc/clients/socketclient.nim | 3 --- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/json_rpc/client.nim b/json_rpc/client.nim index 6cc42f0..c619baf 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -101,8 +101,9 @@ proc processMessage*(client: RpcClient, line: sink seq[byte]): Result[void, stri # Messages are assumed to arrive one by one - even if the future was cancelled, # we therefore consume one message for every line we don't have to process if client.pendingRequests.len() == 0: + let id {.used.} = JrpcSys.decode(line, ReqRespHeader).id debug "Received message even though there's nothing queued, dropping", - id = JrpcSys.decode(line, ReqRespHeader).id + id return ok() let fut = client.pendingRequests.popFirst() diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index fa47daf..d9c1351 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -62,9 +62,6 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} = lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc) break - if data == "": - break - client.processMessage(data.toBytes()).isOkOr: lastError = (ref RequestDecodeError)(msg: error, payload: data.toBytes()) break From f1a34369503a6e91c3d95080135a55635401845b Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sat, 15 Nov 2025 19:40:39 +0100 Subject: [PATCH 04/11] compat --- json_rpc/clients/socketclient.nim | 3 +++ 1 file changed, 3 insertions(+) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index d9c1351..963c544 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -61,6 +61,9 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} = except CatchableError as exc: lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc) break + debugEcho data + if data == "": + break client.processMessage(data.toBytes()).isOkOr: lastError = (ref RequestDecodeError)(msg: error, payload: data.toBytes()) From 57a3721f72935d3ca82a6a2f8bf128f3c358d08a Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sat, 15 Nov 2025 19:47:17 +0100 Subject: [PATCH 05/11] compat --- json_rpc/client.nim | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/json_rpc/client.nim b/json_rpc/client.nim index c619baf..179c535 100644 --- a/json_rpc/client.nim +++ b/json_rpc/client.nim @@ -101,9 +101,8 @@ proc processMessage*(client: RpcClient, line: sink seq[byte]): Result[void, stri # Messages are assumed to arrive one by one - even if the future was cancelled, # we therefore consume one message for every line we don't have to process if client.pendingRequests.len() == 0: - let id {.used.} = JrpcSys.decode(line, ReqRespHeader).id debug "Received message even though there's nothing queued, dropping", - id + id = (block: JrpcSys.decode(line, ReqRespHeader).id) return ok() let fut = client.pendingRequests.popFirst() From bb65a53e441d1ee9ca7e157baa69dd0304344029 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sat, 15 Nov 2025 22:29:15 +0100 Subject: [PATCH 06/11] test --- json_rpc/clients/socketclient.nim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 963c544..9be10d0 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -42,6 +42,7 @@ method request( client.withPendingFut(fut): try: discard await transport.write(reqData) + debugEcho "B '", reqData, "'" discard await transport.write("\r\n") except TransportError as exc: # If there's an error sending, the "next messages" facility will be @@ -61,7 +62,7 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} = except CatchableError as exc: lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc) break - debugEcho data + debugEcho "A '", data, "'" if data == "": break From 402c3e76fcd0f7c30a08e756a0ada1bfb57bbd48 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 16 Nov 2025 07:54:25 +0100 Subject: [PATCH 07/11] test --- json_rpc/clients/websocketclient.nim | 3 +-- json_rpc/servers/socketserver.nim | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/json_rpc/clients/websocketclient.nim b/json_rpc/clients/websocketclient.nim index 489d295..b87f1c6 100644 --- a/json_rpc/clients/websocketclient.nim +++ b/json_rpc/clients/websocketclient.nim @@ -59,7 +59,7 @@ method request*( # broken since we don't know if the server observed the message or not try: await noCancel transport.close() - except CatchableError as exc: + except CatchableError: # TODO https://github.com/status-im/nim-websock/pull/178 raiseAssert "Doesn't actually raise" raise (ref RpcPostError)(msg: exc.msg, parent: exc) @@ -67,7 +67,6 @@ method request*( await fut proc processData(client: RpcWebSocketClient) {.async: (raises: []).} = - let transport = client.transport var lastError: ref JsonRpcError while client.transport.readyState != ReadyState.Closed: var data = diff --git a/json_rpc/servers/socketserver.nim b/json_rpc/servers/socketserver.nim index 403b377..098afed 100644 --- a/json_rpc/servers/socketserver.nim +++ b/json_rpc/servers/socketserver.nim @@ -42,6 +42,7 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async: (r len = req.len let res = await rpc.route(req) + debugEcho "c '", res, "'" discard await transport.write(res & "\r\n") except TransportError as ex: error "Transport closed during processing client", From 819e5fa983221192e480f02183d1e617eee6d84f Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 16 Nov 2025 08:26:40 +0100 Subject: [PATCH 08/11] test --- json_rpc/clients/socketclient.nim | 7 +++---- json_rpc/servers/socketserver.nim | 3 ++- tests/test_callsigs.nim | 4 ++-- tests/testethcalls.nim | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 9be10d0..f4492f3 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -53,16 +53,15 @@ method request( await fut proc processData(client: RpcSocketClient) {.async: (raises: []).} = - let transport = client.transport var lastError: ref JsonRpcError while true: let data = try: - await transport.readLine(client.maxMessageSize) + await client.transport.readLine(client.maxMessageSize) except CatchableError as exc: lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc) break - debugEcho "A '", data, "'" + debugEcho "A '", data, "'", client.transport.atEof if data == "": break @@ -75,7 +74,7 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} = client.clearPending(lastError) - await transport.closeWait() + await client.transport.closeWait() client.transport = nil if not client.onDisconnect.isNil: client.onDisconnect() diff --git a/json_rpc/servers/socketserver.nim b/json_rpc/servers/socketserver.nim index 098afed..338c68b 100644 --- a/json_rpc/servers/socketserver.nim +++ b/json_rpc/servers/socketserver.nim @@ -34,7 +34,6 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async: (r while true: let req = await transport.readLine(defaultMaxRequestLength) if req == "": - await transport.closeWait() break debug "Received JSON-RPC request", @@ -51,6 +50,8 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async: (r except CancelledError: error "JSON-RPC request processing cancelled", address = transport.remoteAddress() + finally: + await transport.closeWait() # Utility functions for setting up servers using stream transport addresses diff --git a/tests/test_callsigs.nim b/tests/test_callsigs.nim index 4bdc0e9..17dda60 100644 --- a/tests/test_callsigs.nim +++ b/tests/test_callsigs.nim @@ -12,9 +12,9 @@ import ../json_rpc/rpcclient, ../json_rpc/rpcserver -from os import getCurrentDir, DirSep +from os import getCurrentDir, DirSep, AltSep from strutils import rsplit -template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] +template sourceDir: string = currentSourcePath.rsplit({DirSep, AltSep}, 1)[0] type Variant = int | bool | string diff --git a/tests/testethcalls.nim b/tests/testethcalls.nim index 96748fa..5ec6350 100644 --- a/tests/testethcalls.nim +++ b/tests/testethcalls.nim @@ -16,9 +16,9 @@ import ./private/ethprocs, ./private/stintjson -from os import getCurrentDir, DirSep +from os import getCurrentDir, DirSep, AltSep from strutils import rsplit -template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] +template sourceDir: string = currentSourcePath.rsplit({DirSep, AltSep}, 1)[0] var server = newRpcSocketServer("127.0.0.1", Port(0)) From ecc1988bddaed6afbbb7acac9738e38d5c026f01 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 16 Nov 2025 08:42:34 +0100 Subject: [PATCH 09/11] test --- json_rpc/clients/socketclient.nim | 1 - json_rpc/servers/socketserver.nim | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index f4492f3..85cc6a0 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -79,7 +79,6 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} = if not client.onDisconnect.isNil: client.onDisconnect() - proc connect*( client: RpcSocketClient, address: TransportAddress ) {.async: (raises: [CancelledError, JsonRpcError]).} = diff --git a/json_rpc/servers/socketserver.nim b/json_rpc/servers/socketserver.nim index 338c68b..3d12bf7 100644 --- a/json_rpc/servers/socketserver.nim +++ b/json_rpc/servers/socketserver.nim @@ -34,6 +34,7 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async: (r while true: let req = await transport.readLine(defaultMaxRequestLength) if req == "": + debugEcho "closing, ", transport.atEof() break debug "Received JSON-RPC request", @@ -41,7 +42,6 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async: (r len = req.len let res = await rpc.route(req) - debugEcho "c '", res, "'" discard await transport.write(res & "\r\n") except TransportError as ex: error "Transport closed during processing client", From 9211e003f733bab9cfaeecb040d1773b35fca42c Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 16 Nov 2025 09:18:11 +0100 Subject: [PATCH 10/11] test --- json_rpc/clients/socketclient.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 85cc6a0..943b4f7 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -41,9 +41,9 @@ method request( client.withPendingFut(fut): try: - discard await transport.write(reqData) + discard await transport.write(reqData & "\r\n".toBytes()) debugEcho "B '", reqData, "'" - discard await transport.write("\r\n") + # discard await transport.write("\r\n") except TransportError as exc: # If there's an error sending, the "next messages" facility will be # broken since we don't know if the server observed the message or not From e9141b19f7ac43e294712be47b2176645f0cf8e7 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sun, 16 Nov 2025 09:47:22 +0100 Subject: [PATCH 11/11] rm debug code --- json_rpc/clients/socketclient.nim | 3 --- json_rpc/servers/socketserver.nim | 1 - 2 files changed, 4 deletions(-) diff --git a/json_rpc/clients/socketclient.nim b/json_rpc/clients/socketclient.nim index 943b4f7..fcb7dc3 100644 --- a/json_rpc/clients/socketclient.nim +++ b/json_rpc/clients/socketclient.nim @@ -42,8 +42,6 @@ method request( client.withPendingFut(fut): try: discard await transport.write(reqData & "\r\n".toBytes()) - debugEcho "B '", reqData, "'" - # discard await transport.write("\r\n") except TransportError as exc: # If there's an error sending, the "next messages" facility will be # broken since we don't know if the server observed the message or not @@ -61,7 +59,6 @@ proc processData(client: RpcSocketClient) {.async: (raises: []).} = except CatchableError as exc: lastError = (ref RpcTransportError)(msg: exc.msg, parent: exc) break - debugEcho "A '", data, "'", client.transport.atEof if data == "": break diff --git a/json_rpc/servers/socketserver.nim b/json_rpc/servers/socketserver.nim index 3d12bf7..4613c93 100644 --- a/json_rpc/servers/socketserver.nim +++ b/json_rpc/servers/socketserver.nim @@ -34,7 +34,6 @@ proc processClient(server: StreamServer, transport: StreamTransport) {.async: (r while true: let req = await transport.readLine(defaultMaxRequestLength) if req == "": - debugEcho "closing, ", transport.atEof() break debug "Received JSON-RPC request",