Skip to content

Commit

Permalink
[IMPROVED] Display client_id in monitor endpoints for offline durables
Browse files Browse the repository at this point in the history
For offline durable (queue) subscriptions, the client_id was empty,
which would prevent one to delete a durable subscription without
knowing the clientID.

Resolves #472
  • Loading branch information
kozlovic committed Feb 15, 2018
1 parent 84a8e03 commit 03e5243
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 2 deletions.
63 changes: 63 additions & 0 deletions README.md
Expand Up @@ -676,9 +676,11 @@ For example: [http://localhost:8222/streaming/clientsz?limit=1&offset=1&subs=1](
"subscriptions": {
"foo": [
{
"client_id": "benchmark-sub-0",
"inbox": "_INBOX.jAHSY3hcL5EGFQGYmfayvC",
"ack_inbox": "_INBOX.J3Odi0wXYKWKFWz5D5uhem",
"is_durable": false,
"is_offline": false,
"max_inflight": 1024,
"ack_wait": 30,
"last_sent": 505597,
Expand All @@ -700,9 +702,11 @@ For example: [http://localhost:8222/streaming/clientsz?client=me&subs=1](http://
"subscriptions": {
"foo": [
{
"client_id": "me",
"inbox": "_INBOX.HG0uDuNtAPxJQ1lVjIC389",
"ack_inbox": "_INBOX.Q9iH2gsDPN57ZEvqswiYSL",
"is_durable": false,
"is_offline": false,
"max_inflight": 1024,
"ack_wait": 30,
"last_sent": 0,
Expand Down Expand Up @@ -771,9 +775,11 @@ For example: [http://localhost:8222/streaming/channelsz?limit=1&offset=0&subs=1]
"last_seq": 0,
"subscriptions": [
{
"client_id": "me",
"inbox": "_INBOX.S7kTJjOcToXiJAzGWgINit",
"ack_inbox": "_INBOX.Y04G5pZxlint3yPXrSTjTV",
"is_durable": false,
"is_offline": false,
"max_inflight": 1024,
"ack_wait": 30,
"last_sent": 0,
Expand Down Expand Up @@ -807,19 +813,23 @@ For example: [http://localhost:8222/streaming/channelsz?channel=foo&subs=1](http
"last_seq": 704770,
"subscriptions": [
{
"client_id": "me",
"inbox": "_INBOX.jAHSY3hcL5EGFQGYmfayvC",
"ack_inbox": "_INBOX.J3Odi0wXYKWKFWz5D5uhem",
"is_durable": false,
"is_offline": false,
"max_inflight": 1024,
"ack_wait": 30,
"last_sent": 704770,
"pending_count": 0,
"is_stalled": false
},
{
"client_id": "me2",
"inbox": "_INBOX.jAHSY3hcL5EGFQGYmfaywG",
"ack_inbox": "_INBOX.J3Odi0wXYKWKFWz5D5uhjV",
"is_durable": false,
"is_offline": false,
"max_inflight": 1024,
"ack_wait": 30,
"last_sent": 704770,
Expand All @@ -831,6 +841,59 @@ For example: [http://localhost:8222/streaming/channelsz?channel=foo&subs=1](http
}
```

For durables that are currently running, the `is_offline` field is set to `false`. Here is an example:
```
{
"name": "foo",
"msgs": 0,
"bytes": 0,
"first_seq": 0,
"last_seq": 0,
"subscriptions": [
{
"client_id": "me",
"inbox": "_INBOX.P23kNGFnwC7KRg3jIMB3IL",
"ack_inbox": "_STAN.ack.pLyMpEyg7dgGZBS7jGXC02.foo.pLyMpEyg7dgGZBS7jGXCaw",
"durable_name": "dur",
"is_durable": true,
"is_offline": false,
"max_inflight": 1024,
"ack_wait": 30,
"last_sent": 0,
"pending_count": 0,
"is_stalled": false
}
]
}
```

When that same durable goes offline, `is_offline` is be set to `true`. Although the client is possibly no longer connected (and would not appear in the `clientsz` endpoint), the `client_id` field is still displayed here.
```
{
"name": "foo",
"msgs": 0,
"bytes": 0,
"first_seq": 0,
"last_seq": 0,
"subscriptions": [
{
"client_id": "me",
"inbox": "_INBOX.P23kNGFnwC7KRg3jIMB3IL",
"ack_inbox": "_STAN.ack.pLyMpEyg7dgGZBS7jGXC02.foo.pLyMpEyg7dgGZBS7jGXCaw",
"durable_name": "dur",
"is_durable": true,
"is_offline": true,
"max_inflight": 1024,
"ack_wait": 30,
"last_sent": 0,
"pending_count": 0,
"is_stalled": false
}
]
}
```


# Getting Started

The best way to get the NATS Streaming Server is to use one of the pre-built release binaries which are available for OSX, Linux (x86-64/ARM), Windows. Instructions for using these binaries are on the GitHub releases page.
Expand Down
4 changes: 4 additions & 0 deletions server/monitor.go
Expand Up @@ -373,6 +373,10 @@ func createSubscriptionz(sub *subState) *Subscriptionz {
PendingCount: len(sub.acksPending),
IsStalled: sub.stalled,
}
// Case of offline durable (queue) subscriptions
if sub.ClientID == "" {
subz.ClientID = sub.savedClientID
}
sub.RUnlock()
return subz
}
Expand Down
4 changes: 4 additions & 0 deletions server/monitor_test.go
Expand Up @@ -1101,6 +1101,10 @@ func TestMonitorDurableSubs(t *testing.T) {
if sub.IsOffline != expectedOffline {
stackFatalf(t, "Unexpected IsOffline, wants %v, got %v", expectedOffline, sub.IsOffline)
}
// ClientID are now always reported, even when the durables are offline
if sub.ClientID == "" {
stackFatalf(t, "ClientID should always have a value")
}
}
}
// There should be 1 sub
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Expand Up @@ -591,7 +591,7 @@ type subState struct {
acksPending map[uint64]int64 // key is message sequence, value is expiration time.
store stores.SubStore // for easy access to the store interface

savedClientID string // Used only for closed durables in Clustering mode.
savedClientID string // Used only for closed durables in Clustering mode and monitoring endpoints.

replicate *subSentAndAck // Used in Clustering mode

Expand Down Expand Up @@ -752,7 +752,7 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
if sub.isDurableSubscriber() {
durableKey = sub.durableKey()
}
// This is needed when doing a snapshot in clustering mode.
// This is needed when doing a snapshot in clustering mode or for monitoring endpoints
sub.savedClientID = sub.ClientID
// Clear the subscriptions clientID
sub.ClientID = ""
Expand Down

0 comments on commit 03e5243

Please sign in to comment.