Skip to content

Commit

Permalink
finished new JSON protocol! all queries ive thrown at it are coming b…
Browse files Browse the repository at this point in the history
…ack fine, as well as the return types (errors/cursors included). there are also a number of updates i made to the protocol parsers that should make them more resilient (not that they need to be, but JIC). looking/working great so far. going to implement some real tests next.
  • Loading branch information
orthecreedence committed Dec 14, 2014
1 parent 6a787ad commit febdcb7
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 296 deletions.
8 changes: 5 additions & 3 deletions cl-rethinkdb.asd
Expand Up @@ -3,9 +3,10 @@
:license "MIT"
:version "0.5.0"
:description "A RethinkDB driver for Common Lisp"
:depends-on (#:cl-async-future
:depends-on (#:blackbird
#:cl-async
#:flexi-streams
#:fast-io
#:yason
#:cl-hash-util
#:cl-ppcre)
:components
Expand All @@ -16,7 +17,8 @@
(:module reql
:serial t
:components
((:file "function")
((:file "types")
(:file "function")
(:file "commands")
(:file "dsl"))
:depends-on ("config"))
Expand Down
73 changes: 48 additions & 25 deletions connection.lisp
Expand Up @@ -4,38 +4,61 @@

(in-package :cl-rethinkdb)

(define-condition connect-error (simple-error)
((msg :reader connect-error-msg :initarg :msg :initform ""))
(:report (lambda (c s) (format s "Connection error: ~a" (connect-error-msg c))))
(:documentation "A general connection error condition."))

(defparameter *empty* (make-array 0 :element-type '(unsigned-byte 8))
"An empty data set for setting callbacks. Perhaps a cl-async API for this is
in order??")

(defun do-connect (host port &key (read-timeout 5))
"Create a connection to the given host/port, and optionally db."
(let ((future (make-future))
(sock nil))
;; make sure sock is async so people aren't tempted to depend on a sync op
(as:delay (lambda () (finish future sock)))
(setf sock (as:tcp-connect host port
nil
(lambda (ev) (signal-error future ev))
:read-timeout read-timeout))))

(defun do-send (sock bytes)
"Send a query to the server over the given socket using the given token.
Returns a future that is finished with the full byte-array of the response
once the full response returns."
(let* ((future (make-future))
;; create a lambda that can be called multiple times with chunks of
;; response data, returning the full data (byte array) only when all
;; of the response has been received.
(response-handler (make-response-handler)))
(as:write-socket-data sock bytes
(with-promise (resolve reject)
(let ((sock (as:tcp-connect host port
nil
(lambda (ev) (reject ev))
:read-timeout read-timeout)))
(as:with-delay ()
(resolve sock)))))

(defun finalize-connect (sock)
"Make sure a connection to the DB was successful."
(with-promise (resolve reject)
(as:write-socket-data sock *empty*
:read-cb (lambda (sock data)
(declare (ignore sock))
(let ((full-response-bytes (funcall response-handler data)))
(when full-response-bytes
(finish future full-response-bytes))))
(let ((msg (babel:octets-to-string data)))
(if (string= (subseq msg 0 7) "SUCCESS")
(resolve sock)
(reject (make-instance 'connect-error
:msg (format nil "bad connect: ~a~%" msg))))))
:event-cb (lambda (ev)
(let ((ev-type (type-of ev)))
(when (or (not (subtypep ev-type 'as:event-info))
(subtypep ev-type 'as:event-error))
(signal-error future ev)))))
future))
(reject ev)))))))

(defun sock-write (sock bytes)
"Send data on a rethinkdb connection."
(as:write-socket-data sock bytes))

(defun finalize-query (sock)
"Make sure a socket that just had query data sent over it is ready to handle
the response."
(with-promise (resolve reject)
(let ((response-handler (make-response-handler)))
(as:write-socket-data sock *empty*
:read-cb (lambda (sock data)
(declare (ignore sock))
(let ((full-response-bytes (funcall response-handler data)))
(when full-response-bytes
(parse-response full-response-bytes))))
:event-cb (lambda (ev)
(let ((ev-type (type-of ev)))
(when (or (not (subtypep ev-type 'as:event-info))
(subtypep ev-type 'as:event-error))
(reject ev))))))))

(defun do-close (sock)
"Close the given socket."
Expand Down
2 changes: 1 addition & 1 deletion package.lisp
Expand Up @@ -34,7 +34,7 @@
(:nicknames :reql))

(defpackage :cl-rethinkdb
(:use :cl :cl-rethinkdb-util :cl-async-future :cl-rethinkdb-reql)
(:use :cl :cl-rethinkdb-util :blackbird :cl-rethinkdb-reql)
(:export :*state*
:state

Expand Down
5 changes: 5 additions & 0 deletions protocol.lisp
Expand Up @@ -3,6 +3,11 @@
(defconstant +proto-version+ #x5f75e83e)
(defconstant +proto-json+ #x7e6970c7)

(defconstant +proto-query-start+ 1)
(defconstant +proto-query-continue+ 2)
(defconstant +proto-query-stop+ 3)
(defconstant +proto-query-wait+ 4)

(defconstant +rdb-response-atom+ 1)
(defconstant +rdb-response-sequence+ 2)
(defconstant +rdb-response-partial+ 3)
Expand Down

0 comments on commit febdcb7

Please sign in to comment.