Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

changes to streamed framing to eliminate extra messages

amqp-device.lisp :
 amqp-j-read-chars : add missing end test

commands.lisp :
 change get and deliver to return the headers as well as the content.
 change publish to defer sending the method and header frames until
  the flush operation

device-level.lisp :
 device-flush : send the method and header frames as per the configured
  channel basic instance; do not send any extra termination or
  padding frames
 #+/-zero-frame-eoc-marker the implementation versions for the moment
 device-initialize-content-header : factored out from the writing
  process to be available for the publish command.
 device-write-content : initialize the basic's header properties, but do not write.

frame.lisp :
 unget-read-frame : added for use in device read to push back non-body frames
  and rheat them as an eof indicator.

utilities.lisp :
 undequeue : for unget-read-frame
  • Loading branch information...
commit d772d24c2629974ba8ac34b6aef2a94ad5ebe3ae 1 parent aa0cd0d
@lisp authored
View
4 amqp-device.lisp
@@ -364,7 +364,9 @@
(let ((i start)
(start-buffpos buffpos)
(char #\null))
- (loop (unless (setf char (funcall decoder #'buffer-extract-byte device))
+ (loop (when (>= i end)
+ (return (values (- i start) nil)))
+ (unless (setf char (funcall decoder #'buffer-extract-byte device))
(return (values (- i start) :eof)))
(setf last-char-read-size (- buffpos start-buffpos))
(setf start-buffpos buffpos)
View
2  classes.lisp
@@ -706,7 +706,7 @@
;; try to determine the size
(setf body-size (channel-compute-body-size channel body content-type))
(etypecase body-size
- (null
+ (null
(setf (getf headers :transfer-encoding) "chunked")
(setf body-size (device-buffer-length channel)))
(integer
View
33 commands.lisp
@@ -446,7 +446,8 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
(let ((channel (object-channel basic)))
;; save the tag for eventual acknowledgment - either by app or below
(setf (amqp:basic-delivery-tag basic) delivery-tag)
- (prog1 (apply #'device-read-content channel args)
+ (multiple-value-prog1 (values (apply #'device-read-content channel args)
+ (amqp:basic-headers basic))
(when (and (channel-acknowledge-messages channel)
;; in case the ack was managed elsewhere, test
(eql (amqp:basic-delivery-tag basic) delivery-tag))
@@ -530,7 +531,8 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
(amqp:log :debug basic "respond-to-get, get-ok: ~s" get-ok-args)
(let ((channel (object-channel basic)))
(return-from command-case
- (prog1 (apply #'device-read-content channel :body body get-ok-args)
+ (multiple-value-prog1 (values (apply #'device-read-content channel :body body get-ok-args)
+ (amqp:basic-headers basic))
(unless (amqp:basic-no-ack basic)
(amqp::send-ack basic :delivery-tag delivery-tag))))))))))
@@ -543,7 +545,7 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
(declare (dynamic-extent args))
(let ((channel (object-channel basic)))
;;; nb. do not ack a get-ok
- (prog1 (apply #'device-read-content channel args))))))
+ (apply #'device-read-content channel args)))))
(def-amqp-command amqp:Get-Empty (class &key)
@@ -616,28 +618,15 @@ any, is committed.")
(apply #'amqp::request-publish (amqp:channel.basic channel) args))
(:method ((basic amqp:basic) &rest args &key (body nil body-s)
- (ticket nil t-s)
- (exchange (amqp:basic-exchange basic))
- (routing-key (amqp:basic-routing-key basic))
- (mandatory (amqp:basic-mandatory basic))
- (immediate (amqp:basic-immediate basic))
- content-type content-encoding headers delivery-mode
- priority correlation-id reply-to expiration message-id timestamp
- type user-id)
- (declare (ignore content-type content-encoding headers delivery-mode
- priority correlation-id reply-to expiration message-id timestamp
- type user-id))
- (setf exchange (amqp:exchange-exchange exchange)) ; coerce to a string
- (setf (amqp:basic-exchange basic) exchange) ; cache for possible use in chunk headers
+ (exchange nil e-s)
+ &allow-other-keys)
+ (when e-s
+ (setf exchange (amqp:exchange-exchange exchange)) ; coerce to a string
+ (setf (amqp:basic-exchange basic) exchange)) ; cache for possible use in chunk headers
(when body-s
(setf args (copy-list args))
(remf args :body))
- (if t-s ; version variation
- (amqp::send-publish basic :ticket ticket :exchange exchange :routing-key routing-key
- :mandatory mandatory :immediate immediate)
- (amqp::send-publish basic :exchange exchange :routing-key routing-key
- :mandatory mandatory :immediate immediate))
-
+ (apply #'shared-initialize basic t args)
(let ((channel (object-channel basic)))
(apply #'device-write-content channel body :exchange exchange args)))))
View
461 device-level.lisp
@@ -215,6 +215,7 @@ as well as the discussions of the the alternative fu interface.[5]
(disconnect-channel (channel-connection device) device))))
+#+zero-frame-eoc-marker
(defmethod device-read ((device amqp:channel) buffer-arg start end blocking)
"Channels read a frame at a time through a connection.
the connection manages the actual stream and makes frames available as
@@ -328,8 +329,114 @@ as well as the discussions of the the alternative fu interface.[5]
(return (when blocking (setf buffer-ptr -1))))))))))
-(defmethod device-write ((device amqp:channel) buffer-arg start end (blocking t))
+#-zero-frame-eoc-marker
+(defmethod device-read ((device amqp:channel) buffer-arg start end blocking)
"Channels read a frame at a time through a connection.
+ the connection manages the actual stream and makes frames available as
+ they appear. the specified 'blocking' mode determines whether to
+ wait if there is nothing present."
+ (assert-device-state device use-channel.body.input device-read)
+
+ (with-slots (buffer buffpos buffer-ptr buf-len body-position body-length) device
+ (cond ((< buffer-ptr 0)
+ -1)
+ ((eql start end) ; interpret blocking
+ (if blocking
+ ;; nothing is read anyway
+ 0
+ ;; iff not blocking , see if anything is present or in the read queue
+ (if (or (< buffpos buffer-ptr) (not (collection-empty-p (device-read-frames device))))
+ -3
+ 0)))
+ ((>= body-position body-length)
+ (typecase (device-state device)
+ (amqp.s:use-channel.body.input.chunked
+ ;; chunked => start the next message
+ ;; - if the last was a deliver, wait for the next
+ ;; - if it was a get-ok, ask for the next
+ (command-case (device)
+ ((or amqp:get-ok amqp:deliver) ((basic amqp:basic) &rest args)
+ (amqp:log :debug device "chunk continuation: (~s ~s) . ~s"
+ (type-of amqp:method) (type-of basic) args)
+ t)
+ (t ((class t) &rest args)
+ (amqp:log :error device "Unexpected chunk continuation: (~s ~s) . ~s"
+ (type-of amqp:method) (type-of class) args)
+ t))
+ (let ((basic (device-read-content-header device)))
+ (amqp:log :debug device "read chunk header: ~s" basic)
+ (cond ((string-equal (getf (amqp:basic-headers basic) :transfer-encoding) "chunked")
+ ;; if that succeeded, try again
+ (if (< body-position body-length)
+ (device-read device buffer-arg start end blocking)
+ (setf buffer-ptr -1)))
+ (t
+ ;; no more chunks
+ (setf (device-state device) amqp.s:use-channel.body.input)
+ (setf buffer-ptr -1)))))
+ (t
+ ; not chunked => mark eof
+ (setf buffer-ptr -1))))
+ (buffer-arg
+ ;; if a buffer is provided, use it+bounds together with the devices buffer+bounds
+ ;; to iteratively fill the argument buffer. recurse for more input
+ ;; maintain body-position since the buffer is accepting the content.
+ (let ((total-count 0)
+ (last-device-read 0))
+ (unless end (setf end (length buffer-arg)))
+ (unless start (setf start 0))
+ (loop (unless (> start end) (return))
+ (when (>= buffpos buffer-ptr)
+ (unless (and (< body-position body-length)
+ (plusp (setf last-device-read (device-read device nil 0 nil blocking))))
+ (return)))
+ (let* ((count (min (- end start) (- buffer-ptr buffpos)))
+ (end1 (+ start count))
+ (end2 (+ buffpos count)))
+ (replace buffer-arg buffer :start1 start :end1 end1 :start2 buffpos :end2 end2)
+ (setf start end1
+ buffpos end2)
+ (incf total-count count)
+ (incf body-position count)))
+ (if (zerop total-count)
+ last-device-read
+ total-count)))
+ (t
+ ;; otherwise read a frame buffer
+ (assert (and (zerop start) (or (null end) (= end (length buffer)))) ()
+ "Frame buffer i/o permitted for entire buffers only.")
+ (let* ((frame nil))
+ (loop (setf frame (get-read-frame device :wait blocking))
+ (amqp:log :debug device "device-read: next read frame: state: ~a, size: ~d, body: ~s/~s, buffer: ~s/~s/~s."
+ (type-of (device-state device)) (frame-size frame) body-position body-length buffpos buffer-ptr buf-len)
+ ;; if non-blocking, maybe no frame
+ (if frame
+ (cond ((not (eq (frame-type-class-name frame) 'amqp:body))
+ (unget-read-frame device frame)
+ (return (setf buffer-ptr -1)))
+ ((plusp (frame-size frame))
+ (let* ((data (frame-data frame))
+ (length (frame-size frame)))
+ (rotatef buffer data)
+ (setf-frame-data data frame)
+ (release-frame frame)
+ (incf (device-frame-position device) buffpos)
+ (setf buffpos 0)
+ (setf buffer-ptr length)
+ (setf buf-len (length buffer)) ; could change iff possible to re-tune
+ (assert (<= buffer-ptr buf-len) ()
+ "Invalid buffer sizes: ptr ~d, len ~d." buffer-ptr buf-len)
+ (amqp:log :debug device "device-read: adjusted pointers: state: ~a, body: ~s/~s, buffer: ~s/~s/~s."
+ (type-of (device-state device)) body-position body-length buffpos buffer-ptr buf-len)
+ (return length)))
+ (t
+ ;; if the frame is a zero-length frame, skip it
+ (release-frame frame)))
+ (return (when blocking (setf buffer-ptr -1))))))))))
+
+
+(defmethod device-write ((device amqp:channel) buffer-arg start end (blocking t))
+ "Channels write a frame at a time through a connection.
The connection manages the actual stream and writes frames as required. The
specified 'blocking' mode has no affect on output."
(assert-device-state device use-channel.body device-write)
@@ -365,7 +472,7 @@ as well as the discussions of the the alternative fu interface.[5]
(device-flush device)))))
-
+#+zero-frame-eoc-marker
(defmethod device-flush ((device amqp:channel) &optional (complete nil))
"Push data to the channel's connection.
DEVICE : amqp:channel : an open channel
@@ -413,11 +520,12 @@ as well as the discussions of the the alternative fu interface.[5]
(setf max-out-pos (length out-buffer))
length)))
;; check whether there is anything to flush
- (when (or (plusp outpos) (zerop body-length)) ; always at least one frame
+ (when (or (plusp outpos) ) ; (zerop body-length)) ; always at least one frame
;; if there is content, send the frame out.
(setf result-length (flush-frame)))
;; check completion
(cond (complete
+ (amqp:log :debug device "Complete flush in state ~s." (device-state device))
(typecase (device-state device)
(amqp.s:use-channel.body.output.chunked
;; if the content was chunked, send a zero-length frame and revert to .output
@@ -453,6 +561,275 @@ as well as the discussions of the the alternative fu interface.[5]
(amqp:log :debug basic "Starting next chunk: done."))))))
result-length))))))
+#-zero-frame-eoc-marker
+(defmethod device-flush ((device amqp:channel) &optional (complete nil))
+ "Push data to the channel's connection.
+ DEVICE : amqp:channel : an open channel
+ COMPLETE : boolean : iff true, this is the final flush on a stream of messsages. if the channel was
+ sending chunks, send an additional final marker message.
+
+ Given a buffer, its content is passed through the channel's
+ frame buffers. Lacking a buffer, the exisitng frame buffer is sent. The effect is to support
+ single-frame commands, sequence-based stream io, and buffer based stream io. The device state is checked to
+ confirm an operation makes sense.
+
+ On framed content, streams, and chunking:
+ there are three aspects:
+ * is there buffered content: (zerop outpos) ?
+ * is the content complete?
+ * was the content length known ahead of time: .output or .output.chunked ?
+
+ if there is content, wrap up the body frame and send it.
+ given the now empty buffer, if the content body is now complete, then it matters whether the length was
+ predetermined. if it was, that is, in state .output (which includes .output.chunked), since this is not an
+ abort, sufficient frames must be sent to achieve the content length.if the state is .output.chunked, the end
+ of the sequence of commands is indicated by sending an additional zero-length message."
+
+ (let ((result-length 0)
+ (basic (channel.basic device)))
+ (typecase (device-state device)
+ (amqp.s:use-channel.body.output
+ (with-slots (out-buffer outpos max-out-pos body-position body-length) device
+ (amqp:log :debug device "device-flush (~a/~d) ~d/~d, ~d/~d"
+ (device-state device) (class-body-size basic)
+ body-position body-length outpos max-out-pos
+ )
+ (flet ((flush-frame ()
+ (let* ((frame (claim-output-frame device))
+ (length outpos))
+ ;; update start of the next frame -
+ ;; nb. here v/s when frames are written as only body/content frames contribute
+ (incf (device-frame-position device) outpos)
+ (rotatef out-buffer (frame-data frame))
+ (setf-frame-type-class-name 'amqp:body frame)
+ (setf-frame-cycle 0 frame)
+ (setf-frame-channel-number (channel-number device) frame)
+ (setf-frame-track-number (channel-track device) frame)
+ (setf-frame-size outpos frame)
+ (put-encoded-frame device frame)
+ (amqp:log :debug device "flushed frame: ~a" frame)
+ (setf outpos 0)
+ (setf max-out-pos (length out-buffer))
+ length)))
+ (cond (complete
+ (amqp:log :debug device "device-flush, complete (~a/~d) ~d/~d, ~d/~d"
+ (device-state device) (class-body-size basic)
+ body-position body-length outpos max-out-pos)
+ (setf result-length outpos)
+ (let ((pad-length (max 0 (min (- body-length body-position) (- max-out-pos outpos)))))
+ (when (> pad-length 0)
+ ;; pad the current frame
+ (fill out-buffer 0 :start outpos :end (+ outpos pad-length))
+ (incf body-position pad-length)
+ (incf outpos pad-length)
+ (amqp:log :debug device "device-flush, complete, padded frame to ~d, ~d" body-position outpos)))
+ (when (or (plusp outpos) ) ;; (zerop body-length))
+ ;; if there is content, send the frame out.
+ ;; if there is none, always send at least one frame
+ (flush-frame))
+ ;; send any additional frames to fill the difference between body-position and body-length
+ (do ((count (min (- body-length body-position) max-out-pos)
+ (min (- body-length body-position) max-out-pos))
+ (first t nil))
+ ((<= count 0))
+ (amqp:log :debug device "device-flush, additional pad frame (~a/~d) ~d/~d, ~d/~d"
+ (device-state device) (class-body-size basic)
+ body-position body-length outpos max-out-pos)
+ (when first (fill out-buffer 0 :start 0 :end count))
+ (setf outpos count)
+ (flush-frame)
+ (incf body-position count))
+ (amqp:log :debug device "device-flush, complete after any padding (~a/~d) ~d/~d, ~d/~d"
+ (device-state device) (class-body-size basic)
+ body-position body-length outpos max-out-pos)
+ (typecase (device-state device)
+ (amqp.s:use-channel.body.output.chunked
+ ;; if the content was chunked, send non-chunked termination message
+ (setf (device-state device) amqp.s:use-channel.body.output)
+ (let* ((headers (copy-list (amqp:basic-headers basic))))
+ (remf headers :transfer-encoding)
+ (amqp:log :debug basic "Starting end non-chunk: publish...")
+ (amqp:publish device :body ""
+ :exchange (amqp:basic-exchange basic)
+ :routing-key (amqp:basic-routing-key basic)
+ :headers headers))
+ #+(or)
+ (let* ((headers (amqp:basic-headers basic)))
+ (remf headers :transfer-encoding)
+ (setf (amqp:basic-headers basic) headers)
+ ;; set up for a zero-length message
+ (setf outpos 0
+ max-out-pos 0
+ body-length 0
+ body-position 0)
+ (setf (class-body-size basic) 0)
+ ;; (setf (aref out-buffer 0) 0)
+ (amqp:log :debug basic "Starting end non-chunk: publish...")
+ (amqp:send-publish basic :exchange (amqp:basic-exchange basic))
+ (amqp:log :debug basic "Sending EOC message: header...")
+ (send-header basic)
+ ;; rabbitmq balks on the zero-length body frame
+ (amqp:log :debug basic "Sending EOC message: zero frame...")
+ ;;(flush-frame)
+ ))))
+ (t
+ ;; if the output is to continue,
+ ;; - flush the frame
+ ;; - interpose a new message is chunking
+ ;; check whether there is anything to flush
+ (setf result-length outpos)
+ (when (or (plusp outpos) ) ;; (zerop body-length))
+ ;; if there is content, send the frame out.
+ ;; if there is none, always send at least one frame
+ (flush-frame))
+ (typecase (device-state device)
+ (amqp.s:use-channel.body.output.chunked
+ ;; if so, need to send a new command:
+ ;; send a new publish, reset the buffer positions, and continue streaming
+ (let ()
+ (amqp:log :debug basic "Starting next chunk: publish...")
+ (amqp:send-publish basic :exchange (amqp:basic-exchange basic))
+ (setf outpos 0
+ max-out-pos (length out-buffer)
+ ;; start a new body
+ body-length (class-body-size basic))
+ (setf body-position 0)
+ (amqp:log :debug basic "Starting next chunk: header...")
+ (send-header basic)
+ (amqp:log :debug basic "Starting next chunk: done."))))))
+ result-length))))))
+
+#-zero-frame-eoc-marker ;; with lazy method/header writes
+(defmethod device-flush ((device amqp:channel) &optional (complete nil))
+ "Push data to the channel's connection.
+ DEVICE : amqp:channel : an open channel
+ COMPLETE : boolean : iff true, this is the final flush on a stream of messsages. if the channel was
+ sending chunks, send an additional final marker message.
+
+ Given a buffer, its content is passed through the channel's
+ frame buffers. Lacking a buffer, the exisitng frame buffer is sent. The effect is to support
+ single-frame commands, sequence-based stream io, and buffer based stream io. The device state is checked to
+ confirm an operation makes sense.
+
+ On framed content, streams, and chunking:
+ there are three aspects:
+ * is there buffered content: (zerop outpos) ?
+ * is the content complete?
+ * was the content length known ahead of time: .output or .output.chunked ?
+
+ if there is content, wrap up the body frame and send it.
+ given the now empty buffer, if the content body is now complete, then it matters whether the length was
+ predetermined. if it was, that is, in state .output (which includes .output.chunked), since this is not an
+ abort, sufficient frames must be sent to achieve the content length.if the state is .output.chunked, the end
+ of the sequence of commands is indicated by sending an additional zero-length message."
+
+ (let ((result-length 0)
+ (basic (channel.basic device)))
+ (typecase (device-state device)
+ (amqp.s:use-channel.body.output
+ (with-slots (out-buffer outpos max-out-pos body-position body-length) device
+ (amqp:log :debug device "device-flush (~a/~d) ~d/~d, ~d/~d"
+ (device-state device) (class-body-size basic)
+ body-position body-length outpos max-out-pos
+ )
+ (flet ((flush-frame ()
+ (let* ((frame (claim-output-frame device))
+ (length outpos))
+ ;; update start of the next frame -
+ ;; nb. here v/s when frames are written as only body/content frames contribute
+ (incf (device-frame-position device) outpos)
+ (rotatef out-buffer (frame-data frame))
+ (setf-frame-type-class-name 'amqp:body frame)
+ (setf-frame-cycle 0 frame)
+ (setf-frame-channel-number (channel-number device) frame)
+ (setf-frame-track-number (channel-track device) frame)
+ (setf-frame-size outpos frame)
+ (put-encoded-frame device frame)
+ (amqp:log :debug device "flushed frame: ~a" frame)
+ (setf outpos 0)
+ (setf max-out-pos (length out-buffer))
+ length))
+ (write-method-and-header ()
+ ;; write the initial header - also the final one if complete
+ (amqp:log :debug basic "Sending method/header on demand...")
+ (amqp::send-publish basic)
+ (send-header basic)))
+
+ (when (<= body-position max-out-pos)
+ ;; if this is the first frame
+ (when complete
+ (setf body-length body-position
+ (class-body-size basic) body-position)
+ (setf (device-state device) amqp.s:use-channel.body.output)
+ (remf (amqp:basic-headers basic) :transfer-encoding))
+ (write-method-and-header))
+
+ (cond (complete
+ (amqp:log :debug device "device-flush, complete (~a/~d) ~d/~d, ~d/~d"
+ (device-state device) (class-body-size basic)
+ body-position body-length outpos max-out-pos)
+ (setf result-length outpos)
+ (let ((pad-length (max 0 (min (- body-length body-position) (- max-out-pos outpos)))))
+ (when (> pad-length 0)
+ ;; pad the current frame
+ (fill out-buffer 0 :start outpos :end (+ outpos pad-length))
+ (incf body-position pad-length)
+ (incf outpos pad-length)
+ (amqp:log :debug device "device-flush, complete, padded frame to ~d, ~d" body-position outpos)))
+ (when (or (plusp outpos) ) ;; (zerop body-length))
+ ;; if there is content, send the frame out.
+ ;; if there is none, (no !!!always send at least one frame)
+ (flush-frame))
+ ;; send any additional frames to fill the difference between body-position and body-length
+ (do ((count (min (- body-length body-position) max-out-pos)
+ (min (- body-length body-position) max-out-pos))
+ (first t nil))
+ ((<= count 0))
+ (amqp:log :debug device "device-flush, additional pad frame (~a/~d) ~d/~d, ~d/~d"
+ (device-state device) (class-body-size basic)
+ body-position body-length outpos max-out-pos)
+ (when first (fill out-buffer 0 :start 0 :end count))
+ (setf outpos count)
+ (flush-frame)
+ (incf body-position count))
+ (amqp:log :debug device "device-flush, complete after any padding (~a/~d) ~d/~d, ~d/~d"
+ (device-state device) (class-body-size basic)
+ body-position body-length outpos max-out-pos)
+ (typecase (device-state device)
+ (amqp.s:use-channel.body.output.chunked
+ ;; if the content was chunked, and the header has already been sent,
+ ;; send a non-chunked termination message
+ (setf (device-state device) amqp.s:use-channel.body.output)
+ (let* ((headers (amqp:basic-headers basic)))
+ (remf headers :transfer-encoding)
+ (setf (amqp:basic-headers basic) headers)
+ ;; set up for a zero-length message
+ (setf outpos 0
+ max-out-pos 0
+ body-length 0
+ body-position 0)
+ (setf (class-body-size basic) 0)
+ ;; (setf (aref out-buffer 0) 0)
+ (amqp:log :debug basic "Starting end non-chunk: publish...")
+ (amqp:send-publish basic :exchange (amqp:basic-exchange basic))
+ (amqp:log :debug basic "Sending EOC message: header...")
+ (send-header basic)
+ ;; rabbitmq balks on the zero-length body frame
+ (amqp:log :debug basic "Sending EOC message: zero frame...")
+ (flush-frame)))))
+ (t
+ ;; if the output is to continue,
+ ;; - flush the frame
+ ;; - interpose a new message is chunking
+ ;; check whether there is anything to flush
+ (setf result-length outpos)
+ (when (or (plusp outpos) (zerop body-length))
+ ;; if there is content, send the frame out.
+ ;; if there is none, always send at least one frame
+ (flush-frame))
+ (setf body-position 0)))
+ result-length))))))
+
(defmethod device-clear-input ((device amqp:channel) buffer-only)
;;; call the decoder to clear a possible pushed character and correct position
@@ -848,7 +1225,7 @@ returned.")
* device-read-message (channel &rest)
* device-write-message (channel body &rest)
- Each accepts the keywords which apply to the respctive protocol operation, that is, any method arguments and the class'
+ Each accepts the keywords which apply to the respective protocol operation, that is, any method arguments and the class'
header properties. for reading this means the arguments for get and deliver, while for writing those for publish.
The interface supports two use patterns : body instances and continuation based. The decision is made by the writer
according to whether the body size is known at the outset. For fixed length vectors this is true. For aeverything else, it
@@ -880,7 +1257,7 @@ returned.")
first, prepare the channel based on the content header properties, and read the content
according to the combined channel data type and content type. Combine the header's possibly
incomplete content type with the channel's to specify the effective decoding.")
-
+
(:method ((channel amqp:channel) &key body delivery-tag redelivered exchange routing-key message-count consumer-tag)
(declare (ignore delivery-tag redelivered exchange routing-key message-count consumer-tag))
(setf (channel-state channel) amqp.s:use-channel.body.input)
@@ -892,7 +1269,8 @@ returned.")
(content-type (mime:mime-type (amqp:basic-content-type basic))))
;; element-type in the basic header combines the read values with the channel's content-type
(when element-type
- (let ((found-element-type (find-symbol element-type package)))
+ (let* ((found-package (find-package package))
+ (found-element-type (when found-package (find-symbol element-type found-package))))
(unless found-element-type
(warn "Invalid type x package combination: ~s, ~s." element-type package))
(setf element-type found-element-type)))
@@ -933,7 +1311,6 @@ returned.")
amqp.s:use-channel.body.input))
(return-from command-loop basic))))))
-
(defgeneric device-read-content-body (device type content-type)
(:method ((channel amqp:channel) (type (eql 'string)) (content-type mime:text/plain))
(let* ((body-length (device-body-length channel))
@@ -945,6 +1322,9 @@ returned.")
(:method ((channel amqp:channel) (type null) (content-type mime:text/plain))
(device-read-content-body channel 'string content-type))
+ (:method ((channel amqp:channel) (type (eql 'character)) (content-type mime:text/plain))
+ (device-read-content-body channel 'string content-type))
+
(:method ((channel amqp:channel) (body-op function) (content-type mime:*/*))
"Given a the null type, just return the channel as a stream to be read."
(prog1 (funcall body-op channel content-type)
@@ -1021,29 +1401,7 @@ returned.")
-(defgeneric device-write-content (channel body &rest args
- &key class-id weight body-size
- exchange routing-key mandatory immediate
- content-type content-encoding headers delivery-mode
- priority correlation-id reply-to expiration message-id timestamp
- type user-id)
- (:documentation "Given a channel which has sent a Basic.Publish,
- firat, write a content header based on the properties, then write the content
- according to the combined channel data type and content type. Combine the header's possibly
- incomplete content type with the channel's to specify the effective encoding.")
- (declare (dynamic-extent args))
-
- (:method ((channel amqp:channel) body &rest args)
- ;; configure the respective basic for the (content x element-type x content-type)
- ;; combination. this resolve the body size, the transfer encoding, and the
- ;; transfer element type
- (let* ((basic (apply #'device-write-content-header channel body args)))
- (setf (device-frame-position channel) 0)
- (prog1 (apply #'device-write-content-body channel body (mime:mime-type (amqp:basic-content-type basic)) args)
- (device-flush channel t)))))
-
-
-(defgeneric device-write-content-header (channel body
+(defgeneric device-initialize-content-header (channel body
&key class-id weight body-size
exchange routing-key mandatory immediate
content-type content-encoding headers delivery-mode
@@ -1051,6 +1409,7 @@ returned.")
type user-id)
(:method ((channel amqp:channel) (body t) &rest args)
+ "Send the header frame and set up the channel to continu with output as a single message or as a sequence of chunks."
(let* ((basic (apply #'amqp:channel.basic channel :body body args))
(body-size (class-body-size basic))
(headers (amqp:basic-headers basic))
@@ -1065,10 +1424,22 @@ returned.")
(if (string-equal (getf headers :transfer-encoding) "chunked")
amqp.s:use-channel.body.output.chunked
amqp.s:use-channel.body.output))
- (send-header basic)
basic)))
+(defgeneric device-write-content-header (channel body
+ &key class-id weight body-size
+ exchange routing-key mandatory immediate
+ content-type content-encoding headers delivery-mode
+ priority correlation-id reply-to expiration message-id timestamp
+ type user-id)
+
+ (:method ((channel amqp:channel) (body t) &rest args)
+ "Send the header frame and set up the channel to continu with output as a single message or as a sequence of chunks."
+ (let ((basic (apply #'device-initialize-content-header channel body args)))
+ (send-header basic)
+ basic)))
+
(defgeneric device-write-content-body (device body content-type
&key
body-size class-id consumer-tag content-type content-encoding correlation-id
@@ -1079,19 +1450,17 @@ returned.")
(:method ((channel amqp:channel) (body null) (content-type mime:*/*) &rest args)
"Given a null body , configure the channel to write the message body
- and return the stream."
+ and return the stream."
(declare (dynamic-extent args) (ignore args))
nil)
(:method ((channel amqp:channel) (body string) (content-type mime:text/plain) &rest args)
(declare (dynamic-extent args) (ignore args))
-
;; write the content,
(stream-write-string channel body 0 nil))
(:method ((channel amqp:channel) (body-op function) (content-type mime:*/*) &rest args)
(declare (dynamic-extent args) (ignore args))
-
;; call the function
(funcall body-op channel content-type))
@@ -1110,7 +1479,6 @@ returned.")
(:method ((channel amqp:channel) (body standard-object) (content-type mime::application/sexp) &rest args)
"Given an sexp mime type, set up the stream, then read the form."
(declare (dynamic-extent args) (ignore args))
-
(multiple-value-bind (creation initialization)
(make-load-form body)
(with-standard-io-syntax
@@ -1118,3 +1486,26 @@ returned.")
(let ((form `(let ((,marker ,creation))
,(subst marker body initialization))))
(write form :stream channel :circle t)))))))
+
+
+(defgeneric device-write-content (channel body &rest args
+ &key class-id weight body-size
+ exchange routing-key mandatory immediate
+ content-type content-encoding headers delivery-mode
+ priority correlation-id reply-to expiration message-id timestamp
+ type user-id)
+ (:documentation "Given a channel which has sent a Basic.Publish,
+ first, write a content header based on the properties, then write the content
+ according to the combined channel data type and content type. Combine the header's possibly
+ incomplete content type with the channel's to specify the effective encoding.")
+ (declare (dynamic-extent args))
+
+ (:method ((channel amqp:channel) body &rest args)
+ ;; configure the respective basic for the (content x element-type x content-type)
+ ;; combination. this resolve the body size, the transfer encoding, and the
+ ;; transfer element type
+ (let* ((basic ;; (apply #'device-write-content-header channel body args)))
+ (apply #'device-initialize-content-header channel body args)))
+ (setf (device-frame-position channel) 0)
+ (prog1 (apply #'device-write-content-body channel body (mime:mime-type (amqp:basic-content-type basic)) args)
+ (device-flush channel t)))))
View
9 frames.lisp
@@ -272,6 +272,11 @@
:test #'frame-matches-connection-p
:if-empty (when (or wait (stream-listen connection))
#'read-connection-frame)))))
+
+(defgeneric unget-read-frame (channel frame)
+ (:method ((channel amqp:channel) frame)
+ (undequeue (device-read-frames channel) frame)))
+
;;;
;;; frame accessors
@@ -588,7 +593,7 @@
(:documentation "Read from the connection socket into the given frame.
This varies per protocol as the header layout varies.")
-
+ ; #+amqp.log-frames
(:method :around ((connection amqp:connection) (frame t) &key start end)
(unless (open-stream-p connection)
(error 'end-of-file :stream connection))
@@ -603,7 +608,7 @@
(:documentation "Write from the given frame to the connection socket.
This varies per protocol as the header layout varies.")
- #+amqp.log-frames
+ ; #+amqp.log-frames
(:method :around ((connection amqp:connection) (frame t) &key start end)
(amqp:log :debug connection "write-frame: (~a,~a) ~s" start end frame)
(call-next-method))
View
21 utilities.lisp
@@ -45,7 +45,7 @@
(defmacro amqp:log (criteria class &rest args)
(let ((log-op (gensym)))
`(flet ((,log-op (stream)
- (format stream "~&[~/date::format-iso-time/] ~a ~a: ~@?"
+ (format stream "~&[~/date::format-iso-time/] ~a ~a: ~@?~%"
(get-universal-time) ',criteria ,class ,@args)))
(declare (dynamic-extent (function ,log-op)))
(log-when ',criteria (function ,log-op)))))
@@ -53,7 +53,7 @@
(defmacro amqp:log* (criteria class &rest args)
(let ((log-op (gensym)))
`(flet ((,log-op (stream)
- (apply #'format stream "~&[~/date::format-iso-time/] ~a ~a: ~@?"
+ (apply #'format stream "~&[~/date::format-iso-time/] ~a ~a: ~@?~%"
(get-universal-time) ',criteria ,class ,@args)))
(declare (dynamic-extent (function ,log-op)))
(log-when ',criteria (function ,log-op)))))
@@ -438,6 +438,23 @@
(call-next-method))))
+(defgeneric undequeue (queue entry)
+ (:documentation "put an entry back in the queue at the head.")
+
+ (:method ((queue queue) entry)
+ (push entry (cdr (queue-header queue)))))
+
+#+(or)
+(let ((q (make-instance 'queue)))
+ (list (enqueue 1 q)
+ (list :header (copy-list (queue-header q)) :pointer (copy-list (queue-pointer q)))
+ (enqueue 2 q)
+ (list :header (copy-list (queue-header q)) :pointer (copy-list (queue-pointer q)))
+ (enqueue 3 q)
+ (list :header (copy-list (queue-header q)) :pointer (copy-list (queue-pointer q)))
+ (undequeue q (dequeue q))
+ (list :header (copy-list (queue-header q)) :pointer (copy-list (queue-pointer q)))))
+
#+:de.setf.utility.test
(with-test-situation (:define)
Please sign in to comment.
Something went wrong with that request. Please try again.