Skip to content

oplogPopulator: synthesise oplog key via $addFields in connector pipeline#2744

Open
delthas wants to merge 1 commit into
development/9.5from
improvement/BB-768/oplog-key-via-pipeline-addfields
Open

oplogPopulator: synthesise oplog key via $addFields in connector pipeline#2744
delthas wants to merge 1 commit into
development/9.5from
improvement/BB-768/oplog-key-via-pipeline-addfields

Conversation

@delthas
Copy link
Copy Markdown
Contributor

@delthas delthas commented May 29, 2026

Summary

  • extensions/oplogPopulator/pipeline/PipelineFactory.js: insert a $addFields stage into the connector's change-stream pipeline (right after $match, before the conditional location-strip $set) that synthesises a top-level key field via $ifNull coalescing $fullDocument.value.key and $updateDescription.updatedFields.value.key.
  • extensions/oplogPopulator/constants.js: replace the broken fullDocument nested record in output.schema.key with a top-level key: [string, null] field. The existing ns: {coll} projection is preserved, so the Kafka message key remains {ns: {coll: <bucket>}, key: <object-key>} — bucket-level isolation is unchanged.
  • Tests updated in both pipeline-factory subclasses, the functional pipeline test (real-mongo aggregation against the new stage), and the connectorConfig fixture used by ConnectorsManager tests.

Pure Backbeat change. No SMT, no Zenko image change, no operator flag. Always on.

Context

