Skip to content

Conversation

@hekike
Copy link
Contributor

@hekike hekike commented Sep 17, 2025

  • Make streaming list subjects generic.
  • List customer usage attributions
  • Fix paginate with select

Summary by CodeRabbit

  • New Features

    • Flexible subject listing: filter by namespace, optional meter, and optional time range.
    • Customer usage attribution list with pagination, namespace filtering, and optional customer ID filtering.
  • Bug Fixes

    • Stricter validation for subject queries (namespace required, meter key if provided) and time-range checks.
    • More reliable pagination count queries (clear selected fields for count).
  • Chores

    • Dependency declaration adjusted to ensure stable builds.
  • Tests

    • Added compile-time interface assertion and updated tests for API changes; added tests for customer usage attributions.

@hekike hekike requested a review from a team as a code owner September 17, 2025 16:41
@hekike hekike added area/processor release-note/ignore Ignore this change when generating release notes labels Sep 17, 2025
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 17, 2025

📝 Walkthrough

Walkthrough

Replaces ListMeterSubjects with ListSubjects across API, implementations, handlers, tests, and mocks; introduces ListSubjectsParams (Namespace, optional *Meter, optional From/To) with validation; updates ClickHouse query types/SQL and adds columnFactory; adds customer usage-attribution APIs, adapter/service/tests; minor go.mod dependency change.

Changes

Cohort / File(s) Summary
Streaming API & params
openmeter/streaming/connector.go
Replace ListMeterSubjects with ListSubjects(ctx, ListSubjectsParams). Add ListSubjectsParams { Namespace string; Meter *meter.Meter; From *time.Time; To *time.Time } and stronger Validate(); remove ListMeterSubjectsParams.
ClickHouse connector & query logic
openmeter/streaming/clickhouse/connector.go, openmeter/streaming/clickhouse/meter_query.go, openmeter/streaming/clickhouse/meter_query_test.go, openmeter/streaming/clickhouse/queryhelper.go
Implement public ListSubjects and internal listSubjects/listSubjectsQuery (Meter now *Meter), make type filter conditional, add From/To filters, rename types/tests, adjust error messages, add columnFactory helper.
Streaming mocks & server tests
openmeter/streaming/testutils/streaming.go, openmeter/server/server_test.go
Update mock method signature to ListSubjects; add compile-time var _ streaming.Connector = (*MockStreamingConnector)(nil) assertion; add noop ListCustomerUsageAttributions on NoopCustomerService.
HTTP handler call site
openmeter/meter/httphandler/query.go
Update call to use h.streaming.ListSubjects(ctx, streaming.ListSubjectsParams{ Namespace: request.namespace, Meter: &meter }).
Customer API, service & adapter
openmeter/customer/adapter.go, openmeter/customer/adapter/customer.go, openmeter/customer/customer.go, openmeter/customer/service.go, openmeter/customer/service/customer.go, openmeter/customer/adapter/entitymapping.go
Add ListCustomerUsageAttributionsInput + Validate(); extend CustomerAdapter and CustomerService with ListCustomerUsageAttributions; implement adapter method that paginates and maps DB entities to streaming.CustomerUsageAttribution; factor subject-key extraction into subjectKeysFromDBEntity.
Streaming query params
openmeter/streaming/query_params.go
Add CustomerUsageAttribution.GetValues() []string returning optional Key then SubjectKeys.
Pagination count-query fix / templates
openmeter/ent/db/paginate.go, pkg/framework/entutils/entpaginate/paginate.tpl, pkg/framework/entutils/testutils/ent1/db/paginate.go
Ensure cloned count query unsets selected fields (countQuery.ctx.Fields = []string{}) before Count to avoid projecting fields on count queries.
Tests
test/customer/customer.go, test/customer/customer_test.go
Add TestListCustomerUsageAttributions and wire it into the Customer test suite.
Dependency file
go.mod
Remove // indirect marker for github.com/xdg-go/scram v1.1.2, making it a direct require entry.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title Check ✅ Passed The title "feat(streaming): list subjects" is concise, accurately describes the primary change (introducing a generic ListSubjects API and related streaming refactors), and is specific enough for a teammate scanning PR history to understand the main intent.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/streaming-list-subjects

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (10)
openmeter/streaming/clickhouse/meter_query.go (2)

413-438: Honor Meter.EventFrom when From is nil.

QueryMeter uses meter.EventFrom as a floor; listSubjects should mirror that for consistency.

