Skip to content

Conversation

@kibertoad
Copy link
Owner

@kibertoad kibertoad commented Nov 17, 2025

Summary by CodeRabbit

  • New Features

    • Full Google Cloud Pub/Sub support: publishers, consumers, manager, schema-aware routing, DLQ, deduplication, payload offloading, and resource init utilities.
    • Google Cloud Storage payload store for offloading large message payloads.
    • Local Pub/Sub and GCS emulators added to docker-compose for local testing.
  • Documentation

    • Comprehensive READMEs and usage guides for Pub/Sub adapter and GCS payload store.
  • Improvements

    • Error wrappers now preserve original error causes for better observability.
  • Tests

    • Extensive integration and unit tests covering lifecycle, DLQ, offloading, emulators, and utilities.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Nov 17, 2025

Walkthrough

Adds two Docker emulator services and introduces two new packages: a Google Cloud Pub/Sub adapter (@message-queue-toolkit/gcp-pubsub) and a GCS payload offloading store (@message-queue-toolkit/gcs-payload-store) with implementations, utilities, tests, configs, and documentation.

Changes

Cohort / File(s) Summary
Docker Infrastructure
docker-compose.yml
Added gcs-emulator (fsouza/fake-gcs-server:1.52.3 on port 4443) and pubsub-emulator (google/cloud-sdk:547.0.0-emulators on port 8085) services with startup commands and on-failure restart policies
GCP Pub/Sub — Core Service & Types
packages/gcp-pubsub/lib/pubsub/*
New Pub/Sub foundation: AbstractPubSubService, AbstractPubSubPublisher, AbstractPubSubConsumer, PUBSUB constants and Pub/Sub-specific types (creation/locator/config), lifecycle/init and resource management
GCP Pub/Sub — Error Handling & Fakes
packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts, packages/gcp-pubsub/lib/fakes/FakeConsumerErrorResolver.ts
Added PubSubConsumerErrorResolver to normalize consumption errors and FakeConsumerErrorResolver to capture errors in tests
GCP Pub/Sub — Manager & Factory
packages/gcp-pubsub/lib/pubsub/CommonPubSubPublisherFactory.ts, packages/gcp-pubsub/lib/pubsub/PubSubPublisherManager.ts
Added CommonPubSubPublisher and factory, plus PubSubPublisherManager for event→topic mapping, publisher lifecycle and publish orchestration with complex generics
GCP Pub/Sub — Utilities
packages/gcp-pubsub/lib/utils/*
New utilities: Pub/Sub client creator (pubSubUtils), init/delete helpers (pubSubInitter), message reader/deserializer, offloaded-payload helpers, and related helpers
GCP Pub/Sub — Barrel, Package & Configs
packages/gcp-pubsub/lib/index.ts, packages/gcp-pubsub/package.json, packages/gcp-pubsub/tsconfig*.json, packages/gcp-pubsub/vitest.config.ts, packages/gcp-pubsub/README.md
Added package manifest, public exports barrel, tsconfigs/vitest config and README
GCP Pub/Sub — Tests & Test Utilities
packages/gcp-pubsub/test/**
Extensive tests and utilities: DI testContext, test configs, cleanup helpers, FakeLogger, test publishers/consumers, DLQ and payload-offloading tests
GCS Payload Store — Core & Types
packages/gcs-payload-store/lib/GCSPayloadStore.ts, packages/gcs-payload-store/lib/index.ts
New GCSPayloadStore implementation with store/retrieve/delete, resolver for offloading config and related types
GCS Payload Store — Package & Configs
packages/gcs-payload-store/package.json, packages/gcs-payload-store/tsconfig*.json, packages/gcs-payload-store/vitest.config.ts, packages/gcs-payload-store/README.md
Added package manifest, tsconfigs, vitest config and README
GCS Payload Store — Tests & Utils
packages/gcs-payload-store/test/**
Tests for GCSPayloadStore and GCS test utilities (bucket helpers, stream utils, test GCS config)
Other Consumer Error Resolvers
packages/amqp/lib/errors/AmqpConsumerErrorResolver.ts, packages/sns/lib/errors/SnsConsumerErrorResolver.ts, packages/sqs/lib/errors/SqsConsumerErrorResolver.ts
Propagated original errors as cause when wrapping into InternalError in AMQP/SNS/SQS consumer error resolvers
Misc — Types & Utilities
packages/gcp-pubsub/lib/types/MessageTypes.ts, packages/gcp-pubsub/lib/utils/messageUtils.ts, packages/gcp-pubsub/lib/utils/pubSubMessageReader.ts, packages/gcp-pubsub/lib/utils/pubSubMessageDeserializer.ts
Added PubSubMessage type, offloaded-payload attribute helpers, message reader and deserializer functions

Sequence Diagram(s)

sequenceDiagram
    participant App
    participant Manager as PubSubPublisherManager
    participant Factory as CommonPubSubPublisherFactory
    participant Publisher as AbstractPubSubPublisher
    participant PubSub as GooglePubSub

    App->>Manager: publish(topic, message, opts)
    Manager->>Manager: resolveEventTarget & resolveCreationConfig
    alt publisher exists
        Manager->>Publisher: publish(message, opts)
    else
        Manager->>Factory: buildPublisher(deps, options)
        Factory->>Publisher: instantiate
        Publisher->>PubSub: init (ensure topic/subscription)
        Manager->>Publisher: publish(message, opts)
    end
    Publisher->>Publisher: validate schema & deduplication
    alt duplicate
        Publisher-->>App: skip (logged)
    else
        Publisher->>Publisher: apply offload attributes (if large)
        Publisher->>PubSub: send message + attributes
        PubSub-->>Publisher: messageId
        Publisher-->>App: success
    end
Loading
sequenceDiagram
    participant PubSub as GooglePubSub
    participant Consumer as AbstractPubSubConsumer
    participant Resolver as PubSubConsumerErrorResolver
    participant Handler as MessageHandler
    participant GCS as GCSPayloadStore

    PubSub->>Consumer: deliver message
    Consumer->>Consumer: read & deserialize
    alt has offloaded pointer
        Consumer->>GCS: retrievePayload(pointer)
        GCS-->>Consumer: payload stream/data
    end
    Consumer->>Consumer: validate schema
    alt format/validation error
        Consumer->>Resolver: processError(error)
        Resolver-->>Consumer: MessageError
        Consumer->>PubSub: ack or route to DLQ per policy
    else normal
        Consumer->>Consumer: pre-handlers & barrier
        Consumer->>Handler: invoke handler
        alt success
            Handler-->>Consumer: success
            Consumer->>PubSub: ack
        else retry
            Handler-->>Consumer: retryLater
            Consumer->>PubSub: nack/retry (or allow DLQ after attempts)
        end
    end
    Consumer->>Consumer: release locks & cleanup
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Areas to focus:

  • packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (locking, deduplication, retry/DLQ, ack/nack semantics)
  • packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts (lazy init concurrency, offload attributes, error wrapping)
  • packages/gcp-pubsub/lib/utils/pubSubInitter.ts (resource creation/update/delete and DLQ merging)
  • packages/gcp-pubsub/lib/pubsub/PubSubPublisherManager.ts (complex generics and event→topic mapping)
  • packages/gcp-pubsub/test/utils/testContext.ts (DI registration and lifecycle management for tests)

Suggested reviewers

  • dariacm
  • CarlosGamero

Poem

🐰 I hopped in code through topics bright,

Emulators hummed into the night,
Buckets cradled payloads grand,
Messages danced at my command,
I nibble bugs and ship with light delight.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 5.26% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The pull request title clearly and directly describes the main change: implementing support for Google Cloud Pub/Sub, which is the core objective reflected throughout the substantial codebase additions.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/pubsub

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 98135ea and 231f239.

📒 Files selected for processing (2)
  • packages/sqs/lib/utils/eventBridgeSchemaBuilder.ts (1 hunks)
  • packages/sqs/package.json (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/sqs/lib/utils/eventBridgeSchemaBuilder.ts (1)
packages/sqs/lib/index.ts (1)
  • EventBridgeDetail (30-30)
🔇 Additional comments (2)
packages/sqs/package.json (1)

42-42: Verify this dependency update is intentional.

The biome version has been bumped from ^2.3.2 to ^2.3.8 (patch-level update), which is generally safe for a dev dependency. However, since the PR objective is GCP Pub/Sub support rather than SQS package maintenance, please confirm this change is intentional or revert it to keep the PR focused on its primary scope.

packages/sqs/lib/utils/eventBridgeSchemaBuilder.ts (1)

71-78: Type refinement improves clarity and correctness.

The explicit : never branches in the EventBridgeDetail type definition follow TypeScript best practices for conditional types and improve type-level explicitness without any breaking changes.

The type is properly exported from packages/sqs/lib/index.ts and works correctly with the test suite. No internal consumers exist in the codebase that could break from this change.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@kibertoad kibertoad marked this pull request as ready for review November 17, 2025 21:49
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (19)
docker-compose.yml (1)

72-85: New emulator services look good; optionally restrict host exposure.

The GCS and Pub/Sub emulator definitions are consistent with the rest of the stack. To avoid exposing them beyond the host (like you already do for localstack), you might bind them to 127.0.0.1 (e.g., '127.0.0.1:4443:4443' and '127.0.0.1:8085:8085').

packages/gcp-pubsub/test/utils/cleanupPubSub.ts (1)

3-42: Cleanup helpers are safe; consider minor DRY and idempotency tweaks.

The existence checks before delete make these utilities safe for tests. If you want them fully idempotent even under races, you could (a) factor out a small “delete if exists” helper used by both subscription/topic deletions and (b) optionally tolerate not-found errors from delete() in addition to the explicit exists() checks.

packages/gcs-payload-store/package.json (1)

1-66: Manifest looks solid; consider adding types and narrowing docker:stop.

The package.json is generally ready for publishing. Two possible improvements:

  • Add a "types": "./dist/index.d.ts" field so TS consumers don’t rely solely on exports.
  • Change "docker:stop": "docker compose down" to only stop the emulator service (e.g., docker compose stop gcs-emulator), so running it from this package doesn’t tear down unrelated services.
packages/gcs-payload-store/lib/GCSPayloadStore.ts (1)

16-120: GCS payload store looks robust; only minor polish opportunities.

The config resolver, key generation with optional prefix, and handling of both string and stream payloads all look correct, and treating missing objects as null / no-op makes operations idempotent. If you want to tighten things further, you could:

  • Extract a small helper for generating the object key (UUID + optional prefix) so storePayload is a bit clearer.
  • Share a tiny “isNotFound error” helper between retrievePayload and deletePayload to avoid repeating the (error as any)?.code check.
packages/gcp-pubsub/lib/utils/pubSubInitter.ts (1)

8-176: Pub/Sub init/delete flow is sound; clarify config precedence and flag behavior.

The topic/subscription creation and DLQ wiring logic look correct, and deletePubSub’s delete-if-exists behavior is straightforward. Two edge cases worth considering:

  • If callers accidentally pass both locatorConfig and creationConfig, the locator path silently wins. You might want to reject that combination explicitly or document the precedence to avoid surprises.
  • PubSubCreationConfig.updateAttributesIfExists and DeletionConfig.waitForConfirmation / forceDeleteInProduction aren’t used here; if higher-level code expects those flags to have an effect, this could be confusing. Either wire them in or document that they’re ignored by these helpers.
packages/gcp-pubsub/vitest.config.ts (1)

17-22: Consider increasing branch coverage.

Branch coverage is at 62% while function coverage is at 87%, suggesting edge cases and error paths may not be fully tested. Consider adding tests for conditional branches to improve overall coverage quality.

packages/gcp-pubsub/lib/utils/pubSubMessageDeserializer.ts (1)

6-21: Consider broadening the return type or clarifying error handling.

The function returns Either<MessageInvalidFormatError, unknown>, but line 15 may return other error types (e.g., MessageValidationError, InternalError) when errorResolver.processError() transforms the error. This could lead to type inconsistencies.

Consider updating the return type to reflect all possible error types:

 export function deserializePubSubMessage(
   message: PubSubMessage,
   errorResolver: ErrorResolver,
-): Either<MessageInvalidFormatError, unknown> {
+): Either<MessageInvalidFormatError | MessageValidationError | InternalError, unknown> {

Or, if the intent is to only return MessageInvalidFormatError, add explicit type validation after error resolution.

packages/gcp-pubsub/README.md (2)

70-72: Add language identifier to fenced code block.

-```
+```text
 Publisher → Topic → Subscription → Consumer

Based on static analysis hints.

---

`672-678`: **Add language identifier to fenced code block.**



```diff
-```
+```text
 Attempt 1: Message nacked, redelivered by Pub/Sub
 Attempt 2: Message nacked, redelivered by Pub/Sub
 Attempt 3: Message nacked, redelivered by Pub/Sub
 ...
 After maxDeliveryAttempts: Sent to DLQ

Based on static analysis hints.

</blockquote></details>
<details>
<summary>packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts (2)</summary><blockquote>

`94-127`: **Test name suggests ordering guarantees that aren’t asserted**

`it('consumes multiple messages in order', ...)` only waits for each ID to be processed, but doesn’t assert any ordering semantics (e.g., by comparing processing sequence). If strict ordering isn’t guaranteed/required, consider renaming the test to avoid implying stronger guarantees than are actually verified; otherwise, extend the assertions to validate order explicitly (e.g., via the spy’s call sequence).

---

`172-186`: **Fixed 1s sleep in invalid-format test may be brittle**

The invalid-format test uses a hard-coded `setTimeout(1000)` and only inspects counters. That can be slightly brittle (slow CI, emulator hiccups) and doesn’t directly assert that the error path was exercised. If the handler spy can observe error statuses for invalid-format messages, consider waiting on the spy instead of a fixed timeout so the test becomes faster and more deterministic.

</blockquote></details>
<details>
<summary>packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (1)</summary><blockquote>

`15-30`: **Tighten option typing and consider dropping redundant publish override**

`PubSubPermissionPublisherOptions` works fine for tests, but there are a couple of small cleanups you might consider:

- Replace `payloadStoreConfig?: any` with a more specific type (or `unknown`) to avoid leaking `any` into the test surface.
- The explicit `publish` override just forwards to `super.publish` without changing behavior; unless you expect to extend it later, you could omit the override and rely on the base implementation to keep the class slimmer.

These are optional and don’t affect correctness.

</blockquote></details>
<details>
<summary>packages/gcp-pubsub/test/pubsub/PubSubPublisherManager.spec.ts (1)</summary><blockquote>

`196-243`: **Minor schema choice nit in injected publisher setup**

In the `injectPublisher` test you build `CommonPubSubPublisher` with `messageSchemas: [injectedSchema.consumerSchema]`. If `publisherSchema` and `consumerSchema` ever diverge, this might become confusing. Using `injectedSchema.publisherSchema` here (to mirror the publishing side) could make the intent clearer, even if they’re currently identical.

</blockquote></details>
<details>
<summary>packages/gcs-payload-store/test/utils/gcsUtils.ts (1)</summary><blockquote>

`3-49`: **Consider pagination when clearing buckets (optional)**

For test usage these helpers look fine. Be aware that `bucket.getFiles()` only returns the first page of objects; if a test ever writes enough objects to trigger pagination, `assertEmptyBucket` would leave some behind. If that’s a realistic scenario, consider switching to the paged API (`getFiles({ autoPaginate: true })`) or looping over pages.

</blockquote></details>
<details>
<summary>packages/gcp-pubsub/test/utils/testContext.ts (2)</summary><blockquote>

`22-43`: **Logger stub works but could avoid `@ts-expect-error`**

Using `console` as a `CommonLogger` stub is fine for tests, but the `// @ts-expect-error` suggests the interfaces don’t quite match. If you want to tighten this up later, you could define a small adapter that implements `CommonLogger` and forwards to `console`, removing the need for the suppression.

---

`63-120`: **Stubbed managers typed as non-optional may hide `undefined` at runtime**

`transactionObservabilityManager` and `messageMetricsManager` are registered as functions returning `undefined`, while the `Dependencies` interface types them as `TransactionObservabilityManager` and `MessageMetricsManager` respectively. That’s fine as long as all consumers treat them as optional, but it’s easy for future code to accidentally call methods on them and hit `undefined`.

Two possible follow-ups (no rush, test-only):

- Change their types in `Dependencies` to `TransactionObservabilityManager | undefined` / `MessageMetricsManager | undefined`, or
- Provide minimal no-op implementations that satisfy the interfaces instead of returning `undefined`.

Similarly, you may want to consider marking `gcsStorage` and `consumerErrorResolver` as singletons for consistency with other shared test dependencies, although it’s not required for correctness.

</blockquote></details>
<details>
<summary>packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts (1)</summary><blockquote>

`50-133`: **Publish flow looks correct; consider minor polish around serialization and error wrapping**

The overall publish pipeline (schema resolution, lazy init, optional logging, dedup, offloading, and publish) is coherent and matches the core abstractions. Two small, non-blocking nits:

- You serialize the message twice (`offloadMessagePayloadIfNeeded` size callback and `sendMessage`). If payloads get large and throughput is high, you might later want to let the offloader return (or reuse) the encoded `Buffer` to avoid the second stringify.
- `messageSchemaResult.error` is thrown before the `try` block, while other failures are wrapped into `InternalError`. If you want a single error surface for callers, consider wrapping schema-resolution errors the same way.

Nothing here is blocking; just potential future cleanup.

</blockquote></details>
<details>
<summary>packages/gcp-pubsub/lib/pubsub/PubSubPublisherManager.ts (1)</summary><blockquote>

`50-77`: **Verify schema-type mapping between `PubSubMessageSchemaType` and the manager’s `publish` return type**

You introduce `PubSubMessageSchemaType<T> = z.output<T['publisherSchema']>` and pass it as the 7th generic argument to `AbstractPublisherManager`, but the overridden `publish` method still uses the core alias:

```ts
override publish(...): Promise<MessageSchemaType<SupportedEventDefinitions[number]>>

where MessageSchemaType in core is based on consumerSchema.

Depending on how AbstractPublisherManager is parameterized, this can mean:

  • The manager is instantiated with one schema type (PubSubMessageSchemaType), but
  • The visible publish return type exposes a different schema (MessageSchemaType).

Please double-check that:

  1. The 7th generic argument to AbstractPublisherManager matches the type you actually want to expose from publish, and
  2. MessageSchemaType<SupportedEventDefinitions[number]> here is indeed assignable to that generic argument.

If they’re intentionally different (e.g. output vs input types, or publisher vs consumer schemas), it might be worth adding a short comment to document that asymmetry.

Also applies to: 101-110

packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (1)

27-75: Minor cleanup: unused constant and unused consumer override fields

Two small cleanups you can consider:

  • _ABORT_EARLY_EITHER is defined but unused; if you’re no longer using the “abort early” pattern, it can be safely removed.
  • PubSubConsumerOptions exposes consumerOverrides.batching, but start() only applies consumerOverrides.flowControl to subscription.setOptions. If batching overrides are intended to be supported, you might want to wire them through here; otherwise, consider dropping them from the options to avoid confusion.

Neither is blocking, but tightening these up will reduce mental overhead for future readers.

Also applies to: 196-203, 495-511

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aa9cc20 and b0bc6f4.

📒 Files selected for processing (44)
  • docker-compose.yml (1 hunks)
  • packages/gcp-pubsub/README.md (1 hunks)
  • packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts (1 hunks)
  • packages/gcp-pubsub/lib/fakes/FakeConsumerErrorResolver.ts (1 hunks)
  • packages/gcp-pubsub/lib/index.ts (1 hunks)
  • packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (1 hunks)
  • packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts (1 hunks)
  • packages/gcp-pubsub/lib/pubsub/AbstractPubSubService.ts (1 hunks)
  • packages/gcp-pubsub/lib/pubsub/CommonPubSubPublisherFactory.ts (1 hunks)
  • packages/gcp-pubsub/lib/pubsub/PubSubPublisherManager.ts (1 hunks)
  • packages/gcp-pubsub/lib/schemas/pubSubSchemas.ts (1 hunks)
  • packages/gcp-pubsub/lib/types/MessageTypes.ts (1 hunks)
  • packages/gcp-pubsub/lib/utils/messageUtils.ts (1 hunks)
  • packages/gcp-pubsub/lib/utils/pubSubInitter.ts (1 hunks)
  • packages/gcp-pubsub/lib/utils/pubSubMessageDeserializer.ts (1 hunks)
  • packages/gcp-pubsub/lib/utils/pubSubMessageReader.ts (1 hunks)
  • packages/gcp-pubsub/lib/utils/pubSubUtils.ts (1 hunks)
  • packages/gcp-pubsub/package.json (1 hunks)
  • packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts (1 hunks)
  • packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (1 hunks)
  • packages/gcp-pubsub/test/consumers/userConsumerSchemas.ts (1 hunks)
  • packages/gcp-pubsub/test/fakes/FakeLogger.ts (1 hunks)
  • packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.spec.ts (1 hunks)
  • packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (1 hunks)
  • packages/gcp-pubsub/test/pubsub/PubSubPublisherManager.spec.ts (1 hunks)
  • packages/gcp-pubsub/test/utils/cleanRedis.ts (1 hunks)
  • packages/gcp-pubsub/test/utils/cleanupPubSub.ts (1 hunks)
  • packages/gcp-pubsub/test/utils/testContext.ts (1 hunks)
  • packages/gcp-pubsub/test/utils/testPubSubConfig.ts (1 hunks)
  • packages/gcp-pubsub/test/utils/testRedisConfig.ts (1 hunks)
  • packages/gcp-pubsub/tsconfig.build.json (1 hunks)
  • packages/gcp-pubsub/tsconfig.json (1 hunks)
  • packages/gcp-pubsub/vitest.config.ts (1 hunks)
  • packages/gcs-payload-store/README.md (1 hunks)
  • packages/gcs-payload-store/lib/GCSPayloadStore.ts (1 hunks)
  • packages/gcs-payload-store/lib/index.ts (1 hunks)
  • packages/gcs-payload-store/package.json (1 hunks)
  • packages/gcs-payload-store/test/store/GCSPayloadStore.spec.ts (1 hunks)
  • packages/gcs-payload-store/test/utils/gcsUtils.ts (1 hunks)
  • packages/gcs-payload-store/test/utils/streamUtils.ts (1 hunks)
  • packages/gcs-payload-store/test/utils/testGCSConfig.ts (1 hunks)
  • packages/gcs-payload-store/tsconfig.build.json (1 hunks)
  • packages/gcs-payload-store/tsconfig.json (1 hunks)
  • packages/gcs-payload-store/vitest.config.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (17)
packages/gcp-pubsub/lib/schemas/pubSubSchemas.ts (1)
packages/gcp-pubsub/lib/pubsub/PubSubPublisherManager.ts (1)
  • PubSubAwareEventDefinition (22-22)
packages/gcp-pubsub/lib/utils/pubSubMessageDeserializer.ts (2)
packages/gcp-pubsub/lib/types/MessageTypes.ts (1)
  • PubSubMessage (4-4)
packages/gcp-pubsub/lib/utils/pubSubMessageReader.ts (1)
  • readPubSubMessage (5-23)
packages/gcp-pubsub/lib/utils/pubSubMessageReader.ts (1)
packages/gcp-pubsub/lib/types/MessageTypes.ts (1)
  • PubSubMessage (4-4)
packages/gcp-pubsub/lib/pubsub/CommonPubSubPublisherFactory.ts (2)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubService.ts (3)
  • PubSubDependencies (10-12)
  • PubSubCreationConfig (48-52)
  • PubSubQueueLocatorType (54-57)
packages/core/lib/types/queueOptionsTypes.ts (1)
  • QueuePublisherOptions (115-122)
packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.spec.ts (3)
packages/gcp-pubsub/test/utils/testContext.ts (1)
  • registerDependencies (25-130)
packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (1)
  • PubSubPermissionPublisher (32-59)
packages/gcp-pubsub/test/utils/cleanupPubSub.ts (1)
  • deletePubSubTopic (3-9)
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts (4)
packages/gcp-pubsub/test/utils/testContext.ts (1)
  • registerDependencies (25-130)
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (1)
  • PubSubPermissionConsumer (60-140)
packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (1)
  • PubSubPermissionPublisher (32-59)
packages/gcp-pubsub/test/utils/cleanupPubSub.ts (1)
  • deletePubSubTopicAndSubscription (27-42)
packages/gcs-payload-store/test/store/GCSPayloadStore.spec.ts (4)
packages/gcs-payload-store/lib/GCSPayloadStore.ts (2)
  • GCSPayloadStore (43-120)
  • resolvePayloadStoreConfig (16-32)
packages/gcs-payload-store/test/utils/testGCSConfig.ts (1)
  • createTestGCSClient (8-13)
packages/gcs-payload-store/test/utils/gcsUtils.ts (3)
  • assertEmptyBucket (3-16)
  • getObjectContent (18-27)
  • objectExists (29-38)
packages/gcs-payload-store/test/utils/streamUtils.ts (1)
  • streamToString (3-10)
packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (3)
packages/gcp-pubsub/test/consumers/userConsumerSchemas.ts (4)
  • PERMISSIONS_ADD_MESSAGE_TYPE (18-18)
  • PERMISSIONS_REMOVE_MESSAGE_TYPE (19-19)
  • PERMISSIONS_ADD_MESSAGE_SCHEMA (3-9)
  • PERMISSIONS_REMOVE_MESSAGE_SCHEMA (11-16)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubService.ts (1)
  • PubSubDependencies (10-12)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts (1)
  • PubSubMessageOptions (23-26)
packages/gcp-pubsub/lib/utils/pubSubInitter.ts (2)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubService.ts (2)
  • PubSubQueueLocatorType (54-57)
  • PubSubCreationConfig (48-52)
packages/core/lib/types/queueOptionsTypes.ts (1)
  • DeletionConfig (95-99)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts (4)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubService.ts (3)
  • PubSubDependencies (10-12)
  • PubSubCreationConfig (48-52)
  • PubSubQueueLocatorType (54-57)
packages/core/lib/types/queueOptionsTypes.ts (1)
  • QueuePublisherOptions (115-122)
packages/gcp-pubsub/lib/utils/messageUtils.ts (1)
  • buildOffloadedPayloadAttributes (7-23)
packages/gcp-pubsub/lib/types/MessageTypes.ts (1)
  • PubSubMessage (4-4)
packages/gcp-pubsub/test/pubsub/PubSubPublisherManager.spec.ts (4)
packages/gcp-pubsub/lib/schemas/pubSubSchemas.ts (1)
  • PubSubAwareEventDefinition (3-5)
packages/gcp-pubsub/test/utils/testContext.ts (2)
  • Dependencies (134-148)
  • registerDependencies (25-130)
packages/gcp-pubsub/lib/pubsub/CommonPubSubPublisherFactory.ts (1)
  • CommonPubSubPublisher (20-22)
packages/core/lib/messages/MetadataFiller.ts (1)
  • CommonMetadataFiller (29-66)
packages/gcp-pubsub/test/utils/testContext.ts (5)
packages/gcp-pubsub/test/utils/testPubSubConfig.ts (1)
  • TEST_PUBSUB_CONFIG (1-4)
packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts (1)
  • PubSubConsumerErrorResolver (6-32)
packages/gcp-pubsub/test/utils/testRedisConfig.ts (1)
  • TEST_REDIS_CONFIG (1-10)
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (1)
  • PubSubPermissionConsumer (60-140)
packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (1)
  • PubSubPermissionPublisher (32-59)
packages/gcp-pubsub/lib/fakes/FakeConsumerErrorResolver.ts (2)
packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts (1)
  • PubSubConsumerErrorResolver (6-32)
packages/amqp/test/fakes/FakeConsumerErrorResolver.ts (1)
  • FakeConsumerErrorResolver (3-23)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubService.ts (3)
packages/core/lib/types/queueOptionsTypes.ts (2)
  • QueueDependencies (12-16)
  • QueueOptions (109-113)
packages/gcp-pubsub/lib/types/MessageTypes.ts (1)
  • PubSubMessage (4-4)
packages/gcp-pubsub/lib/utils/pubSubInitter.ts (2)
  • deletePubSub (152-176)
  • initPubSub (32-150)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (6)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubService.ts (3)
  • PubSubDependencies (10-12)
  • PubSubCreationConfig (48-52)
  • PubSubQueueLocatorType (54-57)
packages/core/lib/types/queueOptionsTypes.ts (2)
  • QueueConsumerDependencies (73-76)
  • QueueConsumerOptions (134-152)
packages/gcp-pubsub/lib/utils/pubSubInitter.ts (2)
  • deletePubSub (152-176)
  • initPubSub (32-150)
packages/gcp-pubsub/lib/types/MessageTypes.ts (1)
  • PubSubMessage (4-4)
packages/gcp-pubsub/lib/utils/pubSubMessageDeserializer.ts (1)
  • deserializePubSubMessage (6-21)
packages/gcp-pubsub/lib/utils/messageUtils.ts (1)
  • hasOffloadedPayload (3-5)
packages/gcp-pubsub/lib/pubsub/PubSubPublisherManager.ts (6)
packages/gcp-pubsub/lib/schemas/pubSubSchemas.ts (1)
  • PubSubAwareEventDefinition (3-5)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubService.ts (3)
  • PubSubDependencies (10-12)
  • PubSubCreationConfig (48-52)
  • PubSubQueueLocatorType (54-57)
packages/gcp-pubsub/lib/pubsub/CommonPubSubPublisherFactory.ts (2)
  • PubSubPublisherFactory (10-18)
  • CommonPubSubPublisherFactory (24-33)
packages/core/lib/messages/MetadataFiller.ts (1)
  • MetadataFiller (18-27)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts (2)
  • PubSubMessageOptions (23-26)
  • publish (50-119)
packages/core/lib/queues/AbstractPublisherManager.ts (2)
  • MessagePublishType (11-11)
  • MessageSchemaType (13-13)
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (2)
packages/gcp-pubsub/test/consumers/userConsumerSchemas.ts (4)
  • PERMISSIONS_ADD_MESSAGE_TYPE (18-18)
  • PERMISSIONS_REMOVE_MESSAGE_TYPE (19-19)
  • PERMISSIONS_REMOVE_MESSAGE_SCHEMA (11-16)
  • PERMISSIONS_ADD_MESSAGE_SCHEMA (3-9)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (2)
  • PubSubConsumerOptions (48-74)
  • PubSubConsumerDependencies (46-46)
🪛 LanguageTool
packages/gcp-pubsub/README.md

[style] ~78-~78: This adverb was used twice in the sentence. Consider removing one of them or replacing them with a synonym.
Context: ...to subscriptions - ❌ You CANNOT consume directly from topics - ✅ One topic can have mult...

(ADVERB_REPETITION_PREMIUM)

🪛 markdownlint-cli2 (0.18.1)
packages/gcp-pubsub/README.md

70-70: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


672-672: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (33)
packages/gcp-pubsub/test/utils/cleanRedis.ts (1)

1-5: LGTM! Clean test utility implementation.

The function is simple, well-typed, and appropriate for its purpose as a test cleanup utility. Using flushdb() correctly clears the current Redis database, which is the expected behavior for isolating test state.

packages/gcp-pubsub/tsconfig.json (1)

1-7: LGTM! Clean TypeScript configuration.

The configuration appropriately extends the base config, includes all necessary paths, and adds Vitest globals for testing.

packages/gcp-pubsub/lib/types/MessageTypes.ts (1)

1-4: LGTM! Clean type aliasing.

The type alias provides a consistent abstraction over the Google Cloud SDK Message type, which is a good practice for maintaining a clean public API.

packages/gcs-payload-store/tsconfig.json (1)

1-7: LGTM! Consistent configuration across packages.

The configuration mirrors the gcp-pubsub package setup, which maintains consistency across the monorepo.

packages/gcs-payload-store/tsconfig.build.json (1)

1-5: LGTM! Standard build configuration.

The build configuration appropriately targets library code and excludes test files from the published package.

packages/gcp-pubsub/tsconfig.build.json (1)

1-5: LGTM! Consistent build configuration.

The configuration mirrors the gcs-payload-store package, maintaining consistency in how public libraries are built.

packages/gcs-payload-store/vitest.config.ts (1)

1-25: LGTM! Well-configured test setup.

The Vitest configuration is appropriate with reasonable coverage thresholds. The singleThread setting ensures deterministic test execution, which is beneficial for integration tests with external services like GCS.

packages/gcs-payload-store/test/utils/testGCSConfig.ts (1)

1-13: Port configuration verified—approval confirmed.

The GCS emulator port in the test configuration (4443) correctly matches the docker-compose service port mapping (4443:4443) and the startup command (-port 4443). The test client configuration is properly aligned with the emulator setup.

packages/gcp-pubsub/test/utils/testRedisConfig.ts (1)

1-10: Redis configuration correctly aligns with docker-compose.

The test configuration password ('sOmE_sEcUrE_pAsS'), port (6379), and default host settings match the docker-compose Redis service configuration. Verification confirms proper alignment.

packages/gcp-pubsub/test/consumers/userConsumerSchemas.ts (1)

1-19: Permission message schemas are well-structured for add/remove flows.

Good use of a discriminating messageType field and stricter timestamp/userIds requirements for remove vs add. The inferred types should keep the publisher/consumer tests nicely aligned with the wire format.

packages/gcp-pubsub/test/fakes/FakeLogger.ts (1)

1-52: FakeLogger is a pragmatic test double.

The in-memory message/warning/error buffers and child() returning the same instance are a good fit for tests. This should drop into any CommonLogger-using code without surprises.

packages/gcs-payload-store/lib/index.ts (1)

1-1: LGTM!

The barrel export is correctly structured for ESM compatibility.

packages/gcs-payload-store/test/utils/streamUtils.ts (1)

3-10: LGTM!

The stream-to-string conversion is correctly implemented with proper error handling and Buffer management.

packages/gcp-pubsub/lib/utils/pubSubMessageReader.ts (1)

5-23: LGTM!

The message reader properly handles JSON parsing errors and returns a type-safe Either result with appropriate error details.

packages/gcp-pubsub/test/utils/testPubSubConfig.ts (1)

1-4: LGTM!

The test configuration appropriately points to the Pub/Sub emulator endpoint.

packages/gcp-pubsub/vitest.config.ts (1)

4-12: LGTM!

The single-thread pool configuration is appropriate for integration tests that may share state with the Pub/Sub emulator.

packages/gcp-pubsub/lib/schemas/pubSubSchemas.ts (1)

3-5: LGTM!

The type definition correctly extends CommonEventDefinition with an optional Pub/Sub-specific topic field.

packages/gcp-pubsub/lib/utils/pubSubUtils.ts (2)

11-13: Global environment modification side effect.

Setting process.env.PUBSUB_EMULATOR_HOST globally can affect other code or tests in the same process. This is the standard pattern for GCP emulators, but be aware that once set, it affects all subsequent PubSub client creations in the process.


8-18: LGTM!

The type-only import combined with dynamic require is an intentional pattern to avoid bundling issues with optional peer dependencies.

packages/gcs-payload-store/README.md (1)

1-253: LGTM!

The documentation is well-structured, comprehensive, and provides clear examples for installation, configuration, lifecycle management, and best practices.

Also applies to: 282-345

packages/gcs-payload-store/test/store/GCSPayloadStore.spec.ts (1)

13-145: LGTM!

The test suite is comprehensive and well-structured, covering all major scenarios including:

  • Payload storage (string and stream inputs)
  • Key prefix functionality
  • Retrieval with proper null handling
  • Deletion with graceful error handling
  • Configuration resolution edge cases
packages/gcp-pubsub/lib/utils/messageUtils.ts (1)

1-23: LGTM!

The utility functions correctly handle offloaded payload detection and attribute building. The type guards are appropriate, and the string conversion for the size attribute is handled properly.

packages/gcp-pubsub/lib/fakes/FakeConsumerErrorResolver.ts (1)

3-23: LGTM!

The test fake correctly extends PubSubConsumerErrorResolver and provides error tracking functionality. The implementation follows established patterns from other adapters in the codebase (e.g., packages/amqp/test/fakes/FakeConsumerErrorResolver.ts).

packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.spec.ts (1)

8-145: LGTM!

The test suite provides good coverage of publisher functionality including:

  • Topic creation and idempotent initialization
  • Publishing single and multiple messages
  • Publishing with ordering keys and custom attributes
  • Handler spy integration for verifying published messages
packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts (1)

6-32: LGTM!

The error resolver correctly maps different error types to standardized internal errors:

  • SyntaxErrorMessageInvalidFormatError
  • ZodErrorMessageValidationError (with validation details)
  • Standardized errors → InternalError (preserving error code)
  • Fallback → Generic InternalError

The error handling logic is comprehensive and follows established patterns.

packages/gcp-pubsub/package.json (1)

34-34: No security advisories found.

The version @google-cloud/pubsub ^5.2.0 has no known security vulnerabilities according to GitHub's security database. The dependency is safe to use.

packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts (1)

15-40: Lifecycle setup and DI usage look solid

Reinitializing the consumer/publisher and cleaning up topics/subscriptions in beforeEach gives good isolation between tests and mirrors the actual lifecycle of the components. This is a sensible pattern for these integration-style tests.

packages/gcp-pubsub/lib/pubsub/CommonPubSubPublisherFactory.ts (1)

10-32: Generic publisher factory and CommonPubSubPublisher wiring look correct

The PubSubPublisherFactory signature and CommonPubSubPublisherFactory implementation correctly thread PublisherBaseEventType and QueuePublisherOptions<PubSubCreationConfig, PubSubQueueLocatorType, M>. The minimal CommonPubSubPublisher subclass keeps the abstraction clear without introducing extra behavior.

packages/gcp-pubsub/test/pubsub/PubSubPublisherManager.spec.ts (1)

17-48: Event definitions and type plumbing give good coverage

The combination of TestEvents, PubSubAwareEventDefinition, and TestEventPublishPayloadsType gives strong type-safety for the PubSubPublisherManager tests while still being readable. This nicely exercises the manager against realistic event metadata and payload schemas.

packages/gcp-pubsub/lib/index.ts (1)

1-14: Barrel exports are coherent and comprehensive

This index file cleanly exposes the intended public surface (errors, abstract Pub/Sub primitives, factory, manager, schemas, and utilities) without over-coupling tests. It should make the package much easier to consume.

packages/gcp-pubsub/test/utils/testContext.ts (1)

132-148: DI config and returned container shape look consistent

DiConfig as Record<keyof Dependencies, Resolver<any>> and the Dependencies interface align with what you register in diConfig, and returning the Awilix container from registerDependencies integrates cleanly with the tests (AwilixContainer<Dependencies> usage). The async init/dispose setup for Redis and the permission consumer/publisher looks well thought out.

packages/gcp-pubsub/lib/pubsub/AbstractPubSubService.ts (1)

10-105: Base Pub/Sub service abstraction looks consistent with core queue service

Type wiring into AbstractQueueService and the init / close lifecycle look correct, and the dependency contract (PubSubDependencies) is minimal and clear. No blocking issues from my side here.

packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (1)

20-138: Test consumer wiring into AbstractPubSubConsumer looks good

The options narrowing, default creationConfig, and handler wiring via MessageHandlerConfigBuilder are all consistent with the core consumer abstractions. Arrow-function handlers correctly capture this for counters and tracking, so this should be a solid functional-test harness.

Comment on lines +30 to +48
export type PubSubPublisherManagerOptions<
T extends AbstractPubSubPublisher<EventType>,
EventType extends PublisherBaseEventType,
MetadataType,
> = {
metadataField?: string
publisherFactory?: PubSubPublisherFactory<T, EventType>
metadataFiller: MetadataFiller<EventType, MetadataType>
newPublisherOptions: Omit<
import('@message-queue-toolkit/core').QueuePublisherOptions<
PubSubCreationConfig,
PubSubQueueLocatorType,
EventType
>,
'messageSchemas' | 'creationConfig' | 'locatorConfig'
> & {
creationConfig?: Omit<PubSubCreationConfig, 'topic'>
}
}
Copy link

@coderabbitai coderabbitai bot Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

MetadataFiller generic parameters look mis-ordered in options

In core, MetadataFiller is defined as:

  • MetadataFiller<T, D, M> where:
    • T = message type,
    • D = event-definition type,
    • M = metadata type.

Here the options declare:

metadataFiller: MetadataFiller<EventType, MetadataType>

which effectively treats MetadataType as the event definition (D) and leaves the metadata type at its default (PublisherMessageMetadataType). That doesn’t line up with the generics on PubSubPublisherManager, where MetadataType is meant to be the actual metadata shape.

I’d expect something closer to:

metadataFiller: MetadataFiller<
  EventType,
  PubSubAwareEventDefinition, // or SupportedEventDefinitions[number]
  MetadataType
>

and corresponding adjustments in the manager / options generics so the type system reflects how produceMetadata is actually called.


🏁 Script executed:

#!/bin/bash
# Search for MetadataFiller definition in the codebase
rg -nP "type MetadataFiller|interface MetadataFiller" --type=ts

Length of output: 161


🏁 Script executed:

#!/bin/bash
# Examine the PubSubPublisherManager class definition to understand generics
rg -nP "class PubSubPublisherManager" --type=ts -A 30

Length of output: 3042


🏁 Script executed:

#!/bin/bash
# Find where metadataFiller is used/assigned in PubSubPublisherManager
rg -nP "metadataFiller" --type=ts -B 2 -A 2 -g "**/*PubSubPublisherManager*"

