Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow the connector to run outside of a kube pod #2613

Merged
merged 3 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 89 additions & 76 deletions internal/connector/connector.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package connector

import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
Expand Down Expand Up @@ -52,20 +51,26 @@ type ListenerOptions struct {
Metrics string
}

// connector stores all the dependencies for the connector operations.
type connector struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm never sure what to call these, so I went with the same pattern we used for Server and called it connector. As the code changes maybe we'll find a better name for this, or find ways to split it into other types.

I'm doing this now to allow us to split up syncWithServer and removing the need to pass around 5+ args to the functions.

k8s *kubernetes.Kubernetes
client *api.Client
destination *api.Destination
certCache *CertCache
options Options
}

func Run(ctx context.Context, options Options) error {
k8s, err := kubernetes.NewKubernetes()
if err != nil {
return err
return fmt.Errorf("failed to create kubernetes client: %w", err)
}

chksm, err := k8s.Checksum()
if err != nil {
logging.Errorf("k8s checksum error: %s", err)
return err
}
checkSum := k8s.Checksum()
logging.L.Debug().Str("uniqueID", checkSum).Msg("Cluster uniqueID")
Comment on lines +69 to +70
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested this manually for now, since we don't quite yet have an easy way of testing this outside of kube.

I tested this by running infra connector locally using my kubectl config, and again by running it in a pod. I compared the checksum in both cases to ensure it produced the same value.


if options.Name == "" {
autoname, err := k8s.Name(chksm)
autoname, err := k8s.Name(checkSum)
if err != nil {
logging.Errorf("k8s name error: %s", err)
return err
Expand Down Expand Up @@ -103,7 +108,7 @@ func Run(ctx context.Context, options Options) error {

destination := &api.Destination{
Name: options.Name,
UniqueID: chksm,
UniqueID: checkSum,
}

// clone the default http transport which sets reasonable defaults
Expand Down Expand Up @@ -133,7 +138,7 @@ func Run(ctx context.Context, options Options) error {
Transport: httpTransportFromOptions(options.Server),
},
Headers: http.Header{
"Infra-Destination": {chksm},
"Infra-Destination": {checkSum},
},
OnUnauthorized: func() {
logging.Errorf("Unauthorized error; token invalid or expired. exiting.")
Expand All @@ -158,8 +163,15 @@ func Run(ctx context.Context, options Options) error {
},
}

con := connector{
k8s: k8s,
client: client,
destination: destination,
certCache: certCache,
options: options,
}
// TODO: make polling time configurable
repeat.Start(ctx, 30*time.Second, syncWithServer(k8s, client, destination, certCache, []byte(options.CACert)))
repeat.Start(ctx, 30*time.Second, syncWithServer(con))

ginutil.SetMode()
router := gin.New()
Expand All @@ -172,14 +184,8 @@ func Run(ctx context.Context, options Options) error {
return fmt.Errorf("parsing host config: %w", err)
}

clusterCACert, err := kubernetes.CA()
if err != nil {
return fmt.Errorf("reading CA file: %w", err)
}

certPool := x509.NewCertPool()

if ok := certPool.AppendCertsFromPEM(clusterCACert); !ok {
if ok := certPool.AppendCertsFromPEM(k8s.Config.CAData); !ok {
Comment on lines -175 to +188
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CA data is populated by the rest.InClusterConfig as long as we call rest.LoadTLSFiles (which I added to NewKubernetes). This removes the need to look it up again, and helps keep most the in-cluster logic in the NewKubernetes constructor.

return errors.New("could not append CA to client cert bundle")
}

Expand Down Expand Up @@ -264,88 +270,96 @@ func httpTransportFromOptions(opts ServerOptions) *http.Transport {
return transport
}

func syncWithServer(k8s *kubernetes.Kubernetes, client *api.Client, destination *api.Destination, certCache *CertCache, caCertPEM []byte) func(context.Context) {
func syncDestination(con connector) error {
host, port, err := con.k8s.Endpoint()
Comment on lines +273 to +274
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next step for allowing this to run locally will be to add a config setting that allows us to skip the endpoint lookup. So before making that change I'm extracting this logic from sycnWithServer to give us some space to add that new logic.

if err != nil {
return fmt.Errorf("failed to lookup endpoint: %w", err)
}

return func(context.Context) {
host, port, err := k8s.Endpoint()
if err != nil {
logging.Errorf("failed to lookup endpoint: %v", err)
return
if ipv4 := net.ParseIP(host); ipv4 == nil {
// wait for DNS resolution if endpoint is not an IPv4 address
if _, err := net.LookupIP(host); err != nil {
return fmt.Errorf("host could not be resolved: %w", err)
}
}

if ipv4 := net.ParseIP(host); ipv4 == nil {
// wait for DNS resolution if endpoint is not an IPv4 address
if _, err := net.LookupIP(host); err != nil {
logging.Errorf("host could not be resolved")
return
}
}
// update certificates if the host changed
_, err = con.certCache.AddHost(host)
if err != nil {
return fmt.Errorf("could not update self-signed certificates")
}

// update certificates if the host changed
_, err = certCache.AddHost(host)
if err != nil {
logging.Errorf("could not update self-signed certificates")
return
}
endpoint := fmt.Sprintf("%s:%d", host, port)
logging.Debugf("connector serving on %s", endpoint)

endpoint := fmt.Sprintf("%s:%d", host, port)
logging.Debugf("connector serving on %s", endpoint)
namespaces, err := con.k8s.Namespaces()
if err != nil {
return fmt.Errorf("could not get kubernetes namespaces: %w", err)
}

namespaces, err := k8s.Namespaces()
if err != nil {
logging.Errorf("could not get kubernetes namespaces: %v", err)
return
}
clusterRoles, err := con.k8s.ClusterRoles()
if err != nil {
return fmt.Errorf("could not get kubernetes cluster-roles: %w", err)
}

clusterRoles, err := k8s.ClusterRoles()
switch {
case con.destination.ID == 0:
// TODO: move this warning somewhere earlier in startup
isClusterIP, err := con.k8s.IsServiceTypeClusterIP()
if err != nil {
logging.Errorf("could not get kubernetes cluster-roles: %v", err)
return
logging.Debugf("could not determine service type: %v", err)
}

switch {
case destination.ID == 0:
isClusterIP, err := k8s.IsServiceTypeClusterIP()
if err != nil {
logging.Debugf("could not determine service type: %v", err)
}
if isClusterIP {
logging.Warnf("registering Kubernetes connector with ClusterIP. it may not be externally accessible. if you are experiencing connectivity issues, consider switching to LoadBalancer or Ingress")
}

if isClusterIP {
logging.Warnf("registering Kubernetes connector with ClusterIP. it may not be externally accessible. if you are experiencing connectivity issues, consider switching to LoadBalancer or Ingress")
}
fallthrough

fallthrough
case !slicesEqual(con.destination.Resources, namespaces):
con.destination.Resources = namespaces
fallthrough

case !slicesEqual(destination.Resources, namespaces):
destination.Resources = namespaces
fallthrough
case !slicesEqual(con.destination.Roles, clusterRoles):
con.destination.Roles = clusterRoles
fallthrough

case !slicesEqual(destination.Roles, clusterRoles):
destination.Roles = clusterRoles
fallthrough
case string(con.destination.Connection.CA) != string(con.options.CACert):
con.destination.Connection.CA = api.PEM(con.options.CACert)
fallthrough

case !bytes.Equal([]byte(destination.Connection.CA), caCertPEM):
destination.Connection.CA = api.PEM(caCertPEM)
fallthrough
case con.destination.Connection.URL != endpoint:
con.destination.Connection.URL = endpoint

case destination.Connection.URL != endpoint:
destination.Connection.URL = endpoint
if err := createOrUpdateDestination(con.client, con.destination); err != nil {
return fmt.Errorf("create or update destination: %w", err)
}
}
return nil
}

if err := createOrUpdateDestination(client, destination); err != nil {
logging.Errorf("initializing destination: %v", err)
return
}
func syncWithServer(con connector) func(context.Context) {
return func(context.Context) {
if err := syncDestination(con); err != nil {
logging.Errorf("failed to update destination in infra: %v", err)
return
}

grants, err := client.ListGrants(api.ListGrantsRequest{Resource: destination.Name})
grants, err := con.client.ListGrants(api.ListGrantsRequest{Resource: con.destination.Name})
if err != nil {
logging.Errorf("error listing grants: %v", err)
return
}

namespaces, err := con.k8s.Namespaces()
if err != nil {
logging.Errorf("could not get kubernetes namespaces: %v", err)
return
}

// TODO(https://github.com/infrahq/infra/issues/2422): support wildcard resource searches
for _, n := range namespaces {
g, err := client.ListGrants(api.ListGrantsRequest{Resource: fmt.Sprintf("%s.%s", destination.Name, n)})
g, err := con.client.ListGrants(api.ListGrantsRequest{Resource: fmt.Sprintf("%s.%s", con.destination.Name, n)})
if err != nil {
logging.Errorf("error listing grants: %v", err)
return
Expand All @@ -354,7 +368,7 @@ func syncWithServer(k8s *kubernetes.Kubernetes, client *api.Client, destination
grants.Items = append(grants.Items, g.Items...)
}

err = updateRoles(client, k8s, grants.Items)
err = updateRoles(con.client, con.k8s, grants.Items)
if err != nil {
logging.Errorf("error updating grants: %v", err)
return
Expand Down Expand Up @@ -485,7 +499,6 @@ func updateDestination(client *api.Client, local *api.Destination) error {
if _, err := client.UpdateDestination(request); err != nil {
return fmt.Errorf("error updating existing destination: %w", err)
}

return nil
}

Expand Down
64 changes: 19 additions & 45 deletions internal/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/infrahq/secrets"
"github.com/jessevdk/go-flags"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand All @@ -28,40 +27,23 @@ import (
"github.com/infrahq/infra/internal/logging"
)

const (
podLabelsFilePath = "/etc/podinfo/labels"
namespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
caFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)

// Kubernetes provides access to the kubernetes API.
type Kubernetes struct {
Config *rest.Config
SecretReader secrets.SecretStorage
Config *rest.Config
}

func NewKubernetes() (*Kubernetes, error) {
k := &Kubernetes{}

config, err := rest.InClusterConfig()
if err != nil {
return k, err
}

k.Config = config

namespace, err := Namespace()
if err != nil {
return k, err
}

clientset, err := kubernetes.NewForConfig(k.Config)
if err != nil {
return nil, err
}

k.SecretReader = secrets.NewKubernetesSecretProvider(clientset, namespace)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field was not used anywhere, so I was able to remove it.

if err := rest.LoadTLSFiles(config); err != nil {
return nil, fmt.Errorf("load TLS files: %w", err)
}

return k, err
k := &Kubernetes{Config: config}
return k, nil
}

// namespaceRole is used as a tuple to pair namespaces and grants as a map key
Expand Down Expand Up @@ -461,16 +443,13 @@ func (k *Kubernetes) kubeControllerManagerClusterName() (string, error) {
return opts.ClusterName, nil
}

func (k *Kubernetes) Checksum() (string, error) {
ca, err := CA()
if err != nil {
return "", err
}

// Checksum returns a sha256 hash of the PEM encoded CA certificate used for
// TLS by this kubernetes cluster.
func (k *Kubernetes) Checksum() string {
h := sha256.New()
h.Write(ca)
h.Write(k.Config.CAData)
hash := h.Sum(nil)
return hex.EncodeToString(hash), nil
return hex.EncodeToString(hash)
}

func (k *Kubernetes) Name(chksm string) (string, error) {
Expand Down Expand Up @@ -501,6 +480,8 @@ func (k *Kubernetes) Name(chksm string) (string, error) {
return name, nil
}

const podLabelsFilePath = "/etc/podinfo/labels"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this constant to next to where it is used. This helps made code more readable by reducing the need to scroll around the file when reading this function, or looking for functions that use the constant.

I know some styles encourage putting all the constants at the top of the file, but I think often that's not the best choice. It's fine if the constants are for something like an enum that will be used all over the place. For constants like this that are only used by a single function, and are unlikely to ever be used in another place, I think it works much better to keep it close to the function where it is used.


func PodLabels() ([]string, error) {
contents, err := ioutil.ReadFile(podLabelsFilePath)
if err != nil {
Expand Down Expand Up @@ -528,32 +509,25 @@ func InstancePodLabels() ([]string, error) {
return instanceLabels, nil
}

func Namespace() (string, error) {
const namespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

func readNamespaceFromInClusterFile() (string, error) {
contents, err := ioutil.ReadFile(namespaceFilePath)
if err != nil {
return "", err
return "", fmt.Errorf("failed to read namespace file: %w", err)
}

return string(contents), nil
}

func CA() ([]byte, error) {
contents, err := ioutil.ReadFile(caFilePath)
if err != nil {
return nil, err
}

return contents, nil
}

// Find the first suitable Service, filtering on infrahq.com/component
func (k *Kubernetes) Service(component string, labels ...string) (*corev1.Service, error) {
clientset, err := kubernetes.NewForConfig(k.Config)
if err != nil {
return nil, err
}

namespace, err := Namespace()
namespace, err := readNamespaceFromInClusterFile()
if err != nil {
return nil, err
}
Expand Down