Skip to content

Commit

Permalink
Merge pull request #85 from vinted/add_e2e_test
Browse files Browse the repository at this point in the history
receive/handler: properly split for tenant label
  • Loading branch information
GiedriusS committed Feb 2, 2024
2 parents a81df7a + 8ff399c commit 0ad442f
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 16 deletions.
38 changes: 24 additions & 14 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,6 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) e
maxBufferedResponses := len(localWrites) + len(remoteWrites)
responses := make(chan writeResponse, maxBufferedResponses)
wg := sync.WaitGroup{}
wg.Add(len(remoteWrites))

h.sendWrites(ctx, &wg, params, localWrites, remoteWrites, responses)

Expand Down Expand Up @@ -763,11 +762,11 @@ func (h *Handler) distributeTimeseriesToReplicas(
tenantHTTP string,
replicas []uint64,
timeseries []prompb.TimeSeries,
) (map[endpointReplica]trackedSeries, map[endpointReplica]trackedSeries, error) {
) (map[endpointReplica]map[string]trackedSeries, map[endpointReplica]map[string]trackedSeries, error) {
h.mtx.RLock()
defer h.mtx.RUnlock()
remoteWrites := make(map[endpointReplica]trackedSeries)
localWrites := make(map[endpointReplica]trackedSeries)
remoteWrites := make(map[endpointReplica]map[string]trackedSeries)
localWrites := make(map[endpointReplica]map[string]trackedSeries)
for tsIndex, ts := range timeseries {
var tenant = tenantHTTP

Expand Down Expand Up @@ -796,14 +795,19 @@ func (h *Handler) distributeTimeseriesToReplicas(
}
writeableSeries, ok := writeDestination[endpointReplica]
if !ok {
writeableSeries = trackedSeries{
seriesIDs: make([]int, 0),
timeSeries: make([]prompb.TimeSeries, 0),
writeDestination[endpointReplica] = map[string]trackedSeries{
tenant: {
seriesIDs: make([]int, 0),
timeSeries: make([]prompb.TimeSeries, 0),
},
}
}
writeableSeries.timeSeries = append(writeDestination[endpointReplica].timeSeries, ts)
writeableSeries.seriesIDs = append(writeDestination[endpointReplica].seriesIDs, tsIndex)
writeDestination[endpointReplica] = writeableSeries
tenantSeries := writeableSeries[tenant]

tenantSeries.timeSeries = append(tenantSeries.timeSeries, ts)
tenantSeries.seriesIDs = append(tenantSeries.seriesIDs, tsIndex)

writeDestination[endpointReplica][tenant] = tenantSeries
}
}
return localWrites, remoteWrites, nil
Expand All @@ -815,20 +819,26 @@ func (h *Handler) sendWrites(
ctx context.Context,
wg *sync.WaitGroup,
params remoteWriteParams,
localWrites map[endpointReplica]trackedSeries,
remoteWrites map[endpointReplica]trackedSeries,
localWrites map[endpointReplica]map[string]trackedSeries,
remoteWrites map[endpointReplica]map[string]trackedSeries,
responses chan writeResponse,
) {
// Do the writes to the local node first. This should be easy and fast.
for writeDestination := range localWrites {
func(writeDestination endpointReplica) {
h.sendLocalWrite(ctx, writeDestination, params.tenantHTTP, localWrites[writeDestination], responses)
for tenant, trackedSeries := range localWrites[writeDestination] {
h.sendLocalWrite(ctx, writeDestination, tenant, trackedSeries, responses)
}
}(writeDestination)
}

// Do the writes to remote nodes. Run them all in parallel.
for writeDestination := range remoteWrites {
h.sendRemoteWrite(ctx, params.tenantHTTP, writeDestination, remoteWrites[writeDestination], params.alreadyReplicated, responses, wg)
for tenant, trackedSeries := range remoteWrites[writeDestination] {
wg.Add(1)

h.sendRemoteWrite(ctx, tenant, writeDestination, trackedSeries, params.alreadyReplicated, responses, wg)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1705,8 +1705,8 @@ func TestDistributeSeries(t *testing.T) {
},
)
require.NoError(t, err)
require.Equal(t, 1, labelpb.ZLabelsToPromLabels(remote[endpointReplica{endpoint: "http://localhost:9090", replica: 0}].timeSeries[0].Labels).Len())
require.Equal(t, 1, labelpb.ZLabelsToPromLabels(remote[endpointReplica{endpoint: "http://localhost:9090", replica: 0}].timeSeries[1].Labels).Len())
require.Equal(t, 1, labelpb.ZLabelsToPromLabels(remote[endpointReplica{endpoint: "http://localhost:9090", replica: 0}]["bar"].timeSeries[0].Labels).Len())
require.Equal(t, 1, labelpb.ZLabelsToPromLabels(remote[endpointReplica{endpoint: "http://localhost:9090", replica: 0}]["boo"].timeSeries[0].Labels).Len())
require.Equal(t, map[string]struct{}{"bar": {}, "boo": {}}, hr.seenTenants)
}

Expand Down
46 changes: 46 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@ import (
"log"
"net/http"
"net/http/httputil"
"os"
"testing"
"time"

"github.com/efficientgo/core/backoff"
"github.com/efficientgo/e2e"
e2edb "github.com/efficientgo/e2e/db"
e2emon "github.com/efficientgo/e2e/monitoring"
"github.com/efficientgo/e2e/monitoring/matchers"
logkit "github.com/go-kit/log"
"github.com/prometheus/common/model"

"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/require"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

Expand Down Expand Up @@ -968,3 +975,42 @@ test_metric{a="2", b="2"} 1`)
})
})
}

func TestReceiveExtractsTenant(t *testing.T) {
e, err := e2e.NewDockerEnvironment("receive-extract")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

h := receive.HashringConfig{
Endpoints: []receive.Endpoint{
{Address: i.InternalEndpoint("grpc")},
},
}

r := e2ethanos.NewReceiveBuilder(e, "router").WithRouting(1, h).Init()
testutil.Ok(t, e2e.StartAndWaitReady(r))

q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

require.NoError(t, runutil.RetryWithLog(logkit.NewLogfmtLogger(os.Stdout), 1*time.Second, make(<-chan struct{}), func() error {
return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "thanos_tenant_id", Value: "tenant-1"},
{Name: "aa", Value: "bb"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: time.Now().UnixMilli()},
},
},
},
})
}))

testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "tenant-1")), e2emon.WaitMissingMetrics()))
}

0 comments on commit 0ad442f

Please sign in to comment.