Skip to content

Commit

Permalink
Add klogs plugin (#336)
Browse files Browse the repository at this point in the history
  • Loading branch information
ricoberger committed Jun 6, 2022
1 parent 5bb8acc commit a32a80d
Show file tree
Hide file tree
Showing 60 changed files with 5,508 additions and 10 deletions.
242 changes: 242 additions & 0 deletions plugins/plugin-klogs/cmd/klogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package main

import (
"encoding/json"
"net/http"
"strconv"
"time"

"github.com/kobsio/kobs/pkg/kube/clusters"
"github.com/kobsio/kobs/pkg/log"
"github.com/kobsio/kobs/pkg/middleware/errresponse"
"github.com/kobsio/kobs/pkg/satellite/plugins/plugin"
"github.com/kobsio/kobs/plugins/plugin-klogs/pkg/instance"

"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
"go.uber.org/zap"
)

// Router implements the router for the klogs plugin, which can be registered in the router for our rest api. It contains
// the api routes for the klogs plugin and it's configuration.
type Router struct {
*chi.Mux
instances []instance.Instance
}

// getInstance returns a klogs instance by it's name. If we couldn't found an instance with the provided name and the
// provided name is "default" we return the first instance from the list. The first instance in the list is also the
// first one configured by the user and can be used as default one.
func (router *Router) getInstance(name string) instance.Instance {
for _, i := range router.instances {
if i.GetName() == name {
return i
}
}

if name == "default" && len(router.instances) > 0 {
return router.instances[0]
}

return nil
}

func (router *Router) getFields(w http.ResponseWriter, r *http.Request) {
name := r.Header.Get("x-kobs-plugin")
filter := r.URL.Query().Get("filter")
fieldType := r.URL.Query().Get("fieldType")

log.Debug(r.Context(), "Get fields parameters", zap.String("name", name), zap.String("filter", filter), zap.String("fieldType", fieldType))

i := router.getInstance(name)
if i == nil {
log.Error(r.Context(), "Could not find instance name", zap.String("name", name))
errresponse.Render(w, r, nil, http.StatusBadRequest, "Could not find instance name")
return
}

fields := i.GetFields(filter, fieldType)
log.Debug(r.Context(), "Get fields result", zap.Int("fieldsCount", len(fields)))
render.JSON(w, r, fields)
}

// getLogs implements the special handling when the user selected the "logs" options for the "view" configuration. This
// options is intended to use together with the kobsio/klogs Fluent Bit plugin and provides a custom query language to
// get the logs from ClickHouse ingested via kobsio/klogs.
func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
name := r.Header.Get("x-kobs-plugin")
query := r.URL.Query().Get("query")
order := r.URL.Query().Get("order")
orderBy := r.URL.Query().Get("orderBy")
timeStart := r.URL.Query().Get("timeStart")
timeEnd := r.URL.Query().Get("timeEnd")

log.Debug(r.Context(), "Get logs paramters", zap.String("name", name), zap.String("query", query), zap.String("order", order), zap.String("orderBy", orderBy), zap.String("timeStart", timeStart), zap.String("timeEnd", timeEnd))

i := router.getInstance(name)
if i == nil {
log.Error(r.Context(), "Could not find instance name", zap.String("name", name))
errresponse.Render(w, r, nil, http.StatusBadRequest, "Could not find instance name")
return
}

parsedTimeStart, err := strconv.ParseInt(timeStart, 10, 64)
if err != nil {
log.Error(r.Context(), "Could not parse start time", zap.Error(err))
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse start time")
return
}

parsedTimeEnd, err := strconv.ParseInt(timeEnd, 10, 64)
if err != nil {
log.Error(r.Context(), "Could not parse end time", zap.Error(err))
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse end time")
return
}

// Query for larger time ranges can took several minutes to be completed. To avoid that the connection is closed for
// these long running requests by a load balancer which sits infront of kobs, we are writing a newline character
// every 10 seconds. We shouldn't write sth. else, because this would make parsing the response in the React UI more
// diffucult and with the newline character parsing works in the same ways as it was before.
done := make(chan bool)

go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-done:
return
case <-ticker.C:
if f, ok := w.(http.Flusher); ok {
// We do not set the processing status code, so that the queries always are returning a 200. This is
// necessary because Go doesn't allow to set a new status code once the header was written.
// See: https://github.com/golang/go/issues/36734
// For that we also have to handle errors, when the status code is 200 in the React UI.
// See plugins/klogs/src/components/page/Logs.tsx#L64
// w.WriteHeader(http.StatusProcessing)
w.Write([]byte("\n"))
f.Flush()
}
}
}
}()

defer func() {
done <- true
}()

documents, fields, count, took, buckets, err := i.GetLogs(r.Context(), query, order, orderBy, 1000, parsedTimeStart, parsedTimeEnd)
if err != nil {
log.Error(r.Context(), "Could not get logs", zap.Error(err))
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not get logs")
return
}

data := struct {
Documents []map[string]interface{} `json:"documents"`
Fields []string `json:"fields"`
Count int64 `json:"count"`
Took int64 `json:"took"`
Buckets []instance.Bucket `json:"buckets"`
}{
documents,
fields,
count,
took,
buckets,
}

render.JSON(w, r, data)
}

