Durable Object Groups
- Supports
Replica
workloads using the HTTP and/or WS protocols - Creates or reuses a
Replica
based on configured connection limit - Includes
Replica
-to-Replica
(peer-to-peer) communication - Ready for strongly-typed, strict TypeScript usage
- Allows an active connection to:
broadcast
messages to the entire clusteremit
messages toReplica
-owned connections- send a
whisper
a single connection within the cluster
With DOG, it's easy to setup named clusters of related Durable Objects. Each cluster is controlled by a Group
, which directs an incoming Request
to a specific Replica
instance. A Group
adheres to the user-defined limit of active connections per Replica
and, in doing so, will reuse existing or create new Replica
instances as necessary.
DOG includes convenience methods that allow a Replica
to directly communicate with another Replica
belonging to the same Group
– effectively a peer-to-peer/gossip network. Additionally, when dealing with active client connections, a Replica
class allows you to:
broadcast
a message to all active connections within the entire clusteremit
a message only to active connections owned by theReplica
itselfwhisper
a message to a single, targeted connection (via your own identification system); even if it's owned by anotherReplica
instance!
Group
and Replica
are both abstract classes, which means that you're allowed — and required — to extend them with your own application needs. You may define your own class methods, add your own state properties, or use Durable Storage to fulfill your needs.
Please see Usage, the API docs, and the example application for further information!
npm install @soketi/dog
Refer to the
/example
for a complete Chat Room application.
import { identify, Group, Replica } from '@soketi/dog';
// deployed as `POOL` binding
export class Pool extends Group {
limit = 50; // each Replica handles 50 connections max
link(env: Bindings) {
return {
child: env.TASK, // receiving Replica
self: env.POOL, // self-identifier
};
}
}
// deployed as `TASK` binding
export class Task extends Replica {
link(env) {
return {
parent: env.POOL, // parent Group
self: env.TASK, // self-identifier
};
}
async onmessage(socket, data) {
let message = JSON.parse(data);
console.log('[task] onmessage', message);
if (message.type === 'crawl:url') {
let { url } = message;
// ...
let output = { url, done: true };
// alert everyone that the task is complete
return socket.broadcast(JSON.stringify(output), true);
}
// other events
}
receive(req) {
// Receive & handle the request
// NOTE: This is the original, forwarded request
let { pathname } = new URL(req.url);
// Rely on internal util for WebSocket upgrade
if (pathname === '/ws') return this.connect(req);
// Any other custom routing behavior(s)
if (pathname === '/') return new Response('OK');
return toError('Unknown path', 404);
}
}
function toError(msg, status) {
return new Response(msg, { status });
}
// Module Worker
export default {
fetch(req, env, ctx) {
// Accept: /tasks/<taskname>
let match = /[/]tasks[/]([^/]+)[/]?/.exec(req.url);
if (match == null) return toError('Missing task name', 404);
let taskname = match[1].trim();
if (taskname.length < 1) return toError('Invalid task name', 400);
// Generate Durable Object ID from taskname
let group = env.POOL.idFromName(taskname);
// Custom request identifier logic
let reqid = req.headers.get('x-request-id');
// Identify the `Replica` stub to use
let replica = await identify(group, reqid, {
parent: env.POOL,
child: env.TASK,
});
// (Optional) Save reqid -> replica.id
// await KV.put(`req::${reqid}`, replica.id.toString());
// Send request to the Replica instance
return replica.fetch(req);
}
}
Note: Refer to the TypeScript definitions for more information.
The utility function to identify a Replica
to be used and, if necessary, will create a new Replica
if none are available. Returns the Replica
stub directly.
Note: Refer to the TypeScript definitions for more information.
Required:
limit: number
– the maximum number of active connections aReplica
can handlelink(env: Bindings): { self, child }
– define the relationships between thisGroup
and itsReplica
child class
A Group
is initial coordinator for the cluster. It receives a user-supplied request identifier, ReqID
, and replies with the Durable Object ID for the Replica
instance to be used. If the ReqID
has been seen before, the Group will attempt to target the same Replica that the ReqID
was previously connected to. If the ReqID
is unknown, the Group will send the request to the least-utilized Replica
instance or generate a new Replica
ID to be used.
When targeting an existing Replica
instance, the Group verifies that the Replica
actually has availability for the request, as determined by the user-supplied limit
value. If a new Replica instance needs to be created, the Group's clusterize()
method is called to generate a new Replica
instance identifier. You may override this method with your own logic – for example, including a jurisdiction – but by default, the Group calls newUniqueId()
for a system-guaranteed identifier.
The number of active connections within each Replica
instance is automatically tracked and shared between the Replica
and its Group
parent. The Replica
's count is decremented when the connection is closed. This means that when a Replica
works with WebSockets, open connections continue to reserve Replica
quota until closed. Non-upgraded HTTP connections close and decrement the Replica
count as soon as a Response
is returned.
Important: Do not define your own
fetch()
method! Doing so requires thatsuper.fetch()
be called appropriately, otherwise the entire cluster's inter-communication will fail.
You may attach any additional state and/or methods to your Group
class extension.
Note: Refer to the TypeScript definitions for more information.
Required:
link(env: Bindings): { self, child }
– define the relationships between thisReplica
and itsGroup
parent classreceive(req: Request): Promise<Response> | Response
– a user-supplied method to handle an incoming Request
A Replica
is the cluster's terminating node. In other words, it's your workhorse and is where the bulk of your application logic will reside. By default, a Replica
actually does nothing and requires your user-supplied code to become useful. It does, however, provide you with utilities, lifecycle hooks, and event listeners to organize and structure your logic.
A Replica
can only receive a Request
from its parent Group
or from its Replica
siblings/peers. Because of this, you cannot define a fetch()
method in your Replica
class extension, otherwise all internal routing and inter-communication will break.
However, this does not mean that you cannot deploy your own external-facing routing solution!
If an incoming request to a Replica
is not an internal DOG event, the request is passed to your receive
method, which receives the original Request
without any modifications. This means that the execution order for a client request looks like this:
client request
└──> dog.identify(...)
│ ├──> Group#fetch (internal)
│ └──> Group#clusterize (optional)
└──> Replica
└──> Replica.fetch (user)
└──> Replica#receive
Your receive
method is the final handler and decides what the Replica
actually does.
If you'd like to remain in the HTTP protocol, then you can treat receive()
as if it were the underyling fetch()
method. Otherwise, to upgrade the HTTP connection into a WebSocket connection, then you may reach for the Replica.connect()
method, which handles the upgrade and unlocks the rest of the Replica
abstractions.
Internally, a Socket
interface is instantiated and passed to WebSocket event listeners that you chose to define. For example, to handle incoming messages or to react to a new connection, your Replica
class may including the following:
import { Replica } from '@soketi/dog';
export class Counter extends Replica {
#counts = new Map<string, number>;
onopen(socket) {
// via dog.identify
// ~> your own ReqID
let reqid = socket.uid;
this.#counts.set(reqid, 0);
// notify others ONLY in Replica
socket.emit(`"${reqid}" has joined`);
}
onmessage(socket, data) {
let reqid = socket.uid;
let current = this.#counts.get(reqid);
// data is always a string
let msg = JSON.parse(data);
if (msg.type === '+1') count++;
else if (msg.type === '-1') count--;
else return; // unknown msg type
this.#counts.set(reqid, count);
// tell EVERYONE in cluster about new count
socket.broadcast(`"${reqid}" now has ${count}`);
}
receive(req) {
// Only accept "/ws" pathname
let isWS = /^[/]ws[/]?/.test(req.url);
// Handle upgrade, return 101 status
if (isWS) return this.connect(req);
// Otherwise return a 404
return new Response('Invalid', { status: 404 });
}
}
The Replica
class allows you to optionally define event listeners for the underlying WebSocket events. Whether or not you define onclose
and/or onerror
listeners, the Replica
will always notify the Group
parent when the WebSocket connection is closed. The event listeners may be asynchronous and their names follow the browser's WebSocket
event names:
onopen
– theReplica
established aWebSocket
connectiononmessage
– theReplica
received a message from theWebSocket
connectiononerror
– theWebSocket
connection terminated due to an erroronclose
– theWebSocket
connection was closed
Note: If defined, the
onclose
listener will be called in the absence of anonerror
listener.
Finally, a Replica
may communicate directly with its Replica
peers in the cluster. This does not rely on WebSockets nor does it require you to use them! It can, however, be leveraged at any point during your HTTP and/or WebSocket handlers.
In DOG, this peer-to-peer communication is called gossip – because Replica
s are typically talking about their connections but without involving the connections; AKA, behind their backs!
In order for a Replica
to hear gossip, it must define an ongossip
method handler. It will receive a decoded JSON object and must return a new JSON object so that DOG can serialize it and deliver it to sender. In practice, this internal communication is happening over HTTP which means that each Gossip.Message
must represent point-in-time information.
Returning to the Counter
example, suppose the Counter
objects needs to coordinate with one another to determine a leaderboard. Refreshing this leaderboard could be done through a new refresh:leaderboard
message, for example:
import { Replica } from '@soketi/dog';
export class Counter extends Replica {
#counts = new Map<string, number>;
#lastupdate = 0; // timestamp
#leaders = []; // Array<string, number>[]
// NOTE: now `async` method
async onmessage(socket, data) {
let reqid = socket.uid;
let current = this.#counts.get(reqid);
// data is always a string
let msg = JSON.parse(data);
// ...
if (msg.type === 'refresh:leaderboard') {
// Only gossip if cache is older than 60s
if (Date.now() - this.#lastupdate > 60e3) {
// `ongossip` returns Array<[string,number][]>
let results = await this.gossip({ type: 'ask:scores' });
let leaders = results.flat(1); // [ [reqid,count], [reqid,count], ... ]
// sort by highest scores, keep top 10 only
this.#scores = leaders.sort((a, b) => b[1] - a[1]).slice(0, 10);
this.#lastupdate = Date.now();
}
// Tell EVERYONE in cluster
return socket.broadcast({
leaders: this.#scores,
timestamp: this.#lastupdate,
});
}
}
ongossip(msg) {
// Return array of tuples: Array<[string, number]>
if (msg.type === 'ask:scores') return [...this.#counts];
throw new Error(`Missing "${msg.type}" handler in ongossip`);
}
// ...
}
MIT © Cloudflare