1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
- [#175](https://github.com/kobsio/kobs/pull/175): [prometheus] Fix tooltip when no unit is provided.
- [#186](https://github.com/kobsio/kobs/pull/186): [jaeger] Fix tooltip position in traces chart.
- [#189](https://github.com/kobsio/kobs/pull/189): [clickhouse] Fix download of `.csv` files.
- [#191](https://github.com/kobsio/kobs/pull/191): [clickhouse] Fix returned logs when the user selects a custom order.

### Changed

56 changes: 41 additions & 15 deletions plugins/clickhouse/pkg/instance/instance.go
@@ -163,6 +163,8 @@ func (i *Instance) GetLogs(ctx context.Context, query, order, orderBy string, li
conditions = fmt.Sprintf("AND %s", parsedQuery)
}

parsedOrder := parseOrder(order, orderBy, i.materializedColumns)

// We check that the time range is not 0 or lower than 0, because this would mean that the end time is equal to the
// start time or before the start time, which results in an error for the following SQL queries.
if timeEnd-timeStart <= 0 {
@@ -214,22 +216,48 @@ func (i *Instance) GetLogs(ctx context.Context, query, order, orderBy string, li
return buckets[i].Interval < buckets[j].Interval
})

// We are only returning the first 1000 documents in buckets of the given limit, to speed up the following query
// to get the documents. For that we are looping through the sorted buckets and using the timestamp from the
// bucket where the sum of all newer buckets contains 1000 documents.
// This new start time is then also returned in the response and can be used for the "load more" call as the new
// start date. In these follow up calls the start time isn't changed again, because we are skipping the count
// and bucket queries.
for i := len(buckets) - 1; i >= 0; i-- {
if count < limit && buckets[i].Count > 0 {
if timeConditions == "" {
timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", buckets[i].Interval, buckets[i].Interval+interval)
} else {
timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, buckets[i].Interval, buckets[i].Interval+interval)
// To optimize the query for the raw logs we create a new time condition for the where statement. That way we only
// have to look into the buckets which contain some documents and only have to include the first N buckets until
// the limit is reached.
// When the user selected a custom order (not "timestamp DESC") we can still optimize the search based on the limit
// if the documents should be sorted via "timestamp ASC". For all other order conditions we can only check whether a
// bucket contains some documents, but we can not skip any buckets based on the limit.
if parsedOrder == "timestamp DESC" {
for i := len(buckets) - 1; i >= 0; i-- {
if count < limit && buckets[i].Count > 0 {
if timeConditions == "" {
timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", buckets[i].Interval, buckets[i].Interval+interval)
} else {
timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, buckets[i].Interval, buckets[i].Interval+interval)
}
}

count = count + buckets[i].Count
}
} else if parsedOrder == "timestamp ASC" {
for i := 0; i < len(buckets); i++ {
if count < limit && buckets[i].Count > 0 {
if timeConditions == "" {
timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", buckets[i].Interval, buckets[i].Interval+interval)
} else {
timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, buckets[i].Interval, buckets[i].Interval+interval)
}
}

count = count + buckets[i].Count
}
} else {
for i := 0; i < len(buckets); i++ {
if buckets[i].Count > 0 {
if timeConditions == "" {
timeConditions = fmt.Sprintf("(timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", buckets[i].Interval, buckets[i].Interval+interval)
} else {
timeConditions = fmt.Sprintf("%s OR (timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d))", timeConditions, buckets[i].Interval, buckets[i].Interval+interval)
}
}

count = count + buckets[i].Count
}
}
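The three branches above compare parsedOrder against the exact strings "timestamp DESC" and "timestamp ASC", but the parseOrder helper itself is not part of this diff. The stand-alone sketch below shows one way such a helper could behave; the fallback to "timestamp DESC", the "ascending"/"descending" input values, and the unused materializedColumns parameter are illustrative assumptions, not the plugin's actual code.

package main

import (
	"fmt"
	"strings"
)

// parseOrder (assumed behaviour): fall back to "timestamp DESC" when the user did not
// select an order, otherwise build "<column> <ASC|DESC>" from the selection. The real
// helper also receives the materialized columns; how it uses them is not shown in this
// diff, so the parameter is only carried along here.
func parseOrder(order, orderBy string, materializedColumns []string) string {
	if order == "" || orderBy == "" {
		return "timestamp DESC"
	}

	direction := "DESC"
	if strings.ToLower(order) == "ascending" {
		direction = "ASC"
	}

	return fmt.Sprintf("%s %s", strings.TrimSpace(orderBy), direction)
}

func main() {
	fmt.Println(parseOrder("", "", nil))                                      // timestamp DESC
	fmt.Println(parseOrder("ascending", "timestamp", nil))                    // timestamp ASC
	fmt.Println(parseOrder("descending", "namespace", []string{"namespace"})) // namespace DESC
}

Under this assumption, defaulting to "timestamp DESC" keeps the original newest-first behaviour as the fast path, which is why the first branch above can continue iterating the buckets from newest to oldest.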

log.WithFields(logrus.Fields{"count": count, "buckets": buckets}).Tracef("sql result buckets")
@@ -240,8 +268,6 @@ func (i *Instance) GetLogs(ctx context.Context, query, order, orderBy string, li
return documents, fields, count, time.Now().Sub(queryStartTime).Milliseconds(), buckets, nil
}

parsedOrder := parseOrder(order, orderBy, i.materializedColumns)

// Now we are building and executing our sql query. We always return all fields from the logs table, where the
// timestamp of a row is within the selected query range and the parsed query. We also order all the results by the
// timestamp field and limit the results / use an offset for pagination.
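The query assembled after this comment is cut off in the diff, but given the conditions, timeConditions, parsedOrder, limit and offset values prepared above, it plausibly takes a shape like the following stand-alone sketch. The logs table name, the FROM_UNIXTIME bounds and the exact clause layout are assumptions for illustration only.

package main

import "fmt"

// buildRawLogsQuery is a hypothetical sketch of how the final raw-logs query could be
// assembled from the pieces prepared above (timeConditions, the parsed query conditions,
// the parsed order and the limit/offset). Table and column names are assumptions.
func buildRawLogsQuery(database string, timeStart, timeEnd int64, timeConditions, conditions, parsedOrder string, limit, offset int64) string {
	return fmt.Sprintf(
		"SELECT * FROM %s.logs WHERE timestamp >= FROM_UNIXTIME(%d) AND timestamp <= FROM_UNIXTIME(%d) AND (%s) %s ORDER BY %s LIMIT %d OFFSET %d",
		database, timeStart, timeEnd, timeConditions, conditions, parsedOrder, limit, offset,
	)
}

func main() {
	timeConditions := "(timestamp >= FROM_UNIXTIME(1640995200) AND timestamp <= FROM_UNIXTIME(1640998800))"
	fmt.Println(buildRawLogsQuery("logs", 1640995200, 1641002400, timeConditions, "AND namespace='kobs'", "timestamp DESC", 100, 0))
}

Because conditions already carries its own leading "AND" when a query was parsed further up in the function, the sketch appends it verbatim; without a query it is simply an empty string.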