Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
- [#159](https://github.com/kobsio/kobs/pull/159): Allow users to select a time range within the logs chart in the ClickHouse plugin.
- [#160](https://github.com/kobsio/kobs/pull/160): Allow users to sort the returned logs within the documents table in the ClickHouse plugin.
- [#161](https://github.com/kobsio/kobs/pull/161): Add support for materialized columns, to improve query performance for the most frequently queried fields.
- [#162](https://github.com/kobsio/kobs/pull/162): Add support to visualize logs in the ClickHouse plugin.

### Fixed

Expand Down
78 changes: 78 additions & 0 deletions plugins/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,83 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
render.JSON(w, r, data)
}

// getVisualization runs an aggregation for the given values and returns an array of data points which can be used in
// a chart in the React UI. All fields except the limit, start and end time must be strings. The mentioned fields must
// be numbers so we can parse them. If the groupBy field isn't present we use the operationField as groupBy field.
func (router *Router) getVisualization(w http.ResponseWriter, r *http.Request) {
	name := chi.URLParam(r, "name")
	groupBy := r.URL.Query().Get("groupBy")
	limit := r.URL.Query().Get("limit")
	order := r.URL.Query().Get("order")
	operation := r.URL.Query().Get("operation")
	operationField := r.URL.Query().Get("operationField")
	query := r.URL.Query().Get("query")
	timeStart := r.URL.Query().Get("timeStart")
	timeEnd := r.URL.Query().Get("timeEnd")

	log.WithFields(logrus.Fields{"name": name, "query": query, "groupBy": groupBy, "limit": limit, "operation": operation, "operationField": operationField, "order": order, "timeStart": timeStart, "timeEnd": timeEnd}).Tracef("getVisualization")

	i := router.getInstance(name)
	if i == nil {
		errresponse.Render(w, r, nil, http.StatusBadRequest, "Could not find instance name")
		return
	}

	parsedTimeStart, err := strconv.ParseInt(timeStart, 10, 64)
	if err != nil {
		errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse start time")
		return
	}

	parsedTimeEnd, err := strconv.ParseInt(timeEnd, 10, 64)
	if err != nil {
		errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse end time")
		return
	}

	parsedLimit, err := strconv.ParseInt(limit, 10, 64)
	if err != nil {
		errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse limit")
		return
	}

	// When the user didn't specify a groupBy field we fall back to the operationField, so that the aggregation is
	// grouped by the same field it operates on.
	if groupBy == "" {
		groupBy = operationField
	}

	// The aggregation may run longer than the idle timeout of proxies / load balancers in front of kobs. To keep
	// the connection alive we flush a newline every 10 seconds until the query is done.
	done := make(chan bool)

	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				if f, ok := w.(http.Flusher); ok {
					w.Write([]byte("\n"))
					f.Flush()
				}
			}
		}
	}()

	// Closing the channel (instead of sending a value) stops the keep-alive goroutine without blocking the
	// handler, even if the goroutine is between select iterations.
	defer close(done)

	data, err := i.GetVisualization(r.Context(), parsedLimit, groupBy, operation, operationField, order, query, parsedTimeStart, parsedTimeEnd)
	if err != nil {
		errresponse.Render(w, r, err, http.StatusBadRequest, "Could not get visualization data")
		return
	}

	render.JSON(w, r, data)
}

