diff --git a/CLAUDE.md b/CLAUDE.md
index de6af3af..65560099 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -42,7 +42,7 @@ mintlify install
 - **Namespaces**: Environment isolation for graphs (dev, staging, prod)
 - **Feature Flags & Feature Subgraphs**: Toggle-able subgraph replacements for incremental rollout
 - **Schema Contracts**: Filtered graph versions using @tag directives
-- **EDFS**: Event-Driven Federated Subscriptions for real-time data
+- **Cosmo Streams / EDFS**: Connects to event streaming systems like Kafka, NATS, and Redis to power event-driven, federated operations through real-time subscriptions.
 
 ## Documentation Structure
 
diff --git a/docs/docs.json b/docs/docs.json
index ad7ed671..1d31236e 100644
--- a/docs/docs.json
+++ b/docs/docs.json
@@ -224,20 +224,30 @@
           ]
         },
         {
-          "group": "Event-Driven Federated Subscriptions (EDFS)",
-          "icon": "calendar-users",
+          "group": "Cosmo Streams (EDFS)",
+          "icon": "wave",
           "pages": [
-            "router/event-driven-federated-subscriptions-edfs",
+            "router/cosmo-streams",
             {
               "group": "NATS",
               "icon": "puzzle-piece",
               "pages": [
-                "router/event-driven-federated-subscriptions-edfs/nats",
-                "router/event-driven-federated-subscriptions-edfs/nats/stream-and-consumer-configuration"
+                "router/cosmo-streams/nats",
+                "router/cosmo-streams/nats/stream-and-consumer-configuration"
               ]
             },
-            "router/event-driven-federated-subscriptions-edfs/kafka",
-            "router/event-driven-federated-subscriptions-edfs/redis"
+            "router/cosmo-streams/kafka",
+            "router/cosmo-streams/redis",
+            {
+              "group": "Custom Modules",
+              "icon": "cubes",
+              "pages": [
+                "router/cosmo-streams/custom-modules",
+                "router/cosmo-streams/custom-modules/subscription-on-start",
+                "router/cosmo-streams/custom-modules/on-receive-event",
+                "router/cosmo-streams/custom-modules/on-publish-event"
+              ]
+            }
           ]
         },
         "router/compliance-and-data-management",
@@ -745,5 +755,11 @@
     "gtm": {
       "tagId": "GTM-5GPL7DQH"
     }
-  }
+  },
+  "redirects": [
+    {
+      "source": "/router/event-driven-federated-subscriptions-edfs/:path*",
+      "destination": "/router/cosmo-streams/:path*"
+    }
+  ]
 }
diff --git a/docs/federation/directives/openfed__subscriptionfilter.mdx b/docs/federation/directives/openfed__subscriptionfilter.mdx
index 3088a849..80a6c9b6 100644
--- a/docs/federation/directives/openfed__subscriptionfilter.mdx
+++ b/docs/federation/directives/openfed__subscriptionfilter.mdx
@@ -26,7 +26,7 @@ input openfed__SubscriptionFilterCondition {
 
 ## Overview
 
-The `@openfed__subscriptionFilter` directive declares that a field definition can be filtered by filter conditions. The directive can only be applied to [EDFS](/federation/event-driven-federated-subscriptions) subscriptions.
+The `@openfed__subscriptionFilter` directive declares that a field definition can be filtered by filter conditions. The directive can only be applied to subscriptions defined in an [Event-Driven Graph (EDG)](/federation/event-driven-federated-subscriptions).
 
 ## Arguments
 
diff --git a/docs/federation/event-driven-federated-subscriptions.mdx b/docs/federation/event-driven-federated-subscriptions.mdx
index 8fb2a622..5c43f3f5 100644
--- a/docs/federation/event-driven-federated-subscriptions.mdx
+++ b/docs/federation/event-driven-federated-subscriptions.mdx
@@ -7,6 +7,6 @@ icon: circle-info
 
 Defining an Event-Driven Graph with Event-Driven Federated Subscriptions.
 
-An Event-Driven Graph (EDG) is best thought to be an abstract subgraph that facilitates Event-Driven Federated Subscriptions (EDFS). If a subgraph uses or defines any event driven directives, it will be interpreted to be an Event-Driven Graph. 
+An Event-Driven Graph (EDG) is best thought of as an abstract subgraph that facilitates [Cosmo Streams](/router/cosmo-streams). If a subgraph uses or defines any event-driven directives, it will be interpreted to be an Event-Driven Graph.
diff --git a/docs/router/configuration.mdx b/docs/router/configuration.mdx
index 0cf0ce94..18c7e31e 100644
--- a/docs/router/configuration.mdx
+++ b/docs/router/configuration.mdx
@@ -637,7 +637,7 @@ This option may change or be removed in future versions as the OpenTelemetry SDK
 | METRICS_OTLP_EXCLUDE_METRIC_LABELS | exclude_metric_labels | | The metric labels to exclude from the OTEL metrics. Accepts a list of Go regular expressions. Use https://regex101.com/ to test your regular expressions. | [] |
 | METRICS_OTLP_CONNECTION_STATS | connection_stats | | Enable connection metrics. | false |
 | METRICS_OTLP_CIRCUIT_BREAKER | circuit_breaker | | Ensure that circuit breaker metrics are enabled for OTEL. | false |
-| METRICS_OTLP_STREAM | streams | | Enable EDFS stream metrics. | false |
+| METRICS_OTLP_STREAM | streams | | Enable Cosmo Streams metrics. | false |
 
 ### Attributes
 
@@ -688,7 +688,7 @@ telemetry:
 | PROMETHEUS_EXCLUDE_METRIC_LABELS | exclude_metric_labels | | | |
 | PROMETHEUS_EXCLUDE_SCOPE_INFO | exclude_scope_info | | Exclude scope info from Prometheus metrics. | false |
 | PROMETHEUS_CIRCUIT_BREAKER | circuit_breaker | | Enable the circuit breaker metrics for prometheus metric collection. | false |
-| PROMETHEUS_OTLP_STREAM | streams | | Enable EDFS stream metrics. | false |
+| PROMETHEUS_OTLP_STREAM | streams | | Enable Cosmo Streams metrics. | false |
 
 ### Example YAML config:
 
@@ -1613,7 +1613,7 @@ cdn:
 
 ## Events
 
-The Events section lets you define Event Sources for [Event-Driven Federated Subscriptions (EDFS)](/router/event-driven-federated-subscriptions-edfs).
+The Events section lets you define Event Sources for [Cosmo Streams / EDFS](/router/cosmo-streams).
 
 We support NATS, Kafka and Redis as event bus provider.
 
@@ -1688,6 +1688,15 @@ events:
 | | id | | The ID of the provider. This have to match with the ID specified in the subgraph schema. | [] |
 | | urls | | A list of redis instances URLS, e.g: `redis://localhost:6379/2` | |
 
+### Cosmo Streams handlers
+
+Configuration for Cosmo Streams handlers. Learn more about [Custom Modules for Cosmo Streams](/router/cosmo-streams/custom-modules).
+
+| Environment Variable | YAML | Required | Description | Default Value |
+| :------------------- | :------------------------------------------------- | :--------------------- | :------------------------------------------------------------------------------------- | :- |
+| | handlers.on_receive_events.handler_timeout | | The maximum time to wait for all OnReceiveEvents handlers to complete per event batch. | 5s |
+| | handlers.on_receive_events.max_concurrent_handlers | | The maximum number of concurrent OnReceiveEvents handlers per trigger. | 100 |
+
 ## Router Engine Configuration
 
 Configure the GraphQL Execution Engine of the Router. 
diff --git a/docs/router/event-driven-federated-subscriptions-edfs.mdx b/docs/router/cosmo-streams.mdx
similarity index 87%
rename from docs/router/event-driven-federated-subscriptions-edfs.mdx
rename to docs/router/cosmo-streams.mdx
index e43cb5ee..c3150d88 100644
--- a/docs/router/event-driven-federated-subscriptions-edfs.mdx
+++ b/docs/router/cosmo-streams.mdx
@@ -1,6 +1,6 @@
 ---
-title: "Event-Driven Federated Subscriptions (EDFS)"
-description: "EDFS combines the power of GraphQL Federation and Event-Driven Architecture (Kafka, NATS, Redis) to update a user GraphQL Subscription after an event occurs in your system."
+title: "Cosmo Streams (EDFS)"
+description: "Cosmo Streams (formerly known as EDFS) combines the power of GraphQL Federation and Event-Driven Architecture to update a user GraphQL Subscription after an event occurs in your system."
 icon: "circle-info"
 sidebarTitle: "Overview"
 ---
@@ -12,7 +12,7 @@ sidebarTitle: "Overview"
 />
 
-Event Driven Federated Subscriptions (EDFS) solves 3 major problems when it comes to GraphQL Federation and Subscriptions by directly connecting the Router to an event source like Kafka and NATS and making it a part of the Graph.
+Cosmo Streams solves 3 major problems when it comes to GraphQL Federation and Subscriptions by directly connecting the Router to an event source like Kafka and NATS and making it a part of the Graph.
 
 ## Intro
 
@@ -41,18 +41,18 @@ Furthermore, classic Subscriptions with Federation are quite expensive when it c
 
 ## Specification
 
-Enter Event-Driven Federated Subscriptions, a simple way to scale Federated Subscriptions in a resource-efficient manner.
+Enter Cosmo Streams, a simple way to scale Federated Subscriptions in a resource-efficient manner.
 
-EDFS supports three event providers:
+Cosmo Streams supports three event providers:
 
-  
+  
 
-  
+  
 
-  
+  
 
@@ -61,7 +61,7 @@ Each provider consists of at least Publish and Subscribe capabilities. For NATS,
 
 Our goal is to integrate with various technologies rather than agree on a single unified interface. This approach allows us to leverage the strengths of each technology. This philosophy is reflected in how we structure the directives, naming parameters, exposing features as they would appear in their respective ecosystems.
 
-Here is an overview about all EDFS directives:
+Here is an overview of all Cosmo Streams directives:
 
 ```js
 # Nats and JetStream integration
@@ -117,13 +117,13 @@ The `@edfs__natsPublish`, `@edfs__kafkaPublish`, and `@edfs__redisPublish ` directives
 
 Using the `@edfs__natsSubscribe`, `@edfs__kafkaSubscribe` and `@edfs__redisSubscribe` directives, you can create a Subscription to the corresponding message bus. By default, all the provider implementations are stateless, meaning every client receives the same events in a broadcast fashion. This behavior can be adjusted. NATS allows you to create a [consumer group](https://docs.nats.io/nats-concepts/jetstream/consumers), resulting in multiple independent streams of the subject, where each client can consume events at their own pace.
 
-The `@openfed__subscriptionFilter` directive allows you to filter subscription messages based on specified conditions. For more information see [Subscription Filter](/router/event-driven-federated-subscriptions-edfs#subscription-filter).
+The `@openfed__subscriptionFilter` directive allows you to filter subscription messages based on specified conditions. For more information, see [Subscription Filter](/router/cosmo-streams#subscription-filter).
 
 An Event-Driven Subgraph does not need to be implemented. 
It is simply a Subgraph Schema that instructs the Router on how to connect specific root fields to the Event Source. Scroll down for an example. ## Prerequisites -To use EDFS, you need to have an Event Source running and connected to the Router. Currently, we support NATS, Kafka, and Redis. For simplicity, NATS is used to explain the examples. +To use Cosmo Streams, you need to have an Event Source running and connected to the Router. Currently, we support NATS, Kafka, and Redis. For simplicity, NATS is used to explain the examples. To get started, run a NATS instance and add the following configuration to your `config.yaml` Router Configuration: @@ -318,7 +318,7 @@ Here's an **invalid** message as the `__typename` field is missing: -It's important to send the `__typename` field because this allows EDFS to also work for Union and Interface types. +It's important to send the `__typename` field because this allows Cosmo Streams to also work for Union and Interface types. It's worth noting that the Router will not send any responses before you publish a message on the topic. If you need the most recent result, first make a Query, and then subscribe to the Topic. The Router will send the first response only after a message is published on the rendered topic. @@ -440,11 +440,11 @@ The Cosmo Router deduplicates Subscriptions internally to save resources. If mul ### Statelessness of Subgraphs -With EDFS, the Router connects directly to the Event Source but doesn't require any stateful connections, e.g. WebSocket, to the Subgraphs. This makes the Subgraphs much simpler to reason about and easier to deploy. Serverless deployment options usually have limitations on request length. With an Event Broker in the middle, Subgraphs can be stateless without having to give up on Subscriptions. +With Cosmo Streams, the Router connects directly to the Event Source but doesn't require any stateful connections, e.g. WebSocket, to the Subgraphs. This makes the Subgraphs much simpler to reason about and easier to deploy. Serverless deployment options usually have limitations on request length. With an Event Broker in the middle, Subgraphs can be stateless without having to give up on Subscriptions. ### Efficiency, CPU & Memory Consumption (Epoll/Kqueue) -EDFS is built on top of Event-Driven principles, which means that the implementation is non-blocking, as CPU efficient as possible, and has a very low memory footprint. +Cosmo Streams is built on top of Event-Driven principles, which means that the implementation is non-blocking, as CPU efficient as possible, and has a very low memory footprint. We're using Epoll and Kqueue on Systems that support it (Linux, Darwin, etc.) to be as efficient as possible. @@ -454,10 +454,10 @@ The Router supports multi-core out of the box and is capable of scaling up to a ### Publish Events from any System, not just Subgraphs -It's worth noting that publishing Entity update Events is not limited to just Subgraphs. EDFS is designed to fully decouple the API Consumer from the implementation of the Event-Driven Architecture. +It's worth noting that publishing Entity update Events is not limited to just Subgraphs. Cosmo Streams is designed to fully decouple the API Consumer from the implementation of the Event-Driven Architecture. -A client can create a Job via a Mutation and Subscribe to the Job state via EDFS. Next, the Mutation can kick off a long-running process that will be handled by one or many systems in the background. At each step, e.g. 
when an increment of work is done, each subsystem can publish a message to indicate that the state of an Entity has changed.
+A client can create a Job via a Mutation and Subscribe to the Job state via Cosmo Streams. Next, the Mutation can kick off a long-running process that will be handled by one or many systems in the background. At each step, e.g. when an increment of work is done, each subsystem can publish a message to indicate that the state of an Entity has changed.
 
 Once the message is published by one of the sub-systems, the Router can Query all Subgraphs to resolve the current state of the Job.
 
-With EDFS, each Subgraph can add fields to an Entity that it's responsible for and publish events to the Message Broker when a long-running Operation updates the overall state of an Entity.
\ No newline at end of file
+With Cosmo Streams, each Subgraph can add fields to an Entity that it's responsible for and publish events to the Message Broker when a long-running Operation updates the overall state of an Entity.
\ No newline at end of file
diff --git a/docs/router/cosmo-streams/custom-modules.mdx b/docs/router/cosmo-streams/custom-modules.mdx
new file mode 100644
index 00000000..d40ac79e
--- /dev/null
+++ b/docs/router/cosmo-streams/custom-modules.mdx
@@ -0,0 +1,21 @@
+---
+title: "Custom Modules"
+sidebarTitle: Overview
+description: "Customize Streams behavior with powerful handlers for subscription lifecycle, event processing, and data transformation."
+icon: "circle-info"
+---
+
+Cosmo Router provides powerful handlers to hook into the event processing of Cosmo Streams.
+These handlers allow you to implement custom logic for subscription lifecycle management, event processing, and data transformation.
+
+Custom Modules in Cosmo Streams are available since Router version 0.266.0
+
+## Available Hooks
+
+The Cosmo Streams system provides three main hook interfaces that you can implement with [Custom Modules](/router/custom-modules):
+
+- [`SubscriptionOnStartHandler`](/router/cosmo-streams/custom-modules/subscription-on-start): Called when a client subscribes
+- [`StreamReceiveEventHandler`](/router/cosmo-streams/custom-modules/on-receive-event): Called when events are received from a message broker
+- [`StreamPublishEventHandler`](/router/cosmo-streams/custom-modules/on-publish-event): Called when events are going to be sent to a message broker
diff --git a/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx
new file mode 100644
index 00000000..4d519d11
--- /dev/null
+++ b/docs/router/cosmo-streams/custom-modules/on-publish-event.mdx
@@ -0,0 +1,305 @@
+---
+title: "OnPublishEvents Handler"
+description: "A Cosmo Streams Custom Module that lets you customize events before they are sent to message providers"
+icon: "arrow-right"
+---
+
+The `OnPublishEvents` handler is a custom module hook that allows you to intercept and process events triggered by GraphQL mutations before they are sent to providers. This handler is called whenever a batch of events is about to be published to a provider, giving you the opportunity to filter, transform, enrich, or validate events first. 
+
+This handler is particularly useful for:
+- **Event validation**: Ensure events meet specific criteria before publishing
+- **Data transformation**: Modify event payloads to match provider expectations
+- **Event enrichment**: Add additional metadata or context to events
+- **Authentication and authorization**: Filter events based on user permissions
+- **Monitoring and analytics**: Log or track outgoing events for observability
+
+This handler is executed only when a GraphQL mutation triggers event publishing. Unlike `OnReceiveEvents`, this handler processes outgoing events to providers, not incoming events from subscriptions.
+
+## Handler Interface
+
+In order to use the `OnPublishEvents` handler, you need to create a [Custom Module](../../custom-modules) which implements the `StreamPublishEventHandler` interface.
+
+```go
+type StreamPublishEventHandler interface {
+    // OnPublishEvents is called each time a batch of events is going to be sent to a provider.
+    // The events argument contains all events from a batch.
+    // Use events.All() to iterate through them and event.Clone() to create mutable copies, when needed.
+    // Returning an error will result in a GraphQL error being returned to the client; this can be
+    // customized by returning a StreamHookError.
+    OnPublishEvents(ctx StreamPublishEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error)
+}
+
+type StreamPublishEventHandlerContext interface {
+    // Request is the original request received by the router.
+    Request() *http.Request
+    // Logger is the logger for the request
+    Logger() *zap.Logger
+    // Operation is the GraphQL operation
+    Operation() OperationContext
+    // Authentication is the authentication for the request
+    Authentication() authentication.Authentication
+    // PublishEventConfiguration is the publish event configuration
+    PublishEventConfiguration() datasource.PublishEventConfiguration
+    // NewEvent creates a new event that can be used in the subscription.
+    //
+    // The data parameter must contain valid JSON bytes representing the event payload
+    // that will be sent to your message broker (Kafka, NATS, etc.). The JSON must have
+    // properly quoted property names and must include the __typename field required by GraphQL.
+    // For example: []byte(`{"__typename": "Employee", "id": 1, "update": {"name": "John"}}`).
+    //
+    // This method is typically used in OnPublishEvents hooks to create new or modified events
+    // before they are sent to the message broker.
+    NewEvent(data []byte) datasource.MutableStreamEvent
+}
+```
+
+## Error Handling
+
+As mentioned in the [Publish Overview Section](/router/cosmo-streams#publish), the return type of a Cosmo Streams mutation
+must use the type `PublishEventResult`. This type declares a boolean `success` field.
+Implementations of `OnPublishEvents` handlers return two values: `events` and `error`. When `error` is not `nil`, the client's response
+will have the `success` field set to `false`. The error is also logged to the router's console output.
+When events are returned, these will always be sent to the provider, even if you return an error.
+This can be useful in case you partially processed data but hit an error along the way.
+In case you don't want to send any events to the provider, you can return `datasource.NewStreamEvents(nil)`.
+See the code examples below for a demonstration.
+
+When the `OnPublishEvents` handler returns an error, the router takes the following actions:
+
+1. The client will receive a response where the `success` field is `false`
+2. Returned events are sent to the message providers, if any are provided
+3. The error is logged by the router with details about the mutation, provider, and field name
+
+Returning events alongside an error from `OnPublishEvents` will send these events to the provider.
+If you don't want to send any, you need to return an empty list of events.
+Refer to the examples below to see how this can be done.
+
+Here is an example of proper error handling:
+
+```go
+func (m *MyEventHandler) OnPublishEvents(
+    ctx core.StreamPublishEventHandlerContext,
+    events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+    // For validation failures, don't send events to the provider
+    // and return success=false to the client.
+    for _, event := range events.All() {
+        if !isValidEvent(event.GetData()) {
+            return datasource.NewStreamEvents(nil), errors.New("invalid event data - publication rejected")
+        }
+    }
+
+    // In case of partial processing, you can send the successfully
+    // processed events to the provider and still return an error.
+    if failureAfterPartialProcessing {
+        return partialEvents, errors.New("error during data processing")
+    }
+
+    return events, nil
+}
+```
+
+## Usage Example
+
+### Complete Custom Module with Event Bypass
+
+The following example contains a complete Custom Module implementation, including handler registration, with a handler that simply passes events through unchanged. This demonstrates how to register your `OnPublishEvents` Custom Module.
+
+```go
+package module
+
+import (
+    "github.com/wundergraph/cosmo/router/core"
+    "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
+    "go.uber.org/zap"
+)
+
+func init() {
+    // Register your module with the router
+    core.RegisterModule(&EventPublishModule{})
+}
+
+const ModuleID = "eventPublishModule"
+
+// EventPublishModule demonstrates a complete custom module implementation
+// that implements StreamPublishEventHandler but simply passes events through unchanged
+type EventPublishModule struct {}
+
+// OnPublishEvents passes all events through unchanged
+func (m *EventPublishModule) OnPublishEvents(
+    ctx core.StreamPublishEventHandlerContext,
+    events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+    logger := ctx.Logger()
+    logger.Debug("Publishing events - passing through unchanged",
+        zap.Int("event_count", events.Len()),
+    )
+
+    // Simply return the events unchanged
+    return events, nil
+}
+
+// Module returns the module information for registration
+func (m *EventPublishModule) Module() core.ModuleInfo {
+    return core.ModuleInfo{
+        ID: 
ModuleID,
+        New: func() core.Module {
+            return &EventPublishModule{}
+        },
+    }
+}
+
+// Interface guards to ensure we implement the required interfaces
+var (
+    _ core.StreamPublishEventHandler = (*EventPublishModule)(nil)
+)
+```
+
+### Restrict Handler to run on certain mutations and providers
+
+Most of the time you want your hook to only deal with certain mutations.
+The `OnPublishEvents` Handler is run for every mutation configured for Cosmo Streams.
+You can access the name of the mutation you care about and return early if it's not the right one.
+
+```go
+func (m *SelectivePublishHandler) OnPublishEvents(
+    ctx core.StreamPublishEventHandlerContext,
+    events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+    pubConfig := ctx.PublishEventConfiguration()
+
+    // Bypass handler if it's not the right mutation
+    if pubConfig.RootFieldName() != "updateEmployee" {
+        return events, nil
+    }
+
+    // And / or you can decide to bypass in case it's not the right provider
+    // you want to deal with here.
+    if pubConfig.ProviderID() != "my-kafka" {
+        return events, nil
+    }
+
+    // Your specific event processing logic here, producing processedEvents
+    // ...
+
+    return datasource.NewStreamEvents(processedEvents), nil
+}
+```
+
+### Prevent unauthorized users from sending Cosmo Streams mutation events to providers
+
+You can use `ctx.Authentication()` to validate that only authorized users can publish events to specific providers.
+This is useful for securing mutation operations that trigger event publishing.
+
+```go
+func (m *AuthEventHandler) OnPublishEvents(
+    ctx core.StreamPublishEventHandlerContext,
+    events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+    logger := ctx.Logger()
+    auth := ctx.Authentication()
+
+    // Require authentication for publishing events
+    if auth == nil {
+        logger.Warn("Unauthenticated user attempted to publish events")
+        return datasource.NewStreamEvents(nil), errors.New("authentication required to publish events")
+    }
+
+    // Check JWT claims for required permissions
+    claims := auth.Claims()
+    if claims == nil {
+        return datasource.NewStreamEvents(nil), errors.New("invalid authentication token")
+    }
+
+    // Check for required role to publish events
+    roleClaimValue, hasRole := claims["role"]
+    if !hasRole {
+        return datasource.NewStreamEvents(nil), errors.New("missing role claim - publication denied")
+    }
+
+    userRole, ok := roleClaimValue.(string)
+    if !ok || (userRole != "admin" && userRole != "publisher") {
+        logger.Warn("User without publish permissions attempted to publish events",
+            zap.Any("role", roleClaimValue),
+        )
+        return datasource.NewStreamEvents(nil), errors.New("insufficient permissions to publish events")
+    }
+
+    // User is authorized - allow event publishing
+    logger.Debug("Authorized user publishing events",
+        zap.String("role", userRole),
+        zap.Int("event_count", events.Len()),
+    )
+    return events, nil
+}
+```
+
+### Attach headers to Kafka events
+
+You can attach headers to Kafka events before sending them to providers. 
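+Note that the sketch below assumes the events can be converted to the mutable Kafka event type from the router's `github.com/wundergraph/cosmo/router/pkg/pubsub/kafka` package, which must be imported alongside `zap`.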
+
+```go
+func (m *EventPublishModule) OnPublishEvents(
+    ctx core.StreamPublishEventHandlerContext,
+    events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+    // Bypass handler in case it's not about Kafka events
+    if ctx.PublishEventConfiguration().ProviderType() != datasource.ProviderTypeKafka {
+        return events, nil
+    }
+
+    eventsWithHeaders := make([]datasource.StreamEvent, 0, events.Len())
+    for _, evt := range events.All() {
+        // In order to set headers we need to clone the event first to make it mutable
+        clonedEvent := evt.Clone()
+        kafkaEvent, ok := clonedEvent.(*kafka.MutableEvent)
+        if !ok {
+            rootFieldName := ctx.PublishEventConfiguration().RootFieldName()
+            ctx.Logger().
+                With(zap.String("root_field_name", rootFieldName)).
+                Warn("got non-kafka event in kafka based handler, this should not happen")
+            // Keep the original event instead of dereferencing a nil value
+            eventsWithHeaders = append(eventsWithHeaders, evt)
+            continue
+        }
+
+        kafkaEvent.Headers["event_source"] = []byte("graphql_mutation")
+        eventsWithHeaders = append(eventsWithHeaders, kafkaEvent)
+    }
+
+    return datasource.NewStreamEvents(eventsWithHeaders), nil
+}
+```
diff --git a/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx
new file mode 100644
index 00000000..c4b74a9f
--- /dev/null
+++ b/docs/router/cosmo-streams/custom-modules/on-receive-event.mdx
@@ -0,0 +1,366 @@
+---
+title: "OnReceiveEvents Handler"
+description: "A Cosmo Streams Custom Module that lets you customize events received from a message provider before they are passed to subscribers"
+icon: "arrow-left"
+---
+
+The `OnReceiveEvents` handler is a custom module hook that allows you to intercept and process events received from
+supported message providers before they are delivered to GraphQL subscription clients.
+This handler is called whenever a batch of events is received from a provider, giving you the opportunity to filter, transform, enrich,
+or validate events before they reach your subscribers.
+
+This handler is particularly useful for:
+- **Event filtering**: Remove unwanted events based on custom logic
+- **Data transformation**: Modify event payloads to match client expectations
+- **Event enrichment**: Add additional data to events from external sources
+- **Authentication and authorization**: Filter events based on user permissions
+- **Monitoring and analytics**: Log or track events for observability
+
+If there is no active subscription, this handler is not executed, even if new messages arrive at the provider.
+This is because the Router will not listen for messages on the provider topic/queue until at least one
+client subscribes to a particular subscription.
+
+## Handler Interface
+
+In order to use the `OnReceiveEvents` handler, you need to create a [Custom Module](../../custom-modules) which implements
+the `StreamReceiveEventHandler` interface.
+
+```go
+type StreamReceiveEventHandler interface {
+    // OnReceiveEvents is called whenever a batch of events is received from a provider,
+    // before delivering them to clients.
+    // The hook will be called once for each active subscription; therefore, it is advised to
+    // avoid resource-heavy computation or blocking tasks whenever possible.
+    // The events argument contains all events from a batch and is shared between
+    // all active subscribers of these events.
+    // Use events.All() to iterate through them and event.Clone() to create mutable copies, when needed.
+    // Returning an error will result in the subscription being closed and the error being logged.
+    OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events datasource.StreamEvents) (datasource.StreamEvents, error)
+}
+
+type StreamReceiveEventHandlerContext interface {
+    // Context is a context for handlers.
+    // If it is cancelled, the handler should stop processing.
+    Context() context.Context
+    // Request is the initial client request that started the subscription
+    Request() *http.Request
+    // Logger is the logger for the request
+    Logger() *zap.Logger
+    // Operation is the GraphQL operation
+    Operation() OperationContext
+    // Authentication is the authentication for the request
+    Authentication() authentication.Authentication
+    // SubscriptionEventConfiguration is the subscription event configuration
+    SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
+    // NewEvent creates a new event that can be used in the subscription.
+    //
+    // The data parameter must contain valid JSON bytes representing the raw event payload
+    // from your message broker (Kafka, NATS, etc.). The JSON must have properly quoted
+    // property names and must include the __typename field required by GraphQL.
+    // For example: []byte(`{"__typename": "Employee", "id": 1, "update": {"name": "John"}}`).
+    //
+    // This method is typically used in OnReceiveEvents hooks to create new or modified events.
+    NewEvent(data []byte) datasource.MutableStreamEvent
+}
+```
+
+## Asynchronous Execution and Performance Considerations
+
+The `OnReceiveEvents` handler is executed **asynchronously** for each active subscription when events are received from the provider.
+To control resource usage and prevent overwhelming your system, you can configure the maximum number of concurrent handlers using
+the `max_concurrent_handlers` configuration option.
+
+```yaml
+events:
+  handlers:
+    on_receive_events:
+      max_concurrent_handlers: 100 # Default: 100
+```
+
+This limit applies per [Trigger](/router/cosmo-streams#deduplication-of-subscriptions), not globally.
+
+When the maximum number of concurrent handlers for a topic is reached, the router will not poll new events from the message queue until a handler finishes and becomes available again.
+To avoid delivering events out of order to subscription clients, the router waits for all handlers to complete before polling the next batch of events.
+This waiting period is configurable via the router settings:
+
+```yaml
+events:
+  handlers:
+    on_receive_events:
+      handler_timeout: 1s # default: 5s
+```
+
+If the timeout is reached, the router immediately polls the next batch of events, which may result in out-of-order delivery to subscription clients.
+In this case, a warning is logged.
+
+It is recommended to use `ctx.Context()`, which is cancelled in such situations.
+You can use this context to abort any long-running operations:
+
+```go
+func (m *CosmoStreamsModule) OnReceiveEvents(
+    ctx core.StreamReceiveEventHandlerContext,
+    events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+    for _, event := range events.All() {
+        select {
+        case <-ctx.Context().Done():
+            ctx.Logger().Debug("context cancelled, stopping processing and returning no events to subscriber")
+            return datasource.StreamEvents{}, nil
+        default:
+            // process event here...
+            _ = event
+        }
+    }
+    return events, nil
+}
+```
+
+The router does not abort the handler when the context is cancelled. Instead, it proceeds to receive the next batch of events from the provider.
+In this case, events may be delivered out of order because long-running handlers are still processing the previous batch. 
+
+While the handler limit is reached, the router will not poll the next batch of events from the provider.
+This effectively means that the subscription will not receive updates until a handler becomes free again.
+
+## Error Handling
+
+When the `OnReceiveEvents` handler returns an error, the router takes the following actions:
+
+1. **Send Returned Events**: Alongside the error you can return events if you wish to send them to the client before the connection closes
+2. **Subscription Closure**: The affected subscription is immediately closed for the client that encountered the error
+3. **Error Logging**: The error is logged by the router with details about the subscription, provider, and field name
+4. **Error Deduplication**: If multiple subscriptions experience the same error for the same events, the router deduplicates the error messages in the logs to prevent spam
+5. **No Error Propagation**: The error is **not** sent directly to the GraphQL client - the subscription simply closes
+
+Returning an error from `OnReceiveEvents` will close the subscription for that specific client. Use this only when you want to terminate the subscription due to unrecoverable conditions. For filtering events, return an empty event list instead of an error.
+
+The error gets logged by the router but it won't be sent to the client.
+From the client's point of view, the subscription closes server-side without a reason. We are working on a solution for this.
+
+**Example of proper error handling:**
+
+```go
+func (m *MyEventHandler) OnReceiveEvents(
+    ctx core.StreamReceiveEventHandlerContext,
+    events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+    // For recoverable issues, filter events instead of returning errors
+    if someCondition {
+        return datasource.NewStreamEvents(nil), nil // Empty events, no error
+    }
+
+    // Only return errors for unrecoverable conditions
+    if criticalSystemFailure {
+        return datasource.NewStreamEvents(nil), errors.New("critical system failure - closing subscription")
+    }
+
+    return events, nil
+}
+```
+
+## Usage Example
+
+### Complete Custom Module with Event Bypass
+
+The following example contains a complete Custom Module implementation, including handler registration,
+with a handler that simply passes events through unchanged. This is not useful on its own but demonstrates
+how to register your `OnReceiveEvents` Custom Module. 
+
+```go
+package module
+
+import (
+    "github.com/wundergraph/cosmo/router/core"
+    "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
+    "go.uber.org/zap"
+)
+
+func init() {
+    // Register your module with the router
+    core.RegisterModule(&EventBypassModule{})
+}
+
+const ModuleID = "eventBypassModule"
+
+// EventBypassModule demonstrates a complete custom module implementation
+// that implements StreamReceiveEventHandler but simply passes events through unchanged
+type EventBypassModule struct {}
+
+// OnReceiveEvents passes all events through unchanged
+func (m *EventBypassModule) OnReceiveEvents(
+    ctx core.StreamReceiveEventHandlerContext,
+    events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+    logger := ctx.Logger()
+    logger.Debug("Processing events - bypassing unchanged",
+        zap.Int("event_count", len(events.All())),
+    )
+
+    // Simply return the events unchanged
+    return events, nil
+}
+
+// Module returns the module information for registration
+func (m *EventBypassModule) Module() core.ModuleInfo {
+    return core.ModuleInfo{
+        ID: ModuleID,
+        New: func() core.Module {
+            return &EventBypassModule{}
+        },
+    }
+}
+
+// Interface guards to ensure we implement the required interfaces
+var (
+    _ core.StreamReceiveEventHandler = (*EventBypassModule)(nil)
+)
+```
+
+### Restrict Handler to run on certain subscriptions and providers
+
+Most of the time you want your hook to only deal with a certain subscription.
+The `OnReceiveEvents` Handler is run for every subscription configured for Cosmo Streams.
+You can access the name of the subscription you care about and return early if it's not the right one.
+
+```go
+func (m *SelectiveEventHandler) OnReceiveEvents(
+    ctx core.StreamReceiveEventHandlerContext,
+    events datasource.StreamEvents,
+) (datasource.StreamEvents, error) {
+    subConfig := ctx.SubscriptionEventConfiguration()
+
+    // Bypass handler if it's not the right subscription
+    if subConfig.RootFieldName() != "employeeUpdated" {
+        return events, nil
+    }
+
+    // And / or you can decide to process events only from a specific provider configured in the Router
+    if subConfig.ProviderID() != "my-nats" {
+        return events, nil
+    }
+
+    // Your specific event processing logic here, producing processedEvents
+    // ...
+
+    return datasource.NewStreamEvents(processedEvents), nil
+}
+```
+
+### Filter out events based on the client's authentication token claims
+
+You can use `ctx.Authentication()` to access authentication data, such as tokens, if available.
+Based on that, you can filter events if the token is missing the required claim. 
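+The following sketch assumes JWT-based authentication where the token carries a string `role` claim; adapt the claim name and accepted values to your own setup.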
+ +```go +func (m *EventFilterModule) OnReceiveEvents( + ctx core.StreamReceiveEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + logger := ctx.Logger() + auth := ctx.Authentication() + + // If no authentication, block all events + if auth == nil { + return datasource.NewStreamEvents(nil), + errors.New("No authentication present, closing subscription") + } + + // Check JWT claims + claims := auth.Claims() + if claims == nil { + return datasource.NewStreamEvents(nil), + errors.New("No claims present, closing subscription") + } + + // Check for admin role claim + roleClaimValue, hasRole := claims["role"] + if !hasRole { + logger.Debug("No role claim, blocking all events") + return datasource.NewStreamEvents(nil), nil + } + + userRole, ok := roleClaimValue.(string) + if !ok || userRole != "admin" { + logger.Debug("User is not admin, blocking all events", + zap.Any("role", roleClaimValue), + ) + return datasource.NewStreamEvents(nil), nil + } + + // User is admin - pass all events through + logger.Debug("Admin user authorized, passing all events", + zap.Int("event_count", events.Len()), + ) + return events, nil +} +``` + +### Filter out events based on message metadata + +Certain providers enrich their messages with metadata accessible by the Router. +Kafka and NATS, for example, have the option to add headers to messages. +Here's an example that filters out all messages coming from a Kafka instance where a header indicates +it's not meant for GraphQL subscriptions. + +```go +import ( + "github.com/wundergraph/cosmo/router/pkg/pubsub/kafka" +) + +func (m *HeaderFilterModule) OnReceiveEvents( + ctx core.StreamReceiveEventHandlerContext, + events datasource.StreamEvents, +) (datasource.StreamEvents, error) { + logger := ctx.Logger() + + // Only process events from Kafka providers. + // Pass through unchanged for non-Kafka providers. 
+    if ctx.SubscriptionEventConfiguration().ProviderType() != datasource.ProviderTypeKafka {
+        return events, nil
+    }
+
+    // Optionally validate specific provider ID or subscription field
+    subConfig := ctx.SubscriptionEventConfiguration()
+    logger.Debug("Processing Kafka events for subscription",
+        zap.String("provider_id", subConfig.ProviderID()),
+        zap.String("field_name", subConfig.RootFieldName()),
+        zap.Int("event_count", events.Len()),
+    )
+
+    filteredEvents := make([]datasource.StreamEvent, 0, events.Len())
+
+    for _, event := range events.All() {
+        // Check if this is a Kafka event with headers
+        if kafkaEvent, ok := event.(*kafka.Event); ok {
+            headers := kafkaEvent.GetHeaders()
+
+            // Filter out events with "internal" header set to "true"
+            if internalHeader, exists := headers["internal"]; exists {
+                if string(internalHeader) == "true" {
+                    logger.Debug("Filtering out internal event")
+                    continue
+                }
+            }
+        }
+
+        // Include this event in the results
+        filteredEvents = append(filteredEvents, event)
+    }
+
+    logger.Debug("Filtered events by headers",
+        zap.Int("original_count", events.Len()),
+        zap.Int("filtered_count", len(filteredEvents)),
+    )
+
+    return datasource.NewStreamEvents(filteredEvents), nil
+}
+```
\ No newline at end of file
diff --git a/docs/router/cosmo-streams/custom-modules/subscription-on-start.mdx b/docs/router/cosmo-streams/custom-modules/subscription-on-start.mdx
new file mode 100644
index 00000000..03f9266a
--- /dev/null
+++ b/docs/router/cosmo-streams/custom-modules/subscription-on-start.mdx
@@ -0,0 +1,233 @@
+---
+title: "SubscriptionOnStart Handler"
+description: "A Cosmo Streams Custom Module that lets you customize subscription initialization behavior"
+icon: "circle-play"
+---
+
+The `SubscriptionOnStart` handler is a custom module hook that allows you to intercept and customize the initialization of GraphQL subscriptions.
+This handler is called once when a subscription starts, giving you the opportunity to validate permissions, send initial events, or perform setup logic.
+
+This handler is particularly useful for:
+- **Subscription authentication**: Validate JWT tokens or user permissions before allowing subscriptions
+- **Initial event delivery**: Send welcome messages or current state to new subscribers
+- **Subscription logging**: Track subscription attempts and user behavior
+- **Connection validation**: Ensure clients meet specific criteria before subscribing
+- **Rate limiting**: Control subscription attempts per user or client
+- **State initialization**: Initialize state used by other handlers such as `OnReceiveEvents` or `OnPublishEvents` of the same module
+
+## Handler Interface
+
+In order to use the `SubscriptionOnStart` handler, you need to create a [Custom Module](../../custom-modules) which implements the `SubscriptionOnStartHandler` interface.
+
+```go
+type SubscriptionOnStartHandler interface {
+    // SubscriptionOnStart is called once at subscription start.
+    // The error is propagated to the client.
+    SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error
+}
+
+type SubscriptionOnStartHandlerContext interface {
+    // Request is the original request received by the router. 
+    Request() *http.Request
+    // Logger is the logger for the request
+    Logger() *zap.Logger
+    // Operation is the GraphQL operation
+    Operation() OperationContext
+    // Authentication is the authentication for the request
+    Authentication() authentication.Authentication
+    // SubscriptionEventConfiguration is the subscription event configuration (will return nil for engine subscriptions)
+    SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
+    // EmitLocalEvent sends an event directly to the subscription stream of the
+    // currently connected client.
+    //
+    // This method triggers the router to resolve the client's operation and emit
+    // the resulting data as a stream event. The event exists only within the
+    // router; it is not forwarded to any message broker.
+    //
+    // The event is delivered exclusively to the client associated with the current
+    // handler execution. No other subscriptions are affected.
+    //
+    // The method returns true if the event was successfully emitted, or false if
+    // it was dropped.
+    EmitLocalEvent(event datasource.StreamEvent) bool
+    // NewEvent creates a new event that can be used in the subscription.
+    //
+    // The data parameter must contain valid JSON bytes. The format depends on the subscription type.
+    //
+    // For event-driven subscriptions (Cosmo Streams / EDFS), the data should contain:
+    //   __typename : The name of the schema entity, which is expected to be returned to the client.
+    //   {keyName}  : The key of the entity as configured on the schema via the @key directive.
+    // Example usage: ctx.NewEvent([]byte(`{"__typename": "Employee", "id": 1}`))
+    //
+    // For normal subscriptions, you need to provide the complete GraphQL response structure.
+    // Example usage: ctx.NewEvent([]byte(`{"data": {"fieldName": value}}`))
+    //
+    // You can use EmitLocalEvent to emit this event to subscriptions.
+    NewEvent(data []byte) datasource.MutableStreamEvent
+}
+```
+
+## Error Handling
+
+When you return an error from the `SubscriptionOnStart` handler, the router responds to the client with an error event and closes the subscription.
+You can choose to return a generic error or a custom error response with more details for the client.
+
+```go
+return errors.New("my handler error")
+```
+
+This will result in an internal server error response to the client.
+
+```json
+{
+  "errors": [
+    {
+      "message": "Internal server error"
+    }
+  ]
+}
+```
+
+Alternatively, you can return a custom error response with more details for the client.
+
+```go
+return &core.StreamHandlerError{Message: "my graphql error"}
+```
+
+This will result in an error response with more details for the client.
+
+```json
+{
+  "errors": [
+    {
+      "message": "my graphql error"
+    }
+  ]
+}
+```
+
+Errors are not logged automatically by the router. If you need the error to be logged, you can use `ctx.Logger()` to log the error yourself.
+
+## Usage Example
+
+### Complete Custom Module with a Passive Handler
+
+The following example demonstrates how to register a passive `SubscriptionOnStart` handler that logs subscription attempts but allows all subscriptions to proceed normally. 
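+Keep in mind that `SubscriptionEventConfiguration()` returns nil for regular engine subscriptions, so a defensive implementation should check for nil before calling methods on it; the sketch below assumes an event-driven (Cosmo Streams) subscription.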
+ +```go +package module + +import ( + "github.com/wundergraph/cosmo/router/core" + "go.uber.org/zap" +) + +func init() { + // Register your module with the router + core.RegisterModule(&SubscriptionStartModule{}) +} + +const ModuleID = "subscriptionStartModule" + +// SubscriptionStartModule demonstrates a passive subscription start handler +type SubscriptionStartModule struct{} + +// SubscriptionOnStart logs subscription attempts and allows them to proceed +func (m *SubscriptionStartModule) SubscriptionOnStart( + ctx core.SubscriptionOnStartHandlerContext, +) error { + logger := ctx.Logger() + config := ctx.SubscriptionEventConfiguration() + + // Log subscription details + logger.Info("Subscription started", + zap.String("field_name", config.RootFieldName()), + zap.String("provider_id", config.ProviderID()), + zap.String("provider_type", string(config.ProviderType())), + ) + + // Allow subscription to proceed + return nil +} + +// Module returns the module information for registration +func (m *SubscriptionStartModule) Module() core.ModuleInfo { + return core.ModuleInfo{ + ID: ModuleID, + New: func() core.Module { + return &SubscriptionStartModule{} + }, + } +} + +// Interface guards to ensure we implement the required interfaces +var ( + _ core.SubscriptionOnStartHandler = (*SubscriptionStartModule)(nil) +) +``` + +### Return initial events + +You can use `ctx.EmitLocalEvent()` to send initial or welcome events to subscribers immediately when they connect. +This is useful for providing current state or welcome messages. + +```go +func (m *SubscriptionStartModule) SubscriptionOnStart( + ctx core.SubscriptionOnStartHandlerContext, +) error { + // Bypass the handler on other subscriptions + if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" { + return nil + } + + // Create an initial event with minimal required fields + // The router will resolve all other fields requested by the subscriber + initialEventData := `{ "__typename": "Employee", "id": 1 }` + initialEvent := ctx.NewEvent([]byte(initialEventData)) + + success := ctx.EmitLocalEvent(initialEvent) + if !success { + ctx.Logger().Warn("Failed to send initial event to subscriber") + } + + return nil +} +``` + +The payload data used to create a new event has to follow a specific format. +It has to be a valid JSON object that contains the `__typename` field to identify the entity type we want to return +for this subscription. The other field in this case is `id`, which represents the entity key of `Employee` types as defined in the schema. +The router will use this information to resolve all fields requested by the subscriber to generate a complete response. + +### Prevent subscriptions on missing token claims + +This example validates JWT tokens and blocks subscriptions for users without the required "role" claim, demonstrating proper authentication enforcement. 
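+As with the other authentication examples, this sketch assumes a JWT whose claims include a string `role` value; adjust the claim to match your setup.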
+
+```go
+func (m *SubscriptionStartModule) SubscriptionOnStart(ctx core.SubscriptionOnStartHandlerContext) error {
+    // Only check the "employeeUpdated" subscription
+    if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" {
+        return nil
+    }
+
+    auth := ctx.Authentication()
+    if auth == nil {
+        return &core.StreamHandlerError{Message: "unauthorized"}
+    }
+
+    // Check for the specific "admin" role
+    roleValue, hasRole := auth.Claims()["role"]
+    if !hasRole {
+        return &core.StreamHandlerError{Message: "missing role claim"}
+    }
+
+    role, ok := roleValue.(string)
+    if !ok || role != "admin" {
+        return &core.StreamHandlerError{Message: "admin role required"}
+    }
+
+    return nil
+}
+```
diff --git a/docs/router/event-driven-federated-subscriptions-edfs/kafka.mdx b/docs/router/cosmo-streams/kafka.mdx
similarity index 90%
rename from docs/router/event-driven-federated-subscriptions-edfs/kafka.mdx
rename to docs/router/cosmo-streams/kafka.mdx
index d81627e6..a3848cb9 100644
--- a/docs/router/event-driven-federated-subscriptions-edfs/kafka.mdx
+++ b/docs/router/cosmo-streams/kafka.mdx
@@ -1,10 +1,10 @@
 ---
 title: "Kafka"
 icon: "sitemap"
-descripton: "Kafka event provider support for EDFS"
+description: "Kafka event provider support for Cosmo Streams"
 ---
 
- 
+ 
 
 Kafka system with CMS, CRM, ERP feeding into Kafka, output to three routers/clients
 
@@ -20,10 +20,10 @@
 
 ## Full schema example
 
-Here is a comprehensive example of how to use Kafka with EDFS. This guide covers publish, subscribe, and the filter directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the EDFS schema contract and must not be modified.
+Below is a comprehensive example of how to use Kafka with Cosmo Streams. This guide covers publish, subscribe, and the filter directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the Cosmo Streams schema contract and must not be modified.
 
 ```js
-# EDFS
+# Cosmo Streams
 
 directive @edfs__kafkaPublish(topic: String!, providerId: String! = "default") on FIELD_DEFINITION
 directive @edfs__kafkaSubscribe(topics: [String!]!, providerId: String! = "default") on FIELD_DEFINITION
 
diff --git a/docs/router/event-driven-federated-subscriptions-edfs/nats.mdx b/docs/router/cosmo-streams/nats.mdx
similarity index 90%
rename from docs/router/event-driven-federated-subscriptions-edfs/nats.mdx
rename to docs/router/cosmo-streams/nats.mdx
index 405b8d61..1cb89f62 100644
--- a/docs/router/event-driven-federated-subscriptions-edfs/nats.mdx
+++ b/docs/router/cosmo-streams/nats.mdx
@@ -2,7 +2,7 @@
 title: "NATS"
 sidebarTitle: Overview
 icon: circle-info
-description: "NATS event provider support for EDFS"
+description: "NATS event provider support for Cosmo Streams"
 ---
 
@@ -23,10 +23,10 @@
 
 ## Full schema example
 
-Here is a comprehensive example of how to use NATS with EDFS. This guide covers request, publish, subscribe directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the EDFS schema contract and must not be modified.
+Here is a comprehensive example of how to use NATS with Cosmo Streams. This guide covers the request, publish, and subscribe directives. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the Cosmo Streams schema contract and must not be modified. 
```js
-# EDFS
+# Cosmo Streams
 
 directive @edfs__natsRequest(subject: String!, providerId: String! = "default") on FIELD_DEFINITION
 directive @edfs__natsPublish(subject: String!, providerId: String! = "default") on FIELD_DEFINITION
 
diff --git a/docs/router/event-driven-federated-subscriptions-edfs/nats/stream-and-consumer-configuration.mdx b/docs/router/cosmo-streams/nats/stream-and-consumer-configuration.mdx
similarity index 100%
rename from docs/router/event-driven-federated-subscriptions-edfs/nats/stream-and-consumer-configuration.mdx
rename to docs/router/cosmo-streams/nats/stream-and-consumer-configuration.mdx
diff --git a/docs/router/event-driven-federated-subscriptions-edfs/redis.mdx b/docs/router/cosmo-streams/redis.mdx
similarity index 91%
rename from docs/router/event-driven-federated-subscriptions-edfs/redis.mdx
rename to docs/router/cosmo-streams/redis.mdx
index 91d8c0a8..e7f0d35e 100644
--- a/docs/router/event-driven-federated-subscriptions-edfs/redis.mdx
+++ b/docs/router/cosmo-streams/redis.mdx
@@ -1,7 +1,7 @@
 ---
 title: "Redis"
 icon: "sitemap"
-descripton: "Redis event provider support for EDFS"
+description: "Redis event provider support for Cosmo Streams"
 ---
 
@@ -21,10 +21,10 @@
 
 ## Full schema example
 
-Here is a comprehensive example of how to use Redis with EDFS. This guide covers publish, subscribe, and the filter directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the EDFS schema contract and must not be modified.
+Below is a comprehensive example of how to use Redis with Cosmo Streams. This guide covers publish, subscribe, and the filter directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the Cosmo Streams schema contract and must not be modified.
 
 ```js
-# EDFS
+# Cosmo Streams
 
 directive @edfs__redisPublish(channel: String!, providerId: String! = "default") on FIELD_DEFINITION
 directive @edfs__redisSubscribe(channels: [String!]!, providerId: String! = "default") on FIELD_DEFINITION
 
diff --git a/docs/router/metrics-and-monitoring.mdx b/docs/router/metrics-and-monitoring.mdx
index 3d8c3e5e..f856ecfa 100644
--- a/docs/router/metrics-and-monitoring.mdx
+++ b/docs/router/metrics-and-monitoring.mdx
@@ -214,8 +214,8 @@ telemetry:
 
 * `router.engine.messages.sent`: The number of total messages for subscriptions sent over from the subgraph to the router.
 
-### EDFS Streams Metrics
-We expose metrics for EDFS streams, these statistics are collected at the level when the message is sent to the messaging backend or directly received from the messaging backend.
+### Cosmo Streams Metrics
+We expose metrics for [Cosmo Streams](/router/cosmo-streams). New data points are emitted when messages are either sent to or received from the messaging backend.
 
 ```yaml config.yaml
 telemetry:
@@ -234,7 +234,7 @@ telemetry:
 
 The following attributes are attached to both metrics:
 
 * `wg.stream.operation.name`:
-This contains the operation type used to send a message to the message backend. This is useful to differentiate when an edfs adapter has multiple ways of sending messages, like in the case of "nats", with `publish`, and `request`.
+This contains the operation type used to send a message to the message backend. This is useful to differentiate when a Cosmo Streams adapter has multiple ways of sending messages, like in the case of NATS, with `publish` and `request`. 
The following values are possible, based on the messaging backend - nats: `publish`, `request`, `receive` @@ -244,7 +244,7 @@ The following values are possible, based on the messaging backend - redis: `publish`, `receive` * `wg.provider.type`: -One of the supported edfs provider types, which includes `kafka`, `nats`, `redis` +One of the supported Cosmo Streams provider types, which includes `kafka`, `nats`, `redis` * `wg.destination.name`: The name of the destination of the messaging backend (topic, queue, etc) @@ -438,8 +438,8 @@ Here you can see a few example queries to query useful information about your cl ``` - - EDFS stream metrics contain only two metrics. To make sense of your data you need to filter by the attributes. The following examples give you a basic idea of how to use these two metrics. + +Cosmo Streams exposes only two core metrics. To get meaningful insights, you’ll need to filter them using the available attributes. The examples below show how to work with these metrics effectively. #### Get failed publishes for a message broker Let's say we want to see any failed publishes to our kafka broker. We can use the following query, diff --git a/docs/router/metrics-and-monitoring/prometheus-metric-reference.mdx b/docs/router/metrics-and-monitoring/prometheus-metric-reference.mdx index 76424243..9eb4f429 100644 --- a/docs/router/metrics-and-monitoring/prometheus-metric-reference.mdx +++ b/docs/router/metrics-and-monitoring/prometheus-metric-reference.mdx @@ -122,8 +122,8 @@ telemetry: * [`router_engine_messages_sent_total`](#router-engine-messages-sent-total): The number of total messages for subscriptions sent over from the subgraph to the router. -### EDFS Streams Metrics -We expose metrics for EDFS streams, these statistics are collected at the level when the message is sent to the messaging backend or directly received from the messaging backend. +### Cosmo Streams Metrics +We expose metrics for [Cosmo Streams](/router/cosmo-streams), these statistics are collected at the level when the message is sent to the messaging backend or directly received from the messaging backend. ```yaml config.yaml telemetry: @@ -140,7 +140,7 @@ telemetry: The following attributes are attached to both metrics: * `wg_stream_operation_name`: -This contains the operation type used to send a message to the message backend. This is useful to differentiate when an edfs adapter has multiple ways of sending messages, like in the case of "nats", with `publish`, and `request`. +This contains the operation type used to send a message to the message backend. This is useful to differentiate when a Cosmo Streams adapter has multiple ways of sending messages, like in the case of "nats", with `publish`, and `request`. The following values are possible, based on the messaging backend - nats: `publish`, `request`, `receive` @@ -150,7 +150,7 @@ The following values are possible, based on the messaging backend - redis: `publish`, `receive` * `wg_provider_type`: -One of the supported edfs provider types, which includes `kafka`, `nats`, `redis` +One of the supported Cosmo Streams provider types, which includes `kafka`, `nats`, `redis` * `wg_destination_name`: The name of the destination of the messaging backend (topic, queue, etc)