fix non-terminating controller problem #5985

Merged
merged 2 commits into from Aug 12, 2016
Jump to file or symbol
Failed to load files and symbols.
+200 −135
Split
@@ -5,6 +5,7 @@ package controller_test
import (
"fmt"
+ "net"
"time"
"github.com/juju/errors"
@@ -13,9 +14,13 @@ import (
gc "gopkg.in/check.v1"
"gopkg.in/juju/names.v2"
+ "github.com/juju/juju/api"
"github.com/juju/juju/api/base"
"github.com/juju/juju/api/controller"
+ "github.com/juju/juju/apiserver"
commontesting "github.com/juju/juju/apiserver/common/testing"
+ "github.com/juju/juju/apiserver/observer"
+ "github.com/juju/juju/apiserver/observer/fakeobserver"
"github.com/juju/juju/apiserver/params"
jujutesting "github.com/juju/juju/juju/testing"
"github.com/juju/juju/state"
@@ -36,7 +41,7 @@ func (s *controllerSuite) SetUpTest(c *gc.C) {
}
func (s *controllerSuite) OpenAPI(c *gc.C) *controller.Client {
- return controller.NewClient(s.APIState)
+ return controller.NewClient(s.OpenControllerAPI(c))
}
func (s *controllerSuite) TestAllModels(c *gc.C) {
@@ -47,6 +52,7 @@ func (s *controllerSuite) TestAllModels(c *gc.C) {
Name: "second", Owner: owner}).Close()
sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
envs, err := sysManager.AllModels()
c.Assert(err, jc.ErrorIsNil)
c.Assert(envs, gc.HasLen, 3)
@@ -65,13 +71,15 @@ func (s *controllerSuite) TestAllModels(c *gc.C) {
func (s *controllerSuite) TestModelConfig(c *gc.C) {
sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
cfg, err := sysManager.ModelConfig()
c.Assert(err, jc.ErrorIsNil)
c.Assert(cfg["name"], gc.Equals, "controller")
}
func (s *controllerSuite) TestControllerConfig(c *gc.C) {
sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
cfg, err := sysManager.ControllerConfig()
c.Assert(err, jc.ErrorIsNil)
cfgFromDB, err := s.State.ControllerConfig()
@@ -87,6 +95,7 @@ func (s *controllerSuite) TestDestroyController(c *gc.C) {
st.Close()
sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
err := sysManager.DestroyController(false)
c.Assert(err, gc.ErrorMatches, `failed to destroy model: hosting 1 other models \(controller has hosted models\)`)
}
@@ -97,6 +106,7 @@ func (s *controllerSuite) TestListBlockedModels(c *gc.C) {
c.Assert(err, jc.ErrorIsNil)
sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
results, err := sysManager.ListBlockedModels()
c.Assert(err, jc.ErrorIsNil)
c.Assert(results, jc.DeepEquals, []params.ModelBlockInfo{
@@ -117,6 +127,7 @@ func (s *controllerSuite) TestRemoveBlocks(c *gc.C) {
s.State.SwitchBlockOn(state.ChangeBlock, "TestChangeBlock")
sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
err := sysManager.RemoveBlocks()
c.Assert(err, jc.ErrorIsNil)
@@ -129,6 +140,7 @@ func (s *controllerSuite) TestWatchAllModels(c *gc.C) {
// The WatchAllModels infrastructure is comprehensively tested
// else. This test just ensure that the API calls work end-to-end.
sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
w, err := sysManager.WatchAllModels()
c.Assert(err, jc.ErrorIsNil)
@@ -162,10 +174,82 @@ func (s *controllerSuite) TestWatchAllModels(c *gc.C) {
}
}
+func (s *controllerSuite) TestAPIServerCanShutdownWithOutstandingNext(c *gc.C) {
+
+ lis, err := net.Listen("tcp", "localhost:0")
+ c.Assert(err, jc.ErrorIsNil)
+
+ srv, err := apiserver.NewServer(s.State, lis, apiserver.ServerConfig{
+ Cert: []byte(testing.ServerCert),
+ Key: []byte(testing.ServerKey),
+ Tag: names.NewMachineTag("0"),
+ DataDir: c.MkDir(),
+ LogDir: c.MkDir(),
+ NewObserver: func() observer.Observer { return &fakeobserver.Instance{} },
+ })
+ c.Assert(err, gc.IsNil)
+
+ // Connect to the API server we've just started.
+ apiInfo := s.APIInfo(c)
+ apiInfo.Addrs = []string{lis.Addr().String()}
+ apiState, err := api.Open(apiInfo, api.DialOpts{})
+ sysManager := controller.NewClient(apiState)
+ defer sysManager.Close()
+
+ w, err := sysManager.WatchAllModels()
+ c.Assert(err, jc.ErrorIsNil)
+ defer w.Stop()
+
+ deltasC := make(chan struct{}, 2)
+ go func() {
+ defer close(deltasC)
+ for {
+ _, err := w.Next()
+ if err != nil {
+ return
+ }
+ deltasC <- struct{}{}
+ }
+ }()
+ // Read the first event.
+ select {
+ case <-deltasC:
+ case <-time.After(testing.LongWait):
+ c.Fatal("timed out")
+ }
+ // Wait a little while for the Next call to actually arrive.
+ time.Sleep(testing.ShortWait)
+
+ // We should be able to close the server instance
+ // even when there's an outstanding Next call.
+ srvStopped := make(chan struct{})
+ go func() {
+ srv.Stop()
+ close(srvStopped)
+ }()
+
+ select {
+ case <-srvStopped:
+ case <-time.After(testing.LongWait):
+ c.Fatal("timed out waiting for server to stop")
+ }
+
+ // Check that the Next call has returned too.
+ select {
+ case _, ok := <-deltasC:
+ if ok {
+ c.Fatalf("got unexpected event from deltasC")
+ }
+ case <-time.After(testing.LongWait):
+ c.Fatal("timed out")
+ }
+}
+
func (s *controllerSuite) TestModelStatus(c *gc.C) {
- controller := s.OpenAPI(c)
+ sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
modelTag := s.State.ModelTag()
- results, err := controller.ModelStatus(modelTag)
+ results, err := sysManager.ModelStatus(modelTag)
c.Assert(err, jc.ErrorIsNil)
c.Assert(results, jc.DeepEquals, []base.ModelStatus{{
UUID: modelTag.Id(),
@@ -192,8 +276,9 @@ func (s *controllerSuite) TestInitiateModelMigration(c *gc.C) {
TargetPassword: "secret",
}
- controller := s.OpenAPI(c)
- id, err := controller.InitiateModelMigration(spec)
+ sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
+ id, err := sysManager.InitiateModelMigration(spec)
c.Assert(err, jc.ErrorIsNil)
expectedId := st.ModelUUID() + ":0"
c.Check(id, gc.Equals, expectedId)
@@ -214,8 +299,9 @@ func (s *controllerSuite) TestInitiateModelMigrationError(c *gc.C) {
TargetPassword: "secret",
}
- controller := s.OpenAPI(c)
- id, err := controller.InitiateModelMigration(spec)
+ sysManager := s.OpenAPI(c)
+ defer sysManager.Close()
+ id, err := sysManager.InitiateModelMigration(spec)
c.Check(id, gc.Equals, "")
c.Check(err, gc.ErrorMatches, "unable to read model: .+")
}
View
@@ -53,7 +53,7 @@ func (a *admin) doLogin(req params.LoginRequest, loginVersion int) (params.Login
}
// authedApi is the API method finder we'll use after getting logged in.
- var authedApi rpc.MethodFinder = newApiRoot(a.root.state, a.root.resources, a.root)
+ var authedApi rpc.Root = newApiRoot(a.root.state, a.root.resources, a.root)
// Use the login validation function, if one was specified.
if a.srv.validator != nil {
@@ -214,7 +214,7 @@ func (a *admin) doLogin(req params.LoginRequest, loginVersion int) (params.Login
authedApi = newClientAuthRoot(authedApi, modelUser, controllerUser)
}
- a.root.rpcConn.ServeFinder(authedApi, serverError)
+ a.root.rpcConn.ServeRoot(authedApi, serverError)
return loginResult, nil
}
@@ -532,3 +532,6 @@ type errRoot struct {
func (r *errRoot) FindMethod(rootName string, version int, methodName string) (rpcreflect.MethodCaller, error) {
return nil, r.err
}
+
+func (r *errRoot) Kill() {
+}
View
@@ -507,13 +507,13 @@ func (srv *Server) serveConn(wsConn *websocket.Conn, modelUUID string, apiObserv
h, err := srv.newAPIHandler(conn, modelUUID)
if err != nil {
- conn.ServeFinder(&errRoot{err}, serverError)
+ conn.ServeRoot(&errRoot{err}, serverError)
} else {
adminApis := make(map[int]interface{})
for apiVersion, factory := range srv.adminApiFactories {
adminApis[apiVersion] = factory(srv, h, apiObserver)
}
- conn.ServeFinder(newAnonRoot(h, adminApis), serverError)
+ conn.ServeRoot(newAnonRoot(h, adminApis), serverError)
}
conn.Start()
select {
@@ -16,22 +16,22 @@ import (
// authorisation checks are only for read only access to the model, but in the
// near future, full ACL support is desirable.
type clientAuthRoot struct {
- finder rpc.MethodFinder
+ rpc.Root
modelUser description.UserAccess
controllerUser description.UserAccess
}
// newClientAuthRoot returns a new restrictedRoot.
-func newClientAuthRoot(finder rpc.MethodFinder, modelUser description.UserAccess, controllerUser description.UserAccess) *clientAuthRoot {
- return &clientAuthRoot{finder, modelUser, controllerUser}
+func newClientAuthRoot(root rpc.Root, modelUser description.UserAccess, controllerUser description.UserAccess) *clientAuthRoot {
+ return &clientAuthRoot{root, modelUser, controllerUser}
}
// FindMethod returns a not supported error if the rootName is not one of the
// facades available at the server root when there is no active model.
func (r *clientAuthRoot) FindMethod(rootName string, version int, methodName string) (rpcreflect.MethodCaller, error) {
// The lookup of the name is done first to return a not found error if the
// user is looking for a method that we just don't have.
- caller, err := r.finder.FindMethod(rootName, version, methodName)
+ caller, err := r.Root.FindMethod(rootName, version, methodName)
if err != nil {
return nil, err
}
@@ -44,7 +44,7 @@ func (s *clientAuthRootSuite) AssertCallErrPerm(c *gc.C, client *clientAuthRoot,
func (s *clientAuthRootSuite) TestNormalUser(c *gc.C) {
modelUser := s.Factory.MakeModelUser(c, nil)
- client := newClientAuthRoot(&fakeFinder{}, modelUser, description.UserAccess{})
+ client := newClientAuthRoot(&fakeRoot{}, modelUser, description.UserAccess{})
s.AssertCallGood(c, client, "Application", 1, "Deploy")
s.AssertCallGood(c, client, "UserManager", 1, "UserInfo")
s.AssertCallNotImplemented(c, client, "Client", 1, "Unknown")
@@ -53,7 +53,7 @@ func (s *clientAuthRootSuite) TestNormalUser(c *gc.C) {
func (s *clientAuthRootSuite) TestReadOnlyUser(c *gc.C) {
modelUser := s.Factory.MakeModelUser(c, &factory.ModelUserParams{Access: description.ReadAccess})
- client := newClientAuthRoot(&fakeFinder{}, modelUser, description.UserAccess{})
+ client := newClientAuthRoot(&fakeRoot{}, modelUser, description.UserAccess{})
// deploys are bad
s.AssertCallErrPerm(c, client, "Application", 1, "Deploy")
// read only commands are fine
@@ -69,10 +69,9 @@ func isCallNotImplementedError(err error) bool {
return ok
}
-type fakeFinder struct{}
+type fakeRoot struct{}
-// FindMethod is the only thing we need to implement rpc.MethodFinder.
-func (f *fakeFinder) FindMethod(rootName string, version int, methodName string) (rpcreflect.MethodCaller, error) {
+func (f *fakeRoot) FindMethod(rootName string, version int, methodName string) (rpcreflect.MethodCaller, error) {
_, _, err := lookupMethod(rootName, version, methodName)
if err != nil {
return nil, err
@@ -81,6 +80,9 @@ func (f *fakeFinder) FindMethod(rootName string, version int, methodName string)
return &fakeCaller{}, nil
}
+func (f *fakeRoot) Kill() {
+}
+
// fakeCaller implements a rpcreflect.MethodCaller. We don't care what the
// actual reflect.Types or values actually are, the caller just has to be
// valid.
@@ -87,7 +87,7 @@ func NewErrRoot(err error) *errRoot {
// TestingApiRoot gives you an ApiRoot as a rpc.Methodfinder that is
// *barely* connected to anything. Just enough to let you probe some
// of the interfaces, but not enough to actually do any RPC calls.
-func TestingApiRoot(st *state.State) rpc.MethodFinder {
+func TestingApiRoot(st *state.State) rpc.Root {
return newApiRoot(st, common.NewResources(), nil)
}
@@ -117,14 +117,14 @@ func TestingApiHandlerWithEntity(c *gc.C, srvSt, st *state.State, entity state.E
// TestingUpgradingRoot returns a limited srvRoot
// in an upgrade scenario.
-func TestingUpgradingRoot(st *state.State) rpc.MethodFinder {
+func TestingUpgradingRoot(st *state.State) rpc.Root {
r := TestingApiRoot(st)
return newUpgradingRoot(r)
}
// TestingRestrictedApiHandler returns a restricted srvRoot as if accessed
// from the root of the API path.
-func TestingRestrictedApiHandler(st *state.State) rpc.MethodFinder {
+func TestingRestrictedApiHandler(st *state.State) rpc.Root {
r := TestingApiRoot(st)
return newRestrictedRoot(r)
}
@@ -17,35 +17,35 @@ var restoreInProgressError = errors.New("juju restore is in progress - Juju api
// aboutToRestoreRoot a root that will only allow a limited
// set of methods to run, defined in allowedMethodsAboutToRestore.
type aboutToRestoreRoot struct {
- rpc.MethodFinder
+ rpc.Root
}
// restoreRoot a root that will not allow calls whatsoever during restore.
type restoreInProgressRoot struct {
- rpc.MethodFinder
+ rpc.Root
}
// newAboutToRestoreRoot creates a root where all API calls
// but restore will fail with aboutToRestoreError.
-func newAboutToRestoreRoot(finder rpc.MethodFinder) *aboutToRestoreRoot {
+func newAboutToRestoreRoot(root rpc.Root) *aboutToRestoreRoot {
return &aboutToRestoreRoot{
- MethodFinder: finder,
+ Root: root,
}
}
// newRestoreInProressRoot creates a root where all API calls
// but restore will fail with restoreInProgressError.
-func newRestoreInProgressRoot(finder rpc.MethodFinder) *restoreInProgressRoot {
+func newRestoreInProgressRoot(root rpc.Root) *restoreInProgressRoot {
return &restoreInProgressRoot{
- MethodFinder: finder,
+ Root: root,
}
}
// FindMethod extended srvRoot.FindMethod. It returns aboutToRestoreError
// for all API calls except Client.Restore
// for use while Juju is preparing to restore a backup.
func (r *aboutToRestoreRoot) FindMethod(rootName string, version int, methodName string) (rpcreflect.MethodCaller, error) {
- caller, err := r.MethodFinder.FindMethod(rootName, version, methodName)
+ caller, err := r.Root.FindMethod(rootName, version, methodName)
if err != nil {
return nil, err
}
@@ -76,7 +76,7 @@ func isMethodAllowedAboutToRestore(rootName, methodName string) bool {
// for all API calls.
// for use while Juju is restoring a backup.
func (r *restoreInProgressRoot) FindMethod(rootName string, version int, methodName string) (rpcreflect.MethodCaller, error) {
- _, err := r.MethodFinder.FindMethod(rootName, version, methodName)
+ _, err := r.Root.FindMethod(rootName, version, methodName)
if err != nil {
return nil, err
}
Oops, something went wrong.