Skip to content

Commit

Permalink
* Tail-recursion optimization doesn't seem to work within a restart-c…
Browse files Browse the repository at this point in the history
…ase handler. Change to loop and fix potential stack overflow.

* New sample applications tock-client and tock-server.
* Add :verbose keyword for libzyre.so verbose debug message enable in zyre-pipe.
* Add zsys_shutdown() binding.
* 'try-again restart to zyre-idle was removed.
  • Loading branch information
Jesse Off committed Jan 24, 2020
1 parent 252290d commit 9c9f451
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -15,3 +15,4 @@
*.sx32fsl
*.wx64fsl
*.wx32fsl
*.lafsl
42 changes: 42 additions & 0 deletions tock.lisp
@@ -0,0 +1,42 @@
(in-package #:zyre)

(defun tock-client ()
"Joins Zyre group named tick-tock and waits for shouts. Shouts contain a remote tock-server's send
timestamp, which is then compared to the local tock-client's reception timestamp and the offset is
printed. This is a measure of how well locked the two clocks are to a reference time and how long
and jittery the network stack + Zyre shouting is."
(labels
((tick (msg peer)
(let ((n (local-time:now)) (o (local-time:parse-timestring msg)))
(format t ">> ~a msec (~a)~%"
(/ (ltd:duration-as (ltd:timestamp-difference n o) :nsec) 1000000.0)
peer)))
(handler (x) (match x ((shout-event (event-msg m) (name n)) (tick m n)))))
(pipe-mapc #'handler (zyre-pipe :group "tick-tock" :name "tock-client"))))

(defun ms-until (ts now)
(let ((ns (ltd:duration-as (ltd:timestamp-difference ts now) :nsec)))
(if (< ns 10000000) 0 (- (ceiling ns 1000000) 10))))

(defun tock-server ()
"Joins Zyre group named tick-tock and shouts the system's current timestamp once per second, on
the second, as precisely as possible. Wakes up from zpolling 10ms early to busy-wait the rest of the
way to make sure. (This reduces jitter by a few milliseconds at the cost of some CPU)"
(let* ((zp (zyre-pipe :group "tick-tock" :name (machine-instance)))
(z (pipe-first zp)) ;z will be the Zyre start-event
(to (local-time:timestamp-minimize-part
(local-time:timestamp+ (local-time:now) 2 :sec)
:nsec))
now)
(handler-bind ((zyre-idle
(lambda (x) (declare (ignore x))
(setf now (local-time:now))
(if (local-time:timestamp<= to now)
(use-value 'quiescent)
(poll-some-ms (ms-until to now))))))
(loop
(setf zp (pipe-sink-until (lambda (x) (eq x 'quiescent)) zp))
(if (pipe-endp zp) (return-from tock-server nil))
(format t ">> Tock! (~a)~%" (local-time:format-timestring nil now))
(shout z "tick-tock" (local-time:format-timestring nil now))
(setf to (local-time:timestamp+ to 1 :sec) zp (pipe-rest zp))))))
18 changes: 18 additions & 0 deletions zyre.asd
Expand Up @@ -38,3 +38,21 @@ Distributed under the MIT license (see LICENSE file)
:components ((:file "package")
(:file "zyredir")))

(asdf:defsystem #:zyre/tock-server
:description "Sample app that shouts its time once/sec to group tick-tock."
:depends-on (:zyre :local-time-duration :local-time)
:license "MIT"
:entry-point "zyre::tock-server"
:serial t
:components ((:file "package")
(:file "tock")))

(asdf:defsystem #:zyre/tock-client
:description "Sample app that compares its system time against shouts to group tick-tock."
:depends-on (:zyre :local-time-duration :local-time)
:license "MIT"
:entry-point "zyre::tock-client"
:serial t
:components ((:file "package")
(:file "tock")))

38 changes: 27 additions & 11 deletions zyre.lisp
Expand Up @@ -67,6 +67,11 @@ Distributed under the MIT license (see LICENSE file)
(cffi:defcfun ("zyre_destroy" %zyre-destroy) :void
(zyre-** :pointer))

(cffi:defcfun ("zyre_set_verbose" zyre-set-verbose) :void
(self :pointer))

(cffi:defcfun ("zsys_shutdown" zsys-shutdown) :void)

(cffi:defcfun ("zyre_uuid" zyre-uuid) :string
(self :pointer))

Expand Down Expand Up @@ -307,6 +312,10 @@ Distributed under the MIT license (see LICENSE file)
(cffi:defcfun ("zpoller_terminated" zpoller-terminated) :bool
(zpoller :pointer))

(cffi:defcfun ("zpoller_set_nonstop" zpoller-set-nonstop) :void
(zpoller :pointer)
(nonstop :bool))

(defun zhash_destroy (zhash)
(declare (type cffi:foreign-pointer zhash))
(cffi:with-foreign-object (ptr :pointer)
Expand Down Expand Up @@ -442,7 +451,7 @@ can be any valid event returned from zyre-pipe."
(defun stop-zyre () (invoke-restart 'stop-zyre))
(defun poll-some-ms (ms) (invoke-restart 'poll-some-ms ms))

(defun zyre-pipe (&key (name (cffi:null-pointer)) headers group interface port)
(defun zyre-pipe (&key (name (cffi:null-pointer)) headers group interface port verbose)
"Initializes and starts up a Zyre node and returns an infinite pipe of Zyre events as conditions.
Takes :name as the short name of the node, otherwise defaults to the first few digits of the UUID.
The :headers arg is a list of (key . value) string pairs, i.e. an alist. The :group key can either
Expand All @@ -457,14 +466,17 @@ pipe may block (depending on the handler of the zyre-idle condition, described b
always count on the first start-event being returned immediately.
When there are no new events, a condition of type zyre-idle is signalled. Valid restarts are
'try-again, 'poll-some-ms, and 'stop. The default action is to block the running thread.
'use-value, 'poll-some-ms, and 'stop. The default action is to block the running thread.
'poll-some-ms takes a argument in milliseconds to continue waiting for, at which point the condition
will be resignalled. 'stop initiates a graceful exit from the zyre network."
will be resignalled. 'stop initiates a graceful exit from the zyre network. 'use-value allows a
handler to insert a sentinel value into the zyre-pipe output."
(let* ((z (zyre-new name))
(zp (zpoller-new (%zyre-socket z) :pointer (cffi:null-pointer)))
(zyre (make-instance 'zyre-state :raw-zyre z :raw-zpoller zp
(zyre (make-instance 'zyre-state :raw-zyre z :raw-zpoller zp
:uuid (zyre-uuid z) :name (zyre-name z))))
(declare (type cffi:foreign-pointer z zp) (type zyre-state zyre))
(when verbose (zyre-set-verbose z))
(zpoller-set-nonstop zp t)
(tg:finalize zyre (lambda () (zpoller-destroy zp) (zyre-destroy z)))
(when interface (zyre-set-interface z interface))
(when group (join zyre group))
Expand All @@ -480,7 +492,9 @@ will be resignalled. 'stop initiates a graceful exit from the zyre network."
:empty-pipe
(pipe-cons (zyre-recv-maybe 0) (next-event-pipe))))
(start-event () (make-condition 'start-event :uuid (uuid zyre) :name (name zyre)))
(stop-event () (make-condition 'stop-event :uuid (uuid zyre) :name (name zyre)))
(stop-event ()
(prog1 (make-condition 'stop-event :uuid (uuid zyre) :name (name zyre))
(stop zyre)))
(%zyre-recv-maybe (to)
(declare (type fixnum to) (optimize speed))
(if (cffi::null-pointer-p (zpoller-wait zp to))
Expand All @@ -489,12 +503,14 @@ will be resignalled. 'stop initiates a graceful exit from the zyre network."
(if (cffi:null-pointer-p x)
(%zyre-recv-maybe 0)
(zyre-zmsg-to-condition x)))))
(zyre-recv-maybe (to)
(declare (type fixnum to) (optimize speed))
(restart-case (%zyre-recv-maybe to)
(stop-zyre () (prog1 (stop-event) (stop zyre)))
(use-value (x) x)
(poll-some-ms (ms) (zyre-recv-maybe ms))))
(zyre-recv-maybe (timeout)
(declare (type fixnum timeout) (optimize speed))
(let ((to timeout))
(loop
(restart-case (return-from zyre-recv-maybe (%zyre-recv-maybe to))
(stop-zyre () (return-from zyre-recv-maybe (stop-event)))
(use-value (x) (return-from zyre-recv-maybe x))
(poll-some-ms (ms) (setf to ms))))))
(annotate-event (ev)
(when (typep ev 'event) (setf (event-zyre ev) zyre))
(match ev
Expand Down

0 comments on commit 9c9f451

Please sign in to comment.