// Source note: this file (ingress.go, 177 lines, 144 loc, 4.4 KB) was taken
// from a repository that was archived by its owner on Sep 16, 2019 and is now
// read-only.

package producers
import (
"fmt"
"html/template"
"strings"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/zalando-incubator/mate/pkg"
"github.com/zalando-incubator/mate/pkg/kubernetes"
k8s "k8s.io/client-go/kubernetes"
api "k8s.io/client-go/pkg/api/v1"
extensions "k8s.io/client-go/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/pkg/watch"
)
// kubernetesIngressProducer turns Kubernetes Ingress resources into
// pkg.Endpoint values, either by listing them once (Endpoints) or by
// watching for changes (Monitor).
type kubernetesIngressProducer struct {
	// client is the Kubernetes API client used to list and watch ingresses.
	client *k8s.Clientset

	// tmpl is the template parsed by the constructor from the configured
	// format string.
	// NOTE(review): none of the methods visible here execute this template;
	// confirm whether it is still needed for ingresses.
	tmpl *template.Template

	// filter maps annotation keys to the exact values an ingress must carry;
	// ingresses that do not match every entry are rejected by validateIngress.
	filter map[string]string
}
// NewKubernetesIngress creates an ingress producer from cfg.
//
// It builds a Kubernetes API client for cfg.APIServer and parses cfg.Format
// as a template named "endpoint" that has a "trimPrefix" helper
// (strings.TrimPrefix) available. cfg.Filter is stored as the annotation
// filter. An error is returned when the client cannot be created or the
// template does not parse.
func NewKubernetesIngress(cfg *KubernetesOptions) (*kubernetesIngressProducer, error) {
	clientset, err := kubernetes.NewClient(cfg.APIServer)
	if err != nil {
		return nil, fmt.Errorf("[Ingress] Unable to setup Kubernetes API client: %v", err)
	}

	helpers := template.FuncMap{"trimPrefix": strings.TrimPrefix}
	parsed, err := template.New("endpoint").Funcs(helpers).Parse(cfg.Format)
	if err != nil {
		return nil, fmt.Errorf("[Ingress] Error parsing template: %s", err)
	}

	producer := &kubernetesIngressProducer{
		client: clientset,
		tmpl:   parsed,
		filter: cfg.Filter,
	}
	return producer, nil
}
// Endpoints lists the ingresses of all namespaces and returns the endpoints
// derived from every ingress that passes validateIngress. Ingresses that fail
// validation are logged as warnings and skipped. The returned slice is empty
// but non-nil when nothing qualifies; an error is returned only when the list
// call itself fails.
func (a *kubernetesIngressProducer) Endpoints() ([]*pkg.Endpoint, error) {
	list, err := a.client.Ingresses(api.NamespaceAll).List(api.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("[Ingress] Unable to retrieve list of ingress: %v", err)
	}

	result := []*pkg.Endpoint{}
	for i := range list.Items {
		ingress := list.Items[i]
		if invalid := validateIngress(ingress, a.filter); invalid != nil {
			log.Warnln(invalid)
			continue
		}
		result = append(result, a.convertIngressToEndpoint(ingress)...)
	}
	return result, nil
}
// Monitor watches ingresses in all namespaces and publishes the endpoints of
// every added or modified ingress that passes validateIngress on results.
//
// Failures to establish the watch and watch error events are reported on
// errChan. When the watch cannot be established, Monitor retries after five
// seconds; when the watch's result channel is closed, it re-establishes the
// watch immediately. Monitor returns once done is closed (or receives a
// value).
//
// Every channel send also observes done, so Monitor cannot block forever on
// shutdown when nobody is reading results or errChan any more.
//
// NOTE(review): wg.Add(1) is executed inside Monitor itself. If callers start
// Monitor with `go`, their wg.Wait may run before this Add; kept as is
// because moving it would change the contract with callers — confirm there.
func (a *kubernetesIngressProducer) Monitor(results chan *pkg.Endpoint, errChan chan error, done chan struct{}, wg *sync.WaitGroup) {
	wg.Add(1)
	defer wg.Done()

	for {
		w, err := a.client.Ingresses(api.NamespaceAll).Watch(api.ListOptions{})
		if err != nil {
			// Report the failure, but give up the send if shutdown is requested.
			select {
			case errChan <- fmt.Errorf("[Ingress] Unable to watch list of ingress: %v", err):
			case <-done:
				log.Info("[Ingress] Exited monitoring loop.")
				return
			}

			// Back off before retrying the watch.
			select {
			case <-done:
				log.Info("[Ingress] Exited monitoring loop.")
				return
			case <-time.After(5 * time.Second):
				continue
			}
		}

		if a.consumeWatch(w, results, errChan, done) {
			log.Info("[Ingress] Exited monitoring loop.")
			return
		}
		// The result channel was closed: loop around and watch again.
	}
}

