From a50da75053f4ee047a56d2fc7c72f7a67b9f6a82 Mon Sep 17 00:00:00 2001 From: Harry Pidcock Date: Thu, 12 Sep 2019 11:49:09 +1000 Subject: [PATCH] Run+Hook sockets over TLS + token auth for CAAS - juju-run multiplex server listens on TCP+TLS - juju-run local listener per uniter on unix socket - jujuc listener per hook context either over TCP+TLS or unix socket - x509 certificates generated by apiserver on request from caasoperatorprovisioner and injected into operator config map - juju-run auth token generated per unit and store in operator.yaml alongside ca.crt file (controller's CA cert) - jujuc listner pass token via hook env vars & ca cert filename passed by env var - caasoperatorprovisioner updates existing operators with certificates - increase rsa key size to 3072 bits to future proof past 2030 --- api/caasoperatorprovisioner/client.go | 31 ++++ api/caasoperatorprovisioner/client_test.go | 38 ++++ .../caasoperatorprovisioner/mock_test.go | 7 + .../caasoperatorprovisioner/provisioner.go | 41 ++++- .../provisioner_test.go | 40 +++++ .../caasoperatorprovisioner/state.go | 1 + apiserver/facades/schema.json | 49 +++++ apiserver/params/params.go | 14 ++ caas/broker.go | 4 + caas/kubernetes/provider/exec/copy_test.go | 3 + caas/kubernetes/provider/exec/exec.go | 23 ++- caas/kubernetes/provider/exec/exec_test.go | 19 +- caas/kubernetes/provider/exec/export_test.go | 2 +- caas/kubernetes/provider/k8s.go | 80 +++++++-- caas/kubernetes/provider/k8s_test.go | 76 +++++++- caas/operator.go | 68 +++++++ caas/operator_test.go | 43 +++++ cert/cert.go | 4 +- cmd/jujud/agent/caasoperator.go | 9 +- cmd/jujud/agent/caasoperator/manifolds.go | 4 +- cmd/jujud/main.go | 71 ++++++-- cmd/jujud/main_test.go | 2 +- cmd/jujud/run.go | 68 ++++++- featuretests/agent_caasoperator_test.go | 20 ++- juju/sockets/package_test.go | 14 ++ juju/sockets/sockets.go | 6 +- juju/sockets/sockets_nix.go | 27 ++- juju/sockets/sockets_test.go | 101 +++++++++++ worker/caasoperator/action.go | 99 +++++++---- worker/caasoperator/action_test.go | 78 +++++--- worker/caasoperator/caasoperator.go | 22 +-- worker/caasoperator/caasoperator_test.go | 4 +- worker/caasoperator/manifold.go | 69 ++++++- worker/caasoperator/manifold_test.go | 31 ++++ worker/caasoperator/paths.go | 5 + worker/caasoperatorprovisioner/mock_test.go | 33 ++++ worker/caasoperatorprovisioner/worker.go | 74 ++++++-- worker/caasoperatorprovisioner/worker_test.go | 118 +++++++++--- worker/meterstatus/context.go | 13 +- worker/meterstatus/context_test.go | 10 +- worker/meterstatus/runner.go | 3 +- worker/metrics/collect/context.go | 13 +- worker/metrics/collect/context_test.go | 8 +- worker/metrics/collect/handler.go | 2 +- worker/metrics/collect/manifold.go | 4 +- worker/metrics/sender/manifold.go | 2 +- worker/uniter/paths.go | 168 ++++++++++-------- worker/uniter/paths_test.go | 86 ++++----- worker/uniter/runlistener.go | 33 ++++ worker/uniter/runner/context/context.go | 27 ++- worker/uniter/runner/context/env_test.go | 8 +- worker/uniter/runner/context/util_test.go | 16 +- worker/uniter/runner/factory_test.go | 2 +- worker/uniter/runner/jujuc/server.go | 10 +- worker/uniter/runner/jujuc/server_test.go | 2 +- worker/uniter/runner/runner.go | 49 +++-- worker/uniter/runner/runner_test.go | 2 +- worker/uniter/runner/testing/utils.go | 12 +- worker/uniter/uniter.go | 43 +++-- worker/uniter/util_test.go | 2 +- 60 files changed, 1561 insertions(+), 352 deletions(-) create mode 100644 caas/operator.go create mode 100644 caas/operator_test.go create mode 100644 juju/sockets/package_test.go create mode 100644 juju/sockets/sockets_test.go diff --git a/api/caasoperatorprovisioner/client.go b/api/caasoperatorprovisioner/client.go index f8af6a5b1ca1..2f696ad9e9ab 100644 --- a/api/caasoperatorprovisioner/client.go +++ b/api/caasoperatorprovisioner/client.go @@ -135,3 +135,34 @@ func filesystemFromParams(in params.KubernetesFilesystemParams) storage.Kubernet ResourceTags: in.Tags, } } + +// OperatorCertificate provides all the information an operator needs to +// create a TLS listener. +type OperatorCertificate struct { + CACert string + Cert string + PrivateKey string +} + +// IssueOperatorCertificate issues an x509 certificate for use by the specified application operator. +func (c *Client) IssueOperatorCertificate(applicationName string) (OperatorCertificate, error) { + args := params.Entities{[]params.Entity{ + {Tag: applicationName}, + }} + var result params.IssueOperatorCertificateResults + if err := c.facade.FacadeCall("IssueOperatorCertificate", args, &result); err != nil { + return OperatorCertificate{}, errors.Trace(err) + } + if len(result.Results) != 1 { + return OperatorCertificate{}, errors.Errorf("expected one result, got %d", len(result.Results)) + } + certInfo := result.Results[0] + if err := certInfo.Error; err != nil { + return OperatorCertificate{}, errors.Trace(err) + } + return OperatorCertificate{ + CACert: certInfo.CACert, + Cert: certInfo.Cert, + PrivateKey: certInfo.PrivateKey, + }, nil +} diff --git a/api/caasoperatorprovisioner/client_test.go b/api/caasoperatorprovisioner/client_test.go index ece9da21be80..d3c3ece7d4f3 100644 --- a/api/caasoperatorprovisioner/client_test.go +++ b/api/caasoperatorprovisioner/client_test.go @@ -194,3 +194,41 @@ func (s *provisionerSuite) OperatorProvisioningInfo(c *gc.C) { }, }) } + +func (s *provisionerSuite) TestIssueOperatorCertificate(c *gc.C) { + client := newClient(func(objType string, version int, id, request string, a, result interface{}) error { + c.Check(objType, gc.Equals, "CAASOperatorProvisioner") + c.Check(id, gc.Equals, "") + c.Assert(request, gc.Equals, "IssueOperatorCertificate") + c.Assert(a, jc.DeepEquals, params.Entities{Entities: []params.Entity{{Tag: "appymcappface"}}}) + c.Assert(result, gc.FitsTypeOf, ¶ms.IssueOperatorCertificateResults{}) + *(result.(*params.IssueOperatorCertificateResults)) = params.IssueOperatorCertificateResults{ + Results: []params.IssueOperatorCertificateResult{{ + CACert: "ca cert", + Cert: "cert", + PrivateKey: "private key", + }}, + } + return nil + }) + info, err := client.IssueOperatorCertificate("appymcappface") + c.Assert(err, jc.ErrorIsNil) + c.Assert(info, jc.DeepEquals, caasoperatorprovisioner.OperatorCertificate{ + CACert: "ca cert", + Cert: "cert", + PrivateKey: "private key", + }) +} + +func (s *provisionerSuite) TestIssueOperatorCertificateArity(c *gc.C) { + client := newClient(func(objType string, version int, id, request string, a, result interface{}) error { + c.Check(objType, gc.Equals, "CAASOperatorProvisioner") + c.Check(id, gc.Equals, "") + c.Assert(request, gc.Equals, "IssueOperatorCertificate") + c.Assert(a, jc.DeepEquals, params.Entities{Entities: []params.Entity{{Tag: "appymcappface"}}}) + c.Assert(result, gc.FitsTypeOf, ¶ms.IssueOperatorCertificateResults{}) + return nil + }) + _, err := client.IssueOperatorCertificate("appymcappface") + c.Assert(err, gc.ErrorMatches, "expected one result, got 0") +} diff --git a/apiserver/facades/controller/caasoperatorprovisioner/mock_test.go b/apiserver/facades/controller/caasoperatorprovisioner/mock_test.go index 5fe96efea236..bdab379b6dd5 100644 --- a/apiserver/facades/controller/caasoperatorprovisioner/mock_test.go +++ b/apiserver/facades/controller/caasoperatorprovisioner/mock_test.go @@ -70,6 +70,13 @@ func (st *mockState) Model() (caasoperatorprovisioner.Model, error) { return st.model, nil } +func (st *mockState) StateServingInfo() (state.StateServingInfo, error) { + st.MethodCall(st, "StateServingInfo") + return state.StateServingInfo{ + CAPrivateKey: coretesting.CAKey, + }, nil +} + type mockStorageRegistry struct { storage.ProviderRegistry } diff --git a/apiserver/facades/controller/caasoperatorprovisioner/provisioner.go b/apiserver/facades/controller/caasoperatorprovisioner/provisioner.go index 1c0674c1e34a..f138d7718422 100644 --- a/apiserver/facades/controller/caasoperatorprovisioner/provisioner.go +++ b/apiserver/facades/controller/caasoperatorprovisioner/provisioner.go @@ -14,6 +14,7 @@ import ( "github.com/juju/juju/apiserver/params" "github.com/juju/juju/caas" "github.com/juju/juju/caas/kubernetes/provider" + "github.com/juju/juju/cert" "github.com/juju/juju/cloudconfig/podcfg" "github.com/juju/juju/environs/config" "github.com/juju/juju/environs/tags" @@ -39,7 +40,6 @@ type API struct { // NewStateCAASOperatorProvisionerAPI provides the signature required for facade registration. func NewStateCAASOperatorProvisionerAPI(ctx facade.Context) (*API, error) { - authorizer := ctx.Auth() resources := ctx.Resources() @@ -147,6 +147,45 @@ func (a *API) OperatorProvisioningInfo() (params.OperatorProvisioningInfo, error }, nil } +// IssueOperatorCertificate issues an x509 certificate for use by the specified application operator. +func (a *API) IssueOperatorCertificate(args params.Entities) (params.IssueOperatorCertificateResults, error) { + cfg, err := a.state.ControllerConfig() + if err != nil { + return params.IssueOperatorCertificateResults{}, errors.Trace(err) + } + caCert, _ := cfg.CACert() + + si, err := a.state.StateServingInfo() + if err != nil { + return params.IssueOperatorCertificateResults{}, errors.Trace(err) + } + + res := params.IssueOperatorCertificateResults{ + Results: make([]params.IssueOperatorCertificateResult, len(args.Entities)), + } + for i, entity := range args.Entities { + applicationName := entity.Tag + + hostnames := []string{ + applicationName, + } + cert, privateKey, err := cert.NewDefaultServer(caCert, si.CAPrivateKey, hostnames) + if err != nil { + res.Results[i] = params.IssueOperatorCertificateResult{ + Error: common.ServerError(err), + } + continue + } + res.Results[i] = params.IssueOperatorCertificateResult{ + CACert: caCert, + Cert: cert, + PrivateKey: privateKey, + } + } + + return res, nil +} + // CharmStorageParams returns filesystem parameters needed // to provision storage used for a charm operator or workload. func CharmStorageParams( diff --git a/apiserver/facades/controller/caasoperatorprovisioner/provisioner_test.go b/apiserver/facades/controller/caasoperatorprovisioner/provisioner_test.go index 470b1fc9039a..25a67bb40f1b 100644 --- a/apiserver/facades/controller/caasoperatorprovisioner/provisioner_test.go +++ b/apiserver/facades/controller/caasoperatorprovisioner/provisioner_test.go @@ -4,6 +4,13 @@ package caasoperatorprovisioner_test import ( + "crypto" + "crypto/rand" + "crypto/rsa" + "crypto/sha256" + "crypto/x509" + "encoding/pem" + "github.com/juju/errors" jc "github.com/juju/testing/checkers" "github.com/juju/version" @@ -201,3 +208,36 @@ func (s *CAASProvisionerSuite) TestAddresses(c *gc.C) { c.Assert(err, jc.ErrorIsNil) s.st.CheckCallNames(c, "APIHostPortsForAgents") } + +func (s *CAASProvisionerSuite) TestIssueOperatorCertificate(c *gc.C) { + res, err := s.api.IssueOperatorCertificate(params.Entities{ + Entities: []params.Entity{{Tag: "appname"}}, + }) + c.Assert(err, jc.ErrorIsNil) + s.st.CheckCallNames(c, "StateServingInfo") + c.Assert(res.Results, gc.HasLen, 1) + certInfo := res.Results[0] + c.Assert(certInfo.Error, gc.IsNil) + certBlock, rem := pem.Decode([]byte(certInfo.Cert)) + c.Assert(rem, gc.HasLen, 0) + keyBlock, rem := pem.Decode([]byte(certInfo.PrivateKey)) + c.Assert(rem, gc.HasLen, 0) + cert, err := x509.ParseCertificate(certBlock.Bytes) + c.Assert(err, jc.ErrorIsNil) + roots := x509.NewCertPool() + ok := roots.AppendCertsFromPEM([]byte(certInfo.CACert)) + c.Assert(ok, jc.IsTrue) + _, err = cert.Verify(x509.VerifyOptions{ + DNSName: "appname", + Roots: roots, + }) + c.Assert(err, jc.ErrorIsNil) + key, err := x509.ParsePKCS1PrivateKey(keyBlock.Bytes) + c.Assert(err, jc.ErrorIsNil) + toSign := []byte("hello juju") + hash := sha256.Sum256(toSign) + sig, err := rsa.SignPKCS1v15(rand.Reader, key, crypto.SHA256, hash[:]) + c.Assert(err, jc.ErrorIsNil) + err = cert.CheckSignature(x509.SHA256WithRSA, toSign, sig) + c.Assert(err, jc.ErrorIsNil) +} diff --git a/apiserver/facades/controller/caasoperatorprovisioner/state.go b/apiserver/facades/controller/caasoperatorprovisioner/state.go index 21eb3d09fa64..9cea50460d51 100644 --- a/apiserver/facades/controller/caasoperatorprovisioner/state.go +++ b/apiserver/facades/controller/caasoperatorprovisioner/state.go @@ -16,6 +16,7 @@ import ( // required by the CAAS operator provisioner facade. type CAASOperatorProvisionerState interface { ControllerConfig() (controller.Config, error) + StateServingInfo() (state.StateServingInfo, error) WatchApplications() state.StringsWatcher FindEntity(tag names.Tag) (state.Entity, error) Addresses() ([]string, error) diff --git a/apiserver/facades/schema.json b/apiserver/facades/schema.json index 845763226abc..2a80aa9a9516 100644 --- a/apiserver/facades/schema.json +++ b/apiserver/facades/schema.json @@ -6299,6 +6299,17 @@ } } }, + "IssueOperatorCertificate": { + "type": "object", + "properties": { + "Params": { + "$ref": "#/definitions/Entities" + }, + "Result": { + "$ref": "#/definitions/IssueOperatorCertificateResults" + } + } + }, "Life": { "type": "object", "properties": { @@ -6540,6 +6551,44 @@ "port" ] }, + "IssueOperatorCertificateResult": { + "type": "object", + "properties": { + "ca-cert": { + "type": "string" + }, + "cert": { + "type": "string" + }, + "error": { + "$ref": "#/definitions/Error" + }, + "private-key": { + "type": "string" + } + }, + "additionalProperties": false, + "required": [ + "ca-cert", + "cert", + "private-key" + ] + }, + "IssueOperatorCertificateResults": { + "type": "object", + "properties": { + "results": { + "type": "array", + "items": { + "$ref": "#/definitions/IssueOperatorCertificateResult" + } + } + }, + "additionalProperties": false, + "required": [ + "results" + ] + }, "KubernetesFilesystemAttachmentParams": { "type": "object", "properties": { diff --git a/apiserver/params/params.go b/apiserver/params/params.go index 7af363d5174a..49f75672c6f6 100644 --- a/apiserver/params/params.go +++ b/apiserver/params/params.go @@ -314,6 +314,20 @@ type OperatorProvisioningInfo struct { CharmStorage KubernetesFilesystemParams `json:"charm-storage"` } +// IssueOperatorCertificateResult contains an x509 certificate +// for a CAAS Operator. +type IssueOperatorCertificateResult struct { + CACert string `json:"ca-cert"` + Cert string `json:"cert"` + PrivateKey string `json:"private-key"` + Error *Error `json:"error,omitempty"` +} + +// IssueOperatorCertificateResults holds IssueOperatorCertificate results. +type IssueOperatorCertificateResults struct { + Results []IssueOperatorCertificateResult `json:"results"` +} + // PublicAddress holds parameters for the PublicAddress call. type PublicAddress struct { Target string `json:"target"` diff --git a/caas/broker.go b/caas/broker.go index 6c51e8cbac35..bc298e61427f 100644 --- a/caas/broker.go +++ b/caas/broker.go @@ -300,6 +300,7 @@ type Operator struct { Id string Dying bool Status status.StatusInfo + Config *OperatorConfig } // CharmStorageParams defines parameters used to create storage @@ -335,6 +336,9 @@ type OperatorConfig struct { // AgentConf is the contents of the agent.conf file. AgentConf []byte + // OperatorInfo is the contents of the operator.yaml file. + OperatorInfo []byte + // ResourceTags is a set of tags to set on the operator pod. ResourceTags map[string]string } diff --git a/caas/kubernetes/provider/exec/copy_test.go b/caas/kubernetes/provider/exec/copy_test.go index 4a62c5364c00..fab6727d119f 100644 --- a/caas/kubernetes/provider/exec/copy_test.go +++ b/caas/kubernetes/provider/exec/copy_test.go @@ -140,6 +140,9 @@ func (s *execSuite) TestCopyToPod(c *gc.C) { {Name: "gitlab-container"}, }, }, + Status: core.PodStatus{ + Phase: core.PodRunning, + }, } pod.SetName("gitlab-k8s-0") diff --git a/caas/kubernetes/provider/exec/exec.go b/caas/kubernetes/provider/exec/exec.go index 6e03cd9955e5..09418c071673 100644 --- a/caas/kubernetes/provider/exec/exec.go +++ b/caas/kubernetes/provider/exec/exec.go @@ -19,6 +19,7 @@ import ( typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/util/exec" ) var logger = loggo.GetLogger("juju.kubernetes.provider.exec") @@ -33,6 +34,15 @@ type client struct { podGetter typedcorev1.PodInterface } +// ExitError exposes what we need from k8s exec.ExitError +type ExitError interface { + error + String() string + ExitStatus() int +} + +var _ ExitError = exec.CodeExitError{} + // Executor provides the API to exec or cp on a pod inside the cluster. type Executor interface { Exec(params ExecParams, cancel <-chan struct{}) error @@ -40,7 +50,7 @@ type Executor interface { } // NewInCluster returns a in-cluster exec client. -func NewInCluster(modelName string) (Executor, error) { +func NewInCluster(namespace string) (Executor, error) { // creates the in-cluster config. config, err := rest.InClusterConfig() if err != nil { @@ -52,13 +62,13 @@ func NewInCluster(modelName string) (Executor, error) { return nil, errors.Trace(err) } - return New(modelName, c, config), nil + return New(namespace, c, config), nil } // New contructs an executor. // no cross model/namespace allowed. func New(namespace string, clientset kubernetes.Interface, config *rest.Config) Executor { - return new( + return newClient( namespace, clientset, config, @@ -67,7 +77,7 @@ func New(namespace string, clientset kubernetes.Interface, config *rest.Config) ) } -func new( +func newClient( namespace string, clientset kubernetes.Interface, config *rest.Config, @@ -224,9 +234,9 @@ func getValidatedPodContainer( return "", "", errors.NotFoundf("pod %q", podName) } - if pod.Status.Phase == core.PodSucceeded || pod.Status.Phase == core.PodFailed { + if pod.Status.Phase != core.PodRunning { return "", "", errors.New(fmt.Sprintf( - "cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase, + "cannot exec into a container within a %s pod", pod.Status.Phase, )) } @@ -247,5 +257,6 @@ func getValidatedPodContainer( containerName = pod.Spec.Containers[0].Name logger.Debugf("choose first container %q to exec", containerName) } + return podName, containerName, nil } diff --git a/caas/kubernetes/provider/exec/exec_test.go b/caas/kubernetes/provider/exec/exec_test.go index af2337b14c27..8dfdf91963a4 100644 --- a/caas/kubernetes/provider/exec/exec_test.go +++ b/caas/kubernetes/provider/exec/exec_test.go @@ -108,7 +108,7 @@ func (s *execSuite) TestExecParamsValidatePodContainerExistence(c *gc.C) { s.mockPodGetter.EXPECT().List(metav1.ListOptions{}).Times(1). Return(&core.PodList{Items: []core.Pod{pod}}, nil), ) - c.Assert(params.Validate(s.mockPodGetter), gc.ErrorMatches, "cannot exec into a container in a completed pod; current phase is Succeeded") + c.Assert(params.Validate(s.mockPodGetter), gc.ErrorMatches, "cannot exec into a container within a Succeeded pod") // failed - failed pod params = exec.ExecParams{ @@ -127,7 +127,7 @@ func (s *execSuite) TestExecParamsValidatePodContainerExistence(c *gc.C) { s.mockPodGetter.EXPECT().List(metav1.ListOptions{}).Times(1). Return(&core.PodList{Items: []core.Pod{pod}}, nil), ) - c.Assert(params.Validate(s.mockPodGetter), gc.ErrorMatches, "cannot exec into a container in a completed pod; current phase is Failed") + c.Assert(params.Validate(s.mockPodGetter), gc.ErrorMatches, "cannot exec into a container within a Failed pod") // failed - containerName not found params = exec.ExecParams{ @@ -135,7 +135,11 @@ func (s *execSuite) TestExecParamsValidatePodContainerExistence(c *gc.C) { PodName: "gitlab-k8s-uid", ContainerName: "non-existing-container-name", } - pod = core.Pod{} + pod = core.Pod{ + Status: core.PodStatus{ + Phase: core.PodRunning, + }, + } pod.SetUID("gitlab-k8s-uid") pod.SetName("gitlab-k8s-0") gomock.InOrder( @@ -158,6 +162,9 @@ func (s *execSuite) TestExecParamsValidatePodContainerExistence(c *gc.C) { {Name: "gitlab-container"}, }, }, + Status: core.PodStatus{ + Phase: core.PodRunning, + }, } pod.SetUID("gitlab-k8s-uid") pod.SetName("gitlab-k8s-0") @@ -181,6 +188,9 @@ func (s *execSuite) TestExecParamsValidatePodContainerExistence(c *gc.C) { {Name: "gitlab-container"}, }, }, + Status: core.PodStatus{ + Phase: core.PodRunning, + }, } pod.SetUID("gitlab-k8s-uid") pod.SetName("gitlab-k8s-0") @@ -213,6 +223,9 @@ func (s *execSuite) TestExec(c *gc.C) { {Name: "gitlab-container"}, }, }, + Status: core.PodStatus{ + Phase: core.PodRunning, + }, } pod.SetUID("gitlab-k8s-uid") pod.SetName("gitlab-k8s-0") diff --git a/caas/kubernetes/provider/exec/export_test.go b/caas/kubernetes/provider/exec/export_test.go index 889088bce84c..f788bd742b3d 100644 --- a/caas/kubernetes/provider/exec/export_test.go +++ b/caas/kubernetes/provider/exec/export_test.go @@ -9,7 +9,7 @@ import ( var ( ProcessEnv = processEnv - NewForTest = new + NewForTest = newClient ) func (ep *ExecParams) Validate(podGetter typedcorev1.PodInterface) error { diff --git a/caas/kubernetes/provider/k8s.go b/caas/kubernetes/provider/k8s.go index 6d77de97a4ba..7761645507b9 100644 --- a/caas/kubernetes/provider/k8s.go +++ b/caas/kubernetes/provider/k8s.go @@ -75,15 +75,16 @@ const ( annotationPrefix = "juju.io" + operatorContainerName = "juju-operator" + // OperatorPodIPEnvName is the environment name for operator pod IP. OperatorPodIPEnvName = "JUJU_OPERATOR_POD_IP" // OperatorPodIPEnvName is the environment name for operator service IP. OperatorServiceIPEnvName = "JUJU_OPERATOR_SERVICE_IP" - // OperatorInfoFile is the file containing info about the operator, - // copied to the workload pod so the hook tools and juju-run can function. - OperatorInfoFile = "operator.yaml" + // OperatorNamespaceEnvName is the environment name for k8s namespace the operator is in. + OperatorNamespaceEnvName = "JUJU_OPERATOR_NAMESPACE" // JujuRunServerSocketPort is the port used by juju run callbacks. JujuRunServerSocketPort = 30666 @@ -478,9 +479,9 @@ func (k *kubernetesClient) APIVersion() (string, error) { // application exists, and whether the operator is terminating. func (k *kubernetesClient) OperatorExists(appName string) (caas.OperatorState, error) { var result caas.OperatorState - - statefulsets := k.client().AppsV1().StatefulSets(k.namespace) - operator, err := statefulsets.Get(k.operatorName(appName), v1.GetOptions{IncludeUninitialized: true}) + operatorName := k.operatorName(appName) + statefulSets := k.client().AppsV1().StatefulSets(k.namespace) + operator, err := statefulSets.Get(operatorName, v1.GetOptions{IncludeUninitialized: true}) if k8serrors.IsNotFound(err) { return result, nil } @@ -533,7 +534,7 @@ func (k *kubernetesClient) EnsureOperator(appName, agentPath string, config *caa cmName := operatorConfigMapName(operatorName) // TODO(caas) use secrets for storing agent password? - if config.AgentConf == nil { + if config.AgentConf == nil && config.OperatorInfo == nil { // We expect that the config map already exists, // so make sure it does. if _, err := k.getConfigMap(cmName); err != nil { @@ -2247,6 +2248,16 @@ func (k *kubernetesClient) volumeInfoForPVC(vol core.Volume, volMount core.Volum // Operator returns an Operator with current status and life details. func (k *kubernetesClient) Operator(appName string) (*caas.Operator, error) { + operatorName := k.operatorName(appName) + statefulSets := k.client().AppsV1().StatefulSets(k.namespace) + operator, err := statefulSets.Get(operatorName, v1.GetOptions{IncludeUninitialized: true}) + if k8serrors.IsNotFound(err) { + return nil, errors.NotFoundf("operator %s", appName) + } + if err != nil { + return nil, errors.Trace(err) + } + pods := k.client().CoreV1().Pods(k.namespace) podsList, err := pods.List(v1.ListOptions{ LabelSelector: operatorSelector(appName), @@ -2265,6 +2276,34 @@ func (k *kubernetesClient) Operator(appName string) (*caas.Operator, error) { if err != nil { return nil, errors.Trace(err) } + + cfg := caas.OperatorConfig{} + if ver, ok := operator.Annotations[labelVersion]; ok { + cfg.Version, err = version.Parse(ver) + if err != nil { + return nil, errors.Trace(err) + } + } + for _, container := range operator.Spec.Template.Spec.Containers { + if container.Name == operatorContainerName { + cfg.OperatorImagePath = container.Image + break + } + } + configMaps := k.client().CoreV1().ConfigMaps(k.namespace) + configMap, err := configMaps.Get(operatorConfigMapName(operatorName), v1.GetOptions{IncludeUninitialized: true}) + if err != nil && !k8serrors.IsNotFound(err) { + return nil, errors.Trace(err) + } + if configMap != nil { + if agentConf, ok := configMap.Data[operatorConfigMapAgentConfKey(appName)]; ok { + cfg.AgentConf = []byte(agentConf) + } + if operatorInfo, ok := configMap.Data[caas.OperatorInfoFile]; ok { + cfg.OperatorInfo = []byte(operatorInfo) + } + } + return &caas.Operator{ Id: string(opPod.UID), Dying: terminated, @@ -2273,6 +2312,7 @@ func (k *kubernetesClient) Operator(appName string) (*caas.Operator, error) { Message: statusMessage, Since: &since, }, + Config: &cfg, }, nil } @@ -2435,7 +2475,7 @@ func operatorPod(podName, appName, operatorServiceIP, agentPath, operatorImagePa }, Spec: core.PodSpec{ Containers: []core.Container{{ - Name: "juju-operator", + Name: operatorContainerName, ImagePullPolicy: core.PullIfNotPresent, Image: operatorImagePath, WorkingDir: jujuDataDir, @@ -2462,11 +2502,23 @@ func operatorPod(podName, appName, operatorServiceIP, agentPath, operatorImagePa }, }, }, + { + Name: OperatorNamespaceEnvName, + ValueFrom: &core.EnvVarSource{ + FieldRef: &core.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, }, VolumeMounts: []core.VolumeMount{{ Name: configVolName, MountPath: filepath.Join(agent.Dir(agentPath, appTag), TemplateFileNameAgentConf), SubPath: TemplateFileNameAgentConf, + }, { + Name: configVolName, + MountPath: filepath.Join(agent.Dir(agentPath, appTag), caas.OperatorInfoFile), + SubPath: caas.OperatorInfoFile, }}, }}, Volumes: []core.Volume{{ @@ -2477,8 +2529,11 @@ func operatorPod(podName, appName, operatorServiceIP, agentPath, operatorImagePa Name: configMapName, }, Items: []core.KeyToPath{{ - Key: appName + "-agent.conf", + Key: operatorConfigMapAgentConfKey(appName), Path: TemplateFileNameAgentConf, + }, { + Key: caas.OperatorInfoFile, + Path: caas.OperatorInfoFile, }}, }, }, @@ -2487,6 +2542,10 @@ func operatorPod(podName, appName, operatorServiceIP, agentPath, operatorImagePa }, nil } +func operatorConfigMapAgentConfKey(appName string) string { + return appName + "-agent.conf" +} + // operatorConfigMap returns a *core.ConfigMap for the operator pod // of the specified application, with the specified configuration. func operatorConfigMap(appName, name string, labels map[string]string, config *caas.OperatorConfig) *core.ConfigMap { @@ -2496,7 +2555,8 @@ func operatorConfigMap(appName, name string, labels map[string]string, config *c Labels: labels, }, Data: map[string]string{ - appName + "-agent.conf": string(config.AgentConf), + operatorConfigMapAgentConfKey(appName): string(config.AgentConf), + caas.OperatorInfoFile: string(config.OperatorInfo), }, } } diff --git a/caas/kubernetes/provider/k8s_test.go b/caas/kubernetes/provider/k8s_test.go index ccc92d4f0412..a6ee6a641f94 100644 --- a/caas/kubernetes/provider/k8s_test.go +++ b/caas/kubernetes/provider/k8s_test.go @@ -313,11 +313,23 @@ $JUJU_TOOLS_DIR/jujud caasoperator --application-name=test --debug }, }, }, + { + Name: "JUJU_OPERATOR_NAMESPACE", + ValueFrom: &core.EnvVarSource{ + FieldRef: &core.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, }, VolumeMounts: []core.VolumeMount{{ Name: "test-operator-config", MountPath: "path/to/agent/agents/application-test/template-agent.conf", SubPath: "template-agent.conf", + }, { + Name: "test-operator-config", + MountPath: "path/to/agent/agents/application-test/operator.yaml", + SubPath: "operator.yaml", }, { Name: "charm", MountPath: "path/to/agent/agents", @@ -333,6 +345,9 @@ $JUJU_TOOLS_DIR/jujud caasoperator --application-name=test --debug Items: []core.KeyToPath{{ Key: "test-agent.conf", Path: "template-agent.conf", + }, { + Key: "operator.yaml", + Path: "operator.yaml", }}, }, }, @@ -462,8 +477,9 @@ func (s *K8sSuite) TestOperatorPodConfig(c *gc.C) { }) c.Assert(pod.Spec.Containers, gc.HasLen, 1) c.Assert(pod.Spec.Containers[0].Image, gc.Equals, "jujusolutions/jujud-operator") - c.Assert(pod.Spec.Containers[0].VolumeMounts, gc.HasLen, 1) + c.Assert(pod.Spec.Containers[0].VolumeMounts, gc.HasLen, 2) c.Assert(pod.Spec.Containers[0].VolumeMounts[0].MountPath, gc.Equals, "/var/lib/juju/agents/application-gitlab/template-agent.conf") + c.Assert(pod.Spec.Containers[0].VolumeMounts[1].MountPath, gc.Equals, "/var/lib/juju/agents/application-gitlab/operator.yaml") podEnv := make(map[string]string) for _, env := range pod.Spec.Containers[0].Env { @@ -929,6 +945,7 @@ func (s *K8sBrokerSuite) TestEnsureOperatorCreate(c *gc.C) { }, Data: map[string]string{ "test-agent.conf": "agent-conf-data", + "operator.yaml": "operator-info-data", }, } statefulSetArg := operatorStatefulSetArg(1, "test-operator-storage") @@ -958,6 +975,7 @@ func (s *K8sBrokerSuite) TestEnsureOperatorCreate(c *gc.C) { OperatorImagePath: "/path/to/image", Version: version.MustParse("2.99.0"), AgentConf: []byte("agent-conf-data"), + OperatorInfo: []byte("operator-info-data"), ResourceTags: map[string]string{"fred": "mary"}, CharmStorage: caas.CharmStorageParams{ Size: uint64(10), @@ -980,6 +998,7 @@ func (s *K8sBrokerSuite) TestEnsureOperatorUpdate(c *gc.C) { }, Data: map[string]string{ "test-agent.conf": "agent-conf-data", + "operator.yaml": "operator-info-data", }, } statefulSetArg := operatorStatefulSetArg(1, "test-operator-storage") @@ -1011,6 +1030,7 @@ func (s *K8sBrokerSuite) TestEnsureOperatorUpdate(c *gc.C) { OperatorImagePath: "/path/to/image", Version: version.MustParse("2.99.0"), AgentConf: []byte("agent-conf-data"), + OperatorInfo: []byte("operator-info-data"), ResourceTags: map[string]string{"fred": "mary"}, CharmStorage: caas.CharmStorageParams{ Size: uint64(10), @@ -3083,9 +3103,38 @@ func (s *K8sBrokerSuite) TestOperator(c *gc.C) { Message: "test message.", }, } + ss := apps.StatefulSet{ + ObjectMeta: v1.ObjectMeta{ + Annotations: map[string]string{ + "juju-version": "2.99.0", + }, + }, + Spec: apps.StatefulSetSpec{ + Template: core.PodTemplateSpec{ + Spec: core.PodSpec{ + Containers: []core.Container{{ + Name: "juju-operator", + Image: "test-image", + }}, + }, + }, + }, + } + cm := core.ConfigMap{ + Data: map[string]string{ + "test-agent.conf": "agent-conf-data", + "operator.yaml": "operator-info-data", + }, + } gomock.InOrder( + s.mockStatefulSets.EXPECT().Get("juju-operator-test", v1.GetOptions{IncludeUninitialized: true}).Times(1). + Return(nil, s.k8sNotFoundError()), + s.mockStatefulSets.EXPECT().Get("test-operator", v1.GetOptions{IncludeUninitialized: true}).Times(1). + Return(&ss, nil), s.mockPods.EXPECT().List(v1.ListOptions{LabelSelector: "juju-operator==test"}).Times(1). Return(&core.PodList{Items: []core.Pod{opPod}}, nil), + s.mockConfigMaps.EXPECT().Get("test-operator-config", v1.GetOptions{IncludeUninitialized: true}).Times(1). + Return(&cm, nil), ) operator, err := s.broker.Operator("test") @@ -3093,13 +3142,38 @@ func (s *K8sBrokerSuite) TestOperator(c *gc.C) { c.Assert(operator.Status.Status, gc.Equals, status.Allocating) c.Assert(operator.Status.Message, gc.Equals, "test message.") + c.Assert(operator.Config.Version, gc.Equals, version.MustParse("2.99.0")) + c.Assert(operator.Config.OperatorImagePath, gc.Equals, "test-image") + c.Assert(operator.Config.AgentConf, gc.DeepEquals, []byte("agent-conf-data")) + c.Assert(operator.Config.OperatorInfo, gc.DeepEquals, []byte("operator-info-data")) } func (s *K8sBrokerSuite) TestOperatorNoPodFound(c *gc.C) { ctrl := s.setupController(c) defer ctrl.Finish() + ss := apps.StatefulSet{ + ObjectMeta: v1.ObjectMeta{ + Annotations: map[string]string{ + "juju-version": "2.99.0", + }, + }, + Spec: apps.StatefulSetSpec{ + Template: core.PodTemplateSpec{ + Spec: core.PodSpec{ + Containers: []core.Container{{ + Name: "juju-operator", + Image: "test-image", + }}, + }, + }, + }, + } gomock.InOrder( + s.mockStatefulSets.EXPECT().Get("juju-operator-test", v1.GetOptions{IncludeUninitialized: true}).Times(1). + Return(nil, s.k8sNotFoundError()), + s.mockStatefulSets.EXPECT().Get("test-operator", v1.GetOptions{IncludeUninitialized: true}).Times(1). + Return(&ss, nil), s.mockPods.EXPECT().List(v1.ListOptions{LabelSelector: "juju-operator==test"}).Times(1). Return(&core.PodList{Items: []core.Pod{}}, nil), ) diff --git a/caas/operator.go b/caas/operator.go new file mode 100644 index 000000000000..4eead2364fad --- /dev/null +++ b/caas/operator.go @@ -0,0 +1,68 @@ +// Copyright 2019 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package caas + +import ( + "github.com/juju/errors" + "gopkg.in/yaml.v2" +) + +const ( + // OperatorInfoFile is the file which contains certificate information for + // the operator. + OperatorInfoFile = "operator.yaml" + + // OperatorClientInfoFile is the file containing info about the operator, + // copied to the workload pod so the hook tools and juju-run can function. + OperatorClientInfoFile = "operator-client.yaml" + + // OperatorClientInfoCacheFile is a cache of OperatorClientInfoFile stored on the operator. + OperatorClientInfoCacheFile = "operator-client-cache.yaml" + + // CACertFile is the file containing the cluster CA. + CACertFile = "ca.crt" +) + +// OperatorInfo contains information needed by CAAS operators +type OperatorInfo struct { + CACert string `yaml:"ca-cert,omitempty"` + Cert string `yaml:"cert,omitempty"` + PrivateKey string `yaml:"private-key,omitempty"` +} + +// UnmarshalOperatorInfo parses OperatorInfo yaml data. +func UnmarshalOperatorInfo(data []byte) (*OperatorInfo, error) { + var oi OperatorInfo + err := yaml.Unmarshal(data, &oi) + if err != nil { + return nil, errors.Trace(err) + } + return &oi, nil +} + +// Marshal OperatorInfo into yaml data. +func (info OperatorInfo) Marshal() ([]byte, error) { + return yaml.Marshal(info) +} + +// OperatorClientInfo contains information needed by CAAS tools. +type OperatorClientInfo struct { + ServiceAddress string `yaml:"service-address,omitempty"` + Token string `yaml:"token,omitempty"` +} + +// UnmarshalOperatorClientInfo parses OperatorClientInfo yaml data. +func UnmarshalOperatorClientInfo(data []byte) (*OperatorClientInfo, error) { + var oi OperatorClientInfo + err := yaml.Unmarshal(data, &oi) + if err != nil { + return nil, errors.Trace(err) + } + return &oi, nil +} + +// Marshal OperatorClientInfo into yaml data. +func (info OperatorClientInfo) Marshal() ([]byte, error) { + return yaml.Marshal(info) +} diff --git a/caas/operator_test.go b/caas/operator_test.go new file mode 100644 index 000000000000..f98fba0d3b7c --- /dev/null +++ b/caas/operator_test.go @@ -0,0 +1,43 @@ +// Copyright 2019 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package caas_test + +import ( + jc "github.com/juju/testing/checkers" + gc "gopkg.in/check.v1" + + "github.com/juju/juju/caas" + "github.com/juju/juju/testing" +) + +type OperatorSuite struct { + testing.BaseSuite +} + +var _ = gc.Suite(&OperatorSuite{}) + +func (s *OperatorSuite) TestOperatorInfo(c *gc.C) { + info := caas.OperatorInfo{ + CACert: "ca cert", + Cert: "cert", + PrivateKey: "private key", + } + marshaled, err := info.Marshal() + c.Assert(err, jc.ErrorIsNil) + unmarshaledInfo, err := caas.UnmarshalOperatorInfo(marshaled) + c.Assert(err, jc.ErrorIsNil) + c.Assert(*unmarshaledInfo, jc.DeepEquals, info) +} + +func (s *OperatorSuite) TestOperatorClientInfo(c *gc.C) { + info := caas.OperatorClientInfo{ + ServiceAddress: "1.2.3.4", + Token: "token", + } + marshaled, err := info.Marshal() + c.Assert(err, jc.ErrorIsNil) + unmarshaledInfo, err := caas.UnmarshalOperatorClientInfo(marshaled) + c.Assert(err, jc.ErrorIsNil) + c.Assert(*unmarshaledInfo, jc.DeepEquals, info) +} diff --git a/cert/cert.go b/cert/cert.go index c87caa50b08a..4ce2875c8166 100644 --- a/cert/cert.go +++ b/cert/cert.go @@ -36,7 +36,7 @@ func Verify(srvCertPEM, caCertPEM string, when time.Time) error { } // NewLeafKeyBits is the number of bits used for the cert.NewLeaf call. -var NewLeafKeyBits = 2048 +var NewLeafKeyBits = 3072 // NewDefaultServer generates a certificate/key pair suitable for use by a server, with an // expiry time of 10 years. @@ -108,5 +108,5 @@ var NewCA = newCA func newCA(commonName, UUID string, expiry time.Time) (certPEM, keyPEM string, err error) { return cert.NewCA( fmt.Sprintf("juju-generated CA for model %q", commonName), - UUID, expiry, 0) + UUID, expiry, NewLeafKeyBits) } diff --git a/cmd/jujud/agent/caasoperator.go b/cmd/jujud/agent/caasoperator.go index 769d98edde3c..cce9557ace00 100644 --- a/cmd/jujud/agent/caasoperator.go +++ b/cmd/jujud/agent/caasoperator.go @@ -38,6 +38,7 @@ import ( "github.com/juju/juju/worker/gate" "github.com/juju/juju/worker/introspection" "github.com/juju/juju/worker/logsender" + "github.com/juju/juju/worker/uniter" "github.com/juju/juju/worker/upgradesteps" ) @@ -66,16 +67,16 @@ type CaasOperatorAgent struct { prometheusRegistry *prometheus.Registry - newExecClient func(modelName string) (exec.Executor, error) - runListenerSocket func() (*sockets.Socket, error) + newExecClient func(namespace string) (exec.Executor, error) + runListenerSocket func(*uniter.SocketConfig) (*sockets.Socket, error) } // NewCaasOperatorAgent creates a new CAASOperatorAgent instance properly initialized. func NewCaasOperatorAgent( ctx *cmd.Context, bufferedLogger *logsender.BufferedLogWriter, - newExecClient func(modelName string) (exec.Executor, error), - runListenerSocket func() (*sockets.Socket, error), + newExecClient func(namespace string) (exec.Executor, error), + runListenerSocket func(*uniter.SocketConfig) (*sockets.Socket, error), ) (*CaasOperatorAgent, error) { prometheusRegistry, err := newPrometheusRegistry() if err != nil { diff --git a/cmd/jujud/agent/caasoperator/manifolds.go b/cmd/jujud/agent/caasoperator/manifolds.go index 35f613a6f947..3521e9c5201f 100644 --- a/cmd/jujud/agent/caasoperator/manifolds.go +++ b/cmd/jujud/agent/caasoperator/manifolds.go @@ -96,10 +96,10 @@ type ManifoldsConfig struct { PreviousAgentVersion version.Number // NewExecClient provides k8s execframework functionality for juju run commands or actions. - NewExecClient func(modelName string) (exec.Executor, error) + NewExecClient func(namespace string) (exec.Executor, error) // RunListenerSocket returns a function to create a run listener socket. - RunListenerSocket func() (*sockets.Socket, error) + RunListenerSocket func(*uniter.SocketConfig) (*sockets.Socket, error) } // Manifolds returns a set of co-configured manifolds covering the various diff --git a/cmd/jujud/main.go b/cmd/jujud/main.go index 6d133899751b..6eea8c1876e5 100644 --- a/cmd/jujud/main.go +++ b/cmd/jujud/main.go @@ -4,6 +4,8 @@ package main import ( + "crypto/tls" + "crypto/x509" "fmt" "io" "io/ioutil" @@ -20,6 +22,7 @@ import ( "github.com/juju/loggo" proxyutils "github.com/juju/proxy" "github.com/juju/utils/exec" + "gopkg.in/juju/names.v3" jujucmd "github.com/juju/juju/cmd" agentcmd "github.com/juju/juju/cmd/jujud/agent" @@ -28,10 +31,11 @@ import ( cmdutil "github.com/juju/juju/cmd/jujud/util" components "github.com/juju/juju/component/all" "github.com/juju/juju/core/machinelock" - "github.com/juju/juju/juju/names" + jujunames "github.com/juju/juju/juju/names" "github.com/juju/juju/juju/sockets" k8sexec "github.com/juju/juju/caas/kubernetes/provider/exec" + // Import the providers. _ "github.com/juju/juju/provider/all" "github.com/juju/juju/upgrades" @@ -92,12 +96,57 @@ func getwd() (string, error) { return abs, nil } +func getSocket() (sockets.Socket, error) { + var err error + socket := sockets.Socket{} + socket.Address, err = getenv("JUJU_AGENT_SOCKET_ADDRESS") + if err != nil { + return sockets.Socket{}, err + } + socket.Network, err = getenv("JUJU_AGENT_SOCKET_NETWORK") + if err != nil { + return sockets.Socket{}, err + } + + // If we are not connecting over tcp, no need for TLS. + if socket.Network != "tcp" { + return socket, nil + } + + caCertFile, err := getenv("JUJU_AGENT_CA_CERT") + if err != nil { + return sockets.Socket{}, err + } + caCert, err := ioutil.ReadFile(caCertFile) + if err != nil { + return sockets.Socket{}, errors.Annotatef(err, "reading %s", caCertFile) + } + rootCAs := x509.NewCertPool() + if ok := rootCAs.AppendCertsFromPEM(caCert); ok == false { + return sockets.Socket{}, errors.Errorf("invalid ca certificate") + } + + unitName, err := getenv("JUJU_UNIT_NAME") + if err != nil { + return sockets.Socket{}, err + } + application, err := names.UnitApplication(unitName) + if err != nil { + return sockets.Socket{}, errors.Trace(err) + } + socket.TLSConfig = &tls.Config{ + RootCAs: rootCAs, + ServerName: application, + } + return socket, nil +} + // hookToolMain uses JUJU_CONTEXT_ID and JUJU_AGENT_SOCKET_ADDRESS to ask a running unit agent // to execute a Command on our behalf. Individual commands should be exposed // by symlinking the command name to this executable. func hookToolMain(commandName string, ctx *cmd.Context, args []string) (code int, err error) { code = 1 - contextId, err := getenv("JUJU_CONTEXT_ID") + contextID, err := getenv("JUJU_CONTEXT_ID") if err != nil { return } @@ -106,17 +155,13 @@ func hookToolMain(commandName string, ctx *cmd.Context, args []string) (code int return } req := jujuc.Request{ - ContextId: contextId, + ContextId: contextID, Dir: dir, CommandName: commandName, Args: args[1:], + Token: os.Getenv("JUJU_AGENT_TOKEN"), } - socket := sockets.Socket{} - socket.Address, err = getenv("JUJU_AGENT_SOCKET_ADDRESS") - if err != nil { - return - } - socket.Network, err = getenv("JUJU_AGENT_SOCKET_NETWORK") + socket, err := getSocket() if err != nil { return } @@ -234,9 +279,9 @@ func Main(args []string) int { var code int commandName := filepath.Base(args[0]) switch commandName { - case names.Jujud: + case jujunames.Jujud: code, err = jujuDMain(args, ctx) - case names.JujuRun: + case jujunames.JujuRun: lock, err := machinelock.New(machinelock.Config{ AgentName: "juju-run", Clock: clock.WallClock, @@ -249,9 +294,9 @@ func Main(args []string) int { run := &RunCommand{MachineLock: lock} code = cmd.Main(run, ctx, args[1:]) } - case names.JujuDumpLogs: + case jujunames.JujuDumpLogs: code = cmd.Main(dumplogs.NewCommand(), ctx, args[1:]) - case names.JujuIntrospect: + case jujunames.JujuIntrospect: code = cmd.Main(&introspect.IntrospectCommand{}, ctx, args[1:]) default: code, err = hookToolMain(commandName, ctx, args) diff --git a/cmd/jujud/main_test.go b/cmd/jujud/main_test.go index 7ca67bb17363..06ade9bf259a 100644 --- a/cmd/jujud/main_test.go +++ b/cmd/jujud/main_test.go @@ -202,7 +202,7 @@ func (s *HookToolMainSuite) SetUpSuite(c *gc.C) { return &RemoteCommand{}, nil } s.sockPath = osDependentSockPath(c) - srv, err := jujuc.NewServer(factory, s.sockPath) + srv, err := jujuc.NewServer(factory, s.sockPath, "") c.Assert(err, jc.ErrorIsNil) s.server = srv go func() { diff --git a/cmd/jujud/run.go b/cmd/jujud/run.go index 7f9804a58622..240a00b900da 100644 --- a/cmd/jujud/run.go +++ b/cmd/jujud/run.go @@ -4,6 +4,8 @@ package main import ( + "crypto/tls" + "crypto/x509" "fmt" "io/ioutil" "os" @@ -19,9 +21,10 @@ import ( "github.com/juju/os/series" "github.com/juju/utils/exec" "gopkg.in/juju/names.v3" + "gopkg.in/yaml.v2" "github.com/juju/juju/agent" - "github.com/juju/juju/caas/kubernetes/provider" + "github.com/juju/juju/caas" jujucmd "github.com/juju/juju/cmd" cmdutil "github.com/juju/juju/cmd/jujud/util" "github.com/juju/juju/core/machinelock" @@ -162,14 +165,37 @@ func (c *RunCommand) Run(ctx *cmd.Context) error { return cmd.NewRcPassthroughError(result.Code) } -func (c *RunCommand) getSocket(baseDir string) sockets.Socket { - // juju-run on k8s uses an operator yaml file - ipAddrFile := filepath.Join(baseDir, provider.OperatorInfoFile) - _, err := os.Stat(ipAddrFile) - isRemote := err == nil +func (c *RunCommand) getSocket(op *caas.OperatorClientInfo) (sockets.Socket, error) { + if op == nil { + paths := uniter.NewPaths(cmdutil.DataDir, c.unit, nil) + return paths.Runtime.LocalJujuRunSocket.Client, nil + } + + baseDir := agent.Dir(cmdutil.DataDir, c.unit) + caCertFile := filepath.Join(baseDir, caas.CACertFile) + caCert, err := ioutil.ReadFile(caCertFile) + if err != nil { + return sockets.Socket{}, errors.Annotatef(err, "reading %s", caCertFile) + } + rootCAs := x509.NewCertPool() + if ok := rootCAs.AppendCertsFromPEM(caCert); ok == false { + return sockets.Socket{}, errors.Errorf("invalid ca certificate") + } - runtimePaths := uniter.NewPaths(cmdutil.DataDir, c.unit, isRemote) - return runtimePaths.Runtime.JujuRunSocket + application, err := names.UnitApplication(c.unit.Id()) + if err != nil { + return sockets.Socket{}, errors.Trace(err) + } + + socketConfig := &uniter.SocketConfig{ + ServiceAddress: op.ServiceAddress, + TLSConfig: &tls.Config{ + RootCAs: rootCAs, + ServerName: application, + }, + } + paths := uniter.NewPaths(cmdutil.DataDir, c.unit, socketConfig) + return paths.Runtime.RemoteJujuRunSocket.Client, nil } func (c *RunCommand) executeInUnitContext() (*exec.ExecResponse, error) { @@ -191,7 +217,28 @@ func (c *RunCommand) executeInUnitContext() (*exec.ExecResponse, error) { if len(c.remoteUnitName) > 0 && relationId == -1 { return nil, errors.Errorf("remote unit: %s, provided without a relation", c.remoteUnitName) } - client, err := sockets.Dial(c.getSocket(unitDir)) + + // juju-run on k8s uses an operator yaml file + infoFilePath := filepath.Join(unitDir, caas.OperatorClientInfoFile) + infoFileBytes, err := ioutil.ReadFile(infoFilePath) + if err != nil && !os.IsNotExist(err) { + return nil, errors.Annotatef(err, "reading %s", infoFilePath) + } + var operatorClientInfo *caas.OperatorClientInfo + if infoFileBytes != nil { + op := caas.OperatorClientInfo{} + err = yaml.Unmarshal(infoFileBytes, &op) + if err != nil { + return nil, errors.Trace(err) + } + operatorClientInfo = &op + } + + socket, err := c.getSocket(operatorClientInfo) + if err != nil { + return nil, errors.Annotate(err, "configuring juju run socket") + } + client, err := sockets.Dial(socket) if err != nil { return nil, errors.Annotate(err, "dialing juju run socket") } @@ -205,6 +252,9 @@ func (c *RunCommand) executeInUnitContext() (*exec.ExecResponse, error) { RemoteUnitName: c.remoteUnitName, ForceRemoteUnit: c.forceRemoteUnit, } + if operatorClientInfo != nil { + args.Token = operatorClientInfo.Token + } err = client.Call(uniter.JujuRunEndpoint, args, &result) return &result, errors.Trace(err) } diff --git a/featuretests/agent_caasoperator_test.go b/featuretests/agent_caasoperator_test.go index 2428f2928122..9eb491af9a61 100644 --- a/featuretests/agent_caasoperator_test.go +++ b/featuretests/agent_caasoperator_test.go @@ -4,6 +4,7 @@ package featuretests import ( + "io/ioutil" "os" "path/filepath" "runtime" @@ -16,6 +17,7 @@ import ( "gopkg.in/juju/worker.v1/dependency" "github.com/juju/juju/agent" + "github.com/juju/juju/caas" "github.com/juju/juju/caas/kubernetes/provider/exec" jujudagent "github.com/juju/juju/cmd/jujud/agent" "github.com/juju/juju/cmd/jujud/agent/agenttest" @@ -25,6 +27,7 @@ import ( coretesting "github.com/juju/juju/testing" "github.com/juju/juju/tools" "github.com/juju/juju/worker/logsender" + "github.com/juju/juju/worker/uniter" ) const ( @@ -59,9 +62,24 @@ func (s *CAASOperatorSuite) primeAgent(c *gc.C) (*state.Application, agent.Confi err := app.SetPassword(initialApplicationPassword) c.Assert(err, jc.ErrorIsNil) conf, tools := s.PrimeAgent(c, app.Tag(), initialApplicationPassword) + s.primeOperator(c, app) return app, conf, tools } +func (s *CAASOperatorSuite) primeOperator(c *gc.C, app *state.Application) { + baseDir := agent.Dir(s.DataDir(), app.Tag()) + file := filepath.Join(baseDir, caas.OperatorInfoFile) + info := caas.OperatorInfo{ + CACert: coretesting.CACert, + Cert: coretesting.ServerCert, + PrivateKey: coretesting.ServerKey, + } + data, err := info.Marshal() + c.Assert(err, jc.ErrorIsNil) + err = ioutil.WriteFile(file, data, 0644) + c.Assert(err, jc.ErrorIsNil) +} + func (s *CAASOperatorSuite) newAgent(c *gc.C, app *state.Application) *jujudagent.CaasOperatorAgent { a, err := s.newCaasOperatorAgent(c, nil, s.newBufferedLogWriter()) c.Assert(err, jc.ErrorIsNil) @@ -155,7 +173,7 @@ func sockPath(c *gc.C) sockets.Socket { } func (s *CAASOperatorSuite) newCaasOperatorAgent(c *gc.C, ctx *cmd.Context, bufferedLogger *logsender.BufferedLogWriter) (*jujudagent.CaasOperatorAgent, error) { - a, err := jujudagent.NewCaasOperatorAgent(ctx, s.newBufferedLogWriter(), newExecClient, func() (*sockets.Socket, error) { + a, err := jujudagent.NewCaasOperatorAgent(ctx, s.newBufferedLogWriter(), newExecClient, func(*uniter.SocketConfig) (*sockets.Socket, error) { socket := sockPath(c) return &socket, nil }) diff --git a/juju/sockets/package_test.go b/juju/sockets/package_test.go new file mode 100644 index 000000000000..a80c24b7b26b --- /dev/null +++ b/juju/sockets/package_test.go @@ -0,0 +1,14 @@ +// Copyright 2019 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package sockets_test + +import ( + stdtesting "testing" + + coretesting "github.com/juju/juju/testing" +) + +func Test(t *stdtesting.T) { + coretesting.MgoTestPackage(t) +} diff --git a/juju/sockets/sockets.go b/juju/sockets/sockets.go index cb5b0a27c0b5..671d9856ba8a 100644 --- a/juju/sockets/sockets.go +++ b/juju/sockets/sockets.go @@ -5,6 +5,8 @@ package sockets import ( + "crypto/tls" + "github.com/juju/loggo" // this is only here so that godeps will produce the right deps on all platforms _ "gopkg.in/natefinch/npipe.v2" @@ -14,10 +16,12 @@ var logger = loggo.GetLogger("juju.juju.sockets") // Socket represents the set of parameters to use for socket to dial/listen. type Socket struct { - // Network is the socket network. Network string // Address is the socket address. Address string + + // TLSConfig is set when the socket should also establish a TLS connection. + TLSConfig *tls.Config } diff --git a/juju/sockets/sockets_nix.go b/juju/sockets/sockets_nix.go index 76a293dc2095..18111244f799 100644 --- a/juju/sockets/sockets_nix.go +++ b/juju/sockets/sockets_nix.go @@ -7,6 +7,8 @@ package sockets import ( + "crypto/tls" + "io" "io/ioutil" "net" "net/rpc" @@ -18,10 +20,31 @@ import ( ) func Dial(soc Socket) (*rpc.Client, error) { - return rpc.Dial(soc.Network, soc.Address) + var conn io.ReadWriteCloser + var err error + if soc.TLSConfig != nil { + conn, err = tls.Dial(soc.Network, soc.Address, soc.TLSConfig) + } else { + conn, err = net.Dial(soc.Network, soc.Address) + } + if err != nil { + return nil, errors.Trace(err) + } + return rpc.NewClient(conn), nil +} + +func Listen(soc Socket) (net.Listener, error) { + listener, err := innerListen(soc) + if err != nil { + return nil, err + } + if soc.TLSConfig != nil { + return tls.NewListener(listener, soc.TLSConfig), nil + } + return listener, nil } -func Listen(soc Socket) (listener net.Listener, err error) { +func innerListen(soc Socket) (listener net.Listener, err error) { if soc.Network == "tcp" { return net.Listen(soc.Network, soc.Address) } diff --git a/juju/sockets/sockets_test.go b/juju/sockets/sockets_test.go new file mode 100644 index 000000000000..e2aeeb1f5f64 --- /dev/null +++ b/juju/sockets/sockets_test.go @@ -0,0 +1,101 @@ +// Copyright 2019 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package sockets_test + +import ( + "crypto/tls" + "crypto/x509" + "net/rpc" + "runtime" + + jc "github.com/juju/testing/checkers" + gc "gopkg.in/check.v1" + + "github.com/juju/juju/juju/sockets" + jujutesting "github.com/juju/juju/testing" +) + +type RpcCaller func(string, *string) error + +func (f RpcCaller) TestCall(arg string, reply *string) error { + return f(arg, reply) +} + +type SocketSuite struct { +} + +var _ = gc.Suite(&SocketSuite{}) + +func (s *SocketSuite) TestTCP(c *gc.C) { + socketDesc := sockets.Socket{ + Address: "127.0.0.1:32134", + Network: "tcp", + } + s.testConn(c, socketDesc, socketDesc) +} + +func (s *SocketSuite) TestAbstractDomain(c *gc.C) { + if runtime.GOOS == "windows" { + c.Skip("abstract domain socket not supported on windows") + return + } + socketDesc := sockets.Socket{ + Address: "@hello-juju", + Network: "unix", + } + s.testConn(c, socketDesc, socketDesc) +} + +func (s *SocketSuite) TestTLSOverTCP(c *gc.C) { + roots := x509.NewCertPool() + roots.AddCert(jujutesting.CACertX509) + serverSocketDesc := sockets.Socket{ + Address: "127.0.0.1:32135", + Network: "tcp", + TLSConfig: &tls.Config{ + Certificates: []tls.Certificate{*jujutesting.ServerTLSCert}, + }, + } + clientSocketDesc := sockets.Socket{ + Address: "127.0.0.1:32135", + Network: "tcp", + TLSConfig: &tls.Config{ + RootCAs: roots, + InsecureSkipVerify: true, + }, + } + s.testConn(c, serverSocketDesc, clientSocketDesc) +} + +func (s *SocketSuite) testConn(c *gc.C, serverSocketDesc, clientSocketDesc sockets.Socket) { + l, err := sockets.Listen(serverSocketDesc) + c.Assert(err, jc.ErrorIsNil) + + srv := rpc.Server{} + called := false + err = srv.Register(RpcCaller(func(arg string, reply *string) error { + called = true + *reply = arg + return nil + })) + c.Assert(err, jc.ErrorIsNil) + + go func() { + cconn, err := sockets.Dial(clientSocketDesc) + c.Assert(err, jc.ErrorIsNil) + rep := "" + err = cconn.Call("RpcCaller.TestCall", "hello", &rep) + c.Check(err, jc.ErrorIsNil) + c.Check(rep, gc.Equals, "hello") + err = cconn.Close() + c.Assert(err, jc.ErrorIsNil) + }() + + sconn, err := l.Accept() + c.Assert(err, jc.ErrorIsNil) + srv.ServeConn(sconn) + err = l.Close() + c.Assert(err, jc.ErrorIsNil) + c.Assert(called, jc.IsTrue) +} diff --git a/worker/caasoperator/action.go b/worker/caasoperator/action.go index e356c7278607..9737bd91463b 100644 --- a/worker/caasoperator/action.go +++ b/worker/caasoperator/action.go @@ -11,10 +11,10 @@ import ( "path/filepath" "github.com/juju/errors" + "github.com/juju/utils" utilexec "github.com/juju/utils/exec" - "gopkg.in/yaml.v2" - k8sexec "k8s.io/client-go/util/exec" + "github.com/juju/juju/caas" "github.com/juju/juju/caas/kubernetes/provider" "github.com/juju/juju/caas/kubernetes/provider/exec" "github.com/juju/juju/worker/uniter" @@ -64,10 +64,6 @@ func ensureSymlink( return errors.Trace(err) } -type unitOperatorInfo struct { - OperatorAddress string `yaml:"operator-address"` -} - type workloadPathSpec struct { src, dest string } @@ -88,27 +84,30 @@ func prepare( serviceAddress string, operatorPaths Paths, unitPaths uniter.Paths, + operatorInfo caas.OperatorInfo, stdout io.ReadWriter, stderr io.Writer, cancel <-chan struct{}, ) error { // TODO(caas) - quick check to see if files have already been copied across. // upgrade-charm and upgrade-juju will need to ensure files are up-to-date. - operatorFile := filepath.Join(unitPaths.State.BaseDir, provider.OperatorInfoFile) - if err := client.Exec( + operatorFile := filepath.Join(unitPaths.State.BaseDir, caas.OperatorClientInfoFile) + err := client.Exec( exec.ExecParams{ PodName: podName, - Commands: []string{"test", "-f", operatorFile, "||", "echo notfound"}, + Commands: []string{"test", "-f", operatorFile}, Stdout: stdout, Stderr: stderr, }, cancel, - ); err != nil { + ) + if exitErr, ok := errors.Cause(err).(exec.ExitError); ok { + if exitErr.ExitStatus() != 1 { + return errors.Trace(err) + } + } else if err != nil { return errors.Trace(err) - } - var o bytes.Buffer - _, err := o.ReadFrom(stdout) - if err == nil && len(o.Bytes()) == 0 { + } else { return nil } @@ -139,22 +138,62 @@ func prepare( } } - // Create the operator.yaml file containing the operator service address. + // set up the symlinks to jujud (hook commands and juju-run etc). + jujudPath := filepath.Join(unitPaths.ToolsDir, "jujud") + for _, slk := range jujudSymlinks { + if err := ensureSymlink(client, podName, jujudPath, slk, stdout, stderr, cancel); err != nil { + return errors.Trace(err) + } + } + for _, cmdName := range jujuc.CommandNames() { + slk := filepath.Join(unitPaths.ToolsDir, cmdName) + if err := ensureSymlink(client, podName, jujudPath, slk, stdout, stderr, cancel); err != nil { + return errors.Trace(err) + } + } + + // Ensure unit dir exists for operator-client.yaml and ca.crt file. if err := ensurePath(client, podName, unitPaths.State.BaseDir, stdout, stderr, cancel); err != nil { return errors.Trace(err) } - opInfo := unitOperatorInfo{OperatorAddress: serviceAddress} - data, err := yaml.Marshal(opInfo) + + // Create the ca.crt file containing the cluster's CA cert. + tempCACertFile := filepath.Join(os.TempDir(), caas.CACertFile) + if err := ioutil.WriteFile(tempCACertFile, []byte(operatorInfo.CACert), 0644); err != nil { + return errors.Trace(err) + } + if err := client.Copy(exec.CopyParam{ + Src: exec.FileResource{ + Path: tempCACertFile, + }, + Dest: exec.FileResource{ + Path: filepath.Join(unitPaths.State.BaseDir, caas.CACertFile), + PodName: podName, + }, + }, cancel); err != nil { + return errors.Trace(err) + } + + // Create the operator.yaml file containing the operator service address and token. + token, err := utils.RandomPassword() if err != nil { return errors.Trace(err) } - operatorFileSrc := filepath.Join(os.TempDir(), provider.OperatorInfoFile) - if err := ioutil.WriteFile(operatorFileSrc, data, 0644); err != nil { + clientInfo := caas.OperatorClientInfo{ + ServiceAddress: serviceAddress, + Token: token, + } + data, err := clientInfo.Marshal() + if err != nil { + return errors.Trace(err) + } + operatorCacheFile := filepath.Join(unitPaths.State.BaseDir, caas.OperatorClientInfoCacheFile) + if err := ioutil.WriteFile(operatorCacheFile, data, 0644); err != nil { return errors.Trace(err) } if err := client.Copy(exec.CopyParam{ Src: exec.FileResource{ - Path: operatorFileSrc, + Path: operatorCacheFile, }, Dest: exec.FileResource{ Path: operatorFile, @@ -164,20 +203,6 @@ func prepare( return errors.Trace(err) } - // set up the symlinks to jujud (hook commands and juju-run etc). - jujudPath := filepath.Join(unitPaths.ToolsDir, "jujud") - for _, slk := range jujudSymlinks { - if err := ensureSymlink(client, podName, jujudPath, slk, stdout, stderr, cancel); err != nil { - return errors.Trace(err) - } - } - for _, cmdName := range jujuc.CommandNames() { - slk := filepath.Join(unitPaths.ToolsDir, cmdName) - if err := ensureSymlink(client, podName, jujudPath, slk, stdout, stderr, cancel); err != nil { - return errors.Trace(err) - } - } - return nil } @@ -186,10 +211,10 @@ func prepare( func getNewRunnerExecutor( execClient exec.Executor, operatorPaths Paths, + operatorInfo caas.OperatorInfo, ) uniter.NewRunnerExecutorFunc { return func(providerIDGetter uniter.ProviderIDGetter, unitPaths uniter.Paths) runner.ExecFunc { return func(params runner.ExecParams) (*utilexec.ExecResponse, error) { - if err := providerIDGetter.Refresh(); err != nil { return nil, errors.Trace(err) } @@ -204,7 +229,7 @@ func getNewRunnerExecutor( logger.Debugf("operator service address: %v", serviceAddress) if err := prepare( execClient, podNameOrID, serviceAddress, - operatorPaths, unitPaths, + operatorPaths, unitPaths, operatorInfo, params.Stdout, params.Stderr, params.Cancel, ); err != nil { logger.Errorf("ensuring dirs and syncing files %v", err) @@ -238,7 +263,7 @@ func getNewRunnerExecutor( } exitCode := func(exitErr error) int { if exitErr != nil { - if exitErr, ok := exitErr.(k8sexec.CodeExitError); ok { + if exitErr, ok := exitErr.(exec.ExitError); ok { return exitErr.ExitStatus() } return -1 diff --git a/worker/caasoperator/action_test.go b/worker/caasoperator/action_test.go index b84c6b30169a..c38e144c4bb8 100644 --- a/worker/caasoperator/action_test.go +++ b/worker/caasoperator/action_test.go @@ -12,6 +12,7 @@ import ( "github.com/golang/mock/gomock" "github.com/juju/errors" + "github.com/juju/juju/caas" "github.com/juju/juju/worker/uniter/runner/jujuc" jc "github.com/juju/testing/checkers" "github.com/juju/utils" @@ -68,7 +69,7 @@ func (s *actionSuite) assertRunnerExecFunc(c *gc.C, errMsg string) { baseDir := c.MkDir() operatorPaths := caasoperator.NewPaths(baseDir, names.NewApplicationTag("gitlab-k8s")) - unitPaths := uniter.NewPaths(baseDir, names.NewUnitTag("gitlab-k8s/0"), true) + unitPaths := uniter.NewPaths(baseDir, names.NewUnitTag("gitlab-k8s/0"), &uniter.SocketConfig{}) for _, p := range []string{ operatorPaths.GetCharmDir(), unitPaths.GetCharmDir(), @@ -82,7 +83,7 @@ func (s *actionSuite) assertRunnerExecFunc(c *gc.C, errMsg string) { err := utils.AtomicWriteFile(filepath.Join(operatorPaths.GetToolsDir(), "jujud"), []byte(""), 0600) c.Assert(err, jc.ErrorIsNil) - runnerExecFunc := caasoperator.GetNewRunnerExecutor(s.executor, operatorPaths)(s.unitAPI, unitPaths) + runnerExecFunc := caasoperator.GetNewRunnerExecutor(s.executor, operatorPaths, caas.OperatorInfo{})(s.unitAPI, unitPaths) cancel := make(<-chan struct{}, 1) stdout := bytes.NewBufferString("") @@ -94,13 +95,12 @@ func (s *actionSuite) assertRunnerExecFunc(c *gc.C, errMsg string) { s.executor.EXPECT().Exec( exec.ExecParams{ PodName: "gitlab-xxxx", - Commands: []string{"test", "-f", baseDir + "/agents/unit-gitlab-k8s-0/operator.yaml", "||", "echo notfound"}, + Commands: []string{"test", "-f", baseDir + "/agents/unit-gitlab-k8s-0/operator-client.yaml"}, Stdout: stdout, Stderr: stdout, }, cancel, ).Times(1).DoAndReturn(func(...interface{}) error { - stdout.WriteString("notfound") - return nil + return exitError{code: 1, err: "file not found"} }), s.executor.EXPECT().Exec( @@ -142,7 +142,24 @@ func (s *actionSuite) assertRunnerExecFunc(c *gc.C, errMsg string) { }, }, cancel, ).Times(1).Return(nil), + } + calls = append(calls, + s.executor.EXPECT().Exec(s.symlinkJujudCommand(stdout, baseDir, "/usr/bin/juju-run"), + cancel).Times(1).Return(nil)) + for _, cmdName := range jujuc.CommandNames() { + s.executor.EXPECT().Exec(s.symlinkJujudCommand(stdout, baseDir, baseDir+"/tools/unit-gitlab-k8s-0/"+cmdName), + cancel).Times(1).Return(nil) + } + + expectedCode := 0 + var exitErr error + if errMsg != "" { + exitErr = errors.Trace(k8sexec.CodeExitError{Code: 3, Err: errors.New(errMsg)}) + expectedCode = 3 + } + stderr := bytes.NewBufferString("") + calls = append(calls, s.executor.EXPECT().Exec( exec.ExecParams{ PodName: "gitlab-xxxx", @@ -154,31 +171,25 @@ func (s *actionSuite) assertRunnerExecFunc(c *gc.C, errMsg string) { s.executor.EXPECT().Copy( exec.CopyParam{ Src: exec.FileResource{ - Path: filepath.Join(os.TempDir(), "operator.yaml"), + Path: filepath.Join(os.TempDir(), "ca.crt"), }, Dest: exec.FileResource{ - Path: baseDir + "/agents/unit-gitlab-k8s-0/operator.yaml", + Path: baseDir + "/agents/unit-gitlab-k8s-0/ca.crt", + PodName: "gitlab-xxxx", + }, + }, cancel, + ).Times(1).Return(nil), + s.executor.EXPECT().Copy( + exec.CopyParam{ + Src: exec.FileResource{ + Path: baseDir + "/agents/unit-gitlab-k8s-0/operator-client-cache.yaml", + }, + Dest: exec.FileResource{ + Path: baseDir + "/agents/unit-gitlab-k8s-0/operator-client.yaml", PodName: "gitlab-xxxx", }, }, cancel, ).Times(1).Return(nil), - } - calls = append(calls, - s.executor.EXPECT().Exec(s.symlinkJujudCommand(stdout, baseDir, "/usr/bin/juju-run"), - cancel).Times(1).Return(nil)) - for _, cmdName := range jujuc.CommandNames() { - s.executor.EXPECT().Exec(s.symlinkJujudCommand(stdout, baseDir, baseDir+"/tools/unit-gitlab-k8s-0/"+cmdName), - cancel).Times(1).Return(nil) - } - - expectedCode := 0 - var exitErr error - if errMsg != "" { - exitErr = errors.Trace(k8sexec.CodeExitError{Code: 3, Err: errors.New(errMsg)}) - expectedCode = 3 - } - stderr := bytes.NewBufferString("") - calls = append(calls, s.executor.EXPECT().Exec( exec.ExecParams{ PodName: "gitlab-xxxx", @@ -221,3 +232,22 @@ func (s *actionSuite) assertRunnerExecFunc(c *gc.C, errMsg string) { c.Assert(err, gc.ErrorMatches, "boom") } } + +type exitError struct { + code int + err string +} + +var _ exec.ExitError = exitError{} + +func (e exitError) String() string { + return e.err +} + +func (e exitError) Error() string { + return e.err +} + +func (e exitError) ExitStatus() int { + return e.code +} diff --git a/worker/caasoperator/caasoperator.go b/worker/caasoperator/caasoperator.go index 7442619005f6..a93bb28f6424 100644 --- a/worker/caasoperator/caasoperator.go +++ b/worker/caasoperator/caasoperator.go @@ -22,6 +22,7 @@ import ( "gopkg.in/juju/worker.v1/catacomb" apiuniter "github.com/juju/juju/api/uniter" + "github.com/juju/juju/caas" "github.com/juju/juju/caas/kubernetes/provider" "github.com/juju/juju/core/leadership" "github.com/juju/juju/core/life" @@ -127,7 +128,10 @@ type Config struct { StartUniterFunc func(runner *worker.Runner, params *uniter.UniterParams) error // RunListenerSocketFunc returns a socket used for the juju run listener. - RunListenerSocketFunc func() (*sockets.Socket, error) + RunListenerSocketFunc func(*uniter.SocketConfig) (*sockets.Socket, error) + + // OperatorInfo contains serving information such as Certs and PrivateKeys. + OperatorInfo caas.OperatorInfo } func (config Config) Validate() error { @@ -294,22 +298,18 @@ func toBinaryVersion(vers version.Number) version.Binary { return outVers } -func runListenerSocket() (*sockets.Socket, error) { - podIP := os.Getenv(provider.OperatorPodIPEnvName) - if podIP == "" { - return nil, errors.New("missing pod IP") - } +func runListenerSocket(sc *uniter.SocketConfig) (*sockets.Socket, error) { socket := sockets.Socket{ - Network: "tcp", - Address: fmt.Sprintf("%s:%d", podIP, provider.JujuRunServerSocketPort), + Network: "tcp", + Address: fmt.Sprintf(":%d", provider.JujuRunServerSocketPort), + TLSConfig: sc.TLSConfig, } return &socket, nil } func (op *caasOperator) init() (*LocalState, error) { - - // Set up a single juju run listener to be used by all units. - socket, err := op.config.RunListenerSocketFunc() + // Set up a single remote juju run listener to be used by all units. + socket, err := op.config.RunListenerSocketFunc(op.config.UniterParams.SocketConfig) if err != nil { return nil, errors.Annotate(err, "creating juju run socket") } diff --git a/worker/caasoperator/caasoperator_test.go b/worker/caasoperator/caasoperator_test.go index 677ce8d0820e..a7c89b215cd5 100644 --- a/worker/caasoperator/caasoperator_test.go +++ b/worker/caasoperator/caasoperator_test.go @@ -56,7 +56,7 @@ type WorkerSuite struct { uniterParams *uniter.UniterParams leadershipTrackerFunc func(unitTag names.UnitTag) leadership.TrackerWorker uniterFacadeFunc func(unitTag names.UnitTag) *apiuniter.State - runListenerSocketFunc func() (*sockets.Socket, error) + runListenerSocketFunc func(*uniter.SocketConfig) (*sockets.Socket, error) } var _ = gc.Suite(&WorkerSuite{}) @@ -105,7 +105,7 @@ func (s *WorkerSuite) SetUpTest(c *gc.C) { s.uniterFacadeFunc = func(unitTag names.UnitTag) *apiuniter.State { return &apiuniter.State{} } - s.runListenerSocketFunc = func() (*sockets.Socket, error) { + s.runListenerSocketFunc = func(*uniter.SocketConfig) (*sockets.Socket, error) { socket := sockPath(c) return &socket, nil } diff --git a/worker/caasoperator/manifold.go b/worker/caasoperator/manifold.go index b08aad717748..4370349f94b0 100644 --- a/worker/caasoperator/manifold.go +++ b/worker/caasoperator/manifold.go @@ -4,10 +4,16 @@ package caasoperator import ( + "crypto/tls" + "encoding/pem" + "io/ioutil" + "os" + "path" "time" "github.com/juju/clock" "github.com/juju/errors" + "github.com/juju/utils" "gopkg.in/juju/names.v3" "gopkg.in/juju/worker.v1" "gopkg.in/juju/worker.v1/dependency" @@ -17,6 +23,8 @@ import ( apileadership "github.com/juju/juju/api/leadership" apiuniter "github.com/juju/juju/api/uniter" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/caas" + "github.com/juju/juju/caas/kubernetes/provider" "github.com/juju/juju/caas/kubernetes/provider/exec" coreleadership "github.com/juju/juju/core/leadership" "github.com/juju/juju/core/machinelock" @@ -44,8 +52,10 @@ type ManifoldConfig struct { NewClient func(base.APICaller) Client NewCharmDownloader func(base.APICaller) Downloader - NewExecClient func(modelName string) (exec.Executor, error) - RunListenerSocket func() (*sockets.Socket, error) + NewExecClient func(namespace string) (exec.Executor, error) + RunListenerSocket func(*uniter.SocketConfig) (*sockets.Socket, error) + + LoadOperatorInfo func(paths Paths) (*caas.OperatorInfo, error) } func (config ManifoldConfig) Validate() error { @@ -168,21 +178,30 @@ func Manifold(config ManifoldConfig) dependency.Manifold { VersionSetter: client, StartUniterFunc: uniter.StartUniter, RunListenerSocketFunc: runListenerSocketFunc, - LeadershipTrackerFunc: leadershipTrackerFunc, UniterFacadeFunc: newUniterFunc, } - execClient, err := config.NewExecClient(model.Name) + execClient, err := config.NewExecClient(os.Getenv(provider.OperatorNamespaceEnvName)) if err != nil { return nil, errors.Trace(err) } + loadOperatorInfoFunc := config.LoadOperatorInfo + if loadOperatorInfoFunc == nil { + loadOperatorInfoFunc = loadOperatorInfo + } + operatorInfo, err := loadOperatorInfoFunc(wCfg.getPaths()) + if err != nil { + return nil, errors.Trace(err) + } + wCfg.OperatorInfo = *operatorInfo wCfg.UniterParams = &uniter.UniterParams{ NewOperationExecutor: operation.NewExecutor, NewRemoteRunnerExecutor: getNewRunnerExecutor( execClient, wCfg.getPaths(), + *operatorInfo, ), DataDir: agentConfig.DataDir(), Clock: clock, @@ -192,6 +211,10 @@ func Manifold(config ManifoldConfig) dependency.Manifold { HookRetryStrategy: hookRetryStrategy, TranslateResolverErr: config.TranslateResolverErr, } + wCfg.UniterParams.SocketConfig, err = socketConfig(operatorInfo) + if err != nil { + return nil, errors.Trace(err) + } w, err := config.NewWorker(wCfg) if err != nil { @@ -201,3 +224,41 @@ func Manifold(config ManifoldConfig) dependency.Manifold { }, } } + +func socketConfig(info *caas.OperatorInfo) (*uniter.SocketConfig, error) { + tlsCert, err := tls.X509KeyPair([]byte(info.Cert), []byte(info.PrivateKey)) + if err != nil { + return nil, errors.Annotatef(err, "cannot parse operator TLS certificate") + } + + block, _ := pem.Decode([]byte(info.CACert)) + tlsCert.Certificate = append(tlsCert.Certificate, block.Bytes) + tlsConfig := utils.SecureTLSConfig() + tlsConfig.Certificates = []tls.Certificate{tlsCert} + + serviceAddress := os.Getenv(provider.OperatorServiceIPEnvName) + if serviceAddress == "" { + return nil, errors.Errorf("env %s missing", provider.OperatorServiceIPEnvName) + } + + operatorAddress := os.Getenv(provider.OperatorPodIPEnvName) + if operatorAddress == "" { + return nil, errors.Errorf("env %s missing", provider.OperatorPodIPEnvName) + } + + sc := &uniter.SocketConfig{ + ServiceAddress: serviceAddress, + OperatorAddress: operatorAddress, + TLSConfig: tlsConfig, + } + return sc, nil +} + +func loadOperatorInfo(paths Paths) (*caas.OperatorInfo, error) { + filepath := path.Join(paths.State.BaseDir, caas.OperatorInfoFile) + data, err := ioutil.ReadFile(filepath) + if err != nil { + return nil, errors.Annotatef(err, "reading operator info file %s", filepath) + } + return caas.UnmarshalOperatorInfo(data) +} diff --git a/worker/caasoperator/manifold_test.go b/worker/caasoperator/manifold_test.go index e653b22e3e9e..4e355fcc78e7 100644 --- a/worker/caasoperator/manifold_test.go +++ b/worker/caasoperator/manifold_test.go @@ -4,6 +4,7 @@ package caasoperator_test import ( + "os" "sync" "time" @@ -21,6 +22,7 @@ import ( "github.com/juju/juju/api/base" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/caas" "github.com/juju/juju/caas/kubernetes/provider/exec" "github.com/juju/juju/core/machinelock" coretesting "github.com/juju/juju/testing" @@ -48,6 +50,9 @@ var _ = gc.Suite(&ManifoldSuite{}) func (s *ManifoldSuite) SetUpTest(c *gc.C) { s.IsolationSuite.SetUpTest(c) + os.Setenv("JUJU_OPERATOR_SERVICE_IP", "127.0.0.1") + os.Setenv("JUJU_OPERATOR_POD_IP", "127.0.0.2") + s.dataDir = c.MkDir() s.agent = fakeAgent{ config: fakeAgentConfig{ @@ -61,6 +66,13 @@ func (s *ManifoldSuite) SetUpTest(c *gc.C) { s.context = s.newContext(nil) } +func (s *ManifoldSuite) TearDownTest(c *gc.C) { + os.Setenv("JUJU_OPERATOR_SERVICE_IP", "") + os.Setenv("JUJU_OPERATOR_POD_IP", "") + + s.IsolationSuite.TearDownTest(c) +} + func (s *ManifoldSuite) setupManifold(c *gc.C) *gomock.Controller { ctrl := gomock.NewController(c) s.manifold = caasoperator.Manifold(caasoperator.ManifoldConfig{ @@ -77,6 +89,13 @@ func (s *ManifoldSuite) setupManifold(c *gc.C) *gomock.Controller { NewExecClient: func(modelName string) (exec.Executor, error) { return mocks.NewMockExecutor(ctrl), nil }, + LoadOperatorInfo: func(paths caasoperator.Paths) (*caas.OperatorInfo, error) { + return &caas.OperatorInfo{ + CACert: coretesting.CACert, + Cert: coretesting.ServerCert, + PrivateKey: coretesting.ServerKey, + }, nil + }, }) return ctrl } @@ -167,6 +186,9 @@ func (s *ManifoldSuite) TestStart(c *gc.C) { config.UniterParams.NewOperationExecutor = nil config.UniterParams.NewRemoteRunnerExecutor = nil + c.Assert(config.UniterParams.SocketConfig.TLSConfig, gc.NotNil) + config.UniterParams.SocketConfig.TLSConfig = nil + c.Assert(config, jc.DeepEquals, caasoperator.Config{ ModelUUID: coretesting.ModelTag.Id(), ModelName: "gitlab-model", @@ -186,6 +208,15 @@ func (s *ManifoldSuite) TestStart(c *gc.C) { MachineLock: &fakemachinelock{}, CharmDirGuard: &mockCharmDirGuard{}, Clock: s.clock, + SocketConfig: &uniter.SocketConfig{ + ServiceAddress: "127.0.0.1", + OperatorAddress: "127.0.0.2", + }, + }, + OperatorInfo: caas.OperatorInfo{ + CACert: coretesting.CACert, + Cert: coretesting.ServerCert, + PrivateKey: coretesting.ServerKey, }, }) } diff --git a/worker/caasoperator/paths.go b/worker/caasoperator/paths.go index 06491145a13d..8bd523b078de 100644 --- a/worker/caasoperator/paths.go +++ b/worker/caasoperator/paths.go @@ -36,6 +36,11 @@ func (paths Paths) GetToolsDir() string { return paths.ToolsDir } +// GetBaseDir exists to satisfy the context.Paths interface. +func (paths Paths) GetBaseDir() string { + return paths.State.BaseDir +} + // GetCharmDir exists to satisfy the context.Paths interface. func (paths Paths) GetCharmDir() string { return paths.State.CharmDir diff --git a/worker/caasoperatorprovisioner/mock_test.go b/worker/caasoperatorprovisioner/mock_test.go index cc394acb45a6..28b3b27c5912 100644 --- a/worker/caasoperatorprovisioner/mock_test.go +++ b/worker/caasoperatorprovisioner/mock_test.go @@ -6,6 +6,7 @@ package caasoperatorprovisioner_test import ( "sync" + "github.com/juju/errors" "github.com/juju/juju/storage" "github.com/juju/testing" "github.com/juju/version" @@ -70,6 +71,20 @@ func (m *mockProvisionerFacade) OperatorProvisioningInfo() (apicaasprovisioner.O }, nil } +func (m *mockProvisionerFacade) IssueOperatorCertificate(string) (apicaasprovisioner.OperatorCertificate, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.stub.MethodCall(m, "IssueOperatorCertificate") + if err := m.stub.NextErr(); err != nil { + return apicaasprovisioner.OperatorCertificate{}, err + } + return apicaasprovisioner.OperatorCertificate{ + CACert: coretesting.CACert, + Cert: coretesting.ServerCert, + PrivateKey: coretesting.ServerKey, + }, nil +} + func (m *mockProvisionerFacade) Life(entityName string) (life.Value, error) { m.mu.Lock() defer m.mu.Unlock() @@ -123,6 +138,7 @@ type mockBroker struct { mu sync.Mutex terminating bool operatorExists bool + config *caas.OperatorConfig } func (m *mockBroker) setTerminating(terminating bool) { @@ -156,6 +172,23 @@ func (m *mockBroker) DeleteOperator(appName string) error { return m.NextErr() } +func (m *mockBroker) Operator(appName string) (*caas.Operator, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.MethodCall(m, "Operator", appName) + err := m.NextErr() + if err != nil { + return nil, err + } + if m.operatorExists == false { + return nil, errors.NotFoundf("operator %s", appName) + } + return &caas.Operator{ + Dying: m.terminating, + Config: m.config, + }, nil +} + type mockWatcher struct { testing.Stub tomb.Tomb diff --git a/worker/caasoperatorprovisioner/worker.go b/worker/caasoperatorprovisioner/worker.go index 727755c422ac..c267793e5e4a 100644 --- a/worker/caasoperatorprovisioner/worker.go +++ b/worker/caasoperatorprovisioner/worker.go @@ -17,6 +17,7 @@ import ( "gopkg.in/juju/worker.v1/catacomb" "github.com/juju/juju/agent" + "github.com/juju/juju/api/caasoperatorprovisioner" apicaasprovisioner "github.com/juju/juju/api/caasoperatorprovisioner" "github.com/juju/juju/apiserver/params" "github.com/juju/juju/caas" @@ -34,6 +35,7 @@ type CAASProvisionerFacade interface { WatchApplications() (watcher.StringsWatcher, error) SetPasswords([]apicaasprovisioner.ApplicationPassword) (params.ErrorResults, error) Life(string) (life.Value, error) + IssueOperatorCertificate(string) (apicaasprovisioner.OperatorCertificate, error) } // Config defines the operation of a Worker. @@ -176,6 +178,12 @@ func (p *provisioner) ensureOperators(apps []string) error { } opState.Exists = false } + + op, err := p.broker.Operator(app) + if err != nil && !errors.IsNotFound(err) { + return errors.Trace(err) + } + // If the operator does not exist already, we need to create an initial // password for it. var password string @@ -186,7 +194,11 @@ func (p *provisioner) ensureOperators(apps []string) error { appPasswords = append(appPasswords, apicaasprovisioner.ApplicationPassword{Name: app, Password: password}) } - config, err := p.makeOperatorConfig(app, password) + var prevCfg caas.OperatorConfig + if op != nil && op.Config != nil { + prevCfg = *op.Config + } + config, err := p.updateOperatorConfig(app, password, prevCfg) if err != nil { return errors.Annotatef(err, "failed to generate operator config for %q", app) } @@ -228,8 +240,7 @@ func (p *provisioner) ensureOperator(app string, config *caas.OperatorConfig) er return nil } -func (p *provisioner) makeOperatorConfig(appName, password string) (*caas.OperatorConfig, error) { - appTag := names.NewApplicationTag(appName) +func (p *provisioner) updateOperatorConfig(appName, password string, prevCfg caas.OperatorConfig) (*caas.OperatorConfig, error) { info, err := p.provisionerFacade.OperatorProvisioningInfo() if err != nil { return nil, errors.Trace(err) @@ -252,11 +263,28 @@ func (p *provisioner) makeOperatorConfig(appName, password string) (*caas.Operat ResourceTags: info.Tags, CharmStorage: charmStorageParams(info.CharmStorage), } - // If no password required, we leave the agent conf empty. - if password == "" { - return cfg, nil + + cfg.AgentConf, err = p.updateAgentConf(appName, password, info, prevCfg.AgentConf) + if err != nil { + return nil, errors.Trace(err) + } + + cfg.OperatorInfo, err = p.updateOperatorInfo(appName, prevCfg.OperatorInfo) + if err != nil { + return nil, errors.Trace(err) } + return cfg, nil +} + +func (p *provisioner) updateAgentConf(appName, password string, + info caasoperatorprovisioner.OperatorProvisioningInfo, + prevAgentConfData []byte) ([]byte, error) { + if prevAgentConfData != nil && password == "" { + return prevAgentConfData, nil + } + + appTag := names.NewApplicationTag(appName) conf, err := agent.NewAgentConfig( agent.AgentConfigParams{ Paths: agent.Paths{ @@ -270,8 +298,8 @@ func (p *provisioner) makeOperatorConfig(appName, password string) (*caas.Operat CACert: p.agentConfig.CACert(), Password: password, - // UpgradedToVersion is mandatory but not used by caas operator agents as they - // are not upgraded insitu. + // UpgradedToVersion is mandatory but not used by + // caas operator agents as they are not upgraded insitu. UpgradedToVersion: info.Version, }, ) @@ -279,12 +307,32 @@ func (p *provisioner) makeOperatorConfig(appName, password string) (*caas.Operat return nil, errors.Trace(err) } - confBytes, err := conf.Render() - if err != nil { - return nil, errors.Trace(err) + return conf.Render() +} + +func (p *provisioner) updateOperatorInfo(appName string, prevOperatorInfoData []byte) ([]byte, error) { + var operatorInfo caas.OperatorInfo + if prevOperatorInfoData != nil { + prevOperatorInfo, err := caas.UnmarshalOperatorInfo(prevOperatorInfoData) + if err != nil { + return nil, errors.Trace(err) + } + operatorInfo = *prevOperatorInfo } - cfg.AgentConf = confBytes - return cfg, nil + + if operatorInfo.Cert == "" || + operatorInfo.PrivateKey == "" || + operatorInfo.CACert == "" { + cert, err := p.provisionerFacade.IssueOperatorCertificate(appName) + if err != nil { + return nil, errors.Trace(err) + } + operatorInfo.Cert = cert.Cert + operatorInfo.PrivateKey = cert.PrivateKey + operatorInfo.CACert = cert.CACert + } + + return operatorInfo.Marshal() } func charmStorageParams(in storage.KubernetesFilesystemParams) caas.CharmStorageParams { diff --git a/worker/caasoperatorprovisioner/worker_test.go b/worker/caasoperatorprovisioner/worker_test.go index 1ef4cb47e982..466ca1c8f343 100644 --- a/worker/caasoperatorprovisioner/worker_test.go +++ b/worker/caasoperatorprovisioner/worker_test.go @@ -4,9 +4,11 @@ package caasoperatorprovisioner_test import ( + "fmt" "io/ioutil" "path/filepath" "reflect" + "strconv" "time" "github.com/juju/clock/testclock" @@ -87,14 +89,14 @@ func (s *CAASProvisionerSuite) TestWorkerStarts(c *gc.C) { workertest.CleanKill(c, w) } -func (s *CAASProvisionerSuite) assertOperatorCreated(c *gc.C, exists, terminating bool) { +func (s *CAASProvisionerSuite) assertOperatorCreated(c *gc.C, exists, terminating, updateCerts bool) { s.caasClient.setTerminating(terminating) s.provisionerFacade.life = "alive" s.provisionerFacade.applicationsWatcher.changes <- []string{"myapp"} - expectedCalls := 2 + expectedCalls := 3 if terminating { - expectedCalls = 4 + expectedCalls = 5 } for a := coretesting.LongAttempt.Start(); a.Next(); { nrCalls := len(s.caasClient.Calls()) @@ -107,9 +109,9 @@ func (s *CAASProvisionerSuite) assertOperatorCreated(c *gc.C, exists, terminatin s.clock.Advance(4 * time.Second) } } - callNames := []string{"OperatorExists", "EnsureOperator"} + callNames := []string{"OperatorExists", "Operator", "EnsureOperator"} if terminating { - callNames = []string{"OperatorExists", "OperatorExists", "OperatorExists", "EnsureOperator"} + callNames = []string{"OperatorExists", "OperatorExists", "OperatorExists", "Operator", "EnsureOperator"} } s.caasClient.CheckCallNames(c, callNames...) c.Assert(s.caasClient.Calls(), gc.HasLen, expectedCalls) @@ -118,9 +120,9 @@ func (s *CAASProvisionerSuite) assertOperatorCreated(c *gc.C, exists, terminatin c.Assert(args, gc.HasLen, 1) c.Assert(args[0], gc.Equals, "myapp") - ensureIndex := 1 + ensureIndex := 2 if terminating { - ensureIndex = 3 + ensureIndex = 4 } args = s.caasClient.Calls()[ensureIndex].Args c.Assert(args, gc.HasLen, 3) @@ -138,19 +140,21 @@ func (s *CAASProvisionerSuite) assertOperatorCreated(c *gc.C, exists, terminatin Attributes: map[string]interface{}{"key": "value"}, }) - if !exists || terminating { - agentFile := filepath.Join(c.MkDir(), "agent.config") - err := ioutil.WriteFile(agentFile, config.AgentConf, 0644) - c.Assert(err, jc.ErrorIsNil) - cfg, err := agent.ReadConfig(agentFile) - c.Assert(err, jc.ErrorIsNil) - c.Assert(cfg.CACert(), gc.Equals, coretesting.CACert) - addr, err := cfg.APIAddresses() - c.Assert(err, jc.ErrorIsNil) - c.Assert(addr, jc.DeepEquals, []string{"10.0.0.1:17070", "192.18.1.1:17070"}) - } else { - c.Assert(config.AgentConf, gc.IsNil) - } + agentFile := filepath.Join(c.MkDir(), "agent.config") + err := ioutil.WriteFile(agentFile, config.AgentConf, 0644) + c.Assert(err, jc.ErrorIsNil) + cfg, err := agent.ReadConfig(agentFile) + c.Assert(err, jc.ErrorIsNil) + c.Assert(cfg.CACert(), gc.Equals, coretesting.CACert) + addr, err := cfg.APIAddresses() + c.Assert(err, jc.ErrorIsNil) + c.Assert(addr, jc.DeepEquals, []string{"10.0.0.1:17070", "192.18.1.1:17070"}) + + operatorInfo, err := caas.UnmarshalOperatorInfo(config.OperatorInfo) + c.Assert(err, jc.ErrorIsNil) + c.Assert(operatorInfo.CACert, gc.Equals, coretesting.CACert) + c.Assert(operatorInfo.Cert, gc.Equals, coretesting.ServerCert) + c.Assert(operatorInfo.PrivateKey, gc.Equals, coretesting.ServerKey) for a := coretesting.LongAttempt.Start(); a.Next(); { if len(s.provisionerFacade.stub.Calls()) > 0 { @@ -159,14 +163,18 @@ func (s *CAASProvisionerSuite) assertOperatorCreated(c *gc.C, exists, terminatin } if exists && !terminating { - s.provisionerFacade.stub.CheckCallNames(c, "Life", "OperatorProvisioningInfo") + callNames := []string{"Life", "OperatorProvisioningInfo"} + if updateCerts { + callNames = append(callNames, "IssueOperatorCertificate") + } + s.provisionerFacade.stub.CheckCallNames(c, callNames...) c.Assert(s.provisionerFacade.stub.Calls()[0].Args[0], gc.Equals, "myapp") return } - s.provisionerFacade.stub.CheckCallNames(c, "Life", "OperatorProvisioningInfo", "SetPasswords") + s.provisionerFacade.stub.CheckCallNames(c, "Life", "OperatorProvisioningInfo", "IssueOperatorCertificate", "SetPasswords") c.Assert(s.provisionerFacade.stub.Calls()[0].Args[0], gc.Equals, "myapp") - passwords := s.provisionerFacade.stub.Calls()[2].Args[0].([]apicaasprovisioner.ApplicationPassword) + passwords := s.provisionerFacade.stub.Calls()[3].Args[0].([]apicaasprovisioner.ApplicationPassword) c.Assert(passwords, gc.HasLen, 1) c.Assert(passwords[0].Name, gc.Equals, "myapp") @@ -177,15 +185,71 @@ func (s *CAASProvisionerSuite) TestNewApplicationCreatesNewOperator(c *gc.C) { w := s.assertWorker(c) defer workertest.CleanKill(c, w) - s.assertOperatorCreated(c, false, false) + s.assertOperatorCreated(c, false, false, false) } func (s *CAASProvisionerSuite) TestNewApplicationUpdatesOperator(c *gc.C) { s.caasClient.operatorExists = true + s.caasClient.config = &caas.OperatorConfig{ + OperatorImagePath: "juju-operator-image", + Version: version.MustParse("2.99.0"), + AgentConf: []byte(fmt.Sprintf(` +# format 2.0 +tag: application-myapp +upgradedToVersion: 2.99.0 +controller: controller-deadbeef-1bad-500d-9000-4b1d0d06f00d +model: model-deadbeef-0bad-400d-8000-4b1d0d06f00d +oldpassword: wow +cacert: %s +apiaddresses: +- 10.0.0.1:17070 +- 192.18.1.1:17070 +oldpassword: dxKwhgZPrNzXVTrZSxY1VLHA +values: {} +mongoversion: "0.0" +`[1:], strconv.Quote(coretesting.CACert))), + OperatorInfo: []byte( + fmt.Sprintf( + "private-key: %s\ncert: %s\nca-cert: %s\n", + strconv.Quote(coretesting.ServerKey), + strconv.Quote(coretesting.ServerCert), + strconv.Quote(coretesting.CACert), + ), + ), + } + + w := s.assertWorker(c) + defer workertest.CleanKill(c, w) + + s.assertOperatorCreated(c, true, false, false) +} + +func (s *CAASProvisionerSuite) TestNewApplicationUpdatesOperatorAndIssueCerts(c *gc.C) { + s.caasClient.operatorExists = true + s.caasClient.config = &caas.OperatorConfig{ + OperatorImagePath: "juju-operator-image", + Version: version.MustParse("2.99.0"), + AgentConf: []byte(fmt.Sprintf(` +# format 2.0 +tag: application-myapp +upgradedToVersion: 2.99.0 +controller: controller-deadbeef-1bad-500d-9000-4b1d0d06f00d +model: model-deadbeef-0bad-400d-8000-4b1d0d06f00d +oldpassword: wow +cacert: %s +apiaddresses: +- 10.0.0.1:17070 +- 192.18.1.1:17070 +oldpassword: dxKwhgZPrNzXVTrZSxY1VLHA +values: {} +mongoversion: "0.0" +`[1:], strconv.Quote(coretesting.CACert))), + } + w := s.assertWorker(c) defer workertest.CleanKill(c, w) - s.assertOperatorCreated(c, true, false) + s.assertOperatorCreated(c, true, false, true) } func (s *CAASProvisionerSuite) TestNewApplicationWaitsOperatorTerminated(c *gc.C) { @@ -193,14 +257,14 @@ func (s *CAASProvisionerSuite) TestNewApplicationWaitsOperatorTerminated(c *gc.C w := s.assertWorker(c) defer workertest.CleanKill(c, w) - s.assertOperatorCreated(c, true, true) + s.assertOperatorCreated(c, true, true, false) } func (s *CAASProvisionerSuite) TestApplicationDeletedRemovesOperator(c *gc.C) { w := s.assertWorker(c) defer workertest.CleanKill(c, w) - s.assertOperatorCreated(c, false, false) + s.assertOperatorCreated(c, false, false, false) s.caasClient.ResetCalls() s.provisionerFacade.stub.SetErrors(errors.NotFoundf("myapp")) s.provisionerFacade.life = "dead" diff --git a/worker/meterstatus/context.go b/worker/meterstatus/context.go index 24ac53a29df6..044bc44a9bfb 100644 --- a/worker/meterstatus/context.go +++ b/worker/meterstatus/context.go @@ -6,10 +6,12 @@ package meterstatus import ( "fmt" "math/rand" + "path" "time" "github.com/juju/errors" + "github.com/juju/juju/caas" "github.com/juju/juju/core/model" "github.com/juju/juju/worker/uniter/runner/context" "github.com/juju/juju/worker/uniter/runner/jujuc" @@ -33,15 +35,20 @@ func NewLimitedContext(unitName string) *limitedContext { } // HookVars implements runner.Context. -func (ctx *limitedContext) HookVars(paths context.Paths) ([]string, error) { +func (ctx *limitedContext) HookVars(paths context.Paths, remote bool) ([]string, error) { vars := []string{ "CHARM_DIR=" + paths.GetCharmDir(), // legacy "JUJU_CHARM_DIR=" + paths.GetCharmDir(), "JUJU_CONTEXT_ID=" + ctx.id, - "JUJU_AGENT_SOCKET_ADDRESS=" + paths.GetJujucSocket().Address, - "JUJU_AGENT_SOCKET_NETWORK=" + paths.GetJujucSocket().Network, + "JUJU_AGENT_SOCKET_ADDRESS=" + paths.GetJujucClientSocket(remote).Address, + "JUJU_AGENT_SOCKET_NETWORK=" + paths.GetJujucClientSocket(remote).Network, "JUJU_UNIT_NAME=" + ctx.unitName, } + if remote { + vars = append(vars, + "JUJU_AGENT_CA_CERT="+path.Join(paths.GetBaseDir(), caas.CACertFile), + ) + } for key, val := range ctx.env { vars = append(vars, fmt.Sprintf("%s=%s", key, val)) } diff --git a/worker/meterstatus/context_test.go b/worker/meterstatus/context_test.go index 1bf5090a62aa..d6e459f35d03 100644 --- a/worker/meterstatus/context_test.go +++ b/worker/meterstatus/context_test.go @@ -22,7 +22,11 @@ type dummyPaths struct{} func (*dummyPaths) GetToolsDir() string { return "/dummy/tools" } func (*dummyPaths) GetCharmDir() string { return "/dummy/charm" } -func (*dummyPaths) GetJujucSocket() sockets.Socket { +func (*dummyPaths) GetBaseDir() string { return "/dummy/" } +func (*dummyPaths) GetJujucServerSocket(remote bool) sockets.Socket { + return sockets.Socket{Network: "unix", Address: "/dummy/jujuc.sock"} +} +func (*dummyPaths) GetJujucClientSocket(remote bool) sockets.Socket { return sockets.Socket{Network: "unix", Address: "/dummy/jujuc.sock"} } func (*dummyPaths) GetMetricsSpoolDir() string { return "/dummy/spool" } @@ -31,7 +35,7 @@ func (*dummyPaths) ComponentDir(name string) string { return "/dummy/" + name } func (s *ContextSuite) TestHookContextEnv(c *gc.C) { ctx := meterstatus.NewLimitedContext("u/0") paths := &dummyPaths{} - vars, err := ctx.HookVars(paths) + vars, err := ctx.HookVars(paths, false) c.Assert(err, jc.ErrorIsNil) varMap, err := keyvalues.Parse(vars, true) c.Assert(err, jc.ErrorIsNil) @@ -54,7 +58,7 @@ func (s *ContextSuite) TestHookContextSetEnv(c *gc.C) { } ctx.SetEnvVars(setVars) paths := &dummyPaths{} - vars, err := ctx.HookVars(paths) + vars, err := ctx.HookVars(paths, false) c.Assert(err, jc.ErrorIsNil) varMap, err := keyvalues.Parse(vars, true) c.Assert(err, jc.ErrorIsNil) diff --git a/worker/meterstatus/runner.go b/worker/meterstatus/runner.go index a1f4a18a838e..d1d78c6c51e6 100644 --- a/worker/meterstatus/runner.go +++ b/worker/meterstatus/runner.go @@ -11,7 +11,6 @@ import ( "github.com/juju/juju/agent" "github.com/juju/juju/core/machinelock" - "github.com/juju/juju/core/model" "github.com/juju/juju/worker/uniter" "github.com/juju/juju/worker/uniter/runner" ) @@ -60,7 +59,7 @@ func (w *hookRunner) RunHook(code, info string, interrupt <-chan struct{}) (runE "JUJU_METER_STATUS": code, "JUJU_METER_INFO": info, }) - paths := uniter.NewPaths(w.config.DataDir(), unitTag, ctx.ModelType() == model.CAAS) + paths := uniter.NewPaths(w.config.DataDir(), unitTag, nil) r := runner.NewRunner(ctx, paths, nil) releaser, err := w.acquireExecutionLock(string(hooks.MeterStatusChanged), interrupt) if err != nil { diff --git a/worker/metrics/collect/context.go b/worker/metrics/collect/context.go index b9a9b5aad37b..0a99529b4fe6 100644 --- a/worker/metrics/collect/context.go +++ b/worker/metrics/collect/context.go @@ -6,10 +6,12 @@ package collect import ( "fmt" "math/rand" + "path" "time" "github.com/juju/errors" + "github.com/juju/juju/caas" "github.com/juju/juju/core/model" "github.com/juju/juju/worker/metrics/spool" "github.com/juju/juju/worker/uniter/runner/context" @@ -31,15 +33,20 @@ func newHookContext(unitName string, recorder spool.MetricRecorder) *hookContext } // HookVars implements runner.Context. -func (ctx *hookContext) HookVars(paths context.Paths) ([]string, error) { +func (ctx *hookContext) HookVars(paths context.Paths, remote bool) ([]string, error) { vars := []string{ "CHARM_DIR=" + paths.GetCharmDir(), // legacy "JUJU_CHARM_DIR=" + paths.GetCharmDir(), "JUJU_CONTEXT_ID=" + ctx.id, - "JUJU_AGENT_SOCKET_ADDRESS=" + paths.GetJujucSocket().Address, - "JUJU_AGENT_SOCKET_NETWORK=" + paths.GetJujucSocket().Network, + "JUJU_AGENT_SOCKET_ADDRESS=" + paths.GetJujucClientSocket(remote).Address, + "JUJU_AGENT_SOCKET_NETWORK=" + paths.GetJujucClientSocket(remote).Network, "JUJU_UNIT_NAME=" + ctx.unitName, } + if remote { + vars = append(vars, + "JUJU_AGENT_CA_CERT="+path.Join(paths.GetBaseDir(), caas.CACertFile), + ) + } return append(vars, context.OSDependentEnvVars(paths)...), nil } diff --git a/worker/metrics/collect/context_test.go b/worker/metrics/collect/context_test.go index 0471139dae57..6c59ba6b525f 100644 --- a/worker/metrics/collect/context_test.go +++ b/worker/metrics/collect/context_test.go @@ -52,7 +52,11 @@ type dummyPaths struct{} func (*dummyPaths) GetToolsDir() string { return "/dummy/tools" } func (*dummyPaths) GetCharmDir() string { return "/dummy/charm" } -func (*dummyPaths) GetJujucSocket() sockets.Socket { +func (*dummyPaths) GetBaseDir() string { return "/dummy/" } +func (*dummyPaths) GetJujucServerSocket(remote bool) sockets.Socket { + return sockets.Socket{Network: "unix", Address: "/dummy/jujuc.sock"} +} +func (*dummyPaths) GetJujucClientSocket(remote bool) sockets.Socket { return sockets.Socket{Network: "unix", Address: "/dummy/jujuc.sock"} } func (*dummyPaths) GetMetricsSpoolDir() string { return "/dummy/spool" } @@ -61,7 +65,7 @@ func (*dummyPaths) ComponentDir(name string) string { return "/dummy/" + name } func (s *ContextSuite) TestHookContextEnv(c *gc.C) { ctx := collect.NewHookContext("u/0", s.recorder) paths := &dummyPaths{} - vars, err := ctx.HookVars(paths) + vars, err := ctx.HookVars(paths, false) c.Assert(err, jc.ErrorIsNil) varMap, err := keyvalues.Parse(vars, true) c.Assert(err, jc.ErrorIsNil) diff --git a/worker/metrics/collect/handler.go b/worker/metrics/collect/handler.go index 51c45995cb5b..44d2c65a46bc 100644 --- a/worker/metrics/collect/handler.go +++ b/worker/metrics/collect/handler.go @@ -57,7 +57,7 @@ func (l *handler) Handle(c net.Conn, abort <-chan struct{}) error { } func (l *handler) do(c net.Conn) error { - paths := uniter.NewWorkerPaths(l.config.agent.CurrentConfig().DataDir(), l.config.unitTag, "metrics-collect", false) + paths := uniter.NewWorkerPaths(l.config.agent.CurrentConfig().DataDir(), l.config.unitTag, "metrics-collect", nil) charmURL, validMetrics, err := readCharm(l.config.unitTag, paths) if err != nil { return errors.Trace(err) diff --git a/worker/metrics/collect/manifold.go b/worker/metrics/collect/manifold.go index 7b35fcf3c59c..3c421d026443 100644 --- a/worker/metrics/collect/manifold.go +++ b/worker/metrics/collect/manifold.go @@ -153,7 +153,7 @@ func newCollect(config ManifoldConfig, context dependency.Context) (*collect, er if !ok { return nil, errors.Errorf("expected a unit tag, got %v", tag) } - paths := uniter.NewWorkerPaths(agentConfig.DataDir(), unitTag, "metrics-collect", false) + paths := uniter.NewWorkerPaths(agentConfig.DataDir(), unitTag, "metrics-collect", nil) runner := &hookRunner{ unitTag: unitTag.String(), paths: paths, @@ -222,7 +222,7 @@ func (w *collect) Do(stop <-chan struct{}) (err error) { if !ok { return errors.Errorf("expected a unit tag, got %v", tag) } - paths := uniter.NewWorkerPaths(config.DataDir(), unitTag, "metrics-collect", false) + paths := uniter.NewWorkerPaths(config.DataDir(), unitTag, "metrics-collect", nil) recorder, err := newRecorder(unitTag, paths, w.metricFactory) if errors.Cause(err) == errMetricsNotDefined { diff --git a/worker/metrics/sender/manifold.go b/worker/metrics/sender/manifold.go index b3d93253f678..1519129758ef 100644 --- a/worker/metrics/sender/manifold.go +++ b/worker/metrics/sender/manifold.go @@ -64,7 +64,7 @@ func Manifold(config ManifoldConfig) dependency.Manifold { if !ok { return nil, errors.Errorf("expected a unit tag, got %v", tag) } - paths := uniter.NewWorkerPaths(agentConfig.DataDir(), unitTag, "metrics-send", false) + paths := uniter.NewWorkerPaths(agentConfig.DataDir(), unitTag, "metrics-send", nil) client := newMetricAdderClient(apicaller) diff --git a/worker/uniter/paths.go b/worker/uniter/paths.go index a212b33b57a3..88dcd652f431 100644 --- a/worker/uniter/paths.go +++ b/worker/uniter/paths.go @@ -5,15 +5,12 @@ package uniter import ( + "crypto/tls" "fmt" - "io/ioutil" - "os" "path/filepath" - "github.com/juju/errors" jujuos "github.com/juju/os" "gopkg.in/juju/names.v3" - "gopkg.in/yaml.v2" "github.com/juju/juju/agent" "github.com/juju/juju/agent/tools" @@ -45,14 +42,30 @@ func (paths Paths) GetToolsDir() string { return paths.ToolsDir } +// GetBaseDir exists to satisfy the context.Paths interface. +func (paths Paths) GetBaseDir() string { + return paths.State.BaseDir +} + // GetCharmDir exists to satisfy the context.Paths interface. func (paths Paths) GetCharmDir() string { return paths.State.CharmDir } -// GetJujucSocket exists to satisfy the context.Paths interface. -func (paths Paths) GetJujucSocket() sockets.Socket { - return paths.Runtime.JujucServerSocket +// GetJujucClientSocket exists to satisfy the context.Paths interface. +func (paths Paths) GetJujucClientSocket(remote bool) sockets.Socket { + if remote { + return paths.Runtime.RemoteJujucServerSocket.Client + } + return paths.Runtime.LocalJujucServerSocket.Client +} + +// GetJujucServerSocket exists to satisfy the context.Paths interface. +func (paths Paths) GetJujucServerSocket(remote bool) sockets.Socket { + if remote { + return paths.Runtime.RemoteJujucServerSocket.Server + } + return paths.Runtime.LocalJujucServerSocket.Server } // GetMetricsSpoolDir exists to satisfy the runner.Paths interface. @@ -68,22 +81,33 @@ func (paths Paths) ComponentDir(name string) string { const jujucServerSocketPort = 30000 +// SocketPair is a server+client pair of socket descriptors. +type SocketPair struct { + Server sockets.Socket + Client sockets.Socket +} + // RuntimePaths represents the set of paths that are relevant at runtime. type RuntimePaths struct { - // JujuRunSocket listens for juju-run invocations, and is always // active. - JujuRunSocket sockets.Socket + LocalJujuRunSocket SocketPair + + // RemoteJujuRunSocket listens for remote juju-run invocations. + RemoteJujuRunSocket SocketPair // JujucServerSocket listens for jujuc invocations, and is only // active when supporting a jujuc execution context. - JujucServerSocket sockets.Socket + LocalJujucServerSocket SocketPair + + // RemoteJujucServerSocket listens for remote jujuc invocations, and is only + // active when supporting a jujuc execution context. + RemoteJujucServerSocket SocketPair } // StatePaths represents the set of paths that hold persistent local state for // the uniter. type StatePaths struct { - // BaseDir is the unit agent's base directory. BaseDir string @@ -114,92 +138,71 @@ type StatePaths struct { MetricsSpoolDir string } -// NewPaths returns the set of filesystem paths that the supplied unit should -// use, given the supplied root juju data directory path. -func NewPaths(dataDir string, unitTag names.UnitTag, isRemote bool) Paths { - return NewWorkerPaths(dataDir, unitTag, "", isRemote) +// SocketConfig specifies information for remote sockets. +type SocketConfig struct { + ServiceAddress string + OperatorAddress string + TLSConfig *tls.Config } -// TODO(caas) - move me to generic helper for reading operator config yaml -func socketIP(baseDir string) (string, error) { - // IP address to use for the socket can either be an env var - // (when we are the caas operator and are creating the socket to listen), - // or inside a YAML file (when we are juju-run and need to see where - // to connect to). - podIP := os.Getenv(provider.OperatorPodIPEnvName) - if podIP != "" { - return podIP, nil - } - ipAddrFile := filepath.Join(baseDir, provider.OperatorInfoFile) - ipAddrData, err := ioutil.ReadFile(ipAddrFile) - if err != nil { - return "", errors.Trace(err) - } - var socketIP string - var data map[string]interface{} - if err := yaml.Unmarshal(ipAddrData, &data); err == nil { - socketIP, _ = data["operator-address"].(string) - } - return socketIP, nil +// NewPaths returns the set of filesystem paths that the supplied unit should +// use, given the supplied root juju data directory path. +// If socketConfig is specified, all sockets will be TLS over TCP. +func NewPaths(dataDir string, unitTag names.UnitTag, socketConfig *SocketConfig) Paths { + return NewWorkerPaths(dataDir, unitTag, "", socketConfig) } // NewWorkerPaths returns the set of filesystem paths that the supplied unit worker should // use, given the supplied root juju data directory path and worker identifier. // Distinct worker identifiers ensure that runtime paths of different worker do not interfere. -func NewWorkerPaths(dataDir string, unitTag names.UnitTag, worker string, isRemote bool) Paths { +// If socketConfig is specified, all sockets will be TLS over TCP. +func NewWorkerPaths(dataDir string, unitTag names.UnitTag, worker string, socketConfig *SocketConfig) Paths { baseDir := agent.Dir(dataDir, unitTag) join := filepath.Join stateDir := join(baseDir, "state") - newSocket := func(name string, abstract bool) sockets.Socket { - if isRemote { - socketIP, err := socketIP(baseDir) - if err != nil { - logger.Warningf("unable to get IP address for jujuc socket: %v", err) - return sockets.Socket{} - } - logger.Debugf("using operator address: %v", socketIP) + var newSocket func(name string) SocketPair + if socketConfig != nil { + newSocket = func(name string) SocketPair { + var port int + var address string switch name { case "agent": - return sockets.Socket{ - Network: "tcp", - Address: fmt.Sprintf("%s:%d", socketIP, jujucServerSocketPort+unitTag.Number()), - } + port = jujucServerSocketPort + unitTag.Number() + address = socketConfig.OperatorAddress case "run": - return sockets.Socket{ - Network: "tcp", - Address: fmt.Sprintf("%s:%d", socketIP, provider.JujuRunServerSocketPort), - } + port = provider.JujuRunServerSocketPort + address = socketConfig.ServiceAddress default: - logger.Warningf("caas model socket name %q, fallback to unix protocol", name) + return SocketPair{} } - } - socket := sockets.Socket{Network: "unix"} - if jujuos.HostOS() == jujuos.Windows { - base := fmt.Sprintf("%s", unitTag) - if worker != "" { - base = fmt.Sprintf("%s-%s", unitTag, worker) + return SocketPair{ + Client: sockets.Socket{ + Network: "tcp", + Address: fmt.Sprintf("%s:%d", address, port), + TLSConfig: socketConfig.TLSConfig, + }, + Server: sockets.Socket{ + Network: "tcp", + Address: fmt.Sprintf(":%d", port), + TLSConfig: socketConfig.TLSConfig, + }, } - socket.Address = fmt.Sprintf(`\\.\pipe\%s-%s`, base, name) - return socket - } - path := join(baseDir, name+".socket") - if worker != "" { - path = join(baseDir, fmt.Sprintf("%s-%s.socket", worker, name)) } - if abstract { - path = "@" + path + } else { + newSocket = func(name string) SocketPair { + return SocketPair{} } - socket.Address = path - return socket } toolsDir := tools.ToolsDir(dataDir, unitTag.String()) return Paths{ ToolsDir: filepath.FromSlash(toolsDir), Runtime: RuntimePaths{ - JujuRunSocket: newSocket("run", false), - JujucServerSocket: newSocket("agent", true), + RemoteJujuRunSocket: newSocket("run"), + RemoteJujucServerSocket: newSocket("agent"), + LocalJujuRunSocket: newUnixSocket(baseDir, unitTag, worker, "run", false), + LocalJujucServerSocket: newUnixSocket(baseDir, unitTag, worker, "agent", true), }, State: StatePaths{ BaseDir: baseDir, @@ -213,3 +216,24 @@ func NewWorkerPaths(dataDir string, unitTag names.UnitTag, worker string, isRemo }, } } + +func newUnixSocket(baseDir string, unitTag names.UnitTag, worker string, name string, abstract bool) SocketPair { + socket := sockets.Socket{Network: "unix"} + if jujuos.HostOS() == jujuos.Windows { + base := fmt.Sprintf("%s", unitTag) + if worker != "" { + base = fmt.Sprintf("%s-%s", unitTag, worker) + } + socket.Address = fmt.Sprintf(`\\.\pipe\%s-%s`, base, name) + return SocketPair{socket, socket} + } + path := filepath.Join(baseDir, name+".socket") + if worker != "" { + path = filepath.Join(baseDir, fmt.Sprintf("%s-%s.socket", worker, name)) + } + if abstract { + path = "@" + path + } + socket.Address = path + return SocketPair{socket, socket} +} diff --git a/worker/uniter/paths_test.go b/worker/uniter/paths_test.go index 9a488d41e9e0..6776664d88ea 100644 --- a/worker/uniter/paths_test.go +++ b/worker/uniter/paths_test.go @@ -4,8 +4,7 @@ package uniter_test import ( - "io/ioutil" - "os" + "crypto/tls" "path/filepath" jujuos "github.com/juju/os" @@ -14,7 +13,6 @@ import ( gc "gopkg.in/check.v1" "gopkg.in/juju/names.v3" - "github.com/juju/juju/caas/kubernetes/provider" "github.com/juju/juju/juju/sockets" "github.com/juju/juju/worker/uniter" ) @@ -37,15 +35,17 @@ func (s *PathsSuite) TestWindows(c *gc.C) { dataDir := c.MkDir() unitTag := names.NewUnitTag("some-application/323") - paths := uniter.NewPaths(dataDir, unitTag, false) + paths := uniter.NewPaths(dataDir, unitTag, nil) relData := relPathFunc(dataDir) relAgent := relPathFunc(relData("agents", "unit-some-application-323")) + localRunSocket := sockets.Socket{Network: "unix", Address: `\\.\pipe\unit-some-application-323-run`} + localJujucSocket := sockets.Socket{Network: "unix", Address: `\\.\pipe\unit-some-application-323-agent`} c.Assert(paths, jc.DeepEquals, uniter.Paths{ ToolsDir: relData("tools/unit-some-application-323"), Runtime: uniter.RuntimePaths{ - JujuRunSocket: sockets.Socket{Network: "unix", Address: `\\.\pipe\unit-some-application-323-run`}, - JujucServerSocket: sockets.Socket{Network: "unix", Address: `\\.\pipe\unit-some-application-323-agent`}, + LocalJujuRunSocket: uniter.SocketPair{localRunSocket, localRunSocket}, + LocalJujucServerSocket: uniter.SocketPair{localJujucSocket, localJujucSocket}, }, State: uniter.StatePaths{ BaseDir: relAgent(), @@ -66,15 +66,18 @@ func (s *PathsSuite) TestWorkerPathsWindows(c *gc.C) { dataDir := c.MkDir() unitTag := names.NewUnitTag("some-application/323") worker := "some-worker" - paths := uniter.NewWorkerPaths(dataDir, unitTag, worker, false) + paths := uniter.NewWorkerPaths(dataDir, unitTag, worker, nil) relData := relPathFunc(dataDir) relAgent := relPathFunc(relData("agents", "unit-some-application-323")) + + localRunSocket := sockets.Socket{Network: "unix", Address: `\\.\pipe\unit-some-application-323-some-worker-run`} + localJujucSocket := sockets.Socket{Network: "unix", Address: `\\.\pipe\unit-some-application-323-some-worker-agent`} c.Assert(paths, jc.DeepEquals, uniter.Paths{ ToolsDir: relData("tools/unit-some-application-323"), Runtime: uniter.RuntimePaths{ - JujuRunSocket: sockets.Socket{Network: "unix", Address: `\\.\pipe\unit-some-application-323-some-worker-run`}, - JujucServerSocket: sockets.Socket{Network: "unix", Address: `\\.\pipe\unit-some-application-323-some-worker-agent`}, + LocalJujuRunSocket: uniter.SocketPair{localRunSocket, localRunSocket}, + LocalJujucServerSocket: uniter.SocketPair{localJujucSocket, localJujucSocket}, }, State: uniter.StatePaths{ BaseDir: relAgent(), @@ -94,15 +97,19 @@ func (s *PathsSuite) TestOther(c *gc.C) { dataDir := c.MkDir() unitTag := names.NewUnitTag("some-application/323") - paths := uniter.NewPaths(dataDir, unitTag, false) + + paths := uniter.NewPaths(dataDir, unitTag, nil) relData := relPathFunc(dataDir) relAgent := relPathFunc(relData("agents", "unit-some-application-323")) + + localRunSocket := sockets.Socket{Network: "unix", Address: relAgent("run.socket")} + localJujucSocket := sockets.Socket{Network: "unix", Address: "@" + relAgent("agent.socket")} c.Assert(paths, jc.DeepEquals, uniter.Paths{ ToolsDir: relData("tools/unit-some-application-323"), Runtime: uniter.RuntimePaths{ - JujuRunSocket: sockets.Socket{Network: "unix", Address: relAgent("run.socket")}, - JujucServerSocket: sockets.Socket{Network: "unix", Address: "@" + relAgent("agent.socket")}, + LocalJujuRunSocket: uniter.SocketPair{localRunSocket, localRunSocket}, + LocalJujucServerSocket: uniter.SocketPair{localJujucSocket, localJujucSocket}, }, State: uniter.StatePaths{ BaseDir: relAgent(), @@ -117,38 +124,35 @@ func (s *PathsSuite) TestOther(c *gc.C) { }) } -func (s *PathsSuite) TestTCPRemoteEnvVar(c *gc.C) { - defer os.Setenv(provider.OperatorPodIPEnvName, os.Getenv(provider.OperatorPodIPEnvName)) - os.Setenv(provider.OperatorPodIPEnvName, "1.1.1.1") - s.PatchValue(&jujuos.HostOS, func() jujuos.OSType { return jujuos.Unknown }) +func (s *PathsSuite) TestTCPRemote(c *gc.C) { + unitTag := names.NewUnitTag("some-application/323") - dataDir := c.MkDir() - s.assertTCPRemote(c, dataDir) -} + socketConfig := uniter.SocketConfig{ + ServiceAddress: "127.0.0.1", + OperatorAddress: "127.0.0.2", + TLSConfig: &tls.Config{ + ServerName: "test", + }, + } -func (s *PathsSuite) TestTCPRemoteYamlFile(c *gc.C) { dataDir := c.MkDir() - - unitTag := names.NewUnitTag("some-application/323") - ipAddrFile := filepath.Join(dataDir, "agents", unitTag.String(), "operator.yaml") - err := os.MkdirAll(filepath.Dir(ipAddrFile), 0700) - c.Assert(err, jc.ErrorIsNil) - err = ioutil.WriteFile(ipAddrFile, []byte("operator-address: 1.1.1.1"), 0644) - c.Assert(err, jc.ErrorIsNil) - s.assertTCPRemote(c, dataDir) -} - -func (s *PathsSuite) assertTCPRemote(c *gc.C, dataDir string) { - unitTag := names.NewUnitTag("some-application/323") - paths := uniter.NewPaths(dataDir, unitTag, true) + paths := uniter.NewPaths(dataDir, unitTag, &socketConfig) relData := relPathFunc(dataDir) relAgent := relPathFunc(relData("agents", "unit-some-application-323")) + localRunSocket := sockets.Socket{Network: "unix", Address: relAgent("run.socket")} + localJujucSocket := sockets.Socket{Network: "unix", Address: "@" + relAgent("agent.socket")} + remoteRunServerSocket := sockets.Socket{Network: "tcp", Address: ":30666", TLSConfig: socketConfig.TLSConfig} + remoteRunClientSocket := sockets.Socket{Network: "tcp", Address: "127.0.0.1:30666", TLSConfig: socketConfig.TLSConfig} + remoteJujucServerSocket := sockets.Socket{Network: "tcp", Address: ":30323", TLSConfig: socketConfig.TLSConfig} + remoteJujucClientSocket := sockets.Socket{Network: "tcp", Address: "127.0.0.2:30323", TLSConfig: socketConfig.TLSConfig} c.Assert(paths, jc.DeepEquals, uniter.Paths{ ToolsDir: relData("tools/unit-some-application-323"), Runtime: uniter.RuntimePaths{ - JujuRunSocket: sockets.Socket{Network: "tcp", Address: "1.1.1.1:30666"}, - JujucServerSocket: sockets.Socket{Network: "tcp", Address: "1.1.1.1:30323"}, + LocalJujuRunSocket: uniter.SocketPair{localRunSocket, localRunSocket}, + LocalJujucServerSocket: uniter.SocketPair{localJujucSocket, localJujucSocket}, + RemoteJujuRunSocket: uniter.SocketPair{remoteRunServerSocket, remoteRunClientSocket}, + RemoteJujucServerSocket: uniter.SocketPair{remoteJujucServerSocket, remoteJujucClientSocket}, }, State: uniter.StatePaths{ BaseDir: relAgent(), @@ -169,15 +173,17 @@ func (s *PathsSuite) TestWorkerPaths(c *gc.C) { dataDir := c.MkDir() unitTag := names.NewUnitTag("some-application/323") worker := "worker-id" - paths := uniter.NewWorkerPaths(dataDir, unitTag, worker, false) + paths := uniter.NewWorkerPaths(dataDir, unitTag, worker, nil) relData := relPathFunc(dataDir) relAgent := relPathFunc(relData("agents", "unit-some-application-323")) + localRunSocket := sockets.Socket{Network: "unix", Address: relAgent(worker + "-run.socket")} + localJujucSocket := sockets.Socket{Network: "unix", Address: "@" + relAgent(worker+"-agent.socket")} c.Assert(paths, jc.DeepEquals, uniter.Paths{ ToolsDir: relData("tools/unit-some-application-323"), Runtime: uniter.RuntimePaths{ - JujuRunSocket: sockets.Socket{Network: "unix", Address: relAgent(worker + "-run.socket")}, - JujucServerSocket: sockets.Socket{Network: "unix", Address: "@" + relAgent(worker+"-agent.socket")}, + LocalJujuRunSocket: uniter.SocketPair{localRunSocket, localRunSocket}, + LocalJujucServerSocket: uniter.SocketPair{localJujucSocket, localJujucSocket}, }, State: uniter.StatePaths{ BaseDir: relAgent(), @@ -196,7 +202,7 @@ func (s *PathsSuite) TestContextInterface(c *gc.C) { paths := uniter.Paths{ ToolsDir: "/path/to/tools", Runtime: uniter.RuntimePaths{ - JujucServerSocket: sockets.Socket{Network: "unix", Address: "/path/to/socket"}, + LocalJujucServerSocket: uniter.SocketPair{Server: sockets.Socket{Network: "unix", Address: "/path/to/socket"}}, }, State: uniter.StatePaths{ CharmDir: "/path/to/charm", @@ -205,6 +211,6 @@ func (s *PathsSuite) TestContextInterface(c *gc.C) { } c.Assert(paths.GetToolsDir(), gc.Equals, "/path/to/tools") c.Assert(paths.GetCharmDir(), gc.Equals, "/path/to/charm") - c.Assert(paths.GetJujucSocket(), gc.DeepEquals, sockets.Socket{Address: "/path/to/socket", Network: "unix"}) + c.Assert(paths.GetJujucServerSocket(false), gc.DeepEquals, sockets.Socket{Address: "/path/to/socket", Network: "unix"}) c.Assert(paths.GetMetricsSpoolDir(), gc.Equals, "/path/to/spool/metrics") } diff --git a/worker/uniter/runlistener.go b/worker/uniter/runlistener.go index 40a0d897e223..d6fd1d86cfa8 100644 --- a/worker/uniter/runlistener.go +++ b/worker/uniter/runlistener.go @@ -7,15 +7,22 @@ package uniter import ( + "io/ioutil" "net" "net/rpc" + "path/filepath" "sync" "github.com/juju/errors" "github.com/juju/utils/exec" + "gopkg.in/juju/names.v3" "gopkg.in/juju/worker.v1" "gopkg.in/tomb.v2" + "gopkg.in/yaml.v2" + "github.com/juju/juju/agent" + "github.com/juju/juju/caas" + cmdutil "github.com/juju/juju/cmd/jujud/util" "github.com/juju/juju/juju/sockets" "github.com/juju/juju/worker/uniter/operation" "github.com/juju/juju/worker/uniter/runcommands" @@ -37,6 +44,8 @@ type RunCommandsArgs struct { ForceRemoteUnit bool // UnitName is the unit for which the command is being run. UnitName string + // Token is the unit token when run under CAAS environments for auth. + Token string } // A CommandRunner is something that will actually execute the commands and @@ -62,6 +71,8 @@ type RunListener struct { closed chan struct{} closing chan struct{} wg sync.WaitGroup + + requiresAuth bool } // NewRunListener returns a new RunListener that is listening on given @@ -80,6 +91,9 @@ func NewRunListener(socket sockets.Socket) (*RunListener, error) { closed: make(chan struct{}), closing: make(chan struct{}), } + if socket.Network == "tcp" || socket.TLSConfig != nil { + runListener.requiresAuth = true + } if err := runListener.server.Register(&JujuRunServer{runListener}); err != nil { return nil, errors.Trace(err) } @@ -154,6 +168,25 @@ func (r *RunListener) RunCommands(args RunCommandsArgs) (results *exec.ExecRespo if !ok { return nil, errors.Errorf("no runner is registered for unit %v", args.UnitName) } + + if r.requiresAuth { + // TODO: Cache unit password + baseDir := agent.Dir(cmdutil.DataDir, names.NewUnitTag(args.UnitName)) + infoFilePath := filepath.Join(baseDir, caas.OperatorClientInfoCacheFile) + d, err := ioutil.ReadFile(infoFilePath) + if err != nil { + return nil, errors.Annotatef(err, "reading %s", infoFilePath) + } + op := caas.OperatorClientInfo{} + err = yaml.Unmarshal(d, &op) + if err != nil { + return nil, errors.Trace(err) + } + if args.Token != op.Token { + return nil, errors.Forbiddenf("unit token mismatch") + } + } + return runner.RunCommands(args) } diff --git a/worker/uniter/runner/context/context.go b/worker/uniter/runner/context/context.go index bcffccb6d03f..3be141ffcce1 100644 --- a/worker/uniter/runner/context/context.go +++ b/worker/uniter/runner/context/context.go @@ -7,6 +7,7 @@ package context import ( "fmt" + "path" "strings" "sync" "time" @@ -20,6 +21,7 @@ import ( "github.com/juju/juju/api/base" "github.com/juju/juju/api/uniter" "github.com/juju/juju/apiserver/params" + "github.com/juju/juju/caas" k8sspecs "github.com/juju/juju/caas/kubernetes/provider/specs" "github.com/juju/juju/core/application" "github.com/juju/juju/core/model" @@ -33,19 +35,27 @@ import ( // Paths exposes the paths needed by Context. type Paths interface { - // GetToolsDir returns the filesystem path to the dirctory containing // the hook tool symlinks. GetToolsDir() string + // GetCharmDir returns the filesystem path to the directory in which + // the charm is installed. + GetBaseDir() string + // GetCharmDir returns the filesystem path to the directory in which // the charm is installed. GetCharmDir() string - // GetJujucSocket returns the path to the socket used by the hook tools + // GetJujucServerSocket returns the path to the socket used by the hook tools + // to communicate back to the executing uniter process. It might be a + // filesystem path, or it might be abstract. + GetJujucServerSocket(remote bool) sockets.Socket + + // GetJujucClientSocket returns the path to the socket used by the hook tools // to communicate back to the executing uniter process. It might be a // filesystem path, or it might be abstract. - GetJujucSocket() sockets.Socket + GetJujucClientSocket(remote bool) sockets.Socket // GetMetricsSpoolDir returns the path to a metrics spool dir, used // to store metrics recorded during a single hook run. @@ -658,7 +668,7 @@ func (c *HookContext) ActionData() (*ActionData, error) { // HookVars returns an os.Environ-style list of strings necessary to run a hook // such that it can know what environment it's operating in, and can call back // into context. -func (context *HookContext) HookVars(paths Paths) ([]string, error) { +func (context *HookContext) HookVars(paths Paths, remote bool) ([]string, error) { vars := context.legacyProxySettings.AsEnvironmentValues() // TODO(thumper): as work on proxies progress, there will be additional // proxy settings to be added. @@ -666,8 +676,8 @@ func (context *HookContext) HookVars(paths Paths) ([]string, error) { "CHARM_DIR="+paths.GetCharmDir(), // legacy, embarrassing "JUJU_CHARM_DIR="+paths.GetCharmDir(), "JUJU_CONTEXT_ID="+context.id, - "JUJU_AGENT_SOCKET_ADDRESS="+paths.GetJujucSocket().Address, - "JUJU_AGENT_SOCKET_NETWORK="+paths.GetJujucSocket().Network, + "JUJU_AGENT_SOCKET_ADDRESS="+paths.GetJujucClientSocket(remote).Address, + "JUJU_AGENT_SOCKET_NETWORK="+paths.GetJujucClientSocket(remote).Network, "JUJU_UNIT_NAME="+context.unitName, "JUJU_MODEL_UUID="+context.uuid, "JUJU_MODEL_NAME="+context.modelName, @@ -685,6 +695,11 @@ func (context *HookContext) HookVars(paths Paths) ([]string, error) { "JUJU_CHARM_FTP_PROXY="+context.jujuProxySettings.Ftp, "JUJU_CHARM_NO_PROXY="+context.jujuProxySettings.NoProxy, ) + if remote { + vars = append(vars, + "JUJU_AGENT_CA_CERT="+path.Join(paths.GetBaseDir(), caas.CACertFile), + ) + } if context.meterStatus != nil { vars = append(vars, "JUJU_METER_STATUS="+context.meterStatus.code, diff --git a/worker/uniter/runner/context/env_test.go b/worker/uniter/runner/context/env_test.go index d2d7e54e5456..d6f3bdcf4735 100644 --- a/worker/uniter/runner/context/env_test.go +++ b/worker/uniter/runner/context/env_test.go @@ -158,12 +158,12 @@ func (s *EnvSuite) TestEnvWindows(c *gc.C) { ctx, contextVars := s.getContext(false) paths, pathsVars := s.getPaths() - actualVars, err := ctx.HookVars(paths) + actualVars, err := ctx.HookVars(paths, false) c.Assert(err, jc.ErrorIsNil) s.assertVars(c, actualVars, contextVars, pathsVars, windowsVars) relationVars := s.setRelation(ctx) - actualVars, err = ctx.HookVars(paths) + actualVars, err = ctx.HookVars(paths, false) c.Assert(err, jc.ErrorIsNil) s.assertVars(c, actualVars, contextVars, pathsVars, windowsVars, relationVars) } @@ -181,12 +181,12 @@ func (s *EnvSuite) TestEnvUbuntu(c *gc.C) { ctx, contextVars := s.getContext(false) paths, pathsVars := s.getPaths() - actualVars, err := ctx.HookVars(paths) + actualVars, err := ctx.HookVars(paths, false) c.Assert(err, jc.ErrorIsNil) s.assertVars(c, actualVars, contextVars, pathsVars, ubuntuVars) relationVars := s.setRelation(ctx) - actualVars, err = ctx.HookVars(paths) + actualVars, err = ctx.HookVars(paths, false) c.Assert(err, jc.ErrorIsNil) s.assertVars(c, actualVars, contextVars, pathsVars, ubuntuVars, relationVars) } diff --git a/worker/uniter/runner/context/util_test.go b/worker/uniter/runner/context/util_test.go index 385d73974220..20f23c39509c 100644 --- a/worker/uniter/runner/context/util_test.go +++ b/worker/uniter/runner/context/util_test.go @@ -364,7 +364,21 @@ func (MockEnvPaths) GetCharmDir() string { return "path-to-charm" } -func (MockEnvPaths) GetJujucSocket() sockets.Socket { +func (MockEnvPaths) GetBaseDir() string { + return "path-to-base" +} + +func (MockEnvPaths) GetJujucClientSocket(remote bool) sockets.Socket { + if remote { + return sockets.Socket{Network: "tcp", Address: "127.0.0.1:32000"} + } + return sockets.Socket{Network: "unix", Address: "path-to-jujuc.socket"} +} + +func (MockEnvPaths) GetJujucServerSocket(remote bool) sockets.Socket { + if remote { + return sockets.Socket{Network: "tcp", Address: "127.0.0.1:32000"} + } return sockets.Socket{Network: "unix", Address: "path-to-jujuc.socket"} } diff --git a/worker/uniter/runner/factory_test.go b/worker/uniter/runner/factory_test.go index e6820723d44a..7f4e19be85cd 100644 --- a/worker/uniter/runner/factory_test.go +++ b/worker/uniter/runner/factory_test.go @@ -247,7 +247,7 @@ func (s *FactorySuite) TestNewActionRunnerGood(c *gc.C) { Params: test.payload, ResultsMap: map[string]interface{}{}, }) - vars, err := ctx.HookVars(s.paths) + vars, err := ctx.HookVars(s.paths, false) c.Assert(err, jc.ErrorIsNil) c.Assert(len(vars) > 0, jc.IsTrue, gc.Commentf("expected HookVars but found none")) combined := strings.Join(vars, "|") diff --git a/worker/uniter/runner/jujuc/server.go b/worker/uniter/runner/jujuc/server.go index 530ab9feea36..72f8ae3edc1e 100644 --- a/worker/uniter/runner/jujuc/server.go +++ b/worker/uniter/runner/jujuc/server.go @@ -131,6 +131,8 @@ type Request struct { // is empty. StdinSet bool Stdin []byte + + Token string } // CmdGetter looks up a Command implementation connected to a particular Context. @@ -140,6 +142,7 @@ type CmdGetter func(contextId, cmdName string) (cmd.Command, error) type Jujuc struct { mu sync.Mutex getCmd CmdGetter + token string } // badReqErrorf returns an error indicating a bad Request. @@ -150,6 +153,9 @@ func badReqErrorf(format string, v ...interface{}) error { // Main runs the Command specified by req, and fills in resp. A single command // is run at a time. func (j *Jujuc) Main(req Request, resp *exec.ExecResponse) error { + if req.Token != j.token { + return badReqErrorf("token does not match") + } if req.CommandName == "" { return badReqErrorf("command not specified") } @@ -206,9 +212,9 @@ type Server struct { // NewServer creates an RPC server bound to socketPath, which can execute // remote command invocations against an appropriate Context. It will not // actually do so until Run is called. -func NewServer(getCmd CmdGetter, socket sockets.Socket) (*Server, error) { +func NewServer(getCmd CmdGetter, socket sockets.Socket, token string) (*Server, error) { server := rpc.NewServer() - if err := server.Register(&Jujuc{getCmd: getCmd}); err != nil { + if err := server.Register(&Jujuc{getCmd: getCmd, token: token}); err != nil { return nil, err } listener, err := sockets.Listen(socket) diff --git a/worker/uniter/runner/jujuc/server_test.go b/worker/uniter/runner/jujuc/server_test.go index 1f03246c8c04..b4179311d51f 100644 --- a/worker/uniter/runner/jujuc/server_test.go +++ b/worker/uniter/runner/jujuc/server_test.go @@ -103,7 +103,7 @@ func (s *ServerSuite) osDependentSockPath(c *gc.C) sockets.Socket { func (s *ServerSuite) SetUpTest(c *gc.C) { s.BaseSuite.SetUpTest(c) s.socket = s.osDependentSockPath(c) - srv, err := jujuc.NewServer(factory, s.socket) + srv, err := jujuc.NewServer(factory, s.socket, "") c.Assert(err, jc.ErrorIsNil) c.Assert(srv, gc.NotNil) s.server = srv diff --git a/worker/uniter/runner/runner.go b/worker/uniter/runner/runner.go index ecfa2b76110b..d761d4885449 100644 --- a/worker/uniter/runner/runner.go +++ b/worker/uniter/runner/runner.go @@ -22,6 +22,7 @@ import ( "github.com/juju/errors" "github.com/juju/loggo" jujuos "github.com/juju/os" + "github.com/juju/utils" utilexec "github.com/juju/utils/exec" "github.com/juju/juju/core/actions" @@ -61,7 +62,7 @@ type Runner interface { type Context interface { jujuc.Context Id() string - HookVars(paths context.Paths) ([]string, error) + HookVars(paths context.Paths, remote bool) ([]string, error) ActionData() (*context.ActionData, error) SetProcess(process context.HookProcess) HasExecutionSetUnitStatus() bool @@ -126,8 +127,8 @@ func (runner *runner) Context() Context { return runner.context } -func (runner *runner) getRemoteExecutor(rModel runMode) (ExecFunc, error) { - switch rModel { +func (runner *runner) getRemoteExecutor(rMode runMode) (ExecFunc, error) { + switch rMode { case runOnLocal: return execOnMachine, nil case runOnRemote: @@ -135,7 +136,7 @@ func (runner *runner) getRemoteExecutor(rModel runMode) (ExecFunc, error) { return runner.remoteExecutor, nil } } - return nil, errors.NotSupportedf("run command model %q", rModel) + return nil, errors.NotSupportedf("run command mode %q", rMode) } // RunCommands exists to satisfy the Runner interface. @@ -150,17 +151,28 @@ func (runner *runner) RunCommands(commands string) (*utilexec.ExecResponse, erro // runCommandsWithTimeout is a helper to abstract common code between run commands and // juju-run as an action -func (runner *runner) runCommandsWithTimeout(commands string, timeout time.Duration, clock clock.Clock, model runMode) (*utilexec.ExecResponse, error) { - srv, err := runner.startJujucServer() +func (runner *runner) runCommandsWithTimeout(commands string, timeout time.Duration, clock clock.Clock, rMode runMode) (*utilexec.ExecResponse, error) { + var err error + token := "" + if rMode == runOnRemote { + token, err = utils.RandomPassword() + if err != nil { + return nil, errors.Trace(err) + } + } + srv, err := runner.startJujucServer(token, rMode) if err != nil { return nil, err } defer srv.Close() - env, err := runner.context.HookVars(runner.paths) + env, err := runner.context.HookVars(runner.paths, rMode == runOnRemote) if err != nil { return nil, errors.Trace(err) } + if rMode == runOnRemote { + env = append(env, "JUJU_AGENT_TOKEN="+token) + } var cancel chan struct{} if timeout != 0 { @@ -171,7 +183,7 @@ func (runner *runner) runCommandsWithTimeout(commands string, timeout time.Durat }() } - executor, err := runner.getRemoteExecutor(model) + executor, err := runner.getRemoteExecutor(rMode) if err != nil { return nil, errors.Trace(err) } @@ -286,13 +298,20 @@ func (runner *runner) RunHook(hookName string) error { } func (runner *runner) runCharmHookWithLocation(hookName, charmLocation string, rMode runMode) (err error) { - srv, err := runner.startJujucServer() + token := "" + if rMode == runOnRemote { + token, err = utils.RandomPassword() + if err != nil { + return errors.Trace(err) + } + } + srv, err := runner.startJujucServer(token, rMode) if err != nil { return err } defer srv.Close() - env, err := runner.context.HookVars(runner.paths) + env, err := runner.context.HookVars(runner.paths, rMode == runOnRemote) if err != nil { return errors.Trace(err) } @@ -302,6 +321,9 @@ func (runner *runner) runCharmHookWithLocation(hookName, charmLocation string, r // because that already has handling for windows environment requirements. env = mergeWindowsEnvironment(env, os.Environ()) } + if rMode == runOnRemote { + env = append(env, "JUJU_AGENT_TOKEN="+token) + } defer func() { err = runner.context.Flush(hookName, err) @@ -521,7 +543,7 @@ func (runner *runner) runCharmHookOnLocal(hookName string, env []string, charmLo return errors.Trace(exitErr) } -func (runner *runner) startJujucServer() (*jujuc.Server, error) { +func (runner *runner) startJujucServer(token string, rMode runMode) (*jujuc.Server, error) { // Prepare server. getCmd := func(ctxId, cmdName string) (cmd.Command, error) { if ctxId != runner.context.Id() { @@ -529,7 +551,10 @@ func (runner *runner) startJujucServer() (*jujuc.Server, error) { } return jujuc.NewCommand(runner.context, cmdName) } - srv, err := jujuc.NewServer(getCmd, runner.paths.GetJujucSocket()) + + socket := runner.paths.GetJujucServerSocket(rMode == runOnRemote) + logger.Debugf("starting jujuc server %s %v", token, socket) + srv, err := jujuc.NewServer(getCmd, socket, token) if err != nil { return nil, errors.Annotate(err, "starting jujuc server") } diff --git a/worker/uniter/runner/runner_test.go b/worker/uniter/runner/runner_test.go index ac98bf4449b2..188dce5d6ea9 100644 --- a/worker/uniter/runner/runner_test.go +++ b/worker/uniter/runner/runner_test.go @@ -165,7 +165,7 @@ func (ctx *MockContext) UnitName() string { return "some-unit/999" } -func (ctx *MockContext) HookVars(paths context.Paths) ([]string, error) { +func (ctx *MockContext) HookVars(paths context.Paths, _ bool) ([]string, error) { return []string{"VAR=value"}, nil } diff --git a/worker/uniter/runner/testing/utils.go b/worker/uniter/runner/testing/utils.go index 6ebf0ee9a88d..d23e302dc2de 100644 --- a/worker/uniter/runner/testing/utils.go +++ b/worker/uniter/runner/testing/utils.go @@ -27,6 +27,7 @@ type fops interface { type RealPaths struct { tools string charm string + base string socket sockets.Socket metricsspool string componentDirs map[string]string @@ -45,6 +46,7 @@ func NewRealPaths(c *gc.C) RealPaths { return RealPaths{ tools: c.MkDir(), charm: c.MkDir(), + base: c.MkDir(), socket: osDependentSockPath(c), metricsspool: c.MkDir(), componentDirs: make(map[string]string), @@ -64,7 +66,15 @@ func (p RealPaths) GetCharmDir() string { return p.charm } -func (p RealPaths) GetJujucSocket() sockets.Socket { +func (p RealPaths) GetBaseDir() string { + return p.base +} + +func (p RealPaths) GetJujucClientSocket(remote bool) sockets.Socket { + return p.socket +} + +func (p RealPaths) GetJujucServerSocket(remote bool) sockets.Socket { return p.socket } diff --git a/worker/uniter/uniter.go b/worker/uniter/uniter.go index 27371e2014ea..7d5ea4316c72 100644 --- a/worker/uniter/uniter.go +++ b/worker/uniter/uniter.go @@ -95,9 +95,10 @@ type Uniter struct { // TODO(axw) move the runListener and run-command code outside of the // uniter, and introduce a separate worker. Each worker would feed // operations to a single, synchronized runner to execute. - runListener *RunListener - commands runcommands.Commands - commandChannel chan string + runListener *RunListener + localRunListener *RunListener + commands runcommands.Commands + commandChannel chan string // The execution observer is only used in tests at this stage. Should this // need to be extended, perhaps a list of observers would be needed. @@ -137,6 +138,7 @@ type UniterParams struct { TranslateResolverErr func(error) error Clock clock.Clock ApplicationChannel watcher.NotifyChannel + SocketConfig *SocketConfig // TODO (mattyw, wallyworld, fwereade) Having the observer here make this approach a bit more legitimate, but it isn't. // the observer is only a stop gap to be used in tests. A better approach would be to have the uniter tests start hooks // that write to files, and have the tests watch the output to know that hooks have finished. @@ -180,7 +182,7 @@ func newUniter(uniterParams *UniterParams) func() (worker.Worker, error) { } u := &Uniter{ st: uniterParams.UniterFacade, - paths: NewPaths(uniterParams.DataDir, uniterParams.UnitTag, uniterParams.ModelType == model.CAAS), + paths: NewPaths(uniterParams.DataDir, uniterParams.UnitTag, uniterParams.SocketConfig), modelType: uniterParams.ModelType, hookLock: uniterParams.MachineLock, leadershipTracker: uniterParams.LeadershipTracker, @@ -432,7 +434,10 @@ func (u *Uniter) loop(unitTag names.UnitTag) (err error) { if errors.Cause(err) == ErrCAASUnitDead { err = nil } - u.runListener.UnregisterRunner(u.unit.Name()) + if u.runListener != nil { + u.runListener.UnregisterRunner(u.unit.Name()) + } + u.localRunListener.UnregisterRunner(u.unit.Name()) logger.Infof("unit %q shutting down: %s", u.unit, err) return err } @@ -621,18 +626,17 @@ func (u *Uniter) init(unitTag names.UnitTag) (err error) { } u.operationExecutor = operationExecutor - if u.runListener == nil { - socket := u.paths.Runtime.JujuRunSocket - logger.Debugf("starting juju-run listener on %v", socket) - u.runListener, err = NewRunListener(socket) - if err != nil { - return errors.Annotate(err, "creating juju run listener") - } - rlw := NewRunListenerWrapper(u.runListener) - if err := u.catacomb.Add(rlw); err != nil { - return errors.Trace(err) - } + socket := u.paths.Runtime.LocalJujuRunSocket.Server + logger.Debugf("starting local juju-run listener on %v", socket) + u.localRunListener, err = NewRunListener(socket) + if err != nil { + return errors.Annotate(err, "creating juju run listener") + } + rlw := NewRunListenerWrapper(u.localRunListener) + if err := u.catacomb.Add(rlw); err != nil { + return errors.Trace(err) } + commandRunner, err := NewChannelCommandRunner(ChannelCommandRunnerConfig{ Abort: u.catacomb.Dying(), Commands: u.commands, @@ -641,7 +645,10 @@ func (u *Uniter) init(unitTag names.UnitTag) (err error) { if err != nil { return errors.Annotate(err, "creating command runner") } - u.runListener.RegisterRunner(u.unit.Name(), commandRunner) + u.localRunListener.RegisterRunner(u.unit.Name(), commandRunner) + if u.runListener != nil { + u.runListener.RegisterRunner(u.unit.Name(), commandRunner) + } return nil } @@ -667,7 +674,7 @@ func (u *Uniter) getApplicationCharmURL() (*corecharm.URL, error) { func (u *Uniter) RunCommands(args RunCommandsArgs) (results *exec.ExecResponse, err error) { // TODO(axw) drop this when we move the run-listener to an independent // worker. This exists purely for the tests. - return u.runListener.RunCommands(args) + return u.localRunListener.RunCommands(args) } // acquireExecutionLock acquires the machine-level execution lock, and diff --git a/worker/uniter/util_test.go b/worker/uniter/util_test.go index 0bffa423056c..abb5e5bcd6aa 100644 --- a/worker/uniter/util_test.go +++ b/worker/uniter/util_test.go @@ -665,7 +665,7 @@ func (s verifyDownloadsCleared) step(c *gc.C, ctx *context) { } func downloadDir(ctx *context) string { - paths := uniter.NewPaths(ctx.dataDir, ctx.unit.UnitTag(), false) + paths := uniter.NewPaths(ctx.dataDir, ctx.unit.UnitTag(), nil) return filepath.Join(paths.State.BundlesDir, "downloads") }