Length of output: 1553


🏁 Script executed:

#!/bin/bash
# Find AbstractPublisherManager to understand the base class generics
rg -nP "class AbstractPublisherManager|export class AbstractPublisherManager" --type=ts -A 20

Length of output: 1969


🏁 Script executed:

#!/bin/bash
# Check if metadataFiller is actually called/used and what signature it expects
rg -nP "produceMetadata|metadataFiller\(" --type=ts -B 3 -A 3 -g "**/*PubSubPublisherManager*"

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Read the MetadataFiller type definition
cat packages/core/lib/messages/MetadataFiller.ts

Length of output: 2082


🏁 Script executed:

#!/bin/bash
# Check AbstractPublisherManager for metadataFiller usage
rg -nP "metadataFiller|produceMetadata" --type=ts -B 3 -A 3 -g "**/*AbstractPublisherManager*"

Length of output: 4391


🏁 Script executed:

#!/bin/bash
# Check if there are TypeScript compilation errors
cd packages/gcp-pubsub && cat tsconfig.json 2>/dev/null | head -20

Length of output: 173


🏁 Script executed:

#!/bin/bash
# Verify the exact lines 30-48 in PubSubPublisherManager.ts still have the issue
sed -n '30,48p' packages/gcp-pubsub/lib/pubsub/PubSubPublisherManager.ts

