Permalink
Browse files

use -n to control parallelism

  • Loading branch information...
jmoiron committed Dec 22, 2016
1 parent fd3cb7a commit ca16f25edcce01c1f40067b5ed16a5b1d2c6ac48
Showing with 52 additions and 38 deletions.
  1. +52 −38 dmc.go
View
90 dmc.go
@@ -2,13 +2,15 @@ package main
import (
"bufio"
"bytes"
"flag"
"fmt"
"net"
"os"
"os/exec"
"strings"
"sync"
"sync/atomic"
"golang.org/x/crypto/ssh/terminal"
)
@@ -44,15 +46,15 @@ var cfg struct {
prefix string
hosts string
dns string
maxHosts int
threads int
}
func init() {
flag.BoolVar(&cfg.verbose, "v", false, "verbose output")
flag.StringVar(&cfg.prefix, "p", "", "prefix for command echo")
flag.StringVar(&cfg.hosts, "hosts", "", "list of hosts")
flag.StringVar(&cfg.dns, "d", "", "dns name for multi-hosts")
flag.IntVar(&cfg.maxHosts, "m", 1024, "Maximum number of hosts the command will run on in parallel")
flag.IntVar(&cfg.threads, "n", 512, "threads to run in parallel")
// flag.BoolVar(&cfg.interleave, "i", false, "interleave output as it is available")
flag.Parse()
}
@@ -93,11 +95,21 @@ func getHosts() []string {
}
func min(a, b int) int {
if a < b {
return a
// do runs cmd on host, writing its output to out.
func do(host, cmd string) ([]byte, error) {
c := exec.Command("ssh", host, cmd)
output, err := c.CombinedOutput()
var buf bytes.Buffer
if err != nil {
fmt.Fprintf(&buf, "%s[%s]$ %s: Error: %s\n", cfg.prefix, color(host, red, true), cmd, err)
if len(output) > 0 {
buf.Write(output)
}
return buf.Bytes(), err
}
return b
fmt.Fprintf(&buf, "%s[%s]$ %s\n%s", cfg.prefix, color(host, green, true), cmd, string(output))
return buf.Bytes(), nil
}
func main() {
@@ -111,50 +123,52 @@ func main() {
cmd := strings.Join(args, " ")
vprintf("Running `%s` on %d hosts\n", cmd, len(hosts))
groups := len(hosts) / cfg.maxHosts
if len(hosts) % cfg.maxHosts != 0 {
groups++
par := cfg.threads
if par > len(hosts) {
par = len(hosts)
}
var wg sync.WaitGroup
var once sync.Once
wg.Add(len(hosts))
output := make(chan string)
return_code := 0
setError := func() {
return_code = 1
}
// output and input channels
output := make(chan string, par)
hostch := make(chan string, par)
var code int64
for i := 0; i < groups; i++ {
beginRange := cfg.maxHosts * i
endRange := min(cfg.maxHosts * (i + 1), len(hosts))
for _, host := range hosts[beginRange:endRange] {
go func(host string) {
defer wg.Done()
c := exec.Command("ssh", host, cmd)
out, err := c.CombinedOutput()
// use par as breadth of parallelism
var wg, outwg sync.WaitGroup
wg.Add(par)
for i := 0; i < par; i++ {
go func() {
for host := range hostch {
out, err := do(host, cmd)
output <- string(out)
if err != nil {
e := fmt.Sprintf("%s[%s]$ %s: Error: %s\n", cfg.prefix, color(host, red, true), cmd, err)
if len(out) > 0 {
e = fmt.Sprintf("%s%s", e, string(out))
}
output <- e
once.Do(setError)
return
atomic.StoreInt64(&code, 1)
}
output <- fmt.Sprintf("%s[%s]$ %s\n%s", cfg.prefix, color(host, green, true), cmd, string(out))
}(host)
}
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(output)
}()
for o := range output {
fmt.Print(o)
// print output as it comes in
outwg.Add(1)
go func() {
for o := range output {
fmt.Print(o)
}
outwg.Done()
}()
for _, host := range hosts {
hostch <- host
}
close(hostch)
outwg.Wait()
os.Exit(return_code)
os.Exit(int(code))
}

0 comments on commit ca16f25

Please sign in to comment.