Skip to content

Commit

Permalink
fix(runner): fix concurrency and log file creation
Browse files Browse the repository at this point in the history
  • Loading branch information
ncarlier committed Dec 13, 2018
1 parent 9c1d59c commit c5e393e
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions pkg/worker/work_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"os/exec"
"path"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -59,18 +60,30 @@ func run(work *WorkRequest) (string, error) {
logger.Debug.Printf("Work %s#%d output to file: %s\n", work.Name, work.ID, logFilename)

wLogFile := bufio.NewWriter(logFile)
defer wLogFile.Flush()

r, w := io.Pipe()
cmd.Stdout = w
cmd.Stderr = w
// Combine cmd stdout and stderr
outReader, err := cmd.StdoutPipe()
if err != nil {
return logFilename, err
}
errReader, err := cmd.StderrPipe()
if err != nil {
return logFilename, err
}
cmdReader := io.MultiReader(outReader, errReader)

// Start the script...
err = cmd.Start()
if err != nil {
return logFilename, err
}

// Write script output to log file and the work message channel.
// Create wait group to wait for command output completion
var wg sync.WaitGroup
wg.Add(1)

// Write script output to log file and the work message channel
go func(reader io.Reader) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
Expand All @@ -91,18 +104,27 @@ func run(work *WorkRequest) (string, error) {
if err := scanner.Err(); err != nil {
logger.Error.Printf("Work %s#%d unable to read script stdout: %v\n", work.Name, work.ID, err)
}
if err = wLogFile.Flush(); err != nil {
logger.Error.Println("Error while flushing the log file:", logFilename, err)
}
}(r)
wg.Done()
}(cmdReader)

// Start timeout timer
timer := time.AfterFunc(time.Duration(work.Timeout)*time.Second, func() {
logger.Warning.Printf("Work %s#%d has timed out (%ds). Killing process #%d...\n", work.Name, work.ID, work.Timeout, cmd.Process.Pid)
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
})

// Wait for command output completion
wg.Wait()

// Wait for command completion
err = cmd.Wait()

// Stop timeout timer
timer.Stop()

// Mark work as terminated
work.Terminate()

if err != nil {
logger.Info.Printf("Work %s#%d done [ERROR]\n", work.Name, work.ID)
return logFilename, err
Expand Down

0 comments on commit c5e393e

Please sign in to comment.