-
Notifications
You must be signed in to change notification settings - Fork 4.7k
/
egress_dns.go
167 lines (139 loc) · 3.39 KB
/
egress_dns.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package plugin
import (
"net"
"sync"
"time"
"github.com/golang/glog"
osapi "github.com/openshift/origin/pkg/sdn/apis/network"
ktypes "k8s.io/apimachinery/pkg/types"
kexec "k8s.io/kubernetes/pkg/util/exec"
)
type EgressDNSUpdate struct {
UID ktypes.UID
Namespace string
}
type EgressDNS struct {
// Protects pdMap/namespaces operations
lock sync.Mutex
// Holds Egress DNS entries for each policy
pdMap map[ktypes.UID]*DNS
// Maintain namespaces for each policy to avoid querying etcd in syncEgressDNSPolicyRules()
namespaces map[ktypes.UID]string
// Report change when Add operation is done
added chan bool
// Report changes when there are dns updates
updates chan EgressDNSUpdate
}
func NewEgressDNS() *EgressDNS {
return &EgressDNS{
pdMap: map[ktypes.UID]*DNS{},
namespaces: map[ktypes.UID]string{},
added: make(chan bool),
updates: make(chan EgressDNSUpdate),
}
}
func (e *EgressDNS) Add(policy osapi.EgressNetworkPolicy) {
dnsInfo := NewDNS(kexec.New())
for _, rule := range policy.Spec.Egress {
if len(rule.To.DNSName) > 0 {
if err := dnsInfo.Add(rule.To.DNSName); err != nil {
glog.Error(err)
}
}
}
if dnsInfo.Size() > 0 {
e.lock.Lock()
defer e.lock.Unlock()
e.pdMap[policy.UID] = dnsInfo
e.namespaces[policy.UID] = policy.Namespace
e.signalAdded()
}
}
func (e *EgressDNS) Delete(policy osapi.EgressNetworkPolicy) {
e.lock.Lock()
defer e.lock.Unlock()
if _, ok := e.pdMap[policy.UID]; ok {
delete(e.pdMap, policy.UID)
delete(e.namespaces, policy.UID)
}
}
func (e *EgressDNS) Update(policyUID ktypes.UID) (error, bool) {
e.lock.Lock()
defer e.lock.Unlock()
if dnsInfo, ok := e.pdMap[policyUID]; ok {
return dnsInfo.Update()
}
return nil, false
}
func (e *EgressDNS) Sync() {
var duration time.Duration
for {
tm, policyUID, policyNamespace, ok := e.GetMinQueryTime()
if !ok {
duration = 30 * time.Minute
} else {
now := time.Now()
if tm.After(now) {
// Item needs to wait for this duration before it can be processed
duration = tm.Sub(now)
} else {
err, changed := e.Update(policyUID)
if err != nil {
glog.Error(err)
}
if changed {
e.updates <- EgressDNSUpdate{policyUID, policyNamespace}
}
continue
}
}
// Wait for the given duration or till something got added
select {
case <-e.added:
case <-time.After(duration):
}
}
}
func (e *EgressDNS) GetMinQueryTime() (time.Time, ktypes.UID, string, bool) {
e.lock.Lock()
defer e.lock.Unlock()
timeSet := false
var minTime time.Time
var uid ktypes.UID
for policyUID, dnsInfo := range e.pdMap {
tm, ok := dnsInfo.GetMinQueryTime()
if !ok {
continue
}
if (timeSet == false) || tm.Before(minTime) {
timeSet = true
minTime = tm
uid = policyUID
}
}
return minTime, uid, e.namespaces[uid], timeSet
}
func (e *EgressDNS) GetIPs(policy osapi.EgressNetworkPolicy, dnsName string) []net.IP {
e.lock.Lock()
defer e.lock.Unlock()
dnsInfo, ok := e.pdMap[policy.UID]
if !ok {
return []net.IP{}
}
return dnsInfo.Get(dnsName).ips
}
func (e *EgressDNS) GetNetCIDRs(policy osapi.EgressNetworkPolicy, dnsName string) []net.IPNet {
cidrs := []net.IPNet{}
for _, ip := range e.GetIPs(policy, dnsName) {
// IPv4 CIDR
cidrs = append(cidrs, net.IPNet{IP: ip, Mask: net.CIDRMask(32, 32)})
}
return cidrs
}
func (e *EgressDNS) signalAdded() {
// Non-blocking op
select {
case e.added <- true:
default:
}
}