[DS-2888] LLO datasource: observation pacing, staleRefreshSkipThreshold, wake after observation and metrics #22005
Conversation
```go
// must be used together to form a quote. So if any stream in the group failed, we drop
// the entire group to avoid writing a mix of fresh and stale values to the cache.
// Mutates observedValues in place.
func (d *dataSource) removeIncompleteGroups(lggr logger.Logger, observedValues map[streams.StreamID]llo.StreamValue, streamValues llo.StreamValues) []streams.StreamID {
```
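The all-or-nothing behavior this function describes can be sketched as follows. This is an illustrative standalone version, not the actual implementation: the `StreamValue` stand-in type, the `groups` map shape, and the function name `removeIncompleteGroupsSketch` are assumptions.

```go
package main

import "fmt"

type StreamID uint32

// StreamValue stands in for llo.StreamValue; absence from the map means the
// observation for that stream failed.
type StreamValue interface{}

// removeIncompleteGroupsSketch drops every stream of a pipeline group when any
// member of that group is missing from observedValues, so the cache never holds
// a mix of fresh and stale values for streams that must be quoted together.
// groups maps a pipeline ID to the stream IDs it produces (an assumption about
// how streams are grouped). It mutates observedValues in place and returns the
// dropped stream IDs, mirroring the comment above.
func removeIncompleteGroupsSketch(groups map[string][]StreamID, observedValues map[StreamID]StreamValue) []StreamID {
	var dropped []StreamID
	for _, streamIDs := range groups {
		complete := true
		for _, id := range streamIDs {
			if _, ok := observedValues[id]; !ok {
				complete = false
				break
			}
		}
		if complete {
			continue
		}
		// All-or-nothing: remove every member of the incomplete group.
		for _, id := range streamIDs {
			if _, ok := observedValues[id]; ok {
				delete(observedValues, id)
				dropped = append(dropped, id)
			}
		}
	}
	return dropped
}

func main() {
	observed := map[StreamID]StreamValue{1: "bid", 2: "ask"} // stream 3 failed
	groups := map[string][]StreamID{"quoteA": {1, 2, 3}}
	dropped := removeIncompleteGroupsSketch(groups, observed)
	fmt.Println(len(dropped), len(observed)) // 2 0
}
```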
Trying to understand why we'd delete this: how do the existing changes guarantee an atomic write from a pipeline with multiple outputs? Specifically, if a pipeline has 3 output values, are we still ensuring an "all-or-nothing" result?
The observation starts grouped by pipeline for all stream IDs in scope for the plugin (included in the passed streamValues); see buildStreamsRefreshPlan. Values only get added if all streams in the group are successfully observed, preserving atomicity and integrity.
It maintains the previous implementation's guarantees but builds the plan upfront.
Makes sense. We do have the test `cache writes are atomic per pipeline group across observation cycles`, which makes sure that behavior stays put.
Improves how often the LLO observation background loop can refresh pipeline-backed streams without blocking the plugin Observe path or OCR rounds.
staleRefreshSkipThreshold: this is the value S = (N/D)·T derived in data_source.go, where T is the plugin observation deadline (observationTimeout).
- After a successful cache write, each stream's entry has a TTL; buildStreamsRefreshPlan treats a cached stream as still fresh (not a refresh driver) while time.Until(expiresAt) > S.
- Once the remaining TTL drops to S or below, that stream becomes a refresh driver for that loop iteration (subject to pacing and registry grouping).
- Raising S makes that transition happen earlier after each write (more TTL still left when refresh is allowed), which increases freshness and pipeline load; lowering S delays drivers (staler reads, less load).
- S must satisfy S + observationLoopPacing(T) < cacheEntryTTL(T) so a refresh can run before entries expire.
Changes:
- Wake channel: after an Observe call, coalesce a non-blocking hint so the loop can exit inter-iteration pacing early instead of always waiting observationLoopPacing(T); expose llo_datasource_observation_loop_wait_outcome_count (wake / timer / shutdown).
- Cache staleness metric: record writtenAt on cache writes and emit llo_datasource_cache_hit_entry_age_ms on plugin reads (per streamID).
- Tuning / docs: align comments and constants with 8/5 for staleRefreshSkipThreshold and the 2·T cache TTL; document the invariant above.
- Tests: wake-before-pacing, concurrent Observe with separate StreamValues maps (-race), cache hit-age metrics; metric resets in teardown.