Skip to content

soapjs/soap-node-demo

Repository files navigation

SoapJS — Comics Universe API demo

REST + WebSocket API showcasing the SoapJS ecosystem with Marvel & DC heroes and villains. Demonstrates clean architecture, full CQRS (commands, queries, read-model projections), domain events with swappable transports (in-memory ↔ Kafka), JWT auth (HTTP and socket-based), MongoDB with a read/write split AND PostgreSQL side by side (two data layers, one domain), cross-feature boundaries, a broadcast-only WebSocket live feed, and a bidirectional chat with rooms on top of @soapjs/soap-node-socket — plus a deliberately plain, non-CQRS feature (comics) and a Postgres-backed feature (authors) to show the framework scales down and across storage engines. From the simplest CRUD to advanced event-driven patterns.

Stack

Package Version Role
@soapjs/soap 0.12.1 Core: DecoratorRegistry, CQRS, Logger / Tracer, HttpContract, ApiDocFragment, MetricsPlugin
@soapjs/soap-express 0.5.0 HTTP layer: Express only — controllers, routing, middleware, CQRS/event decorators, plugins[], middleware.pre
@soapjs/soap-contract-zod 0.1.1 Zod bodyContract — peers: soap + zod + express only
@soapjs/soap-openapi 0.1.1 OpenAPI 3 + Swagger UI (peer: @soapjs/soap only)
@soapjs/soap-node-otel 0.1.0 Request tracing (req.traceId, noop tracer; OTel-ready port)
@soapjs/soap-auth 0.4.4 JWT + local authentication strategies
@soapjs/soap-node-mongo 0.7.1 MongoDB source and query factory
@soapjs/soap-node-sql 0.2.1 PostgreSQL / MySQL / SQLite source and query factory
@soapjs/soap-node-socket 0.0.3 WebSocket / Socket.IO server adapters (HTTP-server attach + path routing)
@soapjs/soap-node-kafka 0.1.2 Kafka EventBus adapter + KafkaDomainEventBus (single consumer per process, in-process fan-out)
pg 8.x Postgres driver (peer of soap-node-sql)
kafkajs 2.x Kafka client (peer of soap-node-kafka)
ws 8.x WebSocket transport (behind soap's AbstractSocketServer)
jsonwebtoken 9.x JWT verification (used by the chat socket auth handshake)

The pinned versions above include the bugfixes that surfaced while building this demo (DI metadata isolation, JWT/Local auth wiring, CQRS bootstrap ordering, RepositoryQuery-vs-FindParams disambiguation, SqlWhereParser dialect-aware identifier quoting, LIMIT/OFFSET syntax, soap-params routing in SqlDataSource, WebSocketServerAdapter HTTP-server attach + path routing + lazy socket.io load, …). They are vendored as local tarballs under vendor/ until the next npm release.

Optional adapters (wired in the app, not in soap-express)

@soapjs/soap-express stays Express-only. Zod contracts, OpenAPI, and tracing are optional packages you install and plug in from src/index.ts (composition root).

Need Package How this demo wires it
Zod body validation + route apiDoc @soapjs/soap-contract-zod bodyContract(schema, meta) on controllers — sets middlewares.pre + apiDoc
OpenAPI + Swagger UI @soapjs/soap-openapi plugins: [{ plugin: new DocumentationPlugin(), options: { … } }]
Request tracing @soapjs/soap-node-otel middleware.pre: [TracingMiddleware.create({ tracer })] + Tracer.Token in DI
Prometheus /metrics @soapjs/soap (MetricsPlugin) plugins: [{ plugin: new MetricsPlugin(…), … }]

Minimal HTTP only (no Zod / docs / OTel):

npm install @soapjs/soap @soapjs/soap-express express
await bootstrap({
  controllers: [MyController],
  middleware: { cors: true, logging: true },
  healthCheck: true,
});

Full stack like this demo — add the adapter packages to package.json, then extend bootstrap({ plugins, middleware: { pre: [...] } }) as in src/index.ts.

Ports live in @soapjs/soap/http: HttpContract, ApiDocFragment, Tracer / Span. Adapters implement them; soap-express does not import adapter packages.

Full write-up: soap/docs/HTTP-ADAPTERS.md

Architecture

Layered by dependency directionapi calls inward, data is called outward. Every external dependency is a port (abstract, lives inward) + an adapter (concrete, at the edge); tech names like Mongo live only on adapters.

src/
├── index.ts                       ← bootstrap() + WebSocket wiring (events wired by soap-express)
│
├── config/
│   ├── config.ts                  ← env config (write + read Mongo connections)
│   └── dependencies.ts            ← composition root: builds container + drainables + Logger
│
├── common/                        ← shared-kernel + cross-cutting (never imports features/)
│   ├── domain/universe.ts         ← shared Universe type (both characters & teams)
│   ├── errors/                    ← ValidationError → 400, DomainRuleError → 422
│   └── sockets/                   ← WsSocketServer (in-house ws transport; soap-node-socket is used for chat)
│                                  ← DomainEventBus + InMemoryDomainEventBus live in @soapjs/soap/cqrs;
│                                    KafkaDomainEventBus lives in @soapjs/soap-node-kafka. The demo just
│                                    binds the right one to DI and lets soap-express wire @EventHandler-s.
│
├── scripts/seed.ts                ← 20 characters, 2 teams, 4 comics, admin, rebuilds projection
│
└── features/
    ├── characters/
    │   ├── domain/
    │   │   ├── character.entity.ts        ← entity + validated create() factory
    │   │   └── events/character.events.ts ← Created / Updated / Deleted (BaseDomainEvent)
    │   ├── application/
    │   │   ├── ports/                      ← Character read/write repo + UniverseStats read model
    │   │   ├── commands/                   ← Create / Update / Delete (+ publish events)
    │   │   └── queries/                    ← Get / List / ListThreats / GetUniverseStats
    │   │       └── specifications/         ← ThreatsSpecification (Tier 2 query object)
    │   ├── contracts/                      ← CharacterLookup — PUBLISHED contract for other features
    │   ├── data/                           ← mapper, CharacterRepositoryMongo, UniverseStats read model
    │   └── api/
    │       ├── characters.controller.ts
    │       ├── events/                     ← audit consumers + UniverseStats projector
    │       └── sockets/                    ← live-feed broadcaster
    │
    ├── teams/                              ← second aggregate (Avengers / Justice League)
    │   ├── domain/                         ← Team entity + TeamMembershipPolicy (domain service)
    │   ├── application/{ports,commands,queries}
    │   ├── data/                           ← mapper, TeamRepositoryMongo
    │   └── api/teams.controller.ts
    │
    ├── comics/                             ← the SIMPLE path: plain HTTP, NO CQRS
    │   ├── domain/comic.entity.ts          ← entity + validated create() factory
    │   ├── application/
    │   │   ├── ports/                      ← single read-write ComicRepository (no split)
    │   │   └── use-cases/                  ← @UseCase classes (Create/Get/Update/Delete/ListPaginated)
    │   ├── data/                           ← mapper, ComicRepositoryMongo
    │   └── api/
    │       ├── comics.controller.ts        ← decorators → use case → repo (/:id, POST, PUT, DELETE)
    │       └── comics.router.ts            ← fluent Router + PaginationIO (GET /comics)
    │
    ├── authors/                             ← the POSTGRES path: same shape, different engine
    │   ├── domain/author.entity.ts          ← entity + validated create() factory
    │   ├── application/
    │   │   ├── ports/author-repository.port.ts ← AuthorRepository abstract port + filters
    │   │   └── use-cases/                   ← Create/Get/List(filters+pagination)/Update/Delete
    │   ├── data/
    │   │   ├── author.dto.ts                ← snake_case AuthorRow (Postgres convention)
    │   │   ├── author.mapper.ts             ← camelCase ↔ snake_case + JSONB serialization
    │   │   ├── author.repository.postgres.ts ← ReadWriteRepository<Author, AuthorRow> over SqlSource
    │   │   └── authors.schema.ts            ← idempotent CREATE TABLE + seed-if-empty
    │   └── api/authors.controller.ts        ← public list/get, admin POST/PUT/DELETE
    │
    ├── chat/                                 ← BIDIRECTIONAL WebSocket feature on @soapjs/soap-node-socket
    │   ├── domain/chat-message.entity.ts    ← ChatMessage entity + ChatRoom union + validation
    │   ├── application/
    │   │   ├── ports/chat-repository.port.ts ← append + listRecent (single RW port)
    │   │   └── use-cases/                    ← GetChatHistoryUseCase, PostChatMessageUseCase
    │   ├── data/                             ← mapper, ChatMessageRepositoryMongo
    │   └── api/
    │       ├── chat.controller.ts            ← REST: GET /chat/:room/messages (bootstrap history)
    │       └── sockets/chat.socket.ts        ← WS: auth-via-first-message, rooms, persistence + broadcast
    │
    └── users/
        ├── domain/ · application/ports/ · data/ · api/
        └── auth.setup.ts                   ← JWT + LocalStrategy config

Quick start with Docker

There's a Makefile at the repo root that wraps the common workflows (make help lists everything):

make up                  # build + start MongoDB + PostgreSQL + Kafka + API (healthchecked)
make seed                # seed Mongo (characters, teams, comics, admin, projection)
make bruno               # run the full Bruno API suite (48 req / 76 tests, see below)
make openapi             # print generated OpenAPI 3 document (API must be up)
make metrics-check       # peek Prometheus /metrics
make kafka-mode          # restart the api with EVENT_BUS=kafka (broker path)
make in-memory-mode      # restart the api with EVENT_BUS=in-memory (default)
make logs                # tail the api container logs
make down-clean          # stop the stack and wipe volumes (mongo + postgres + kafka data)

The equivalent raw commands if you'd rather not use make:

docker compose up -d --build
docker compose exec api node build/scripts/seed.js   # or `npm run seed` locally
EVENT_BUS=kafka docker compose up -d api             # swap to broker path without rebuilding

Chat UI — once the api is up, open http://localhost:3000/chat-ui/chat.html in two browser windows, log in (admin@comics-universe.io / admin123), pick a room, and send messages. Each tab is a separate session so you see messages bounce between them in real-time.

Testing — Bruno collection

A ready-to-run Bruno collection lives under bruno/ — full CRUD per feature plus an E2E Flow that proves the event-driven projection pipeline works end-to-end (create → assert projection incremented → cleanup → assert baseline). Open the bruno/ folder in the Bruno GUI and pick the Local environment, or run it headless:

make bruno         # 48 requests / 76 tests across Health/Auth/Characters/Teams/Comics/Authors/Chat/E2E Flow
make bruno-e2e     # just the E2E Flow folder (fastest smoke test)
make bruno-report  # full suite + bruno-report.html in repo root

make bruno works around two quirks of the Bruno CLI (run . doesn't recurse, and env writes don't persist across CLI processes) by fetching a fresh JWT into the Local env and iterating the folders one by one. See bruno/README.md for the details and the raw CLI fallback.

Local development

docker-compose up mongo postgres   # 1. Start MongoDB + PostgreSQL
npm install                        # 2. Install dependencies
cp .env.example .env               # 3. Configure environment (mongo + postgres)
npm run seed                       # 4. Seed Mongo (postgres seeds itself on boot)
npm run dev                        # 5. Start dev server

API endpoints

Public

Method Path Description
GET /health Liveness check
GET /characters List characters (filter: universe, alignment, search, page, limit)
GET /characters/stats Counts per universe/alignment — served from the read-model projection
GET /characters/threats Villains + anti-heroes (?universe=) — Tier 2 specification query
GET /characters/:id Get one character
GET /teams List teams (?universe=)
GET /teams/:id Get one team
GET /comics List comics — fluent Router + PaginationIO (?page, ?limit, ?filters)
GET /comics/:id Get one comic (decorator controller)
GET /authors List authors — PostgreSQL, Where-based filters (?publisher, ?role, ?country, ?search, ?page, ?limit)
GET /authors/:id Get one author (PostgreSQL)
GET /chat/:room/messages Chat history bootstrap (oldest-first, ?limit=N ≤ 200)
POST /auth/login Login → JWT token
WS /ws Live character feed — broadcasts every character event
WS /chat Bidirectional chat — JWT auth on first frame, rooms, persistence to Mongo
GET /chat-ui Static HTML test client for /chat (two browser tabs for instant 1-on-1)

Protected (JWT required)

Method Path Description
GET /auth/me Current user info

Admin only (role: admin in JWT)

Method Path Description
POST /characters Create character
PUT /characters/:id Update character
DELETE /characters/:id Delete character
POST /teams Create team
POST /teams/:id/members Add a character to a team ({ "characterId": "..." })
DELETE /teams/:id/members/:characterId Remove a member
POST /comics Create comic
PUT /comics/:id Update comic
DELETE /comics/:id Delete comic
POST /authors Create author (PostgreSQL)
PUT /authors/:id Update author (PostgreSQL)
DELETE /authors/:id Delete author (PostgreSQL)

Example requests

# List all Marvel heroes
curl "http://localhost:3000/characters?universe=marvel&alignment=hero"

# Aggregated stats (read-model projection, kept in sync by domain events)
curl "http://localhost:3000/characters/stats"

# Threats only (villains + anti-heroes) — Tier 2 specification query
curl "http://localhost:3000/characters/threats?universe=dc"

# Login
TOKEN=$(curl -s -X POST http://localhost:3000/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"admin123"}' | jq -r .tokens.accessToken)

# Create character (publishes CharacterCreatedEvent → audit log, projection, WS feed)
curl -X POST http://localhost:3000/characters \
  -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  -d '{
    "name": "Ant-Man", "alias": "Scott Lang",
    "universe": "marvel", "alignment": "hero",
    "powers": ["Size manipulation", "Strength scaling"],
    "firstAppearance": "Marvel Premiere #47 (1979)",
    "bio": "Former thief turned hero."
  }'

# Add a member to a team — enforces the domain rule (same universe, no dupes, max roster)
curl -X POST "http://localhost:3000/teams/<teamId>/members" \
  -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
  -d '{"characterId": "<characterId>"}'

# Watch the live feed (requires wscat: npm i -g wscat)
wscat -c ws://localhost:3000/ws
# → {"type":"character.created","payload":{"id":"...","name":"Ant-Man",...}}

Key SoapJS patterns shown

Ports & adapters with co-located DI tokens

Handlers depend on a tech-free port; the .Token lives on the port itself (no magic strings). Only the composition root knows the concrete adapter.

// application/ports — an abstract class IS the port: the DI token lives inside
// the contract. Ports are implemented by adapters, never extended.
export abstract class CharacterWriteRepository {
  static readonly Token = 'CharacterWriteRepository';
  abstract add(...characters: Character[]): Promise<Result<Character[]>>;
  // ...
}

// data — the adapter is the only place that names the tech, and lives in data/.
// It `extends` the soap base repo and `implements` the (multiple) ports.
export class CharacterRepositoryMongo
  extends ReadWriteRepository<Character, CharacterDocument>
  implements CharacterReadRepository, CharacterWriteRepository { /* ... */ }

// handlers depend on the port + its token — never the concrete adapter:
@Inject(CharacterWriteRepository.Token) private readonly repo: CharacterWriteRepository

CQRS via decorators — with domain validation & events

@CommandHandler(CreateCharacterCommand)
export class CreateCharacterHandler {
  constructor(
    @Inject(CharacterWriteRepository.Token) private readonly repo: CharacterWriteRepository,
    @Inject(DomainEventBus.Token) private readonly events: DomainEventBus,
  ) {}

  async handle(cmd: CreateCharacterCommand): Promise<Result<Character>> {
    const created = Character.create({ id: randomUUID(), ...cmd, createdAt: now, updatedAt: now });
    if (created.isFailure()) return Result.withFailure(created.failure); // ValidationError → 400

    const result = await this.repo.add(created.content);
    if (result.isFailure()) return Result.withFailure(result.failure);

    await this.events.publish(new CharacterCreatedEvent(result.content[0]));
    return Result.withSuccess(result.content[0]);
  }
}

Plain HTTP — no CQRS (the comics feature)

CQRS is opt-in (cqrs: true). For simple features, skip the buses entirely: the controller injects use cases and calls them directly. Same layering (domain / application / data / api), a single read-write repository, far less ceremony.

@UseCase() // = @Injectable; the controller calls execute() directly — no command/query bus
export class CreateComicUseCase {
  static readonly Token = 'CreateComicUseCase';
  constructor(@Inject(ComicRepository.Token) private readonly repo: ComicRepository) {}
  async execute(input: CreateComicInput): Promise<Result<Comic>> {
    const created = Comic.create({ id: randomUUID(), ...input, createdAt: now, updatedAt: now });
    if (created.isFailure()) return Result.withFailure(created.failure);
    return Result.withSuccess((await this.repo.add(created.content)).content[0]);
  }
}

@Controller('/comics')
export class ComicsController {
  constructor(@Inject(CreateComicUseCase.Token) private readonly createComic: CreateComicUseCase /* ...*/) {}
  @Post('/') @AdminOnly()
  async create(req: Request, res: Response) {
    const result = await this.createComic.execute(req.body);   // controller → use case → repo
    ResultMapper.toResponse(result, res, { successStatus: 201 });
  }
}

The same feature also shows the second endpoint style — a fluent Router — for its paginated list, so you can compare both side by side:

// comics.router.ts — define routes centrally, shape I/O with RouteIO
export class ComicsRouter extends ExpressRouter {
  setupRoutes() {
    this.get('/comics', () => undefined)
      .useCase(ListComicsPaginatedUseCase)   // pure handler: input → Result
      .routeIO(new PaginationIO());          // ?page/?limit/?filters → { data, pagination }
  }
}
// index.ts:  app.registerRouter(new ComicsRouter().initialize());

Both styles share the same execution semantics (a Result failure always maps via ResultMapper); pick whichever you prefer — see the Routing docs.

Two-tier queries

// Tier 1 — typed methods on the repo, built with a storage-agnostic Where (zero Mongo syntax)
where.valueOf('universe').isEq('marvel')
     .and.brackets(w => w.valueOf('name').like(q).or.valueOf('alias').like(q));

// Tier 2 — a named, reusable Specification object for complex/domain queries
export class ThreatsSpecification extends RepositoryQuery<FindParams> {
  build(): FindParams {
    const where = new Where().brackets(w =>
      w.valueOf('alignment').isEq('villain').or.valueOf('alignment').isEq('anti-hero'));
    return FindParams.create({ where, sort: { name: 1 } });
  }
}
// Native escape hatch — a query Where can't express (a grouped aggregation) is a
// NAMED RepositoryQuery run through the source, not a reach-around to the raw driver:
export class UniverseStatsAggregation extends RepositoryQuery<Record<string, unknown>[]> {
  build() { return [{ $group: { _id: { universe: '$universe', alignment: '$alignment' }, count: { $sum: 1 } } }]; }
}
// adapter: const rows = await this.context.source.native(new UniverseStatsAggregation());
// Source.native() (soap 0.10.0 / soap-node-mongo 0.7.0) runs the dialect query verbatim,
// bypassing the query factory + mapper — the sanctioned, named escape hatch.

Domain validation maps to HTTP at the boundary

// domain stays HTTP-free: it raises a pure ValidationError / DomainRuleError...
return Result.withFailure(Failure.fromError(new ValidationError('universe must be marvel|dc')));

// ...and the composition root maps error names to status codes:
ResultMapper.register('ValidationError', 400);
ResultMapper.register('DomainRuleError', 422);

Cross-feature boundary via a published contract

teams references characters only through the characters feature's published CharacterLookup contract — never its domain/ or data/ internals.

// features/characters/contracts — the public API other features may depend on
export interface CharacterLookup { findSummary(id: string): Promise<CharacterSummary | null>; }
export const CharacterLookup = { Token: 'CharacterLookup' } as const;

// teams' AddMember handler injects the contract + a pure domain service
const character = await this.characters.findSummary(cmd.characterId);
const admission = TeamMembershipPolicy.ensureCanAdmit(team, character.id, character.universe);

Domain events → read-model projection (full CQRS)

Command handlers publish events; the bound DomainEventBus fans out to every @EventHandler-decorated consumer — soap-express's wireCqrs discovers them in the same pass that sets up the command/query buses, so adding a new event consumer is just a decorator. A projector keeps a denormalized read model in sync; the stats query reads the projection, never aggregating the write store live.

@EventHandler(CharacterCreatedEvent, { token: 'stats.character.created' })
export class UniverseStatsOnCreated implements IEventHandler<CharacterCreatedEvent> {
  constructor(@Inject(UniverseStatsProjector.Token) private readonly projection: UniverseStatsProjector) {}
  async handle(e: CharacterCreatedEvent) { await this.projection.apply(e.data.universe, e.data.alignment, +1); }
}

Note: soap-express @EventHandler defaults its DI token to the event name, so two handlers for the same event collide — pass an explicit { token } (as above) when you have more than one consumer per event.

WebSockets — the same events, a third delivery channel

// soap orchestrates clients/subscriptions/heartbeat; ws is injected as the transport
const wsTransport = new WsSocketServer({ server: app.getServer<Server>(), path: '/ws' });
const socketServer = new SocketServer(wsTransport, { port, heartbeatInterval: 30000 });

// a broadcaster subscribes to the same domain events and pushes them to all clients
eventBus.subscribe(CharacterCreatedEvent, new CharacterFeedBroadcaster(socketServer));

Bootstrap — HTTP wired in one call

const { container, drainables, logger } = await buildContainer(config);

const app = await bootstrap({
  port, container,                                   // pre-wired repos, strategies, event bus
  logger,                                            // framework Logger (bound into DI as Logger.Token)
  drainables,                                        // mongo client, postgres pool, kafka consumer → auto-drained on SIGTERM
  controllers: [CharactersController, TeamsController, AuthController],
  middleware: { cors: true, helmet: true, logging: true, compression: true },
  auth: jwtStrategy,                                 // JWT guard for @Auth() routes
  cqrs: true,                                        // auto-wire CommandBus + QueryBus + EventBus (subscribes @EventHandler-s)
  healthCheck: true,
});

bootstrap() registers a graceful-shutdown handler that on SIGTERM/SIGINT (1) finishes plugin teardown, (2) stops the HTTP server, then (3) drains the drainables in order (mongo → postgres → kafka), so in-flight requests complete before the underlying resources close. The Logger is bound into DI; LoggingMiddleware emits a child logger with requestId on every request which ErrorHandler reuses for structured error logs.

MongoDB read/write split

// Commands hit the primary; queries hit a secondaryPreferred replica.
const soapMongoWrite = await SoapMongo.create(buildMongoConfig(config.mongo));
const soapMongoRead  = await SoapMongo.create(buildMongoConfig(config.mongoRead, /* readOnly */ true));

A second storage engine — PostgreSQL (authors)

The authors feature uses the same building blocks as comics (port + ReadWriteRepository adapter + use cases + controller) but the source under the context is a SqlSource from @soapjs/soap-node-sql. The domain stays camelCase; the mapper translates camelCase ↔ snake_case (Postgres convention) and serializes notableWorks as JSONB.

// composition root — same DI shape, different driver
const soapSql = await SoapSQL.create(buildPostgresConfig(config.postgres));
const authorSource = new SqlSource<AuthorRow>(soapSql, 'authors');
const authorRepo = new AuthorRepositoryPostgres(
  new DatabaseContext(authorSource, new AuthorMapper(), soapSql.sessions),
);
await ensureAuthorsTable(soapSql);          // idempotent CREATE TABLE IF NOT EXISTS
await seedAuthorsIfEmpty(soapSql, repo);    // only inserts on first boot

The repository builds a Where exactly like the Mongo adapters — soap pushes it through SqlQueryFactory + SqlWhereParser, which emits dialect-aware SQL ("col" for Postgres/SQLite, `col` for MySQL) and converts ? placeholders to $1, $2, … when the driver is pg:

private buildWhere(filters: AuthorFilters): Where | undefined {
  const where = new Where();
  if (filters.publisher) where.valueOf('publisher').isEq(filters.publisher);
  if (filters.role)      where.valueOf('role').isEq(filters.role);
  if (filters.country)   where.valueOf('country').isEq(filters.country);
  if (filters.search)    where.valueOf('name').like(`%${filters.search}%`);
  return where;
}
// → SELECT * FROM "authors" WHERE "publisher" = $1 AND "role" = $2 ORDER BY "name" ASC LIMIT 20

Seed data

20 characters across Marvel and DC, plus 2 teams:

Universe Heroes Villains Anti-heroes
Marvel Iron Man, Spider-Man, Black Widow, Thor, Doctor Strange Thanos, Doctor Doom Magneto, Loki, Venom
DC Batman, Superman, Wonder Woman, The Flash, Green Lantern The Joker, Lex Luthor, Darkseid Deathstroke, Harley Quinn

Teams: Avengers (Marvel heroes), Justice League (DC heroes). Comics: 4 landmark issues (Action Comics #1, Detective Comics #27, Amazing Fantasy #15, Iron Man #55). The seed also rebuilds the UniverseStats projection from the write store.

Authors (PostgreSQL, seeded on first boot): Stan Lee, Jack Kirby, Steve Ditko, Chris Claremont, Bob Kane, Bill Finger, Frank Miller, Alan Moore, Grant Morrison, Neil Gaiman — split between Marvel, DC and both, covering writers, artists and writer-artists from the USA and the UK.

Default admin: admin / admin123

A pluggable event bus — in-memory ↔ Kafka

Every command handler publishes domain events through a single port, DomainEventBus, which ships with @soapjs/soap/cqrs. The composition root binds one of two adapters, selected at boot — both are off-the-shelf framework classes, the demo doesn't ship its own:

// src/config/dependencies.ts
import { DomainEventBus, InMemoryDomainEventBus } from '@soapjs/soap/cqrs';
import { KafkaEventBus, KafkaDomainEventBus } from '@soapjs/soap-node-kafka';

let eventBus: DomainEventBus;
if (config.eventBus === 'kafka') {
  const kafkaCore = new KafkaEventBus({ brokers, topicName, ... });
  const kafkaBus  = new KafkaDomainEventBus(kafkaCore, {
    topic: config.kafka.topic,
    groupIdPrefix: config.kafka.groupId,
    logger,                                            // structured diagnostics
  });
  await kafkaBus.start();                              // connects the producer + ensures topic
  eventBus = kafkaBus;
  drainables.push({ close: async () => kafkaBus.stop() });   // auto-drained by bootstrap on shutdown
} else {
  eventBus = new InMemoryDomainEventBus();
}
container.bindValue(DomainEventBus.Token, eventBus);

Subscribers don't need any wiring — bootstrap({ cqrs: true }) discovers every @EventHandler-decorated class from DecoratorRegistry and calls eventBus.subscribe(eventType, handler) automatically (the wireCqrs step became async in soap-express 0.4.4 to accommodate the Kafka consumer setup).

Flip the switch with one env var — no code change, no rebuild:

EVENT_BUS=kafka docker-compose up -d api    # broker path
EVENT_BUS=in-memory docker-compose up -d api # default (zero infra)

What KafkaDomainEventBus does

  • Maintains one Kafka consumer per process (unique groupId per replica) on the configured topic. Every event is fetched by that single consumer, then fanned out in-process to whichever @EventHandler-decorated consumers are wired for the event type. This sidesteps the partition- splitting failure mode you get when multiple subscribers join the same consumer group: each replica still gets every event, the projector and audit handlers don't fight for partitions, and adding handlers doesn't require new Kafka consumers.
  • On publish, serialises the event with an internal __eventType marker so the consumer can rebuild the right prototype on the other side (Object.create(eventType.prototype)) — instanceof checks still hold after the round-trip.
  • Wraps @soapjs/soap-node-kafka's KafkaEventBus, which carries the production goodies (DLQ, retry-with-backoff, batch / concurrent scenarios, memory-pressure back-pressure, partition-lag metrics, graceful drain on shutdown). The demo only uses the simple consume scenario; the rest are available to feature handlers if they need them.

How to see it work

# Bring up Kafka + the api in kafka mode
EVENT_BUS=kafka docker-compose up -d
docker-compose exec api node build/scripts/seed.js

# Login → create a character → watch the stats projection update
TOKEN=$(curl -s -X POST http://localhost:3000/auth/login \
  -H 'Content-Type: application/json' \
  -d '{"email":"admin@comics-universe.io","password":"admin123"}' \
  | python3 -c 'import sys,json; print(json.load(sys.stdin)["tokens"]["accessToken"])')

curl -s http://localhost:3000/characters/stats   # marvel/hero: 5
curl -s -X POST http://localhost:3000/characters \
  -H "Authorization: Bearer $TOKEN" -H 'Content-Type: application/json' \
  -d '{"name":"Kafka Tester","alias":"KT","universe":"marvel","alignment":"hero",
       "powerLevel":42,"powers":["distributed messaging"],
       "firstAppearance":"Kafka #1","bio":"Tests Kafka bus"}'

sleep 2
curl -s http://localhost:3000/characters/stats   # marvel/hero: 6 ← via Kafka
docker-compose logs api --tail 5 | grep '\[audit\]'
# → [audit] character.created — Kafka Tester (marvel/hero)

The same Bruno E2E flow that validated the in-memory path (bruno/E2E Flow/*) passes against the Kafka path unchanged — that's the whole point of the port/adapter split.

Chat over WebSocket — rooms, JWT, persistence

The demo serves two WebSocket endpoints on the same port:

Path Direction Purpose Behind
/ws broadcast (server → client) Every character event becomes a frame for all listeners In-house WsSocketServer over ws
/chat bidirectional JWT-auth on first frame, rooms, persistence, broadcast @soapjs/soap-node-socket 0.0.3

/chat is the bigger story. The transport is the freshly-patched WebSocketServerAdapter (server: …, path: '/chat'), the orchestration is soap-core's SocketServer (clients, heartbeat, subscriptions), and the chat-specific protocol lives in features/chat/api/sockets/chat.socket.ts.

Protocol

Every frame is JSON. Client → server frames:

Frame Required state Effect
{ "type": "auth", "token": "<JWT>" } first frame Verifies the JWT (same secret as HTTP). Sends auth.ok on success, auth.error + disconnect on failure. Unauthenticated sockets are dropped after 5 s.
{ "type": "join", "room": "<id>" } authed Subscribes the client to the room. Replies join.ok, broadcasts user.joined to the room, pushes the last 50 messages back as history.
{ "type": "leave", "room": "<id>" } authed Unsubscribes + broadcasts user.left.
{ "type": "message", "room": "<id>", "text": "..." } authed AND joined Validates, persists to Mongo via PostChatMessageUseCase, broadcasts the saved entity (with id + sentAt) to every subscriber of the room.

Server → client frames cover the matching acks plus message, history, user.joined, user.left, and error ({ payload: { reason } }).

Allowed rooms (bounded so the demo stays tidy): general, marvel, dc, comic-con-2026.

Try it

# Static HTML client — open two tabs, log in, send messages
open http://localhost:3000/chat-ui/chat.html

# Or the Node smoke script — two clients, real WS, full handshake + history
node scripts/chat-smoke.js
# → PASS — chat end-to-end works (auth, join, broadcast, history, validation)

# Or just the REST surface
curl http://localhost:3000/chat/marvel/messages          # [] then growing
curl http://localhost:3000/chat/marvel/messages?limit=5
curl -i http://localhost:3000/chat/atlantis/messages     # → 404 NotFound

Why this is interesting

  • The chat demonstrates two distinct WS adapters living side by side on the same HTTP server — the patched WebSocketServerAdapter from @soapjs/soap-node-socket for /chat, and the in-house WsSocketServer for /ws. Both run on port 3000 via noServer: true + manual upgrade routing, so no extra port, no extra TLS cert, no extra reverse-proxy rule.
  • Auth-via-first-message rather than handshake headers or query strings, because soap-core's SocketServer callbacks don't expose the raw upgrade request and the pattern is identical across ws and socket.io. Same approach you'll find in Discord, Slack, and most production chats.
  • The chat reuses the same JWT issued by /auth/login and the same use cases / repositories / port pattern as comics and authors. Nothing about the WS transport leaks into the domain or the application layer.

SoapJS library patches applied (vendored)

The fixes below were authored in the upstream repos while building this demo and are bundled here as local tarballs (vendor/*.tgz) until released to npm.

Library Bumped to What Why
@soapjs/soap 0.11.3 Logger port + Logger.Token + ConsoleLogger (json/pretty, levels, env-aware, child(bindings)) + NoopLogger. BaseHttpApp binds the Logger into DI, exposes registerDrainable(), and runs shutdown in a fixed order (plugins → HTTP stop() → drain). New DomainEventBus port + InMemoryDomainEventBus in cqrs/. Drainable interface re-exported under @soapjs/soap/events. Demo code had its own ad-hoc Logger, in-process event bus, and SIGTERM handler — all three were boilerplate every consumer would have to rewrite. Moved into the framework so the bootstrap is one line, the lifecycle is correct by construction, and the DomainEventBus port is a proper first-class CQRS primitive.
@soapjs/soap 0.11.2 is{Find,Count,Aggregation,Remove}Params now reject RepositoryQuery instances (typeof build !== 'function'). A RepositoryQuery (e.g. ThreatsSpecification) was being treated as a FindParams, skipping .build() and dropping the Where clause.
@soapjs/soap 0.11.x @Inject metadata moved off Object.prototype (per-constructor map). Old version polluted Object.prototype, leaking decorator params across unrelated classes — broke CQRS handlers.
@soapjs/soap-auth 0.4.4 LocalStrategy.fetchUser accepts either an identifier string or a credentials object; JWT strategy guards optional persistence/refreshTokenConfig. Login crashed on (reading 'store') / (reading 'extract') for the minimal JWT config used here.
@soapjs/soap-express 0.4.4 bootstrap({ logger, drainables, cqrs: true }) binds the Logger into DI, auto-registers a graceful-shutdown handler that drains the supplied resources on SIGTERM/SIGINT, and wireCqrs is now async — it not only sets up the command/query buses but also discovers every @EventHandler in DecoratorRegistry and calls eventBus.subscribe(eventType, handler) for each. LoggingMiddleware attaches a child logger (requestId/method/path) to req.log, emits structured http access logs, and escalates to warn/error for 4xx/5xx; ErrorHandler picks up req.log for contextual error logs. The demo's wireEvents.ts, its own SIGTERM handler, and the ad-hoc console.log access log were all boilerplate. Now bootstrap is one call, the request lifecycle has a consistent logger from middleware through error handler, and event handlers are wired the same way as command/query handlers.
@soapjs/soap-express 0.4.3 bootstrap wires CQRS before instantiating controllers; @AdminOnly() infers a default strategy; auth middleware maps soap-auth errors to 401; role check accepts both user.role and user.roles[]. CQRS controllers couldn't see the buses; admin routes failed because role middleware ran without auth.
@soapjs/soap-node-mongo 0.7.1 Relaxed peerDependencies for @soapjs/soap to >=0.10.0. Was pinned ^0.10.0, blocking install against soap 0.11.
@soapjs/soap-node-sql 0.2.1 SqlWhereParser quotes identifiers per dialect ("col" for PG/SQLite, `col` for MySQL); SqlUtils.buildLimitClause uses SQL-standard LIMIT N OFFSET M; SqlDataSource.{find,count,update,remove,aggregate} route soap params (FindParams etc) through SqlQueryFactory via instanceof; SqlQueryFactory.resolveClause strips the duplicate WHERE prefix the parser prepends; peer dep on soap relaxed. Postgres rejected backtick-quoted identifiers, MySQL-style LIMIT off,lim syntax, and double-WHERE WHERE clauses; FindParams carrying a Where was silently dropped on the legacy path, returning every row.
@soapjs/soap-node-socket 0.0.3 WebSocketServerAdapter / SocketIOServerAdapter accept `{ port, server, path, ws io }options (legacy(port)ctor kept for back-compat);server-mode uses noServer: true+ a manualupgradelistener so multiple sockets can share one HTTP server with different paths;socket.io/socket.io-clientare now lazy-loaded so consumers using only the WebSocket adapter don't have to install them; root index uses@soapjs/soapinstead of forbidden./build/... deep imports; peer dep on soap relaxed. Two new adapter test files (websocket-adapter.test.ts, socketio-adapter.test.ts`) cover all the above.
@soapjs/soap-node-kafka 0.1.2 New KafkaDomainEventBus class — implements the DomainEventBus port from @soapjs/soap/cqrs: one Kafka consumer per process, in-process fan-out to N @EventHandlers, serialises events with an __eventType / __eventVersion header so consumers can rebuild typed prototypes (instanceof checks survive the round-trip), and exposes start() / stop() for the bootstrap drainables. Relaxed peer dep on soap. The demo had this exact class hand-rolled in src/common/events/. Moved into soap-node-kafka so any consumer flipping EVENT_BUS=kafka gets the single-consumer + fan-out shape for free, without rewriting it (and without partition-fighting across @EventHandlers).

All fixes are covered by unit tests in the respective library repos (soap: domain/__tests__/params.test.ts; soap-node-sql: __tests__/sql.where.parser.test.ts + __tests__/sql.query-factory.test.ts

  • __tests__/sql.source.test.ts + __tests__/sql.utils.test.ts; soap-node-socket: __tests__/websocket-adapter.test.ts + __tests__/socketio-adapter.test.ts — 58 unit tests, +29 new ones for the options/routing/lazy-load patches).

@soapjs/soap-node-kafka 0.1.2 ships the KafkaDomainEventBus class directly — the demo just binds it into DI under DomainEventBus.Token and lets bootstrap({ cqrs: true }) subscribe every @EventHandler to it. No boilerplate left in the demo.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors