Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(routtrip): fix round tripper not find correct path #95

Merged
merged 5 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
<!-- main START -->
<!-- 1.0.8 START -->
# 1.0.8 (23/04/2024)
- **[BUGFIX]**: fix deepql running in distributed mode [#95](https://github.com/intergral/deep/pull/95) [@Umaaz](https://github.com/Umaaz)
<!-- 1.0.8 START -->

<!-- 1.0.7 START -->
# 1.0.7 (18/04/2024)
- **[FEATURE]**: add support for deepql [#93](https://github.com/intergral/deep/pull/93) [@Umaaz](https://github.com/Umaaz)
<!-- main START -->
<!-- 1.0.7 START -->

<!-- 1.0.6 START -->
# 1.0.6 (14/03/2024)
Expand Down
4 changes: 2 additions & 2 deletions cmd/deep/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,14 @@ func (t *App) initQuerier() (services.Service, error) {
// tracepointAPI handles requests to change the config for tracepoints
func (t *App) initQueryFrontend() (services.Service, error) {
// we create to 2 bridges (roundTrippers) one for each backend
roundTripper, tpTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
roundTripper, v1, err := frontend.InitFrontend(t.cfg.Frontend.Config, frontend.CortexNoQuerierLimits{}, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
t.frontend = v1

// create query frontend
queryFrontend, err := frontend.New(t.cfg.Frontend, roundTripper, tpTripper, t.overrides, t.store, log.Logger, prometheus.DefaultRegisterer)
queryFrontend, err := frontend.New(t.cfg.Frontend, roundTripper, t.overrides, t.store, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion examples/docker-compose/debug/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ services:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
- GF_AUTH_DISABLE_LOGIN_FORM=true
- GF_INSTALL_PLUGINS=https://github.com/intergral/grafana-deep-tracepoint-panel/releases/download/v1.0.0/intergral-deep-tracepoint-panel-1.0.0.zip;intergral-deep-tracepoint-panel,https://github.com/intergral/grafana-deep-panel/releases/download/v1.0.1/intergral-deep-panel-1.0.1.zip;intergral-deep-panel,https://github.com/intergral/grafana-deep-datasource/releases/download/v1.0.2/intergral-deep-datasource-1.0.2.zip;intergral-deep-datasource
- GF_INSTALL_PLUGINS=https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-tracepoint-panel.v1.0.0/intergral-deep-tracepoint-panel-1.0.0.zip;intergral-deep-tracepoint-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-panel.v1.0.2/intergral-deep-panel-1.0.2.zip;intergral-deep-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-datasource.v1.0.3/intergral-deep-datasource-1.0.3.zip;intergral-deep-datasource
ports:
- "3000:3000"

Expand Down
19 changes: 17 additions & 2 deletions examples/docker-compose/distributed/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,27 @@ services:
grafana:
image: grafana/grafana-oss
volumes:
- ../shared/grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
- ./grafana-datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml
- ../shared/dashboards:/etc/grafana/provisioning/dashboards/
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
- GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
- GF_AUTH_DISABLE_LOGIN_FORM=true
- GF_INSTALL_PLUGINS=https://github.com/intergral/grafana-deep-panel/releases/download/v0.0.3/intergral-deep-panel-0.0.3.zip;intergral-deep-panel,https://github.com/intergral/grafana-deep-datasource/releases/download/v0.0.7/intergral-deep-datasource-0.0.7.zip;intergral-deep-datasource
- GF_INSTALL_PLUGINS=https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-tracepoint-panel.v1.0.0/intergral-deep-tracepoint-panel-1.0.0.zip;intergral-deep-tracepoint-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-panel.v1.0.2/intergral-deep-panel-1.0.2.zip;intergral-deep-panel,https://github.com/intergral/plugin-signer/releases/download/intergral%2Fgrafana-deep-datasource.v1.0.3/intergral-deep-datasource-1.0.3.zip;intergral-deep-datasource
ports:
- "3000:3000"

test_load:
image: intergral/deep-cli
depends_on:
- distributor
command:
- generate
- snapshot
- --endpoint=distributor:43315
- --count=3
- --sleep=5
- --iterations=-1
- --random-string
- --random-duration
- --duration-nanos=1000000000
3 changes: 2 additions & 1 deletion examples/docker-compose/distributed/grafana-datasources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ datasources:
apiVersion: 1
uid: deep
jsonData:
httpMethod: GET
experimental:
deepql: true
24 changes: 21 additions & 3 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net/http"
"time"

"github.com/intergral/deep/pkg/api"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -103,12 +105,28 @@ func (CortexNoQuerierLimits) MaxQueriersPerUser(string) int { return 0 }
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, http.RoundTripper, *v1.Frontend, error) {
func InitFrontend(cfg v1.Config, limits v1.Limits, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
statVersion.Set("v1")
// No scheduler = use original frontend.
fr, err := v1.New(cfg, limits, log, reg)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.QuerierRoundTrip), transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.TracepointRoundTrip), fr, nil
// create both round trippers
searchRt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.QuerierRoundTrip)
tracepointRt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr.TracepointRoundTrip)
// filter the round trippers based on the paths
tripper := transport.NewSplittingRoundTripper(&transport.SplitRule{
Rules: []string{
api.PathPrefixTracepoints,
},
RoundTripper: tracepointRt,
}, &transport.SplitRule{
Rules: []string{
api.PathPrefixQuerier,
},
RoundTripper: searchRt,
})

return tripper, fr, nil
}
4 changes: 2 additions & 2 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type QueryFrontend struct {
}

// New returns a new QueryFrontend
func New(cfg Config, next http.RoundTripper, tpNext http.RoundTripper, o *overrides.Overrides, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, store storage.Store, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
level.Info(logger).Log("msg", "creating middleware in query frontend")

if cfg.SnapshotByID.QueryShards < minQueryShards || cfg.SnapshotByID.QueryShards > maxQueryShards {
Expand Down Expand Up @@ -100,7 +100,7 @@ func New(cfg Config, next http.RoundTripper, tpNext http.RoundTripper, o *overri
search := searchMiddleware.Wrap(next)

tpMiddleware := newTracepointForwardMiddleware()
tpHandler := tpMiddleware.Wrap(tpNext)
tpHandler := tpMiddleware.Wrap(next)

return &QueryFrontend{
SnapshotByID: newHandler(snapshots, snapshotByIDCounter, logger),
Expand Down
13 changes: 6 additions & 7 deletions modules/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func (s *mockNextTripperware) RoundTrip(_ *http.Request) (*http.Response, error)

func TestFrontendRoundTripsSearch(t *testing.T) {
next := &mockNextTripperware{}
tpNext := &mockNextTripperware{}
f, err := New(Config{
SnapshotByID: SnapshotByIDConfig{
QueryShards: minQueryShards,
Expand All @@ -54,7 +53,7 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
},
SLO: testSLOcfg,
},
}, next, tpNext, nil, nil, log.NewNopLogger(), nil)
}, next, nil, nil, log.NewNopLogger(), nil)
require.NoError(t, err)

req := httptest.NewRequest("GET", "/", nil)
Expand All @@ -77,7 +76,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
assert.Nil(t, f)

Expand All @@ -93,7 +92,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
assert.Nil(t, f)

Expand All @@ -109,7 +108,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
assert.Nil(t, f)

Expand All @@ -125,7 +124,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
assert.Nil(t, f)

Expand All @@ -143,7 +142,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, nil, log.NewNopLogger(), nil)
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
assert.Nil(t, f)
}
44 changes: 44 additions & 0 deletions modules/frontend/transport/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ package transport
import (
"bytes"
"context"
"errors"
"io"
"io/ioutil"
"net/http"
"strings"

"github.com/opentracing/opentracing-go"

frontend_v1pb "github.com/intergral/deep/modules/frontend/v1/frontendv1pb"
)
Expand Down Expand Up @@ -95,3 +99,43 @@ func fromHeader(hs http.Header) []*frontend_v1pb.Header {
}
return result
}

// SplitRule allows controlling which requests go to this roundtripper
type SplitRule struct {
Rules []string
RoundTripper http.RoundTripper
}

func (r *SplitRule) matches(url string) bool {
for _, rule := range r.Rules {
if strings.HasPrefix(url, rule) {
return true
}
}
return false
}

type splitRoundTripper struct {
http.RoundTripper

rules []*SplitRule
}

// NewSplittingRoundTripper creates a new round tripper with rules
func NewSplittingRoundTripper(rules ...*SplitRule) http.RoundTripper {
return &splitRoundTripper{
rules: rules,
}
}

func (s *splitRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
span, _ := opentracing.StartSpanFromContext(r.Context(), "splitRoundTripper.RoundTrip")
defer span.Finish()

for _, rule := range s.rules {
if rule.matches(r.RequestURI) {
return rule.RoundTripper.RoundTrip(r)
}
}
return nil, errors.New("no matching round tripper")
}
Loading