|
@@ -64,7 +64,11 @@ |
|
|
|
|
|
|
|
|
|
|
|
-(defun amqp-version-p (x) (assoc x amqp.u:*version-headers*))
|
|
|
+(defun amqp-version-p (x)
|
|
|
+ "Return true iff the argument is present in a known version. This is determined by its
|
|
|
+ presence as a key in *version-headers*. That a-list is augmented by each loaded
|
|
|
+ protocol version to bind a version key to a concrete protocol header."
|
|
|
+ (assoc x amqp.u:*version-headers*))
|
|
|
|
|
|
(deftype amqp:version () '(satisfies amqp-version-p))
|
|
|
|
|
@@ -134,11 +138,6 @@ |
|
|
(protocol-version
|
|
|
:initform *default-version* :allocation :class
|
|
|
:reader class-protocol-version)
|
|
|
- #+(or ) ; supplanted by specific slots
|
|
|
- (protocol-instances
|
|
|
- :initform (make-hash-table :test 'equal)
|
|
|
- :reader class-protocol-instances
|
|
|
- :documentation "Caches methods for uses as protocol elements.")
|
|
|
(state
|
|
|
:initform nil
|
|
|
:accessor class-state)
|
|
@@ -162,7 +161,7 @@ |
|
|
:initform nil :initarg :context
|
|
|
:accessor object-context
|
|
|
:type (or null amqp:object)))
|
|
|
- (:documentation "The abstract root class for all protocol classes."))
|
|
|
+ (:documentation "The abstract root class for all AMQP protocol classes."))
|
|
|
|
|
|
|
|
|
(defclass amqp-connected-object (amqp:object)
|
|
@@ -226,10 +225,11 @@ |
|
|
|
|
|
|
|
|
(defgeneric object-connection (class)
|
|
|
- )
|
|
|
+ (:documentation "Returns the protocol object's connection."))
|
|
|
|
|
|
(defgeneric object-channel (class)
|
|
|
- )
|
|
|
+ (:documentation "Returns the protocol object's channel.
|
|
|
+ For a channel, this is the channel itself. For a connection, this is the control channel."))
|
|
|
|
|
|
|
|
|
|
|
@@ -238,7 +238,8 @@ |
|
|
|
|
|
|
|
|
(def-amqp-abstract-class amqp:access (amqp-channeled-object)
|
|
|
- ())
|
|
|
+ ()
|
|
|
+ (:documentation "The abstract access class is specialized for each protocol version."))
|
|
|
|
|
|
|
|
|
(def-amqp-abstract-class amqp:basic (amqp-channeled-object amqp-content-object)
|
|
@@ -251,7 +252,8 @@ |
|
|
:documentation "Caches the exchange from the most recent publish for re-use in chunked content.")
|
|
|
(mime-type
|
|
|
:initform nil :initarg :mime-type
|
|
|
- :accessor class-mime-type)))
|
|
|
+ :accessor class-mime-type))
|
|
|
+ (:documentation "The abstract basic class is specialized for each protocol version."))
|
|
|
|
|
|
|
|
|
(def-amqp-abstract-class amqp:channel (amqp-connected-object amqp-connection-device)
|
|
@@ -285,10 +287,6 @@ |
|
|
:initform nil
|
|
|
:documentation "Binds a handler function, with the signature (class method &rest arguments),
|
|
|
which are applied when handle-channel-methods.")
|
|
|
- #+(or) ;; needs to be specific to basic instance
|
|
|
- (exchange-instance
|
|
|
- :initform nil
|
|
|
- :reader channel-exchange :writer setf-channel-exchange)
|
|
|
(realm
|
|
|
:initform nil :initarg :realm
|
|
|
:reader amqp.u:channel-realm
|
|
@@ -351,7 +349,10 @@ |
|
|
:initform nil
|
|
|
:reader get-channel-tx :writer setf-channel-tx
|
|
|
:type (or null amqp:tx)
|
|
|
- :documentation "Caches the channel tx instance.")))
|
|
|
+ :documentation "Caches the channel tx instance."))
|
|
|
+ (:documentation "The abstract channel class is specialized for each protocol version.
|
|
|
+ Each channel is associated with a connection and identified by channel-number.
|
|
|
+ Once a channel is opened, it serves as the context for message- and stream-based operations."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:cluster (amqp-channeled-object)
|
|
|
())
|
|
@@ -439,11 +440,16 @@ |
|
|
:initform nil
|
|
|
:reader get-connection-heartbeat :writer setf-connection-heartbeat
|
|
|
:type (or null amqp:heartbeat)
|
|
|
- :documentation "Caches the connection heartbeat instance.")))
|
|
|
+ :documentation "Caches the connection heartbeat instance."))
|
|
|
+ (:documentation "The abstract connection class is specialized for each protocol version.
|
|
|
+ Each connection binds the properties negotiated with the peer broker, and a sequence of
|
|
|
+ open channels, each identified by number. Of these channel-zero is used for control operations."))
|
|
|
+
|
|
|
|
|
|
(def-amqp-abstract-class amqp:dtx (amqp-channeled-object)
|
|
|
((context
|
|
|
- :reader dtx-channel)))
|
|
|
+ :reader dtx-channel))
|
|
|
+ (:documentation "The abstract connection class is specialized for each protocol version."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:exchange (amqp-channeled-object)
|
|
|
((context
|
|
@@ -452,63 +458,82 @@ |
|
|
:initform "direct"
|
|
|
:initarg :type
|
|
|
:type string
|
|
|
- :reader amqp::exchange-type)))
|
|
|
+ :reader amqp::exchange-type))
|
|
|
+ (:documentation "The abstract exchange class is specialized for each protocol version."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:file (amqp-channeled-object)
|
|
|
((context
|
|
|
- :reader file-channel)))
|
|
|
+ :reader file-channel))
|
|
|
+ (:documentation "The abstract connection file is specialized for each protocol version."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:link (amqp-channeled-object)
|
|
|
((context
|
|
|
- :reader link-channel)))
|
|
|
+ :reader link-channel))
|
|
|
+ (:documentation "The abstract link class is specialized for each protocol version."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:queue (amqp-channeled-object)
|
|
|
((context
|
|
|
- :reader queue-channel)))
|
|
|
+ :reader queue-channel))
|
|
|
+ (:documentation "The abstract queue class is specialized for each protocol version."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:stream (amqp-channeled-object)
|
|
|
((context
|
|
|
- :reader stream-channel)))
|
|
|
+ :reader stream-channel))
|
|
|
+ (:documentation "The abstract stream class is specialized for each protocol version."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:session (amqp-channeled-object)
|
|
|
((context
|
|
|
- :reader session-channel)))
|
|
|
+ :reader session-channel))
|
|
|
+ (:documentation "The abstract session class is specialized for each protocol version."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:tx (amqp-channeled-object)
|
|
|
((context
|
|
|
- :reader tx-channel)))
|
|
|
+ :reader tx-channel))
|
|
|
+ (:documentation "The abstract tx class is specialized for each protocol version."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:test (amqp-channeled-object)
|
|
|
((context
|
|
|
- :reader test-connection))) ; ??
|
|
|
+ :reader test-connection))
|
|
|
+ (:documentation "The abstract test class is specialized for each protocol version."))
|
|
|
|
|
|
(def-amqp-abstract-class amqp:tunnel (amqp-channeled-object)
|
|
|
((context
|
|
|
- :reader tunnel-connection))) ; ??
|
|
|
+ :reader tunnel-connection))
|
|
|
+ (:documentation "The abstract tunnel class is specialized for each protocol version."))
|
|
|
|
|
|
|
|
|
-;;;
|
|
|
-;;; constructors - class and connection relative, to map abstract types
|
|
|
-;;; to version-specific classes. the primary operators (ensure-method ensure-object)
|
|
|
-;;; combine a context and a designator - either a code when parsing, or an abstract
|
|
|
-;;; name in processing functions, and produce an instance of the concrete
|
|
|
-;;; versioned class. in the case of methods, the instance never changes state, while
|
|
|
-;;; in the case of classes, each is reinitialized if supplied initargs.
|
|
|
-;;; the primary operators rely on versiour resolution operators which map between
|
|
|
-;;; class/method names and codes for the given version.
|
|
|
+(document "class and connection relative id-to-abstract-type maps"
|
|
|
+ "to version-specific classes. the primary operators (ensure-method ensure-object)
|
|
|
+ combine a context and a designator - either a code when parsing, or an abstract
|
|
|
+ name in processing functions, and produce an instance of the concrete
|
|
|
+ versioned class. in the case of methods, the instance never changes state, while
|
|
|
+ in the case of classes, each is reinitialized if supplied initargs.
|
|
|
+ the primary operators rely on versiour resolution operators which map between
|
|
|
+ class/method names and codes for the given version.")
|
|
|
+
|
|
|
|
|
|
(defgeneric connection-class-code-class-name (connection class-code)
|
|
|
+ (:documentation "Map a version-specific class id code to the abstract class name.
|
|
|
+ This is specialized for each concrete connection class.")
|
|
|
+
|
|
|
(:method ((connection amqp:connection) (code (eql 0)))
|
|
|
nil))
|
|
|
|
|
|
+
|
|
|
(defgeneric connection-class-name-class-code (connection class-name)
|
|
|
- )
|
|
|
+ (:documentation "Map an abstract class name to connection-specific class id code.
|
|
|
+ This is specialized for each concrete connection class."))
|
|
|
+
|
|
|
|
|
|
(defgeneric class-method-code-method-name (class method-code)
|
|
|
- )
|
|
|
+ (:documentation "Map a class-specific, version-specific method code to an abstract method name.
|
|
|
+ This is specialized for each concrete protocol object class."))
|
|
|
+
|
|
|
|
|
|
(defgeneric class-method-name-method-code (class method-name)
|
|
|
- )
|
|
|
+ (:documentation "Map an abstract method name to a class-specific, version-specific method code.
|
|
|
+ This is specialized for each concrete protocol object class."))
|
|
|
+
|
|
|
|
|
|
(defgeneric connection-method-code-method-name (connection class method-code)
|
|
|
(:method ((connection amqp:connection) (class null) (code (eql 0)))
|
|
@@ -565,49 +590,44 @@ |
|
|
A connection allows channels only. A channel treats the channel and connection
|
|
|
types as designating the respective instances and everything else as a
|
|
|
channel-relative singleton. All other conntected contexts delegate to their channel.")
|
|
|
- (declare (dynamic-extent initargs))
|
|
|
|
|
|
- )
|
|
|
+ (declare (dynamic-extent initargs)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+(:documentation "class methods"
|
|
|
+ "on input, methods act as markers to permit filtering rather than calling a static function
|
|
|
+ (even dynamically rebound). the arguments are passed on the stack, but also cached for future reference
|
|
|
+ ?in the method, the class or the channel?
|
|
|
+ channel, no - since things like queue, exchange, realm input is specific to that class
|
|
|
+ class, no -a queue.bind can specify more than one exchange and a channel.publish takes exchange, and
|
|
|
+ routing.
|
|
|
+ method-scoped binding is required.
|
|
|
+ exceptions can be implemented for specific things, like basic's content type, as additional methods.
|
|
|
|
|
|
+ operators
|
|
|
+ (method-name class . args) : perform the command request. this is a useful shorthand, but
|
|
|
+ !! is not sufficient for method re-use. publish, for example, allows as arguments exchange and
|
|
|
+ !! routing-key, which are multiple-per-channel. one needs to cache them in the publish instance
|
|
|
+ !! and apply them to the channel & explicit args to generate the effective request. to accomplish this,
|
|
|
+ !! the elementary method-name operator delegates to the request- operator, which takes explicit
|
|
|
+ !! arguments or defaults them from the class. if applied to a method instance, the defaults come
|
|
|
+ !! first from the method, which then delegates to the class.
|
|
|
+ -> (REQUEST-method-name class . args) [explicitly coded]
|
|
|
+ -> (send-method-name class . args)
|
|
|
+ (class-name.class-name class . args) : make a class-scoped class [explicitly coded according to dependency]
|
|
|
+ (class-name.method-name class . args) : make a class-scoped method [in def-amqp-method]
|
|
|
+ (SEND-METHOD method class . args) : encode and send request w/ default arguments from the method
|
|
|
+ (SEND-method-name class . args) :
|
|
|
+ -> (SEND-METHOD (class-name.method-name class . designator-arg) class . args)
|
|
|
|
|
|
-;;;
|
|
|
-;;; class methods
|
|
|
-
|
|
|
-;;; on input, methods act as markers to permit filtering rather than calling a static function
|
|
|
-;;; (even dynamically rebound). the arguments are passed on the stack, but also cached for future reference
|
|
|
-;;; ?in the method, the class or the channel?
|
|
|
-;;; channel, no - since things like queue, exchange, realm input is specific to that class
|
|
|
-;;; class, no -a queue.bind can specify more than one exchange and a channel.publish takes exchange, and
|
|
|
-;;; routing.
|
|
|
-;;; method-scoped binding is required.
|
|
|
-;;; exceptions can be implemented for specific things, like basic's content type, as additional methods.
|
|
|
-;;;
|
|
|
-;;; operators
|
|
|
-;;; (method-name class . args) : perform the command request. this is a useful shorthand, but
|
|
|
-;;; !! is not sufficient for method re-use. publish, for example, allows as arguments exchange and
|
|
|
-;;; !! routing-key, which are multiple-per-channel. one needs to cache them in the publish instance
|
|
|
-;;; !! and apply them to the channel & explicit args to generate the effective request. to accomplish this,
|
|
|
-;;; !! the elementary method-name operator delegates to the request- operator, which takes explicit
|
|
|
-;;; !! arguments or defaults them from the class. if applied to a method instance, the defaults come
|
|
|
-;;; !! first from the method, which then delegates to the class.
|
|
|
-;;; -> (REQUEST-method-name class . args) [explicitly coded]
|
|
|
-;;; -> (send-method-name class . args)
|
|
|
-;;; (class-name.class-name class . args) : make a class-scoped class [explicitly coded according to dependency]
|
|
|
-;;; (class-name.method-name class . args) : make a class-scoped method [in def-amqp-method]
|
|
|
-;;; (SEND-METHOD method class . args) : encode and send request w/ default arguments from the method
|
|
|
-;;; (SEND-method-name class . args) :
|
|
|
-;;; -> (SEND-METHOD (class-name.method-name class . designator-arg) class . args)
|
|
|
-
|
|
|
-;;; eg.
|
|
|
-;;;
|
|
|
-;;; (defmethod amqp:send-publish ((class amqp:basic) &rest arguments)
|
|
|
-;;; (declare (dynamic-extent arguments))
|
|
|
-;;; (apply #'send-method (amqp:basic.publish class :exchange (getf arguments :exchange)) class arguments))
|
|
|
+ eg.
|
|
|
+
|
|
|
+ (defmethod amqp:send-publish ((class amqp:basic) &rest arguments)
|
|
|
+ (declare (dynamic-extent arguments))
|
|
|
+ (apply #'send-method (amqp:basic.publish class :exchange (getf arguments :exchange)) class arguments))")
|
|
|
|
|
|
;;;
|
|
|
;;; amqp:access
|
|
@@ -1015,6 +1035,12 @@ |
|
|
|
|
|
(defmethod shared-initialize ((instance amqp:connection) (slots t) &key
|
|
|
(threaded nil))
|
|
|
+ "provide or renew the connection state. This includes a lock, a vector to hold channels, and
|
|
|
+ an optional connection thread. Iff the trheaded argument is true, create a thread for the
|
|
|
+ connection and start a processing loop.
|
|
|
+ NB. this is likely to be called repeatedly - in particular by change class subsequent to protocol
|
|
|
+ version negotiation."
|
|
|
+
|
|
|
(with-slots (lock thread amqp-channels) instance
|
|
|
(unless (slot-boundp instance 'lock)
|
|
|
(setf lock (bt:make-lock (make-instance-tag instance))))
|
|
@@ -1031,10 +1057,14 @@ |
|
|
;; nb. may not yet be able to create channel-0, since the class may be abstract
|
|
|
(call-next-method)))
|
|
|
|
|
|
+
|
|
|
(defmethod initialize-instance ((instance amqp:connection) &rest initargs &key
|
|
|
uri
|
|
|
(remote-host (unless uri (error "uri or remote-host required")))
|
|
|
(remote-port *standard-port*))
|
|
|
+ "Provide the initial connection state. This includes the frame queues. Integrate any host/port
|
|
|
+ arguments in to an uri for use to open the connection."
|
|
|
+
|
|
|
(declare (dynamic-extent initargs))
|
|
|
(flet ((make-name (tag)
|
|
|
(with-output-to-string (ss)
|
|
@@ -1089,16 +1119,23 @@ |
|
|
0)
|
|
|
|
|
|
(defmethod connection-host ((connection amqp:connection))
|
|
|
+ "Return the host portion of the connection's uri."
|
|
|
(uri-host (connection-uri connection)))
|
|
|
|
|
|
+
|
|
|
(defmethod connection-port ((connection amqp:connection))
|
|
|
+ "Return the port portion of the connection's uri."
|
|
|
(uri-port (connection-uri connection)))
|
|
|
|
|
|
(defmethod connection-virtual-host ((connection amqp:connection))
|
|
|
+ "Return the virtual host specified when the connection was instantiated."
|
|
|
(uri-virtual-host (connection-uri connection)))
|
|
|
|
|
|
|
|
|
(defgeneric connect-channel (connection channel)
|
|
|
+ (:documentation "Bind a channel to a connection: share the connection's queues for the
|
|
|
+ channel to use to set up encoded frame to send and t pick out read froms for decoding and processing.")
|
|
|
+
|
|
|
(:method ((connection t) (channel t))
|
|
|
channel)
|
|
|
|
|
@@ -1113,7 +1150,7 @@ |
|
|
free-output-frames (device-free-output-frames connection)
|
|
|
read-frames (device-read-frames connection)
|
|
|
encoded-frames (device-encoded-frames connection))
|
|
|
- ;; flush anything from an earlier iaancarnation
|
|
|
+ ;; flush anything from an earlier incarnation
|
|
|
(loop (when (null (get-read-frame channel :wait nil)) (return)))
|
|
|
;; and initialize buffers
|
|
|
(device-initialize-buffers channel)
|
|
@@ -1309,7 +1346,7 @@ |
|
|
|
|
|
|
|
|
(defgeneric claim-output-frame (connection)
|
|
|
- (:documentation "Returns a free input frame or creates a new one.")
|
|
|
+ (:documentation "Returns a free output frame or creates a new one.")
|
|
|
|
|
|
(:method ((channel amqp:channel))
|
|
|
(flet ((make-channel-frame ()
|
|
|
0 comments on commit
9bdf42b