diff --git a/connection.go b/connection.go index 29d2f126..46a127f4 100644 --- a/connection.go +++ b/connection.go @@ -53,6 +53,7 @@ type Connection struct { noNotify bool // true when we will never notify again closes []chan *Error + blocks []chan Blocking errors chan *Error @@ -178,6 +179,30 @@ func (me *Connection) NotifyClose(c chan *Error) chan *Error { return c } +/* +NotifyBlock registers a listener for RabbitMQ specific TCP flow control method +extensions connection.blocked and connection.unblocked. Flow control is active +with a reason when Blocking.Blocked is true. When a Connection is blocked, all +methods will block across all connections until server resources become free +again. + +This optional extension is supported by the server when the +"connection.blocked" server capability key is true. + +*/ +func (me *Connection) NotifyBlocked(c chan Blocking) chan Blocking { + me.m.Lock() + defer me.m.Unlock() + + if me.noNotify { + close(c) + } else { + me.blocks = append(me.blocks, c) + } + + return c +} + /* Close requests and waits for the response to close the AMQP connection. @@ -262,6 +287,10 @@ func (me *Connection) shutdown(err *Error) { close(c) } + for _, c := range me.blocks { + close(c) + } + me.noNotify = true }) } @@ -288,6 +317,14 @@ func (me *Connection) dispatch0(f frame) { }) me.shutdown(newError(m.ReplyCode, m.ReplyText)) + case *connectionBlocked: + for _, c := range me.blocks { + c <- Blocking{Active: true, Reason: m.Reason} + } + case *connectionUnblocked: + for _, c := range me.blocks { + c <- Blocking{Active: false} + } default: me.rpc <- m } @@ -512,6 +549,13 @@ func (me *Connection) openTune(config Config, auth Authentication) error { ok := &connectionStartOk{ Mechanism: auth.Mechanism(), Response: auth.Response(), + ClientProperties: Table{ // Open an issue if you wish these refined/parameterizable + "product": "https://github.com/streadway/amqp", + "version": "β", + "capabilities": Table{ + "connection.blocked": true, + }, + }, } tune := &connectionTune{} diff --git a/examples_test.go b/examples_test.go index b31a9198..5cc8c73d 100644 --- a/examples_test.go +++ b/examples_test.go @@ -348,3 +348,34 @@ func ExampleChannel_Publish() { log.Fatalf("basic.publish: %v", err) } } + +func publishAllTheThings(conn *amqp.Connection) { + // ... snarf snarf, barf barf +} + +func ExampleConnection_NotifyBlocked() { + // Simply logs when the server throttles the TCP connection for publishers + + // Test this by tuning your server to have a low memory watermark: + // rabbitmqctl set_vm_memory_high_watermark 0.00000001 + + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + log.Fatalf("connection.open: %s", err) + } + defer conn.Close() + + blockings := conn.NotifyBlocked(make(chan amqp.Blocking)) + go func() { + for b := range blockings { + if b.Active { + log.Printf("TCP blocked: %q", b.Reason) + } else { + log.Printf("TCP unblocked") + } + } + }() + + // Your application domain channel setup publishings + publishAllTheThings(conn) +} diff --git a/spec/amqp0-9-1.stripped.extended.xml b/spec/amqp0-9-1.stripped.extended.xml index d5f55d03..fbddb93a 100644 --- a/spec/amqp0-9-1.stripped.extended.xml +++ b/spec/amqp0-9-1.stripped.extended.xml @@ -190,6 +190,13 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + + + + + + diff --git a/spec091.go b/spec091.go index 5b45c8db..a9538030 100644 --- a/spec091.go +++ b/spec091.go @@ -501,6 +501,57 @@ func (me *connectionCloseOk) read(r io.Reader) (err error) { return } +type connectionBlocked struct { + Reason string +} + +func (me *connectionBlocked) id() (uint16, uint16) { + return 10, 60 +} + +func (me *connectionBlocked) wait() bool { + return false +} + +func (me *connectionBlocked) write(w io.Writer) (err error) { + + if err = writeShortstr(w, me.Reason); err != nil { + return + } + + return +} + +func (me *connectionBlocked) read(r io.Reader) (err error) { + + if me.Reason, err = readShortstr(r); err != nil { + return + } + + return +} + +type connectionUnblocked struct { +} + +func (me *connectionUnblocked) id() (uint16, uint16) { + return 10, 61 +} + +func (me *connectionUnblocked) wait() bool { + return false +} + +func (me *connectionUnblocked) write(w io.Writer) (err error) { + + return +} + +func (me *connectionUnblocked) read(r io.Reader) (err error) { + + return +} + type channelOpen struct { reserved1 string } @@ -2785,6 +2836,22 @@ func (me *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err er } mf.Method = method + case 60: // connection blocked + //fmt.Println("NextMethod: class:10 method:60") + method := &connectionBlocked{} + if err = method.read(me.r); err != nil { + return + } + mf.Method = method + + case 61: // connection unblocked + //fmt.Println("NextMethod: class:10 method:61") + method := &connectionUnblocked{} + if err = method.read(me.r); err != nil { + return + } + mf.Method = method + default: return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId) } diff --git a/types.go b/types.go index 7daab8c3..b0a39ac8 100644 --- a/types.go +++ b/types.go @@ -133,6 +133,15 @@ type Publishing struct { Body []byte } +// Blocking notifies the server's TCP flow control of the Connection. When a +// server hits a memory or disk alarm it will block all connections until the +// resources are reclaimed. Use NotifyBlock on the Connection to receive these +// events. +type Blocking struct { + Active bool // TCP pushback active/inactive on server + Reason string // Server reason for activation +} + // Decimal matches the AMQP decimal type. Scale is the number of decimal // digits Scale == 2, Value == 12345, Decimal == 123.45 type Decimal struct {