Skip to content

Commit

Permalink
Merge 4cd3453 into 97ab2d6
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Sep 27, 2018
2 parents 97ab2d6 + 4cd3453 commit 9664184
Show file tree
Hide file tree
Showing 6 changed files with 865 additions and 55 deletions.
42 changes: 25 additions & 17 deletions server/client.go
Expand Up @@ -69,6 +69,7 @@ type clientFlag byte
// Some client state represented as flags
const (
connectReceived clientFlag = 1 << iota // The CONNECT proto has been received
infoReceived // The INFO protocol has been received
firstPongSent // The first PONG has been sent
handshakeComplete // For TLS clients, indicate that the handshake is complete
clearConnection // Marks that clearConnection has already been called.
Expand Down Expand Up @@ -856,9 +857,12 @@ func (c *client) maxPayloadViolation(sz int, max int64) {
}

// queueOutbound queues data for client/route connections.
// Return pending length.
// Return if the data is referenced or not. If referenced, the caller
// should not reuse the `data` array.
// Lock should be held.
func (c *client) queueOutbound(data []byte) {
func (c *client) queueOutbound(data []byte) bool {
// Assume data will not be referenced
referenced := false
// Add to pending bytes total.
c.out.pb += int64(len(data))

Expand All @@ -868,7 +872,7 @@ func (c *client) queueOutbound(data []byte) {
c.clearConnection(SlowConsumerPendingBytes)
atomic.AddInt64(&c.srv.slowConsumers, 1)
c.Noticef("Slow Consumer Detected: MaxPending of %d Exceeded", c.out.mp)
return
return referenced
}

if c.out.p == nil && len(data) < maxBufSize {
Expand Down Expand Up @@ -901,6 +905,7 @@ func (c *client) queueOutbound(data []byte) {
// FIXME(dlc) - do we need signaling of ownership here if we want len(data) <
if len(data) > maxBufSize {
c.out.nb = append(c.out.nb, data)
referenced = true
} else {
// We will copy to primary.
if c.out.p == nil {
Expand All @@ -924,6 +929,7 @@ func (c *client) queueOutbound(data []byte) {
} else {
c.out.p = append(c.out.p, data...)
}
return referenced
}

// Assume the lock is held upon entry.
Expand Down Expand Up @@ -993,6 +999,12 @@ func (c *client) processPing() {
}
c.sendPong()

// If not a CLIENT, we are done
if c.typ != CLIENT {
c.mu.Unlock()
return
}

// The CONNECT should have been received, but make sure it
// is so before proceeding
if !c.flags.isSet(connectReceived) {
Expand Down Expand Up @@ -1667,19 +1679,18 @@ func (c *client) processPingTimer() {

c.Debugf("%s Ping Timer", c.typeString())

// Check for violation
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
c.Debugf("Stale Client Connection - Closing")
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true)
c.clearConnection(StaleConnection)
return
}

// If we have had activity within the PingInterval no
// need to send a ping.
if delta := time.Since(c.last); delta < c.srv.getOpts().PingInterval {
c.Debugf("Delaying PING due to activity %v ago", delta.Round(time.Second))
} else {
// Check for violation
if c.ping.out+1 > c.srv.getOpts().MaxPingsOut {
c.Debugf("Stale Client Connection - Closing")
c.sendProto([]byte(fmt.Sprintf("-ERR '%s'\r\n", "Stale Connection")), true)
c.clearConnection(StaleConnection)
return
}
// Send PING
c.sendPing()
}
Expand Down Expand Up @@ -1824,12 +1835,9 @@ func (c *client) closeConnection(reason ClosedState) {

// Remove clients subscriptions.
srv.sl.RemoveBatch(subs)
if c.typ != ROUTER {
for _, sub := range subs {
// Forward on unsubscribes if we are not
// a router ourselves.
srv.broadcastUnSubscribe(sub)
}
if c.typ == CLIENT {
// Forward UNSUBs protocols to all routes
srv.broadcastUnSubscribeBatch(subs)
}
}

Expand Down
189 changes: 164 additions & 25 deletions server/reload.go
Expand Up @@ -38,20 +38,37 @@ type option interface {

// IsAuthChange indicates if this option requires reloading authorization.
IsAuthChange() bool

// IsClusterPermsChange indicates if this option requires reloading
// cluster permissions.
IsClusterPermsChange() bool
}

// noopOption is a base struct that provides default no-op behaviors.
type noopOption struct{}

func (n noopOption) IsLoggingChange() bool {
return false
}

func (n noopOption) IsAuthChange() bool {
return false
}

func (n noopOption) IsClusterPermsChange() bool {
return false
}

// loggingOption is a base struct that provides default option behaviors for
// logging-related options.
type loggingOption struct{}
type loggingOption struct {
noopOption
}

func (l loggingOption) IsLoggingChange() bool {
return true
}

func (l loggingOption) IsAuthChange() bool {
return false
}

// traceOption implements the option interface for the `trace` setting.
type traceOption struct {
loggingOption
Expand Down Expand Up @@ -119,17 +136,6 @@ func (r *remoteSyslogOption) Apply(server *Server) {
server.Noticef("Reloaded: remote_syslog = %v", r.newValue)
}

// noopOption is a base struct that provides default no-op behaviors.
type noopOption struct{}

func (n noopOption) IsLoggingChange() bool {
return false
}

func (n noopOption) IsAuthChange() bool {
return false
}

// tlsOption implements the option interface for the `tls` setting.
type tlsOption struct {
noopOption
Expand Down Expand Up @@ -164,10 +170,8 @@ func (t *tlsTimeoutOption) Apply(server *Server) {
}

// authOption is a base struct that provides default option behaviors.
type authOption struct{}

func (o authOption) IsLoggingChange() bool {
return false
type authOption struct {
noopOption
}

func (o authOption) IsAuthChange() bool {
Expand Down Expand Up @@ -235,7 +239,8 @@ func (u *usersOption) Apply(server *Server) {
// clusterOption implements the option interface for the `cluster` setting.
type clusterOption struct {
authOption
newValue ClusterOpts
newValue ClusterOpts
permsChanged bool
}

// Apply the cluster change.
Expand All @@ -256,6 +261,10 @@ func (c *clusterOption) Apply(server *Server) {
server.Noticef("Reloaded: cluster")
}

func (c *clusterOption) IsClusterPermsChange() bool {
return c.permsChanged
}

// routesOption implements the option interface for the cluster `routes`
// setting.
type routesOption struct {
Expand Down Expand Up @@ -503,6 +512,10 @@ func (s *Server) reloadOptions(newOpts *Options) error {
if err != nil {
return err
}
// Need to save off previous cluster permissions
s.mu.Lock()
s.oldClusterPerms = s.opts.Cluster.Permissions
s.mu.Unlock()
s.setOpts(newOpts)
s.applyOptions(changed)
return nil
Expand Down Expand Up @@ -557,10 +570,12 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &usersOption{newValue: newValue.([]*User)})
case "cluster":
newClusterOpts := newValue.(ClusterOpts)
if err := validateClusterOpts(oldValue.(ClusterOpts), newClusterOpts); err != nil {
oldClusterOpts := oldValue.(ClusterOpts)
if err := validateClusterOpts(oldClusterOpts, newClusterOpts); err != nil {
return nil, err
}
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts})
permsChanged := !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions)
diffOpts = append(diffOpts, &clusterOption{newValue: newClusterOpts, permsChanged: permsChanged})
case "routes":
add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL))
diffOpts = append(diffOpts, &routesOption{add: add, remove: remove})
Expand Down Expand Up @@ -612,8 +627,9 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {

func (s *Server) applyOptions(opts []option) {
var (
reloadLogging = false
reloadAuth = false
reloadLogging = false
reloadAuth = false
reloadClusterPerms = false
)
for _, opt := range opts {
opt.Apply(s)
Expand All @@ -623,6 +639,9 @@ func (s *Server) applyOptions(opts []option) {
if opt.IsAuthChange() {
reloadAuth = true
}
if opt.IsClusterPermsChange() {
reloadClusterPerms = true
}
}

if reloadLogging {
Expand All @@ -631,6 +650,9 @@ func (s *Server) applyOptions(opts []option) {
if reloadAuth {
s.reloadAuthorization()
}
if reloadClusterPerms {
s.reloadClusterPermissions()
}

s.Noticef("Reloaded server configuration")
}
Expand Down Expand Up @@ -674,6 +696,123 @@ func (s *Server) reloadAuthorization() {
}
}

// reloadClusterPermissions reconfigures the cluster's permssions
// and set the permissions to all existing routes, sending an
// update INFO protocol so that remote can resend their local
// subs if needed, and sending local subs matching cluster's
// import subjects.
func (s *Server) reloadClusterPermissions() {
s.mu.Lock()
var (
infoJSON []byte
oldPerms = s.oldClusterPerms
newPerms = s.opts.Cluster.Permissions
routes = make(map[uint64]*client, len(s.routes))
withNewProto int
)
// We can clear this now that we have captured it with oldPerms.
s.oldClusterPerms = nil
// Get all connected routes
for i, route := range s.routes {
// Count the number of routes that can understand receiving INFO updates.
route.mu.Lock()
if route.opts.Protocol >= routeProtoInfo {
withNewProto++
}
route.mu.Unlock()
routes[i] = route
}
// If new permissions is nil, then clear routeInfo import/export
if newPerms == nil {
s.routeInfo.Import = nil
s.routeInfo.Export = nil
} else {
s.routeInfo.Import = newPerms.Import
s.routeInfo.Export = newPerms.Export
}
// Regenerate route INFO
s.generateRouteInfoJSON()
infoJSON = s.routeInfoJSON
s.mu.Unlock()

// If there were no route, we are done
if len(routes) == 0 {
return
}

// If only older servers, simply close all routes and they will do the right
// thing on reconnect.
if withNewProto == 0 {
for _, route := range routes {
route.closeConnection(RouteRemoved)
}
return
}

// Fake clients to test cluster permissions
oldPermsTester := &client{}
oldPermsTester.setRoutePermissions(oldPerms)
newPermsTester := &client{}
newPermsTester.setRoutePermissions(newPerms)

var (
_localSubs [4096]*subscription
localSubs = _localSubs[:0]
subsNeedSUB []*subscription
subsNeedUNSUB []*subscription
deleteRoutedSubs []*subscription
)
s.sl.localSubs(&localSubs)

// Go through all local subscriptions
for _, sub := range localSubs {
// Get all subs that can now be imported
couldImportThen := oldPermsTester.canImport(sub.subject)
canImportNow := newPermsTester.canImport(sub.subject)
if canImportNow {
// If we could not before, then will need to send a SUB protocol.
if !couldImportThen {
subsNeedSUB = append(subsNeedSUB, sub)
}
} else if couldImportThen {
// We were previously able to import this sub, but now
// we can't so we need to send an UNSUB protocol
subsNeedUNSUB = append(subsNeedUNSUB, sub)
}
}

for _, route := range routes {
route.mu.Lock()
// If route is to older server, simply close connection.
if route.opts.Protocol < routeProtoInfo {
route.mu.Unlock()
route.closeConnection(RouteRemoved)
continue
}
route.setRoutePermissions(newPerms)
for _, sub := range route.subs {
// If we can't export, we need to drop the subscriptions that
// we have on behalf of this route.
if !route.canExport(sub.subject) {
delete(route.subs, string(sub.sid))
deleteRoutedSubs = append(deleteRoutedSubs, sub)
}
}
// Send an update INFO, which will allow remote server to show
// our current route config in monitoring and resend subscriptions
// that we now possibly allow with a change of Export permissions.
route.sendInfo(infoJSON)
// Now send SUB and UNSUB protocols as needed.
closed := route.sendRouteSubProtos(subsNeedSUB, nil)
if !closed {
route.sendRouteUnSubProtos(subsNeedUNSUB, nil)
}
route.mu.Unlock()
}
// Remove as a batch all the subs that we have removed from each route.
s.sl.RemoveBatch(deleteRoutedSubs)
}

// validateClusterOpts ensures the new ClusterOpts does not change host or
// port, which do not support reload.
func validateClusterOpts(old, new ClusterOpts) error {
Expand Down

0 comments on commit 9664184

Please sign in to comment.