Skip to content

Commit

Permalink
Send query plan to querier. (grafana#11246)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Following grafana#11123 and in order to
enable grafana#10417 the query frontend
should send the serialized LogQL AST instead of the query string to the
queriers. This enables the frontend to change the AST and inject
expressions that are not expressible in LogQL.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](grafana@0d4416a)

---------

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Callum Styan <callumstyan@gmail.com>
  • Loading branch information
2 people authored and rhnasc committed Apr 12, 2024
1 parent d72328d commit d15a1e3
Show file tree
Hide file tree
Showing 43 changed files with 932 additions and 530 deletions.
10 changes: 8 additions & 2 deletions pkg/logcli/client/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.

ctx = user.InjectOrgID(ctx, f.orgID)

params := logql.NewLiteralParams(
params, err := logql.NewLiteralParams(
q,
t, t,
0,
Expand All @@ -78,6 +78,9 @@ func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.
uint32(limit),
nil,
)
if err != nil {
return nil, fmt.Errorf("failed to parse query: %w", err)
}

query := f.engine.Query(params)

Expand Down Expand Up @@ -106,7 +109,7 @@ func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time

ctx = user.InjectOrgID(ctx, f.orgID)

params := logql.NewLiteralParams(
params, err := logql.NewLiteralParams(
queryStr,
start,
end,
Expand All @@ -116,6 +119,9 @@ func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time
uint32(limit),
nil,
)
if err != nil {
return nil, err
}

query := f.engine.Query(params)

Expand Down
22 changes: 18 additions & 4 deletions pkg/logcli/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
var query logql.Query

if q.isInstant() {
query = eng.Query(logql.NewLiteralParams(
params, err := logql.NewLiteralParams(
q.QueryString,
q.Start,
q.Start,
Expand All @@ -460,9 +460,14 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
q.resultsDirection(),
uint32(q.Limit),
nil,
))
)
if err != nil {
return err
}

query = eng.Query(params)
} else {
query = eng.Query(logql.NewLiteralParams(
params, err := logql.NewLiteralParams(
q.QueryString,
q.Start,
q.End,
Expand All @@ -471,7 +476,16 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
q.resultsDirection(),
uint32(q.Limit),
nil,
))
)
if err != nil {
return err
}

query = eng.Query(params)
}

if err != nil {
return err
}

// execute the query
Expand Down
5 changes: 4 additions & 1 deletion pkg/logcli/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,10 @@ func (t *testQueryClient) Query(_ string, _ int, _ time.Time, _ logproto.Directi
func (t *testQueryClient) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, _ bool) (*loghttp.QueryResponse, error) {
ctx := user.InjectOrgID(context.Background(), "fake")

params := logql.NewLiteralParams(queryStr, from, through, step, interval, direction, uint32(limit), nil)
params, err := logql.NewLiteralParams(queryStr, from, through, step, interval, direction, uint32(limit), nil)
if err != nil {
return nil, err
}

v, err := t.engine.Query(params).Exec(ctx)
if err != nil {
Expand Down
50 changes: 24 additions & 26 deletions pkg/logproto/indexgateway.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/logproto/indexgateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ syntax = "proto3";

package indexgatewaypb;

import "gogoproto/gogo.proto";
import "pkg/logproto/logproto.proto";

option go_package = "github.com/grafana/loki/pkg/logproto";
Expand Down
78 changes: 38 additions & 40 deletions pkg/logproto/sketch.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/logproto/sketch.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ syntax = "proto3";

package logproto;

import "gogoproto/gogo.proto";
import "pkg/logproto/logproto.proto";

option go_package = "github.com/grafana/loki/pkg/logproto";
Expand Down
4 changes: 2 additions & 2 deletions pkg/logql/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
return false
}

query := qb.q.params.Query()
typ, err := QueryType(query)
query := qb.q.params.QueryString()
typ, err := QueryType(qb.q.params.GetExpression())
if err != nil {
typ = "unknown"
}
Expand Down
13 changes: 4 additions & 9 deletions pkg/logql/blocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,10 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
limits.blockedQueries = test.blocked

q := eng.Query(LiteralParams{
qs: test.q,
start: time.Unix(0, 0),
end: time.Unix(100000, 0),
step: 60 * time.Second,
direction: logproto.FORWARD,
limit: 1000,
})
_, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
params, err := NewLiteralParams(test.q, time.Unix(0, 0), time.Unix(100000, 0), 60*time.Second, 0, logproto.FORWARD, 1000, nil)
require.NoError(t, err)
q := eng.Query(params)
_, err = q.Exec(user.InjectOrgID(context.Background(), "fake"))

if test.expectedErr == nil {
require.NoError(t, err)
Expand Down

0 comments on commit d15a1e3

Please sign in to comment.