-
Notifications
You must be signed in to change notification settings - Fork 4.7k
/
poll_service_options.go
78 lines (63 loc) · 2.03 KB
/
poll_service_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package poll_service
import (
"context"
"fmt"
"io"
"os"
"github.com/openshift/origin/pkg/clioptions/iooptions"
"github.com/openshift/origin/pkg/monitor"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
)
type PollServiceOptions struct {
KubeClient kubernetes.Interface
Namespace string
ClusterIP string
Port uint16
BackendPrefix string
OutputFile string
MyNodeName string
StopConfigMapName string
OriginalOutFile io.Writer
CloseFn iooptions.CloseFunc
genericclioptions.IOStreams
}
func (o *PollServiceOptions) Run(ctx context.Context) error {
fmt.Fprintf(o.Out, "Initializing to watch clusterIP %s:%d\n", o.ClusterIP, o.Port)
fmt.Fprintf(o.Out, "Initializing output file to %s\n", o.OutputFile)
startingContent, err := os.ReadFile(o.OutputFile)
if err != nil && !os.IsNotExist(err) {
return err
}
if len(startingContent) > 0 {
// print starting content to the log so that we can simply scrape the log to find all entries at the end
o.OriginalOutFile.Write([]byte(fmt.Sprintf("Found existing content: %s", string(startingContent))))
}
recorder := monitor.WrapWithJSONLRecorder(monitor.NewRecorder(), o.IOStreams.Out, nil)
kubeInformers := informers.NewSharedInformerFactory(o.KubeClient, 0)
namespacedScopedCoreInformers := coreinformers.New(kubeInformers, o.Namespace, nil)
cleanupFinished := make(chan struct{})
podToServiceChecker := NewPollServiceWatcher(
o.BackendPrefix,
o.MyNodeName,
o.Namespace,
o.ClusterIP,
o.Port,
recorder,
o.IOStreams.Out,
o.StopConfigMapName,
namespacedScopedCoreInformers.ConfigMaps(),
)
go podToServiceChecker.Run(ctx, cleanupFinished)
go kubeInformers.Start(ctx.Done())
fmt.Fprintf(o.Out, "Watching configmaps...\n")
<-ctx.Done()
// now wait for the watchers to shutdown
fmt.Fprintf(o.Out, "Waiting for watchers to close...\n")
// TODO add time interrupt too
<-cleanupFinished
fmt.Fprintf(o.Out, "Exiting...\n")
return nil
}