Permalink
Browse files

additions and corrections in connection with spocq sae development.

 - binary channel operations with support for stream-reader/writer
   (perchance, circumvents sbcl bug with read-byte, which does not properly
    body-position.)
 - factored assertion macros into general utilities for use elsewhere

amqp-device :
 amqp-j-write-chars : lexicographica

classes :
 added printers for exchange and queue

commands :
 extend publish with an exchange method
 add body key arg to deliver

data-wire-coding :
 different unsigned/signed byte setters
 improve error message for missing version implementations
 correct error messages for condition assertions

device-level:
 guard atttempt to close channel 0

device-stream:
 call-with-channel-output-stream : correct exchange reference
 call-with-open-channel-stream : clearer directionhandline
 with-open-connection: new

*/package : clean up documentation

stream :
 stream-read/write-byte/char : factored into plain functions for use with reader/writer
 stream-reader, stream-writer : new, with binary/character distinction

utilities :
 assert-condition, assert-argument-type -> de.setf.utilities

README
  • Loading branch information...
1 parent 6260875 commit fd51e7735edb61205bd3de9361d2c0854b5e44ef @lisp committed Sep 10, 2010
View
@@ -211,8 +211,8 @@ in this combined form, under the GAL as well
- 2003 [Kevin Rosenberg](mailto:kevin@rosenberg.net)
-> [5]: agpl.txt
-> [6]: http://common-lisp.net/project/bordeaux-threads/darcs/bordeaux-threads/CONTRIBUTORS
+ [5]: agpl.txt
+ [6]: http://common-lisp.net/project/bordeaux-threads/darcs/bordeaux-threads/CONTRIBUTORS
--------
![made with mcl](http://www.digitool.com/img/mcl-made-1.gif "Made With MCL")
File renamed without changes.
@@ -3,9 +3,8 @@
(in-package :cl-user)
-(de.setf.utility:document :file
- (description "This file defines the package for AMPQ version 0.8 components of the
- `de.setf.amqp` library.")
+(:documentation "This file defines the package for AMPQ version 0.8 components of the
+ `de.setf.amqp` library."
(copyright
"Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
"'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
File renamed without changes.
@@ -3,9 +3,7 @@
(in-package :cl-user)
-(de.setf.utility:document :file
- (description "This file defines the package for AMPQ version 0.9r0 components of the
- `de.setf.amqp` library.")
+(:documentation "This file defines the package for AMPQ version 0.9r0 components of the `de.setf.amqp` library."
(copyright
"Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
"'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
File renamed without changes.
@@ -4,9 +4,7 @@
(in-package :cl-user)
-(de.setf.utility:document :file
- (description "This file defines the package for AMPQ version 0.9r1 components of the
- `de.setf.amqp` library.")
+(:documentation "This file defines the package for AMPQ version 0.9r1 components of the `de.setf.amqp` library."
(copyright
"Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved"
"'de.setf.amqp' is free software: you can redistribute it and/or modify it under the terms of version 3
View
@@ -133,7 +133,6 @@
(setf (device-content-type device) (mime-type type))))
-
(defmethod update-device-codecs ((device amqp-device) (type mime:*/*))
(multiple-value-bind (decoder encoder)
(compute-charset-codecs (device-content-type device))
@@ -144,6 +143,17 @@
type)
+(defmethod update-device-codecs ((device amqp-device) (type mime:application/octet-stream))
+ "given a binary content type, set the codecs to byte-identity - but note they will fail
+ for any callers which expect characters."
+ (flet ((get-unsigned-byte (get-byte destination)
+ (funcall get-byte destination))
+ (put-unsigned-byte (byte put-byte destination)
+ (funcall put-byte destination byte)))
+ (setf-device-decoder #'get-unsigned-byte device)
+ (setf-device-encoder #'put-unsigned-byte device)
+ type))
+
(defmethod mime-type-charset ((device amqp-device))
"Given a device, delegate to the content-type."
@@ -276,7 +286,11 @@
#+mcl
(defmethod terminate ((object simple-stream))
- (device-close object t))
+ ;; double-check for incompletely closed channels etc.
+ (typecase (device-state object)
+ (amqp.s:close )
+ (t
+ (device-close object t))))
(when (fboundp 'stream-close)
(defmethod stream-close ((stream amqp-device))
@@ -343,7 +357,7 @@
(amqp-stream-write-char stream character))
(defun amqp-j-write-chars (string stream start end)
- (amql-stream-write-string stream string start end))
+ (amqp-stream-write-string stream string start end))
(defun amqp-j-listen (stream)
(stream-listen (stream-input-handle stream)))
View
@@ -56,7 +56,7 @@
:net.common-lisp.closer-mop
:net.common-lisp.bordeaux-threads
:de.weitz.cl-ppcre
- :com.b9.puri.ppcre
+ :com.b9.puri.puri-ppcre
:de.setf.utility
:de.setf.utility.mime
#+sbcl :sb-simple-streams)
View
@@ -913,11 +913,10 @@
initargs))
(defmethod print-object ((instance amqp:channel) stream)
- (with-slots (uri number) instance
- (print-unreadable-object (instance stream :identity t :type t)
- (write-char #\[ stream)
- (print-object uri stream)
- (format stream "].~d" number))))
+ (print-unreadable-object (instance stream :identity t :type t)
+ (write-char #\[ stream)
+ (print-object (bound-slot-value instance 'uri) stream)
+ (format stream "].~d" (bound-slot-value instance 'number))))
(defmethod object-channel ((channel amqp:channel))
@@ -1190,6 +1189,10 @@
(def-ensure-method (amqp:exchange amqp:delete-ok))
+(defmethod print-object ((instance amqp:exchange) stream)
+ (print-unreadable-object (instance stream :identity t :type t)
+ (format stream "[~a]" (or (ignore-errors (amqp:exchange-exchange instance)) "?"))))
+
(defgeneric amqp:exchange-exchange (object)
(:documentation "The exchange name accessor is extended with a string method to allow to
coerce arguments to a string value in request/response operators.")
@@ -1250,6 +1253,12 @@
(def-ensure-method (amqp:queue amqp:delete))
(def-ensure-method (amqp:queue amqp:delete-ok))
+
+(defmethod print-object ((instance amqp:queue) stream)
+ (print-unreadable-object (instance stream :identity t :type t)
+ (format stream "[~a]" (or (ignore-errors (amqp:queue-queue instance)) "?"))))
+
+
(defgeneric amqp:queue-queue (object)
(:documentation "The queue name accessor is extended with a string method to allow to
coerce arguments to a string value in request/response operators.")
View
@@ -322,6 +322,13 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
deliver commands and process them either as polled or asynchronous events.")
(:request
+ (:method ((queue amqp:queue) &rest args)
+ (declare (dynamic-extent args))
+ (let ((channel (queue-channel queue)))
+ (apply #'channel-request-consume channel (amqp:channel.basic channel)
+ :queue queue
+ args)))
+
(:method ((basic amqp:basic) &rest args &key queue consumer-tag no-local no-ack exclusive no-wait arguments)
(declare (dynamic-extent args))
@@ -418,7 +425,7 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
class)))
-(def-amqp-command amqp:deliver (class &key consumer-tag delivery-tag redelivered exchange routing-key)
+(def-amqp-command amqp:deliver (class &key body consumer-tag delivery-tag redelivered exchange routing-key)
(:documentation "C<--S : notify a client of an incoming consumer message.
CLASS : The client class to which the message is being delivered.
A read frame generates an immediate basic instance, which then delegates
@@ -483,6 +490,7 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
(:method ((channel amqp:queue) &rest args &key queue no-ack body)
(declare (dynamic-extent args) (ignore no-ack body))
+ ;;;??? should better use the queues own channel?
(apply #'channel-request-get amqp:channel (amqp:channel.basic amqp:channel)
:queue queue
args))
@@ -578,15 +586,18 @@ defined by the exchange configuration and distributed to any active consumers wh
any, is committed.")
(:request
+ (:method ((exchange amqp:exchange) &rest args)
+ "Given an exchange, delegate to its channel's basic instance."
+ (declare (dynamic-extent args))
+ (apply #'amqp::request-publish (amqp:channel.basic (amqp.u:exchange-channel exchange)) args))
+
(:method ((channel amqp:channel) &rest args)
"The class' channel is state is set to use-channel.body.output, the stream is cleared,
and the encoding is asserted. If a body is supplied, then, it is written. Otherwise the
channel is left available as a stream."
-
(declare (dynamic-extent args))
;; delegate to the channel's basic class
- (apply #'amqp::request-publish (amqp:channel.basic channel)
- args))
+ (apply #'amqp::request-publish (amqp:channel.basic channel) args))
(:method ((basic amqp:basic) &rest args &key body exchange routing-key mandatory immediate)
(declare (ignore routing-key mandatory immediate))
View
@@ -1086,10 +1086,10 @@ In addition compound buffer accessors are defined for the types
(when assert-conditions
(assert-argument-type ,buffer-unsigned-name buffer frame-buffer)
(assert-condition (and (typep position 'fixnum) (<= (+ position ,bytes) (length buffer)))
- buffer-unsigned-name "value overflows buffer: (~s + ~s), ~s"
+ ,buffer-unsigned-name "value overflows buffer: (~s + ~s), ~s"
position ,bytes (length buffer)))
(let ((value 0))
- (declare (type ,(if (<= (expt 2 length) most-positive-fixnum) 'fixnum 'integer) value))
+ (declare (type (unsigned-byte ,length) value))
,@(loop for i from 1 to bytes
append `((setf value ,(if (= i 1)
'(aref buffer position)
@@ -1100,8 +1100,8 @@ In addition compound buffer accessors are defined for the types
(defun (setf ,buffer-unsigned-name) (value buffer position &optional (assert-conditions t))
(declare (type amqp:frame-buffer buffer))
(declare (type fixnum position)
- (type ,(if (<= (expt 2 length) most-positive-fixnum) 'fixnum 'integer) value))
- (assert-condition (and (integerp value) (>= value 0) (< value ,(expt 2 length)))
+ (type (unsigned-byte ,length) value))
+ (assert-condition (typep value '(unsigned-byte ,length))
(setf ,buffer-unsigned-name) "Invalid byte value, exceeds domain: ~s."
value)
(when assert-conditions
@@ -1115,12 +1115,28 @@ In addition compound buffer accessors are defined for the types
(setf value (ash value -8))))
(+ position ,bytes))))
+ ;; the signed read can reuse the unsigned version, but the writer has different type constraints
(defun ,buffer-signed-name (buffer position &optional (assert-conditions t))
(values (,signed-name (,buffer-unsigned-name buffer position assert-conditions))
(+ position ,bytes)))
(defun (setf ,buffer-signed-name) (value buffer position &optional (assert-conditions t))
- (setf (,buffer-unsigned-name buffer position assert-conditions) value))))))
+ (declare (type amqp:frame-buffer buffer))
+ (declare (type fixnum position)
+ (type (signed-byte ,length) value))
+ (assert-condition (typep value '(signed-byte ,length))
+ (setf ,buffer-signed-name) "Invalid byte value, exceeds domain: ~s."
+ value)
+ (when assert-conditions
+ (assert-argument-type (setf ,buffer-signed-name) buffer frame-buffer)
+ (assert-condition (and (typep position 'fixnum) (<= (+ position ,bytes) (length buffer)))
+ (setf ,buffer-signed-name) "value overflows buffer: (~s + ~s), ~s"
+ position ,bytes (length buffer)))
+ (values value
+ (progn ,@(loop for i from (1- bytes) downto 0
+ append `((setf (aref buffer (+ position ,i)) (logand #xff value))
+ (setf value (ash value -8))))
+ (+ position ,bytes))))))))
(def-byte-accessors 8)
(def-byte-accessors 16)
@@ -1282,7 +1298,7 @@ In addition compound buffer accessors are defined for the types
;; check bounds here as it's finally the encoded positioning
(assert-condition (< position max-position)
(setf ,buffer-utf8-name) "String overflows size constraint: ~s, ~s"
- ',buffer-utf8-name position max-position)
+ position max-position)
(setf (aref buffer position) byte)
(incf position)))
(declare (dynamic-extent #'buffer-insert-byte)) ; just in case
@@ -1406,16 +1422,24 @@ In addition compound buffer accessors are defined for the types
(defun (setf buffer-protocol-header-version) (version buffer)
"Store a protocol header into a buffer.
Accept a version keyword and set the version header as registered in the list of supported versions."
+ (let ((encoded-version (version-protocol-header version)))
+ (cond (encoded-version)
+ ((null amqp.u:*version-headers*)
+ (error "No AMQP version implementation loaded."))
+ (t
+ (error "Invalid version : ~s." version)))
- (replace buffer (or (version-protocol-header version) (error "Invalid version : ~s." version)) :start1 0 :end1 8)
- version)
+ (replace buffer encoded-version :start1 0 :end1 8)
+ version))
(defun buffer-protocol-header-version (buffer &optional (error-p t))
"Extract a protocol header from a buffer.
Return the respective version keyword as registered in the list of supported versions."
(cond ((protocol-header-version (if (= (length buffer) 8) buffer (setf buffer (subseq buffer 0 8)))))
+ ((null amqp.u:*version-headers*)
+ (error "No AMQP version implementation loaded."))
(error-p
(error "Invalid version : ~s." buffer))
(t
View
@@ -187,20 +187,23 @@ as well as the discussions of the the alternative fu interface.[5]
(defmethod device-close ((device amqp:channel) (abort t))
"remove the channel from the connection."
(amqp:log :debug device "Close in state: ~s" (channel-state device))
- (when (open-stream-p device)
- (cond (abort
- (call-next-method))
- (t
- (let ((initial-state (shiftf (channel-state device) amqp.s:close-channel)))
- (typecase initial-state
- ;; if in use, send the close request, the flush it
- (amqp.s:use-channel
- (amqp:request-close device)
- ;; complete and flush the content.
- (device-flush device t)))
- (call-next-method))))
- ;; in any case disconnect
- (disconnect-channel (channel-connection device) device)))
+ (if (zerop (channel-number device))
+ (amqp:log :warn device "Attempt to close channel zero.")
+ (when (open-stream-p device)
+ (cond (abort
+ (setf (channel-state device) amqp.s:close-channel)
+ (call-next-method))
+ (t
+ (let ((initial-state (shiftf (channel-state device) amqp.s:close-channel)))
+ (typecase initial-state
+ ;; if in use, send the close request, the flush it
+ (amqp.s:use-channel
+ (amqp:request-close device)
+ ;; complete and flush the content.
+ (device-flush device t)))
+ (call-next-method))))
+ ;; in any case disconnect
+ (disconnect-channel (channel-connection device) device))))
(defmethod device-read ((device amqp:channel) buffer-arg start end blocking)
@@ -431,11 +434,12 @@ as well as the discussions of the the alternative fu interface.[5]
(defmethod device-clear-input ((device amqp:channel) buffer-only)
- ;;; clear a possible pushed character and "empty" buffer.
- ;;; unless buffer-only, also flush any not yet read frames until
- ;;; the end of the body is reached
+ ;;; call the decoder to clear a possible pushed character and correct position
+ ;;; then "empty" the buffer.
+ ;;; then, unless buffer-only, also flush any not yet read frames until the end of the body is reached
(with-slots (decoder buffer buffpos buffer-ptr body-position body-length) device
- (funcall decoder #'(lambda (s) (declare (ignore s)) 0) device)
+ (when decoder ; maybe not present for binary streams
+ (funcall decoder #'(lambda (s) (declare (ignore s)) 0) device))
;; skip over anything already in the buffer
(setf body-position (+ body-position (- buffer-ptr buffpos)))
(setf buffpos buffer-ptr)
@@ -517,7 +521,9 @@ as well as the discussions of the the alternative fu interface.[5]
(defmethod device-close ((device amqp:connection) (abort t))
- (map nil #'(lambda (c) (when c (device-close c abort)))
+ (map nil #'(lambda (c)
+ (when (and c (plusp (channel-number c)))
+ (device-close c abort)))
(get-connection-channels device))
(if abort
(call-next-method)
@@ -841,7 +847,7 @@ returned.")
consumer-tag
)
(:documentation "Given a channel which has received a Basic.Deliver or Basic.Get/Get-ok,
- firat, prepare the channel based on the content header properties, and read the content
+ first, prepare the channel based on the content header properties, and read the content
according to the combined channel data type and content type. Combine the header's possibly
incomplete content type with the channel's to specify the effective decoding.")
@@ -855,9 +861,10 @@ returned.")
(package (getf headers :package))
(mime-type (class-mime-type basic)))
;; element-type in the basic header combines the read values with the channel's content-type
- (setf element-type (or (find-symbol element-type package)
- (error "Invalid type x package combination: ~s, ~s."
- element-type package)))
+ (when element-type
+ (setf element-type (or (find-symbol element-type package)
+ (error "Invalid type x package combination: ~s, ~s."
+ element-type package))))
(amqp:log :debug channel "device-read-content: in (~s ~s) in state ~s x~s"
element-type mime-type (channel-state channel) (device-body-length channel))
(device-read-content-body channel (or body element-type) mime-type))))
@@ -880,14 +887,14 @@ returned.")
(setf body-length body-size
buffer-ptr 0
body-position 0))
- (update-device-codecs channel mime-type)
+ ;; (update-device-codecs channel mime-type)
+ (setf (channel-content-type channel) mime-type)
(setf (channel-state channel)
(if (string-equal (getf headers :transfer-encoding) "chunked")
amqp.s:use-channel.body.input.chunked
amqp.s:use-channel.body.input))
(return-from command-loop basic))))))
-
(defgeneric device-read-content-body (device type content-type)
(:method ((channel amqp:channel) (type (eql 'string)) (content-type mime:text/plain))
@@ -900,6 +907,7 @@ returned.")
(:method ((channel amqp:channel) (body-op function) (content-type mime:*/*))
"Given a the null type, just return the channel as a stream to be read."
(prog1 (funcall body-op channel content-type)
+ ;; once the operator has read, clear to the end of the message
(device-clear-input channel nil)))
(:method ((channel amqp:channel) (type (eql 'vector)) (content-type mime:application/octet-stream))
Oops, something went wrong.

0 comments on commit fd51e77

Please sign in to comment.