Skip to content

Commit

Permalink
bring the Lisp code up to speed with its Python counterparts
Browse files Browse the repository at this point in the history
introduce Lisp RPC client and server
upgrade de/serialization in Lisp to use msgpack
  • Loading branch information
ecpeterson authored and stylewarning committed Oct 19, 2018
1 parent 0a8be8d commit d482ea5
Show file tree
Hide file tree
Showing 10 changed files with 559 additions and 27 deletions.
21 changes: 18 additions & 3 deletions README.md
Expand Up @@ -30,7 +30,7 @@ for more info).
Using the Client-Server Framework
---------------------------------

First, create a server, add a test handler, and spin it up.
The following two code samples (first in Python, then in Lisp) demonstrate how to create a server, add a test handler, and spin it up.

```python
from rpcq import Server
Expand All @@ -44,7 +44,17 @@ def test():
server.run('tcp://*:5555')
```

In another window, create a client that points to the same socket, and call the test method.
```lisp
(defun test ()
"test")
(let ((dt (rpcq:make-dispatch-table)))
(rpcq:dispatch-table-add-handler dt 'test)
(rpcq:start-server :dispatch-table dt
:listen-addresses '("tcp://*:5555")))
```

In another window, we can (again first in Python, then in Lisp) create a client that points to the same socket, and call the test method.

```python
from rpcq import Client
Expand All @@ -54,7 +64,12 @@ client = Client('tcp://localhost:5555')
client.call('test')
```

This will return the string `'test'`.
```lisp
(rpcq:with-rpc-client (client "tcp://localhost:5555")
(rpcq:rpc-call client "test"))
```

In all cases (including interoperating a client/server pair written in different languages), this will return the string `'test'`.

Using the Message Spec
----------------------
Expand Down
7 changes: 2 additions & 5 deletions rpcq-tests.asd
Expand Up @@ -30,8 +30,5 @@
:serial t
:components ((:file "package")
(:file "suite")

)

)

(:file "test-rpc")
))
11 changes: 10 additions & 1 deletion rpcq.asd
Expand Up @@ -22,11 +22,20 @@
:depends-on (#:alexandria ; Utilities
#:parse-float ; Float parsing
#:yason ; JSON generation
;; RPC requirements
#:pzmq ; communication layer
#:cl-messagepack ; message packer
#:bordeaux-threads ; threaded RPC server
#:local-time ; local time for logs
#:unicly ; UUID generation
)
:in-order-to ((asdf:test-op (asdf:test-op #:rpcq-tests)))
:pathname "src/"
:serial t
:components ((:file "package")
(:file "utilities")
(:file "rpcq")
(:file "messages")))
(:file "messages")
(:file "server")
(:file "client")))

10 changes: 10 additions & 0 deletions src-tests/suite.lisp
Expand Up @@ -69,3 +69,13 @@
(is (string= (my-msg-str m) "a string"))
)
)

