Skip to content

Commit

Permalink
Refine the controller readiness status
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinpollet committed May 13, 2020
1 parent e9c0efa commit 3224474
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 116 deletions.
40 changes: 20 additions & 20 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ type Interface interface {

// API is an implementation of an api.
type API struct {
log logrus.FieldLogger
router *mux.Router
readiness bool
lastConfiguration *safe.Safe
apiPort int32
apiHost string
meshNamespace string
podLister listers.PodLister
log logrus.FieldLogger
router *mux.Router
readiness bool
currentConfiguration *safe.Safe
apiPort int32
apiHost string
meshNamespace string
podLister listers.PodLister
}

type podInfo struct {
Expand All @@ -43,15 +43,15 @@ type podInfo struct {
}

// NewAPI creates a new api.
func NewAPI(log logrus.FieldLogger, apiPort int32, apiHost string, lastConfiguration *safe.Safe, podLister listers.PodLister, meshNamespace string) *API {
func NewAPI(log logrus.FieldLogger, apiPort int32, apiHost string, currentConfiguration *safe.Safe, podLister listers.PodLister, meshNamespace string) *API {
a := &API{
log: log,
readiness: false,
lastConfiguration: lastConfiguration,
apiPort: apiPort,
apiHost: apiHost,
podLister: podLister,
meshNamespace: meshNamespace,
log: log,
readiness: false,
currentConfiguration: currentConfiguration,
apiPort: apiPort,
apiHost: apiHost,
podLister: podLister,
meshNamespace: meshNamespace,
}

if err := a.Init(); err != nil {
Expand All @@ -63,7 +63,7 @@ func NewAPI(log logrus.FieldLogger, apiPort int32, apiHost string, lastConfigura

// Init handles any api initialization.
func (a *API) Init() error {
a.log.Debugln("API.Init")
a.log.Debugln("Initializing API")

a.router = mux.NewRouter()

Expand All @@ -77,7 +77,7 @@ func (a *API) Init() error {

// Start runs the API.
func (a *API) Start() {
a.log.Debugln("API.Start")
a.log.Debug("Starting API")

go a.Run()
}
Expand All @@ -90,7 +90,7 @@ func (a *API) Run() {
// EnableReadiness enables the readiness flag in the API.
func (a *API) EnableReadiness() {
if !a.readiness {
a.log.Debug("Controller Readiness enabled")
a.log.Debug("API readiness enabled")

a.readiness = true
}
Expand All @@ -100,7 +100,7 @@ func (a *API) EnableReadiness() {
func (a *API) getCurrentConfiguration(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

if err := json.NewEncoder(w).Encode(a.lastConfiguration.Get()); err != nil {
if err := json.NewEncoder(w).Encode(a.currentConfiguration.Get()); err != nil {
a.log.Error(err)
}
}
Expand Down
97 changes: 46 additions & 51 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,31 +65,31 @@ type Config struct {

// Controller hold controller configuration.
type Controller struct {
cfg Config
handler cache.ResourceEventHandler
serviceManager ServiceManager
configRefreshChan chan struct{}
provider *provider.Provider
ignoredResources k8s.IgnoreWrapper
tcpStateTable PortMapper
udpStateTable PortMapper
topologyBuilder TopologyBuilder
lastConfiguration safe.Safe
api api.Interface
logger logrus.FieldLogger
cfg Config
handler cache.ResourceEventHandler
serviceManager ServiceManager
configRefreshChan chan struct{}
provider *provider.Provider
ignoredResources k8s.IgnoreWrapper
tcpStateTable PortMapper
udpStateTable PortMapper
topologyBuilder TopologyBuilder
currentConfiguration *safe.Safe
api api.Interface
logger logrus.FieldLogger

clients k8s.Client
kubernetesFactory informers.SharedInformerFactory
accessFactory accessinformer.SharedInformerFactory
specsFactory specsinformer.SharedInformerFactory
splitFactory splitinformer.SharedInformerFactory
PodLister listers.PodLister
ServiceLister listers.ServiceLister
EndpointsLister listers.EndpointsLister
TrafficTargetLister accesslister.TrafficTargetLister
HTTPRouteGroupLister specslister.HTTPRouteGroupLister
TCPRouteLister specslister.TCPRouteLister
TrafficSplitLister splitlister.TrafficSplitLister
podLister listers.PodLister
serviceLister listers.ServiceLister
endpointsLister listers.EndpointsLister
trafficTargetLister accesslister.TrafficTargetLister
httpRouteGroupLister specslister.HTTPRouteGroupLister
tcpRouteLister specslister.TCPRouteLister
trafficSplitLister splitlister.TrafficSplitLister
}

// NewMeshController is used to build the informers and other required components of the mesh controller,
Expand Down Expand Up @@ -134,8 +134,8 @@ func (c *Controller) init() {
c.kubernetesFactory = informers.NewSharedInformerFactoryWithOptions(c.clients.GetKubernetesClient(), k8s.ResyncPeriod)
c.splitFactory = splitinformer.NewSharedInformerFactoryWithOptions(c.clients.GetSplitClient(), k8s.ResyncPeriod)

c.ServiceLister = c.kubernetesFactory.Core().V1().Services().Lister()
c.serviceManager = NewShadowServiceManager(c.logger, c.ServiceLister, c.cfg.Namespace, c.tcpStateTable, c.udpStateTable, c.cfg.DefaultMode, c.cfg.MinHTTPPort, c.cfg.MaxHTTPPort, c.clients.GetKubernetesClient())
c.serviceLister = c.kubernetesFactory.Core().V1().Services().Lister()
c.serviceManager = NewShadowServiceManager(c.logger, c.serviceLister, c.cfg.Namespace, c.tcpStateTable, c.udpStateTable, c.cfg.DefaultMode, c.cfg.MinHTTPPort, c.cfg.MaxHTTPPort, c.clients.GetKubernetesClient())

// configRefreshChan is used to trigger configuration refreshes.
c.configRefreshChan = make(chan struct{})
Expand All @@ -145,9 +145,9 @@ func (c *Controller) init() {
}

// Create listers and register the event handler to informers that are not ACL related.
c.PodLister = c.kubernetesFactory.Core().V1().Pods().Lister()
c.EndpointsLister = c.kubernetesFactory.Core().V1().Endpoints().Lister()
c.TrafficSplitLister = c.splitFactory.Split().V1alpha2().TrafficSplits().Lister()
c.podLister = c.kubernetesFactory.Core().V1().Pods().Lister()
c.endpointsLister = c.kubernetesFactory.Core().V1().Endpoints().Lister()
c.trafficSplitLister = c.splitFactory.Split().V1alpha2().TrafficSplits().Lister()

c.kubernetesFactory.Core().V1().Services().Informer().AddEventHandler(c.handler)
c.kubernetesFactory.Core().V1().Endpoints().Informer().AddEventHandler(c.handler)
Expand All @@ -158,26 +158,28 @@ func (c *Controller) init() {
c.accessFactory = accessinformer.NewSharedInformerFactoryWithOptions(c.clients.GetAccessClient(), k8s.ResyncPeriod)
c.specsFactory = specsinformer.NewSharedInformerFactoryWithOptions(c.clients.GetSpecsClient(), k8s.ResyncPeriod)

c.TrafficTargetLister = c.accessFactory.Access().V1alpha1().TrafficTargets().Lister()
c.HTTPRouteGroupLister = c.specsFactory.Specs().V1alpha1().HTTPRouteGroups().Lister()
c.TCPRouteLister = c.specsFactory.Specs().V1alpha1().TCPRoutes().Lister()
c.trafficTargetLister = c.accessFactory.Access().V1alpha1().TrafficTargets().Lister()
c.httpRouteGroupLister = c.specsFactory.Specs().V1alpha1().HTTPRouteGroups().Lister()
c.tcpRouteLister = c.specsFactory.Specs().V1alpha1().TCPRoutes().Lister()

c.accessFactory.Access().V1alpha1().TrafficTargets().Informer().AddEventHandler(c.handler)
c.kubernetesFactory.Core().V1().Pods().Informer().AddEventHandler(c.handler)
c.specsFactory.Specs().V1alpha1().HTTPRouteGroups().Informer().AddEventHandler(c.handler)
c.specsFactory.Specs().V1alpha1().TCPRoutes().Informer().AddEventHandler(c.handler)
}

c.api = api.NewAPI(c.logger, c.cfg.APIPort, c.cfg.APIHost, &c.lastConfiguration, c.PodLister, c.cfg.Namespace)
c.currentConfiguration = safe.New(provider.NewDefaultDynamicConfig())

c.api = api.NewAPI(c.logger, c.cfg.APIPort, c.cfg.APIHost, c.currentConfiguration, c.podLister, c.cfg.Namespace)

c.topologyBuilder = &topology.Builder{
ServiceLister: c.ServiceLister,
EndpointsLister: c.EndpointsLister,
PodLister: c.PodLister,
TrafficTargetLister: c.TrafficTargetLister,
TrafficSplitLister: c.TrafficSplitLister,
HTTPRouteGroupLister: c.HTTPRouteGroupLister,
TCPRoutesLister: c.TCPRouteLister,
ServiceLister: c.serviceLister,
EndpointsLister: c.endpointsLister,
PodLister: c.podLister,
TrafficTargetLister: c.trafficTargetLister,
TrafficSplitLister: c.trafficSplitLister,
HTTPRouteGroupLister: c.httpRouteGroupLister,
TCPRoutesLister: c.tcpRouteLister,
Logger: c.logger,
}

Expand All @@ -199,6 +201,9 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {

c.logger.Debug("Initializing Mesh controller")

// Start the api.
c.api.Start()

// Start the informers.
c.startInformers(stopCh, 10*time.Second)

Expand All @@ -209,15 +214,15 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
c.logger.Errorf("Could not create mesh services: %v", err)
}

// Start the api, and enable the readiness endpoint.
c.api.Start()
// Enable API readiness endpoint, informers are started and default conf is available.
c.api.EnableReadiness()

for {
timer := time.NewTimer(10 * time.Second)
select {
case <-stopCh:
c.logger.Info("Shutting down workers")
return nil

case <-c.configRefreshChan:
// Reload the configuration.
topo, err := c.topologyBuilder.Build(c.ignoredResources)
Expand All @@ -228,19 +233,9 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {

conf := c.provider.BuildConfig(topo)

if !reflect.DeepEqual(c.lastConfiguration.Get(), conf) {
c.lastConfiguration.Set(conf)

// Configuration successfully created, enable readiness in the api.
c.api.EnableReadiness()
if !reflect.DeepEqual(c.currentConfiguration.Get(), conf) {
c.currentConfiguration.Set(conf)
}
case <-timer.C:
if rawCfg := c.lastConfiguration.Get(); rawCfg == nil {
break
}

// Configuration successfully created, enable readiness in the api.
c.api.EnableReadiness()
}
}
}
Expand Down Expand Up @@ -295,7 +290,7 @@ func (c *Controller) createMeshServices() error {

// Because createMeshServices is called after startInformers,
// then we already have the cache built, so we can use it.
svcs, err := c.ServiceLister.List(sel)
svcs, err := c.serviceLister.List(sel)
if err != nil {
return fmt.Errorf("unable to get services: %w", err)
}
Expand Down
92 changes: 47 additions & 45 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,55 @@ func New(tcpStateTable, udpStateTable PortFinder, middlewareBuilder MiddlewareBu
}
}

// NewDefaultDynamicConfig creates and returns the minimal working dynamic configuration which should be propagated
// to proxy nodes.
func NewDefaultDynamicConfig() *dynamic.Configuration {
return &dynamic.Configuration{
HTTP: &dynamic.HTTPConfiguration{
Routers: map[string]*dynamic.Router{
"readiness": {
Rule: "Path(`/ping`)",
EntryPoints: []string{"readiness"},
Service: "readiness",
},
},
Services: map[string]*dynamic.Service{
"readiness": {
LoadBalancer: &dynamic.ServersLoadBalancer{
PassHostHeader: getBoolRef(true),
Servers: []dynamic.Server{
{
URL: "http://127.0.0.1:8080",
},
},
},
},
blockAllServiceKey: {
LoadBalancer: &dynamic.ServersLoadBalancer{},
},
},
Middlewares: map[string]*dynamic.Middleware{
blockAllMiddlewareKey: {
IPWhiteList: &dynamic.IPWhiteList{
SourceRange: []string{"255.255.255.255"},
},
},
},
},
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Services: map[string]*dynamic.TCPService{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
}
}

// BuildConfig builds a dynamic configuration.
func (p *Provider) BuildConfig(t *topology.Topology) *dynamic.Configuration {
cfg := buildDefaultDynamicConfig()
cfg := NewDefaultDynamicConfig()

for svcKey, svc := range t.Services {
if err := p.buildConfigForService(t, cfg, svc); err != nil {
Expand Down Expand Up @@ -801,50 +847,6 @@ func hasTrafficTargetSpecTCPRoute(tt *topology.ServiceTrafficTarget) bool {
return false
}

func buildDefaultDynamicConfig() *dynamic.Configuration {
return &dynamic.Configuration{
HTTP: &dynamic.HTTPConfiguration{
Routers: map[string]*dynamic.Router{
"readiness": {
Rule: "Path(`/ping`)",
EntryPoints: []string{"readiness"},
Service: "readiness",
},
},
Services: map[string]*dynamic.Service{
"readiness": {
LoadBalancer: &dynamic.ServersLoadBalancer{
PassHostHeader: getBoolRef(true),
Servers: []dynamic.Server{
{
URL: "http://127.0.0.1:8080",
},
},
},
},
blockAllServiceKey: {
LoadBalancer: &dynamic.ServersLoadBalancer{},
},
},
Middlewares: map[string]*dynamic.Middleware{
blockAllMiddlewareKey: {
IPWhiteList: &dynamic.IPWhiteList{
SourceRange: []string{"255.255.255.255"},
},
},
},
},
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Services: map[string]*dynamic.TCPService{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
}
}

func addToSliceCopy(items []string, item string) []string {
cpy := make([]string, len(items)+1)
copy(cpy, items)
Expand Down

0 comments on commit 3224474

Please sign in to comment.