fix: data sink worker optimizations (CM-1054)#3952
Merged
Conversation
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces optimizations aimed at reducing write amplification and redundant DB lookups in the data sink worker path (CM-1054), primarily by throttling organizations.updatedAt updates and caching existence checks while upserting activity relations.
Changes:
- Add optional
throttleUpdatedAtbehavior to organization updates and enable it fromdata_sink_worker. - Reduce repeated
SELECTchecks increateOrUpdateRelationsby caching member/org/segment/conversation existence results within a batch. - Thread the new throttling flag through
findOrCreateOrganization -> updateOrganization.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| services/libs/data-access-layer/src/organizations/base.ts | Adds an optional throttled updatedAt update expression and passes the flag through org upsert flow. |
| services/libs/data-access-layer/src/activities/sql.ts | Adds in-function caches to reduce repeated existence checks during activity relation upserts. |
| services/apps/data_sink_worker/src/service/organization.service.ts | Enables throttled updatedAt updates for data sink worker organization upserts. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
…on logging Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
mbani01
approved these changes
Mar 25, 2026
mbani01
pushed a commit
that referenced
this pull request
Mar 26, 2026
Signed-off-by: Uroš Marolt <uros@marolt.me> Signed-off-by: Mouad BANI <mouad-mb@outlook.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.

Note
Medium Risk
Moderate risk because it changes data-sink worker batching semantics (org creation, relation upserts, sync triggering) and adds new production indexes/migration behavior, which could affect throughput and correctness under load.
Overview
Improves data-sink ingestion performance by adding new covering/partial Postgres indexes for verified username/email member-identity lookups, and refactoring
findMembersByVerifiedEmailsto use anANY(array[...])predicate.Reduces redundant work in the data-sink worker by introducing a per-batch organization creation promise cache, skipping a known placeholder org (
individual-noaccount.com), throttlingorganizations.updatedAtupdates duringfindOrCreateOrganization, and batching/parallelizing activity queue dispatch.Optimizes downstream side effects by deduplicating
triggerMemberSynccalls per(memberId, segmentId), batching Redis org-id aggregation writes, using bulk affiliation-policy checks, and adding in-function caches to cut repeated existence checks when upserting activity relations (withcreateOrUpdateRelations(..., true)to skip checks in this path).Written by Cursor Bugbot for commit 08893ea. This will update automatically on new commits. Configure here.