Skip to content

Commit

Permalink
split fqdn field
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Mar 7, 2022
1 parent 22a936b commit 697fa74
Show file tree
Hide file tree
Showing 2,443 changed files with 756,153 additions and 3,923 deletions.
32 changes: 30 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,43 @@ require (
github.com/json-iterator/go v1.1.12
github.com/prometheus/common v0.32.1
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.7.0
k8s.io/api v0.23.4
k8s.io/apimachinery v0.23.4
k8s.io/client-go v0.23.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.5 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/imdario/mergo v0.3.5 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.3.0 // indirect
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect
golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e // indirect
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
240 changes: 235 additions & 5 deletions go.sum

Large diffs are not rendered by default.

58 changes: 0 additions & 58 deletions pkg/handler/loki.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package handler

import (
"encoding/csv"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -119,47 +118,6 @@ func getLokiError(resp []byte, code int) string {
return fmt.Sprintf("Error from Loki (code: %d): %s", code, message)
}

func writeRawJSON(w http.ResponseWriter, code int, payload []byte) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_, err := w.Write(payload)
if err != nil {
hlog.Errorf("Error while responding raw JSON: %v", err)
}
}

func writeCSV(w http.ResponseWriter, code int, payload []byte, columns []string) {
var qr model.QueryResponse
err := json.Unmarshal(payload, &qr)
if err != nil {
writeError(w, http.StatusServiceUnavailable, fmt.Sprintf("Unknown error from Loki - cannot unmarshal (code: %d resp: %s)", code, payload))
return
}

datas, err := getCSVDatas(&qr, columns)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}

t := time.Now()
//output file would be 'export-stdLongYear-stdZeroMonth-stdZeroDay-stdHour-stdZeroMinute.csv'
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=export-%s.csv", t.Format("2006-01-02-15-04")))
w.Header().Set("Content-Type", "text/csv")
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(code)
writer := csv.NewWriter(w)
for _, row := range datas {
//write csv row
err := writer.Write(row)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("Cannot write row %s", row))
return
}
}
writer.Flush()
}

