From d482ea507f62e9736872154ab2fcccc46dd004f0 Mon Sep 17 00:00:00 2001 From: Eric Peterson Date: Mon, 15 Oct 2018 14:39:08 -0400 Subject: [PATCH] bring the Lisp code up to speed with its Python counterparts introduce Lisp RPC client and server upgrade de/serialization in Lisp to use msgpack --- README.md | 21 +++- rpcq-tests.asd | 7 +- rpcq.asd | 11 +- src-tests/suite.lisp | 10 ++ src-tests/test-rpc.lisp | 74 ++++++++++++ src/client.lisp | 134 ++++++++++++++++++++++ src/package.lisp | 21 +++- src/rpcq.lisp | 37 +++--- src/server.lisp | 248 ++++++++++++++++++++++++++++++++++++++++ src/utilities.lisp | 23 ++++ 10 files changed, 559 insertions(+), 27 deletions(-) create mode 100644 src-tests/test-rpc.lisp create mode 100644 src/client.lisp create mode 100644 src/server.lisp create mode 100644 src/utilities.lisp diff --git a/README.md b/README.md index 75cd3de..376afa3 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ for more info). Using the Client-Server Framework --------------------------------- -First, create a server, add a test handler, and spin it up. +The following two code samples (first in Python, then in Lisp) demonstrate how to create a server, add a test handler, and spin it up. ```python from rpcq import Server @@ -44,7 +44,17 @@ def test(): server.run('tcp://*:5555') ``` -In another window, create a client that points to the same socket, and call the test method. +```lisp +(defun test () + "test") + +(let ((dt (rpcq:make-dispatch-table))) + (rpcq:dispatch-table-add-handler dt 'test) + (rpcq:start-server :dispatch-table dt + :listen-addresses '("tcp://*:5555"))) +``` + +In another window, we can (again first in Python, then in Lisp) create a client that points to the same socket, and call the test method. ```python from rpcq import Client @@ -54,7 +64,12 @@ client = Client('tcp://localhost:5555') client.call('test') ``` -This will return the string `'test'`. +```lisp +(rpcq:with-rpc-client (client "tcp://localhost:5555") + (rpcq:rpc-call client "test")) +``` + +In all cases (including interoperating a client/server pair written in different languages), this will return the string `'test'`. Using the Message Spec ---------------------- diff --git a/rpcq-tests.asd b/rpcq-tests.asd index bc77008..840802e 100644 --- a/rpcq-tests.asd +++ b/rpcq-tests.asd @@ -30,8 +30,5 @@ :serial t :components ((:file "package") (:file "suite") - - ) - - ) - + (:file "test-rpc") + )) diff --git a/rpcq.asd b/rpcq.asd index 2a69b83..bd08ffa 100644 --- a/rpcq.asd +++ b/rpcq.asd @@ -22,11 +22,20 @@ :depends-on (#:alexandria ; Utilities #:parse-float ; Float parsing #:yason ; JSON generation + ;; RPC requirements + #:pzmq ; communication layer + #:cl-messagepack ; message packer + #:bordeaux-threads ; threaded RPC server + #:local-time ; local time for logs + #:unicly ; UUID generation ) :in-order-to ((asdf:test-op (asdf:test-op #:rpcq-tests))) :pathname "src/" :serial t :components ((:file "package") + (:file "utilities") (:file "rpcq") - (:file "messages"))) + (:file "messages") + (:file "server") + (:file "client"))) diff --git a/src-tests/suite.lisp b/src-tests/suite.lisp index 551a614..3a0745e 100644 --- a/src-tests/suite.lisp +++ b/src-tests/suite.lisp @@ -69,3 +69,13 @@ (is (string= (my-msg-str m) "a string")) ) ) + +(deftest test-serialize-deserialize () + (let* ((original (make-instance 'rpcq::|RPCRequest| + :|method| "test-method" + :|params| (make-hash-table) + :|id| "test-id")) + (cloned (rpcq::deserialize (rpcq::serialize original)))) + (is (typep cloned 'rpcq::|RPCRequest|)) + (is (string= (rpcq::|RPCRequest-id| original) (rpcq::|RPCRequest-id| cloned))) + (is (string= (rpcq::|RPCRequest-method| original) (rpcq::|RPCRequest-method| cloned))))) diff --git a/src-tests/test-rpc.lisp b/src-tests/test-rpc.lisp new file mode 100644 index 0000000..413bacd --- /dev/null +++ b/src-tests/test-rpc.lisp @@ -0,0 +1,74 @@ +;;;; test-rpc.lisp +;;;; +;;;; Author: Eric Peterson + +(in-package #:rpcq-tests) + + +(deftest test-client-server-dialogue () + (let* ((expected-response "test-response") + (method-name "test-method") + (server-thread + (lambda () + (let ((dt (rpcq:make-dispatch-table))) + (rpcq:dispatch-table-add-handler dt (constantly expected-response) + :name method-name) + (rpcq:start-server :dispatch-table dt + :listen-addresses '("inproc://RPCQ-test")))))) + ;; spawn the server thread + (let ((server-thread (bt:make-thread server-thread))) + (unwind-protect + (progn + (sleep 1) + ;; hook up the client + (rpcq:with-rpc-client (client "inproc://RPCQ-test") + ;; send a communique + (let ((server-response (rpcq:rpc-call client method-name))) + (is (string= expected-response server-response))))) + ;; kill the server thread + (bt:destroy-thread server-thread))))) + +(deftest test-client-timeout () + (let* ((method-name "test-method") + (server-thread + (lambda () + (let ((dt (rpcq:make-dispatch-table))) + (rpcq:dispatch-table-add-handler dt (lambda () (sleep 5)) + :name method-name) + (rpcq:start-server :dispatch-table dt + :listen-addresses '("inproc://RPCQ-test")))))) + ;; spawn the server thread + (let ((server-thread (bt:make-thread server-thread))) + (unwind-protect + (progn + (sleep 1) + ;; hook up the client + (rpcq:with-rpc-client (client "inproc://RPCQ-test" :timeout 1) + ;; send a communique + (signals sb-ext:timeout + (rpcq:rpc-call client method-name)))) + ;; kill the server thread + (bt:destroy-thread server-thread))))) + +(deftest test-server-timeout () + (let* ((method-name "test-method") + (server-thread + (lambda () + (let ((dt (rpcq:make-dispatch-table))) + (rpcq:dispatch-table-add-handler dt (lambda () (sleep 5)) + :name method-name) + (rpcq:start-server :timeout 1 + :dispatch-table dt + :listen-addresses '("inproc://RPCQ-test")))))) + ;; spawn the server thread + (let ((server-thread (bt:make-thread server-thread))) + (unwind-protect + (progn + (sleep 1) + ;; hook up the client + (rpcq:with-rpc-client (client "inproc://RPCQ-test") + ;; send a communique + (signals rpcq::rpc-error + (rpcq:rpc-call client method-name)))) + ;; kill the server thread + (bt:destroy-thread server-thread))))) diff --git a/src/client.lisp b/src/client.lisp new file mode 100644 index 0000000..fbc4b1d --- /dev/null +++ b/src/client.lisp @@ -0,0 +1,134 @@ +;;;; client.lisp +;;;; +;;;; Author: Eric Peterson +;;;; +;;;; Lisp mimic of the python JSON RPC Client. + +;; +;; Suppose someone else has set up a compute resource which supplies access to +;; some valuable functions over RPCQ, which you would like to access. You're in +;; luck! The RPCQ client makes this task easy. +;; +;; Suppose that the definition of the function on the remote machine is as such: +;; +;; (defun my-favorite-function (&keys (argA valA) (argB valB) ...) +;; ...) +;; +;; If you had access to its definition locally, you might invoke it as +;; +;; (my-favorite-function :argA val1 :argB val2 ...) +;; +;; Making the analogous call over an RPC client looks like this: +;; +;; (with-rpc-client (socket "tcp://the-endpoint") +;; (rpc-call socket "my-favorite-function" :argA val1 :argB val2 ...)) +;; +;; The connection to the remote server is automatically opened and closed by +;; the form WITH-RPC-CLIENT, and it can be used as many times as one likes +;; within the body of that form. +;; + + +(in-package #:rpcq) + +(defstruct rpc-client + "Holds the data for an (active) RPCQ client connection." + socket + (timeout nil :type (or null (real 0)))) + +(defmethod print-object ((object rpc-client) stream) + (print-unreadable-object (object stream :type t :identity t) + (cond + ((rpc-client-socket object) + (format stream "~s" (pzmq:getsockopt (rpc-client-socket object) :last-endpoint))) + (t + (format stream "DISCONNECTED"))))) + + +(define-condition rpc-error (simple-error) + ((error-string :initarg :string :reader rpc-error-string) + (request-id :initarg :id :reader rpc-error-request-id)) + (:documentation "An RPC call signaled an error on the remote host.") + (:report (lambda (condition stream) + (format stream "RPC request ~a resulted in error:~%~a~&" + (rpc-error-request-id condition) + (rpc-error-string condition))))) + +(define-condition rpc-protocol-error (simple-error) + ((object :initarg :object :reader rpc-protocol-error-object) + (id :initarg :id :reader rpc-protocol-error-id)) + (:documentation "While listening for an RPC call reply, the client received malformed information.") + (:report (lambda (condition stream) + (format stream "RPC request ~a resulted in bad reply:~%~a~&" + (rpc-protocol-error-id condition) + (rpc-protocol-error-object condition))))) + + +(defun rpc-call (client call &rest args) + "Makes a synchronous RPC call, designated by the string method name CALL, over the connection CLIENT. ARGS is a plist of arguments. Returns the result of the call directly." + (let* ((uuid (unicly:uuid-princ-to-string (unicly:make-v4-uuid))) + (request (make-instance '|RPCRequest| + :|id| uuid + :|params| (alexandria:plist-hash-table args :test #'equal) + :|method| (sanitize-name call))) + (payload (serialize request)) + (socket (rpc-client-socket client))) + (cffi:with-foreign-object (foreign-payload :uint8 (length payload)) + (dotimes (j (length payload)) + (setf (cffi:mem-aref foreign-payload ':uint8 j) + (aref payload j))) + (pzmq:send socket foreign-payload :len (length payload))) + ;; NOTE: Here lies a tail-recursive loop that waits for a reply. + ;; Lisp users working in an implementation that doesn't handle + ;; tail-recursion well should beware that receiving a bunch of bad + ;; packets could blow the stack! + (labels + ((loop-til-reply () + (let (unpacked-reply) + (pzmq:with-message msg + (pzmq:msg-recv msg socket) + (setf unpacked-reply (deserialize (unpack-foreign-msg-to-bytes msg)))) + (typecase unpacked-reply + (|RPCError| + (cond + ((string= uuid (|RPCError-id| unpacked-reply)) + (error 'rpc-error + :string (|RPCError-error| unpacked-reply) + :id (|RPCError-id| unpacked-reply))) + (t + (warn "Discarding RPC error with ID ~a, which doesn't match ours of ~a." + (|RPCError-id| unpacked-reply) uuid) + (loop-til-reply)))) + (|RPCReply| + (cond + ((string= uuid (|RPCReply-id| unpacked-reply)) + (|RPCReply-result| unpacked-reply)) + (t + (warn "Discarding RPC error with ID ~a, which doesn't match ours of ~a." + (|RPCReply-id| unpacked-reply) uuid) + (loop-til-reply)))) + (otherwise + (error 'rpc-protocol-error + :id uuid + :object unpacked-reply)))))) + (cond + ((rpc-client-timeout client) + (let ((timeout (rpc-client-timeout client))) + (bt:with-timeout (timeout) + (loop-til-reply)))) + (t + (loop-til-reply)))))) + +(defmacro with-rpc-client ((client endpoint &rest options) &body body) + "Opens an RPCQ client connection, referenced by CLIENT, at ENDPOINT. The connection is automatically closed as this form is exited or unwound. Hence, CLIENT is only valid during the execution of BODY, and it should not be stored or closed over. + +OPTIONS is a p-list with the following possible keys: + + :TIMEOUT is a timeout duration in seconds." + (let ((socket (gensym "SOCKET-")) + (timeout (getf options ':timeout))) + `(pzmq:with-socket ,socket :dealer + (pzmq:connect ,socket ,endpoint) + (let ((,client (make-rpc-client :socket ,socket + :timeout ,timeout))) + ,@body)))) diff --git a/src/package.lisp b/src/package.lisp index 42dd6b6..03109d8 100644 --- a/src/package.lisp +++ b/src/package.lisp @@ -21,5 +21,22 @@ #:serialize #:deserialize #:clear-messages - #:python-message-spec)) - + #:python-message-spec + ;; RPC client/server functions + #:make-dispatch-table + #:dispatch-table-add-handler + #:start-server + #:with-rpc-client + #:rpc-call + ;; RPC client/server errors and error accessors + #:not-an-rpcrequest + #:not-an-rpcrequest-object + #:unknown-rpc-method + #:unknown-rpc-method-name + #:rpc-error + #:rpc-error-string + #:rpc-error-request-id + #:rpc-protocol-error + #:rpc-protocol-error-id + #:rpc-protocol-error-object + )) diff --git a/src/rpcq.lisp b/src/rpcq.lisp index 8888371..6d4e76f 100644 --- a/src/rpcq.lisp +++ b/src/rpcq.lisp @@ -374,11 +374,13 @@ class ~A(Message): CoreMessages.~A = _deprecated_property(~:*~A) " (symbol-name msg-name)))))) -(defgeneric serialize (obj stream) - (:documentation "Serialize OBJ and append its representation to STREAM")) - -(defmethod serialize (obj stream) - (yason:encode obj stream)) +(defun serialize (obj &optional stream) + "Serialize OBJ, either written to a stream or returned as a vector of (INTEGER 0 255)." + (typecase stream + (stream + (messagepack:encode-stream (%serialize obj) stream)) + (otherwise + (messagepack:encode (%serialize obj))))) (defgeneric %deserialize (payload) (:documentation "Reconstruct objects that have already been converted to Lisp objects.")) @@ -403,8 +405,11 @@ CoreMessages.~A = _deprecated_property(~:*~A) (defun deserialize (payload) "Deserialize the object(s) encoded in PAYLOAD (string or stream)." - (%deserialize (let ((*read-default-float-format* 'double-float)) - (yason:parse payload :json-arrays-as-vectors t)))) + (etypecase payload + (array + (%deserialize (messagepack:decode payload))) + (stream + (%deserialize (messagepack:decode-stream payload))))) (defun slot-type-and-initform (field-type required default) "Translate a FIELD-TYPE to a Lisp type and initform taking into account @@ -559,22 +564,22 @@ We distinguish between the following options for any field type: (lambda (slot-name) `( ,(intern (symbol-name slot-name) :keyword) (%deserialize (gethash ,(symbol-name slot-name) ,json)))))) - (alexandria:with-gensyms (obj) + (alexandria:with-gensyms (obj hash-table) (let ((slot-names (mapcar #'car field-specs))) `(progn (defclass ,class-name () ,(mapcar #'make-slot-spec field-specs) ,@(when documentation `((:documentation ,(format-documentation-string documentation))))) - - (defmethod yason:encode ((,obj ,class-name) &optional (stream *standard-output*)) + + (defmethod %serialize ((,obj ,class-name)) (with-slots ,slot-names ,obj - (yason:with-output (stream) - (yason:with-object () - (yason:encode-object-element "_type" ,(symbol-name class-name)) - ,@(loop :for slot :in slot-names - :collect `(yason:encode-object-element ,(string-downcase (symbol-name slot)) - ,slot)))))) + (let ((,hash-table (make-hash-table :test #'equal))) + (setf (gethash "_type" ,hash-table) ,(symbol-name class-name)) + ,@(loop :for slot :in slot-names + :collect `(setf (gethash ,(symbol-name slot) ,hash-table) + ,slot)) + ,hash-table))) (defmethod %deserialize-struct ((type (eql ',class-name)) (payload hash-table)) (assert (string= (gethash "_type" payload) ,(symbol-name class-name))) diff --git a/src/server.lisp b/src/server.lisp new file mode 100644 index 0000000..d06ed3e --- /dev/null +++ b/src/server.lisp @@ -0,0 +1,248 @@ +;;;; server.lisp +;;;; +;;;; Author: Eric Peterson +;;;; +;;;; Lisp mimic of the python JSON RPC Server. + +;; +;; Suppose you've written some collection of functions, like +;; +;; (defun my-first-function (&keys (argA valA) (argB valB) ...) +;; ...) +;; (defun my-second-function (&keys (arg1 val1) (arg2 val2) ...) +;; ...) +;; ... +;; +;; and you want to build a compute resource that other remote processes can +;; access. You're in luck! This file will help you set up an RPCQ server +;; process that administers access to your favorite collection of functions. +;; +;; The primary method involved is START-SERVER, which amounts to the main loop +;; of the server. In addition to the usual server-y parameters (listening +;; endpoint, number of worker threads, time limits, ...), it takes a +;; DISPATCH-TABLE, which is populated with the list of functions you wish to +;; service for RPC clients. +;; +;; We may set up an RPC server (at the default endpoint, with the default number +;; of workers, with the default time limit ) that delivers on the function calls +;; defined above using the following snippet: +;; +;; (let ((dt (make-dispatch-table))) +;; (dispatch-table-add-handler dt 'my-first-function) +;; (dispatch-table-add-handler dt 'my-second-function) +;; ... +;; (dispatch-table-add-handler dt 'my-nth-function) +;; (start-server :dispatch-table dt)) +;; +;; Whenever the server receives an RPC call from a client, it browses its +;; dispatch table, retrieves the corresponding function (or signals failure to +;; the client), evaluates the function with the client-supplied parameters, +;; and replies with the result. +;; +;; NOTE: Both the arguments and the return value of functions in the dispatch +;; table must either be JSONifiable Lisp types (numbers, strings, lists, hashes) +;; or they must be RPCQ objects defined (on both the client and the server +;; process) via DEFMESSAGE. +;; + +(in-package #:rpcq) + + +(deftype dispatch-table () + 'hash-table) + +(defun make-dispatch-table () + "Make an empty DISPATCH-TABLE, suitable for use with DISPATCH-TABLE-ADD-HANDLER and START-SERVER." + (make-hash-table :test #'equal)) + +(defun dispatch-table-add-handler (dispatch-table f + &key + (name (string f))) + "Add a function F to DISPATCH-TABLE, for use by an RPCQ server. The function F is expected to take only keyword arguments. + +By default, a symbol passed in for F will be automatically converted into the name of F used in the dispatch table. To manually specify a name (or to provide a name for a non-symbol value of F), use the keyword argument :NAME." + (check-type f function) + (check-type name string) + (setf (gethash (sanitize-name name) dispatch-table) f) + nil) + + +(defun %pull-raw-request (receiver) + "Pulls a ZMQ request over the RECEIVER socket. Returns a VALUES triple: + +* IDENTITY-FRAME: array of (UNSIGNED-BYTE 8) that describes the intended recipient of a reply to this request. +* NULL-FRAME?: boolean indicating whether the recipient expects an additional null frame after the identity frame. (This is the case for REQ-type clients.) +* PAYLOAD: array of (UNSIGNED-BYTE 8) that houses the raw request." + ;; ZeroMQ requests come in one of two flavors: + ;; + ;; (1) identity frame, null frame, data frame + ;; (2) identity frame, data frame + ;; + ;; whichever we get, we also have to reply in the same way, so we track the + ;; format in addition to the data. + (let (identity) + (pzmq:with-message msg + (pzmq:msg-recv msg receiver) + (setf identity (unpack-foreign-msg-to-bytes msg))) + (assert (pzmq:getsockopt receiver :rcvmore)) + (pzmq:with-message msg + (pzmq:msg-recv msg receiver) + (cond + ((pzmq:getsockopt receiver :rcvmore) + (assert (zerop (pzmq:msg-size msg))) + (pzmq:with-message msg + (assert (not (pzmq:getsockopt receiver :rcvmore))) + (values + identity + t + (unpack-foreign-msg-to-bytes msg)))) + (t + (values + identity + nil + (unpack-foreign-msg-to-bytes msg))))))) + +(defun %push-raw-request (socket identity null-frame? payload) + "Pushes a ZMQ reply onto SOCKET. Takes the following values: + +* IDENTITY: array of (UNSIGNED-BYTE 8) that describes the inteded recipient of the reply. Copy this from the matching %PULL-RAW-REQUEST. +* NULL-FRAME?: boolean indicating whether the recipient expects an additional null frame after the identity frame. Copy this from the matching %PULL-RAW-REQUEST. +* PAYLOAD: array of (UNSIGNED-BYTE 8) that houses the raw reply." + ;; transmit messages + (cffi:with-foreign-objects ((foreign-identity ':uint8 (length identity)) + (foreign-payload ':uint8 (length payload))) + (dotimes (j (length identity)) + (setf (cffi:mem-aref foreign-identity ':uint8 j) + (aref identity j))) + (dotimes (j (length payload)) + (setf (cffi:mem-aref foreign-payload ':uint8 j) + (aref payload j))) + (pzmq:send socket foreign-identity :len (length identity) :sndmore t) + (when null-frame? + (pzmq:send socket "" :len 0 :sndmore t)) + (pzmq:send socket foreign-payload :len (length payload) :sndmore nil)) + nil) + + +(define-condition not-an-rpcrequest (simple-error) + ((object :initarg :object :reader not-an-rpcrequest-object)) + (:documentation "While listening for an RPC call, the server received malformed information.") + (:report (lambda (condition stream) + (format stream "Received bad object as inbound RPC request:~%~a~&" + (not-an-rpcrequest-object condition))))) + +(define-condition unknown-rpc-method (simple-error) + ((method-name :initarg :method-name :reader unknown-rpc-method-name)) + (:documentation "The server received an RPC request for a method it does not recognize.") + (:report (lambda (condition stream) + (format stream "Received request for method \"~a\", which is not known to us." + (unknown-rpc-method-name condition))))) + + +(defun %rpc-server-thread-worker (&key + dispatch-table + logging-stream + timeout) + "The thread body for an RPCQ server. Responds to RPCQ requests which match entries in DISPATCH-TABLE and writes log entries to LOGGING-STREAM. + +DISPATCH-TABLE and LOGGING-STREAM are both required arguments. TIMEOUT is of type (OR NULL (REAL 0)), with NIL signalling no timeout." + (pzmq:with-socket receiver :dealer + (pzmq:connect receiver "inproc://workers") + (loop + (handler-case + (let (request result reply) + (multiple-value-bind (identity empty-frame raw-request) + (%pull-raw-request receiver) + + (handler-case + (progn + (setf request (deserialize raw-request)) + (unless (typep request '|RPCRequest|) + (error 'not-an-rpcrequest + :object request)) + (format-log logging-stream "Got request: ~a" (|RPCRequest-method| request)) + + (let ((params-as-plist + (loop :for key :being :the :hash-keys :of (|RPCRequest-params| request) + :using (hash-value val) + :collect (str->lisp-keyword key) + :collect val))) + (let ((f (gethash (|RPCRequest-method| request) dispatch-table))) + (unless f + (error 'unknown-rpc-method + :method-name (|RPCRequest-method| request))) + (let ((*debug-io* logging-stream)) + (setf result + (if timeout + (bt:with-timeout (timeout) + (apply f params-as-plist)) + (apply f params-as-plist)))) + + (setf reply (make-instance '|RPCReply| + :|id| (|RPCRequest-id| request) + :|result| result)))) + (format-log logging-stream "Finishing request: ~a" (|RPCRequest-method| request))) + + ;; this is where errors go where we can reply to the client + (unknown-rpc-method (c) + (declare (ignore c)) + (format-log logging-stream "Error: method ~a unknown" (|RPCRequest-method| request)) + (setf reply (make-instance '|RPCError| + :|id| (|RPCRequest-id| request) + :|error| (format nil "Method named \"~a\" is unknown." + (|RPCRequest-method| request))))) + (bt:timeout (c) + (declare (ignore c)) + (format-log logging-stream "Timed out on request ~a" (|RPCRequest-method| request)) + (setf reply (make-instance '|RPCError| + :|id| (|RPCRequest-id| request) + :|error| (format nil "Execution timed out. Note: time limit: ~a seconds." timeout)))) + (error (c) + (format-log logging-stream "Threw generic error during RPC call:~%~a" c) + (setf reply (make-instance '|RPCError| + :|id| (|RPCRequest-id| request) + :|error| (format nil "~a" c))))) + + ;; send the client response, whether success or failure + (%push-raw-request receiver identity empty-frame (serialize reply)))) + + ;; this is where errors go where we can't even reply to the client + (simple-error (c) + (format-log logging-stream "Threw generic error before RPC call:~%~a" c)))))) + +(defun start-server (&key + dispatch-table + (listen-addresses (list "tcp://*:5555")) + (thread-count 5) + (logging-stream (make-broadcast-stream)) + timeout) + "Main loop of an RPCQ server. + +Argument descriptions: + * DISPATCH-TABLE, of type DISPATCH-TABLE, registers the valid methods to which the server will respond. + * LISTEN-ADDRESSES is a list of strings, each of which is a valid ZMQ interface address that the server will listen on. + * THREAD-COUNT is a positive integer of the number of worker threads that the server will spawn to service requests. + * LOGGING-STREAM is the stream to which the worker threads will write debug information. This stream is also forwarded to the RPC functions as *DEBUG-IO*. + * TIMEOUT, of type (OR NULL (REAL 0)), sets the maximum duration that a thread will be allowed to work for before it is forcefully terminated. A TIMEOUT value of NIL signals that no thread will ever be terminated for taking too long." + (check-type dispatch-table dispatch-table) + (check-type logging-stream stream) + (check-type thread-count (integer 1)) + (check-type timeout (or null (real 0))) + (check-type listen-addresses list) + (format-log logging-stream "Spawning server at ~a .~%" listen-addresses) + (pzmq:with-sockets ((clients :router :monitor) (workers :dealer :monitor)) + (dolist (address listen-addresses) + (pzmq:bind clients address)) + (pzmq:bind workers "inproc://workers") + (let ((thread-pool nil)) + (unwind-protect + (progn + (dotimes (j thread-count) + (push (bt:make-thread (lambda () (%rpc-server-thread-worker + :dispatch-table dispatch-table + :logging-stream logging-stream + :timeout timeout)) + :name (format nil "RPC-server-thread-~a" j)) + thread-pool)) + (pzmq:device :queue clients workers)) + (mapc #'bt:destroy-thread thread-pool))))) diff --git a/src/utilities.lisp b/src/utilities.lisp new file mode 100644 index 0000000..1e4101a --- /dev/null +++ b/src/utilities.lisp @@ -0,0 +1,23 @@ +;;;; utilities.lisp +;;;; +;;;; Author: Eric Peterson + +(in-package #:rpcq) + + +(defun sanitize-name (string-designator) + "Convert a stringifiable STRING-DESIGNATOR into a snake-case string." + (substitute #\_ #\- (string string-designator))) + +(defun str->lisp-keyword (str) + "Convert a snake-case string into a hyphenated Lisp keyword." + (make-keyword (substitute #\- #\_ (string-upcase str)))) + +(defun format-log (s fmt-string &rest args) + "Writes a format string to the stream S in debug output format." + (format s "[~A | ~A] ~?~%" (local-time:now) (bt:thread-name (bt:current-thread)) fmt-string args)) + +(defun unpack-foreign-msg-to-bytes (msg) + "Converts a foreign array of unsigned characters to a Lisp vector of such." + (cffi:foreign-array-to-lisp (pzmq:msg-data msg) + `(:array :uint8 ,(pzmq:msg-size msg))))