WIP: Parallel import #1588

Open

wants to merge 11 commits into main
1 change: 0 additions & 1 deletion pkg/gateway/delegate.go
@@ -88,7 +88,6 @@ func (d *DelegateServer) Request(ctx context.Context, req *streamv1.DelegatedMes
"target", targetId,
"request", req.GetRequest().QualifiedMethodName(),
)
lg.Debug("delegating rpc request")
target, ok := d.activeAgents[targetId]
if ok {
fwdResp := &totem.RPC{}
24 changes: 24 additions & 0 deletions plugins/metrics/pkg/agent/buffer.go
@@ -0,0 +1,24 @@
package agent

import "context"

type Buffer[T any] interface {
// Add blocks until the value can be added to the buffer.
Add(context.Context, T) error

// Get blocks until a value can be retrieved from the buffer.
Get(context.Context) (T, error)
}

type memoryBuffer[T any] struct {
ch chan T
}

func (b memoryBuffer[T]) Add(_ context.Context, t T) error {
b.ch <- t
return nil
}

func (b memoryBuffer[T]) Get(_ context.Context) (T, error) {
return <-b.ch, nil
}
2 changes: 0 additions & 2 deletions plugins/metrics/pkg/agent/node.go
@@ -3,7 +3,6 @@ package agent
import (
"context"
"fmt"
"net/http"
"sort"
"strings"
"sync"
@@ -68,7 +67,6 @@ func NewMetricsNode(ct health.ConditionTracker, lg *zap.SugaredLogger) *MetricsN
targetRunner: NewTargetRunner(lg),
}
mn.conditions.AddListener(mn.sendHealthUpdate)
mn.targetRunner.SetRemoteReaderClient(NewRemoteReader(&http.Client{}))

// FIXME: this is a hack, update the old sync code to use delegates instead
mn.conditions.AddListener(func(key string) {
217 changes: 131 additions & 86 deletions plugins/metrics/pkg/agent/runner.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
@@ -15,8 +16,10 @@ import (
"github.com/rancher/opni/pkg/storage"
"github.com/rancher/opni/pkg/task"
"github.com/rancher/opni/pkg/util"
"github.com/rancher/opni/pkg/util/waitctx"
"github.com/rancher/opni/plugins/metrics/apis/remoteread"
"github.com/rancher/opni/plugins/metrics/apis/remotewrite"
"github.com/samber/lo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
@@ -54,16 +57,6 @@ func toLabelMatchers(rrLabelMatchers []*remoteread.LabelMatcher) []*prompb.Label
return pbLabelMatchers
}

func dereferenceResultTimeseries(in []*prompb.TimeSeries) []prompb.TimeSeries {
dereferenced := make([]prompb.TimeSeries, 0, len(in))

for _, ref := range in {
dereferenced = append(dereferenced, *ref)
}

return dereferenced
}

func getMessageFromTaskLogs(logs []*corev1.LogEntry) string {
if len(logs) == 0 {
return ""
@@ -84,6 +77,15 @@ type TargetRunMetadata struct {
Query *remoteread.Query
}

// todo: could probably find a better name for this
type WriteMetadata struct {
Query *prompb.Query
WriteChunk *prompb.WriteRequest

// ProgressRatio is the ratio of the progress of this chunk to the total progress of the original request.
ProgressRatio float64
}

type targetStore struct {
innerMu sync.RWMutex
inner map[string]*corev1.TaskStatus
@@ -130,9 +132,6 @@ func (store *targetStore) ListKeys(_ context.Context, prefix string) ([]string,
type taskRunner struct {
remoteWriteClient clients.Locker[remotewrite.RemoteWriteClient]

remoteReaderMu sync.RWMutex
remoteReader RemoteReader

backoffPolicy backoff.Policy

logger *zap.SugaredLogger
@@ -154,13 +153,6 @@ func (tr *taskRunner) SetRemoteWriteClient(client clients.Locker[remotewrite.Rem
tr.remoteWriteClient = client
}

func (tr *taskRunner) SetRemoteReaderClient(client RemoteReader) {
tr.remoteReaderMu.Lock()
defer tr.remoteReaderMu.Unlock()

tr.remoteReader = client
}

func (tr *taskRunner) OnTaskPending(_ context.Context, _ task.ActiveTask) error {
return nil
}
@@ -203,96 +195,157 @@ func (tr *taskRunner) doPush(ctx context.Context, writeRequest *prompb.WriteRequ
}
}

func (tr *taskRunner) doRead(ctx context.Context, reader RemoteReader, run *TargetRunMetadata, readRequest *prompb.ReadRequest) (*prompb.ReadResponse, error) {
expbackoff := tr.backoffPolicy.Start(ctx)

for {
select {
case <-expbackoff.Done():
return nil, ctx.Err()
case <-expbackoff.Next():
readResponse, err := reader.Read(context.Background(), run.Target.Spec.Endpoint, readRequest)

if err == nil {
return readResponse, nil
}

tr.logger.With(
zap.Error(err),
"endpoint", run.Target.Spec.Endpoint,
).Warn("failed to read from target endpoint, retrying...")
}
}
}

func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveTask) error {
limit := util.DefaultWriteLimit()

b := &memoryBuffer[WriteMetadata]{
ch: make(chan WriteMetadata),
}

wc := waitctx.FromContext(ctx)

run := &TargetRunMetadata{}
activeTask.LoadTaskMetadata(run)

labelMatchers := toLabelMatchers(run.Query.Matchers)
var err error

importEnd := run.Query.EndTimestamp.AsTime().UnixMilli()
nextStart := run.Query.StartTimestamp.AsTime().UnixMilli()
nextEnd := nextStart
waitctx.Go(wc, func() {
labelMatchers := toLabelMatchers(run.Query.Matchers)

progressDelta := nextStart
importEnd := run.Query.EndTimestamp.AsTime().UnixMilli()
nextStart := run.Query.StartTimestamp.AsTime().UnixMilli()
nextEnd := nextStart

progress := &corev1.Progress{
Current: 0,
Total: uint64(importEnd - progressDelta),
}
reader := NewRemoteReader(&http.Client{})

activeTask.SetProgress(progress)
for err == nil && nextStart < importEnd {
select {
case <-wc.Done():
return
default: // continue reading
}

activeTask.AddLogEntry(zapcore.InfoLevel, "import running")
nextStart = nextEnd
nextEnd = nextStart + TimeDeltaMillis

for nextStart < importEnd {
select {
case <-ctx.Done():
activeTask.AddLogEntry(zapcore.InfoLevel, "import stopped")
return ctx.Err()
default: // continue with import
}
if nextStart >= importEnd {
break
}

nextStart = nextEnd
nextEnd = nextStart + TimeDeltaMillis
if nextEnd >= importEnd {
nextEnd = importEnd
}

if nextStart >= importEnd {
break
}
readRequest := &prompb.ReadRequest{
Queries: []*prompb.Query{
{
StartTimestampMs: nextStart,
EndTimestampMs: nextEnd,
Matchers: labelMatchers,
},
},
}

if nextEnd >= importEnd {
nextEnd = importEnd
}
var readResponse *prompb.ReadResponse

readRequest := &prompb.ReadRequest{
Queries: []*prompb.Query{
{
StartTimestampMs: nextStart,
EndTimestampMs: nextEnd,
Matchers: labelMatchers,
},
},
}
readResponse, err = tr.doRead(wc, reader, run, readRequest)
if err != nil {
return
}

readResponse, err := tr.remoteReader.Read(context.Background(), run.Target.Spec.Endpoint, readRequest)
writeRequest := &prompb.WriteRequest{
Timeseries: lo.Map(readResponse.Results[0].GetTimeseries(), func(t *prompb.TimeSeries, _ int) prompb.TimeSeries {
return lo.FromPtr(t)
}),
Metadata: []prompb.MetricMetadata{},
}

if err != nil {
activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("failed to read from target endpoint: %s", err))
return fmt.Errorf("failed to read from target endpoint: %w", err)
}
var chunks []*prompb.WriteRequest

for _, result := range readResponse.Results {
if len(result.Timeseries) == 0 {
continue
chunks, err = util.SplitChunksWithLimit(writeRequest, limit)
if err != nil {
return
}

writeRequest := prompb.WriteRequest{
Timeseries: dereferenceResultTimeseries(result.Timeseries),
activeTask.AddLogEntry(zapcore.InfoLevel, fmt.Sprintf("split request into %d chunks", len(chunks)))

lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) {
if err := b.Add(wc, WriteMetadata{
Query: readRequest.Queries[0],
WriteChunk: chunk,
ProgressRatio: 1.0 / float64(len(chunks)),
}); err != nil {
activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not add chunk to buffer: %s", err.Error()))
}
})
}
})

waitctx.Go(wc, func() {
progress := &corev1.Progress{
Current: 0,
Total: uint64(run.Query.EndTimestamp.AsTime().UnixMilli() - run.Query.StartTimestamp.AsTime().UnixMilli()),
}
activeTask.SetProgress(progress)

for err == nil && progress.Current < progress.Total {
select {
case <-wc.Done():
return
default: // continue pushing
}

chunkedRequests, err := util.SplitChunksWithLimit(&writeRequest, limit)
var meta WriteMetadata

meta, err = b.Get(wc)
if err != nil {
return fmt.Errorf("failed to chunk request: %w", err)
activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", err.Error()))
continue
}

if len(chunkedRequests) > 1 {
activeTask.AddLogEntry(zapcore.DebugLevel, fmt.Sprintf("split write request into %d chunks", len(chunkedRequests)))
}
activeTask.AddLogEntry(zapcore.DebugLevel, "received chunk from buffer")

for i, request := range chunkedRequests {
if err := tr.doPush(ctx, request); err != nil {
activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error())
return err
}
activeTask.AddLogEntry(zapcore.DebugLevel, fmt.Sprintf("pushed chunk %d of %d", i+1, len(chunkedRequests)))
if err = tr.doPush(wc, meta.WriteChunk); err != nil {
return
}

progress.Current = uint64(nextEnd - progressDelta)
progressDelta := uint64(float64(meta.Query.EndTimestampMs-meta.Query.StartTimestampMs) * meta.ProgressRatio)

progress.Current += progressDelta
activeTask.SetProgress(progress)
}
})

waitctx.Wait(wc)

if err != nil {
activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error())
return err
}

return nil
return ctx.Err()
}

func (tr *taskRunner) OnTaskCompleted(_ context.Context, activeTask task.ActiveTask, state task.State, _ ...any) {
@@ -311,7 +364,6 @@ type TargetRunner interface {
Stop(name string) error
GetStatus(name string) (*remoteread.TargetStatus, error)
SetRemoteWriteClient(client clients.Locker[remotewrite.RemoteWriteClient])
SetRemoteReaderClient(client RemoteReader)
}

type taskingTargetRunner struct {
@@ -446,10 +498,3 @@ func (runner *taskingTargetRunner) SetRemoteWriteClient(client clients.Locker[re

runner.runner.SetRemoteWriteClient(client)
}

func (runner *taskingTargetRunner) SetRemoteReaderClient(client RemoteReader) {
runner.runnerMu.Lock()
defer runner.runnerMu.Unlock()

runner.runner.SetRemoteReaderClient(client)
}
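
For reviewers, a minimal sketch of the per-chunk progress accounting OnTaskRunning now performs (not part of the diff; the time window and chunk count below are illustrative assumptions): each WriteMetadata carries the originating query's time range and ProgressRatio = 1/len(chunks), so the consumer goroutine advances progress by that chunk's share of the range.

```go
// Sketch only: the progress math applied per chunk by the consumer goroutine.
// startMs/endMs stand in for the query's StartTimestamp/EndTimestamp, and
// numChunks stands in for the result of util.SplitChunksWithLimit.
package main

import "fmt"

func main() {
	startMs := int64(0)
	endMs := int64(600_000) // a 10-minute query window (illustrative)
	numChunks := 4          // assumed chunk count for this query

	ratio := 1.0 / float64(numChunks) // ProgressRatio stored on each WriteMetadata

	var current uint64
	total := uint64(endMs - startMs)

	for i := 0; i < numChunks; i++ {
		// delta is this chunk's share of the query's time range
		delta := uint64(float64(endMs-startMs) * ratio)
		current += delta
		fmt.Printf("chunk %d/%d: progress %d/%d\n", i+1, numChunks, current, total)
	}
}
```

With equal-ratio chunks the accumulated progress reaches the query's full time range once every chunk has been pushed, which is what keeps the consumer loop's `progress.Current < progress.Total` condition consistent with the producer finishing.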