Skip to content

Commit

Permalink
Metafora was marking tasks as Done on Shutdown, now let Run decide
Browse files Browse the repository at this point in the history
Also drop error return value as there was nothign for Metafora to do with it.
  • Loading branch information
schmichael committed Dec 12, 2014
1 parent 2a1f66a commit 26274e5
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 111 deletions.
11 changes: 5 additions & 6 deletions embedded/embedded_test.go
@@ -1,7 +1,6 @@
package embedded

import (
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -122,9 +121,9 @@ type testhandler struct {
addfunc func(r string)
}

func (th *testhandler) Run(taskId string) error {
func (th *testhandler) Run(taskId string) (done bool) {
th.addfunc(taskId)
return nil
return true
}

func (th *testhandler) Stop() {
Expand All @@ -136,14 +135,14 @@ type blockingtesthandler struct {
tc *testcounter
}

func (bh *blockingtesthandler) Run(taskId string) error {
func (bh *blockingtesthandler) Run(taskId string) (done bool) {
select {
case <-bh.stopchan:
bh.tc.Add(taskId)
case <-time.After(time.Second * 3):
return fmt.Errorf("Not stopped before three seconds")
return false
}
return nil
return true
}

func (bh *blockingtesthandler) Stop() {
Expand Down
38 changes: 13 additions & 25 deletions examples/koalemosd/handler.go
Expand Up @@ -2,7 +2,6 @@ package main

import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
Expand All @@ -17,10 +16,6 @@ import (
"github.com/lytics/metafora"
)

var (
NoArgs = errors.New("koalemosd: no args in task")
)

type shellHandler struct {
etcdc *etcd.Client
id string
Expand All @@ -30,31 +25,25 @@ type shellHandler struct {
stop bool
}

type FatalError struct {
error
}

func (*FatalError) Fatal() bool { return true }

// Run retrieves task information from etcd and executes it.
func (h *shellHandler) Run(taskID string) error {
func (h *shellHandler) Run(taskID string) (done bool) {
h.id = taskID

const sort, recurs = false, false
resp, err := h.etcdc.Get("/koalemos-tasks/"+taskID, sort, recurs)
if err != nil {
h.log("Fatal error: Failed retrieving task from etcd: %v", err)
return &FatalError{err}
return true
}

task := struct{ Args []string }{}
if err := json.Unmarshal([]byte(resp.Node.Value), &task); err != nil {
h.log("Failed to unmarshal command body: %v", err)
return err
return true
}
if len(task.Args) == 0 {
h.log("No Args in task: %s", resp.Node.Value)
return NoArgs
return true
}

cmd := exec.Command(task.Args[0], task.Args[1:]...)
Expand All @@ -63,7 +52,7 @@ func (h *shellHandler) Run(taskID string) error {
stdout, stderr, err := outFiles(taskID)
if err != nil {
h.log("Could not create log files: %v", err)
return err
return false
}
defer stdout.Close()
defer stderr.Close()
Expand All @@ -77,14 +66,14 @@ func (h *shellHandler) Run(taskID string) error {
if h.stop {
h.log("Task stopped before it even started.")
h.m.Unlock()
return nil
return false
}

h.log("Running task: %s", strings.Join(task.Args, " "))
if err := cmd.Start(); err != nil {
h.m.Unlock()
h.log("Error starting task: %v", err)
return nil // don't return the error, metafora doesn't care
return true
}
h.p = cmd.Process
h.ps = cmd.ProcessState
Expand All @@ -93,26 +82,25 @@ func (h *shellHandler) Run(taskID string) error {
h.m.Unlock()

h.log("running")
stopping := false

if err := cmd.Wait(); err != nil {
if err.(*exec.ExitError).Sys().(syscall.WaitStatus).Signal() == os.Interrupt {
stopping = true
h.log("Stopping")
} else {
// Metafora doesn't care about internal task failures, so just log it
h.log("Exited with error: %v", err)
done = true // don't retry commands that error'd
}
}

// Only delete task if we're not stopping
if !stopping {
// Only delete task if command is done
if done {
//FIXME Use CompareAndDelete
if _, err := h.etcdc.Delete("/koalemos-tasks/"+taskID, recurs); err != nil {
h.log("Error deleting task body: %v", err)
}
}
h.log("done")
return nil
h.log("done? %t", done)
return done
}

// Stop sends the Interrupt signal to the running process.
Expand Down
53 changes: 37 additions & 16 deletions handler.go
Expand Up @@ -4,28 +4,49 @@ package metafora
// Handler for each claimed task, call Run once and only once, and call Stop
// when the task should persist its progress and exit.
type Handler interface {
// Run should block until a task is complete. If it returns nil, the task is
// considered complete. If error is non-nil, ...well... log it? FIXME
Run(taskID string) error
// Run handles a task and blocks until completion or Stop is called.
//
// If Run returns true, Metafora will mark the task as Done via the
// Coordinator. The task will not be rescheduled.
//
// If Run returns false, Metafora will Release the task via the Coordinator.
// The task will be scheduled to run again.
//
// Panics are treated the same as returning true.
Run(taskID string) (done bool)

// Stop should signal to the handler to shutdown gracefully. Stop
// implementations should not block until Run exits.
// Stop signals to the handler to shutdown gracefully. Stop implementations
// should not block until Run exits.
//
// Run probably wants to return false when stop is called, but this is left
// up to the implementation as races between Run finishing and Stop being
// called can happen.
Stop()
}

// HandlerFunc is called by the Consumer to create a new Handler for each task.
type HandlerFunc func() Handler

// FatalError is a custom error interface Handlers may choose to return from
// their Run methods in order to indicate to Metafora that the task has failed
// and should not be rescheduled.
//
// If an error is returned by Run that does not implement this interface, or
// Fatal() returns false, the task will be rescheduled.
type FatalError interface {
error
// SimpleHander creates a HandlerFunc for a simple function that accepts a stop
// channel. The channel will be closed when Stop is called.
func SimpleHandler(f func(task string, stop <-chan bool) bool) HandlerFunc {
return func() Handler {
return &simpleHandler{
stop: make(chan bool),
f: f,
}
}
}

type simpleHandler struct {
stop chan bool
f func(string, <-chan bool) bool
}

func (h *simpleHandler) Run(task string) bool {
return h.f(task, h.stop)
}

// Fatal returns true when an error is unrecoverable and should not be
// rescheduled.
Fatal() bool
func (h *simpleHandler) Stop() {
close(h.stop)
}
48 changes: 27 additions & 21 deletions metafora.go
Expand Up @@ -368,17 +368,7 @@ func (c *Consumer) claimed(taskID string) {

// Start handler in its own goroutine
go func() {
c.logger.Log(LogLevelInfo, "Task started: %s", taskID)
defer c.logger.Log(LogLevelInfo, "Task exited: %s", taskID)
defer func() {
if err := recover(); err != nil {
stack := make([]byte, 50*1024)
sz := runtime.Stack(stack, false)
c.logger.Log(LogLevelError, "Handler %s panic()'d: %v\n%s", taskID, err, stack[:sz])
// panics are considered fatal errors. Make sure the task isn't
// rescheduled.
c.coord.Done(taskID)
}
// **This is the only place tasks should be removed from c.running**
c.runL.Lock()
close(c.running[taskID].c)
Expand All @@ -388,21 +378,37 @@ func (c *Consumer) claimed(taskID string) {
}()

// Run the task
c.logger.Log(LogLevelDebug, "Calling run for task %s", taskID)
if err := h.Run(taskID); err != nil {
if ferr, ok := err.(FatalError); ok && ferr.Fatal() {
c.logger.Log(LogLevelError, "Handler for %s exited with fatal error: %v", taskID, err)
} else {
c.logger.Log(LogLevelError, "Handler for %s exited with error: %v", taskID, err)
// error was non-fatal, release and let another node try
c.coord.Release(taskID)
return
}
c.logger.Log(LogLevelInfo, "Task started: %s", taskID)
done := c.runTask(h.Run, taskID)
if done {
c.logger.Log(LogLevelInfo, "Task exited: %s (marking done)", taskID)
c.coord.Done(taskID)
} else {
c.logger.Log(LogLevelInfo, "Task exited: %s (releasing)", taskID)
c.coord.Release(taskID)
}
c.coord.Done(taskID)
}()
}

// runTask executes a handler's Run method and recovers from panic()s.
func (c *Consumer) runTask(run func(string) bool, task string) bool {
done := false
func() {
defer func() {
if err := recover(); err != nil {
stack := make([]byte, 50*1024)
sz := runtime.Stack(stack, false)
c.logger.Log(LogLevelError, "Handler %s panic()'d: %v\n%s", task, err, stack[:sz])
// panics are considered fatal errors. Make sure the task isn't
// rescheduled.
done = true
}
}()
done = run(task)
}()
return done
}

// release stops and Coordinator.Release()s a task if it's running.
//
// release blocks until the task handler stops running.
Expand Down

0 comments on commit 26274e5

Please sign in to comment.