Skip to content

Commit

Permalink
super columns + filters
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Mar 3, 2022
1 parent 3a98f46 commit e5b0252
Show file tree
Hide file tree
Showing 15 changed files with 777 additions and 175 deletions.
14 changes: 9 additions & 5 deletions pkg/loki/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (q *Query) convertToAnyMatch() *Query {
// Second capture: name of the field.
// Third capture: JSON string value or empty
// Fourth capture: JSON number value or empty
var jsonField = regexp.MustCompile(`^"([\w.-]*)":(?:(?:"((?:(?:\^")|[^"])*)"?)|(\d+))$`)
var jsonField = regexp.MustCompile(`([\w.-]*)":(?:(?:"((?:(?:\^")|[^"])*)"?)|(\d+))$`)

// lineToLabelFilter extracts the JSON field name and value from a line filter and converts
// it to a labelFilter
Expand All @@ -71,12 +71,16 @@ func lineToLabelFilter(lf string) (labelFilter, bool) {
valueType: typeNumber,
}, true
}

v := submatch[2]
if !strings.HasSuffix(v, `"`) && !strings.HasSuffix(v, ".*") {
v = v + ".*"
}

return labelFilter{
key: submatch[1],
// TODO: if at some point we want to filter by exact string values, we should
// conditionally replace the matcher and remove the .* suffix
key: submatch[1],
matcher: labelMatches,
value: submatch[2] + ".*",
value: v,
valueType: typeRegex,
}, true
}
9 changes: 9 additions & 0 deletions pkg/loki/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func stringLabelFilter(labelKey string, matcher labelMatcher, value string) labe
}
}

func regexLabelFilter(labelKey string, matcher labelMatcher, value string) labelFilter {
return labelFilter{
key: labelKey,
matcher: labelMatches,
value: value,
valueType: typeString,
}
}

