Skip to content

Commit

Permalink
Emit and Deliver now use the job API, closes #8
Browse files Browse the repository at this point in the history
  • Loading branch information
sshirokov committed May 14, 2012
2 parents 97c7f18 + 11180a8 commit 68c7376
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 61 deletions.
40 changes: 2 additions & 38 deletions src/emitter.lisp
Expand Up @@ -46,52 +46,16 @@ only fires once, then it is removed."))
(flet ((queue-cb (cb)
(when (remhash cb (oneshots emitter))
(remove-listener emitter event cb))
(enqueue (deliver-queue (owner emitter)) (lambda () (apply cb args)))))
(queue-work (owner emitter) (curry #'apply cb args) :normal)))
(mapc #'queue-cb registered))))

(defmethod deliver :after ((emitter emitter) event args)
(declare (ignore event args))
(flet ((deliver-cb (l w e)
(declare (ignore l e))
(log-for (debug) "Deliver queue: ~S" (deliver-queue (owner emitter)))
(if-let (thunk (dequeue (deliver-queue (owner emitter))))
(funcall thunk)
(ev:stop-watcher (owner emitter) w :keep-callback t))))

(unless (deliver-runner (owner emitter))
(setf (deliver-runner (owner emitter))
(let ((watcher (make-instance 'ev:ev-idle)))
(ev:set-idle (owner emitter) watcher #'deliver-cb)
(incf (ev:watcher-slot watcher :priority))
watcher)))

(unless (queue-empty-p (deliver-queue (owner emitter)))
(ev:start-watcher (owner emitter) (deliver-runner (owner emitter))))))

(defmethod emit ((emitter emitter) (event string) &rest args)
"Enqueue the delivery of an `event' with `args'. The event will
be delivered at some point in the future, but very soon, in the order
that it was emitted relative to other events."
(flet ((emit-thunk ()
(deliver emitter event args)))
(enqueue (emit-queue (owner emitter)) #'emit-thunk)))

(defmethod emit :after ((emitter emitter) (event string) &rest args)
(declare (ignore args))
(flet ((emitter-cb (l w e)
(declare (ignore l e))
(log-for (debug) "Emit queue: ~S" (emit-queue (owner emitter)))
(if-let (thunk (dequeue (emit-queue (owner emitter))))
(funcall thunk)
(ev:stop-watcher (owner emitter) w :keep-callback t))))

(unless (emit-runner (owner emitter))
(setf (emit-runner (owner emitter))
(let ((watcher (make-instance 'ev:ev-idle)))
(ev:set-idle (owner emitter) watcher #'emitter-cb)
watcher)))

(ev:start-watcher (owner emitter) (emit-runner (owner emitter)))))
(queue-work (owner emitter) #'emit-thunk)))

(defmethod add-listener ((emitter emitter) (event string) (callback symbol))
(add-listener emitter event (symbol-function callback)))
Expand Down
8 changes: 6 additions & 2 deletions src/generics.lisp
Expand Up @@ -13,7 +13,11 @@ to be invoked every `timeout'"))
(defgeneric clear (hinge handle)
(:documentation "Clear the registration of a watcher (e.g. timeout or interval) named by `handle'"))

(defgeneric queue-work (hinge work &optional queue)
(:documentation "Queue a thunk, `work', into a work queue on
`hinge'. If no `queue' is named, `:low' should be used."))

;; Wrappers
(defmacro defer ((hinge) &body forms)
`(enqueue (defer-queue ,hinge)
(lambda () ,@forms)))
"Enqueue work `forms' into the low priority queue of `hinge'"
`(queue-work ,hinge (lambda () ,@forms) :low))
24 changes: 8 additions & 16 deletions src/hinge.lisp
Expand Up @@ -16,7 +16,7 @@
((owner :initarg :owner :accessor owner)

(queue :initform (make-instance 'queue) :accessor queue)
(priority :initform 0 :accessor priority)
(priority :initarg :priority :initform 0 :accessor priority)
(runner :initform (make-instance 'ev:ev-idle) :accessor runner))
(:metaclass c2mop:funcallable-standard-class)
(:documentation "A wrapper that binds together a queue and an idle runner
Expand Down Expand Up @@ -49,18 +49,10 @@ Set the watcher priority and bind the instance as the callback"
((bg-pool :accessor bg-pool
:documentation "Background work threadpool")

(defer-queue :accessor defer-queue)

(emit-queue :accessor emit-queue
:initform (make-instance 'queue)
:documentation "Queue of event emissions.")
(emit-runner :initform nil
:accessor emit-runner
:documentation "The queue runner of the `emit-queue'")

(deliver-queue :accessor deliver-queue
:initform (make-instance 'queue)
:documentation "Queue of event deliveries.")
(deliver-runner :initform nil
:accessor deliver-runner
:documentation "The queue runner of the `deliver-queue'")))
(queues :initform '((:low . 0)
(:normal . 1)
(:high . 2))
:accessor queues
:documentation "A set of work queues.
If an alist of (:name . priority) it will be transformed
to a hashtable of {:name => (ev:idle-watcher :priority priority)}")))
26 changes: 21 additions & 5 deletions src/methods.lisp
Expand Up @@ -2,14 +2,30 @@

;; Methods
(defmethod initialize-instance :after ((hinge hinge) &key)
(setf (bg-pool hinge)
(make-instance 'pool :owner hinge)
"Remap the queues descriptions to an HT, then init the pool"
(when-let (q-desc (and (listp (queues hinge)) (every #'consp (queues hinge))
(queues hinge)))
(setf (queues hinge) (make-hash-table))
(mapc #'(lambda (name-priority)
(destructuring-bind (name . priority) name-priority
(format t "=> Making queue: ~S => ~S~%" name priority)

(defer-queue hinge)
(make-instance 'running-queue :owner hinge)))
(setf (gethash name (queues hinge))
(make-instance 'running-queue :owner hinge :priority priority))))
q-desc))


(setf (bg-pool hinge) (make-instance 'pool :owner hinge)))

(defmethod queue-work ((hinge hinge) work &optional (queue :low))
"Enqueue the `work' thunk into the `queue' queue within `hinge'"
(enqueue (gethash queue (queues hinge)) work))

(defmethod close ((hinge hinge) &key &allow-other-keys)
(close (defer-queue hinge))
(maphash #'(lambda (name queue)
(declare (ignore name))
(close queue))
(queues hinge))
(close (bg-pool hinge)))

(defmethod run ((hinge (eql :default)))
Expand Down

0 comments on commit 68c7376

Please sign in to comment.