Skip to content

Commit

Permalink
Graceful shutdown (#243)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Jun 22, 2021
1 parent b78084d commit 2723d11
Show file tree
Hide file tree
Showing 25 changed files with 613 additions and 545 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/tklauser/go-sysconf v0.3.6 // indirect
github.com/twmb/murmur3 v1.1.5
github.com/wacul/ptr v1.0.0 // indirect
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20210423082822-04245dca01da
golang.org/x/tools v0.1.0
google.golang.org/protobuf v1.26.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170927054621-314a259e304f/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
38 changes: 0 additions & 38 deletions pkg/agent/selfprofile.go

This file was deleted.

19 changes: 8 additions & 11 deletions pkg/agent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type ProfileSession struct {
startTime time.Time
stopTime time.Time

Logger Logger
logger Logger
}

type SessionConfig struct {
Expand All @@ -73,7 +73,7 @@ func NewSession(c *SessionConfig, logger Logger) *ProfileSession {
pids: []int{c.Pid},
stopCh: make(chan struct{}),
withSubprocesses: c.WithSubprocesses,
Logger: logger,
logger: logger,
}

if ps.spyName == types.GoSpy {
Expand All @@ -89,6 +89,7 @@ func NewSession(c *SessionConfig, logger Logger) *ProfileSession {

func (ps *ProfileSession) takeSnapshots() {
ticker := time.NewTicker(time.Second / time.Duration(ps.sampleRate))
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -131,7 +132,6 @@ func (ps *ProfileSession) takeSnapshots() {
}

case <-ps.stopCh:
ticker.Stop()
// stop the spies
for _, spy := range ps.spies {
spy.Stop()
Expand Down Expand Up @@ -196,11 +196,8 @@ func (ps *ProfileSession) Stop() {
defer ps.trieMutex.Unlock()

ps.stopTime = time.Now()
select {
case ps.stopCh <- struct{}{}:
default:
}
close(ps.stopCh)
// TODO: wait for stopCh consumer to finish!

// before stopping, upload the tries
ps.uploadTries(time.Now())
Expand Down Expand Up @@ -253,12 +250,12 @@ func (ps *ProfileSession) addSubprocesses() {
ps.pids = append(ps.pids, newPid)
newSpy, err := spy.SpyFromName(ps.spyName, newPid)
if err != nil {
if ps.Logger != nil {
ps.Logger.Errorf("failed to initialize a spy %d [%s]", newPid, ps.spyName)
if ps.logger != nil {
ps.logger.Errorf("failed to initialize a spy %d [%s]", newPid, ps.spyName)
}
} else {
if ps.Logger != nil {
ps.Logger.Debugf("started spy for subprocess %d [%s]", newPid, ps.spyName)
if ps.logger != nil {
ps.logger.Debugf("started spy for subprocess %d [%s]", newPid, ps.spyName)
}
ps.spies = append(ps.spies, newSpy)
}
Expand Down
45 changes: 22 additions & 23 deletions pkg/agent/upstream/direct/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package direct

import (
"runtime/debug"
"sync"

"github.com/sirupsen/logrus"

Expand All @@ -13,37 +14,37 @@ import (
const upstreamThreads = 1

type Direct struct {
s *storage.Storage
todo chan *upstream.UploadJob
done chan struct{}
s *storage.Storage
queue chan *upstream.UploadJob
stop chan struct{}
wg sync.WaitGroup
}

func New(s *storage.Storage) *Direct {
d := &Direct{
s: s,
todo: make(chan *upstream.UploadJob, 100),
done: make(chan struct{}, upstreamThreads),
return &Direct{
s: s,
queue: make(chan *upstream.UploadJob, 100),
stop: make(chan struct{}),
}

go d.start()
return d
}

func (u *Direct) start() {
func (u *Direct) Start() {
u.wg.Add(upstreamThreads)
for i := 0; i < upstreamThreads; i++ {
go u.uploadLoop()
}
}

func (u *Direct) Stop() {
for i := 0; i < upstreamThreads; i++ {
u.done <- struct{}{}
}
close(u.stop)
u.wg.Wait()
}

func (u *Direct) Upload(j *upstream.UploadJob) {
select {
case u.todo <- j:
case u.queue <- j:
case <-u.stop:
return
default:
logrus.Error("Direct upload queue is full, dropping a profile")
}
Expand Down Expand Up @@ -71,19 +72,18 @@ func (u *Direct) uploadProfile(j *upstream.UploadJob) {
Units: j.Units,
AggregationType: j.AggregationType,
}
if err := u.s.Put(pi); err == storage.ErrClosing {
if err := u.s.PutLocal(pi); err != nil {
logrus.WithError(err).Error("failed to store a local profile")
}
if err = u.s.Put(pi); err != nil {
logrus.WithError(err).Error("failed to store a local profile")
}
}

func (u *Direct) uploadLoop() {
defer u.wg.Done()
for {
select {
case j := <-u.todo:
case j := <-u.queue:
u.safeUpload(j)
case <-u.done:
case <-u.stop:
return
}
}
Expand All @@ -93,9 +93,8 @@ func (u *Direct) uploadLoop() {
func (u *Direct) safeUpload(j *upstream.UploadJob) {
defer func() {
if r := recover(); r != nil {
logrus.Errorf("panic, stack = : %v", debug.Stack())
logrus.Errorf("panic recovered: %v; %v", r, string(debug.Stack()))
}
}()

u.uploadProfile(j)
}
24 changes: 15 additions & 9 deletions pkg/analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func NewService(cfg *config.Server, s *storage.Storage, c *server.Controller) *S
},
Timeout: 60 * time.Second,
},
stopCh: make(chan struct{}),
stop: make(chan struct{}),
done: make(chan struct{}),
}
}

Expand All @@ -58,7 +59,9 @@ type Service struct {
c *server.Controller
httpClient *http.Client
uploads int
stopCh chan struct{}

stop chan struct{}
done chan struct{}
}

type metrics struct {
Expand Down Expand Up @@ -91,26 +94,29 @@ type metrics struct {
}

func (s *Service) Start() {
defer close(s.done)
timer := time.NewTimer(gracePeriod)
<-timer.C
select {
case <-s.stop:
return
case <-timer.C:
}
s.sendReport()
ticker := time.NewTicker(uploadFrequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.sendReport()
case <-s.stopCh:
case <-s.stop:
return
}
}
}

func (s *Service) Stop() {
select {
case s.stopCh <- struct{}{}:
default:
}
close(s.stopCh)
close(s.stop)
<-s.done
}

func (s *Service) sendReport() {
Expand Down
18 changes: 11 additions & 7 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"os"
goexec "os/exec"
"runtime"

"github.com/peterbourgon/ff/v3"
Expand Down Expand Up @@ -113,11 +114,6 @@ func generateRootCmd(cfg *config.Config) *ffcli.Command {
}

serverCmd.Exec = func(ctx context.Context, args []string) error {
l, err := logrus.ParseLevel(cfg.Server.LogLevel)
if err != nil {
return err
}
logrus.SetLevel(l)
return startServer(&cfg.Server)
}

Expand Down Expand Up @@ -145,7 +141,15 @@ func generateRootCmd(cfg *config.Config) *ffcli.Command {
return nil
}

return exec.Cli(context.Background(), &cfg.Exec, args)
err := exec.Cli(&cfg.Exec, args)
// Normally, if the program ran, the call should return ExitError and
// the exit code must be preserved. Otherwise, the error originates from
// pyroscope and will be printed.
if e, ok := err.(*goexec.ExitError); ok {
os.Exit(e.ExitCode())
}

return err
}

connectCmd.Exec = func(ctx context.Context, args []string) error {
Expand All @@ -160,7 +164,7 @@ func generateRootCmd(cfg *config.Config) *ffcli.Command {
return nil
}

return exec.Cli(context.Background(), &cfg.Exec, args)
return exec.Cli(&cfg.Exec, args)
}

dbmanagerCmd.Exec = func(ctx context.Context, args []string) error {
Expand Down

0 comments on commit 2723d11

Please sign in to comment.