Skip to content

Commit

Permalink
Add --count option to sub
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <rip@devco.net>
  • Loading branch information
ripienaar committed Sep 23, 2021
1 parent bc90a06 commit 71341f5
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 24 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ require (
github.com/guptarohit/asciigraph v0.5.2
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.13.4
github.com/nats-io/jsm.go v0.0.26
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f
github.com/nats-io/nats.go v1.12.0
github.com/nats-io/jsm.go v0.0.27-0.20210922085804-a34fc486465d
github.com/nats-io/nats-server/v2 v2.6.0
github.com/nats-io/nats.go v1.12.3
github.com/nats-io/nuid v1.0.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.26.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,16 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jsm.go v0.0.26 h1:Ocj9wy/tfGJ+lOaxyYwLv2skG5CFSLuhRlg7DhP3WQI=
github.com/nats-io/jsm.go v0.0.26/go.mod h1:jeU4Spx3HMszhjbvCyQNoWKdIJaj8ResDalLHFysbew=
github.com/nats-io/jsm.go v0.0.27-0.20210922085804-a34fc486465d h1:ZekDq4dQYIL4mMi9t+wERdC/SuDBom+F25t3EhKVS8g=
github.com/nats-io/jsm.go v0.0.27-0.20210922085804-a34fc486465d/go.mod h1:vvRrQ4L48Ny6I7Fc3SZGp8SjX4XyN8KC70RNOlA64dI=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f h1:r+bnrlIkFeYHwqxvFGwHT3ajx3Yji6frxH+X+ZgGxeA=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f/go.mod h1:TUAhMFYh1VISyY/D4WKJUMuGHg8yHtoUTuxkbiej1lc=
github.com/nats-io/nats.go v1.12.0 h1:n0oZzK2aIZDMKuEiMKJ9qkCUgVY5vTAAksSXtLlz5Xc=
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.6.0 h1:OAt+ef+9QaaNdn4uTyQC372bv1ZZqC0vZ1I9YxWqjwI=
github.com/nats-io/nats-server/v2 v2.6.0/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo=
github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E=
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
4 changes: 2 additions & 2 deletions nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestCLIConsumerAdd(t *testing.T) {
srv, _, mgr := setupConsTest(t)
defer srv.Shutdown()

runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 push1 --max-pending 10 --replay instant --deliver all --target out.push1 --ack explicit --filter '' --deliver-group '' --max-deliver 20 --bps 1024 --heartbeat=-1 --flow-control --description 'test suite'", srv.ClientURL()))
runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 push1 --max-pending 10 --replay instant --deliver all --target out.push1 --ack explicit --filter '' --deliver-group '' --max-deliver 20 --bps 1024 --heartbeat=1s --flow-control --description 'test suite'", srv.ClientURL()))
consumerShouldExist(t, mgr, "mem1", "push1")
push1, err := mgr.LoadConsumer("mem1", "push1")
checkErr(t, err, "push1 could not be loaded")
Expand Down Expand Up @@ -497,7 +497,7 @@ func TestCLIConsumerAdd(t *testing.T) {
runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 pull1 --config testdata/mem1_pull1_consumer.json", srv.ClientURL()))
consumerShouldExist(t, mgr, "mem1", "pull1")

runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 push1 --filter 'js.mem.>' --max-pending 10 --replay instant --deliver subject --target out.push1 --ack explicit --deliver-group BOB --max-deliver 20 --bps 1024 --heartbeat=-1 --flow-control --description 'test suite'", srv.ClientURL()))
runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 push1 --filter 'js.mem.>' --max-pending 10 --replay instant --deliver subject --target out.push1 --ack explicit --deliver-group BOB --max-deliver 20 --bps 1024 --heartbeat=1s --flow-control --description 'test suite'", srv.ClientURL()))
consumerShouldExist(t, mgr, "mem1", "push1")
push1, err = mgr.LoadConsumer("mem1", "push1")
checkErr(t, err, "push1 could not be loaded")
Expand Down
2 changes: 1 addition & 1 deletion nats/server_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ nats server req accounts --account WEATHER
nats server req jsz --leader
# To manage JetStream cluster RAFT membership
nats server cluster raft step-down
nats server raft step-down
`
}
37 changes: 25 additions & 12 deletions nats/sub_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type subCmd struct {
jsAck bool
inbox bool
dump string
limit uint
}

func configureSubCommand(app *kingpin.Application) {
Expand All @@ -45,6 +46,7 @@ func configureSubCommand(app *kingpin.Application) {
act.Flag("raw", "Show the raw data received").Short('r').BoolVar(&c.raw)
act.Flag("ack", "Acknowledge JetStream message that have the correct metadata").BoolVar(&c.jsAck)
act.Flag("inbox", "Subscribes to a generate inbox").Short('i').BoolVar(&c.inbox)
act.Flag("count", "Quit after receiving this many messages").UintVar(&c.limit)
act.Flag("dump", "Dump received messages to files, 1 file per message. Specify - for null terminated STDOUT for use with xargs -0").PlaceHolder("DIRECTORY").StringVar(&c.dump)

cheats["sub"] = `# To subscribe to messages, in a queue group and acknowledge any JetStream ones
Expand Down Expand Up @@ -74,10 +76,14 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {
}
defer nc.Close()

i := 0
mu := sync.Mutex{}
dump := c.dump != ""
ctr := 0
var (
sub *nats.Subscription
mu = sync.Mutex{}
dump = c.dump != ""
ctr = uint(0)
ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()

if c.dump == "-" && c.inbox {
return fmt.Errorf("generating inboxes is not compatible with dumping to stdout using null terminated strings")
Expand All @@ -87,8 +93,6 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {
mu.Lock()
defer mu.Unlock()

i += 1

var info *jsm.MsgInfo
if m.Reply != "" {
info, _ = jsm.ParseJSMsgMetadata(m)
Expand Down Expand Up @@ -159,13 +163,13 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {

if info == nil {
if m.Reply != "" {
fmt.Printf("[#%d] Received on %q with reply %q\n", i, m.Subject, m.Reply)
fmt.Printf("[#%d] Received on %q with reply %q\n", ctr, m.Subject, m.Reply)
} else {
fmt.Printf("[#%d] Received on %q\n", i, m.Subject)
fmt.Printf("[#%d] Received on %q\n", ctr, m.Subject)
}

} else {
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d / ack: %v\n", i, info.Stream(), info.Consumer(), m.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence(), c.jsAck)
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d / ack: %v\n", ctr, info.Stream(), info.Consumer(), m.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence(), c.jsAck)
}

if len(m.Header) > 0 {
Expand All @@ -182,6 +186,11 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {
if !strings.HasSuffix(string(m.Data), "\n") {
fmt.Println()
}

if ctr == c.limit {
sub.Unsubscribe()
cancel()
}
}

if (!c.raw && c.dump == "") || c.inbox {
Expand All @@ -193,18 +202,22 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {
}

if c.queue != "" {
nc.QueueSubscribe(c.subject, c.queue, handler)
sub, err = nc.QueueSubscribe(c.subject, c.queue, handler)
} else {
nc.Subscribe(c.subject, handler)
sub, err = nc.Subscribe(c.subject, handler)
}
if err != nil {
return err
}

nc.Flush()

err = nc.LastError()
if err != nil {
return err
}

<-context.Background().Done()
<-ctx.Done()

return nil
}

0 comments on commit 71341f5

Please sign in to comment.