func getCSVDatas(qr *model.QueryResponse, columns []string) ([][]string, error) {
if columns != nil && len(columns) == 0 {
return nil, fmt.Errorf("columns can't be empty if specified")
Expand Down Expand Up @@ -244,19 +202,3 @@ func getRowDatas(stream model.Stream, entry model.Entry, labels []string, fields

return rowDatas
}

type errorResponse struct{ Message string }

func writeError(w http.ResponseWriter, code int, message string) {
response, err := json.Marshal(errorResponse{Message: message})
if err != nil {
hlog.Errorf("Marshalling error while responding an error: %v (message was: %s)", err, message)
code = http.StatusInternalServerError
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_, err = w.Write(response)
if err != nil {
hlog.Errorf("Error while responding an error: %v (message was: %s)", err, message)
}
}
69 changes: 69 additions & 0 deletions pkg/handler/resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package handler

import (
"encoding/json"
"fmt"
"net/http"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

func GetResources(kubeClient kubernetes.Interface, resourceType string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()

rt := resourceType
if len(rt) == 0 {
rt = params.Get("resourceType")
}

// TODO: remove all logs
hlog.Infof("GetResources resourceType : %s query params : %s\n", rt, params)

result := []string{}
switch rt {
case "namespaces":
// list all namespaces
namespaceList, err := kubeClient.CoreV1().Namespaces().List(r.Context(), metav1.ListOptions{})
if err != nil {
panic(err)
}

fmt.Printf("found %d namespaces\n", len(namespaceList.Items))
for i := range namespaceList.Items {
result = append(result, namespaceList.Items[i].Name)
}
case "pods":
namespace := params.Get("namespace")

if len(namespace) == 0 {
writeError(w, http.StatusServiceUnavailable, "namespace cannot be empty")
return
}

// list all Pods in namespace
podList, err := kubeClient.CoreV1().Pods(namespace).List(r.Context(), metav1.ListOptions{})
if err != nil {
panic(err)
}

fmt.Printf("found %d pods in namespace %s\n", len(podList.Items), namespace)
for i := range podList.Items {
result = append(result, podList.Items[i].Name)
}
default:
writeError(w, http.StatusServiceUnavailable,
fmt.Sprintf("unknown resourceType: %s", rt))
return
}

resp, err := json.Marshal(result)
if err != nil {
writeError(w, http.StatusServiceUnavailable,
fmt.Sprintf("cannot marshal %v", params))
} else {
writeRawJSON(w, http.StatusOK, resp)
}
}
}
68 changes: 68 additions & 0 deletions pkg/handler/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package handler

import (
"encoding/csv"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/netobserv/network-observability-console-plugin/pkg/model"
)

func writeRawJSON(w http.ResponseWriter, code int, payload []byte) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_, err := w.Write(payload)
if err != nil {
hlog.Errorf("Error while responding raw JSON: %v", err)
}
}

func writeCSV(w http.ResponseWriter, code int, payload []byte, columns []string) {
var qr model.QueryResponse
err := json.Unmarshal(payload, &qr)
if err != nil {
writeError(w, http.StatusServiceUnavailable, fmt.Sprintf("Unknown error from Loki - cannot unmarshal (code: %d resp: %s)", code, payload))
return
}

datas, err := getCSVDatas(&qr, columns)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}

t := time.Now()
//output file would be 'export-stdLongYear-stdZeroMonth-stdZeroDay-stdHour-stdZeroMinute.csv'
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=export-%s.csv", t.Format("2006-01-02-15-04")))
w.Header().Set("Content-Type", "text/csv")
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(code)
writer := csv.NewWriter(w)
for _, row := range datas {
//write csv row
err := writer.Write(row)
if err != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("Cannot write row %s", row))
return
}
}
writer.Flush()
}

type errorResponse struct{ Message string }

func writeError(w http.ResponseWriter, code int, message string) {
response, err := json.Marshal(errorResponse{Message: message})
if err != nil {
hlog.Errorf("Marshalling error while responding an error: %v (message was: %s)", err, message)
code = http.StatusInternalServerError
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_, err = w.Write(response)
if err != nil {
hlog.Errorf("Error while responding an error: %v (message was: %s)", err, message)
}
}
51 changes: 33 additions & 18 deletions pkg/loki/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ const (

var qlog = logrus.WithField("component", "loki.query")

// can contains only alphanumeric / '-' / '_' / '.' / ',' / '"' / '*' / ':' characteres
var filterRegexpValidation = regexp.MustCompile(`^[\w-_.,\"*:]*$`)
// can contains only alphanumeric / '-' / '_' / '.' / ',' / '"' / '*' / ':' / '/' characteres
var filterRegexpValidation = regexp.MustCompile(`^[\w-_.,\"*:/]*$`)

// remove quotes and replace * by regex any
var valueReplacer = strings.NewReplacer(`*`, `.*`, `"`, "")

type LabelJoiner string

const (
// joinOr spaces are escaped to avoid problems when querying Loki
joinAnd = LabelJoiner("+and+")
joinOr = LabelJoiner("+or+")
joinPipeAnd = LabelJoiner("|")
)
Expand Down Expand Up @@ -113,7 +113,8 @@ func (q *Query) URLQuery() (string, error) {
}
//group with parenthesis
sb.WriteByte('(')
q.WriteLabelFilter(&sb, &glf, joinOr)
//each group member must match
q.WriteLabelFilter(&sb, &glf, joinAnd)
sb.WriteByte(')')
i++
}
Expand Down Expand Up @@ -163,8 +164,10 @@ func (q *Query) AddParam(key, value string) error {
q.processIPFilters(key, strings.Split(value, ","))
case "Workload", "Namespace":
q.processCommonLabelFilter(key, strings.Split(value, ","))
case "FQDN", "SrcFQDN", "DstFQDN":
q.processFQDNFilter(key, strings.Split(value, ","))
case "NamespacePod", "SrcNamespacePod", "DstNamespacePod":
q.processNamespacePodFilter(key, strings.Split(value, ","))
case "AddrPort", "SrcAddrPort", "DstAddrPort":
q.processAddressPortFilter(key, strings.Split(value, ","))
default:
return q.addParamDefault(key, value)
}
Expand Down Expand Up @@ -370,7 +373,7 @@ func (q *Query) processCommonLabelFilter(key string, values []string) {
}
}