// getAggregation returns the columns and rows for the user given aggregation request. The aggregation data must
// provided in the body of the request and is the run against the specified Clichouse instance.
func (router *Router) getAggregation(w http.ResponseWriter, r *http.Request) {
name := r.Header.Get("x-kobs-plugin")

log.Debug(r.Context(), "Get aggregation paramters", zap.String("name", name))

i := router.getInstance(name)
if i == nil {
log.Error(r.Context(), "Could not find instance name", zap.String("name", name))
errresponse.Render(w, r, nil, http.StatusBadRequest, "Could not find instance name")
return
}

var aggregationData instance.Aggregation

err := json.NewDecoder(r.Body).Decode(&aggregationData)
if err != nil {
log.Error(r.Context(), "Could not decode request body", zap.Error(err))
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not decode request body")
return
}

done := make(chan bool)

go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-done:
return
case <-ticker.C:
if f, ok := w.(http.Flusher); ok {
// w.WriteHeader(http.StatusProcessing)
w.Write([]byte("\n"))
f.Flush()
}
}
}
}()

defer func() {
done <- true
}()

rows, columns, err := i.GetAggregation(r.Context(), aggregationData)
if err != nil {
log.Error(r.Context(), "Error while running aggregation", zap.Error(err))
errresponse.Render(w, r, err, http.StatusBadRequest, "Error while running aggregation")
return
}

data := struct {
Rows []map[string]interface{} `json:"rows"`
Columns []string `json:"columns"`
}{
rows,
columns,
}

render.JSON(w, r, data)
}

// Mount mounts the klogs plugin routes in the plugins router of a kobs satellite instance.
func Mount(instances []plugin.Instance, clustersClient clusters.Client) (chi.Router, error) {
var klogsInstances []instance.Instance

for _, i := range instances {
klogsInstance, err := instance.New(i.Name, i.Options)
if err != nil {
return nil, err
}

klogsInstances = append(klogsInstances, klogsInstance)
}

router := Router{
chi.NewRouter(),
klogsInstances,
}

router.Get("/fields", router.getFields)
router.Get("/logs", router.getLogs)
router.Post("/aggregation", router.getAggregation)

return router, nil
}
36 changes: 36 additions & 0 deletions plugins/plugin-klogs/cmd/klogs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"testing"

"github.com/kobsio/kobs/pkg/satellite/plugins/plugin"
"github.com/kobsio/kobs/plugins/plugin-klogs/pkg/instance"

"github.com/go-chi/chi/v5"
"github.com/stretchr/testify/require"
)

func TestGetInstance(t *testing.T) {
mockInstance := &instance.MockInstance{}
mockInstance.On("GetName").Return("klogs")

router := Router{chi.NewRouter(), []instance.Instance{mockInstance}}
instance1 := router.getInstance("default")
require.NotNil(t, instance1)

instance2 := router.getInstance("klogs")
require.NotNil(t, instance2)

instance3 := router.getInstance("invalidname")
require.Nil(t, instance3)
}

func TestMount(t *testing.T) {
router1, err := Mount([]plugin.Instance{{Name: "klogs", Options: map[string]interface{}{}}}, nil)
require.NoError(t, err)
require.NotNil(t, router1)

router2, err := Mount([]plugin.Instance{{Name: "klogs", Options: map[string]interface{}{"username": []string{"token"}}}}, nil)
require.Error(t, err)
require.Nil(t, router2)
}
51 changes: 51 additions & 0 deletions plugins/plugin-klogs/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"name": "@kobsio/plugin-klogs",
"version": "0.0.0",
"license": "MIT",
"private": true,
"dependencies": {
"@kobsio/react-scripts": "5.0.1-1",
"@nivo/bar": "^0.79.1",
"@nivo/core": "^0.79.0",
"@nivo/line": "^0.79.1",
"@nivo/pie": "^0.79.1",
"@patternfly/patternfly": "^4.194.4",
"@patternfly/react-charts": "^6.67.1",
"@patternfly/react-core": "^4.214.1",
"@patternfly/react-icons": "^4.65.1",
"@patternfly/react-styles": "^4.64.1",
"@patternfly/react-table": "^4.83.1",
"@testing-library/jest-dom": "^5.14.1",
"@testing-library/react": "^13.0.0",
"@testing-library/user-event": "^13.2.1",
"@types/jest": "^27.0.1",
"@types/node": "^16.7.13",
"@types/react": "^18.0.0",
"@types/react-dom": "^18.0.0",
"@types/react-router-dom": "^5.3.3",
"react": "^18.1.0",
"react-dom": "^18.1.0",
"react-query": "^3.39.0",
"react-router-dom": "^6.3.0",
"typescript": "^4.4.2"
},
"scripts": {
"start": "PUBLIC_URL='http://localhost:3001/' PORT=3001 react-scripts start",
"build": "PUBLIC_URL='/plugins/klogs' react-scripts build",
"test": "react-scripts test",
"eject": "react-scripts eject"
},
"proxy": "http://localhost:15220",
"browserslist": {
"production": [
">0.2%",
"not dead",
"not op_mini all"
],
"development": [
"last 1 chrome version",
"last 1 firefox version",
"last 1 safari version"
]
}
}

0 comments on commit a32a80d

Please sign in to comment.