Permalink
Browse files

make linewriter its own thing

  • Loading branch information...
jmoiron committed Dec 23, 2016
1 parent 46e39ed commit 147519260b2e70fd241523d766d04dc075495d8e
Showing with 72 additions and 48 deletions.
  1. +26 −48 dmc.go
  2. +46 −0 linewriter.go
View
74 dmc.go
@@ -30,6 +30,14 @@ const (
var tty = terminal.IsTerminal(int(os.Stdout.Fd()))
var last int64 = white
func cycle() int {
col := atomic.LoadInt64(&last)
atomic.AddInt64(&last, 1)
return int(col)
}
func color(s string, color int, bold bool) string {
if !tty {
return s
@@ -113,59 +121,28 @@ func do(host, cmd string) ([]byte, error) {
return buf.Bytes(), nil
}
// LineBufferedWriter is a WriteCloser that buffers lines from multiple
// threads and writes them as available.
type LineBufferedWriter struct {
out io.Writer
buf chan string
wg sync.WaitGroup
prefix string
}
func NewLineBufferedWriter(w io.Writer, prefix string) *LineBufferedWriter {
b := &LineBufferedWriter{out: w, buf: make(chan string, 256), prefix: prefix}
b.run()
return b
type LineWriter interface {
WriteLine(string) error
}
func (w *LineBufferedWriter) run() {
w.wg.Add(1)
// doi runs cmd on host, writing lines to out as available. It cycles through
// colors so that hosts can be differentiated as well as possible.
func doi(host, cmd string, out LineWriter) error {
c := exec.Command("ssh", host, cmd)
rdr, wrt := io.Pipe()
c.Stdout = wrt
c.Stderr = wrt
var err error
go func() {
for l := range w.buf {
w.out.Write([]byte(l))
}
w.wg.Done()
err = c.Run()
wrt.Close()
}()
}
// Close flushes the rest of the output and closes the writer.
// It is not legal to write to this LineBufferedWriter after closing.
func (w *LineBufferedWriter) Close() error {
close(w.buf)
w.wg.Wait()
return nil
}
// Write lines to this buffered writer. Lines may be interleaved with
// others being written at "the same time", as each line is treated as
// an individual write.
func (w LineBufferedWriter) Write(b []byte) (n int, err error) {
r := bufio.NewScanner(bytes.NewReader(b))
col := cycle()
r := bufio.NewScanner(rdr)
for r.Scan() {
w.buf <- w.prefix + r.Text() + "\n"
}
if err := r.Err(); err != nil {
return len(b), err
out.WriteLine(fmt.Sprintf("[%s] %s\n", color(host, col, false), string(r.Bytes())))
}
return len(b), nil
}
func doi(host, cmd string, out io.Writer) error {
c := exec.Command("ssh", host, cmd)
outw := NewLineBufferedWriter(out, fmt.Sprintf("[%s] ", host))
c.Stdout = outw
c.Stderr = outw
return c.Run()
return err
}
func main() {
@@ -198,8 +175,9 @@ func main() {
// if we're interleaving output it's slightly different
// so we just branch here
if cfg.interleave {
stdout := NewSyncLineWriter(os.Stdout)
for host := range hostch {
err := doi(host, cmd, os.Stdout)
err := doi(host, cmd, stdout)
if err != nil {
atomic.StoreInt64(&code, 1)
}
View
@@ -0,0 +1,46 @@
package main
import (
"io"
"sync"
)
// A SyncLineWriter is a LineWriter that is safe to use across goroutines.
type SyncLineWriter struct {
out io.Writer
buf chan string
wg sync.WaitGroup
}
// NewSyncLineWriter returns a SyncLineWriter that writes its lines out to the
// provided writer. It uses one goroutine, which is cleaned up when calling
// Close.
func NewSyncLineWriter(w io.Writer) *SyncLineWriter {
s := &SyncLineWriter{out: w, buf: make(chan string, 512)}
s.run()
return s
}
func (s *SyncLineWriter) run() {
s.wg.Add(1)
go func() {
for l := range s.buf {
s.out.Write([]byte(l))
}
s.wg.Done()
}()
}
// Close flushes the rest of the output and closes the writer.
// It is not legal to write to this LineBufferedWriter after closing.
func (s *SyncLineWriter) Close() error {
close(s.buf)
s.wg.Wait()
return nil
}
// WriteLine writes the str to this writer.
func (s *SyncLineWriter) WriteLine(str string) error {
s.buf <- str
return nil
}

0 comments on commit 1475192

Please sign in to comment.