feat(hatchery): dump stack traces (#3176)
fsamin authored and yesnault committed Aug 10, 2018
1 parent 34e9e18 commit 1e9bab1
Showing 6 changed files with 149 additions and 40 deletions.
3 changes: 2 additions & 1 deletion engine/.gitignore
@@ -1,2 +1,3 @@
bin/
target/
target/
panic_dumps/
68 changes: 68 additions & 0 deletions engine/hatchery/serve.go
@@ -4,9 +4,15 @@ import (
"context"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"time"

"github.com/gorilla/mux"

"github.com/ovh/cds/engine/api"
"github.com/ovh/cds/engine/api/observability"
"github.com/ovh/cds/engine/service"
@@ -23,6 +29,34 @@ type Common struct {
stats hatchery.Stats
}

const panicDumpDir = "panic_dumps"

func (c *Common) servePanicDumpList() ([]string, error) {
dir, _ := os.Getwd()
path := filepath.Join(dir, panicDumpDir)
files, err := ioutil.ReadDir(path)
if err != nil {
return nil, err
}
res := make([]string, len(files))
for i, f := range files {
res[i] = f.Name()
}
return res, nil
}

func (c *Common) servePanicDump(f string) (io.ReadCloser, error) {
dir, _ := os.Getwd()
path := filepath.Join(dir, panicDumpDir, f)
return os.OpenFile(path, os.O_RDONLY, os.FileMode(0644))
}

func (c *Common) PanicDumpDirectory() (string, error) {
dir, _ := os.Getwd()
path := filepath.Join(dir, panicDumpDir)
return path, os.MkdirAll(path, os.FileMode(0755))
}

func (c *Common) ServiceName() string {
return c.Common.ServiceName
}
@@ -112,6 +146,40 @@ func (c *Common) initRouter(ctx context.Context, h hatchery.Interface) {
r.Handle("/mon/status", r.GET(getStatusHandler(h), api.Auth(false)))
r.Handle("/mon/workers", r.GET(getWorkersPoolHandler(h), api.Auth(false)))
r.Handle("/mon/metrics", r.GET(observability.StatsHandler, api.Auth(false)))
r.Handle("/mon/errors", r.GET(c.getPanicDumpListHandler, api.Auth(false)))
r.Handle("/mon/errors/{id}", r.GET(c.getPanicDumpHandler, api.Auth(false)))

}

func (c *Common) getPanicDumpListHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
l, err := c.servePanicDumpList()
if err != nil {
return err
}
return service.WriteJSON(w, l, http.StatusOK)
}
}

func (c *Common) getPanicDumpHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
id := vars["id"]
f, err := c.servePanicDump(id)
if err != nil {
return err
}
defer f.Close() // nolint

// Headers must be set before the first write to the ResponseWriter,
// otherwise they are silently ignored.
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)

if _, err := io.Copy(w, f); err != nil {
return err
}

return nil
}
}

func getWorkersPoolHandler(h hatchery.Interface) service.HandlerFunc {
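
The two new routes registered above expose the dump files over the hatchery's HTTP API, without authentication (api.Auth(false)): /mon/errors returns the list of dump file names as JSON, and /mon/errors/{id} streams one raw stack trace. A minimal client sketch, not part of the commit, assuming the hatchery listens on a hypothetical http://localhost:8086:

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	base := "http://localhost:8086" // hypothetical hatchery address

	// GET /mon/errors returns the panic dump file names as a JSON array.
	resp, err := http.Get(base + "/mon/errors")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var dumps []string
	if err := json.NewDecoder(resp.Body).Decode(&dumps); err != nil {
		panic(err)
	}

	// GET /mon/errors/{id} streams the raw stack trace for one dump.
	for _, id := range dumps {
		r, err := http.Get(base + "/mon/errors/" + id)
		if err != nil {
			panic(err)
		}
		fmt.Println("---", id)
		io.Copy(os.Stdout, r.Body) // nolint: errcheck
		r.Body.Close()
	}
}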
24 changes: 19 additions & 5 deletions sdk/common.go
@@ -107,17 +107,31 @@ func FileSHA512sum(filePath string) (string, error) {
}

// GoRoutine runs the function within a goroutine with a panic recovery
func GoRoutine(name string, fn func()) {
func GoRoutine(name string, fn func(), writerFactories ...func(s string) (io.WriteCloser, error)) {
go func() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
log.Error("[PANIC] %s Failed", name)
log.Error("[PANIC] %s> %s", name, string(buf))
runtime.Stack(buf, false)
uuid := UUID()
log.Error("[PANIC] %s Failed (%s)", name, uuid)

for _, f := range writerFactories {
w, err := f(uuid)
if err != nil {
log.Error("unable open writer %s ¯\\_(ツ)_/¯ (%v)", uuid, err)
continue
}
if _, err := io.Copy(w, bytes.NewReader(buf)); err != nil {
log.Error("unable to write %s ¯\\_(ツ)_/¯ (%v)", uuid, err)
continue
}
if err := w.Close(); err != nil {
log.Error("unable to close %s ¯\\_(ツ)_/¯ (%v)", uuid, err)
}
}
}
}()
fn()
}()

}
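
The extended GoRoutine keeps its panic recovery but now also hands the captured stack trace to any writer factories supplied by the caller, keyed by a fresh UUID. A minimal, self-contained sketch of plugging in a file-backed factory, assuming the sdk package at this revision is on the import path; the temp directory and the one-second sleep are illustrative only:

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"time"

	"github.com/ovh/cds/sdk"
)

func main() {
	dumpDir, err := ioutil.TempDir("", "panic_dumps")
	if err != nil {
		panic(err)
	}

	// Factory: one file per recovered panic, named after the UUID that
	// GoRoutine passes in.
	factory := func(uuid string) (io.WriteCloser, error) {
		return os.Create(filepath.Join(dumpDir, uuid))
	}

	// The panic below is recovered inside GoRoutine; the stack trace is
	// written through the factory instead of crashing the process.
	sdk.GoRoutine("demo", func() {
		panic("boom")
	}, factory)

	time.Sleep(time.Second) // crude wait for the deferred dump to be flushed

	files, err := ioutil.ReadDir(dumpDir)
	if err != nil {
		panic(err)
	}
	for _, f := range files {
		fmt.Println("panic dump written:", filepath.Join(dumpDir, f.Name()))
	}
}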
71 changes: 40 additions & 31 deletions sdk/hatchery/hatchery.go
@@ -107,16 +107,22 @@ func Create(h Interface) error {
// hatchery is now fully Initialized
h.SetInitialized()

sdk.GoRoutine("heartbeat", func() {
hearbeat(h, h.Configuration().API.Token, h.Configuration().API.MaxHeartbeatFailures)
})

sdk.GoRoutine("queuePolling", func() {
if err := h.CDSClient().QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, h.Configuration().Provision.GraceTimeQueued, nil); err != nil {
log.Error("Queues polling stopped: %v", err)
cancel()
}
})
sdk.GoRoutine("heartbeat",
func() {
hearbeat(h, h.Configuration().API.Token, h.Configuration().API.MaxHeartbeatFailures)
},
PanicDump(h),
)

sdk.GoRoutine("queuePolling",
func() {
if err := h.CDSClient().QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, h.Configuration().Provision.GraceTimeQueued, nil); err != nil {
log.Error("Queues polling stopped: %v", err)
cancel()
}
},
PanicDump(h),
)

// run the starters pool
workersStartChan, workerStartResultChan := startWorkerStarters(h)
@@ -126,33 +132,36 @@ func Create(h Interface) error {
return fmt.Errorf("Create> Cannot retrieve hostname: %s", errh)
}
// read the result channel in another goroutine to let the main goroutine start new workers
sdk.GoRoutine("checkStarterResult", func() {
for startWorkerRes := range workerStartResultChan {
if startWorkerRes.err != nil {
errs <- startWorkerRes.err
}
if startWorkerRes.temptToSpawn {
found := false
for _, hID := range startWorkerRes.request.spawnAttempts {
if hID == h.ID() {
found = true
break
}
sdk.GoRoutine("checkStarterResult",
func() {
for startWorkerRes := range workerStartResultChan {
if startWorkerRes.err != nil {
errs <- startWorkerRes.err
}
if !found {
if hCount, err := h.CDSClient().HatcheryCount(startWorkerRes.request.workflowNodeRunID); err == nil {
if int64(len(startWorkerRes.request.spawnAttempts)) < hCount {
if _, errQ := h.CDSClient().QueueJobIncAttempts(startWorkerRes.request.id); errQ != nil {
log.Warning("Hatchery> Create> cannot inc spawn attempts %v", errQ)
if startWorkerRes.temptToSpawn {
found := false
for _, hID := range startWorkerRes.request.spawnAttempts {
if hID == h.ID() {
found = true
break
}
}
if !found {
if hCount, err := h.CDSClient().HatcheryCount(startWorkerRes.request.workflowNodeRunID); err == nil {
if int64(len(startWorkerRes.request.spawnAttempts)) < hCount {
if _, errQ := h.CDSClient().QueueJobIncAttempts(startWorkerRes.request.id); errQ != nil {
log.Warning("Hatchery> Create> cannot inc spawn attempts %v", errQ)
}
}
} else {
log.Warning("Hatchery> Create> cannot get hatchery count: %v", err)
}
} else {
log.Warning("Hatchery> Create> cannot get hatchery count: %v", err)
}
}
}
}
})
},
PanicDump(h),
)

// the main goroutine
for {
22 changes: 19 additions & 3 deletions sdk/hatchery/starter.go
@@ -3,6 +3,9 @@ package hatchery
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"sync/atomic"
"time"

@@ -35,6 +38,16 @@ type workerStarterResult struct {
err error
}

func PanicDump(h Interface) func(s string) (io.WriteCloser, error) {
return func(s string) (io.WriteCloser, error) {
dir, err := h.PanicDumpDirectory()
if err != nil {
return nil, err
}
return os.OpenFile(filepath.Join(dir, s), os.O_RDWR|os.O_CREATE, 0644)
}
}

// Start all goroutines which manage the hatchery worker spawning routine.
// the purpose is to avoid go routines leak when there is a bunch of worker to start
func startWorkerStarters(h Interface) (chan<- workerStarterRequest, chan workerStarterResult) {
@@ -46,9 +59,12 @@ func startWorkerStarters(h Interface) (chan<- workerStarterRequest, chan workerS
maxProv = defaultMaxProvisioning
}
for i := 0; i < maxProv; i++ {
sdk.GoRoutine("workerStarter", func() {
workerStarter(h, jobs, results)
})
sdk.GoRoutine("workerStarter",
func() {
workerStarter(h, jobs, results)
},
PanicDump(h),
)
}

return jobs, results
1 change: 1 addition & 0 deletions sdk/hatchery/types.go
@@ -93,6 +93,7 @@ type Interface interface {
SetInitialized()
ServiceName() string
Stats() *Stats
PanicDumpDirectory() (string, error)
}

type Stats struct {
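
With the Interface extended, every hatchery implementation now has to provide PanicDumpDirectory. Hatcheries that embed the engine's Common type (patched above) get it for free; a hand-rolled version could look like the hypothetical sketch below, where MyHatchery and the chosen path are illustrative, not part of the commit:

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// MyHatchery is a hypothetical type; a real hatchery would also implement
// the rest of hatchery.Interface (SetInitialized, ServiceName, Stats, ...).
type MyHatchery struct{}

// PanicDumpDirectory returns the directory where panic dumps are written,
// creating it if it does not exist yet.
func (h *MyHatchery) PanicDumpDirectory() (string, error) {
	dir := filepath.Join(os.TempDir(), "my-hatchery", "panic_dumps")
	return dir, os.MkdirAll(dir, 0755)
}

func main() {
	h := &MyHatchery{}
	dir, err := h.PanicDumpDirectory()
	if err != nil {
		panic(err)
	}
	fmt.Println("panic dumps will be stored in", dir)
}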
