Skip to content

Commit

Permalink
Merge pull request #1128 from nats-io/user_cluster_id_in_internal_sub…
Browse files Browse the repository at this point in the history
…jects

[CHANGED] User cluster ID as part of internal subscriptions
  • Loading branch information
kozlovic committed Nov 19, 2020
2 parents 4df4c5a + 651b409 commit 4ddbef0
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1952,12 +1952,11 @@ func (s *StanServer) start(runningState State) error {
} else {
s.log.Noticef("No recovered state")
}
subjID := s.opts.ID
// In FT or with static channels (aka partitioning), we use the cluster ID
// as part of the subjects prefix, not a NUID.
if runningState == Standalone && s.partitions == nil {
subjID = nuid.Next()
}
// We used to use a NUID as part of the internal subjects in standalone mode
// without channel partitioning. In all other cases, we used the cluster ID.
// We now always use the cluster ID. It makes it easier when users want to
// use NATS permissions to restrict things.
clusterID := s.opts.ID
if recoveredState != nil {
// Copy content
s.info = *recoveredState.Info
Expand All @@ -1970,7 +1969,7 @@ func (s *StanServer) start(runningState State) error {
// If not, it means we recovered from an older server, so
// need to update.
if s.info.SubClose == "" {
s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, subjID)
s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, clusterID)
// Update the store with the server info
callStoreInit = true
}
Expand Down Expand Up @@ -2004,13 +2003,13 @@ func (s *StanServer) start(runningState State) error {
s.info.ClusterID = s.opts.ID

// Generate Subjects
s.info.Discovery = fmt.Sprintf("%s.%s", s.opts.DiscoverPrefix, s.info.ClusterID)
s.info.Publish = fmt.Sprintf("%s.%s", DefaultPubPrefix, subjID)
s.info.Subscribe = fmt.Sprintf("%s.%s", DefaultSubPrefix, subjID)
s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, subjID)
s.info.Unsubscribe = fmt.Sprintf("%s.%s", DefaultUnSubPrefix, subjID)
s.info.Close = fmt.Sprintf("%s.%s", DefaultClosePrefix, subjID)
s.info.AcksSubs = fmt.Sprintf("%s.%s", defaultAcksPrefix, subjID)
s.info.Discovery = fmt.Sprintf("%s.%s", s.opts.DiscoverPrefix, clusterID)
s.info.Publish = fmt.Sprintf("%s.%s", DefaultPubPrefix, clusterID)
s.info.Subscribe = fmt.Sprintf("%s.%s", DefaultSubPrefix, clusterID)
s.info.SubClose = fmt.Sprintf("%s.%s", DefaultSubClosePrefix, clusterID)
s.info.Unsubscribe = fmt.Sprintf("%s.%s", DefaultUnSubPrefix, clusterID)
s.info.Close = fmt.Sprintf("%s.%s", DefaultClosePrefix, clusterID)
s.info.AcksSubs = fmt.Sprintf("%s.%s", defaultAcksPrefix, clusterID)

if s.opts.Clustering.Clustered {
// If clustered, assign a random cluster node ID if not provided.
Expand Down

0 comments on commit 4ddbef0

Please sign in to comment.