Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ExternalStream field to StreamSource #671

Merged
merged 1 commit into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 12 additions & 4 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,18 @@ type Placement struct {

// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
}

// ExternalStream allows you to qualify access to a stream source in another
// account.
type ExternalStream struct {
APIPrefix string `json:"api"`
DeliverPrefix string `json:"deliver"`
}

// apiError is included in all API responses if there was an error.
Expand Down
129 changes: 129 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,135 @@ func TestJetStreamImportDirectOnly(t *testing.T) {
}
}

func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
conf := createConfFile(t, []byte(`
listen: 127.0.0.1:-1
no_auth_user: rip
jetstream: {max_mem_store: 64GB, max_file_store: 10TB}
accounts {
JS {
jetstream: enabled
users = [ { user: "rip", pass: "pass" } ]
exports [
{ service: "$JS.API.CONSUMER.>" } # To create internal consumers to mirror/source.
{ stream: "RI.DELIVER.SYNC.>" } # For the mirror/source consumers sending to IA via delivery subject.
]
}
IA {
jetstream: enabled
users = [ { user: "dlc", pass: "pass" } ]
imports [
{ service: { account: JS, subject: "$JS.API.CONSUMER.>"}, to: "RI.JS.API.CONSUMER.>" }
{ stream: { account: JS, subject: "RI.DELIVER.SYNC.>"} }
]
}
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`))
defer os.Remove(conf)

s, _ := RunServerWithConfig(conf)
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc1, err := nats.Connect(s.ClientURL(), nats.UserInfo("rip", "pass"))
if err != nil {
t.Fatal(err)
}
defer nc1.Close()
js1, err := nc1.JetStream()
if err != nil {
t.Fatal(err)
}

_, err = js1.AddStream(&nats.StreamConfig{
Name: "TEST",
Replicas: 2,
})
if err != nil {
t.Fatal(err)
}

toSend := 100
for i := 0; i < toSend; i++ {
if _, err := js1.Publish("TEST", []byte("OK")); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}

nc2, err := nats.Connect(s.ClientURL(), nats.UserInfo("dlc", "pass"))
if err != nil {
t.Fatal(err)
}
defer nc2.Close()
js2, err := nc1.JetStream()
if err != nil {
t.Fatal(err)
}

checkMsgCount := func(t *testing.T, js nats.JetStreamManager, timeout time.Duration, want int) {
t.Helper()

deadline := time.Now().Add(timeout)
var loopErr error
for time.Now().Before(deadline) {
si, err := js2.StreamInfo("MY_MIRROR_TEST")
if err != nil {
loopErr = err
continue
}
loopErr = nil

if got := int(si.State.Msgs); got != want {
loopErr = fmt.Errorf("Unexpected msg count, got %d, want %d", got, want)
continue
}
loopErr = nil
break
}
if loopErr != nil {
t.Fatal(loopErr)
}
}

_, err = js2.AddStream(&nats.StreamConfig{
Name: "MY_MIRROR_TEST",
Storage: nats.FileStorage,
Mirror: &nats.StreamSource{
Name: "TEST",
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
DeliverPrefix: "RI.DELIVER.SYNC.MIRRORS",
},
},
})
if err != nil {
t.Fatal(err)
}
checkMsgCount(t, js2, 2*time.Second, toSend)

_, err = js2.AddStream(&nats.StreamConfig{
Name: "MY_SOURCE_TEST",
Storage: nats.FileStorage,
Sources: []*nats.StreamSource{
&nats.StreamSource{
Name: "TEST",
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
DeliverPrefix: "RI.DELIVER.SYNC.SOURCES",
},
},
},
})
if err != nil {
t.Fatal(err)
}
checkMsgCount(t, js2, 2*time.Second, toSend)
}

func TestJetStreamAutoMaxAckPending(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down