Skip to content

Commit

Permalink
Merge pull request #1612 from nats-io/sys-evt-cleanup
Browse files Browse the repository at this point in the history
[Added] filtering by account to leafz and exposing this as per acc subject
  • Loading branch information
kozlovic committed Sep 24, 2020
2 parents b19b2e1 + 371861e commit 2792fd2
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 10 deletions.
11 changes: 11 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,17 @@ func (s *Server) initEventTracking() {
}
})
},
"LEAFZ": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &LeafzEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
if acc, err := extractAccount(subject); err != nil {
return nil, err
} else {
optz.LeafzOptions.Account = acc
return s.Leafz(&optz.LeafzOptions)
}
})
},
"INFO": func(sub *subscription, _ *client, subject, reply string, msg []byte) {
optz := &AccInfoEventOptions{}
s.zReq(reply, msg, &optz.EventFilterOptions, optz, func() (interface{}, error) {
Expand Down
4 changes: 3 additions & 1 deletion server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1559,7 +1559,7 @@ func TestSystemAccountWithGateways(t *testing.T) {

// If this tests fails with wrong number after 10 seconds we may have
// added a new inititial subscription for the eventing system.
checkExpectedSubs(t, 33, sa)
checkExpectedSubs(t, 34, sa)

// Create a client on B and see if we receive the event
urlb := fmt.Sprintf("nats://%s:%d", ob.Host, ob.Port)
Expand Down Expand Up @@ -1935,6 +1935,8 @@ func TestServerEventsPingMonitorz(t *testing.T) {
[]string{"now", "leafs"}},
{"ACCOUNTZ", &AccountzOptions{Account: sysAcc}, &Accountz{},
[]string{"now", "account_detail"}},
{"LEAFZ", &LeafzOptions{Account: sysAcc}, &Leafz{},
[]string{"now", "leafs"}},

{"ROUTEZ", json.RawMessage(`{"cluster":""}`), &Routez{},
[]string{"now", "routes"}},
Expand Down
18 changes: 11 additions & 7 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1739,7 +1739,8 @@ type Leafz struct {
// LeafzOptions are options passed to Leafz
type LeafzOptions struct {
// Subscriptions indicates that Leafz will return a leafnode's subscriptions
Subscriptions bool `json:"subscriptions"`
Subscriptions bool `json:"subscriptions"`
Account string `json:"account"`
}

// LeafInfo has detailed information on each remote leafnode connection.
Expand All @@ -1764,6 +1765,14 @@ func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) {
if len(s.leafs) > 0 {
lconns = make([]*client, 0, len(s.leafs))
for _, ln := range s.leafs {
if opts != nil && opts.Account != "" {
ln.mu.Lock()
ok := ln.acc.Name == opts.Account
ln.mu.Unlock()
if !ok {
continue
}
}
lconns = append(lconns, ln)
}
}
Expand Down Expand Up @@ -1813,12 +1822,7 @@ func (s *Server) HandleLeafz(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
var opts *LeafzOptions
if subs {
opts = &LeafzOptions{Subscriptions: true}
}

l, err := s.Leafz(opts)
l, err := s.Leafz(&LeafzOptions{subs, r.URL.Query().Get("acc")})
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
Expand Down
24 changes: 22 additions & 2 deletions server/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3607,7 +3607,27 @@ func TestMonitorLeafz(t *testing.T) {
}
}
}

// Make sure that we can request per account - existing account
pollURL = fmt.Sprintf("http://127.0.0.1:%d/leafz?acc=%s", sa.MonitorAddr().Port, acc1.Name)
for pollMode := 1; pollMode < 2; pollMode++ {
l := pollLeafz(t, sa, pollMode, pollURL, &LeafzOptions{Account: acc1.Name})
for _, ln := range l.Leafs {
if ln.Account != acc1.Name {
t.Fatalf("Expected leaf node to be from account %s, got: %v", acc1.Name, ln)
}
}
if len(l.Leafs) != 1 {
t.Fatalf("Expected only two leaf node for this account, got: %v", len(l.Leafs))
}
}
// Make sure that we can request per account - non existing account
pollURL = fmt.Sprintf("http://127.0.0.1:%d/leafz?acc=%s", sa.MonitorAddr().Port, "DOESNOTEXIST")
for pollMode := 1; pollMode < 2; pollMode++ {
l := pollLeafz(t, sa, pollMode, pollURL, &LeafzOptions{Account: "DOESNOTEXIST"})
if len(l.Leafs) != 0 {
t.Fatalf("Expected no leaf node for this account, got: %v", len(l.Leafs))
}
}
// Now polling server B.
pollURL = fmt.Sprintf("http://127.0.0.1:%d/leafz?subs=1", sb.MonitorAddr().Port)
for pollMode := 1; pollMode < 2; pollMode++ {
Expand Down Expand Up @@ -3663,7 +3683,7 @@ func TestMonitorAccountz(t *testing.T) {
t.Fatalf("Body missing value. Contains: %s", body)
} else if !strings.Contains(body, `"account_name": "$SYS",`) {
t.Fatalf("Body missing value. Contains: %s", body)
} else if !strings.Contains(body, `"subscriptions": 32,`) {
} else if !strings.Contains(body, `"subscriptions": 33,`) {
t.Fatalf("Body missing value. Contains: %s", body)
}
}

0 comments on commit 2792fd2

Please sign in to comment.