Skip to content

Reactive flows#110

Merged
jeroenrinzema merged 6 commits intomainfrom
feat/reactive-flows
Dec 26, 2025
Merged

Reactive flows#110
jeroenrinzema merged 6 commits intomainfrom
feat/reactive-flows

Conversation

@jeroenrinzema
Copy link
Copy Markdown
Contributor

This PR includes the initial support for reactive flows. The system should rely as little as possible on batch jobs or scheduled intervals. This flow allows users to create reactive lists and journeys based on user events.

graph TB
    Client[Client/API] -->|Submit Event| EventSubject["events.projects.{project_id}"]
    Client -->|Submit User| UserSubject[["users.projects.{project_id}"]]

    EventSubject -->|Consume| EventHandler["Event Handler"]
    UserSubject -->|Consume| UserHandler["User Handler"]

    %% Schema publishing
    EventHandler --> EventSchemaSubject[["events.schemas.{project_id}"]]
    UserHandler --> UserSchemaSubject[["users.schemas.{project_id}"]]

    %% Recompute triggers from events
    EventHandler -->|Affected Lists| ListSubject[["recompute.lists.{project_id}.{list_id}"]]
    EventHandler -->|Affected Journeys| JourneySubject[["recompute.journeys.{project_id}.{journey_id}"]]

    %% Recompute triggers from users
    UserHandler -->|Affected Lists| ListSubject
    UserHandler -->|User system events| EventSubject

    %% Schema handlers
    EventSchemaSubject -->|Consume| EventSchemaHandler["Event Schema Handler"]
    UserSchemaSubject -->|Consume| UserSchemaHandler["User Schema Handler"]

    ListSubject -->|Consume| ListRecompute["List Recomputation"]
    JourneySubject -->|Consume| JourneyRecompute["Journey Recomputation"]

    ListRecompute -->|Publish Jobs| ListJobSubject[["jobs.lists.{project_id}.{list_id}"]]
    ListRecompute -->|List Membership Change| JourneyJobSubject[["jobs.journeys.{project_id}.{journey_id}"]]

    JourneyRecompute -->|Publish Jobs| JourneyJobSubject

    ListJobSubject -->|Consume| ListJobHandler["List Job Handler"]
    JourneyJobSubject -->|Consume| JourneyJobHandler["Journey Job Handler"]
Loading

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces reactive flows to the Lunogram platform, shifting from batch/scheduled processing to event-driven architecture using NATS JetStream. The system now processes users and events asynchronously, automatically recomputing dynamic lists and journeys in response to changes.

Key Changes:

  • Implemented NATS JetStream-based pub/sub system for async event processing
  • Refactored dynamic list membership to use reactive recomputation instead of batch jobs
  • Updated public API endpoints to accept projectID as a path parameter and return 202 for async operations

Reviewed changes

Copilot reviewed 58 out of 60 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
services/nexus/main.go Initializes pubsub system and passes publisher to HTTP servers
services/nexus/internal/pubsub/*.go New pub/sub infrastructure with handlers for events, users, schemas, and list recomputation
services/nexus/internal/store/rules.go New rules store for managing dynamic list evaluation rules
services/nexus/internal/store/lists.go Refactored lists to use separate rules table and reactive recomputation via SQL MERGE
services/nexus/internal/store/migrations/*.sql Database schema changes for reactive flows and rule management
services/nexus/internal/rules/query/*.go Query builder refactored to generate JOINs instead of EXISTS subqueries
services/nexus/internal/http/controllers/v1/public/*.go Public API updated to publish events/users to NATS instead of synchronous processing
services/nexus/internal/http/controllers/v1/management/*.go Management API updated for new list/rule structure and reactive recomputation
pkg/claim/rbac/rbac.go Removed ProjectID from RBAC scope (now in path parameters)
Comments suppressed due to low confidence (1)

services/nexus/internal/store/rules.go:1

  • The ON DELETE RESTRICT constraint on rules_events.rule_id prevents rule deletion if events are associated. This could create operational issues when trying to delete rules. Consider using CASCADE or implementing a soft delete pattern for rules to avoid orphaned data.
package store

Comment thread services/nexus/internal/store/migrations/1766513276_lists.up.sql
Comment thread services/nexus/internal/store/lists.go
Comment thread services/nexus/internal/rules/query/rule.go Outdated
Comment thread services/nexus/internal/pubsub/recompute.go
Comment thread services/nexus/internal/http/controllers/v1/public/oapi/resources.yml Outdated
Comment thread services/nexus/internal/http/controllers/v1/management/lists.go Outdated
Comment thread services/nexus/internal/store/users.go Outdated
@jeroenrinzema jeroenrinzema merged commit 6bf2c70 into main Dec 26, 2025
4 checks passed
@jeroenrinzema jeroenrinzema deleted the feat/reactive-flows branch December 26, 2025 22:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants