Skip to content

Commit

Permalink
Run go fmt on the code
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Dec 22, 2022
1 parent f7b70c1 commit 16c69b3
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 193 deletions.
83 changes: 30 additions & 53 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
// +------+---------+-------------+ +------------+ +-----------+
// | type | channel | size | | payload | | frame-end |
// +------+---------+-------------+ +------------+ +-----------+
// octet short long size octets octet
//
// octet short long size octets octet
const frameHeaderSize = 1 + 2 + 4 + 1

/*
Channel represents an AMQP channel. Used as a context for valid message
exchange. Errors on methods with this Channel as a receiver means this channel
should be discarded and a new channel established.
*/
type Channel struct {
destructor sync.Once
Expand Down Expand Up @@ -460,7 +460,6 @@ Close initiate a clean channel closure by sending a close message with the error
code set to '200'.
It is safe to call this method multiple times.
*/
func (ch *Channel) Close() error {
defer ch.connection.closeChannel(ch, nil)
Expand Down Expand Up @@ -488,7 +487,6 @@ graceful close, no error will be sent.
In case of a non graceful close the error will be notified synchronously by the library
so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks
*/
func (ch *Channel) NotifyClose(c chan *Error) chan *Error {
ch.notifyM.Lock()
Expand Down Expand Up @@ -534,7 +532,6 @@ much on the same connection, all channels using that connection will suffer,
including acknowledgments from deliveries. Use different Connections if you
desire to interleave consumers and producers in the same process to avoid your
basic.ack messages from getting rate limited with your basic.publish messages.
*/
func (ch *Channel) NotifyFlow(c chan bool) chan bool {
ch.notifyM.Lock()
Expand All @@ -556,7 +553,6 @@ immediate flags.
A return struct has a copy of the Publishing along with some error
information about why the publishing failed.
*/
func (ch *Channel) NotifyReturn(c chan Return) chan Return {
ch.notifyM.Lock()
Expand All @@ -577,7 +573,6 @@ from the server when a queue is deleted or when consuming from a mirrored queue
where the master has just failed (and was moved to another node).
The subscription tag is returned to the listener.
*/
func (ch *Channel) NotifyCancel(c chan string) chan string {
ch.notifyM.Lock()
Expand Down Expand Up @@ -642,7 +637,6 @@ Channel.Close() or Connection.Close().
It is also advisable for the caller to consume from the channel returned till it is closed
to avoid possible deadlocks
*/
func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
ch.notifyM.Lock()
Expand Down Expand Up @@ -721,7 +715,6 @@ When noWait is true, do not wait for the server to acknowledge the cancel.
Only use this when you are certain there are no deliveries in flight that
require an acknowledgment, otherwise they will arrive and be dropped in the
client without an ack, and will not be redelivered to other consumers.
*/
func (ch *Channel) Cancel(consumer string, noWait bool) error {
req := &basicCancel{
Expand Down Expand Up @@ -754,12 +747,12 @@ the type "direct" with the routing key matching the queue's name. With this
default binding, it is possible to publish messages that route directly to
this queue by publishing to "" with the routing key of the queue name.
QueueDeclare("alerts", true, false, false, false, nil)
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})
QueueDeclare("alerts", true, false, false, false, nil)
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})
Delivery Exchange Key Queue
-----------------------------------------------
key: alerts -> "" -> alerts -> alerts
Delivery Exchange Key Queue
-----------------------------------------------
key: alerts -> "" -> alerts -> alerts
The queue name may be empty, in which case the server will generate a unique name
which will be returned in the Name field of Queue struct.
Expand Down Expand Up @@ -795,7 +788,6 @@ or attempting to modify an existing queue from a different connection.
When the error return value is not nil, you can assume the queue could not be
declared with these parameters, and the channel will be closed.
*/
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
if err := args.Validate(); err != nil {
Expand Down Expand Up @@ -829,13 +821,11 @@ func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noW
}

/*
QueueDeclarePassive is functionally and parametrically equivalent to
QueueDeclare, except that it sets the "passive" attribute to true. A passive
queue is assumed by RabbitMQ to already exist, and attempting to connect to a
non-existent queue will cause RabbitMQ to throw an exception. This function
can be used to test for the existence of a queue.
*/
func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
if err := args.Validate(); err != nil {
Expand Down Expand Up @@ -881,7 +871,6 @@ declared with specific parameters.
If a queue by this name does not exist, an error will be returned and the
channel will be closed.
*/
func (ch *Channel) QueueInspect(name string) (Queue, error) {
req := &queueDeclare{
Expand All @@ -906,14 +895,14 @@ QueueBind binds an exchange to a queue so that publishings to the exchange will
be routed to the queue when the publishing routing key matches the binding
routing key.
QueueBind("pagers", "alert", "log", false, nil)
QueueBind("emails", "info", "log", false, nil)
QueueBind("pagers", "alert", "log", false, nil)
QueueBind("emails", "info", "log", false, nil)
Delivery Exchange Key Queue
-----------------------------------------------
key: alert --> log ----> alert --> pagers
key: info ---> log ----> info ---> emails
key: debug --> log (none) (dropped)
Delivery Exchange Key Queue
-----------------------------------------------
key: alert --> log ----> alert --> pagers
key: info ---> log ----> info ---> emails
key: debug --> log (none) (dropped)
If a binding with the same key and arguments already exists between the
exchange and queue, the attempt to rebind will be ignored and the existing
Expand All @@ -923,16 +912,16 @@ In the case that multiple bindings may cause the message to be routed to the
same queue, the server will only route the publishing once. This is possible
with topic exchanges.
QueueBind("pagers", "alert", "amq.topic", false, nil)
QueueBind("emails", "info", "amq.topic", false, nil)
QueueBind("emails", "#", "amq.topic", false, nil) // match everything
QueueBind("pagers", "alert", "amq.topic", false, nil)
QueueBind("emails", "info", "amq.topic", false, nil)
QueueBind("emails", "#", "amq.topic", false, nil) // match everything
Delivery Exchange Key Queue
-----------------------------------------------
key: alert --> amq.topic ----> alert --> pagers
key: info ---> amq.topic ----> # ------> emails
\---> info ---/
key: debug --> amq.topic ----> # ------> emails
Delivery Exchange Key Queue
-----------------------------------------------
key: alert --> amq.topic ----> alert --> pagers
key: info ---> amq.topic ----> # ------> emails
\---> info ---/
key: debug --> amq.topic ----> # ------> emails
It is only possible to bind a durable queue to a durable exchange regardless of
whether the queue or exchange is auto-deleted. Bindings between durable queues
Expand All @@ -943,7 +932,6 @@ will be closed.
When noWait is false and the queue could not be bound, the channel will be
closed with an error.
*/
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error {
if err := args.Validate(); err != nil {
Expand All @@ -968,7 +956,6 @@ arguments.
It is possible to send and empty string for the exchange name which means to
unbind the queue from the default exchange.
*/
func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error {
if err := args.Validate(); err != nil {
Expand Down Expand Up @@ -1025,7 +1012,6 @@ When noWait is true, the queue will be deleted without waiting for a response
from the server. The purged message count will not be meaningful. If the queue
could not be deleted, a channel exception will be raised and the channel will
be closed.
*/
func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) {
req := &queueDelete{
Expand Down Expand Up @@ -1096,7 +1082,6 @@ be dropped.
When the consumer tag is cancelled, all inflight messages will be delivered until
the returned chan is closed.
*/
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
// When we return from ch.call, there may be a delivery already for the
Expand Down Expand Up @@ -1207,13 +1192,11 @@ func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, inter
}

/*
ExchangeDeclarePassive is functionally and parametrically equivalent to
ExchangeDeclare, except that it sets the "passive" attribute to true. A passive
exchange is assumed by RabbitMQ to already exist, and attempting to connect to a
non-existent exchange will cause RabbitMQ to throw an exception. This function
can be used to detect the existence of an exchange.
*/
func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
if err := args.Validate(); err != nil {
Expand Down Expand Up @@ -1276,14 +1259,14 @@ exchange even though multiple bindings will match.
Given a message delivered to the source exchange, the message will be forwarded
to the destination exchange when the routing key is matched.
ExchangeBind("sell", "MSFT", "trade", false, nil)
ExchangeBind("buy", "AAPL", "trade", false, nil)
ExchangeBind("sell", "MSFT", "trade", false, nil)
ExchangeBind("buy", "AAPL", "trade", false, nil)
Delivery Source Key Destination
example exchange exchange
-----------------------------------------------
key: AAPL --> trade ----> MSFT sell
\---> AAPL --> buy
Delivery Source Key Destination
example exchange exchange
-----------------------------------------------
key: AAPL --> trade ----> MSFT sell
\---> AAPL --> buy
When noWait is true, do not wait for the server to confirm the binding. If any
error occurs the channel will be closed. Add a listener to NotifyClose to
Expand Down Expand Up @@ -1493,7 +1476,6 @@ delivery.
When autoAck is true, the server will automatically acknowledge this message so
you don't have to. But if you are unable to fully process this message before
the channel or connection is closed, the message will not get requeued.
*/
func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error) {
req := &basicGet{Queue: queue, NoAck: autoAck}
Expand Down Expand Up @@ -1525,7 +1507,6 @@ the channel is in a transaction is not defined.
Once a channel has been put into transaction mode, it cannot be taken out of
transaction mode. Use a different channel for non-transactional semantics.
*/
func (ch *Channel) Tx() error {
return ch.call(
Expand All @@ -1539,7 +1520,6 @@ TxCommit atomically commits all publishings and acknowledgments for a single
queue and immediately start a new transaction.
Calling this method without having called Channel.Tx is an error.
*/
func (ch *Channel) TxCommit() error {
return ch.call(
Expand All @@ -1553,7 +1533,6 @@ TxRollback atomically rolls back all publishings and acknowledgments for a
single queue and immediately start a new transaction.
Calling this method without having called Channel.Tx is an error.
*/
func (ch *Channel) TxRollback() error {
return ch.call(
Expand Down Expand Up @@ -1583,7 +1562,6 @@ pause its publishings when `false` is sent on that channel.
Note: RabbitMQ prefers to use TCP push back to control flow for all channels on
a connection, so under high volume scenarios, it's wise to open separate
Connections for publishings and deliveries.
*/
func (ch *Channel) Flow(active bool) error {
return ch.call(
Expand Down Expand Up @@ -1615,7 +1593,6 @@ persisting the message if necessary.
When noWait is true, the client will not wait for a response. A channel
exception could occur if the server does not support this method.
*/
func (ch *Channel) Confirm(noWait bool) error {
if err := ch.call(
Expand Down
27 changes: 13 additions & 14 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func DialConfig(url string, config Config) (*Connection, error) {
Open accepts an already established connection, or other io.ReadWriteCloser as
a transport. Use this method if you have established a TLS connection or wish
to use your own custom transport.
*/
func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
c := &Connection{
Expand Down Expand Up @@ -331,7 +330,6 @@ so that it will be necessary to consume the Channel from the caller in order to
To reconnect after a transport or protocol error, register a listener here and
re-run your setup process.
*/
func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
c.m.Lock()
Expand All @@ -355,7 +353,6 @@ become free again.
This optional extension is supported by the server when the
"connection.blocked" server capability key is true.
*/
func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking {
c.m.Lock()
Expand Down Expand Up @@ -787,7 +784,6 @@ func (c *Connection) closeChannel(ch *Channel, e *Error) {
Channel opens a unique, concurrent server channel to process the bulk of AMQP
messages. Any error from methods on this receiver will render the receiver
invalid and a new Channel should be opened.
*/
func (c *Connection) Channel() (*Channel, error) {
return c.openChannel()
Expand Down Expand Up @@ -824,16 +820,19 @@ func (c *Connection) call(req message, res ...message) error {
return ErrCommandInvalid
}

// Connection = open-Connection *use-Connection close-Connection
// open-Connection = C:protocol-header
// S:START C:START-OK
// *challenge
// S:TUNE C:TUNE-OK
// C:OPEN S:OPEN-OK
// challenge = S:SECURE C:SECURE-OK
// use-Connection = *channel
// close-Connection = C:CLOSE S:CLOSE-OK
// / S:CLOSE C:CLOSE-OK
// Connection = open-Connection *use-Connection close-Connection
// open-Connection = C:protocol-header
//
// S:START C:START-OK
// *challenge
// S:TUNE C:TUNE-OK
// C:OPEN S:OPEN-OK
//
// challenge = S:SECURE C:SECURE-OK
// use-Connection = *channel
// close-Connection = C:CLOSE S:CLOSE-OK
//
// / S:CLOSE C:CLOSE-OK
func (c *Connection) open(config Config) error {
if err := c.send(&protocolHeader{}); err != nil {
return err
Expand Down

0 comments on commit 16c69b3

Please sign in to comment.