diff --git a/docs/router/configuration.mdx b/docs/router/configuration.mdx index 0571dcf0..a81f9942 100644 --- a/docs/router/configuration.mdx +++ b/docs/router/configuration.mdx @@ -1572,7 +1572,7 @@ cdn: ## Events -The Events section lets you define Event Sources for [Event-Driven Federated Subscriptions (EDFS)](/router/event-driven-federated-subscriptions-edfs). +The Events section lets you define Event Sources for [Cosmo Streams / EDFS](/router/cosmo-streams). We support NATS, Kafka and Redis as event bus provider. @@ -1647,6 +1647,15 @@ events: | | id | | The ID of the provider. This have to match with the ID specified in the subgraph schema. | [] | | | urls | | A list of redis instances URLS, e.g: `redis://localhost:6379/2` | | +### Subscription Hooks + +Configuration for Cosmo Streams hooks. Learn more about [Custom Modules for Cosmo Streams](/router/cosmo-streams/custom-modules). + +| Environment Variable | YAML | Required | Description | Default Value | +| :------------------------------------------------------------- | :--------------------- | :------- | :------------------------------------------------------------------------------------- | :- | +| | subscription_hooks.on_receive_events.handler_timeout | | | The maximum time to wait for all OnReceiveEvents handlers to complete per event-batch. | 5s | +| | subscription_hooks.on_receive_events.max_concurrent_handlers | | | The maximum number of concurrent OnReceiveEvents handlers per trigger. | 100 | + ## Router Engine Configuration Configure the GraphQL Execution Engine of the Router. diff --git a/docs/router/cosmo-streams.mdx b/docs/router/cosmo-streams.mdx index ee75f1b4..8bdd4e28 100644 --- a/docs/router/cosmo-streams.mdx +++ b/docs/router/cosmo-streams.mdx @@ -114,7 +114,7 @@ The `@edfs__natsPublish`, `@edfs__kafkaPublish`, and `@edfs__redisPublish ` dire Using the `@edfs__natsSubscribe`, `@edfs__kafkaSubscribe` and `@edfs__redisSubscribe` directives, you can create a Subscription to the corresponding message bus. By default, all the provider implementations are stateless, meaning every client receives the same events in a broadcast fashion. This behavior can be adjusted. NATS allows you to create a [consumer group](https://docs.nats.io/nats-concepts/jetstream/consumers), resulting in multiple independent streams of the subject, where each client can consume events at their own pace. -The `@openfed__subscriptionFilter` directive allows you to filter subscription messages based on specified conditions. For more information see [Subscription Filter](/router/event-driven-federated-subscriptions-edfs#subscription-filter). +The `@openfed__subscriptionFilter` directive allows you to filter subscription messages based on specified conditions. For more information see [Subscription Filter](/router/cosmo-streams#subscription-filter). An Event-Driven Subgraph does not need to be implemented. It is simply a Subgraph Schema that instructs the Router on how to connect specific root fields to the Event Source. Scroll down for an example. diff --git a/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx index 17cc90ec..7daa6559 100644 --- a/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx +++ b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx @@ -1,20 +1,20 @@ --- title: "OnPublishEvents Handler" -description: "A Cosmo Streams Custom Module, which lets you customize events before they are sent to message brokers" +description: "A Cosmo Streams Custom Module, which lets you customize events before they are sent to message providers" icon: "arrow-right" --- -The `OnPublishEvents` handler is a custom module hook that allows you to intercept and process events before they are sent to message brokers through GraphQL mutations. This handler is called whenever a batch of events is about to be published to a provider, giving you the opportunity to filter, transform, enrich, or validate events before they are sent to the message broker. +The `OnPublishEvents` handler is a custom module hook that allows you to intercept and process events before they are sent to providers through GraphQL mutations. This handler is called whenever a batch of events is about to be published to a provider, giving you the opportunity to filter, transform, enrich, or validate events before they are sent to the provider. This handler is particularly useful for: - **Event validation**: Ensure events meet specific criteria before publishing -- **Data transformation**: Modify event payloads to match broker expectations +- **Data transformation**: Modify event payloads to match provider expectations - **Event enrichment**: Add additional metadata or context to events - **Authentication and authorization**: Filter events based on user permissions - **Monitoring and analytics**: Log or track outgoing events for observability -This handler is executed only when a GraphQL mutation triggers event publishing. Unlike `OnReceiveEvents`, this handler processes outgoing events to message brokers, not incoming events from subscriptions. +This handler is executed only when a GraphQL mutation triggers event publishing. Unlike `OnReceiveEvents`, this handler processes outgoing events to providers, not incoming events from subscriptions. ## Handler Interface @@ -49,7 +49,7 @@ type StreamPublishEventHandlerContext interface { ## Error Handling -As mentioned in the [Publish Overview Section](/router/event-driven-federated-subscriptions-edfs#publish) the return type of a Cosmo Streams mutation +As mentioned in the [Publish Overview Section](/router/cosmo-streams#publish) the return type of a Cosmo Streams mutation must use the type `PublishEventResult`. This type declares a boolean `success` field. Implementations of `OnPublishEvents` handlers return two fields: `events` and `error`. When `error` is not `nil`, the client's response will have the success field set to `false`. Also the error will be logged on the routers console output. @@ -62,7 +62,7 @@ See code examples below for a demonstration. When the `OnPublishEvents` handler returns an error, the router takes the following actions: 1. The client will receive a response, where the `success` field is `false` -2. Returned events are sent to the message broker, if any provided +2. Returned events are sent to the message providers, if any provided 3. The error is logged by the router with details about the mutation, provider, and field name @@ -216,7 +216,7 @@ func (m *SelectivePublishHandler) OnPublishEvents( ### Prevent unauthorized users from sending Cosmo Streams mutation events to providers -You can use `ctx.Authentication()` to validate that only authorized users can publish events to specific message brokers. +You can use `ctx.Authentication()` to validate that only authorized users can publish events to specific providers. This is useful for securing mutation operations that trigger event publishing. ```go @@ -264,7 +264,7 @@ func (m *AuthEventHandler) OnPublishEvents( ### Attach headers to Kafka events -You can attach headers to Kafka events before sending them to brokers. +You can attach headers to Kafka events before sending them to providers. ```go func (m *EventPublishModule) OnPublishEvents( diff --git a/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx index 17fad2ec..99b8a182 100644 --- a/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx +++ b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx @@ -1,12 +1,12 @@ --- title: "OnReceiveEvents Handler" -description: "A Cosmo Streams Custom Module, which lets you customize events received from a message broker before being passed to subscribers" +description: "A Cosmo Streams Custom Module, which lets you customize events received from a message provider before being passed to subscribers" icon: "arrow-left" --- The `OnReceiveEvents` handler is a custom module hook that allows you to intercept and process events received from -supported message brokers before they are delivered to GraphQL subscription clients. -This handler is called whenever a batch of events is received from a message broker, giving you the opportunity to filter, transform, enrich, +supported message providers before they are delivered to GraphQL subscription clients. +This handler is called whenever a batch of events is received from a provider, giving you the opportunity to filter, transform, enrich, or validate events before they reach your subscribers. This handler is particularly useful for: @@ -16,14 +16,9 @@ This handler is particularly useful for: - **Authentication and authorization**: Filter events based on user permissions - **Monitoring and analytics**: Log or track events for observability - -The handler runs once for each active subscription, so it's recommended to avoid resource-heavy computations or -blocking operations to maintain performance. - - -If there is no active subscription this handler is not executed, even if new messages arrive at the broker. -This is because the Router will not listen for messages on the broker topic/queue until at least one +If there is no active subscription this handler is not executed, even if new messages arrive at the provider. +This is because the Router will not listen for messages on the provider topic/queue until at least one client subscribes to a particular subscription. @@ -46,6 +41,9 @@ type StreamReceiveEventHandler interface { } type StreamReceiveEventHandlerContext interface { + // Context is a context for handlers. + // If it is cancelled, the handler should stop processing. + Context() context.Context // Request is the initial client request that started the subscription Request() *http.Request // Logger is the logger for the request @@ -61,20 +59,65 @@ type StreamReceiveEventHandlerContext interface { } ``` -## Asynchronous Execution +## Asynchronous Execution and Performance Considerations -The `OnReceiveEvents` handler is executed **asynchronously** for each active subscription when events are received from the message broker. +The `OnReceiveEvents` handler is executed **asynchronously** for each active subscription when events are received from the provider. To control resource usage and prevent overwhelming your system, you can configure the maximum number of concurrent handlers using -the `max_concurrent_event_receive_handlers` configuration option. +the `max_concurrent_handlers` configuration option. ```yaml events: subscription_hooks: - max_concurrent_event_receive_handlers: 100 # Default: 100 + on_receive_events: + max_concurrent_handlers: 100 # Default: 100 +``` + +This limit applies per [Trigger](/router/cosmo-streams#deduplication-of-subscriptions), not globablly. + +When the maximum number of concurrent handlers for a topic is reached, the router will not poll new events from the message queue until a handler finishes and becomes available again. +To avoid delivering events out of order to subscription clients, the router waits for all handlers to complete before polling the next batch of events. +This waiting period is configurable via the router settings: + +```yaml +events: + subscription_hooks: + on_receive_events: + handler_timeout: 1s # default: 5s +``` + +If the timeout is reached, the router immediately polls the next batch of events, which may result in out-of-order delivery to subscription clients. +In this case, a warning is logged. + +It is recommended to use `ctx.Context()`, which is cancelled in such situations. +You can use this context to abort any long-running operations: + +```go +func (m *CosmoStreamsModule) OnReceiveEvents( + ctx core.StreamReceiveEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + for _, event := range events.All() { + select { + case <-ctx.Context().Done(): + ctx.Logger().Debug("context cancelled, stopping processing and return no events to subscriber") + return datasource.StreamEvents{}, nil + default: + // process event here... + } + } + return events, nil +} ``` -This setting controls how many `OnReceiveEvents` handlers routines can run simultaneously across all active subscriptions. -This helps prevent resource exhaustion while maintaining good performance and low latency for event processing. + +The router does not abort the handler when the context is cancelled. Instead, it proceeds to receive the next batch of events from the provider. +In this case, events may be delivered out of order because long-running handlers are still processing the previous batch. + + + +While the handler limit is reached, the router will not poll the next batch of events from the provider. +This effectively means that the subscription will not receive updates until a handler becomes free again. + ## Error Handling @@ -259,7 +302,7 @@ func (m *EventFilterModule) OnReceiveEvents( Certain providers enrich their messages with metadata accessible by the Router. Kafka and NATS, for example, have the option to add headers to messages. -Here's an example that filters out all messages coming from a Kafka broker where a header indicates +Here's an example that filters out all messages coming from a Kafka instance where a header indicates it's not meant for GraphQL subscriptions. ```go