Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
List label values of allowed tenants only
Browse files Browse the repository at this point in the history
  • Loading branch information
niksajakovljevic committed Jun 14, 2022
1 parent c443ec7 commit 307350b
Show file tree
Hide file tree
Showing 14 changed files with 81 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -26,6 +26,7 @@ We use the following categories for changes:
- Tags from process table in Jaeger UI [#1385]
- Tags that have a numeric value, like `http.status_code=200` [#1385]
- Tags that involve status code [#1384]
- List label values of allowed tenants only [#1427]

## [0.11.0] - 2022-05-11

Expand Down
2 changes: 1 addition & 1 deletion pkg/pgclient/client.go
Expand Up @@ -142,7 +142,7 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
AsyncAcks: cfg.AsyncAcks,
}

labelsReader := lreader.NewLabelsReader(dbConn, labelsCache)
labelsReader := lreader.NewLabelsReader(dbConn, labelsCache, mt.ReadAuthorizer())
exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig)

dbQuerierConn := pgxconn.NewQueryLoggingPgxConn(connPool)
Expand Down
23 changes: 17 additions & 6 deletions pkg/pgmodel/lreader/labels_reader.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/pgmodel/model/pgutf8str"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/tenancy"
)

const (
Expand All @@ -33,13 +34,18 @@ type LabelsReader interface {
LabelsForIdMap(idMap map[int64]labels.Label) (err error)
}

func NewLabelsReader(conn pgxconn.PgxConn, labels cache.LabelsCache) LabelsReader {
return &labelsReader{conn: conn, labels: labels}
func NewLabelsReader(conn pgxconn.PgxConn, labels cache.LabelsCache, mt tenancy.ReadAuthorizer) LabelsReader {
var authConfig tenancy.AuthConfig
if mt != nil {
authConfig = mt.(tenancy.AuthConfig)
}
return &labelsReader{conn: conn, labels: labels, authConfig: authConfig}
}

type labelsReader struct {
conn pgxconn.PgxConn
labels cache.LabelsCache
conn pgxconn.PgxConn
labels cache.LabelsCache
authConfig tenancy.AuthConfig
}

// LabelValues implements the LabelsReader interface. It returns all distinct values
Expand All @@ -59,8 +65,13 @@ func (lr *labelsReader) LabelValues(labelName string) ([]string, error) {
if err := rows.Scan(&value); err != nil {
return nil, err
}

labelValues = append(labelValues, value)
if labelName == tenancy.TenantLabelKey && lr.authConfig != nil {
if lr.authConfig.IsTenantAllowed(value) {
labelValues = append(labelValues, value)
}
} else {
labelValues = append(labelValues, value)
}
}

sort.Strings(labelValues)
Expand Down
37 changes: 36 additions & 1 deletion pkg/pgmodel/lreader/labels_reader_test.go
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/tenancy"
)

func TestLabelsReaderLabelsNames(t *testing.T) {
Expand Down Expand Up @@ -105,6 +106,8 @@ func TestLabelsReaderLabelsValues(t *testing.T) {
name string
expectedRes []string
sqlQueries []model.SqlQuery
labelName string
tenant tenancy.AuthConfig
}{
{
name: "Error on query",
Expand All @@ -116,6 +119,7 @@ func TestLabelsReaderLabelsValues(t *testing.T) {
Err: fmt.Errorf("some error"),
},
},
labelName: "m",
}, {
name: "Error on scanning values",
sqlQueries: []model.SqlQuery{
Expand All @@ -125,6 +129,7 @@ func TestLabelsReaderLabelsValues(t *testing.T) {
Results: model.RowResults{{1}},
},
},
labelName: "m",
}, {
name: "Empty result, is ok",
sqlQueries: []model.SqlQuery{
Expand All @@ -134,6 +139,7 @@ func TestLabelsReaderLabelsValues(t *testing.T) {
Results: model.RowResults{},
},
},
labelName: "m",
expectedRes: []string{},
}, {
name: "Result should be sorted",
Expand All @@ -145,14 +151,43 @@ func TestLabelsReaderLabelsValues(t *testing.T) {
},
},
expectedRes: []string{"a", "b"},
labelName: "m",
},
{
name: "Tenant values are not filtered when tenant is not configured",
sqlQueries: []model.SqlQuery{
{
Sql: "SELECT value from _prom_catalog.label WHERE key = $1",
Args: []interface{}{tenancy.TenantLabelKey},
Results: model.RowResults{{"a"}, {"b"}},
},
},
expectedRes: []string{"a", "b"},
labelName: tenancy.TenantLabelKey,
},
{
name: "Tenant values are filtered when tenant is configured",
sqlQueries: []model.SqlQuery{
{
Sql: "SELECT value from _prom_catalog.label WHERE key = $1",
Args: []interface{}{tenancy.TenantLabelKey},
Results: model.RowResults{{"a"}, {"b"}},
},
},
expectedRes: []string{"a"},
labelName: tenancy.TenantLabelKey,
tenant: tenancy.NewSelectiveTenancyConfig([]string{"a"}, false),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mock := model.NewSqlRecorder(tc.sqlQueries, t)
querier := labelsReader{conn: mock}
res, err := querier.LabelValues("m")
if tc.tenant != nil {
querier = labelsReader{conn: mock, authConfig: tc.tenant}
}
res, err := querier.LabelValues(tc.labelName)

var expectedErr error
for _, q := range tc.sqlQueries {
Expand Down
3 changes: 2 additions & 1 deletion pkg/pgmodel/querier/querier_sql_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/timescale/promscale/pkg/pgmodel/lreader"
"github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/prompb"
"github.com/timescale/promscale/pkg/tenancy"
)

func TestPGXQuerierQuery(t *testing.T) {
Expand Down Expand Up @@ -720,7 +721,7 @@ func TestPGXQuerierQuery(t *testing.T) {
if err != nil {
t.Fatalf("error setting up mock cache: %s", err.Error())
}
querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0))}}
querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}}

