Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add --eta for progress bar #39

Merged
merged 1 commit into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/cznic/sortutil"
"github.com/pkg/errors"
pb "github.com/schollz/progressbar/v3"
"github.com/shenwei356/go-logging"
psutil "github.com/shirou/gopsutil/process"
)
Expand Down Expand Up @@ -1138,8 +1139,12 @@ func (c *Command) run(opts *Options, tryNumber int) error {

// Options contains the options
type Options struct {
DryRun bool // just print command
Jobs int // max jobs number
DryRun bool // just print command
Jobs int // max jobs number

ETA bool // show eta
ETABar *pb.ProgressBar

KeepOrder bool // keep output order
Retries int // max retry chances
RetryInterval time.Duration // retry interval
Expand Down Expand Up @@ -1201,6 +1206,9 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan
chOut <- msg
}
c.Cleanup()
if opts.ETA {
opts.ETABar.Add(1)
}

// if Verbose {
// Log.Debugf("receive %d bytes from cmd #%d\n", N, c.ID)
Expand Down Expand Up @@ -1231,6 +1239,9 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan
chOut <- msg
}
c.Cleanup()
if opts.ETA {
opts.ETABar.Add(1)
}

id++
} else { // wait the ID come out
Expand All @@ -1240,6 +1251,9 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan
chOut <- msg
}
c1.Cleanup()
if opts.ETA {
opts.ETABar.Add(1)
}

delete(cmds, c1.ID)
id++
Expand All @@ -1264,6 +1278,9 @@ func Run4Output(opts *Options, cancel chan struct{}, chCmdStr chan string) (chan
chOut <- msg
}
c.Cleanup()
if opts.ETA {
opts.ETABar.Add(1)
}
}
}

Expand Down
154 changes: 90 additions & 64 deletions root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"time"

