forked from traefik/traefik
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mesos.go
173 lines (149 loc) · 4.79 KB
/
mesos.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package mesos
import (
"fmt"
"strings"
"time"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/mesos/mesos-go/detector"
"github.com/mesosphere/mesos-dns/records"
"github.com/mesosphere/mesos-dns/records/state"
// Register mesos zoo the detector
_ "github.com/mesos/mesos-go/detector/zoo"
"github.com/mesosphere/mesos-dns/detect"
"github.com/mesosphere/mesos-dns/logging"
"github.com/mesosphere/mesos-dns/util"
)
var _ provider.Provider = (*Provider)(nil)
// Provider holds configuration of the provider.
type Provider struct {
provider.BaseProvider
Endpoint string `description:"Mesos server endpoint. You can also specify multiple endpoint for Mesos"`
Domain string `description:"Default domain used"`
ExposedByDefault bool `description:"Expose Mesos apps by default" export:"true"`
GroupsAsSubDomains bool `description:"Convert Mesos groups to subdomains" export:"true"`
ZkDetectionTimeout int `description:"Zookeeper timeout (in seconds)" export:"true"`
RefreshSeconds int `description:"Polling interval (in seconds)" export:"true"`
IPSources string `description:"IPSources (e.g. host, docker, mesos, netinfo)" export:"true"`
StateTimeoutSecond int `description:"HTTP Timeout (in seconds)" export:"true"`
Masters []string
}
// Init the provider
func (p *Provider) Init(constraints types.Constraints) error {
return p.BaseProvider.Init(constraints)
}
// Provide allows the mesos provider to provide configurations to traefik
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
operation := func() error {
// initialize logging
logging.SetupLogs()
log.Debugf("%s", p.IPSources)
var zk string
var masters []string
if strings.HasPrefix(p.Endpoint, "zk://") {
zk = p.Endpoint
} else {
masters = strings.Split(p.Endpoint, ",")
}
errch := make(chan error)
changed := detectMasters(zk, masters)
reload := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
zkTimeout := time.Second * time.Duration(p.ZkDetectionTimeout)
timeout := time.AfterFunc(zkTimeout, func() {
if zkTimeout > 0 {
errch <- fmt.Errorf("master detection timed out after %s", zkTimeout)
}
})
defer reload.Stop()
defer util.HandleCrash()
if !p.Watch {
reload.Stop()
timeout.Stop()
}
for {
select {
case <-reload.C:
tasks := p.getTasks()
configuration := p.buildConfiguration(tasks)
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "mesos",
Configuration: configuration,
}
}
case masters := <-changed:
if len(masters) == 0 || masters[0] == "" {
// no leader
timeout.Reset(zkTimeout)
} else {
timeout.Stop()
}
log.Debugf("new masters detected: %v", masters)
p.Masters = masters
tasks := p.getTasks()
configuration := p.buildConfiguration(tasks)
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "mesos",
Configuration: configuration,
}
}
case err := <-errch:
log.Errorf("%s", err)
}
}
}
notify := func(err error, time time.Duration) {
log.Errorf("Mesos connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Mesos server %+v", err)
}
return nil
}
func detectMasters(zk string, masters []string) <-chan []string {
changed := make(chan []string, 1)
if zk != "" {
log.Debugf("Starting master detector for ZK ", zk)
if md, err := detector.New(zk); err != nil {
log.Errorf("Failed to create master detector: %v", err)
} else if err := md.Detect(detect.NewMasters(masters, changed)); err != nil {
log.Errorf("Failed to initialize master detector: %v", err)
}
} else {
changed <- masters
}
return changed
}
func (p *Provider) getTasks() []state.Task {
rg := records.NewRecordGenerator(time.Duration(p.StateTimeoutSecond) * time.Second)
st, err := rg.FindMaster(p.Masters...)
if err != nil {
log.Errorf("Failed to create a client for Mesos, error: %v", err)
return nil
}
return taskRecords(st)
}
func taskRecords(st state.State) []state.Task {
var tasks []state.Task
for _, f := range st.Frameworks {
for _, task := range f.Tasks {
for _, slave := range st.Slaves {
if task.SlaveID == slave.ID {
task.SlaveIP = slave.PID.Host
}
}
// only do running and discoverable tasks
if task.State == "TASK_RUNNING" {
tasks = append(tasks, task)
}
}
}
return tasks
}