(deftest test-serialize-deserialize ()
(let* ((original (make-instance 'rpcq::|RPCRequest|
:|method| "test-method"
:|params| (make-hash-table)
:|id| "test-id"))
(cloned (rpcq::deserialize (rpcq::serialize original))))
(is (typep cloned 'rpcq::|RPCRequest|))
(is (string= (rpcq::|RPCRequest-id| original) (rpcq::|RPCRequest-id| cloned)))
(is (string= (rpcq::|RPCRequest-method| original) (rpcq::|RPCRequest-method| cloned)))))
74 changes: 74 additions & 0 deletions src-tests/test-rpc.lisp
@@ -0,0 +1,74 @@
;;;; test-rpc.lisp
;;;;
;;;; Author: Eric Peterson

(in-package #:rpcq-tests)


(deftest test-client-server-dialogue ()
(let* ((expected-response "test-response")
(method-name "test-method")
(server-thread
(lambda ()
(let ((dt (rpcq:make-dispatch-table)))
(rpcq:dispatch-table-add-handler dt (constantly expected-response)
:name method-name)
(rpcq:start-server :dispatch-table dt
:listen-addresses '("inproc://RPCQ-test"))))))
;; spawn the server thread
(let ((server-thread (bt:make-thread server-thread)))
(unwind-protect
(progn
(sleep 1)
;; hook up the client
(rpcq:with-rpc-client (client "inproc://RPCQ-test")
;; send a communique
(let ((server-response (rpcq:rpc-call client method-name)))
(is (string= expected-response server-response)))))
;; kill the server thread
(bt:destroy-thread server-thread)))))

(deftest test-client-timeout ()
(let* ((method-name "test-method")
(server-thread
(lambda ()
(let ((dt (rpcq:make-dispatch-table)))
(rpcq:dispatch-table-add-handler dt (lambda () (sleep 5))
:name method-name)
(rpcq:start-server :dispatch-table dt
:listen-addresses '("inproc://RPCQ-test"))))))
;; spawn the server thread
(let ((server-thread (bt:make-thread server-thread)))
(unwind-protect
(progn
(sleep 1)
;; hook up the client
(rpcq:with-rpc-client (client "inproc://RPCQ-test" :timeout 1)
;; send a communique
(signals sb-ext:timeout
(rpcq:rpc-call client method-name))))
;; kill the server thread
(bt:destroy-thread server-thread)))))

(deftest test-server-timeout ()
(let* ((method-name "test-method")
(server-thread
(lambda ()
(let ((dt (rpcq:make-dispatch-table)))
(rpcq:dispatch-table-add-handler dt (lambda () (sleep 5))
:name method-name)
(rpcq:start-server :timeout 1
:dispatch-table dt
:listen-addresses '("inproc://RPCQ-test"))))))
;; spawn the server thread
(let ((server-thread (bt:make-thread server-thread)))
(unwind-protect
(progn
(sleep 1)
;; hook up the client
(rpcq:with-rpc-client (client "inproc://RPCQ-test")
;; send a communique
(signals rpcq::rpc-error
(rpcq:rpc-call client method-name))))
;; kill the server thread
(bt:destroy-thread server-thread)))))
134 changes: 134 additions & 0 deletions src/client.lisp
@@ -0,0 +1,134 @@
;;;; client.lisp
;;;;
;;;; Author: Eric Peterson
;;;;
;;;; Lisp mimic of the python JSON RPC Client.

;;
;; Suppose someone else has set up a compute resource which supplies access to
;; some valuable functions over RPCQ, which you would like to access. You're in
;; luck! The RPCQ client makes this task easy.
;;
;; Suppose that the definition of the function on the remote machine is as such:
;;
;; (defun my-favorite-function (&keys (argA valA) (argB valB) ...)
;; ...)
;;
;; If you had access to its definition locally, you might invoke it as
;;
;; (my-favorite-function :argA val1 :argB val2 ...)
;;
;; Making the analogous call over an RPC client looks like this:
;;
;; (with-rpc-client (socket "tcp://the-endpoint")
;; (rpc-call socket "my-favorite-function" :argA val1 :argB val2 ...))
;;
;; The connection to the remote server is automatically opened and closed by
;; the form WITH-RPC-CLIENT, and it can be used as many times as one likes
;; within the body of that form.
;;


(in-package #:rpcq)

(defstruct rpc-client
"Holds the data for an (active) RPCQ client connection."
socket
(timeout nil :type (or null (real 0))))

(defmethod print-object ((object rpc-client) stream)
(print-unreadable-object (object stream :type t :identity t)
(cond
((rpc-client-socket object)
(format stream "~s" (pzmq:getsockopt (rpc-client-socket object) :last-endpoint)))
(t
(format stream "DISCONNECTED")))))


(define-condition rpc-error (simple-error)
((error-string :initarg :string :reader rpc-error-string)
(request-id :initarg :id :reader rpc-error-request-id))
(:documentation "An RPC call signaled an error on the remote host.")
(:report (lambda (condition stream)
(format stream "RPC request ~a resulted in error:~%~a~&"
(rpc-error-request-id condition)
(rpc-error-string condition)))))

(define-condition rpc-protocol-error (simple-error)
((object :initarg :object :reader rpc-protocol-error-object)
(id :initarg :id :reader rpc-protocol-error-id))
(:documentation "While listening for an RPC call reply, the client received malformed information.")
(:report (lambda (condition stream)
(format stream "RPC request ~a resulted in bad reply:~%~a~&"
(rpc-protocol-error-id condition)
(rpc-protocol-error-object condition)))))


(defun rpc-call (client call &rest args)
"Makes a synchronous RPC call, designated by the string method name CALL, over the connection CLIENT. ARGS is a plist of arguments. Returns the result of the call directly."
(let* ((uuid (unicly:uuid-princ-to-string (unicly:make-v4-uuid)))
(request (make-instance '|RPCRequest|
:|id| uuid
:|params| (alexandria:plist-hash-table args :test #'equal)
:|method| (sanitize-name call)))
(payload (serialize request))
(socket (rpc-client-socket client)))
(cffi:with-foreign-object (foreign-payload :uint8 (length payload))
(dotimes (j (length payload))
(setf (cffi:mem-aref foreign-payload ':uint8 j)
(aref payload j)))
(pzmq:send socket foreign-payload :len (length payload)))
;; NOTE: Here lies a tail-recursive loop that waits for a reply.
;; Lisp users working in an implementation that doesn't handle
;; tail-recursion well should beware that receiving a bunch of bad
;; packets could blow the stack!
(labels
((loop-til-reply ()
(let (unpacked-reply)
(pzmq:with-message msg
(pzmq:msg-recv msg socket)
(setf unpacked-reply (deserialize (unpack-foreign-msg-to-bytes msg))))
(typecase unpacked-reply
(|RPCError|
(cond
((string= uuid (|RPCError-id| unpacked-reply))
(error 'rpc-error
:string (|RPCError-error| unpacked-reply)
:id (|RPCError-id| unpacked-reply)))
(t
(warn "Discarding RPC error with ID ~a, which doesn't match ours of ~a."
(|RPCError-id| unpacked-reply) uuid)
(loop-til-reply))))
(|RPCReply|
(cond
((string= uuid (|RPCReply-id| unpacked-reply))
(|RPCReply-result| unpacked-reply))
(t
(warn "Discarding RPC error with ID ~a, which doesn't match ours of ~a."
(|RPCReply-id| unpacked-reply) uuid)
(loop-til-reply))))
(otherwise
(error 'rpc-protocol-error
:id uuid
:object unpacked-reply))))))
(cond
((rpc-client-timeout client)
(let ((timeout (rpc-client-timeout client)))
(bt:with-timeout (timeout)
(loop-til-reply))))
(t
(loop-til-reply))))))

(defmacro with-rpc-client ((client endpoint &rest options) &body body)
"Opens an RPCQ client connection, referenced by CLIENT, at ENDPOINT. The connection is automatically closed as this form is exited or unwound. Hence, CLIENT is only valid during the execution of BODY, and it should not be stored or closed over.
OPTIONS is a p-list with the following possible keys:
:TIMEOUT is a timeout duration in seconds."
(let ((socket (gensym "SOCKET-"))
(timeout (getf options ':timeout)))
`(pzmq:with-socket ,socket :dealer
(pzmq:connect ,socket ,endpoint)
(let ((,client (make-rpc-client :socket ,socket
:timeout ,timeout)))
,@body))))
21 changes: 19 additions & 2 deletions src/package.lisp
Expand Up @@ -21,5 +21,22 @@
#:serialize
#:deserialize
#:clear-messages
#:python-message-spec))

#:python-message-spec
;; RPC client/server functions
#:make-dispatch-table
#:dispatch-table-add-handler
#:start-server
#:with-rpc-client
#:rpc-call
;; RPC client/server errors and error accessors
#:not-an-rpcrequest
#:not-an-rpcrequest-object
#:unknown-rpc-method
#:unknown-rpc-method-name
#:rpc-error
#:rpc-error-string
#:rpc-error-request-id
#:rpc-protocol-error
#:rpc-protocol-error-id
#:rpc-protocol-error-object
))
37 changes: 21 additions & 16 deletions src/rpcq.lisp
Expand Up @@ -374,11 +374,13 @@ class ~A(Message):
CoreMessages.~A = _deprecated_property(~:*~A)
" (symbol-name msg-name))))))

(defgeneric serialize (obj stream)
(:documentation "Serialize OBJ and append its representation to STREAM"))

(defmethod serialize (obj stream)
(yason:encode obj stream))
(defun serialize (obj &optional stream)
"Serialize OBJ, either written to a stream or returned as a vector of (INTEGER 0 255)."
(typecase stream
(stream
(messagepack:encode-stream (%serialize obj) stream))
(otherwise
(messagepack:encode (%serialize obj)))))

(defgeneric %deserialize (payload)
(:documentation "Reconstruct objects that have already been converted to Lisp objects."))
Expand All @@ -403,8 +405,11 @@ CoreMessages.~A = _deprecated_property(~:*~A)

(defun deserialize (payload)
"Deserialize the object(s) encoded in PAYLOAD (string or stream)."
(%deserialize (let ((*read-default-float-format* 'double-float))
(yason:parse payload :json-arrays-as-vectors t))))
(etypecase payload
(array
(%deserialize (messagepack:decode payload)))
(stream
(%deserialize (messagepack:decode-stream payload)))))

(defun slot-type-and-initform (field-type required default)
"Translate a FIELD-TYPE to a Lisp type and initform taking into account
Expand Down Expand Up @@ -559,22 +564,22 @@ We distinguish between the following options for any field type:
(lambda (slot-name)
`( ,(intern (symbol-name slot-name) :keyword)
(%deserialize (gethash ,(symbol-name slot-name) ,json))))))
(alexandria:with-gensyms (obj)
(alexandria:with-gensyms (obj hash-table)
(let ((slot-names (mapcar #'car field-specs)))
`(progn
(defclass ,class-name ()
,(mapcar #'make-slot-spec field-specs)
,@(when documentation
`((:documentation ,(format-documentation-string documentation)))))

(defmethod yason:encode ((,obj ,class-name) &optional (stream *standard-output*))
(defmethod %serialize ((,obj ,class-name))
(with-slots ,slot-names ,obj
(yason:with-output (stream)
(yason:with-object ()
(yason:encode-object-element "_type" ,(symbol-name class-name))
,@(loop :for slot :in slot-names
:collect `(yason:encode-object-element ,(string-downcase (symbol-name slot))
,slot))))))
(let ((,hash-table (make-hash-table :test #'equal)))
(setf (gethash "_type" ,hash-table) ,(symbol-name class-name))
,@(loop :for slot :in slot-names
:collect `(setf (gethash ,(symbol-name slot) ,hash-table)
,slot))
,hash-table)))

(defmethod %deserialize-struct ((type (eql ',class-name)) (payload hash-table))
(assert (string= (gethash "_type" payload) ,(symbol-name class-name)))
Expand Down

0 comments on commit d482ea5

Please sign in to comment.