Length of output: 601


🏁 Script executed:

#!/bin/bash
# Check the full context of PubSubPublisherManagerOptions generics
sed -n '30,52p' packages/gcp-pubsub/lib/pubsub/PubSubPublisherManager.ts

Length of output: 713


MetadataFiller generics are mis-ordered; second parameter should be an event-definition type, not MetadataType

The MetadataFiller type is defined as MetadataFiller<T, D, M> where:

  • T = message type
  • D = event-definition type
  • M = metadata type

Line 37 declares:

metadataFiller: MetadataFiller<EventType, MetadataType>

This passes MetadataType as the second parameter (event-definition position), leaving metadata type at its default. The correct signature should be:

metadataFiller: MetadataFiller<EventType, SupportedEventDefinitions[number], MetadataType>

Note: AbstractPublisherManager in core has the same issue and suppresses it with @ts-expect-error on line 238. This type mismatch should be resolved to maintain proper type safety across the hierarchy.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is fixed already, could you check it

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check the current state of the metadataFiller declaration in PubSubPublisherManager.ts
sed -n '30,50p' packages/gcp-pubsub/lib/pubsub/PubSubPublisherManager.ts

Length of output: 688


@CarlosGamero, I've checked the current code and the issue is still present. Line 37 shows:

metadataFiller: MetadataFiller<EventType, MetadataType>