"github.com/pkg/errors"
pb "github.com/schollz/progressbar/v3"
"github.com/shenwei356/rush/process"
"github.com/shenwei356/util/stringutil"
"github.com/shenwei356/xopen"
Expand Down Expand Up @@ -132,6 +133,7 @@ Homepage: https://github.com/shenwei356/rush
opts := &process.Options{
DryRun: config.DryRun,
Jobs: config.Jobs,
ETA: config.ETA,
KeepOrder: config.KeepOrder,
Retries: config.Retries,
RetryInterval: time.Duration(config.RetryInterval) * time.Second,
Expand Down Expand Up @@ -179,76 +181,70 @@ Homepage: https://github.com/shenwei356/rush

anyCommands := false

var inputlines []string
for _, file := range config.Infiles {
// input file handler
var infh *os.File
if isStdin(file) {
infh = os.Stdin
} else {
infh, err = os.Open(file)
checkError(err)
defer infh.Close()
}
scanner := bufio.NewScanner(infh)
// 2147483647: max int32
scanner.Buffer(make([]byte, 0, 16384), 2147483647)
scanner.Split(split)

for scanner.Scan() {
select {
case <-cancel:
if config.Verbose {
log.Warningf("cancel reading file: %s", file)
}
default:
}
inputlines = append(inputlines, scanner.Text())
}
checkError(errors.Wrap(scanner.Err(), "read input data"))
}

if opts.ETA {
jobcount := len(inputlines) / config.NRecords
if len(inputlines)%config.NRecords > 0 {
jobcount++
}
opts.ETABar = pb.NewOptions(jobcount,
pb.OptionShowCount(),
pb.OptionShowIts(),
pb.OptionSetItsString("jobs"),
pb.OptionSetRenderBlankState(true),
pb.OptionSetTheme(pb.Theme{
Saucer: "=",
SaucerHead: ">",
SaucerPadding: "_",
BarStart: "[",
BarEnd: "]",
}))
}

// read data and generate command
go func() {
n := config.NRecords
var id uint64 = 1

READFILES:
for _, file := range config.Infiles {
// input file handler
var infh *os.File
if isStdin(file) {
infh = os.Stdin
} else {
infh, err = os.Open(file)
checkError(err)
defer infh.Close()
var records []string
records = make([]string, 0, n)
var cmdStr string
var runned bool
for _, record := range inputlines {
if record == "" {
continue
}
records = append(records, record)

scanner := bufio.NewScanner(infh)
// 2147483647: max int32
scanner.Buffer(make([]byte, 0, 16384), 2147483647)
scanner.Split(split)

var record string
var records []string
records = make([]string, 0, n)
var cmdStr string
var runned bool
for scanner.Scan() {
select {
case <-cancel:
if config.Verbose {
log.Warningf("cancel reading file: %s", file)
}
break READFILES
default:
}

record = scanner.Text()
if record == "" {
continue
}
records = append(records, record)

if len(records) == n {
cmdStr, err = fillCommand(config, command0, Chunk{ID: id, Data: records})
checkError(errors.Wrap(err, "fill command"))
if config.Escape {
cmdStr = stringutil.EscapeSymbols(cmdStr, config.EscapeSymbols)
}
if len(cmdStr) > 0 {
if config.Continue {
if _, runned = succCmds[cmdStr]; runned {
log.Infof("ignore cmd: %s", cmdStr)
// bfhSuccCmds.WriteString(cmdStr + endMarkOfCMD)
// bfhSuccCmds.Flush()
} else {
chCmdStr <- cmdStr
anyCommands = true
}
} else {
chCmdStr <- cmdStr
anyCommands = true
}

id++
}
records = make([]string, 0, n)
}
}
if len(records) > 0 {
if len(records) == n {
cmdStr, err = fillCommand(config, command0, Chunk{ID: id, Data: records})
checkError(errors.Wrap(err, "fill command"))
if config.Escape {
Expand All @@ -268,9 +264,33 @@ Homepage: https://github.com/shenwei356/rush
chCmdStr <- cmdStr
anyCommands = true
}

id++
}
records = make([]string, 0, n)
}
}
if len(records) > 0 {
cmdStr, err = fillCommand(config, command0, Chunk{ID: id, Data: records})
checkError(errors.Wrap(err, "fill command"))
if config.Escape {
cmdStr = stringutil.EscapeSymbols(cmdStr, config.EscapeSymbols)
}
if len(cmdStr) > 0 {
if config.Continue {
if _, runned = succCmds[cmdStr]; runned {
log.Infof("ignore cmd: %s", cmdStr)
// bfhSuccCmds.WriteString(cmdStr + endMarkOfCMD)
// bfhSuccCmds.Flush()
} else {
chCmdStr <- cmdStr
anyCommands = true
}
} else {
chCmdStr <- cmdStr
anyCommands = true
}
}
checkError(errors.Wrap(scanner.Err(), "read input data"))
}

close(chCmdStr)
Expand Down Expand Up @@ -365,6 +385,9 @@ Homepage: https://github.com/shenwei356/rush
<-donePreprocessFiles // finish read data and send command
<-doneSendOutput // finish send output
<-doneOutput // finish print output
if opts.ETA {
opts.ETABar.Finish()
}
if config.PropExitStatus {
<-doneExitStatus
}
Expand Down Expand Up @@ -405,6 +428,7 @@ func Execute() {
func init() {
RootCmd.Flags().BoolP("verbose", "", false, "print verbose information")
RootCmd.Flags().BoolP("version", "V", false, `print version information and check for update`)
RootCmd.Flags().BoolP("eta", "", false, `eta progress bar`)

RootCmd.Flags().IntP("jobs", "j", runtime.NumCPU(), "run n jobs in parallel (default value depends on your device)")
RootCmd.Flags().StringP("out-file", "o", "-", `out file ("-" for stdout)`)
Expand Down Expand Up @@ -527,6 +551,7 @@ Use "{{.CommandPath}} [command] --help" for more information about a command.{{e
type Config struct {
Verbose bool
Version bool
ETA bool

Jobs int
OutFile string
Expand Down Expand Up @@ -599,6 +624,7 @@ func getConfigs(cmd *cobra.Command) Config {
return Config{
Verbose: getFlagBool(cmd, "verbose"),
Version: getFlagBool(cmd, "version"),
ETA: getFlagBool(cmd, "eta"),

Jobs: getFlagPositiveInt(cmd, "jobs"),
OutFile: getFlagString(cmd, "out-file"),
Expand Down