diff --git a/core/services/llo/observation/cache.go b/core/services/llo/observation/cache.go
index 8446020294e..70812cbc4ee 100644
--- a/core/services/llo/observation/cache.go
+++ b/core/services/llo/observation/cache.go
@@ -29,6 +29,17 @@ var (
 		},
 		[]string{"streamID", "reason"},
 	)
+	promCacheHitEntryAgeMs = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "llo",
+		Subsystem: "datasource",
+		Name:      "cache_hit_entry_age_ms",
+		Help:      "Wall time since the cache entry was written when a plugin read hits the cache (staleness proxy)",
+		Buckets: []float64{
+			0.5, 1, 2, 5, 10, 25, 50, 100, 250, 500, 1000,
+		},
+	},
+		[]string{"streamID"},
+	)
 )
 
 // StreamValueCache is used by dataSource to decouple the read/write paths for stream values.
@@ -60,6 +71,7 @@ type Cache struct {
 type item struct {
 	value     llo.StreamValue
 	expiresAt time.Time
+	writtenAt time.Time // wall clock at Add/AddMany; used for cache_hit_entry_age_ms
 }
 
 type cacheOutcome string
@@ -73,6 +85,7 @@ const (
 type metricEvent struct {
 	id           llotypes.StreamID
 	cacheOutcome cacheOutcome
+	ageMs        float64 // valid when cacheOutcomeHit and writtenAt was set on the item
 }
 
 // NewCache creates a new cache.
@@ -112,24 +125,26 @@ func NewCache(cleanupInterval time.Duration) *Cache {
 // Add adds a stream value to the cache.
 func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, ttl time.Duration) {
+	now := time.Now()
 	var expiresAt time.Time
 	if ttl > 0 {
-		expiresAt = time.Now().Add(ttl)
+		expiresAt = now.Add(ttl)
 	}
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	c.values[id] = item{value: value, expiresAt: expiresAt}
+	c.values[id] = item{value: value, expiresAt: expiresAt, writtenAt: now}
 }
 
 func (c *Cache) AddMany(values map[llotypes.StreamID]llo.StreamValue, ttl time.Duration) {
+	now := time.Now()
 	var expiresAt time.Time
 	if ttl > 0 {
-		expiresAt = time.Now().Add(ttl)
+		expiresAt = now.Add(ttl)
 	}
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	for id, value := range values {
-		c.values[id] = item{value: value, expiresAt: expiresAt}
+		c.values[id] = item{value: value, expiresAt: expiresAt, writtenAt: now}
 	}
 }
 
@@ -152,7 +167,11 @@ func (c *Cache) UpdateStreamValues(streamValues llo.StreamValues) {
 			streamValues[id] = nil
 			continue
 		}
-		events = append(events, metricEvent{id: id, cacheOutcome: cacheOutcomeHit})
+		ageMs := -1.0
+		if !itm.writtenAt.IsZero() {
+			ageMs = float64(now.Sub(itm.writtenAt).Milliseconds())
+		}
+		events = append(events, metricEvent{id: id, cacheOutcome: cacheOutcomeHit, ageMs: ageMs})
 		streamValues[id] = itm.value
 	}
 	c.mu.RUnlock()
@@ -193,6 +212,9 @@ func (c *Cache) updateMetrics() {
 		idStr := strconv.FormatUint(uint64(e.id), 10)
 		if e.cacheOutcome == cacheOutcomeHit {
 			promCacheHitCount.WithLabelValues(idStr).Inc()
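+			// ageMs is -1 when the cached item had no writtenAt (see UpdateStreamValues); skip the histogram for those.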
+			if e.ageMs >= 0 {
+				promCacheHitEntryAgeMs.WithLabelValues(idStr).Observe(e.ageMs)
+			}
 		} else {
 			promCacheMissCount.WithLabelValues(idStr, string(e.cacheOutcome)).Inc()
 		}
diff --git a/core/services/llo/observation/cache_test.go b/core/services/llo/observation/cache_test.go
index 6c07d757343..501ecd66fed 100644
--- a/core/services/llo/observation/cache_test.go
+++ b/core/services/llo/observation/cache_test.go
@@ -7,6 +7,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+	io_prometheus_client "github.com/prometheus/client_model/go"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -188,6 +190,30 @@ func TestCache_UpdateStreamValues(t *testing.T) {
 	})
 }
 
+func TestCache_UpdateStreamValues_RecordsHitEntryAge(t *testing.T) {
+	promCacheHitEntryAgeMs.Reset()
+	promCacheHitCount.Reset()
+
+	cache := NewCache(0)
+	defer cache.Close()
+	cache.AddMany(map[llotypes.StreamID]llo.StreamValue{
+		1: &mockStreamValue{value: []byte{1}},
+	}, time.Hour)
+
+	streamValues := llo.StreamValues{1: nil}
+	cache.UpdateStreamValues(streamValues)
+
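+	// Hit metrics are recorded asynchronously after UpdateStreamValues, so poll until the histogram sample lands.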
+	var m io_prometheus_client.Metric
+	require.Eventually(t, func() bool {
+		hist := promCacheHitEntryAgeMs.WithLabelValues("1").(prometheus.Metric)
+		if err := hist.Write(&m); err != nil {
+			return false
+		}
+		return m.GetHistogram().GetSampleCount() >= 1
+	}, time.Second, 5*time.Millisecond)
+	assert.GreaterOrEqual(t, m.GetHistogram().GetSampleSum(), 0.0)
+}
+
 func TestCache_Add_Get(t *testing.T) {
 	tests := []struct {
 		name string
diff --git a/core/services/llo/observation/data_source.go b/core/services/llo/observation/data_source.go
index 6681377d404..22982d1b90f 100644
--- a/core/services/llo/observation/data_source.go
+++ b/core/services/llo/observation/data_source.go
@@ -17,19 +17,72 @@ import (
 	"github.com/smartcontractkit/chainlink-common/pkg/services"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
-	llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
 	"github.com/smartcontractkit/chainlink-data-streams/llo"
 
 	"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
 	"github.com/smartcontractkit/chainlink/v2/core/services/streams"
 )
+
+// Observation cache and loop tuning (all durations scale with observationTimeout T from the plugin Observe deadline).
+//
+// Invariants:
+//   - cacheEntryTTL(T) = cacheTTLMultiplier·T — how long successful values remain valid after cache write.
+//   - staleRefreshSkipThreshold(T) = (staleRefreshRemainingNumerator/Denominator)·T — a stream is not a refresh driver
+//     while time.Until(expiresAt) is strictly greater than this (see buildStreamsRefreshPlan). A larger threshold
+//     makes that inequality fail sooner as TTL decays, so the stream becomes a refresh driver earlier after each write
+//     (higher freshness, more pipeline work); a smaller threshold lengthens the no-driver interval (staler reads, less load).
+//   - Keep staleRefreshSkipThreshold(T)+observationLoopPacing(T) < cacheEntryTTL(T) (same T throughout). With
+//     num/den = 8/5 and default pacing = T/10, (8/5+1/10)·T = 1.7·T < 2·T.
+//
+// Example timings for observationTimeout T = 250ms (cacheTTLMultiplier=2, pacing divisor=10, staleRefresh num/den = 8/5):
+//   - cacheEntryTTL = 2·T = 500ms — TTL applied on successful per-pipeline-group AddMany writes.
+//   - staleRefreshSkipThreshold = (8/5)·T = 400ms — a stream in the plugin scope is not a refresh driver while time.Until(expiresAt) > 400ms.
+//   - observationLoopPacing = T/10 = 25ms (≥ observationLoopPacingMin and ≤ T/2) — minimum delay between loop iterations after the first (plugin Observe may wake the loop earlier; see loopWakeCh).
+//   - per-iteration context uses WithTimeout(..., T) = 250ms — ceiling on wall time for one observation loop iteration (pipeline workers run in parallel under that deadline).
+const (
+	cacheTTLMultiplier                     = 2
+	staleRefreshRemainingNumerator   int64 = 8
+	staleRefreshRemainingDenominator int64 = 5
+
+	observationLoopPacingMin     = 10 * time.Millisecond
+	observationLoopPacingDivisor = 10 // pacing default = T/10, capped below
+)
+
+func cacheEntryTTL(observationTimeout time.Duration) time.Duration {
+	return time.Duration(cacheTTLMultiplier) * observationTimeout
+}
+
+// staleRefreshSkipThreshold returns (staleRefreshRemainingNumerator/staleRefreshRemainingDenominator)·T.
+// buildStreamsRefreshPlan treats a cached stream as still fresh (not a refresh driver) while time.Until(expiresAt)
+// is strictly greater than this value. A larger fraction (e.g. higher numerator) raises the threshold, so the stream
+// becomes a refresh driver again sooner after each successful write (more pipeline work, fresher cache entries).
+func staleRefreshSkipThreshold(observationTimeout time.Duration) time.Duration {
+	return (time.Duration(staleRefreshRemainingNumerator) * observationTimeout) / time.Duration(staleRefreshRemainingDenominator)
+}
+
+// observationLoopPacing returns the minimum time between observation loop iterations to cap CPU while
+// staying responsive relative to T. Scales with T, clamped to [observationLoopPacingMin, T/2].
+func observationLoopPacing(observationTimeout time.Duration) time.Duration {
+	if observationTimeout <= 0 {
+		return observationLoopPacingMin
+	}
+	p := observationTimeout / observationLoopPacingDivisor
+	maxP := observationTimeout / 2
+	if p < observationLoopPacingMin {
+		p = observationLoopPacingMin
+	}
+	if p > maxP {
+		p = maxP
+	}
+	return p
+}
+
 var (
 	promMissingStreamCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "llo",
 		Subsystem: "datasource",
 		Name:      "stream_missing_count",
-		Help:      "Number of times we tried to observe a stream, but it was missing",
+		Help:      "Number of times a stream had no pipeline in the registry (including refresh planning before Observe)",
 	},
 		[]string{"streamID"},
 	)
@@ -45,13 +98,21 @@ var (
 		Namespace: "llo",
 		Subsystem: "datasource",
 		Name:      "observation_loop_duration_ms",
-		Help:      "Duration of the observation loop",
+		Help:      "Wall time for one observation loop iteration (pacing excluded)",
 		Buckets: []float64{
 			10, 25, 50, 100, 250, 500, 750, 1000,
 		},
 	},
 		[]string{"configDigest"},
 	)
+	promObservationLoopWaitOutcome = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "llo",
+		Subsystem: "datasource",
+		Name:      "observation_loop_wait_outcome_count",
+		Help:      "How the observation loop ended its inter-iteration wait: timer (pacing), wake (plugin Observe hint), or shutdown",
+	},
+		[]string{"outcome"},
+	)
 )
 
 type ErrObservationFailed struct {
@@ -94,6 +155,9 @@ type dataSource struct {
 	observableStreamsMu sync.Mutex
 	observableStreams   *observableStreamValues
+
+	// loopWakeCh coalesces plugin Observe hints (buffer 1): the loop may skip pacing when the plugin is active.
+	loopWakeCh chan struct{}
 }
 
 func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataSource {
@@ -107,15 +171,28 @@ func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSour
 		t:                      t,
 		cache:                  NewCache(time.Minute),
 		observationLoopCloseCh: make(chan struct{}),
+		loopWakeCh:             make(chan struct{}, 1),
 	}
 }
 
-// Observe looks up all streams in the registry and populates a map of stream ID => value
+// signalObservationLoopWake notifies the background observation loop that the plugin called Observe (non-blocking,
+// coalesced). Safe when the loop is not running (no-op).
+func (d *dataSource) signalObservationLoopWake() {
+	if d.loopWakeCh == nil || !d.observationLoopStarted.Load() {
+		return
+	}
+	select {
+	case d.loopWakeCh <- struct{}{}:
+	default:
+	}
+}
+
+// Observe starts or refreshes the background observation loop for the plugin's stream set, then fills streamValues
+// from the in-memory cache (backed by pipeline observations registered for each stream ID).
 func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error {
 	// Observation loop logic
 	{
-		// Update the list of streams to observe for this config digest and set the timeout
-		// StreamValues needs a copy to avoid concurrent access
+		// setObservableStreams copies stream IDs and deadline into internal state (the plugin's map is not retained).
 		d.setObservableStreams(ctx, streamValues, opts)
 
 		if !d.observationLoopStarted.Load() {
@@ -123,6 +200,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
 			go d.startObservationLoop(loopStartedCh)
 			<-loopStartedCh
 		}
+		d.signalObservationLoopWake()
 	}
 
 	// Update stream values with the cached observations for all streams.
@@ -131,11 +209,13 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
 	return nil
 }
 
-// startObservationLoop continuously makes observations for the streams in this data source
-// caching them in memory making the Observe call duration and performance independent
-// of the underlying resources providing the observations.
-// Based on the expected maxObservationDuration determine the pace of the observation loop
-// and for how long to cache the observations.
+// startObservationLoop continuously runs pipeline observations for this data source and caches results in memory,
+// so the plugin Observe path stays fast regardless of adapter latency.
+// Loop pacing (observationLoopPacing), per-round deadline (observationTimeout), cache TTL, and stale-refresh
+// threshold (see package constants) are chosen so refresh tends to run before entries expire without busy-spinning.
+// Each iteration schedules one goroutine per pipeline that needs work; that worker calls Observe once per
+// stream ID in the intersection of p.StreamIDs() with the plugin's requested keys (see buildStreamsRefreshPlan), then
+// AddMany for that batch if every Observe in the worker succeeded. If any Observe fails, the worker skips AddMany for that pipeline.
 func (d *dataSource) startObservationLoop(loopStartedCh chan struct{}) {
 	// atomically set the observation loop started flag to true
 	// or return if it's already started
@@ -158,10 +238,6 @@ func (d *dataSource) startObservationLoop(loopStartedCh chan struct{}) {
 	}()
 
 	for {
-		if stopChanCtx.Err() != nil {
-			return
-		}
-
 		osv := d.getObservableStreams()
 		if osv == nil || len(osv.streamValues) == 0 {
 			// There is nothing to observe, exit and let the next Observe() call reinitialize the loop.
@@ -169,18 +245,50 @@
 			return
 		}
 
-		time.Sleep(osv.observationTimeout)
+		if !loopStarting {
+			// Pace the loop to bound CPU; plugin Observe can wake via loopWakeCh to skip sleep when data is needed sooner.
+			pacing := observationLoopPacing(osv.observationTimeout)
+			t := time.NewTimer(pacing)
+			select {
+			case <-stopChanCtx.Done():
+				promObservationLoopWaitOutcome.WithLabelValues("shutdown").Inc()
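+				// Stop the timer and drain its channel if it already fired, so the stale tick cannot leak into a later wait.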
+				if !t.Stop() {
+					select {
+					case <-t.C:
+					default:
+					}
+				}
+				return
+			case <-t.C:
+				promObservationLoopWaitOutcome.WithLabelValues("timer").Inc()
+			case <-d.loopWakeCh:
+				promObservationLoopWaitOutcome.WithLabelValues("wake").Inc()
+				if !t.Stop() {
+					select {
+					case <-t.C:
+					default:
+					}
+				}
+			}
+		}
+
+		if stopChanCtx.Err() != nil {
+			return
+		}
+
 		startTS := time.Now()
 		ctx, cancel := context.WithTimeout(stopChanCtx, osv.observationTimeout)
 		lggr := logger.With(d.lggr, "observationTimestamp", osv.opts.ObservationTimestamp(), "configDigest", osv.opts.ConfigDigest(), "seqNr", osv.opts.OutCtx().SeqNr)
 
+		var mu sync.Mutex
+		var wg sync.WaitGroup
+		var errs []ErrObservationFailed
+		successfulStreamIDs := make([]streams.StreamID, 0, len(osv.streamValues))
+		plan := d.buildStreamsRefreshPlan(osv.streamValues, osv.observationTimeout, lggr)
+		ttl := cacheEntryTTL(osv.observationTimeout)
+
 		if osv.opts.VerboseLogging() {
-			streamIDs := make([]streams.StreamID, 0, len(osv.streamValues))
-			for streamID := range osv.streamValues {
-				streamIDs = append(streamIDs, streamID)
-			}
-			sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] })
-			lggr = logger.With(lggr, "streamIDs", streamIDs)
+			lggr = logger.With(lggr, "staleStreamIDs", plan.streamIDsToRefresh)
 			lggr.Debugw("Observing streams")
 		}
 
@@ -189,8 +297,9 @@ func (d *dataSource) startObservationLoop(loopStartedCh chan struct{}) {
 		{
 			// Size needs to accommodate the max number of telemetry events that could be generated
 			// Standard case might be about 3 bridge requests per spec and one stream<=>spec
-			// Overallocate for safety (to avoid dropping packets)
-			telemCh = d.t.MakeObservationScopedTelemetryCh(osv.opts, 10*len(osv.streamValues))
+			// Overallocate for safety (to avoid dropping packets). Sized from stale driver count; total Observe
+			// calls can be higher when one pipeline has several plugin-requested streams (same worker, same iteration).
+			telemCh = d.t.MakeObservationScopedTelemetryCh(osv.opts, 10*len(plan.streamIDsToRefresh))
 			if telemCh != nil {
 				if d.t.CaptureEATelemetry() {
 					ctx = pipeline.WithTelemetryCh(ctx, telemCh)
@@ -201,124 +310,170 @@
 			}
 		}
 
-		var mu sync.Mutex
-		successfulStreamIDs := make([]streams.StreamID, 0, len(osv.streamValues))
-		observedValues := make(map[streams.StreamID]llo.StreamValue, len(osv.streamValues))
-		var errs []ErrObservationFailed
-
-		var wg sync.WaitGroup
 		oc := NewObservationContext(lggr, d.registry, d.t)
-
-		streamsToRefresh := d.getStreamsToRefresh(osv.streamValues, osv.observationTimeout)
-
-		for streamID := range streamsToRefresh {
+		for p := range plan.groups {
 			wg.Add(1)
-			go func(streamID llotypes.StreamID) {
+			go func(streamIDs []streams.StreamID) {
 				defer wg.Done()
-				var val llo.StreamValue
-				var err error
-
-				// Observe the stream
-				if val, err = oc.Observe(ctx, streamID, osv.opts); err != nil {
-					streamIDStr := strconv.FormatUint(uint64(streamID), 10)
-					if errors.As(err, &MissingStreamError{}) {
-						promMissingStreamCount.WithLabelValues(streamIDStr).Inc()
+				local := make(llo.StreamValues, len(streamIDs))
+				var hadErr bool
+				for _, sid := range streamIDs {
+					local[sid] = nil
+					val, err := oc.Observe(ctx, sid, osv.opts)
+					if err != nil {
+						hadErr = true
+						streamIDStr := strconv.FormatUint(uint64(sid), 10)
+						if errors.As(err, &MissingStreamError{}) {
+							promMissingStreamCount.WithLabelValues(streamIDStr).Inc()
+						}
+						promObservationErrorCount.WithLabelValues(streamIDStr).Inc()
+						mu.Lock()
+						errs = append(errs, ErrObservationFailed{inner: err, streamID: sid, reason: "failed to observe stream"})
+						mu.Unlock()
+						continue
 					}
-					promObservationErrorCount.WithLabelValues(streamIDStr).Inc()
 					mu.Lock()
-					errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"})
+					successfulStreamIDs = append(successfulStreamIDs, sid)
 					mu.Unlock()
-					return
+					local[sid] = val
 				}
-
-				mu.Lock()
-				observedValues[streamID] = val
-				successfulStreamIDs = append(successfulStreamIDs, streamID)
-				mu.Unlock()
-			}(streamID)
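+				// All-or-nothing per pipeline group: only cache the batch when every Observe above succeeded.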
+				if !hadErr {
+					d.cache.AddMany(local, ttl)
+				}
+			}(plan.groups[p])
 		}
 		wg.Wait()
 		elapsed = time.Since(startTS)
 
-		droppedStreamIDs := d.removeIncompleteGroups(lggr, observedValues, osv.streamValues)
-
-		d.cache.AddMany(observedValues, 4*osv.observationTimeout)
-
-		// notify the caller that we've completed our first round of observations.
+		// Unblock the first Observe caller once the initial observation round has finished.
 		if loopStarting {
 			loopStarting = false
 			close(loopStartedCh)
 		}
 
-		// After all Observations have returned, nothing else will be sent to the
-		// telemetry channel, so it can safely be closed
+		// After all Observe calls return, nothing else is sent on the telemetry channel for this iteration.
 		if telemCh != nil {
 			close(telemCh)
 		}
 
-		// Only log on errors or if VerboseLogging is turned on
-		if len(errs) > 0 || osv.opts.VerboseLogging() {
-			slices.Sort(successfulStreamIDs)
-			sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID })
+		slices.Sort(successfulStreamIDs)
+		hasRefreshWork := len(plan.groups) > 0 || len(plan.missingStreamIDs) > 0
+		elapsedMs := elapsed.Milliseconds()
+		nOK := len(successfulStreamIDs)
 
-			failedStreamIDs := make([]streams.StreamID, len(errs))
+		switch {
+		case len(errs) > 0:
+			sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID })
 			errStrs := make([]string, len(errs))
+			failedStreamIDs := make([]streams.StreamID, len(errs))
 			for i, e := range errs {
 				errStrs[i] = e.String()
 				failedStreamIDs[i] = e.streamID
 			}
-
-			lggr = logger.With(lggr, "elapsed", elapsed, "nSuccessfulStreams",
-				len(observedValues), "nFailedStreams", len(failedStreamIDs), "nDroppedStreams", len(droppedStreamIDs), "errs", errStrs)
-
+			wl := logger.With(lggr,
+				"elapsed_ms", elapsedMs,
+				"nSuccessfulStreams", nOK,
+				"nFailedStreams", len(failedStreamIDs),
+				"failedStreamIDs", failedStreamIDs,
+				"errs", errStrs,
+			)
 			if osv.opts.VerboseLogging() {
-				lggr = logger.With(lggr, "streamValues", osv.streamValues)
+				wl = logger.With(wl, "staleStreamIDs", plan.streamIDsToRefresh)
 			}
+			wl.Warnw("Observation loop completed with observation errors")
+		case osv.opts.VerboseLogging():
+			logger.With(lggr,
+				"elapsed_ms", elapsedMs,
+				"nSuccessfulStreams", nOK,
+				"staleStreamIDs", plan.streamIDsToRefresh,
+			).Debugw("Observation loop")
+		case hasRefreshWork:
+			lggr.Debugw("Observation loop",
+				"elapsed_ms", elapsedMs,
+				"nSuccessfulStreams", nOK,
+			)
 		}
 
 		promObservationLoopDuration.WithLabelValues(
-			osv.opts.ConfigDigest().String()).Observe(float64(elapsed.Milliseconds()))
-
-		lggr.Debugw("Observation loop", "elapsed_ms", elapsed.Milliseconds())
+			osv.opts.ConfigDigest().String()).Observe(float64(elapsedMs))
 
-		// context cancellation
 		cancel()
 	}
 	})
 }
 
-// getStreamsToRefresh returns the set of stream IDs that need to be re-observed.
-// When any stream in a pipeline is stale, ALL streams from that pipeline should be
-// re-observed to ensure atomic observation of pipeline groups (e.g. bid/mid/ask must be observed together).
-func (d *dataSource) getStreamsToRefresh(streamValues llo.StreamValues, observationTimeout time.Duration) map[streams.StreamID]struct{} {
-	streamIDs := make(map[streams.StreamID]struct{})
+// streamsRefreshPlan is the refresh scope for one observation loop iteration.
+// groups maps each pipeline that needs work to the ordered list of stream IDs that worker will Observe: the
+// intersection of p.StreamIDs() with the plugin's requested keys (streamValues), in pipeline order.
+// streamIDsToRefresh is the sorted list of plugin-scope keys that are refresh drivers this round (stale or uncached).
+// missingStreamIDs lists drivers with no registry entry (no Observe worker is started for them).
+type streamsRefreshPlan struct {
+	groups             map[streams.Pipeline][]streams.StreamID
+	streamIDsToRefresh []streams.StreamID
+	missingStreamIDs   []streams.StreamID
+}
+
+// buildStreamsRefreshPlan derives pipeline work groups and refresh-driver IDs from the cache and streamValues keys.
+// A key is a refresh driver when the cache has no live value for it, or when time.Until(expiresAt) is not greater than
+// staleRefreshSkipThreshold (same rule as package-level tuning comments). Each registered driver records its pipeline
+// in groups; the worker observe list is built from p.StreamIDs() filtered to keys present in streamValues so we never
+// run Observe for pipeline siblings the plugin did not request this round. Unregistered drivers go to missingStreamIDs;
+// each increments promMissingStreamCount and triggers a single Warn when missingStreamIDs is non-empty.
+func (d *dataSource) buildStreamsRefreshPlan(streamValues llo.StreamValues, observationTimeout time.Duration, lggr logger.Logger) streamsRefreshPlan {
+	candidatesValues := make(llo.StreamValues, len(streamValues))
 	for streamID := range streamValues {
-		if _, exists := streamIDs[streamID]; exists {
-			continue
-		}
-		// refresh stream and associated streams from pipeline if this streamID is stale
+		// Plugin-scope keys that need refresh become drivers; pipelines are collected below and scoped to these keys.
 		if val, expiresAt := d.cache.Get(streamID); val != nil {
-			if time.Until(expiresAt) > 2*observationTimeout {
+			if time.Until(expiresAt) > staleRefreshSkipThreshold(observationTimeout) {
 				continue
 			}
 		}
+		candidatesValues[streamID] = nil
+	}
 
-		streamIDs[streamID] = struct{}{}
-
-		p, exists := d.registry.Get(streamID)
-		if !exists {
-			// pipeline isn't registered yet so we can't get associated stream IDs
-			// this might happen if the plugin requests observations for streamIDs before
-			// the node operator has registered its job spec or before the registry is fully initialized
+	// Observe all plugin-scope streams of each pipeline that has at least one
+	// candidate (stale) stream.
+	groups := make(map[streams.Pipeline][]streams.StreamID, len(candidatesValues))
+	missingSet := []streams.StreamID{}
+	for sid := range candidatesValues {
+		p, ok := d.registry.Get(sid)
+		if !ok {
+			missingSet = append(missingSet, sid)
 			continue
 		}
-		for _, sid := range p.StreamIDs() {
-			streamIDs[sid] = struct{}{}
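+		// Build each pipeline's observe list only once, even when several of its streams are stale drivers.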
+		if _, ok := groups[p]; !ok {
+			for _, sid := range p.StreamIDs() {
+				if _, ok := streamValues[sid]; ok {
+					groups[p] = append(groups[p], sid)
+				}
+			}
 		}
 	}
-	return streamIDs
+
+	var candidates = make([]streams.StreamID, 0, len(candidatesValues))
+	for sid := range candidatesValues {
+		candidates = append(candidates, sid)
+	}
+	slices.Sort(candidates)
+
+	for _, sid := range missingSet {
+		streamIDStr := strconv.FormatUint(uint64(sid), 10)
+		promMissingStreamCount.WithLabelValues(streamIDStr).Inc()
+	}
+	if len(missingSet) > 0 {
+		lggr.Warnw("observation loop: streams have no pipeline in registry; discarding",
+			"missingStreamIDs", missingSet,
+			"nMissingStreams", len(missingSet),
+		)
+	}
+
+	return streamsRefreshPlan{
+		groups:             groups,
+		streamIDsToRefresh: candidates,
+		missingStreamIDs:   missingSet,
+	}
 }
 
 func (d *dataSource) Close() error {
@@ -329,60 +484,13 @@ func (d *dataSource) Close() error {
 	return nil
 }
 
-// removeIncompleteGroups enforces all-or-nothing (atomic) writes per pipeline group.
-// Some pipelines produce values that must be used together. For example jobs that output a bid/mid/ask
-// must be used together to form a quote. So if any stream in the group failed, we drop
-// the entire group to avoid writing a mix of fresh and stale values to the cache.
-// Mutates observedValues in place.
-func (d *dataSource) removeIncompleteGroups(lggr logger.Logger, observedValues map[streams.StreamID]llo.StreamValue, streamValues llo.StreamValues) []streams.StreamID {
-	var dropped []streams.StreamID
-	checked := make(map[streams.Pipeline]bool)
-	for streamID := range observedValues {
-		// we only need to check the pipeline once per group. So if we've already checked this pipeline, skip it.
-		p, exists := d.registry.Get(streamID)
-		if !exists || checked[p] {
-			continue
-		}
-		checked[p] = true
-
-		// Check that every in-scope stream for this pipeline succeeded.
-		// This is because some pipelines might emit values for streams that the plugin is not requesting to be observed
-		var missing []streams.StreamID
-		for _, sid := range p.StreamIDs() {
-			if _, inScope := streamValues[sid]; !inScope {
-				continue // not requested this cycle so we can skip evaluating result
-			}
-			if _, ok := observedValues[sid]; !ok {
-				missing = append(missing, sid)
-			}
-		}
-
-		if len(missing) > 0 {
-			var droppedFromGroup []streams.StreamID
-			for _, sid := range p.StreamIDs() {
-				if _, ok := observedValues[sid]; ok {
-					droppedFromGroup = append(droppedFromGroup, sid)
-				}
-				delete(observedValues, sid)
-			}
-			dropped = append(dropped, droppedFromGroup...)
-			lggr.Debugw("Discarding incomplete pipeline group",
-				"pipelineStreamIDs", p.StreamIDs(),
-				"missingStreamIDs", missing,
-				"droppedStreamIDs", droppedFromGroup,
-			)
-		}
-	}
-	return dropped
-}
-
 type observableStreamValues struct {
 	opts               llo.DSOpts
 	streamValues       llo.StreamValues
 	observationTimeout time.Duration
 }
 
-// setObservableStreams sets the observable streams for the given config digest.
+// setObservableStreams updates the stream set and observation deadline (T) used by the background loop when in production.
 func (d *dataSource) setObservableStreams(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) {
 	if opts == nil || len(streamValues) == 0 {
 		d.lggr.Warnw("setObservableStreams: no observable streams to set",
@@ -437,9 +545,8 @@ func (d *dataSource) setObservableStreams(ctx context.Context, streamValues llo.
 	d.observableStreams = osv
 }
 
-// getObservableStreams returns the active plugin data source options, the streams to observe and the observation interval
-// the observation interval is the maximum time we can spend observing streams. We ensure that we don't exceed this time and
-// we wait for the remaining time in the observation loop.
+// getObservableStreams returns the current observableStreamValues (opts, stream key set, and T), or nil if unset.
+// T is the plugin Observe deadline; the loop uses a separate pacing timer between iterations and WithTimeout(..., T) per round.
 func (d *dataSource) getObservableStreams() *observableStreamValues {
 	d.observableStreamsMu.Lock()
 	defer d.observableStreamsMu.Unlock()
diff --git a/core/services/llo/observation/data_source_test.go b/core/services/llo/observation/data_source_test.go
index 6a9aaf32732..9ba0c723374 100644
--- a/core/services/llo/observation/data_source_test.go
+++ b/core/services/llo/observation/data_source_test.go
@@ -13,6 +13,7 @@ import (
 	"testing"
 	"time"
 
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/shopspring/decimal"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -74,6 +75,14 @@ func makePipelineWithSingleResult[T any](runID int64, res T, err error) *mockPip
 	}
 }
 
+// pipelineForStream sets StreamIDs so the data source schedules Observe for this stream.
+// Distinct *mockPipeline values are distinct workers; sibling streams in one job must share one pipeline pointer.
+func pipelineForStream(streamID streams.StreamID, runID int64, res *big.Int, err error) *mockPipeline {
+	p := makePipelineWithSingleResult[*big.Int](runID, res, err)
+	p.streamIDs = []streams.StreamID{streamID}
+	return p
+}
+
 func makeStreamValues(streamIDs ...llotypes.StreamID) llo.StreamValues {
 	if len(streamIDs) == 0 {
 		return llo.StreamValues{
@@ -228,9 +237,15 @@ func Test_DataSource(t *testing.T) {
 		ds := newDataSource(lggr, reg, telem.NullTelemeter)
 
 		reg.mu.Lock()
-		reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil)
-		reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
-		reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, big.NewInt(15), nil)
+		sids := []streams.StreamID{1, 2, 3}
+		multi := makePipelineWithMultipleStreamResults(sids, []any{
+			decimal.NewFromInt(2181),
+			decimal.NewFromInt(40602),
+			decimal.NewFromInt(15),
+		})
+		reg.pipelines[1] = multi
+		reg.pipelines[2] = multi
+		reg.pipelines[3] = multi
 		reg.mu.Unlock()
 
 		vals := makeStreamValues()
@@ -252,9 +267,9 @@ func Test_DataSource(t *testing.T) {
 		ds := newDataSource(lggr, reg, telem.NullTelemeter)
 
 		reg.mu.Lock()
-		reg.pipelines[11] = makePipelineWithSingleResult[*big.Int](11, big.NewInt(21810), errors.New("something exploded"))
-		reg.pipelines[12] = makePipelineWithSingleResult[*big.Int](12, big.NewInt(40602), nil)
-		reg.pipelines[13] = makePipelineWithSingleResult[*big.Int](13, nil, errors.New("something exploded 2"))
+		reg.pipelines[11] = pipelineForStream(11, 11, big.NewInt(21810), errors.New("something exploded"))
+		reg.pipelines[12] = pipelineForStream(12, 12, big.NewInt(40602), nil)
+		reg.pipelines[13] = pipelineForStream(13, 13, nil, errors.New("something exploded 2"))
 		reg.mu.Unlock()
 
 		vals := makeStreamValues(11, 12, 13)
@@ -278,9 +293,9 @@ func Test_DataSource(t *testing.T) {
 		ds := newDataSource(lggr, reg, tm)
 
 		reg.mu.Lock()
-		reg.pipelines[21] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), nil)
-		reg.pipelines[22] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil)
-		reg.pipelines[23] = makePipelineWithSingleResult[*big.Int](102, big.NewInt(15), nil)
+		reg.pipelines[21] = pipelineForStream(21, 100, big.NewInt(2181), nil)
+		reg.pipelines[22] = pipelineForStream(22, 101, big.NewInt(40602), nil)
+		reg.pipelines[23] = pipelineForStream(23, 102, big.NewInt(15), nil)
 		reg.mu.Unlock()
 
 		vals := makeStreamValues(21, 22, 23)
@@ -351,9 +366,9 @@ func Test_DataSource(t *testing.T) {
 		ds := newDataSource(lggr, reg, tm)
 
 		reg.mu.Lock()
-		reg.pipelines[31] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), errors.New("something exploded"))
-		reg.pipelines[32] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil)
-		reg.pipelines[33] = makePipelineWithSingleResult[*big.Int](102, nil, errors.New("something exploded 2"))
+		reg.pipelines[31] = pipelineForStream(31, 100, big.NewInt(2181), errors.New("something exploded"))
+		reg.pipelines[32] = pipelineForStream(32, 101, big.NewInt(40602), nil)
+		reg.pipelines[33] = pipelineForStream(33, 102, nil, errors.New("something exploded 2"))
 		reg.mu.Unlock()
 
 		vals := makeStreamValues(31, 32, 33)
@@ -390,8 +405,8 @@ func Test_DataSource(t *testing.T) {
 		// First observation to populate cache
 		reg.mu.Lock()
-		reg.pipelines[10001] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil)
-		reg.pipelines[20001] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil)
+		reg.pipelines[10001] = pipelineForStream(10001, 1, big.NewInt(2181), nil)
+		reg.pipelines[20001] = pipelineForStream(20001, 2, big.NewInt(40602), nil)
 		reg.mu.Unlock()
 
 		vals := llo.StreamValues{
@@ -414,8 +429,8 @@ func Test_DataSource(t *testing.T) {
 		// Change pipeline results
 		reg.mu.Lock()
-		reg.pipelines[10001] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(9999), nil)
-		reg.pipelines[20001] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(8888), nil)
+		reg.pipelines[10001] = pipelineForStream(10001, 1, big.NewInt(9999), nil)
+		reg.pipelines[20001] = pipelineForStream(20001, 2, big.NewInt(8888), nil)
 		reg.mu.Unlock()
 
 		// Second observation should use cached values
@@ -443,7 +458,7 @@ func Test_DataSource(t *testing.T) {
 		// First observation
 		reg.mu.Lock()
-		reg.pipelines[50002] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil)
+		reg.pipelines[50002] = pipelineForStream(50002, 1, big.NewInt(100), nil)
 		reg.mu.Unlock()
 
 		vals := llo.StreamValues{50002: nil}
@@ -454,7 +469,7 @@ func Test_DataSource(t *testing.T) {
 		// Change pipeline result
 		reg.mu.Lock()
-		reg.pipelines[50002] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(200), nil)
+		reg.pipelines[50002] = pipelineForStream(50002, 1, big.NewInt(200), nil)
 		reg.mu.Unlock()
 
 		// Wait for cache to expire
@@ -477,7 +492,7 @@ func Test_DataSource(t *testing.T) {
 		// Set up pipeline to return different values
 		reg.mu.Lock()
-		reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil)
+		reg.pipelines[1] = pipelineForStream(1, 1, big.NewInt(100), nil)
 		reg.mu.Unlock()
 
 		// First observation to cache
@@ -588,9 +603,9 @@ func Test_DataSource(t *testing.T) {
 		// First observation with error
 		reg.mu.Lock()
-		reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, nil, errors.New("pipeline error"))
+		reg.pipelines[1] = pipelineForStream(1, 1, nil, errors.New("pipeline error"))
 		reg.mu.Unlock()
-		vals := makeStreamValues()
+		vals := makeStreamValues(1)
 
 		ctx, cancel := context.WithTimeout(mainCtx, observationTimeout)
 		defer cancel()
@@ -599,7 +614,7 @@ func Test_DataSource(t *testing.T) {
 		// Second observation should try again (not use cache for error case)
 		reg.mu.Lock()
-		reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil)
+		reg.pipelines[1] = pipelineForStream(1, 1, big.NewInt(100), nil)
 		reg.mu.Unlock()
 
 		time.Sleep(observationTimeout * 3)
@@ -615,131 +630,76 @@ func Test_DataSource(t *testing.T) {
 
 	promCacheHitCount.Reset()
 	promCacheMissCount.Reset()
+	promCacheHitEntryAgeMs.Reset()
+	promObservationLoopWaitOutcome.Reset()
 }
 
-func Test_removeIncompleteGroups(t *testing.T) {
+func Test_DataSource_ObservationLoopWakeSkipsPacing(t *testing.T) {
+	promObservationLoopWaitOutcome.Reset()
 	lggr := logger.NullLogger
+	mainCtx := testutils.Context(t)
+	opts := &mockOpts{}
 
-	pipelineAB := &mockPipeline{streamIDs: []streams.StreamID{1, 2, 3}}
-	pipelineC := &mockPipeline{streamIDs: []streams.StreamID{10}}
-	pipelineDE := &mockPipeline{streamIDs: []streams.StreamID{20, 21}}
-
-	reg := &mockRegistry{pipelines: map[streams.StreamID]*mockPipeline{
-		1: pipelineAB, 2: pipelineAB, 3: pipelineAB,
-		10: pipelineC,
-		20: pipelineDE, 21: pipelineDE,
-	}}
-	ds := &dataSource{registry: reg}
-
-	t.Run("all streams present for pipeline group, nothing removed", func(t *testing.T) {
-		observed := map[streams.StreamID]llo.StreamValue{
-			1: llo.ToDecimal(decimal.NewFromInt(100)),
-			2: llo.ToDecimal(decimal.NewFromInt(200)),
-			3: llo.ToDecimal(decimal.NewFromInt(300)),
-		}
-		scope := llo.StreamValues{1: nil, 2: nil, 3: nil}
-
-		dropped := ds.removeIncompleteGroups(lggr, observed, scope)
-
-		assert.Len(t, observed, 3)
-		assert.Empty(t, dropped)
-		assert.Contains(t, observed, streams.StreamID(1))
-		assert.Contains(t, observed, streams.StreamID(2))
-		assert.Contains(t, observed, streams.StreamID(3))
-	})
-
-	t.Run("one stream missing from 3-stream pipeline, entire group dropped", func(t *testing.T) {
-		observed := map[streams.StreamID]llo.StreamValue{
-			1: llo.ToDecimal(decimal.NewFromInt(100)),
-			3: llo.ToDecimal(decimal.NewFromInt(300)),
-			// stream 2 missing (e.g. extraction failed)
-		}
-		scope := llo.StreamValues{1: nil, 2: nil, 3: nil}
-
-		dropped := ds.removeIncompleteGroups(lggr, observed, scope)
-
-		assert.Empty(t, observed, "entire group should be dropped when one stream is missing")
-		assert.ElementsMatch(t, []streams.StreamID{1, 3}, dropped)
-	})
-
-	t.Run("two independent pipelines, one complete one incomplete, only incomplete dropped", func(t *testing.T) {
-		observed := map[streams.StreamID]llo.StreamValue{
-			1: llo.ToDecimal(decimal.NewFromInt(100)),
-			3: llo.ToDecimal(decimal.NewFromInt(300)),
-			10: llo.ToDecimal(decimal.NewFromInt(1000)),
-			// stream 2 missing from pipelineAB; pipelineC (stream 10) is complete
-		}
-		scope := llo.StreamValues{1: nil, 2: nil, 3: nil, 10: nil}
-
-		dropped := ds.removeIncompleteGroups(lggr, observed, scope)
-
-		assert.Len(t, observed, 1)
-		assert.ElementsMatch(t, []streams.StreamID{1, 3}, dropped)
-		assert.Contains(t, observed, streams.StreamID(10), "complete pipeline should be kept")
-		assert.NotContains(t, observed, streams.StreamID(1), "incomplete pipeline streams should be dropped")
-		assert.NotContains(t, observed, streams.StreamID(3), "incomplete pipeline streams should be dropped")
-	})
-
-	t.Run("stream in pipeline.StreamIDs() but not in scope (not requested), group kept", func(t *testing.T) {
-		observed := map[streams.StreamID]llo.StreamValue{
-			1: llo.ToDecimal(decimal.NewFromInt(100)),
-			2: llo.ToDecimal(decimal.NewFromInt(200)),
-			// stream 3 is in pipelineAB.StreamIDs() but NOT in scope
-		}
-		scope := llo.StreamValues{1: nil, 2: nil} // stream 3 not requested
-
-		dropped := ds.removeIncompleteGroups(lggr, observed, scope)
-
-		assert.Len(t, observed, 2, "group should be kept; stream 3 is out of scope, not missing")
-		assert.Empty(t, dropped)
-		assert.Contains(t, observed, streams.StreamID(1))
-		assert.Contains(t, observed, streams.StreamID(2))
-	})
-
-	t.Run("empty observedValues, no panic", func(t *testing.T) {
-		observed := map[streams.StreamID]llo.StreamValue{}
-		scope := llo.StreamValues{1: nil, 2: nil, 3: nil}
-
-		var dropped []streams.StreamID
-		assert.NotPanics(t, func() {
-			dropped = ds.removeIncompleteGroups(lggr, observed, scope)
-		})
-		assert.Empty(t, observed)
-		assert.Empty(t, dropped)
-	})
-
-	t.Run("single-stream pipeline always kept when present", func(t *testing.T) {
-		observed := map[streams.StreamID]llo.StreamValue{
-			10: llo.ToDecimal(decimal.NewFromInt(1000)),
-		}
-		scope := llo.StreamValues{10: nil}
+	reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)}
+	reg.mu.Lock()
+	reg.pipelines[1] = pipelineForStream(1, 1, big.NewInt(42), nil)
+	reg.mu.Unlock()
 
-		dropped := ds.removeIncompleteGroups(lggr, observed, scope)
+	ds := newDataSource(lggr, reg, telem.NullTelemeter)
+	defer ds.Close()
 
-		assert.Len(t, observed, 1)
-		assert.Empty(t, dropped)
-		assert.Contains(t, observed, streams.StreamID(10))
-	})
+	// Long plugin deadline => large inter-iteration pacing; wake from Observe should advance the loop without waiting.
+	longCtx, cancel := context.WithTimeout(mainCtx, 30*time.Second)
+	defer cancel()
+	vals := makeStreamValues(1)
+	require.NoError(t, ds.Observe(longCtx, vals, opts))
 
-	t.Run("all groups complete with multiple pipelines", func(t *testing.T) {
-		observed := map[streams.StreamID]llo.StreamValue{
-			1: llo.ToDecimal(decimal.NewFromInt(100)),
-			2: llo.ToDecimal(decimal.NewFromInt(200)),
-			3: llo.ToDecimal(decimal.NewFromInt(300)),
-			10: llo.ToDecimal(decimal.NewFromInt(1000)),
-			20: llo.ToDecimal(decimal.NewFromInt(2000)),
-			21: llo.ToDecimal(decimal.NewFromInt(2100)),
-		}
-		scope := llo.StreamValues{1: nil, 2: nil, 3: nil, 10: nil, 20: nil, 21: nil}
+	require.Eventually(t, func() bool {
+		return promtest.ToFloat64(promObservationLoopWaitOutcome.WithLabelValues("wake")) >= 1
+	}, 2*time.Second, 5*time.Millisecond, "expected at least one pacing wait satisfied by plugin wake")
+}
+
-		dropped := ds.removeIncompleteGroups(lggr, observed, scope)
+func Test_DataSource_ObserveWakeManyConcurrent(t *testing.T) {
+	lggr := logger.NullLogger
+	mainCtx := testutils.Context(t)
+	opts := &mockOpts{}
 
-		assert.Len(t, observed, 6, "all groups complete, nothing should be dropped")
-		assert.Empty(t, dropped)
-	})
+	reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)}
+	reg.mu.Lock()
+	reg.pipelines[1] = pipelineForStream(1, 1, big.NewInt(1), nil)
+	reg.mu.Unlock()
+
+	ds := newDataSource(lggr, reg, telem.NullTelemeter)
+	ctx, cancel := context.WithTimeout(mainCtx, observationTimeout)
+	defer cancel()
+	vals := makeStreamValues(1)
+	require.NoError(t, ds.Observe(ctx, vals, opts))
+
+	done := make(chan struct{})
+	var wg sync.WaitGroup
+	for i := 0; i < 200; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			// Each call needs its own StreamValues map: Observe mutates it in place (UpdateStreamValues).
+			localVals := makeStreamValues(1)
+			_ = ds.Observe(ctx, localVals, opts)
+		}()
+	}
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+
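+	// Bound the wait so a hung Observe call fails the test instead of blocking it forever.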
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("concurrent Observe calls did not complete")
+	}
+	require.NoError(t, ds.Close())
 }
 
-func Test_getStreamsToRefresh(t *testing.T) {
+func Test_buildStreamsRefreshPlan(t *testing.T) {
 	lggr := logger.NullLogger
 	timeout := 100 * time.Millisecond
@@ -762,7 +722,7 @@
 		ds := &dataSource{lggr: lggr, registry: reg, cache: cache}
 		sv := llo.StreamValues{1: nil, 2: nil, 3: nil}
 
-		result := ds.getStreamsToRefresh(sv, timeout)
+		result := ds.buildStreamsRefreshPlan(sv, timeout, lggr).streamIDsToRefresh
 
 		assert.Len(t, result, 3)
 		for _, id := range []streams.StreamID{1, 2, 3} {
@@ -778,12 +738,12 @@
 		ds := &dataSource{lggr: lggr, registry: reg, cache: cache}
 		sv := llo.StreamValues{1: nil, 2: nil, 3: nil}
 
-		result := ds.getStreamsToRefresh(sv, timeout)
+		result := ds.buildStreamsRefreshPlan(sv, timeout, lggr).streamIDsToRefresh
 
 		assert.Empty(t, result)
 	})
 
-	t.Run("one stale stream triggers entire pipeline group", func(t *testing.T) {
+	t.Run("one stale driver lists only stale IDs; worker observes all requested streams on that pipeline", func(t *testing.T) {
 		cache := NewCache(0)
 		cache.Add(1, llo.ToDecimal(decimal.NewFromInt(100)), time.Hour)
 		cache.Add(2, llo.ToDecimal(decimal.NewFromInt(200)), 1*time.Millisecond)
@@ -791,49 +751,54 @@
 		ds := &dataSource{lggr: lggr, registry: reg, cache: cache}
 		sv := llo.StreamValues{1: nil, 2: nil, 3: nil}
 
-		result := ds.getStreamsToRefresh(sv, timeout)
+		plan := ds.buildStreamsRefreshPlan(sv, timeout, lggr)
 
-		assert.Len(t, result, 3, "all pipeline siblings should be included when one is stale")
-		for _, id := range []streams.StreamID{1, 2, 3} {
-			assert.Contains(t, result, id)
+		assert.Equal(t, []streams.StreamID{2}, plan.streamIDsToRefresh, "streamIDsToRefresh is stale plugin-scope keys only")
+		require.Len(t, plan.groups, 1)
+		for _, sids := range plan.groups {
+			assert.ElementsMatch(t, []streams.StreamID{1, 2, 3}, sids, "all three are in plugin scope, so observe list matches pipeline StreamIDs()")
 		}
 	})
 
-	t.Run("stale stream adds pipeline siblings even if not in scope", func(t *testing.T) {
+	t.Run("staleStreamIDs lists only stale keys; groups intersect pipeline with plugin scope", func(t *testing.T) {
 		cache := NewCache(0)
 		cache.Add(1, llo.ToDecimal(decimal.NewFromInt(100)), 1*time.Millisecond)
 		cache.Add(2, llo.ToDecimal(decimal.NewFromInt(200)), time.Hour)
-		// pipeline has {1,2,3}, but only {1,2} in scope (plugin requested streamIds)
+		// pipeline has {1,2,3}, but only {1,2} in plugin scope
 		ds := &dataSource{lggr: lggr, registry: reg, cache: cache}
 		sv := llo.StreamValues{1: nil, 2: nil} // stream 3 not requested
 
-		result := ds.getStreamsToRefresh(sv, timeout)
+		plan := ds.buildStreamsRefreshPlan(sv, timeout, lggr)
 
-		assert.Contains(t, result, streams.StreamID(1))
-		assert.Contains(t, result, streams.StreamID(2))
-		assert.Contains(t, result, streams.StreamID(3), "out-of-scope pipeline sibling should still be included")
+		assert.Equal(t, []streams.StreamID{1}, plan.streamIDsToRefresh)
+		assert.NotContains(t, plan.streamIDsToRefresh, streams.StreamID(3), "out-of-scope stream is not a refresh driver")
+		require.Len(t, plan.groups, 1)
+		for _, sids := range plan.groups {
+			assert.Equal(t, []streams.StreamID{1, 2}, sids, "stream 3 is not requested; observe list is intersection with streamValues")
+		}
 	})
 
-	t.Run("stream not in registry is still included", func(t *testing.T) {
+	t.Run("stream not in registry is stale driver only; no pipeline worker", func(t *testing.T) {
 		ds := &dataSource{lggr: lggr, registry: reg, cache: NewCache(0)}
 		sv := llo.StreamValues{999: nil} // plugin requested streamId not yet in registry
 
-		result := ds.getStreamsToRefresh(sv, timeout)
+		plan := ds.buildStreamsRefreshPlan(sv, timeout, lggr)
 
-		assert.Len(t, result, 1)
-		assert.Contains(t, result, streams.StreamID(999))
+		assert.Equal(t, []streams.StreamID{999}, plan.streamIDsToRefresh, "stale in-scope keys are listed even without registry")
+		assert.Empty(t, plan.groups, "no Observe workers without a pipeline")
+		assert.ElementsMatch(t, []streams.StreamID{999}, plan.missingStreamIDs)
 	})
 
 	t.Run("empty streamValues returns empty set", func(t *testing.T) {
 		ds := &dataSource{lggr: lggr, registry: reg, cache: NewCache(0)}
 		sv := llo.StreamValues{}
 
-		result := ds.getStreamsToRefresh(sv, timeout)
+		result := ds.buildStreamsRefreshPlan(sv, timeout, lggr).streamIDsToRefresh
 
 		assert.Empty(t, result)
 	})
 
-	t.Run("multiple pipelines: only stale pipeline expanded", func(t *testing.T) {
+	t.Run("multiple pipelines: only stale keys appear in streamIDsToRefresh", func(t *testing.T) {
 		cache := NewCache(0)
 		// Pipeline {10}: all fresh
 		cache.Add(10, llo.ToDecimal(decimal.NewFromInt(100)), time.Hour)
@@ -844,15 +809,39 @@
 		ds := &dataSource{lggr: lggr, registry: reg, cache: cache}
 		sv := llo.StreamValues{10: nil, 20: nil, 21: nil}
 
-		result := ds.getStreamsToRefresh(sv, timeout)
+		plan := ds.buildStreamsRefreshPlan(sv, timeout, lggr)
 
-		assert.NotContains(t, result, streams.StreamID(10), "fresh pipeline should not be refreshed")
-		assert.Contains(t, result, streams.StreamID(20), "stale stream should be refreshed")
-		assert.Contains(t, result, streams.StreamID(21), "fresh sibling of stale stream should also be refreshed")
+		assert.NotContains(t, plan.streamIDsToRefresh, streams.StreamID(10), "fresh pipeline should not be refreshed")
+		assert.Equal(t, []streams.StreamID{20}, plan.streamIDsToRefresh)
+		assert.NotContains(t, plan.streamIDsToRefresh, streams.StreamID(21), "fresh sibling is not a refresh driver")
+		require.Len(t, plan.groups, 1)
+		for _, sids := range plan.groups {
+			assert.ElementsMatch(t, []streams.StreamID{20, 21}, sids)
+		}
 	})
 
 	promCacheHitCount.Reset()
 	promCacheMissCount.Reset()
+	promCacheHitEntryAgeMs.Reset()
+	promObservationLoopWaitOutcome.Reset()
+}
+
+func Test_observationTuningHelpers(t *testing.T) {
+	t.Parallel()
+
+	const tuningTestT = 100 * time.Millisecond
+	wantStaleSkip := time.Duration(staleRefreshRemainingNumerator) * tuningTestT / time.Duration(staleRefreshRemainingDenominator)
+
+	assert.Equal(t, time.Duration(cacheTTLMultiplier)*tuningTestT, cacheEntryTTL(tuningTestT))
+	assert.Equal(t, wantStaleSkip, staleRefreshSkipThreshold(tuningTestT))
+	assert.Less(t, staleRefreshSkipThreshold(tuningTestT), cacheEntryTTL(tuningTestT))
+	assert.Less(t, staleRefreshSkipThreshold(tuningTestT)+observationLoopPacing(tuningTestT), cacheEntryTTL(tuningTestT))
+
+	assert.Equal(t, 10*time.Millisecond, observationLoopPacing(100*time.Millisecond))
+	assert.Equal(t, 50*time.Millisecond, observationLoopPacing(500*time.Millisecond))
+	assert.Equal(t, observationLoopPacingMin, observationLoopPacing(0))
+	// For T=30ms, T/10 (3ms) is below the floor and clamps up to observationLoopPacingMin; the T/2 cap (15ms) does not bind.
+	assert.Equal(t, 10*time.Millisecond, observationLoopPacing(30*time.Millisecond))
 }
 
 func BenchmarkObserve(b *testing.B) {