Skip to content

Commit

Permalink
Fix double task release on shutdown
Browse files Browse the repository at this point in the history
When we changed Run to return a value (done bool) in 26274e5, that
implicitly made Run the sole arbiter of whether to Release a task or mark
it as done.

The only thing the core Consumer should do is tell handlers to Stop --
it should only ever call Release or Done after Run exits, based on Run's
return value.

We do lose the ReleaseTask command (since now it's the same as the
StopTask command), but we've never used it and it can be added later if
it's found to be valuable.
  • Loading branch information
schmichael committed Jan 5, 2015
1 parent f46fec5 commit eb5aa7c
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 54 deletions.
16 changes: 5 additions & 11 deletions command.go
Expand Up @@ -3,11 +3,10 @@ package metafora
import "encoding/json"

const (
cmdFreeze = "freeze"
cmdUnfreeze = "unfreeze"
cmdBalance = "balance"
cmdReleaseTask = "release_task"
cmdStopTask = "stop_task"
cmdFreeze = "freeze"
cmdUnfreeze = "unfreeze"
cmdBalance = "balance"
cmdStopTask = "stop_task"
)

// Commands are a way clients can communicate directly with nodes for cluster
Expand Down Expand Up @@ -72,12 +71,7 @@ func CommandBalance() Command {
return &command{C: cmdBalance}
}

// CommandReleaseTask forces a node to stop and release a task even if frozen.
func CommandReleaseTask(task string) Command {
return &command{C: cmdReleaseTask, P: map[string]interface{}{"task": task}}
}

// CommandStopTask forces a node to stop and remove a task even if frozen.
// CommandStopTask forces a node to stop a task even if frozen.
func CommandStopTask(task string) Command {
return &command{C: cmdStopTask, P: map[string]interface{}{"task": task}}
}
1 change: 0 additions & 1 deletion command_test.go
Expand Up @@ -38,6 +38,5 @@ func TestCommands(t *testing.T) {
testCmd(t, CommandFreeze(), "freeze", nil)
testCmd(t, CommandUnfreeze(), "unfreeze", nil)
testCmd(t, CommandBalance(), "balance", nil)
testCmd(t, CommandReleaseTask("test"), "release_task", map[string]interface{}{"task": "test"})
testCmd(t, CommandStopTask("test"), "stop_task", map[string]interface{}{"task": "test"})
}
4 changes: 1 addition & 3 deletions coordinator.go
Expand Up @@ -56,7 +56,5 @@ type coordinatorContext struct {
// calling by Coordinator implementations via the CoordinatorContext interface.
func (ctx *coordinatorContext) Lost(taskID string) {
ctx.Log(LogLevelError, "Lost task %s", taskID)
if !ctx.stopTask(taskID) {
ctx.Log(LogLevelWarn, "Lost task %s wasn't running.", taskID)
}
ctx.stopTask(taskID)
}
2 changes: 1 addition & 1 deletion m_etcd/coordinator.go
Expand Up @@ -165,7 +165,7 @@ func (ec *EtcdCoordinator) upsertDir(path string, ttl uint64) {
}
}

// nodeRefresher is in chage of keeping the node entry in etcd alive. If it's
// nodeRefresher is in charge of keeping the node entry in etcd alive. If it's
// unable to communicate with etcd it must shutdown the coordinator.
//
// watch retries on errors and taskmgr calls Lost(task) on tasks it can't
Expand Down
48 changes: 11 additions & 37 deletions metafora.go
Expand Up @@ -23,9 +23,8 @@ type runningTask struct {
// handler on which Run and Stop are called
h Handler

// channel that's closed after Run() exits
c chan struct{}

// stopL serializes calls to task.h.Stop() to make handler implementations
// easier/safer.
stopL sync.Mutex
}

Expand Down Expand Up @@ -288,7 +287,7 @@ func (c *Consumer) balance() {
c.logger.Log(LogLevelInfo, "Balancer releasing: %v", tasks)
}
for _, task := range tasks {
go c.release(task)
go c.stopTask(task)
}
}

