Skip to content

Commit

Permalink
correct get operator to handle function valued body; correct the erro…
Browse files Browse the repository at this point in the history
…r message in the defualt response operator; correct state constraints in channel request-close;
  • Loading branch information
lisp committed Feb 19, 2010
1 parent dca8f18 commit 91ee393
Showing 1 changed file with 42 additions and 18 deletions.
60 changes: 42 additions & 18 deletions commands.lisp
Expand Up @@ -61,13 +61,14 @@


(defgeneric default-channel-respond-to
(channel class &key args)
(channel class &rest args)
(:documentation "the base protocol response operator for alert.")
(:method :before ((channel t) (class t) &rest args) "a before method logs the response-to-be and updates the class instance."
(declare (dynamic-extent args))
(amqp:log* default-channel-respond-to class args))
(:method ((channel amqp:channel) (class t) &rest args)
(amqp:not-implemented-error :format-string "Unimplemented method: ~s . ~s" class args)))
(amqp:not-implemented-error :message-string "Unimplemented method: ~s . ~s"
:message-arguments (list class args))))


(def-amqp-command amqp:ack (class &key delivery-tag multiple)
Expand Down Expand Up @@ -187,22 +188,27 @@ messages in between sending the cancel method and receiving the cancel-ok reply.

(let ((initial-state (shiftf (channel-state channel) amqp.s:close-channel)))
(etypecase initial-state
((or amqp.s:use-connection amqp.s:close-connection)
((or amqp.s:use-channel amqp.s:close-channel)
(when (connected-channel-p channel)
(amqp::send-close channel
:reply-code reply-code
:reply-text reply-text
:class-id class-id
:method-id method-id)
(command-loop (channel)
(amqp:header (basic) nil)
(amqp:body (basic) nil)
(amqp:close-ok (channel) (return-from command-loop)))

;; once the channel is flushed, disconnect it and optionally, close the stream
(setf (connection.channel (channel-connection channel) :number (channel-number channel)) nil)
(amqp:header (basic &rest args)
(declare (dynamic-extent args))
(amqp:log :debug basic "Draining closed channel: ~s . ~s" basic args)
nil)
(amqp:body (basic &rest args)
(declare (dynamic-extent args))
(amqp:log :debug basic "Draining closed channel: ~s . ~s" basic args)
nil)
(amqp:close-ok (channel &key &allow-other-keys) (return-from command-loop)))

;; once the channel is flushed, close the stream if that's not already in progress
(unless (typep initial-state 'amqp.s:close-channel)
(close channel)))))
(device-close channel nil)))))
channel))

(:method ((connection amqp:connection) &key (reply-code 0) (reply-text "")
Expand Down Expand Up @@ -362,13 +368,13 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
(declare (dynamic-extent args))
(apply #'amqp::send-declare exchange args)
(command-loop (exchange)
(amqp:declare-ok (class &key ) (return-from command-loop)))
(amqp:declare-ok ((class amqp:exchange) &key ) (return-from command-loop)))
exchange)

(:method ((queue amqp:queue) &rest args)
(apply #'amqp::send-declare queue args)
(command-loop (queue)
(amqp:declare-ok (class &key queue message-count consumer-count)
(amqp:declare-ok ((class amqp:queue) &key queue message-count consumer-count)
(amqp:log :debug queue "queue declared: ~a ~a ~a" queue message-count consumer-count)
(return-from command-loop)))
queue)))
Expand Down Expand Up @@ -462,16 +468,34 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
(:send )) ; needed for the send rsponse


(def-amqp-command amqp:get (class &key queue no-ack)
(:documentation "C-->S :")
(def-amqp-command amqp:get (object &key queue no-ack body)
(:documentation "C-->S : C:GET ( S:GET-OK content / S:GET-EMPTY )
Request the 'next' message for the given queue.
OBJECT : (or amqp:channel amqp:basic amqp:queue) : designates the queue
Resolves the given object to the queue and encodes a Basic.Get with the appropriate arguments.
Processes the responses get-ok and get-empty. If the reply is -ok invoke `device-read-content`
and return the result. If -empty, return nil.")

(:request
(:method ((basic amqp:basic) &rest args &key queue no-ack)
(:method ((channel amqp:channel) &rest args)
(declare (dynamic-extent args))
(apply #'channel-request-get channel (amqp:channel.basic channel) args))

(:method ((channel amqp:queue) &rest args &key queue no-ack body)
(declare (dynamic-extent args) (ignore no-ack body))
(apply #'channel-request-get amqp:channel (amqp:channel.basic amqp:channel)
:queue queue
args))

(:method ((basic amqp:basic) &rest args &key queue no-ack (body nil body-s))
(declare (dynamic-extent args))
(assert-argument-type amqp:get queue (or string amqp:queue))
(setf queue (amqp:queue-queue queue))
(setf (channel-acknowledge-messages (object-channel basic)) (not no-ack))

(when body-s
(setf args (copy-list args))
(remf args :body))
(apply #'amqp::send-get basic :queue queue args)

(command-case (basic)
Expand All @@ -486,7 +510,7 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
(amqp:log :debug basic "respond-to-get, get-ok: ~s" get-ok-args)
(let ((channel (object-channel basic)))
(return-from command-case
(prog1 (apply #'device-read-content channel get-ok-args)
(prog1 (apply #'device-read-content channel :body body get-ok-args)
(unless (amqp:basic-no-ack basic)
(amqp::send-ack basic :delivery-tag delivery-tag))))))))))

Expand Down Expand Up @@ -525,7 +549,7 @@ messages in between sending the cancel method and receiving the cancel-ok reply.
(command-loop (class)
(amqp:open-ok (class &rest args)
(declare (dynamic-extent args))
(apply #'amqp:respond-to-open-ok class args)
(apply #'amqp::respond-to-open-ok class args)
(return-from command-loop)))
class)

Expand Down

0 comments on commit 91ee393

Please sign in to comment.