Skip to content

Commit

Permalink
maintain: extract syncDestination from syncWithServer
Browse files Browse the repository at this point in the history
  • Loading branch information
dnephin committed Sep 28, 2022
1 parent c81afd8 commit 68c834e
Showing 1 changed file with 63 additions and 55 deletions.
118 changes: 63 additions & 55 deletions internal/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,76 +283,79 @@ func httpTransportFromOptions(opts ServerOptions) *http.Transport {
return transport
}

func syncWithServer(con connector) func(context.Context) {
return func(context.Context) {
host, port, err := con.k8s.Endpoint()
if err != nil {
logging.Errorf("failed to lookup endpoint: %v", err)
return
}
func syncDestination(con connector) error {
host, port, err := con.k8s.Endpoint()
if err != nil {
return fmt.Errorf("failed to lookup endpoint: %w", err)
}

if ipv4 := net.ParseIP(host); ipv4 == nil {
// wait for DNS resolution if endpoint is not an IPv4 address
if _, err := net.LookupIP(host); err != nil {
logging.Errorf("host could not be resolved")
return
}
if ipv4 := net.ParseIP(host); ipv4 == nil {
// wait for DNS resolution if endpoint is not an IPv4 address
if _, err := net.LookupIP(host); err != nil {
return fmt.Errorf("host could not be resolved: %w", err)
}
}

// update certificates if the host changed
_, err = con.certCache.AddHost(host)
if err != nil {
logging.Errorf("could not update self-signed certificates")
return
}
// update certificates if the host changed
_, err = con.certCache.AddHost(host)
if err != nil {
return fmt.Errorf("could not update self-signed certificates")
}

endpoint := fmt.Sprintf("%s:%d", host, port)
logging.Debugf("connector serving on %s", endpoint)
endpoint := fmt.Sprintf("%s:%d", host, port)
logging.Debugf("connector serving on %s", endpoint)

namespaces, err := con.k8s.Namespaces()
if err != nil {
logging.Errorf("could not get kubernetes namespaces: %v", err)
return
}
namespaces, err := con.k8s.Namespaces()
if err != nil {
return fmt.Errorf("could not get kubernetes namespaces: %w", err)
}

clusterRoles, err := con.k8s.ClusterRoles()
clusterRoles, err := con.k8s.ClusterRoles()
if err != nil {
return fmt.Errorf("could not get kubernetes cluster-roles: %w", err)
}

switch {
case con.destination.ID == 0:
// TODO: move this warning somewhere earlier in startup
isClusterIP, err := con.k8s.IsServiceTypeClusterIP()
if err != nil {
logging.Errorf("could not get kubernetes cluster-roles: %v", err)
return
logging.Debugf("could not determine service type: %v", err)
}

switch {
case con.destination.ID == 0:
isClusterIP, err := con.k8s.IsServiceTypeClusterIP()
if err != nil {
logging.Debugf("could not determine service type: %v", err)
}
if isClusterIP {
logging.Warnf("registering Kubernetes connector with ClusterIP. it may not be externally accessible. if you are experiencing connectivity issues, consider switching to LoadBalancer or Ingress")
}

if isClusterIP {
logging.Warnf("registering Kubernetes connector with ClusterIP. it may not be externally accessible. if you are experiencing connectivity issues, consider switching to LoadBalancer or Ingress")
}
fallthrough

fallthrough
case !slicesEqual(con.destination.Resources, namespaces):
con.destination.Resources = namespaces
fallthrough

case !slicesEqual(con.destination.Resources, namespaces):
con.destination.Resources = namespaces
fallthrough
case !slicesEqual(con.destination.Roles, clusterRoles):
con.destination.Roles = clusterRoles
fallthrough

case !slicesEqual(con.destination.Roles, clusterRoles):
con.destination.Roles = clusterRoles
fallthrough
case string(con.destination.Connection.CA) != string(con.options.CACert):
con.destination.Connection.CA = api.PEM(con.options.CACert)
fallthrough

case string(con.destination.Connection.CA) != string(con.options.CACert):
con.destination.Connection.CA = api.PEM(con.options.CACert)
fallthrough
case con.destination.Connection.URL != endpoint:
con.destination.Connection.URL = endpoint

case con.destination.Connection.URL != endpoint:
con.destination.Connection.URL = endpoint
if err := createOrUpdateDestination(con.client, con.destination); err != nil {
return fmt.Errorf("create or update destination: %w", err)
}
}
return nil
}

if err := createOrUpdateDestination(con.client, con.destination); err != nil {
logging.Errorf("initializing destination: %v", err)
return
}
func syncWithServer(con connector) func(context.Context) {
return func(context.Context) {
if err := syncDestination(con); err != nil {
logging.Errorf("failed to update destination in infra: %v", err)
return
}

grants, err := con.client.ListGrants(api.ListGrantsRequest{Resource: con.destination.Name})
Expand All @@ -361,6 +364,12 @@ func syncWithServer(con connector) func(context.Context) {
return
}

namespaces, err := con.k8s.Namespaces()
if err != nil {
logging.Errorf("could not get kubernetes namespaces: %v", err)
return
}

// TODO(https://github.com/infrahq/infra/issues/2422): support wildcard resource searches
for _, n := range namespaces {
g, err := con.client.ListGrants(api.ListGrantsRequest{Resource: fmt.Sprintf("%s.%s", con.destination.Name, n)})
Expand Down Expand Up @@ -503,7 +512,6 @@ func updateDestination(client *api.Client, local *api.Destination) error {
if _, err := client.UpdateDestination(request); err != nil {
return fmt.Errorf("error updating existing destination: %w", err)
}

return nil
}

Expand Down

0 comments on commit 68c834e

Please sign in to comment.