generated from khulnasoft/khulnasoft-repo-template
/
meshsync.go
79 lines (68 loc) · 2.44 KB
/
meshsync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package meshsync
import (
"github.com/khulnasoft/meshkit/broker"
"github.com/khulnasoft/meshkit/config"
"github.com/khulnasoft/meshkit/logger"
meshplaykube "github.com/khulnasoft/meshkit/utils/kubernetes"
"github.com/khulnasoft/meshsync/internal/channels"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// Handler contains all handlers, channels, clients, and other parameters for an adapter.
// Use type embedding in a specific adapter to extend it.
//
// The exported fields are supplied by the caller of New; the unexported
// fields are internal state owned by this package.
type Handler struct {
// Config is the configuration handler passed to New.
Config config.Handler
// Log is the logger passed to New.
Log logger.Handler
// Broker is the message broker handler passed to New.
Broker broker.Handler
// restConfig is the Kubernetes REST configuration taken from the kube client built in New.
restConfig rest.Config
// informer is the dynamic shared informer factory produced by GetDynamicInformer.
informer dynamicinformer.DynamicSharedInformerFactory
// staticClient is the typed Kubernetes clientset taken from the kube client built in New.
staticClient *kubernetes.Clientset
// channelPool holds the channels passed to New, keyed by name.
channelPool map[string]channels.GenericChannel
// stores maps a string key to an informer cache store.
// NOTE(review): New does not initialise this map — confirm it is created before the first write.
stores map[string]cache.Store
}
// GetListOptionsFunc reads the list of blacklisted "type" label values from
// the "spec.informer_config" configuration key and returns a function that
// narrows a ListOptions so objects carrying one of those values are excluded.
//
// The returned function leaves the options untouched when it is given nil or
// when the blacklist is empty. If the options already carry a label selector,
// the exclusion is appended to it rather than replacing it.
//
// An error is returned when the configuration cannot be read or when the
// blacklist cannot be turned into a valid label selector.
//
// NOTE(review): assumes "spec.informer_config" decodes into a list of
// strings — confirm against the configuration schema.
func GetListOptionsFunc(config config.Handler) (func(*v1.ListOptions), error) {
	var blacklist []string
	// GetObject needs a pointer to fill; passing the slice by value would
	// leave blacklist empty regardless of what the configuration holds.
	if err := config.GetObject("spec.informer_config", &blacklist); err != nil {
		return nil, err
	}

	// Build the selector string once instead of on every list/watch call.
	// A NotIn requirement with no values is invalid, so an empty blacklist
	// simply means "no filtering".
	var selector string
	if len(blacklist) > 0 {
		sel, err := v1.LabelSelectorAsSelector(&v1.LabelSelector{
			MatchExpressions: []v1.LabelSelectorRequirement{{
				Key:      "type",
				Operator: v1.LabelSelectorOpNotIn,
				Values:   blacklist,
			}},
		})
		if err != nil {
			return nil, err
		}
		selector = sel.String()
	}

	return func(lo *v1.ListOptions) {
		if lo == nil || selector == "" {
			return
		}
		if lo.LabelSelector == "" {
			lo.LabelSelector = selector
			return
		}
		// Comma-separated requirements are ANDed together.
		lo.LabelSelector += "," + selector
	}, nil
}
// New builds a Handler from the supplied configuration, logger, broker and
// channel pool.
//
// It creates the Kubernetes clients, derives the list-options filter from the
// configuration, and wires a dynamic informer factory that uses both. A
// failure to create the Kubernetes clients is returned wrapped by
// ErrKubeConfig; a failure to derive the list-options filter is returned
// as-is. The stores field of the returned Handler is not populated here.
func New(config config.Handler, log logger.Handler, br broker.Handler, pool map[string]channels.GenericChannel) (*Handler, error) {
	// The Kubernetes clients come first: the informer below depends on them.
	kc, err := meshplaykube.New(nil)
	if err != nil {
		return nil, ErrKubeConfig(err)
	}

	tweak, err := GetListOptionsFunc(config)
	if err != nil {
		return nil, err
	}

	// Caller-supplied collaborators.
	h := &Handler{
		Config:      config,
		Log:         log,
		Broker:      br,
		channelPool: pool,
	}

	// State derived from the Kubernetes clients.
	h.restConfig = kc.RestConfig
	h.staticClient = kc.KubeClient
	h.informer = GetDynamicInformer(config, kc.DynamicKubeClient, tweak)

	return h, nil
}
// GetDynamicInformer returns a dynamic shared informer factory bound to
// dynamicKubeClient. The factory covers v1.NamespaceAll, is created with a
// resync period of zero, and applies listOptionsFunc to its list options.
//
// The config parameter is currently unused by this function.
func GetDynamicInformer(config config.Handler, dynamicKubeClient dynamic.Interface, listOptionsFunc func(*v1.ListOptions)) dynamicinformer.DynamicSharedInformerFactory {
	// resyncPeriod is the value handed to the factory as its default resync.
	const resyncPeriod = 0

	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
		dynamicKubeClient,
		resyncPeriod,
		v1.NamespaceAll,
		listOptionsFunc,
	)
	return factory
}