This is still missing the event-definition type parameter. MetadataFiller expects three generic parameters:

  • T = message type
  • D = event-definition type
  • M = metadata type

The correct signature should be:

metadataFiller: MetadataFiller<EventType, PubSubAwareEventDefinition, MetadataType>

Or if you want to be more specific:

metadataFiller: MetadataFiller<EventType, SupportedEventDefinitions[number], MetadataType>

This ensures MetadataType is properly positioned as the third generic parameter (metadata type) rather than the second (event-definition type).

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs to be fixed in core and then in all the adapters, I don't want to touch it yet

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

"peerDependencies": {
"@google-cloud/pubsub": "^5.2.0",
"@message-queue-toolkit/core": ">=22.0.0",
"zod": ">=3.25.76 <5.0.0"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Verify Zod peer dependency range matches actual usage.

The peer dependency allows zod >=3.25.76 <5.0.0, but the code imports from zod/v4 (see packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts:4), which is specific to Zod 4.x. This mismatch could cause runtime failures if users install Zod 3.x.

Consider tightening the peer dependency to require Zod 4:

-    "zod": ">=3.25.76 <5.0.0"
+    "zod": "^4.0.0"

Alternatively, if Zod 3 support is intended, update the import to use the standard zod path instead of zod/v4.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"zod": ">=3.25.76 <5.0.0"
"zod": "^4.0.0"
🤖 Prompt for AI Agents
In packages/gcp-pubsub/package.json around line 36, the declared Zod peer
dependency ">=3.25.76 <5.0.0" conflicts with code that imports from "zod/v4"
(which requires Zod 4.x); to fix either tighten the peer dependency to require
Zod 4 (e.g., restrict to ^4.0.0 <5.0.0) or modify the code to import from the
generic "zod" entry so it remains compatible with Zod 3 and 4; choose one
approach and update package.json and/or the import paths accordingly, then run
tests and update package lock files if needed.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bcb1135 and 88954b7.

📒 Files selected for processing (1)
  • packages/gcp-pubsub/README.md (1 hunks)
🧰 Additional context used
🪛 LanguageTool
packages/gcp-pubsub/README.md

[style] ~81-~81: This adverb was used twice in the sentence. Consider removing one of them or replacing them with a synonym.
Context: ...to subscriptions - ❌ You CANNOT consume directly from topics - ✅ One topic can have mult...

(ADVERB_REPETITION_PREMIUM)

🪛 markdownlint-cli2 (0.18.1)
packages/gcp-pubsub/README.md

73-73: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


794-794: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
  • GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/redis-message-deduplication-store) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/core) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/amqp) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/metrics) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/s3-payload-store) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/s3-payload-store) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/amqp) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: kafka (24.x) / build
  • GitHub Check: kafka (22.x) / build
🔇 Additional comments (1)
packages/gcp-pubsub/README.md (1)

1-1600: Excellent documentation coverage.

This README provides comprehensive guidance on GCP Pub/Sub integration. The structure is well-organized with clear sections on configuration, advanced features, testing, and troubleshooting. Code examples are practical and demonstrate both basic and advanced patterns. The API reference and best practices sections will be valuable for users adopting the library.

"peerDependencies": {
"@google-cloud/pubsub": "^5.2.0",
"@message-queue-toolkit/core": ">=22.0.0",
"zod": ">=3.25.76 <5.0.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it is a new lib, I think it is ok to use Zod 4 directly

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to keep it to increase compatibility - potentially it might be used by teams who didn't migrate to 4.x yet

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
packages/gcp-pubsub/package.json (1)

36-36: Zod peer dependency range still conflicts with code imports.

The peer dependency allows >=3.25.76 <5.0.0, but the code imports from zod/v4 (e.g., packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts:4), which will fail at runtime with Zod 3.x installations. While the maintainer chose to keep the wider range for compatibility with teams not yet on Zod 4, users installing Zod 3.x will encounter runtime errors.

🧹 Nitpick comments (1)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (1)

200-207: Consider using proper types for flowControl configuration.

The double @ts-expect-error suppressions suggest the type definitions for subscription.setOptions() and consumerOverrides.flowControl may be incomplete. While the runtime code is safe (checks for existence), consider investigating whether proper types can be imported from @google-cloud/pubsub to eliminate these suppressions.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 88954b7 and 98135ea.

📒 Files selected for processing (10)
  • packages/amqp/lib/errors/AmqpConsumerErrorResolver.ts (1 hunks)
  • packages/gcp-pubsub/README.md (1 hunks)
  • packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts (1 hunks)
  • packages/gcp-pubsub/lib/index.ts (1 hunks)
  • packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (1 hunks)
  • packages/gcp-pubsub/lib/utils/pubSubMessageDeserializer.ts (1 hunks)
  • packages/gcp-pubsub/package.json (1 hunks)
  • packages/gcs-payload-store/README.md (1 hunks)
  • packages/sns/lib/errors/SnsConsumerErrorResolver.ts (1 hunks)
  • packages/sqs/lib/errors/SqsConsumerErrorResolver.ts (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • packages/gcs-payload-store/README.md
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/gcp-pubsub/lib/index.ts
  • packages/gcp-pubsub/lib/utils/pubSubMessageDeserializer.ts
🧰 Additional context used
🪛 LanguageTool
packages/gcp-pubsub/README.md

[style] ~81-~81: This adverb was used twice in the sentence. Consider removing one of them or replacing them with a synonym.
Context: ...to subscriptions - ❌ You CANNOT consume directly from topics - ✅ One topic can have mult...

(ADVERB_REPETITION_PREMIUM)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: general (22.x, @message-queue-toolkit/outbox-core) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/amqp) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/s3-payload-store) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
  • GitHub Check: kafka (22.x) / build
  • GitHub Check: kafka (24.x) / build