Expand All @@ -300,7 +299,7 @@ func (c *Consumer) shutdown() {

// Concurrently shutdown handlers as they may take a while to shutdown
for _, id := range tasks {
go c.release(id)
go c.stopTask(id)
}

c.logger.Log(LogLevelInfo, "Waiting for handlers to exit")
Expand Down Expand Up @@ -373,7 +372,7 @@ func (c *Consumer) claimed(taskID string) {
c.logger.Log(LogLevelWarn, "Attempted to claim already running task %s", taskID)
return
}
c.running[taskID] = runningTask{h: h, c: make(chan struct{})}
c.running[taskID] = runningTask{h: h}

// This must be done in the runL lock after the stop chan check so Shutdown
// doesn't close(stop) and start Wait()ing concurrently.
Expand Down Expand Up @@ -413,7 +412,6 @@ func (c *Consumer) runTask(run func(string) bool, task string) bool {

// **This is the only place tasks should be removed from c.running**
c.runL.Lock()
close(c.running[task].c)
delete(c.running, task)
c.runL.Unlock()
}()
Expand All @@ -422,32 +420,23 @@ func (c *Consumer) runTask(run func(string) bool, task string) bool {
return done
}

// release stops and Coordinator.Release()s a task if it's running.
//
// release blocks until the task handler stops running.
func (c *Consumer) release(taskID string) {
// Stop task...
if c.stopTask(taskID) {
// ...instruct the coordinator to release it
c.coord.Release(taskID)
}
}

// stopTask returns true if the task was running and stopped successfully.
func (c *Consumer) stopTask(taskID string) bool {
// stopTask asynchronously calls the task handler's Stop method. While stopTask
// calls don't block, calls to the task handler's Stop method are serialized
// with a lock.
func (c *Consumer) stopTask(taskID string) {
c.runL.Lock()
task, ok := c.running[taskID]
c.runL.Unlock()

if !ok {
// This can happen if a task completes during Balance() and is not an error.
c.logger.Log(LogLevelWarn, "Tried to release a non-running task: %s", taskID)
return false
return
}

// all handler methods must be wrapped in a recover to prevent a misbehaving
// handler from crashing the entire consumer
func() {
go func() {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, 50*1024)
Expand All @@ -461,12 +450,6 @@ func (c *Consumer) stopTask(taskID string) bool {
defer task.stopL.Unlock()
task.h.Stop()
}()

// Block until the handler finishes - even if it blocks indefinitely.
// Otherwise we may release a task that is still running which would allow it
// to run on multiple nodes concurrently.
<-task.c
return true
}

// Frozen returns true if Metafora is no longer watching for new tasks or
Expand Down Expand Up @@ -505,15 +488,6 @@ func (c *Consumer) handleCommand(cmd Command) {
c.logger.Log(LogLevelInfo, "Balancing due to command")
c.balance()
c.logger.Log(LogLevelDebug, "Finished balancing due to command")
case cmdReleaseTask:
taskI, ok := cmd.Parameters()["task"]
task, ok2 := taskI.(string)
if !ok || !ok2 {
c.logger.Log(LogLevelError, "Release task command didn't contain a valid task")
return
}
c.logger.Log(LogLevelInfo, "Releasing task %s due to command", task)
c.release(task)
case cmdStopTask:
taskI, ok := cmd.Parameters()["task"]
task, ok2 := taskI.(string)
Expand Down
2 changes: 1 addition & 1 deletion slowtask_test.go
Expand Up @@ -27,7 +27,7 @@ func TestDoubleRelease(t *testing.T) {
reallyStop := make(chan bool)
h := SimpleHandler(func(task string, stop <-chan bool) bool {
started <- 1
t.Logf("TestDoubleRelease handler recieved %s - blocking until reallStop closed.", task)
t.Logf("TestDoubleRelease handler recieved %s - blocking until reallyStop closed.", task)
<-reallyStop
return true
})
Expand Down

0 comments on commit eb5aa7c

Please sign in to comment.