result, err := querier.RemoteReadQuerier().Query(c.query)

Expand Down
5 changes: 3 additions & 2 deletions pkg/runner/runner.go
Expand Up @@ -271,8 +271,9 @@ func Run(cfg *Config) error {
mux.Handle("/", router)

server := http.Server{
Addr: cfg.ListenAddr,
Handler: mux,
Addr: cfg.ListenAddr,
Handler: mux,
ReadHeaderTimeout: time.Second * 30, // To mitigate Slowloris DDoS attack. Value is arbitrary picked
}
group.Add(
func() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/end_to_end_tests/continuous_agg_test.go
Expand Up @@ -211,7 +211,7 @@ WITH (timescaledb.continuous) AS
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(readOnly)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
queryable := query.NewQueryable(r, labelsReader)
queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil)
Expand Down
3 changes: 2 additions & 1 deletion pkg/tests/end_to_end_tests/exemplar_test.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/prompb"
"github.com/timescale/promscale/pkg/query"
"github.com/timescale/promscale/pkg/tenancy"
)

var rawExemplar = []prompb.Exemplar{
Expand Down Expand Up @@ -198,7 +199,7 @@ func TestExemplarQueryingAPI(t *testing.T) {
// We do not check num of insertablesIngested and metadataIngested returned above from ingestor.Ingest,
// since the return will be 0, as they have already been ingested by TestExemplarIngestion.

labelsReader := lreader.NewLabelsReader(pgxconn.NewPgxConn(db), cache.NewLabelsCache(cache.DefaultConfig))
labelsReader := lreader.NewLabelsReader(pgxconn.NewPgxConn(db), cache.NewLabelsCache(cache.DefaultConfig), tenancy.NewNoopAuthorizer().ReadAuthorizer())
r := querier.NewQuerier(
pgxconn.NewPgxConn(db),
cache.NewMetricCache(cache.DefaultConfig),
Expand Down
5 changes: 4 additions & 1 deletion pkg/tests/end_to_end_tests/main_test.go
Expand Up @@ -8,13 +8,15 @@ import (
"context"
"flag"
"fmt"
constants "github.com/timescale/promscale/pkg/tests"
"io"
"os"
"path/filepath"
"testing"
"time"

"github.com/timescale/promscale/pkg/tenancy"
constants "github.com/timescale/promscale/pkg/tests"

"github.com/docker/go-connections/nat"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
Expand Down Expand Up @@ -60,6 +62,7 @@ var (
// timeseriesWithExemplars is a singleton instance for testing generated timeseries for exemplars based E2E tests.
// The called must ensure to use by call/copy only.
timeseriesWithExemplars []prompb.TimeSeries
noopReadAuthorizer = tenancy.NewNoopAuthorizer().ReadAuthorizer()
)

func init() {
Expand Down
12 changes: 6 additions & 6 deletions pkg/tests/end_to_end_tests/multi_tenancy_test.go
Expand Up @@ -51,7 +51,7 @@ func TestMultiTenancyWithoutValidTenants(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(db)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer())
qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer())

// ----- query-test: querying a single tenant (tenant-a) -----
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestMultiTenancyWithValidTenants(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(db)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer())
qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer())

// ----- query-test: querying a valid tenant (tenant-a) -----
Expand Down Expand Up @@ -367,7 +367,7 @@ func TestMultiTenancyWithValidTenants(t *testing.T) {
mt, err = tenancy.NewAuthorizer(cfg)
require.NoError(t, err)

labelsReader = lreader.NewLabelsReader(dbConn, lCache)
labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer())
qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer())

expectedResult = []prompb.TimeSeries{}
Expand Down Expand Up @@ -445,7 +445,7 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(db)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer())
qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer())

