Skip to content

Commit

Permalink
Merge pull request #75 from streadway/75-connection.blocked
Browse files Browse the repository at this point in the history
Support connection.blocked (coming in RabbitMQ 3.2)
  • Loading branch information
Sean Treadway committed Oct 17, 2013
2 parents 1130bc6 + 2491854 commit 7e47df8
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 0 deletions.
44 changes: 44 additions & 0 deletions connection.go
Expand Up @@ -53,6 +53,7 @@ type Connection struct {

noNotify bool // true when we will never notify again
closes []chan *Error
blocks []chan Blocking

errors chan *Error

Expand Down Expand Up @@ -178,6 +179,30 @@ func (me *Connection) NotifyClose(c chan *Error) chan *Error {
return c
}

/*
NotifyBlock registers a listener for RabbitMQ specific TCP flow control method
extensions connection.blocked and connection.unblocked. Flow control is active
with a reason when Blocking.Blocked is true. When a Connection is blocked, all
methods will block across all connections until server resources become free
again.
This optional extension is supported by the server when the
"connection.blocked" server capability key is true.
*/
func (me *Connection) NotifyBlocked(c chan Blocking) chan Blocking {
me.m.Lock()
defer me.m.Unlock()

if me.noNotify {
close(c)
} else {
me.blocks = append(me.blocks, c)
}

return c
}

/*
Close requests and waits for the response to close the AMQP connection.
Expand Down Expand Up @@ -262,6 +287,10 @@ func (me *Connection) shutdown(err *Error) {
close(c)
}

for _, c := range me.blocks {
close(c)
}

me.noNotify = true
})
}
Expand All @@ -288,6 +317,14 @@ func (me *Connection) dispatch0(f frame) {
})

me.shutdown(newError(m.ReplyCode, m.ReplyText))
case *connectionBlocked:
for _, c := range me.blocks {
c <- Blocking{Active: true, Reason: m.Reason}
}
case *connectionUnblocked:
for _, c := range me.blocks {
c <- Blocking{Active: false}
}
default:
me.rpc <- m
}
Expand Down Expand Up @@ -512,6 +549,13 @@ func (me *Connection) openTune(config Config, auth Authentication) error {
ok := &connectionStartOk{
Mechanism: auth.Mechanism(),
Response: auth.Response(),
ClientProperties: Table{ // Open an issue if you wish these refined/parameterizable
"product": "https://github.com/streadway/amqp",
"version": "β",
"capabilities": Table{
"connection.blocked": true,
},
},
}
tune := &connectionTune{}

Expand Down
31 changes: 31 additions & 0 deletions examples_test.go
Expand Up @@ -348,3 +348,34 @@ func ExampleChannel_Publish() {
log.Fatalf("basic.publish: %v", err)
}
}

func publishAllTheThings(conn *amqp.Connection) {
// ... snarf snarf, barf barf
}

func ExampleConnection_NotifyBlocked() {
// Simply logs when the server throttles the TCP connection for publishers

// Test this by tuning your server to have a low memory watermark:
// rabbitmqctl set_vm_memory_high_watermark 0.00000001

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
defer conn.Close()

blockings := conn.NotifyBlocked(make(chan amqp.Blocking))
go func() {
for b := range blockings {
if b.Active {
log.Printf("TCP blocked: %q", b.Reason)
} else {
log.Printf("TCP unblocked")
}
}
}()

// Your application domain channel setup publishings
publishAllTheThings(conn)
}
7 changes: 7 additions & 0 deletions spec/amqp0-9-1.stripped.extended.xml
Expand Up @@ -190,6 +190,13 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
<chassis name="client" implement="MUST"/>
<chassis name="server" implement="MUST"/>
</method>
<method name="blocked" index="60">
<chassis name="server" implement="MAY"/>
<field name="reason" type="shortstr"/>
</method>
<method name="unblocked" index="61">
<chassis name="server" implement="MAY"/>
</method>
</class>
<class name="channel" handler="channel" index="20">
<chassis name="server" implement="MUST"/>
Expand Down
67 changes: 67 additions & 0 deletions spec091.go
Expand Up @@ -501,6 +501,57 @@ func (me *connectionCloseOk) read(r io.Reader) (err error) {
return
}

type connectionBlocked struct {
Reason string
}

func (me *connectionBlocked) id() (uint16, uint16) {
return 10, 60
}

func (me *connectionBlocked) wait() bool {
return false
}

func (me *connectionBlocked) write(w io.Writer) (err error) {

if err = writeShortstr(w, me.Reason); err != nil {
return
}

return
}

func (me *connectionBlocked) read(r io.Reader) (err error) {

if me.Reason, err = readShortstr(r); err != nil {
return
}

return
}

type connectionUnblocked struct {
}

func (me *connectionUnblocked) id() (uint16, uint16) {
return 10, 61
}

func (me *connectionUnblocked) wait() bool {
return false
}

func (me *connectionUnblocked) write(w io.Writer) (err error) {

return
}

func (me *connectionUnblocked) read(r io.Reader) (err error) {

return
}

type channelOpen struct {
reserved1 string
}
Expand Down Expand Up @@ -2785,6 +2836,22 @@ func (me *reader) parseMethodFrame(channel uint16, size uint32) (f frame, err er
}
mf.Method = method

case 60: // connection blocked
//fmt.Println("NextMethod: class:10 method:60")
method := &connectionBlocked{}
if err = method.read(me.r); err != nil {
return
}
mf.Method = method

case 61: // connection unblocked
//fmt.Println("NextMethod: class:10 method:61")
method := &connectionUnblocked{}
if err = method.read(me.r); err != nil {
return
}
mf.Method = method

default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
}
Expand Down
9 changes: 9 additions & 0 deletions types.go
Expand Up @@ -133,6 +133,15 @@ type Publishing struct {
Body []byte
}

// Blocking notifies the server's TCP flow control of the Connection. When a
// server hits a memory or disk alarm it will block all connections until the
// resources are reclaimed. Use NotifyBlock on the Connection to receive these
// events.
type Blocking struct {
Active bool // TCP pushback active/inactive on server
Reason string // Server reason for activation
}

// Decimal matches the AMQP decimal type. Scale is the number of decimal
// digits Scale == 2, Value == 12345, Decimal == 123.45
type Decimal struct {
Expand Down

0 comments on commit 7e47df8

Please sign in to comment.