Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize warnings #45

Merged
merged 20 commits into from Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions rpcq/_client.py
Expand Up @@ -17,6 +17,7 @@
import logging
import time
from typing import Dict, Union
from warnings import warn

import zmq
import zmq.asyncio
Expand Down Expand Up @@ -172,6 +173,9 @@ def call(self, method_name: str, *args, rpc_timeout: float = None, **kwargs):
else:
_log.debug('Discarding reply: %s', reply)

for warning in reply.warnings:
warn(f"{warning.kind}: {warning.body}")

if isinstance(reply, RPCError):
raise utils.RPCError(reply.error)
else:
Expand Down
49 changes: 26 additions & 23 deletions rpcq/_spec.py
Expand Up @@ -20,6 +20,7 @@
import logging
import traceback
from typing import Union
from warnings import catch_warnings

from rpcq._utils import rpc_reply, rpc_error, RPCMethodError, get_input
from rpcq.messages import RPCRequest, RPCReply, RPCError
Expand Down Expand Up @@ -105,28 +106,30 @@ async def run_handler(self, request: RPCRequest) -> Union[RPCReply, RPCError]:
:param RPCRequest request: JSON RPC request
:return: JSON RPC reply
"""
try:
rpc_handler = self.get_handler(request)
except RPCMethodError as e:
return rpc_error(request.id, str(e))

try:
# Run RPC and get result
args, kwargs = get_input(request.params)
result = rpc_handler(*args, **kwargs)

if asyncio.iscoroutine(result):
result = await result

except Exception as e:
if self.serialize_exceptions:
_traceback = traceback.format_exc()
_log.error(_traceback)
if self.provide_tracebacks:
return rpc_error(request.id, "{}\n{}".format(str(e), _traceback))
with catch_warnings(record=True) as warnings:
caryan marked this conversation as resolved.
Show resolved Hide resolved
try:
rpc_handler = self.get_handler(request)
except RPCMethodError as e:
return rpc_error(request.id, str(e), warnings=warnings)

try:
# Run RPC and get result
args, kwargs = get_input(request.params)
result = rpc_handler(*args, **kwargs)

if asyncio.iscoroutine(result):
result = await result

except Exception as e:
if self.serialize_exceptions:
_traceback = traceback.format_exc()
_log.error(_traceback)
if self.provide_tracebacks:
return rpc_error(request.id, "{}\n{}".format(str(e), _traceback),
warnings=warnings)
else:
return rpc_error(request.id, str(e), warnings=warnings)
else:
return rpc_error(request.id, str(e))
else:
raise e
raise e

return rpc_reply(request.id, result)
return rpc_reply(request.id, result, warnings=warnings)
22 changes: 17 additions & 5 deletions rpcq/_utils.py
Expand Up @@ -16,11 +16,15 @@
"""Utils for message passing"""
import uuid
import warnings
from typing import Optional, Tuple, Union
from typing import Optional, Tuple, Union, List, Any

import rpcq.messages


def rpc_warning(warning) -> rpcq.messages.RPCWarning:
ecpeterson marked this conversation as resolved.
Show resolved Hide resolved
return rpcq.messages.RPCWarning(body=str(warning),
kind=str(type(warning)))

def rpc_request(method_name: str, *args, **kwargs) -> rpcq.messages.RPCRequest:
"""
Create RPC request
Expand All @@ -41,33 +45,41 @@ def rpc_request(method_name: str, *args, **kwargs) -> rpcq.messages.RPCRequest:
)


def rpc_reply(id: Union[str, int], result: Optional[object]) -> rpcq.messages.RPCReply:
def rpc_reply(id: Union[str, int], result: Optional[object],
warnings: List[Warning] = None) -> rpcq.messages.RPCReply:
ecpeterson marked this conversation as resolved.
Show resolved Hide resolved
"""
Create RPC reply

:param str|int id: Request ID
:param result: Result
:param warning: List of warnings to attach to the message
:return: JSON RPC formatted dict
"""
warnings = warnings or list()
ecpeterson marked this conversation as resolved.
Show resolved Hide resolved
ecpeterson marked this conversation as resolved.
Show resolved Hide resolved

return rpcq.messages.RPCReply(
jsonrpc='2.0',
id=id,
result=result
result=result,
warnings=[rpc_warning(warning) for warning in warnings]
)


