-
Notifications
You must be signed in to change notification settings - Fork 2
/
utils.go
165 lines (137 loc) · 4.11 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package http
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net/http"
"os"
"time"
openapi "github.com/khulnasoft-lab/golang_sdk/client"
rhttp "github.com/hashicorp/go-retryablehttp"
)
// IsConsoleAgent reports whether the KHULNASOFT_CONSOLE_AGENT environment
// variable holds a non-empty value. The url argument is not consulted.
func IsConsoleAgent(url string) bool {
	marker, _ := os.LookupEnv("KHULNASOFT_CONSOLE_AGENT")
	return marker != ""
}
// GetConsoleApiToken fetches the console API token from the console's
// internal API served at http://<console>:<port>.
//
// The call goes through a retrying HTTP client (at most 3 retries, waiting
// between 1 and 10 seconds) whose underlying http.Client has a 10 second
// timeout. It returns the token when the server answers 200 OK; otherwise it
// returns the request error, or an error carrying the response status line.
func GetConsoleApiToken(console, port string) (string, error) {
	// NOTE(review): certificate verification is disabled and the root pool is
	// empty. The server URL below is plain http, so this only matters if the
	// client ends up talking TLS (e.g. via a redirect) — confirm it is intended.
	tr := http.DefaultTransport.(*http.Transport).Clone()
	tr.TLSClientConfig = &tls.Config{
		RootCAs:            x509.NewCertPool(),
		InsecureSkipVerify: true,
	}
	tr.DisableKeepAlives = false

	rhc := rhttp.NewClient()
	// The timeout must be set on the client that is actually installed.
	// Setting it on the default client and then replacing that client (as was
	// done previously) silently discards the timeout.
	rhc.HTTPClient = &http.Client{
		Transport: tr,
		Timeout:   10 * time.Second,
	}
	rhc.RetryMax = 3
	rhc.RetryWaitMin = 1 * time.Second
	rhc.RetryWaitMax = 10 * time.Second

	cfg := openapi.NewConfiguration()
	cfg.HTTPClient = rhc.StandardClient()
	cfg.Servers = openapi.ServerConfigurations{
		{
			URL:         fmt.Sprintf("http://%s:%s", console, port),
			Description: "khulnasoft_server_internal",
		},
	}

	client := openapi.NewAPIClient(cfg)
	token, resp, err := client.InternalAPI.GetConsoleApiTokenExecute(openapi.ApiGetConsoleApiTokenRequest{})
	if err != nil {
		return "", err
	}
	if resp.StatusCode != http.StatusOK {
		// Pass the status as an argument, never as the format string, so a
		// '%' in the status text cannot be misread as a formatting verb.
		return "", fmt.Errorf("%s", resp.Status)
	}
	return token.GetApiToken(), nil
}
// StreamingConfig selects which topology changes a streaming session reports.
type StreamingConfig struct {
	// Addition requests events for newly added entities.
	Addition bool
	// Deletion requests events for removed entities.
	Deletion bool
	// EntityTypes lists the entity kinds to stream.
	// Supported types: Node, Container, ContainerImage.
	EntityTypes []string
}

// validateConfig checks that cfg asks for at least one event kind (Addition
// or Deletion) and at least one entity type, and that every entity type is
// one of Node, Container or ContainerImage. It returns nil when cfg is valid
// and a descriptive error for the first problem found.
func (cfg StreamingConfig) validateConfig() error {
	if !cfg.Addition && !cfg.Deletion {
		return fmt.Errorf("Invalid StreamingConfig, should have atleast one event set")
	}
	if len(cfg.EntityTypes) == 0 {
		return fmt.Errorf("Invalid StreamingConfig, should have atleast one entity")
	}

	for _, entity := range cfg.EntityTypes {
		// Streaming is only supported for these entity types.
		switch entity {
		case "Node", "Container", "ContainerImage":
			continue
		default:
			// The entity is caller-supplied: pass it as an argument rather
			// than splicing it into the format string, so a '%' in it is
			// reported verbatim instead of being parsed as a verb.
			return fmt.Errorf("Invalid entity in request:%s", entity)
		}
	}
	return nil
}
// StartStreamingDelta polls the console's topology-delta API every 30 seconds
// and reports the returned changes until ctx is done.
//
// cfg selects the events and entity types to request and is validated before
// anything else happens. url and port locate the console; apiToken is used to
// authenticate the client. callback is invoked with true and the added
// entities, and with false and the deleted entities, whenever a poll returns
// a non-empty list. onErrorCallback receives the error of any failed poll
// (request error or non-200 status); polling continues afterwards. A nil
// callback or onErrorCallback is simply not invoked.
//
// It returns an error if cfg is invalid, the console client cannot be
// created, or authentication fails. Once polling has started it blocks until
// ctx is done and then returns nil.
func StartStreamingDelta(ctx context.Context, cfg StreamingConfig,
	url, port, apiToken string,
	callback func(bool, []openapi.ModelNodeIdentifier),
	onErrorCallback func(error)) error {

	const pollInterval = 30 * time.Second

	if err := cfg.validateConfig(); err != nil {
		return err
	}

	apiClient := NewHttpsConsoleClient(url, port)
	if apiClient == nil {
		return fmt.Errorf("Failed to create console client")
	}
	if err := apiClient.APITokenAuthenticate(apiToken); err != nil {
		return err
	}

	// Nil-safe wrappers so a missing callback cannot panic the polling loop.
	reportErr := func(err error) {
		if onErrorCallback != nil {
			onErrorCallback(err)
		}
	}
	notify := func(added bool, nodes []openapi.ModelNodeIdentifier) {
		// len of a nil slice is 0, so no separate nil check is needed.
		if callback != nil && len(nodes) > 0 {
			callback(added, nodes)
		}
	}

	// The client is authenticated and the config is valid: start polling.
	// Stop the ticker on every exit path so its resources are released.
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	topologyAPI := apiClient.Client().TopologyAPI
	req := topologyAPI.GetTopologyDelta(ctx)

	// Both timestamps start at "now"; they advance with each response below.
	startMillis := time.Now().UnixMilli()
	deltaReq := openapi.ModelTopologyDeltaReq{}
	deltaReq.SetAddition(cfg.Addition)
	deltaReq.SetDeletion(cfg.Deletion)
	deltaReq.SetEntityTypes(cfg.EntityTypes)
	deltaReq.SetAdditionTimestamp(startMillis)
	deltaReq.SetDeletionTimestamp(startMillis)

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			req = req.ModelTopologyDeltaReq(deltaReq)
			deltaRes, rh, err := topologyAPI.GetTopologyDeltaExecute(req)
			if err != nil {
				reportErr(err)
				continue
			}
			if rh.StatusCode != http.StatusOK {
				// Keep polling after a bad status; there is currently no
				// limit on the number of consecutive failures.
				reportErr(fmt.Errorf("Got non-200 status code:%d", rh.StatusCode))
				continue
			}

			notify(true, deltaRes.Additons)
			notify(false, deltaRes.Deletions)

			// Carry the server-provided timestamps into the next request.
			// cfg.Addition/cfg.Deletion are the values set on deltaReq above.
			if cfg.Addition && deltaRes.AdditionTimestamp != nil {
				deltaReq.SetAdditionTimestamp(*deltaRes.AdditionTimestamp)
			}
			if cfg.Deletion && deltaRes.DeletionTimestamp != nil {
				deltaReq.SetDeletionTimestamp(*deltaRes.DeletionTimestamp)
			}
		}
	}
}