Skip to content

Commit

Permalink
connection.blocked & connection.unblocked
Browse files Browse the repository at this point in the history
RabbitMQ delivers this method when it activates and deactivates TCP
pushback on all connections due to a limited global resource.
  • Loading branch information
Sean Treadway committed Aug 10, 2013
1 parent 35d8da5 commit 2491854
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 0 deletions.
44 changes: 44 additions & 0 deletions connection.go
Expand Up @@ -53,6 +53,7 @@ type Connection struct {

noNotify bool // true when we will never notify again
closes []chan *Error
blocks []chan Blocking

errors chan *Error

Expand Down Expand Up @@ -178,6 +179,30 @@ func (me *Connection) NotifyClose(c chan *Error) chan *Error {
return c
}

/*
NotifyBlock registers a listener for RabbitMQ specific TCP flow control method
extensions connection.blocked and connection.unblocked. Flow control is active
with a reason when Blocking.Blocked is true. When a Connection is blocked, all
methods will block across all connections until server resources become free
again.
This optional extension is supported by the server when the
"connection.blocked" server capability key is true.
*/
func (me *Connection) NotifyBlocked(c chan Blocking) chan Blocking {
me.m.Lock()
defer me.m.Unlock()

if me.noNotify {
close(c)
} else {
me.blocks = append(me.blocks, c)
}

return c
}

/*
Close requests and waits for the response to close the AMQP connection.
Expand Down Expand Up @@ -262,6 +287,10 @@ func (me *Connection) shutdown(err *Error) {
close(c)
}

for _, c := range me.blocks {
close(c)
}

me.noNotify = true
})
}
Expand All @@ -288,6 +317,14 @@ func (me *Connection) dispatch0(f frame) {
})

me.shutdown(newError(m.ReplyCode, m.ReplyText))
case *connectionBlocked:
for _, c := range me.blocks {
c <- Blocking{Active: true, Reason: m.Reason}
}
case *connectionUnblocked:
for _, c := range me.blocks {
c <- Blocking{Active: false}
}
default:
me.rpc <- m
}
Expand Down Expand Up @@ -512,6 +549,13 @@ func (me *Connection) openTune(config Config, auth Authentication) error {
ok := &connectionStartOk{
Mechanism: auth.Mechanism(),
Response: auth.Response(),
ClientProperties: Table{ // Open an issue if you wish these refined/parameterizable
"product": "https://github.com/streadway/amqp",
"version": "β",
"capabilities": Table{
"connection.blocked": true,
},
},
}
tune := &connectionTune{}

Expand Down
31 changes: 31 additions & 0 deletions examples_test.go
Expand Up @@ -348,3 +348,34 @@ func ExampleChannel_Publish() {
log.Fatalf("basic.publish: %v", err)
}
}

func publishAllTheThings(conn *amqp.Connection) {
// ... snarf snarf, barf barf
}

func ExampleConnection_NotifyBlocked() {
// Simply logs when the server throttles the TCP connection for publishers

// Test this by tuning your server to have a low memory watermark:
// rabbitmqctl set_vm_memory_high_watermark 0.00000001

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("connection.open: %s", err)
}
defer conn.Close()

blockings := conn.NotifyBlocked(make(chan amqp.Blocking))
go func() {
for b := range blockings {
if b.Active {
log.Printf("TCP blocked: %q", b.Reason)
} else {
log.Printf("TCP unblocked")
}
}
}()

// Your application domain channel setup publishings
publishAllTheThings(conn)
}
9 changes: 9 additions & 0 deletions types.go
Expand Up @@ -132,6 +132,15 @@ type Publishing struct {
Body []byte
}

// Blocking notifies the server's TCP flow control of the Connection. When a
// server hits a memory or disk alarm it will block all connections until the
// resources are reclaimed. Use NotifyBlock on the Connection to receive these
// events.
type Blocking struct {
Active bool // TCP pushback active/inactive on server
Reason string // Server reason for activation
}

// Decimal matches the AMQP decimal type. Scale is the number of decimal
// digits Scale == 2, Value == 12345, Decimal == 123.45
type Decimal struct {
Expand Down

0 comments on commit 2491854

Please sign in to comment.