BB-768: the current key schema projects fullDocument.value.key, which is null on update events (since BB-355 removed change.stream.full.document=updateLookup). Every update for a given bucket therefore serialises to the same key Struct ({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition — both an ordering problem (insert → update on different partitions for the same object) and a throughput problem (a hot bucket's update traffic can't scale with partition count, blocking BB-756).

After ticket discussion (Jira comment 477122) we picked the pipeline-$addFields route over a Kafka Connect SMT: it's a single-repo fix, and the measured server-side cost is ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s — not a throughput concern.

For the same logical S3 object — insert (key in fullDocument.value.key), master PUT update (key in updatedFields.value.key), replication-status update, delete-marker update — all yield the same key value → same partition. Master and version documents both store the same value.key, so master/version events also collapse to the same partition without prefix-stripping.

Coupling caveat

The $ifNull variant relies on metadata always writing the whole value subdocument (full $set). Confirmed today against arsenal — there's no partial dotted-$set path for object MD. A future partial update ($set: {"value.x": …}) would not populate updatedFields.value.key and the resulting event would mis-partition. Accepted risk per the ticket discussion.

Migration

The change is propagated to existing connectors via the existing in-place PUT /connectors/{name}/config reconciliation (no recreate, no resume-token loss — the change touches the key schema + the added $addFields stage only, not the $match stage). Downstream oplog consumers don't read the Kafka message key, so the new key shape is consumer-transparent. See this comment for the full migration trace.

Issue: BB-768

@bert-e
Copy link
Copy Markdown
Contributor

bert-e commented May 29, 2026

Hello delthas,

My role is to assist you with the merge of this
pull request. Please type @bert-e help to get information
on this process, or consult the user documentation.

Available options
name description privileged authored
/after_pull_request Wait for the given pull request id to be merged before continuing with the current one.
/bypass_author_approval Bypass the pull request author's approval
/bypass_build_status Bypass the build and test status
/bypass_commit_size Bypass the check on the size of the changeset TBA
/bypass_incompatible_branch Bypass the check on the source branch prefix
/bypass_jira_check Bypass the Jira issue check
/bypass_peer_approval Bypass the pull request peers' approval
/bypass_leader_approval Bypass the pull request leaders' approval
/approve Instruct Bert-E that the author has approved the pull request. ✍️
/create_pull_requests Allow the creation of integration pull requests.
/create_integration_branches Allow the creation of integration branches.
/no_octopus Prevent Wall-E from doing any octopus merge and use multiple consecutive merge instead
/unanimity Change review acceptance criteria from one reviewer at least to all reviewers
/wait Instruct Bert-E not to run until further notice.
Available commands
name description privileged
/help Print Bert-E's manual in the pull request.
/status Print Bert-E's current status in the pull request TBA
/clear Remove all comments from Bert-E from the history TBA
/retry Re-start a fresh build TBA
/build Re-start a fresh build TBA
/force_reset Delete integration branches & pull requests, and restart merge process from the beginning.
/reset Try to remove integration branches unless there are commits on them which do not appear on the source branch.

Status report is not available.

@bert-e
Copy link
Copy Markdown
Contributor

bert-e commented May 29, 2026

Waiting for approval

The following approvals are needed before I can proceed with the merge:

  • the author

  • 2 peers

@codecov
Copy link
Copy Markdown

codecov Bot commented May 29, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.50%. Comparing base (c52fbcc) to head (e0ce22c).

Additional details and impacted files

Impacted file tree graph

Files with missing lines Coverage Δ
extensions/oplogPopulator/constants.js 100.00% <ø> (ø)
...ensions/oplogPopulator/pipeline/PipelineFactory.js 100.00% <ø> (ø)

... and 10 files with indirect coverage changes

Components Coverage Δ
Bucket Notification 80.22% <ø> (ø)
Core Library 80.61% <ø> (-0.38%) ⬇️
Ingestion 70.63% <ø> (-0.61%) ⬇️
Lifecycle 79.06% <ø> (ø)
Oplog Populator 85.83% <ø> (ø)
Replication 59.67% <ø> (-0.12%) ⬇️
Bucket Scanner 85.76% <ø> (ø)
@@                 Coverage Diff                 @@
##           development/9.5    #2744      +/-   ##
===================================================
- Coverage            74.73%   74.50%   -0.23%     
===================================================
  Files                  199      199              
  Lines                13650    13650              
===================================================
- Hits                 10201    10170      -31     
- Misses                3439     3470      +31     
  Partials                10       10              
Flag Coverage Δ
api:retry 9.12% <ø> (ø)
api:routes 8.94% <ø> (ø)
bucket-scanner 85.76% <ø> (ø)
ft_test:queuepopulator 9.08% <ø> (-1.05%) ⬇️
ingestion 12.51% <ø> (-0.06%) ⬇️
lib 7.75% <ø> (-0.04%) ⬇️
lifecycle 18.89% <ø> (-0.11%) ⬇️
notification 1.02% <ø> (ø)
oplogPopulator 0.14% <ø> (ø)
replication 18.64% <ø> (-0.08%) ⬇️
unit 51.23% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@delthas delthas marked this pull request as ready for review May 29, 2026 10:27
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from 1193adb to 20249e5 Compare May 29, 2026 10:28
@scality scality deleted a comment from claude Bot May 29, 2026
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch 2 times, most recently from 4edd41b to e24ee0a Compare May 29, 2026 10:43
@scality scality deleted a comment from claude Bot May 29, 2026
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from e24ee0a to 37fb5a1 Compare May 29, 2026 10:47
@scality scality deleted a comment from claude Bot May 29, 2026
@delthas delthas requested review from a team, francoisferrand and maeldonn May 29, 2026 10:48
@scality scality deleted a comment from claude Bot May 29, 2026
Copy link
Copy Markdown
Contributor

@francoisferrand francoisferrand left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will the recently updated logic correctly update the pipeline in all cases?
or do we have some situations were we may still stay with an old pipeline?

Comment thread extensions/oplogPopulator/pipeline/PipelineFactory.js Outdated
Comment thread extensions/oplogPopulator/pipeline/PipelineFactory.js Outdated
…line

The current Kafka message key projects fullDocument.value.key, which is
null on update events since BB-355 removed
change.stream.full.document=updateLookup. Every update for a given bucket
therefore serialises to the same key Struct
({ns:{coll:<bucket>}, fullDocument:null}) and lands on one partition,
breaking per-object ordering across op types (insert vs update) and
pinning a hot bucket's update traffic to a single partition (a blocker
for the oplog scaling work tracked in BB-756).

Fix: append a $addFields stage to the MongoDB connector's change-stream
pipeline that synthesises a top-level s3Key field by $ifNull coalescing
$fullDocument.value.key and $updateDescription.updatedFields.value.key.
The connector's output.schema.key is replaced with a single-field Avro
record {s3Key: [string, null]} so the Kafka message key is just that
field. All events for the same logical S3 object — insert, master/version
updates, replication-status updates, delete-marker updates — yield the
same s3Key value and hash to the same partition. Master and version
documents share the same value.key, so master/version events also
collapse to the same partition without prefix-stripping.

The $ifNull variant relies on metadata always writing the whole 'value'
subdocument (full $set). Confirmed against arsenal: there is no partial
dotted-$set path for object MD today. A hypothetical future partial
$set would not populate updateDescription.updatedFields.value.key and
that event would mis-partition — accepted risk per the ticket
discussion.

The change is propagated to existing connectors via the existing in-place
PUT /connectors/{name}/config reconciliation (no recreate, no
resume-token loss — the change touches the key schema + pipeline only,
not the pipeline match stage). Downstream oplog consumers do not read
the Kafka message key, so the new key shape is consumer-transparent.

History / discussion:

  https://scality.atlassian.net/browse/BB-768?focusedCommentId=477122

This supersedes the earlier SMT-based approach (a Kafka Connect Single
Message Transform deriving the key from documentKey._id with
master/version prefix-stripping), which would have spanned three repos
and added a new Java artifact + operator feature flag. Closed in favour
of this single-repo fix after a per-event-cost measurement showed the
$addFields adds ~600–900 ns/event ≈ ~1–2% of one core at 20k ops/s on
the mongod — not a throughput concern at our target rates.

Superseded work (to be closed):

  #2741   (SMT-track Backbeat PR)
  scality/Zenko#2410      (ZENKO-5274 — Java SMT in kafka-connect image)
  https://scality.atlassian.net/browse/ZKOP-553   (operator feature flag — no longer needed)

Issue: BB-768
@delthas delthas force-pushed the improvement/BB-768/oplog-key-via-pipeline-addfields branch from 37fb5a1 to e0ce22c Compare May 29, 2026 13:07
@delthas
Copy link
Copy Markdown
Contributor Author

delthas commented May 29, 2026

will the recently updated logic correctly update the pipeline in all cases?

Yes, on the next Backbeat restart, no existing connector stays on the old pipeline:

  • OplogPopulator.initializeConnectorsConnectorsManager._processOldConnectors retrieves each existing connector's config from Kafka Connect, extracts buckets from pipeline[0].$match (works on both the old 1–2-stage shape and the new 2–3-stage shape — the bucket stage stays at index 0), then constructs a new Connector with {...oldConfig, ...config} where config = _getDefaultConnectorConfiguration(). The spread overwrites output.schema.key with the new single-key-field Avro, and the new Connector starts with bucketsGotModified: true.
  • On the first reconciliation tick after restart, Connector.updatePipeline(doUpdate=true) rebuilds _config.pipeline via the new PipelineFactory.getPipeline() and PUTs the full _config (new schema + new pipeline) via _kafkaConnect.updateConnectorConfig. That's an in-place PUT /connectors/{name}/config — no recreate, no resume-token loss (the change doesn't touch $match).
  • If that first PUT fails (network blip / Connect not ready), bucketsGotModified stays true and it retries on each subsequent tick until success.
  • Newly-spawned connectors (from addConnector) go through _getDefaultConnectorConfiguration directly, so they start with the new schema + new pipeline from the first POST.

The one path that doesn't flip in-process: an already-running Backbeat with old code in memory — the in-process Connector._config was constructed before this code was deployed, so it keeps the old schema until restart. Backbeat restart is required as part of the rollout (which is normal for any Backbeat upgrade).

Also addressed in e0ce22c9:

Comment thread extensions/oplogPopulator/constants.js
@delthas delthas requested a review from francoisferrand May 29, 2026 13:09
@claude
Copy link
Copy Markdown

claude Bot commented May 29, 2026

LGTM. Clean, well-scoped change. The $addFields pipeline stage correctly synthesises the key via $ifNull coalescing, the Avro key schema simplification is consistent with the new pipeline field, and the test coverage (unit + functional against real MongoDB) is thorough — covering insert, update, both-present, and neither-present paths. One minor nit: the PR description refers to the Avro field as s3Key but the code uses key — worth a description fix to avoid confusion.

Review by Claude Code

@scality scality deleted a comment from claude Bot May 29, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants