forked from traefik/traefik
-
Notifications
You must be signed in to change notification settings - Fork 0
/
metadata.go
138 lines (118 loc) · 4.05 KB
/
metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package rancher
import (
"context"
"fmt"
"time"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/sirupsen/logrus"
rancher "github.com/rancher/go-rancher-metadata/metadata"
)
// MetadataConfiguration contains configuration properties specific to
// the Rancher metadata service provider.
type MetadataConfiguration struct {
IntervalPoll bool `description:"Poll the Rancher metadata service every 'rancher.refreshseconds' (less accurate)"`
Prefix string `description:"Prefix used for accessing the Rancher metadata service"`
}
func (p *Provider) metadataProvide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
metadataServiceURL := fmt.Sprintf("http://rancher-metadata.rancher.internal/%s", p.Metadata.Prefix)
safe.Go(func() {
operation := func() error {
client, err := rancher.NewClientAndWait(metadataServiceURL)
if err != nil {
log.Errorf("Failed to create Rancher metadata service client: %v", err)
return err
}
updateConfiguration := func(version string) {
log.WithField("metadata_version", version).Debugln("Refreshing configuration from Rancher metadata service")
stacks, err := client.GetStacks()
if err != nil {
log.Errorf("Failed to query Rancher metadata service: %v", err)
return
}
rancherData := parseMetadataSourcedRancherData(stacks)
configuration := p.buildConfiguration(rancherData)
configurationChan <- types.ConfigMessage{
ProviderName: "rancher",
Configuration: configuration,
}
}
updateConfiguration("init")
if p.Watch {
pool.Go(func(stop chan bool) {
switch {
case p.Metadata.IntervalPoll:
p.intervalPoll(client, updateConfiguration, stop)
default:
p.longPoll(client, updateConfiguration, stop)
}
})
}
return nil
}
notify := func(err error, time time.Duration) {
log.WithFields(logrus.Fields{
"error": err,
"retry_in": time,
}).Errorln("Rancher metadata service connection error")
}
if err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify); err != nil {
log.WithField("endpoint", metadataServiceURL).Errorln("Cannot connect to Rancher metadata service")
}
})
return nil
}
func (p *Provider) intervalPoll(client rancher.Client, updateConfiguration func(string), stop chan bool) {
_, cancel := context.WithCancel(context.Background())
defer cancel()
ticker := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
defer ticker.Stop()
var version string
for {
select {
case <-ticker.C:
newVersion, err := client.GetVersion()
if err != nil {
log.WithField("error", err).Errorln("Failed to read Rancher metadata service version")
} else if version != newVersion {
version = newVersion
updateConfiguration(version)
}
case <-stop:
return
}
}
}
func (p *Provider) longPoll(client rancher.Client, updateConfiguration func(string), stop chan bool) {
_, cancel := context.WithCancel(context.Background())
defer cancel()
// Holds the connection until there is either a change in the metadata
// repository or `p.RefreshSeconds` has elapsed. Long polling should be
// favoured for the most accurate configuration updates.
safe.Go(func() {
client.OnChange(p.RefreshSeconds, updateConfiguration)
})
<-stop
}
func parseMetadataSourcedRancherData(stacks []rancher.Stack) (rancherDataList []rancherData) {
for _, stack := range stacks {
for _, service := range stack.Services {
var containerIPAddresses []string
for _, container := range service.Containers {
if containerFilter(container.Name, container.HealthState, container.State) {
containerIPAddresses = append(containerIPAddresses, container.PrimaryIp)
}
}
rancherDataList = append(rancherDataList, rancherData{
Name: service.Name + "/" + stack.Name,
State: service.State,
Labels: service.Labels,
Containers: containerIPAddresses,
})
}
}
return rancherDataList
}