Skip to content

Commit

Permalink
Merge pull request #8617 from howbazaar/presence-status
Browse files Browse the repository at this point in the history
#8617

This branch adds two new logical pieces, and hooks up a third.

Features are added to controller config, and when config is updated, a pubsub message is sent for controller.ConfigChanged.

Added a new apiserver struct for shared server data. This is for server attributes that don't change from connection to connection, and need to be threaded through to the facade context. The state pool, presence, and now the central hub are all part of this context. This shared context also knows what controller feature flags have been set.

The last piece is to hook up the status code to the new presence. This means setting the ModelPresence when calling status. There was a logic bug in the controller connections in that all were considered controller connections which meant that AgentStatus always showed unknown. The controller connection flag is supposed to be for non-controller models for controller machine connections. This has now been updated.

## QA steps
```
juju bootstrap lxd test
juju enable-ha
watch -c juju status -m controller
juju ssh -m controller 2
# in there, do this
sudo service jujud-machine-2 stop
# observe the slow status updating showing machine 2 down
# in another terminal
juju controller-config features=[new-presence]
# now stop and start machine 2 and observe faster status.
```

## Documentation changes

This isn't a user observable change, although we may want to document the controller feature flags options.
  • Loading branch information
jujubot committed Apr 17, 2018
2 parents 87dcf3b + 5a7319d commit 23515f7
Show file tree
Hide file tree
Showing 21 changed files with 395 additions and 39 deletions.
9 changes: 6 additions & 3 deletions apiserver/admin.go
Expand Up @@ -114,11 +114,10 @@ func (a *admin) login(req params.LoginRequest, loginVersion int) (params.LoginRe
// apiRoot is the API root exposed to the client after login.
var apiRoot rpc.Root = newAPIRoot(
a.root.state,
a.srv.statePool,
a.root.shared,
a.srv.facades,
a.root.resources,
a.root,
a.root.presence,
)
apiRoot, err = restrictAPIRoot(
a.srv,
Expand Down Expand Up @@ -263,17 +262,21 @@ func (a *admin) authenticate(req params.LoginRequest) (*authResult, error) {
return nil, a.handleAuthError(err)
}
result.controllerMachineLogin = authInfo.Controller
// controllerConn is used to indicate a connection from the controller
// to a non-controller model.
controllerConn := false
if authInfo.Controller && !a.root.state.IsController() {
// We only need to run a pinger for controller machine
// agents when logging into the controller model.
startPinger = false
controllerConn = true
}
a.root.entity = authInfo.Entity
// TODO(wallyworld) - we can't yet observe anonymous logins as entity must be non-nil
a.apiObserver.Login(
authInfo.Entity.Tag(),
a.root.model.ModelTag(),
authInfo.Controller,
controllerConn,
req.UserData,
)
}
Expand Down
30 changes: 19 additions & 11 deletions apiserver/apiserver.go
Expand Up @@ -53,7 +53,8 @@ type Server struct {
clock clock.Clock
pingClock clock.Clock
wg sync.WaitGroup
statePool *state.StatePool

shared *sharedServerContext

// tag of the machine where the API server is running.
tag names.Tag
Expand All @@ -66,8 +67,6 @@ type Server struct {
authenticator httpcontext.LocalMacaroonAuthenticator
offerAuthCtxt *crossmodel.AuthContext
lastConnectionID uint64
centralHub *pubsub.StructuredHub
presence presence.Recorder
newObserver observer.ObserverFactory
connCount int64
totalConn int64
Expand Down Expand Up @@ -238,11 +237,20 @@ func newServer(cfg ServerConfig) (_ *Server, err error) {
limiter := utils.NewLimiterWithPause(
cfg.RateLimitConfig.LoginRateLimit, cfg.RateLimitConfig.LoginMinPause,
cfg.RateLimitConfig.LoginMaxPause, clock.WallClock)
shared, err := newSharedServerContex(sharedServerConfig{
statePool: cfg.StatePool,
centralHub: cfg.Hub,
presence: cfg.Presence,
logger: loggo.GetLogger("juju.apiserver"),
})
if err != nil {
return nil, errors.Trace(err)
}
srv := &Server{
clock: cfg.Clock,
pingClock: cfg.pingClock(),
newObserver: cfg.NewObserver,
statePool: cfg.StatePool,
shared: shared,
tag: cfg.Tag,
dataDir: cfg.DataDir,
logDir: cfg.LogDir,
Expand All @@ -251,8 +259,6 @@ func newServer(cfg ServerConfig) (_ *Server, err error) {
upgradeComplete: cfg.UpgradeComplete,
restoreStatus: cfg.RestoreStatus,
facades: AllFacades(),
centralHub: cfg.Hub,
presence: cfg.Presence,
mux: cfg.Mux,
authenticator: cfg.Authenticator,
allowModelAccess: cfg.AllowModelAccess,
Expand Down Expand Up @@ -296,6 +302,7 @@ func newServer(cfg ServerConfig) (_ *Server, err error) {
defer srv.tomb.Done()
defer srv.dbloggers.dispose()
defer srv.logSinkWriter.Close()
defer srv.shared.Close()
srv.tomb.Kill(srv.loop(ready))
}()

Expand Down Expand Up @@ -414,7 +421,7 @@ func (srv *Server) endpoints() []apihttp.Endpoint {
noModelUUID bool
}
var endpoints []apihttp.Endpoint
controllerModelUUID := srv.statePool.SystemState().ModelUUID()
controllerModelUUID := srv.shared.statePool.SystemState().ModelUUID()
addHandler := func(handler handler) {
methods := handler.methods
if methods == nil {
Expand Down Expand Up @@ -459,7 +466,7 @@ func (srv *Server) endpoints() []apihttp.Endpoint {
debugLogHandler := newDebugLogDBHandler(
httpCtxt, srv.authenticator,
tagKindAuthorizer{names.MachineTagKind, names.UserTagKind})
pubsubHandler := newPubSubHandler(httpCtxt, srv.centralHub)
pubsubHandler := newPubSubHandler(httpCtxt, srv.shared.centralHub)
logSinkHandler := logsink.NewHTTPHandler(
newAgentLogWriteCloserFunc(httpCtxt, srv.logSinkWriter, &srv.dbloggers),
httpCtxt.stop(),
Expand Down Expand Up @@ -530,7 +537,7 @@ func (srv *Server) endpoints() []apihttp.Endpoint {
return opener, st, nil
},
}
controllerAdminAuthorizer := controllerAdminAuthorizer{srv.statePool.SystemState()}
controllerAdminAuthorizer := controllerAdminAuthorizer{srv.shared.statePool.SystemState()}
migrateCharmsHandler := &charmsHandler{
ctxt: httpCtxt,
dataDir: srv.dataDir,
Expand Down Expand Up @@ -811,15 +818,16 @@ func (srv *Server) serveConn(
// newAPIHandler treats an empty modelUUID as signifying
// the API version used.
resolvedModelUUID := modelUUID
statePool := srv.shared.statePool
if modelUUID == "" {
resolvedModelUUID = srv.statePool.SystemState().ModelUUID()
resolvedModelUUID = statePool.SystemState().ModelUUID()
}
var (
st *state.PooledState
h *apiHandler
)

st, err := srv.statePool.Get(resolvedModelUUID)
st, err := statePool.Get(resolvedModelUUID)
if err == nil {
defer st.Release()
h, err = newAPIHandler(srv, st.State, conn, modelUUID, connectionID, host)
Expand Down
4 changes: 2 additions & 2 deletions apiserver/export_test.go
Expand Up @@ -44,7 +44,7 @@ func NewErrRoot(err error) *errRoot {
// *barely* connected to anything. Just enough to let you probe some
// of the interfaces, but not enough to actually do any RPC calls.
func TestingAPIRoot(facades *facade.Registry) rpc.Root {
return newAPIRoot(nil, state.NewStatePool(nil), facades, common.NewResources(), nil, nil)
return newAPIRoot(nil, nil, facades, common.NewResources(), nil)
}

// TestingAPIHandler gives you an APIHandler that isn't connected to
Expand All @@ -57,7 +57,7 @@ func TestingAPIHandler(c *gc.C, pool *state.StatePool, st *state.State) (*apiHan
srv := &Server{
authenticator: authenticator,
offerAuthCtxt: offerAuthCtxt,
statePool: pool,
shared: &sharedServerContext{statePool: pool},
tag: names.NewMachineTag("0"),
}
h, err := newAPIHandler(srv, st, nil, st.ModelUUID(), 6543, "testing.invalid:1234")
Expand Down
6 changes: 6 additions & 0 deletions apiserver/facade/facadetest/context.go
Expand Up @@ -12,6 +12,7 @@ import (
type Context struct {
Auth_ facade.Authorizer
Dispose_ func()
Hub_ facade.Hub
Resources_ facade.Resources
State_ *state.State
StatePool_ *state.StatePool
Expand All @@ -31,6 +32,11 @@ func (context Context) Dispose() {
context.Dispose_()
}

// Hub is part of the facade.Context interface.
func (context Context) Hub() facade.Hub {
return context.Hub_
}

// Resources is part of the facade.Context interface.
func (context Context) Resources() facade.Resources {
return context.Resources_
Expand Down
9 changes: 9 additions & 0 deletions apiserver/facade/interface.go
Expand Up @@ -66,6 +66,10 @@ type Context interface {
// the current model presence.
Presence() Presence

// Hub returns the central hub that the API server holds.
// At least at this stage, facades only need to publish events.
Hub() Hub

// ID returns a string that should almost always be "", unless
// this is a watcher facade, in which case it exists in lieu of
// actual arguments in the Next() call, and is used as a key
Expand Down Expand Up @@ -149,3 +153,8 @@ type ModelPresence interface {
// For a given non controller agent, return the Status for that agent.
AgentStatus(agent string) (presence.Status, error)
}

// Hub represents the central hub that the API server has.
type Hub interface {
Publish(topic string, data interface{}) (<-chan struct{}, error)
}
1 change: 1 addition & 0 deletions apiserver/facades/client/charms/client_test.go
Expand Up @@ -37,6 +37,7 @@ func (ctx *charmsSuiteContext) State() *state.State { return ctx.cs.Stat
func (ctx *charmsSuiteContext) StatePool() *state.StatePool { return nil }
func (ctx *charmsSuiteContext) ID() string { return "" }
func (ctx *charmsSuiteContext) Presence() facade.Presence { return nil }
func (ctx *charmsSuiteContext) Hub() facade.Hub { return nil }

func (s *charmsSuite) SetUpTest(c *gc.C) {
s.JujuConnSuite.SetUpTest(c)
Expand Down
8 changes: 7 additions & 1 deletion apiserver/facades/client/client/client.go
Expand Up @@ -36,7 +36,9 @@ type API struct {
pool Pool
auth facade.Authorizer
resources facade.Resources
client *Client
presence facade.Presence

client *Client
// statusSetter provides common methods for updating an entity's provisioning status.
statusSetter *common.StatusSetter
toolsFinder *common.ToolsFinder
Expand Down Expand Up @@ -114,6 +116,7 @@ func NewFacade(ctx facade.Context) (*Client, error) {
st := ctx.State()
resources := ctx.Resources()
authorizer := ctx.Auth()
presence := ctx.Presence()

model, err := st.Model()
if err != nil {
Expand All @@ -139,6 +142,7 @@ func NewFacade(ctx facade.Context) (*Client, error) {
modelConfigAPI,
resources,
authorizer,
presence,
statusSetter,
toolsFinder,
newEnviron,
Expand All @@ -153,6 +157,7 @@ func NewClient(
modelConfigAPI *modelconfig.ModelConfigAPI,
resources facade.Resources,
authorizer facade.Authorizer,
presence facade.Presence,
statusSetter *common.StatusSetter,
toolsFinder *common.ToolsFinder,
newEnviron func() (environs.Environ, error),
Expand All @@ -168,6 +173,7 @@ func NewClient(
pool: pool,
auth: authorizer,
resources: resources,
presence: presence,
statusSetter: statusSetter,
toolsFinder: toolsFinder,
},
Expand Down
1 change: 1 addition & 0 deletions apiserver/facades/client/client/status.go
Expand Up @@ -177,6 +177,7 @@ func (c *Client) FullStatus(args params.StatusParams) (params.FullStatus, error)
if err != nil {
return noStatus, errors.Annotate(err, "cannot get model")
}
context.presence.Presence = c.api.presence.ModelPresence(m.UUID())
cfg, err := m.Config()
if err != nil {
return noStatus, errors.Annotate(err, "cannot obtain current model config")
Expand Down
1 change: 1 addition & 0 deletions apiserver/facades/client/client/statushistory_test.go
Expand Up @@ -37,6 +37,7 @@ func (s *statusHistoryTestSuite) SetUpTest(c *gc.C) {
nil, // modelconfig API
nil, // resources
authorizer,
nil, // presence
nil, // statusSetter
nil, // toolsFinder
nil, // newEnviron
Expand Down
23 changes: 22 additions & 1 deletion apiserver/facades/client/controller/controller.go
Expand Up @@ -24,6 +24,7 @@ import (
coremigration "github.com/juju/juju/core/migration"
"github.com/juju/juju/migration"
"github.com/juju/juju/permission"
"github.com/juju/juju/pubsub/controller"
"github.com/juju/juju/state"
)

Expand All @@ -41,6 +42,7 @@ type ControllerAPI struct {
apiUser names.UserTag
resources facade.Resources
presence facade.Presence
hub facade.Hub
}

// ControllerAPIv4 provides the v4 Controller API. The only difference
Expand All @@ -62,13 +64,15 @@ func NewControllerAPIv5(ctx facade.Context) (*ControllerAPI, error) {
pool := ctx.StatePool()
resources := ctx.Resources()
presence := ctx.Presence()
hub := ctx.Hub()

return NewControllerAPI(
st,
pool,
authorizer,
resources,
presence,
hub,
)
}

Expand Down Expand Up @@ -98,6 +102,7 @@ func NewControllerAPI(
authorizer facade.Authorizer,
resources facade.Resources,
presence facade.Presence,
hub facade.Hub,
) (*ControllerAPI, error) {
if !authorizer.AuthClient() {
return nil, errors.Trace(common.ErrPerm)
Expand Down Expand Up @@ -128,6 +133,7 @@ func NewControllerAPI(
apiUser: apiUser,
resources: resources,
presence: presence,
hub: hub,
}, nil
}

Expand Down Expand Up @@ -526,7 +532,22 @@ func (c *ControllerAPI) ConfigSet(args params.ControllerConfigSet) error {
if err := c.checkHasAdmin(); err != nil {
return errors.Trace(err)
}
return errors.Trace(c.state.UpdateControllerConfig(args.Config, nil))
if err := c.state.UpdateControllerConfig(args.Config, nil); err != nil {
return errors.Trace(err)
}
// TODO(thumper): add a version to controller config to allow for
// simultaneous updates and races in publishing, potentially across
// HA servers.
cfg, err := c.state.ControllerConfig()
if err != nil {
return errors.Trace(err)
}
if _, err := c.hub.Publish(
controller.ConfigChanged,
controller.ConfigChangedMessage{cfg}); err != nil {
return errors.Trace(err)
}
return nil
}

// Mask the ConfigSet method from the v4 API. The API reflection code
Expand Down

0 comments on commit 23515f7

Please sign in to comment.