From 26274e5ede853091e715c02fbaf39a0d5aa87787 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 11 Dec 2014 16:18:11 -0800 Subject: [PATCH] Metafora was marking tasks as Done on Shutdown, now let Run decide Also drop error return value as there was nothign for Metafora to do with it. --- embedded/embedded_test.go | 11 ++-- examples/koalemosd/handler.go | 38 +++++--------- handler.go | 53 ++++++++++++++------ metafora.go | 48 ++++++++++-------- metafora_test.go | 94 +++++++++++++++++++---------------- 5 files changed, 133 insertions(+), 111 deletions(-) diff --git a/embedded/embedded_test.go b/embedded/embedded_test.go index a9a1323..323b039 100644 --- a/embedded/embedded_test.go +++ b/embedded/embedded_test.go @@ -1,7 +1,6 @@ package embedded import ( - "fmt" "sync" "testing" "time" @@ -122,9 +121,9 @@ type testhandler struct { addfunc func(r string) } -func (th *testhandler) Run(taskId string) error { +func (th *testhandler) Run(taskId string) (done bool) { th.addfunc(taskId) - return nil + return true } func (th *testhandler) Stop() { @@ -136,14 +135,14 @@ type blockingtesthandler struct { tc *testcounter } -func (bh *blockingtesthandler) Run(taskId string) error { +func (bh *blockingtesthandler) Run(taskId string) (done bool) { select { case <-bh.stopchan: bh.tc.Add(taskId) case <-time.After(time.Second * 3): - return fmt.Errorf("Not stopped before three seconds") + return false } - return nil + return true } func (bh *blockingtesthandler) Stop() { diff --git a/examples/koalemosd/handler.go b/examples/koalemosd/handler.go index b55557f..69fed38 100644 --- a/examples/koalemosd/handler.go +++ b/examples/koalemosd/handler.go @@ -2,7 +2,6 @@ package main import ( "encoding/json" - "errors" "fmt" "io" "log" @@ -17,10 +16,6 @@ import ( "github.com/lytics/metafora" ) -var ( - NoArgs = errors.New("koalemosd: no args in task") -) - type shellHandler struct { etcdc *etcd.Client id string @@ -30,31 +25,25 @@ type shellHandler struct { stop bool } -type FatalError struct { - error -} - -func (*FatalError) Fatal() bool { return true } - // Run retrieves task information from etcd and executes it. -func (h *shellHandler) Run(taskID string) error { +func (h *shellHandler) Run(taskID string) (done bool) { h.id = taskID const sort, recurs = false, false resp, err := h.etcdc.Get("/koalemos-tasks/"+taskID, sort, recurs) if err != nil { h.log("Fatal error: Failed retrieving task from etcd: %v", err) - return &FatalError{err} + return true } task := struct{ Args []string }{} if err := json.Unmarshal([]byte(resp.Node.Value), &task); err != nil { h.log("Failed to unmarshal command body: %v", err) - return err + return true } if len(task.Args) == 0 { h.log("No Args in task: %s", resp.Node.Value) - return NoArgs + return true } cmd := exec.Command(task.Args[0], task.Args[1:]...) @@ -63,7 +52,7 @@ func (h *shellHandler) Run(taskID string) error { stdout, stderr, err := outFiles(taskID) if err != nil { h.log("Could not create log files: %v", err) - return err + return false } defer stdout.Close() defer stderr.Close() @@ -77,14 +66,14 @@ func (h *shellHandler) Run(taskID string) error { if h.stop { h.log("Task stopped before it even started.") h.m.Unlock() - return nil + return false } h.log("Running task: %s", strings.Join(task.Args, " ")) if err := cmd.Start(); err != nil { h.m.Unlock() h.log("Error starting task: %v", err) - return nil // don't return the error, metafora doesn't care + return true } h.p = cmd.Process h.ps = cmd.ProcessState @@ -93,26 +82,25 @@ func (h *shellHandler) Run(taskID string) error { h.m.Unlock() h.log("running") - stopping := false if err := cmd.Wait(); err != nil { if err.(*exec.ExitError).Sys().(syscall.WaitStatus).Signal() == os.Interrupt { - stopping = true + h.log("Stopping") } else { - // Metafora doesn't care about internal task failures, so just log it h.log("Exited with error: %v", err) + done = true // don't retry commands that error'd } } - // Only delete task if we're not stopping - if !stopping { + // Only delete task if command is done + if done { //FIXME Use CompareAndDelete if _, err := h.etcdc.Delete("/koalemos-tasks/"+taskID, recurs); err != nil { h.log("Error deleting task body: %v", err) } } - h.log("done") - return nil + h.log("done? %t", done) + return done } // Stop sends the Interrupt signal to the running process. diff --git a/handler.go b/handler.go index 8f086ba..4fc2960 100644 --- a/handler.go +++ b/handler.go @@ -4,28 +4,49 @@ package metafora // Handler for each claimed task, call Run once and only once, and call Stop // when the task should persist its progress and exit. type Handler interface { - // Run should block until a task is complete. If it returns nil, the task is - // considered complete. If error is non-nil, ...well... log it? FIXME - Run(taskID string) error + // Run handles a task and blocks until completion or Stop is called. + // + // If Run returns true, Metafora will mark the task as Done via the + // Coordinator. The task will not be rescheduled. + // + // If Run returns false, Metafora will Release the task via the Coordinator. + // The task will be scheduled to run again. + // + // Panics are treated the same as returning true. + Run(taskID string) (done bool) - // Stop should signal to the handler to shutdown gracefully. Stop - // implementations should not block until Run exits. + // Stop signals to the handler to shutdown gracefully. Stop implementations + // should not block until Run exits. + // + // Run probably wants to return false when stop is called, but this is left + // up to the implementation as races between Run finishing and Stop being + // called can happen. Stop() } // HandlerFunc is called by the Consumer to create a new Handler for each task. type HandlerFunc func() Handler -// FatalError is a custom error interface Handlers may choose to return from -// their Run methods in order to indicate to Metafora that the task has failed -// and should not be rescheduled. -// -// If an error is returned by Run that does not implement this interface, or -// Fatal() returns false, the task will be rescheduled. -type FatalError interface { - error +// SimpleHander creates a HandlerFunc for a simple function that accepts a stop +// channel. The channel will be closed when Stop is called. +func SimpleHandler(f func(task string, stop <-chan bool) bool) HandlerFunc { + return func() Handler { + return &simpleHandler{ + stop: make(chan bool), + f: f, + } + } +} + +type simpleHandler struct { + stop chan bool + f func(string, <-chan bool) bool +} + +func (h *simpleHandler) Run(task string) bool { + return h.f(task, h.stop) +} - // Fatal returns true when an error is unrecoverable and should not be - // rescheduled. - Fatal() bool +func (h *simpleHandler) Stop() { + close(h.stop) } diff --git a/metafora.go b/metafora.go index 8e0aaec..2115cfc 100644 --- a/metafora.go +++ b/metafora.go @@ -368,17 +368,7 @@ func (c *Consumer) claimed(taskID string) { // Start handler in its own goroutine go func() { - c.logger.Log(LogLevelInfo, "Task started: %s", taskID) - defer c.logger.Log(LogLevelInfo, "Task exited: %s", taskID) defer func() { - if err := recover(); err != nil { - stack := make([]byte, 50*1024) - sz := runtime.Stack(stack, false) - c.logger.Log(LogLevelError, "Handler %s panic()'d: %v\n%s", taskID, err, stack[:sz]) - // panics are considered fatal errors. Make sure the task isn't - // rescheduled. - c.coord.Done(taskID) - } // **This is the only place tasks should be removed from c.running** c.runL.Lock() close(c.running[taskID].c) @@ -388,21 +378,37 @@ func (c *Consumer) claimed(taskID string) { }() // Run the task - c.logger.Log(LogLevelDebug, "Calling run for task %s", taskID) - if err := h.Run(taskID); err != nil { - if ferr, ok := err.(FatalError); ok && ferr.Fatal() { - c.logger.Log(LogLevelError, "Handler for %s exited with fatal error: %v", taskID, err) - } else { - c.logger.Log(LogLevelError, "Handler for %s exited with error: %v", taskID, err) - // error was non-fatal, release and let another node try - c.coord.Release(taskID) - return - } + c.logger.Log(LogLevelInfo, "Task started: %s", taskID) + done := c.runTask(h.Run, taskID) + if done { + c.logger.Log(LogLevelInfo, "Task exited: %s (marking done)", taskID) + c.coord.Done(taskID) + } else { + c.logger.Log(LogLevelInfo, "Task exited: %s (releasing)", taskID) + c.coord.Release(taskID) } - c.coord.Done(taskID) }() } +// runTask executes a handler's Run method and recovers from panic()s. +func (c *Consumer) runTask(run func(string) bool, task string) bool { + done := false + func() { + defer func() { + if err := recover(); err != nil { + stack := make([]byte, 50*1024) + sz := runtime.Stack(stack, false) + c.logger.Log(LogLevelError, "Handler %s panic()'d: %v\n%s", task, err, stack[:sz]) + // panics are considered fatal errors. Make sure the task isn't + // rescheduled. + done = true + } + }() + done = run(task) + }() + return done +} + // release stops and Coordinator.Release()s a task if it's running. // // release blocks until the task handler stops running. diff --git a/metafora_test.go b/metafora_test.go index 51e6387..6fbb7b9 100644 --- a/metafora_test.go +++ b/metafora_test.go @@ -2,12 +2,13 @@ package metafora import ( "errors" - "fmt" "sync/atomic" "testing" "time" ) +var bal = &DumbBalancer{} + // Handler/Consumer test type testCoord struct { @@ -55,13 +56,13 @@ type testHandler struct { tasksRun chan string } -func (h *testHandler) Run(id string) error { +func (h *testHandler) Run(id string) bool { h.tasksRun <- id h.id = id h.t.Logf("Run(%s)", id) <-h.stop h.t.Logf("Stop received for %s", id) - return nil + return true } func (h *testHandler) Stop() { @@ -231,8 +232,8 @@ func TestBalancer(t *testing.T) { type noopHandler struct{} -func (noopHandler) Run(string) error { return nil } -func (noopHandler) Stop() {} +func (noopHandler) Run(string) bool { return true } +func (noopHandler) Stop() {} // TestHandleTask ensures that tasks are marked as done once handled. func TestHandleTask(t *testing.T) { @@ -242,6 +243,8 @@ func TestHandleTask(t *testing.T) { go c.Run() coord.tasks <- "task1" select { + case <-coord.releases: + t.Errorf("Release called, expected Done!") case <-coord.dones: case <-time.After(100 * time.Millisecond): t.Fatalf("Took too long to mark task as done") @@ -249,52 +252,57 @@ func TestHandleTask(t *testing.T) { c.Shutdown() } -type ferr bool - -func (err ferr) Error() string { return fmt.Sprintf("fatal error=%t", err) } -func (err ferr) Fatal() bool { return bool(err) } - -type errHandler struct{} - -func (errHandler) Run(taskID string) error { - if taskID == "panictask" { - panic("test panic") - } - return ferr(taskID == "fataltask") -} -func (errHandler) Stop() {} - -// TestHandleTaskErr ensures that tasks are either released or marked as done -// if Run returns an error. -func TestHandleTaskErr(t *testing.T) { - hf := func() Handler { return errHandler{} } +// TestTaskPanic ensures panics from Run methods are turned into Done calls. +func TestTaskPanic(t *testing.T) { + t.Parallel() + hf := SimpleHandler(func(string, <-chan bool) bool { + panic("TestTaskPanic") + }) coord := newTestCoord() - c, _ := NewConsumer(coord, hf, &DumbBalancer{}) + c, _ := NewConsumer(coord, hf, bal) go c.Run() - coord.tasks <- "task1" - coord.tasks <- "fataltask" - coord.tasks <- "panictask" - // 2 dones (1 panic, 1 fatal) - for i := 2; i > 0; i-- { + coord.tasks <- "1" + coord.tasks <- "2" + coord.tasks <- "3" + for i := 3; i > 0; i-- { select { case task := <-coord.dones: t.Logf("%s done", task) - case <-time.After(100 * time.Millisecond): - t.Fatalf("Took too long to mark task as done") + case task := <-coord.releases: + t.Errorf("%s released when it should have been marked Done!", task) + case <-time.After(200 * time.Millisecond): + t.Fatalf("Took too long to mark task(s) as done.") } } + c.Shutdown() +} - // 1 release - select { - case task := <-coord.releases: - t.Logf("%s released", task) - case <-time.After(100 * time.Millisecond): - t.Fatal("Took too long to release task") - } - - // nothing else should have happened - if len(coord.dones) > 0 || len(coord.releases) > 0 { - t.Fatalf("Unexpected extra events: dones=%d releases=%d", len(coord.dones), len(coord.releases)) +// TestShutdown ensures Shutdown causes Run() to exit cleanly. +func TestShutdown(t *testing.T) { + t.Parallel() + hf := SimpleHandler(func(_ string, c <-chan bool) bool { + <-c + return false + }) + coord := newTestCoord() + c, _ := NewConsumer(coord, hf, bal) + go c.Run() + coord.tasks <- "1" + coord.tasks <- "2" + coord.tasks <- "3" + time.Sleep(100 * time.Millisecond) + if len(coord.dones)+len(coord.releases) > 0 { + t.Fatalf("Didn't expect any tasks to exit before Shutdown was called.") } c.Shutdown() + for i := 3; i > 0; i-- { + select { + case task := <-coord.dones: + t.Errorf("%s makred done when it should have been Released!", task) + case task := <-coord.releases: + t.Logf("%s relased", task) + case <-time.After(200 * time.Millisecond): + t.Fatalf("Took too long to mark task(s) as released.") + } + } }