Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --count option to sub #250

Merged
merged 1 commit into from
Sep 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ require (
github.com/guptarohit/asciigraph v0.5.2
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.13.4
github.com/nats-io/jsm.go v0.0.26
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f
github.com/nats-io/nats.go v1.12.0
github.com/nats-io/jsm.go v0.0.27-0.20210922085804-a34fc486465d
github.com/nats-io/nats-server/v2 v2.6.0
github.com/nats-io/nats.go v1.12.3
github.com/nats-io/nuid v1.0.1
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/common v0.26.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,16 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nats-io/jsm.go v0.0.26 h1:Ocj9wy/tfGJ+lOaxyYwLv2skG5CFSLuhRlg7DhP3WQI=
github.com/nats-io/jsm.go v0.0.26/go.mod h1:jeU4Spx3HMszhjbvCyQNoWKdIJaj8ResDalLHFysbew=
github.com/nats-io/jsm.go v0.0.27-0.20210922085804-a34fc486465d h1:ZekDq4dQYIL4mMi9t+wERdC/SuDBom+F25t3EhKVS8g=
github.com/nats-io/jsm.go v0.0.27-0.20210922085804-a34fc486465d/go.mod h1:vvRrQ4L48Ny6I7Fc3SZGp8SjX4XyN8KC70RNOlA64dI=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI=
github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f h1:r+bnrlIkFeYHwqxvFGwHT3ajx3Yji6frxH+X+ZgGxeA=
github.com/nats-io/nats-server/v2 v2.4.1-0.20210907200628-874c79fe411f/go.mod h1:TUAhMFYh1VISyY/D4WKJUMuGHg8yHtoUTuxkbiej1lc=
github.com/nats-io/nats.go v1.12.0 h1:n0oZzK2aIZDMKuEiMKJ9qkCUgVY5vTAAksSXtLlz5Xc=
github.com/nats-io/nats.go v1.12.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.6.0 h1:OAt+ef+9QaaNdn4uTyQC372bv1ZZqC0vZ1I9YxWqjwI=
github.com/nats-io/nats-server/v2 v2.6.0/go.mod h1:Az91TbZiV7K4a6k/4v6YYdOKEoxCXj+iqhHVf/MlrKo=
github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E=
github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
4 changes: 2 additions & 2 deletions nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestCLIConsumerAdd(t *testing.T) {
srv, _, mgr := setupConsTest(t)
defer srv.Shutdown()

runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 push1 --max-pending 10 --replay instant --deliver all --target out.push1 --ack explicit --filter '' --deliver-group '' --max-deliver 20 --bps 1024 --heartbeat=-1 --flow-control --description 'test suite'", srv.ClientURL()))
runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 push1 --max-pending 10 --replay instant --deliver all --target out.push1 --ack explicit --filter '' --deliver-group '' --max-deliver 20 --bps 1024 --heartbeat=1s --flow-control --description 'test suite'", srv.ClientURL()))
consumerShouldExist(t, mgr, "mem1", "push1")
push1, err := mgr.LoadConsumer("mem1", "push1")
checkErr(t, err, "push1 could not be loaded")
Expand Down Expand Up @@ -497,7 +497,7 @@ func TestCLIConsumerAdd(t *testing.T) {
runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 pull1 --config testdata/mem1_pull1_consumer.json", srv.ClientURL()))
consumerShouldExist(t, mgr, "mem1", "pull1")

runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 push1 --filter 'js.mem.>' --max-pending 10 --replay instant --deliver subject --target out.push1 --ack explicit --deliver-group BOB --max-deliver 20 --bps 1024 --heartbeat=-1 --flow-control --description 'test suite'", srv.ClientURL()))
runNatsCli(t, fmt.Sprintf("--server='%s' con add mem1 push1 --filter 'js.mem.>' --max-pending 10 --replay instant --deliver subject --target out.push1 --ack explicit --deliver-group BOB --max-deliver 20 --bps 1024 --heartbeat=1s --flow-control --description 'test suite'", srv.ClientURL()))
consumerShouldExist(t, mgr, "mem1", "push1")
push1, err = mgr.LoadConsumer("mem1", "push1")
checkErr(t, err, "push1 could not be loaded")
Expand Down
2 changes: 1 addition & 1 deletion nats/server_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ nats server req accounts --account WEATHER
nats server req jsz --leader

# To manage JetStream cluster RAFT membership
nats server cluster raft step-down
nats server raft step-down
`
}
37 changes: 25 additions & 12 deletions nats/sub_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type subCmd struct {
jsAck bool
inbox bool
dump string
limit uint
}

func configureSubCommand(app *kingpin.Application) {
Expand All @@ -45,6 +46,7 @@ func configureSubCommand(app *kingpin.Application) {
act.Flag("raw", "Show the raw data received").Short('r').BoolVar(&c.raw)
act.Flag("ack", "Acknowledge JetStream message that have the correct metadata").BoolVar(&c.jsAck)
act.Flag("inbox", "Subscribes to a generate inbox").Short('i').BoolVar(&c.inbox)
act.Flag("count", "Quit after receiving this many messages").UintVar(&c.limit)
act.Flag("dump", "Dump received messages to files, 1 file per message. Specify - for null terminated STDOUT for use with xargs -0").PlaceHolder("DIRECTORY").StringVar(&c.dump)

cheats["sub"] = `# To subscribe to messages, in a queue group and acknowledge any JetStream ones
Expand Down Expand Up @@ -74,10 +76,14 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {
}
defer nc.Close()

i := 0
mu := sync.Mutex{}
dump := c.dump != ""
ctr := 0
var (
sub *nats.Subscription
mu = sync.Mutex{}
dump = c.dump != ""
ctr = uint(0)
ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()

if c.dump == "-" && c.inbox {
return fmt.Errorf("generating inboxes is not compatible with dumping to stdout using null terminated strings")
Expand All @@ -87,8 +93,6 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {
mu.Lock()
defer mu.Unlock()

i += 1

var info *jsm.MsgInfo
if m.Reply != "" {
info, _ = jsm.ParseJSMsgMetadata(m)
Expand Down Expand Up @@ -159,13 +163,13 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {

if info == nil {
if m.Reply != "" {
fmt.Printf("[#%d] Received on %q with reply %q\n", i, m.Subject, m.Reply)
fmt.Printf("[#%d] Received on %q with reply %q\n", ctr, m.Subject, m.Reply)
} else {
fmt.Printf("[#%d] Received on %q\n", i, m.Subject)
fmt.Printf("[#%d] Received on %q\n", ctr, m.Subject)
}

} else {
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d / ack: %v\n", i, info.Stream(), info.Consumer(), m.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence(), c.jsAck)
fmt.Printf("[#%d] Received JetStream message: consumer: %s > %s / subject: %s / delivered: %d / consumer seq: %d / stream seq: %d / ack: %v\n", ctr, info.Stream(), info.Consumer(), m.Subject, info.Delivered(), info.ConsumerSequence(), info.StreamSequence(), c.jsAck)
}

if len(m.Header) > 0 {
Expand All @@ -182,6 +186,11 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {
if !strings.HasSuffix(string(m.Data), "\n") {
fmt.Println()
}

if ctr == c.limit {
sub.Unsubscribe()
cancel()
}
}

if (!c.raw && c.dump == "") || c.inbox {
Expand All @@ -193,18 +202,22 @@ func (c *subCmd) subscribe(_ *kingpin.ParseContext) error {
}

if c.queue != "" {
nc.QueueSubscribe(c.subject, c.queue, handler)
sub, err = nc.QueueSubscribe(c.subject, c.queue, handler)
} else {
nc.Subscribe(c.subject, handler)
sub, err = nc.Subscribe(c.subject, handler)
}
if err != nil {
return err
}

nc.Flush()

err = nc.LastError()
if err != nil {
return err
}

<-context.Background().Done()
<-ctx.Done()

return nil
}