-
-
Notifications
You must be signed in to change notification settings - Fork 8
/
helpers.go
250 lines (204 loc) · 8.7 KB
/
helpers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package instance
import (
"fmt"
"strings"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
// defaultFields lists the field names which are treated as real table columns by handleConditionParts,
// handleExistsCondition and parseOrder. Every other field (unless it is passed in as materialized column) is looked
// up via the fields_string / fields_number map columns.
defaultFields = []string{"timestamp", "cluster", "namespace", "app", "pod_name", "container_name", "host", "log"}
// defaultColumns is a comma separated column list. It is not referenced by the helpers in this part of the file.
// NOTE(review): presumably used as the column list of the select statement - confirm against the callers.
defaultColumns = "timestamp, cluster, namespace, app, pod_name, container_name, host, fields_string, fields_number, log"
// fieldsMetric is a counter with a "field" label (exposed as kobs_klogs_fields_total). It is incremented by
// handleConditionParts for every key which is used in a condition.
fieldsMetric = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "kobs",
Name: "klogs_fields_total",
Help: "Number how often a field was used in a query.",
}, []string{"field"})
)
// parseLogsQuery translates the given query string into the conditions for the where clause of the sql query.
//
// The query language is intentionally simple. The query is split level by level at "(", ")", "_not_", "_and_" and
// "_or_" (in this order). The innermost fragments are single field conditions, which are converted to sql by
// splitOperator. Afterwards the converted fragments are joined again with the sql counterpart of the token they were
// split at: "(", ")", " NOT ", " AND " and " OR ".
//
// The first error returned by splitOperator aborts the parsing and is returned together with an empty string.
func parseLogsQuery(query string, materializedColumns []string) (string, error) {
	// levels describes, from the outermost to the innermost level, the token a fragment is split at and the string
	// which is used to join the converted fragments again.
	levels := []struct {
		token string
		sql   string
	}{
		{token: "(", sql: "("},
		{token: ")", sql: ")"},
		{token: "_not_", sql: " NOT "},
		{token: "_and_", sql: " AND "},
		{token: "_or_", sql: " OR "},
	}

	var convert func(fragment string, depth int) (string, error)
	convert = func(fragment string, depth int) (string, error) {
		// Below the last level a fragment is a single condition like "namespace='kube-system'".
		if depth == len(levels) {
			return splitOperator(fragment, materializedColumns)
		}

		level := levels[depth]
		pieces := strings.Split(fragment, level.token)
		converted := make([]string, 0, len(pieces))

		for _, piece := range pieces {
			sql, err := convert(piece, depth+1)
			if err != nil {
				return "", err
			}

			converted = append(converted, sql)
		}

		return strings.Join(converted, level.sql), nil
	}

	return convert(query, 0)
}
// splitOperator converts a single condition of the query language into its sql representation.
//
// The condition is split by the operators ">=", ">", "<=", "<", "=~", "!=", "!~", "~" and "=". The first operator
// which splits the condition into exactly two parts is the operator used by the user. The key (first part), the value
// (second part) and the operator are then passed to handleConditionParts to build the where condition.
//
// When no operator matches and the condition contains "_exists_ ", the text following the "_exists_" keyword is used
// as field name and passed to handleExistsCondition. A condition which only consists of whitespace results in an
// empty string. Everything else is reported as "invalid operator" error.
func splitOperator(condition string, materializedColumns []string) (string, error) {
	// The order matters: operators with multiple characters must be tried before the single character operators they
	// contain, otherwise "a>=1" would be split at ">" into "a" and "=1".
	operators := []string{">=", ">", "<=", "<", "=~", "!=", "!~", "~", "="}

	for _, operator := range operators {
		if parts := strings.Split(condition, operator); len(parts) == 2 {
			return handleConditionParts(parts[0], parts[1], operator, materializedColumns)
		}
	}

	trimmed := strings.TrimSpace(condition)

	if strings.Contains(condition, "_exists_ ") {
		// strings.TrimLeft can not be used to remove the keyword, because it treats its second argument as a set of
		// characters and would also strip leading "e", "x", "i", "s", "t" and "_" characters from the field name
		// ("_exists_ test_field" would become "field"). Therefore only the exact keyword prefix is removed.
		key := strings.TrimSpace(strings.TrimPrefix(trimmed, "_exists_"))
		return handleExistsCondition(key, materializedColumns), nil
	}

	if trimmed == "" {
		return "", nil
	}

	return "", fmt.Errorf("invalid operator: %s", condition)
}
// handleConditionParts builds the sql condition for the given key, value and operator. The key and the value are
// trimmed before they are used.
//
// Fields like "timestamp", "cluster", "namespace", etc. (see defaultFields) and the passed in materialized columns are
// separate columns in the sql table and can be used directly. All other fields, e.g. "content_level" or
// "content_response_code", are only available via the fields_string / fields_number map columns: when the value is
// wrapped in single quotes the fields_string column is used, otherwise the fields_number column is used.
//
// The operators "=~" and "!~" are translated to ILIKE and NOT ILIKE, the operator "~" is translated to the match
// function, which checks the field against a regular expression. All other operators are used as they are.
//
// See: https://gist.github.com/alexey-milovidov/d6ffc9e0bc0bc72dd7bca90e76e3b83b
// See: https://clickhouse.tech/docs/en/sql-reference/functions/string-search-functions/#matchhaystack-pattern
func handleConditionParts(key, value, operator string, materializedColumns []string) (string, error) {
	key = strings.TrimSpace(key)
	value = strings.TrimSpace(value)

	// The kobs_klogs_fields_total metric can be used to determine how often a field is used. This information can
	// then be used to create an additional column for this field via the following SQL commands:
	//   ALTER TABLE logs.logs ON CLUSTER '{cluster}' ADD COLUMN <FIELD> String DEFAULT fields_string['<FIELD>'];
	//   ALTER TABLE logs.logs ON CLUSTER '{cluster}' ADD COLUMN <FIELD> Float64 DEFAULT fields_number['<FIELD>'];
	fieldsMetric.WithLabelValues(key).Inc()

	// column is the sql expression which selects the field. nested is true, when the field must be read from one of
	// the map columns instead of a dedicated column.
	column := key
	nested := !contains(defaultFields, key) && !contains(materializedColumns, key)

	if nested {
		quoted := len(value) > 0 && value[0] == '\'' && value[len(value)-1] == '\''
		if quoted {
			column = fmt.Sprintf("fields_string['%s']", key)
		} else {
			column = fmt.Sprintf("fields_number['%s']", key)
		}
	}

	switch operator {
	case "=~":
		return fmt.Sprintf("%s ILIKE %s", column, value), nil
	case "!~":
		return fmt.Sprintf("%s NOT ILIKE %s", column, value), nil
	case "~":
		return fmt.Sprintf("match(%s, %s)", column, value), nil
	}

	// Conditions for the map columns are rendered with spaces around the operator, conditions for dedicated columns
	// are rendered without them.
	if nested {
		return fmt.Sprintf("%s %s %s", column, operator, value), nil
	}

	return fmt.Sprintf("%s%s%s", column, operator, value), nil
}
// handleExistsCondition builds the sql condition which checks that the field with the given key exists. For default
// fields and materialized columns the column is compared against NULL. For all other fields the condition checks if
// the fields_string or the fields_number map contains the key.
func handleExistsCondition(key string, materializedColumns []string) string {
	isColumn := contains(defaultFields, key) || contains(materializedColumns, key)
	if !isColumn {
		return fmt.Sprintf("(mapContains(fields_string, '%s') = 1 OR mapContains(fields_number, '%s') = 1)", key, key)
	}

	return fmt.Sprintf("%s IS NOT NULL", key)
}
// parseOrder builds the expression for the order by clause of the sql query.
//
// When order is empty, or orderBy is empty after surrounding whitespace is removed, the default "timestamp DESC" is
// returned. The order "ascending" results in an ascending order, every other value results in a descending order.
// Default fields and materialized columns are used directly as sort column. For all other fields the result is
// ordered by the value in the fields_string map first and by the value in the fields_number map second.
func parseOrder(order, orderBy string, materializedColumns []string) string {
	// The field name must be trimmed before the empty check, otherwise a value which only consists of whitespace
	// would result in an order by fields_string[''] instead of the default order.
	orderBy = strings.TrimSpace(orderBy)

	if order == "" || orderBy == "" {
		return "timestamp DESC"
	}

	direction := "DESC"
	if order == "ascending" {
		direction = "ASC"
	}

	if contains(defaultFields, orderBy) || contains(materializedColumns, orderBy) {
		return fmt.Sprintf("%s %s", orderBy, direction)
	}

	return fmt.Sprintf("fields_string['%s'] %s, fields_number['%s'] %s", orderBy, direction, orderBy, direction)
}
// getBucketTimes determines the start and end time of a bucket. The bucket starts at bucketTime and has a length of
// interval. Because the first and the last bucket can reach outside of the user defined time range, the start is
// raised to timeStart and the end is lowered to timeEnd when necessary.
//
// Both bounds are clamped independently, so that a single bucket which is wider than the whole time range results in
// timeStart and timeEnd.
func getBucketTimes(interval, bucketTime, timeStart, timeEnd int64) (int64, int64) {
	bucketStart := bucketTime
	bucketEnd := bucketTime + interval

	if bucketStart < timeStart {
		bucketStart = timeStart
	}

	if bucketEnd > timeEnd {
		bucketEnd = timeEnd
	}

	return bucketStart, bucketEnd
}
// appendIfMissing returns the given slice with the item appended, unless the slice already holds an equal item. In
// that case the slice is returned unchanged.
func appendIfMissing(items []string, item string) []string {
	for i := 0; i < len(items); i++ {
		if items[i] == item {
			return items
		}
	}

	items = append(items, item)
	return items
}
// contains reports whether the given slice of strings holds an element which is equal to the given item.
func contains(items []string, item string) bool {
	found := false

	for i := 0; i < len(items) && !found; i++ {
		found = items[i] == item
	}

	return found
}