diff --git a/src/emitter.lisp b/src/emitter.lisp index 4020645..b62bacd 100644 --- a/src/emitter.lisp +++ b/src/emitter.lisp @@ -46,52 +46,16 @@ only fires once, then it is removed.")) (flet ((queue-cb (cb) (when (remhash cb (oneshots emitter)) (remove-listener emitter event cb)) - (enqueue (deliver-queue (owner emitter)) (lambda () (apply cb args))))) + (queue-work (owner emitter) (curry #'apply cb args) :normal))) (mapc #'queue-cb registered)))) -(defmethod deliver :after ((emitter emitter) event args) - (declare (ignore event args)) - (flet ((deliver-cb (l w e) - (declare (ignore l e)) - (log-for (debug) "Deliver queue: ~S" (deliver-queue (owner emitter))) - (if-let (thunk (dequeue (deliver-queue (owner emitter)))) - (funcall thunk) - (ev:stop-watcher (owner emitter) w :keep-callback t)))) - - (unless (deliver-runner (owner emitter)) - (setf (deliver-runner (owner emitter)) - (let ((watcher (make-instance 'ev:ev-idle))) - (ev:set-idle (owner emitter) watcher #'deliver-cb) - (incf (ev:watcher-slot watcher :priority)) - watcher))) - - (unless (queue-empty-p (deliver-queue (owner emitter))) - (ev:start-watcher (owner emitter) (deliver-runner (owner emitter)))))) - (defmethod emit ((emitter emitter) (event string) &rest args) "Enqueue the delivery of an `event' with `args'. The event will be delivered at some point in the future, but very soon, in the order that it was emitted relative to other events." (flet ((emit-thunk () (deliver emitter event args))) - (enqueue (emit-queue (owner emitter)) #'emit-thunk))) - -(defmethod emit :after ((emitter emitter) (event string) &rest args) - (declare (ignore args)) - (flet ((emitter-cb (l w e) - (declare (ignore l e)) - (log-for (debug) "Emit queue: ~S" (emit-queue (owner emitter))) - (if-let (thunk (dequeue (emit-queue (owner emitter)))) - (funcall thunk) - (ev:stop-watcher (owner emitter) w :keep-callback t)))) - - (unless (emit-runner (owner emitter)) - (setf (emit-runner (owner emitter)) - (let ((watcher (make-instance 'ev:ev-idle))) - (ev:set-idle (owner emitter) watcher #'emitter-cb) - watcher))) - - (ev:start-watcher (owner emitter) (emit-runner (owner emitter))))) + (queue-work (owner emitter) #'emit-thunk))) (defmethod add-listener ((emitter emitter) (event string) (callback symbol)) (add-listener emitter event (symbol-function callback))) diff --git a/src/generics.lisp b/src/generics.lisp index e44ff9c..29acab7 100644 --- a/src/generics.lisp +++ b/src/generics.lisp @@ -13,7 +13,11 @@ to be invoked every `timeout'")) (defgeneric clear (hinge handle) (:documentation "Clear the registration of a watcher (e.g. timeout or interval) named by `handle'")) +(defgeneric queue-work (hinge work &optional queue) + (:documentation "Queue a thunk, `work', into a work queue on +`hinge'. If no `queue' is named, `:low' should be used.")) + ;; Wrappers (defmacro defer ((hinge) &body forms) - `(enqueue (defer-queue ,hinge) - (lambda () ,@forms))) + "Enqueue work `forms' into the low priority queue of `hinge'" + `(queue-work ,hinge (lambda () ,@forms) :low)) diff --git a/src/hinge.lisp b/src/hinge.lisp index d0cbb14..90b27b5 100644 --- a/src/hinge.lisp +++ b/src/hinge.lisp @@ -16,7 +16,7 @@ ((owner :initarg :owner :accessor owner) (queue :initform (make-instance 'queue) :accessor queue) - (priority :initform 0 :accessor priority) + (priority :initarg :priority :initform 0 :accessor priority) (runner :initform (make-instance 'ev:ev-idle) :accessor runner)) (:metaclass c2mop:funcallable-standard-class) (:documentation "A wrapper that binds together a queue and an idle runner @@ -49,18 +49,10 @@ Set the watcher priority and bind the instance as the callback" ((bg-pool :accessor bg-pool :documentation "Background work threadpool") - (defer-queue :accessor defer-queue) - - (emit-queue :accessor emit-queue - :initform (make-instance 'queue) - :documentation "Queue of event emissions.") - (emit-runner :initform nil - :accessor emit-runner - :documentation "The queue runner of the `emit-queue'") - - (deliver-queue :accessor deliver-queue - :initform (make-instance 'queue) - :documentation "Queue of event deliveries.") - (deliver-runner :initform nil - :accessor deliver-runner - :documentation "The queue runner of the `deliver-queue'"))) + (queues :initform '((:low . 0) + (:normal . 1) + (:high . 2)) + :accessor queues + :documentation "A set of work queues. +If an alist of (:name . priority) it will be transformed +to a hashtable of {:name => (ev:idle-watcher :priority priority)}"))) diff --git a/src/methods.lisp b/src/methods.lisp index 1f64045..56bec69 100644 --- a/src/methods.lisp +++ b/src/methods.lisp @@ -2,14 +2,30 @@ ;; Methods (defmethod initialize-instance :after ((hinge hinge) &key) - (setf (bg-pool hinge) - (make-instance 'pool :owner hinge) + "Remap the queues descriptions to an HT, then init the pool" + (when-let (q-desc (and (listp (queues hinge)) (every #'consp (queues hinge)) + (queues hinge))) + (setf (queues hinge) (make-hash-table)) + (mapc #'(lambda (name-priority) + (destructuring-bind (name . priority) name-priority + (format t "=> Making queue: ~S => ~S~%" name priority) - (defer-queue hinge) - (make-instance 'running-queue :owner hinge))) + (setf (gethash name (queues hinge)) + (make-instance 'running-queue :owner hinge :priority priority)))) + q-desc)) + + + (setf (bg-pool hinge) (make-instance 'pool :owner hinge))) + +(defmethod queue-work ((hinge hinge) work &optional (queue :low)) + "Enqueue the `work' thunk into the `queue' queue within `hinge'" + (enqueue (gethash queue (queues hinge)) work)) (defmethod close ((hinge hinge) &key &allow-other-keys) - (close (defer-queue hinge)) + (maphash #'(lambda (name queue) + (declare (ignore name)) + (close queue)) + (queues hinge)) (close (bg-pool hinge))) (defmethod run ((hinge (eql :default)))