🔇 Additional comments (6)
packages/sns/lib/errors/SnsConsumerErrorResolver.ts (1)

23-34: LGTM! Error cause propagation improves observability.

Adding cause: error to both the standardized error path and the fallback error path enables proper error chaining and preserves the original error context. This aligns with similar improvements across AMQP, SQS, and Pub/Sub error resolvers in this PR.

packages/sqs/lib/errors/SqsConsumerErrorResolver.ts (1)

23-36: LGTM! Error cause propagation improves observability.

Adding cause: error to both the standardized error path and the fallback error path enables proper error chaining. This matches the pattern applied consistently across error resolvers in this PR.

packages/amqp/lib/errors/AmqpConsumerErrorResolver.ts (1)

23-34: LGTM! Error cause propagation improves observability.

Adding cause: error enables proper error chaining and preserves the original error context. This aligns with the consistent pattern applied across all error resolvers (AMQP, SNS, SQS, Pub/Sub) in this PR.

packages/gcp-pubsub/README.md (1)

1-1604: Excellent comprehensive documentation.

The README provides thorough coverage of all features including quick start examples, configuration options, advanced features (payload offloading, deduplication, DLQ, ordering, barriers, handler spies), error handling, testing guidance, and troubleshooting tips. The structure and examples make it easy for users to adopt the library.

packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts (1)

21-32: LGTM! Error cause propagation addresses previous feedback.

Adding cause: error to both the standardized error path and the fallback error path implements the suggestion from previous review feedback. This enables proper error chaining and aligns with the consistent pattern applied across all error resolvers in this PR.

Based on learnings from past review comments.

packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (1)

219-386: LGTM! Message handling flow correctly implements offloaded payload retrieval.

The message handling flow properly addresses the previous review feedback:

  1. Deserializes the message once via resolveMessage() (lines 228-243)
  2. Retrieves offloaded payload if needed (lines 245-263)
  3. Uses the (possibly offloaded) messagePayload for schema resolution and parsing (lines 265-290)
  4. Implements proper deduplication with lock acquisition (lines 294-322)
  5. Handles retry logic with duration checks (lines 329-381)

The complexity is well-managed despite the justified cognitive complexity warning.

@kibertoad kibertoad merged commit 2077222 into main Dec 3, 2025
49 of 61 checks passed
@kibertoad kibertoad deleted the feat/pubsub branch December 3, 2025 16:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants