Browse files

Initial import.

  • Loading branch information...
0 parents commit 6f662d321a2c3a59aaa56a6b6088cff7211cb2cd @vy committed Apr 30, 2009
Showing with 1,432 additions and 0 deletions.
  1. +23 −0 LICENSE
  2. +191 −0 README
  3. +56 −0 patron.asd
  4. +40 −0 src/job.lisp
  5. +119 −0 src/keeper.lisp
  6. +62 −0 src/packages.lisp
  7. +60 −0 src/patron.lisp
  8. +105 −0 src/queue.lisp
  9. +59 −0 src/semaphore.lisp
  10. +354 −0 src/specials.lisp
  11. +68 −0 src/thread.lisp
  12. +65 −0 src/timeout.lisp
  13. +54 −0 src/utils.lisp
  14. +176 −0 src/worker.lisp
23 LICENSE
@@ -0,0 +1,23 @@
+Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+- Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+- Redistributions in binary form must reproduce the above copyright notice, this
+ list of conditions and the following disclaimer in the documentation and/or
+ other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
191 README
@@ -0,0 +1,191 @@
+
+ ____ ____ ______ ____ ___ ____
+ !! ! | \ / | T \ / \| \ !!!!
+ !!! | o Y o | | D Y Y _ l ! !!!
+ !!!! | _/| l_ __j | O | | | !!!!
+ !!: | | | _ | | | | \| | | | !: :!
+ :!: | | | | | | | | . l ! | | : ::
+ :: l__j l__j__j l__j l__j\_j\___/l__j__j : :
+
+
+--- '( A B S T R A C T ) -------------------------------------------------------
+
+Patron is a multi-consumer/multi-producer thread pooling library written in
+Common Lisp with flexibility and performance in mind. You simply create a
+`PATRON' with a job queue of fixed size, specify a fixed number of `WORKER'
+threads and start submitting your `JOB's into the work queue.
+
+While Patron is written in portable Common Lisp in mind, because of some
+platform specific features[1] used, it currently works on SBCL[2] and CCL[3]
+platforms. As a side note, Patron currently depends on bordeaux-threads[4]
+library for common threading functionalities.
+
+
+[1] Semaphores, missing threading features (`THREAD-JOIN', `WITHOUT-INTERRUPTS',
+ etc.) in bordeaux-threads, `WITH-TIMEOUT' macro.
+
+[2] Steel Bank Common Lisp (http://www.sbcl.org/)
+
+[3] Clozure Common Lisp (http://openmcl.clozure.com/)
+
+[4] http://common-lisp.net/project/bordeaux-threads/
+
+
+--- '( E X A M P L E ) ---------------------------------------------------------
+
+Below basic example should get you to a point where you can start creating
+threading pools in minutes.
+
+(defvar *stream* *standard-output*)
+
+(defvar *stream-lock* (bt:make-lock))
+
+(defun safe-format (fmt &rest args)
+ (bt:with-lock-held (*stream-lock*)
+ (apply #'format *stream* fmt args)))
+
+(defun thread-stats (patron)
+ (safe-format
+ "Keepers: ~{~A~^ ~}~%Workers: ~{~A~^ ~}~%"
+ (map 'list #'patron::thread-alive-p (patron::keepers-of patron))
+ (map 'list #'patron::thread-alive-p (patron::workers-of patron))))
+
+(defun job ()
+ (let ((duration (random 5)))
+ (safe-format " ~S => Sleeping... [~A]~%" (bt:current-thread) duration)
+ (sleep duration)
+ (safe-format " ~S => Done!~%" (bt:current-thread))))
+
+(defun report-result (job)
+ (safe-format "RESULT: JOB: ~S~%" job))
+
+(defun patron-test ()
+ (let* ((patron
+ (make-instance
+ 'patron:patron
+ :worker-capacity 3
+ :job-capacity 32
+ :worker-timeout-duration 3)))
+ (safe-format "Starting...~%")
+ (patron:start-patron patron)
+ (sleep 1.0)
+ (thread-stats patron)
+ (safe-format "Submitting jobs...~%")
+ (loop repeat 5
+ do (patron:submit-job
+ patron
+ (make-instance
+ 'patron:job
+ :function #'job
+ :result-report-function #'report-result)))
+ (safe-format "Submitted.~%")
+ (safe-format "Stopping...~%")
+ (patron:stop-patron patron :wait t)
+ (safe-format "Stopped.~%")
+ (thread-stats patron)))
+; Starting...
+; Keepers: T T
+; Workers: T T T
+; Submitting jobs...
+; Submitted.
+; Stopping...
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A009A1}> => Sleeping... [1]
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A00BF1}> => Sleeping... [2]
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A00E41}> => Sleeping... [0]
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A00E41}> => Done!
+; RESULT: JOB: #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 09:07:34" :FINISH-TIME "2009-04-30 09:07:34" {1003E0F2B1}>
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A00E41}> => Sleeping... [2]
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A009A1}> => Done!
+; RESULT: JOB: #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 09:07:34" :FINISH-TIME "2009-04-30 09:07:35" {1003E0D3C1}>
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A009A1}> => Sleeping... [2]
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A00BF1}> => Done!
+; RESULT: JOB: #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 09:07:34" :FINISH-TIME "2009-04-30 09:07:36" {1003E0EF71}>
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A00E41}> => Done!
+; RESULT: JOB: #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 09:07:34" :FINISH-TIME "2009-04-30 09:07:36" {1003E0F5E1}>
+; #<SB-THREAD:THREAD "Anonymous" RUNNING {1003A009A1}> => Done!
+; RESULT: JOB: #<PATRON:JOB :FUNCTION #<FUNCTION TEST::JOB> :START-TIME "2009-04-30 09:07:35" :FINISH-TIME "2009-04-30 09:07:37" {1003E0F911}>
+; Stopped.
+; Keepers: NIL NIL
+; Workers: NIL NIL NIL
+
+
+--- '( D O C U M E N T A T I O N ) ---------------------------------------------
+
+Before going into the syntatical details, here is a general figure about the
+inner workings of Patron.
+
+- Queue operations take action in a blocking manner and are wrapped by
+ `WITH-TIMEOUT' statements.
+
+- There are no busy waits, synchronized access is supplied using semaphores.
+
+- Using a two-lock concurrent queue algorithm, consumer and producer lockings
+ are separated from each other for performance purposes .
+
+- There are two keeper threads where each keeper is ensuring its partners
+ existence and first (master) keeper ensuring the existence of specified number
+ of worker threads.
+
+While Patron source code is fully documented, below you'll find the
+documentation excerpts from the source code for exported symbols.
+
+
+[Condition] timeout-condition (error-condition)
+
+ [Slot] time - Time condition instance is created.
+ [Slot] duration - Elapsed duration before condition is raised.
+
+ Condition thrown when the duration specified in the `WITH-TIMEOUT' is
+ exceeded. (`TIME' slot is inherited from `ERROR-CONDITION'.)
+
+
+[Function] default-error-report (condition)
+
+ Default function for reporting errors.
+
+
+[Class] job ()
+
+ [Slot] function - Function will be called to start the execution.
+ [Slot] result-report-function - Function will be called to report the result.
+ [Slot] error-report-function - Function will be called to report an error.
+ [Slot] submit-time - Job queue entrance time.
+ [Slot] start-time - Job execution start time.
+ [Slot] finish-time - Job execution finish time.
+ [Slot] condition - Signaled condition in case of a failure.
+ [Slot] result - Job result in case of no failure.
+
+
+[Class] patron ()
+
+ [Slot] error-report-function - Will get called for management related
+ errors -- e.g when found a dead worker, keeper, etc.
+ [Slot] job-capacity - Upper limit on the job queue size.
+ [Slot] worker-capacity - Number of serving `WORKER's.
+ [Slot] worker-timeout-duration - Time limit on the work processing duration.
+ [Slot] keeper-timeout-duration - Wait period for keepers.
+
+
+[Function] submit-job (patron job)
+
+ Submit given `JOB' into the job queue of `PATRON'. Function works in a
+ blocking manner and returns inserted `JOB', or throws a `TIMEOUT-CONDITION'.
+
+
+[Function] worker-stats (patron)
+
+ Returns a property list of minimum, maximum, and average statistics of
+ `N-FAILURES', `FAIL-DURATION', `BUSY-DURATION', and `IDLE-DURATION' slots
+ among workers. Function blocks job queue while gathering statistics.
+
+
+[Function] start-patron (patron)
+ After switching `STATE' to `:ACTIVE', starts `WORKERS's and `KEEPER's in
+ order.
+
+
+[Function] stop-patron (patron &key wait kill)
+
+ After switching `STATE' to `:INACTIVE', stops `KEEPER's and `WORKER's in
+ order. For related effects of keyword arguments see documentation of
+ `STOP-KEEPERS' and `STOP-WORKERS' functions.
56 patron.asd
@@ -0,0 +1,56 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :cl-user)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Patron System Definitions
+;;;
+
+(defpackage :patron-system (:use :cl :asdf))
+
+(in-package :patron-system)
+
+(defsystem :patron
+ :author "Volkan YAZICI <volkan.yazici@gmail.com>"
+ :licence "BSD"
+ :description "A compact thread pool implementation."
+ :depends-on (:bordeaux-threads)
+ :components ((:module "src"
+ :serial t
+ :components ((:file "packages")
+ (:file "specials")
+ (:file "utils")
+ (:file "semaphore")
+ (:file "thread")
+ (:file "queue")
+ (:file "timeout")
+ (:file "job")
+ (:file "worker")
+ (:file "keeper")
+ (:file "patron")))))
40 src/job.lisp
@@ -0,0 +1,40 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Job Submission Routines
+;;;
+
+(defun submit-job (patron job)
+ "Submit given `JOB' into the job queue of `PATRON'. Function works in a
+blocking manner and returns inserted `JOB', or throws a `TIMEOUT-CONDITION'."
+ (setf (submit-time-of job) (get-universal-time))
+ (or (queue-timed-push (jobs-of patron) job (worker-timeout-duration-of patron))
+ (error 'timeout-condition :duration (worker-timeout-duration-of patron))))
119 src/keeper.lisp
@@ -0,0 +1,119 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Keeper Routines
+;;;
+
+(defun keeper (keeper patron)
+ "Keeper function to ensure the existence of its parent `KEEPER' and `WORKER's.
+
+In case of a dead `KEEPER'/`WORKER' instance is found,
+`N-KEEPER-FAILURES'/`N-WORKER-FAILURES' slot is incremented and
+`ERROR-REPORT-FUNCTION' the `PATRON' is called with the inactive instance as
+argument.
+
+Function loops infinitely by checking if `STATE' is still `:ACTIVE' before every
+`KEEPER-TIMEOUT-DURATION' interval. In case of an error, `CONDITION' slot of the
+`KEEPER' is filled appropriately."
+ (handler-case
+ ;; Loop over `STATE' of `PATRON'.
+ (loop with keeper-pos = (position keeper (keepers-of patron))
+ with partner = (elt (keepers-of patron) (if (zerop keeper-pos) 1 0))
+ for state = (with-lock (state-lock-of patron) (state-of patron))
+ while (eql state :active)
+ do (progn
+ ;; Check if our partner `KEEPER' is in mood.
+ (without-interrupts
+ (unless (thread-alive-p partner)
+ (thread-start partner)
+ (incf (n-keeper-failures-of keeper))
+ (funcall (error-report-function-of patron) partner)))
+ ;; Check if `WORKER's are in good shape.
+ (when (zerop keeper-pos)
+ (loop for worker across (workers-of patron)
+ unless (thread-alive-p worker)
+ do (progn
+ (thread-start worker)
+ (incf (n-worker-failures-of keeper))
+ (funcall (error-report-function-of patron)
+ worker))))
+ ;; Time to take a nap.
+ (sleep (keeper-timeout-duration-of patron))))
+ ;; Record the error and exit.
+ (error (c) (setf (condition-of keeper) c)))
+ (setf (finish-time-of keeper) (get-universal-time)))
+
+(defun make-keeper (patron)
+ "Make an appropriate `KEEPER' instance with a specific wrapper function around
+`KEEPER' function."
+ (prog1-let (keeper (make-instance 'keeper))
+ (setf (function-of keeper) (lambda () (keeper keeper patron)))))
+
+(defun wait-keeper (keeper)
+ "Wait for `KEEPER' to exit."
+ (thread-join keeper))
+
+(defun kill-keeper (keeper)
+ (thread-interrupt
+ keeper
+ (lambda ()
+ ;; This condition should be catched by the `ERROR' handler of the main
+ ;; `HANDLER-CASE' expression in `KEEPER' function appropriately.
+ (error 'kill-condition))))
+
+(defun start-keepers (patron)
+ "Starts `KEEPER's and waits for them to wake up. Function returns given
+`PATRON'."
+ (prog1 patron
+ ;; TODO: Instead of allocating a keeper couple from scratch, try to use
+ ;; existing ones if there is any.
+ (setf (keepers-of patron)
+ (coerce (loop repeat 2 collect (make-keeper patron))
+ '(simple-array keeper (2))))
+ ;; We need to hold the `STATE' lock to avoid first `KEEPER' trying to wake
+ ;; its parent up while creating both `KEEPER' threads for the first time.
+ (with-lock (state-lock-of patron)
+ (map nil #'thread-start (keepers-of patron)))))
+
+(defun stop-keepers (patron &key kill wait)
+ "Function does nothing -- assuming `STATE' is switched to `:INACTIVE',
+`KEEPER' function will exit in the next loop round. Function returns given
+`PATRON'.
+
+If `KILL' is true, function will try to terminate every keeper via throwing a
+`KILL-CONDITION'. `CONDITION' slot of related `KEEPER's will get set to this
+condition appropriately.
+
+If `WAIT' is true, function will wait (at most `KEEPER-TIMEOUT-DURATION') for
+`KEEPER's to exit."
+ (prog1 patron
+ (when kill (map nil #'kill-keeper (keepers-of patron)))
+ (when wait (map nil #'wait-keeper (keepers-of patron)))))
62 src/packages.lisp
@@ -0,0 +1,62 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :cl-user)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Package Definitions
+;;;
+
+(defpackage :patron
+ (:use :cl)
+ (:export
+ ;; Specials
+ :timeout-duration
+ :duration-of
+ :time-of
+ :job
+ :function-of
+ :result-report-function-of
+ :error-report-function-of
+ :submit-time-of
+ :start-time-of
+ :finish-time-of
+ :condition-of
+ :result-of
+ :patron
+ :error-report-function-of
+ :job-capacity-of
+ :worker-capacity-of
+ :worker-timeout-duration-of
+ :keeper-timeout-duration-of
+ ;; Functions
+ :default-error-report
+ :submit-job
+ :worker-stats
+ :start-patron
+ :stop-patron))
60 src/patron.lisp
@@ -0,0 +1,60 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Management Routines
+;;;
+
+(defun rotate-patron-state (patron target-state)
+ "Switches `STATE' slot of `PATRON' to specified `TARGET-STATE'."
+ (with-lock (state-lock-of patron)
+ (assert
+ (eql (state-of patron)
+ (ecase target-state
+ (:inactive :active)
+ (:active :inactive))))
+ (setf (state-of patron) target-state)))
+
+(defun start-patron (patron)
+ "After switching `STATE' to `:ACTIVE', starts `WORKERS's and `KEEPER's in
+order."
+ (prog1 patron
+ (rotate-patron-state patron :active)
+ (start-workers patron)
+ (start-keepers patron)))
+
+(defun stop-patron (patron &key wait kill)
+ "After switching `STATE' to `:INACTIVE', stops `KEEPER's and `WORKER's in
+order. For related effects of keyword arguments see documentation of
+`STOP-KEEPERS' and `STOP-WORKERS' functions."
+ (prog1 patron
+ (rotate-patron-state patron :inactive)
+ (stop-keepers patron :wait wait)
+ (stop-workers patron :wait wait :kill kill)))
105 src/queue.lisp
@@ -0,0 +1,105 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Size Bounded Two-Way Blocking FIFO Queue
+;;;
+
+;;; Algorithm is adopted from "Simple, Fast, and Practical Non-Blocking and
+;;; Blocking Concurrent Queue Algorithms" paper by Maged M. Michael and Michael
+;;; L. Scott. (It's told that algorithm performs well on dedicated
+;;; multiprocessors under high contention. Useful for multiprocessors without a
+;;; universal atomic primitive. YMMV.)
+
+(defmethod shared-initialize :after ((queue queue) slot-names &rest initargs)
+ (declare (ignore slot-names initargs))
+ (prog1 queue
+ (setf (head-of queue) (list t)
+ (tail-of queue) (head-of queue)
+ (pop-lock-of queue) (lock-make)
+ (pop-semaphore-of queue) (semaphore-make)
+ (push-lock-of queue) (lock-make)
+ (push-semaphore-of queue) (semaphore-make (size-of queue)))))
+
+(defun %queue-pop (queue &optional default)
+ "Pops an item from the given `QUEUE'. Function returns `DEFAULT' in case of no
+available elements found."
+ (with-lock (pop-lock-of queue)
+ (let ((head (cdr (head-of queue))))
+ (if head
+ (prog1 (car head)
+ (setf (head-of queue) head))
+ default))))
+
+(defun queue-pop (queue)
+ "Pops an item from the given `QUEUE'. Function blocks if there isn't any
+available item in the queue."
+ (semaphore-wait (pop-semaphore-of queue))
+ (prog1 (%queue-pop queue)
+ (semaphore-signal (push-semaphore-of queue))))
+
+(defun queue-timed-pop (queue duration &optional timeout)
+ "Works like `QUEUE-POP', but function returns `TIMEOUT' if no available
+elements found in given `DURATION'."
+ (if (semaphore-timed-wait (pop-semaphore-of queue) duration)
+ (prog1 (%queue-pop queue timeout)
+ (semaphore-signal (push-semaphore-of queue)))
+ timeout))
+
+(defun %queue-push (queue item)
+ "Pushes given `ITEM' into the `QUEUE'. Function returns supplied `ITEM'."
+ (with-lock (push-lock-of queue)
+ (prog1 item
+ (let ((tail (tail-of queue)))
+ (setf (cdr tail) (list item)
+ (tail-of queue) (cdr tail))))))
+
+(defun queue-push (queue item)
+ "Tries to push given `ITEM' into the `QUEUE'. If queue size gets exceeded,
+function blocks until at least an item is consumed from the queue. Function
+returns supplied `ITEM'."
+ (semaphore-wait (push-semaphore-of queue))
+ (prog1 (%queue-push queue item)
+ (semaphore-signal (pop-semaphore-of queue))))
+
+(defun queue-timed-push (queue item duration &optional timeout)
+ "Works like `QUEUE-PUSH', but function returns `TIMEOUT' if no push occurs in
+given `DURATION'."
+ (if (semaphore-timed-wait (push-semaphore-of queue) duration)
+ (prog1 (%queue-push queue item)
+ (semaphore-signal (pop-semaphore-of queue)))
+ timeout))
+
+(defmacro with-blocking-queue-operations (queue &body body)
+ "Function blocks any physical push/pop operations on the `QUEUE' while
+execution `BODY'."
+ `(with-lock (push-lock-of ,queue)
+ (with-lock (pop-lock-of ,queue)
+ ,@body)))
59 src/semaphore.lisp
@@ -0,0 +1,59 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Semaphore Abstraction Layer
+;;;
+
+(defun semaphore-wait (semaphore)
+ #+ccl (ccl:wait-on-semaphore semaphore)
+ #+sbcl (sb-thread:wait-on-semaphore semaphore)
+ #-(or ccl sbcl) (error "`SEMAPHORE-WAIT' is not supported!"))
+
+(defun semaphore-timed-wait (semaphore duration)
+ #+ccl (ccl:timed-wait-on-semaphore semaphore duration)
+ #+sbcl (handler-case
+ (sb-ext:with-timeout duration
+ (sb-thread:wait-on-semaphore semaphore))
+ (sb-ext:timeout ()))
+ #-(or ccl sbcl) (error "`SEMAPHORE-TIMED-WAIT' is not supported!"))
+
+(defun semaphore-signal (semaphore)
+ #+ccl (ccl:signal-semaphore semaphore)
+ #+sbcl (sb-thread:signal-semaphore semaphore)
+ #-(or ccl sbcl) (error "`SEMAPHORE-SIGNAL' is not supported!"))
+
+(defun semaphore-make (&optional (count 0))
+ (prog1-let
+ (semaphore
+ #+ccl (ccl:make-semaphore)
+ #+sbcl (sb-thread:make-semaphore)
+ #-(or ccl sbcl) (error "`MAKE-SEMAPHORE' is not supported!"))
+ (loop repeat count do (semaphore-signal semaphore))))
354 src/specials.lisp
@@ -0,0 +1,354 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Common Error Reporting Related Definitions
+;;;
+
+(defvar *error-stream* *debug-io*
+ "Generic stream used by default error reporting functions.")
+
+(define-condition error-condition (error)
+ ((time
+ :initform (get-universal-time)
+ :reader time-of
+ :type (integer 1 *)
+ :documentation "Time condition instance is created."))
+ (:documentation "Generic wrapper condition for application specific
+ conditions."))
+
+(defun time->string (time)
+ (destructuring-bind (s min h day mon y dow dlp z)
+ (multiple-value-list (decode-universal-time time))
+ (declare (ignore dow dlp z))
+ (format nil "~4,'0D-~2,'0D-~2,'0D ~2,'0D:~2,'0D:~2,'0D" y mon day h min s)))
+
+(define-condition kill-condition (error-condition)
+ ()
+ (:documentation "Condition passed to the `CONDITION' slot of a `KEEPER'/`JOB'
+ while killing a keeper/worker."))
+
+(defmethod print-object ((kill-condition kill-condition) stream)
+ (print-unreadable-object (kill-condition stream :type t)
+ (format stream ":TIME ~S" (time->string (time-of kill-condition)))))
+
+(define-condition timeout-condition (error-condition)
+ ((duration
+ :initarg :duration
+ :reader duration-of
+ :type (integer 1 *)
+ :documentation "Elapsed duration before condition is raised."))
+ (:documentation "Condition thrown when the duration specified in the
+ `WITH-TIMEOUT' is exceeded."))
+
+(defmethod print-object ((timeout-condition timeout-condition) stream)
+ (print-unreadable-object (timeout-condition stream :type t)
+ (format stream ":DURATION ~A :TIME ~S"
+ (duration-of timeout-condition)
+ (time->string (time-of timeout-condition)))))
+
+(defun default-error-report (condition)
+ "Default function for reporting errors."
+ (format *error-stream* "~A - ~S~%"
+ (time->string
+ (if (and (slot-exists-p condition 'time)
+ (slot-boundp condition 'time))
+ (time-of condition)
+ (get-universal-time)))
+ condition))
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Queue Related Definitions
+;;;
+
+(defclass queue ()
+ ((head
+ :accessor head-of
+ :type list)
+ (tail
+ :accessor tail-of
+ :type list)
+ (size
+ :initarg :size
+ :reader size-of
+ :type (integer 1 *)
+ :documentation "Maximum # of items allowed in the queue.")
+ (pop-lock
+ :accessor pop-lock-of
+ :documentation "Lock serializing pop operations.")
+ (pop-semaphore
+ :accessor pop-semaphore-of
+ :documentation "Semaphore blocking pop operations while queue is empty.")
+ (push-lock
+ :accessor push-lock-of
+ :documentation "Lock serializing push operations.")
+ (push-semaphore
+ :accessor push-semaphore-of
+ :documentation "Semaphore blocking push operations while queue is full."))
+ (:documentation "Size bounded two-lock concurrent FIFO queue."))
+
+(defmethod print-object ((queue queue) stream)
+ (print-unreadable-object (queue stream :identity t :type t)
+ (format stream ":HEAD ~S :TAIL ~S :SIZE ~A"
+ (head-of queue) (tail-of queue) (size-of queue))))
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Job Related Definitions
+;;;
+
+(defclass job ()
+ ((function
+ :initform (error "Missing `FUNCTION'!")
+ :initarg :function
+ :reader function-of
+ :type function
+ :documentation "Function will be called to start the execution.")
+ (result-report-function
+ :initarg :result-report-function
+ :reader result-report-function-of
+ :type function
+ :documentation "Function will be called to report the result.")
+ (error-report-function
+ :initform #'default-error-report
+ :initarg :error-report-function
+ :reader error-report-function-of
+ :type function
+ :documentation "Function will be called to report an error.")
+ (submit-time
+ :accessor submit-time-of
+ :documentation "Job queue entrance time.")
+ (start-time
+ :accessor start-time-of
+ :documentation "Job execution start time.")
+ (finish-time
+ :accessor finish-time-of
+ :documentation "Job execution finish time.")
+ (condition
+ :accessor condition-of
+ :documentation "Signaled condition in case of a failure.")
+ (result
+ :accessor result-of
+ :documentation "Job result in case of no failure.")))
+
+(defmethod print-object ((job job) stream)
+ (print-unreadable-object (job stream :identity t :type t)
+ (format
+ stream
+ (concatenate
+ 'string
+ ":FUNCTION ~S"
+ "~@[ :START-TIME ~S~]"
+ "~@[ :FINISH-TIME ~S~]"
+ "~@[ :CONDITION ~S~]"
+ "~@[ :RESULT ~S~]")
+ (function-of job)
+ (when (slot-boundp job 'start-time) (time->string (start-time-of job)))
+ (when (slot-boundp job 'finish-time) (time->string (finish-time-of job)))
+ (when (slot-boundp job 'condition) (condition-of job))
+ (when (slot-boundp job 'result) (result-of job)))))
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Thread Related Defitions
+;;;
+
+(defclass thread ()
+ ((id
+ :accessor id-of
+ :documentation "Implementation dependent thread identifier.")
+ (function
+ :accessor function-of
+ :documentation "Function executed by current thread.")))
+
+(defmethod print-object ((thread thread) stream)
+ (print-unreadable-object (thread stream :identity t :type t)
+ (format stream "~@[ :ID ~S~]~@[ :FUNCTION ~S~]"
+ (when (slot-boundp thread 'id) (id-of thread))
+ (when (slot-boundp thread 'function) (function-of thread)))))
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Worker Related Definitions
+;;;
+
+(defclass worker (thread)
+ ((last-start-time
+ :accessor last-start-time-of
+ :documentation "Last time worker started a job.")
+ (last-finish-time
+ :accessor last-finish-time-of
+ :documentation "Last time worker finished a job.")
+ (n-failures
+ :initform 0
+ :accessor n-failures-of
+ :type (integer 0 *)
+ :documentation "# of failed processings.")
+ (fail-duration
+ :initform 0
+ :accessor fail-duration-of
+ :type (integer 0 *)
+ :documentation "Total duration spend on failed processings.")
+ (busy-duration
+ :initform 0
+ :accessor busy-duration-of
+ :type (integer 0 *)
+ :documentation "Total non-idle duration.")
+ (idle-duration
+ :initform 0
+ :accessor idle-duration-of
+ :type (integer 0 *)
+ :documentation "Total duration worker stayed idle.")))
+
+(defmethod print-object ((worker worker) stream)
+ (print-unreadable-object (worker stream :identity t :type t)
+ (format
+ stream ":LAST-START-TIME ~S :LAST-FINISH-TIME ~S :IDLE-DURATION ~A"
+ (time->string (last-start-time-of worker))
+ (time->string (last-finish-time-of worker))
+ (idle-duration-of worker))))
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Keeper Related Definitions
+;;;
+
+(defclass keeper (thread)
+ ((start-time
+ :initform (get-universal-time)
+ :accessor start-time-of
+ :documentation "Birth date.")
+ (finish-time
+ :accessor finish-time-of
+ :documentation "Exit/Crash date.")
+ (condition
+ :initform nil
+ :accessor condition-of
+ :documentation "Condition catched in case of a crash.")
+ (n-keeper-failures
+ :initform 0
+ :accessor n-keeper-failures-of
+ :documentation "# of `KEEPER' failures found.")
+ (n-worker-failures
+ :initform 0
+ :accessor n-worker-failures-of
+ :documentation "# of `WORKER' failures found.")))
+
+(defmethod print-object ((keeper keeper) stream)
+ (print-unreadable-object (keeper stream :identity t :type t)
+ (format stream
+ (concatenate
+ 'string
+ ":START-TIME ~S"
+ "~@[ :FINISH-TIME ~S~]"
+ "~@[ :CONDITION ~S~]"
+ " :N-KEEPER-FAILURES ~A"
+ " :N-WORKER-FAILURES ~A")
+ (time->string (start-time-of keeper))
+ (when (slot-boundp keeper 'finish-time)
+ (time->string (finish-time-of keeper)))
+ (when (slot-boundp keeper 'condition) (condition-of keeper))
+ (n-keeper-failures-of keeper)
+ (n-worker-failures-of keeper))))
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Patron Related Definitions
+;;;
+
+(defclass patron ()
+ ((state
+ :initform :inactive
+ :accessor state-of
+ :type keyword
+ :documentation "State of the patron; either `ACTIVE', or `INACTIVE'.")
+ (state-lock
+ :initform (lock-make)
+ :reader state-lock-of
+ :documentation "Synchronization primitive for `STATE' slot.")
+ (error-report-function
+ :initform #'default-error-report
+ :initarg :error-report-function
+ :reader error-report-function-of
+ :type function
+ :documentation "Will get called for management related errors -- e.g when
+ found a dead worker, keeper, etc.")
+ ;; `JOB' Specific Slots ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+ (jobs
+ :accessor jobs-of
+ :type queue
+ :documentation "FIFO queue of `JOB's waiting to be processed.")
+ (job-capacity
+ :initform (error "Missing `JOB-CAPACITY'!")
+ :initarg :job-capacity
+ :reader job-capacity-of
+ :type (integer 1 *)
+ :documentation "Upper limit on the job queue size.")
+ ;; `WORKER' Specific Slots ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+ (workers
+ :accessor workers-of
+ :type (simple-array worker (*))
+ :documentation "Vector of serving `WORKER's.")
+ (worker-capacity
+ :initform (error "Missing `WORKER-CAPACITY'!")
+ :initarg :worker-capacity
+ :reader worker-capacity-of
+ :type (integer 1 *)
+ :documentation "Number of serving `WORKER's.")
+ (worker-timeout-duration
+ :initform (error "Missing `WORKER-TIMEOUT-DURATION'!")
+ :initarg :worker-timeout-duration
+ :reader worker-timeout-duration-of
+ :type (integer 1 *)
+ :documentation "Time limit on the work processing duration.")
+ ;; Keeper Specific Slots ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+ (keepers
+ :accessor keepers-of
+ :documentation "`KEEPER' couple for `WORKER's and each other.")
+ (keeper-timeout-duration
+ :initform 5
+ :initarg :keeper-timeout-duration
+ :reader keeper-timeout-duration-of
+ :type (integer 1 *)
+ :documentation "Wait period for keepers.")))
+
+(defmethod print-object ((patron patron) stream)
+ (print-unreadable-object (patron stream :identity t :type t)
+ (format
+ stream ":WORKER-CAPACITY ~A :JOB-CAPACITY ~A :WORKER-TIMEOUT-DURATION ~A"
+ (worker-capacity-of patron) (job-capacity-of patron)
+ (worker-timeout-duration-of patron))))
68 src/thread.lisp
@@ -0,0 +1,68 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Thread & Locking Abstraction Layer
+;;;
+
+(defun lock-make ()
+ (bt:make-lock))
+
+(defmacro with-lock (lock &body body)
+ `(bt:with-lock-held (,lock) ,@body))
+
+(defun thread-start (thread)
+ (prog1 thread
+ (let ((semaphore (semaphore-make)))
+ (bt:make-thread
+ (lambda ()
+ (setf (id-of thread) (bt:current-thread))
+ (semaphore-signal semaphore)
+ (funcall (function-of thread))))
+ (semaphore-wait semaphore))))
+
+(defun thread-current ()
+ (bt:current-thread))
+
+(defun thread-alive-p (thread)
+ (bt:thread-alive-p (id-of thread)))
+
+(defun thread-interrupt (thread function)
+ (bt:interrupt-thread (id-of thread) function))
+
+(defun thread-join (thread)
+ #+ccl (ccl:join-process (id-of thread))
+ #+sbcl (bt:join-thread (id-of thread))
+ #-(or ccl sbcl) (error "`THREAD-JOIN' is not supported!"))
+
+(defmacro without-interrupts (&body body)
+ #+ccl `(ccl:without-interrupts ,@body)
+ #+sbcl `(sb-sys:without-interrupts ,@body)
+ #-(or ccl sbcl) (error "`WITHOUT-INTERRUPTS' is not supported!"))
65 src/timeout.lisp
@@ -0,0 +1,65 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Timeout Abstraction Layer
+;;;
+
+#+sbcl
+(defun %with-timeout (duration body)
+ (with-unique-names (_duration)
+ `(let ((,_duration ,duration))
+ (handler-case (sb-ext:with-timeout ,_duration (progn ,@body))
+ (sb-ext:timeout () (error 'timeout-condition :duration ,_duration))))))
+
+#+ccl
+(defun %with-timeout (duration body)
+ (with-unique-names (_duration _semaphore _result _process)
+ `(let* ((,_duration ,duration)
+ (,_semaphore (ccl:make-semaphore))
+ (,_result)
+ (,_process
+ (ccl:process-run-function
+ ,(format nil "Timed Process ~S" _process)
+ (lambda ()
+ (setf ,_result (multiple-value-list (progn ,@body)))
+ (ccl:signal-semaphore ,_semaphore)))))
+ (cond ((ccl:timed-wait-on-semaphore ,_semaphore ,_duration)
+ (values-list ,_result))
+ (t
+ (ccl:process-kill ,_process)
+ (error 'timeout-condition :duration ,_duration))))))
+
+(defmacro with-timeout (duration &body body)
+ "Execute `BODY' for no more than specified `DURATION'. In case of timeout,
+function throws a `TIMEOUT-CONDITION'."
+ (unless (fboundp '%with-timeout)
+ (error "`WITH-TIMEOUT' is not supported!"))
+ (%with-timeout duration body))
54 src/utils.lisp
@@ -0,0 +1,54 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+(defmacro when-let ((var val) &body body)
+ `(let ((,var ,val))
+ (when ,var
+ ,@body)))
+
+(defmacro prog1-let ((var val) &body body)
+ `(let ((,var ,val))
+ (prog1 ,var
+ ,@body)))
+
+(defmacro with-unique-names ((&rest bindings) &body body)
+ `(let ,(mapcar
+ (lambda (binding)
+ (check-type binding (or cons symbol))
+ (if (consp binding)
+ (destructuring-bind (var x) binding
+ (check-type var symbol)
+ `(,var
+ (gensym
+ ,(etypecase x
+ (symbol (symbol-name x))
+ (character (string x))
+ (string x)))))
+ `(,binding (gensym ,(symbol-name binding)))))
+ bindings)
+ ,@body))
176 src/worker.lisp
@@ -0,0 +1,176 @@
+;;; Copyright (c) 2009, Volkan YAZICI <volkan.yazici@gmail.com>
+;;; All rights reserved.
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions are met:
+
+;;; - Redistributions of source code must retain the above copyright notice,
+;;; this list of conditions and the following disclaimer.
+
+;;; - Redistributions in binary form must reproduce the above copyright notice,
+;;; this list of conditions and the following disclaimer in the documentation
+;;; and/or other materials provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+;;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+;;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+;;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+;;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+;;; SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+;;; CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+;;; ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+;;; POSSIBILITY OF SUCH DAMAGE.
+
+
+(in-package :patron)
+
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;;
+;;; Worker Routines
+;;;
+
+(defmethod shared-initialize :after ((worker worker) slot-names &rest initargs)
+ (declare (ignore slot-names initargs))
+ (prog1 worker
+ ;; Initialize `LAST-START-TIME' and `LAST-FINISH-TIME' slots.
+ (let ((time (get-universal-time)))
+ (setf (last-start-time-of worker) time
+ (last-finish-time-of worker) time))))
+
+(defun worker (worker patron)
+ "Worker function to execute the next available job in the queue. Function
+infinitely tries to pop `JOB' from the queue until it receives a `NIL' job.
+
+During every job processing iteration, function resets `LAST-START-TIME',
+`LAST-FINISH-TIME', `N-FAILURES', `FAIL-DURATION', `BUSY-DURATION', and
+`IDLE-DURATION' slots of the `WORKER' accordingly.
+
+`START-TIME' and `FINISH-TIME' slots of the `JOB' is assigned respectively
+before and after the execution of the `FUNCTION' slot of the `JOB'. After
+execution, if there doesn't occur any errors, `RESULT' slot of the `JOB' is set
+accordingly and `RESULT-REPORT-FUNCTION' is called with `JOB' as argument. In
+case of an error, `CONDITION' slot is set and `ERROR-REPORT-FUNCTION' is
+called."
+ (loop for job = (queue-pop (jobs-of patron))
+ while job
+ do (progn
+ ;; Update `IDLE-DURATION', `LAST-START-TIME', and `START-TIME'
+ ;; slots.
+ (without-interrupts
+ (let ((time (get-universal-time)))
+ (incf (idle-duration-of worker)
+ (- time (last-finish-time-of worker)))
+ (setf (last-start-time-of worker) time
+ (start-time-of job) time)))
+ (handler-case
+ (progn
+ ;; Run `FUNCTION' with time boundary.
+ (setf (result-of job)
+ (with-timeout (worker-timeout-duration-of patron)
+ (funcall (function-of job))))
+ ;; Update `LAST-FINISH-TIME', `BUSY-DURATION', and
+ ;; `FINISH-TIME' slots.
+ (without-interrupts
+ (let ((time (get-universal-time)))
+ (incf (busy-duration-of worker)
+ (- time (last-start-time-of worker)))
+ (setf (last-finish-time-of worker) time
+ (finish-time-of job) time)))
+ ;; Submit the result.
+ (funcall (result-report-function-of job) job))
+ ;; Possible failure reasons: timeout condition or runtime error.
+ (error (c)
+ (without-interrupts
+ ;; We could have missed updating `LAST-FINISH-TIME', if so,
+ ;; do it now.
+ (unless (< (last-start-time-of worker)
+ (last-finish-time-of worker))
+ (setf (last-finish-time-of worker) (get-universal-time)))
+ ;; Update `N-FAILURES', `FAIL-DURATION', and `BUSY-DURATION'
+ ;; slots.
+ (incf (n-failures-of worker))
+ (let ((duration
+ (- (last-finish-time-of worker)
+ (last-start-time-of worker))))
+ (incf (fail-duration-of worker) duration)
+ (incf (busy-duration-of worker) duration)))
+ ;; Set the `CONDITION' slot and submit the error.
+ (setf (condition-of job) c)
+ (funcall (error-report-function-of job) job))))))
+
+(defun make-worker (patron)
+ (prog1-let (worker (make-instance 'worker))
+ (setf (function-of worker) (lambda () (worker worker patron)))))
+
+(defun wait-worker (worker)
+ (thread-join worker))
+
+(defun kill-worker (worker)
+ (thread-interrupt
+ worker
+ (lambda ()
+ ;; This condition should be catched by the `ERROR' handler of the main
+ ;; `HANDLER-CASE' expression in `WORKER' function appropriately.
+ (error 'kill-condition))))
+
+(defun start-workers (patron)
+ "Fills `WORKERS' and `JOBS' slots of the given `PATRON' appropriately and
+spawns workers. Function returns supplied `PATRON'."
+ (prog1 patron
+ ;; TODO: Instead of allocating a job queue and a workers array from scratch
+ ;; everytime, try to use existing objects if there is any.
+ (setf (jobs-of patron) (make-instance 'queue :size (job-capacity-of patron))
+ (workers-of patron) (make-array (list (worker-capacity-of patron))))
+ (dotimes (i (worker-capacity-of patron))
+ (let ((i i)) (setf (elt (workers-of patron) i) (make-worker patron))))
+ (map nil #'thread-start (workers-of patron))))
+
+(defun stop-workers (patron &key kill wait)
+ "Stops workers by pushing `NIL' jobs to the queue as much as total # of
+workers. Function blocks until there is enough space in the job queue to push
+dummy `NIL's. Function finally returns supplied `PATRON'.
+
+If `KILL' is true, function will try to terminate every worker that is still
+alive and report jobs about the situation via `ERROR-SUBMIT-FUNCTION'.
+`CONDITION' slot of the `JOB' will set to `KILL-CONDITION'.
+
+If `WAIT' is true, function will wait (at most `WORKER-TIMEOUT-DURATION') for
+`WORKER's to exit."
+ (prog1 patron
+ (loop repeat (length (workers-of patron))
+ do (queue-push (jobs-of patron) nil))
+ (when kill (map nil #'kill-worker (workers-of patron)))
+ (when wait (map nil #'wait-worker (workers-of patron)))))
+
+(defun worker-stats (patron)
+ "Returns a property list of minimum, maximum, and average statistics of
+`N-FAILURES', `FAIL-DURATION', `BUSY-DURATION', and `IDLE-DURATION' slots among
+workers. Function blocks job queue while gathering statistics."
+ (with-blocking-queue-operations (jobs-of patron)
+ (loop for worker across (workers-of patron)
+ collect (n-failures-of worker) into n-failures
+ collect (fail-duration-of worker) into fail-durations
+ collect (busy-duration-of worker) into busy-durations
+ collect (idle-duration-of worker) into idle-durations
+ finally (return
+ (let ((n-workers (length (workers-of patron))))
+ (list :min-failures (reduce #'min n-failures)
+ :max-failures (reduce #'max n-failures)
+ :avg-n-failures (/ (reduce #'+ n-failures)
+ n-workers)
+ :sum-n-failures (reduce #'+ n-failures)
+ :min-fail-durations (reduce #'min fail-durations)
+ :max-fail-durations (reduce #'max fail-durations)
+ :avg-fail-durations (/ (reduce #'+ fail-durations)
+ n-workers)
+ :min-busy-durations (reduce #'min busy-durations)
+ :max-busy-durations (reduce #'max busy-durations)
+ :avg-busy-durations (/ (reduce #'+ busy-durations)
+ n-workers)
+ :min-idle-durations (reduce #'min idle-durations)
+ :max-idle-durations (reduce #'max idle-durations)
+ :avg-idle-durations (/ (reduce #'+ idle-durations)
+ n-workers)))))))

0 comments on commit 6f662d3

Please sign in to comment.