From c7624deadf5b3b7dcc0f45b3e827ec856c46fd7f Mon Sep 17 00:00:00 2001
From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com>
Date: Thu, 13 Nov 2025 17:01:40 +0100
Subject: [PATCH 1/2] chore: document timeout on OnReceiveEvent hook
---
docs/router/configuration.mdx | 11 +++-
docs/router/cosmo-streams.mdx | 2 +-
.../custom-modules/on-publish-event.mdx | 2 +-
.../custom-modules/on-receive-event.mdx | 63 ++++++++++++++++---
4 files changed, 65 insertions(+), 13 deletions(-)
diff --git a/docs/router/configuration.mdx b/docs/router/configuration.mdx
index 0571dcf0..cd6de117 100644
--- a/docs/router/configuration.mdx
+++ b/docs/router/configuration.mdx
@@ -1572,7 +1572,7 @@ cdn:
## Events
-The Events section lets you define Event Sources for [Event-Driven Federated Subscriptions (EDFS)](/router/event-driven-federated-subscriptions-edfs).
+The Events section lets you define Event Sources for [Cosmo Streams / EDFS](/router/cosmo-streams).
We support NATS, Kafka and Redis as event bus provider.
@@ -1647,6 +1647,15 @@ events:
| | id | | The ID of the provider. This have to match with the ID specified in the subgraph schema. | [] |
| | urls | | A list of redis instances URLS, e.g: `redis://localhost:6379/2` | |
+### Subscription Hooks
+
+Configuration for Cosmo Streams hooks. Learn more about [Custom Modules for Cosmo Streams](/router/cosmo-streams/custom-modules).
+
+| Environment Variable | YAML | Required | Description | Default Value |
+| :------------------------------------------------------------- | :--------------------- | :------- | :------------------------------------------------------------------------------------------ | :- |
+| | subscription_hooks.on_receive_events.handler_timeout | | | The maximum time to wait for all OnReceiveEvents handlers to complete per event-batch. | 5s |
+| | subscription_hooks.on_receive_events.max_concurrent_handlers | | | The maximum number of concurrent OnReceiveEvents handlers per broker message queue / topic. | 100 |
+
## Router Engine Configuration
Configure the GraphQL Execution Engine of the Router.
diff --git a/docs/router/cosmo-streams.mdx b/docs/router/cosmo-streams.mdx
index ee75f1b4..8bdd4e28 100644
--- a/docs/router/cosmo-streams.mdx
+++ b/docs/router/cosmo-streams.mdx
@@ -114,7 +114,7 @@ The `@edfs__natsPublish`, `@edfs__kafkaPublish`, and `@edfs__redisPublish ` dire
Using the `@edfs__natsSubscribe`, `@edfs__kafkaSubscribe` and `@edfs__redisSubscribe` directives, you can create a Subscription to the corresponding message bus. By default, all the provider implementations are stateless, meaning every client receives the same events in a broadcast fashion. This behavior can be adjusted. NATS allows you to create a [consumer group](https://docs.nats.io/nats-concepts/jetstream/consumers), resulting in multiple independent streams of the subject, where each client can consume events at their own pace.
-The `@openfed__subscriptionFilter` directive allows you to filter subscription messages based on specified conditions. For more information see [Subscription Filter](/router/event-driven-federated-subscriptions-edfs#subscription-filter).
+The `@openfed__subscriptionFilter` directive allows you to filter subscription messages based on specified conditions. For more information see [Subscription Filter](/router/cosmo-streams#subscription-filter).
An Event-Driven Subgraph does not need to be implemented. It is simply a Subgraph Schema that instructs the Router on how to connect specific root fields to the Event Source. Scroll down for an example.
diff --git a/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx
index 17cc90ec..964b8488 100644
--- a/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx
+++ b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx
@@ -49,7 +49,7 @@ type StreamPublishEventHandlerContext interface {
## Error Handling
-As mentioned in the [Publish Overview Section](/router/event-driven-federated-subscriptions-edfs#publish) the return type of a Cosmo Streams mutation
+As mentioned in the [Publish Overview Section](/router/cosmo-streams#publish) the return type of a Cosmo Streams mutation
must use the type `PublishEventResult`. This type declares a boolean `success` field.
Implementations of `OnPublishEvents` handlers return two fields: `events` and `error`. When `error` is not `nil`, the client's response
will have the success field set to `false`. Also the error will be logged on the routers console output.
diff --git a/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx
index 17fad2ec..92ba3398 100644
--- a/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx
+++ b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx
@@ -16,11 +16,6 @@ This handler is particularly useful for:
- **Authentication and authorization**: Filter events based on user permissions
- **Monitoring and analytics**: Log or track events for observability
-
-The handler runs once for each active subscription, so it's recommended to avoid resource-heavy computations or
-blocking operations to maintain performance.
-
-
If there is no active subscription this handler is not executed, even if new messages arrive at the broker.
This is because the Router will not listen for messages on the broker topic/queue until at least one
@@ -46,6 +41,9 @@ type StreamReceiveEventHandler interface {
}
type StreamReceiveEventHandlerContext interface {
+ // Context is a context for handlers.
+ // If it is cancelled, the handler should stop processing.
+ Context() context.Context
// Request is the initial client request that started the subscription
Request() *http.Request
// Logger is the logger for the request
@@ -61,20 +59,65 @@ type StreamReceiveEventHandlerContext interface {
}
```
-## Asynchronous Execution
+## Asynchronous Execution and Performance Considerations
The `OnReceiveEvents` handler is executed **asynchronously** for each active subscription when events are received from the message broker.
To control resource usage and prevent overwhelming your system, you can configure the maximum number of concurrent handlers using
-the `max_concurrent_event_receive_handlers` configuration option.
+the `max_concurrent_handlers` configuration option.
```yaml
events:
subscription_hooks:
- max_concurrent_event_receive_handlers: 100 # Default: 100
+ on_receive_events:
+ max_concurrent_handlers: 100 # Default: 100
```
-This setting controls how many `OnReceiveEvents` handlers routines can run simultaneously across all active subscriptions.
-This helps prevent resource exhaustion while maintaining good performance and low latency for event processing.
+This amount of concurrent handlers is not global inside the router. It is defined per topic / message queue being observed by the router.
+If you define multiple Cosmo Streams subscriptions inside your schema, each one will have its own concurrent handlers limit.
+
+When the maximum number of concurrent handlers is reached for a topic, events will not get polled from the message queue until a handler has finished its work and is free again.
+To avoid sending out of order events to the subscription clients, the router will wait a certain amount of time for all handlers to finish, before polling the next batch of events.
+This timeout is configurable via router configuration:
+```yaml
+events:
+ subscription_hooks:
+ on_receive_events:
+ handler_timeout: 1s # default: 5s
+```
+
+If the timeout is reached, the router will poll the next batch of events immediately, which could result in sending out of order events to the subscription clients.
+In this case a warning will be logged.
+
+We recommend to use the `ctx.Context()`, which is cancelled in such a case.
+You can use it to abort any long running operations:
+
+```go
+func (m *CosmoStreamsModule) OnReceiveEvents(
+ ctx core.StreamReceiveEventHandlerContext,
+ events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+ for _, event := range events.All() {
+ select {
+ case <-ctx.Context().Done():
+ ctx.Logger().Debug("context cancelled, stopping processing and return no events to subscriber")
+ return datasource.StreamEvents{}, nil
+ default:
+ // process event here...
+ }
+ }
+ return events, nil
+}
+```
+
+
+The router will not abort the handler if the context is cancelled. It will simply move on to receive the next batch of events from the broker and process them.
+In this case events could be sent out of order to subscription clients, because old long-running handlers are still processing the previous batch of events.
+
+
+
+For as long as the handler limit is reached, the router will not poll the next batch of events from the broker.
+This effectively means that the subscription does not get updated until a new handler is free again.
+
## Error Handling
From 42490e83116af0d5adb0a4922285ae5c9b7c971f Mon Sep 17 00:00:00 2001
From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com>
Date: Fri, 14 Nov 2025 12:00:32 +0100
Subject: [PATCH 2/2] chore: use provider as term and rephrase
---
docs/router/configuration.mdx | 8 ++--
.../custom-modules/on-publish-event.mdx | 14 +++----
.../custom-modules/on-receive-event.mdx | 40 +++++++++----------
3 files changed, 31 insertions(+), 31 deletions(-)
diff --git a/docs/router/configuration.mdx b/docs/router/configuration.mdx
index cd6de117..a81f9942 100644
--- a/docs/router/configuration.mdx
+++ b/docs/router/configuration.mdx
@@ -1651,10 +1651,10 @@ events:
Configuration for Cosmo Streams hooks. Learn more about [Custom Modules for Cosmo Streams](/router/cosmo-streams/custom-modules).
-| Environment Variable | YAML | Required | Description | Default Value |
-| :------------------------------------------------------------- | :--------------------- | :------- | :------------------------------------------------------------------------------------------ | :- |
-| | subscription_hooks.on_receive_events.handler_timeout | | | The maximum time to wait for all OnReceiveEvents handlers to complete per event-batch. | 5s |
-| | subscription_hooks.on_receive_events.max_concurrent_handlers | | | The maximum number of concurrent OnReceiveEvents handlers per broker message queue / topic. | 100 |
+| Environment Variable | YAML | Required | Description | Default Value |
+| :------------------------------------------------------------- | :--------------------- | :------- | :------------------------------------------------------------------------------------- | :- |
+| | subscription_hooks.on_receive_events.handler_timeout | | | The maximum time to wait for all OnReceiveEvents handlers to complete per event-batch. | 5s |
+| | subscription_hooks.on_receive_events.max_concurrent_handlers | | | The maximum number of concurrent OnReceiveEvents handlers per trigger. | 100 |
## Router Engine Configuration
diff --git a/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx
index 964b8488..7daa6559 100644
--- a/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx
+++ b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx
@@ -1,20 +1,20 @@
---
title: "OnPublishEvents Handler"
-description: "A Cosmo Streams Custom Module, which lets you customize events before they are sent to message brokers"
+description: "A Cosmo Streams Custom Module, which lets you customize events before they are sent to message providers"
icon: "arrow-right"
---
-The `OnPublishEvents` handler is a custom module hook that allows you to intercept and process events before they are sent to message brokers through GraphQL mutations. This handler is called whenever a batch of events is about to be published to a provider, giving you the opportunity to filter, transform, enrich, or validate events before they are sent to the message broker.
+The `OnPublishEvents` handler is a custom module hook that allows you to intercept and process events before they are sent to providers through GraphQL mutations. This handler is called whenever a batch of events is about to be published to a provider, giving you the opportunity to filter, transform, enrich, or validate events before they are sent to the provider.
This handler is particularly useful for:
- **Event validation**: Ensure events meet specific criteria before publishing
-- **Data transformation**: Modify event payloads to match broker expectations
+- **Data transformation**: Modify event payloads to match provider expectations
- **Event enrichment**: Add additional metadata or context to events
- **Authentication and authorization**: Filter events based on user permissions
- **Monitoring and analytics**: Log or track outgoing events for observability
-This handler is executed only when a GraphQL mutation triggers event publishing. Unlike `OnReceiveEvents`, this handler processes outgoing events to message brokers, not incoming events from subscriptions.
+This handler is executed only when a GraphQL mutation triggers event publishing. Unlike `OnReceiveEvents`, this handler processes outgoing events to providers, not incoming events from subscriptions.
## Handler Interface
@@ -62,7 +62,7 @@ See code examples below for a demonstration.
When the `OnPublishEvents` handler returns an error, the router takes the following actions:
1. The client will receive a response, where the `success` field is `false`
-2. Returned events are sent to the message broker, if any provided
+2. Returned events are sent to the message providers, if any provided
3. The error is logged by the router with details about the mutation, provider, and field name
@@ -216,7 +216,7 @@ func (m *SelectivePublishHandler) OnPublishEvents(
### Prevent unauthorized users from sending Cosmo Streams mutation events to providers
-You can use `ctx.Authentication()` to validate that only authorized users can publish events to specific message brokers.
+You can use `ctx.Authentication()` to validate that only authorized users can publish events to specific providers.
This is useful for securing mutation operations that trigger event publishing.
```go
@@ -264,7 +264,7 @@ func (m *AuthEventHandler) OnPublishEvents(
### Attach headers to Kafka events
-You can attach headers to Kafka events before sending them to brokers.
+You can attach headers to Kafka events before sending them to providers.
```go
func (m *EventPublishModule) OnPublishEvents(
diff --git a/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx
index 92ba3398..99b8a182 100644
--- a/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx
+++ b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx
@@ -1,12 +1,12 @@
---
title: "OnReceiveEvents Handler"
-description: "A Cosmo Streams Custom Module, which lets you customize events received from a message broker before being passed to subscribers"
+description: "A Cosmo Streams Custom Module, which lets you customize events received from a message provider before being passed to subscribers"
icon: "arrow-left"
---
The `OnReceiveEvents` handler is a custom module hook that allows you to intercept and process events received from
-supported message brokers before they are delivered to GraphQL subscription clients.
-This handler is called whenever a batch of events is received from a message broker, giving you the opportunity to filter, transform, enrich,
+supported message providers before they are delivered to GraphQL subscription clients.
+This handler is called whenever a batch of events is received from a provider, giving you the opportunity to filter, transform, enrich,
or validate events before they reach your subscribers.
This handler is particularly useful for:
@@ -17,8 +17,8 @@ This handler is particularly useful for:
- **Monitoring and analytics**: Log or track events for observability
-If there is no active subscription this handler is not executed, even if new messages arrive at the broker.
-This is because the Router will not listen for messages on the broker topic/queue until at least one
+If there is no active subscription this handler is not executed, even if new messages arrive at the provider.
+This is because the Router will not listen for messages on the provider topic/queue until at least one
client subscribes to a particular subscription.
@@ -61,7 +61,7 @@ type StreamReceiveEventHandlerContext interface {
## Asynchronous Execution and Performance Considerations
-The `OnReceiveEvents` handler is executed **asynchronously** for each active subscription when events are received from the message broker.
+The `OnReceiveEvents` handler is executed **asynchronously** for each active subscription when events are received from the provider.
To control resource usage and prevent overwhelming your system, you can configure the maximum number of concurrent handlers using
the `max_concurrent_handlers` configuration option.
@@ -72,12 +72,12 @@ events:
max_concurrent_handlers: 100 # Default: 100
```
-This amount of concurrent handlers is not global inside the router. It is defined per topic / message queue being observed by the router.
-If you define multiple Cosmo Streams subscriptions inside your schema, each one will have its own concurrent handlers limit.
+This limit applies per [Trigger](/router/cosmo-streams#deduplication-of-subscriptions), not globablly.
+
+When the maximum number of concurrent handlers for a topic is reached, the router will not poll new events from the message queue until a handler finishes and becomes available again.
+To avoid delivering events out of order to subscription clients, the router waits for all handlers to complete before polling the next batch of events.
+This waiting period is configurable via the router settings:
-When the maximum number of concurrent handlers is reached for a topic, events will not get polled from the message queue until a handler has finished its work and is free again.
-To avoid sending out of order events to the subscription clients, the router will wait a certain amount of time for all handlers to finish, before polling the next batch of events.
-This timeout is configurable via router configuration:
```yaml
events:
subscription_hooks:
@@ -85,11 +85,11 @@ events:
handler_timeout: 1s # default: 5s
```
-If the timeout is reached, the router will poll the next batch of events immediately, which could result in sending out of order events to the subscription clients.
-In this case a warning will be logged.
+If the timeout is reached, the router immediately polls the next batch of events, which may result in out-of-order delivery to subscription clients.
+In this case, a warning is logged.
-We recommend to use the `ctx.Context()`, which is cancelled in such a case.
-You can use it to abort any long running operations:
+It is recommended to use `ctx.Context()`, which is cancelled in such situations.
+You can use this context to abort any long-running operations:
```go
func (m *CosmoStreamsModule) OnReceiveEvents(
@@ -110,13 +110,13 @@ func (m *CosmoStreamsModule) OnReceiveEvents(
```
-The router will not abort the handler if the context is cancelled. It will simply move on to receive the next batch of events from the broker and process them.
-In this case events could be sent out of order to subscription clients, because old long-running handlers are still processing the previous batch of events.
+The router does not abort the handler when the context is cancelled. Instead, it proceeds to receive the next batch of events from the provider.
+In this case, events may be delivered out of order because long-running handlers are still processing the previous batch.
-For as long as the handler limit is reached, the router will not poll the next batch of events from the broker.
-This effectively means that the subscription does not get updated until a new handler is free again.
+While the handler limit is reached, the router will not poll the next batch of events from the provider.
+This effectively means that the subscription will not receive updates until a handler becomes free again.
## Error Handling
@@ -302,7 +302,7 @@ func (m *EventFilterModule) OnReceiveEvents(
Certain providers enrich their messages with metadata accessible by the Router.
Kafka and NATS, for example, have the option to add headers to messages.
-Here's an example that filters out all messages coming from a Kafka broker where a header indicates
+Here's an example that filters out all messages coming from a Kafka instance where a header indicates
it's not meant for GraphQL subscriptions.
```go