Graceful pastry close, blocking pool term, fix #14

1 parent be73749 commit 9ef23337eacfb2715f3366a8d6317cf19a5c2605 @karalabe committed Mar 7, 2014
@@ -77,12 +77,14 @@ func (t *ThreadPool) Start() {
// Waits for all threads to finish, terminating the whole pool afterwards. No
// new tasks are accepted in the meantime. If clear is set, queued tasks are
// dropped; otherwise they are executed before termination.
-func (t *ThreadPool) Terminate() {
+func (t *ThreadPool) Terminate(clear bool) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.quit = true
- t.tasks.Reset()
+ if clear {
+ t.tasks.Reset()
+ }
for t.idle < t.total {
t.done.Wait()
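
The new clear flag decides the fate of tasks that were scheduled but have not started yet. A minimal usage sketch, assuming only the ThreadPool API visible in this diff (NewThreadPool, Schedule, Start, Terminate); the sleep task is just a stand-in:

package main

import (
	"log"
	"time"

	"github.com/karalabe/iris/pool"
)

func main() {
	workers := pool.NewThreadPool(4)
	for i := 0; i < 16; i++ {
		if err := workers.Schedule(func() { time.Sleep(10 * time.Millisecond) }); err != nil {
			log.Printf("failed to schedule task: %v.", err)
		}
	}
	workers.Start()

	// Terminate(false) blocks until every scheduled task has been executed;
	// Terminate(true) drops the queued tasks and waits only for the running ones.
	workers.Terminate(false)
}
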
@@ -103,7 +103,7 @@ func TestThreadPool(t *testing.T) {
}
}
time.Sleep(10 * time.Millisecond)
- pool.Terminate()
+ pool.Terminate(true)
time.Sleep(150 * time.Millisecond)
if count != 3 {
t.Fatalf("unexpected finished tasks: have %v, want %v.", count, 3)
@@ -206,44 +206,50 @@ func TestClear(t *testing.T) {
func TestTerminate(t *testing.T) {
t.Parallel()
- mutex := new(sync.Mutex)
- started := 0
- workers := 32
-
- // Create the pool and schedule more work than workers
- pool := NewThreadPool(workers)
- for i := 0; i < workers*8; i++ {
- err := pool.Schedule(func() {
- mutex.Lock()
- started++
- mutex.Unlock()
-
- time.Sleep(10 * time.Millisecond)
- })
- if err != nil {
- t.Fatalf("failed to schedule task: %v.", err)
+ // Test termination with both clear flags
+ for _, clear := range []bool{false, true} {
+ mutex := new(sync.Mutex)
+ started := 0
+ workers := 32
+
+ // Create the pool and schedule more work than workers
+ pool := NewThreadPool(workers)
+ for i := 0; i < workers*8; i++ {
+ err := pool.Schedule(func() {
+ mutex.Lock()
+ started++
+ mutex.Unlock()
+
+ time.Sleep(10 * time.Millisecond)
+ })
+ if err != nil {
+ t.Fatalf("failed to schedule task: %v.", err)
+ }
}
- }
- pool.Start()
-
- // Launch a number of terminations to ensure correct blocking and no deadlocks (issue #7)
- for i := 0; i < 16; i++ {
- go pool.Terminate()
- }
- pool.Terminate() // main terminator
+ pool.Start()
- // Ensure terminate blocked until current workers have finished
- if started != workers {
- t.Fatalf("unexpected tasks started: have %d, want %d.", started, workers)
- }
- // Ensure that no more tasks can be scheduled
- if err := pool.Schedule(func() {}); err == nil {
- t.Fatalf("task scheduling succeeded, shouldn't have.")
- }
- time.Sleep(20 * time.Millisecond)
+ // Launch a number of terminations to ensure correct blocking and no deadlocks (issue #7)
+ for i := 0; i < 16; i++ {
+ go pool.Terminate(clear)
+ }
+ pool.Terminate(clear) // main terminator
- // No extra tasks should finish
- if started != workers {
- t.Fatalf("unexpected tasks started: have %d, want %d.", started, workers)
+ // Ensure terminate blocked until current workers have finished
+ if clear && started != workers {
+ t.Fatalf("unexpected tasks started: have %d, want %d.", started, workers)
+ } else if !clear && started != workers*8 {
+ t.Fatalf("task completion mismatch: have %d, want %d.", started, workers*8)
+ }
+ // Ensure that no more tasks can be scheduled
+ if err := pool.Schedule(func() {}); err == nil {
+ t.Fatalf("task scheduling succeeded, shouldn't have.")
+ }
+ // Verify whether the pool was cleared or not before termination
+ time.Sleep(20 * time.Millisecond)
+ if clear && started != workers {
+ t.Fatalf("unexpected tasks started: have %d, want %d.", started, workers)
+ } else if !clear && started != workers*8 {
+ t.Fatalf("task completion mismatch: have %d, want %d.", started, workers*8)
+ }
}
}
@@ -21,12 +21,13 @@ package iris
import (
"fmt"
- "github.com/karalabe/iris/config"
- "github.com/karalabe/iris/pool"
- "github.com/karalabe/iris/proto/carrier"
"sync"
"sync/atomic"
"time"
+
+ "github.com/karalabe/iris/config"
+ "github.com/karalabe/iris/pool"
+ "github.com/karalabe/iris/proto/carrier"
)
var appPrefixes []string
@@ -243,5 +244,5 @@ func (c *connection) Close() {
c.conn.Close()
// Terminate the worker pool
- c.workers.Terminate()
+ c.workers.Terminate(true)
}
@@ -80,11 +80,13 @@ func (h *heartbeat) Beat() {
// Implements heart.Callback.Dead, handling the event of a remote peer missing
// all its beats. The peer is reported dead and dropped.
func (h *heartbeat) Dead(id *big.Int) {
+ log.Printf("pastry: remote peer reported dead: %v.", id)
+
h.owner.lock.RLock()
- defer h.owner.lock.RUnlock()
+ dead, ok := h.owner.livePeers[id.String()]
+ h.owner.lock.RUnlock()
- log.Printf("pastry: remote peer reported dead: %v.", id)
- if p, ok := h.owner.livePeers[id.String()]; ok {
- h.owner.drop(p)
+ if ok {
+ h.owner.drop(dead)
}
}
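
The rewrite above looks the peer up under the read lock, releases it, and only then calls drop, which presumably needs exclusive access to the peer tables; holding the RLock across that call is what previously risked a deadlock, since sync.RWMutex is not reentrant. A self-contained sketch of the same pattern, with hypothetical names (registry, remove, dead), not the iris code itself:

package main

import "sync"

type peer struct{ id string }

type registry struct {
	lock  sync.RWMutex
	items map[string]*peer
}

// remove takes the write lock, so callers must not hold the read lock.
func (r *registry) remove(p *peer) {
	r.lock.Lock()
	delete(r.items, p.id)
	r.lock.Unlock()
}

// dead mirrors heartbeat.Dead: look up under RLock, release, then mutate.
func (r *registry) dead(id string) {
	r.lock.RLock()
	p, ok := r.items[id]
	r.lock.RUnlock() // released before remove, which would otherwise block on the write lock

	if ok {
		r.remove(p)
	}
}

func main() {
	r := &registry{items: map[string]*peer{"a": {id: "a"}}}
	r.dead("a")
}
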
@@ -33,6 +33,8 @@ import (
"sync"
"time"
+ "github.com/karalabe/iris/pool"
+
"github.com/karalabe/iris/config"
"github.com/karalabe/iris/ext/mathext"
"github.com/karalabe/iris/ext/sortext"
@@ -124,12 +126,15 @@ func (o *Overlay) manager() {
peerAddrs = append(peerAddrs, addr)
}
}
- // Initiate a connection to the remote peer
+ // Initiate a connection to the remote peer (don't leak the pending count if the pool is terminating)
pending.Add(1)
- o.authInit.Schedule(func() {
+ err := o.authInit.Schedule(func() {
defer pending.Done()
o.dial(peerAddrs)
})
+ if err == pool.ErrTerminating {
+ pending.Done()
+ }
}
// Wait till all outbound connections either complete or timeout
pending.Wait()
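
If Schedule refuses the task with pool.ErrTerminating, the queued closure never runs, so its deferred pending.Done() never fires and pending.Wait() would block forever; the caller therefore has to undo the Add itself. A self-contained sketch of that balancing pattern, using only the pool API seen in this diff (the dial work is stubbed out):

package main

import (
	"sync"

	"github.com/karalabe/iris/pool"
)

func main() {
	workers := pool.NewThreadPool(2)
	workers.Start()

	var pending sync.WaitGroup
	pending.Add(1)
	err := workers.Schedule(func() {
		defer pending.Done()
		// ... dial the remote peer here ...
	})
	if err == pool.ErrTerminating {
		// The task was rejected and will never run, so release the
		// WaitGroup manually to keep Wait from blocking forever.
		pending.Done()
	}
	pending.Wait()

	workers.Terminate(false)
}
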
@@ -166,6 +171,16 @@ func (o *Overlay) manager() {
pending.Add(1)
go func(p *peer) {
defer pending.Done()
+ // Send a pastry leave to the remote node and wait
+ o.sendClose(p)
+
+ // Wait a while for remote tear-down
+ select {
+ case <-p.drop:
+ case <-time.After(time.Second):
+ log.Printf("pastry: graceful session close timed out.")
+ }
+ // Success or not, close the session
if err := p.Close(); err != nil {
log.Printf("pastry: failed to close peer during termination: %v.", err)
}
@@ -182,20 +182,15 @@ func (o *Overlay) Shutdown() error {
}
}
// Wait for all pending handshakes to finish
- o.authAccept.Terminate()
- o.authInit.Terminate()
-
- // Broadcast a termination to all active peers
- // TODO
- // Wait for remote peers to acknowledge overlay shutdown
- // TODO
+ o.authAccept.Terminate(false)
+ o.authInit.Terminate(false)
// Terminate the heartbeat mechanism
if err := o.heart.terminate(); err != nil {
errs = append(errs, err)
}
// Wait for all state exchanges to finish
- o.stateExch.Terminate()
+ o.stateExch.Terminate(true)
// Terminate the maintainer and all peer connections with it
o.maintQuit <- errc
@@ -20,6 +20,7 @@
package pastry
import (
+ "log"
"math/big"
"time"
@@ -90,6 +91,7 @@ type nopCallback struct {
}
func (cb *nopCallback) Deliver(msg *proto.Message, key *big.Int) {
+ log.Printf("nop callback delivered a message")
}
func (cb *nopCallback) Forward(msg *proto.Message, key *big.Int) bool {
@@ -55,6 +55,7 @@ type peer struct {
// Maintenance fields
quit chan chan error // Synchronizes peer termination
+ drop chan struct{} // Channel sync for remote drop on graceful tear-down
term bool // Specifies whether the peer terminated already or not
lock sync.Mutex // Lock to protect the close mechanism
}
@@ -73,6 +74,7 @@ func (o *Overlay) newPeer(ses *session.Session) *peer {
// Transport and maintenance channels
quit: make(chan chan error),
+ drop: make(chan struct{}),
}
}
@@ -166,6 +168,7 @@ func (p *peer) processor() {
}
}
// Signal the overlay of the connection drop
+ close(p.drop)
if errc == nil {
p.owner.drop(p)
}
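
The new drop field is a chan struct{} that is only ever closed, never sent on: a receive from a closed channel returns immediately, so the manager's bounded wait in the tear-down above succeeds whether the processor signalled before or after sendClose, and any number of waiters are released by the single close. A tiny self-contained sketch of that property:

package main

import "fmt"

func main() {
	drop := make(chan struct{})
	close(drop) // done exactly once, like close(p.drop) in the processor

	// Every receive on a closed channel returns immediately, no matter
	// how many waiters there are or when they start waiting.
	for i := 0; i < 3; i++ {
		<-drop
		fmt.Println("waiter", i, "released")
	}
}
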
@@ -141,3 +141,9 @@ func (o *Overlay) sendState(dest *peer) {
// Send the state exchange
o.sendPacket(dest, &header{Op: opExchage, Dest: dest.nodeId, State: s})
}
+
+// Assembles an overlay leave message consisting of the close opcode, and sends
+// it towards the destination.
+func (o *Overlay) sendClose(dest *peer) {
+ o.sendPacket(dest, &header{Op: opClose, Dest: dest.nodeId})
+}
@@ -40,16 +40,16 @@ func (o *Overlay) route(src *peer, msg *proto.Message) {
// Extract some vars for easier access
tab := o.routes
- dst := msg.Head.Meta.(*header).Dest
+ dest := msg.Head.Meta.(*header).Dest
// Check the leaf set for direct delivery
- // TODO: corner cases with if only handful of nodes
+ // TODO: corner cases when only a handful of nodes exist?
// TODO: binary search with idSlice could be used (worthwhile?)
- if delta(tab.leaves[0], dst).Sign() >= 0 && delta(dst, tab.leaves[len(tab.leaves)-1]).Sign() >= 0 {
+ if delta(tab.leaves[0], dest).Sign() >= 0 && delta(dest, tab.leaves[len(tab.leaves)-1]).Sign() >= 0 {
best := tab.leaves[0]
- dist := distance(best, dst)
+ dist := distance(best, dest)
for _, leaf := range tab.leaves[1:] {
- if d := distance(leaf, dst); d.Cmp(dist) < 0 {
+ if d := distance(leaf, dest); d.Cmp(dist) < 0 {
best, dist = leaf, d
}
}
@@ -62,23 +62,23 @@ func (o *Overlay) route(src *peer, msg *proto.Message) {
return
}
// Check the routing table for indirect delivery
- pre, col := prefix(o.nodeId, dst)
+ pre, col := prefix(o.nodeId, dest)
if best := tab.routes[pre][col]; best != nil {
o.forward(src, msg, best)
return
}
// Route to anybody closer than the local node
- dist := distance(o.nodeId, dst)
+ dist := distance(o.nodeId, dest)
for _, peer := range tab.leaves {
- if p, _ := prefix(peer, dst); p >= pre && distance(peer, dst).Cmp(dist) < 0 {
+ if p, _ := prefix(peer, dest); p >= pre && distance(peer, dest).Cmp(dist) < 0 {
o.forward(src, msg, peer)
return
}
}
for _, row := range tab.routes {
for _, peer := range row {
if peer != nil {
- if p, _ := prefix(peer, dst); p >= pre && distance(peer, dst).Cmp(dist) < 0 {
+ if p, _ := prefix(peer, dest); p >= pre && distance(peer, dest).Cmp(dist) < 0 {
o.forward(src, msg, peer)
return
}
@@ -198,6 +198,12 @@ func (o *Overlay) process(src *peer, head *header) {
o.exch(src, remState)
o.lock.RLock()
}
+ case opClose:
+ // Remote side requested a graceful close
+ o.lock.RUnlock()
+ o.drop(src)
+ o.lock.RLock()
+
default:
log.Printf("pastry: unknown system message: %+v", head)
}
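
Taken together with sendClose and the drop channel, this opClose case completes the leave handshake: the shutting-down node sends the close opcode, the remote node drops the session on receipt, the initiator's processor observes the connection going down and closes drop, and the bounded one-second wait in the manager returns. A hypothetical miniature of that round trip using plain channels (none of these names exist in iris):

package main

import (
	"fmt"
	"time"
)

func main() {
	closeReq := make(chan struct{}, 1) // stands in for the opClose packet
	drop := make(chan struct{})        // stands in for peer.drop

	// Remote side: on receiving the close notice, tear the session down.
	go func() {
		<-closeReq
		close(drop)
	}()

	// Local side: send the leave, then wait at most a second for the drop.
	closeReq <- struct{}{}
	select {
	case <-drop:
		fmt.Println("graceful close acknowledged")
	case <-time.After(time.Second):
		fmt.Println("pastry: graceful session close timed out.")
	}
}
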