Skip to content

Commit

Permalink
Retrieve port mapping from existing shadow services
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinpollet committed Jun 2, 2020
1 parent 617298e commit 4deb1b5
Show file tree
Hide file tree
Showing 20 changed files with 559 additions and 606 deletions.
5 changes: 1 addition & 4 deletions cmd/maesh/maesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func maeshCommand(iConfig *cmd.MaeshConfiguration) error {
return fmt.Errorf("unable to create the API server: %w", err)
}

ctr, err := controller.NewMeshController(clients, controller.Config{
ctr := controller.NewMeshController(clients, controller.Config{
ACLEnabled: aclEnabled,
DefaultMode: iConfig.DefaultMode,
Namespace: iConfig.Namespace,
Expand All @@ -117,9 +117,6 @@ func maeshCommand(iConfig *cmd.MaeshConfiguration) error {
MinUDPPort: minUDPPort,
MaxUDPPort: minUDPPort + iConfig.LimitUDPPort,
}, apiServer, log)
if err != nil {
return fmt.Errorf("unable to create controller: %w", err)
}

var wg sync.WaitGroup

Expand Down
26 changes: 0 additions & 26 deletions helm/chart/maesh/templates/controller/controller-configmap.yaml

This file was deleted.

1 change: 0 additions & 1 deletion integration/acl_disabled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func (s *ACLDisabledSuite) SetUpSuite(c *check.C) {
}
s.startk3s(c, requiredImages)
s.startAndWaitForCoreDNS(c)
s.createResources(c, "testdata/state-table/")
s.createResources(c, "testdata/smi/crds/")
}

Expand Down
1 change: 0 additions & 1 deletion integration/acl_enabled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func (s *ACLEnabledSuite) SetUpSuite(c *check.C) {
}
s.startk3s(c, requiredImages)
s.startAndWaitForCoreDNS(c)
s.createResources(c, "testdata/state-table/")
s.createResources(c, "testdata/smi/crds/")
}

Expand Down
1 change: 0 additions & 1 deletion integration/coredns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func (s *CoreDNSSuite) SetUpSuite(c *check.C) {
s.startk3s(c, requiredImages)
s.startWhoami(c)
s.installTinyToolsMaesh(c)
s.createResources(c, "testdata/state-table/")
s.createResources(c, "testdata/smi/crds/")
}

Expand Down
1 change: 0 additions & 1 deletion integration/kubedns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func (s *KubeDNSSuite) SetUpSuite(c *check.C) {

s.startWhoami(c)
s.installTinyToolsMaesh(c)
s.createResources(c, "testdata/state-table/")
s.createResources(c, "testdata/smi/crds/")
}

Expand Down
11 changes: 0 additions & 11 deletions integration/testdata/state-table/tcp-configmap.yml

This file was deleted.

11 changes: 0 additions & 11 deletions integration/testdata/state-table/udp-configmap.yaml

This file was deleted.

104 changes: 56 additions & 48 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ const (
maxRetries = 12
)

// PortMapper is capable of storing and retrieving a port mapping for a given service.
type PortMapper interface {
Find(svc k8s.ServiceWithPort) (int32, bool)
Add(svc *k8s.ServiceWithPort) (int32, error)
Remove(svc k8s.ServiceWithPort) (int32, error)
}

// SharedStore is used to share the controller state.
type SharedStore interface {
SetConfig(cfg *dynamic.Configuration)
Expand Down Expand Up @@ -78,8 +71,8 @@ type Controller struct {
shadowServiceManager *ShadowServiceManager
provider *provider.Provider
ignoredResources k8s.IgnoreWrapper
tcpStateTable PortMapper
udpStateTable PortMapper
tcpStateTable *PortMapping
udpStateTable *PortMapping
topologyBuilder TopologyBuilder
store SharedStore
logger logrus.FieldLogger
Expand All @@ -100,49 +93,24 @@ type Controller struct {

// NewMeshController builds the informers and other required components of the mesh controller, and returns an
// initialized mesh controller object.
func NewMeshController(clients k8s.Client, cfg Config, store SharedStore, logger logrus.FieldLogger) (*Controller, error) {
ignoredResources := k8s.NewIgnored()

for _, ns := range cfg.IgnoreNamespaces {
ignoredResources.AddIgnoredNamespace(ns)
}

ignoredResources.AddIgnoredService("kubernetes", metav1.NamespaceDefault)
ignoredResources.AddIgnoredNamespace(metav1.NamespaceSystem)
ignoredResources.AddIgnoredApps("maesh", "jaeger")

tcpStateTable, err := k8s.NewPortMapping(clients.KubernetesClient(), cfg.Namespace, k8s.TCPStateConfigMapName, cfg.MinTCPPort, cfg.MaxTCPPort)
if err != nil {
return nil, err
}

udpStateTable, err := k8s.NewPortMapping(clients.KubernetesClient(), cfg.Namespace, k8s.UDPStateConfigMapName, cfg.MinUDPPort, cfg.MaxUDPPort)
if err != nil {
return nil, err
}

func NewMeshController(clients k8s.Client, cfg Config, store SharedStore, logger logrus.FieldLogger) *Controller {
c := &Controller{
logger: logger,
cfg: cfg,
clients: clients,
ignoredResources: ignoredResources,
tcpStateTable: tcpStateTable,
udpStateTable: udpStateTable,
store: store,
logger: logger,
cfg: cfg,
clients: clients,
store: store,
}

c.init()
// Initialize the ignored resources.
c.ignoredResources = k8s.NewIgnored()

return c, nil
}
for _, ns := range cfg.IgnoreNamespaces {
c.ignoredResources.AddIgnoredNamespace(ns)
}

func (c *Controller) init() {
// Create SharedInformers for non-ACL related resources.
c.kubernetesFactory = informers.NewSharedInformerFactoryWithOptions(c.clients.KubernetesClient(), k8s.ResyncPeriod)
c.splitFactory = splitinformer.NewSharedInformerFactoryWithOptions(c.clients.SplitClient(), k8s.ResyncPeriod)

c.serviceLister = c.kubernetesFactory.Core().V1().Services().Lister()
c.shadowServiceManager = NewShadowServiceManager(c.logger, c.serviceLister, c.cfg.Namespace, c.tcpStateTable, c.udpStateTable, c.cfg.DefaultMode, c.cfg.MinHTTPPort, c.cfg.MaxHTTPPort, c.clients.KubernetesClient())
c.ignoredResources.AddIgnoredService("kubernetes", metav1.NamespaceDefault)
c.ignoredResources.AddIgnoredNamespace(metav1.NamespaceSystem)
c.ignoredResources.AddIgnoredApps("maesh", "jaeger")

// Create the work queue and the enqueue handler.
c.workQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
Expand All @@ -151,9 +119,13 @@ func (c *Controller) init() {
Handler: &enqueueWorkHandler{logger: c.logger, workQueue: c.workQueue},
}

// Create listers and register the event handler to informers that are not ACL related.
// Create SharedInformers, listers and register the event handler to informers that are not ACL related.
c.kubernetesFactory = informers.NewSharedInformerFactoryWithOptions(c.clients.KubernetesClient(), k8s.ResyncPeriod)
c.splitFactory = splitinformer.NewSharedInformerFactoryWithOptions(c.clients.SplitClient(), k8s.ResyncPeriod)

c.podLister = c.kubernetesFactory.Core().V1().Pods().Lister()
c.endpointsLister = c.kubernetesFactory.Core().V1().Endpoints().Lister()
c.serviceLister = c.kubernetesFactory.Core().V1().Services().Lister()
c.trafficSplitLister = c.splitFactory.Split().V1alpha2().TrafficSplits().Lister()

c.kubernetesFactory.Core().V1().Services().Informer().AddEventHandler(handler)
Expand All @@ -175,6 +147,22 @@ func (c *Controller) init() {
c.specsFactory.Specs().V1alpha1().TCPRoutes().Informer().AddEventHandler(handler)
}

c.tcpStateTable = NewPortMapping(c.cfg.Namespace, c.serviceLister, logger, c.cfg.MinTCPPort, c.cfg.MaxTCPPort)

c.udpStateTable = NewPortMapping(c.cfg.Namespace, c.serviceLister, logger, c.cfg.MinUDPPort, c.cfg.MaxUDPPort)

c.shadowServiceManager = NewShadowServiceManager(
c.logger,
c.serviceLister,
c.cfg.Namespace,
c.tcpStateTable,
c.udpStateTable,
c.cfg.DefaultMode,
c.cfg.MinHTTPPort,
c.cfg.MaxHTTPPort,
c.clients.KubernetesClient(),
)

c.topologyBuilder = &topology.Builder{
ServiceLister: c.serviceLister,
EndpointsLister: c.endpointsLister,
Expand All @@ -195,6 +183,8 @@ func (c *Controller) init() {
}

c.provider = provider.New(c.tcpStateTable, c.udpStateTable, annotations.BuildMiddlewares, providerCfg, c.logger)

return c
}

// Run is the main entrypoint for the controller.
Expand All @@ -212,6 +202,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
return fmt.Errorf("could not start informers: %w", err)
}

// Load the TCP and UDP port mapper states.
if err := c.loadPortMappersState(); err != nil {
return fmt.Errorf("could not load port mapper states: %w", err)
}

// Enable API readiness endpoint, informers are started and default conf is available.
c.store.SetReadiness(true)

Expand Down Expand Up @@ -286,6 +281,19 @@ func (c *Controller) startACLInformers(ctx context.Context, stopCh <-chan struct
return nil
}

// loadPortMappersState loads the TCP and UDP port mapper states.
func (c *Controller) loadPortMappersState() error {
if err := c.tcpStateTable.LoadState(); err != nil {
return fmt.Errorf("unable to load TCP state table: %w", err)
}

if err := c.udpStateTable.LoadState(); err != nil {
return fmt.Errorf("unable to load UDP state table: %w", err)
}

return nil
}

// isWatchedResource returns true if the given resource is not ignored, false otherwise.
func (c *Controller) isWatchedResource(obj interface{}) bool {
accessor, err := meta.Accessor(obj)
Expand Down
7 changes: 2 additions & 5 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -41,7 +40,7 @@ func TestController_NewMeshController(t *testing.T) {
log.SetLevel(logrus.DebugLevel)

// Create a new controller with base HTTP mode.
controller, err := NewMeshController(clientMock, Config{
controller := NewMeshController(clientMock, Config{
ACLEnabled: false,
DefaultMode: "http",
Namespace: meshNamespace,
Expand All @@ -54,7 +53,6 @@ func TestController_NewMeshController(t *testing.T) {
MaxUDPPort: maxUDPPort,
}, store, log)

require.NoError(t, err)
assert.NotNil(t, controller)
}

Expand All @@ -70,7 +68,7 @@ func TestController_NewMeshControllerWithSMI(t *testing.T) {
log.SetLevel(logrus.DebugLevel)

// Create a new controller with base HTTP mode, in SMI mode.
controller, err := NewMeshController(clientMock, Config{
controller := NewMeshController(clientMock, Config{
ACLEnabled: true,
DefaultMode: "http",
Namespace: meshNamespace,
Expand All @@ -83,6 +81,5 @@ func TestController_NewMeshControllerWithSMI(t *testing.T) {
MaxUDPPort: maxUDPPort,
}, store, log)

require.NoError(t, err)
assert.NotNil(t, controller)
}

0 comments on commit 4deb1b5

Please sign in to comment.