Skip to content

Commit

Permalink
feat: check remove connection awareness
Browse files Browse the repository at this point in the history
  • Loading branch information
naporin0624 committed Apr 25, 2024
1 parent 3fa75b9 commit 6c4f5af
Showing 1 changed file with 32 additions and 4 deletions.
36 changes: 32 additions & 4 deletions src/yjs/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
/* eslint-disable no-console */
import { Hono } from "hono";
import { removeAwarenessStates } from "y-protocols/awareness";
import { applyUpdate, encodeStateAsUpdate } from "yjs";

import { upgrade } from "../middleware";
Expand All @@ -7,6 +9,7 @@ import { WSSharedDoc } from "../yjs/remote";
import { setupWSConnection } from "./client/setup";
import { YTransactionStorageImpl } from "./storage";

import type { AwarenessChanges } from "../yjs/remote";
import type { Env } from "hono";

export class YDurableObjects<T extends Env> implements DurableObject {
Expand All @@ -21,6 +24,7 @@ export class YDurableObjects<T extends Env> implements DurableObject {
transaction: (closure) => this.state.storage.transaction(closure),
});
private sessions = new Map<WebSocket, () => void>();
private awarenessClients = new Set<number>();

constructor(
private readonly state: DurableObjectState,
Expand All @@ -29,16 +33,37 @@ export class YDurableObjects<T extends Env> implements DurableObject {
void this.state.blockConcurrencyWhile(async () => {
const doc = await this.storage.getYDoc();
applyUpdate(this.doc, encodeStateAsUpdate(doc));

this.doc.on("update", async (update) => {
await this.storage.storeUpdate(update);
});
const clients = await this.state.storage.get<Set<number>>(
"ydoc:awareness_clients",
);
this.awarenessClients = clients ?? new Set<number>();
console.log("set awareness clients", this.awarenessClients.size);

for (const ws of this.state.getWebSockets()) {
this.connect(ws);
}
});

this.doc.on("update", async (update) => {
await this.storage.storeUpdate(update);
});
this.doc.awareness.on(
"update",
async ({ added, removed }: AwarenessChanges) => {
for (const client of added) {
this.awarenessClients.add(client);
}
for (const client of removed) {
this.awarenessClients.delete(client);
}
console.log("awareness update", this.awarenessClients);
await this.state.storage.put(
"ydoc:awareness_clients",
this.awarenessClients,
);
},
);

this.app.get("/", upgrade(), async () => {
const pair = new WebSocketPair();
const client = pair[0];
Expand Down Expand Up @@ -88,6 +113,9 @@ export class YDurableObjects<T extends Env> implements DurableObject {
const dispose = this.sessions.get(ws);
dispose?.();
this.sessions.delete(ws);
const clientIds = this.awarenessClients;
console.log("disconnect", this.awarenessClients);
removeAwarenessStates(this.doc.awareness, Array.from(clientIds), null);
} catch (e) {
// eslint-disable-next-line no-console
console.error(e);
Expand Down

0 comments on commit 6c4f5af

Please sign in to comment.