def rpc_error(id: Union[str, int], error_msg: str) -> rpcq.messages.RPCError:
def rpc_error(id: Union[str, int], error_msg: str,
warnings: List[Any] = []) -> rpcq.messages.RPCError:
ecpeterson marked this conversation as resolved.
Show resolved Hide resolved
"""
Create RPC error

:param id: Request ID
:param error_msg: Error message
:param warning: List of warnings to attach to the message
ecpeterson marked this conversation as resolved.
Show resolved Hide resolved
:return: JSON RPC formatted dict
"""
return rpcq.messages.RPCError(
jsonrpc='2.0',
id=id,
error=error_msg)
error=error_msg,
warnings=[rpc_warning(warning) for warning in warnings])


def get_input(params: Union[dict, list]) -> Tuple[list, dict]:
Expand Down
19 changes: 19 additions & 0 deletions rpcq/messages.py
Expand Up @@ -68,6 +68,19 @@ class RPCRequest(Message):
"""The JSONRPC version."""


@dataclass(eq=False, repr=False)
class RPCWarning(Message):
"""
An individual warning emitted in the course of RPC processing.
"""

body: str
"""The warning string."""

kind: Optional[str] = None
"""The type of the warning raised."""


@dataclass(eq=False, repr=False)
class RPCReply(Message):
"""
Expand All @@ -83,6 +96,9 @@ class RPCReply(Message):
result: Optional[Any] = None
"""The RPC result."""

warnings: List[RPCWarning] = field(default_factory=list)
"""A list of warnings that occurred during request processing."""


@dataclass(eq=False, repr=False)
class RPCError(Message):
Expand All @@ -99,6 +115,9 @@ class RPCError(Message):
jsonrpc: str = "2.0"
"""The JSONRPC version."""

warnings: List[RPCWarning] = field(default_factory=list)
"""A list of warnings that occurred during request processing."""


@dataclass(eq=False, repr=False)
class TargetDevice(Message):
Expand Down
6 changes: 4 additions & 2 deletions rpcq/test/test_base.py
Expand Up @@ -32,13 +32,15 @@ def test_messages():
assert m.error == e
assert m.id == i
assert m.jsonrpc == "2.0"
assert len(m.warnings) == 0

assert m['error'] == e
assert m.get('error', 1) == e

assert m.asdict() == {"error": e,
"id": i,
"jsonrpc": "2.0"}
"id": i,
"jsonrpc": "2.0",
"warnings": []}

with pytest.raises(TypeError):
RPCError(bad_field=1)
Expand Down
14 changes: 14 additions & 0 deletions rpcq/test/test_rpc.py
Expand Up @@ -19,6 +19,7 @@
import signal
import time
from multiprocessing import Process
from warnings import warn, catch_warnings

import pytest
import zmq
Expand Down Expand Up @@ -57,6 +58,12 @@ def raise_error():
do_oops()


@mock.rpc_handler
def just_a_warning():
warn("Watch out!")
return 5


# Some functions that will eventually raise an error.
def do_oops():
oops()
Expand Down Expand Up @@ -122,6 +129,13 @@ def test_client_rpcerrorerror(server, client):
assert 'ValueError: Oops.' in full_traceback


def test_client_warning(server, client):
with catch_warnings(record=True) as warnings:
result = client.call('just_a_warning')
assert result == 5
assert len(warnings) > 0
ecpeterson marked this conversation as resolved.
Show resolved Hide resolved


def test_client(server, client):
assert client.call('add', 1, 1) == 2
assert client.call('foo') == 'bar'
Expand Down
24 changes: 24 additions & 0 deletions src-tests/test-rpc.lisp
Expand Up @@ -67,3 +67,27 @@
(rpcq:rpc-call client "test-method" :sleep 5)))
;; kill the server thread
(bt:destroy-thread server-thread))))

(defun served-method ()
(warn "The purpose of this test is to communicate a warning.")
"Some other reply payload.")

(deftest test-server-warnings ()
(let* ((server-function
(lambda ()
(let ((dt (rpcq:make-dispatch-table)))
(rpcq:dispatch-table-add-handler dt 'served-method)
(rpcq:start-server :timeout 5
:dispatch-table dt
:listen-addresses '("inproc://RPCQ-test")))))
(server-thread (bt:make-thread server-function)))
(sleep 1)
(unwind-protect
;; hook up the client
(rpcq:with-rpc-client (client "inproc://RPCQ-test")
;; send a communique
(signals simple-warning
(is (string= "Some other reply payload."
(rpcq:rpc-call client "served-method")))))
;; kill the server thread
(bt:destroy-thread server-thread))))
8 changes: 8 additions & 0 deletions src/client.lisp
Expand Up @@ -122,6 +122,10 @@
(|RPCError|
(cond
((string= uuid (|RPCError-id| unpacked-reply))
(loop :for rpc-warning :across (|RPCError-warnings| unpacked-reply)
:do (warn "Warning during RPC call: ~a: ~a"
(|RPCWarning-kind| rpc-warning)
(|RPCWarning-body| rpc-warning)))
(error 'rpc-error
:string (|RPCError-error| unpacked-reply)
:id (|RPCError-id| unpacked-reply)))
Expand All @@ -132,6 +136,10 @@
(|RPCReply|
(cond
((string= uuid (|RPCReply-id| unpacked-reply))
(loop :for rpc-warning :across (|RPCReply-warnings| unpacked-reply)
:do (warn "Warning during RPC call: ~a: ~a"
(|RPCWarning-kind| rpc-warning)
(|RPCWarning-body| rpc-warning)))
(|RPCReply-result| unpacked-reply))
(t
(warn "Discarding RPC error with ID ~a, which doesn't match ours of ~a."
Expand Down
26 changes: 26 additions & 0 deletions src/messages.lisp
Expand Up @@ -88,6 +88,20 @@
)
:documentation "A single request object according to the JSONRPC standard.")

(defmessage |RPCWarning| ()
ecpeterson marked this conversation as resolved.
Show resolved Hide resolved
(
(|body|
:documentation "The warning string."
:type :string
:required t)

(|kind|
:documentation "The type of the warning raised."
:type :string
:required nil)
)
:documentation "An individual warning emitted in the course of RPC processing.")

(defmessage |RPCReply| ()
(
(|jsonrpc|
Expand All @@ -105,6 +119,12 @@
:documentation "The RPC request id."
:type :string
:required t)

(|warnings|
:documentation "A list of warnings that occurred during request processing."
:type (:list |RPCWarning|)
:required t
:default nil)
)
:documentation "The reply for a JSONRPC request.")

Expand All @@ -125,6 +145,12 @@
:documentation "The RPC request id."
:type :string
:required t)

(|warnings|
ecpeterson marked this conversation as resolved.
Show resolved Hide resolved
:documentation "A list of warnings that occurred during request processing."
:type (:list |RPCWarning|)
:required t
:default nil)
)
:documentation "A error message for JSONRPC requests.")

Expand Down
16 changes: 11 additions & 5 deletions src/rpcq.lisp
Expand Up @@ -66,13 +66,19 @@ The input strings are assumed to be FORMAT-compatible, so sequences like ~<newli
(defmethod %serialize (payload)
payload)

(defmethod %serialize ((payload vector))
(map 'vector #'%serialize payload))

(defmethod %serialize ((payload cons))
(cond
((alexandria:proper-list-p payload)
(loop :for elt :in payload :collect (%serialize elt)))
(map 'vector #'%serialize payload))
(t
(error "Can only serialize proper lists, not raw conses. Got ~S" payload))))

(defmethod %serialize ((payload string))
payload)

(defmethod %serialize ((payload hash-table))
(let ((hash (make-hash-table :test #'equal)))
(loop :for k :being :the :hash-keys :of payload
Expand All @@ -85,15 +91,15 @@ The input strings are assumed to be FORMAT-compatible, so sequences like ~<newli

(defgeneric %deserialize-struct (type payload))

(defmethod %deserialize ((payload vector))
(map 'vector #'%deserialize payload))

(defmethod %deserialize ((payload cons))
(loop :for elt :in payload :collect (%deserialize elt)))
(map 'vector #'%deserialize payload))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question here


(defmethod %deserialize ((payload string))
payload)

(defmethod %deserialize ((payload array))
(%deserialize (coerce payload 'list)))

(defmethod %deserialize ((payload hash-table))
(let ((type (gethash "_type" payload)))
(if type
Expand Down