Skip to content

Commit

Permalink
Detect change when service or node are in maintenance mode
Browse files Browse the repository at this point in the history
  • Loading branch information
mmatur authored and traefiker committed Jun 5, 2018
1 parent 2c18750 commit e299775
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 21 deletions.
92 changes: 85 additions & 7 deletions integration/consul_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,25 @@ func (s *ConsulCatalogSuite) registerService(name string, address string, port i
return err
}

func (s *ConsulCatalogSuite) registerAgentService(name string, address string, port int, tags []string) error {
func (s *ConsulCatalogSuite) registerAgentService(name string, address string, port int, tags []string, withHealthCheck bool) error {
agent := s.consulClient.Agent()
var healthCheck *api.AgentServiceCheck
if withHealthCheck {
healthCheck = &api.AgentServiceCheck{
HTTP: "http://" + address,
Interval: "10s",
}
} else {
healthCheck = nil
}
return agent.ServiceRegister(
&api.AgentServiceRegistration{
ID: address,
Tags: tags,
Name: name,
Address: address,
Port: port,
Check: &api.AgentServiceCheck{
HTTP: "http://" + address,
Interval: "10s",
},
Check: healthCheck,
},
)
}
Expand Down Expand Up @@ -124,6 +130,22 @@ func (s *ConsulCatalogSuite) deregisterService(name string, address string) erro
return err
}

func (s *ConsulCatalogSuite) consulEnableServiceMaintenance(name string) error {
return s.consulClient.Agent().EnableServiceMaintenance(name, fmt.Sprintf("Maintenance mode for service %s", name))
}

func (s *ConsulCatalogSuite) consulDisableServiceMaintenance(name string) error {
return s.consulClient.Agent().DisableServiceMaintenance(name)
}

func (s *ConsulCatalogSuite) consulEnableNodeMaintenance() error {
return s.consulClient.Agent().EnableNodeMaintenance("Maintenance mode for node")
}

func (s *ConsulCatalogSuite) consulDisableNodeMaintenance() error {
return s.consulClient.Agent().DisableNodeMaintenance()
}

func (s *ConsulCatalogSuite) TestSimpleConfiguration(c *check.C) {
cmd, display := s.traefikCmd(
withConfigFile("fixtures/consul_catalog/simple.toml"),
Expand Down Expand Up @@ -282,7 +304,7 @@ func (s *ConsulCatalogSuite) TestRefreshConfigWithMultipleNodeWithoutHealthCheck
c.Assert(err, checker.IsNil, check.Commentf("Error registering service"))
defer s.deregisterService("test", whoami.NetworkSettings.IPAddress)

err = s.registerAgentService("test", whoami.NetworkSettings.IPAddress, 80, []string{"name=whoami1"})
err = s.registerAgentService("test", whoami.NetworkSettings.IPAddress, 80, []string{"name=whoami1"}, true)
c.Assert(err, checker.IsNil, check.Commentf("Error registering agent service"))
defer s.deregisterAgentService(whoami.NetworkSettings.IPAddress)

Expand Down Expand Up @@ -558,7 +580,7 @@ func (s *ConsulCatalogSuite) TestServiceWithMultipleHealthCheck(c *check.C) {

whoami := s.composeProject.Container(c, "whoami1")
// Register service
err = s.registerAgentService("test", whoami.NetworkSettings.IPAddress, 80, []string{"name=whoami1"})
err = s.registerAgentService("test", whoami.NetworkSettings.IPAddress, 80, []string{"name=whoami1"}, true)
c.Assert(err, checker.IsNil, check.Commentf("Error registering agent service"))
defer s.deregisterAgentService(whoami.NetworkSettings.IPAddress)

Expand Down Expand Up @@ -596,3 +618,59 @@ func (s *ConsulCatalogSuite) TestServiceWithMultipleHealthCheck(c *check.C) {
err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
c.Assert(err, checker.IsNil)
}

func (s *ConsulCatalogSuite) TestMaintenanceMode(c *check.C) {
cmd, display := s.traefikCmd(
withConfigFile("fixtures/consul_catalog/simple.toml"),
"--consulCatalog",
"--consulCatalog.endpoint="+s.consulIP+":8500",
"--consulCatalog.domain=consul.localhost")
defer display(c)
err := cmd.Start()
c.Assert(err, checker.IsNil)
defer cmd.Process.Kill()

// Wait for Traefik to turn ready.
err = try.GetRequest("http://127.0.0.1:8000/", 2*time.Second, try.StatusCodeIs(http.StatusNotFound))
c.Assert(err, checker.IsNil)

whoami := s.composeProject.Container(c, "whoami1")

err = s.registerAgentService("test", whoami.NetworkSettings.IPAddress, 80, []string{}, false)
c.Assert(err, checker.IsNil, check.Commentf("Error registering service"))

req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8000/", nil)
c.Assert(err, checker.IsNil)
req.Host = "test.consul.localhost"

err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
c.Assert(err, checker.IsNil)

// Enable service maintenance mode
err = s.consulEnableServiceMaintenance(whoami.NetworkSettings.IPAddress)
c.Assert(err, checker.IsNil)

err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusNotFound), try.HasBody())
c.Assert(err, checker.IsNil)

// Disable service maintenance mode
err = s.consulDisableServiceMaintenance(whoami.NetworkSettings.IPAddress)
c.Assert(err, checker.IsNil)

err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
c.Assert(err, checker.IsNil)

// Enable node maintenance mode
err = s.consulEnableNodeMaintenance()
c.Assert(err, checker.IsNil)

err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusNotFound), try.HasBody())
c.Assert(err, checker.IsNil)

// Disable node maintenance mode
err = s.consulDisableNodeMaintenance()
c.Assert(err, checker.IsNil)

err = try.Request(req, 10*time.Second, try.StatusCodeIs(http.StatusOK), try.HasBody())
c.Assert(err, checker.IsNil)
}
33 changes: 19 additions & 14 deletions provider/consulcatalog/consul_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
safe.Go(func() {
// variable to hold previous state
var flashback map[string][]string
var flashbackMaintenance []string

options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}

Expand All @@ -277,12 +278,15 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s

var current = make(map[string][]string)
var currentFailing = make(map[string]*api.HealthCheck)
var maintenance []string
if healthyState != nil {
for _, healthy := range healthyState {
key := fmt.Sprintf("%s-%s", healthy.Node, healthy.ServiceID)
_, failing := currentFailing[key]
if healthy.Status == "passing" && !failing {
current[key] = append(current[key], healthy.Node)
} else if strings.HasPrefix(healthy.CheckID, "_service_maintenance") || strings.HasPrefix(healthy.CheckID, "_node_maintenance") {
maintenance = append(maintenance, healthy.CheckID)
} else {
currentFailing[key] = healthy
if _, ok := current[key]; ok {
Expand Down Expand Up @@ -314,24 +318,25 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
// Thus it is required to do extra check for changes...
addedKeys, removedKeys, changedKeys := getChangedHealth(current, flashback)

if len(addedKeys) > 0 {
log.WithField("DiscoveredServices", addedKeys).Debug("Health State change detected.")
watchCh <- data
flashback = current
}
if len(addedKeys) > 0 || len(removedKeys) > 0 || len(changedKeys) > 0 {
log.WithField("DiscoveredServices", addedKeys).
WithField("MissingServices", removedKeys).
WithField("ChangedServices", changedKeys).
Debug("Health State change detected.")

if len(removedKeys) > 0 {
log.WithField("MissingServices", removedKeys).Debug("Health State change detected.")
watchCh <- data
flashback = current
}

if len(changedKeys) > 0 {
log.WithField("ChangedServices", changedKeys).Debug("Health State change detected.")
watchCh <- data
flashback = current
flashbackMaintenance = maintenance
} else {
addedKeysMaintenance, removedMaintenance := getChangedStringKeys(maintenance, flashbackMaintenance)

if len(addedKeysMaintenance) > 0 || len(removedMaintenance) > 0 {
log.WithField("MaintenanceMode", maintenance).Debug("Maintenance change detected.")
watchCh <- data
flashback = current
flashbackMaintenance = maintenance
}
}

}
}
})
Expand Down

0 comments on commit e299775

Please sign in to comment.