diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 245772e..1187a27 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: --health-retries 5 strategy: matrix: - node-version: [22, 24] + node-version: [22, 24, 26] steps: - uses: actions/checkout@v4 - uses: pnpm/action-setup@v4 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..26368ca --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2026 Platformatic Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md index f1538f8..651ec11 100644 --- a/README.md +++ b/README.md @@ -1,65 +1,51 @@ # @platformatic/coordinator -Sticky-instance coordinator library for multi-pod Fastify services. Routes a client's request -- carrying an instance id -- to the pod that owns that instance, using Redis as the source of truth. Provides a member registry, allocation strategies, three Fastify helpers for the common sticky-routing patterns, and small utilities for the parts that are easy to get wrong. +Multi-pod destination routing for stateful tiers. Valkey-backed registry, pod-side `Member` class, allocation strategies, lock routing, TTL cache, and optional Fastify helpers. -This package is a library, not a Watt stackable. It is consumed by **preset stackables** (e.g. `@platformatic/regina-coordinator`) that publish their own API contract on top. +See [`coordinator-pattern.md`](./coordinator-pattern.md) for the architecture this library implements: caller / coordinator / resource pod, failover, fan-out, transactions and locks. ## What it solves -You have N pods, each holding stateful instances (chat sessions, connection pools, simulations, sandboxes). A client request carries an instance id. You need that request to land on the pod that owns that instance. When pods die, surviving pods should be able to take over (assuming they can rehydrate from shared state). Allocation of new instances should spread across pods. +You have N pods that hold stateful resources (PostgreSQL connection pools, agent processes, sandboxes, simulations). Requests carry a routing key -- a "destination" or "instance" id. You need every request for that destination to land on a pod that owns it. Pods die; surviving pods should take over. Under sustained load, a single destination may need to live on more than one pod. -## Concepts +This library handles all of that: -- **Member** -- a pod, identified by `memberId`, advertising its address. The pod registers itself in Redis on startup and keeps its address key alive with a TTL it refreshes from a heartbeat loop. -- **Instance** -- an opaque id whose ownership is bound to a member. The coordinator writes this binding when an instance is created and reads it on every subsequent request. -- **Sticky mapping** -- `instance -> member`, **no TTL**. Outlives the member's address key, which is what makes orphan detection possible. -- **Allocation strategy** -- how a fresh instance is assigned to a member: round-robin, least-loaded, or random. +- Destination → pod set, persisted in Valkey +- Pod self-registration + heartbeat, with `total_connections` published as a metric +- Atomic first-touch claim, atomic failover (SREM dead + SADD fresh) +- Pluggable allocation strategies (round-robin, least-loaded by total connections, random) +- Lock routing for transaction-bound calls (lockId → pod, resolved via Valkey) +- A short-lived local cache for the hot resolution path +- Fastify helpers for HTTP coordinators -## How a request is routed - -```mermaid -graph TB - Client([Client]) -->|"POST /instances//work"| App[Fastify app] - - subgraph Library["@platformatic/coordinator"] - App --> Helper["lookupAndProxy"] - Helper --> Registry - Helper --> Proxy[proxyRequest] - end +## Install - Registry <--> Redis[("Redis / Valkey")] - Proxy -->|"undici"| Pod1["Pod owning the instance"] - Pod1 --> Helper - Helper --> App - App --> Client +```sh +npm install @platformatic/coordinator ``` -Per request: two Redis reads (instance mapping, then member address) plus one upstream HTTP call. Stateless across coordinator replicas because Redis is the source of truth. - -## Install +For the Fastify helpers, also: ```sh -npm install @platformatic/coordinator @fastify/reply-from +npm install @fastify/reply-from ``` -Peer dependencies: `fastify >= 5` and `@fastify/reply-from >= 12`. The helpers proxy upstream responses through `reply.from`, so the host application **must** register `@fastify/reply-from` once before any helper-backed route is mounted. Runtime dependencies: `iovalkey`, `undici`. +Peer dependency: `fastify >= 5` when using the Fastify helpers. -## Redis layout +## Valkey layout With `keyPrefix: 'myservice'`: -| Key | Type | Owner | Notes | -|-------------------------------------------|--------|-------------|-----------------------------------------| -| `myservice:members` | set | pod | the pod's `memberId` lives here | -| `myservice:member:` | string | pod | pod's address; TTL refreshed by heartbeat | -| `myservice:member::instances` | string | pod | pod's instance count (for `least-loaded`) | -| `myservice:instance:` | string | coordinator | binds instance to memberId, **no TTL** | - -The coordinator only writes the `:instance:` keys. Everything under `:members` and `:member:*` is owned by the pods themselves. +| Key | Type | Owner | Purpose | +|---|---|---|---| +| `myservice:members` | set | pod | the set of memberIds known to be live | +| `myservice:member:` | hash with `address`, `load` | pod | live pod registration and load metric, TTL refreshed by heartbeat | +| `myservice:destination:` | set of memberIds | coordinator + pod | pods currently serving this destination | +| `myservice:lock:` | hash with `podId`, `destinationId`, metadata | pod | lockId routing for transaction-bound calls | -## Pod responsibilities +## Pod side: `Member` -A pod that wants to participate in a coordinator-managed mesh uses the `Member` class, which encapsulates every Redis operation the pod side needs: +The pod-side class owns its own iovalkey connection and writes the keys the pod is responsible for. ```ts import { Member } from '@platformatic/coordinator' @@ -68,260 +54,149 @@ const member = new Member({ redis: 'redis://valkey:6379', memberId: 'pod-1', address: 'http://pod-1.local:3000', - keyPrefix: 'myservice', // must match the coordinator's keyPrefix - ttl: 30 // optional, seconds; default 30 + keyPrefix: 'myservice', + ttl: 30, // seconds; default 30 + getLoad: () => pool.openCount() // optional; default () => 0 }) -await member.register() // on startup -const heartbeat = setInterval(() => member.heartbeat(), 10_000) // every ~10s +await member.register() // SADD + HSET + EXPIRE +const heartbeat = setInterval(() => member.heartbeat(), 10_000) // HSET + EXPIRE heartbeat.unref() -// when an instance is created/destroyed on this pod: -await member.registerInstance(instanceId) -await member.deregisterInstance(instanceId) +// When this pod fans itself in to a destination: +await member.addToDestination(destId) +await member.removeFromDestination(destId) + +// When this pod mints / releases a transaction lock: +await member.registerLock(lockId, destId, { isolationLevel: 'serializable' }) +await member.unregisterLock(lockId) + +// Peer query for fan-out picks (returns live pods with their load): +const peers = await member.listPeerLoad() -// on graceful shutdown: +// Graceful shutdown: clearInterval(heartbeat) await member.deregister() await member.close() ``` -Under the hood `Member` performs the four Redis operations every coordinator-managed mesh requires: +## Coordinator side: `Registry` -1. **Register** -- `SADD :members ` and `SET :member:
EX `. -2. **Heartbeat** -- `EXPIRE` on the address key. Skipping this is what marks the pod dead. -3. **Track load** -- `INCR` / `DECR` on `:member::instances` (used by `least-loaded`). -4. **Deregister** -- `SREM` from the members set, `DEL` the address and load keys. - -`Member` owns its own Redis connection and never exposes it; pods don't need to depend on `iovalkey` directly. +```ts +import { Registry } from '@platformatic/coordinator' -## Allocation strategies +const registry = new Registry({ + redis: 'redis://valkey:6379', + keyPrefix: 'myservice', + strategy: 'least-loaded', + cache: { ttl: 5000, max: 10_000 } // default; pass `false` to disable +}) -```mermaid -flowchart TD - A[New instance] --> B[Registry.listMembersWithLoad] - B --> C["SMEMBERS <prefix>:members"] - C --> D["MGET addresses + instance counts"] - D --> E{Strategy} +// Hot path: resolve a destination, pick one pod, return its address. +const resolved = await registry.resolveDestination(destId, { + claimOnMiss: true, // SADD a fresh pod if the destination's set is empty + reassignOrphans: true // SREM dead + SADD fresh if every pod in the set is dead +}) +if (resolved) { + // { address, memberId, reassigned } +} - E -->|round-robin| F["Next member in cycle"] - E -->|least-loaded| G["Member with min count
(ties broken round-robin)"] - E -->|random| H["Math.random()"] +// Lock-bound call: route by lockId, not destination. +const lockRouting = await registry.resolveLock(lockId) +if (lockRouting) { + // { address, memberId } +} - F --> I[Selected pod] - G --> I - H --> I +// Other primitives: +await registry.listLiveMembers() // [{ memberId, address, load }, ...] +await registry.pickMember({ destinationId: destId }) +await registry.addPodToDestination(destId, memberId) +await registry.hasBinding(destId) +await registry.deregisterDestination(destId) +await registry.close() ``` -Pick one based on workload shape: +## Resolution and failover -- **`round-robin`** (default) -- even distribution, ignores load. Cheapest. Right when instances are roughly uniform in cost. -- **`least-loaded`** -- reads each pod's instance count from Redis (one `MGET` round-trip), picks the minimum, breaks ties round-robin. Right when instances are heavy and you want to spread them (e.g. connection pools, long-lived simulations). -- **`random`** -- one `Math.random()`. Right for sharded scenarios where you want zero coordination state. +`resolveDestination` reads the destination's pod set, filters by liveness, and applies the allocation strategy. The four cases: -You can also pass a custom `AllocationStrategy` instance to the `Registry` constructor. +| Set state | `claimOnMiss` | `reassignOrphans` | Result | +|---|---|---|---| +| Empty | false | -- | `null` (404 territory) | +| Empty | true | -- | Pick a live pod, `SADD` it, return | +| Has live pods (possibly with dead too) | -- | -- | Pick one of the live pods; dead ones cleaned up in background | +| All dead, non-empty | -- | false | `null` | +| All dead, non-empty | -- | true | Pick fresh, `SREM` dead + `SADD` fresh, return with `reassigned: true` | -## Orphan detection +All writes use `SADD` / `SREM` (atomic). Concurrent first-touch by two coordinators can produce a destination with two pods from the start. That's a valid steady state, not a corrupted one. -When a pod's address key TTL expires (pod crashed, pod missed heartbeats), its instance mappings still point at the dead `memberId`. The next request for one of those instances surfaces the situation: - -```mermaid -flowchart TD - A[Request for instanceId] --> B{Lookup mapping} - B -->|Not found| E[404] - B -->|memberId exists| C{Member address alive?} - C -->|Yes| D[Proxy to pod] - C -->|No, reassignOrphans=false| F["{address: null}
caller decides"] - C -->|No, reassignOrphans=true| G[Pick a live pod] - G --> H[Update mapping in Redis] - H --> I[Proxy to new pod] - I --> J["Pod rehydrates from
shared state on first hit"] -``` - -`reassignOrphans` is opt-in **per call** (per route, in practice). Use `true` only when your pods can rebuild an instance's state from shared storage on demand. Use `false` (the default) when "pod is dead" should mean "instance is gone" -- the helper returns a `{ address: null }` result and the route handler decides what to do. - -## Sticky helpers - -Three Fastify route handlers cover the patterns that repeat across consumers. Each is a one-liner at the call site and captures one specific footgun. Helpers do not touch metrics directly -- they emit a tagged result via an optional `onResult` callback, which the preset can hook into to bump its own counters with whatever label scheme it likes. +## Allocation strategies -### `lookupAndProxy` +Pluggable. Built-in: `round-robin` (default), `least-loaded`, `random`. Custom strategies implement: ```ts -app.post('/instances/:id/work', lookupAndProxy(registry, { - instanceFrom: req => req.params.id, - reassignOrphans: true -})) +interface AllocationStrategy { + pick (candidates: MemberInfo[], ctx: { destinationId?: string }): MemberInfo | null +} ``` -Resolves the instance, proxies the request to the owning pod, drains the response. On unknown instance: 404. On dead pod with `reassignOrphans: false`: also 404. On dead pod with `reassignOrphans: true`: picks a new pod, rewrites the mapping, proxies there. - -Defaults: `reassignOrphans: false`, `notFoundMessage: 'Instance not found'`. Optional `onResult: (r) => void` is called with `'hit' | 'orphan_reassigned' | 'not_found'`. +`candidates` is the pool to choose from -- the full live set on first-touch / failover, or the live members of a destination's pod set on the hot path for fanned-out destinations. `ctx.destinationId` is the destination, so custom strategies can branch on it (for example, pin "dedicated" tenants to a designated subset of pods and round-robin "shared" tenants across the rest). -Doesn't handle: response-header rewrites, custom error envelopes. For those, resolve the instance yourself and call `reply.from(...)` inline (see "When the helper isn't enough" below). - -### `pickAndRegister` +Built-in least-loaded reads `load` from each candidate's member record (`HGET` pipeline). It runs at first touch for single-pod destinations and on every request for fanned-out destinations. -```ts -app.post('/instances', pickAndRegister(registry, { - registerIdFrom: res => res.instanceId -})) -``` +## TTL cache -Picks a member via the strategy, proxies the create request to it, and registers the resulting id in Redis **only** if the upstream returns `expectedStatus`. The footgun this kills: forgetting to gate `registerInstance` on success, which would otherwise bind Redis mappings to instances the pod never created. +`resolveDestination` checks a local LRU+TTL cache before reading Valkey. Default 5 s TTL, 10 000 entries. Configure with `cache: { ttl, max }` or disable with `cache: false`. Writes through the registry (`addPodToDestination`, `deregisterDestination`) evict the affected key. Each replica has its own cache. -On no live pods: 503 with the configured message. The upstream's response body is forwarded to the client verbatim. +## Fastify helpers -Defaults: `expectedStatus: 201`, `unavailableMessage: 'No pods available'`. Optional `onResult: (r) => void` is called with `'spawned' | 'unavailable' | 'upstream_error'`. +For HTTP-based coordinators, three helpers wrap the common patterns. Each emits a tagged result via an optional `onResult` callback so presets can hook their own metric counters. -### `lookupAndDeregister` +### `lookupAndProxy` ```ts -app.delete('/instances/:id', lookupAndDeregister(registry, { - instanceFrom: req => req.params.id +app.post('/destinations/:id/work', lookupAndProxy(registry, { + destinationFrom: req => req.params.id, + reassignOrphans: true, + onResult: result => metrics.inc({ type: 'work', result }) // 'hit' | 'orphan_reassigned' | 'not_found' })) ``` -Resolves the instance (no orphan reassignment -- a dead instance cannot be deleted). If the pod is alive, proxies the DELETE; on `expectedStatus`, drains and deregisters. If the pod is dead, **skips the proxy entirely** and just removes the Redis mapping. The footgun this kills: forcing a freshly assigned pod to spin an instance up just to delete it. - -Defaults: `expectedStatus: 204`, `notFoundMessage: 'Instance not found'`. Optional `onResult: (r) => void` is called with `'deregistered' | 'deregistered_dead_pod' | 'not_found' | 'upstream_error'`. - -### When the helper isn't enough - -Helpers cover the simple case. For header overrides, response-body massaging, or alternative error shapes, resolve the instance yourself and forward with `reply.from`: - -```ts -app.post('/instances/:id/stream', async (req, reply) => { - const resolved = await registry.resolveInstance(req.params.id, { reassignOrphans: true }) - if (!resolved?.address) return reply.code(404).send({ error: 'Not found' }) - - return reply.from(`${resolved.address}${req.url}`, { - rewriteHeaders: (headers) => { - if ((headers['content-type'] as string | undefined)?.includes('text/event-stream')) { - headers['content-type'] = 'application/x-ndjson' - } - return headers - } - }) -}) -``` +Resolves the destination, proxies via `reply.from`, returns 404 if the destination has no live pod. -## Utilities - -### `proxyRequest(address, req, opts?)` - -A small undici wrapper used for fan-out routes (where you want to call several pods in parallel and aggregate). The sticky helpers do **not** use this -- they go through `@fastify/reply-from`. Reach for `proxyRequest` only when you genuinely need the raw upstream response object (status code, body parser) outside the request/reply pipeline. - -- Pulls `req.method` and `req.url` from Fastify (or uses `opts.upstreamPath`). -- If `req.body` is set, JSON-stringifies it and sets `content-type: application/json`. JSON-only in v1; non-JSON requires `fastify-raw-body` and is out of scope. -- Applies `opts.timeout` as both `headersTimeout` and `bodyTimeout` on the undici call. -- Inbound headers are not propagated. - -## Metrics - -The library does not ship a metrics module. Each preset is responsible for its own counters and gauges -- the label scheme that makes sense for one preset (e.g. Regina's `type` label with values like `chat`, `chat_stream`, `messages`) won't fit another. A typical preset reads `globalThis.platformatic?.prometheus` inside its Fastify plugin, builds the counters/gauges it needs, and feeds them into the helpers via `onResult`: +### `pickAndRegister` ```ts -const prom = (globalThis as any).platformatic?.prometheus -const requestsTotal = prom && new prom.client.Counter({ - name: 'my_coordinator_requests_total', - help: 'Routed requests', - labelNames: ['type', 'result'], - registers: [prom.registry] -}) - -app.post('/things/:id/work', lookupAndProxy(registry, { - instanceFrom: req => req.params.id, - reassignOrphans: true, - onResult: (result) => requestsTotal?.inc({ type: 'work', result }) +app.post('/destinations', pickAndRegister(registry, { + registerIdFrom: res => res.id })) ``` -A common gauge is `_pod_count`. Refresh it on a `setInterval`: - -```ts -const podCount = prom && new prom.client.Gauge({ - name: 'my_coordinator_pod_count', help: 'Registered pod count', registers: [prom.registry] -}) -const refresh = async () => podCount?.set((await registry.listMembers()).length) -const interval = setInterval(refresh, 15_000) -interval.unref() -refresh() -app.addHook('onClose', () => clearInterval(interval)) -``` - -## Writing a preset +Picks a pod, proxies the create request, and `SADD`s the returned id to the destination set only on a 2xx upstream response. Returns 503 if there are no live pods. -A preset is the smallest possible Watt stackable that exposes its API contract using this library. Skeleton: +### `lookupAndDeregister` ```ts -import fp from 'fastify-plugin' -import replyFrom from '@fastify/reply-from' -import { - Registry, - lookupAndProxy, pickAndRegister, lookupAndDeregister -} from '@platformatic/coordinator' - -async function myCoordinatorPlugin (app) { - // Watt augments the Fastify instance with `.platformatic`; cast in TS. - const config = (app as any).platformatic.config.myCoordinator ?? {} - - // Required: the helpers proxy via reply.from, so register the plugin once. - await app.register(replyFrom) - - const registry = new Registry({ - redis: config.redis, // string from watt.json - keyPrefix: 'myservice', // YOUR namespace - strategy: config.allocationStrategy ?? 'least-loaded', - requestTimeout: config.requestTimeout - }) - - app.addHook('onClose', () => registry.close()) - - // (Optional) Wire your own metrics here. See the Metrics section. - - // YOUR routes here -- the API contract you're publishing. - const id = req => req.params.instanceId - app.post('/instances', pickAndRegister(registry, { registerIdFrom: r => r.instanceId })) - app.post('/instances/:instanceId/work', lookupAndProxy(registry, { instanceFrom: id, reassignOrphans: true })) - app.delete('/instances/:instanceId', lookupAndDeregister(registry, { instanceFrom: id })) -} - -export const plugin = fp(myCoordinatorPlugin, { name: 'my-coordinator' }) +app.delete('/destinations/:id', lookupAndDeregister(registry, { + destinationFrom: req => req.params.id +})) ``` -The preset also owns its `schema` (with `$id` for Watt validation) and its `Generator` (for `wattpm create`). See [`@platformatic/regina-coordinator`](https://github.com/platformatic/regina) for a complete example with eleven routes, fan-out aggregation, and a streaming variant. - -## Public API - -```ts -export { - Registry, - Member, - RoundRobinStrategy, LeastLoadedStrategy, RandomStrategy, createStrategy, - proxyRequest, - lookupAndProxy, pickAndRegister, lookupAndDeregister -} +Resolves, proxies the delete; on `expectedStatus` (204 by default), `DEL`s the destination set. If the destination has only dead pods, skips the proxy and just deletes the set ("deregistered_dead_pod"). -export type { - AllocationStrategy, MemberInfo, MemberWithLoad, ResolveResult, - RegistryOptions, MemberOptions, ProxyRequestOptions, - LookupAndProxyOptions, LookupAndProxyResult, - PickAndRegisterOptions, PickAndRegisterResult, - LookupAndDeregisterOptions, LookupAndDeregisterResult -} -``` +All three helpers go through `@fastify/reply-from`, which the host application must register once before any helper-backed route is mounted. ## Testing -Tests use a dedicated Redis on `127.0.0.1:6390` so they don't collide with anything you run on the default port. A `docker-compose.yml` is included. +Tests use Redis on `127.0.0.1:6390`. A `docker-compose.yml` is included. ```sh -pnpm run test:redis:up # starts redis:7-alpine on 6390 (waits for healthcheck) +pnpm run test:redis:up pnpm test -pnpm run test:redis:down # stops and removes the container +pnpm run test:redis:down ``` -The URL is read from `REDIS_URL` (default `redis://127.0.0.1:6390`), so CI can point tests at any Redis. Tests isolate keys with a random prefix and clean up after themselves. +The URL is read from `REDIS_URL` (default `redis://127.0.0.1:6390`). Tests isolate keys with a random prefix and clean up after themselves. ## License diff --git a/coordinator-pattern.md b/coordinator-pattern.md new file mode 100644 index 0000000..c0f22e3 --- /dev/null +++ b/coordinator-pattern.md @@ -0,0 +1,279 @@ +# The Coordinator Pattern + +A design note for `@platformatic/coordinator`. The library implements the runtime primitives described below; this document explains the architecture they fit into and why each piece is shaped the way it is. + +## Architecture + +```mermaid +flowchart TB + User([Client]) + LB[Load balancer] + Valkey[("Valkey")] + + subgraph SL["Caller pods, stateless, many -- K8s Deployment"] + subgraph SPod["Caller process"] + direction TB + App["Application logic"] + Router["coordinator"] + App -->|"in-process call"| Router + end + end + + subgraph DL["Resource pods, stateful, few -- K8s Headless Service"] + direction LR + subgraph DPod1["Resource pod"] + Owner1["resource owner
dest 1..41"] + end + subgraph DPod2["Resource pod"] + Owner2["resource owner
dest 42..N"] + end + end + + Backend[("Underlying resource")] + + User --> LB --> App + Router -->|"1. resolve destination"| Valkey + Router -->|"2. forward call over HTTP"| Owner2 + DPod2 -.->|"3. reassign on failover"| Valkey + Owner1 --> Backend + Owner2 --> Backend +``` + +Steps 1 and 2 run on every call; step 3 only on failover. The router-to-resource hop is the only HTTP hop on the resource path. It replaces the per-call overhead of local resource management with a single network hop to a warm shared owner. + +## The Two Sides + +The library splits into two halves with a clear contract between them: + +- **The resource pod** owns the stateful thing (connection pools, agent processes, sandboxes, simulations). It self-registers, heartbeats, and is responsible for fanning a destination out when its local share saturates. It runs the `Member` class. +- **The coordinator** lives co-located with the caller. It receives application calls, resolves a destination through Valkey, and forwards the call over HTTP to the owning resource pod. It runs the `Registry` class. + +The caller speaks to the coordinator over an in-process channel (function call, message port, Watt Messaging API, etc.). The coordinator speaks to resource pods over HTTP. Both relationships are pluggable; the library does not mandate either transport. + +## Per-Request Flow + +```mermaid +sequenceDiagram + participant C as Client + participant A as Caller application + participant R as Coordinator in same pod + participant V as Valkey + participant P as Resource pod + participant B as Underlying backend + + C->>A: HTTP via LB + Note over A: auth, validate, build request + A->>R: call destination, payload + Note over A,R: in-process + R->>V: resolve destination, get pod set + Note over R: pick one pod from the set + R->>P: HTTP forward + P->>B: execute on resource for destination + B-->>P: result + P-->>R: result + R-->>A: result + A-->>C: HTTP response +``` + +Two HTTP hops on the external path (client to caller, coordinator to resource pod). Two Valkey reads per call worst case (the destination's pod set, then a chosen pod's address); most hit a short-lived local cache. When a destination is served by more than one pod, the coordinator picks among the destination's current pod set using the same allocation strategy as first-touch. The coordinator runs in every caller pod, so Valkey load scales with the caller tier. Valkey is cheap; the underlying resource (connections, sessions, etc.) is not. + +## No Stateful Protocol Needed + +Many resource protocols are stateful: database connections carry session state, transactions, advisory locks, and prepared statements; long-lived agents hold conversation context. Designs that proxy raw resource traffic preserve that statefulness via sticky TCP between client and owner. This pattern does not. The resource's statefulness is contained inside the resource pod's local owner; the hop between coordinator and pod is plain HTTP carrying a request payload, response payload, and opaque lockIds where needed. + +A lockId does not need TCP-socket affinity, only routing to the pod that holds the pinned resource. Destination-based routing handles the single-pod case; the lockId resolves through Valkey for the fanned-out case (see "Transactions and Locks"). The coordinator stays stateless throughout, and standard HTTP/2 multiplexing and L7 load balancing apply on the resource hop. + +The in-process channel between caller and coordinator is an optimization for co-location. The design does not depend on it: caller and coordinator could be split into separate deployments and use HTTP on that hop too, at the cost of one extra network round-trip per request. + +## Transactions and Locks + +Some resources are bound to a single owner for the lifetime of a session (a database transaction pinned to a connection, an agent step pinned to a process). Pinning happens on the resource pod under an opaque lock ID. When a destination is served by a single pod (the common case), destination-based routing is enough: every follow-up call lands on the pod that minted the lock. When a destination is fanned out across multiple pods (see "Load Metrics and Fan-out"), the coordinator resolves the lock ID through Valkey to find the owning pod and forwards there. The resource pod validates the token against its lock table. The lock ID stays opaque to clients throughout; only the coordinator and resource owner know the routing record. + +```mermaid +sequenceDiagram + participant A as Caller + participant R as Coordinator + participant P as Resource pod owning destination + participant B as Pinned resource + + A->>R: beginSession destination + R->>P: forward + P->>B: checkout, BEGIN + P-->>A: lockId + + A->>R: lockedCall lockId, payload + Note over R: routes by destination,
or by lockId lookup when fanned out + R->>P: forward + P->>B: execute on pinned resource + B-->>A: result + + A->>R: commitSession lockId + R->>P: forward + P->>B: COMMIT, release +``` + +From the caller's perspective: + +```ts +const { lockId } = await client.beginSession(destinationId) +// lockId is now a token; treat it as opaque + +await client.lockedCall(lockId, payload1) // same pod, automatic +await client.lockedCall(lockId, payload2) // same pod, automatic +await client.commitSession(lockId) // same pod, automatic +``` + +The caller never sees, parses, or cares which pod holds the pinned resource. The coordinator looks up the lockId in Valkey on every call and forwards to that pod, where the pinned resource lives. If the pod dies mid-session, the next call comes back as a session failure; the caller handles it like any other transient error and retries at the request level. + +Four invariants: + +1. **The lock ID is opaque to clients.** Callers cannot construct it. The coordinator only uses it as a lookup key; the resource owner owns the lock record and the pinned resource. +2. **Coordinators hold no lock state.** They route by destination; lock metadata, pinned resources, and timers live on the resource pod. +3. **The lock record is in Valkey, but the pinned resource is not.** The coordinator can resolve a lock from any Valkey replica, yet the live resource still lives only on the pod that minted it. If the pod exits, the lock record becomes invalid and the caller retries at the request level. +4. **The resource owner enforces cleanup.** Idle timeout, max lifetime, automatic rollback on channel close. Callers are not trusted to clean up locks they forgot. + +### Cancellation + +`AbortSignal` does not cross an in-process channel boundary in every runtime. Caller-side calls return a request ID; abort or timeout becomes an explicit `cancel(requestId)` message. The coordinator forwards it; the resource owner cancels the in-flight operation. + +## What Each Layer Owns + +| Layer | Owns | +|---|---| +| **Caller application** (stateless) | HTTP, auth, request validation, business logic, building the resource request, calling the coordinator | +| **Coordinator** (built on `@platformatic/coordinator`) | Coordinator entry point, Valkey destination resolution, HTTP forwarding, short-lived lookup cache, orphan detection and reassignment, picking among a destination's pod set when it is fanned out, routing locked calls by looking up the lock record in Valkey | +| **Resource pod** | Local resource ownership (pools, sessions, sandboxes), destination provisioning, lifecycle, transactions/sessions, pinning, cancellation, retry policy, metrics, lock timeouts, Valkey self-registration and heartbeat, publishing a load metric, detecting saturation, fanning a saturated destination out to the least-loaded live pod | + +The coordinator does not understand application semantics, parse payloads, authenticate, or hold resource/session/lock state. Its only job is to pick the right resource pod and forward the call. + +## Valkey State Model + +Valkey is the shared coordination layer for live pod membership, destination ownership, and lock routing. + +| Key | Shape | TTL | Owned by | Purpose | +|---|---|---|---|---| +| `:member:` | hash with fields `address` and `load` | 30 s, refreshed by heartbeat | resource pod | live pod registration and load metric | +| `:destination:` | set of `podId` values | none | coordinator and resource pod | the destination's pod set, sole source of truth for routing | +| `:lock:` | hash with `podId`, `destinationId`, and lock metadata | session or lock lifetime | resource pod | resolve lock-bound follow-up calls | + +The coordinator resolves stateless requests from the destination set on every call. First-touch placement and failover both mutate the set via `SADD` / `SREM`; no separate binding key exists. For lock-bound requests, the coordinator resolves the lock record first and then forwards to the owning pod. Resource pods are responsible for writing and cleaning up their own lock records. + +## Failure Handover + +When a resource pod dies, its member record in Valkey expires after the TTL (30 s). Destination sets have no TTL, so they outlive the pod and may briefly point at dead members. The next call for one of those destinations cleans up and routes. + +```mermaid +flowchart TD + A[Coordinator: resolve dest X] --> B[SMEMBERS destination:X] + B --> C{Any live pods?} + C -->|Yes, all live| D[Pick one by strategy,
forward] + C -->|Yes, some dead| E[SREM dead members,
forward to a live one] + C -->|No, set non-empty| F[Pick fresh live pod] + C -->|Empty or missing| G[First-touch: see allocation] + F --> H[SREM dead members,
SADD fresh pod] + H --> I[Forward,
cold resource opens on first hit] +``` + +Locks and sessions on the dead pod are gone. The design does not try to migrate them. The caller sees a normal failure and retries at the request level. + +`SADD` and `SREM` are atomic on their own, so concurrent reassignments by two coordinators are safe: both `SREM` the same dead member (idempotent), and if both `SADD` fresh picks, the destination ends up briefly fanned out across both new pods. That is a valid steady state. + +When a fanned-out destination loses one of its pods, the others keep serving. The coordinator `SREM`s the dead member on the next lookup that observes a dead address; the remaining pods cover the destination with no cold-resource moment. + +## Load Metrics and Fan-out + +Each resource pod publishes a single integer load value to Valkey on every heartbeat. The heartbeat updates the `load` field of the member record (`HSET :member: load `) in the same pipeline that resets the record's TTL. The value's meaning is whatever the pod decides via `getLoad`: open connections, active sessions, queued requests, GPU utilisation, anything that monotonically grows with how stretched the pod is. It serves operator scaling decisions and the runtime fan-out logic below. + +A destination is normally bound to one pod. When that pod's local share for a destination saturates (the per-destination cap is reached and the wait queue stays non-empty past a configured threshold), the pod fans the destination out. It reads `load` for every live pod, picks the one with the smallest value, and adds it to the destination's pod set: `SADD :destination:X memberId`. The coordinator notices the expanded set on the next lookup and starts splitting requests across both pods. + +### Worked example: PostgreSQL connection pool + +The pattern was first developed for a multi-tenant database tier. The mapping: + +| Abstract concept | PostgreSQL implementation | +|---|---| +| Resource pod | A pod running a PostgreSQL connection pool process, fronting one or more tenant databases | +| Destination | A tenant id (one logical database / schema bundle per tenant) | +| Local share | The per-tenant subset of pool connections for that destination | +| `load` | Total open PostgreSQL connections on the pod across all tenants (`pool.openCount()`) | +| Saturation signal | Per-tenant pool capped *and* request wait queue non-empty past a threshold | +| Lock ID | An opaque token returned by `beginTransaction`, mapped on the pod to one pinned PG connection inside a `BEGIN` block | +| Pinned resource | The PostgreSQL connection holding the open transaction | +| Cold local share opening on first hit | The pod lazily opens a new per-tenant pool slice the first time it receives a request for a tenant after being added to that tenant's destination set | + +In this case the scaler uses the same `load` value as a global signal: total open connections across the database tier divided by pod count. When that exceeds a threshold (well below the PostgreSQL `max_connections` limit, with headroom), the scaler adds a database pod; existing tenants stay sticky and new ones land on the new pod via the allocation strategy. + +The same pattern applies, with different mappings, to AI agent processes (load = active sessions; lock = an in-progress agent step), sandbox runners (load = active sandboxes; lock = an attached debugger session), or simulation workers (load = active simulations; lock = an in-flight tick). + +```mermaid +sequenceDiagram + participant Pod1 as Resource pod 1, saturated for dest X + participant V as Valkey + participant Pod2 as Resource pod 2, least loaded + participant R as Coordinator on next request for dest X + + Pod1->>Pod1: detect saturation for dest X + Pod1->>V: read load for live pods + Pod1->>Pod1: pick smallest count, pod 2 + Pod1->>V: SADD :destination:X pod 2 + + Note over V: :destination:X is now {pod 1, pod 2} + + R->>V: SMEMBERS :destination:X + V-->>R: pod 1, pod 2 + R->>Pod2: forward by allocation strategy + Pod2->>Pod2: open cold local share for dest X on first hit +``` + +The saturating pod initiates fan-out, not the coordinator. The decision is local: the pod sees its own share saturating and acts. Multiple pods can fan in over time if load keeps climbing; each adds itself at most once. + +The coordinator picks among a destination's pod set on the request path using the same `AllocationStrategy` as first-touch. With `round-robin`, requests cycle through the destination's pods. With `least-loaded`, each request goes to the pod in the set with the lowest `load`. + +Reducing fan-out (removing a pod from a destination's set when load drops) is not a runtime concern. It is an operator action or a slow background reconciler; running it from the data path risks thrash under bursty load. + +## Why Kubernetes + +The architecture leans on two K8s features that some other platforms (ECS, plain VM fleets) do not directly provide: + +- **Headless Services**: the coordinator addresses resource pods by pod IP, read from Valkey. Platforms that route traffic through load balancers or DNS service discovery typically do not expose a stable per-pod endpoint for the coordinator to dial directly. +- **Stable pod identity** (StatefulSet-style): the identity registered into Valkey is the pod's K8s identity. Ephemeral task IPs force the heartbeat path to do more work after every restart. + +The in-process co-location of caller and coordinator inside one process is a separate constraint and works on either platform. ECS can be made to work with Cloud Map for service discovery and a custom registration path, but at meaningful cost to operational simplicity. + +## Scaling + +The scaler reads each pod's `load` from Valkey and adds a resource pod when it climbs past a threshold. Platformatic ICC supports arbitrary metrics, including this one. New destinations land on the new pod via the allocation strategy; existing destinations stay sticky. + +### How allocation works + +The strategy runs at first touch, when the coordinator sees a destination with no pod set, and at failover, when an existing set has no live members. When a destination has more than one live pod, the same strategy also chooses the request target from that pod set on the hot path. The strategy does not run for single-pod destinations after they are bound. + +The flow at first touch: + +1. Coordinator receives a request for destination *D*. +2. Registry runs `SMEMBERS :destination:D`; set is empty. +3. Registry calls `strategy.pick(liveMembers, { destinationId: D })`. +4. Registry runs `SADD :destination:D `. Concurrent racers each add their pick; the set's members are all valid serving pods for D. +5. Coordinator forwards to the picked pod. + +Built-in strategies: + +- **Round-robin**: each coordinator replica keeps an in-memory cursor that advances per pick. Different replicas have independent cursors but over time spread evenly across pods. Zero Valkey reads per pick. +- **Least-loaded**: reads `load` from each candidate pod's member record (a pipeline of `HGET`s). Smallest wins; ties go round-robin. One extra Valkey round-trip per pick. For single-pod destinations, that means first touch only; for fanned-out destinations the cost lands on every request. +- **Random**: a `Math.random()`. For zero-coordination sharding. + +Custom strategies receive the destination ID through the context and can branch on it. For example, send tagged "dedicated" tenants to a designated pod and round-robin "shared" ones across the rest. The classification source is left to the integrator: a config service, a Valkey tag, a database lookup. + +Failover runs the same `strategy.pick(...)` and applies the change with `SREM` for dead members and `SADD` for the freshly picked pod. Both are atomic; concurrent racers may end up with multiple new members in the set, which is acceptable. + +### High-volume and low-volume tenants + +Two tenant regimes coexist, and the design treats them differently. + +Low-volume tenants are many, each cheap. Round-robin or least-loaded allocation packs many destinations per pod. Cold-start cost is paid once per tenant on first hit; the warm footprint is small. Most tenants live their entire life on a single pod. + +High-volume tenants are few, each expensive. A single tenant can outgrow a single pod's local capacity. The architecture handles this *reactively* through fan-out (see "Load Metrics and Fan-out"): when a tenant saturates a pod's local share, the pod adds another pod to the tenant's set and the coordinator spreads load across them. Per-tenant caps inside the resource owner keep one tenant from monopolizing a pod's budget while the fan-out signal builds. + +Reactive fan-out works well for tenants whose volume is hard to predict. For tenants known in advance to be heavy (a large enterprise customer, a regional shard), proactive provisioning is still useful: tag the tenant as "dedicated" at first touch and use a custom `AllocationStrategy` that pins it to a designated subset of pods from the start. The two mechanisms compose: a tenant pinned to a dedicated pod can still fan out further if it outgrows it. + +Rebalancing an established destination (moving it to a different pod, rather than adding another) is expensive: the old pod's local share drains, the set membership flips, the new pod opens cold. It should be opt-in, triggered by operator action or a slow background job, not by autoscaling. Adding capacity is cheap; fan-out is cheap; reshape is not. diff --git a/package.json b/package.json index 73b2f79..3c075bf 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@platformatic/coordinator", - "version": "0.1.3", - "description": "Sticky-resource coordinator: Redis-backed registry, allocation strategies, and Fastify route helpers for multi-pod services", + "version": "0.2.0", + "description": "Multi-pod destination routing for stateful tiers: Valkey-backed registry with hash member records, set-based destinations, lock routing, and allocation strategies", "homepage": "https://github.com/platformatic/coordinator", "author": "Platformatic Inc. (https://platformatic.dev)", "license": "Apache-2.0", @@ -11,7 +11,7 @@ }, "private": false, "publishConfig": { - "access": "restricted" + "access": "public" }, "files": [ "dist", diff --git a/src/cache.ts b/src/cache.ts new file mode 100644 index 0000000..683c549 --- /dev/null +++ b/src/cache.ts @@ -0,0 +1,51 @@ +export interface CacheOptions { + ttl?: number // milliseconds; default 5000 + max?: number // entries; default 10000 +} + +interface Entry { + value: V + expires: number +} + +export class TTLCache { + #map = new Map>() + #ttl: number + #max: number + + constructor (opts: CacheOptions = {}) { + this.#ttl = opts.ttl ?? 5000 + this.#max = opts.max ?? 10000 + } + + get (key: K): V | undefined { + const entry = this.#map.get(key) + if (!entry) return undefined + if (entry.expires < Date.now()) { + this.#map.delete(key) + return undefined + } + // Refresh insertion order so frequently-read entries are evicted last. + this.#map.delete(key) + this.#map.set(key, entry) + return entry.value + } + + set (key: K, value: V): void { + if (this.#map.has(key)) this.#map.delete(key) + while (this.#map.size >= this.#max) { + const oldest = this.#map.keys().next().value + if (oldest === undefined) break + this.#map.delete(oldest) + } + this.#map.set(key, { value, expires: Date.now() + this.#ttl }) + } + + delete (key: K): void { + this.#map.delete(key) + } + + clear (): void { + this.#map.clear() + } +} diff --git a/src/helpers/lookup-and-deregister.ts b/src/helpers/lookup-and-deregister.ts index ee97ce5..37594e9 100644 --- a/src/helpers/lookup-and-deregister.ts +++ b/src/helpers/lookup-and-deregister.ts @@ -10,7 +10,7 @@ export type LookupAndDeregisterResult = | 'upstream_error' export interface LookupAndDeregisterOptions { - instanceFrom: (req: FastifyRequest) => string + destinationFrom: (req: FastifyRequest) => string expectedStatus?: number notFoundMessage?: string onResult?: (result: LookupAndDeregisterResult) => void @@ -21,36 +21,38 @@ export function lookupAndDeregister ( opts: LookupAndDeregisterOptions ): RouteHandlerMethod { const { - instanceFrom, + destinationFrom, expectedStatus = 204, - notFoundMessage = 'Instance not found', + notFoundMessage = 'Destination not found', onResult } = opts return async function (request: FastifyRequest, reply: FastifyReply) { - const instanceId = instanceFrom(request) - const resolved = await registry.resolveInstance(instanceId) + const destinationId = destinationFrom(request) + const resolved = await registry.resolveDestination(destinationId) if (!resolved) { + // No live pod for this destination. Distinguish "binding exists but pods dead" + // from "destination unknown". + const exists = await registry.hasBinding(destinationId) + if (exists) { + await registry.deregisterDestination(destinationId) + onResult?.('deregistered_dead_pod') + return reply.code(expectedStatus).send() + } onResult?.('not_found') return reply.code(404).send({ error: notFoundMessage }) } - if (resolved.address === null) { - await registry.deregisterInstance(instanceId) - onResult?.('deregistered_dead_pod') - return reply.code(expectedStatus).send() - } - const onResponse: FastifyReplyFromHooks['onResponse'] = (_req, replyOut, res) => { if (res.statusCode === expectedStatus) { res.stream.resume() - registry.deregisterInstance(instanceId).then( + registry.deregisterDestination(destinationId).then( () => { onResult?.('deregistered') replyOut.send() }, - (err) => replyOut.send(err) + (err: Error) => replyOut.send(err) ) } else { onResult?.('upstream_error') diff --git a/src/helpers/lookup-and-proxy.ts b/src/helpers/lookup-and-proxy.ts index 7b17263..7bd802c 100644 --- a/src/helpers/lookup-and-proxy.ts +++ b/src/helpers/lookup-and-proxy.ts @@ -5,8 +5,9 @@ import type { Registry } from '../registry.ts' export type LookupAndProxyResult = 'hit' | 'orphan_reassigned' | 'not_found' export interface LookupAndProxyOptions { - instanceFrom: (req: FastifyRequest) => string + destinationFrom: (req: FastifyRequest) => string reassignOrphans?: boolean + claimOnMiss?: boolean notFoundMessage?: string onResult?: (result: LookupAndProxyResult) => void } @@ -16,17 +17,18 @@ export function lookupAndProxy ( opts: LookupAndProxyOptions ): RouteHandlerMethod { const { - instanceFrom, + destinationFrom, reassignOrphans = false, - notFoundMessage = 'Instance not found', + claimOnMiss = false, + notFoundMessage = 'Destination not found', onResult } = opts return async function (request: FastifyRequest, reply: FastifyReply) { - const instanceId = instanceFrom(request) - const resolved = await registry.resolveInstance(instanceId, { reassignOrphans }) + const destinationId = destinationFrom(request) + const resolved = await registry.resolveDestination(destinationId, { reassignOrphans, claimOnMiss }) - if (!resolved || resolved.address === null) { + if (!resolved) { onResult?.('not_found') return reply.code(404).send({ error: notFoundMessage }) } diff --git a/src/helpers/pick-and-register.ts b/src/helpers/pick-and-register.ts index f6a0428..5bb23ee 100644 --- a/src/helpers/pick-and-register.ts +++ b/src/helpers/pick-and-register.ts @@ -47,12 +47,12 @@ export function pickAndRegister ( if (res.statusCode === expectedStatus) { const id = registerIdFrom(body) - registry.registerInstance(id, member.memberId).then( + registry.addPodToDestination(id, member.memberId).then( () => { onResult?.('spawned') replyOut.send(body) }, - (err) => replyOut.send(err) + (err: Error) => replyOut.send(err) ) } else { onResult?.('upstream_error') diff --git a/src/index.ts b/src/index.ts index 7123707..861f3ef 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ export { Registry } from './registry.ts' -export type { RegistryOptions, MemberInfo, ResolveResult } from './registry.ts' +export type { RegistryOptions, ResolveResult, ResolveLockResult } from './registry.ts' export { Member } from './member.ts' export type { MemberOptions } from './member.ts' @@ -10,7 +10,10 @@ export { RandomStrategy, createStrategy } from './strategies.ts' -export type { AllocationStrategy, MemberWithLoad } from './strategies.ts' +export type { AllocationStrategy, MemberInfo, PickContext } from './strategies.ts' + +export { TTLCache } from './cache.ts' +export type { CacheOptions } from './cache.ts' export { proxyRequest } from './proxy-request.ts' export type { ProxyRequestOptions } from './proxy-request.ts' diff --git a/src/member.ts b/src/member.ts index 8c04311..f8df529 100644 --- a/src/member.ts +++ b/src/member.ts @@ -1,4 +1,5 @@ import { Redis } from 'iovalkey' +import type { MemberInfo } from './strategies.ts' export interface MemberOptions { redis: string @@ -6,6 +7,7 @@ export interface MemberOptions { address: string keyPrefix?: string ttl?: number + getLoad?: () => number } export class Member { @@ -14,6 +16,7 @@ export class Member { #address: string #keyPrefix: string #ttl: number + #getLoad: () => number constructor (opts: MemberOptions) { this.#redis = new Redis(opts.redis) @@ -21,6 +24,7 @@ export class Member { this.#address = opts.address this.#keyPrefix = opts.keyPrefix ?? 'coordinator' this.#ttl = opts.ttl ?? 30 + this.#getLoad = opts.getLoad ?? (() => 0) } get memberId (): string { @@ -35,48 +39,88 @@ export class Member { return `${this.#keyPrefix}:members` } - #memberKey (): string { - return `${this.#keyPrefix}:member:${this.#memberId}` + #memberKey (memberId: string = this.#memberId): string { + return `${this.#keyPrefix}:member:${memberId}` } - #instanceKey (instanceId: string): string { - return `${this.#keyPrefix}:instance:${instanceId}` + #destinationKey (destId: string): string { + return `${this.#keyPrefix}:destination:${destId}` } - #memberLoadKey (): string { - return `${this.#keyPrefix}:member:${this.#memberId}:instances` + #lockKey (lockId: string): string { + return `${this.#keyPrefix}:lock:${lockId}` } async register (): Promise { - await this.#redis.sadd(this.#membersKey(), this.#memberId) - await this.#redis.set(this.#memberKey(), this.#address, 'EX', this.#ttl) - await this.#redis.set(this.#memberLoadKey(), '0', 'EX', this.#ttl) + const pipeline = this.#redis.pipeline() + pipeline.sadd(this.#membersKey(), this.#memberId) + pipeline.hset(this.#memberKey(), { + address: this.#address, + load: String(this.#getLoad()) + }) + pipeline.expire(this.#memberKey(), this.#ttl) + await pipeline.exec() + } + + async heartbeat (): Promise { + const pipeline = this.#redis.pipeline() + pipeline.hset(this.#memberKey(), 'load', String(this.#getLoad())) + pipeline.expire(this.#memberKey(), this.#ttl) + await pipeline.exec() } async deregister (): Promise { - await this.#redis.srem(this.#membersKey(), this.#memberId) - await this.#redis.del(this.#memberKey(), this.#memberLoadKey()) + const pipeline = this.#redis.pipeline() + pipeline.srem(this.#membersKey(), this.#memberId) + pipeline.del(this.#memberKey()) + await pipeline.exec() } - async heartbeat (): Promise { - await this.#redis.expire(this.#memberKey(), this.#ttl) - await this.#redis.expire(this.#memberLoadKey(), this.#ttl) + async addToDestination (destinationId: string): Promise { + await this.#redis.sadd(this.#destinationKey(destinationId), this.#memberId) + } + + async removeFromDestination (destinationId: string): Promise { + await this.#redis.srem(this.#destinationKey(destinationId), this.#memberId) } - async registerInstance (instanceId: string): Promise { - await this.#redis.set(this.#instanceKey(instanceId), this.#memberId) - await this.#redis.incr(this.#memberLoadKey()) + async registerLock ( + lockId: string, + destinationId: string, + metadata: Record = {} + ): Promise { + await this.#redis.hset(this.#lockKey(lockId), { + podId: this.#memberId, + destinationId, + ...metadata + }) } - async deregisterInstance (instanceId: string): Promise { - await this.#redis.del(this.#instanceKey(instanceId)) - await this.#redis.decr(this.#memberLoadKey()) + async unregisterLock (lockId: string): Promise { + await this.#redis.del(this.#lockKey(lockId)) } - async lookupInstance (instanceId: string): Promise { - const memberId = await this.#redis.get(this.#instanceKey(instanceId)) - if (!memberId) return null - return this.#redis.get(`${this.#keyPrefix}:member:${memberId}`) + async listPeerLoad (): Promise { + const memberIds = await this.#redis.smembers(this.#membersKey()) + if (memberIds.length === 0) return [] + + const pipeline = this.#redis.pipeline() + for (const id of memberIds) { + pipeline.hmget(this.#memberKey(id), 'address', 'load') + } + const results = await pipeline.exec() + if (!results) return [] + + const live: MemberInfo[] = [] + for (let i = 0; i < memberIds.length; i++) { + const [err, fields] = results[i] as [Error | null, (string | null)[] | null] + if (err || !fields) continue + const address = fields[0] + if (!address) continue + const load = parseInt(fields[1] ?? '0', 10) || 0 + live.push({ memberId: memberIds[i], address, load }) + } + return live } async close (): Promise { diff --git a/src/registry.ts b/src/registry.ts index a769887..2c4f60a 100644 --- a/src/registry.ts +++ b/src/registry.ts @@ -1,28 +1,37 @@ import { Redis } from 'iovalkey' -import { createStrategy, type AllocationStrategy, type MemberWithLoad } from './strategies.ts' - -export interface MemberInfo { - memberId: string - address: string -} +import { createStrategy, type AllocationStrategy, type MemberInfo } from './strategies.ts' +import { TTLCache, type CacheOptions } from './cache.ts' export interface RegistryOptions { redis: string keyPrefix?: string strategy?: 'round-robin' | 'least-loaded' | 'random' | AllocationStrategy requestTimeout?: number + cache?: CacheOptions | false } export interface ResolveResult { - address: string | null + address: string + memberId: string reassigned: boolean } +export interface ResolveLockResult { + address: string + memberId: string +} + +interface MemberRecord { + memberId: string + address: string | null + load: number +} + export class Registry { #redis: Redis #keyPrefix: string #strategy: AllocationStrategy - + #cache: TTLCache | null readonly requestTimeout: number | undefined constructor (opts: RegistryOptions) { @@ -35,6 +44,9 @@ export class Registry { this.#strategy = createStrategy(opts.strategy ?? 'round-robin') } + this.#cache = opts.cache === false + ? null + : new TTLCache(opts.cache) this.requestTimeout = opts.requestTimeout } @@ -46,81 +58,150 @@ export class Registry { return `${this.#keyPrefix}:member:${memberId}` } - #instanceKey (instanceId: string): string { - return `${this.#keyPrefix}:instance:${instanceId}` + #destinationKey (destId: string): string { + return `${this.#keyPrefix}:destination:${destId}` } - #memberLoadKey (memberId: string): string { - return `${this.#keyPrefix}:member:${memberId}:instances` + #lockKey (lockId: string): string { + return `${this.#keyPrefix}:lock:${lockId}` } - async listMembers (): Promise { + async listLiveMembers (): Promise { const memberIds = await this.#redis.smembers(this.#membersKey()) - const members: MemberInfo[] = [] - for (const memberId of memberIds) { - const address = await this.#redis.get(this.#memberKey(memberId)) - if (address) { - members.push({ memberId, address }) - } + if (memberIds.length === 0) return [] + + const pipeline = this.#redis.pipeline() + for (const id of memberIds) { + pipeline.hmget(this.#memberKey(id), 'address', 'load') } - return members + const results = await pipeline.exec() + if (!results) return [] + + const live: MemberInfo[] = [] + for (let i = 0; i < memberIds.length; i++) { + const [err, fields] = results[i] as [Error | null, (string | null)[] | null] + if (err || !fields) continue + const address = fields[0] + if (!address) continue + const load = parseInt(fields[1] ?? '0', 10) || 0 + live.push({ memberId: memberIds[i], address, load }) + } + return live } - async listMembersWithLoad (): Promise { - const members = await this.listMembers() - if (members.length === 0) return [] - - const countKeys = members.map(m => this.#memberLoadKey(m.memberId)) - const counts = await this.#redis.mget(...countKeys) - - return members.map((member, i) => ({ - ...member, - instanceCount: parseInt(counts[i] ?? '0', 10) || 0 - })) + async pickMember (ctx: { destinationId?: string } = {}): Promise { + const live = await this.listLiveMembers() + return this.#strategy.pick(live, ctx) } - async pickMember (): Promise { - const members = await this.listMembersWithLoad() - return this.#strategy.pick(members) + async resolveDestination ( + destinationId: string, + opts: { claimOnMiss?: boolean, reassignOrphans?: boolean } = {} + ): Promise { + if (this.#cache) { + const cached = this.#cache.get(destinationId) + if (cached) return cached + } + const result = await this.#resolveDestinationUncached(destinationId, opts) + if (this.#cache && result) this.#cache.set(destinationId, result) + return result } - async lookupInstance (instanceId: string): Promise { - const memberId = await this.#redis.get(this.#instanceKey(instanceId)) - if (!memberId) return null - return this.#redis.get(this.#memberKey(memberId)) - } + async #resolveDestinationUncached ( + destinationId: string, + opts: { claimOnMiss?: boolean, reassignOrphans?: boolean } + ): Promise { + const podIds = await this.#redis.smembers(this.#destinationKey(destinationId)) + + if (podIds.length === 0) { + if (!opts.claimOnMiss) return null + const live = await this.listLiveMembers() + if (live.length === 0) return null + const picked = this.#strategy.pick(live, { destinationId }) + if (!picked) return null + await this.#redis.sadd(this.#destinationKey(destinationId), picked.memberId) + return { address: picked.address, memberId: picked.memberId, reassigned: false } + } - async lookupInstanceMemberId (instanceId: string): Promise { - return this.#redis.get(this.#instanceKey(instanceId)) - } + const records = await this.#getMemberRecords(podIds) + const livePods: MemberInfo[] = [] + const deadPods: string[] = [] + for (const r of records) { + if (r.address) { + livePods.push({ memberId: r.memberId, address: r.address, load: r.load }) + } else { + deadPods.push(r.memberId) + } + } - async resolveInstance ( - instanceId: string, - opts: { reassignOrphans?: boolean } = {} - ): Promise { - const memberId = await this.#redis.get(this.#instanceKey(instanceId)) - if (!memberId) return null + if (livePods.length > 0) { + if (deadPods.length > 0) { + this.#redis.srem(this.#destinationKey(destinationId), ...deadPods).catch(() => {}) + } + const picked = this.#strategy.pick(livePods, { destinationId }) + if (!picked) return null + return { address: picked.address, memberId: picked.memberId, reassigned: false } + } + + if (!opts.reassignOrphans) return null + const live = await this.listLiveMembers() + if (live.length === 0) return null + const picked = this.#strategy.pick(live, { destinationId }) + if (!picked) return null + + const pipeline = this.#redis.pipeline() + for (const deadId of deadPods) { + pipeline.srem(this.#destinationKey(destinationId), deadId) + } + pipeline.sadd(this.#destinationKey(destinationId), picked.memberId) + await pipeline.exec() - const address = await this.#redis.get(this.#memberKey(memberId)) - if (address) return { address, reassigned: false } + return { address: picked.address, memberId: picked.memberId, reassigned: true } + } - if (!opts.reassignOrphans) { - return { address: null, reassigned: false } + async #getMemberRecords (memberIds: string[]): Promise { + if (memberIds.length === 0) return [] + const pipeline = this.#redis.pipeline() + for (const id of memberIds) { + pipeline.hmget(this.#memberKey(id), 'address', 'load') } + const results = await pipeline.exec() + if (!results) return memberIds.map(memberId => ({ memberId, address: null, load: 0 })) + + return memberIds.map((memberId, i) => { + const [err, fields] = results[i] as [Error | null, (string | null)[] | null] + if (err || !fields) return { memberId, address: null, load: 0 } + const address = fields[0] ?? null + const load = parseInt(fields[1] ?? '0', 10) || 0 + return { memberId, address, load } + }) + } - const newPod = await this.pickMember() - if (!newPod) return { address: null, reassigned: false } + async addPodToDestination (destinationId: string, memberId: string): Promise { + await this.#redis.sadd(this.#destinationKey(destinationId), memberId) + if (this.#cache) this.#cache.delete(destinationId) + } + + async hasBinding (destinationId: string): Promise { + const count = await this.#redis.scard(this.#destinationKey(destinationId)) + return count > 0 + } - await this.registerInstance(instanceId, newPod.memberId) - return { address: newPod.address, reassigned: true } + async resolveLock (lockId: string): Promise { + const podId = await this.#redis.hget(this.#lockKey(lockId), 'podId') + if (!podId) return null + const address = await this.#redis.hget(this.#memberKey(podId), 'address') + if (!address) return null + return { address, memberId: podId } } - async registerInstance (instanceId: string, memberId: string): Promise { - await this.#redis.set(this.#instanceKey(instanceId), memberId) + invalidateCache (destinationId: string): void { + if (this.#cache) this.#cache.delete(destinationId) } - async deregisterInstance (instanceId: string): Promise { - await this.#redis.del(this.#instanceKey(instanceId)) + async deregisterDestination (destinationId: string): Promise { + await this.#redis.del(this.#destinationKey(destinationId)) + if (this.#cache) this.#cache.delete(destinationId) } async close (): Promise { diff --git a/src/strategies.ts b/src/strategies.ts index a5b8c82..47089d7 100644 --- a/src/strategies.ts +++ b/src/strategies.ts @@ -1,20 +1,24 @@ -export interface MemberWithLoad { +export interface MemberInfo { memberId: string address: string - instanceCount: number + load: number +} + +export interface PickContext { + destinationId?: string } export interface AllocationStrategy { - pick (members: MemberWithLoad[]): MemberWithLoad | null + pick (candidates: MemberInfo[], ctx: PickContext): MemberInfo | null } export class RoundRobinStrategy implements AllocationStrategy { #index = 0 - pick (members: MemberWithLoad[]): MemberWithLoad | null { - if (members.length === 0) return null - const member = members[this.#index % members.length] - this.#index = (this.#index + 1) % members.length + pick (candidates: MemberInfo[], _ctx: PickContext): MemberInfo | null { + if (candidates.length === 0) return null + const member = candidates[this.#index % candidates.length] + this.#index = (this.#index + 1) % candidates.length return member } } @@ -22,20 +26,20 @@ export class RoundRobinStrategy implements AllocationStrategy { export class LeastLoadedStrategy implements AllocationStrategy { #tieBreaker = 0 - pick (members: MemberWithLoad[]): MemberWithLoad | null { - if (members.length === 0) return null - const minCount = Math.min(...members.map(m => m.instanceCount)) - const candidates = members.filter(m => m.instanceCount === minCount) - const member = candidates[this.#tieBreaker % candidates.length] - this.#tieBreaker = (this.#tieBreaker + 1) % candidates.length + pick (candidates: MemberInfo[], _ctx: PickContext): MemberInfo | null { + if (candidates.length === 0) return null + const min = Math.min(...candidates.map(m => m.load)) + const tied = candidates.filter(m => m.load === min) + const member = tied[this.#tieBreaker % tied.length] + this.#tieBreaker = (this.#tieBreaker + 1) % tied.length return member } } export class RandomStrategy implements AllocationStrategy { - pick (members: MemberWithLoad[]): MemberWithLoad | null { - if (members.length === 0) return null - return members[Math.floor(Math.random() * members.length)] + pick (candidates: MemberInfo[], _ctx: PickContext): MemberInfo | null { + if (candidates.length === 0) return null + return candidates[Math.floor(Math.random() * candidates.length)] } } diff --git a/test/cache.test.ts b/test/cache.test.ts new file mode 100644 index 0000000..3301ee6 --- /dev/null +++ b/test/cache.test.ts @@ -0,0 +1,60 @@ +import { strictEqual, ok } from 'node:assert' +import test from 'node:test' +import { setTimeout as sleep } from 'node:timers/promises' +import { TTLCache } from '../src/cache.ts' + +test('TTLCache returns undefined for missing keys', () => { + const c = new TTLCache({ ttl: 1000 }) + strictEqual(c.get('missing'), undefined) +}) + +test('TTLCache returns set values within TTL', () => { + const c = new TTLCache({ ttl: 1000 }) + c.set('a', 1) + strictEqual(c.get('a'), 1) +}) + +test('TTLCache expires entries past TTL', async () => { + const c = new TTLCache({ ttl: 50 }) + c.set('a', 1) + await sleep(80) + strictEqual(c.get('a'), undefined) +}) + +test('TTLCache evicts oldest when max is reached', () => { + const c = new TTLCache({ ttl: 60_000, max: 2 }) + c.set('a', 1) + c.set('b', 2) + c.set('c', 3) + strictEqual(c.get('a'), undefined, 'oldest evicted') + strictEqual(c.get('b'), 2) + strictEqual(c.get('c'), 3) +}) + +test('TTLCache get refreshes insertion order', () => { + const c = new TTLCache({ ttl: 60_000, max: 2 }) + c.set('a', 1) + c.set('b', 2) + // Read 'a' so it becomes the most-recently-used. + ok(c.get('a')) + c.set('c', 3) + strictEqual(c.get('b'), undefined, 'b was evicted, not a') + strictEqual(c.get('a'), 1) + strictEqual(c.get('c'), 3) +}) + +test('TTLCache delete removes entries', () => { + const c = new TTLCache({ ttl: 1000 }) + c.set('a', 1) + c.delete('a') + strictEqual(c.get('a'), undefined) +}) + +test('TTLCache clear empties the cache', () => { + const c = new TTLCache({ ttl: 1000 }) + c.set('a', 1) + c.set('b', 2) + c.clear() + strictEqual(c.get('a'), undefined) + strictEqual(c.get('b'), undefined) +}) diff --git a/test/helpers.test.ts b/test/helpers.test.ts index 9d295fc..32384b6 100644 --- a/test/helpers.test.ts +++ b/test/helpers.test.ts @@ -12,9 +12,9 @@ import { REDIS_URL } from './redis-url.ts' const PREFIX = `test-${randomBytes(4).toString('hex')}` -function membersKey (): string { return `${PREFIX}:members` } -function memberKey (id: string): string { return `${PREFIX}:member:${id}` } -function resourceKey (id: string): string { return `${PREFIX}:instance:${id}` } +const membersKey = (): string => `${PREFIX}:members` +const memberKey = (id: string): string => `${PREFIX}:member:${id}` +const destinationKey = (id: string): string => `${PREFIX}:destination:${id}` interface MockPod { app: ReturnType @@ -49,7 +49,6 @@ async function createMockPod (): Promise { app.post('/resources/:id/heartbeat', async (req, reply) => { const { id } = req.params as { id: string } if (!resources.has(id)) { - // Simulate auto-restore so orphan reassignment can succeed resources.set(id, { resourceId: id, status: 'restored' }) } return reply.code(204).send() @@ -60,7 +59,7 @@ async function createMockPod (): Promise { return { app, address: `http://127.0.0.1:${addr.port}`, resources } } -async function createCoordinator (registry: Registry) { +async function createCoordinator (registry: Registry): Promise> { const app = Fastify() await app.register(replyFrom) @@ -71,22 +70,28 @@ async function createCoordinator (registry: Registry) { app.post('/resources/:id/echo', { schema: { body: { type: 'object', properties: { msg: { type: 'string' } } } } }, lookupAndProxy(registry, { - instanceFrom: (req: any) => req.params.id, + destinationFrom: (req: any) => req.params.id, reassignOrphans: true })) app.post('/resources/:id/heartbeat', lookupAndProxy(registry, { - instanceFrom: (req: any) => req.params.id, + destinationFrom: (req: any) => req.params.id, reassignOrphans: true })) app.delete('/resources/:id', lookupAndDeregister(registry, { - instanceFrom: (req: any) => req.params.id + destinationFrom: (req: any) => req.params.id })) return app } +async function makeLivePod (redis: Redis, memberId: string, address: string): Promise { + await redis.sadd(membersKey(), memberId) + await redis.hset(memberKey(memberId), { address, load: '0' }) + await redis.expire(memberKey(memberId), 60) +} + test('Coordinator helpers', async (t) => { const redis = new Redis(REDIS_URL) const memberId1 = 'pod-1' @@ -95,12 +100,10 @@ test('Coordinator helpers', async (t) => { const pod1 = await createMockPod() const pod2 = await createMockPod() - await redis.sadd(membersKey(), memberId1) - await redis.set(memberKey(memberId1), pod1.address, 'EX', 60) - await redis.sadd(membersKey(), memberId2) - await redis.set(memberKey(memberId2), pod2.address, 'EX', 60) + await makeLivePod(redis, memberId1, pod1.address) + await makeLivePod(redis, memberId2, pod2.address) - const registry = new Registry({ redis: REDIS_URL, keyPrefix: PREFIX }) + const registry = new Registry({ redis: REDIS_URL, keyPrefix: PREFIX, cache: false }) const coordinator = await createCoordinator(registry) t.after(async () => { @@ -115,19 +118,20 @@ test('Coordinator helpers', async (t) => { await redis.quit() }) - await t.test('pickAndRegister: spawns and registers in Redis', async () => { + await t.test('pickAndRegister: spawns and binds in Redis', async () => { const res = await coordinator.inject({ method: 'POST', url: '/resources' }) strictEqual(res.statusCode, 201) const body = res.json() as any ok(body.resourceId) - const memberId = await redis.get(resourceKey(body.resourceId)) - ok(memberId === memberId1 || memberId === memberId2) + const set = await redis.smembers(destinationKey(body.resourceId)) + strictEqual(set.length, 1) + ok(set[0] === memberId1 || set[0] === memberId2) }) await t.test('pickAndRegister: returns 503 when no pods are available', async () => { const isolatedPrefix = `${PREFIX}-empty-${randomBytes(2).toString('hex')}` - const emptyRegistry = new Registry({ redis: REDIS_URL, keyPrefix: isolatedPrefix }) + const emptyRegistry = new Registry({ redis: REDIS_URL, keyPrefix: isolatedPrefix, cache: false }) const app = Fastify() await app.register(replyFrom) app.post('/spawn', pickAndRegister(emptyRegistry, { registerIdFrom: (r: any) => r.id })) @@ -143,7 +147,7 @@ test('Coordinator helpers', async (t) => { } }) - await t.test('lookupAndProxy: routes to the registered pod', async () => { + await t.test('lookupAndProxy: routes to the bound pod', async () => { const spawnRes = await coordinator.inject({ method: 'POST', url: '/resources' }) const id = (spawnRes.json() as any).resourceId @@ -166,13 +170,12 @@ test('Coordinator helpers', async (t) => { }) strictEqual(res.statusCode, 404) const body = res.json() as any - strictEqual(body.error, 'Instance not found') + strictEqual(body.error, 'Destination not found') }) await t.test('lookupAndProxy: reassigns orphan when reassignOrphans is true', async () => { const orphanId = `orphan-${randomBytes(3).toString('hex')}` - const deadMemberId = 'dead-pod' - await redis.set(resourceKey(orphanId), deadMemberId) + await redis.sadd(destinationKey(orphanId), 'dead-pod') const res = await coordinator.inject({ method: 'POST', @@ -180,38 +183,33 @@ test('Coordinator helpers', async (t) => { }) strictEqual(res.statusCode, 204) - const newMember = await redis.get(resourceKey(orphanId)) - ok(newMember === memberId1 || newMember === memberId2, 'should reassign to a live pod') + const set = await redis.smembers(destinationKey(orphanId)) + strictEqual(set.length, 1) + ok(set[0] === memberId1 || set[0] === memberId2, 'reassigned to a live pod') + ok(!set.includes('dead-pod')) }) - await t.test('lookupAndDeregister: deletes the resource and the registry mapping', async () => { + await t.test('lookupAndDeregister: deletes the resource and the destination set', async () => { const spawnRes = await coordinator.inject({ method: 'POST', url: '/resources' }) const id = (spawnRes.json() as any).resourceId - const delRes = await coordinator.inject({ - method: 'DELETE', - url: `/resources/${id}` - }) + const delRes = await coordinator.inject({ method: 'DELETE', url: `/resources/${id}` }) strictEqual(delRes.statusCode, 204) - const mapping = await redis.get(resourceKey(id)) - strictEqual(mapping, null) + const exists = await redis.exists(destinationKey(id)) + strictEqual(exists, 0) }) await t.test('lookupAndDeregister: fast-paths when the pod is dead', async () => { const orphanId = `orphan-del-${randomBytes(3).toString('hex')}` - const deadMemberId = 'dead-pod' - await redis.set(resourceKey(orphanId), deadMemberId) + await redis.sadd(destinationKey(orphanId), 'dead-pod') const totalSpawnsBefore = pod1.resources.size + pod2.resources.size - const res = await coordinator.inject({ - method: 'DELETE', - url: `/resources/${orphanId}` - }) + const res = await coordinator.inject({ method: 'DELETE', url: `/resources/${orphanId}` }) strictEqual(res.statusCode, 204) - const mapping = await redis.get(resourceKey(orphanId)) - strictEqual(mapping, null, 'mapping should be removed') + const exists = await redis.exists(destinationKey(orphanId)) + strictEqual(exists, 0, 'set should be removed') const totalSpawnsAfter = pod1.resources.size + pod2.resources.size strictEqual(totalSpawnsAfter, totalSpawnsBefore, 'no proxy call should reach a live pod') diff --git a/test/member.test.ts b/test/member.test.ts index d55ed4e..2005d0c 100644 --- a/test/member.test.ts +++ b/test/member.test.ts @@ -1,4 +1,4 @@ -import { strictEqual, ok } from 'node:assert' +import { strictEqual, ok, deepStrictEqual } from 'node:assert' import { randomBytes } from 'node:crypto' import { setTimeout as sleep } from 'node:timers/promises' import test from 'node:test' @@ -8,27 +8,24 @@ import { REDIS_URL } from './redis-url.ts' const PREFIX = `test-${randomBytes(4).toString('hex')}` -function membersKey (): string { - return `${PREFIX}:members` -} - -function memberKey (memberId: string): string { - return `${PREFIX}:member:${memberId}` -} - -function instanceKey (instanceId: string): string { - return `${PREFIX}:instance:${instanceId}` -} - -function loadKey (memberId: string): string { - return `${PREFIX}:member:${memberId}:instances` -} +const membersKey = (): string => `${PREFIX}:members` +const memberKey = (id: string): string => `${PREFIX}:member:${id}` +const destinationKey = (id: string): string => `${PREFIX}:destination:${id}` +const lockKey = (id: string): string => `${PREFIX}:lock:${id}` test('Member', async (t) => { const sharedRedis = new Redis(REDIS_URL) const memberId = 'member-1' const address = 'http://localhost:3001' - const member = new Member({ redis: REDIS_URL, memberId, address, keyPrefix: PREFIX }) + + let connections = 0 + const member = new Member({ + redis: REDIS_URL, + memberId, + address, + keyPrefix: PREFIX, + getLoad: () => connections + }) t.after(async () => { const stream = sharedRedis.scanStream({ match: `${PREFIX}:*`, count: 100 }) @@ -39,96 +36,100 @@ test('Member', async (t) => { await sharedRedis.quit() }) - await t.test('register adds member to set and sets address with TTL', async () => { + await t.test('register adds member to set and writes hash with TTL', async () => { + connections = 7 await member.register() const isMember = await sharedRedis.sismember(membersKey(), memberId) strictEqual(isMember, 1) - const stored = await sharedRedis.get(memberKey(memberId)) - strictEqual(stored, address) + const fields = await sharedRedis.hmget(memberKey(memberId), 'address', 'load') + deepStrictEqual(fields, [address, '7']) const ttl = await sharedRedis.ttl(memberKey(memberId)) ok(ttl > 0 && ttl <= 30, `TTL should be between 1 and 30, got ${ttl}`) }) - await t.test('register initializes load count to 0 with TTL', async () => { + await t.test('heartbeat updates load and refreshes TTL', async () => { + connections = 7 await member.register() - const count = await sharedRedis.get(loadKey(memberId)) - strictEqual(count, '0') - const ttl = await sharedRedis.ttl(loadKey(memberId)) - ok(ttl > 0 && ttl <= 30, `load TTL should be between 1 and 30, got ${ttl}`) + connections = 42 + await sleep(1100) + const ttlBefore = await sharedRedis.ttl(memberKey(memberId)) + + await member.heartbeat() + + const updated = await sharedRedis.hget(memberKey(memberId), 'load') + strictEqual(updated, '42') + + const ttlAfter = await sharedRedis.ttl(memberKey(memberId)) + ok(ttlAfter >= ttlBefore, `TTL after (${ttlAfter}) >= before (${ttlBefore})`) }) - await t.test('deregister removes member and load key', async () => { + await t.test('deregister removes member from set and deletes hash', async () => { await member.register() await member.deregister() const isMember = await sharedRedis.sismember(membersKey(), memberId) strictEqual(isMember, 0) - const stored = await sharedRedis.get(memberKey(memberId)) - strictEqual(stored, null) - - const count = await sharedRedis.get(loadKey(memberId)) - strictEqual(count, null) + const exists = await sharedRedis.exists(memberKey(memberId)) + strictEqual(exists, 0) }) - await t.test('heartbeat refreshes both TTLs', async () => { - await member.register() - - await sleep(1100) - const addressTtlBefore = await sharedRedis.ttl(memberKey(memberId)) - const loadTtlBefore = await sharedRedis.ttl(loadKey(memberId)) - - await member.heartbeat() - const addressTtlAfter = await sharedRedis.ttl(memberKey(memberId)) - const loadTtlAfter = await sharedRedis.ttl(loadKey(memberId)) - - ok(addressTtlAfter >= addressTtlBefore, `address TTL after (${addressTtlAfter}) >= before (${addressTtlBefore})`) - ok(loadTtlAfter >= loadTtlBefore, `load TTL after (${loadTtlAfter}) >= before (${loadTtlBefore})`) + await t.test('addToDestination SADDs self to the destination set', async () => { + await member.addToDestination('dest-A') + const members = await sharedRedis.smembers(destinationKey('dest-A')) + deepStrictEqual(members.sort(), [memberId]) }) - await t.test('registerInstance sets mapping and increments load', async () => { - await member.register() - const instanceId = 'inst-1' - await member.registerInstance(instanceId) + await t.test('removeFromDestination SREMs self', async () => { + await member.addToDestination('dest-B') + await member.removeFromDestination('dest-B') + const members = await sharedRedis.smembers(destinationKey('dest-B')) + deepStrictEqual(members, []) + }) - const stored = await sharedRedis.get(instanceKey(instanceId)) - strictEqual(stored, memberId) + await t.test('registerLock writes lock record with podId and destinationId', async () => { + await member.registerLock('lock-1', 'dest-X', { isolationLevel: 'serializable' }) + const fields = await sharedRedis.hgetall(lockKey('lock-1')) + strictEqual(fields.podId, memberId) + strictEqual(fields.destinationId, 'dest-X') + strictEqual(fields.isolationLevel, 'serializable') + }) - const count = await sharedRedis.get(loadKey(memberId)) - strictEqual(count, '1') + await t.test('unregisterLock deletes the lock record', async () => { + await member.registerLock('lock-2', 'dest-Y') + await member.unregisterLock('lock-2') + const exists = await sharedRedis.exists(lockKey('lock-2')) + strictEqual(exists, 0) }) - await t.test('deregisterInstance removes mapping and decrements load', async () => { + await t.test('listPeerLoad returns live members with load', async () => { + connections = 5 await member.register() - const instanceId = 'inst-2' - await member.registerInstance(instanceId) - const before = parseInt(await sharedRedis.get(loadKey(memberId)) ?? '0', 10) - - await member.deregisterInstance(instanceId) - const stored = await sharedRedis.get(instanceKey(instanceId)) - strictEqual(stored, null) + // Add a peer manually. + await sharedRedis.sadd(membersKey(), 'peer-1') + await sharedRedis.hset(memberKey('peer-1'), { address: 'http://peer:9000', load: '12' }) + await sharedRedis.expire(memberKey('peer-1'), 30) - const after = parseInt(await sharedRedis.get(loadKey(memberId)) ?? '0', 10) - strictEqual(after, before - 1) - }) - - await t.test('lookupInstance returns address via two-step lookup', async () => { - await member.register() - const instanceId = 'inst-3' - await member.registerInstance(instanceId) + const peers = await member.listPeerLoad() + strictEqual(peers.length, 2) - const resolved = await member.lookupInstance(instanceId) - strictEqual(resolved, address) + const self = peers.find(p => p.memberId === memberId) + const peer = peers.find(p => p.memberId === 'peer-1') + ok(self); ok(peer) + strictEqual(self.load, 5) + strictEqual(peer.load, 12) + strictEqual(peer.address, 'http://peer:9000') }) - await t.test('lookupInstance returns null for unknown instance', async () => { - const resolved = await member.lookupInstance('nonexistent') - strictEqual(resolved, null) + await t.test('listPeerLoad skips members whose hash has expired', async () => { + await sharedRedis.del(memberKey('peer-1')) + const peers = await member.listPeerLoad() + ok(peers.every(p => p.memberId !== 'peer-1')) }) await t.test('custom ttl is respected', async () => { @@ -138,4 +139,12 @@ test('Member', async (t) => { ok(ttl > 0 && ttl <= 5, `custom TTL should be <= 5, got ${ttl}`) await m.close() }) + + await t.test('getLoad defaults to () => 0 when omitted', async () => { + const m = new Member({ redis: REDIS_URL, memberId: 'm-default', address, keyPrefix: PREFIX }) + await m.register() + const v = await sharedRedis.hget(memberKey('m-default'), 'load') + strictEqual(v, '0') + await m.close() + }) }) diff --git a/test/registry.test.ts b/test/registry.test.ts index 75b751e..9dca2b2 100644 --- a/test/registry.test.ts +++ b/test/registry.test.ts @@ -7,30 +7,25 @@ import { REDIS_URL } from './redis-url.ts' const PREFIX = `test-${randomBytes(4).toString('hex')}` -function membersKey (): string { - return `${PREFIX}:members` -} - -function memberKey (memberId: string): string { - return `${PREFIX}:member:${memberId}` -} - -function instanceKey (instanceId: string): string { - return `${PREFIX}:instance:${instanceId}` -} - -function loadKey (memberId: string): string { - return `${PREFIX}:member:${memberId}:instances` +const membersKey = (): string => `${PREFIX}:members` +const memberKey = (id: string): string => `${PREFIX}:member:${id}` +const destinationKey = (id: string): string => `${PREFIX}:destination:${id}` +const lockKey = (id: string): string => `${PREFIX}:lock:${id}` + +async function makeLivePod (redis: Redis, memberId: string, address: string, load = 0): Promise { + await redis.sadd(membersKey(), memberId) + await redis.hset(memberKey(memberId), { address, load: String(load) }) + await redis.expire(memberKey(memberId), 30) } test('Registry', async (t) => { const sharedRedis = new Redis(REDIS_URL) - const registry = new Registry({ redis: REDIS_URL, keyPrefix: PREFIX }) + const registry = new Registry({ redis: REDIS_URL, keyPrefix: PREFIX, cache: false }) - const member1Id = 'member-1' - const member1Address = 'http://localhost:3001' - const member2Id = 'member-2' - const member2Address = 'http://localhost:3002' + const m1 = 'member-1' + const m1Address = 'http://localhost:3001' + const m2 = 'member-2' + const m2Address = 'http://localhost:3002' t.after(async () => { const stream = sharedRedis.scanStream({ match: `${PREFIX}:*`, count: 100 }) @@ -41,211 +36,210 @@ test('Registry', async (t) => { await sharedRedis.quit() }) - await t.test('listMembers returns empty array when no members', async () => { - const members = await registry.listMembers() - deepStrictEqual(members, []) - }) - - await t.test('listMembers returns registered members', async () => { - await sharedRedis.sadd(membersKey(), member1Id) - await sharedRedis.set(memberKey(member1Id), member1Address, 'EX', 30) - await sharedRedis.sadd(membersKey(), member2Id) - await sharedRedis.set(memberKey(member2Id), member2Address, 'EX', 30) - - const members = await registry.listMembers() - strictEqual(members.length, 2) - - const m1 = members.find(m => m.memberId === member1Id) - ok(m1) - strictEqual(m1.address, member1Address) - - const m2 = members.find(m => m.memberId === member2Id) - ok(m2) - strictEqual(m2.address, member2Address) - }) - - await t.test('listMembers skips members with expired keys', async () => { - await sharedRedis.del(memberKey(member2Id)) - - const members = await registry.listMembers() - strictEqual(members.length, 1) - strictEqual(members[0].memberId, member1Id) - - await sharedRedis.set(memberKey(member2Id), member2Address, 'EX', 30) + await t.test('listLiveMembers returns empty when no members registered', async () => { + const live = await registry.listLiveMembers() + deepStrictEqual(live, []) }) - await t.test('lookupInstance returns pod address via two-step lookup', async () => { - const instanceId = 'inst-1' - await sharedRedis.set(instanceKey(instanceId), member1Id) + await t.test('listLiveMembers returns hash-based pods with load', async () => { + await makeLivePod(sharedRedis, m1, m1Address, 3) + await makeLivePod(sharedRedis, m2, m2Address, 7) - const address = await registry.lookupInstance(instanceId) - strictEqual(address, member1Address) - }) + const live = await registry.listLiveMembers() + strictEqual(live.length, 2) - await t.test('lookupInstance returns null for unknown instance', async () => { - const address = await registry.lookupInstance('nonexistent') - strictEqual(address, null) + const a = live.find(m => m.memberId === m1) + const b = live.find(m => m.memberId === m2) + ok(a); ok(b) + strictEqual(a.address, m1Address) + strictEqual(a.load, 3) + strictEqual(b.load, 7) }) - await t.test('registerInstance sets instance mapping', async () => { - const instanceId = 'inst-2' - await registry.registerInstance(instanceId, member2Id) - - const value = await sharedRedis.get(instanceKey(instanceId)) - strictEqual(value, member2Id) + await t.test('listLiveMembers skips members whose hash has expired', async () => { + await sharedRedis.del(memberKey(m2)) + const live = await registry.listLiveMembers() + strictEqual(live.length, 1) + strictEqual(live[0].memberId, m1) + await makeLivePod(sharedRedis, m2, m2Address, 7) }) - await t.test('deregisterInstance removes instance mapping', async () => { - const instanceId = 'inst-2' - await registry.deregisterInstance(instanceId) - - const value = await sharedRedis.get(instanceKey(instanceId)) - strictEqual(value, null) + await t.test('resolveDestination returns null for unknown instance without claimOnMiss', async () => { + strictEqual(await registry.resolveDestination('unknown'), null) }) - await t.test('pickMember round-robins across members', async () => { - const first = await registry.pickMember() - ok(first) - const second = await registry.pickMember() - ok(second) - ok(first.memberId !== second.memberId, 'round-robin should cycle through members') - }) + await t.test('resolveDestination with claimOnMiss SADDs a fresh pod and returns it', async () => { + const result = await registry.resolveDestination('inst-claim', { claimOnMiss: true }) + ok(result) + ok(result.address === m1Address || result.address === m2Address) + strictEqual(result.reassigned, false) - await t.test('lookupInstanceMemberId returns memberId for registered instance', async () => { - const instanceId = 'inst-1' - const memberId = await registry.lookupInstanceMemberId(instanceId) - strictEqual(memberId, member1Id) - }) + const set = await sharedRedis.smembers(destinationKey('inst-claim')) + deepStrictEqual(set, [result.memberId]) - await t.test('lookupInstanceMemberId returns null for unknown instance', async () => { - const memberId = await registry.lookupInstanceMemberId('nonexistent') - strictEqual(memberId, null) + await sharedRedis.del(destinationKey('inst-claim')) }) - await t.test('resolveInstance returns address for live instance', async () => { - const result = await registry.resolveInstance('inst-1') + await t.test('resolveDestination returns address from existing single-pod set', async () => { + await sharedRedis.sadd(destinationKey('inst-existing'), m1) + const result = await registry.resolveDestination('inst-existing') ok(result) - strictEqual(result.address, member1Address) + strictEqual(result.address, m1Address) + strictEqual(result.memberId, m1) strictEqual(result.reassigned, false) + await sharedRedis.del(destinationKey('inst-existing')) }) - await t.test('resolveInstance returns null for completely unknown instance', async () => { - const result = await registry.resolveInstance('nonexistent') + await t.test('resolveDestination returns null when set is non-empty but all pods are dead and reassignOrphans=false', async () => { + await sharedRedis.sadd(destinationKey('inst-orphan'), 'dead-pod') + const result = await registry.resolveDestination('inst-orphan') strictEqual(result, null) + // The dead binding is preserved (caller can choose to clean it up explicitly). + const set = await sharedRedis.smembers(destinationKey('inst-orphan')) + deepStrictEqual(set, ['dead-pod']) + await sharedRedis.del(destinationKey('inst-orphan')) }) - await t.test('resolveInstance returns address: null when pod is dead and reassignOrphans is false', async () => { - const orphanId = 'inst-orphan-no-reassign' - const deadMemberId = 'dead-pod' - await sharedRedis.set(instanceKey(orphanId), deadMemberId) - - const result = await registry.resolveInstance(orphanId) + await t.test('resolveDestination reassigns orphan with reassignOrphans=true', async () => { + await sharedRedis.sadd(destinationKey('inst-reassign'), 'dead-pod') + const result = await registry.resolveDestination('inst-reassign', { reassignOrphans: true }) ok(result) - strictEqual(result.address, null) - strictEqual(result.reassigned, false) - - // Mapping must still exist - const mapping = await sharedRedis.get(instanceKey(orphanId)) - strictEqual(mapping, deadMemberId) + strictEqual(result.reassigned, true) + ok(result.address === m1Address || result.address === m2Address) - await sharedRedis.del(instanceKey(orphanId)) + const set = await sharedRedis.smembers(destinationKey('inst-reassign')) + strictEqual(set.length, 1) + ok(set[0] === m1 || set[0] === m2) + ok(!set.includes('dead-pod')) + await sharedRedis.del(destinationKey('inst-reassign')) }) - await t.test('listMembersWithLoad returns instance counts', async () => { - await sharedRedis.set(loadKey(member1Id), '3', 'EX', 30) - await sharedRedis.set(loadKey(member2Id), '7', 'EX', 30) + await t.test('resolveDestination with multi-pod set picks one live pod, cleans dead members in background', async () => { + await sharedRedis.sadd(destinationKey('inst-multi'), m1, 'dead-pod', m2) + const result = await registry.resolveDestination('inst-multi') + ok(result) + ok(result.memberId === m1 || result.memberId === m2) + strictEqual(result.reassigned, false) - const members = await registry.listMembersWithLoad() - strictEqual(members.length, 2) + // Give the background SREM a moment. + await new Promise(resolve => setTimeout(resolve, 30)) + const set = await sharedRedis.smembers(destinationKey('inst-multi')) + ok(!set.includes('dead-pod'), 'dead member is removed eventually') + await sharedRedis.del(destinationKey('inst-multi')) + }) - const m1 = members.find(m => m.memberId === member1Id) - ok(m1) - strictEqual(m1.instanceCount, 3) + await t.test('resolveDestination returns null when reassignOrphans=true but no live pods', async () => { + await sharedRedis.del(memberKey(m1), memberKey(m2)) + await sharedRedis.sadd(destinationKey('inst-none'), 'dead-pod') - const m2 = members.find(m => m.memberId === member2Id) - ok(m2) - strictEqual(m2.instanceCount, 7) + const result = await registry.resolveDestination('inst-none', { reassignOrphans: true }) + strictEqual(result, null) - await sharedRedis.del(loadKey(member1Id), loadKey(member2Id)) + await sharedRedis.del(destinationKey('inst-none')) + await makeLivePod(sharedRedis, m1, m1Address, 3) + await makeLivePod(sharedRedis, m2, m2Address, 7) }) - await t.test('listMembersWithLoad defaults to 0 for missing count keys', async () => { - const members = await registry.listMembersWithLoad() - for (const member of members) { - strictEqual(member.instanceCount, 0) - } + await t.test('addPodToDestination SADDs and invalidates cache', async () => { + await registry.addPodToDestination('inst-add', m1) + const set = await sharedRedis.smembers(destinationKey('inst-add')) + deepStrictEqual(set, [m1]) + await sharedRedis.del(destinationKey('inst-add')) }) - await t.test('resolveInstance detects orphan and reassigns when reassignOrphans is true', async () => { - const orphanId = 'inst-orphan-reassign' - const deadMemberId = 'dead-pod' - - await sharedRedis.set(instanceKey(orphanId), deadMemberId) - - const result = await registry.resolveInstance(orphanId, { reassignOrphans: true }) - ok(result) - strictEqual(result.reassigned, true) - ok(result.address === member1Address || result.address === member2Address) - - const newMemberId = await sharedRedis.get(instanceKey(orphanId)) - ok(newMemberId === member1Id || newMemberId === member2Id) - - await sharedRedis.del(instanceKey(orphanId)) + await t.test('hasBinding returns true for a non-empty set, false otherwise', async () => { + strictEqual(await registry.hasBinding('inst-empty'), false) + await sharedRedis.sadd(destinationKey('inst-empty'), m1) + strictEqual(await registry.hasBinding('inst-empty'), true) + await sharedRedis.del(destinationKey('inst-empty')) }) - await t.test('resolveInstance returns address: null when reassignOrphans is true but no live pods', async () => { - const orphanId = 'inst-orphan-no-pods' - const deadMemberId = 'dead-pod' + await t.test('deregisterDestination DELs the destination set', async () => { + await sharedRedis.sadd(destinationKey('inst-del'), m1, m2) + await registry.deregisterDestination('inst-del') + const exists = await sharedRedis.exists(destinationKey('inst-del')) + strictEqual(exists, 0) + }) - // Temporarily remove all live pods - await sharedRedis.del(memberKey(member1Id), memberKey(member2Id)) - await sharedRedis.set(instanceKey(orphanId), deadMemberId) + await t.test('resolveLock returns null for unknown lockId', async () => { + strictEqual(await registry.resolveLock('missing'), null) + }) - const result = await registry.resolveInstance(orphanId, { reassignOrphans: true }) + await t.test('resolveLock returns the owning pod address', async () => { + await sharedRedis.hset(lockKey('lock-x'), { podId: m1, destinationId: 'dest-1' }) + const result = await registry.resolveLock('lock-x') ok(result) - strictEqual(result.address, null) - strictEqual(result.reassigned, false) + strictEqual(result.memberId, m1) + strictEqual(result.address, m1Address) + await sharedRedis.del(lockKey('lock-x')) + }) - await sharedRedis.del(instanceKey(orphanId)) - await sharedRedis.set(memberKey(member1Id), member1Address, 'EX', 30) - await sharedRedis.set(memberKey(member2Id), member2Address, 'EX', 30) + await t.test('resolveLock returns null when the owning pod is dead', async () => { + await sharedRedis.hset(lockKey('lock-dead'), { podId: 'dead-pod', destinationId: 'dest-1' }) + strictEqual(await registry.resolveLock('lock-dead'), null) + await sharedRedis.del(lockKey('lock-dead')) }) - await t.test('pickMember returns null when no members available', async () => { - const isolated = `${PREFIX}-empty-${randomBytes(2).toString('hex')}` - const empty = new Registry({ redis: REDIS_URL, keyPrefix: isolated }) - try { - const result = await empty.pickMember() - strictEqual(result, null) - } finally { - await empty.close() - } + await t.test('pickMember round-robins across live pods', async () => { + const first = await registry.pickMember({ destinationId: 'pick-test' }) + const second = await registry.pickMember({ destinationId: 'pick-test' }) + ok(first); ok(second) + ok(first.memberId !== second.memberId, 'round-robin should cycle') }) await t.test('keyPrefix isolates two registries pointed at the same Redis', async () => { const prefixA = `${PREFIX}-isoA-${randomBytes(2).toString('hex')}` const prefixB = `${PREFIX}-isoB-${randomBytes(2).toString('hex')}` - const a = new Registry({ redis: REDIS_URL, keyPrefix: prefixA }) - const b = new Registry({ redis: REDIS_URL, keyPrefix: prefixB }) + const a = new Registry({ redis: REDIS_URL, keyPrefix: prefixA, cache: false }) + const b = new Registry({ redis: REDIS_URL, keyPrefix: prefixB, cache: false }) try { await sharedRedis.sadd(`${prefixA}:members`, 'pod-x') - await sharedRedis.set(`${prefixA}:member:pod-x`, 'http://x', 'EX', 30) + await sharedRedis.hset(`${prefixA}:member:pod-x`, { address: 'http://x', load: '0' }) + await sharedRedis.expire(`${prefixA}:member:pod-x`, 30) - const aMembers = await a.listMembers() - const bMembers = await b.listMembers() - - strictEqual(aMembers.length, 1) - strictEqual(bMembers.length, 0) + strictEqual((await a.listLiveMembers()).length, 1) + strictEqual((await b.listLiveMembers()).length, 0) } finally { await a.close() await b.close() } }) + await t.test('cache: resolveDestination hits the cache on the second call', async () => { + const cached = new Registry({ redis: REDIS_URL, keyPrefix: PREFIX, cache: { ttl: 60_000 } }) + try { + await sharedRedis.sadd(destinationKey('inst-cache'), m1) + const first = await cached.resolveDestination('inst-cache') + ok(first) + + // Mutate Valkey behind the cache; we should still see the cached value. + await sharedRedis.del(destinationKey('inst-cache')) + const second = await cached.resolveDestination('inst-cache') + ok(second, 'cache returned the previously-resolved value') + strictEqual(second.address, first.address) + } finally { + await cached.close() + } + }) + + await t.test('cache is invalidated by deregisterDestination', async () => { + const cached = new Registry({ redis: REDIS_URL, keyPrefix: PREFIX, cache: { ttl: 60_000 } }) + try { + await sharedRedis.sadd(destinationKey('inst-inv'), m1) + const first = await cached.resolveDestination('inst-inv') + ok(first) + + await cached.deregisterDestination('inst-inv') + const second = await cached.resolveDestination('inst-inv') + strictEqual(second, null, 'cache was invalidated, lookup re-reads Valkey') + } finally { + await cached.close() + } + }) + await t.test('close quits the registry-owned Redis connection', async () => { - const owned = new Registry({ redis: REDIS_URL }) + const owned = new Registry({ redis: REDIS_URL, cache: false }) await owned.close() - // Should not hang or throw. }) }) diff --git a/test/strategies.test.ts b/test/strategies.test.ts index 98d907c..287520e 100644 --- a/test/strategies.test.ts +++ b/test/strategies.test.ts @@ -1,26 +1,24 @@ import { strictEqual, ok } from 'node:assert' import test from 'node:test' import { RoundRobinStrategy, LeastLoadedStrategy, RandomStrategy, createStrategy } from '../src/strategies.ts' -import type { MemberWithLoad } from '../src/strategies.ts' +import type { MemberInfo } from '../src/strategies.ts' -const members: MemberWithLoad[] = [ - { memberId: 'pod-1', address: 'http://localhost:3001', instanceCount: 5 }, - { memberId: 'pod-2', address: 'http://localhost:3002', instanceCount: 2 }, - { memberId: 'pod-3', address: 'http://localhost:3003', instanceCount: 8 } +const members: MemberInfo[] = [ + { memberId: 'pod-1', address: 'http://localhost:3001', load: 5 }, + { memberId: 'pod-2', address: 'http://localhost:3002', load: 2 }, + { memberId: 'pod-3', address: 'http://localhost:3003', load: 8 } ] -test('RoundRobinStrategy - cycles through members', () => { - const strategy = new RoundRobinStrategy() - const first = strategy.pick(members) - const second = strategy.pick(members) - const third = strategy.pick(members) - const fourth = strategy.pick(members) +const ctx = { destinationId: 'inst-x' } - ok(first) - ok(second) - ok(third) - ok(fourth) +test('RoundRobinStrategy - cycles through candidates', () => { + const strategy = new RoundRobinStrategy() + const first = strategy.pick(members, ctx) + const second = strategy.pick(members, ctx) + const third = strategy.pick(members, ctx) + const fourth = strategy.pick(members, ctx) + ok(first); ok(second); ok(third); ok(fourth) strictEqual(first.memberId, 'pod-1') strictEqual(second.memberId, 'pod-2') strictEqual(third.memberId, 'pod-3') @@ -28,58 +26,63 @@ test('RoundRobinStrategy - cycles through members', () => { }) test('RoundRobinStrategy - returns null for empty list', () => { - const strategy = new RoundRobinStrategy() - strictEqual(strategy.pick([]), null) + strictEqual(new RoundRobinStrategy().pick([], ctx), null) }) -test('LeastLoadedStrategy - picks member with fewest instances', () => { - const strategy = new LeastLoadedStrategy() - const picked = strategy.pick(members) +test('LeastLoadedStrategy - picks candidate with fewest total connections', () => { + const picked = new LeastLoadedStrategy().pick(members, ctx) ok(picked) strictEqual(picked.memberId, 'pod-2') - strictEqual(picked.instanceCount, 2) + strictEqual(picked.load, 2) }) test('LeastLoadedStrategy - breaks ties with round-robin', () => { const strategy = new LeastLoadedStrategy() - const tiedMembers: MemberWithLoad[] = [ - { memberId: 'pod-a', address: 'http://a', instanceCount: 3 }, - { memberId: 'pod-b', address: 'http://b', instanceCount: 3 }, - { memberId: 'pod-c', address: 'http://c', instanceCount: 5 } + const tied: MemberInfo[] = [ + { memberId: 'pod-a', address: 'http://a', load: 3 }, + { memberId: 'pod-b', address: 'http://b', load: 3 }, + { memberId: 'pod-c', address: 'http://c', load: 5 } ] - const first = strategy.pick(tiedMembers) - const second = strategy.pick(tiedMembers) - const third = strategy.pick(tiedMembers) - - ok(first) - ok(second) - ok(third) + const first = strategy.pick(tied, ctx) + const second = strategy.pick(tied, ctx) + const third = strategy.pick(tied, ctx) + ok(first); ok(second); ok(third) strictEqual(first.memberId, 'pod-a') strictEqual(second.memberId, 'pod-b') strictEqual(third.memberId, 'pod-a') }) test('LeastLoadedStrategy - returns null for empty list', () => { - const strategy = new LeastLoadedStrategy() - strictEqual(strategy.pick([]), null) + strictEqual(new LeastLoadedStrategy().pick([], ctx), null) }) -test('RandomStrategy - returns a member from the list', () => { +test('RandomStrategy - returns a candidate from the list', () => { const strategy = new RandomStrategy() const ids = new Set() for (let i = 0; i < 50; i++) { - const picked = strategy.pick(members) + const picked = strategy.pick(members, ctx) ok(picked) ids.add(picked.memberId) } - ok(ids.size > 1, 'random should pick different members') + ok(ids.size > 1, 'random should pick different candidates') }) test('RandomStrategy - returns null for empty list', () => { - const strategy = new RandomStrategy() - strictEqual(strategy.pick([]), null) + strictEqual(new RandomStrategy().pick([], ctx), null) +}) + +test('Custom strategy receives ctx with destinationId', () => { + let seenDestinationId: string | undefined + const strategy = { + pick (candidates: MemberInfo[], pickCtx: { destinationId?: string }) { + seenDestinationId = pickCtx.destinationId + return candidates[0] ?? null + } + } + strategy.pick(members, { destinationId: 'tenant-42' }) + strictEqual(seenDestinationId, 'tenant-42') }) test('createStrategy - returns correct strategy types', () => {