// ----- query-test: querying a non-tenant -----
Expand Down Expand Up @@ -532,7 +532,7 @@ func TestMultiTenancyWithValidTenantsAndNonTenantOps(t *testing.T) {
mt, err = tenancy.NewAuthorizer(cfg)
require.NoError(t, err)

labelsReader = lreader.NewLabelsReader(dbConn, lCache)
labelsReader = lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer())
qr = querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer())

expectedResult = []prompb.TimeSeries{
Expand Down Expand Up @@ -638,7 +638,7 @@ func TestMultiTenancyWithValidTenantsAsLabels(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(db)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, mt.ReadAuthorizer())
qr := querier.NewQuerier(dbConn, mCache, labelsReader, nil, mt.ReadAuthorizer())

// ----- query-test: querying a single tenant (tenant-b) -----
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/end_to_end_tests/nan_test.go
Expand Up @@ -127,7 +127,7 @@ func TestSQLStaleNaN(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(db)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
resp, err := r.RemoteReadQuerier().Query(c.query)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/end_to_end_tests/null_chars_test.go
Expand Up @@ -65,7 +65,7 @@ func TestOperationWithNullChars(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(db)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
resp, err := r.RemoteReadQuerier().Query(&prompb.Query{
Matchers: []*prompb.LabelMatcher{
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/end_to_end_tests/promql_label_endpoint_test.go
Expand Up @@ -110,7 +110,7 @@ func TestPromQLLabelEndpoint(t *testing.T) {

lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(readOnly)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
labelNames, err := labelsReader.LabelNames()
if err != nil {
t.Fatalf("could not get label names from querier")
Expand Down
10 changes: 5 additions & 5 deletions pkg/tests/end_to_end_tests/query_integration_test.go
Expand Up @@ -112,7 +112,7 @@ func TestDroppedViewQuery(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(readOnly)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
_, err := r.RemoteReadQuerier().Query(&prompb.Query{
Matchers: []*prompb.LabelMatcher{
Expand Down Expand Up @@ -690,7 +690,7 @@ func TestSQLQuery(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(readOnly)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
for _, c := range testCases {
tester.Run(c.name, func(t *testing.T) {
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func TestPromQL(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(readOnly)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
for _, c := range testCases {
tester.Run(c.name, func(t *testing.T) {
Expand Down Expand Up @@ -1265,7 +1265,7 @@ func TestPushdownDelta(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(readOnly)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
queryable := query.NewQueryable(r, labelsReader)
queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil)
Expand Down Expand Up @@ -1340,7 +1340,7 @@ func TestPushdownVecSel(t *testing.T) {
mCache := &cache.MetricNameCache{Metrics: clockcache.WithMax(cache.DefaultMetricCacheSize)}
lCache := clockcache.WithMax(100)
dbConn := pgxconn.NewPgxConn(readOnly)
labelsReader := lreader.NewLabelsReader(dbConn, lCache)
labelsReader := lreader.NewLabelsReader(dbConn, lCache, noopReadAuthorizer)
r := querier.NewQuerier(dbConn, mCache, labelsReader, nil, nil)
queryable := query.NewQueryable(r, labelsReader)
queryEngine, err := query.NewEngine(log.GetLogger(), time.Minute, time.Minute*5, time.Minute, 50000000, nil)
Expand Down

0 comments on commit 307350b

Please sign in to comment.