-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
57 lines (50 loc) · 1.42 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package watcher
import (
"sync"
"github.com/jawahars16/kubex/infra"
"github.com/jawahars16/kubex/kube"
v1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
// mapService converts a Kubernetes Service into the Payload sent to clients.
//
// action is stored unchanged in Payload.Action and pods in the resource's Pods
// field. The resource IP is taken from the first load-balancer ingress entry in
// the service status, and is left empty when the status lists no ingress.
func mapService(service *v1.Service, action string, pods []string) Payload {
	var ip string
	if entries := service.Status.LoadBalancer.Ingress; len(entries) > 0 {
		ip = entries[0].IP
	}

	return Payload{
		Action: action,
		Resource: Service{
			Meta: Meta{
				ID:        string(service.UID),
				Name:      service.Name,
				Namespace: service.Namespace,
				Labels:    service.Labels,
				Created:   service.CreationTimestamp,
			},
			IP:       ip,
			Selector: service.Spec.Selector,
			Pods:     pods,
		},
	}
}
// WatchServices forwards service events for namespace to socket.
//
// It ranges over the channel returned by kube.GetServiceChannel and, for every
// event that carries a *v1.Service other than the one named "kubernetes", looks
// up the pods matching the service's selector and writes a payload whose action
// is "SVC_" followed by the event type. The call returns only once the channel
// is closed.
//
// mutex serializes writes to socket across the goroutines that share it. It is
// held only for the duration of the write, not during the pod lookup.
func WatchServices(socket infra.Socket, mutex *sync.Mutex, namespace string) {
	for event := range kube.GetServiceChannel(namespace) {
		service, ok := event.Object.(*v1.Service)
		if !ok || service.Name == "kubernetes" {
			continue
		}

		// A service without a selector selects no pods. An empty label set
		// would turn into the "everything" selector and list every pod in the
		// namespace, so the lookup is skipped in that case.
		// The slice is non-nil so that an encoder emits an empty list rather
		// than null — presumably the payload is serialized for the client;
		// TODO confirm.
		pods := []string{}
		if len(service.Spec.Selector) > 0 {
			selector := labels.SelectorFromSet(service.Spec.Selector)
			options := metaV1.ListOptions{LabelSelector: selector.String()}
			// NOTE(review): the lookup now runs outside the lock; this assumes
			// kube.GetPodList is safe to call concurrently — confirm.
			pods = kube.GetPodList(service.Namespace, options)
		}
		payload := mapService(service, "SVC_"+string(event.Type), pods)

		// Critical section: multiple goroutines may write to socket at the
		// same time. The closure lets defer release the lock even if the
		// write panics.
		func() {
			mutex.Lock()
			defer mutex.Unlock()
			socket.Write(payload)
		}()
	}
}