Skip to content

Commit

Permalink
begin binary protocol and client. move away from Montezuma for full t…
Browse files Browse the repository at this point in the history
…ext indexing (not thread safe).
  • Loading branch information
kraison committed Sep 22, 2010
1 parent 6bce97f commit 6314bda
Show file tree
Hide file tree
Showing 16 changed files with 574 additions and 181 deletions.
92 changes: 56 additions & 36 deletions binary-protocol.lisp
Expand Up @@ -2,51 +2,71 @@


(defparameter +max-bytes+ 40960) (defparameter +max-bytes+ 40960)
(defparameter +magic-byte+ #x2A) (defparameter +magic-byte+ #x2A)
(defparameter *query-types* (make-hash-table)) (defparameter *response-types* (make-hash-table))
(setf (gethash #x00 *query-types*) 'select (setf (gethash #x00 *response-types*) :success
(gethash #x01 *query-types*) 'select-flat (gethash #x01 *response-types*) :success-no-results
(gethash #x02 *query-types*) 'select-bind-list (gethash #x02 *response-types*) :error
(gethash #x03 *query-types*) '<- (gethash #x03 *response-types*) :retry
(gethash #x04 *query-types*) 'insert (gethash #x04 *response-types*) :authentication-error
(gethash #x05 *query-types*) 'do-query (gethash #xFF *response-types*) :quit)
(gethash #xFF *query-types*) 'quit)

(defun decode-and-execute (vec)
(let ((query-type (gethash (aref vec 0) *query-types*)))
(logger :debug "Got query type of ~A for byte ~A" query-type (aref vec 0))
(values nil 0 t)))


(defun extract-query-length (stream) (defun extract-query-length (stream)
(let ((int 0)) (let ((int 0))
(dotimes (i 4) (dotimes (i 4)
(setq int (dpb (read-byte stream nil 0) (byte 8 (* i 8)) int))) (setq int (dpb (read-byte stream nil 0) (byte 8 (* i 8)) int)))
int)) int))


(defmethod decode-and-execute ((session v-session))
"Decode and execute a binary query. First, extract the query length (bytes 1 - 4), then the query
type. Query types are specified by the 5th byte read from the wire:
#x00 'select
#x01 'select-flat
#x02 '<-
#x03 'insert
#x04 'do-query
#xFF 'quit
Returns 4 values: a header pointer and length and a response pointer and length. Both pointers
must be freed by the calling function!"
(setf (v-session-buffer-length session) (extract-query-length (v-session-stream session)))
(case (read-byte (v-session-stream session) nil :eof)
(#x00 (binary-select session))
(#x01 (binary-select-flat session))
(#x02 (binary-<- session))
(#x03 (binary-insert session))
(#x04 (binary-do-query session))
(#xFF
(values
(foreign-alloc :unsigned-char :count 2 :initial-contents `(,+magic-byte+ #xFF)) 2
(foreign-alloc :unsigned-char :count 1 :initial-element #xFF) 1))
(:eof (values nil 0 nil 0 t))
(otherwise
(values
(foreign-alloc :unsigned-char :count 2 :initial-contents `(,+magic-byte+ #x02)) 2
(foreign-alloc :unsigned-char :count 1 :initial-element #xFF) 1)))) ;; FIXME: make error msg

(defmethod data-received-handler ((session v-session)) (defmethod data-received-handler ((session v-session))
(let ((buffer (v-session-buffer session)) (let ((magic-byte (read-byte (v-session-stream session) nil :eof)))
(magic-byte (read-byte (v-session-stream session) nil :eof)))
(if (eql magic-byte +magic-byte+) (if (eql magic-byte +magic-byte+)
(let ((query-length (extract-query-length (v-session-stream session)))) (multiple-value-bind (header hlen response rlen quit?) (decode-and-execute session)
(setf (fill-pointer buffer) 0) (unwind-protect
(dotimes (i query-length) (progn
(let ((byte (read-byte (v-session-stream session) nil :eof))) (when (pointerp header)
(if (eq byte :eof) (dotimes (i hlen)
(progn (logger :debug "Sending header byte ~A: ~A to client" i
(setf (v-session-finished? session) t) (mem-aref header :unsigned-char i))
(return-from data-received-handler (write-byte (mem-aref response :unsigned-char i)
"Got EOF on stream before all bytes were read.")) (v-session-stream session)))
(vector-push-extend byte buffer)))) (force-output (v-session-stream session)))
(multiple-value-bind (response rlen quit?) (decode-and-execute buffer) (when (pointerp response)
(unwind-protect (dotimes (i rlen)
(progn (logger :debug "Sending data byte ~A: ~A to client" i
(when (pointerp response) (mem-aref response :unsigned-char i))
(dotimes (i rlen) (write-byte (mem-aref response :unsigned-char i)
(logger :debug "Sending byte ~A: ~A to client" i (v-session-stream session)))
(mem-aref response :unsigned-char i)) (force-output (v-session-stream session)))
(write-byte (mem-aref response :unsigned-char i) (when quit? (setf (v-session-finished? session) t)))
(v-session-stream session))) (progn
(force-output (v-session-stream session))) (when (pointerp header) (foreign-free header))
(when quit? (setf (v-session-finished? session) t)))
(when (pointerp response) (foreign-free response))))) (when (pointerp response) (foreign-free response)))))
(progn (progn
(setf (v-session-finished? session) t) (setf (v-session-finished? session) t)
Expand Down
33 changes: 33 additions & 0 deletions client-globals.lisp
@@ -0,0 +1,33 @@
(in-package #:vivace-graph-client)

(defparameter +max-bytes+ 40960)
(defparameter +magic-byte+ #x2A)
(defparameter +version+ #x01)
(defparameter +ack+ #x01)
(defparameter +success+ #x00)
(defparameter +success-no-results+ #x01)
(defparameter +error+ #x02)
(defparameter +retry+ #x03)
(defparameter +authentication-error+ #x04)
(defparameter +success+ #x05)
(defparameter +hello+ #x06)
(defparameter +quit+ #xFF)

(defparameter +select+ #x00)
(defparameter +select-flat+ #x01)
(defparameter +<-+ #x02)
(defparameter +insert+ #x03)
(defparameter +do-query+ #x04)

(define-condition client-error (error)
((reason :initarg :reason))
(:report (lambda (error stream)
(with-slots (reason) error
(format stream "Vivace-graph client error: ~A." reason)))))

(define-condition authentication-error (error)
((reason :initarg :reason))
(:report (lambda (error stream)
(with-slots (reason) error
(format stream "Vivace-graph client authentication error: ~A." reason)))))

68 changes: 68 additions & 0 deletions client.lisp
@@ -0,0 +1,68 @@
(in-package #:vivace-graph)

(defstruct (session
(:predicate session?))
socket
stream
vars
query
pointer
version)

(defun authenticate (username password graph)
(declare (ignore username password graph))
t)

(defmethod get-byte ((socket usocket:socket))
(read-byte (usocket:socket-stream socket) nil :eof))

(defmethod get-byte ((session session))
(read-byte (session-stream session) nil :eof))

(defun connect-to-server (ip port)
(handler-case
(let ((socket (usocket:socket-connect
ip port :protocol :tcp :element-type '(unsigned-byte 8) :timeout 600)))
(if socket
(progn
(write-byte +magic-byte+ (socket-stream socket))
(write-byte +version+ (socket-stream socket))
(force-output (socket-stream socket))
(let ((response (get-byte socket)))
(cond ((eq response +ack+) t)
((eq response :eof) (error "Server disconnected!"))
(t (error "Invalid ACK from server: ~A" response))))
(let ((magic-byte (get-byte socket)) (version (get-byte socket)))
(if (and (eq +magic-byte+ magic-byte) (eq +version+ version))
(make-session :socket socket :version +version+ :stream (socket-stream socket))
(error "Unknown greeting from server: ~A ~A" magic-byte version))))
(error "Unable to connect to ~A:~A" ip port)))
(error (condition)
(error 'client-error :reason condition))))

(defmethod disconnect-from-server ((session session))
(handler-case
(progn
(write-byte +magic-byte+ (session-stream session))
(write-byte +quit+ (session-stream session))
(force-output (session-stream session))
(let ((response (get-byte session)))
(cond ((or (eq response +ack+) (eq response :eof))
(ignore-errors (close (session-stream session)))
(setf (session-socket session) nil
(session-stream session) nil))
(t (error "Invalid ACK from server on request to quit: ~A" response)))))
(error (condition)
(ignore-errors (close (session-stream session)))
(error 'client-error :reason condition))))

(defmethod set-vars ((session session) vars)
(setf (session-vars session) (make-hash-table))
(loop for i from 0 to (list-length vars) do
(setf (gethash i (session-vars session)) (nth i vars)
(gethash (nth i vars) (session-vars session)) i)))

(defmethod select ((session session) vars &rest goals)
(set-vars session vars)
(let ((goals (substitute-vars session vars goals)))

116 changes: 116 additions & 0 deletions full-text-index.lisp
@@ -0,0 +1,116 @@
(in-package #:vivace-graph)

(defun tokenize-string (seq)
(remove-if #'(lambda (token)
(< (length token) 3))
(tokenize seq
:escapes #\\
:multi-escapes "\"|"
:delimiters (format nil "_,:~A" #\Tab)
:terminators ""
:punctuation "[]()!.?~`;'<>/+=-*&^%$#@"
:whitespace :whitespace
:defaults (let ((i 0))
(lambda () (incf i))))))

(defmethod index-text ((triple triple))
(with-transaction ((full-text-idx *graph*))
(when (stringp (triple-subject triple))
(dolist (token (remove-duplicates (tokenize-string (triple-subject triple)) :test 'equalp))
(set-btree (full-text-idx *graph*)
(make-slot-key "s" (string-downcase token))
(triple-uuid triple) :mode :concat)))
(when (stringp (triple-object triple))
(dolist (token (remove-duplicates (tokenize-string (triple-object triple)) :test 'equalp))
(set-btree (full-text-idx *graph*)
(make-slot-key "o" (string-downcase token))
(triple-uuid triple) :mode :concat)))))

(defmethod deindex-text ((triple triple))
(with-transaction ((full-text-idx *graph*))
(when (stringp (triple-subject triple))
(dolist (token (remove-duplicates (tokenize-string (triple-subject triple)) :test 'equalp))
(rem-btree (full-text-idx *graph*)
(make-slot-key "s" (string-downcase token))
(triple-uuid triple))))
(when (stringp (triple-object triple))
(dolist (token (remove-duplicates (tokenize-string (triple-object triple)) :test 'equalp))
(rem-btree (full-text-idx *graph*)
(make-slot-key "o" (string-downcase token))
(triple-uuid triple))))))

(defun full-text-search (string &key subject? object?)
(let ((result nil))
(dolist (token (remove-duplicates (tokenize-string string) :test 'equalp))
(when subject?
(let ((klist (get-btree (full-text-idx *graph*)
(make-slot-key "s" (string-downcase token)) :mode :klist)))
(when (klist? klist)
(unwind-protect
(map-klist #'(lambda (id)
(format t "GOT ~A~%" (lookup-triple-by-id id))
(pushnew (lookup-triple-by-id id) result)) klist)
(klist-free klist)))))
(when object?
(let ((klist (get-btree (full-text-idx *graph*)
(make-slot-key "o" (string-downcase token)) :mode :klist)))
(when (klist? klist)
(unwind-protect
(map-klist #'(lambda (id)
(format t "GOT ~A~%" (lookup-triple-by-id id))
(pushnew (lookup-triple-by-id id) result)) klist)
(klist-free klist))))))
result))





#|
(defmethod index-text ((triple triple))
(when (or (stringp (triple-subject triple))
(stringp (triple-object triple)))
(let ((doc (make-instance 'montezuma:document)))
(montezuma:add-field
doc (montezuma:make-field "uuid"
(format nil "~A" (triple-uuid triple))
:stored t :index :untokenized))
(when (stringp (triple-subject triple))
(montezuma:add-field
doc (montezuma:make-field "subject"
(triple-subject triple)
:stored t :index :tokenized)))
(when (stringp (triple-object triple))
(montezuma:add-field
doc (montezuma:make-field "object"
(triple-object triple)
:stored t :index :tokenized)))
(with-recursive-lock-held ((full-text-lock *graph*))
(montezuma:add-document-to-index (full-text-idx *graph*) doc)))))
(defmethod deindex-text ((triple triple))
(let ((docs nil))
(with-recursive-lock-held ((full-text-lock *graph*))
(montezuma:search-each
(full-text-idx *graph*)
(uuid:print-bytes nil (triple-uuid triple))
#'(lambda (doc score) (declare (ignore score)) (push doc docs)))
(dolist (doc docs)
(montezuma:delete-document (full-text-idx *graph*) doc)))))
(defun map-text-search (string fn &key collect)
(let ((result nil))
(with-recursive-lock-held ((full-text-lock *graph*))
(montezuma:search-each
(full-text-idx *graph*)
string
#'(lambda (doc score)
(declare (ignore score))
(let ((r
(funcall fn
(lookup-triple-by-id
(montezuma:document-value
(montezuma:get-document (full-text-idx *graph*) doc) "uuid")))))
(if collect (push r result)))))
(nreverse result))))
|#
3 changes: 2 additions & 1 deletion globals.lisp
Expand Up @@ -24,5 +24,6 @@
(defvar *trail* (make-array 200 :fill-pointer 0 :adjustable t)) (defvar *trail* (make-array 200 :fill-pointer 0 :adjustable t))
(defvar *var-counter* 0 "Counter for generating variable names.") (defvar *var-counter* 0 "Counter for generating variable names.")
(defvar *predicate* nil "The Prolog predicate currently being compiled.") (defvar *predicate* nil "The Prolog predicate currently being compiled.")
(defvar *select-list* "Accumulator for prolog selects.") (defvar *select-list* nil "Accumulator for prolog selects.")
(defvar *cont* nil "Continuation container for step-wise queries.")
(defvar *prolog-global-functors* (make-hash-table :synchronized t)) (defvar *prolog-global-functors* (make-hash-table :synchronized t))
12 changes: 5 additions & 7 deletions interface.lisp
Expand Up @@ -37,7 +37,7 @@ inconsistency should be eliminated at some point."
klist))) klist)))


(defun load-graph! (file) (defun load-graph! (file)
"Load a graph form configuration file (file). Sets *graph* to the newly opened graph." "Load a graph from configuration file (file). Sets *graph* to the newly opened graph."
(let ((config (py-configparser:make-config))) (let ((config (py-configparser:make-config)))
(py-configparser:read-files config (list file)) (py-configparser:read-files config (list file))
(let ((name (py-configparser:get-option config "default" "name"))) (let ((name (py-configparser:get-option config "default" "name")))
Expand All @@ -58,16 +58,13 @@ inconsistency should be eliminated at some point."
(deleted-triple-db graph) (deleted-triple-db graph)
(open-btree (format nil "~A/deleted-triples.kct" (graph-location graph)) (open-btree (format nil "~A/deleted-triples.kct" (graph-location graph))
:duplicates-allowed? t) :duplicates-allowed? t)
(full-text-idx graph)
(open-btree (format nil "~A/full-text-idx.kct" (graph-location graph))
:duplicates-allowed? t)
(rule-db graph) (rule-db graph)
(open-phash (format nil "~A/rules.kch" (graph-location graph))) (open-phash (format nil "~A/rules.kch" (graph-location graph)))
(functor-db graph) (functor-db graph)
(open-phash (format nil "~A/functors.kch" (graph-location graph))) (open-phash (format nil "~A/functors.kch" (graph-location graph)))
(full-text-idx graph)
(make-instance 'montezuma:index
:default-field "*"
:fields '("subject" "object")
:min-merge-docs 5000
:path (format nil "~A/full-text-idx" (graph-location graph)))
(gethash (graph-name graph) *graph-table*) graph (gethash (graph-name graph) *graph-table*) graph
*graph* graph)))))) *graph* graph))))))
(load-all-functors *graph*) (load-all-functors *graph*)
Expand Down Expand Up @@ -96,6 +93,7 @@ graph."
(when (eql *graph* graph) (setq *graph* nil)) (when (eql *graph* graph) (setq *graph* nil))
(close-phash (functor-db graph)) (close-phash (functor-db graph))
(close-phash (rule-db graph)) (close-phash (rule-db graph))
(close-btree (full-text-idx graph))
(close-btree (triple-db graph)) (close-btree (triple-db graph))
(close-btree (deleted-triple-db graph)) (close-btree (deleted-triple-db graph))
(remhash (graph-name graph) *graph-table*))) (remhash (graph-name graph) *graph-table*)))

0 comments on commit 6314bda

Please sign in to comment.