diff --git a/packages/opni-agent/opni-agent/charts/templates/deployment.yaml b/packages/opni-agent/opni-agent/charts/templates/deployment.yaml index b23bf89e2d..834deccff1 100644 --- a/packages/opni-agent/opni-agent/charts/templates/deployment.yaml +++ b/packages/opni-agent/opni-agent/charts/templates/deployment.yaml @@ -59,6 +59,10 @@ spec: mountPath: /etc/opni - name: plugins mountPath: /var/lib/opni-agent/plugins + {{- if (index .Values "import-buffer").enabled }} + - name: import-buffer + mountPath: /var/lib/opni-agent/import-buffer + {{- end }} {{- if .Values.volumeMounts }} {{- toYaml .Values.volumeMounts | nindent 12 }} {{- end }} @@ -175,6 +179,11 @@ spec: {{- if .Values.volumes }} {{- toYaml .Values.volumes | nindent 8 }} {{- end }} + {{- if (index .Values "import-buffer").enabled }} + - name: import-buffer + persistentVolumeClaim: + claimName: {{ include "opni-agent.fullname" .}}-import-buffer + {{- end }} {{- if eq .Values.persistence.mode "pvc" }} --- apiVersion: v1 @@ -194,4 +203,23 @@ spec: resources: requests: storage: 2Gi +{{- end }} +{{- if (index .Values "import-buffer").enabled }} +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {{ include "opni-agent.fullname" .}}-import-buffer + namespace: {{ include "opni-agent.namespace" . }} + labels: + {{- include "opni-agent.labels" . 
| nindent 4 }} + opni.io/app: agent +spec: + accessModes: + - ReadWriteOnce + {{- if .Values.global.storageClass }} + storageClassName: {{ .Values.global.storageClass }} + {{- end }} + resources: + {{- toYaml (index .Values "import-buffer").resources | nindent 4 }} {{- end }} \ No newline at end of file diff --git a/packages/opni-agent/opni-agent/charts/values.yaml b/packages/opni-agent/opni-agent/charts/values.yaml index 59da633a1a..8cc9aa22ea 100644 --- a/packages/opni-agent/opni-agent/charts/values.yaml +++ b/packages/opni-agent/opni-agent/charts/values.yaml @@ -102,6 +102,12 @@ kube-prometheus-stack: alertmanager: enabled: false # disable the default Alertmanager deployment +import-buffer: + enabled: false + resources: + requests: + storage: 10Gi + global: cattle: systemDefaultRegistry: "" diff --git a/pkg/gateway/delegate.go b/pkg/gateway/delegate.go index e85b35c52a..52764984e3 100644 --- a/pkg/gateway/delegate.go +++ b/pkg/gateway/delegate.go @@ -88,7 +88,6 @@ func (d *DelegateServer) Request(ctx context.Context, req *streamv1.DelegatedMes "target", targetId, "request", req.GetRequest().QualifiedMethodName(), ) - lg.Debug("delegating rpc request") target, ok := d.activeAgents[targetId] if ok { fwdResp := &totem.RPC{} diff --git a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go new file mode 100644 index 0000000000..9c2f82bbcb --- /dev/null +++ b/plugins/metrics/pkg/agent/buffer.go @@ -0,0 +1,227 @@ +package agent + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path" + "sync" + + "github.com/golang/snappy" + "github.com/google/uuid" +) + +var ErrBufferNotFound = fmt.Errorf("buffer not found") + +type ChunkBuffer interface { + // Add blocks until the value can be added to the buffer. + Add(context.Context, string, ChunkMetadata) error + + // Get blocks until a value can be retrieved from the buffer. + Get(context.Context, string) (ChunkMetadata, error) + + // Delete removes a buffer for the named task from the buffer. 
+ Delete(context.Context, string) error +} + +type memoryBuffer struct { + chanLocker sync.RWMutex + chunkChan map[string]chan ChunkMetadata +} + +func NewMemoryBuffer() ChunkBuffer { + return &memoryBuffer{ + chunkChan: make(map[string]chan ChunkMetadata), + } +} + +func (b *memoryBuffer) Add(_ context.Context, name string, meta ChunkMetadata) error { + b.chanLocker.RLock() + chunkChan, found := b.chunkChan[name] + b.chanLocker.RUnlock() + + if !found { + chunkChan = make(chan ChunkMetadata) + + b.chanLocker.Lock() + b.chunkChan[name] = chunkChan + b.chanLocker.Unlock() + } + + chunkChan <- meta + + return nil +} + +func (b *memoryBuffer) Get(ctx context.Context, name string) (ChunkMetadata, error) { + b.chanLocker.RLock() + chunkChan, found := b.chunkChan[name] + b.chanLocker.RUnlock() + + if !found { + return ChunkMetadata{}, ErrBufferNotFound + } + + select { + case <-ctx.Done(): + return ChunkMetadata{}, ctx.Err() + case meta := <-chunkChan: + return meta, nil + } +} + +func (b *memoryBuffer) Delete(_ context.Context, name string) error { + b.chanLocker.Lock() + delete(b.chunkChan, name) + b.chanLocker.Unlock() + + return nil +} + +type diskBuffer struct { + dir string + + diskWriteLock sync.Mutex + + chanLocker sync.RWMutex + chunkChans map[string]chan string +} + +func NewDiskBuffer(dir string) (ChunkBuffer, error) { + buffer := &diskBuffer{ + dir: dir, + chunkChans: make(map[string]chan string), + } + + if err := os.MkdirAll(buffer.dir, 0755); err != nil { + return nil, fmt.Errorf("could not create buffer directory: %w", err) + } + + if err := buffer.reconcileExistingChunks(); err != nil { + return nil, err + } + + return buffer, nil +} + +func (b *diskBuffer) reconcileExistingChunks() error { + entries, err := os.ReadDir(b.dir) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + + return fmt.Errorf("could not reconcile existing chunks: %w", err) + } + + for _, e := range entries { + if !e.IsDir() { + continue + } + + 
chunkChan := make(chan string, 100) + + b.chanLocker.Lock() + b.chunkChans[e.Name()] = chunkChan + b.chanLocker.Unlock() + + subBufferDir := path.Join(b.dir, e.Name()) + subEntries, err := os.ReadDir(subBufferDir) + if err != nil { + return fmt.Errorf("could not reconcile existing chunks: %w", err) + } + + for _, se := range subEntries { + chunkChan <- path.Join(subBufferDir, se.Name()) + } + } + return nil +} + +func (b *diskBuffer) Add(_ context.Context, name string, meta ChunkMetadata) error { + b.chanLocker.RLock() + chunkChan, found := b.chunkChans[name] + b.chanLocker.RUnlock() + + if !found { + chunkChan = make(chan string, 100) + + b.chanLocker.Lock() + b.chunkChans[name] = chunkChan + b.chanLocker.Unlock() + } + + filePath := path.Join(b.dir, name, uuid.New().String()) + + if err := os.MkdirAll(path.Dir(filePath), 0755); err != nil && !errors.Is(err, os.ErrExist) { + return fmt.Errorf("could not create buffer directory for target '%s': %w", name, err) + } + + uncompressed, err := json.Marshal(meta) + if err != nil { + return fmt.Errorf("could not marshal chunk for buffer: %w", err) + } + + compressed := snappy.Encode(nil, uncompressed) + + b.diskWriteLock.Lock() + if err := os.WriteFile(filePath, compressed, 0644); err != nil { + b.diskWriteLock.Unlock() + return fmt.Errorf("could not write chunk to buffer: %w", err) + } + b.diskWriteLock.Unlock() + + chunkChan <- filePath + + return nil +} + +func (b *diskBuffer) Get(ctx context.Context, name string) (ChunkMetadata, error) { + b.chanLocker.RLock() + chunkChan, found := b.chunkChans[name] + b.chanLocker.RUnlock() + + if !found { + return ChunkMetadata{}, ErrBufferNotFound + } + + select { + case <-ctx.Done(): + return ChunkMetadata{}, ctx.Err() + case path := <-chunkChan: + compressed, err := os.ReadFile(path) + if err != nil { + return ChunkMetadata{}, fmt.Errorf("could not read chunk from buffer: %w", err) + } + + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + return ChunkMetadata{}, fmt.Errorf("could not 
decompress chunk from buffer: %w", err) + } + + var meta ChunkMetadata + if err := json.Unmarshal(uncompressed, &meta); err != nil { + return ChunkMetadata{}, fmt.Errorf("could not unmarshal chunk from buffer: %w", err) + } + + if err := os.Remove(path); err != nil { + return ChunkMetadata{}, fmt.Errorf("could not remove chunk file from disk, data may linger on system longer than expected: %w", err) + } + + return meta, nil + } +} + +func (b *diskBuffer) Delete(_ context.Context, name string) error { + b.chanLocker.Lock() + delete(b.chunkChans, name) + b.chanLocker.Unlock() + + subBufferDir := path.Join(b.dir, name) + if err := os.RemoveAll(subBufferDir); err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("could not remove buffer directory: %w", err) + } + + return nil +} diff --git a/plugins/metrics/pkg/agent/node.go b/plugins/metrics/pkg/agent/node.go index dcc6fd27cd..655efa521a 100644 --- a/plugins/metrics/pkg/agent/node.go +++ b/plugins/metrics/pkg/agent/node.go @@ -3,7 +3,6 @@ package agent import ( "context" "fmt" - "net/http" "sort" "strings" "sync" @@ -68,7 +67,6 @@ func NewMetricsNode(ct health.ConditionTracker, lg *zap.SugaredLogger) *MetricsN targetRunner: NewTargetRunner(lg), } mn.conditions.AddListener(mn.sendHealthUpdate) - mn.targetRunner.SetRemoteReaderClient(NewRemoteReader(&http.Client{})) // FIXME: this is a hack, update the old sync code to use delegates instead mn.conditions.AddListener(func(key string) { diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index 5f58d7cea9..4d427172a0 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -3,7 +3,10 @@ package agent import ( "context" "encoding/json" + "errors" "fmt" + "net/http" + "os" "strings" "sync" "time" @@ -17,6 +20,7 @@ import ( "github.com/rancher/opni/pkg/util" "github.com/rancher/opni/plugins/metrics/apis/remoteread" "github.com/rancher/opni/plugins/metrics/apis/remotewrite" + 
"github.com/samber/lo" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc/codes" @@ -25,6 +29,8 @@ import ( var TimeDeltaMillis = time.Minute.Milliseconds() +const BufferDir = "/var/lib/opni-agent/import-buffer" + func toLabelMatchers(rrLabelMatchers []*remoteread.LabelMatcher) []*prompb.LabelMatcher { pbLabelMatchers := make([]*prompb.LabelMatcher, 0, len(rrLabelMatchers)) @@ -54,16 +60,6 @@ func toLabelMatchers(rrLabelMatchers []*remoteread.LabelMatcher) []*prompb.Label return pbLabelMatchers } -func dereferenceResultTimeseries(in []*prompb.TimeSeries) []prompb.TimeSeries { - dereferenced := make([]prompb.TimeSeries, 0, len(in)) - - for _, ref := range in { - dereferenced = append(dereferenced, *ref) - } - - return dereferenced -} - func getMessageFromTaskLogs(logs []*corev1.LogEntry) string { if len(logs) == 0 { return "" @@ -79,11 +75,24 @@ func getMessageFromTaskLogs(logs []*corev1.LogEntry) string { return "" } +func dirExists(p string) bool { + _, err := os.Stat(p) + return err == nil +} + type TargetRunMetadata struct { Target *remoteread.Target Query *remoteread.Query } +type ChunkMetadata struct { + Query *prompb.Query + WriteChunk *prompb.WriteRequest + + // ProgressRatio is the ratio of the progress of this chunk to the total progress of the original request. 
+ ProgressRatio float64 +} + type targetStore struct { innerMu sync.RWMutex inner map[string]*corev1.TaskStatus @@ -130,15 +139,30 @@ func (store *targetStore) ListKeys(_ context.Context, prefix string) ([]string, type taskRunner struct { remoteWriteClient clients.Locker[remotewrite.RemoteWriteClient] - remoteReaderMu sync.RWMutex - remoteReader RemoteReader - backoffPolicy backoff.Policy logger *zap.SugaredLogger + + buffer ChunkBuffer } -func newTaskRunner(logger *zap.SugaredLogger) *taskRunner { +func newTaskRunner(logger *zap.SugaredLogger) (*taskRunner, error) { + logger = logger.Named("task-runner") + + var buffer ChunkBuffer + var err error + + // if buffer volume is not mounted use a simple blocking buffer + if !dirExists(BufferDir) { + logger.Infof("buffer not enabled, using in memory buffer") + buffer = NewMemoryBuffer() + } else { + buffer, err = NewDiskBuffer(BufferDir) + if err != nil { + return nil, fmt.Errorf("could not create buffer: %w", err) + } + } + return &taskRunner{ backoffPolicy: backoff.Exponential( backoff.WithMaxRetries(0), @@ -146,21 +170,15 @@ func newTaskRunner(logger *zap.SugaredLogger) *taskRunner { backoff.WithMaxInterval(5*time.Minute), backoff.WithMultiplier(1.1), ), - logger: logger.Named("task-runner"), - } + logger: logger, + buffer: buffer, + }, nil } func (tr *taskRunner) SetRemoteWriteClient(client clients.Locker[remotewrite.RemoteWriteClient]) { tr.remoteWriteClient = client } -func (tr *taskRunner) SetRemoteReaderClient(client RemoteReader) { - tr.remoteReaderMu.Lock() - defer tr.remoteReaderMu.Unlock() - - tr.remoteReader = client -} - func (tr *taskRunner) OnTaskPending(_ context.Context, _ task.ActiveTask) error { return nil } @@ -203,10 +221,70 @@ func (tr *taskRunner) doPush(ctx context.Context, writeRequest *prompb.WriteRequ } } -func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveTask) error { +func (tr *taskRunner) runPush(ctx context.Context, stopChan chan struct{}, activeTask 
task.ActiveTask, run *TargetRunMetadata) error { + progress := &corev1.Progress{ + Current: 0, + Total: uint64(run.Query.EndTimestamp.AsTime().UnixMilli() - run.Query.StartTimestamp.AsTime().UnixMilli()), + } + activeTask.SetProgress(progress) + + for progress.Current < progress.Total { + select { + case <-ctx.Done(): + return nil + case <-stopChan: + return nil + default: // continue pushing + } + + meta, getErr := tr.buffer.Get(ctx, run.Target.Meta.Name) + if getErr != nil { + if !errors.Is(getErr, ErrBufferNotFound) { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not get chunk from buffer: %s", getErr.Error())) + } + continue + } + + activeTask.AddLogEntry(zapcore.DebugLevel, "received chunk from buffer") + + if err := tr.doPush(ctx, meta.WriteChunk); err != nil { + close(stopChan) + return err + } + + progressDelta := uint64(float64(meta.Query.EndTimestampMs-meta.Query.StartTimestampMs) * meta.ProgressRatio) + + progress.Current += progressDelta + activeTask.SetProgress(progress) + } + + return nil +} + +func (tr *taskRunner) doRead(ctx context.Context, reader RemoteReader, run *TargetRunMetadata, readRequest *prompb.ReadRequest) (*prompb.ReadResponse, error) { + expbackoff := tr.backoffPolicy.Start(ctx) + + for { + select { + case <-expbackoff.Done(): + return nil, ctx.Err() + case <-expbackoff.Next(): + readResponse, err := reader.Read(context.Background(), run.Target.Spec.Endpoint, readRequest) + + if err == nil { + return readResponse, nil + } + + tr.logger.With( + zap.Error(err), + "endpoint", run.Target.Spec.Endpoint, + ).Warn("failed to read from target endpoint, retrying...") + } + } +} + +func (tr *taskRunner) runRead(ctx context.Context, stopChan chan struct{}, activeTask task.ActiveTask, run *TargetRunMetadata) error { limit := util.DefaultWriteLimit() - run := &TargetRunMetadata{} - activeTask.LoadTaskMetadata(run) labelMatchers := toLabelMatchers(run.Query.Matchers) @@ -214,23 +292,15 @@ func (tr *taskRunner) OnTaskRunning(ctx 
context.Context, activeTask task.ActiveT nextStart := run.Query.StartTimestamp.AsTime().UnixMilli() nextEnd := nextStart - progressDelta := nextStart - - progress := &corev1.Progress{ - Current: 0, - Total: uint64(importEnd - progressDelta), - } - - activeTask.SetProgress(progress) - - activeTask.AddLogEntry(zapcore.InfoLevel, "import running") + reader := NewRemoteReader(&http.Client{}) for nextStart < importEnd { select { case <-ctx.Done(): - activeTask.AddLogEntry(zapcore.InfoLevel, "import stopped") - return ctx.Err() - default: // continue with import + return nil + case <-stopChan: + return nil + default: // continue reading } nextStart = nextEnd @@ -254,53 +324,78 @@ func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveT }, } - readResponse, err := tr.remoteReader.Read(context.Background(), run.Target.Spec.Endpoint, readRequest) + readResponse, err := tr.doRead(ctx, reader, run, readRequest) + if err != nil { + close(stopChan) + return err + } + writeRequest := &prompb.WriteRequest{ + Timeseries: lo.Map(readResponse.Results[0].GetTimeseries(), func(t *prompb.TimeSeries, _ int) prompb.TimeSeries { + return lo.FromPtr(t) + }), + Metadata: []prompb.MetricMetadata{}, + } + + var chunks []*prompb.WriteRequest + + chunks, err = util.SplitChunksWithLimit(writeRequest, limit) if err != nil { - activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("failed to read from target endpoint: %s", err)) - return fmt.Errorf("failed to read from target endpoint: %w", err) + close(stopChan) + return err } - for _, result := range readResponse.Results { - if len(result.Timeseries) == 0 { - continue - } + activeTask.AddLogEntry(zapcore.InfoLevel, fmt.Sprintf("split request into %d chunks", len(chunks))) - writeRequest := prompb.WriteRequest{ - Timeseries: dereferenceResultTimeseries(result.Timeseries), + lo.ForEach(chunks, func(chunk *prompb.WriteRequest, i int) { + if err := tr.buffer.Add(ctx, run.Target.Meta.Name, ChunkMetadata{ + Query: 
readRequest.Queries[0], + WriteChunk: chunk, + ProgressRatio: 1.0 / float64(len(chunks)), + }); err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not add chunk to buffer: %s", err.Error())) } + }) + } - chunkedRequests, err := util.SplitChunksWithLimit(&writeRequest, limit) - if err != nil { - return fmt.Errorf("failed to chunk request: %w", err) - } + return nil +} - if len(chunkedRequests) > 1 { - activeTask.AddLogEntry(zapcore.DebugLevel, fmt.Sprintf("split write request into %d chunks", len(chunkedRequests))) - } +func (tr *taskRunner) OnTaskRunning(ctx context.Context, activeTask task.ActiveTask) error { + run := &TargetRunMetadata{} + activeTask.LoadTaskMetadata(run) - for i, request := range chunkedRequests { - if err := tr.doPush(ctx, request); err != nil { - activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) - return err - } - activeTask.AddLogEntry(zapcore.DebugLevel, fmt.Sprintf("pushed chunk %d of %d", i+1, len(chunkedRequests))) - } + stopChan := make(chan struct{}) - progress.Current = uint64(nextEnd - progressDelta) - activeTask.SetProgress(progress) - } + var eg util.MultiErrGroup + + eg.Go(func() error { + return tr.runRead(ctx, stopChan, activeTask, run) + }) + + eg.Go(func() error { + return tr.runPush(ctx, stopChan, activeTask, run) + }) + + eg.Wait() + + if err := eg.Error(); err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, err.Error()) + return err } - return nil + return ctx.Err() } -func (tr *taskRunner) OnTaskCompleted(_ context.Context, activeTask task.ActiveTask, state task.State, _ ...any) { +func (tr *taskRunner) OnTaskCompleted(ctx context.Context, activeTask task.ActiveTask, state task.State, _ ...any) { + if err := tr.buffer.Delete(ctx, activeTask.TaskId()); err != nil { + activeTask.AddLogEntry(zapcore.ErrorLevel, fmt.Sprintf("could not delete task from buffer: %s", err.Error())) + } + switch state { case task.StateCompleted: activeTask.AddLogEntry(zapcore.InfoLevel, "completed") case 
task.StateFailed: - // a log will be added in OnTaskRunning for failed imports so we don't need to log anything here case task.StateCanceled: activeTask.AddLogEntry(zapcore.WarnLevel, "canceled") } @@ -311,7 +406,6 @@ type TargetRunner interface { Stop(name string) error GetStatus(name string) (*remoteread.TargetStatus, error) SetRemoteWriteClient(client clients.Locker[remotewrite.RemoteWriteClient]) - SetRemoteReaderClient(client RemoteReader) } type taskingTargetRunner struct { @@ -328,7 +422,10 @@ func NewTargetRunner(logger *zap.SugaredLogger) TargetRunner { inner: make(map[string]*corev1.TaskStatus), } - runner := newTaskRunner(logger) + runner, err := newTaskRunner(logger) + if err != nil { + panic(fmt.Sprintf("bug: failed to create target task runner: %s", err)) + } controller, err := task.NewController(context.Background(), "target-runner", store, runner) if err != nil { @@ -433,11 +530,13 @@ func (runner *taskingTargetRunner) GetStatus(name string) (*remoteread.TargetSta state = remoteread.TargetState_Canceled } - return &remoteread.TargetStatus{ + status := &remoteread.TargetStatus{ Progress: statusProgress, Message: getMessageFromTaskLogs(taskStatus.Logs), State: state, - }, nil + } + + return status, nil } func (runner *taskingTargetRunner) SetRemoteWriteClient(client clients.Locker[remotewrite.RemoteWriteClient]) { @@ -446,10 +545,3 @@ func (runner *taskingTargetRunner) SetRemoteWriteClient(client clients.Locker[re runner.runner.SetRemoteWriteClient(client) } - -func (runner *taskingTargetRunner) SetRemoteReaderClient(client RemoteReader) { - runner.runnerMu.Lock() - defer runner.runnerMu.Unlock() - - runner.runner.SetRemoteReaderClient(client) -} diff --git a/test/plugins/metrics/import_test.go b/test/plugins/metrics/import_test.go index a0a4c0d92c..cf00249d8d 100644 --- a/test/plugins/metrics/import_test.go +++ b/test/plugins/metrics/import_test.go @@ -7,12 +7,8 @@ import ( "os" "time" - "github.com/gogo/protobuf/proto" - "github.com/golang/snappy" . 
"github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/prometheus/prometheus/prompb" - "github.com/samber/lo" monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" monitoringclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned" @@ -34,53 +30,6 @@ import ( const testNamespace = "test-ns" -// blockingHttpHandler is only here to keep a remote reader connection open to keep it running indefinitely -type blockingHttpHandler struct { -} - -func (h blockingHttpHandler) ServeHTTP(w http.ResponseWriter, request *http.Request) { - switch request.URL.Path { - case "/block": - // select {} will block forever without using CPU. - select {} - case "/large": - uncompressed, err := proto.Marshal(&prompb.ReadResponse{ - Results: []*prompb.QueryResult{ - { - Timeseries: []*prompb.TimeSeries{ - { - Labels: []prompb.Label{ - { - Name: "__name__", - Value: "test_metric", - }, - }, - // Samples: lo.Map(make([]prompb.Sample, 4194304), func(sample prompb.Sample, i int) prompb.Sample { - Samples: lo.Map(make([]prompb.Sample, 65536), func(sample prompb.Sample, i int) prompb.Sample { - sample.Timestamp = time.Now().UnixMilli() - return sample - }), - }, - }, - }, - }, - }) - if err != nil { - panic(err) - } - - compressed := snappy.Encode(nil, uncompressed) - - _, err = w.Write(compressed) - if err != nil { - panic(err) - } - case "/health": - default: - panic(fmt.Sprintf("unsupported endpoint: %s", request.URL.Path)) - } -} - var _ = Describe("Remote Read Import", Ordered, Label("integration", "slow"), func() { ctx := context.Background() agentId := "import-agent" @@ -212,7 +161,7 @@ var _ = Describe("Remote Read Import", Ordered, Label("integration", "slow"), fu server := http.Server{ Addr: addr, - Handler: blockingHttpHandler{}, + Handler: NewReadHandler(), } go func() { diff --git a/test/plugins/metrics/runner_test.go b/test/plugins/metrics/runner_test.go index 4f99783b20..55e29f2c21 100644 --- 
a/test/plugins/metrics/runner_test.go +++ b/test/plugins/metrics/runner_test.go @@ -3,13 +3,14 @@ package metrics_test import ( "fmt" "math" + "net/http" "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/prometheus/prometheus/prompb" "github.com/rancher/opni/pkg/clients" "github.com/rancher/opni/pkg/logger" + "github.com/rancher/opni/pkg/test/freeport" "github.com/rancher/opni/plugins/metrics/apis/remoteread" "github.com/rancher/opni/plugins/metrics/apis/remotewrite" "github.com/rancher/opni/plugins/metrics/pkg/agent" @@ -17,57 +18,13 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -func newRespondingReader() *mockRemoteReader { - return &mockRemoteReader{ - Responses: []*prompb.ReadResponse{ - { - Results: []*prompb.QueryResult{ - { - Timeseries: []*prompb.TimeSeries{ - { - Labels: []prompb.Label{}, - Samples: []prompb.Sample{ - { - Value: 100, - Timestamp: 100, - }, - }, - Exemplars: []prompb.Exemplar{ - { - Labels: nil, - Value: 0, - Timestamp: 0, - }, - }, - }, - }, - }, - }, - }, - }, - } -} - var _ = Describe("Target Runner", Ordered, Label("unit"), func() { var ( - failingReader = &mockRemoteReader{ - Error: fmt.Errorf("failed"), - } - - runner agent.TargetRunner - + // todo: set up a mock prometheus endpoint since we no longer handle readers + addr string + runner agent.TargetRunner writerClient *mockRemoteWriteClient - - target = &remoteread.Target{ - Meta: &remoteread.TargetMeta{ - Name: "test", - ClusterId: "00000-00000", - }, - Spec: &remoteread.TargetSpec{ - Endpoint: "http://127.0.0.1:9090/api/v1/read", - }, - Status: nil, - } + target *remoteread.Target query = &remoteread.Query{ StartTimestamp: ×tamppb.Timestamp{}, @@ -78,6 +35,26 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { } ) + BeforeAll(func() { + By("adding a remote read server") + addr = fmt.Sprintf("127.0.0.1:%d", freeport.GetFreePort()) + + server := http.Server{ + Addr: addr, + Handler: NewReadHandler(), + } + + go 
func() { + server.ListenAndServe() + }() + DeferCleanup(server.Close) + + Eventually(func() error { + _, err := (&http.Client{}).Get(fmt.Sprintf("http://%s/health", addr)) + return err + }).Should(Not(HaveOccurred())) + }) + BeforeEach(func() { lg := logger.NewPluginLogger().Named("test-runner") @@ -87,6 +64,18 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { runner.SetRemoteWriteClient(clients.NewLocker(nil, func(connInterface grpc.ClientConnInterface) remotewrite.RemoteWriteClient { return writerClient })) + + target = &remoteread.Target{ + Meta: &remoteread.TargetMeta{ + Name: "testTarget", + ClusterId: "testCluster", + }, + Spec: &remoteread.TargetSpec{ + Endpoint: "http://127.0.0.1:9090/api/v1/read", + }, + Status: nil, + } + }) When("target status is not running", func() { @@ -107,37 +96,24 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { }) When("target runner cannot reach target endpoint", func() { - It("should fail", func() { - runner.SetRemoteReaderClient(failingReader) + It("should retry until success", func() { + target.Spec.Endpoint = "http://i.do.not.exist:9090/api/v1/read" err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) - var status *remoteread.TargetStatus - Eventually(func() remoteread.TargetState { - status, _ = runner.GetStatus(target.Meta.Name) - return status.State - }).Should(Equal(remoteread.TargetState_Failed)) + status, err := runner.GetStatus(target.Meta.Name) + Expect(err).NotTo(HaveOccurred()) - expected := &remoteread.TargetStatus{ - Progress: &remoteread.TargetProgress{ - StartTimestamp: ×tamppb.Timestamp{}, - LastReadTimestamp: ×tamppb.Timestamp{}, - EndTimestamp: ×tamppb.Timestamp{ - Seconds: agent.TimeDeltaMillis / 2 / time.Second.Milliseconds(), - }, - }, - Message: "failed to read from target endpoint: failed", - State: remoteread.TargetState_Failed, - } + time.Sleep(time.Second) - AssertTargetStatus(expected, status) + 
Expect(status.State).To(Equal(remoteread.TargetState_Running)) }) }) - When("editing and restarting failed import", func() { - It("should succeed", func() { - runner.SetRemoteReaderClient(newRespondingReader()) + When("target runner can reach target endpoint", func() { + It("should complete", func() { + target.Spec.Endpoint = fmt.Sprintf("http://%s/small", addr) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) @@ -148,7 +124,6 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { return status.State }).Should(Equal(remoteread.TargetState_Completed)) - // log message timing is not guaranteed Eventually(func() string { status, _ = runner.GetStatus(target.Meta.Name) return status.Message @@ -171,11 +146,9 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { AssertTargetStatus(expected, status) Expect(len(writerClient.Payloads)).To(Equal(1)) }) - }) - When("target runner can reach target endpoint", func() { - It("should complete", func() { - runner.SetRemoteReaderClient(newRespondingReader()) + It("should complete with large payload", func() { + target.Spec.Endpoint = fmt.Sprintf("http://%s/large", addr) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) @@ -184,7 +157,7 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { Eventually(func() remoteread.TargetState { status, _ = runner.GetStatus(target.Meta.Name) return status.State - }).Should(Equal(remoteread.TargetState_Completed)) + }, "5s").Should(Equal(remoteread.TargetState_Completed)) Eventually(func() string { status, _ = runner.GetStatus(target.Meta.Name) @@ -206,14 +179,14 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { } AssertTargetStatus(expected, status) - Expect(len(writerClient.Payloads)).To(Equal(1)) + Expect(len(writerClient.Payloads)).To(Equal(4)) }) }) When("target is stopped during push", func() { It("should be marked as stopped", func() { - // new reader with the longest possible delay - 
runner.SetRemoteReaderClient(newRespondingReader()) + target.Spec.Endpoint = fmt.Sprintf("http://%s/small", addr) + runner.SetRemoteWriteClient(clients.NewLocker(nil, func(connInterface grpc.ClientConnInterface) remotewrite.RemoteWriteClient { return &mockRemoteWriteClient{ Delay: math.MaxInt64, @@ -242,12 +215,13 @@ var _ = Describe("Target Runner", Ordered, Label("unit"), func() { When("target pushes with unrecoverable error", func() { It("should fail", func() { + target.Spec.Endpoint = fmt.Sprintf("http://%s/small", addr) + runner.SetRemoteWriteClient(clients.NewLocker(nil, func(connInterface grpc.ClientConnInterface) remotewrite.RemoteWriteClient { return &mockRemoteWriteClient{ - Error: fmt.Errorf("context canceled"), + Error: fmt.Errorf("some unrecoverable error"), } })) - runner.SetRemoteReaderClient(newRespondingReader()) err := runner.Start(target, query) Expect(err).NotTo(HaveOccurred()) diff --git a/test/plugins/metrics/utils_test.go b/test/plugins/metrics/utils_test.go index 7d8bf88989..3a66aaf9d5 100644 --- a/test/plugins/metrics/utils_test.go +++ b/test/plugins/metrics/utils_test.go @@ -3,14 +3,20 @@ package metrics_test import ( "context" "fmt" + "net/http" "time" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "github.com/prometheus/prometheus/prompb" + "github.com/rancher/opni/pkg/logger" "github.com/rancher/opni/plugins/metrics/apis/remoteread" "github.com/rancher/opni/plugins/metrics/apis/remotewrite" + "github.com/samber/lo" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" ) @@ -34,31 +40,6 @@ func AssertTargetStatus(expected *remoteread.TargetStatus, actual *remoteread.Ta AssertTargetProgress(expected.Progress, actual.Progress) } -type mockRemoteReader struct { - Error error - Responses []*prompb.ReadResponse - - // Delay is the time.Duration to wait before returning the next response. - Delay time.Duration - i int -} - -func (reader *mockRemoteReader) Read(_ context.Context, _ string, _ *prompb.ReadRequest) (*prompb.ReadResponse, error) { - if reader.Error != nil { - return nil, reader.Error - } - - if reader.i >= len(reader.Responses) { - return nil, fmt.Errorf("all reader responses have alaredy been consumed") - } - - time.Sleep(reader.Delay) - - response := reader.Responses[reader.i] - reader.i++ - return response, reader.Error -} - type mockRemoteWriteClient struct { Error error Payloads []*cortexpb.WriteRequest @@ -85,3 +66,88 @@ func (client *mockRemoteWriteClient) Push(ctx context.Context, in *cortexpb.Writ func (client *mockRemoteWriteClient) SyncRules(_ context.Context, _ *remotewrite.Payload, _ ...grpc.CallOption) (*emptypb.Empty, error) { return &emptypb.Empty{}, nil } + +// readHttpHandler is only here to keep a remote reader connection open to keep it running indefinitely +type readHttpHandler struct { + lg *zap.SugaredLogger +} + +func NewReadHandler() http.Handler { + return readHttpHandler{ + lg: logger.New( + logger.WithLogLevel(zap.DebugLevel), + ).Named("read-handler"), + } +} + +func (h readHttpHandler) writeReadResponse(w http.ResponseWriter, r *prompb.ReadResponse) { + uncompressed, err := proto.Marshal(r) + if err != nil { + panic(err) + } + + compressed := 
snappy.Encode(nil, uncompressed) + + _, err = w.Write(compressed) + if err != nil { + panic(err) + } +} + +func (h readHttpHandler) ServeHTTP(w http.ResponseWriter, request *http.Request) { + switch request.URL.Path { + case "/block": + // select {} will block forever without using CPU. + select {} + case "/large": + h.writeReadResponse(w, &prompb.ReadResponse{ + Results: []*prompb.QueryResult{ + { + Timeseries: []*prompb.TimeSeries{ + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "test_metric", + }, + }, + // Samples: lo.Map(make([]prompb.Sample, 4194304), func(sample prompb.Sample, i int) prompb.Sample { + Samples: lo.Map(make([]prompb.Sample, 65536), func(sample prompb.Sample, i int) prompb.Sample { + sample.Timestamp = time.Now().UnixMilli() + return sample + }), + }, + }, + }, + }, + }) + case "/small": + h.writeReadResponse(w, &prompb.ReadResponse{ + Results: []*prompb.QueryResult{ + { + Timeseries: []*prompb.TimeSeries{ + { + Labels: []prompb.Label{}, + Samples: []prompb.Sample{ + { + Value: 100, + Timestamp: 100, + }, + }, + Exemplars: []prompb.Exemplar{ + { + Labels: nil, + Value: 0, + Timestamp: 0, + }, + }, + }, + }, + }, + }, + }) + case "/health": + default: + panic(fmt.Sprintf("unsupported endpoint: %s", request.URL.Path)) + } +}