Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Task output improvements. Task timeout. Closes #11
  • Loading branch information
Yevhen Terentiev committed Nov 19, 2019
1 parent e08ef00 commit 2a9c515
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 50 deletions.
2 changes: 1 addition & 1 deletion cmd/cmd.go
Expand Up @@ -19,7 +19,7 @@ var cfg *config.Config
var configFile string

var tasks = make(map[string]*task.Task)
var contexts = make(map[string]*runner.Context)
var contexts = make(map[string]*runner.ExecutionContext)
var pipelines = make(map[string]*scheduler.Pipeline)
var watchers = make(map[string]*watch.Watcher)

Expand Down
1 change: 1 addition & 0 deletions example/full.yaml
Expand Up @@ -79,6 +79,7 @@ tasks:
env:
VAR_NAME: VAR_VALUE
dir: /task/working/dir # current directory by default
timeout: 10s

task2:
context: docker-context-name
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config.go
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
"os"
"path"
"time"
)

const (
Expand All @@ -20,7 +21,6 @@ const (
CONTEXT_CONTAINER_PROVIDER_DOCKER = "docker"
CONTEXT_CONTAINER_PROVIDER_DOCKER_COMPOSE = "docker-compose"
CONTEXT_CONTAINER_PROVIDER_KUBECTL = "kubectl"
CONTEX_REMOTE_PROVIDER_SSH = "ssh"
)

var loaded = make(map[string]bool)
Expand Down Expand Up @@ -59,6 +59,7 @@ type TaskConfig struct {
Context string
Env map[string]string
Dir string
Timeout *time.Duration
}

type WatcherConfig struct {
Expand Down
60 changes: 32 additions & 28 deletions pkg/runner/context.go
@@ -1,6 +1,7 @@
package runner

import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/trntv/wilson/pkg/config"
Expand Down Expand Up @@ -28,7 +29,7 @@ type ssh struct {
executable util.Executable
}

type Context struct {
type ExecutionContext struct {
ctxType string
executable util.Executable
env []string
Expand All @@ -50,8 +51,8 @@ type Context struct {
mu sync.Mutex
}

func BuildContext(def config.ContextConfig, wcfg *config.WilsonConfig) (*Context, error) {
c := &Context{
func BuildContext(def config.ContextConfig, wcfg *config.WilsonConfig) (*ExecutionContext, error) {
c := &ExecutionContext{
ctxType: def.Type,
executable: util.Executable{
Bin: def.Bin,
Expand Down Expand Up @@ -162,18 +163,19 @@ func BuildContext(def config.ContextConfig, wcfg *config.WilsonConfig) (*Context
return c, nil
}

func (c *Context) buildLocalCommand(command string) *exec.Cmd {
cmd := exec.Command(c.executable.Bin, c.executable.Args...)
func (c *ExecutionContext) buildLocalCommand(command string, ctx context.Context) *exec.Cmd {
cmd := exec.CommandContext(ctx, c.executable.Bin, c.executable.Args...)
cmd.Args = append(cmd.Args, command)
cmd.Env = c.env
cmd.Dir = c.dir

return cmd
}

func (c *Context) buildDockerCommand(command string) *exec.Cmd {
cmd := exec.Command(c.container.executable.Bin, c.container.executable.Args...)
func (c *ExecutionContext) buildDockerCommand(command string, ctx context.Context) *exec.Cmd {
cmd := exec.CommandContext(ctx, c.container.executable.Bin, c.container.executable.Args...)
cmd.Env = c.env
cmd.Dir = c.dir

switch c.container.provider {
case config.CONTEXT_CONTAINER_PROVIDER_DOCKER:
Expand Down Expand Up @@ -217,9 +219,10 @@ func (c *Context) buildDockerCommand(command string) *exec.Cmd {
return cmd
}

func (c *Context) buildKubectlCommand(command string) *exec.Cmd {
cmd := exec.Command(c.container.executable.Bin, c.container.executable.Args...)
func (c *ExecutionContext) buildKubectlCommand(command string, ctx context.Context) *exec.Cmd {
cmd := exec.CommandContext(ctx, c.container.executable.Bin, c.container.executable.Args...)
cmd.Env = append(c.env, c.container.env...)
cmd.Dir = c.dir

cmd.Args = append(cmd.Args, "exec", c.container.name)
cmd.Args = append(cmd.Args, c.container.options...)
Expand All @@ -231,12 +234,14 @@ func (c *Context) buildKubectlCommand(command string) *exec.Cmd {
return cmd
}

func (c *Context) buildRemoteCommand(command string) *exec.Cmd {
cmd := exec.Command(c.ssh.executable.Bin, c.ssh.executable.Args...)
func (c *ExecutionContext) buildRemoteCommand(command string, ctx context.Context) *exec.Cmd {
cmd := exec.CommandContext(ctx, c.ssh.executable.Bin, c.ssh.executable.Args...)
cmd.Env = c.env
cmd.Dir = c.dir

cmd.Args = append(cmd.Args, c.executable.Bin)
cmd.Args = append(cmd.Args, c.executable.Args...)
cmd.Args = append(cmd.Args, command)
cmd.Env = c.env

return cmd
}
Expand All @@ -246,19 +251,19 @@ func setDefaultShell(e *util.Executable) {
e.Args = []string{"-c"}
}

func (c *Context) Bin() string {
func (c *ExecutionContext) Bin() string {
return c.executable.Bin
}

func (c *Context) Args() []string {
func (c *ExecutionContext) Args() []string {
return c.executable.Args
}

func (c *Context) Env() []string {
func (c *ExecutionContext) Env() []string {
return c.env
}

func (c *Context) WithEnvs(env []string) (*Context, error) {
func (c *ExecutionContext) WithEnvs(env []string) (*ExecutionContext, error) {
def := *c.def
for _, v := range env {
kv := strings.Split(v, "=")
Expand All @@ -271,7 +276,7 @@ func (c *Context) WithEnvs(env []string) (*Context, error) {
return BuildContext(def, &config.Get().WilsonConfig)
}

func (c *Context) Up() {
func (c *ExecutionContext) Up() {
c.onceUp.Do(func() {
for _, command := range c.up {
err := c.runServiceCommand(command)
Expand All @@ -282,7 +287,7 @@ func (c *Context) Up() {
})
}

func (c *Context) Down() {
func (c *ExecutionContext) Down() {
c.onceDown.Do(func() {
for _, command := range c.down {
err := c.runServiceCommand(command)
Expand All @@ -293,7 +298,7 @@ func (c *Context) Down() {
})
}

func (c *Context) Before() error {
func (c *ExecutionContext) Before() error {
for _, command := range c.before {
err := c.runServiceCommand(command)
if err != nil {
Expand All @@ -304,7 +309,7 @@ func (c *Context) Before() error {
return nil
}

func (c *Context) After() error {
func (c *ExecutionContext) After() error {
for _, command := range c.after {
err := c.runServiceCommand(command)
if err != nil {
Expand All @@ -315,7 +320,7 @@ func (c *Context) After() error {
return nil
}

func (c *Context) runServiceCommand(command string) error {
func (c *ExecutionContext) runServiceCommand(command string) error {
log.Debugf("running service context service command: %s", command)
ca := strings.Split(command, " ")
cmd := exec.Command(ca[0], ca[1:]...)
Expand All @@ -330,29 +335,28 @@ func (c *Context) runServiceCommand(command string) error {
return nil
}

func (c *Context) createCommand(command string) *exec.Cmd {
func (c *ExecutionContext) createCommand(command string, ctx context.Context) *exec.Cmd {
switch c.ctxType {
case config.CONTEXT_TYPE_LOCAL:
return c.buildLocalCommand(command)
return c.buildLocalCommand(command, ctx)
case config.CONTEXT_TYPE_CONTAINER:
switch c.container.provider {
case config.CONTEXT_CONTAINER_PROVIDER_DOCKER, config.CONTEXT_CONTAINER_PROVIDER_DOCKER_COMPOSE:
return c.buildDockerCommand(command)
return c.buildDockerCommand(command, ctx)
case config.CONTEXT_CONTAINER_PROVIDER_KUBECTL:
return c.buildKubectlCommand(command)
return c.buildKubectlCommand(command, ctx)
}
case config.CONTEXT_TYPE_REMOTE:
return c.buildRemoteCommand(command)
return c.buildRemoteCommand(command, ctx)
default:
log.Fatal("unknown context type")
}

return nil
}

func (c *Context) ScheduleForCleanup() {
func (c *ExecutionContext) ScheduleForCleanup() {
c.mu.Lock()
c.scheduledForCleanup = true
c.mu.Unlock()

}
36 changes: 31 additions & 5 deletions pkg/runner/output.go
Expand Up @@ -19,6 +19,16 @@ type taskOutput struct {
stderr io.Writer
}

type logWriter struct {
t *task.Task
}

func (l logWriter) Write(p []byte) (n int, err error) {
l.t.WiteLog(p)

return len(p), nil
}

func NewTaskOutput(raw bool, quiet bool) *taskOutput {
o := &taskOutput{
raw: raw,
Expand Down Expand Up @@ -84,22 +94,38 @@ func (o *taskOutput) streamOutput(t *task.Task, done chan struct{}) {
}

func (o *taskOutput) streamRawOutput(t *task.Task) error {
_, err := io.Copy(o.stdout, t.Stdout)
lw := &logWriter{t: t}
err := o.stream(o.stdout, t.Stdout, lw)
if err != nil {
return err
}

err = o.stream(o.stderr, t.Stderr, lw)
if err != nil {
return err
}

return nil
}

func (o *taskOutput) stream(dst io.Writer, src io.ReadCloser, log io.Writer) error {
logw, logr, _ := os.Pipe()
stderr := io.MultiWriter(o.stderr, logw)
w := io.MultiWriter(dst, logw)

_, err = io.Copy(stderr, t.Stderr)
_, err := io.Copy(w, src)
if err == os.ErrClosed {
return err
}

scanner := bufio.NewScanner(logr)
for scanner.Scan() {
t.WiteLog(scanner.Text())
b := scanner.Bytes()
if len(b) > 0 {
_, err = log.Write(b)
if err != nil {
return err
}
}
}

return nil
Expand All @@ -125,7 +151,7 @@ func (o *taskOutput) streamDecoratedStderrOutput(t *task.Task) error {
scanner := bufio.NewScanner(t.Stderr)
for scanner.Scan() {
line := scanner.Text()
t.WiteLog(line)
t.WiteLog([]byte(line))
_, err := fmt.Fprintf(o.stderr, "%s: %s\r\n", aurora.Red(t.Name), line)
if err != nil {
log.Debug(err)
Expand Down
24 changes: 18 additions & 6 deletions pkg/runner/runner.go
Expand Up @@ -10,15 +10,15 @@ import (
)

type TaskRunner struct {
contexts map[string]*Context
contexts map[string]*ExecutionContext
env []string

output *taskOutput
ctx context.Context
cancel context.CancelFunc
}

func NewTaskRunner(contexts map[string]*Context, env []string, raw, quiet bool) *TaskRunner {
func NewTaskRunner(contexts map[string]*ExecutionContext, env []string, raw, quiet bool) *TaskRunner {
tr := &TaskRunner{
contexts: contexts,
output: NewTaskOutput(raw, quiet),
Expand Down Expand Up @@ -49,8 +49,14 @@ func (r *TaskRunner) RunWithEnv(t *task.Task, env []string) (err error) {
t.Start = time.Now()
log.Infof("Running task %s...", t.Name)

var ctx = context.Background()
var cancel context.CancelFunc
for _, command := range t.Command {
cmd := c.createCommand(command)
if t.Timeout != nil {
ctx, cancel = context.WithTimeout(ctx, *t.Timeout)
}

cmd := c.createCommand(command, ctx)
cmd.Env = append(cmd.Env, env...)
cmd.Env = append(cmd.Env, r.env...)

Expand All @@ -70,7 +76,7 @@ func (r *TaskRunner) RunWithEnv(t *task.Task, env []string) (err error) {
}
t.SetStderr(stderr)

err = r.runCommand(t, cmd)
err = r.runCommand(t, cmd, cancel)
if err != nil {
t.UpdateStatus(task.STATUS_ERROR)
t.End = time.Now()
Expand All @@ -91,7 +97,13 @@ func (r *TaskRunner) RunWithEnv(t *task.Task, env []string) (err error) {
return nil
}

func (r *TaskRunner) runCommand(t *task.Task, cmd *exec.Cmd) error {
func (r *TaskRunner) runCommand(t *task.Task, cmd *exec.Cmd, cancelFunc context.CancelFunc) error {
defer func(cancelFunc context.CancelFunc) {
if cancelFunc != nil {
cancelFunc()
}
}(cancelFunc)

var done = make(chan struct{})
var killed = make(chan struct{})
go r.waitForInterruption(*cmd, done, killed)
Expand Down Expand Up @@ -142,7 +154,7 @@ func (r *TaskRunner) Cancel() {
r.cancel()
}

func (r *TaskRunner) contextForTask(t *task.Task) (c *Context, err error) {
func (r *TaskRunner) contextForTask(t *task.Task) (c *ExecutionContext, err error) {
c, ok := r.contexts[t.Context]
if !ok {
return nil, errors.New("no such context")
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler.go
Expand Up @@ -21,7 +21,7 @@ type PipelineScheduler struct {
End time.Time
}

func NewScheduler(pipeline *Pipeline, contexts map[string]*runner.Context, env []string, raw, quiet bool) *PipelineScheduler {
func NewScheduler(pipeline *Pipeline, contexts map[string]*runner.ExecutionContext, env []string, raw, quiet bool) *PipelineScheduler {
r := &PipelineScheduler{
pipeline: pipeline,
pause: 50 * time.Millisecond,
Expand Down

0 comments on commit 2a9c515

Please sign in to comment.