func intLabelFilter(labelKey string, value int) labelFilter {
return labelFilter{
key: labelKey,
Expand Down
238 changes: 194 additions & 44 deletions pkg/loki/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/netobserv/network-observability-console-plugin/pkg/utils"
"github.com/sirupsen/logrus"
)

const (
Expand All @@ -28,8 +29,10 @@ const (
anyMatchValue = "any"
)

var qlog = logrus.WithField("component", "loki.query")

// can contains only alphanumeric / '-' / '_' / '.' / ',' / '"' / '*' characteres
var filterRegexpValidation = regexp.MustCompile(`^[\w-_.,\"*]*$`)
var filterRegexpValidation = regexp.MustCompile(`^[\w-_.,\"*:]*$`)

// remove quotes and replace * by regex any
var valueReplacer = strings.NewReplacer(`*`, `.*`, `"`, "")
Expand All @@ -38,21 +41,23 @@ type LabelJoiner string

const (
// joinOr spaces are escaped to avoid problems when querying Loki
joinOr = LabelJoiner("+or+")
joinAnd = LabelJoiner("|")
joinOr = LabelJoiner("+or+")
joinPipeAnd = LabelJoiner("|")
)

// Query for a LogQL HTTP petition
// The HTTP body of the query is composed by:
// {streamSelector}|lineFilters|json|labelFilters
type Query struct {
// urlParams for the HTTP call
urlParams [][2]string
labelMap map[string]struct{}
streamSelector []labelFilter
lineFilters []string
labelFilters []labelFilter
labelJoiner LabelJoiner
urlParams [][2]string
labelMap map[string]struct{}
streamSelector []labelFilter
lineFilters []string
labelFilters []labelFilter
currentGroup *string
groupedLabelFilters map[string][]labelFilter
labelJoiner LabelJoiner
// Attributes with a special meaning that need to be processed independently
specialAttrs map[string]string
export *Export
Expand All @@ -69,10 +74,11 @@ func NewQuery(labels []string, export bool) *Query {
exp = &Export{}
}
return &Query{
specialAttrs: map[string]string{},
labelJoiner: joinAnd,
export: exp,
labelMap: utils.GetMapInterface(labels),
specialAttrs: map[string]string{},
labelJoiner: joinPipeAnd,
export: exp,
labelMap: utils.GetMapInterface(labels),
groupedLabelFilters: map[string][]labelFilter{},
}
}

Expand All @@ -94,16 +100,22 @@ func (q *Query) URLQuery() (string, error) {
sb.WriteString(lf)
sb.WriteByte('`')
}
if len(q.labelFilters) > 0 {
if len(q.labelFilters) > 0 || len(q.groupedLabelFilters) > 0 {
if q.labelJoiner == "" {
panic("Label Joiner can't be empty")
}
sb.WriteString("|json|")
for i, lf := range q.labelFilters {
q.WriteLabelFilter(&sb, &q.labelFilters, q.labelJoiner)
i := 0
for _, glf := range q.groupedLabelFilters {
if i > 0 {
sb.WriteString(string(q.labelJoiner))
}
lf.writeInto(&sb)
//group with parenthesis
sb.WriteByte('(')
q.WriteLabelFilter(&sb, &glf, joinOr)
sb.WriteByte(')')
i++
}
}
if len(q.urlParams) > 0 {
Expand All @@ -117,34 +129,30 @@ func (q *Query) URLQuery() (string, error) {
return sb.String(), nil
}

func (q *Query) WriteLabelFilter(sb *strings.Builder, lfs *[]labelFilter, lj LabelJoiner) {
for i, lf := range *lfs {
if i > 0 {
sb.WriteString(string(lj))
}
lf.writeInto(sb)
}
}

func (q *Query) AddParam(key, value string) error {
if !filterRegexpValidation.MatchString(value) {
return fmt.Errorf("unauthorized sign in flows request: %s", value)
}
switch key {
case exportFormatKey:
if q.export != nil {
q.export.format = value
} else {
return fmt.Errorf("export format is not allowed for this endpoint")
}
return q.addParamFormat(value)
case columnsKey:
if q.export != nil {
values := strings.Split(value, ",")
q.export.columns = values
} else {
return fmt.Errorf("export columns are not allowed for this endpoint")
}
return q.addParamColumns(value)
case startTimeKey:
q.addURLParam(startParam, value)
case endTimeTimeKey:
q.addURLParam(endParam, value)
case timeRangeKey:
r, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return err
}
q.addURLParam(startParam, strconv.FormatInt(time.Now().Unix()-r, 10))
return q.addParamTime(value)
case limitKey:
q.addURLParam(limitParam, value)
// Attributes that have a special meaning and need to be treated apart
Expand All @@ -153,13 +161,50 @@ func (q *Query) AddParam(key, value string) error {
// IP filter labels
case "DstAddr", "SrcAddr", "DstHostIP", "SrcHostIP":
q.processIPFilters(key, strings.Split(value, ","))
case "Workload", "Namespace":
q.processCommonLabelFilter(key, strings.Split(value, ","))
case "FQDN", "SrcFQDN", "DstFQDN":
q.processFQDNFilter(key, strings.Split(value, ","))
default:
// Stream selector labels
if _, ok := q.labelMap[key]; ok {
q.processStreamSelector(key, strings.Split(value, ","))
} else {
return q.processLineFilters(key, strings.Split(value, ","))
}
return q.addParamDefault(key, value)
}
return nil
}

func (q *Query) addParamFormat(value string) error {
if q.export != nil {
q.export.format = value
} else {
return fmt.Errorf("export format is not allowed for this endpoint")
}
return nil
}

func (q *Query) addParamColumns(value string) error {
if q.export != nil {
values := strings.Split(value, ",")
q.export.columns = values
} else {
return fmt.Errorf("export columns are not allowed for this endpoint")
}
return nil
}

func (q *Query) addParamTime(value string) error {
r, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return err
}
q.addURLParam(startParam, strconv.FormatInt(time.Now().Unix()-r, 10))
return nil
}

func (q *Query) addParamDefault(key, value string) error {
// Stream selector labels
if _, ok := q.labelMap[key]; ok {
q.processStreamSelector(key, strings.Split(value, ","))
} else {
return q.processLineFilters(key, strings.Split(value, ","))
}
return nil
}
Expand Down Expand Up @@ -217,16 +262,26 @@ func (q *Query) processStreamSelector(key string, values []string) {
}

if regexStr.Len() > 0 {
q.streamSelector = append(q.streamSelector,
stringLabelFilter(key, labelMatches, regexStr.String()))
if q.currentGroup == nil {
q.streamSelector = append(q.streamSelector,
stringLabelFilter(key, labelMatches, regexStr.String()))
} else {
q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup],
stringLabelFilter(key, labelMatches, regexStr.String()))
}
}
}

// filterIPInLine assumes that we are searching for that IP addresses as part
// of the log line (not in the stream selector labels)
func (q *Query) processIPFilters(key string, values []string) {
for _, value := range values {
q.labelFilters = append(q.labelFilters, ipLabelFilter(key, value))
if q.currentGroup == nil {
q.labelFilters = append(q.labelFilters, ipLabelFilter(key, value))
} else {
q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup],
ipLabelFilter(key, value))
}
}
}

