Skip to content

Commit

Permalink
Merge aa63249 into a3519e1
Browse files Browse the repository at this point in the history
  • Loading branch information
variadico committed Dec 17, 2020
2 parents a3519e1 + aa63249 commit 8facbdb
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 5 deletions.
25 changes: 20 additions & 5 deletions js.go
Expand Up @@ -168,10 +168,27 @@ func (opt jsOptFn) configureJSContext(opts *js) error {

func APIPrefix(pre string) JSOpt {
return jsOptFn(func(js *js) error {
js.pre = pre
if !strings.HasSuffix(js.pre, ".") {
js.pre = js.pre + "."
pre = strings.TrimSuffix(pre, ".")

if pre == ">" {
return ErrJetStreamBadPre
}

toks := strings.Split(pre, ".")
for i, tok := range toks {
if tok == "" || tok == "*" {
return ErrJetStreamBadPre
}

if tok == ">" && i < len(toks)-1 {
return ErrBadSubject
} else if tok == ">" && i == len(toks)-1 {
js.pre = pre
return nil
}
}

js.pre = fmt.Sprintf("%s.", pre)
return nil
})
}
Expand Down Expand Up @@ -617,7 +634,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []

func (js *js) lookupStreamBySubject(subj string) (string, error) {
var slr JSApiStreamNamesResponse
// FIXME(dlc) - prefix
req := &streamRequest{subj}
j, err := json.Marshal(req)
if err != nil {
Expand Down Expand Up @@ -734,7 +750,6 @@ func (sub *Subscription) Poll() error {
}

func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
// FIXME(dlc) - prefix
ccInfoSubj := fmt.Sprintf(JSApiConsumerInfoT, stream, consumer)
resp, err := js.nc.Request(js.apiSubj(ccInfoSubj), nil, js.wait)
if err != nil {
Expand Down
58 changes: 58 additions & 0 deletions nats_test.go
Expand Up @@ -2599,3 +2599,61 @@ func TestMsg_RespondMsg(t *testing.T) {
t.Fatalf("did not get correct response: %q", resp.Data)
}
}

func TestJetStreamAPIPrefix(t *testing.T) {
cases := []struct {
prefix string
wantPrefix string
wantErr bool
}{
{
prefix: "foo",
wantPrefix: "foo.",
},
{
prefix: "foo.",
wantPrefix: "foo.",
},
{
prefix: "foo.>",
wantPrefix: "foo.>",
},
{
prefix: "foo.b*r.baz",
wantPrefix: "foo.b*r.baz.",
},
{
prefix: "foo.*",
wantErr: true,
},
{
prefix: "foo.*.bar",
wantErr: true,
},
{
prefix: "foo.>.bar",
wantErr: true,
},
{
prefix: ">",
wantErr: true,
},
}
for _, c := range cases {
t.Run(c.prefix, func(t *testing.T) {
jsOpt := APIPrefix(c.prefix)

js := new(js)
if err := jsOpt.configureJSContext(js); err != nil && !c.wantErr {
t.Fatal(err)
} else if err == nil && c.wantErr {
t.Fatal("unexpected success")
}

if js.pre != c.wantPrefix {
t.Error("unexpected api prefix")
t.Fatalf("got=%s; want=%s", js.pre, c.wantPrefix)
}
})
}
}

0 comments on commit 8facbdb

Please sign in to comment.