Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

read/write locking code. not complete, just an end of day check-in.

  • Loading branch information...
commit 955f38675df9217d95718d77b638b670daf84130 1 parent 3739685
@kraison authored
View
2  globals.lisp
@@ -4,6 +4,8 @@
(defparameter *store-table* (make-hash-table :synchronized t :test 'eql))
(defparameter *namespaces* (make-hash-table :synchronized t :test 'equalp))
+(defparameter *read-uncommitted* t)
+
;; Graphs
(defvar *graph* nil)
(defvar *graph-table* nil)
View
13 hash-table.lisp
@@ -0,0 +1,13 @@
+(in-package #:vivace-graph-v2)
+
+(defun acquire-hash-table-lock (hash-table)
+ (if (eq sb-thread:*current-thread*
+ (sb-thread:spinlock-value (sb-impl:hash-table-spinlock hash-table)))
+ t
+ (sb-thread:my-get-spinlock (sb-impl:hash-table-spinlock hash-table))))
+
+(defun release-hash-table-lock (hash-table)
+ (if (eq sb-thread:*current-thread*
+ (sb-thread:spinlock-value (sb-impl:hash-table-spinlock hash-table)))
+ (sb-thread:release-spinlock (sb-impl:hash-table-spinlock hash-table))))
+
View
137 lock.lisp
@@ -0,0 +1,137 @@
+(in-package #:vivace-graph-v2)
+
+(defstruct (rw-lock
+ (:conc-name lock-)
+ (:predicate rw-lock?))
+ (lock (sb-thread:make-mutex) :type sb-thread:mutex)
+ (readers 0 :type integer)
+ (semaphore (sb-thread:make-semaphore) :type sb-thread:semaphore)
+ (writer-queue (make-empty-queue) :type queue)
+ (writer nil)
+ (waitqueue (sb-thread:make-waitqueue) :type sb-thread:waitqueue))
+
+(defun lock-unused? (rw-lock)
+ (sb-thread:with-recursive-lock ((lock-lock rw-lock))
+ (and (= 0 (lock-readers rw-lock))
+ (= 0 (sb-thread:semaphore-count (lock-semaphore rw-lock)))
+ (null (lock-writer rw-lock))
+ (empty-queue? (lock-writer-queue rw-lock)))))
+
+(defun release-read-lock (rw-lock)
+ (sb-thread:with-recursive-lock ((lock-lock rw-lock))
+ (assert (not (eql 0 (lock-readers rw-lock))))
+ (when (eql 0 (decf (lock-readers rw-lock)))
+ (when (lock-writer rw-lock)
+ (sb-thread:signal-semaphore (lock-semaphore rw-lock))))))
+
+(defun acquire-read-lock (rw-lock &key (max-tries 1000))
+ (loop for tries from 0 to max-tries do
+ (sb-thread:with-recursive-lock ((lock-lock rw-lock))
+ (if (lock-writer rw-lock)
+ (condition-wait (lock-waitqueue rw-lock) (lock-lock rw-lock))
+ (progn
+ (incf (lock-readers rw-lock))
+ (return-from acquire-read-lock rw-lock))))))
+
+(defmacro with-read-lock ((rw-lock) &body body)
+ `(unwind-protect
+ (if (rw-lock? (acquire-read-lock ,rw-lock))
+ (progn ,@body)
+ (error "Unable to get rw-lock: ~A" ,rw-lock))
+ (release-read-lock ,rw-lock)))
+
+(defun release-write-lock (rw-lock &key reading-p)
+ (sb-thread:with-recursive-lock ((lock-lock rw-lock))
+ (if (eq sb-thread:*current-thread* (queue-front (lock-writer-queue rw-lock)))
+ (dequeue (lock-writer-queue rw-lock))
+ (error "Cannot release lock I don't own!"))
+ (if (eq (queue-front (lock-writer-queue rw-lock)) sb-thread:*current-thread*)
+ (format t "Not releasing lock; recursive ownership detected!~%")
+ (progn
+ (setf (lock-writer rw-lock) nil)
+ (when reading-p
+ (incf (lock-readers rw-lock)))
+ (sb-thread:condition-broadcast (lock-waitqueue rw-lock))))))
+
+(defun next-in-queue? (rw-lock thread)
+ (sb-thread:with-recursive-lock ((lock-lock rw-lock))
+ (eq thread (queue-front (lock-writer-queue rw-lock)))))
+
+(defun acquire-write-lock (rw-lock &key (max-tries 1000) reading-p)
+ (sb-thread:with-recursive-lock ((lock-lock rw-lock))
+ (if (and (next-in-queue? rw-lock sb-thread:*current-thread*)
+ (eq (lock-writer rw-lock) sb-thread:*current-thread*))
+ (progn
+ (enqueue-front (lock-writer-queue rw-lock) sb-thread:*current-thread*)
+ (return-from acquire-write-lock rw-lock))
+ (enqueue (lock-writer-queue rw-lock) sb-thread:*current-thread*)))
+ (loop for tries from 0 to max-tries do
+ (if (eq (lock-writer rw-lock) sb-thread:*current-thread*)
+ (return-from acquire-write-lock rw-lock)
+ (let ((wait-p nil))
+ (handler-case
+ (sb-thread:with-recursive-lock ((lock-lock rw-lock))
+ (if (and (null (lock-writer rw-lock))
+ (next-in-queue? rw-lock sb-thread:*current-thread*))
+ (progn
+ (setf (lock-writer rw-lock) sb-thread:*current-thread*)
+ (when reading-p
+ (decf (lock-readers rw-lock)))
+ (unless (eql 0 (lock-readers rw-lock))
+ (setf wait-p t)))
+ (sb-thread:condition-wait
+ (lock-waitqueue rw-lock) (lock-lock rw-lock))))
+ (error (c)
+ (format t "Got error ~A while acquiring write lock ~A" c rw-lock)))
+ (when wait-p
+ (sb-thread:wait-on-semaphore (lock-semaphore rw-lock)))))))
+
+(defmacro with-write-lock ((rw-lock) &body body)
+ `(unwind-protect
+ (if (rw-lock? (acquire-write-lock ,rw-lock))
+ (progn ,@body)
+ (error "Unable to get rw-lock: ~A" ,rw-lock))
+ (release-write-lock ,rw-lock)))
+
+(defstruct (lock-pool
+ (:constructor %make-lock-pool)
+ (:predicate lock-pool?))
+ (lock (make-recursive-lock))
+ (queue (sb-concurrency:make-queue))
+ (acquired-locks (make-hash-table :synchronized t))
+ (size 20))
+
+(defun make-lock-pool (size)
+ (let ((pool (%make-lock-pool :size size)))
+ (dotimes (i size)
+ (sb-concurrency:enqueue (make-rw-lock) (lock-pool-queue pool)))
+ pool))
+
+(defun change-lock-pool-size (pool new-size)
+ (cond ((> new-size (lock-pool-size pool))
+ (sb-thread:with-recursive-lock ((lock-pool-lock pool))
+ (cas (lock-pool-size pool) (lock-pool-size pool) new-size)
+ (dotimes (i (- new-size (lock-pool-size pool)))
+ (sb-concurrency:enqueue (make-rw-lock) (lock-pool-queue pool)))))
+ ((< new-size (lock-pool-size pool))
+ (error "Cannot shrink lock pool size")))
+ new-size)
+
+(defun release-pool-lock (pool lock)
+ (if (remhash lock (lock-pool-acquired-locks pool))
+ (sb-concurrency:enqueue lock (lock-pool-queue pool))
+ (error "Lock ~A not in acquired-locks list" lock)))
+
+(defun get-pool-lock (pool &key (wait-p t) timeout)
+ (let ((start-time (gettimeofday)))
+ (loop
+ (let ((lock (sb-concurrency:dequeue (lock-pool-queue pool))))
+ (if (rw-lock? lock)
+ (progn
+ (setf (gethash lock (lock-pool-acquired-locks pool)) t)
+ (return-from get-pool-lock lock))
+ (if wait-p
+ (if (and timeout (> (gettimeofday) (+ start-time timeout)))
+ (return-from get-pool-lock nil)
+ (sleep 0.000000001))
+ (return-from get-pool-lock nil)))))))
View
4 sb-impl.lisp
@@ -0,0 +1,4 @@
+(in-package #:sb-impl)
+
+(export 'hash-table-spinlock (find-package 'sb-impl))
+
View
34 sb-thread.lisp
@@ -0,0 +1,34 @@
+(in-package #:sb-thread)
+
+(export 'get-spinlock (find-package 'sb-thread))
+(export 'my-get-spinlock (find-package 'sb-thread))
+(export 'release-spinlock (find-package 'sb-thread))
+(export 'spinlock-value (find-package 'sb-thread))
+
+(defun my-get-spinlock (spinlock)
+ (declare (optimize (speed 3) (safety 0)))
+ (let* ((new *current-thread*)
+ (old (sb-ext:compare-and-swap (spinlock-value spinlock) nil new)))
+ (when old
+ (when (eq old new)
+ (error "Recursive lock attempt on ~S." spinlock))
+ (flet ((cas ()
+ (if (sb-ext:compare-and-swap (spinlock-value spinlock) nil new)
+ (progn
+ (sleep 0.000000001)
+ (thread-yield))
+ (return-from my-get-spinlock t))))
+ (if (and (not *interrupts-enabled*) *allow-with-interrupts*)
+ ;; If interrupts are disabled, but we are allowed to
+ ;; enabled them, check for pending interrupts every once
+ ;; in a while. %CHECK-INTERRUPTS is taking shortcuts, make
+ ;; sure that deferrables are unblocked by doing an empty
+ ;; WITH-INTERRUPTS once.
+ (progn
+ (with-interrupts)
+ (loop
+ (loop repeat 128 do (cas)) ; 128 is arbitrary here
+ (sb-unix::%check-interrupts)))
+ (loop (cas)))))
+ t))
+
View
65 store.lisp
@@ -4,14 +4,7 @@
((name :initarg :name :accessor store-name)))
(defclass local-triple-store (triple-store)
- (;;(spogi-idx :initarg :spogi-idx :accessor spogi-idx)
- ;;(posgi-idx :initarg :posgi-idx :accessor posgi-idx)
- ;;(ospgi-idx :initarg :ospgi-idx :accessor ospgi-idx)
- ;;(gspoi-idx :initarg :gspoi-idx :accessor gspoi-idx)
- ;;(gposi-idx :initarg :gposi-idx :accessor gposi-idx)
- ;;(gospi-idx :initarg :gospi-idx :accessor gospi-idx)
- ;;(id-idx :initarg :id-idx :accessor id-idx)
- (main-idx :initarg :main-idx :accessor main-idx)
+ ((main-idx :initarg :main-idx :accessor main-idx)
(text-idx :initarg :text-idx :accessor text-idx)
(log-mailbox :initarg :log-mailbox :accessor log-mailbox)
(index-queue :initarg :index-queue :accessor index-queue)
@@ -19,6 +12,8 @@
(indexed-predicates :initarg :indexed-predicates :accessor indexed-predicates)
(templates :initarg :templates :accessor templates)
(location :initarg :location :accessor location)
+ (lock-pool :initarg :lock-pool :accessor lock-pool)
+ (locks :initarg :locks :accessor locks)
(logger-thread :initarg :logger-thread :accessor logger-thread)))
(defclass remote-triple-store (triple-store)
@@ -39,19 +34,14 @@
(maphash #'(lambda (k v) (when v (push k result))) (indexed-predicates store))
(sort result #'string>)))
-(defun make-fresh-store (name location)
+(defun make-fresh-store (name location &key (num-locks 20))
(let ((store
(make-instance 'local-triple-store
:name name
:location location
:main-idx (make-hierarchical-index)
- ;;:spogi-idx (make-hierarchical-index)
- ;;:posgi-idx (make-hierarchical-index)
- ;;:ospgi-idx (make-hierarchical-index)
- ;;:gspoi-idx (make-hierarchical-index)
- ;;:gposi-idx (make-hierarchical-index)
- ;;:gospi-idx (make-hierarchical-index)
- ;;:id-idx (make-uuid-table :synchronized t)
+ :lock-pool (make-lock-pool num-locks)
+ :locks (make-hash-table :synchronized t :test 'equal)
:text-idx (make-skip-list :key-equal 'equalp
:value-equal 'uuid:uuid-eql
:duplicates-allowed? t)
@@ -123,3 +113,46 @@
(defun add-to-delete-queue (thing &optional (store *store*))
(sb-concurrency:enqueue thing (delete-queue store)))
+(defun lock-pattern (subject predicate object graph &key (kind :write) (store *store*))
+ (let ((lock nil) (pattern (list subject predicate object graph)))
+ (sb-ext:with-locked-hash-table ((locks store))
+ (setq lock
+ (or (gethash pattern (locks store))
+ (setf (gethash pattern store) (get-pool-lock (lock-pool store))))))
+ (if (rw-lock? lock)
+ (if (eq kind :write)
+ (acquire-write-lock lock)
+ (acquire-read-lock lock))
+ (error "Unable to get lock for ~A" pattern))))
+
+(defun lock-triple (triple &key (kind :write) (store *store*))
+ (lock-pattern (subject triple)
+ (predicate triple)
+ (object triple)
+ (graph triple)
+ :kind kind
+ :store store))
+
+(defun unlock-pattern (subject predicate object graph &key kind (store *store*))
+ (let ((pattern (list subject predicate object graph)))
+ (sb-ext:with-locked-hash-table ((locks store))
+ (let ((lock (gethash pattern (locks store))))
+ (when (rw-lock? lock)
+ (sb-thread:with-recursive-lock ((lock-lock lock))
+ (case kind
+ (:write (release-write-lock lock))
+ (:read (release-read-lock lock)))
+ (when (lock-unused? lock)
+ (remhash pattern (locks store))
+ (release-pool-lock (lock-pool store) lock))))))))
+
+(defun unlock-triple (triple &key kind (store *store*))
+ (funcall #'unlock-pattern
+ (subject triple)
+ (predicate triple)
+ (object triple)
+ (graph triple)
+ :kind kind
+ :store store))
+
+
View
31 transaction.lisp
@@ -177,6 +177,15 @@
(logger :err "Unhandled error in tx logger for ~A: ~A" store condition))))
:name (format nil "tx-log thread for ~A" store)))
+(defun release-all-locks (tx)
+ (sb-ext:with-locked-hash-table ((locks *store*))
+ (dolist (pair (tx-locks tx))
+ (destructuring-bind (pattern lock) pair
+ (when (= 0 (lock-readers lock))
+ (unlock-pattern pattern)
+ (release-pool-lock (lock-pool *store*) lock)
+ (release-write-lock lock))
+
(defmacro with-graph-transaction ((store) &body body)
(with-gensyms (success retries condition)
`(let ((,success nil) (,retries 0))
@@ -190,14 +199,16 @@
(error "Transactions cannot currently span multiple stores."))
(t
(let ((*current-transaction* (make-transaction :store ,store)))
- ;; Global serialization is not ideal. Lock triples as we go?
- (with-locked-index ((main-idx ,store))
- (prog1
- (handler-case
- (atomic-op)
- (error (,condition)
- (incf ,retries)
- ,condition)) ;; FIXME: add rollback and/or retry.
+ (prog1
+ (unwind-protect
+ (handler-case
+ (progn
+ (atomic-op)
+ (setf ,success t))
+ (error (,condition)
+ (incf ,retries)
+ ,condition)) ;; FIXME: add rollback and/or retry.
+ (release-all-locks *current-transaction*))
+ (when ,success
(sb-concurrency:send-message (log-mailbox ,store)
- *current-transaction*)
- (setf ,success t))))))))))
+ *current-transaction*))))))))))
View
1  triples.lisp
@@ -75,6 +75,7 @@
(defun index-triple (triple &optional (store *store*))
(with-graph-transaction (store)
+ (pushnew (lock-triple triple :kind :write) (tx-locks *current-transaction*))
(add-to-index (main-idx store) triple :id-idx (id triple))
(add-to-index (main-idx store) (id triple) :gspoi-idx
(graph triple) (subject triple) (predicate triple) (object triple))
View
42 utilities.lisp
@@ -220,3 +220,45 @@ containing the whole rest of the given STRING, if any."
(let ((it ,val)) ,@(cdr cl1))
(acond2 ,@(cdr clauses)))))))
+;; The following queueing code was borrowed and adapted from Russell & Norvig's
+;; "Introduction to AI"
+(defun print-queue (q stream depth)
+ (declare (ignore depth))
+ (format stream "<QUEUE: ~a>" (queue-elements q)))
+
+(defstruct (queue
+ (:print-function print-queue))
+ (key #'identity)
+ (last nil)
+ (elements nil))
+
+(defun make-empty-queue () (make-queue))
+
+(defun empty-queue? (q)
+ (= (length (queue-elements q)) 0))
+
+(defun queue-front (q)
+ (elt (queue-elements q) 0))
+
+(defun dequeue (q)
+ (when (listp (queue-elements q))
+ (pop (queue-elements q))))
+
+(defun enqueue-front (q &rest items)
+ (cond ((null items) nil)
+ ((or (null (queue-last q)) (null (queue-elements q)))
+ (setf (queue-elements q) (nconc items (queue-elements q))
+ (queue-last q) (last (queue-elements q))))
+ (t (setf (queue-elements q) (nconc items (queue-elements q))))))
+
+(defun enqueue (q &rest items)
+ (cond ((null items) nil)
+ ((or (null (queue-last q)) (null (queue-elements q)))
+ (setf (queue-last q) (last items)
+ (queue-elements q) (nconc (queue-elements q) items)))
+ (t (setf (cdr (queue-last q)) items
+ (queue-last q) (last items)))))
+
+(defun queue-length (q)
+ (length (queue-elements q)))
+;; End of adapted code
View
6 vivace-graph-v2.asd
@@ -27,13 +27,17 @@
:cl-js
:cl-json)
:components ((:file "uuid")
+ (:file "sb-impl")
+ (:file "sb-thread")
(:file "vivace-graph-v2-package" :depends-on ("uuid"))
+ (:file "hash-table"
+ :depends-on ("vivace-graph-v2-package" "sb-impl" "sb-thread"))
(:file "gettimeofday" :depends-on ("vivace-graph-v2-package"))
(:file "conditions" :depends-on ("vivace-graph-v2-package"))
(:file "constants" :depends-on ("conditions"))
(:file "globals" :depends-on ("constants"))
(:file "utilities" :depends-on ("globals"))
- (:file "lock" :depends-on ("utilities"))
+ (:file "lock" :depends-on ("utilities" "hash-table"))
(:file "data-types" :depends-on ("lock"))
(:file "certainty-factors" :depends-on ("constants"))
(:file "serialize" :depends-on ("data-types"))
Please sign in to comment.
Something went wrong with that request. Please try again.