-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.go
97 lines (84 loc) · 2.6 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package main
import (
goflag "flag"
flag "github.com/spf13/pflag"
"io/ioutil"
"k8s.io/client-go/rest"
"net"
"os"
"path/filepath"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
certutil "k8s.io/client-go/util/cert"
"k8s.io/klog"
)
var (
namespace = flag.String("namespace", "", "Namespace the pod is running in")
tokenDirs = flag.StringSlice("acess-token-dirs", nil, "Directories with access-token. A worker will be started per each dir/token")
qps = flag.Float64("qps-per-worker", 0.5, "QPS per worker")
)
func main() {
initFlagsAndKlog()
for i, tokenDir := range *tokenDirs {
tokenFile := filepath.Join(tokenDir, "token")
rootCAFile := filepath.Join(tokenDir, "ca.crt")
config, err := NewConfig(tokenFile, rootCAFile)
if err != nil {
klog.Fatal(err.Error())
}
client, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatal(err.Error())
}
klog.Infof("Starting worker %d\n", i)
go worker(i, client, *qps)
}
klog.Infof("Started %d workers \n", len(*tokenDirs))
if len(*tokenDirs) > 0 {
<-make(chan bool) // Block main routine.
}
}
func initFlagsAndKlog() {
flag.Set("alsologtostderr", "true")
klogFlags := goflag.NewFlagSet("klog", goflag.ExitOnError)
klog.InitFlags(klogFlags)
flag.CommandLine.AddGoFlagSet(klogFlags)
flag.Parse()
}
func worker(id int, client kubernetes.Interface, qps float64) {
duration := time.Duration(float64(int64(time.Second)) / qps)
ticker := time.NewTicker(duration)
for {
klog.V(4).Infof("Worker %v sends request\n", id)
svcAccount, err := client.CoreV1().ServiceAccounts(*namespace).Get("default", metav1.GetOptions{})
if err != nil {
klog.Warningf("Got error when getting default svcAccount: %v", err)
} else {
klog.V(4).Infof("Worker %v fetched %s svcAccount in namespace %s\n", id, svcAccount.Name, svcAccount.Namespace)
}
<-ticker.C
}
}
func NewConfig(tokenFile, rootCAFile string) (*rest.Config, error) {
host, port := os.Getenv("KUBERNETES_SERVICE_HOST"), os.Getenv("KUBERNETES_SERVICE_PORT")
if len(host) == 0 || len(port) == 0 {
return nil, rest.ErrNotInCluster
}
token, err := ioutil.ReadFile(tokenFile)
if err != nil {
return nil, err
}
tlsClientConfig := rest.TLSClientConfig{}
if _, err := certutil.NewPool(rootCAFile); err != nil {
klog.Errorf("Expected to load root CA config from %s, but got err: %v", rootCAFile, err)
} else {
tlsClientConfig.CAFile = rootCAFile
}
return &rest.Config{
Host: "https://" + net.JoinHostPort(host, port),
TLSClientConfig: tlsClientConfig,
BearerToken: string(token),
BearerTokenFile: tokenFile,
}, nil
}