Skip to content
Permalink
Browse files

local exec healthchecks (#703)

* local exec healthchecks

* swap == nil for != nil

* fix lint
  • Loading branch information
coryschwartz committed Mar 23, 2020
1 parent 6f4df98 commit d295689ecd967a685277c040339b1b4265d30e9c
Showing with 117 additions and 53 deletions.
  1. +11 −0 docs/USAGE.md
  2. +106 −53 pkg/runner/local_exec.go
@@ -138,6 +138,17 @@ You should see a bunch of logs that describe the steps of the test, from:
* Starting the containers (total of 50 as 50 is the default number of nodes for this test)
* You will see the logs that describe each node connecting to the others and executing a kademlia find-peers action.
In order to run the tests with the local:exec runner, there are a few things that must be taken care of first.
1. install required test software
* redis (redis.io)
* prometheus (prometheus.io)
* prometheus-pushgateway (prometheus.io)
(bug) the address of the promethus pushgateway is hard-coded to http://prometheus-pushgateway.
This works fine on the docker and k8s runners, but you will need to make this address resolvable on your local system.
Consider adding a line into /etc/hosts so metrics collection works with the local:exec runner.
## Running a composition
@@ -3,6 +3,7 @@ package runner
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"os"
"path/filepath"
"strconv"
@@ -32,29 +33,118 @@ var (
type LocalExecutableRunner struct {
lk sync.RWMutex

outputsDir string
redisCloseFn context.CancelFunc
outputsDir string
closeFn context.CancelFunc
}

// LocalExecutableRunnerCfg is the configuration struct for this runner.
type LocalExecutableRunnerCfg struct{}

type healthcheckedProcess struct {
HealthcheckItem api.HealthcheckItem
Checker func() bool
Fixer func() error
Success string
Failure string
}

func newhealthcheckedProcess(ctx context.Context, name string, address string, cmd string, args ...string) *healthcheckedProcess {
return &healthcheckedProcess{
HealthcheckItem: api.HealthcheckItem{
Name: name,
},
Checker: tcpChecker(address),
Fixer: commandStarter(ctx, cmd, args...),
Success: fmt.Sprintf("%s instance check: OK", name),
Failure: fmt.Sprintf("%s instance check: FAIL", name),
}
}

// tcpChecker returns a closure which can be used to check
// when a tcp port is open. Returns true if the socket is dialable.
// Otherwise, returns false. Use as healthcheckedProcess.Checker.
func tcpChecker(address string) func() bool {
return func() bool {
_, err := net.Dial("tcp", address)
return err == nil
}
}

func commandStarter(ctx context.Context, cmd string, args ...string) func() error {
return func() error {
cmd := exec.CommandContext(ctx, cmd, args...)
return cmd.Start()
}
}

func (r *LocalExecutableRunner) Healthcheck(fix bool, engine api.Engine, writer io.Writer) (*api.HealthcheckReport, error) {
r.lk.Lock()
defer r.lk.Unlock()

var redisCheck, outputsDirCheck api.HealthcheckItem
ctx, cancel := context.WithCancel(engine.Context())
r.closeFn = cancel

// Check if a local Redis instance is running. If not, try to start it.
_, err := net.Dial("tcp", "localhost:6379")
if err == nil {
msg := "local redis instance check: OK"
redisCheck = api.HealthcheckItem{Name: "local-redis", Status: api.HealthcheckStatusOK, Message: msg}
} else {
msg := fmt.Sprintf("local redis instance check: FAIL; %s", err)
redisCheck = api.HealthcheckItem{Name: "local-redis", Status: api.HealthcheckStatusFailed, Message: msg}
report := api.HealthcheckReport{
Checks: []api.HealthcheckItem{},
Fixes: []api.HealthcheckItem{},
}

// Use this slice to add or remove additional checks.
localInfra := []*healthcheckedProcess{
newhealthcheckedProcess(ctx,
"local-redis",
"localhost:6379",
"redis-server",
"--save",
"\"\"",
"--appendonly",
"no"),
newhealthcheckedProcess(ctx,
"local-prometheus",
"localhost:9090",
"prometheus"),
newhealthcheckedProcess(ctx,
"local-pushgateway",
"localhost:9091",
"pushgateway"),
}

eg, _ := errgroup.WithContext(ctx)

for _, li := range localInfra {
hcp := *li
eg.Go(func() error {
// Checker succeeds, already working.
if hcp.Checker() {
hcp.HealthcheckItem.Status = api.HealthcheckStatusOK
hcp.HealthcheckItem.Message = hcp.Success
report.Checks = append(report.Checks, hcp.HealthcheckItem)
return nil
}
// Checker failed, try to fix.
err := hcp.Fixer()
if err != nil {
// Oh no! the fix failed.
hcp.HealthcheckItem.Status = api.HealthcheckStatusFailed
hcp.HealthcheckItem.Message = hcp.Failure
report.Checks = append(report.Checks, hcp.HealthcheckItem)
// just because the fixer failed, doesn't mean *this* procedure failed.
return nil
}
// Fix succeeded.
hcp.HealthcheckItem.Status = api.HealthcheckStatusOK
hcp.HealthcheckItem.Message = hcp.Success
report.Checks = append(report.Checks, hcp.HealthcheckItem)
report.Fixes = append(report.Fixes, hcp.HealthcheckItem)
return nil
})
err := eg.Wait()
if err != nil {
return nil, nil
}
}

var outputsDirCheck api.HealthcheckItem
// Ensure the outputs dir exists.
r.outputsDir = filepath.Join(engine.EnvConfig().WorkDir(), "local_exec", "outputs")
if _, err := os.Stat(r.outputsDir); err == nil {
@@ -68,61 +158,24 @@ func (r *LocalExecutableRunner) Healthcheck(fix bool, engine api.Engine, writer
outputsDirCheck = api.HealthcheckItem{Name: "outputs-dir", Status: api.HealthcheckStatusAborted, Message: msg}
}

if !fix {
return &api.HealthcheckReport{Checks: []api.HealthcheckItem{redisCheck, outputsDirCheck}}, nil
}

// FIX LOGIC ====================

var fixes []api.HealthcheckItem

if redisCheck.Status != api.HealthcheckStatusOK {
// if there was a previous close function, it's possible that the
// user has killed the redis instance manually, or some other
// pathological situation is in place. Call the close function
// first just in case.
if r.redisCloseFn != nil {
r.redisCloseFn()
}

// Create a new cancellable context for the redis process. Store the
// cancel function and renew the sync.Once in the runner's state.
ctx, cancel := context.WithCancel(engine.Context())
r.redisCloseFn = cancel

cmd := exec.CommandContext(ctx, "redis-server", "--save", "\"\"", "--appendonly", "no")
if err := cmd.Start(); err == nil {
msg := "temporary redis instance started successfully"
it := api.HealthcheckItem{Name: "local-redis", Status: api.HealthcheckStatusOK, Message: msg}
fixes = append(fixes, it)
} else {
msg := fmt.Sprintf("temporary redis instance failed to start: %s", err)
it := api.HealthcheckItem{Name: "local-redis", Status: api.HealthcheckStatusFailed, Message: msg}
fixes = append(fixes, it)
}
}

if outputsDirCheck.Status != api.HealthcheckStatusOK {
if err := os.MkdirAll(r.outputsDir, 0777); err == nil {
msg := "outputs dir created successfully"
it := api.HealthcheckItem{Name: "outputs-dir", Status: api.HealthcheckStatusOK, Message: msg}
fixes = append(fixes, it)
report.Fixes = append(report.Fixes, it)
} else {
msg := fmt.Sprintf("failed to create outputs dir: %s", err)
it := api.HealthcheckItem{Name: "outputs-dir", Status: api.HealthcheckStatusFailed, Message: msg}
fixes = append(fixes, it)
report.Fixes = append(report.Fixes, it)
}
}

return &api.HealthcheckReport{
Checks: []api.HealthcheckItem{redisCheck, outputsDirCheck},
Fixes: fixes,
}, nil
return &report, nil
}

func (r *LocalExecutableRunner) Close() error {
if r.redisCloseFn != nil {
r.redisCloseFn()
if r.closeFn != nil {
r.closeFn()
logging.S().Info("temporary redis instance stopped")
}

0 comments on commit d295689

Please sign in to comment.
You can’t perform that action at this time.