// Register returns a new router which can be used in the router for the kobs rest api.
func Register(clusters *clusters.Clusters, plugins *plugin.Plugins, config Config) chi.Router {
var instances []*instance.Instance
Expand Down Expand Up @@ -156,6 +233,7 @@ func Register(clusters *clusters.Clusters, plugins *plugin.Plugins, config Confi
}

router.Get("/logs/{name}", router.getLogs)
router.Get("/visualization/{name}", router.getVisualization)

return router
}
3 changes: 3 additions & 0 deletions plugins/clickhouse/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
},
"dependencies": {
"@kobsio/plugin-core": "*",
"@nivo/bar": "^0.73.1",
"@nivo/pie": "^0.73.0",
"@nivo/tooltip": "^0.73.0",
"@patternfly/react-charts": "^6.15.23",
"@patternfly/react-core": "^4.128.2",
"@patternfly/react-icons": "^4.10.11",
Expand Down
47 changes: 47 additions & 0 deletions plugins/clickhouse/pkg/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,53 @@ func (i *Instance) GetLogs(ctx context.Context, query, order, orderBy string, ti
return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, nil
}

// GetVisualization builds an aggregation query for the given parameters and returns the result as a slice of label,
// value pairs.
func (i *Instance) GetVisualization(ctx context.Context, limit int64, groupBy, operation, operationField, order, query string, timeStart, timeEnd int64) ([]VisualizationRow, error) {
	var data []VisualizationRow

	// As we also do it for the logs query we have to build our where condition for the SQL query first. The time
	// range filter is always applied; previously an empty query produced an empty WHERE clause, which is invalid
	// SQL and would also have dropped the time restriction.
	whereConditions := fmt.Sprintf("timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d)", timeStart, timeEnd)
	if query != "" {
		parsedQuery, err := parseLogsQuery(query, i.materializedColumns)
		if err != nil {
			return nil, err
		}

		whereConditions = whereConditions + " AND " + parsedQuery
	}

	// Now we have to transform all the given fields / values into a format, which we can use in our SQL query.
	// This query is built in the following and then run against ClickHouse. All the returned rows are added to our
	// data slice and returned, so that we can use it later in the React UI.
	groupBy = formatField(groupBy, i.materializedColumns)
	operationField = formatField(operationField, i.materializedColumns)
	order = formatOrder(order)

	sql := fmt.Sprintf("SELECT %s as label, %s(%s) as value FROM %s.logs WHERE %s GROUP BY %s ORDER BY value %s LIMIT %d SETTINGS skip_unavailable_shards = 1", groupBy, operation, operationField, i.database, whereConditions, groupBy, order, limit)
	log.WithFields(logrus.Fields{"query": sql}).Tracef("sql query visualization")
	rows, err := i.client.QueryContext(ctx, sql)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var r VisualizationRow
		if err := rows.Scan(&r.Label, &r.Value); err != nil {
			return nil, err
		}

		data = append(data, r)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return data, nil
}

// New returns a new ClickHouse instance for the given configuration.
func New(config Config) (*Instance, error) {
if config.WriteTimeout == "" {
Expand Down
6 changes: 6 additions & 0 deletions plugins/clickhouse/pkg/instance/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ type Bucket struct {
Interval int64 `json:"interval"`
Count int64 `json:"count"`
}

// VisualizationRow is the structure of a single row for a visualization. Label holds the value of the groupBy field
// and Value holds the result of the aggregation (operation over the operationField) for that label, as selected by
// the query built in GetVisualization.
type VisualizationRow struct {
	Label string `json:"label"`
	Value float64 `json:"value"`
}
37 changes: 37 additions & 0 deletions plugins/clickhouse/pkg/instance/visualization.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package instance

import (
"fmt"
"strings"
)

// formatField returns the SQL syntax for the given field. If the field is of type string and not in the default
// fields or materialized columns list it must be wrapped in single quotes. An empty (or whitespace-only) field no
// longer panics: it simply falls through to the number-field syntax.
func formatField(field string, materializedColumns []string) string {
	field = strings.TrimSpace(field)

	// Known columns can be used in the SQL query as-is.
	if contains(defaultFields, field) || contains(materializedColumns, field) {
		return field
	}

	// A value wrapped in single quotes marks a string field; strip the quotes before looking it up. The length
	// guard prevents the index-out-of-range panic the previous field[0] access caused for empty input, and keeps
	// a lone "'" from being treated as a quoted field.
	if len(field) >= 2 && strings.HasPrefix(field, "'") && strings.HasSuffix(field, "'") {
		field = field[1 : len(field)-1]

		if contains(defaultFields, field) || contains(materializedColumns, field) {
			return field
		}

		return fmt.Sprintf("fields_string.value[indexOf(fields_string.key, '%s')]", field)
	}

	return fmt.Sprintf("fields_number.value[indexOf(fields_number.key, '%s')]", field)
}

// formatOrder translates the user facing order value into the corresponding SQL keyword. Every value other than
// "descending" results in an ascending sort.
func formatOrder(order string) string {
	switch order {
	case "descending":
		return "DESC"
	default:
		return "ASC"
	}
}
40 changes: 40 additions & 0 deletions plugins/clickhouse/pkg/instance/visualization_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package instance

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestFormatField(t *testing.T) {
for _, tc := range []struct {
field string
expect string
}{
{field: "namespace", expect: "namespace"},
{field: "'namespace'", expect: "namespace"},
{field: "'content.method'", expect: "fields_string.value[indexOf(fields_string.key, 'content.method')]"},
{field: "content.duration", expect: "fields_number.value[indexOf(fields_number.key, 'content.duration')]"},
} {
t.Run(tc.field, func(t *testing.T) {
actual := formatField(tc.field, nil)
require.Equal(t, tc.expect, actual)
})
}
}

func TestFormatOrder(t *testing.T) {
for _, tc := range []struct {
order string
expect string
}{
{order: "descending", expect: "DESC"},
{order: "ascending", expect: "ASC"},
{order: "foo bar", expect: "ASC"},
} {
t.Run(tc.order, func(t *testing.T) {
actual := formatOrder(tc.order)
require.Equal(t, tc.expect, actual)
})
}
}
10 changes: 5 additions & 5 deletions plugins/clickhouse/src/components/page/Logs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import LogsChart from '../panel/LogsChart';
import LogsDocuments from '../panel/LogsDocuments';
import LogsFields from './LogsFields';

interface IPageLogsProps {
interface ILogsProps {
name: string;
fields?: string[];
order: string;
Expand All @@ -35,7 +35,7 @@ interface IPageLogsProps {
times: IPluginTimes;
}

const PageLogs: React.FunctionComponent<IPageLogsProps> = ({
const Logs: React.FunctionComponent<ILogsProps> = ({
name,
fields,
order,
Expand All @@ -46,11 +46,11 @@ const PageLogs: React.FunctionComponent<IPageLogsProps> = ({
changeOrder,
selectField,
times,
}: IPageLogsProps) => {
}: ILogsProps) => {
const history = useHistory();

const { isError, isFetching, isLoading, data, error, refetch } = useQuery<ILogsData, Error>(
['clickhouse/logs', query, order, orderBy, times],
['clickhouse/logs', name, query, order, orderBy, times],
async () => {
try {
const response = await fetch(
Expand Down Expand Up @@ -165,4 +165,4 @@ const PageLogs: React.FunctionComponent<IPageLogsProps> = ({
);
};

export default PageLogs;
export default Logs;
Loading