func (q *Query) processFQDNFilter(key string, values []string) {
func (q *Query) processNamespacePodFilter(key string, values []string) {
prefix := ""
if strings.HasPrefix(key, "Src") {
prefix = "Src"
Expand All @@ -379,24 +382,36 @@ func (q *Query) processFQDNFilter(key string, values []string) {
}

for _, value := range values {
//FQDN can either be namespace / pod / namespace.pod / ipaddress / port / ipaddress:port
//can either be namespace / pod / namespace.pod / ipaddress / port / ipaddress:port
if strings.Contains(value, ".") {
splittedValue := strings.Split(value, ".")
q.AddParamSrcDst(prefix, "Namespace", splittedValue[0])
q.AddParamSrcDst(prefix, "Pod", splittedValue[1])
} else {
q.AddParamSrcDst(prefix, "Namespace", value)
q.AddParamSrcDst(prefix, "Pod", value)
}
}
}

func (q *Query) processAddressPortFilter(key string, values []string) {
prefix := ""
if strings.HasPrefix(key, "Src") {
prefix = "Src"
} else if strings.HasPrefix(key, "Dst") {
prefix = "Dst"
}

for _, value := range values {
//can either be ipaddress / port / ipaddress:port
if strings.Contains(value, ":") {
ipAndPort := strings.Split(value, ":")
q.AddParamSrcDst(prefix, "Addr", ipAndPort[0])
q.AddParamSrcDst(prefix, "Port", ipAndPort[1])
} else if strings.Contains(value, ".") {
splittedValue := strings.Split(value, ".")
if len(splittedValue) == 2 {
q.AddParamSrcDst(prefix, "Namespace", splittedValue[0])
q.AddParamSrcDst(prefix, "Pod", splittedValue[1])
} else {
q.AddParamSrcDst(prefix, "Addr", value)
}
q.AddParamSrcDst(prefix, "Addr", value)
} else if _, err := strconv.Atoi(value); err == nil {
q.AddParamSrcDst(prefix, "Port", value)
} else {
q.AddParamSrcDst(prefix, "Namespace", value)
q.AddParamSrcDst(prefix, "Pod", value)
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/server/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import (
"net/http"

"github.com/gorilla/mux"
"k8s.io/client-go/kubernetes"

"github.com/netobserv/network-observability-console-plugin/pkg/handler"
)

func setupRoutes(cfg *Config) *mux.Router {
func setupRoutes(cfg *Config, kubeClient kubernetes.Interface) *mux.Router {
r := mux.NewRouter()
r.HandleFunc("/api/status", handler.Status)
r.HandleFunc("/api/loki/flows", handler.GetFlows(cfg.Loki, false))
r.HandleFunc("/api/loki/export", handler.GetFlows(cfg.Loki, true))
r.HandleFunc("/api/resources", handler.GetResources(kubeClient, ""))
r.HandleFunc("/api/resources/namespaces", handler.GetResources(kubeClient, "namespaces"))
r.HandleFunc("/api/resources/pods", handler.GetResources(kubeClient, "pods"))
r.PathPrefix("/").Handler(http.FileServer(http.Dir("./web/dist/")))
return r
}

0 comments on commit 697fa74

Please sign in to comment.