
Fix #23765: Duplicate Notifications in Multi-Replica Deployment#24866

Merged
harshach merged 5 commits into main from issue-23765
Dec 19, 2025

Conversation


harshach commented Dec 17, 2025

Describe your changes:

Fixes #23765

Preventing Duplicate Notifications in Multi-Server OpenMetadata

Problem Statement

In a multi-server OpenMetadata deployment, each server runs its own EventSubscriptionScheduler that polls the change_events table. Without coordination, each server processes the same events independently, resulting in duplicate notifications being sent.

Root Cause

The original EventSubscriptionScheduler used the in-memory Quartz job store (RAMJobStore), which has no clustering support. Each server maintained its own independent job store with no awareness of the other servers.
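
For contrast, a minimal sketch of what the old non-clustered setup looks like (assuming standard Quartz property names; the instanceName value and the inline wiring are illustrative, not the actual OpenMetadata code):

import java.util.Properties;

import org.quartz.Scheduler;
import org.quartz.impl.StdSchedulerFactory;

// Sketch of the pre-fix, non-clustered setup: each server builds its own
// in-memory job store, so no server knows what the others are running.
Properties props = new Properties();
props.put("org.quartz.scheduler.instanceName", "EventSubscriptionScheduler");
props.put("org.quartz.threadPool.threadCount", "10");
// RAMJobStore keeps jobs and triggers in local memory: fast, but invisible
// to the other replicas, which is exactly why every replica fires the job.
props.put("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");

Scheduler scheduler = new StdSchedulerFactory(props).getScheduler();
scheduler.start();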

Solution: Quartz JDBC Clustering

Architecture Overview

flowchart TB
    subgraph DB["Shared Database"]
        CE[("change_events<br/>Event data to process")]
        QL[("QRTZ_LOCKS<br/>Row-level locking")]
        QFT[("QRTZ_FIRED_TRIGGERS<br/>Job ownership tracking")]
    end

    subgraph S1["OM Server 1"]
        SCH1["Scheduler<br/>instanceId: AUTO-generated"]
        JS1["JDBC JobStore"]
    end

    subgraph S2["OM Server 2"]
        SCH2["Scheduler<br/>instanceId: AUTO-generated"]
        JS2["JDBC JobStore"]
    end

    subgraph S3["OM Server 3"]
        SCH3["Scheduler<br/>instanceId: AUTO-generated"]
        JS3["JDBC JobStore"]
    end

    JS1 <--> QL
    JS2 <--> QL
    JS3 <--> QL

    JS1 <--> QFT
    JS2 <--> QFT
    JS3 <--> QFT

    SCH1 --> CE
    SCH2 --> CE
    SCH3 --> CE

Concurrency Prevention Mechanism

flowchart TD
    A[Job Trigger Fires] --> B{Server attempts to<br/>acquire row lock in<br/>QRTZ_LOCKS table}

    B -->|Lock Acquired| C[Execute Job]
    B -->|Lock Denied| D[Skip Execution<br/>Another server has it]

    C --> E["@DisallowConcurrentExecution<br/>prevents same job from<br/>running again until complete"]
    E --> F[Process Events from<br/>change_events table]
    F --> G[Send Notification]
    G --> H[Release Lock]

    D --> I[Wait for Next<br/>Scheduled Trigger]
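
The @DisallowConcurrentExecution step in the diagram can be made concrete with a minimal sketch (the NotificationJob class and its body are illustrative; in this PR the annotation already sits on AbstractEventConsumer):

import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;

// The clustered JobStore's row lock decides WHICH server fires a trigger;
// @DisallowConcurrentExecution additionally guarantees that two triggers for
// the same JobKey never overlap, even across servers in the cluster.
@DisallowConcurrentExecution
public class NotificationJob implements Job {
  @Override
  public void execute(JobExecutionContext context) {
    // Poll change_events and send notifications (illustrative body).
  }
}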

Job Distribution Flow

sequenceDiagram
    participant S1 as Server 1
    participant DB as Database (QRTZ_LOCKS)
    participant S2 as Server 2
    participant S3 as Server 3

    Note over S1,S3: Job "subscription-123" triggers at 10:00:00

    S1->>DB: SELECT FOR UPDATE (acquire lock)
    S2->>DB: SELECT FOR UPDATE (acquire lock)
    S3->>DB: SELECT FOR UPDATE (acquire lock)

    DB-->>S1: Lock granted ✓
    DB-->>S2: Blocked (waiting)
    DB-->>S3: Blocked (waiting)

    Note over S1: Executes job
    S1->>S1: Process change_events
    S1->>S1: Send notification

    S1->>DB: Release lock
    DB-->>S2: Lock denied (job complete)
    DB-->>S3: Lock denied (job complete)

    Note over S2,S3: Skip execution - job already ran

Failover Handling

flowchart LR
    subgraph Normal["Normal Operation"]
        A1[Server 1<br/>Running Job X]
    end

    subgraph Failure["Server 1 Crashes"]
        A2[Server 1<br/>💀 Dead]
    end

    subgraph Recovery["After clusterCheckinInterval"]
        A3[Server 2<br/>Detects failure]
        A4[Server 2<br/>Takes over Job X]
    end

    Normal --> Failure
    Failure --> Recovery
    A3 --> A4

Key Configuration

| Setting | Value | Purpose |
|---|---|---|
| jobStore.class | JobStoreTX | JDBC-based job storage |
| jobStore.isClustered | true | Enable cluster coordination |
| scheduler.instanceId | AUTO | Unique ID per server |
| clusterCheckinInterval | 20000 ms | Failover detection frequency |
| tablePrefix | QRTZ_ | Uses existing Quartz tables |

Configuration Code

private static final Map<String, String> CLUSTERED_SCHEDULER_CONFIG = new HashMap<>();

static {
  // Scheduler identification - AUTO generates unique instance ID per server
  CLUSTERED_SCHEDULER_CONFIG.put("org.quartz.scheduler.instanceName", SCHEDULER_NAME);
  CLUSTERED_SCHEDULER_CONFIG.put("org.quartz.scheduler.instanceId", "AUTO");

  // Thread pool configuration
  CLUSTERED_SCHEDULER_CONFIG.put("org.quartz.threadPool.threadCount", "10");

  // Job store configuration - JDBC for clustering support
  CLUSTERED_SCHEDULER_CONFIG.put("org.quartz.jobStore.class",
      "org.quartz.impl.jdbcjobstore.JobStoreTX");
  CLUSTERED_SCHEDULER_CONFIG.put("org.quartz.jobStore.tablePrefix", "QRTZ_");

  // Enable clustering
  CLUSTERED_SCHEDULER_CONFIG.put("org.quartz.jobStore.isClustered", "true");
  CLUSTERED_SCHEDULER_CONFIG.put("org.quartz.jobStore.clusterCheckinInterval", "20000");

  // Data source configuration
  CLUSTERED_SCHEDULER_CONFIG.put("org.quartz.jobStore.dataSource", "omDS");
  CLUSTERED_SCHEDULER_CONFIG.put("org.quartz.dataSource.omDS.maxConnections", "5");
}
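
Assuming the map above is converted to java.util.Properties, handing it to Quartz would look roughly like this (a sketch; the actual factory call site in EventSubscriptionScheduler may differ):

import java.util.Properties;

import org.quartz.Scheduler;
import org.quartz.impl.StdSchedulerFactory;

// Each server builds its scheduler from the same property map; with
// instanceId=AUTO every node gets a unique identity in the shared QRTZ_* tables.
Properties quartzProperties = new Properties();
quartzProperties.putAll(CLUSTERED_SCHEDULER_CONFIG);
Scheduler scheduler = new StdSchedulerFactory(quartzProperties).getScheduler();
scheduler.start();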

Files Modified

| File | Purpose |
|---|---|
| EventSubscriptionScheduler.java | Added CLUSTERED_SCHEDULER_CONFIG with JDBC JobStore settings |
| EventSubscriptionSchedulerTest.java | Configuration validation tests |
| EventSubscriptionSchedulerClusteringTest.java | Concurrency behavior tests |

Distribution with Many Notifications

With 100+ notification subscriptions:

flowchart TB
    subgraph Jobs["100 Notification Jobs"]
        J1[Job 1]
        J2[Job 2]
        J3[Job 3]
        JN[Job N...]
    end

    subgraph Distribution["Distribution via Lock Contention"]
        LC["First server to acquire<br/>database lock wins"]
    end

    subgraph Servers["3 Server Cluster"]
        S1["Server 1<br/>Runs: Jobs 1, 4, 7..."]
        S2["Server 2<br/>Runs: Jobs 2, 5, 8..."]
        S3["Server 3<br/>Runs: Jobs 3, 6, 9..."]
    end

    Jobs --> LC
    LC --> Servers

Jobs are distributed via lock contention (not load balancing):

  • Each subscription = 1 Quartz job (see the sketch after this list)
  • Server that acquires the lock first runs the job
  • Natural distribution based on timing and server load
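
A hedged sketch of what one-job-per-subscription registration could look like (the group names, 60-second interval, and NotificationJob class are illustrative, not the PR's actual identifiers):

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;

// One job per subscription, persisted in the shared QRTZ_* tables.
JobDetail job = JobBuilder.newJob(NotificationJob.class)
    .withIdentity("subscription-123", "omAlertJobGroup")
    .build();

Trigger trigger = TriggerBuilder.newTrigger()
    .withIdentity("subscription-123-trigger", "omAlertTriggerGroup")
    .withSchedule(SimpleScheduleBuilder.simpleSchedule()
        .withIntervalInSeconds(60)
        .repeatForever())
    .build();

// At each fire time, whichever server grabs the QRTZ_LOCKS row runs the job,
// so distribution across the cluster falls out of lock contention for free.
scheduler.scheduleJob(job, trigger);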

Guarantees

| Guarantee | How It's Achieved |
|---|---|
| No duplicate notifications | Only one server acquires the database lock per job trigger |
| Automatic failover | Dead servers detected within clusterCheckinInterval (20s) |
| Horizontal scaling | Add more servers without configuration changes |
| Backwards compatible | Uses existing QRTZ_* tables, no schema migration needed |

Existing Infrastructure Leveraged

  • Quartz tables (QRTZ_*) already exist in OpenMetadata database
  • @DisallowConcurrentExecution already present on AbstractEventConsumer
  • Same pattern as AppScheduler which already uses clustering

mohityadav766 previously approved these changes Dec 17, 2025
manerow previously approved these changes Dec 17, 2025

gitar-bot Bot commented Dec 19, 2025

🔍 CI failure analysis for 8d4bd72: Playwright shard (6,6) has 10 test failures with mixed timeout patterns (loader timeouts, hook timeouts, browser crashes, 5-minute lineage timeouts). Combined with shard (5,6), that makes 16 flaky failures in total. All are CI infrastructure issues (resource exhaustion, browser instability), not related to the backend scheduler changes. Multiple tests passed on retry. Non-blocking; should be retried or ignored.

Combined Playwright CI Failure Analysis - Shards (5, 6) and (6, 6)


Shard (5, 6) - Previously Analyzed

6 test failures - All with identical UI loader timeout pattern

  • Dashboard Data Model › Certification Add Remove
  • File › Certification Add Remove (passed on retry)
  • Directory › Certification Add Remove
  • Metric › Tag Add, Update and Remove

Shard (6, 6) - NEW Analysis

10 test failures - Mixed timeout patterns

Failed Tests

  1. ✘ ExploreTree › Verify Database and Database schema after rename (passed on retry #1)
  2. ✘ Glossary › Glossary & terms creation for reviewer as team
  3. ✘ Glossary › Glossary creation with domain selection
  4. ✘ Lineage › Lineage creation from Table entity
  5. ✘ Lineage › Lineage creation from Topic entity
  6. ✘ Lineage › Lineage creation from MlModel entity
  7. ✘ Mlmodel Service › Tag Add, Update and Remove (+ retry #1 failed)
  8. ✘ Tag › Add and Remove Assets and Check Restricted Entity
  9. ✘ Teams › Add and Remove User for Team

Error Patterns (Shard 6, 6)

Pattern 1: UI Loader Timeout (same as shard 5,6)

Error: Timed out 30000ms waiting for expect(locator).toHaveCount(expected)
Locator: locator('[data-testid="loader"]')
At: waitForLoader utility line 52

Pattern 2: BeforeEach Hook Timeout

Test timeout of 60000ms exceeded while running "beforeEach" hook
Error: page.goto: Target page, context or browser has been closed

Pattern 3: BeforeAll Hook Timeout

"beforeAll" hook timeout of 60000ms exceeded

Pattern 4: Strict Mode Violation (Glossary test)

Error: strict mode violation: getByTestId('glossary-terms-table').getByText('Owners') 
resolved to 2 elements

Pattern 5: Long Test Timeout

Test timeout of 180000ms exceeded

Root Cause Analysis

Primary Cause: Infrastructure/Resource Issues

All failures indicate CI environment problems, NOT code bugs:

  1. Resource Exhaustion: Multiple 5-minute timeouts (300s) on Lineage tests suggest CI runners are severely resource-constrained
  2. Page/Context Closure: "Target page, context or browser has been closed" indicates Playwright browser processes are crashing or timing out
  3. Loader Timeouts: Same pattern as shard (5,6) - UI loaders not disappearing within 30s
  4. Test Flakiness: The ExploreTree test passed on retry #1, confirming the intermittent nature of the failures

Why This Is NOT Related to PR Changes

PR Modifications:

  • Backend Java code: AbstractEventConsumer.java, EventSubscriptionScheduler.java
  • Reverted JDBC JobStore to RAMJobStore for Quartz scheduler
  • Zero frontend code changes
  • Zero UI changes

Failing Tests:

  • Frontend E2E Playwright tests
  • Testing UI workflows: Lineage, Glossary, Tags, Teams, Services
  • Different components across the application
  • No shared code path with scheduler changes

Evidence of Environmental Flakiness

  1. Intermittent Successes:

    • Multiple tests passed on retry (e.g. ExploreTree, File › Certification Add Remove)
  2. Diverse Failure Patterns:

    • Multiple unrelated timeout types
    • Different components failing (Lineage, Glossary, Tags, Teams)
    • No single root cause in code - points to infrastructure
  3. Extreme Timeouts:

    • Lineage tests timing out at 5+ minutes (300+ seconds)
    • These are configured with 180s test timeout
    • Suggests CI runner is severely overloaded or network issues
  4. Browser/Page Crashes:

    • "Target page, context or browser has been closed"
    • Indicates Playwright browser process instability
    • Environment issue, not application code

Historical Context

This PR has experienced flaky Playwright failures across multiple shards:

  • Shard (4, 6): Container › Certification Add Remove
  • Shard (5, 6): 6 failures with loader timeouts
  • Shard (6, 6): 10 failures with mixed patterns

Pattern: Different tests failing in different shards, but same infrastructure symptoms.

Impact Assessment

Non-blocking: Environmental test infrastructure issues

NOT related to PR code changes: Backend scheduler changes cannot cause frontend UI test timeouts

🔄 Recommendation: Retry CI jobs or ignore these failures

Technical Separation

| PR Changes | Failing Tests |
|---|---|
| Backend Java | Frontend Playwright |
| Event subscription scheduler | UI entity workflows |
| Quartz JobStore configuration | Lineage, Glossary, Tags, Teams UI |
| No frontend changes | E2E browser tests |
| Zero logical connection | Zero code overlap |

Conclusion

Both shard (5, 6) and shard (6, 6) failures are flaky infrastructure issues:

✅ Environmental resource exhaustion/timing issues
✅ Multiple tests passing on retry (proving flakiness)
✅ No connection to backend scheduler changes
✅ Browser process instability in CI environment

Verdict: ❌ NOT BLOCKING - These failures should be retried or ignored

Code Review ⚠️ Changes requested

Good refactoring for clustered Quartz support with one important serialization inconsistency to address.

⚠️ Bug: Inconsistent serialization: commit stores object, load expects JSON

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java:255
📄 openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java:216

There's an inconsistency in how ALERT_INFO_KEY is serialized when storing vs loading:

In commit() (line 255):

jobExecutionContext.getJobDetail().getJobDataMap().put(ALERT_INFO_KEY, eventSubscription);

The EventSubscription object is stored directly, not as JSON.

In init() (lines 77-89):

if (alertInfoValue instanceof String subscriptionJson) {
  this.eventSubscription = JsonUtils.readValue(subscriptionJson, EventSubscription.class);
} else if (alertInfoValue instanceof EventSubscription subscription) {
  this.eventSubscription = subscription;
}

The load code handles both String and EventSubscription, but the commit stores the raw object.

In addEventSubscriptionJobDataMap() (line 216):

dataMap.put(ALERT_INFO_KEY, eventSubscription);

Also stores the raw object.

For JDBC JobStore clustering to work correctly, all data in JobDataMap should be serialized to JSON strings. The current implementation will work with RAMJobStore but will fail with JDBC JobStore if you switch back.

Suggested fix: Serialize to JSON in commit():

jobExecutionContext.getJobDetail().getJobDataMap().put(ALERT_INFO_KEY, JsonUtils.pojoToJson(eventSubscription));

And in addEventSubscriptionJobDataMap():

dataMap.put(ALERT_INFO_KEY, JsonUtils.pojoToJson(eventSubscription));
💡 Edge Case: Redundant getStartingOffset call when dbOffset is null

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java:189-198

In loadInitialOffset, when dbOffset from getStartingOffset() is null, the code falls through to call getStartingOffset(eventSubscription.getId()) again (line 98), which will return the same null result since the underlying data hasn't changed.

This makes the control flow unclear: the warning message says a default will be used, but getStartingOffset is called again instead of constructing a proper default offset.

Suggested fix:

EventSubscriptionOffset dbOffset = getStartingOffset(eventSubscription.getId());
if (dbOffset != null) {
  context.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, JsonUtils.pojoToJson(dbOffset));
  return dbOffset;
}

LOG.warn("No offset found for subscription {}, using default", eventSubscription.getId());
// Create a proper default offset instead of calling getStartingOffset again
EventSubscriptionOffset defaultOffset = new EventSubscriptionOffset()
    .withCurrentOffset(0L)
    .withStartingOffset(0L);
context.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, JsonUtils.pojoToJson(defaultOffset));
return defaultOffset;
💡 Code Quality: Early returns leave consumer in partially initialized state

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java:73-114

In the init() method, early returns on error conditions (lines 78, 85, 92, 101) leave the consumer in a partially initialized state where some fields may be null while others have default values.

While the execute() method does check for this.eventSubscription == null before proceeding, other fields like offset, startingOffset, alertMetrics, and destinationMap remain uninitialized (null or 0), which could cause NullPointerExceptions if any code path attempts to use them.

Suggested fix:
Consider throwing an exception or using a dedicated "initialization failed" flag:

private boolean initializationFailed = false;

private void init(JobExecutionContext context) {
  this.jobDetail = context.getJobDetail();
  try {
    // ... initialization logic ...
  } catch (Exception e) {
    LOG.error("Failed to initialize EventConsumer from JobDataMap", e);
    this.initializationFailed = true;
  }
}

@Override
public void execute(JobExecutionContext jobExecutionContext) {
  this.init(jobExecutionContext);
  if (this.initializationFailed) {
    LOG.error("Skipping job execution - initialization failed");
    return;
  }
  // ... rest of execute logic ...
}
Bug: Database password exposed in Quartz properties

📄 openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java:183-186
The configureDataSource method directly puts the database password into Quartz properties:

properties.put("org.quartz.dataSource.omDS.password", config.getDataSourceFactory().getPassword());

These properties may be logged by Quartz during startup or error conditions, potentially exposing the database password in log files. Quartz's default logging can output configuration details.

Impact: Sensitive credentials could be exposed in application logs.

Suggested fix: Consider using Quartz's ConnectionProvider interface to manage database connections programmatically, or verify that Quartz's logging is configured to redact sensitive properties. Alternatively, use an encrypted password or reference the datasource from a JNDI name.
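
As a sketch of the ConnectionProvider route (assuming an application-managed DataSource is available to hand over; the class name and registration call site are illustrative):

import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.quartz.utils.ConnectionProvider;
import org.quartz.utils.DBConnectionManager;

// Hands Quartz connections from the app's pooled DataSource, so no password
// ever appears in the Quartz Properties (and thus cannot be logged from them).
public class OmConnectionProvider implements ConnectionProvider {
  private final DataSource dataSource; // assumed to come from the app's pool

  public OmConnectionProvider(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  @Override
  public Connection getConnection() throws SQLException {
    return dataSource.getConnection();
  }

  @Override
  public void initialize() throws SQLException {
    // Nothing to do; the pool is managed by the application.
  }

  @Override
  public void shutdown() throws SQLException {
    // Pool lifecycle is owned by the application, not Quartz.
  }
}

// Registered under the name the jobStore.dataSource property points at:
// DBConnectionManager.getInstance()
//     .addConnectionProvider("omDS", new OmConnectionProvider(appDataSource));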

Edge Case: Unreachable code pattern in loadInitialOffset

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java:155-159
In loadInitialOffset, after the null check for dbOffset, there's a fallback that calls getStartingOffset() again:

// Should not reach here - getStartingOffset always returns a valid offset
LOG.warn("No offset found for subscription {}, using default", eventSubscription.getId());
return getStartingOffset(eventSubscription.getId());

If getStartingOffset() returned null once, calling it again will return null again. Either this block is truly unreachable (as the comment suggests), or, when it is reached, it simply returns the same null instead of a usable default.

Suggested fix: Either remove this unreachable code block or return a sensible default offset:

LOG.warn("No offset found for subscription {}, creating default", eventSubscription.getId());
return new EventSubscriptionOffset().withCurrentOffset(0L).withStartingOffset(0L);
Code Quality: Silent failure in JSON deserialization without validation

📄 openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java:478-486
In convertToSubscriptionStatus, when JSON deserialization fails, the method logs an error and returns null. This silently swallows the issue:

try {
  String json = JsonUtils.pojoToJson(status);
  return JsonUtils.readValue(json, SubscriptionStatus.class);
} catch (Exception e) {
  LOG.error("Failed to convert status to SubscriptionStatus: {}", status, e);
  return null;
}

While returning null is acceptable, consider also including what type the status object actually was to help with debugging:

LOG.error("Failed to convert status (type: {}) to SubscriptionStatus: {}", 
    status.getClass().getName(), status, e);

This would help diagnose issues when unexpected types are stored in statusDetails.

Bug: Potential NPE when eventSubscription is null in loadInitialOffset

📄 openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java:140-159
In AbstractEventConsumer.loadInitialOffset(), if eventSubscription is null (which can happen if deserialization fails), the code will throw a NullPointerException when calling eventSubscription.getId().

Current code flow:

  1. init() calls loadInitialOffset(context)
  2. loadInitialOffset() calls getStartingOffset(eventSubscription.getId()) on line 58
  3. But eventSubscription could be null if deserialization failed

While execute() does check eventSubscription for null later, init() calls loadInitialOffset() before that check can run.

Suggested fix:
Either:

  1. Check for null eventSubscription in init() before calling loadInitialOffset(), or
  2. Guard the access in loadInitialOffset():
if (eventSubscription == null) {
  LOG.error("Cannot load offset - eventSubscription is null");
  return new EventSubscriptionOffset().withCurrentOffset(0L).withStartingOffset(0L);
}
Bug: Database driver class check incorrectly compares driver with connection type label

📄 openmetadata-service/src/main/java/org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.java:156-165
The code compares config.getDataSourceFactory().getDriverClass() with ConnectionType.MYSQL.label, but ConnectionType.label typically contains a label like "mysql" while getDriverClass() returns a fully qualified JDBC driver class name like com.mysql.cj.jdbc.Driver.

This comparison will likely never match, causing MySQL deployments to incorrectly use PostgreSQLDelegate instead of StdJDBCDelegate.

Impact: MySQL users may experience issues with the Quartz scheduler as the wrong JDBC delegate is used.

Suggested fix:

// Check the driver class name for MySQL patterns
String driverClass = config.getDataSourceFactory().getDriverClass();
if (driverClass != null && driverClass.toLowerCase().contains("mysql")) {
    properties.put(
        "org.quartz.jobStore.driverDelegateClass",
        "org.quartz.impl.jdbcjobstore.StdJDBCDelegate");
} else {
    properties.put(
        "org.quartz.jobStore.driverDelegateClass",
        "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate");
}

Alternatively, check the URL pattern or use a more reliable method to determine the database type.
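
A sketch of the URL-pattern alternative (assuming the Dropwizard DataSourceFactory's standard getUrl() accessor; the fallback-to-PostgreSQL choice mirrors the snippet above):

// JDBC URLs are self-describing ("jdbc:mysql://...", "jdbc:postgresql://..."),
// so the scheme prefix is a more reliable signal than the driver class name.
String url = config.getDataSourceFactory().getUrl();
boolean isMySql = url != null && url.toLowerCase().startsWith("jdbc:mysql");
properties.put(
    "org.quartz.jobStore.driverDelegateClass",
    isMySql
        ? "org.quartz.impl.jdbcjobstore.StdJDBCDelegate"
        : "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate");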

...and 1 more from earlier reviews

What Works Well

  • Proper handling of JSON deserialization for clustered JobStore scenarios with type checking for both String and native object types
  • Moving scheduler initialization from static block to constructor improves testability and configuration flexibility
  • Comprehensive test coverage for clustering behavior with EventSubscriptionSchedulerClusteringTest
  • Better error handling and logging throughout the initialization flow

Recommendations

  • Important: Ensure consistent serialization - commit() and addEventSubscriptionJobDataMap() should serialize EventSubscription to JSON string to match the load expectations and enable future JDBC JobStore support
  • The redundant getStartingOffset() call when offset is null should create a proper default instead
  • Consider using an explicit initialization failure flag rather than relying on null checks across multiple fields



harshach merged commit 8c855c6 into main Dec 19, 2025
24 of 26 checks passed
harshach deleted the issue-23765 branch December 19, 2025 23:36
ShaileshParmar11 pushed a commit that referenced this pull request Dec 26, 2025
* Fix #23765: Duplicate Notifications in Multi-Replica Deployment

* Fix tests

* Fix tests in EventSubscription

* Fix tests in EventSubscription
