Skip to content

Commit

Permalink
improve: add connector advertise addr
Browse files Browse the repository at this point in the history
  • Loading branch information
dnephin committed Jul 15, 2022
1 parent ee0ff31 commit 40f3218
Showing 1 changed file with 39 additions and 14 deletions.
53 changes: 39 additions & 14 deletions internal/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/http/httputil"
"os"
"strconv"
"strings"
"time"

Expand All @@ -31,11 +32,12 @@ import (
)

type Options struct {
Server ServerOptions
Name string
CACert string // TODO: types.StringOrFile
CAKey string // TODO: types.StringOrFile
Addr string
Server ServerOptions
Name string
CACert string // TODO: types.StringOrFile
CAKey string // TODO: types.StringOrFile
Addr string
AdvertiseAddr string
}

type ServerOptions struct {
Expand Down Expand Up @@ -93,7 +95,8 @@ func Run(ctx context.Context, options Options) error {
}

// server is localhost which should never be the case. try to infer the actual host
if strings.HasPrefix(u.Host, "localhost") {
// TODO: document this change
if u.Host == "" {
server, err := k8s.Service("server")
if err != nil {
logging.Warnf("no cluster-local infra server found for %q. check connector configurations", u.Host)
Expand Down Expand Up @@ -138,7 +141,7 @@ func Run(ctx context.Context, options Options) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

repeat.Start(ctx, 5*time.Second, syncWithServer(k8s, client, destination, certCache, caCertPEM))
repeat.Start(ctx, 5*time.Second, syncWithServer(k8s, client, destination, certCache, caCertPEM, options))

ginutil.SetMode()
router := gin.New()
Expand Down Expand Up @@ -220,13 +223,35 @@ func httpTransportFromOptions(opts ServerOptions) *http.Transport {
return transport
}

func syncWithServer(k8s *kubernetes.Kubernetes, client *api.Client, destination *api.Destination, certCache *CertCache, caCertPEM []byte) func(context.Context) {

func syncWithServer(
k8s *kubernetes.Kubernetes,
client *api.Client,
destination *api.Destination,
certCache *CertCache,
caCertPEM []byte,
opts Options,
) func(context.Context) {
return func(context.Context) {
host, port, err := k8s.Endpoint()
if err != nil {
logging.Errorf("failed to lookup endpoint: %v", err)
return
var host string
var port string
var err error

if opts.AdvertiseAddr != "" {
host, port, err = net.SplitHostPort(opts.AdvertiseAddr)
if err != nil {
// TODO: validate this before the background goroutine starts
logging.Errorf("bad advertise addr: %v", err)
return
}
} else {
// TODO: cleanup port arg
var intPort int
host, intPort, err = k8s.Endpoint()
if err != nil {
logging.Errorf("failed to lookup endpoint: %v", err)
return
}
port = strconv.Itoa(intPort)
}

if ipv4 := net.ParseIP(host); ipv4 == nil {
Expand All @@ -244,7 +269,7 @@ func syncWithServer(k8s *kubernetes.Kubernetes, client *api.Client, destination
return
}

endpoint := fmt.Sprintf("%s:%d", host, port)
endpoint := fmt.Sprintf("%s:%v", host, port)
logging.Debugf("connector serving on %s", endpoint)

namespaces, err := k8s.Namespaces()
Expand Down

0 comments on commit 40f3218

Please sign in to comment.