Skip to content

Commit

Permalink
Add ExternalStream field to StreamSource
Browse files Browse the repository at this point in the history
  • Loading branch information
nsurfer committed Mar 4, 2021
1 parent 9faf9b2 commit e4a53fb
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 4 deletions.
16 changes: 12 additions & 4 deletions jsm.go
Expand Up @@ -95,10 +95,18 @@ type Placement struct {

// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
}

// ExternalStream allows you to qualify access to a stream source in another
// account.
type ExternalStream struct {
APIPrefix string `json:"api"`
DeliverPrefix string `json:"deliver"`
}

// apiError is included in all API responses if there was an error.
Expand Down
129 changes: 129 additions & 0 deletions test/js_test.go
Expand Up @@ -1695,6 +1695,135 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
}
}

func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
no_auth_user: rip
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
accounts {
JS {
jetstream: enabled
users = [ { user: "rip", pass: "pass" } ]
exports [
{ service: "$JS.API.CONSUMER.>" } # To create internal consumers to mirror/source.
{ stream: "RI.DELIVER.SYNC.>" } # For the mirror/source consumers sending to IA via delivery subject.
]
}
IA {
jetstream: enabled
users = [ { user: "dlc", pass: "pass" } ]
imports [
{ service: { account: JS, subject: "$JS.API.CONSUMER.>"}, to: "RI.JS.API.CONSUMER.>" }
{ stream: { account: JS, subject: "RI.DELIVER.SYNC.>"} }
]
}
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`))
defer os.Remove(conf)

s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc1, err := nats.Connect(s.ClientURL(), nats.UserInfo("rip", "pass"))
if err != nil {
t.Fatal(err)
}
defer nc1.Close()
js1, err := nc1.JetStream()
if err != nil {
t.Fatal(err)
}

_, err = js1.AddStream(&nats.StreamConfig{
Name: "TEST",
Replicas: 2,
})
if err != nil {
t.Fatal(err)
}

toSend := 100
for i := 0; i < toSend; i++ {
if _, err := js1.Publish("TEST", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}

nc2, err := nats.Connect(s.ClientURL(), nats.UserInfo("dlc", "pass"))
if err != nil {
t.Fatal(err)
}
defer nc2.Close()
js2, err := nc1.JetStream()
if err != nil {
t.Fatal(err)
}

checkMsgCount := func(t *testing.T, js nats.JetStreamManager, timeout time.Duration, want int) {
t.Helper()

deadline := time.Now().Add(timeout)
var loopErr error
for time.Now().Before(deadline) {
si, err := js2.StreamInfo("MY_MIRROR_TEST")
if err != nil {
loopErr = err
continue
}
loopErr = nil

if got := int(si.State.Msgs); got != want {
loopErr = fmt.Errorf("Unexpected msg count, got %d, want %d", got, want)
continue
}
loopErr = nil
break
}
if loopErr != nil {
t.Fatal(loopErr)
}
}

_, err = js2.AddStream(&nats.StreamConfig{
Name: "MY_MIRROR_TEST",
Storage: nats.FileStorage,
Mirror: &nats.StreamSource{
Name: "TEST",
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
DeliverPrefix: "RI.DELIVER.SYNC.MIRRORS",
},
},
})
if err != nil {
t.Fatal(err)
}
checkMsgCount(t, js2, 2*time.Second, toSend)

_, err = js2.AddStream(&nats.StreamConfig{
Name: "MY_SOURCE_TEST",
Storage: nats.FileStorage,
Sources: []*nats.StreamSource{
&nats.StreamSource{
Name: "TEST",
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
DeliverPrefix: "RI.DELIVER.SYNC.SOURCES",
},
},
},
})
if err != nil {
t.Fatal(err)
}
checkMsgCount(t, js2, 2*time.Second, toSend)
}

func TestJetStreamAutoMaxAckPending(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down

0 comments on commit e4a53fb

Please sign in to comment.