Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

added emulation for org.nicklevine.rabbitmq.

  • Loading branch information...
commit 9e93b21a7c39d34842e00d0984a05be23c65e6ed 1 parent cc29b23
@lisp authored
View
BIN  rabbitmq/.DS_Store
Binary file not shown
View
414 rabbitmq/channel.lisp
@@ -0,0 +1,414 @@
+;;;-* Package: rabbitmq; -*-
+;; $Id: //info.ravenbrook.com/user/ndl/lisp/cl-rabbit/channel.lisp#2 $
+
+(in-package :rabbitmq)
+
+;; CHANNEL.LISP
+;; Nick Levine, Ravenbrook Limited, 2007-09-20
+;; James Anderson, setf.de, 2010-02-04
+;;
+;; 1. INTRODUCTION
+;;
+;; The purpose of this document is to implement a lisp interface to AMQP channels, consistent with the
+;; RabbitMQ API. It emulates the original com.nicklevine.rabbitmq version, which was layered over
+;; RabbitMQ/Java
+;;
+;; See Appendix C below for copyright and license.
+
+
+;; 2. OPEN & CLOSE
+
+(defun new-channel (connection)
+ (amqp:connection.channel connection :number t))
+
+
+(defmacro with-alive-channel ((channel &key (if-dead :error)) &body body)
+ (rebinding (channel)
+ `(if (channel-alive ,channel)
+ (progn ,@body)
+ ,@(case if-dead
+ ((:error)
+ `((progn (channel-not-alive ,channel)
+ ;; prevent tail call, aid debugging
+ nil)))))))
+
+(defun channel-not-alive (channel)
+ (error 'channel-not-alive :channel channel))
+
+(define-condition channel-not-alive (error)
+ ((channel :reader channel-not-alive-channel :initform nil :initarg :channel))
+ (:report (lambda (condition stream)
+ (format stream "Channel~@[ ~a~] is no longer alive"
+ (channel-not-alive-channel condition)))))
+
+(defmacro with-channel ((channel connection) &body body)
+ (rebinding (connection)
+ `(multiple-value-prog1
+ (let ((,channel (new-channel ,connection)))
+ (unwind-protect
+ (progn ,@body)
+ (destroy-channel ,channel)))
+ (check-connection-alive ,connection))))
+
+(defun destroy-channel (channel &key (code 0) (message "closed by application"))
+ (with-alive-channel (channel :if-dead nil)
+ (handler-case (amqp:request-close channel
+ :reply-code code
+ :reply-text message)
+ (channel-not-alive () ())))
+ channel)
+
+
+;; 3. CHANNEL SUBCLASS
+
+(defclass channel ()
+ ((the-consumer
+ :initform nil
+ :reader the-consumer :writer setf-the-consumer)))
+
+;;; adjust the amqp:channel class to fit with the jfli-based class
+(interpose-superclass 'channel 'amqp:channel)
+
+(defmethod channel-ticket ((channel channel))
+ (amqp.utility:channel-ticket channel))
+
+;;; define a consumer and a deilvery
+
+(defclass queueingconsumer. ()
+ ((queue :initform (make-instance 'amqp.utility:queue)
+ :reader consumer-queue)
+ (channel
+ :initform nil :initarg :channel
+ :accessor consumer-channel))
+ (:documentation "The queueing consumer interacts with the channel to accept,
+ parse and queue incoming messages"))
+
+
+(defmethod consumer-next-delivery ((consumer queueingconsumer.))
+ (unless (consumer-empty-p consumer)
+ (amqp.utility:dequeue (consumer-queue consumer))))
+
+
+;;;!!! this needs to look through the queue to see if there is any pending i/o
+;;;!!! andd , if so, read it and push it through the processing pipeline
+;;;!!! once there is nothing wating the count is up-to-date
+
+(defmethod consumer-empty-p ((consumer QueueingConsumer.))
+ (and (amqp.utility:collection-empty-p (consumer-queue consumer))
+ (let ((count 0))
+ (amqp:command-loop ((consumer-channel consumer) :wait nil)
+ (amqp:deliver ((class amqp:basic) &key &allow-other-keys)
+ ;; just count, but don't handle
+ (print (incf count))
+ ;; allow the next handler to process and queue the result
+ nil))
+ (zerop count))))
+
+
+(defmethod consumer-empty-p ((consumer null))
+ t)
+
+(defmethod consumer-arrived-count ((consumer QueueingConsumer.))
+ (if (consumer-empty-p consumer)
+ 0
+ (amqp.utility:collection-size (consumer-queue consumer))))
+
+(defmethod consumer-arrived-count ((consumer null))
+ 0)
+
+
+(defclass queueingconsumer$delivery (incoming-message)
+ ()
+ (:documentation "The delivery class holds the delivered payload."))
+
+
+
+(defmethod Channel.basicConsume ((channel AMQP-1-1-0-8-0:channel) ticket queue no-ack consumer)
+ (check-type consumer queueingconsumer.)
+ (amqp:request-consume (amqp:basic channel) :ticket ticket :no-ack no-ack :queue queue)
+ (setf (de.setf.amqp.implementation::channel-command channel 'amqp:deliver)
+ #'(lambda (channel class method &rest args)
+ (let* ((body (apply #'amqp:respond-to-deliver class args))
+ (message (make-instance 'queueingconsumer$delivery
+ :body body
+ :content-type (amqp.utility:class-mime-type class)
+ :envelope (apply #'make-envelope
+ (amqp.utility:method-arguments method))
+ :properties (amqp.utility:class-properties class)))
+ (consumer (the-consumer channel)))
+ (when consumer
+ (amqp.utility:enqueue message (consumer-queue consumer)))
+ message)))
+ (setf (consumer-channel consumer) channel)
+ (setf-the-consumer consumer channel)
+ channel)
+
+
+(defmethod Channel.basicConsume ((channel AMQP-1-1-0-9-1:channel) (ticket t) queue no-ack consumer)
+ (flet ((delivery-handler (channel class method &rest args)
+ (print :handler)
+ (let* ((body (apply #'amqp:respond-to-deliver class args))
+ (message (make-instance 'queueingconsumer$delivery
+ :body body
+ :content-type (amqp.utility:class-mime-type class)
+ :envelope (apply #'make-envelope
+ (amqp.utility:method-arguments method))
+ :properties (amqp.utility:class-properties class)))
+ (consumer (the-consumer channel)))
+ (amqp:log :error class "delivered: ~s" message)
+ (when consumer
+ (amqp.utility:enqueue message (consumer-queue consumer)))
+ message)))
+ (check-type consumer queueingconsumer.)
+ (amqp:request-consume (amqp:basic channel) :no-ack no-ack :queue queue)
+ (setf (de.setf.amqp.implementation::channel-command channel 'amqp:deliver)
+ #'delivery-handler)
+ (setf (consumer-channel consumer) channel)
+ (setf-the-consumer consumer channel)
+ channel))
+
+(defmethod Channel.basicCancel ((channel channel) consumer-tag no-wait)
+ (prog1 (amqp:request-cancel (amqp:basic channel) :consumer-tag consumer-tag :no-wait no-wait)
+ (setf (slot-value channel 'the-consumer) nil)))
+
+
+;; 4. MESSAGES
+
+(defun full-next-message (channel consumer nowait)
+ (with-alive-channel (channel)
+ (when nowait
+ (when (consumer-empty-p consumer)
+ (return-from full-next-message
+ nil)))
+ ;; TBD ?? -- shouldn't there be some waiting to do when nowait is
+ ;; false? I no longer remember my intent, and this function
+ ;; doesn't have a mirror in libamq so I can't peek at that -- NDL
+ ;; 2007-09-28
+ (let ((delivery (consumer-next-delivery consumer))
+ (basic (amqp:basic channel)))
+ (unless (amqp:basic-no-ack basic)
+ (acknowledge-delivery channel delivery))
+ delivery)))
+
+(defun next-message (channel)
+ (full-next-message channel (the-consumer channel) t))
+
+(defmethod consumer-empty-p ((channel channel))
+ (consumer-empty-p (the-consumer channel)))
+
+(defun channel-arrived-count (channel)
+ (with-alive-channel (channel)
+ (consumer-arrived-count (the-consumer channel))))
+
+(defun channel-arrived-count-or-nil (channel)
+ (let ((count (channel-arrived-count channel)))
+ (when (plusp count)
+ count)))
+
+(defun channel-wait (channel timeout)
+ (when (zerop timeout)
+ (error "~s called with zerop timeout. If you really wanted that, call ~s instead"
+ 'channel-wait 'channel-wait-forever))
+ (with-alive-channel (channel)
+ (let ((consumer (the-consumer channel)))
+ (assert consumer ()
+ "No consumer present to satisfy wait criteria: ~s." channel)
+ (let ((deadline (+ timeout (get-internal-run-time))))
+ (loop (when (or (>= (get-internal-run-time) deadline)
+ (not (consumer-empty-p consumer)))
+ (return))
+ (sleep 0.01))))))
+
+(defun channel-wait-forever (channel)
+ (with-alive-channel (channel)
+ (let ((consumer (the-consumer channel)))
+ (assert consumer ()
+ "No consumer present to satisfy wait criteria: ~s." channel)
+ (loop (unless (consumer-empty-p consumer)
+ (return))
+ (sleep 0.01)))))
+
+
+;; 5. PROPERTIES
+
+(defun channel-alive (channel)
+ (open-stream-p channel))
+
+(defun channel-consumer-count (channel)
+ (values 1
+ (list (the-consumer channel))))
+
+(defun channel-consumer-tag (channel)
+ (amqp:basic-consumer-tag (amqp:basic channel)))
+
+
+;; 6. EXCHANGE
+
+(defmacro with-exchange-type ((type) &body body)
+ `(let ((,type (ecase ,type
+ ((:fanout) "fanout") ;; ignores routing-key
+ ((:direct) "direct") ;; exact match on routing-key
+ ((:topic) "topic") ;; pattern matching on routing-key
+ )))
+ ,@body))
+
+(defgeneric full-declare-exchange (channel ticket exchange type passive durable auto-delete no-wait arguments)
+ (:method ((channel AMQP-1-1-0-9-1:channel) ticket exchange type passive durable auto-delete no-wait arguments)
+ (declare (ignore ticket auto-delete))
+ (with-alive-channel (channel)
+ (amqp:request-declare (amqp:exchange channel)
+ :exchange exchange
+ :type type
+ :passive passive
+ :durable durable
+ :no-wait no-wait
+ :arguments arguments)))
+ (:method ((channel AMQP-1-1-0-8-0:channel) ticket exchange type passive durable auto-delete no-wait arguments)
+ (declare (ignore no-wait))
+ (with-alive-channel (channel)
+ (amqp:request-declare (amqp:exchange channel)
+ :ticket ticket
+ :exchange exchange
+ :type type
+ :passive passive
+ :durable durable
+ :auto-delete auto-delete
+ :arguments arguments))))
+
+(defun declare-exchange (channel exchange type)
+ (with-exchange-type (type)
+ (full-declare-exchange channel (channel-ticket channel) exchange type nil nil nil nil nil)))
+
+(defun full-test-exchange (connection exchange type durable auto-delete arguments)
+ (with-channel (channel connection)
+ (trapping-not-found
+ (full-declare-exchange channel (channel-ticket channel) exchange type t durable auto-delete nil arguments)
+ t)))
+
+(defun test-exchange (connection exchange type)
+ (with-exchange-type (type)
+ (full-test-exchange connection exchange type nil nil nil)))
+
+
+(defgeneric full-delete-exchange (channel ticket exchange if-unused)
+ (:method ((channel AMQP-1-1-0-9-1:channel) ticket exchange if-unused)
+ (declare (ignore ticket))
+ (with-alive-channel (channel)
+ (amqp:request-delete (amqp:exchange channel) :exchange exchange :if-unused if-unused)))
+ (:method ((channel AMQP-1-1-0-8-0:channel) ticket exchange if-unused)
+ (with-alive-channel (channel)
+ (amqp:request-delete (amqp:exchange channel) :ticket ticket :exchange exchange :if-unused if-unused))))
+
+(defun delete-exchange (channel exchange)
+ (full-delete-exchange channel (channel-ticket channel) exchange nil))
+
+
+;; 7. QUEUE
+
+(defgeneric full-declare-queue (channel ticket queue passive durable exclusive auto-delete arguments)
+ (:method ((channel AMQP-1-1-0-9-1:channel) ticket queue passive durable exclusive auto-delete arguments)
+ (declare (ignore ticket))
+ (with-alive-channel (channel)
+ (amqp:request-declare (amqp:queue channel)
+ :queue queue
+ :passive passive
+ :durable durable
+ :exclusive exclusive
+ :auto-delete auto-delete
+ :arguments arguments)))
+ (:method ((channel AMQP-1-1-0-8-0:channel) ticket queue passive durable exclusive auto-delete arguments)
+ (with-alive-channel (channel)
+ (amqp:request-declare (amqp:queue channel)
+ :ticket ticket
+ :queue queue
+ :passive passive
+ :durable durable
+ :exclusive exclusive
+ :auto-delete auto-delete
+ :arguments arguments))))
+
+(defun declare-queue (channel queue)
+ (full-declare-queue channel (channel-ticket channel) queue nil nil nil nil nil))
+
+(defun full-test-queue (connection queue durable exclusive auto-delete arguments)
+ (with-channel (channel connection)
+ (trapping-not-found
+ (amqp:request-declare (amqp:queue channel)
+ :ticket (channel-ticket channel)
+ :queue queue
+ :passive t
+ :durable durable
+ :exclusive exclusive
+ :auto-delete auto-delete
+ :arguments arguments)
+ t)))
+
+(defun test-queue (connection queue)
+ (full-test-queue connection queue nil nil nil nil))
+
+(defun full-delete-queue (channel ticket queue if-unused if-empty)
+ (with-alive-channel (channel)
+ (amqp:request-delete (amqp:queue channel)
+ :ticket ticket
+ :queue queue
+ :if-unused if-unused
+ :if-empty if-empty)))
+
+(defun delete-queue (channel queue)
+ (full-delete-queue channel (channel-ticket channel) queue nil nil))
+
+
+(defgeneric full-bind-queue (channel ticket queue exchange routing-key arguments)
+ (:method ((channel AMQP-1-1-0-9-1:channel) ticket queue exchange routing-key arguments)
+ (declare (ignore ticket))
+ (with-alive-channel (channel)
+ (amqp:request-bind (amqp:queue channel)
+ :queue queue
+ :exchange exchange
+ :routing-key routing-key
+ :arguments arguments)))
+ (:method ((channel AMQP-1-1-0-8-0:channel) ticket queue exchange routing-key arguments)
+ (with-alive-channel (channel)
+ (amqp:request-bind (amqp:queue channel)
+ :ticket ticket
+ :queue queue
+ :exchange exchange
+ :routing-key routing-key
+ :arguments arguments))))
+
+(defun bind-queue (channel queue exchange routing-key)
+ (full-bind-queue channel (channel-ticket channel) queue exchange routing-key nil))
+
+
+
+;; A. REFERENCES
+;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
+;; [2] http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.1/rabbitmq-java-client-javadoc-1.7.1/
+;;
+;; B. HISTORY
+;;
+;; 2007-09-20 NDL Created.
+;; 2010-02-04 JAA Emulation / de.setf.amqp.
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;; Copyright (c) 2010 james.anderson@setf.de
+;;
+;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
+
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
View
128 rabbitmq/connection.lisp
@@ -0,0 +1,128 @@
+;;;-* Package: rabbitmq; -*-
+;; $Id: //info.ravenbrook.com/user/ndl/lisp/cl-rabbit/connection.lisp#2 $
+
+(in-package :rabbitmq)
+
+;; CONNECTION.LISP
+;; Nick Levine, Ravenbrook Limited, 2007-09-20
+;; James Anderson, setf.de, 2010-02-04
+;;
+;; 1. INTRODUCTION
+;;
+;; The purpose of this document is to implement a lisp interface to AMQP connections consistent with the
+;; RabbitMQ API. It emulates the original com.nicklevine.rabbitmq version, which was layered over
+;; RabbitMQ/Java
+;;
+;; See Appendix C below for copyright and license.
+
+
+;; 2. OPEN & CLOSE
+
+(defun new-connection (host vhost &rest args
+ &key (port amqp:*standard-port*)
+ (userinfo "guest:guest")
+ &allow-other-keys)
+ (initialize-rabbitmq)
+ (apply #'make-instance 'amqp:connection
+ :uri (puri:uri :scheme :amqp :host host :port port
+ :userinfo userinfo
+ :path vhost)
+ args))
+
+
+(defmacro with-alive-connection ((connection &key (if-dead :error)) &body body)
+ (rebinding (connection)
+ `(if (connection-alive ,connection)
+ (progn ,@body)
+ ,@(case if-dead
+ ((:error)
+ `((progn (connection-not-alive ,connection)
+ ;; prevent tail call, aid debugging
+ nil)))))))
+
+(defun new-connection-parameters (vhost)
+ (declare (ignore vhost))
+ (error "new-connection-parameters: no autonomous properties are implemented."))
+
+(defun connection-not-alive (connection)
+ (error 'connection-not-alive :connection connection))
+
+(define-condition connection-not-alive (error)
+ ((connection :reader connection-not-alive-connection :initform nil :initarg :connection))
+ (:report (lambda (condition stream)
+ (format stream "Connection~@[ ~a~] is no longer alive"
+ (connection-not-alive-connection condition)))))
+
+(defun check-connection-alive (connection)
+ (with-alive-connection (connection)
+ ()))
+
+(defun destroy-connection (connection &key code message)
+ (with-alive-connection (connection :if-dead nil)
+ (handler-case
+ (amqp:request-close connection
+ :reply-code code
+ :reply-test message)
+ (connection-not-alive () ())))
+ connection)
+
+
+;; 3. PROPERTIES
+
+(defun connection-alive (connection)
+ (open-stream-p connection))
+
+(defun connection-client-property (connection property)
+ (getf (amqp:connection-client-properties connection) property))
+
+(defun connection-server-property (connection property)
+ (getf (amqp:connection-server-properties connection) property))
+
+(defun connection-server-product (connection)
+ (connection-server-property connection :product))
+
+(defun connection-server-platform (connection)
+ (connection-server-property connection :platform))
+
+(defun connection-server-version (connection)
+ (connection-server-property connection :version))
+
+(defun connection-server-copyright (connection)
+ (connection-server-property connection :copyright))
+
+(defun connection-server-info (connection)
+ (connection-server-property connection :information))
+
+
+
+;; A. REFERENCES
+;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
+;; [2] http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.1/rabbitmq-java-client-javadoc-1.7.1/
+;;
+;; B. HISTORY
+;;
+;; 2007-09-20 NDL Created.
+;; 2010-02-04 JAA Emulation / de.setf.amqp.
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;; Copyright (c) 2010 james.anderson@setf.de
+;;
+;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
+
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
View
83 rabbitmq/errors.lisp
@@ -0,0 +1,83 @@
+;;;-* Package: rabbitmq; -*-
+;; $Id: //info.ravenbrook.com/user/ndl/lisp/cl-rabbit/errors.lisp#2 $
+
+(in-package :rabbitmq)
+
+;; ERRORS.LISP
+;; Nick Levine, Ravenbrook Limited, 2007-09-21
+;; James Anderson, setf.de, 2010-02-04
+;;
+;; 1. INTRODUCTION
+;;
+;; This document defines a RabbitMQ protocol class for errors and intergrates it into the de.setf.amqp
+;; error class.
+;;
+;; See Appendix C below for copyright and license.
+
+;;; define a protocol exception class for this interface and
+;;; modify the base definition to reflect it.
+
+(define-condition amqp-exception (#+jfli java-exception
+ #-jfli simple-error)
+ ())
+
+;;; adjust the error class to fit with the jfli-based class
+(interpose-superclass 'amqp-exception 'amqp:error)
+
+
+(defun call-ignoring-not-found (operator)
+ "Call the operator and suppress amqp not found exceptions.
+ Serves as the functional implementation for IGNORE-NOT-FOUND-ERRORS."
+ (declare (dynamic-extent operator))
+ (handler-bind ((amqp:not-found-error
+ (lambda (e)
+ (return-from call-ignoring-not-found
+ (values nil e)))))
+ (funcall operator)))
+
+(defmacro ignore-not-found-errors (&body body)
+ "Normal control flow returns the value(s) from the body.
+ Iff an AMQP exception is signaled with a not-found error code,
+ the error is ignored and the form returns two values, NIL and the
+ signaled exception."
+ (let ((operator (gensym (string 'ignore-not-found))))
+ `(flet ((,operator () ,@body))
+ (declare (dynamic-extent #',operator))
+ (call-ignoring-not-found #',operator))))
+
+(defmacro trapping-not-found (&body body)
+ `(ignore-not-found-errors ,@body))
+
+;; A. REFERENCES
+;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
+;; [2] http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.1/rabbitmq-java-client-javadoc-1.7.1/
+;;
+;; B. HISTORY
+;;
+;; 2007-09-21 NDL Created.
+;; 2010-02-04 JAA Emulation / de.setf.amqp.
+;; factored out jfli dependency for portability
+;; trapping-not-found -> ignore-not-found-errors
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;; Copyright (c) 2010 james.anderson@setf.de
+;;
+;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
+
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
View
124 rabbitmq/examples.lisp
@@ -0,0 +1,124 @@
+;;;-* Package: rabbitmq; -*-
+
+(in-package :rabbitmq)
+
+;; EXAMPLES.LISP
+;; Nick Levine, Ravenbrook Limited, 2007-09-04
+;; James Anderson, setf.de, 2010-02-04
+
+;; 1. INTRODUCTION
+;;
+;; The purpose of this document demonstrate the RABBITMQ package.
+;;
+;; See Appendix C below for copyright and license.
+
+;; This transcript paraphrases Levine's original README, with results as per emulation.
+;; It opens a connection and a channel, declares exchange and queue, binds them and
+;; loops a message back to itself.
+
+(defparameter *my-connection* nil)
+(defparameter *my-channel* nil)
+(defparameter *outgoing-message* nil)
+(defparameter *incoming-message* nil)
+
+(setq *my-connection* (new-connection "localhost" "/"))
+;; #<AMQP-1-1-0-9-1:CLIENT-CONNECTION #x278956FE>
+
+(setq *my-channel* (new-channel *my-connection*))
+;; #<CHANNEL [#<URI amqp://localhost:5672/>].1 #x278C33FE>
+
+
+(declare-exchange *my-channel* "my exchange" :direct)
+;; #<AMQP-1-1-0-9-1:EXCHANGE #x27A3C2FE>
+
+(declare-queue *my-channel* "my queue")
+;; #<AMQP-1-1-0-9-1:QUEUE #x27C8E346>
+
+(bind-queue *my-channel* "my queue" "my exchange" "my routing key")
+;; #<AMQP-1-1-0-9-1:QUEUE #x27C8E346>
+
+
+;;; - Send a message into the void:
+(setq *outgoing-message* (new-message))
+;; #<OUTGOING-MESSAGE #x27CBAFEE>
+
+
+(setf (message-id *outgoing-message*) "42"
+ (message-body *outgoing-message*) "Hello, World")
+;; "Hello, World"
+
+
+(publish *outgoing-message* *my-channel* "my exchange" "my routing key")
+;; "Hello, World"
+
+
+;;; - And get it back again:
+(consume-queue *my-channel* "my queue")
+;; #<CHANNEL [#<URI amqp://localhost:5672/>].1 #x2B5691F6>
+
+(channel-arrived-count *my-channel*)
+;; 1
+
+(setq *incoming-message* (next-message *my-channel*))
+;; #<RABBITMQ::QUEUEINGCONSUMER$DELIVERY #x2B7F9486>
+
+(values (message-body *incoming-message*)
+ (message-id *incoming-message*))
+;; "Hello, World"
+;; ""
+
+(close *my-connection* :abort t)
+
+
+;; A. REFERENCES
+;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
+;;
+;; B. HISTORY
+;;
+;; 2007-09-21 NDL Created.
+;; 2009-02-04 james.anderson@setf.de portability
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;;
+;; Permission is hereby granted, free of charge, to any person
+;; obtaining a copy of this software and associated documentation
+;; files (the "Software"), to deal in the Software without
+;; restriction, including without limitation the rights to use, copy,
+;; modify, merge, publish, distribute, sublicense, and/or sell copies
+;; of the Software, and to permit persons to whom the Software is
+;; furnished to do so, subject to the following conditions:
+;;
+;; The above copyright notice and this permission notice shall be
+;; included in all copies or substantial portions of the Software.
+;;
+;; THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+;; EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+;; MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+;; NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+;; HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+;; WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+;; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+;; DEALINGS IN THE SOFTWARE.
+
+;;;
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; It contains examples for simple interaction with a broker.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation, as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
+
+
+;;; 2010-02-03 [janderson](james.anderson@setf.de)
View
292 rabbitmq/message.lisp
@@ -0,0 +1,292 @@
+;;;-* Package: rabbitmq; -*-
+
+(in-package "RABBITMQ")
+
+;; MESSAGE.LISP
+;; Nick Levine, Ravenbrook Limited, 2007-09-21
+;; James Anderson, setf.de, 2010-02-04
+;;
+;; 1. INTRODUCTION
+;;
+;; This document is to implement a class to emulate RabbitMQ messages.
+;;
+;; See Appendix C below for copyright and license.
+
+
+;; 2. MESSAGE
+
+(defclass message ()
+ ((body
+ :initform nil :initarg :body
+ :accessor message-raw-body)
+ (properties
+ :initform nil :initarg :properties
+ :accessor message-properties)
+ (content-type
+ :initform nil :initarg :content-type
+ :accessor message-content-type
+ :type (or (member t nil) mime:mime-type)
+ :documentation "A MIME-TYPE instance specifies the encoding type.")
+ (envelope
+ :initform nil :initarg :envelope
+ :accessor message-envelope)))
+
+
+;;; timestamp epoch shifts -
+;;; Instance field/slot values are universal times and buffer accessors perform necessary
+;;; epoch shift integral with the access. As auch, decoded property/argument values are
+;;; universal times. If any code needs times in UNIX epoch, it shouldn't store or pass the values
+;;; through here."
+
+(defun message-timestamp (message)
+ (getf (message-properties message) :timestamp))
+
+(defun (setf message-timestamp) (new-value message)
+ (setf (getf (message-properties message) :timestamp) new-value))
+
+
+(defclass outgoing-message (message)
+ ((properties
+ :initform (new-basic-properties))))
+
+(defclass incoming-message (message)
+ ())
+
+(defclass envelope ()
+ ((exchange
+ :initform nil :initarg :exchange
+ :accessor envelope-exchange)
+ (routing-key
+ :initform nil :initarg :routing-key
+ :accessor envelope-routing-key)
+ (delivery-tag
+ :initform nil :initarg :delivery-tag
+ :accessor envelope-delivery-tag)))
+
+(defun make-envelope (&key exchange routing-key delivery-tag &allow-other-keys)
+ (make-instance 'envelope :exchange exchange
+ :routing-key routing-key
+ :delivery-tag delivery-tag))
+
+(defun new-basic-properties ()
+ `(:delivery-mode 1 :priority 0))
+
+(defun new-message (&key timestamp)
+ (let ((message (make-instance 'outgoing-message)))
+ (setf (message-timestamp message)
+ (case timestamp
+ ((t) (get-universal-time))
+ ((nil) 0)
+ (otherwise timestamp)))
+ message))
+
+
+
+;; 3. PROPERTIES
+;;
+;; TBD (if anyone ever wants them): clusterId and headers
+
+(defun message-body (message)
+ (etypecase (message-content-type message)
+ (null nil)
+ ((eql t) (message-raw-body message))
+ (mime:text/* (message-body-string message))
+ (mime:application/octet-stream (message-body-data message))))
+
+(defun (setf message-body) (new-value message)
+ (typecase new-value
+ (null
+ (setf (message-content-type message) nil)
+ (setf (message-raw-body message) nil))
+ (string
+ (setf (message-content-type message) mime:text/plain)
+ (setf (message-body-string message) new-value))
+ (simple-vector
+ (setf (message-content-type message) mime:application/octet-stream)
+ (setf (message-body-data message) new-value))
+ (otherwise
+ (setf (message-content-type message) t)
+ (setf (message-raw-body message) new-value))))
+
+
+(defun message-body-size (message)
+ (length (message-body message)))
+
+
+(defmacro def-message-property (accessor property)
+ `(progn
+ (defmethod ,accessor ((message message))
+ (getf (message-properties message) ',property))
+ (defmethod (setf ,accessor) (value (message message))
+ (setf (getf (message-properties message) ',property) value))
+ ',accessor))
+
+;; "The Basic class provides methods that support an industry-standard messaging model."
+
+(def-message-property message-id :message-id)
+(def-message-property message-application-id :app-id)
+(def-message-property message-content-encoding :content-encoding)
+(def-message-property message-correlation-id :correlation-id)
+(def-message-property message-delivery-mode :delivery-mode)
+(def-message-property message-expiration :expiration)
+(def-message-property message-reply-to :reply-to)
+(def-message-property message-priority :priority)
+(def-message-property message-type :type)
+(def-message-property message-user-id :userId)
+
+(defun message-origin (message)
+ (format nil "~a/~a"
+ (message-reply-to message)
+ (message-id message)))
+
+
+(defun message-delivery-persistent (message)
+ (eql (message-delivery-mode message) 2))
+
+(defun (setf message-delivery-persistent) (new-value message)
+ (setf (message-delivery-mode message)
+ (if new-value 2 1))
+ new-value)
+
+
+(defun message-raw-message-content-type (message)
+ (let ((type (message-content-type message)))
+ (when type (symbol-name (type-of type)))))
+
+(defun (setf message-raw-message-content-type) (type message)
+ (setf (message-content-type message)
+ (etypecase type
+ ((or null (eql t)) type)
+ (string (mime:mime-type type)))))
+
+
+
+(defun message-exchange (message)
+ (envelope-exchange (message-envelope message)))
+
+(defun message-routing-key (message)
+ (envelope-routing-key (message-envelope message)))
+
+(defun message-delivery-tag (message)
+ (envelope-delivery-tag (message-envelope message)))
+
+
+;; 4. METHODS
+
+(defgeneric message-body-string (message)
+ (:method ((message string)) message)
+ (:method ((message vector))
+ (map 'string #'code-char message))
+ (:method ((message message))
+ (message-body-string (message-raw-body message))))
+
+(defmethod (setf message-body-string) (new-value (self outgoing-message))
+ (setf (message-raw-body self)
+ (map 'vector #'char-code new-value))
+ new-value)
+
+(defun message-body-data (message &key (element-type t))
+ (let* ((body (or (message-raw-body message)
+ (return-from message-body-data
+ nil)))
+ (data (make-array (length body) :element-type element-type)))
+ (typecase data
+ (simple-string (map-into data #'(lambda (x) (code-char x)) body))
+ (t (replace data body)))
+ data))
+
+(defmethod (setf message-body-data) (new-value (self outgoing-message))
+ (let* ((length (length new-value)))
+ (setf (message-raw-body self)
+ (typecase new-value
+ (simple-vector (make-array length :element-type '(unsigned-byte 8)
+ :initial-contents new-value))
+ (simple-string (map 'vector #'char-code new-value))
+ (t (map 'vector #'(lambda (x) (assert (typep x '(unsigned-byte 8))) x)
+ new-value)))))
+ new-value)
+
+(defun message-first-byte (message)
+ (let ((raw-body (message-raw-body message)))
+ (when raw-body
+ (aref raw-body 0))))
+
+
+(defun full-publish (message channel ticket exchange routing-key mandatory immediate)
+ (declare (ignore ticket))
+ (amqp:request-publish (amqp:basic channel)
+ :exchange exchange
+ :routing-key routing-key
+ :mandatory mandatory
+ :immediate immediate
+ :body (message-body message)))
+
+(defun publish (message channel exchange routing-key)
+ (full-publish message channel (channel-ticket channel) exchange routing-key nil nil))
+
+(defun destroy-message (message)
+ (declare (ignore message))
+ nil)
+
+(defun full-consume-queue (channel ticket queue consumer-tag no-local no-ack exclusive)
+ (declare (ignore ticket))
+ (amqp:request-consume (amqp:basic channel)
+ :queue queue
+ :consumer-tag consumer-tag
+ :no-local no-local
+ :no-ack no-ack
+ :exclusive exclusive))
+
+
+(defun consume-queue (channel queue)
+ (with-alive-channel (channel)
+ (let ((consumer (make-instance 'queueingconsumer. :channel channel)))
+ ;; allow server to generate the consumerTag
+ (Channel.basicConsume channel (channel-ticket channel) queue nil consumer))))
+
+(defun full-cancel-queue (channel consumer-tag)
+ (amqp:request-cancel channel (amqp:basic channel)
+ :consumer-tag consumer-tag))
+
+(defun cancel-queue (channel &key (consumer-tag (channel-consumer-tag channel)))
+ (full-cancel-queue channel consumer-tag))
+
+
+(defun acknowledge-delivery (channel message)
+ (let ((delivery-tag (message-delivery-tag message)))
+ (amqp:request-ack (amqp:basic channel)
+ :delivery-tag delivery-tag
+ :multiple nil)))
+
+
+;; A. REFERENCES
+;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
+;; [2] http://www.rabbitmq.com/releases/rabbitmq-java-client/v1.7.1/rabbitmq-java-client-javadoc-1.7.1/
+;;
+;; B. HISTORY
+;;
+;; 2007-09-21 NDL Created.
+;; 2010-02-04 JAA Emulation / de.setf.amqp.
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;; Copyright (c) 2010 james.anderson@setf.de
+;;
+;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
+
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
View
51 rabbitmq/parameters.lisp
@@ -0,0 +1,51 @@
+;;;-* Package: rabbitmq; -*-
+
+(in-package :rabbitmq)
+
+;; PARAMETERS.LISP
+;; Nick Levine, Ravenbrook Limited, 2007-09-20
+;; James Anderson, setf.de, 2010-02-04
+;;
+;; 1. INTRODUCTION
+;;
+;; This document is to define the parameters used by the rabbitmq interface.
+;;
+;; See Appendix C below for copyright and license.
+
+;; 2. CONFIGURE
+;;
+;; 2.1. Run time parameters
+
+(defparameter *rabbitmq-timeout* 10) ; seconds
+
+
+;; A. REFERENCES
+;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
+;;
+;; B. HISTORY
+;;
+;; 2007-09-20 NDL Created.
+;; 2010-01-10 JAA Emulation / de.setf.amqp.
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;; Copyright (c) 2010 james.anderson@setf.de
+;;
+;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
+
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
View
138 rabbitmq/pkg.lisp
@@ -0,0 +1,138 @@
+;;; -*- Package: common-lisp-user; -*-
+;; $Id: //info.ravenbrook.com/user/ndl/lisp/cl-rabbit/pkg.lisp#2 $
+
+(in-package :cl-user)
+
+;; PKG.LISP
+;; Nick Levine, Ravenbrook Limited, 2007-09-04
+;; James Anderson, setf.de, 2010-02-04
+;;
+;; 1. INTRODUCTION
+;;
+;; The purpose of this document is to define the RABBITMQ package.
+;;
+;; See Appendix C below for copyright and license.
+
+
+(pushnew :rabbitmq *features*)
+
+(eval-when (:compile-toplevel :load-toplevel :execute)
+ (unless (find-package :jfli)
+ (defpackage :jfli
+ (:use :common-lisp)
+ (:intern
+ :initialize-rabbitmq))))
+
+
+(defpackage :rabbitmq
+ #+lw (:add-use-defaults t)
+ #-lw (:use :common-lisp)
+ (:import-from :jfli
+ :initialize-rabbitmq)
+ (:export
+ ;; Connection
+ :new-connection ; Open new connection with plain login as guest/guest, e.g. (new-connection "localhost" "/")
+ :destroy-connection ; Clean shut-down preferred. This will destroy any outstanding sessions
+ ;; Connection properties
+ :connection-alive ; (boolean) - nil when connection has had an error
+ :connection-server-product ; (string) - product name reported by server
+ :connection-server-version ; (string) - product version reported by server
+ :connection-server-platform ; (string) - operating system platform reported by server
+ :connection-server-copyright ; (string) - copyright notice reported by server
+ :connection-server-info ; (string) - other information reported by server
+ ;;
+ ;; Channel (aka session)
+ :new-channel ; Creates a new channel on this connection. Create as many as you like
+ :destroy-channel ; Clean shut-down for this channel
+ :next-message ; Returns next incoming message, or nil if there weren't any
+ :channel-arrived-count ; How many incoming messages are there?
+ :channel-wait ; Wait for server content, timeout given in ms (non-zero). Return value is either nil or number of messages which have arrived
+ :channel-wait-forever ; Wait for server content as above, no timeout
+ ;;
+ ;; Channel properties
+ :channel-alive ; (boolean) - nil when channel has had an error
+ ;;
+ ;; Channel properties set by server, in response to various methods
+ :channel-consumer-count ; (integer) number of consumers
+ :channel-consumer-tag ; (integer as string) consumer tag
+ ;;
+ ;; Exchange
+ :declare-exchange ; Creates an exchange with the given name, if none such already exists, and with the given type (:fanout / :direct / :topic)
+ :delete-exchange ; Deletes exchange with given name
+ :test-exchange ; Tests for existence of named exchange
+ ;;
+ ;; Queue
+ :declare-queue ; Creates a queue with the given name, if none such already exists. Note that the auto-delete flag will be set.
+ :delete-queue ; Deletes queue with given name
+ :bind-queue ; Binds named queue to exchange and routing key
+ :test-queue ; Tests for existence of named queue
+ ;;
+ ;; Message processing
+ :consume-queue ; Start a queue consumer
+ :cancel-queue ; End a queue consumer
+ :destroy-message ; Destroy a message (no-op for RabbitMQ)
+ :publish ; Publish a message
+ ;;
+ ;; Message
+ :new-message ; Creates a new message. It'll need a body-string and an ID before you publish it
+ :message-body ; [setfable] Payload of message, as vector (see below), string (ditto) or null. Reads / sets message-content-type.
+ :message-body-data ; [setfable] Payload of message as SIGNED bytes. Reader takes :element-type argument, default is t (i.e. simple-vector)
+ :message-body-string ; [setfable] Payload of message, as a simple-base-string. ** NO NULLS! JNI WILL TRUNCATE!! **
+ :message-body-size ; (integer) Size of content
+ :message-first-byte ; (byte) Peek, without having to build full sequence
+ :message-id ; [setfable] (string) Message identifier. In the examnples these are short unique strings. Is uniqueness required? I don't _think_ so.
+ :message-exchange ; (string) Exchange to which content was published (incoming messages only)
+ :message-routing-key ; (string) Original routing key specified by publisher (incoming messages only)
+ :message-application-id ; [optionally setfable] (string) ID of creating application
+ :message-content-encoding ; [optionally setfable] (string) MIME content encoding
+ :message-content-type ; [optionally setfable] MIME content type; both :string and :octets are currently supported
+ :message-correlation-id ; [optionally setfable] (string) Application correlation identifier
+ :message-expiration ; [optionally setfable] (string) Expiration specification
+ :message-reply-to ; [optionally setfable] (string) Destination to reply to
+ :message-type ; [optionally setfable] (string) Message type name
+ :message-user-id ; [optionally setfable] (string) ID of creating user
+ :message-delivery-persistent ; [setfable] (boolean) does message persist should server be restarted?
+ :message-priority ; [setfable] (integer) in the range 0...9 - it doesn't say anywhere which end of the scale gets there first
+ :message-timestamp ; [setfable] (float) univeral-time with 3 decimal places
+ :message-origin ; Defined as (format nil "~a/~a" message-reply-to message-id)
+ ;;
+ ;; Conditions
+ :amqp-exception
+ :connection-not-alive ; Signalled if a connection is found not to be alive
+ :connection-not-alive-connection ; Reader: the connection which was not alive (if known)
+ :channel-not-alive ; Signalled if a channel is found not to be alive
+ :channel-not-alive-channel ; Reader: the channel which was not alive (if known)
+ :ignore-not-found-errors
+ ))
+
+
+;; A. REFERENCES
+;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
+;;
+;; B. HISTORY
+;;
+;; 2007-09-04 NDL Created.
+;; 2010-01-10 JAA Emulation / de.setf.amqp.
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;; Copyright (c) 2010 james.anderson@setf.de
+;;
+;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
+
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
View
90 rabbitmq/rabbitmq.asd
@@ -0,0 +1,90 @@
+;;;-* Package: cl-user; -*-
+;; $Id: //info.ravenbrook.com/user/ndl/lisp/cl-rabbit/rabbitmq.asd#2 $
+
+(in-package :cl-user)
+
+;; RABBITMQ.ASD
+;; Nick Levine, Ravenbrook Limited, 2007-09-04
+;; James Anderson, setf.de, 2010-01-10
+;;
+;; 1. INTRODUCTION
+;;
+;; This document defines the :rabbitmq system.
+;;
+;; See Appendix C below for copyright and license.
+
+
+(asdf:defsystem :de.setf.amqp.rabbitmq
+ :description "RABBITMQ - interface to RabbitMQ"
+ :author "james anderson <james.anderson@setf.de>"
+ :depends-on (:de.setf.amqp.AMQP-1-1-0-8-0)
+ :serial t
+ :components ((:file "pkg")
+ (:file "parameters")
+ (:file "utilities")
+ (:file "rabbitmq")
+ (:file "errors")
+ (:file "connection")
+ (:file "channel")
+ (:file "message")
+ )
+ :long-description
+" `de.setf.amqp.rabbitmq` is emulates for Levine's Java-based library,
+ [rabbitmq](org.nicklevine.rabbitmq) based on the `de.setf.amqp` library module.")
+
+
+;; A. REFERENCES
+;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
+;;
+;; B. HISTORY
+;;
+;; 2007-09-04 NDL Created.
+;; 2010-02-04 JAA Emulation / de.setf.amqp.
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;; Copyright (c) 2010 james.anderson@setf.de
+;;
+;; The license terms for the original org.levine.rabbitmq package[1]:
+;;
+;; Permission is hereby granted, free of charge, to any person
+;; obtaining a copy of this software and associated documentation
+;; files (the "Software"), to deal in the Software without
+;; restriction, including without limitation the rights to use, copy,
+;; modify, merge, publish, distribute, sublicense, and/or sell copies
+;; of the Software, and to permit persons to whom the Software is
+;; furnished to do so, subject to the following conditions:
+;;
+;; The above copyright notice and this permission notice shall be
+;; included in all copies or substantial portions of the Software.
+;;
+;; THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+;; EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+;; MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+;; NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+;; HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+;; WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+;; OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+;; DEALINGS IN THE SOFTWARE.
+;;
+
+;;; The de.setf.amqp.rabbitmq license:
+;;;
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; It contains examples for simple interaction with a broker.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
+
View
59 rabbitmq/rabbitmq.lisp
@@ -0,0 +1,59 @@
+;;; -*- Package: jfli; -*-
+
+(in-package :jfli)
+
+;; RABBITMQ.LISP
+;; Nick Levine, Ravenbrook Limited, 2007-09-03
+;; James Anderson, setf.de, 2010-02-04
+;;
+;; 1. INTRODUCTION
+;;
+;; The purpose of this document is to support initial library configuration
+;;
+;; See Appendix C below for copyright and license.
+
+(defvar *rabbitmq-initialized* nil)
+
+(defun initialize-rabbitmq (&rest args &key frame-size (timeout rabbitmq::*rabbitmq-timeout*))
+ "Configure the AMQP interface in terms of
+ :FRAME-SIZE : integer : the default frame size to negotiate with brokers
+ :TIMEOUT : integer : duration, in seconds, for protocol time-outs."
+
+ (declare (ignore frame-size))
+ (unless *rabbitmq-initialized*
+ (apply #'amqp:initialize
+ :timeout timeout
+ args)
+ (setf *rabbitmq-initialized* t)))
+
+
+;; A. REFERENCES
+;; [1] [org.levine.rabbitmq](http://www.nicklevine.org/cl-rabbit/)
+;;
+;; B. HISTORY
+;;
+;; 2007-09-03 NDL Created.
+;; 2010-02-04 JAA Emulation / de.setf.amqp.
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;; Copyright (c) 2010 james.anderson@setf.de
+;;
+;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
+
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
View
105 rabbitmq/utilities.lisp
@@ -0,0 +1,105 @@
+;;;-* Package: rabbitmq; -*-
+
+(in-package :rabbitmq)
+
+;; UTILITIES.LISP
+;; Nick Levine, Ravenbrook Limited, 2007-09-21
+;; James Anderson, setf.de, 2010-02-04
+
+;; 1. INTRODUCTION
+;;
+;; This document collects utilities for the RabbitMQ interface.
+;;
+;; See Appendix C below for copyright and license.
+
+(defun whitespace-char-p (char)
+ (find char #(#\space #\tab #\return #\linefeed)))
+
+(defun simple-word-wrap (text &optional (start 0))
+ (let* ((ideal 72)
+ (where (+ start ideal))
+ (length (length text)))
+ (when (>= where length)
+ (return-from simple-word-wrap
+ text))
+ (loop (when (whitespace-char-p (schar text where))
+ (setf (schar text where) #\Newline)
+ (return-from simple-word-wrap
+ (simple-word-wrap text (1+ where))))
+ (when (= (decf where) start)
+ (return)))
+ (setf where (+ start ideal 1))
+ (loop (when (whitespace-char-p (schar text where))
+ (setf (schar text where) #\Newline)
+ (return
+ (simple-word-wrap text (1+ where))))
+ (when (= (incf where) length)
+ (return text)))))
+
+
+#+(or ) ;; by-hand
+(defmacro rebinding (variables . body)
+ (let ((rebindings (mapcar #'(lambda (v) (list (gensym (string v)) v)) variables)))
+ `(list 'let (list ,@(mapcar #'(lambda (b) `(list (quote ,(first b)) ,(second b))) rebindings))
+ ,@(mapcar #'(lambda (form)
+ `(let ((form ,form))
+ (loop for (new old) in (list ,@(mapcar #'(lambda (b) `(list (quote ,(first b)) ,(second b))) rebindings))
+ do (setf form (subst new old form))
+ return form)))
+ body))))
+
+(defmacro rebinding (variables . body)
+ (let ((rebindings (mapcar #'(lambda (v) (list (gensym (string v)) v)) variables)))
+ `(list 'let (list ,@(mapcar #'(lambda (b) `(list (quote ,(first b)) ,(second b))) rebindings))
+ (list 'symbol-macrolet ',(mapcar #'reverse rebindings)
+ ,@body))))
+
+#+mcl
+(defmacro defadvice ((function tag when) arglist . body)
+ `(ccl:advise ,function (apply #'(lambda ,arglist ,@body) arglist) :when ,when :name ,tag))
+
+
+(defgeneric interpose-superclass (add-class amqp-class)
+ (:method ((add-class symbol) (amqp-class t))
+ (interpose-superclass (find-class add-class) amqp-class))
+ (:method ((add-class t) (amqp-class symbol))
+ (assert (eq (symbol-package amqp-class) (find-package :amqp)) ()
+ "Permitted for protocol classes only.")
+ (interpose-superclass add-class (find-class amqp-class)))
+ (:method ((add-class class) (amqp-class class))
+ (let ((existing-supers (c2mop:class-direct-superclasses amqp-class)))
+ (unless (find add-class existing-supers)
+ (reinitialize-instance amqp-class
+ :direct-superclasses (cons add-class existing-supers))))
+ amqp-class))
+
+
+;; A. REFERENCES
+;;
+;;
+;; B. HISTORY
+;;
+;; 2007-09-21 NDL Created.
+;; 2010-02-04 james.anderson@setf.de portability
+;;
+;;
+;; C. COPYRIGHT
+;;
+;; Copyright (c) 2007 Wiinz Limited.
+;;
+;; See `rabbitmq.asd` for the license terms for the original org.levine.rabbitmq package.
+
+;;; This file is part of the `de.setf.amqp.rabbitmq` library module.
+;;; (c) 2010 [james anderson](mailto:james.anderson@setf.de)
+;;;
+;;; `de.setf.amqp.rabbitmq` is free software: you can redistribute it and/or modify
+;;; it under the terms of the GNU General Public License as published by
+;;; the Free Software Foundation as version 3 of the License.
+;;;
+;;; `de.setf.amqp.rabbitmq` is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+;;; GNU General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU General Public License
+;;; along with `de.setf.amqp.rabbitmq`. If not, see the GNU [site](http://www.gnu.org/licenses/).
Please sign in to comment.
Something went wrong with that request. Please try again.