Skip to content

Commit

Permalink
jsm: Add fields for mirrors to StreamConfig
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Feb 25, 2021
1 parent c50dffa commit 9c91f7d
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 4 deletions.
27 changes: 23 additions & 4 deletions jsm.go
Expand Up @@ -83,6 +83,8 @@ type StreamConfig struct {
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
}

// Placement is used to guide placement of streams in clustered JetStream.
Expand All @@ -91,6 +93,14 @@ type Placement struct {
Tags []string `json:"tags,omitempty"`
}

// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
}

// apiError is included in all API responses if there was an error.
type apiError struct {
Code int `json:"code"`
Expand Down Expand Up @@ -380,10 +390,19 @@ func (js *js) StreamInfo(stream string) (*StreamInfo, error) {

// StreamInfo shows config and current state for this stream.
type StreamInfo struct {
Config StreamConfig `json:"config"`
Created time.Time `json:"created"`
State StreamState `json:"state"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
Config StreamConfig `json:"config"`
Created time.Time `json:"created"`
State StreamState `json:"state"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
}

// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
}

// StreamState is information about the given stream.
Expand Down
189 changes: 189 additions & 0 deletions test/js_test.go
Expand Up @@ -2689,6 +2689,195 @@ func TestJetStream_ClusterPlacement(t *testing.T) {
})
}

func TestJetStreamStreamMirror(t *testing.T) {
withJSCluster(t, "MIRROR", 3, testJetStreamMirror_Source)
}

func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
srvA := nodes[0]
nc, err := nats.Connect(srvA.ClientURL())
if err != nil {
t.Error(err)
}

js, err := nc.JetStream()
if err != nil {
t.Fatal(err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "origin",
Placement: &nats.Placement{
Tags: []string{"NODE_0"},
},
Storage: nats.MemoryStorage,
Replicas: 1,
})
if err != nil {
t.Fatalf("Unexpected error creating stream: %v", err)
}

totalMsgs := 10
for i := 0; i < totalMsgs; i++ {
payload := fmt.Sprintf("i:%d", i)
js.Publish("origin", []byte(payload))
}

t.Run("create mirrors", func(t *testing.T) {
_, err = js.AddStream(&nats.StreamConfig{
Name: "m1",
Mirror: &nats.StreamSource{Name: "origin"},
Storage: nats.FileStorage,
Replicas: 1,
})
if err != nil {
t.Fatalf("Unexpected error creating stream: %v", err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "m2",
Mirror: &nats.StreamSource{Name: "origin"},
Storage: nats.MemoryStorage,
Replicas: 1,
})
if err != nil {
t.Fatalf("Unexpected error creating stream: %v", err)
}
msgs := make([]*nats.RawStreamMsg, 0)

// Stored message sequences start at 1
startSequence := 1

GetNextMsg:
for i := startSequence; i < totalMsgs+1; i++ {
var (
err error
seq = uint64(i)
msgA *nats.RawStreamMsg
msgB *nats.RawStreamMsg
sourceMsg *nats.RawStreamMsg
timeout = time.Now().Add(2 * time.Second)
)

for time.Now().Before(timeout) {
sourceMsg, err = js.GetMsg("origin", seq)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
msgA, err = js.GetMsg("m1", seq)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
if !reflect.DeepEqual(sourceMsg, msgA) {
t.Errorf("Expected %+v, got: %+v", sourceMsg, msgA)
}

msgB, err = js.GetMsg("m2", seq)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
if !reflect.DeepEqual(sourceMsg, msgB) {
t.Errorf("Expected %+v, got: %+v", sourceMsg, msgB)
}

msgs = append(msgs, msgA)
continue GetNextMsg
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

got := len(msgs)
if got < totalMsgs {
t.Errorf("Expected %v, got: %v", totalMsgs, got)
}
})

t.Run("get mirror info", func(t *testing.T) {
m1, err := js.StreamInfo("m1")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
got := m1.Mirror.Name
expected := "origin"
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
}

m2, err := js.StreamInfo("m2")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
got = m2.Mirror.Name
expected = "origin"
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
}
})

t.Run("create stream from sources", func(t *testing.T) {
sources := make([]*nats.StreamSource, 0)
sources = append(sources, &nats.StreamSource{Name: "m1"})
sources = append(sources, &nats.StreamSource{Name: "m2"})
_, err = js.AddStream(&nats.StreamConfig{
Name: "s1",
Sources: sources,
Storage: nats.FileStorage,
Replicas: 1,
})
if err != nil {
t.Fatalf("Unexpected error creating stream: %v", err)
}

msgs := make([]*nats.RawStreamMsg, 0)

// Stored message sequences start at 1
startSequence := 1
expectedTotal := totalMsgs * 2

GetNextMsg:
for i := startSequence; i < expectedTotal+1; i++ {
var (
err error
seq = uint64(i)
msg *nats.RawStreamMsg
timeout = time.Now().Add(2 * time.Second)
)

for time.Now().Before(timeout) {
msg, err = js.GetMsg("s1", seq)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue
}
msgs = append(msgs, msg)
continue GetNextMsg
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}

got := len(msgs)
if got < expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
}

si, err := js.StreamInfo("s1")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
got = int(si.State.Msgs)
if got != expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
}
})
}

func TestJetStream_ClusterReconnect(t *testing.T) {
n := 3
replicas := []int{1, 3}
Expand Down

0 comments on commit 9c91f7d

Please sign in to comment.