Skip to content

Commit

Permalink
Separate JetStream into smaller interfaces
Browse files Browse the repository at this point in the history
This separates the JetStream interface into two smaller interfaces:
A JetStream interface which is for producing and consuming messages
and a JetStreamManager interface for creating streams/consumers.

This also adds a new interface that is the compound of both called
JetStreamContext and is the one that is being returned when calling
`nc.JetStream()`.

This change allows to opt-in to the behaviors provided by the
JetStreamContext as needed, for example:

```go
// Can be used to produce/consume messages, but not for creating streams
var js nats.JetStream
js, err = nc.JetStream()

// Can be used for managing streams/consumers
var jsm nats.JetStreamManager
js, err = nc.JetStream()

// Can still be used to both create streams and publish
js, err := nc.JetStream()
```

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Dec 11, 2020
1 parent c1932e1 commit 5bbcdc9
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
14 changes: 10 additions & 4 deletions js.go
Expand Up @@ -25,7 +25,7 @@ import (
"time"
)

// JetStream is the public interface for the JetStream context.
// JetStream is the public interface for JetStream.
type JetStream interface {
// Publishing messages to JetStream.
Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
Expand All @@ -38,16 +38,22 @@ type JetStream interface {
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
// QueueSubscribe.
QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
}

// Management
// JetStreamManager is the public interface for managing JetStream streams & consumers.
type JetStreamManager interface {
// Create a stream.
AddStream(cfg *StreamConfig) (*StreamInfo, error)
// Create a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
// Stream information.
StreamInfo(stream string) (*StreamInfo, error)
}

// TODO(dlc) - add more
// JetStream is the public interface for the JetStream context.
type JetStreamContext interface {
JetStream
JetStreamManager
}

// APIError is included in all API responses if there was an error.
Expand Down Expand Up @@ -116,7 +122,7 @@ const (
)

// JetStream returns a JetStream context for pub/sub interactions.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStream, error) {
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
const defaultRequestWait = 5 * time.Second

js := &js{nc: nc, pre: JSDefaultAPIPrefix, wait: defaultRequestWait}
Expand Down
47 changes: 47 additions & 0 deletions test/js_test.go
Expand Up @@ -834,6 +834,53 @@ func TestJetStreamAutoMaxAckPending(t *testing.T) {
}
}

func TestJetStreamInterfaces(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

var js nats.JetStream
var jsm nats.JetStreamManager
var jsctx nats.JetStreamContext

// JetStream that can publish/subscribe but cannot manage streams.
js, err = nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
js.Publish("foo", []byte("hello"))

// JetStream context that can manage streams/consumers but cannot produce messages.
jsm, err = nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
jsm.AddStream(&nats.StreamConfig{Name: "FOO"})

// JetStream context that can both manage streams/consumers
// as well as publish/subscribe.
jsctx, err = nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
jsctx.AddStream(&nats.StreamConfig{Name: "BAR"})
jsctx.Publish("bar", []byte("hello world"))

publishMsg := func(js nats.JetStream, payload []byte) {
js.Publish("foo", payload)
}
publishMsg(js, []byte("hello world"))
}

// WIP(dlc) - This is in support of stall based tests and processing.
func TestJetStreamPullBasedStall(t *testing.T) {
t.SkipNow()
Expand Down

0 comments on commit 5bbcdc9

Please sign in to comment.