Permalink
Browse files

wip, wrong direction but some stuff here will get used

  • Loading branch information...
jmoiron committed Dec 22, 2016
1 parent ca16f25 commit 46e39edd9a3758b0e09c88e740c0444ff4a24d1d
Showing with 73 additions and 6 deletions.
  1. +73 −6 dmc.go
View
79 dmc.go
@@ -5,6 +5,7 @@ import (
"bytes"
"flag"
"fmt"
"io"
"net"
"os"
"os/exec"
@@ -55,7 +56,7 @@ func init() {
flag.StringVar(&cfg.hosts, "hosts", "", "list of hosts")
flag.StringVar(&cfg.dns, "d", "", "dns name for multi-hosts")
flag.IntVar(&cfg.threads, "n", 512, "threads to run in parallel")
// flag.BoolVar(&cfg.interleave, "i", false, "interleave output as it is available")
flag.BoolVar(&cfg.interleave, "i", false, "interleave output as it is available")
flag.Parse()
}
@@ -112,6 +113,61 @@ func do(host, cmd string) ([]byte, error) {
return buf.Bytes(), nil
}
// LineBufferedWriter is a WriteCloser that buffers lines from multiple
// threads and writes them as available.
type LineBufferedWriter struct {
out io.Writer
buf chan string
wg sync.WaitGroup
prefix string
}
func NewLineBufferedWriter(w io.Writer, prefix string) *LineBufferedWriter {
b := &LineBufferedWriter{out: w, buf: make(chan string, 256), prefix: prefix}
b.run()
return b
}
func (w *LineBufferedWriter) run() {
w.wg.Add(1)
go func() {
for l := range w.buf {
w.out.Write([]byte(l))
}
w.wg.Done()
}()
}
// Close flushes the rest of the output and closes the writer.
// It is not legal to write to this LineBufferedWriter after closing.
func (w *LineBufferedWriter) Close() error {
close(w.buf)
w.wg.Wait()
return nil
}
// Write lines to this buffered writer. Lines may be interleaved with
// others being written at "the same time", as each line is treated as
// an individual write.
func (w LineBufferedWriter) Write(b []byte) (n int, err error) {
r := bufio.NewScanner(bytes.NewReader(b))
for r.Scan() {
w.buf <- w.prefix + r.Text() + "\n"
}
if err := r.Err(); err != nil {
return len(b), err
}
return len(b), nil
}
func doi(host, cmd string, out io.Writer) error {
c := exec.Command("ssh", host, cmd)
outw := NewLineBufferedWriter(out, fmt.Sprintf("[%s] ", host))
c.Stdout = outw
c.Stderr = outw
return c.Run()
}
func main() {
args := flag.Args()
if len(args) == 0 {
@@ -139,11 +195,22 @@ func main() {
for i := 0; i < par; i++ {
go func() {
for host := range hostch {
out, err := do(host, cmd)
output <- string(out)
if err != nil {
atomic.StoreInt64(&code, 1)
// if we're interleaving output it's slightly different
// so we just branch here
if cfg.interleave {
for host := range hostch {
err := doi(host, cmd, os.Stdout)
if err != nil {
atomic.StoreInt64(&code, 1)
}
}
} else {
for host := range hostch {
out, err := do(host, cmd)
output <- string(out)
if err != nil {
atomic.StoreInt64(&code, 1)
}
}
}
wg.Done()

0 comments on commit 46e39ed

Please sign in to comment.