Skip to content

Conversation

@turip
Copy link
Member

@turip turip commented Nov 13, 2025

Overview

This patch makes sure that for each incoming event balance worker yields another event, that is used to process the entitlement recalculation. This is useful as going forward we will not need redis based caches as the entitlements end up on the same worker instance.

Summary by CodeRabbit

  • New Features

    • Introduced comprehensive operation type tracking supporting entitlements, grants, and ingestion workflows
    • Enhanced event system with richer metadata including operation source and raw ingested event data
    • Improved event versioning to support evolving formats and operation types
  • Refactor

    • Refactored internal event handling to use standardized event publishing and processing patterns

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Nov 13, 2025

📝 Walkthrough

Walkthrough

This PR refactors the balance worker's event handling by introducing a new OperationType enum and extending the RecalculateEvent structure. It replaces direct calls to handleEntitlementEvent across all event handlers with publishing standardized RecalculateEvent messages via the EventBus, bumping the event version from v1 to v2 and adding fields for source operation tracking and raw ingested events.

Changes

Cohort / File(s) Summary
Event type definition
openmeter/entitlement/balanceworker/events/recalculate.go
Introduces OperationType enum with 7 operation kinds (entitlement/grant creation/deletion, void, metered entitlement reset, ingest). Adds Validate() and Values() methods. Extends RecalculateEvent with SourceOperation, RawIngestedEvents, and renames OriginalSourceEvent to OriginalEventSource. Bumps event version to v2.
Event publishing layer
openmeter/entitlement/balanceworker/ingesthandler.go, openmeter/entitlement/balanceworker/worker.go
Refactors all event handlers to publish RecalculateEvent via EventBus instead of calling handleEntitlementEvent directly. Updates handlers for entitlement creation/deletion, grant operations, metered entitlement resets, and batch ingestion to construct and publish events with appropriate operation types and metadata. Adds mapping logic to translate source operations for downstream snapshot handling.

Sequence Diagram

sequenceDiagram
    participant EventSource as Event Handler
    participant EventBus as EventBus
    participant Worker as Balance Worker
    participant Handler as handleEntitlementEvent

    rect rgb(200, 220, 240)
    Note over EventSource,Handler: Previous Flow
    EventSource->>Handler: Call directly with options
    Handler->>Handler: Process event
    end

    rect rgb(220, 240, 200)
    Note over EventSource,Handler: New Flow (This PR)
    EventSource->>EventBus: Publish RecalculateEvent<br/>(SourceOperation, OriginalEventSource, etc.)
    EventBus->>Worker: Consume RecalculateEvent
    Worker->>Handler: Call with mapped operation<br/>& options from event
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Areas requiring extra attention:

  • Event version bump (v1 → v2): Verify backward compatibility handling and any serialization/deserialization logic
  • Field rename (OriginalSourceEventOriginalEventSource): Check all usages throughout the codebase to ensure no references to the old field name remain
  • Operation type mapping: Review the logic in worker.go that translates SourceOperation to snapshot operations (especially the conditional mapping for different operation kinds)
  • Error handling changes: The new pattern returns on publish failure rather than aggregating errors—verify this aligns with the intended failure semantics
  • RawIngestedEvents handling: Ensure the raw event payload is correctly populated and passed through the system

Possibly related PRs

Suggested labels

area/balance-worker, area/entitlements

Suggested reviewers

  • chrisgacsal
  • tothandras

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Title check ⚠️ Warning The title 'refactor: balance worker partition affinity' doesn't match the actual changes, which involve refactoring event handling to publish RecalculateEvents rather than partition affinity improvements. Update the title to reflect the actual changes, such as: 'refactor: balance worker event handling to publish recalculate events' or similar to accurately describe the recalculation event-based refactoring.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch refactor/balance-worker-events

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@turip turip changed the title refactor: balance worker should use a single event type for recalcula… refactor: balance worker partition affinity Nov 14, 2025
@turip turip added the release-note/misc Miscellaneous changes label Nov 14, 2025
…tions

This allows us to not to use redis for any caches in balance worker.
@turip turip force-pushed the refactor/balance-worker-events branch from d21436b to 946813e Compare November 14, 2025 09:14
@turip turip marked this pull request as ready for review November 14, 2025 09:14
@turip turip requested a review from a team as a code owner November 14, 2025 09:14
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
openmeter/entitlement/balanceworker/ingesthandler.go (1)

54-61: RecalculateEvent publish path looks good; consider cleaning up error handling and double‑check AsOf choice