Apply:

 func (d listSubjectsQuery) toSQL() (string, []interface{}) {
@@
-    if d.From != nil {
-        sb.Where(sb.GreaterEqualThan("time", d.From.Unix()))
-    }
+    effectiveFrom := d.From
+    if effectiveFrom == nil && d.Meter != nil && d.Meter.EventFrom != nil {
+        effectiveFrom = d.Meter.EventFrom
+    }
+    if effectiveFrom != nil {
+        sb.Where(sb.GreaterEqualThan("time", effectiveFrom.Unix()))
+    }

413-426: Column qualification consistency.

Elsewhere you qualify with the table alias (om_events.namespace/type/time/subject). Consider doing the same here to avoid future ambiguity if joins/CTEs appear.

openmeter/streaming/testutils/streaming.go (1)

224-226: LGTM: mock updated to new signature.

Returns empty list by default; fine for tests that don’t assert content. Consider allowing injection of subjects later if needed.

openmeter/streaming/clickhouse/connector.go (2)

193-208: Unreachable meter-not-found passthrough.

listSubjects() never maps ClickHouse code 60 to a MeterNotFoundError, so this branch won’t trigger. Either remove it or adjust error mapping below.

-    subjects, err := c.listSubjects(ctx, params)
-    if err != nil {
-        if meterpkg.IsMeterNotFoundError(err) {
-            return nil, err
-        }
-        return nil, fmt.Errorf("list subjects: %w", err)
-    }
+    subjects, err := c.listSubjects(ctx, params)
+    if err != nil {
+        return nil, fmt.Errorf("list subjects: %w", err)
+    }

510-546: Map ClickHouse “code: 60” to NamespaceNotFound for parity with other paths.

Other query methods do this; it helps callers distinguish configuration vs. runtime failures.

Apply:

 func (c *Connector) listSubjects(ctx context.Context, params streaming.ListSubjectsParams) ([]string, error) {
@@
     rows, err := c.config.ClickHouse.Query(ctx, sql, args...)
     if err != nil {
-        return nil, fmt.Errorf("list subjects: %w", err)
+        if strings.Contains(err.Error(), "code: 60") {
+            return nil, models.NewNamespaceNotFoundError(params.Namespace)
+        }
+        return nil, fmt.Errorf("list subjects: %w", err)
     }
openmeter/streaming/connector.go (5)

44-46: Clarify ListSubjects doc and align with filter semantics

Doc should mention namespace requirement and optional meter/time filters. This also brings it in line with the V2-style param object.

Apply this diff to tighten the comment:

-	// ListSubjects lists the subjects that have events in the database
+	// ListSubjects lists unique subjects that have events in the given namespace,
+	// optionally filtered by meter and time range.

59-61: Fix stale comment above Validate

Comment still says “list meters”. Update for accuracy.

Apply this diff:

-// Validate validates the list meters parameters.
+// Validate validates the ListSubjectsParams.
 func (p ListSubjectsParams) Validate() error {

67-72: Improve validation message; check the key, not “meter empty”

Condition checks p.Meter.Key == "", so the error should call out the key specifically.

Apply this diff:

-		if p.Meter.Key == "" {
-			errs = append(errs, errors.New("meter cannot be empty when provided"))
+		if p.Meter.Key == "" {
+			errs = append(errs, errors.New("meter key is required when meter is provided"))
 		}

If you adopt the MeterKey *string refactor, replace this block with:

-	if p.Meter != nil {
-		if p.Meter.Key == "" {
-			errs = append(errs, errors.New("meter key is required when meter is provided"))
-		}
-	}
+	if p.MeterKey != nil && *p.MeterKey == "" {
+		errs = append(errs, errors.New("meterKey cannot be empty when provided"))
+	}

83-88: Return nil directly when no errors

errors.Join(nil...) returns nil, but returning nil is clearer.

Apply this diff:

 	if len(errs) > 0 {
 		return models.NewNillableGenericValidationError(errors.Join(errs...))
 	}
 
-	return errors.Join(errs...)
+	return nil

51-57: Optional: decouple ListSubjectsParams from meter.Meter — invasive change

rg shows ListSubjectsParams.Meter is actively consumed (not just the key) by ClickHouse query builders and many callers; changing to a MeterKey *string will require updates to connector query code, validation, mocks and handlers.

Key locations to change:

  • openmeter/streaming/clickhouse/meter_query.go (uses d.Meter.EventType/Aggregation/ValueProperty/GroupBy/EventFrom)
  • openmeter/streaming/clickhouse/connector.go (ListSubjects / listSubjects)
  • openmeter/meter/httphandler/query.go (passes &meter)
  • openmeter/streaming/testutils/streaming.go (mocks)
  • openmeter/streaming/connector.go (Validate checks p.Meter.Key)
  • tests referencing ListSubjectsParams / clickhouse meter query

If you still want this refactor, update the struct and callers (validation, query builders, mocks). Suggested struct change (apply and then fix usages):

 type ListSubjectsParams struct {
 	Namespace string
-	Meter     *meter.Meter
+	// Optional: filter by meter key (slug). Nil means all meters.
+	MeterKey  *string
 	From      *time.Time
 	To        *time.Time
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7e18ee1 and 440e3f9.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (9)
  • go.mod (1 hunks)
  • openmeter/meter/httphandler/query.go (1 hunks)
  • openmeter/server/server_test.go (2 hunks)
  • openmeter/streaming/clickhouse/connector.go (2 hunks)
  • openmeter/streaming/clickhouse/meter_query.go (1 hunks)
  • openmeter/streaming/clickhouse/meter_query_test.go (3 hunks)
  • openmeter/streaming/clickhouse/queryhelper.go (1 hunks)
  • openmeter/streaming/connector.go (1 hunks)
  • openmeter/streaming/testutils/streaming.go (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-03-07T12:17:43.129Z
Learnt from: GAlexIHU
PR: openmeterio/openmeter#2383
File: openmeter/entitlement/metered/lateevents_test.go:37-45
Timestamp: 2025-03-07T12:17:43.129Z
Learning: In the OpenMeter codebase, test files like `openmeter/entitlement/metered/lateevents_test.go` may use variables like `meterSlug` and `namespace` without explicit declarations visible in the same file. This appears to be an accepted pattern in their test structure.

Applied to files:

  • openmeter/streaming/clickhouse/meter_query_test.go
🧬 Code graph analysis (7)
openmeter/streaming/testutils/streaming.go (3)
openmeter/server/server_test.go (1)
  • MockStreamingConnector (594-594)
openmeter/meter/httphandler/query.go (1)
  • ListSubjectsParams (20-22)
openmeter/streaming/connector.go (1)
  • ListSubjectsParams (52-57)
openmeter/meter/httphandler/query.go (2)
openmeter/streaming/connector.go (1)
  • ListSubjectsParams (52-57)
openmeter/meter/meter.go (1)
  • Meter (139-148)
openmeter/server/server_test.go (2)
openmeter/streaming/connector.go (2)
  • Connector (38-49)
  • ListSubjectsParams (52-57)
openmeter/streaming/testutils/streaming.go (1)
  • MockStreamingConnector (32-35)
openmeter/streaming/clickhouse/meter_query.go (2)
api/api.gen.go (1)
  • Meter (4766-4815)
openmeter/meter/meter.go (1)
  • Meter (139-148)
openmeter/streaming/clickhouse/connector.go (2)
openmeter/streaming/connector.go (2)
  • Connector (38-49)
  • ListSubjectsParams (52-57)
openmeter/meter/errors.go (1)
  • IsMeterNotFoundError (37-45)
openmeter/streaming/clickhouse/meter_query_test.go (1)
openmeter/meter/meter.go (1)
  • Meter (139-148)
openmeter/streaming/connector.go (2)
openmeter/meter/httphandler/query.go (1)
  • ListSubjectsParams (20-22)
openmeter/meter/meter.go (1)
  • Meter (139-148)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Artifacts / Container image
  • GitHub Check: Code Generators
  • GitHub Check: Lint
  • GitHub Check: Test
  • GitHub Check: Build
  • GitHub Check: Analyze (go)
🔇 Additional comments (7)
openmeter/streaming/clickhouse/queryhelper.go (1)

89-93: LGTM: simple, focused column qualifier.

Closure-based aliasing is clear and fits existing usage.

openmeter/streaming/clickhouse/meter_query.go (1)

404-411: Pointer Meter in listSubjectsQuery is a good API for optional filtering.

Allows listing across all event types when nil.

openmeter/meter/httphandler/query.go (1)

52-55: LGTM: migrated to new ListSubjects API correctly.

Passing Namespace and &meter matches the new params object.

openmeter/server/server_test.go (2)

591-593: Nice: compile-time interface assertion.

Ensures MockStreamingConnector tracks interface changes.


650-652: LGTM: mock ListSubjects behavior matches test expectation.

Fixed return ["s1"] lines up with the “list meter subjects” route test.

openmeter/streaming/clickhouse/meter_query_test.go (1)

442-507: Add a nil-meter case to lock in optional filtering behavior.

Current tests always set Meter. Add one where Meter is nil to assert no type filter appears.

Proposed case (pattern only):

{
  query: listSubjectsQuery{
    Database: "openmeter", EventsTableName: "om_events",
    Namespace: "my_namespace",
    Meter: nil,
  },
  wantSQL:  "SELECT DISTINCT subject FROM openmeter.om_events WHERE namespace = ? ORDER BY subject",
  wantArgs: []interface{}{"my_namespace"},
},
go.mod (1)

484-484: Keep github.com/xdg-go/scram as a direct dependency — it's imported.

Import found in openmeter/watermill/driver/kafka/saslscram.go:10; go list shows the package in the dependency graph and go mod tidy made no changes.

@hekike hekike enabled auto-merge (squash) September 17, 2025 17:02
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (3)
openmeter/streaming/query_params.go (2)

79-90: GetValues is correct; consider tiny alloc tweak.

Pre-size the slice to avoid a small reallocation.

 func (ua CustomerUsageAttribution) GetValues() []string {
-	attributions := []string{}
-
-	if ua.Key != nil {
-		attributions = append(attributions, *ua.Key)
-	}
-
-	attributions = append(attributions, ua.SubjectKeys...)
-
-	return attributions
+	capHint := len(ua.SubjectKeys)
+	if ua.Key != nil {
+		capHint++
+	}
+	attributions := make([]string, 0, capHint)
+	if ua.Key != nil {
+		attributions = append(attributions, *ua.Key)
+	}
+	return append(attributions, ua.SubjectKeys...)
 }

92-94: Fix pointer comparison in Equal (compare values, not addresses).

Current code uses pointer identity; two equal string values via different pointers will be “not equal.”

 func (ua CustomerUsageAttribution) Equal(other CustomerUsageAttribution) bool {
-	return ua.ID == other.ID && ua.Key == other.Key && slices.Equal(ua.SubjectKeys, other.SubjectKeys)
+	keysEqual := (ua.Key == nil && other.Key == nil) ||
+		(ua.Key != nil && other.Key != nil && *ua.Key == *other.Key)
+	return ua.ID == other.ID && keysEqual && slices.Equal(ua.SubjectKeys, other.SubjectKeys)
 }
openmeter/customer/customer.go (1)

214-231: Input type and validation LGTM; consider paging guardrails.

Namespace check is good. Optionally, bound Page.PageSize to a sane max to protect adapters.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 440e3f9 and 9fc8195.

📒 Files selected for processing (7)
  • openmeter/customer/adapter.go (2 hunks)
  • openmeter/customer/adapter/customer.go (2 hunks)
  • openmeter/customer/customer.go (1 hunks)
  • openmeter/customer/service.go (2 hunks)
  • openmeter/customer/service/customer.go (2 hunks)
  • openmeter/server/server_test.go (3 hunks)
  • openmeter/streaming/query_params.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
openmeter/customer/adapter.go (2)
openmeter/customer/customer.go (2)
  • ListCustomerUsageAttributionsInput (216-223)
  • CustomerUsageAttribution (155-157)
openmeter/streaming/query_params.go (1)
  • CustomerUsageAttribution (73-77)
openmeter/customer/service.go (2)
openmeter/customer/customer.go (2)
  • ListCustomerUsageAttributionsInput (216-223)
  • CustomerUsageAttribution (155-157)
openmeter/streaming/query_params.go (1)
  • CustomerUsageAttribution (73-77)
openmeter/streaming/query_params.go (5)
openmeter/customer/customer.go (1)
  • CustomerUsageAttribution (155-157)
openmeter/billing/invoice.go (1)
  • CustomerUsageAttribution (655-655)
api/client/javascript/src/client/schemas.ts (1)
  • CustomerUsageAttribution (11226-11227)
api/api.gen.go (1)
  • CustomerUsageAttribution (2373-2376)
api/client/go/client.gen.go (1)
  • CustomerUsageAttribution (2203-2206)
openmeter/customer/service/customer.go (4)
openmeter/customer/service.go (1)
  • Service (11-16)
openmeter/customer/service/service.go (1)
  • Service (13-19)
openmeter/customer/customer.go (2)
  • ListCustomerUsageAttributionsInput (216-223)
  • CustomerUsageAttribution (155-157)
openmeter/streaming/query_params.go (1)
  • CustomerUsageAttribution (73-77)
openmeter/customer/adapter/customer.go (5)
openmeter/customer/customer.go (3)
  • ListCustomerUsageAttributionsInput (216-223)
  • CustomerUsageAttribution (155-157)
  • Customer (18-30)
openmeter/streaming/query_params.go (2)
  • CustomerUsageAttribution (73-77)
  • Customer (68-70)
pkg/framework/entutils/transaction.go (1)
  • TransactingRepo (199-221)
pkg/clock/clock.go (1)
  • Now (14-21)
openmeter/customer/adapter/entitymapping.go (1)
  • CustomerFromDBEntity (13-98)
openmeter/server/server_test.go (4)
openmeter/streaming/connector.go (2)
  • Connector (38-49)
  • ListSubjectsParams (52-57)
openmeter/streaming/testutils/streaming.go (1)
  • MockStreamingConnector (32-35)
openmeter/customer/customer.go (2)
  • ListCustomerUsageAttributionsInput (216-223)
  • CustomerUsageAttribution (155-157)
openmeter/streaming/query_params.go (1)
  • CustomerUsageAttribution (73-77)
openmeter/customer/customer.go (4)
pkg/pagination/page.go (1)
  • Page (14-17)
openmeter/customer/defaults.go (1)
  • IncludeDeleted (4-4)
pkg/models/validator.go (1)
  • Validate (16-26)
pkg/models/errors.go (1)
  • NewGenericValidationError (138-140)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Test
  • GitHub Check: Artifacts / Container image
  • GitHub Check: Code Generators
  • GitHub Check: Lint
  • GitHub Check: Build
  • GitHub Check: Analyze (go)
🔇 Additional comments (6)
openmeter/customer/adapter.go (1)

19-19: Interface extension LGTM.

New ListCustomerUsageAttributions method signature aligns with service/impl changes.

openmeter/customer/service.go (1)

24-24: Service interface extension LGTM.

Matches adapter and implementation; type usage is consistent.

openmeter/customer/service/customer.go (1)

21-24: Straight delegation LGTM.

Input is validated in adapter; mirrors ListCustomers pattern.

openmeter/server/server_test.go (3)

592-592: Interface conformance check LGTM.

Compile-time assertion is helpful during API evolution.


650-653: Updated mock to ListSubjects LGTM.

Signature matches streaming.Connector.


1039-1042: NoopCustomerService addition LGTM.

Keeps tests compiling with new service API.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
openmeter/customer/adapter/customer.go (1)

143-147: Resolved: root SELECT no longer references edge-table columns.

This fixes the previously flagged invalid SELECT from the customersubjects table in the root query. Good change.

🧹 Nitpick comments (1)
openmeter/customer/adapter/customer.go (1)

175-190: Avoid full customer mapping; build attribution directly for lower latency.

You only need ID, Key, SubjectKeys. Skip CustomerFromDBEntity and construct streaming.CustomerUsageAttribution from the loaded edge to reduce work and remove the need to load subscriptions.

Apply:

-        cust, err := CustomerFromDBEntity(*item)
-        if err != nil {
-            return response, fmt.Errorf("failed to convert customer: %w", err)
-        }
-        if cust == nil {
-            return response, fmt.Errorf("invalid query result: nil customer received")
-        }
-
-        result = append(result, cust.GetUsageAttribution())
+        subjects, err := item.Edges.SubjectsOrErr()
+        if err != nil {
+            return response, fmt.Errorf("subjects not loaded for customer %s: %w", item.ID, err)
+        }
+        subjectKeys := lo.FilterMap(subjects, func(s *entdb.CustomerSubjects, _ int) (string, bool) {
+            if s == nil {
+                return "", false
+            }
+            return s.SubjectKey, true
+        })
+        var keyPtr *string
+        if item.Key != "" {
+            keyPtr = &item.Key
+        }
+        result = append(result, streaming.CustomerUsageAttribution{
+            ID:          item.ID,
+            Key:         keyPtr,
+            SubjectKeys: subjectKeys,
+        })
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9fc8195 and 3d6b0d6.

📒 Files selected for processing (1)
  • openmeter/customer/adapter/customer.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
openmeter/customer/adapter/customer.go (4)
openmeter/customer/customer.go (3)
  • ListCustomerUsageAttributionsInput (216-223)
  • CustomerUsageAttribution (155-157)
  • Customer (18-30)
openmeter/streaming/query_params.go (2)
  • CustomerUsageAttribution (73-77)
  • Customer (68-70)
pkg/framework/entutils/transaction.go (1)
  • TransactingRepo (199-221)
openmeter/customer/adapter/entitymapping.go (1)
  • CustomerFromDBEntity (13-98)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: Quickstart
  • GitHub Check: E2E
  • GitHub Check: Lint
  • GitHub Check: Build
  • GitHub Check: Test
  • GitHub Check: Code Generators
  • GitHub Check: Analyze (go)
🔇 Additional comments (1)
openmeter/customer/adapter/customer.go (1)

19-19: Import looks correct.

Needed for the new return type.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
openmeter/customer/adapter/customer.go (1)

141-149: Root SELECT is minimal and avoids edge columns — good fix.

Selecting only Customer.ID and Customer.Key in the root query prevents invalid references to edge tables and reduces I/O.

🧹 Nitpick comments (3)
openmeter/customer/adapter/entitymapping.go (1)

88-107: Pass Customer by pointer to avoid copying; update call sites.

The ent Customer struct is non-trivial; pass a *db.Customer to avoid copying and to align with other helpers operating on ent entities.

Apply these diffs:

- func subjectKeysFromDBEntity(customerEntity db.Customer) ([]string, error) {
+ func subjectKeysFromDBEntity(customerEntity *db.Customer) ([]string, error) {

And update callers:

- subjectKeys, err := subjectKeysFromDBEntity(e)
+ subjectKeys, err := subjectKeysFromDBEntity(&e)

(See also customer.go Line 181 below.)

openmeter/customer/adapter/customer.go (2)

156-162: Use GT instead of GTE for deleted filter for consistency and intent.

“Do not return deleted customers” should exclude rows with DeletedAt == now. Other places (e.g., GetCustomerByUsageAttribution) use GT. Align semantics.

-            query = query.Where(customerdb.Or(
-                customerdb.DeletedAtIsNil(),
-                customerdb.DeletedAtGTE(now),
-            ))
+            query = query.Where(customerdb.Or(
+                customerdb.DeletedAtIsNil(),
+                customerdb.DeletedAtGT(now),
+            ))

181-184: Wrap edge-extraction error with context.

Adds clarity to logs/metrics without changing control flow.

-            subjectKeys, err := subjectKeysFromDBEntity(*item)
+            subjectKeys, err := subjectKeysFromDBEntity(item)
             if err != nil {
-                return response, err
+                return response, fmt.Errorf("list usage attributions: load subject keys: %w", err)
             }

Note: This also adopts the pointer parameter suggested for subjectKeysFromDBEntity.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3d6b0d6 and 62afd91.

📒 Files selected for processing (2)
  • openmeter/customer/adapter/customer.go (2 hunks)
  • openmeter/customer/adapter/entitymapping.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
openmeter/customer/adapter/entitymapping.go (2)
openmeter/ent/db/customer.go (2)
  • Customer (20-64)
  • Customer (142-157)
openmeter/ent/db/ent.go (1)
  • IsNotLoaded (340-346)
openmeter/customer/adapter/customer.go (6)
openmeter/customer/customer.go (3)
  • ListCustomerUsageAttributionsInput (216-223)
  • CustomerUsageAttribution (155-157)
  • Customer (18-30)
openmeter/streaming/query_params.go (2)
  • CustomerUsageAttribution (73-77)
  • Customer (68-70)
pkg/models/errors.go (1)
  • NewGenericValidationError (138-140)
pkg/framework/entutils/transaction.go (1)
  • TransactingRepo (199-221)
pkg/clock/clock.go (1)
  • Now (14-21)
openmeter/ent/db/customer.go (2)
  • Customer (20-64)
  • Customer (142-157)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Artifacts / Benthos Collector Container image
  • GitHub Check: Artifacts / Container image
  • GitHub Check: Test
  • GitHub Check: Lint
  • GitHub Check: Build
  • GitHub Check: Migration Checks
  • GitHub Check: Code Generators
  • GitHub Check: Analyze (go)
🔇 Additional comments (3)
openmeter/customer/adapter/entitymapping.go (1)

14-17: Good extraction; simpler, single-responsibility mapping.

Delegating subject-key extraction to a helper makes CustomerFromDBEntity cleaner and keeps edge-loading errors centralized.

openmeter/customer/adapter/customer.go (2)

19-19: Import looks correct.

New import is justified by the new return type.


149-151: ```shell
#!/bin/bash
set -euo pipefail

echo "PWD: $(pwd)"
echo "--- WithSubjects call sites ---"
rg -n --type=go -S 'WithSubjects(' -C3 || true

echo
echo "--- WithSubjects definitions ---"
rg -n --type=go -S 'func WithSubjects(' -C3 || true

echo
echo "--- Edges.Subjects usages ---"
rg -n --type=go -S '\bEdges.Subjects\b' -C3 || true

echo
echo "--- CustomerSubjects type / usages ---"
rg -n --type=go -S 'CustomerSubjects' -C3 || true

echo
echo "--- SubjectKey occurrences (field/uses) ---"
rg -n --type=go -S '\bSubjectKey\b' -C3 || true

echo
echo "--- FieldSubjectKey occurrences ---"
rg -n --type=go -S 'FieldSubjectKey' -C3 || true

echo
echo "--- dotted access .SubjectKey ---"
rg -n --type=go -S '.SubjectKey\b' -C3 || true

echo
echo "--- Search for likely other fields on customersubjects (displayName, metadata) ---"
rg -n --type=go -S 'DisplayName|Metadata|Key' -g '!/vendor/' -C3 || true

echo
echo "--- List ent/schema and ent/db files for customersubjects ---"
fd -HI --type f 'customersubjects*' || true


</blockquote></details>

</blockquote></details>

</details>

<!-- This is an auto-generated comment by CodeRabbit for review status -->

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
openmeter/customer/adapter/entitymapping.go (1)

43-47: Avoid retaining the entire db.Customer by taking address of a field

annotations = &e.Annotations takes the address of a field of a copied struct, causing the whole e (including loaded edges) to escape and be retained. Copy the value first to avoid retaining the large struct and to make the intent explicit.

   var annotations *models.Annotations

   if len(e.Annotations) > 0 {
-    annotations = &e.Annotations
+    anns := e.Annotations // copy to avoid retaining the whole db.Customer
+    annotations = &anns
   }
🧹 Nitpick comments (5)
pkg/framework/entutils/entpaginate/paginate.tpl (1)

29-30: Use nil to truly unset field selection

Setting an empty slice still conveys “selection set” in some code paths. Prefer nil to avoid edge cases where non-nil empties are treated as explicit selection.

Apply in the template so it propagates everywhere:

-        // Unset select for count query
-        countQuery.ctx.Fields = []string{}
+        // Unset select for count query
+        countQuery.ctx.Fields = nil

Please confirm the ent version used checks len(fields) > 0 (not fields != nil) before applying projections.

pkg/framework/entutils/testutils/ent1/db/paginate.go (1)

27-29: Mirror template change: set Fields to nil

Keep generated test utils consistent with the template; use nil instead of an empty slice here too. This will be auto-updated once the template is regenerated.

-	// Unset select for count query
-	countQuery.ctx.Fields = []string{}
+	// Unset select for count query
+	countQuery.ctx.Fields = nil
openmeter/ent/db/paginate.go (1)

1-2505: Bulk change looks good; prefer nil over empty slice across this file

The “unset select for count query” change is correct and aligns with unsetting order. For robustness, switch all occurrences of countQuery.ctx.Fields = []string{} to nil (the template change will regenerate this file accordingly).

openmeter/customer/adapter/entitymapping.go (2)

89-111: Deterministic ordering: add dedupe and better error context

  • Nice: preserving NotLoaded semantics and sorting for stable output.
  • Suggest: dedupe after sorting to avoid accidental duplicates (defensive).
  • Suggest: include customer ID in the NotLoaded error to ease debugging.

Apply:

 func subjectKeysFromDBEntity(customerEntity db.Customer) ([]string, error) {
   subjectEntities, err := customerEntity.Edges.SubjectsOrErr()
   if err != nil {
-    if db.IsNotLoaded(err) {
-      return nil, errors.New("subjects must be loaded for customer")
-    }
+    if db.IsNotLoaded(err) {
+      return nil, fmt.Errorf("subjects must be loaded for customer %s", customerEntity.ID)
+    }
     return nil, err
   }
@@
   // Sort the subject keys to make sure the order is consistent
   slices.Sort(subjectKeys)
+  // Remove accidental duplicates
+  subjectKeys = slices.Compact(subjectKeys)
 
   return subjectKeys, nil
 }

Add import:

import "fmt"

89-111: Nit: reduce copying by using pointer receiver

You pass db.Customer by value; for large structs this copies edges and maps. Consider func subjectKeysFromDBEntity(customerEntity *db.Customer) and update the call site accordingly. Not critical, but cleaner.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 62afd91 and a6bd893.

📒 Files selected for processing (6)
  • openmeter/customer/adapter/entitymapping.go (3 hunks)
  • openmeter/ent/db/paginate.go (43 hunks)
  • pkg/framework/entutils/entpaginate/paginate.tpl (1 hunks)
  • pkg/framework/entutils/testutils/ent1/db/paginate.go (1 hunks)
  • test/customer/customer.go (2 hunks)
  • test/customer/customer_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
test/customer/customer.go (3)
openmeter/customer/customer.go (5)
  • Customer (18-30)
  • CreateCustomerInput (234-237)
  • CustomerMutate (74-84)
  • CustomerUsageAttribution (155-157)
  • ListCustomerUsageAttributionsInput (216-223)
openmeter/streaming/query_params.go (2)
  • Customer (68-70)
  • CustomerUsageAttribution (73-77)
pkg/pagination/page.go (1)
  • Page (14-17)
openmeter/customer/adapter/entitymapping.go (3)
openmeter/ent/schema/customer.go (10)
  • Customer (17-19)
  • Customer (21-29)
  • Customer (31-39)
  • Customer (41-54)
  • Customer (56-69)
  • CustomerSubjects (72-74)
  • CustomerSubjects (76-80)
  • CustomerSubjects (82-100)
  • CustomerSubjects (102-119)
  • CustomerSubjects (121-138)
openmeter/ent/db/customer.go (2)
  • Customer (20-64)
  • Customer (142-157)
openmeter/ent/db/ent.go (1)
  • IsNotLoaded (340-346)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Artifacts / Container image
  • GitHub Check: Build
  • GitHub Check: Test
  • GitHub Check: Code Generators
  • GitHub Check: Lint
  • GitHub Check: Analyze (go)
🔇 Additional comments (6)
openmeter/customer/adapter/entitymapping.go (2)

15-18: Good extraction for subject-key mapping

Delegating to a focused helper improves readability and reuse. LGTM.


5-5: No change needed — stdlib slices is supported.

go.mod shows go 1.24.1 and toolchain go1.24.2 (both ≥ 1.21), so keep the slices import as-is.

test/customer/customer_test.go (1)

38-40: Nice: wiring the new subtest into the suite

The new usage-attributions list test is properly integrated.

test/customer/customer.go (3)

21-21: Import looks appropriate

streaming is used for the expected item type; OK.


545-605: Subjects are already eager-loaded in ListCustomerUsageAttributions — no action required

ListCustomerUsageAttributions calls WithSubjects(query, now) in openmeter/customer/adapter/customer.go (func starts at line 132), so the mapper's Subjects will be loaded.


545-605: Fix assertion message and guard against ordering flakiness

Message asserts PageNumber == 1 but the error text says “must be 0”; backend ordering isn’t guaranteed — fix the message and make the item comparison deterministic (or use ElementsMatch). File: test/customer/customer.go (lines 545–605)

   require.NoError(t, err, "Listing customer usage attributions must not return error")
   require.Equal(t, 2, list.TotalCount, "Customer usage attributions total count must be 2")
-  require.Equal(t, 1, list.Page.PageNumber, "Customer usage attributions page must be 0")
+  require.Equal(t, 1, list.Page.PageNumber, "Customer usage attributions page must be 1")
   require.Equal(t, createCustomer1.ID, list.Items[0].ID, "Customer 1 must be first in order")
   require.Equal(t, createCustomer2.ID, list.Items[1].ID, "Customer 2 must be second in order")
@@
-  require.Equal(t, expectedItems, list.Items, "Customer usage attributions must match")
+  // Ensure stable comparison regardless of backend default ordering
+  sort.Slice(list.Items, func(i, j int) bool { return list.Items[i].ID < list.Items[j].ID })
+  sort.Slice(expectedItems, func(i, j int) bool { return expectedItems[i].ID < expectedItems[j].ID })
+  require.Equal(t, expectedItems, list.Items, "Customer usage attributions must match")
+  require.Equal(t, page.PageSize, list.Page.PageSize, "Customer usage attributions page size must match")

Also add: import "sort"

@hekike hekike merged commit 861409e into main Sep 17, 2025
21 checks passed
@hekike hekike deleted the feat/streaming-list-subjects branch September 17, 2025 19:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/processor release-note/ignore Ignore this change when generating release notes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants