Skip to content

Commit

Permalink
[CHANGED] User cluster ID as part of internal subscriptions
Browse files Browse the repository at this point in the history
We were using a NUID for subjects such as `_STAN.pub.<nuid>.>`
when running in standalone mode without channel partitioning.

In all other cases, we used the cluster ID in place of the nuid.
This change will make use of the cluster ID in all cases. This
will apply only to new deployment, meaning that existing stores
with NUID in the subjects will still be used.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Nov 19, 2020
1 parent 4df4c5a commit 651b409
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions server/server.go
Expand Up @@ -1952,12 +1952,11 @@ func (s *StanServer) start(runningState State) error {
} else {
s.log.Noticef("No recovered state")
}
subjID := s.opts.ID
// In FT or with static channels (aka partitioning), we use the cluster ID
// as part of the subjects prefix, not a NUID.
if runningState == Standalone && s.partitions == nil {
subjID = nuid.Next()
}
// We used to use a NUID as part of the internal subjects in standalone mode
// without channel partitioning. In all other cases, we used the cluster ID.
// We now always use the cluster ID. It makes it easier when users want to
// use NATS permissions to restrict things.
clusterID := s.opts.ID
if recoveredState != nil {
// Copy content
s.info = *recoveredState.Info
Expand All @@ -1970,7 +1969,7 @@ func (s *StanServer) start(runningState State) error {
// If not, it means we recovered from an older server, so
// need to update.
if s.info.SubClose == "" {
s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, subjID)
s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, clusterID)
// Update the store with the server info
callStoreInit = true
}
Expand Down Expand Up @@ -2004,13 +2003,13 @@ func (s *StanServer) start(runningState State) error {
s.info.ClusterID = s.opts.ID

// Generate Subjects
s.info.Discovery = fmt.Sprintf("%s.%s", s.opts.DiscoverPrefix, s.info.ClusterID)
s.info.Publish = fmt.Sprintf("%s.%s", DefaultPubPrefix, subjID)
s.info.Subscribe = fmt.Sprintf("%s.%s", DefaultSubPrefix, subjID)
s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, subjID)
s.info.Unsubscribe = fmt.Sprintf("%s.%s", DefaultUnSubPrefix, subjID)
s.info.Close = fmt.Sprintf("%s.%s", DefaultClosePrefix, subjID)
s.info.AcksSubs = fmt.Sprintf("%s.%s", defaultAcksPrefix, subjID)
s.info.Discovery = fmt.Sprintf("%s.%s", s.opts.DiscoverPrefix, clusterID)
s.info.Publish = fmt.Sprintf("%s.%s", DefaultPubPrefix, clusterID)
s.info.Subscribe = fmt.Sprintf("%s.%s", DefaultSubPrefix, clusterID)
s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, clusterID)
s.info.Unsubscribe = fmt.Sprintf("%s.%s", DefaultUnSubPrefix, clusterID)
s.info.Close = fmt.Sprintf("%s.%s", DefaultClosePrefix, clusterID)
s.info.AcksSubs = fmt.Sprintf("%s.%s", defaultAcksPrefix, clusterID)

if s.opts.Clustering.Clustered {
// If clustered, assign a random cluster node ID if not provided.
Expand Down

0 comments on commit 651b409

Please sign in to comment.