-
Notifications
You must be signed in to change notification settings - Fork 287
/
watcher.go
134 lines (120 loc) · 2.99 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright (c) 2018 Tigera, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package binder
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"time"
)
type watcher interface {
// Returns a channel that sends events whenever workload mounts are created or removed
watch(stop <-chan bool) <-chan workloadEvent
}
type Operation int
const (
Added Operation = iota
Removed
)
const CredentialsSubdir = "creds"
const CredentialsExtension = ".json"
const PollSleepTime = 100 * time.Millisecond
type workloadEvent struct {
// What happended to the workload mount?
op Operation
uid string
}
func NewWatcher(path string) watcher {
return &pollWatcher{path}
}
type pollWatcher struct {
path string
}
func (p *pollWatcher) watch(stop <-chan bool) <-chan workloadEvent {
c := make(chan workloadEvent)
go p.poll(c, stop)
return c
}
func (p *pollWatcher) poll(events chan<- workloadEvent, stop <-chan bool) {
// The workloads we know about.
known := make(map[string]bool)
credPath := filepath.Join(p.path, CredentialsSubdir)
for {
// Check if we need to stop polling.
select {
case <-stop:
close(events)
return
default:
// continue
}
if _, err := os.Stat(credPath); err != nil {
time.Sleep(PollSleepTime)
} else {
break
}
}
log.Println("Ready to parse credential directory")
for {
// Check if we need to stop polling.
select {
case <-stop:
close(events)
return
default:
// continue
}
// list the contents of the directory
files, err := ioutil.ReadDir(credPath)
if err != nil {
log.Printf("error reading %s: %v", p.path, err)
close(events)
return
}
// This set will contain all previously known UIDs that are now absent.
removed := copyStringSet(known)
for _, file := range files {
isCred, uid := parseFilename(file.Name())
if isCred {
if !known[uid] {
events <- workloadEvent{op: Added, uid: uid}
known[uid] = true
}
delete(removed, uid)
}
}
// Send updates for UIDs we no longer know about.
for uid := range removed {
events <- workloadEvent{op: Removed, uid: uid}
delete(known, uid)
}
time.Sleep(PollSleepTime)
}
}
func parseFilename(name string) (isCred bool, uid string) {
if strings.HasSuffix(name, CredentialsExtension) {
return true, strings.TrimSuffix(name, CredentialsExtension)
} else {
return false, ""
}
}
func copyStringSet(original map[string]bool) map[string]bool {
n := make(map[string]bool)
for k, v := range original {
n[k] = v
}
return n
}