Nice move pushing ingest handling through events.RecalculateEvent – the payload looks consistent with the struct (entitlement ID, original source, SourceOperationIngest, and RawIngestedEvents).

Two small follow‑ups worth considering:

  • You now return on the first Publish error, but the errs slice and errors.Join at the end are never populated anymore. Either:
    • drop errs and the final Join entirely, or
    • if you still want partial progress, keep aggregating per‑entitlement publish errors and only return after processing all entitlements.
  • AsOf is currently event.StoredAt. If the downstream snapshot logic expects “when the underlying usage happened” rather than “when we stored the batch”, you might want to feed it something derived from eventTimestamps (e.g. the max timestamp) instead. If “storage time” is the intended semantics, then this is fine as‑is.

I’d lean toward cleaning up the unused errs logic so the function’s intent is obvious at a glance.

Also applies to: 62-63

openmeter/entitlement/balanceworker/worker.go (2)

17-17: Nice unification of event handling via RecalculateEvent

This refactor to have all the entitlement/grant/reset paths publish a single events.RecalculateEvent shape looks solid:

  • Entitlement identity is consistently pkgmodels.NamespacedID{Namespace: ..., ID: ...} derived from the event’s namespace + owner/entitlement ID.
  • OriginalEventSource uses metadata.ComposeResourcePath with appropriate entities and IDs so you can still trace back to the concrete entitlement/grant event.
  • AsOf is taken from CreatedAt / DeletedAt / UpdatedAt / ResetAt, which lines up with how you’d expect the snapshot to be cut.
  • SourceOperation is set explicitly for each case, which lets the downstream handler switch cleanly on intent.

If you find yourself adding more producers later, it might be worth a tiny helper like newRecalculateEvent(entitlementID, originalSource string, asOf time.Time, op events.OperationType) to avoid repeating the struct literal boilerplate, but that’s purely a readability nit.

Also applies to: 211-218, 223-229, 234-240, 245-251, 256-262, 267-273, 278-284


302-315: RecalculateEvent consumer and snapshot operation mapping look correct

The new handler for events.RecalculateEvent looks coherent with the rest of the design:

  • Defaulting snapshotOperation to snapshot.ValueOperationUpdate and only switching to Reset for OperationTypeMeteredEntitlementReset and Delete for OperationTypeEntitlementDeleted matches how those upstream events are modeled.
  • Passing OriginalEventSource and AsOf into WithSource / WithEventAt preserves the original context all the way into handleEntitlementEvent.
  • Conditionally adding WithRawIngestedEvents keeps ingest‑specific payloads attached without impacting other operation types.
  • Wrapping handleEntitlementEvent in EventBus.WithContext(ctx).PublishIfNoError(...) is consistent with the publishing pattern elsewhere and avoids extra ceremony here.

Only tiny thought: if RawIngestedEvents slices can be large, keep an eye on allocations as they flow through the pipeline, but that’s more of an operational concern than a blocker.

Also applies to: 317-319, 321-327

openmeter/entitlement/balanceworker/events/recalculate.go (1)

6-6: OperationType enum + validation are clear; possible micro‑tweak

Defining OperationType as a string with explicit constants and a Validate method is a nice way to keep the event schema self‑describing. Using slices.Contains(o.Values(), o) is totally fine for this scale.

If you ever care about shaving a few allocations here (since Values() builds a slice on each call), you could switch Validate to a simple switch over the known constants and drop the slices import. Not urgent, just a small optimization knob if validation ends up in a hot path.

Also applies to: 10-10, 20-49

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f17f5ba and 946813e.

