Conversation
afrind
left a comment
There was a problem hiding this comment.
Ok, flushing some initial comments. I couldn't get through all of PropertyRanking or MoqxRelay yet.
@afrind reviewed 7 files and all commit messages, and made 45 comments.
Reviewable status: 7 of 11 files reviewed, 35 unresolved discussions (waiting on suhasHere).
include/moqx/MoqxRelay.h line 252 at r1 (raw file):
void onEmpty(moxygen::MoQForwarder* forwarder) override; void forwardChanged(moxygen::MoQForwarder* forwarder) override; void newGroupRequested(moxygen::MoQForwarder* forwarder, uint64_t group) override;
Where did this come from? Did we lose it? It should already have been there. In any case, let's move to another PR.
include/moqx/relay/PropertyRanking.h line 31 at r1 (raw file):
*/ struct RankKey { int64_t negValue; // Negated for descending order
Why not just value, and use > compare?
include/moqx/relay/PropertyRanking.h line 62 at r1 (raw file):
* Includes cached rank to avoid O(n) std::distance() calls. */ struct TrackEntry {
It's called TrackEntry but it's more like a RankIndex?
include/moqx/relay/PropertyRanking.h line 79 at r1 (raw file):
// Waterline for self-exclusion (publisher-subscribers only). // nullopt = use shared threshold (viewers, or PS with no self-tracks)
PS = pubs? You can have self-tracks, just not in the top-n.
include/moqx/relay/PropertyRanking.h line 86 at r1 (raw file):
bool waterlineValid{false}; // Tracks published by this session (for self-exclusion).
Tracks published by this session in the subscribed namespace
include/moqx/relay/PropertyRanking.h line 111 at r1 (raw file):
folly::F14FastMap<moxygen::FullTrackName, TrackState, moxygen::FullTrackName::hash> trackStates; // FIFO queue for cheap reselection (bounded by maxDeselected_)
mention that maxDeselected_ is a relay property?
include/moqx/relay/PropertyRanking.h line 119 at r1 (raw file):
* Only collected when metricsEnabled() is true (disabled by default). */ struct PropertyRankingMetrics {
Let's drop the atomics for now -- they aren't needed. IO threads accumulate in unsynchronized integers and snapshot runs safely in the IO thread to accumulate. See the rest of moqx stat infrastructure.
And consider a rename to PropertyRankingStats to match?
include/moqx/relay/PropertyRanking.h line 137 at r1 (raw file):
// Timing (nanoseconds, accumulated) - only when timing enabled std::atomic<uint64_t> updateSortValueTimeNs{0};
There's histograms in stats if you want those
include/moqx/relay/PropertyRanking.h line 142 at r1 (raw file):
// Idle sweep std::atomic<uint64_t> idleSweeps{0};
I think you used the wrong version of the plan. We removed idle sweeps in favor of driving idle track deselection based on object arrival, with a dampening timeout.
include/moqx/relay/PropertyRanking.h line 176 at r1 (raw file):
// Batch notification callback for viewers (optimization). // Called once per track state change with all affected viewer sessions. using BatchSelectCallback = std::function<void(
I'm interested.
include/moqx/relay/PropertyRanking.h line 235 at r1 (raw file):
const moxygen::FullTrackName& ftn, std::optional<uint64_t> initialValue, std::weak_ptr<moxygen::MoQSession> publisher = {});
do we ever register a track without a publisher?
include/moqx/relay/PropertyRanking.h line 251 at r1 (raw file):
* Get or create group for given N. */ TopNGroup& getOrCreateGroup(uint64_t maxSelected);
Group already has a meaning in MOQT -- let's use TopNGroup for this throughout the class?
include/moqx/relay/PropertyRanking.h line 315 at r1 (raw file):
// Test accessors const std::map<RankKey, RankedEntry>& ranked() const {
I think I would call the member rankedTracks_ and the accessor similarly.
include/moqx/relay/PropertyRanking.h line 461 at r1 (raw file):
uint64_t nextSeq_{0}; // Cached pool boundary (max N + maxDeselected_)
First use of the term "pool" - maybe just boundary_ or selectionThreshold_?
include/moqx/relay/PropertyRanking.h line 465 at r1 (raw file):
// Sorted threshold values for O(log G) crossesThreshold check std::vector<uint64_t> sortedThresholds_;
Unsure what kind of thresholds these are
include/moqx/relay/TopNFilter.h line 23 at r1 (raw file):
// Monotonic tick counter for activity tracking. // Using ticks is faster than std::chrono::steady_clock::now(). using Tick = uint64_t;
If we're going to keep Tick, let's put it in it's own file for now. I don't think otherwise that PropertyRanking.h needs to include TopNFilter.
include/moqx/relay/TopNFilter.h line 68 at r1 (raw file):
// Set the pointer to write activity ticks to. // This allows RelaySubscription to own the tick storage. void setActivityTarget(Tick* target) {
maybe lastActivityTarget is more clear
include/moqx/relay/TopNFilter.h line 73 at r1 (raw file):
// Register an observer for a specific property type. // The observer will be called when the property value changes.
Maybe note there can be only one observer per property type
include/moqx/relay/TopNFilter.h line 162 at r1 (raw file):
// timer or rdtsc. For now, we use a simple atomic counter that can be // advanced by the caller (e.g., HHWheelTimer callback). Tick getCurrentTick();
Same as above. Prefer to remove entirely for now, and belongs it a sep file when added.
src/relay/PropertyRanking.cpp line 2 at r1 (raw file):
/* * Copyright (c) Meta Platforms, Inc. and affiliates.
Ooops - this was copied and is incorrect
src/relay/PropertyRanking.cpp line 15 at r1 (raw file):
namespace openmoq::moqx { PropertyRanking::PropertyRanking(
Why two ctors, do we use both?
src/relay/PropertyRanking.cpp line 57 at r1 (raw file):
// Check if already registered if (tracks_.find(ftn) != tracks_.end()) { XLOG(WARN) << "Track already registered: " << ftn;
Should we overwrite the intialValue in this case or update the publisher? I'm not sure we've thought how multipublisher is going to intersect with top-N 😅
src/relay/PropertyRanking.cpp line 111 at r1 (raw file):
// Batch notify viewers if callback available if (onBatchSelected_ && !viewerBatch.empty()) {
Let's just require onBatchSelected_ and not fallback to individual - makes the code simpler.
src/relay/PropertyRanking.cpp line 128 at r1 (raw file):
// Notify publishers individually for (const auto& [session, forward] : publisherNotifications) {
Something doesn't quite add up here --
Don't publisherNotifications and viewerBatch contain the same content?
src/relay/PropertyRanking.cpp line 141 at r1 (raw file):
std::chrono::steady_clock::now() - notifyStart).count(); } } else if (rank < poolBoundary_) {
This is registerTrack - we should never push into deselectedQueue from here I don't think.
src/relay/PropertyRanking.cpp line 160 at r1 (raw file):
auto& entry = it->second; RankKey oldKey = entry.rankIter->first; RankKey newKey{-static_cast<int64_t>(value), oldKey.arrivalSeq};
can move this after if check
src/relay/PropertyRanking.cpp line 218 at r1 (raw file):
bool wasSelected = stateIt->second == TrackState::Selected; group.trackStates.erase(stateIt);
Do we have all tracks in tracks_ or just selected+deselected? We can skip work if tracks_.erase() tells us it wasn't there.
src/relay/PropertyRanking.cpp line 222 at r1 (raw file):
// Remove from deselected queue if present auto& dq = group.deselectedQueue; dq.erase(std::remove(dq.begin(), dq.end(), ftn), dq.end());
Use !wasSelected?
src/relay/PropertyRanking.cpp line 226 at r1 (raw file):
if (wasSelected) { // Copy sessions to avoid iterator invalidation during callback std::vector<std::pair<std::shared_ptr<moxygen::MoQSession>, bool>> sessionsToNotify;
Oh but this is the list of subscribers - could be a long list with shared_ptr copies. Can we use a state member to track when iteration is happening and XCHECK that no one is e.g. inserting/removing a session while we are iterating/
src/relay/PropertyRanking.cpp line 241 at r1 (raw file):
for (const auto& [session, forward] : sessionsToNotify) { if (session && onSelected_) { onSelected_(promoted, session, forward);
Are we missing self-selection exclusion here?
src/relay/PropertyRanking.cpp line 247 at r1 (raw file):
// Find next track from ranked list that isn't already selected uint64_t count = 0; for (auto& [key, rankedEntry] : ranked_) {
This nested for loop over ranked feels avoidable, somehow, but it's hard to reason about. It seems like if the list of TopNGroups was sorted by N (reversed?) we could avoid it somehow.
src/relay/PropertyRanking.cpp line 257 at r1 (raw file):
if (count < n) { group.trackStates[rankedEntry.ftn] = TrackState::Selected; for (const auto& [session, forward] : sessionsToNotify) {
Whatever our notification loop is, it should be in a helper.
src/relay/PropertyRanking.cpp line 276 at r1 (raw file):
if (inserted) { it->second.maxSelected = maxSelected; updatePoolBoundary();
What is the pool boundary?
src/relay/PropertyRanking.cpp line 285 at r1 (raw file):
} else if (rank < poolBoundary_) { it->second.trackStates[entry.ftn] = TrackState::Deselected; it->second.deselectedQueue.push_back(entry.ftn);
You can't start in the deselected queue
src/relay/PropertyRanking.cpp line 321 at r1 (raw file):
// Compute waterline if publisher if (info.isPublisher()) { info.waterlineKey = computeWaterlineKey(info, maxSelected);
Do we properly account for this when the session publishes and stops publishing tracks in this namespace?
src/relay/PropertyRanking.cpp line 353 at r1 (raw file):
void PropertyRanking::removeSessionFromGroup( uint64_t maxSelected, const std::shared_ptr<moxygen::MoQSession>& session) {
Can this change the pool boundary if this session had the most number of self-published tracks?
src/relay/PropertyRanking.cpp line 400 at r1 (raw file):
auto it = tracks_.find(entry.ftn); if (it != tracks_.end()) { const_cast<TrackEntry&>(it->second).cachedRank = rank;
I see you const_cast
src/relay/PropertyRanking.cpp line 448 at r1 (raw file):
// OPTIMIZATION: O(log G) check using sorted thresholds // Find first threshold > minRank, check if it's <= maxRank auto it = std::upper_bound(sortedThresholds_.begin(), sortedThresholds_.end(), minRank);
This needs more explanation, but I think I see what you are doing.
src/relay/PropertyRanking.cpp line 456 at r1 (raw file):
} void PropertyRanking::recomputeGroups(
JFYI - this is where I am getting dizzy reading this code
src/relay/PropertyRanking.cpp line 483 at r1 (raw file):
} // OPTIMIZATION: Batch viewer notifications
This bit needs to be in a helper also
src/relay/TopNFilter.cpp line 19 at r1 (raw file):
namespace { // Global tick counter for activity tracking std::atomic<Tick> gCurrentTick{0};
I'd prefer not to do this now. Ultimately I want to adopt the laps Tick service model throughout, but unless we need this now I'd prefer not to half-do it.
src/relay/TopNFilter.cpp line 67 at r1 (raw file):
// Check extensions for property values // Optimization: iterate the smaller of observers vs extensions
Cool idea, but I think I'd prefer simplicity for now. Note that getIntExtension currently walks the whole extension list, so you aren't saving much.
src/relay/TopNFilter.cpp line 180 at r1 (raw file):
moxygen::PublishDone pubDone) { // Notify observers that the track has ended notifyTrackEnded();
technically objects can arrive after publishDone. The only guarantee from MoQSession is you will receive beginSubgroup for every publisher subgroup before publishDone. So you can refcount them and fire notifyTrackEnded when the last one is gone?
At least deserves a comment or TODO, we could handle this in filter parent class somehow or with a mixin helper?
Implements MOQ TRACK_FILTER parameter for dynamic top-N track selection in webinar/conference scenarios. Key components: - PropertyRanking: Maintains sorted track rankings by property values with O(log N) updates and threshold-based fast path optimization - TopNFilter: Intercepts published objects to extract property values and track activity for ranking updates - Self-exclusion: Publisher-subscribers don't receive their own tracks - Session validation: Checks isClosed() before sending to sessions Supports 500+ concurrent subscribers with 100% delivery efficiency.
- Add PropertyRanking.cpp and TopNFilter.cpp to moqx_core library - Add MoqxTrackFilterTest unit test executable - Add TrackFilterLoadTest and WebinarBlackboxTest executables - Update build script for new test targets
- Add publish() mock in createMockSession() to return a valid mock consumer when relay forwards selected tracks to subscriber sessions - Update sendObjectWithProperty() to not expect success for non-selected tracks (they have no subscribers, but property updates still work) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add O(1) amortized getRank() via lazy rank cache rebuild - Use sorted thresholds with binary search for O(log G) crossesThreshold() - Batch viewer notifications and smarter waterline invalidation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove "Phase X" references from comments and test logs - Increase kDefaultMaxDeselected from 5 to 250 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Key changes: - RankKey: Use positive value with std::greater comparator instead of negated value - Rename TrackEntry to RankIndex for clarity - Rename PropertyRankingMetrics to PropertyRankingStats, remove atomics - Remove idle sweep functionality (HHWheelTimer, Tick, etc.) - Rename ranked_ to rankedTracks_, poolBoundary_ to selectionThreshold_ - Rename groups_ to topNGroups_, methods to use TopNGroup naming - Require onBatchSelected callback (no fallback to individual) - Don't add tracks to deselectedQueue on registerTrack - TopNFilter: Add ended_ flag to track publishDone, log warnings for late objects - Fix test include path Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…king - Add IterationGuard RAII class to prevent session add/remove during iteration - Add XCHECK in addSessionToTopNGroup/removeSessionFromTopNGroup - Extract notifyTrackSelected() helper to reduce duplication - Add self-exclusion check when promoting tracks - Fix: Call removePublishedTrackFromSession in MoqxRelay::onPublishDone - Add TODO for potential optimization of nested ranked loop Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…tion - Replace inline notification loop with notifyTrackSelected helper - Add IterationGuard to recomputeTopNGroups notification loop - Improve comment explaining viewer vs publisher notification logic Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
suhasHere
left a comment
There was a problem hiding this comment.
@suhasHere made 26 comments.
Reviewable status: 0 of 13 files reviewed, 35 unresolved discussions (waiting on afrind).
src/relay/PropertyRanking.cpp line 2 at r1 (raw file):
Previously, afrind wrote…
Ooops - this was copied and is incorrect
Done.
src/relay/PropertyRanking.cpp line 15 at r1 (raw file):
Previously, afrind wrote…
Why two ctors, do we use both?
Done.
src/relay/PropertyRanking.cpp line 57 at r1 (raw file):
Previously, afrind wrote…
Should we overwrite the intialValue in this case or update the publisher? I'm not sure we've thought how multipublisher is going to intersect with top-N 😅
Done.
src/relay/PropertyRanking.cpp line 111 at r1 (raw file):
Previously, afrind wrote…
Let's just require onBatchSelected_ and not fallback to individual - makes the code simpler.
Done.
src/relay/PropertyRanking.cpp line 128 at r1 (raw file):
Previously, afrind wrote…
Something doesn't quite add up here --
Don't publisherNotifications and viewerBatch contain the same content?
Done.
src/relay/PropertyRanking.cpp line 141 at r1 (raw file):
Previously, afrind wrote…
This is registerTrack - we should never push into deselectedQueue from here I don't think.
Done.
src/relay/PropertyRanking.cpp line 160 at r1 (raw file):
Previously, afrind wrote…
can move this after if check
Done.
src/relay/PropertyRanking.cpp line 218 at r1 (raw file):
Previously, afrind wrote…
Do we have all tracks in tracks_ or just selected+deselected? We can skip work if tracks_.erase() tells us it wasn't there.
Done.
src/relay/PropertyRanking.cpp line 222 at r1 (raw file):
Previously, afrind wrote…
Use !wasSelected?
Done.
src/relay/PropertyRanking.cpp line 226 at r1 (raw file):
Previously, afrind wrote…
Oh but this is the list of subscribers - could be a long list with shared_ptr copies. Can we use a state member to track when iteration is happening and XCHECK that no one is e.g. inserting/removing a session while we are iterating/
Done.
src/relay/PropertyRanking.cpp line 276 at r1 (raw file):
Previously, afrind wrote…
What is the pool boundary?
Done.
src/relay/PropertyRanking.cpp line 285 at r1 (raw file):
Previously, afrind wrote…
You can't start in the deselected queue
Done.
src/relay/TopNFilter.cpp line 19 at r1 (raw file):
Previously, afrind wrote…
I'd prefer not to do this now. Ultimately I want to adopt the laps Tick service model throughout, but unless we need this now I'd prefer not to half-do it.
Done.
include/moqx/MoqxRelay.h line 252 at r1 (raw file):
Previously, afrind wrote…
Where did this come from? Did we lose it? It should already have been there. In any case, let's move to another PR.
Done.
include/moqx/relay/PropertyRanking.h line 31 at r1 (raw file):
Previously, afrind wrote…
Why not just value, and use > compare?
Done.
include/moqx/relay/PropertyRanking.h line 62 at r1 (raw file):
Previously, afrind wrote…
It's called TrackEntry but it's more like a RankIndex?
Done.
include/moqx/relay/PropertyRanking.h line 119 at r1 (raw file):
Previously, afrind wrote…
Let's drop the atomics for now -- they aren't needed. IO threads accumulate in unsynchronized integers and
snapshotruns safely in the IO thread to accumulate. See the rest of moqx stat infrastructure.And consider a rename to PropertyRankingStats to match?
Done.
include/moqx/relay/PropertyRanking.h line 142 at r1 (raw file):
Previously, afrind wrote…
I think you used the wrong version of the plan. We removed idle sweeps in favor of driving idle track deselection based on object arrival, with a dampening timeout.
Done.
include/moqx/relay/PropertyRanking.h line 235 at r1 (raw file):
Previously, afrind wrote…
do we ever register a track without a publisher?
Done.
include/moqx/relay/PropertyRanking.h line 251 at r1 (raw file):
Previously, afrind wrote…
Group already has a meaning in MOQT -- let's use TopNGroup for this throughout the class?
Done.
include/moqx/relay/PropertyRanking.h line 315 at r1 (raw file):
Previously, afrind wrote…
I think I would call the member rankedTracks_ and the accessor similarly.
Done.
include/moqx/relay/PropertyRanking.h line 461 at r1 (raw file):
Previously, afrind wrote…
First use of the term "pool" - maybe just
boundary_orselectionThreshold_?
Done.
include/moqx/relay/PropertyRanking.h line 465 at r1 (raw file):
Previously, afrind wrote…
Unsure what kind of thresholds these are
Done.
include/moqx/relay/TopNFilter.h line 23 at r1 (raw file):
Previously, afrind wrote…
If we're going to keep Tick, let's put it in it's own file for now. I don't think otherwise that PropertyRanking.h needs to include TopNFilter.
Done.
include/moqx/relay/TopNFilter.h line 68 at r1 (raw file):
Previously, afrind wrote…
maybe lastActivityTarget is more clear
Done.
include/moqx/relay/TopNFilter.h line 162 at r1 (raw file):
Previously, afrind wrote…
Same as above. Prefer to remove entirely for now, and belongs it a sep file when added.
Done.
- Use deterministic audio levels (100-i for panelist i) instead of random - Track which tracks each client receives via PUBLISH - computeExpectedTopN() calculates expected selection with self-exclusion - Report shows verification results with detailed mismatch info - Exit code now fails if top-N selection is incorrect Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
We split this into 9 other commits. |
Suhas confirmed picoquic's "bbr" is BBRv3 (current) and "bbr1" is the legacy v1. The earlier bbr → bbr1 mapping was a silent downgrade; remove it. With moxygen #139 landed (registers all picoquic CC algorithms), "bbr" works natively for pico. Changes: - Validator allowlist: accept "bbr" for picoquic (no mapping) - ParsedConfig.h description updated to match - Drop test for bbr → bbr1 mapping (mapping removed) - Add docker/config.production.yaml as deployment template (dual listener: mvfst + picoquic, both cc_algo: bbr, exact paths) - Bump deps/moxygen submodule to 9e06544d (pico CC fix + latest upstream sync) copa is not in the picoquic allowlist — need to verify the picoquic build actually has it before adding.
Suhas confirmed picoquic's "bbr" is BBRv3 (current) and "bbr1" is the legacy v1. The earlier bbr → bbr1 mapping was a silent downgrade; remove it. With moxygen #139 landed (registers all picoquic CC algorithms), "bbr" works natively for pico. Changes: - Validator allowlist: accept "bbr" for picoquic (no mapping) - ParsedConfig.h description updated to match - Drop test for bbr → bbr1 mapping (mapping removed) - Add docker/config.production.yaml as deployment template (dual listener: mvfst + picoquic, both cc_algo: bbr, exact paths) - Bump deps/moxygen submodule to 9e06544d (pico CC fix + latest upstream sync) copa is not in the picoquic allowlist — need to verify the picoquic build actually has it before adding.
* config: default congestion control to copa (match buildTransportSettings) PR #148 correctly sets Copa + pacing as the server-side default in buildTransportSettings(), but the config layer default in Config.h was still "bbr". When no congestion_control is specified in the YAML config, the config default overwrites Copa with BBR. BBR without pacing triggers mvfst's "Unpaced BBR isn't supported" fallback to Cubic. Change the config default to "copa" so it matches the transport settings baseline. Users who explicitly set congestion_control in their config are unaffected. * config: map bbr → bbr1 for picoquic, fix CC validator allowlists picoquic registers its BBR as "bbr1", not "bbr". Users shouldn't need to know this — cc_algo: bbr should work for both stacks. Changes: - Add "bbr" and "copa" to the picoquic validator allowlist (both are valid: "bbr" is mapped, "copa" is natively registered) - After validation, transparently map "bbr" → "bbr1" for picoquic listeners before the config reaches the transport layer - Add test: PicoBbrMappedToBbr1 verifies the mapping - Update ParsedConfig.h description to list "bbr" for picoquic The default cc_algo remains "bbr" (unchanged). With PR #148's pacing fix, BBR works correctly on mvfst without warnings. * config: add bbr to picoquic allowlist, add production config template Suhas confirmed picoquic's "bbr" is BBRv3 (current) and "bbr1" is the legacy v1. The earlier bbr → bbr1 mapping was a silent downgrade; remove it. With moxygen #139 landed (registers all picoquic CC algorithms), "bbr" works natively for pico. Changes: - Validator allowlist: accept "bbr" for picoquic (no mapping) - ParsedConfig.h description updated to match - Drop test for bbr → bbr1 mapping (mapping removed) - Add docker/config.production.yaml as deployment template (dual listener: mvfst + picoquic, both cc_algo: bbr, exact paths) - Bump deps/moxygen submodule to 9e06544d (pico CC fix + latest upstream sync) copa is not in the picoquic allowlist — need to verify the picoquic build actually has it before adding. * retrigger ci
This PR adds support for track filters based on @afrind's design and some additional optimizations to store precompute states
Fixes: #139
This change is