// consumeWatch drains the events of w until either done fires (returns true)
// or the watch's result channel is closed (returns false). The watch is
// always stopped before returning so its resources are released.
func (a *kubernetesIngressProducer) consumeWatch(w watch.Interface, results chan *pkg.Endpoint, errChan chan error, done chan struct{}) bool {
	defer w.Stop()

	for {
		select {
		case <-done:
			return true

		case event, ok := <-w.ResultChan():
			if !ok {
				return false
			}

			if event.Type == watch.Error {
				// TODO: consider allowing the service continue running and just log this error
				select {
				case errChan <- fmt.Errorf("[Ingress] Event listener received an error, terminating: %v", event):
				case <-done:
					return true
				}
				continue
			}

			// Only additions and modifications produce endpoints.
			if event.Type != watch.Added && event.Type != watch.Modified {
				continue
			}

			ing, ok := event.Object.(*extensions.Ingress)
			if !ok {
				// If the object wasn't an Ingress we can safely ignore it.
				// Log the received object: ing is nil after a failed assertion.
				log.Printf("[Ingress] Cannot cast object to ingress: %v", event.Object)
				continue
			}

			log.Printf("%s: %s/%s", event.Type, ing.Namespace, ing.Name)

			if err := validateIngress(*ing, a.filter); err != nil {
				log.Warnln(err)
				continue
			}

			for _, ep := range a.convertIngressToEndpoint(*ing) {
				select {
				case results <- ep:
				case <-done:
					return true
				}
			}
		}
	}
}
// validateIngress reports whether ing should be turned into endpoints.
//
// It returns an error when any annotation named in filter does not have
// exactly the filtered value on ing, or when the ingress status lists no load
// balancer entry. More than one load balancer entry is accepted, but a
// warning is logged because only the first one is used.
func validateIngress(ing extensions.Ingress, filter map[string]string) error {
	for key, want := range filter {
		if got := ing.Annotations[key]; got != want {
			return fmt.Errorf(
				"[Ingress] Ingress '%s/%s' doesn't match filter for annotation %s: %s != %s",
				ing.Namespace, ing.Name, key, want, got,
			)
		}
	}

	count := len(ing.Status.LoadBalancer.Ingress)
	if count == 0 {
		return fmt.Errorf(
			"[Ingress] The load balancer of ingress '%s/%s' does not have any ingress.",
			ing.Namespace, ing.Name,
		)
	}
	if count > 1 {
		// TODO(linki): in case we have multiple ingress we can just create multiple A or CNAME records
		log.Warnf("[Ingress] Ingress '%s/%s' has more than one ingress (%d). Only using the first one.",
			ing.Namespace, ing.Name, count)
	}
	return nil
}
// convertIngressToEndpoint returns one endpoint per rule of ing. Each
// endpoint's DNS name is the sanitized rule host, and its IP and hostname are
// copied from the first load balancer entry of the ingress status (both stay
// empty when the status has no entry).
func (a *kubernetesIngressProducer) convertIngressToEndpoint(ing extensions.Ingress) []*pkg.Endpoint {
	// Only the first load balancer entry is used.
	// TODO(linki): we could easily return a list of endpoints
	var ip, hostname string
	if lbs := ing.Status.LoadBalancer.Ingress; len(lbs) > 0 {
		ip, hostname = lbs[0].IP, lbs[0].Hostname
	}

	result := make([]*pkg.Endpoint, 0, len(ing.Spec.Rules))
	for _, rule := range ing.Spec.Rules {
		target := &pkg.Endpoint{}
		target.IP = ip
		target.Hostname = hostname
		target.DNSName = pkg.SanitizeDNSName(rule.Host)
		result = append(result, target)
	}
	return result
}