📒 Files selected for processing (3)
  • openmeter/entitlement/balanceworker/events/recalculate.go (4 hunks)
  • openmeter/entitlement/balanceworker/ingesthandler.go (2 hunks)
  • openmeter/entitlement/balanceworker/worker.go (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.go

⚙️ CodeRabbit configuration file

**/*.go: In general when reviewing the Golang code make readability and maintainability a priority, even potentially suggest restructuring the code to improve them.

Performance should be a priority in critical code paths. Anything related to event ingestion, message processing, database operations (regardless of database) should be vetted for potential performance bottlenecks.

Files:

  • openmeter/entitlement/balanceworker/ingesthandler.go
  • openmeter/entitlement/balanceworker/events/recalculate.go
  • openmeter/entitlement/balanceworker/worker.go
🧠 Learnings (1)
📚 Learning: 2025-03-07T12:17:43.129Z
Learnt from: GAlexIHU
Repo: openmeterio/openmeter PR: 2383
File: openmeter/entitlement/metered/lateevents_test.go:37-45
Timestamp: 2025-03-07T12:17:43.129Z
Learning: In the OpenMeter codebase, test files like `openmeter/entitlement/metered/lateevents_test.go` may use variables like `meterSlug` and `namespace` without explicit declarations visible in the same file. This appears to be an accepted pattern in their test structure.

Applied to files:

  • openmeter/entitlement/balanceworker/ingesthandler.go
🧬 Code graph analysis (3)
openmeter/entitlement/balanceworker/ingesthandler.go (3)
openmeter/entitlement/balanceworker/events/recalculate.go (2)
  • RecalculateEvent (63-69)
  • OperationTypeIngest (29-29)
pkg/models/id.go (1)
  • NamespacedID (7-10)
openmeter/event/metadata/resourcepath.go (2)
  • ComposeResourcePath (29-31)
  • EntityEvent (26-26)
openmeter/entitlement/balanceworker/events/recalculate.go (3)
pkg/models/validator.go (1)
  • Validate (16-26)
pkg/models/id.go (1)
  • NamespacedID (7-10)
openmeter/ingest/kafkaingest/serializer/serializer.go (1)
  • CloudEventsKafkaPayload (20-28)
openmeter/entitlement/balanceworker/worker.go (7)
openmeter/entitlement/balanceworker/events/recalculate.go (6)
  • RecalculateEvent (63-69)
  • OperationTypeEntitlementCreated (23-23)
  • OperationTypeEntitlementDeleted (24-24)
  • OperationTypeGrantCreated (25-25)
  • OperationTypeGrantVoided (27-27)
  • OperationTypeMeteredEntitlementReset (28-28)
pkg/models/id.go (1)
  • NamespacedID (7-10)
openmeter/event/metadata/resourcepath.go (3)
  • ComposeResourcePath (29-31)
  • EntityEntitlement (10-10)
  • EntityGrant (16-16)
pkg/models/model.go (1)
  • ManagedModel (119-125)
openmeter/entitlement/metered/events.go (1)
  • EntitlementResetEvent (17-24)
openmeter/entitlement/snapshot/event.go (3)
  • ValueOperationUpdate (23-23)
  • ValueOperationReset (22-22)
  • ValueOperationDelete (24-24)
openmeter/entitlement/balanceworker/entitlementhandler.go (4)
  • WithSource (49-53)
  • WithEventAt (55-59)
  • WithSourceOperation (61-65)
  • WithRawIngestedEvents (67-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: Artifacts / Container image
  • GitHub Check: Artifacts / Benthos Collector Container image
  • GitHub Check: Lint
  • GitHub Check: Code Generators
  • GitHub Check: Migration Checks
  • GitHub Check: Build
  • GitHub Check: Test
  • GitHub Check: Secret Scanning
  • GitHub Check: Scan GitHub Workflows
  • GitHub Check: Repository Scan
  • GitHub Check: SAST (semgrep)
  • GitHub Check: Analyze (go)
🔇 Additional comments (2)
openmeter/entitlement/balanceworker/ingesthandler.go (1)

12-12: Importing balanceworker events looks good

Pulling in the typed events package here fits the new RecalculateEvent flow and keeps the handler nicely aligned with the worker logic. No issues from my side.

openmeter/entitlement/balanceworker/events/recalculate.go (1)

57-57: RecalculateEvent v2 shape and validation look consistent with usage

The v2 RecalculateEvent struct and its helpers line up well with how the worker is using it:

  • Entitlement + AsOf carry the core recalculation coordinates.
  • OriginalEventSource is now the source for EventMetadata.Source, while Subject continues to be the entitlement resource path, giving you both “where it came from” and “what it’s about”.
  • SourceOperation and RawIngestedEvents give the consumer enough context to decide how to treat the recalculation (update/reset/delete / ingest details).
  • Validate() now requires a non‑zero SourceOperation, which should help catch mis‑constructed events early.

The version bump to "v2" makes sense given the schema change; just make sure your producers/consumers are all aligned on the new version and that any legacy v1 messages in the wild are either drained or handled gracefully.

Also applies to: 64-68, 76-79, 93-95

@turip turip requested a review from chrisgacsal November 14, 2025 09:35
@turip turip merged commit 2620162 into main Nov 14, 2025
29 checks passed
@turip turip deleted the refactor/balance-worker-events branch November 14, 2025 09:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release-note/misc Miscellaneous changes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants