Skip to content

Commit

Permalink
Add flags to the sub command (#1059)
Browse files Browse the repository at this point in the history
* Adds flags to `nats sub`

- `--subjects-only` only prints the subject for each message received
- `--timestamp` adds a time stamp of when the message is received
- `--delta-time` adds a time since the start of the command of when the message is received

---------

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
  • Loading branch information
jnmoyne committed May 23, 2024
1 parent 6264f3e commit 8249277
Showing 1 changed file with 29 additions and 7 deletions.
36 changes: 29 additions & 7 deletions cli/sub_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ type subCmd struct {
jetStream bool
ignoreSubjects []string
wait time.Duration
timeStamps bool
deltaTimeStamps bool
subjectsOnly bool
}

func configureSubCommand(app commandHost) {
Expand All @@ -75,6 +78,7 @@ func configureSubCommand(app commandHost) {
act.Flag("count", "Quit after receiving this many messages").UintVar(&c.limit)
act.Flag("dump", "Dump received messages to files, 1 file per message. Specify - for null terminated STDOUT for use with xargs -0").PlaceHolder("DIRECTORY").StringVar(&c.dump)
act.Flag("headers-only", "Do not render any data, shows only headers").UnNegatableBoolVar(&c.headersOnly)
act.Flag("subjects-only", "Prints only the messages' subjects").UnNegatableBoolVar(&c.subjectsOnly)
act.Flag("start-sequence", "Starts at a specific Stream sequence (requires JetStream)").PlaceHolder("SEQUENCE").Uint64Var(&c.sseq)
act.Flag("all", "Delivers all messages found in the Stream (requires JetStream").UnNegatableBoolVar(&c.deliverAll)
act.Flag("new", "Delivers only future messages (requires JetStream)").UnNegatableBoolVar(&c.deliverNew)
Expand All @@ -86,6 +90,8 @@ func configureSubCommand(app commandHost) {
act.Flag("wait", "Unsubscribe after this amount of time without any traffic").DurationVar(&c.wait)
act.Flag("report-subjects", "Subscribes to a subject pattern and builds a de-duplicated report of active subjects receiving data").UnNegatableBoolVar(&c.reportSubjects)
act.Flag("report-top", "Number of subjects to show when doing 'report-subjects'. Default is 10.").Default("10").IntVar(&c.reportSubjectsCount)
act.Flag("timestamp", "Show timestamps in output").Short('t').UnNegatableBoolVar(&c.timeStamps)
act.Flag("delta-time", "Show time since start in output").Short('d').UnNegatableBoolVar(&c.deltaTimeStamps)
}

func init() {
Expand Down Expand Up @@ -183,6 +189,9 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
if c.reportSubjects && c.reportSubjectsCount == 0 {
return fmt.Errorf("subject count must be at least one")
}
if c.timeStamps && c.deltaTimeStamps {
return fmt.Errorf("timestamp and delta-time flags are mutually exclusive")
}

if c.dump != "" && c.dump != "-" {
err = os.MkdirAll(c.dump, 0700)
Expand All @@ -205,6 +214,8 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {

subjectReportMap map[string]int64
subjectBytesReportMap map[string]int64

startTime = time.Now()
)
defer cancel()

Expand Down Expand Up @@ -266,7 +277,7 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
if c.match && m.Reply != "" {
matchMap[m.Reply] = m
} else {
c.printMsg(m, nil, ctr)
c.printMsg(m, nil, ctr, startTime)
}
}

Expand Down Expand Up @@ -298,7 +309,7 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
return
}

c.printMsg(request, reply, ctr)
c.printMsg(request, reply, ctr, startTime)
delete(matchMap, reply.Subject)

// if reached limit and matched all requests
Expand Down Expand Up @@ -371,7 +382,7 @@ func (c *subCmd) subscribe(p *fisk.ParseContext) error {
nats.AckNone(),
}

if c.headersOnly {
if c.headersOnly || c.subjectsOnly {
opts = append(opts, nats.HeadersOnly())
}

Expand Down Expand Up @@ -489,7 +500,7 @@ func (c *subCmd) firstSubject() string {
return c.subjects[0]
}

func (c *subCmd) printMsg(msg *nats.Msg, reply *nats.Msg, ctr uint) {
func (c *subCmd) printMsg(msg *nats.Msg, reply *nats.Msg, ctr uint, startTime time.Time) {
var info *jsm.MsgInfo
if msg.Reply != "" {
info, _ = jsm.ParseJSMsgMetadata(msg)
Expand All @@ -499,6 +510,13 @@ func (c *subCmd) printMsg(msg *nats.Msg, reply *nats.Msg, ctr uint) {
fmt.Printf("<<< Reply Subject: %v\n", msg.Reply)
}

var timeStamp string
if c.timeStamps {
timeStamp = fmt.Sprintf(" @ %s", time.Now().Format(time.StampMicro))
} else if c.deltaTimeStamps {
timeStamp = fmt.Sprintf(" @ %s", time.Since(startTime).String())
}

if c.dump != "" {
// Output format 1/3: dumping, to stdout or files

Expand Down Expand Up @@ -534,21 +552,25 @@ func (c *subCmd) printMsg(msg *nats.Msg, reply *nats.Msg, ctr uint) {

if info == nil {
if msg.Reply != "" {
fmt.Printf("[#%d] Received on %q with reply %q\n", ctr, msg.Subject, msg.Reply)
fmt.Printf("[#%d]%s Received on %q with reply %q\n", ctr, timeStamp, msg.Subject, msg.Reply)
} else {
fmt.Printf("[#%d] Received on %q\n", ctr, msg.Subject)
fmt.Printf("[#%d]%s Received on %q\n", ctr, timeStamp, msg.Subject)
}
} else if c.jetStream {
fmt.Printf("[#%d] Received JetStream message: stream: %s seq %d / subject: %s / time: %v\n", ctr, info.Stream(), info.StreamSequence(), msg.Subject, info.TimeStamp().Format(time.RFC3339))
} else {
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d\n", ctr, info.Stream(), info.Consumer(), msg.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence())
}

if c.subjectsOnly {
return
}

prettyPrintMsg(msg, c.headersOnly, c.translate)

if reply != nil {
if info == nil {
fmt.Printf("[#%d] Matched reply on %q\n", ctr, reply.Subject)
fmt.Printf("[#%d]%s Matched reply on %q\n", ctr, timeStamp, reply.Subject)
} else if c.jetStream {
fmt.Printf("[#%d] Matched reply JetStream message: stream: %s seq %d / subject: %s / time: %v\n", ctr, info.Stream(), info.StreamSequence(), reply.Subject, info.TimeStamp().Format(time.RFC3339))
} else {
Expand Down

0 comments on commit 8249277

Please sign in to comment.