forked from joeholley/supergloo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsetup.go
129 lines (108 loc) · 3.51 KB
/
setup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package setup
import (
"context"
"time"
"github.com/solo-io/go-utils/installutils/kuberesource"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/solo-io/supergloo/pkg/install/linkerd"
"github.com/solo-io/go-utils/installutils/kubeinstall"
"github.com/solo-io/supergloo/pkg/install/gloo"
"github.com/solo-io/supergloo/pkg/install/istio"
"github.com/solo-io/supergloo/pkg/api/clientset"
"github.com/solo-io/go-utils/contextutils"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/reporter"
v1 "github.com/solo-io/supergloo/pkg/api/v1"
)
func RunInstallEventLoop(ctx context.Context, cs *clientset.Clientset, customErrHandler func(error)) error {
ctx = contextutils.WithLogger(ctx, "install-event-loop")
logger := contextutils.LoggerFrom(ctx)
errHandler := func(err error) {
if err == nil {
return
}
logger.Errorf("install error: %v", err)
if customErrHandler != nil {
customErrHandler(err)
}
}
installCache := kubeinstall.NewCache()
go func() {
logger.Infof("beginning install cache sync, this may take a while...")
started := time.Now()
if err := installCache.Init(ctx, cs.RestConfig, append(kubeinstall.DefaultFilters, cacheFilters...)...); err != nil {
logger.Fatalf("failed to initialize installation cache: %v", err)
}
logger.Infof("finished install cache sync. took %v", time.Now().Sub(started))
}()
kubeInstaller, err := kubeinstall.NewKubeInstaller(cs.RestConfig, installCache, nil)
if err != nil {
return err
}
installSyncers := createInstallSyncers(cs, kubeInstaller)
if err := startEventLoop(ctx, errHandler, cs, installSyncers); err != nil {
return err
}
return nil
}
// Add install syncers here
func createInstallSyncers(clientset *clientset.Clientset, installer kubeinstall.Installer) v1.InstallSyncers {
return v1.InstallSyncers{
istio.NewInstallSyncer(
installer,
clientset.Supergloo.Mesh,
reporter.NewReporter("istio-install-reporter", clientset.Supergloo.Install.BaseClient()),
),
linkerd.NewInstallSyncer(
installer,
clientset.Kube,
clientset.Supergloo.Mesh,
reporter.NewReporter("linkerd-install-reporter", clientset.Supergloo.Install.BaseClient()),
),
gloo.NewInstallSyncer(
installer,
clientset.Supergloo.MeshIngress,
reporter.NewReporter("gloo-install-reporter", clientset.Supergloo.Install.BaseClient()),
),
}
}
// start the install event loop
func startEventLoop(ctx context.Context, errHandler func(err error), c *clientset.Clientset, syncers v1.InstallSyncers) error {
installEmitter := v1.NewInstallEmitter(c.Supergloo.Install, c.Supergloo.Mesh, c.Supergloo.MeshIngress)
installEventLoop := v1.NewInstallEventLoop(installEmitter, syncers)
watchOpts := clients.WatchOpts{
Ctx: ctx,
RefreshRate: time.Second * 1,
}
installEventLoopErrs, err := installEventLoop.Run(nil, watchOpts)
if err != nil {
return err
}
go func() {
for {
select {
case err := <-installEventLoopErrs:
errHandler(err)
case <-ctx.Done():
}
}
}()
return nil
}
/*
to speed up the cache init, filter out resource types
*/
var cacheFilters = []kuberesource.FilterResource{
func(resource schema.GroupVersionResource) bool {
for _, ignoredType := range ignoreTypesForInstall {
if resource.String() == ignoredType.String() {
return true
}
}
return false
},
}
// types the installer should ignore and the cache should skip
var ignoreTypesForInstall = []schema.GroupVersionResource{
{Resource: "certificatesigningrequests", Version: "v1beta1", Group: "certificates.k8s.io"},
}