Skip to content

Commit

Permalink
maintain: extract syncDestination from syncWithServer
Browse files Browse the repository at this point in the history
  • Loading branch information
dnephin committed Oct 3, 2022
1 parent 20e9a3a commit 21f51cb
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 57 deletions.
118 changes: 63 additions & 55 deletions internal/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,76 +270,79 @@ func httpTransportFromOptions(opts ServerOptions) *http.Transport {
return transport
}

func syncWithServer(con connector) func(context.Context) {
return func(context.Context) {
host, port, err := con.k8s.Endpoint()
if err != nil {
logging.Errorf("failed to lookup endpoint: %v", err)
return
}
func syncDestination(con connector) error {
host, port, err := con.k8s.Endpoint()
if err != nil {
return fmt.Errorf("failed to lookup endpoint: %w", err)
}

if ipv4 := net.ParseIP(host); ipv4 == nil {
// wait for DNS resolution if endpoint is not an IPv4 address
if _, err := net.LookupIP(host); err != nil {
logging.Errorf("host could not be resolved")
return
}
if ipv4 := net.ParseIP(host); ipv4 == nil {
// wait for DNS resolution if endpoint is not an IPv4 address
if _, err := net.LookupIP(host); err != nil {
return fmt.Errorf("host could not be resolved: %w", err)
}
}

// update certificates if the host changed
_, err = con.certCache.AddHost(host)
if err != nil {
logging.Errorf("could not update self-signed certificates")
return
}
// update certificates if the host changed
_, err = con.certCache.AddHost(host)
if err != nil {
return fmt.Errorf("could not update self-signed certificates")
}

endpoint := fmt.Sprintf("%s:%d", host, port)
logging.Debugf("connector serving on %s", endpoint)
endpoint := fmt.Sprintf("%s:%d", host, port)
logging.Debugf("connector serving on %s", endpoint)

namespaces, err := con.k8s.Namespaces()
if err != nil {
logging.Errorf("could not get kubernetes namespaces: %v", err)
return
}
namespaces, err := con.k8s.Namespaces()
if err != nil {
return fmt.Errorf("could not get kubernetes namespaces: %w", err)
}

clusterRoles, err := con.k8s.ClusterRoles()
clusterRoles, err := con.k8s.ClusterRoles()
if err != nil {
return fmt.Errorf("could not get kubernetes cluster-roles: %w", err)
}

switch {
case con.destination.ID == 0:
// TODO: move this warning somewhere earlier in startup
isClusterIP, err := con.k8s.IsServiceTypeClusterIP()
if err != nil {
logging.Errorf("could not get kubernetes cluster-roles: %v", err)
return
logging.Debugf("could not determine service type: %v", err)
}

switch {
case con.destination.ID == 0:
isClusterIP, err := con.k8s.IsServiceTypeClusterIP()
if err != nil {
logging.Debugf("could not determine service type: %v", err)
}
if isClusterIP {
logging.Warnf("registering Kubernetes connector with ClusterIP. it may not be externally accessible. if you are experiencing connectivity issues, consider switching to LoadBalancer or Ingress")
}

if isClusterIP {
logging.Warnf("registering Kubernetes connector with ClusterIP. it may not be externally accessible. if you are experiencing connectivity issues, consider switching to LoadBalancer or Ingress")
}
fallthrough

fallthrough
case !slicesEqual(con.destination.Resources, namespaces):
con.destination.Resources = namespaces
fallthrough

case !slicesEqual(con.destination.Resources, namespaces):
con.destination.Resources = namespaces
fallthrough
case !slicesEqual(con.destination.Roles, clusterRoles):
con.destination.Roles = clusterRoles
fallthrough

case !slicesEqual(con.destination.Roles, clusterRoles):
con.destination.Roles = clusterRoles
fallthrough
case string(con.destination.Connection.CA) != string(con.options.CACert):
con.destination.Connection.CA = api.PEM(con.options.CACert)
fallthrough

case string(con.destination.Connection.CA) != string(con.options.CACert):
con.destination.Connection.CA = api.PEM(con.options.CACert)
fallthrough
case con.destination.Connection.URL != endpoint:
con.destination.Connection.URL = endpoint

case con.destination.Connection.URL != endpoint:
con.destination.Connection.URL = endpoint
if err := createOrUpdateDestination(con.client, con.destination); err != nil {
return fmt.Errorf("create or update destination: %w", err)
}
}
return nil
}

if err := createOrUpdateDestination(con.client, con.destination); err != nil {
logging.Errorf("initializing destination: %v", err)
return
}
func syncWithServer(con connector) func(context.Context) {
return func(context.Context) {
if err := syncDestination(con); err != nil {
logging.Errorf("failed to update destination in infra: %v", err)
return
}

grants, err := con.client.ListGrants(api.ListGrantsRequest{Resource: con.destination.Name})
Expand All @@ -348,6 +351,12 @@ func syncWithServer(con connector) func(context.Context) {
return
}

namespaces, err := con.k8s.Namespaces()
if err != nil {
logging.Errorf("could not get kubernetes namespaces: %v", err)
return
}

// TODO(https://github.com/infrahq/infra/issues/2422): support wildcard resource searches
for _, n := range namespaces {
g, err := con.client.ListGrants(api.ListGrantsRequest{Resource: fmt.Sprintf("%s.%s", con.destination.Name, n)})
Expand Down Expand Up @@ -490,7 +499,6 @@ func updateDestination(client *api.Client, local *api.Destination) error {
if _, err := client.UpdateDestination(request); err != nil {
return fmt.Errorf("error updating existing destination: %w", err)
}

return nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewKubernetes() (*Kubernetes, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
cfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil)
if config, err = cfg.ClientConfig(); err != nil {
return nil, fmt.Errorf("%v", err)
return nil, fmt.Errorf("load kube config %w", err)
}
}

Expand Down Expand Up @@ -521,7 +521,7 @@ const namespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespa
func readNamespaceFromInClusterFile() (string, error) {
contents, err := ioutil.ReadFile(namespaceFilePath)
if err != nil {
return "", fmt.Errorf("failed to read namespace file: %v", err)
return "", fmt.Errorf("failed to read namespace file: %w", err)
}

return string(contents), nil
Expand Down

0 comments on commit 21f51cb

Please sign in to comment.