Expand All @@ -250,8 +305,11 @@ func (q *Query) processLineFilters(key string, values []string) error {
if i > 0 {
regexStr.WriteByte('|')
}
//match KEY + VALUE: "KEY":"[^\"]*VALUE" (ie: contains VALUE) or, if numeric, "KEY":VALUE
regexStr.WriteString(`"`)
//match end of KEY + regex VALUE:
//if numeric, KEY":VALUE
//if string KEY":"VALUE"
//ie 'Port' key will match both 'SrcPort":"XXX"' and 'DstPort":"XXX"
//VALUE can be quoted for exact match or contains * to inject regex any
regexStr.WriteString(key)
regexStr.WriteString(`":`)
if isNumeric(key) {
Expand All @@ -273,14 +331,106 @@ func (q *Query) processLineFilters(key string, values []string) error {
}

if regexStr.Len() > 0 {
q.lineFilters = append(q.lineFilters, regexStr.String())
if q.currentGroup == nil {
q.lineFilters = append(q.lineFilters, regexStr.String())
} else {
lf, ok := lineToLabelFilter(regexStr.String())
if ok {
q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], lf)
} else {
qlog.WithField("lineFilter", lf).
Warningf("line filter can't be parsed as json attribute. Ignoring it")
}
}
}
return nil
}

func (q *Query) processCommonLabelFilter(key string, values []string) {
for _, value := range values {
regexStr := strings.Builder{}
// match start any if not quoted
if !strings.HasPrefix(value, `"`) {
regexStr.WriteString(".*")
}
//inject value with regex
regexStr.WriteString(valueReplacer.Replace(value))
// match end any if not quoted
if !strings.HasSuffix(value, `"`) {
regexStr.WriteString(".*")
}
// apply filter on both Src and Dst fields
if q.currentGroup == nil {
q.labelFilters = append(q.labelFilters, regexLabelFilter("Src"+key, labelMatches, regexStr.String()))
q.labelFilters = append(q.labelFilters, regexLabelFilter("Dst"+key, labelMatches, regexStr.String()))
} else {
q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], regexLabelFilter("Src"+key, labelMatches, regexStr.String()))
q.groupedLabelFilters[*q.currentGroup] = append(q.groupedLabelFilters[*q.currentGroup], regexLabelFilter("Dst"+key, labelMatches, regexStr.String()))
}
}
}

func (q *Query) processFQDNFilter(key string, values []string) {
prefix := ""
if strings.HasPrefix(key, "Src") {
prefix = "Src"
} else if strings.HasPrefix(key, "Dst") {
prefix = "Dst"
}

for _, value := range values {
//FQDN can either be namespace / pod / namespace.pod / ipaddress / port / ipaddress:port
if strings.Contains(value, ":") {
ipAndPort := strings.Split(value, ":")
q.AddParamSrcDst(prefix, "Addr", ipAndPort[0])
q.AddParamSrcDst(prefix, "Port", ipAndPort[1])
} else if strings.Contains(value, ".") {
splittedValue := strings.Split(value, ".")
if len(splittedValue) == 2 {
q.AddParamSrcDst(prefix, "Namespace", splittedValue[0])
q.AddParamSrcDst(prefix, "Pod", splittedValue[1])
} else {
q.AddParamSrcDst(prefix, "Addr", value)
}
} else if _, err := strconv.Atoi(value); err == nil {
q.AddParamSrcDst(prefix, "Port", value)
} else {
q.AddParamSrcDst(prefix, "Namespace", value)
q.AddParamSrcDst(prefix, "Pod", value)
}
}
}

func (q *Query) AddParamSrcDst(prefix, key, value string) {
if len(prefix) > 0 {
q.currentGroup = &prefix
err := q.AddParam(prefix+key, value)
if err != nil {
qlog.Error(err)
}
q.currentGroup = nil
} else {
srcPrefix := "Src"
dstPrefix := "Dst"
q.currentGroup = &srcPrefix
err := q.AddParam(srcPrefix+key, value)
if err != nil {
qlog.Error(err)
}
q.currentGroup = &dstPrefix
err = q.AddParam(dstPrefix+key, value)
if err != nil {
qlog.Error(err)
}
q.currentGroup = nil
}

}

func isNumeric(v string) bool {
switch v {
case
"Port",
"SrcPort",
"DstPort",
"Packets",
Expand Down

0 comments on commit e5b0252

Please sign in to comment.