11 changes: 10 additions & 1 deletion docs/router/configuration.mdx
@@ -1572,7 +1572,7 @@ cdn:

## Events

The Events section lets you define Event Sources for [Event-Driven Federated Subscriptions (EDFS)](/router/event-driven-federated-subscriptions-edfs).
The Events section lets you define Event Sources for [Cosmo Streams / EDFS](/router/cosmo-streams).

We support NATS, Kafka, and Redis as event bus providers.

@@ -1647,6 +1647,15 @@ events:
| | id | | The ID of the provider. This has to match the ID specified in the subgraph schema. | [] |
| | urls | | A list of Redis instance URLs, e.g. `redis://localhost:6379/2` | |

### Subscription Hooks

Configuration for Cosmo Streams hooks. Learn more about [Custom Modules for Cosmo Streams](/router/cosmo-streams/custom-modules).

| Environment Variable | YAML | Required | Description | Default Value |
| :------------------------------------------------------------- | :--------------------- | :------- | :------------------------------------------------------------------------------------- | :- |
| | subscription_hooks.on_receive_events.handler_timeout | <Icon icon="square" /> | The maximum time to wait for all OnReceiveEvents handlers to complete per event batch. | 5s |
| | subscription_hooks.on_receive_events.max_concurrent_handlers | <Icon icon="square" /> | The maximum number of concurrent OnReceiveEvents handlers per trigger. | 100 |
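
Putting both options together, a minimal configuration sketch looks like this (the values shown are the defaults from the table above):

```yaml
events:
  subscription_hooks:
    on_receive_events:
      handler_timeout: 5s          # maximum wait for all OnReceiveEvents handlers per event batch
      max_concurrent_handlers: 100 # maximum concurrent OnReceiveEvents handlers per trigger
```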

## Router Engine Configuration

Configure the GraphQL Execution Engine of the Router.
2 changes: 1 addition & 1 deletion docs/router/cosmo-streams.mdx
@@ -114,7 +114,7 @@ The `@edfs__natsPublish`, `@edfs__kafkaPublish`, and `@edfs__redisPublish` directives

Using the `@edfs__natsSubscribe`, `@edfs__kafkaSubscribe` and `@edfs__redisSubscribe` directives, you can create a Subscription to the corresponding message bus. By default, all the provider implementations are stateless, meaning every client receives the same events in a broadcast fashion. This behavior can be adjusted. NATS allows you to create a [consumer group](https://docs.nats.io/nats-concepts/jetstream/consumers), resulting in multiple independent streams of the subject, where each client can consume events at their own pace.

The `@openfed__subscriptionFilter` directive allows you to filter subscription messages based on specified conditions. For more information see [Subscription Filter](/router/event-driven-federated-subscriptions-edfs#subscription-filter).
The `@openfed__subscriptionFilter` directive allows you to filter subscription messages based on specified conditions. For more information see [Subscription Filter](/router/cosmo-streams#subscription-filter).

An Event-Driven Subgraph does not need to be implemented. It is simply a Subgraph Schema that instructs the Router on how to connect specific root fields to the Event Source. Scroll down for an example.

16 changes: 8 additions & 8 deletions docs/router/cosmo-streams/custom-modules/on-publish-event.mdx
@@ -1,20 +1,20 @@
---
title: "OnPublishEvents Handler"
description: "A Cosmo Streams Custom Module, which lets you customize events before they are sent to message brokers"
description: "A Cosmo Streams Custom Module, which lets you customize events before they are sent to message providers"
icon: "arrow-right"
---

The `OnPublishEvents` handler is a custom module hook that allows you to intercept and process events before they are sent to message brokers through GraphQL mutations. This handler is called whenever a batch of events is about to be published to a provider, giving you the opportunity to filter, transform, enrich, or validate events before they are sent to the message broker.
The `OnPublishEvents` handler is a custom module hook that allows you to intercept and process events before they are sent to providers through GraphQL mutations. This handler is called whenever a batch of events is about to be published to a provider, giving you the opportunity to filter, transform, enrich, or validate events before they are sent to the provider.

This handler is particularly useful for:
- **Event validation**: Ensure events meet specific criteria before publishing
- **Data transformation**: Modify event payloads to match broker expectations
- **Data transformation**: Modify event payloads to match provider expectations
- **Event enrichment**: Add additional metadata or context to events
- **Authentication and authorization**: Filter events based on user permissions
- **Monitoring and analytics**: Log or track outgoing events for observability

<Info>
This handler is executed only when a GraphQL mutation triggers event publishing. Unlike `OnReceiveEvents`, this handler processes outgoing events to message brokers, not incoming events from subscriptions.
This handler is executed only when a GraphQL mutation triggers event publishing. Unlike `OnReceiveEvents`, this handler processes outgoing events to providers, not incoming events from subscriptions.
</Info>

## Handler Interface
@@ -49,7 +49,7 @@ type StreamPublishEventHandlerContext interface {

## Error Handling

As mentioned in the [Publish Overview Section](/router/event-driven-federated-subscriptions-edfs#publish) the return type of a Cosmo Streams mutation
As mentioned in the [Publish Overview Section](/router/cosmo-streams#publish) the return type of a Cosmo Streams mutation
must use the type `PublishEventResult`. This type declares a boolean `success` field.
Implementations of the `OnPublishEvents` handler return two values: `events` and `error`. When `error` is not `nil`, the client's response
will have the `success` field set to `false`, and the error will be logged to the router's console output.
@@ -62,7 +62,7 @@ See code examples below for a demonstration.
When the `OnPublishEvents` handler returns an error, the router takes the following actions:

1. The client will receive a response where the `success` field is `false`
2. Returned events are sent to the message broker, if any provided
2. Returned events are sent to the message providers, if any are provided
3. The error is logged by the router with details about the mutation, provider, and field name

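As a minimal sketch of this contract (the handler name and validation condition are hypothetical; the signature and `events.All()` follow the other examples in this documentation):

```go
func (m *ValidatingPublishHandler) OnPublishEvents(
	ctx core.StreamPublishEventHandlerContext,
	events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
	// Illustrative check: refuse to publish an empty batch.
	if len(events.All()) == 0 {
		// A non-nil error makes the mutation's PublishEventResult report
		// success: false and is logged by the router. Returning no events
		// means nothing is published to the provider.
		return datasource.StreamEvents{}, errors.New("refusing to publish an empty event batch")
	}
	// Returning the events with a nil error forwards them to the provider unchanged.
	return events, nil
}
```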

@@ -216,7 +216,7 @@ func (m *SelectivePublishHandler) OnPublishEvents(

### Prevent unauthorized users from sending Cosmo Streams mutation events to providers

You can use `ctx.Authentication()` to validate that only authorized users can publish events to specific message brokers.
You can use `ctx.Authentication()` to validate that only authorized users can publish events to specific providers.
This is useful for securing mutation operations that trigger event publishing.

```go
@@ -264,7 +264,7 @@ func (m *AuthEventHandler) OnPublishEvents(

### Attach headers to Kafka events

You can attach headers to Kafka events before sending them to brokers.
You can attach headers to Kafka events before sending them to providers.

```go
func (m *EventPublishModule) OnPublishEvents(
77 changes: 60 additions & 17 deletions docs/router/cosmo-streams/custom-modules/on-receive-event.mdx
@@ -1,12 +1,12 @@
---
title: "OnReceiveEvents Handler"
description: "A Cosmo Streams Custom Module, which lets you customize events received from a message broker before being passed to subscribers"
description: "A Cosmo Streams Custom Module, which lets you customize events received from a message provider before being passed to subscribers"
icon: "arrow-left"
---

The `OnReceiveEvents` handler is a custom module hook that allows you to intercept and process events received from
supported message brokers before they are delivered to GraphQL subscription clients.
This handler is called whenever a batch of events is received from a message broker, giving you the opportunity to filter, transform, enrich,
supported message providers before they are delivered to GraphQL subscription clients.
This handler is called whenever a batch of events is received from a provider, giving you the opportunity to filter, transform, enrich,
or validate events before they reach your subscribers.

This handler is particularly useful for:
@@ -16,14 +16,9 @@ This handler is particularly useful for:
- **Authentication and authorization**: Filter events based on user permissions
- **Monitoring and analytics**: Log or track events for observability

<Warning>
The handler runs once for each active subscription, so it's recommended to avoid resource-heavy computations or
blocking operations to maintain performance.
</Warning>

<Info>
If there is no active subscription this handler is not executed, even if new messages arrive at the broker.
This is because the Router will not listen for messages on the broker topic/queue until at least one
If there is no active subscription, this handler is not executed, even if new messages arrive at the provider.
This is because the Router will not listen for messages on the provider topic/queue until at least one
client subscribes to a particular subscription.
</Info>

@@ -46,6 +41,9 @@ type StreamReceiveEventHandler interface {
}

type StreamReceiveEventHandlerContext interface {
// Context is a context for handlers.
// If it is cancelled, the handler should stop processing.
Context() context.Context
// Request is the initial client request that started the subscription
Request() *http.Request
// Logger is the logger for the request
@@ -61,20 +59,65 @@ type StreamReceiveEventHandlerContext interface {
}
```

## Asynchronous Execution
## Asynchronous Execution and Performance Considerations

The `OnReceiveEvents` handler is executed **asynchronously** for each active subscription when events are received from the message broker.
The `OnReceiveEvents` handler is executed **asynchronously** for each active subscription when events are received from the provider.
To control resource usage and prevent overwhelming your system, you can configure the maximum number of concurrent handlers using
the `max_concurrent_event_receive_handlers` configuration option.
the `max_concurrent_handlers` configuration option.

```yaml
events:
subscription_hooks:
max_concurrent_event_receive_handlers: 100 # Default: 100
on_receive_events:
max_concurrent_handlers: 100 # Default: 100
```

This limit applies per [Trigger](/router/cosmo-streams#deduplication-of-subscriptions), not globally.

When the maximum number of concurrent handlers for a topic is reached, the router will not poll new events from the message queue until a handler finishes and becomes available again.
To avoid delivering events out of order to subscription clients, the router waits for all handlers to complete before polling the next batch of events.
This waiting period is configurable via the router settings:

```yaml
events:
subscription_hooks:
on_receive_events:
handler_timeout: 1s # default: 5s
```

If the timeout is reached, the router immediately polls the next batch of events, which may result in out-of-order delivery to subscription clients.
In this case, a warning is logged.

It is recommended to use `ctx.Context()`, which is cancelled in such situations.
You can use this context to abort any long-running operations:

```go
func (m *CosmoStreamsModule) OnReceiveEvents(
ctx core.StreamReceiveEventHandlerContext,
events datasource.StreamEvents,
) (datasource.StreamEvents, error) {
for _, event := range events.All() {
select {
case <-ctx.Context().Done():
ctx.Logger().Debug("context cancelled, stopping processing and return no events to subscriber")
return datasource.StreamEvents{}, nil
default:
// process event here...
}
}
return events, nil
}
```

This setting controls how many `OnReceiveEvents` handler routines can run simultaneously across all active subscriptions.
This helps prevent resource exhaustion while maintaining good performance and low latency for event processing.
<Info>
The router does not abort the handler when the context is cancelled. Instead, it proceeds to receive the next batch of events from the provider.
In this case, events may be delivered out of order because long-running handlers are still processing the previous batch.
</Info>

<Warning>
While the handler limit is reached, the router will not poll the next batch of events from the provider.
This effectively means that the subscription will not receive updates until a handler becomes free again.
</Warning>

## Error Handling

@@ -259,7 +302,7 @@ func (m *EventFilterModule) OnReceiveEvents(

Certain providers enrich their messages with metadata accessible by the Router.
Kafka and NATS, for example, have the option to add headers to messages.
Here's an example that filters out all messages coming from a Kafka broker where a header indicates
Here's an example that filters out all messages coming from a Kafka instance where a header indicates
it's not meant for GraphQL subscriptions.

```go