diff --git a/jsm.go b/jsm.go index fb7851e48..ae2fa1e98 100644 --- a/jsm.go +++ b/jsm.go @@ -95,10 +95,18 @@ type Placement struct { // StreamSource dictates how streams can source from other streams. type StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + External *ExternalStream `json:"external,omitempty"` +} + +// ExternalStream allows you to qualify access to a stream source in another +// account. +type ExternalStream struct { + ApiPrefix string `json:"api"` + DeliverPrefix string `json:"deliver"` } // apiError is included in all API responses if there was an error. diff --git a/test/js_test.go b/test/js_test.go index 902c147f8..9c5e4dfe0 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1695,6 +1695,135 @@ func TestJetStreamImportDirectOnly(t *testing.T) { } } +func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + no_auth_user: rip + jetstream: {max_mem_store: 64GB, max_file_store: 10TB} + accounts { + JS { + jetstream: enabled + users = [ { user: "rip", pass: "pass" } ] + exports [ + { service: "$JS.API.CONSUMER.>" } # To create internal consumers to mirror/source. + { stream: "RI.DELIVER.SYNC.>" } # For the mirror/source consumers sending to IA via delivery subject. + ] + } + IA { + jetstream: enabled + users = [ { user: "dlc", pass: "pass" } ] + imports [ + { service: { account: JS, subject: "$JS.API.CONSUMER.>"}, to: "RI.JS.API.CONSUMER.>" } + { stream: { account: JS, subject: "RI.DELIVER.SYNC.>"} } + ] + } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } + `)) + defer os.Remove(conf) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc1, err := nats.Connect(s.ClientURL(), nats.UserInfo("rip", "pass")) + if err != nil { + t.Fatal(err) + } + defer nc1.Close() + js1, err := nc1.JetStream() + if err != nil { + t.Fatal(err) + } + + _, err = js1.AddStream(&nats.StreamConfig{ + Name: "TEST", + Replicas: 2, + }) + if err != nil { + t.Fatal(err) + } + + toSend := 100 + for i := 0; i < toSend; i++ { + if _, err := js1.Publish("TEST", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + nc2, err := nats.Connect(s.ClientURL(), nats.UserInfo("dlc", "pass")) + if err != nil { + t.Fatal(err) + } + defer nc2.Close() + js2, err := nc1.JetStream() + if err != nil { + t.Fatal(err) + } + + checkMsgCount := func(t *testing.T, js nats.JetStreamManager, timeout time.Duration, want int) { + t.Helper() + + deadline := time.Now().Add(timeout) + var loopErr error + for time.Now().Before(deadline) { + si, err := js2.StreamInfo("MY_MIRROR_TEST") + if err != nil { + loopErr = err + continue + } + loopErr = nil + + if got := int(si.State.Msgs); got != want { + loopErr = fmt.Errorf("Unexpected msg count, got %d, want %d", got, want) + continue + } + loopErr = nil + break + } + if loopErr != nil { + t.Fatal(loopErr) + } + } + + _, err = js2.AddStream(&nats.StreamConfig{ + Name: "MY_MIRROR_TEST", + Storage: nats.FileStorage, + Mirror: &nats.StreamSource{ + Name: "TEST", + External: &nats.ExternalStream{ + ApiPrefix: "RI.JS.API", + DeliverPrefix: "RI.DELIVER.SYNC.MIRRORS", + }, + }, + }) + if err != nil { + t.Fatal(err) + } + checkMsgCount(t, js2, 2*time.Second, toSend) + + _, err = js2.AddStream(&nats.StreamConfig{ + Name: "MY_SOURCE_TEST", + Storage: nats.FileStorage, + Sources: []*nats.StreamSource{ + &nats.StreamSource{ + Name: "TEST", + External: &nats.ExternalStream{ + ApiPrefix: "RI.JS.API", + DeliverPrefix: "RI.DELIVER.SYNC.SOURCES", + }, + }, + }, + }) + if err != nil { + t.Fatal(err) + } + checkMsgCount(t, js2, 2*time.Second, toSend) +} + func TestJetStreamAutoMaxAckPending(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown()