Skip to content

Commit

Permalink
Multi-cluster fixes
Browse files Browse the repository at this point in the history
- Avoid IP address conflicts by running each cluser & namespace with its own IP address
- Include context in the hash key used to store services, so that services with the same name & namespace from different clusters can co-exist
- Use the same naming scheme for services regardless of whether they are the first cluster on the list or not
  • Loading branch information
dobesv committed Aug 29, 2020
1 parent 19fc921 commit 0ff8add
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 71 deletions.
17 changes: 8 additions & 9 deletions cmd/kubefwd/services/services.go
Expand Up @@ -245,6 +245,9 @@ Try:

nsWatchesDone := &sync.WaitGroup{} // We'll wait on this to exit the program. Done() indicates that all namespace watches have shutdown cleanly.

// ShortName field only used if one namespace/context
useFullName := len(namespaces) > 1 || len(contexts) > 1

for i, ctx := range contexts {
// k8s REST config
restConfig, err := configGetter.GetRestConfig(cfgFilePath, ctx)
Expand Down Expand Up @@ -273,8 +276,7 @@ Try:

for ii, namespace := range namespaces {
nsWatchesDone.Add(1)
go func(ii int, namespace string) {
// ShortName field only use short name for the first namespace and context
go func(ctx string, namespace string, ipC int, ipD int) {
nameSpaceOpts := NamespaceOpts{
ClientSet: clientSet,
Context: ctx,
Expand All @@ -284,16 +286,15 @@ Try:
Hostfile: &fwdport.HostFileWithLock{Hosts: hostFile},
ClientConfig: restConfig,
RESTClient: restClient,
ShortName: i < 1 && ii < 1,
Remote: i > 0,
IpC: byte(ipC + ii),
ShortName: !useFullName,
IpC: byte(ipC),
IpD: ipD,
Domain: domain,
ManualStopChannel: stopListenCh,
}
nameSpaceOpts.watchServiceEvents(stopListenCh)
nsWatchesDone.Done()
}(ii, namespace)
}(ctx, namespace, ipC+i, ipD+ii)
}
}

Expand All @@ -316,7 +317,6 @@ type NamespaceOpts struct {
ClientConfig *restclient.Config
RESTClient *restclient.RESTClient
ShortName bool
Remote bool
IpC byte
IpD int
Domain string
Expand Down Expand Up @@ -357,7 +357,7 @@ func (opts *NamespaceOpts) watchServiceEvents(stopListenCh <-chan struct{}) {

// Start the informer, blocking call until we receive a stop signal
controller.Run(stopListenCh)
log.Infof("Stopped watching Service events in namespace %s", opts.Namespace)
log.Infof("Stopped watching Service events in namespace %s, context %s", opts.Namespace, opts.Context)
}

// AddServiceHandler is the event handler for when a new service comes in from k8s (the initial list of services will also be coming in using this event for each).
Expand All @@ -383,7 +383,6 @@ func (opts *NamespaceOpts) AddServiceHandler(obj interface{}) {
ClientConfig: opts.ClientConfig,
RESTClient: opts.RESTClient,
ShortName: opts.ShortName,
Remote: opts.Remote,
IpC: opts.IpC,
IpD: &opts.IpD,
Domain: opts.Domain,
Expand Down
1 change: 0 additions & 1 deletion cmd/kubefwd/services/services_test.go
Expand Up @@ -123,7 +123,6 @@ func buildFwdServiceOpts(t *testing.T, namespace string) *FwdServiceOpts {
ClientConfig: restConfig,
RESTClient: restClient,
ShortName: true,
Remote: false,
IpC: byte(ipC),
IpD: ipD,
ExitOnFail: exitOnFail,
Expand Down
69 changes: 25 additions & 44 deletions pkg/fwdport/fwdport.go
Expand Up @@ -56,7 +56,6 @@ type PortForwardOpts struct {
LocalPort string
Hostfile *HostFileWithLock
ShortName bool
Remote bool
Domain string
HostsParams *HostsParams
ManualStopChan chan struct{} // Send a signal on this to stop the portforwarding
Expand Down Expand Up @@ -154,11 +153,8 @@ func (pfo *PortForwardOpts) BuildTheHostsParams() {
pfo.HostsParams = &HostsParams{}
localServiceName := pfo.Service
nsServiceName := pfo.Service + "." + pfo.Namespace
fullServiceName := fmt.Sprintf("%s.%s.svc.cluster.local", pfo.Service, pfo.Namespace)
fullServiceName := fmt.Sprintf("%s.%s.svc.cluster.%s", pfo.Service, pfo.Namespace, pfo.Context)
svcServiceName := fmt.Sprintf("%s.%s.svc", pfo.Service, pfo.Namespace)
if pfo.Remote {
fullServiceName = fmt.Sprintf("%s.%s.svc.cluster.%s", pfo.Service, pfo.Namespace, pfo.Context)
}
pfo.HostsParams.localServiceName = localServiceName
pfo.HostsParams.nsServiceName = nsServiceName
pfo.HostsParams.fullServiceName = fullServiceName
Expand All @@ -169,40 +165,27 @@ func (pfo *PortForwardOpts) BuildTheHostsParams() {
func (pfo *PortForwardOpts) AddHosts() {

pfo.Hostfile.Lock()
if pfo.Remote {

pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.fullServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.svcServiceName)
if pfo.ShortName {
if pfo.Domain != "" {
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.Service+"."+pfo.Domain)
}
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.Service)

} else {

if pfo.ShortName {
if pfo.Domain != "" {
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName + "." + pfo.Domain)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.localServiceName+"."+pfo.Domain)
}
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.localServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName + "." + pfo.Domain)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.localServiceName+"."+pfo.Domain)
}
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.localServiceName)
}

pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.fullServiceName)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.fullServiceName)

pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.svcServiceName)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.svcServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.fullServiceName)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.fullServiceName)

if pfo.Domain != "" {
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName + "." + pfo.Domain)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.nsServiceName+"."+pfo.Domain)
}
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.nsServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.svcServiceName)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.svcServiceName)

if pfo.Domain != "" {
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName + "." + pfo.Domain)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.nsServiceName+"."+pfo.Domain)
}
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName)
pfo.Hostfile.Hosts.AddHost(pfo.LocalIp.String(), pfo.HostsParams.nsServiceName)
err := pfo.Hostfile.Hosts.Save()
if err != nil {
log.Error("Error saving hosts file", err)
Expand All @@ -223,18 +206,16 @@ func (pfo *PortForwardOpts) removeHosts() {
return
}

if !pfo.Remote {
if pfo.Domain != "" {
// fmt.Printf("removeHost: %s\r\n", (pfo.HostsParams.localServiceName + "." + pfo.Domain))
// fmt.Printf("removeHost: %s\r\n", (pfo.HostsParams.nsServiceName + "." + pfo.Domain))
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName + "." + pfo.Domain)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName + "." + pfo.Domain)
}
// fmt.Printf("removeHost: %s\r\n", pfo.HostsParams.localServiceName)
// fmt.Printf("removeHost: %s\r\n", pfo.HostsParams.nsServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName)
if pfo.Domain != "" {
// fmt.Printf("removeHost: %s\r\n", (pfo.HostsParams.localServiceName + "." + pfo.Domain))
// fmt.Printf("removeHost: %s\r\n", (pfo.HostsParams.nsServiceName + "." + pfo.Domain))
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName + "." + pfo.Domain)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName + "." + pfo.Domain)
}
// fmt.Printf("removeHost: %s\r\n", pfo.HostsParams.localServiceName)
// fmt.Printf("removeHost: %s\r\n", pfo.HostsParams.nsServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.localServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.nsServiceName)
// fmt.Printf("removeHost: %s\r\n", pfo.HostsParams.fullServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.fullServiceName)
pfo.Hostfile.Hosts.RemoveHost(pfo.HostsParams.svcServiceName)
Expand Down
22 changes: 5 additions & 17 deletions pkg/fwdservice/fwdservice.go
Expand Up @@ -29,7 +29,6 @@ type ServiceFWD struct {
ClientConfig *restclient.Config
RESTClient *restclient.RESTClient
ShortName bool
Remote bool
IpC byte
IpD *int
Domain string
Expand All @@ -44,7 +43,7 @@ type ServiceFWD struct {
}

func (svcFwd *ServiceFWD) String() string {
return svcFwd.Svc.Name + "." + svcFwd.Namespace
return svcFwd.Svc.Name + "." + svcFwd.Namespace + "." + svcFwd.Context
}

// GetPodsForService queries k8s and returns all pods backing this service
Expand Down Expand Up @@ -189,25 +188,15 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost

svcName = serviceHostName

if !svcfwd.ShortName {
serviceHostName = serviceHostName + "." + pod.Namespace
}

if svcfwd.Domain != "" {
serviceHostName = serviceHostName + "." + svcfwd.Domain
}

if svcfwd.Remote {
serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, svcfwd.Context)
}

log.Debugf("Resolving: %s to %s\n",
serviceHostName,
localIp.String(),
)

log.Printf("Port-Forward: %s:%d to pod %s:%s\n",
serviceHostName,
log.Printf("Port-Forward: %s.%s.svc.cluster.%s:%d to pod %s:%s\n",
svcfwd.Svc.Name,
svcfwd.Svc.Namespace,
svcfwd.Context,
port.Port,
pod.Name,
podPort,
Expand All @@ -228,7 +217,6 @@ func (svcfwd *ServiceFWD) LoopPodsToForward(pods []v1.Pod, includePodNameInHost
LocalPort: localPort,
Hostfile: svcfwd.Hostfile,
ShortName: svcfwd.ShortName,
Remote: svcfwd.Remote,
Domain: svcfwd.Domain,

ManualStopChan: make(chan struct{}),
Expand Down

0 comments on commit 0ff8add

Please sign in to comment.