/
main.go
120 lines (102 loc) · 3.17 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package main
import (
"context"
"encoding/json"
"fmt"
argoTriggers "github.com/argoproj/argo-events/sensors/triggers"
"github.com/kiowy-org/argo-events-jsonpatch-trigger/proto"
"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"net"
"os"
"sigs.k8s.io/yaml"
)
type JSONPatchTrigger struct {
K8sClient kubernetes.Interface
DynamicClient dynamic.Interface
namespaceableDynamicClient dynamic.NamespaceableResourceInterface
proto.TriggerServer
}
func (t *JSONPatchTrigger) FetchResource(ctx context.Context, request *proto.FetchResourceRequest) (*proto.FetchResourceResponse, error) {
// Extract gvr from the request
var resource map[string]string
if err := yaml.Unmarshal(request.Resource, &resource); err != nil {
return nil, err
}
data, err := json.Marshal(resource["target"])
if err != nil {
return nil, err
}
objData := make(map[string]interface{})
err = json.Unmarshal(data, &objData)
obj := unstructured.Unstructured{Object: objData}
// Fetch the resource from the k8s cluster
gvr := argoTriggers.GetGroupVersionResource(&obj)
t.namespaceableDynamicClient = t.DynamicClient.Resource(gvr)
//since we are patching, we always retrieve a live object
objName := obj.GetName()
if objName == "" {
return nil, fmt.Errorf("resource name is required")
}
// for now, we don't support ClusterWide objects
//todo: add support for cluster wide objects
objNamespace := obj.GetNamespace()
if objNamespace == "" {
return nil, fmt.Errorf("resource namespace is required")
}
rObj, err := t.namespaceableDynamicClient.Namespace(objNamespace).Get(ctx, objName, metav1.GetOptions{})
if err != nil {
return nil, err
}
body, err := json.Marshal(rObj)
if err != nil {
return nil, err
}
return &proto.FetchResourceResponse{Resource: body}, nil
}
func (t *JSONPatchTrigger) Execute(ctx context.Context, request *proto.ExecuteRequest) (*proto.ExecuteResponse, error) {
// Extract object from the request
var resource map[string]string
if err := yaml.Unmarshal(request.Resource, &resource); err != nil {
return nil, err
}
//t.namespaceableDynamicClient.Namespace().Patch(ctx, name, types.JSONPatchType, patch, metav1.PatchOptions{})
return &proto.ExecuteResponse{
Response: nil,
}, nil
}
func (t *JSONPatchTrigger) ApplyPolicy(ctx context.Context, request *proto.ApplyPolicyRequest) (*proto.ApplyPolicyResponse, error) {
//for now, we don't implement any policy so always return success
return &proto.ApplyPolicyResponse{
Success: true,
Message: "success",
}, nil
}
func main() {
port, ok := os.LookupEnv("JPT_PORT")
if !ok {
port = "9000"
}
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
// Start the server
lis, err := net.Listen("tcp", fmt.Sprintf(":%s", port))
if err != nil {
panic(err.Error())
}
srv := grpc.NewServer()
trigger := &JSONPatchTrigger{
K8sClient: kubernetes.NewForConfigOrDie(config),
DynamicClient: dynamic.NewForConfigOrDie(config),
}
proto.RegisterTriggerServer(srv, trigger)
if err := srv.Serve(lis); err != nil {
panic(err.Error())
}
}