Permalink
Browse files

optimize node handling.

  • Loading branch information...
1 parent 5796232 commit 6bce97fd457b8852406c7c3b5be180badd18cd5d @kraison committed Sep 10, 2010
Showing with 329 additions and 211 deletions.
  1. +53 −0 binary-protocol.lisp
  2. +3 −3 constants.lisp
  3. +2 −1 gc.lisp
  4. +1 −1 graph.lisp
  5. +5 −5 interface.lisp
  6. +4 −4 json-functions.lisp
  7. +4 −42 listener.lisp
  8. +19 −17 node.lisp
  9. +8 −4 prolog-functors.lisp
  10. +10 −8 prologc.lisp
  11. +28 −0 server.lisp
  12. +19 −22 session.lisp
  13. +23 −12 test.lisp
  14. +7 −7 triples-interface.lisp
  15. +139 −82 triples.lisp
  16. +4 −3 vivace-graph.asd
View
@@ -0,0 +1,53 @@
+(in-package #:vivace-graph)
+
+(defparameter +max-bytes+ 40960)
+(defparameter +magic-byte+ #x2A)
+(defparameter *query-types* (make-hash-table))
+(setf (gethash #x00 *query-types*) 'select
+ (gethash #x01 *query-types*) 'select-flat
+ (gethash #x02 *query-types*) 'select-bind-list
+ (gethash #x03 *query-types*) '<-
+ (gethash #x04 *query-types*) 'insert
+ (gethash #x05 *query-types*) 'do-query
+ (gethash #xFF *query-types*) 'quit)
+
+(defun decode-and-execute (vec)
+ (let ((query-type (gethash (aref vec 0) *query-types*)))
+ (logger :debug "Got query type of ~A for byte ~A" query-type (aref vec 0))
+ (values nil 0 t)))
+
+(defun extract-query-length (stream)
+ (let ((int 0))
+ (dotimes (i 4)
+ (setq int (dpb (read-byte stream nil 0) (byte 8 (* i 8)) int)))
+ int))
+
+(defmethod data-received-handler ((session v-session))
+ (let ((buffer (v-session-buffer session))
+ (magic-byte (read-byte (v-session-stream session) nil :eof)))
+ (if (eql magic-byte +magic-byte+)
+ (let ((query-length (extract-query-length (v-session-stream session))))
+ (setf (fill-pointer buffer) 0)
+ (dotimes (i query-length)
+ (let ((byte (read-byte (v-session-stream session) nil :eof)))
+ (if (eq byte :eof)
+ (progn
+ (setf (v-session-finished? session) t)
+ (return-from data-received-handler
+ "Got EOF on stream before all bytes were read."))
+ (vector-push-extend byte buffer))))
+ (multiple-value-bind (response rlen quit?) (decode-and-execute buffer)
+ (unwind-protect
+ (progn
+ (when (pointerp response)
+ (dotimes (i rlen)
+ (logger :debug "Sending byte ~A: ~A to client" i
+ (mem-aref response :unsigned-char i))
+ (write-byte (mem-aref response :unsigned-char i)
+ (v-session-stream session)))
+ (force-output (v-session-stream session)))
+ (when quit? (setf (v-session-finished? session) t)))
+ (when (pointerp response) (foreign-free response)))))
+ (progn
+ (setf (v-session-finished? session) t)
+ (format nil "Invalid magic byte: ~A" magic-byte)))))
View
@@ -16,21 +16,21 @@
;; User-defined type identifiers for serializing. Start at 100
(defconstant +triple+ 101)
-(defconstant +node+ 102)
+;;(defconstant +node+ 102)
(defconstant +predicate+ 103)
(defconstant +rule+ 105)
;; Tags for sorting entry types in tokyo cabinet
(defconstant +triple-key+ 201)
-(defconstant +node-key+ 202)
+;;(defconstant +node-key+ 202)
(defconstant +predicate-key+ 209)
(defconstant +triple-subject+ 203)
(defconstant +triple-predicate+ 204)
(defconstant +triple-object+ 205)
(defconstant +triple-subject-predicate+ 206)
(defconstant +triple-subject-object+ 207)
(defconstant +triple-predicate-object+ 208)
-(defconstant +node-ref-count+ 209)
+;;(defconstant +node-ref-count+ 209)
(defconstant +deleted-triple-key+ 210)
(defconstant +text-index+ 211)
(defconstant +rule-key+ 212)
View
@@ -3,7 +3,8 @@
(defmethod vivace-gc ((graph graph))
(loop until (sb-concurrency:queue-empty-p (delete-queue graph)) do
(let ((item (sb-concurrency:dequeue (delete-queue graph))))
+ (logger :debug "vivace-gc handling item ~A for graph ~A" item graph)
(with-transaction ((triple-db graph))
(typecase item
- (node (if (= 0 (node-ref-count item)) (delete-node item)))
+ ;;(node (if (= 0 (node-ref-count item)) (delete-node item)))
(triple (triple-deleted? item) (shadow-triple item)))))))
View
@@ -20,7 +20,7 @@
(functors (make-hash-table :synchronized t))
(rule-cache (make-hash-table :synchronized t))
(predicate-cache (make-hash-table :synchronized t))
- (node-cache (make-hash-table :test 'equal :synchronized t))
+ ;;(node-cache (make-hash-table :test 'equal :synchronized t))
(triple-cache (make-hash-table :test 'equal :synchronized t))
(deleted-triple-cache (make-hash-table :test 'equal :synchronized t))
(production-pq (make-skip-pq :key-equal #'timestamp= :comparison #'timestamp>
View
@@ -3,11 +3,11 @@
(defun add-triple (subject predicate object &optional graph)
"Add a triple to the datastore. Subject, predicate and object can be nodes or atomic data types."
(let ((*graph* (or graph *graph*)))
- (let ((subject (or (lookup-node subject)
- (make-new-node :value subject)))
- (predicate (make-new-predicate :name predicate))
- (object (or (lookup-node object)
- (make-new-node :value object))))
+ (let ((predicate (make-new-predicate :name predicate)))
+; (subject (or (lookup-node subject)
+; (make-new-node :value subject)))
+; (object (or (lookup-node object)
+; (make-new-node :value object))))
(make-new-triple *graph* subject predicate object))))
(defun get-triples (&key s p o g (decode? t))
View
@@ -6,20 +6,20 @@
(defun json-eval (string)
(in-package #:vivace-graph)
(let ((form (read-from-string string)))
- (logger :debug "GOT FORM ~A" form)
+ (logger :debug "JSON-EVAL GOT FORM ~A" form)
(unless (valid-prolog-query? form)
(error "Error: '~A' is not a valid prolog query." string))
(eval form)))
(defun-ajax query (graph query) (*prolog-rpc*)
- (logger :debug "RPC TEST GOT: ~A / ~A" graph query)
+ (logger :debug "QUERY GOT: ~A / ~A" graph query)
(handler-case
(let ((*graph* (lookup-graph graph)))
(if (graph? *graph*)
(progn
- (logger :debug "GRAPH ~A IS ~A / QUERY IS ~A" graph *graph* query)
+ (logger :debug "QUERY: GRAPH ~A IS ~A / QUERY IS ~A" graph *graph* query)
(let ((result (json-eval query)))
- (logger :debug "Sending ~A" result)
+ (logger :debug "QUERY: Sending ~A" result)
(typecase result
(triple (as-list result))
(otherwise result))))
View
@@ -1,8 +1,6 @@
(in-package #:vivace-graph)
(defparameter *stop-listener* nil)
-(defparameter *eoc* 0)
-(defparameter +max-bytes+ 10240)
(define-condition listener-error (error)
((reason :initarg :reason))
@@ -11,53 +9,17 @@
error
(format stream "Listener error: ~A." reason)))))
-(defmethod data-received-handler ((session session))
- (let ((buffer (session-buffer session)))
- (do ((fin nil))
- (fin t)
- (let ((len
- (do ((byte (read-byte (session-stream session) nil :eof)
- (read-byte (session-stream session) nil :eof))
- (count 0 (incf count)))
- ((or (= (fill-pointer (session-buffer session)) +buflen+)
- (eq byte :eof))
- count)
- (vector-push-extend byte (session-buffer session))
- (if (eql byte *eoc*) (return count)))))
- (if (= 0 len)
- (setf fin t)
- (setf (fill-pointer buffer) len)))
- (cond ((= (length buffer) 0)
- (logger :debug "Got 0 bytes, closing session ~A" session)
- (setf fin t)
- (setf (session-finished? session) t))
- (fin
- (logger :debug "Got NIL for ~A, returning from data-received-handler" session)
- (setf (session-finished? session) t))
- ((and (= (fill-pointer (session-buffer session)) +buflen+)
- (not (= *eoc* (aref (session-buffer session)
- (1- (fill-pointer (session-buffer session)))))))
- (logger :crit "Buffer overrun attempt for ~A: ~A" session (session-buffer session))
- (setf (session-finished? session) t))
- (t
- (logger :debug "Read ~A bytes for ~A: ~A~%"
- (length buffer) session buffer)
- (dotimes (i (length buffer))
- (logger :debug "Sending byte ~A: ~A to client" i (aref buffer i))
- (write-byte (aref buffer i) (session-stream session)))
- (force-output (session-stream session)))))))
-
(defun client-loop ()
(handler-case
(progn
(loop until *stop-listener*
do
(let ((socket
- (usocket:wait-for-input (session-socket *session*) :ready-only t :timeout 1)))
+ (usocket:wait-for-input (v-session-socket *session*) :ready-only t :timeout 1)))
(when socket
(let ((status (data-received-handler *session*)))
(logger :debug "client-loop got status ~A" status)
- (if (session-finished? *session*) (return))))))
+ (if (v-session-finished? *session*) (return))))))
(shutdown-session *session*)
(logger :debug "Session ended normally: ~A" *session*))
(end-of-file (c)
@@ -77,8 +39,8 @@
(handler-case
(let ((stream (usocket:socket-stream socket)))
(setf *session* (start-session stream socket))
- (force-output (session-stream *session*))
- (setf (session-thread *session*) (current-thread))
+ (force-output (v-session-stream *session*))
+ (setf (v-session-thread *session*) (current-thread))
(initiate-session *session*))
(end-of-file (c)
(declare (ignore c))
View
@@ -1,5 +1,22 @@
(in-package #:vivace-graph)
+(defmethod make-anonymous-node-name ((uuid uuid:uuid))
+ (format nil "_anon:~A" uuid))
+
+(defun make-anonymous-node (&key graph (cache? t))
+ "Create a unique anonymous node."
+ (declare (ignore cache?))
+ (format nil "_anon:~A" (make-uuid)))
+; (let ((*graph* (or graph *graph*)))
+; (let* ((uuid (make-uuid))
+; (value (make-anonymous-node-name uuid)))
+; (let ((node (make-node :uuid uuid
+; :value value)))
+; (save-node node)
+; (when cache? (cache-node node))
+; node))))
+
+#|
(defstruct (node
(:conc-name %node-)
(:predicate node?))
@@ -91,20 +108,6 @@
(if (= 0 count) (sb-concurrency:enqueue node (delete-queue *graph*)))
count))
-(defmethod make-anonymous-node-name ((uuid uuid:uuid))
- (format nil "_anon:~A" uuid))
-
-(defun make-anonymous-node (&key graph (cache? t))
- "Create a unique anonymous node."
- (let ((*graph* (or graph *graph*)))
- (let* ((uuid (make-uuid))
- (value (make-anonymous-node-name uuid)))
- (let ((node (make-node :uuid uuid
- :value value)))
- (save-node node)
- (when cache? (cache-node node))
- node))))
-
(defun make-new-node (&key value graph (cache? t))
"Create a new node with value VALUE. Return the existing node if already present in the db."
(if (node? value)
@@ -124,11 +127,10 @@
(defmethod delete-node ((node node))
"Delete a node. Does nothing at the moment, as nodes are never deleted."
- (with-transaction ((triple-db *graph*))
- node))
+ node)
(defun make-new-node-unsafe (&key value)
(let ((node (make-node :value value)))
(save-node node)
(cache-node node)))
-
+|#
View
@@ -264,8 +264,10 @@ second arg."
(let ((old-trail (fill-pointer *trail*)))
(when (and (triple? triple) (not (triple-deleted? triple)))
(when (unify! p (pred-name (triple-predicate triple)))
- (when (unify! s (node-value (triple-subject triple)))
- (when (unify! o (node-value (triple-object triple)))
+; (when (unify! s (node-value (triple-subject triple)))
+; (when (unify! o (node-value (triple-object triple)))
+ (when (unify! s (triple-subject triple))
+ (when (unify! o (triple-object triple))
(funcall cont))))
(undo-bindings! old-trail)))))
triples)
@@ -274,8 +276,10 @@ second arg."
(let ((old-trail (fill-pointer *trail*)))
(when (not (triple-deleted? triples))
(when (unify! p (pred-name (triple-predicate triples)))
- (when (unify! s (node-value (triple-subject triples)))
- (when (unify! o (node-value (triple-object triples)))
+; (when (unify! s (node-value (triple-subject triples)))
+; (when (unify! o (node-value (triple-object triples)))
+ (when (unify! s (triple-subject triples))
+ (when (unify! o (triple-object triples))
(funcall cont))))
(undo-bindings! old-trail))))))))
View
@@ -2,18 +2,19 @@
;;;; Copyright (c) 1991 Peter Norvig
(in-package #:vivace-graph)
-(defparameter *prolog-trace* t)
+(defparameter *prolog-trace* nil)
(defstruct (var (:constructor ? ())
(:print-function print-var))
(name (incf *var-counter*))
(binding +unbound+))
(defmacro var-deref (exp)
- "Follow pointers for bound variables and dereference node values."
+ "Follow pointers for bound variables."
`(progn (loop while (and (var-p ,exp) (bound-p ,exp))
do (setf ,exp (var-binding ,exp)))
- (if (node? ,exp) (setf ,exp (node-value ,exp)) ,exp)))
+ ,exp))
+ ;;(if (node? ,exp) (setf ,exp (node-value ,exp)) ,exp)))
(defun print-var (var stream depth)
(if (or (and *print-level*
@@ -33,9 +34,9 @@ types that will be stored in the db.")
(:method ((x timestamp) (y timestamp)) (timestamp= x y))
(:method ((x timestamp) (y integer)) (= (timestamp-to-universal x) y))
(:method ((x integer) (y timestamp)) (= (timestamp-to-universal y) x))
- (:method ((x node) (y node)) (node-equal x y))
- (:method ((x node) y) (prolog-equal (node-value x) y))
- (:method (x (y node)) (prolog-equal x (node-value y)))
+ ;;(:method ((x node) (y node)) (node-equal x y))
+ ;;(:method ((x node) y) (prolog-equal (node-value x) y))
+ ;;(:method (x (y node)) (prolog-equal x (node-value y)))
(:method ((x triple) (y triple)) (triple-equal x y))
(:method ((x uuid:uuid) (y uuid:uuid)) (uuid:uuid-eql x y))
(:method (x y) (equal x y)))
@@ -79,8 +80,9 @@ types that will be stored in the db.")
(defmethod clause-head ((triple triple))
(list (pred-name (triple-predicate triple))
- (node-value (triple-subject triple))
- (node-value (triple-object triple))))
+ (triple-subject triple) (triple-object triple)))
+ ;;(node-value (triple-subject triple))
+ ;;(node-value (triple-object triple))))
(defmethod clause-head ((list list))
(first list))
View
@@ -14,3 +14,31 @@
(defun stop-json-rpc (&optional (server *prolog-server*))
(hunchentoot:stop server))
+(defun start-vivace-graph (&key (config-file "/etc/vivace-graph.ini"))
+ (unless (probe-file config-file)
+ (error "Configuration file ~A does not exist or is not readable." config-file)
+ (sb-ext:quit :unix-status 255))
+ (let ((config (py-configparser:make-config)))
+ (py-configparser:read-files config (list config-file))
+ (let ((json-port (py-configparser:get-option config "default" "json-rpc-port"))
+ (bin-port (py-configparser:get-option config "default" "binary-port"))
+ (slime-port (py-configparser:get-option config "default" "slime-port"))
+ (pid-file (py-configparser:get-option config "default" "pid-file")))
+ (when (probe-file pid-file)
+ (with-open-file (in pid-file)
+ (let ((pid (read-line in nil :eof)))
+ (error "PID file ~A exists. Am I already running as ~a?" pid-file pid)))
+ (sb-ext:quit :unix-status 255))
+ (handler-case
+ (start-json-rpc :port (parse-integer json-port))
+ (SB-INT:SIMPLE-PARSE-ERROR (c)
+ (declare (ignore c))
+ (format t "JSON-RPC port '~A' is not a valid integer" json-port)
+ (sb-ext:quit :unix-status 255)))
+ (handler-case
+ (start-binary-listener :port (parse-integer bin-port))
+ (SB-INT:SIMPLE-PARSE-ERROR (c)
+ (declare (ignore c))
+ (format t "Binary port '~A' is not a valid integer" bin-port)
+ (sb-ext:quit :unix-status 255)))
+ config)))
Oops, something went wrong.

0 comments on commit 6bce97f

Please sign in to comment.