diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index af89b79bc81..99132705d84 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -361,7 +361,7 @@ func processDownsampling( } level.Info(logger).Log("msg", "downloaded block", "id", m.ULID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) - if err := block.VerifyIndex(logger, filepath.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil && !acceptMalformedIndex { + if err := block.VerifyIndex(ctx, logger, filepath.Join(bdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil && !acceptMalformedIndex { return errors.Wrap(err, "input block index not valid") } @@ -380,7 +380,7 @@ func processDownsampling( } defer runutil.CloseWithLogOnErr(log.With(logger, "outcome", "potential left mmap file handlers left"), b, "tsdb reader") - id, err := downsample.Downsample(logger, m, b, dir, resolution) + id, err := downsample.Downsample(ctx, logger, m, b, dir, resolution) if err != nil { return errors.Wrapf(err, "downsample block %s to window %d", m.ULID, resolution) } @@ -391,7 +391,7 @@ func processDownsampling( "from", m.ULID, "to", id, "duration", downsampleDuration, "duration_ms", downsampleDuration.Milliseconds()) metrics.downsampleDuration.WithLabelValues(m.Thanos.GroupKey()).Observe(downsampleDuration.Seconds()) - stats, err := block.GatherIndexHealthStats(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime) + stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime) if err == nil { err = stats.AnyErr() } diff --git a/go.mod b/go.mod index fab037080d9..0a65da66ab0 100644 --- a/go.mod +++ b/go.mod @@ -56,17 +56,17 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/alertmanager v0.25.1 + github.com/prometheus/alertmanager v0.26.0 github.com/prometheus/client_golang v1.16.0 github.com/prometheus/client_model v0.4.0 github.com/prometheus/common v0.44.0 github.com/prometheus/exporter-toolkit v0.10.0 // Prometheus maps version 2.x.y to tags v0.x.y. - github.com/prometheus/prometheus v0.46.1-0.20230818184859-4d8e380269da + github.com/prometheus/prometheus v0.47.1-0.20231002142425-1492031ef2e0 github.com/sony/gobreaker v0.5.0 github.com/stretchr/testify v1.8.4 github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed - github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e + github.com/thanos-io/promql-engine v0.0.0-20231003153358-8605b6afba51 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible // indirect github.com/vimeo/galaxycache v0.0.0-20210323154928-b7e5d71c067a @@ -120,7 +120,7 @@ require ( github.com/onsi/gomega v1.27.10 go.opentelemetry.io/contrib/propagators/autoprop v0.38.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb - golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 + golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b ) require ( @@ -129,6 +129,7 @@ require ( github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/ginkgo v1.16.5 // indirect + github.com/zhangyunhao116/umap v0.0.0-20221211160557-cb7705fafa39 // indirect go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 // indirect go.opentelemetry.io/collector/semconv v0.81.0 // indirect go.opentelemetry.io/contrib/propagators/ot v1.13.0 // indirect diff --git a/go.sum b/go.sum index 05a679f7666..15dc7b3e564 100644 --- a/go.sum +++ b/go.sum @@ -437,7 +437,7 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/gnostic v0.6.9 h1:ZK/5VhkoX835RikCHpSUJV9a+S3e1zLh59YnyWeBW+0= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -534,7 +534,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rH github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= -github.com/hashicorp/consul/api v1.22.0 h1:ydEvDooB/A0c/xpsBd8GSt7P2/zYPBui4KrNip0xGjE= +github.com/hashicorp/consul/api v1.25.1 h1:CqrdhYzc8XZuPnhIYZWH45toM0LB9ZeYr/gvpLVI3PE= github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/cronexpr v1.1.2 h1:wG/ZYIKT+RT3QkOdgYc+xsKWVRgnxJ1OJtjjy84fJ9A= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -779,8 +779,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= -github.com/prometheus/alertmanager v0.25.1 h1:LGBNMspOfv8h7brb+LWj2wnwBCg2ZuuKWTh6CAVw2/Y= -github.com/prometheus/alertmanager v0.25.1/go.mod h1:MEZ3rFVHqKZsw7IcNS/m4AWZeXThmJhumpiWR4eHU/w= +github.com/prometheus/alertmanager v0.26.0 h1:uOMJWfIwJguc3NaM3appWNbbrh6G/OjvaHMk22aBBYc= +github.com/prometheus/alertmanager v0.26.0/go.mod h1:rVcnARltVjavgVaNnmevxK7kOn7IZavyf0KNgHkbEpU= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -826,8 +826,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY= -github.com/prometheus/prometheus v0.46.1-0.20230818184859-4d8e380269da h1:D5uk+FEdNjQs9ly/wkb/pXkoWc60GcV9RVsMUpg/BIE= -github.com/prometheus/prometheus v0.46.1-0.20230818184859-4d8e380269da/go.mod h1:uvQsz/zwlfb8TRuWjK7L7ofV5ycAYq8dorvNf2iOBN4= +github.com/prometheus/prometheus v0.47.1-0.20231002142425-1492031ef2e0 h1:hwaGXm+Cmz3s6YFBxLaEVORpeoxvEHd6H4anoIhCMVM= +github.com/prometheus/prometheus v0.47.1-0.20231002142425-1492031ef2e0/go.mod h1:UC0TwJiF90m2T3iYPQBKnGu8gv3s55dF/EgpTq8gyvo= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw= github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU= @@ -904,8 +904,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1 github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed h1:iWQdY3S6DpWjelVvKKSKgS7LeLkhK4VaEnQfphB9ZXA= github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE= -github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e h1:kwsFCU8eSkZehbrAN3nXPw5RdMHi/Bok/y8l2C4M+gk= -github.com/thanos-io/promql-engine v0.0.0-20230821193351-e1ae4275b96e/go.mod h1:+T/ZYNCGybT6eTsGGvVtGb63nT1cvUmH6MjqRrcQoKw= +github.com/thanos-io/promql-engine v0.0.0-20231003153358-8605b6afba51 h1:Av62ac0O9wRbLI6xvtm51BBZnxHyEgLXV/YmiJpdogc= +github.com/thanos-io/promql-engine v0.0.0-20231003153358-8605b6afba51/go.mod h1:vfXJv1JXNdLfHnjsHsLLJl5tyI7KblF76Wo5lZ9YC4Q= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= @@ -948,6 +948,8 @@ github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLC github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/zhangyunhao116/umap v0.0.0-20221211160557-cb7705fafa39 h1:D3ltj0b2c2FgUacKrB1pWGgwrUyCESY9W8XYYQ5sqY8= +github.com/zhangyunhao116/umap v0.0.0-20221211160557-cb7705fafa39/go.mod h1:r86X1CnsDRrOeLtJlqRWdELPWpkcf933GTlojQlifQw= go.elastic.co/apm v1.11.0 h1:uJyt6nCW9880sZhfl1tB//Jy/5TadNoAd8edRUtgb3w= go.elastic.co/apm v1.11.0/go.mod h1:qoOSi09pnzJDh5fKnfY7bPmQgl8yl2tULdOu03xhui0= go.elastic.co/apm/module/apmhttp v1.11.0 h1:k/MjK0y2aLOXumoM8jcWXqxvIFlMS4U8Bn9cMUPdVX0= @@ -1062,8 +1064,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= -golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b h1:r+vk0EmXNmekl0S0BascoeeoHk/L7wmaW2QF90K+kYI= +golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -1593,10 +1595,10 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= howett.net/plist v0.0.0-20181124034731-591f970eefbb h1:jhnBjNi9UFpfpl8YZhA9CrOqpnJdvzuiHsl/dnxl11M= howett.net/plist v0.0.0-20181124034731-591f970eefbb/go.mod h1:vMygbs4qMhSZSc4lCUl2OEE+rDiIIJAIdR4m7MiMcm0= -k8s.io/api v0.27.3 h1:yR6oQXXnUEBWEWcvPWS0jQL575KoAboQPfJAuKNrw5Y= -k8s.io/apimachinery v0.27.3 h1:Ubye8oBufD04l9QnNtW05idcOe9Z3GQN8+7PqmuVcUM= -k8s.io/client-go v0.27.3 h1:7dnEGHZEJld3lYwxvLl7WoehK6lAq7GvgjxpA3nv1E8= -k8s.io/kube-openapi v0.0.0-20230525220651-2546d827e515 h1:OmK1d0WrkD3IPfkskvroRykOulHVHf0s0ZIFRjyt+UI= +k8s.io/api v0.28.1 h1:i+0O8k2NPBCPYaMB+uCkseEbawEt/eFaiRqUx8aB108= +k8s.io/apimachinery v0.28.1 h1:EJD40og3GizBSV3mkIoXQBsws32okPOy+MkRyzh6nPY= +k8s.io/client-go v0.28.1 h1:pRhMzB8HyLfVwpngWKE8hDcXRqifh1ga2Z/PU9SXVK8= +k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= k8s.io/utils v0.0.0-20230711102312-30195339c3c7 h1:ZgnF1KZsYxWIifwSNZFZgNtWE89WI5yiP5WwlfDoIyc= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/internal/cortex/querier/series/series_set.go b/internal/cortex/querier/series/series_set.go index ff213d183e7..ef82cc2d858 100644 --- a/internal/cortex/querier/series/series_set.go +++ b/internal/cortex/querier/series/series_set.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/annotations" ) // ConcreteSeriesSet implements storage.SeriesSet. @@ -62,7 +63,7 @@ func (c *ConcreteSeriesSet) Err() error { } // Warnings implements storage.SeriesSet. -func (c *ConcreteSeriesSet) Warnings() storage.Warnings { +func (c *ConcreteSeriesSet) Warnings() annotations.Annotations { return nil } diff --git a/pkg/api/api.go b/pkg/api/api.go index 6be81c8b158..52444797cd2 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -25,6 +25,7 @@ import ( "net/http" "os" "runtime" + "strings" "time" "github.com/go-kit/log" @@ -33,6 +34,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/common/route" "github.com/prometheus/common/version" + "github.com/prometheus/prometheus/util/annotations" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/logging" @@ -233,9 +235,22 @@ func GetInstr( return instr } +func shouldNotCacheBecauseOfWarnings(warnings []error) bool { + // PromQL warnings should not prevent caching + for _, w := range warnings { + // We cannot use "errors.Is(w, annotations.PromQLInfo)" here because of gRPC + hasPromqlInfoPrefix := strings.HasPrefix(w.Error(), annotations.PromQLInfo.Error()) + hasPromqlWarningPrefix := strings.HasPrefix(w.Error(), annotations.PromQLWarning.Error()) + if !(hasPromqlInfoPrefix || hasPromqlWarningPrefix) { + return true + } + } + return false +} + func Respond(w http.ResponseWriter, data interface{}, warnings []error) { w.Header().Set("Content-Type", "application/json") - if len(warnings) > 0 { + if shouldNotCacheBecauseOfWarnings(warnings) { w.Header().Set("Cache-Control", "no-store") } w.WriteHeader(http.StatusOK) diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go index f41675c8516..769b0001c5b 100644 --- a/pkg/api/query/grpc.go +++ b/pkg/api/query/grpc.go @@ -129,7 +129,7 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer } if len(result.Warnings) != 0 { - if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings...)); err != nil { + if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings.AsErrors()...)); err != nil { return err } } @@ -231,7 +231,7 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que } if len(result.Warnings) != 0 { - if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings...)); err != nil { + if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings.AsErrors()...)); err != nil { return err } } diff --git a/pkg/api/query/grpc_test.go b/pkg/api/query/grpc_test.go index 208efb2e6c2..d95e228090b 100644 --- a/pkg/api/query/grpc_test.go +++ b/pkg/api/query/grpc_test.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/thanos-io/thanos/pkg/api/query/querypb" @@ -41,7 +42,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) { { name: "error response", engine: &engineStub{ - warns: []error{errors.New("warn stub")}, + warns: annotations.New().Add(errors.New("warn stub")), }, }, } @@ -68,7 +69,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) { if len(test.engine.warns) > 0 { testutil.Ok(t, err) for i, resp := range srv.responses { - testutil.Equals(t, test.engine.warns[i].Error(), resp.GetWarnings()) + testutil.Equals(t, test.engine.warns.AsErrors()[i].Error(), resp.GetWarnings()) } } }) @@ -87,7 +88,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) { if len(test.engine.warns) > 0 { testutil.Ok(t, err) for i, resp := range srv.responses { - testutil.Equals(t, test.engine.warns[i].Error(), resp.GetWarnings()) + testutil.Equals(t, test.engine.warns.AsErrors()[i].Error(), resp.GetWarnings()) } } }) @@ -97,7 +98,7 @@ func TestGRPCQueryAPIErrorHandling(t *testing.T) { type engineStub struct { v1.QueryEngine err error - warns []error + warns annotations.Annotations } func (e engineStub) NewInstantQuery(_ context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { @@ -111,7 +112,7 @@ func (e engineStub) NewRangeQuery(_ context.Context, q storage.Queryable, opts p type queryStub struct { promql.Query err error - warns []error + warns annotations.Annotations } func (q queryStub) Close() {} diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index ff8422086b7..89986864128 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -41,6 +41,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/stats" v1 "github.com/prometheus/prometheus/web/api/v1" promqlapi "github.com/thanos-io/promql-engine/api" @@ -589,7 +590,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro Result: res.Value, Stats: qs, QueryExplanation: explanation, - }, res.Warnings, nil, qry.Close + }, res.Warnings.AsErrors(), nil, qry.Close } func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.ApiError, func()) { @@ -759,7 +760,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap Result: res.Value, Stats: qs, QueryExplanation: explanation, - }, res.Warnings, nil, qry.Close + }, res.Warnings.AsErrors(), nil, qry.Close } func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.ApiError, func()) { @@ -811,7 +812,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A true, nil, query.NoopSeriesStatsReporter, - ).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) + ).Querier(timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -819,17 +820,17 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A var ( vals []string - warnings storage.Warnings + warnings annotations.Annotations ) if len(matcherSets) > 0 { - var callWarnings storage.Warnings + var callWarnings annotations.Annotations labelValuesSet := make(map[string]struct{}) for _, matchers := range matcherSets { - vals, callWarnings, err = q.LabelValues(name, matchers...) + vals, callWarnings, err = q.LabelValues(ctx, name, matchers...) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } - warnings = append(warnings, callWarnings...) + warnings.Merge(callWarnings) for _, val := range vals { labelValuesSet[val] = struct{}{} } @@ -841,7 +842,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A } sort.Strings(vals) } else { - vals, warnings, err = q.LabelValues(name) + vals, warnings, err = q.LabelValues(ctx, name) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -851,7 +852,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A vals = make([]string, 0) } - return vals, warnings, nil, func() {} + return vals, warnings.AsErrors(), nil, func() {} } func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiError, func()) { @@ -914,7 +915,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr true, nil, query.NoopSeriesStatsReporter, - ).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) + ).Querier(timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} @@ -926,7 +927,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr sets []storage.SeriesSet ) for _, mset := range matcherSets { - sets = append(sets, q.Select(false, nil, mset...)) + sets = append(sets, q.Select(ctx, false, nil, mset...)) } set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) @@ -936,7 +937,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr if set.Err() != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: set.Err()}, func() {} } - return metrics, set.Warnings(), nil, func() {} + return metrics, set.Warnings().AsErrors(), nil, func() {} } func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.ApiError, func()) { @@ -981,7 +982,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap true, nil, query.NoopSeriesStatsReporter, - ).Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end)) + ).Querier(timestamp.FromTime(start), timestamp.FromTime(end)) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } @@ -989,18 +990,18 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap var ( names []string - warnings storage.Warnings + warnings annotations.Annotations ) if len(matcherSets) > 0 { - var callWarnings storage.Warnings + var callWarnings annotations.Annotations labelNamesSet := make(map[string]struct{}) for _, matchers := range matcherSets { - names, callWarnings, err = q.LabelNames(matchers...) + names, callWarnings, err = q.LabelNames(ctx, matchers...) if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, func() {} } - warnings = append(warnings, callWarnings...) + warnings.Merge(callWarnings) for _, val := range names { labelNamesSet[val] = struct{}{} } @@ -1012,7 +1013,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap } sort.Strings(names) } else { - names, warnings, err = q.LabelNames() + names, warnings, err = q.LabelNames(ctx) } if err != nil { @@ -1022,7 +1023,7 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap names = make([]string, 0) } - return names, warnings, nil, func() {} + return names, warnings.AsErrors(), nil, func() {} } func (qapi *QueryAPI) stores(_ *http.Request) (interface{}, []error, *api.ApiError, func()) { @@ -1065,7 +1066,7 @@ func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) f return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving targets")}, func() {} } - return t, warnings, nil, func() {} + return t, warnings.AsErrors(), nil, func() {} } } @@ -1083,7 +1084,7 @@ func NewAlertsHandler(client rules.UnaryClient, enablePartialResponse bool) func var ( groups *rulespb.RuleGroups - warnings storage.Warnings + warnings annotations.Annotations err error ) @@ -1111,7 +1112,7 @@ func NewAlertsHandler(client rules.UnaryClient, enablePartialResponse bool) func resp.Alerts = append(resp.Alerts, a.Alerts...) } } - return resp, warnings, nil, func() {} + return resp, warnings.AsErrors(), nil, func() {} } } @@ -1129,7 +1130,7 @@ func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func( var ( groups *rulespb.RuleGroups - warnings storage.Warnings + warnings annotations.Annotations err error ) @@ -1158,7 +1159,7 @@ func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func( if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Errorf("error retrieving rules: %v", err)}, func() {} } - return groups, warnings, nil, func() {} + return groups, warnings.AsErrors(), nil, func() {} } } @@ -1176,7 +1177,7 @@ func NewExemplarsHandler(client exemplars.UnaryClient, enablePartialResponse boo var ( data []*exemplarspb.ExemplarData - warnings storage.Warnings + warnings annotations.Annotations err error ) @@ -1203,7 +1204,7 @@ func NewExemplarsHandler(client exemplars.UnaryClient, enablePartialResponse boo if err != nil { return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving exemplars")}, func() {} } - return data, warnings, nil, func() {} + return data, warnings.AsErrors(), nil, func() {} } } @@ -1294,7 +1295,7 @@ func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse var ( t map[string][]metadatapb.Meta - warnings storage.Warnings + warnings annotations.Annotations err error ) @@ -1321,6 +1322,6 @@ func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving metadata")}, func() {} } - return t, warnings, nil, func() {} + return t, warnings.AsErrors(), nil, func() {} } } diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index d4191f2e8c6..b24797af276 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -44,7 +44,8 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/tsdb/tsdbutil" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/util/annotations" promgate "github.com/prometheus/prometheus/util/gate" "github.com/prometheus/prometheus/util/stats" baseAPI "github.com/thanos-io/thanos/pkg/api" @@ -691,7 +692,7 @@ func TestMetadataEndpoints(t *testing.T) { var series []storage.Series for _, lbl := range old { - var samples []tsdbutil.Sample + var samples []chunks.Sample for i := int64(0); i < 10; i++ { samples = append(samples, sample{ @@ -1797,7 +1798,7 @@ func TestRulesHandler(t *testing.T) { } res, errors, apiError, releaseResources := endpoint(req.WithContext(ctx)) defer releaseResources() - if errors != nil { + if len(errors) > 0 { t.Fatalf("Unexpected errors: %s", errors) return } @@ -1850,11 +1851,11 @@ func BenchmarkQueryResultEncoding(b *testing.B) { type mockedRulesClient struct { g map[rulespb.RulesRequest_Type][]*rulespb.RuleGroup - w storage.Warnings + w annotations.Annotations err error } -func (c mockedRulesClient) Rules(_ context.Context, req *rulespb.RulesRequest) (*rulespb.RuleGroups, storage.Warnings, error) { +func (c mockedRulesClient) Rules(_ context.Context, req *rulespb.RulesRequest) (*rulespb.RuleGroups, annotations.Annotations, error) { return &rulespb.RuleGroups{Groups: c.g[req.Type]}, c.w, c.err } diff --git a/pkg/block/index.go b/pkg/block/index.go index d83c944cec0..a0307ccfed5 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -29,8 +29,8 @@ import ( ) // VerifyIndex does a full run over a block index and verifies that it fulfills the order invariants. -func VerifyIndex(logger log.Logger, fn string, minTime, maxTime int64) error { - stats, err := GatherIndexHealthStats(logger, fn, minTime, maxTime) +func VerifyIndex(ctx context.Context, logger log.Logger, fn string, minTime, maxTime int64) error { + stats, err := GatherIndexHealthStats(ctx, logger, fn, minTime, maxTime) if err != nil { return err } @@ -213,14 +213,15 @@ func (n *minMaxSumInt64) Avg() int64 { // helps to assess index health. // It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle. // See HealthStats.Issue347OutsideChunks for details. -func GatherIndexHealthStats(logger log.Logger, fn string, minTime, maxTime int64) (stats HealthStats, err error) { +func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, minTime, maxTime int64) (stats HealthStats, err error) { r, err := index.NewFileReader(fn) if err != nil { return stats, errors.Wrap(err, "open index file") } defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader") - p, err := r.Postings(index.AllPostingsKey()) + key, value := index.AllPostingsKey() + p, err := r.Postings(ctx, key, value) if err != nil { return stats, errors.Wrap(err, "get all postings") } @@ -238,13 +239,13 @@ func GatherIndexHealthStats(logger log.Logger, fn string, minTime, maxTime int64 seriesSize = newMinMaxSumInt64() ) - lnames, err := r.LabelNames() + lnames, err := r.LabelNames(ctx) if err != nil { return stats, errors.Wrap(err, "label names") } stats.LabelNamesCount = int64(len(lnames)) - lvals, err := r.LabelValues("__name__") + lvals, err := r.LabelValues(ctx, "__name__") if err != nil { return stats, errors.Wrap(err, "metric label values") } @@ -405,7 +406,7 @@ type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) ( // - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347. // Fixable inconsistencies are resolved in the new block. // TODO(bplotka): https://github.com/thanos-io/thanos/issues/378. -func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { +func Repair(ctx context.Context, logger log.Logger, dir string, id ulid.ULID, source metadata.SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { if len(ignoreChkFns) == 0 { return resid, errors.New("no ignore chunk function specified") } @@ -461,7 +462,7 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT resmeta.Stats = tsdb.BlockStats{} // Reset stats. resmeta.Thanos.Source = source // Update source. - if err := rewrite(logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { + if err := rewrite(ctx, logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { return resid, errors.Wrap(err, "rewrite block") } resmeta.Thanos.SegmentFiles = GetSegmentFiles(resdir) @@ -567,6 +568,7 @@ type seriesRepair struct { // rewrite writes all data from the readers back into the writers while cleaning // up mis-ordered and duplicated chunks. func rewrite( + ctx context.Context, logger log.Logger, indexr tsdb.IndexReader, chunkr tsdb.ChunkReader, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter, @@ -583,7 +585,8 @@ func rewrite( return errors.Wrap(symbols.Err(), "next symbol") } - all, err := indexr.Postings(index.AllPostingsKey()) + key, value := index.AllPostingsKey() + all, err := indexr.Postings(ctx, key, value) if err != nil { return errors.Wrap(err, "postings") } diff --git a/pkg/block/index_test.go b/pkg/block/index_test.go index 56fac512615..626e28e9dc6 100644 --- a/pkg/block/index_test.go +++ b/pkg/block/index_test.go @@ -61,7 +61,7 @@ func TestRewrite(t *testing.T) { defer cw.Close() - testutil.Ok(t, rewrite(log.NewNopLogger(), ir, cr, iw, cw, m, []ignoreFnType{func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (bool, error) { + testutil.Ok(t, rewrite(ctx, log.NewNopLogger(), ir, cr, iw, cw, m, []ignoreFnType{func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (bool, error) { return curr.MaxTime == 696, nil }})) @@ -73,7 +73,8 @@ func TestRewrite(t *testing.T) { defer func() { testutil.Ok(t, ir2.Close()) }() - all, err := ir2.Postings(index.AllPostingsKey()) + key, value := index.AllPostingsKey() + all, err := ir2.Postings(ctx, key, value) testutil.Ok(t, err) for p := ir2.SortedPostings(all); p.Next(); { @@ -86,12 +87,13 @@ func TestRewrite(t *testing.T) { } func TestGatherIndexHealthStatsReturnsOutOfOrderChunksErr(t *testing.T) { + ctx := context.Background() blockDir := t.TempDir() err := e2eutil.PutOutOfOrderIndex(blockDir, 0, math.MaxInt64) testutil.Ok(t, err) - stats, err := GatherIndexHealthStats(log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64) + stats, err := GatherIndexHealthStats(ctx, log.NewLogfmtLogger(os.Stderr), blockDir+"/"+IndexFilename, 0, math.MaxInt64) testutil.Ok(t, err) testutil.Equals(t, 1, stats.OutOfOrderChunks) diff --git a/pkg/block/indexheader/binary_reader.go b/pkg/block/indexheader/binary_reader.go index 7dbed1bec24..1a8af6a83ce 100644 --- a/pkg/block/indexheader/binary_reader.go +++ b/pkg/block/indexheader/binary_reader.go @@ -505,8 +505,7 @@ type BinaryReader struct { postingsV1 map[string]map[string]index.Range // Symbols struct that keeps only 1/postingOffsetsInMemSampling in the memory, then looks up the rest via mmap. - // Use Symbols as interface for ease of testing. - symbols Symbols + symbols *index.Symbols // Cache of the label name symbol lookups, // as there are not many and they are half of all lookups. nameSymbols map[uint32]string @@ -925,7 +924,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra return rngs, nil } -func (r *BinaryReader) LookupSymbol(o uint32) (string, error) { +func (r *BinaryReader) LookupSymbol(ctx context.Context, o uint32) (string, error) { if r.indexVersion == index.FormatV1 { // For v1 little trick is needed. Refs are actual offset inside index, not index-header. This is different // of the header length difference between two files. @@ -945,7 +944,7 @@ func (r *BinaryReader) LookupSymbol(o uint32) (string, error) { } r.valueSymbolsMx.Unlock() - s, err := r.symbols.Lookup(o) + s, err := r.symbols.Lookup(ctx, o) if err != nil { return s, err } @@ -1048,8 +1047,3 @@ func (b realByteSlice) Range(start, end int) []byte { func (b realByteSlice) Sub(start, end int) index.ByteSlice { return b[start:end] } - -type Symbols interface { - Lookup(o uint32) (string, error) - ReverseLookup(sym string) (uint32, error) -} diff --git a/pkg/block/indexheader/header.go b/pkg/block/indexheader/header.go index d0b4141afd8..225f5cd31e7 100644 --- a/pkg/block/indexheader/header.go +++ b/pkg/block/indexheader/header.go @@ -4,6 +4,7 @@ package indexheader import ( + "context" "io" "github.com/pkg/errors" @@ -34,7 +35,7 @@ type Reader interface { // LookupSymbol returns string based on given reference. // Error is return if the symbol can't be found. - LookupSymbol(o uint32) (string, error) + LookupSymbol(ctx context.Context, o uint32) (string, error) // LabelValues returns all label values for given label name or error. // If no values are found for label name, or label name does not exists, diff --git a/pkg/block/indexheader/header_test.go b/pkg/block/indexheader/header_test.go index 2ae833d8fc2..e6421ee3e55 100644 --- a/pkg/block/indexheader/header_test.go +++ b/pkg/block/indexheader/header_test.go @@ -110,7 +110,7 @@ func TestReaders(t *testing.T) { testutil.Equals(t, 2, br.indexVersion) testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 70}, br.toc) testutil.Equals(t, int64(710), br.indexLastPostingEnd) - testutil.Equals(t, 8, br.symbols.(*index.Symbols).Size()) + testutil.Equals(t, 8, br.symbols.Size()) testutil.Equals(t, 0, len(br.postingsV1)) testutil.Equals(t, 2, len(br.nameSymbols)) testutil.Equals(t, map[string]*postingValueOffsets{ @@ -219,6 +219,8 @@ func TestReaders(t *testing.T) { } func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerReader Reader) { + ctx := context.Background() + indexReader, err := index.NewReader(indexByteSlice) testutil.Ok(t, err) defer func() { _ = indexReader.Close() }() @@ -232,14 +234,14 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe iter := indexReader.Symbols() i := 0 for iter.Next() { - r, err := headerReader.LookupSymbol(uint32(i)) + r, err := headerReader.LookupSymbol(ctx, uint32(i)) testutil.Ok(t, err) testutil.Equals(t, iter.At(), r) i++ } testutil.Ok(t, iter.Err()) - _, err := headerReader.LookupSymbol(uint32(i)) + _, err := headerReader.LookupSymbol(ctx, uint32(i)) testutil.NotOk(t, err) } else { @@ -248,19 +250,19 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe testutil.Ok(t, err) for refs, sym := range symbols { - r1, err := headerReader.LookupSymbol(refs) + r1, err := headerReader.LookupSymbol(ctx, refs) testutil.Ok(t, err) testutil.Equals(t, sym, r1) - r2, err := headerReader.LookupSymbol(refs) + r2, err := headerReader.LookupSymbol(ctx, refs) testutil.Ok(t, err) testutil.Equals(t, sym, r2) } - _, err = headerReader.LookupSymbol(200000) + _, err = headerReader.LookupSymbol(ctx, 200000) testutil.NotOk(t, err) } - expLabelNames, err := indexReader.LabelNames() + expLabelNames, err := indexReader.LabelNames(ctx) testutil.Ok(t, err) actualLabelNames, err := headerReader.LabelNames() testutil.Ok(t, err) @@ -272,7 +274,7 @@ func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerRe minStart := int64(math.MaxInt64) maxEnd := int64(math.MinInt64) for il, lname := range expLabelNames { - expectedLabelVals, err := indexReader.SortedLabelValues(lname) + expectedLabelVals, err := indexReader.SortedLabelValues(ctx, lname) testutil.Ok(t, err) vals, err := headerReader.LabelValues(lname) @@ -451,7 +453,7 @@ func benchmarkBinaryReaderLookupSymbol(b *testing.B, numSeries int) { for n := 0; n < b.N; n++ { for i := 0; i < len(symbolsOffsets); i++ { - if _, err := reader.LookupSymbol(symbolsOffsets[i]); err != nil { + if _, err := reader.LookupSymbol(ctx, symbolsOffsets[i]); err != nil { b.Fail() } } @@ -519,78 +521,3 @@ func readSymbols(bs index.ByteSlice, version, off int) ([]string, map[uint32]str } return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols") } - -func TestIndexHeaderV1LookupSymbols(t *testing.T) { - ctx := context.Background() - - tmpDir := t.TempDir() - bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) - testutil.Ok(t, err) - defer func() { testutil.Ok(t, bkt.Close()) }() - - m, err := metadata.ReadFromDir("./testdata/index_format_v1") - testutil.Ok(t, err) - e2eutil.Copy(t, "./testdata/index_format_v1", filepath.Join(tmpDir, m.ULID.String())) - - _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, m.ULID.String()), metadata.Thanos{ - Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), - Downsample: metadata.ThanosDownsample{Resolution: 0}, - Source: metadata.TestSource, - }, &m.BlockMeta) - testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()), metadata.NoneFunc)) - - fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) - _, err = WriteBinary(ctx, bkt, m.ULID, fn) - testutil.Ok(t, err) - - br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, m.ULID, 3) - testutil.Ok(t, err) - - defer func() { testutil.Ok(t, br.Close()) }() - - indexFile, err := fileutil.OpenMmapFile(filepath.Join(tmpDir, m.ULID.String(), block.IndexFilename)) - testutil.Ok(t, err) - defer func() { _ = indexFile.Close() }() - - // This should get the correct symbol table and its offset. - symbols, err := getSymbolTable(realByteSlice(indexFile.Bytes())) - testutil.Ok(t, err) - - nameSymbolSet := make(map[string]struct{}, 0) - for _, symbol := range br.nameSymbols { - nameSymbolSet[symbol] = struct{}{} - } - - // Make sure we can look up the correct symbol for values. - for o, sym := range symbols { - if _, ok := nameSymbolSet[sym]; !ok { - res, err := br.LookupSymbol(o) - testutil.Ok(t, err) - testutil.Equals(t, sym, res) - } - } - - // For names, we want to always hit name cache so use - // `ErrorSymbols` to make sure no cache miss. - br.symbols = ErrorSymbols{} - for o, sym := range symbols { - if _, ok := nameSymbolSet[sym]; ok { - res, err := br.LookupSymbol(o) - testutil.Ok(t, err) - testutil.Equals(t, sym, res) - } - } -} - -// ErrorSymbols will throw error if its methods are getting called. -type ErrorSymbols struct { -} - -func (s ErrorSymbols) Lookup(o uint32) (string, error) { - return "", errors.New("shouldn't be called") -} - -func (s ErrorSymbols) ReverseLookup(sym string) (uint32, error) { - return 0, errors.New("shouldn't be called") -} diff --git a/pkg/block/indexheader/lazy_binary_reader.go b/pkg/block/indexheader/lazy_binary_reader.go index 451a79b6ee5..45d36f3bab8 100644 --- a/pkg/block/indexheader/lazy_binary_reader.go +++ b/pkg/block/indexheader/lazy_binary_reader.go @@ -181,7 +181,7 @@ func (r *LazyBinaryReader) PostingsOffset(name, value string) (index.Range, erro } // LookupSymbol implements Reader. -func (r *LazyBinaryReader) LookupSymbol(o uint32) (string, error) { +func (r *LazyBinaryReader) LookupSymbol(ctx context.Context, o uint32) (string, error) { r.readerMx.RLock() defer r.readerMx.RUnlock() @@ -190,7 +190,7 @@ func (r *LazyBinaryReader) LookupSymbol(o uint32) (string, error) { } r.usedAt.Store(time.Now().UnixNano()) - return r.reader.LookupSymbol(o) + return r.reader.LookupSymbol(ctx, o) } // LabelValues implements Reader. diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 2e20ad73db9..af7425fa551 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1003,13 +1003,13 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return errors.Wrapf(err, "read meta from %s", bdir) } - resid, err := block.Repair(logger, tmpdir, ie.id, metadata.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) + resid, err := block.Repair(ctx, logger, tmpdir, ie.id, metadata.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) if err != nil { return errors.Wrapf(err, "repair failed for block %s", ie.id) } // Verify repaired id before uploading it. - if err := block.VerifyIndex(logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { + if err := block.VerifyIndex(ctx, logger, filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { return errors.Wrapf(err, "repaired block is invalid %s", resid) } @@ -1091,7 +1091,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp // Ensure all input blocks are valid. var stats block.HealthStats if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) { - stats, e = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + stats, e = block.GatherIndexHealthStats(ctx, cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) return e }, opentracing.Tags{"block.id": meta.ULID}); err != nil { return errors.Wrapf(err, "gather index issues for block %s", bdir) @@ -1174,7 +1174,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp var stats block.HealthStats // Ensure the output block is valid. err = tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error { - stats, err = block.GatherIndexHealthStats(cg.logger, index, newMeta.MinTime, newMeta.MaxTime) + stats, err = block.GatherIndexHealthStats(ctx, cg.logger, index, newMeta.MinTime, newMeta.MaxTime) if err != nil { return err } diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 51d895058ac..829cc8d4565 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -24,7 +24,6 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" - "github.com/prometheus/prometheus/tsdb/tsdbutil" "golang.org/x/sync/errgroup" "github.com/thanos-io/objstore" @@ -50,6 +49,7 @@ const ( // Downsample downsamples the given block. It writes a new block into dir and returns its ID. func Downsample( + ctx context.Context, logger log.Logger, origMeta *metadata.Meta, b tsdb.BlockReader, @@ -104,7 +104,8 @@ func Downsample( } defer runutil.CloseWithErrCapture(&err, streamedBlockWriter, "close stream block writer") - postings, err := indexr.Postings(index.AllPostingsKey()) + key, values := index.AllPostingsKey() + postings, err := indexr.Postings(ctx, key, values) if err != nil { return id, errors.Wrap(err, "get all postings list") } @@ -788,7 +789,7 @@ func (it *AverageChunkIterator) Err() error { } // SamplesFromTSDBSamples converts tsdbutil.Sample slice to samples. -func SamplesFromTSDBSamples(samples []tsdbutil.Sample) []sample { +func SamplesFromTSDBSamples(samples []chunks.Sample) []sample { res := make([]sample, len(samples)) for i, s := range samples { res[i] = sample{t: s.T(), v: s.F()} diff --git a/pkg/compact/downsample/downsample_test.go b/pkg/compact/downsample/downsample_test.go index 16cc9246172..36b5f0ff3c6 100644 --- a/pkg/compact/downsample/downsample_test.go +++ b/pkg/compact/downsample/downsample_test.go @@ -4,6 +4,7 @@ package downsample import ( + "context" "math" "os" "path/filepath" @@ -1134,6 +1135,7 @@ func TestDownsample(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) dir := t.TempDir() + ctx := context.Background() // Ideally we would use tsdb.HeadBlock here for less dependency on our own code. However, // it cannot accept the counter signal sample with the same timestamp as the previous sample. @@ -1146,7 +1148,7 @@ func TestDownsample(t *testing.T) { fakeMeta.Thanos.Downsample.Resolution = tcase.resolution - 1 } - id, err := Downsample(logger, fakeMeta, mb, dir, tcase.resolution) + id, err := Downsample(ctx, logger, fakeMeta, mb, dir, tcase.resolution) if tcase.expectedDownsamplingErr != nil { testutil.NotOk(t, err) testutil.Equals(t, tcase.expectedDownsamplingErr(ser.chunks).Error(), err.Error()) @@ -1165,7 +1167,8 @@ func TestDownsample(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, chunkr.Close()) }() - pall, err := indexr.Postings(index.AllPostingsKey()) + key, values := index.AllPostingsKey() + pall, err := indexr.Postings(ctx, key, values) testutil.Ok(t, err) var series []storage.SeriesRef @@ -1210,6 +1213,7 @@ func TestDownsample(t *testing.T) { func TestDownsampleAggrAndEmptyXORChunks(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) dir := t.TempDir() + ctx := context.Background() ser := &series{lset: labels.FromStrings("__name__", "a")} aggr := map[AggrType][]sample{ @@ -1231,15 +1235,16 @@ func TestDownsampleAggrAndEmptyXORChunks(t *testing.T) { fakeMeta := &metadata.Meta{} fakeMeta.Thanos.Downsample.Resolution = 300_000 - id, err := Downsample(logger, fakeMeta, mb, dir, 3_600_000) + id, err := Downsample(ctx, logger, fakeMeta, mb, dir, 3_600_000) _ = id testutil.Ok(t, err) } func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) { - logger := log.NewLogfmtLogger(os.Stderr) dir := t.TempDir() + ctx := context.Background() + ser := &series{lset: labels.FromStrings("__name__", "a")} aggr := map[AggrType][]sample{ AggrCount: {{t: 1587690299999, v: 20}, {t: 1587690599999, v: 20}, {t: 1587690899999, v: 20}}, @@ -1265,7 +1270,7 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) { fakeMeta := &metadata.Meta{} fakeMeta.Thanos.Downsample.Resolution = 300_000 - id, err := Downsample(logger, fakeMeta, mb, dir, 3_600_000) + id, err := Downsample(ctx, logger, fakeMeta, mb, dir, 3_600_000) _ = id testutil.Ok(t, err) @@ -1290,7 +1295,8 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) { testutil.Ok(t, err) defer func() { testutil.Ok(t, chunkr.Close()) }() - pall, err := indexr.Postings(index.AllPostingsKey()) + key, values := index.AllPostingsKey() + pall, err := indexr.Postings(ctx, key, values) testutil.Ok(t, err) var series []storage.SeriesRef @@ -1637,23 +1643,23 @@ func TestSamplesFromTSDBSamples(t *testing.T) { for _, tcase := range []struct { name string - input []tsdbutil.Sample + input []chunks.Sample expected []sample }{ { name: "empty", - input: []tsdbutil.Sample{}, + input: []chunks.Sample{}, expected: []sample{}, }, { name: "one sample", - input: []tsdbutil.Sample{testSample{1, 1}}, + input: []chunks.Sample{testSample{1, 1}}, expected: []sample{{1, 1}}, }, { name: "multiple samples", - input: []tsdbutil.Sample{testSample{1, 1}, testSample{2, 2}, testSample{3, 3}, testSample{4, 4}, testSample{5, 5}}, + input: []chunks.Sample{testSample{1, 1}, testSample{2, 2}, testSample{3, 3}, testSample{4, 4}, testSample{5, 5}}, expected: []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}}, }, } { @@ -1664,7 +1670,7 @@ func TestSamplesFromTSDBSamples(t *testing.T) { } } -// testSample implements tsdbutil.Sample interface. +// testSample implements chunks.Sample interface. type testSample struct { t int64 f float64 @@ -1845,7 +1851,7 @@ func (b *memBlock) Meta() tsdb.BlockMeta { return tsdb.BlockMeta{} } -func (b *memBlock) Postings(name string, val ...string) (index.Postings, error) { +func (b *memBlock) Postings(_ context.Context, name string, val ...string) (index.Postings, error) { allName, allVal := index.AllPostingsKey() if name != allName || val[0] != allVal { diff --git a/pkg/compactv2/chunk_series_set.go b/pkg/compactv2/chunk_series_set.go index 188f5e28aff..52653350e9f 100644 --- a/pkg/compactv2/chunk_series_set.go +++ b/pkg/compactv2/chunk_series_set.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/pkg/block" ) @@ -74,7 +75,7 @@ func (s *lazyPopulateChunkSeriesSet) Err() error { return s.all.Err() } -func (s *lazyPopulateChunkSeriesSet) Warnings() storage.Warnings { return nil } +func (s *lazyPopulateChunkSeriesSet) Warnings() annotations.Annotations { return nil } type lazyPopulatableChunk struct { m *chunks.Meta diff --git a/pkg/compactv2/compactor.go b/pkg/compactv2/compactor.go index f5c05fddc2b..b8eaaf6eecf 100644 --- a/pkg/compactv2/compactor.go +++ b/pkg/compactv2/compactor.go @@ -166,7 +166,7 @@ func compactSeries(ctx context.Context, sReaders ...seriesReader) (symbols index } k, v := index.AllPostingsKey() - all, err := r.ir.Postings(k, v) + all, err := r.ir.Postings(ctx, k, v) if err != nil { return nil, nil, err } diff --git a/pkg/compactv2/compactor_test.go b/pkg/compactv2/compactor_test.go index e0ac1e4bd46..5a853d9f9a3 100644 --- a/pkg/compactv2/compactor_test.go +++ b/pkg/compactv2/compactor_test.go @@ -651,6 +651,8 @@ type seriesSamples struct { } func readBlockSeries(t *testing.T, bDir string) []seriesSamples { + ctx := context.Background() + indexr, err := index.NewFileReader(filepath.Join(bDir, block.IndexFilename)) testutil.Ok(t, err) defer indexr.Close() @@ -659,7 +661,8 @@ func readBlockSeries(t *testing.T, bDir string) []seriesSamples { testutil.Ok(t, err) defer chunkr.Close() - all, err := indexr.Postings(index.AllPostingsKey()) + k, v := index.AllPostingsKey() + all, err := indexr.Postings(ctx, k, v) testutil.Ok(t, err) all = indexr.SortedPostings(all) diff --git a/pkg/compactv2/modifiers.go b/pkg/compactv2/modifiers.go index a6e369e3b46..d291be131af 100644 --- a/pkg/compactv2/modifiers.go +++ b/pkg/compactv2/modifiers.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/pkg/block/metadata" ) @@ -146,7 +147,7 @@ func (d *delModifierSeriesSet) Err() error { return d.ChunkSeriesSet.Err() } -func (d *delModifierSeriesSet) Warnings() storage.Warnings { +func (d *delModifierSeriesSet) Warnings() annotations.Annotations { return d.ChunkSeriesSet.Warnings() } @@ -504,6 +505,6 @@ func (s *listChunkSeriesSet) Next() bool { return s.idx < len(s.css) } -func (s *listChunkSeriesSet) At() storage.ChunkSeries { return s.css[s.idx] } -func (s *listChunkSeriesSet) Err() error { return nil } -func (s *listChunkSeriesSet) Warnings() storage.Warnings { return nil } +func (s *listChunkSeriesSet) At() storage.ChunkSeries { return s.css[s.idx] } +func (s *listChunkSeriesSet) Err() error { return nil } +func (s *listChunkSeriesSet) Warnings() annotations.Annotations { return nil } diff --git a/pkg/dedup/chunk_iter_test.go b/pkg/dedup/chunk_iter_test.go index bcfdd139195..3b3fff42028 100644 --- a/pkg/dedup/chunk_iter_test.go +++ b/pkg/dedup/chunk_iter_test.go @@ -10,7 +10,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/efficientgo/core/testutil" @@ -35,9 +34,9 @@ func TestDedupChunkSeriesMerger(t *testing.T) { { name: "single series", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}}), }, - expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}), + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}}), }, { name: "two empty series", @@ -50,86 +49,86 @@ func TestDedupChunkSeriesMerger(t *testing.T) { { name: "two non overlapping", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{7, 7}, sample{9, 9}}, []chunks.Sample{sample{10, 10}}), }, - expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{5, 5}}, []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}, sample{5, 5}}, []chunks.Sample{sample{7, 7}, sample{9, 9}}, []chunks.Sample{sample{10, 10}}), }, { name: "two overlapping", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}, sample{8, 8}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{7, 7}, sample{9, 9}}, []chunks.Sample{sample{10, 10}}), }, - expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}, sample{8, 8}}, []tsdbutil.Sample{sample{10, 10}}), + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{3, 3}, sample{8, 8}}, []chunks.Sample{sample{10, 10}}), }, { name: "two overlapping with large time diff", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{2, 2}, sample{5008, 5008}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{7, 7}, sample{9, 9}}, []tsdbutil.Sample{sample{10, 10}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}}, []chunks.Sample{sample{2, 2}, sample{5008, 5008}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{7, 7}, sample{9, 9}}, []chunks.Sample{sample{10, 10}}), }, // sample{5008, 5008} is added to the result due to its large timestamp. - expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{5008, 5008}}), + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{5008, 5008}}), }, { name: "two duplicated", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{2, 2}, sample{3, 3}, sample{5, 5}}), }, - expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), }, { name: "three overlapping", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{2, 2}, sample{3, 3}, sample{6, 6}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{0, 0}, sample{4, 4}}), }, // only samples from the last series are retained due to high penalty. - expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{4, 4}}), + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{0, 0}, sample{4, 4}}), }, { name: "three in chained overlap", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{4, 4}, sample{6, 66}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{6, 6}, sample{10, 10}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{4, 4}, sample{6, 66}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{6, 6}, sample{10, 10}}), }, // only samples from the last series are retained due to high penalty. - expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), + expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{5, 5}}), }, { name: "three in chained overlap complex", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, []tsdbutil.Sample{sample{10, 10}, sample{15, 15}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{2, 2}, sample{20, 20}}, []tsdbutil.Sample{sample{25, 25}, sample{30, 30}}), - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{18, 18}, sample{26, 26}}, []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{0, 0}, sample{5, 5}}, []chunks.Sample{sample{10, 10}, sample{15, 15}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{2, 2}, sample{20, 20}}, []chunks.Sample{sample{25, 25}, sample{30, 30}}), + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), []chunks.Sample{sample{18, 18}, sample{26, 26}}, []chunks.Sample{sample{31, 31}, sample{35, 35}}), }, expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), - []tsdbutil.Sample{sample{0, 0}, sample{5, 5}}, - []tsdbutil.Sample{sample{31, 31}, sample{35, 35}}, + []chunks.Sample{sample{0, 0}, sample{5, 5}}, + []chunks.Sample{sample{31, 31}, sample{35, 35}}, ), }, { name: "110 overlapping samples", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 110)), // [0 - 110) - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 50)), // [60 - 110) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), chunks.GenerateSamples(0, 110)), // [0 - 110) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), chunks.GenerateSamples(60, 50)), // [60 - 110) }, expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), - tsdbutil.GenerateSamples(0, 110), + chunks.GenerateSamples(0, 110), ), }, { name: "150 overlapping samples, no chunk splitting due to penalty deduplication", input: []storage.ChunkSeries{ - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)), // [0 - 90) - storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)), // [90 - 150) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), chunks.GenerateSamples(0, 90)), // [0 - 90) + storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), chunks.GenerateSamples(60, 90)), // [90 - 150) }, expected: storage.NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), - tsdbutil.GenerateSamples(0, 90), + chunks.GenerateSamples(0, 90), ), }, } { @@ -149,7 +148,7 @@ func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) { m := NewChunkSeriesMerger() defaultLabels := labels.FromStrings("bar", "baz") - emptySamples := downsample.SamplesFromTSDBSamples([]tsdbutil.Sample{}) + emptySamples := downsample.SamplesFromTSDBSamples([]chunks.Sample{}) // Samples are created with step 1m. So the 5m downsampled chunk has 2 samples. samples1 := downsample.SamplesFromTSDBSamples(createSamplesWithStep(0, 10, 60*1000)) // Non overlapping samples with samples1. 5m downsampled chunk has 2 samples. @@ -288,7 +287,7 @@ func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) { expected: &storage.ChunkSeriesEntry{ Lset: defaultLabels, ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator { - samples := [][]tsdbutil.Sample{ + samples := [][]chunks.Sample{ {sample{299999, 3}, sample{540000, 5}}, {sample{299999, 540000}, sample{540000, 2100000}}, {sample{299999, 120000}, sample{540000, 300000}}, @@ -297,7 +296,7 @@ func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) { } var chks [5]chunkenc.Chunk for i, s := range samples { - chk, err := tsdbutil.ChunkFromSamples(s) + chk, err := chunks.ChunkFromSamples(s) testutil.Ok(t, err) chks[i] = chk.Chunk } @@ -322,8 +321,8 @@ func TestDedupChunkSeriesMergerDownsampledChunks(t *testing.T) { } } -func createSamplesWithStep(start, numOfSamples, step int) []tsdbutil.Sample { - res := make([]tsdbutil.Sample, numOfSamples) +func createSamplesWithStep(start, numOfSamples, step int) []chunks.Sample { + res := make([]chunks.Sample, numOfSamples) cur := start for i := 0; i < numOfSamples; i++ { res[i] = sample{t: int64(cur), f: float64(cur)} diff --git a/pkg/dedup/iter.go b/pkg/dedup/iter.go index 31b63cfad2a..8f60c40363b 100644 --- a/pkg/dedup/iter.go +++ b/pkg/dedup/iter.go @@ -10,6 +10,8 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -200,7 +202,7 @@ func (s *dedupSeriesSet) Err() error { return s.set.Err() } -func (s *dedupSeriesSet) Warnings() storage.Warnings { +func (s *dedupSeriesSet) Warnings() annotations.Annotations { return s.set.Warnings() } diff --git a/pkg/dedup/iter_test.go b/pkg/dedup/iter_test.go index 3579f69fd0e..8b9b4161258 100644 --- a/pkg/dedup/iter_test.go +++ b/pkg/dedup/iter_test.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/efficientgo/core/testutil" @@ -72,7 +73,7 @@ func (s *mockedSeriesSet) At() storage.Series { } func (s *mockedSeriesSet) Err() error { return nil } -func (s *mockedSeriesSet) Warnings() storage.Warnings { return nil } +func (s *mockedSeriesSet) Warnings() annotations.Annotations { return nil } type mockedSeriesIterator struct { cur int diff --git a/pkg/exemplars/exemplars.go b/pkg/exemplars/exemplars.go index 2817d155557..9638074bd02 100644 --- a/pkg/exemplars/exemplars.go +++ b/pkg/exemplars/exemplars.go @@ -9,7 +9,8 @@ import ( "sync" "github.com/pkg/errors" - "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/tracing" @@ -20,7 +21,7 @@ var _ UnaryClient = &GRPCClient{} // UnaryClient is gRPC exemplarspb.Exemplars client which expands streaming exemplars API. Useful for consumers that does not // support streaming. type UnaryClient interface { - Exemplars(ctx context.Context, req *exemplarspb.ExemplarsRequest) ([]*exemplarspb.ExemplarData, storage.Warnings, error) + Exemplars(ctx context.Context, req *exemplarspb.ExemplarsRequest) ([]*exemplarspb.ExemplarData, annotations.Annotations, error) } // GRPCClient allows to retrieve exemplars from local gRPC streaming server implementation. @@ -36,7 +37,7 @@ type exemplarsServer struct { exemplarspb.Exemplars_ExemplarsServer ctx context.Context - warnings []error + warnings annotations.Annotations data []*exemplarspb.ExemplarData mu sync.Mutex } @@ -45,7 +46,7 @@ func (srv *exemplarsServer) Send(res *exemplarspb.ExemplarsResponse) error { if res.GetWarning() != "" { srv.mu.Lock() defer srv.mu.Unlock() - srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) + srv.warnings.Add(errors.New(res.GetWarning())) return nil } @@ -79,7 +80,7 @@ func NewGRPCClientWithDedup(es exemplarspb.ExemplarsServer, replicaLabels []stri return c } -func (rr *GRPCClient) Exemplars(ctx context.Context, req *exemplarspb.ExemplarsRequest) ([]*exemplarspb.ExemplarData, storage.Warnings, error) { +func (rr *GRPCClient) Exemplars(ctx context.Context, req *exemplarspb.ExemplarsRequest) ([]*exemplarspb.ExemplarData, annotations.Annotations, error) { span, ctx := tracing.StartSpan(ctx, "exemplar_grpc_request") defer span.Finish() diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go index f3aae015a87..8067ee224c1 100644 --- a/pkg/metadata/metadata.go +++ b/pkg/metadata/metadata.go @@ -8,7 +8,8 @@ import ( "sync" "github.com/pkg/errors" - "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -18,7 +19,7 @@ var _ UnaryClient = &GRPCClient{} // UnaryClient is a gRPC metadatapb.Metadata client which expands streaming metadata API. Useful for consumers that does not // support streaming. type UnaryClient interface { - MetricMetadata(ctx context.Context, req *metadatapb.MetricMetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) + MetricMetadata(ctx context.Context, req *metadatapb.MetricMetadataRequest) (map[string][]metadatapb.Meta, annotations.Annotations, error) } // GRPCClient allows to retrieve metadata from local gRPC streaming server implementation. @@ -33,7 +34,7 @@ func NewGRPCClient(ts metadatapb.MetadataServer) *GRPCClient { } } -func (rr *GRPCClient) MetricMetadata(ctx context.Context, req *metadatapb.MetricMetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) { +func (rr *GRPCClient) MetricMetadata(ctx context.Context, req *metadatapb.MetricMetadataRequest) (map[string][]metadatapb.Meta, annotations.Annotations, error) { span, ctx := tracing.StartSpan(ctx, "metadata_grpc_request") defer span.Finish() @@ -66,7 +67,7 @@ type metadataServer struct { metric string limit int - warnings []error + warnings annotations.Annotations metadataMap map[string][]metadatapb.Meta mu sync.Mutex } @@ -75,7 +76,7 @@ func (srv *metadataServer) Send(res *metadatapb.MetricMetadataResponse) error { if res.GetWarning() != "" { srv.mu.Lock() defer srv.mu.Unlock() - srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) + srv.warnings.Add(errors.New(res.GetWarning())) return nil } diff --git a/pkg/metadata/prometheus_test.go b/pkg/metadata/prometheus_test.go index a486196c262..b45229c2947 100644 --- a/pkg/metadata/prometheus_test.go +++ b/pkg/metadata/prometheus_test.go @@ -11,11 +11,10 @@ import ( "testing" "time" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/pkg/errors" - "github.com/prometheus/prometheus/storage" - - "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/pkg/metadata/metadatapb" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" @@ -115,7 +114,7 @@ scrape_configs: Limit: tcase.limit, }) - testutil.Equals(t, storage.Warnings(nil), w) + testutil.Equals(t, annotations.Annotations(nil), w) testutil.Ok(t, err) testutil.Assert(t, tcase.expectedFunc(meta)) }) diff --git a/pkg/query/iter.go b/pkg/query/iter.go index 8d1d043b203..b9f40999956 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/dedup" @@ -23,7 +24,7 @@ type promSeriesSet struct { mint, maxt int64 aggrs []storepb.Aggr - warns storage.Warnings + warns annotations.Annotations } func (s *promSeriesSet) Next() bool { @@ -43,7 +44,7 @@ func (s *promSeriesSet) Err() error { return s.set.Err() } -func (s *promSeriesSet) Warnings() storage.Warnings { +func (s *promSeriesSet) Warnings() annotations.Annotations { return s.warns } @@ -297,7 +298,7 @@ func (c *lazySeriesSet) At() storage.Series { return nil } -func (c *lazySeriesSet) Warnings() storage.Warnings { +func (c *lazySeriesSet) Warnings() annotations.Annotations { if c.set != nil { return c.set.Warnings() } diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 8f668374323..52a5c46f743 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -12,10 +12,13 @@ import ( "github.com/go-kit/log" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/dedup" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" @@ -117,14 +120,12 @@ type queryable struct { } // Querier returns a new storage querier against the underlying proxy store API. -func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil +func (q *queryable) Querier(mint, maxt int64) (storage.Querier, error) { + return newQuerier(q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil } type querier struct { - ctx context.Context logger log.Logger - cancel func() mint, maxt int64 replicaLabels []string storeDebugMatchers [][]*labels.Matcher @@ -143,7 +144,6 @@ type querier struct { // newQuerier creates implementation of storage.Querier that fetches data from the proxy // store API endpoints. func newQuerier( - ctx context.Context, logger log.Logger, mint, maxt int64, @@ -163,8 +163,6 @@ func newQuerier( if logger == nil { logger = log.NewNopLogger() } - ctx, cancel := context.WithCancel(ctx) - rl := make(map[string]struct{}) for _, replicaLabel := range replicaLabels { rl[replicaLabel] = struct{}{} @@ -175,9 +173,7 @@ func newQuerier( partialResponseStrategy = storepb.PartialResponseStrategy_WARN } return &querier{ - ctx: ctx, logger: logger, - cancel: cancel, selectGate: selectGate, selectTimeout: selectTimeout, @@ -207,12 +203,12 @@ type seriesServer struct { seriesSet []storepb.Series seriesSetStats storepb.SeriesStatsCounter - warnings []string + warnings annotations.Annotations } func (s *seriesServer) Send(r *storepb.SeriesResponse) error { if r.GetWarning() != "" { - s.warnings = append(s.warnings, r.GetWarning()) + s.warnings.Add(errors.New(r.GetWarning())) return nil } @@ -267,7 +263,7 @@ func storeHintsFromPromHints(hints *storage.SelectHints) *storepb.QueryHints { } } -func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { +func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { if hints == nil { hints = &storage.SelectHints{ Start: q.mint, @@ -283,7 +279,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match // The querier has a context, but it gets canceled as soon as query evaluation is completed by the engine. // We want to prevent this from happening for the async store API calls we make while preserving tracing context. // TODO(bwplotka): Does the above still is true? It feels weird to leave unfinished calls behind query API. - ctx := tracing.CopyTraceContext(context.Background(), q.ctx) + ctx = tracing.CopyTraceContext(context.Background(), ctx) ctx, cancel := context.WithTimeout(ctx, q.selectTimeout) span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{ "minTime": hints.Start, @@ -341,7 +337,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) - ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey)) + ctx = context.WithValue(ctx, tenancy.TenantKey, ctx.Value(tenancy.TenantKey)) // TODO(bwplotka): Use inprocess gRPC when we want to stream responses. // Currently streaming won't help due to nature of the both PromQL engine which @@ -368,11 +364,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . if err := q.proxy.Series(&req, resp); err != nil { return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()") } - - var warns storage.Warnings - for _, w := range resp.warnings { - warns = append(warns, errors.New(w)) - } + warns := annotations.New().Merge(resp.warnings) if q.enableQueryPushdown && (hints.Func == "max_over_time" || hints.Func == "min_over_time") { // On query pushdown, delete the metric's name from the result because that's what the @@ -415,13 +407,13 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . } // LabelValues returns all potential values for a label name. -func (q *querier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - span, ctx := tracing.StartSpan(q.ctx, "querier_label_values") +func (q *querier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + span, ctx := tracing.StartSpan(ctx, "querier_label_values") defer span.Finish() // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) - ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey)) + ctx = context.WithValue(ctx, tenancy.TenantKey, ctx.Value(tenancy.TenantKey)) pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) if err != nil { @@ -439,9 +431,9 @@ func (q *querier) LabelValues(name string, matchers ...*labels.Matcher) ([]strin return nil, nil, errors.Wrap(err, "proxy LabelValues()") } - var warns storage.Warnings + var warns annotations.Annotations for _, w := range resp.Warnings { - warns = append(warns, errors.New(w)) + warns.Add(errors.New(w)) } return resp.Values, warns, nil @@ -449,13 +441,13 @@ func (q *querier) LabelValues(name string, matchers ...*labels.Matcher) ([]strin // LabelNames returns all the unique label names present in the block in sorted order constrained // by the given matchers. -func (q *querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - span, ctx := tracing.StartSpan(q.ctx, "querier_label_names") +func (q *querier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + span, ctx := tracing.StartSpan(ctx, "querier_label_names") defer span.Finish() // TODO(bwplotka): Pass it using the SeriesRequest instead of relying on context. ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeDebugMatchers) - ctx = context.WithValue(ctx, tenancy.TenantKey, q.ctx.Value(tenancy.TenantKey)) + ctx = context.WithValue(ctx, tenancy.TenantKey, ctx.Value(tenancy.TenantKey)) pbMatchers, err := storepb.PromMatchersToMatchers(matchers...) if err != nil { @@ -472,15 +464,14 @@ func (q *querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.War return nil, nil, errors.Wrap(err, "proxy LabelNames()") } - var warns storage.Warnings + var warns annotations.Annotations for _, w := range resp.Warnings { - warns = append(warns, errors.New(w)) + warns.Add(errors.New(w)) } return resp.Names, warns, nil } func (q *querier) Close() error { - q.cancel() return nil } diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index ea005e8eda1..faf56e1113b 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/gate" "github.com/thanos-io/thanos/pkg/component" @@ -59,7 +60,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { NoopSeriesStatsReporter, ) - q, err := queryable.Querier(context.Background(), 0, 42) + q, err := queryable.Querier(0, 42) testutil.Ok(t, err) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) @@ -394,7 +395,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) { g := gate.New(2) mq := &mockedQueryable{ Creator: func(mint, maxt int64) storage.Querier { - return newQuerier(context.Background(), nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) + return newQuerier(nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) }, } t.Cleanup(func() { @@ -409,7 +410,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) { if tcase.expectedWarning != "" { warns := res.Warnings testutil.Assert(t, len(warns) == 1, "expected only single warnings") - testutil.Equals(t, tcase.expectedWarning, warns[0].Error()) + testutil.Equals(t, tcase.expectedWarning, warns.AsErrors()[0]) } } @@ -773,7 +774,6 @@ func TestQuerier_Select(t *testing.T) { } { g := gate.New(2) q := newQuerier( - context.Background(), nil, tcase.mint, tcase.maxt, @@ -794,13 +794,13 @@ func TestQuerier_Select(t *testing.T) { t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) { t.Run("querier.Select", func(t *testing.T) { - res := q.Select(false, tcase.hints, tcase.matchers...) + res := q.Select(context.Background(), false, tcase.hints, tcase.matchers...) testSelectResponse(t, sc.expected, res) if tcase.expectedWarning != "" { - w := res.Warnings() - testutil.Equals(t, 1, len(w)) - testutil.Equals(t, tcase.expectedWarning, w[0].Error()) + warns := res.Warnings() + testutil.Equals(t, 1, len(warns)) + testutil.Equals(t, tcase.expectedWarning, warns.AsErrors()[0].Error()) } }) // Integration test: Make sure the PromQL would select exactly the same. @@ -817,13 +817,11 @@ func TestQuerier_Select(t *testing.T) { warns := catcher.warns() // We don't care about anything else, all should be recorded. - testutil.Assert(t, len(warns) == 1, "expected only single warnings") testutil.Assert(t, len(catcher.resp) == 1, "expected only single response, subqueries?") - w := warns[0] if tcase.expectedWarning != "" { - testutil.Equals(t, 1, len(w)) - testutil.Equals(t, tcase.expectedWarning, w[0].Error()) + testutil.Equals(t, 1, len(warns)) + testutil.Equals(t, tcase.expectedWarning, warns.AsErrors()[0].Error()) } }) }) @@ -966,7 +964,7 @@ type mockedQueryable struct { // Querier creates a querier with the provided min and max time. // The promQL engine sets mint and it is calculated based on the default lookback delta. -func (q *mockedQueryable) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) { +func (q *mockedQueryable) Querier(mint, maxt int64) (storage.Querier, error) { if q.Creator == nil { return q.querier, nil } @@ -993,18 +991,18 @@ type querierResponseCatcher struct { resp []storage.SeriesSet } -func (q *querierResponseCatcher) Select(selectSorted bool, p *storage.SelectHints, m ...*labels.Matcher) storage.SeriesSet { - s := q.Querier.Select(selectSorted, p, m...) +func (q *querierResponseCatcher) Select(ctx context.Context, selectSorted bool, p *storage.SelectHints, m ...*labels.Matcher) storage.SeriesSet { + s := q.Querier.Select(ctx, selectSorted, p, m...) q.resp = append(q.resp, s) return storage.NoopSeriesSet() } func (q querierResponseCatcher) Close() error { return nil } -func (q *querierResponseCatcher) warns() []storage.Warnings { - var warns []storage.Warnings +func (q *querierResponseCatcher) warns() annotations.Annotations { + var warns annotations.Annotations for _, r := range q.resp { - warns = append(warns, r.Warnings()) + warns.Merge(r.Warnings()) } return warns } @@ -1080,7 +1078,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 100 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) + q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) @@ -1150,7 +1148,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 5 * time.Second g := gate.New(2) - q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) + q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) diff --git a/pkg/query/query_bench_test.go b/pkg/query/query_bench_test.go index 8de127bab56..48a9d7673da 100644 --- a/pkg/query/query_bench_test.go +++ b/pkg/query/query_bench_test.go @@ -84,7 +84,6 @@ func benchQuerySelect(t testutil.TB, totalSamples, totalSeries int, dedup bool) logger := log.NewNopLogger() q := newQuerier( - context.Background(), logger, math.MinInt64, math.MaxInt64, @@ -129,8 +128,9 @@ func testSelect(t testutil.TB, q *querier, expectedSeries []labels.Labels) { t.Run("select", func(t testutil.TB) { t.ResetTimer() + ctx := context.Background() for i := 0; i < t.N(); i++ { - ss := q.Select(true, nil, &labels.Matcher{Value: "foo", Name: "bar", Type: labels.MatchEqual}) + ss := q.Select(ctx, true, nil, &labels.Matcher{Value: "foo", Name: "bar", Type: labels.MatchEqual}) testutil.Ok(t, ss.Err()) testutil.Equals(t, 0, len(ss.Warnings())) diff --git a/pkg/query/remote_engine.go b/pkg/query/remote_engine.go index 4655023d70b..a7dc7fd4330 100644 --- a/pkg/query/remote_engine.go +++ b/pkg/query/remote_engine.go @@ -16,10 +16,10 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" - "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/stats" - "github.com/thanos-io/promql-engine/api" + "github.com/thanos-io/promql-engine/api" "github.com/thanos-io/thanos/pkg/api/query/querypb" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -237,7 +237,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { var ( result = make(promql.Matrix, 0) - warnings storage.Warnings + warnings annotations.Annotations ) for { @@ -250,7 +250,7 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { } if warn := msg.GetWarnings(); warn != "" { - warnings = append(warnings, errors.New(warn)) + warnings.Add(errors.New(warn)) continue } diff --git a/pkg/query/test_test.go b/pkg/query/test_test.go index abc35b82e4b..612d44f0f8c 100644 --- a/pkg/query/test_test.go +++ b/pkg/query/test_test.go @@ -21,6 +21,7 @@ import ( "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/teststorage" ) @@ -325,7 +326,7 @@ func ParseEval(lines []string, i int) (int, *evalCmd, error) { if err != nil { if perr, ok := err.(*parser.ParseErr); ok { perr.LineOffset = i - posOffset := parser.Pos(strings.Index(lines[i], expr)) + posOffset := posrange.Pos(strings.Index(lines[i], expr)) perr.PositionRange.Start += posOffset perr.PositionRange.End += posOffset perr.Query = lines[i] diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 509e9f35351..240e3e0dbfd 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -731,9 +731,9 @@ func (s *ReadyStorage) StartTime() (int64, error) { } // Querier implements the Storage interface. -func (s *ReadyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { +func (s *ReadyStorage) Querier(mint, maxt int64) (storage.Querier, error) { if x := s.get(); x != nil { - return x.Querier(ctx, mint, maxt) + return x.Querier(mint, maxt) } return nil, ErrNotReady } @@ -772,8 +772,8 @@ func (a adapter) StartTime() (int64, error) { return 0, errors.New("not implemented") } -func (a adapter) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return a.db.Querier(ctx, mint, maxt) +func (a adapter) Querier(mint, maxt int64) (storage.Querier, error) { + return a.db.Querier(mint, maxt) } func (a adapter) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { diff --git a/pkg/rules/manager_test.go b/pkg/rules/manager_test.go index 1814f18d358..5a80f39124c 100644 --- a/pkg/rules/manager_test.go +++ b/pkg/rules/manager_test.go @@ -59,7 +59,7 @@ func (n nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.M type nopQueryable struct{} -func (n nopQueryable) Querier(_ context.Context, _, _ int64) (storage.Querier, error) { +func (n nopQueryable) Querier(_, _ int64) (storage.Querier, error) { return storage.NoopQuerier(), nil } diff --git a/pkg/rules/queryable.go b/pkg/rules/queryable.go index 399a27cd259..d178df7ba1c 100644 --- a/pkg/rules/queryable.go +++ b/pkg/rules/queryable.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/internal/cortex/querier/series" "github.com/thanos-io/thanos/pkg/httpconfig" @@ -36,7 +37,6 @@ type promClientsQueryable struct { duplicatedQuery prometheus.Counter } type promClientsQuerier struct { - ctx context.Context mint, maxt int64 step int64 httpMethod string @@ -66,9 +66,8 @@ func NewPromClientsQueryable(logger log.Logger, queryClients []*httpconfig.Clien } // Querier returns a new Querier for the given time range. -func (q *promClientsQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { +func (q *promClientsQueryable) Querier(mint, maxt int64) (storage.Querier, error) { return &promClientsQuerier{ - ctx: ctx, mint: mint, maxt: maxt, step: int64(q.step / time.Second), @@ -81,14 +80,14 @@ func (q *promClientsQueryable) Querier(ctx context.Context, mint, maxt int64) (s } // Select implements storage.Querier interface. -func (q *promClientsQuerier) Select(_ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { +func (q *promClientsQuerier) Select(ctx context.Context, _ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { query := storepb.PromMatchersToString(matchers...) for _, i := range rand.Perm(len(q.queryClients)) { promClient := q.promClients[i] endpoints := RemoveDuplicateQueryEndpoints(q.logger, q.duplicatedQuery, q.queryClients[i].Endpoints()) for _, i := range rand.Perm(len(endpoints)) { - m, warns, _, err := promClient.QueryRange(q.ctx, endpoints[i], query, q.mint, q.maxt, q.step, promclient.QueryOptions{ + m, warns, _, err := promClient.QueryRange(ctx, endpoints[i], query, q.mint, q.maxt, q.step, promclient.QueryOptions{ Deduplicate: true, Method: q.httpMethod, }) @@ -119,12 +118,12 @@ func (q *promClientsQuerier) Select(_ bool, _ *storage.SelectHints, matchers ... } // LabelValues implements storage.LabelQuerier interface. -func (q *promClientsQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { +func (q *promClientsQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } // LabelNames implements storage.LabelQuerier interface. -func (q *promClientsQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { +func (q *promClientsQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { return nil, nil, nil } diff --git a/pkg/rules/rules.go b/pkg/rules/rules.go index 5fd3dc01e81..ab539a41025 100644 --- a/pkg/rules/rules.go +++ b/pkg/rules/rules.go @@ -13,7 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" - "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/tracing" @@ -24,7 +24,7 @@ var _ UnaryClient = &GRPCClient{} // UnaryClient is gRPC rulespb.Rules client which expands streaming rules API. Useful for consumers that does not // support streaming. type UnaryClient interface { - Rules(ctx context.Context, req *rulespb.RulesRequest) (*rulespb.RuleGroups, storage.Warnings, error) + Rules(ctx context.Context, req *rulespb.RulesRequest) (*rulespb.RuleGroups, annotations.Annotations, error) } // GRPCClient allows to retrieve rules from local gRPC streaming server implementation. @@ -51,7 +51,7 @@ func NewGRPCClientWithDedup(rs rulespb.RulesServer, replicaLabels []string) *GRP return c } -func (rr *GRPCClient) Rules(ctx context.Context, req *rulespb.RulesRequest) (*rulespb.RuleGroups, storage.Warnings, error) { +func (rr *GRPCClient) Rules(ctx context.Context, req *rulespb.RulesRequest) (*rulespb.RuleGroups, annotations.Annotations, error) { span, ctx := tracing.StartSpan(ctx, "rules_request") defer span.Finish() @@ -222,7 +222,7 @@ type rulesServer struct { rulespb.Rules_RulesServer ctx context.Context - warnings []error + warnings annotations.Annotations groups []*rulespb.RuleGroup mu sync.Mutex } @@ -231,7 +231,7 @@ func (srv *rulesServer) Send(res *rulespb.RulesResponse) error { if res.GetWarning() != "" { srv.mu.Lock() defer srv.mu.Unlock() - srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) + srv.warnings.Add(errors.New(res.GetWarning())) return nil } diff --git a/pkg/rules/rules_test.go b/pkg/rules/rules_test.go index 721da26c10c..921d4b7cc5a 100644 --- a/pkg/rules/rules_test.go +++ b/pkg/rules/rules_test.go @@ -10,11 +10,10 @@ import ( "testing" "time" + "github.com/efficientgo/core/testutil" "github.com/gogo/protobuf/proto" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - - "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -151,7 +150,7 @@ func testRulesAgainstExamples(t *testing.T, dir string, server rulespb.RulesServ groups, w, err := NewGRPCClientWithDedup(server, nil).Rules(context.Background(), &rulespb.RulesRequest{ Type: tcase.requestedType, }) - testutil.Equals(t, storage.Warnings(nil), w) + testutil.Equals(t, annotations.Annotations(nil), w) if tcase.expectedErr != nil { testutil.NotOk(t, err) testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index c22c27bf3c1..392c0623f5e 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -705,8 +705,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset } testutil.Ok(t, err) - testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { - return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) int { + return labels.Compare(x.PromLabels(), y.PromLabels()) })) receivedLabels := make([]labels.Labels, 0) @@ -723,6 +723,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset func TestBucketStore_Acceptance(t *testing.T) { t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) + ctx := context.Background() for _, lazyExpandedPosting := range []bool{false, true} { testStoreAPIsAcceptance(t, func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { @@ -757,7 +758,7 @@ func TestBucketStore_Acceptance(t *testing.T) { auxBlockDir := filepath.Join(auxDir, id.String()) meta, err := metadata.ReadFromDir(auxBlockDir) testutil.Ok(t, err) - stats, err := block.GatherIndexHealthStats(logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime) + stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime) testutil.Ok(t, err) _, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{ Labels: extLset.Map(), @@ -767,8 +768,8 @@ func TestBucketStore_Acceptance(t *testing.T) { }, nil) testutil.Ok(tt, err) - testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc)) - testutil.Ok(tt, block.Upload(context.Background(), logger, bkt, auxBlockDir, metadata.NoneFunc)) + testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc)) + testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc)) chunkPool, err := NewDefaultChunkBytesPool(2e5) testutil.Ok(tt, err) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 01f32f43664..575db245a48 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1174,7 +1174,7 @@ OUTER: continue } - if err := b.indexr.LookupLabelsSymbols(b.symbolizedLset, &b.lset); err != nil { + if err := b.indexr.LookupLabelsSymbols(b.ctx, b.symbolizedLset, &b.lset); err != nil { return errors.Wrap(err, "Lookup labels symbols") } @@ -2665,8 +2665,8 @@ func matchersToPostingGroups(ctx context.Context, lvalsFn func(name string) ([]s mergedPG.matchers = newSortedMatchers(matchers) pgs = append(pgs, mergedPG) } - slices.SortFunc(pgs, func(a, b *postingGroup) bool { - return a.name < b.name + slices.SortFunc(pgs, func(a, b *postingGroup) int { + return strings.Compare(a.name, b.name) }) return pgs, nil } @@ -3222,14 +3222,14 @@ func (r *bucketIndexReader) Close() error { } // LookupLabelsSymbols allows populates label set strings from symbolized label set. -func (r *bucketIndexReader) LookupLabelsSymbols(symbolized []symbolizedLabel, lbls *labels.Labels) error { +func (r *bucketIndexReader) LookupLabelsSymbols(ctx context.Context, symbolized []symbolizedLabel, lbls *labels.Labels) error { *lbls = (*lbls)[:0] for _, s := range symbolized { - ln, err := r.dec.LookupSymbol(s.name) + ln, err := r.dec.LookupSymbol(ctx, s.name) if err != nil { return errors.Wrap(err, "lookup label name") } - lv, err := r.dec.LookupSymbol(s.value) + lv, err := r.dec.LookupSymbol(ctx, s.value) if err != nil { return errors.Wrap(err, "lookup label value") } diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c77c9f9f915..3680975c3c5 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -986,6 +986,7 @@ func expectedTouchedBlockOps(all, expected, cached []ulid.ULID) []string { // Regression tests against: https://github.com/thanos-io/thanos/issues/1983. func TestReadIndexCache_LoadSeries(t *testing.T) { bkt := objstore.NewInMemBucket() + ctx := context.Background() s := newBucketStoreMetrics(nil) b := &bucketBlock{ @@ -1018,7 +1019,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { } // Success with no refetches. - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) + testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 100, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), @@ -1028,7 +1029,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { // Success with 2 refetches. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) + testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 2, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), 13: []byte("bbbbbbbbbb"), @@ -1038,7 +1039,7 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { // Success with refetch on first element. r.loadedSeries = map[storage.SeriesRef][]byte{} - testutil.Ok(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) + testutil.Ok(t, r.loadSeries(ctx, []storage.SeriesRef{2}, false, 2, 5, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) testutil.Equals(t, map[storage.SeriesRef][]byte{ 2: []byte("aaaaaaaaaa"), }, r.loadedSeries) @@ -1049,10 +1050,10 @@ func TestReadIndexCache_LoadSeries(t *testing.T) { buf.PutByte(0) buf.PutUvarint(10) buf.PutString("aaaaaaa") - testutil.Ok(t, bkt.Upload(context.Background(), filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get()))) + testutil.Ok(t, bkt.Upload(ctx, filepath.Join(b.meta.ULID.String(), block.IndexFilename), bytes.NewReader(buf.Get()))) // Fail, but no recursion at least. - testutil.NotOk(t, r.loadSeries(context.TODO(), []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) + testutil.NotOk(t, r.loadSeries(ctx, []storage.SeriesRef{2, 13, 24}, false, 1, 15, NewBytesLimiterFactory(0)(nil), tenancy.DefaultTenant)) } func TestBucketIndexReader_ExpandedPostings(t *testing.T) { @@ -1109,7 +1110,7 @@ func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series in bdir := filepath.Join(dir, id.String()) meta, err := metadata.ReadFromDir(bdir) testutil.Ok(t, err) - stats, err := block.GatherIndexHealthStats(logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) testutil.Ok(t, err) _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, "tmp", id.String()), metadata.Thanos{ @@ -1369,7 +1370,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk, blockIDDir := filepath.Join(blockDir, id.String()) meta, err := metadata.ReadFromDir(blockIDDir) testutil.Ok(t, err) - stats, err := block.GatherIndexHealthStats(logger, filepath.Join(blockIDDir, block.IndexFilename), meta.MinTime, meta.MaxTime) + stats, err := block.GatherIndexHealthStats(context.TODO(), logger, filepath.Join(blockIDDir, block.IndexFilename), meta.MinTime, meta.MaxTime) testutil.Ok(t, err) thanosMeta := metadata.Thanos{ Labels: extLset.Map(), @@ -2688,7 +2689,7 @@ func prepareBucket(b *testing.B, resolutionLevel compact.ResolutionLevel) (*buck if resolutionLevel > 0 { // Downsample newly-created block. - blockID, err = downsample.Downsample(logger, blockMeta, head, tmpDir, int64(resolutionLevel)) + blockID, err = downsample.Downsample(ctx, logger, blockMeta, head, tmpDir, int64(resolutionLevel)) testutil.Ok(b, err) blockMeta, err = metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) testutil.Ok(b, err) @@ -3483,8 +3484,8 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { }, }, srv)) - testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { - return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) int { + return labels.Compare(x.PromLabels(), y.PromLabels()) })) testutil.Equals(t, 2, len(srv.SeriesSet)) } diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index e6cadfbea9d..6be9c544c5b 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -68,11 +68,11 @@ func (r *resortingServer) Send(response *storepb.SeriesResponse) error { } func (r *resortingServer) Flush() error { - slices.SortFunc(r.series, func(a, b *storepb.Series) bool { + slices.SortFunc(r.series, func(a, b *storepb.Series) int { return labels.Compare( labelpb.ZLabelsToPromLabels(a.Labels), labelpb.ZLabelsToPromLabels(b.Labels), - ) < 0 + ) }) for _, response := range r.series { if err := r.Store_SeriesServer.Send(storepb.NewSeriesResponse(response)); err != nil { diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index 4fb4a155f96..146e7b0eec5 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -6,6 +6,7 @@ package store import ( "context" "math" + "strings" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -67,11 +68,11 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups pg.cardinality += (r.End - r.Start - 4) / 4 } } - slices.SortFunc(postingGroups, func(a, b *postingGroup) bool { + slices.SortFunc(postingGroups, func(a, b *postingGroup) int { if a.cardinality == b.cardinality { - return a.name < b.name + return strings.Compare(a.name, b.name) } - return a.cardinality < b.cardinality + return int(a.cardinality - b.cardinality) }) /* @@ -251,7 +252,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post postingIndex++ } - groupAdds = append(groupAdds, index.Merge(toMerge...)) + groupAdds = append(groupAdds, index.Merge(ctx, toMerge...)) } for _, l := range g.removeKeys { @@ -260,7 +261,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post } } - result := index.Without(index.Intersect(groupAdds...), index.Merge(groupRemovals...)) + result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...)) if ctx.Err() != nil { return nil, nil, ctx.Err() diff --git a/pkg/store/lazy_postings_test.go b/pkg/store/lazy_postings_test.go index 7b17a59ec6a..69bfbece61d 100644 --- a/pkg/store/lazy_postings_test.go +++ b/pkg/store/lazy_postings_test.go @@ -241,7 +241,9 @@ func (h *mockIndexHeaderReader) PostingsOffset(name string, value string) (index return index.Range{}, nil } -func (h *mockIndexHeaderReader) LookupSymbol(o uint32) (string, error) { return "", nil } +func (h *mockIndexHeaderReader) LookupSymbol(ctx context.Context, o uint32) (string, error) { + return "", nil +} func (h *mockIndexHeaderReader) LabelValues(name string) ([]string, error) { return nil, nil } diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index 68458245a8b..fd623594613 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -89,18 +89,19 @@ func TestDiffVarintCodec(t *testing.T) { testutil.Ok(t, idx.Close()) }() + ctx := context.TODO() postingsMap := map[string]index.Postings{ - "all": allPostings(t, idx), - `n="1"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+storetestutil.LabelLongSuffix)), - `j="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")), - `j!="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")), - `i=~".*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")), - `i=~".+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")), - `i=~"1.+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")), - `i=~"^$"'`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")), - `i!~""`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")), - `n!="2"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+storetestutil.LabelLongSuffix)), - `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), + "all": allPostings(ctx, t, idx), + `n="1"`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+storetestutil.LabelLongSuffix)), + `j="foo"`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")), + `j!="foo"`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")), + `i=~".*"`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")), + `i=~".+"`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")), + `i=~"1.+"`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")), + `i=~"^$"'`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")), + `i!~""`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")), + `n!="2"`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+storetestutil.LabelLongSuffix)), + `i!~"2.*"`: matchPostings(ctx, t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), } codecs := map[string]struct { @@ -167,15 +168,15 @@ func comparePostings(t *testing.T, p1, p2 index.Postings) { testutil.Ok(t, p2.Err()) } -func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { +func allPostings(ctx context.Context, t testing.TB, ix tsdb.IndexReader) index.Postings { k, v := index.AllPostingsKey() - p, err := ix.Postings(k, v) + p, err := ix.Postings(ctx, k, v) testutil.Ok(t, err) return p } -func matchPostings(t testing.TB, ix tsdb.IndexReader, m *labels.Matcher) index.Postings { - vals, err := ix.LabelValues(m.Name) +func matchPostings(ctx context.Context, t testing.TB, ix tsdb.IndexReader, m *labels.Matcher) index.Postings { + vals, err := ix.LabelValues(ctx, m.Name) testutil.Ok(t, err) matching := []string(nil) @@ -185,7 +186,7 @@ func matchPostings(t testing.TB, ix tsdb.IndexReader, m *labels.Matcher) index.P } } - p, err := ix.Postings(m.Name, matching...) + p, err := ix.Postings(ctx, m.Name, matching...) testutil.Ok(t, err) return p } diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index b5d2fc27504..ffc69d1cc6c 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -40,9 +40,9 @@ const ( LabelLongSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd" ) -func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { +func allPostings(ctx context.Context, t testing.TB, ix tsdb.IndexReader) index.Postings { k, v := index.AllPostingsKey() - p, err := ix.Postings(k, v) + p, err := ix.Postings(ctx, k, v) testutil.Ok(t, err) return p } @@ -145,7 +145,7 @@ func ReadSeriesFromBlock(t testing.TB, h tsdb.BlockReader, extLabels labels.Labe var builder labels.ScratchBuilder - all := allPostings(t, ir) + all := allPostings(context.TODO(), t, ir) for all.Next() { testutil.Ok(t, ir.Series(all.At(), &builder, &chunkMetas)) lset = builder.Labels() @@ -197,7 +197,7 @@ func appendFloatSamples(t testing.TB, app storage.Appender, tsLabel int, opts He func appendHistogramSamples(t testing.TB, app storage.Appender, tsLabel int, opts HeadGenOptions) { sample := &histogram.Histogram{ Schema: 0, - Count: 9, + Count: 20, Sum: -3.1415, ZeroCount: 12, ZeroThreshold: 0.001, diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index b5182f30081..3f3eaadfc96 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -184,7 +184,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error()) } - q, err := s.db.ChunkQuerier(context.Background(), r.MinTime, r.MaxTime) + q, err := s.db.ChunkQuerier(r.MinTime, r.MaxTime) if err != nil { return status.Error(codes.Internal, err.Error()) } @@ -195,7 +195,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb chunk querier series") } - set := q.Select(true, nil, matchers...) + set := q.Select(srv.Context(), true, nil, matchers...) shardMatcher := r.ShardInfo.Matcher(&s.buffers) defer shardMatcher.Close() @@ -297,13 +297,13 @@ func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest return &storepb.LabelNamesResponse{Names: nil}, nil } - q, err := s.db.ChunkQuerier(ctx, r.Start, r.End) + q, err := s.db.ChunkQuerier(r.Start, r.End) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names") - res, _, err := q.LabelNames(matchers...) + res, _, err := q.LabelNames(ctx, matchers...) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -348,13 +348,13 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return &storepb.LabelValuesResponse{Values: []string{v}}, nil } - q, err := s.db.ChunkQuerier(ctx, r.Start, r.End) + q, err := s.db.ChunkQuerier(r.Start, r.End) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label values") - res, _, err := q.LabelValues(r.Label, matchers...) + res, _, err := q.LabelValues(ctx, r.Label, matchers...) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/targets/prometheus_test.go b/pkg/targets/prometheus_test.go index a2f761441a7..2564c93d37e 100644 --- a/pkg/targets/prometheus_test.go +++ b/pkg/targets/prometheus_test.go @@ -11,13 +11,12 @@ import ( "testing" "time" + "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - - "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -136,7 +135,7 @@ scrape_configs: targets, w, err := grpcClient.Targets(context.Background(), &targetspb.TargetsRequest{ State: tcase.requestedState, }) - testutil.Equals(t, storage.Warnings(nil), w) + testutil.Equals(t, annotations.Annotations(nil), w) if tcase.expectedErr != nil { testutil.NotOk(t, err) testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) diff --git a/pkg/targets/targets.go b/pkg/targets/targets.go index 9a1b4016c31..8dfe9431f77 100644 --- a/pkg/targets/targets.go +++ b/pkg/targets/targets.go @@ -9,7 +9,8 @@ import ( "sync" "github.com/pkg/errors" - "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/targets/targetspb" ) @@ -19,7 +20,7 @@ var _ UnaryClient = &GRPCClient{} // UnaryClient is gRPC targetspb.Targets client which expands streaming targets API. Useful for consumers that does not // support streaming. type UnaryClient interface { - Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) + Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, annotations.Annotations, error) } // GRPCClient allows to retrieve targets from local gRPC streaming server implementation. @@ -46,7 +47,7 @@ func NewGRPCClientWithDedup(ts targetspb.TargetsServer, replicaLabels []string) return c } -func (rr *GRPCClient) Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) { +func (rr *GRPCClient) Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, annotations.Annotations, error) { resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{ ActiveTargets: make([]*targetspb.ActiveTarget, 0), DroppedTargets: make([]*targetspb.DroppedTarget, 0), @@ -184,7 +185,7 @@ type targetsServer struct { targetspb.Targets_TargetsServer ctx context.Context - warnings []error + warnings annotations.Annotations targets *targetspb.TargetDiscovery mu sync.Mutex } @@ -193,7 +194,7 @@ func (srv *targetsServer) Send(res *targetspb.TargetsResponse) error { if res.GetWarning() != "" { srv.mu.Lock() defer srv.mu.Unlock() - srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) + srv.warnings.Add(errors.New(res.GetWarning())) return nil } diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index 9da879de823..96b2f69a459 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -556,7 +556,7 @@ func createBlock( blockDir := filepath.Join(dir, id.String()) logger := log.NewNopLogger() - seriesSize, err := gatherMaxSeriesSize(filepath.Join(blockDir, "index")) + seriesSize, err := gatherMaxSeriesSize(ctx, filepath.Join(blockDir, "index")) if err != nil { return id, errors.Wrap(err, "gather max series size") } @@ -605,14 +605,15 @@ func createBlock( return id, nil } -func gatherMaxSeriesSize(fn string) (int64, error) { +func gatherMaxSeriesSize(ctx context.Context, fn string) (int64, error) { r, err := index.NewFileReader(fn) if err != nil { return 0, errors.Wrap(err, "open index file") } defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader") - p, err := r.Postings(index.AllPostingsKey()) + key, value := index.AllPostingsKey() + p, err := r.Postings(ctx, key, value) if err != nil { return 0, errors.Wrap(err, "get all postings") } diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index 892aa037411..d0e91d1c1b2 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -97,6 +97,7 @@ func repairIndex(stats block.HealthStats, ctx Context, id ulid.ULID, meta *metad level.Info(ctx.Logger).Log("msg", "repairing block", "id", id, "issue") resid, err := block.Repair( + ctx, ctx.Logger, dir, id, @@ -110,7 +111,7 @@ func repairIndex(stats block.HealthStats, ctx Context, id ulid.ULID, meta *metad } level.Info(ctx.Logger).Log("msg", "verifying repaired block", "id", id, "newID", resid) - if err := block.VerifyIndex(ctx.Logger, filepath.Join(dir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { + if err := block.VerifyIndex(ctx, ctx.Logger, filepath.Join(dir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { return errors.Wrapf(err, "repaired block is invalid %s", resid) } @@ -132,7 +133,7 @@ func verifyIndex(ctx Context, id ulid.ULID, dir string, meta *metadata.Meta) (st return stats, errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename)) } - stats, err = block.GatherIndexHealthStats(ctx.Logger, filepath.Join(dir, block.IndexFilename), meta.MinTime, meta.MaxTime) + stats, err = block.GatherIndexHealthStats(ctx, ctx.Logger, filepath.Join(dir, block.IndexFilename), meta.MinTime, meta.MaxTime) if err != nil { return stats, errors.Wrapf(err, "gather index issues %s", id) } diff --git a/test/e2e/native_histograms_test.go b/test/e2e/native_histograms_test.go index 48d1f256391..a8c5a3080cc 100644 --- a/test/e2e/native_histograms_test.go +++ b/test/e2e/native_histograms_test.go @@ -71,7 +71,7 @@ func TestQueryNativeHistograms(t *testing.T) { t.Run("query histogram using histogram_count fn and deduplication", func(t *testing.T) { queryAndAssert(t, ctx, querier.Endpoint("http"), func() string { return fmt.Sprintf("histogram_count(%v)", testHistogramMetricName) }, ts, promclient.QueryOptions{Deduplicate: true}, model.Vector{ &model.Sample{ - Value: 34, + Value: 39, Metric: model.Metric{ "foo": "bar", "prometheus": "prom-ha", diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 042e053f65d..7d0c6d15a11 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/util/annotations" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -1531,8 +1532,10 @@ func simpleInstantQuery(t testing.TB, ctx context.Context, addr string, q func() return nil, nil, err } - if len(warnings) > 0 { - return nil, nil, errors.Errorf("unexpected warnings %s", warnings) + for _, w := range warnings { + if !(strings.HasPrefix(w, annotations.PromQLInfo.Error()) || strings.HasPrefix(w, annotations.PromQLWarning.Error())) { + return nil, nil, errors.Errorf("unexpected warnings %s", warnings) + } } if len(res) != expectedSeriesLen { @@ -1798,7 +1801,7 @@ func storeWriteRequest(ctx context.Context, rawRemoteWriteURL string, req *promp } compressed := snappy.Encode(buf, pBuf.Bytes()) - return client.Store(ctx, compressed) + return client.Store(ctx, compressed, 0) } func TestSidecarQueryEvaluationWithDedup(t *testing.T) {