Skip to content

Commit 85aa73b

Browse files
committed
fix(core): disconnect ws when user logout (#8188)
1 parent d93c3b3 commit 85aa73b

File tree

12 files changed

+96
-50
lines changed

12 files changed

+96
-50
lines changed

packages/common/infra/src/modules/workspace/entities/engine.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,7 @@ export class WorkspaceEngine extends Entity<{
7070

7171
override dispose(): void {
7272
this.forceStop();
73+
this.doc.dispose();
74+
this.awareness.dispose();
7375
}
7476
}

packages/common/infra/src/modules/workspace/services/engine.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,10 @@ export class WorkspaceEngineService extends Service {
1616
constructor(private readonly workspaceScope: WorkspaceScope) {
1717
super();
1818
}
19+
20+
override dispose(): void {
21+
this._engine?.dispose();
22+
this._engine = null;
23+
super.dispose();
24+
}
1925
}

packages/common/infra/src/sync/awareness.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import type { Awareness } from 'y-protocols/awareness.js';
33
export interface AwarenessConnection {
44
connect(awareness: Awareness): void;
55
disconnect(): void;
6+
dispose?(): void;
67
}
78

89
export class AwarenessEngine {
@@ -15,4 +16,8 @@ export class AwarenessEngine {
1516
disconnect() {
1617
this.connections.forEach(connection => connection.disconnect());
1718
}
19+
20+
dispose() {
21+
this.connections.forEach(connection => connection.dispose?.());
22+
}
1823
}

packages/common/infra/src/sync/doc/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,4 +219,9 @@ export class DocEngine {
219219
});
220220
});
221221
}
222+
223+
dispose() {
224+
this.stop();
225+
this.server?.dispose?.();
226+
}
222227
}

packages/common/infra/src/sync/doc/server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@ export interface DocServer {
2323
waitForConnectingServer(signal: AbortSignal): Promise<void>;
2424
disconnectServer(): void;
2525
onInterrupted(cb: (reason: string) => void): void;
26+
27+
dispose?(): void;
2628
}

packages/frontend/core/src/modules/cloud/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ export function configureCloudModule(framework: Framework) {
5959
framework
6060
.service(FetchService)
6161
.service(GraphQLService, [FetchService])
62-
.service(WebSocketService)
62+
.service(WebSocketService, [AuthService])
6363
.service(ServerConfigService)
6464
.entity(ServerConfig, [ServerConfigStore])
6565
.store(ServerConfigStore, [GraphQLService])
Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,47 @@
1-
import { OnEvent, Service } from '@toeverything/infra';
2-
import type { Socket } from 'socket.io-client';
1+
import { ApplicationStarted, OnEvent, Service } from '@toeverything/infra';
32
import { Manager } from 'socket.io-client';
43

54
import { getAffineCloudBaseUrl } from '../services/fetch';
5+
import type { AuthService } from './auth';
66
import { AccountChanged } from './auth';
77

8-
@OnEvent(AccountChanged, e => e.reconnect)
8+
@OnEvent(AccountChanged, e => e.update)
9+
@OnEvent(ApplicationStarted, e => e.update)
910
export class WebSocketService extends Service {
1011
ioManager: Manager = new Manager(`${getAffineCloudBaseUrl()}/`, {
1112
autoConnect: false,
1213
transports: ['websocket'],
1314
secure: location.protocol === 'https:',
1415
});
15-
sockets: Set<Socket> = new Set();
16+
socket = this.ioManager.socket('/');
17+
refCount = 0;
1618

17-
constructor() {
19+
constructor(private readonly authService: AuthService) {
1820
super();
1921
}
2022

21-
newSocket(): Socket {
22-
const socket = this.ioManager.socket('/');
23-
this.sockets.add(socket);
24-
25-
return socket;
23+
/**
24+
* Connect socket, with automatic connect and reconnect logic.
25+
* External code should not call `socket.connect()` or `socket.disconnect()` manually.
26+
* When socket is no longer needed, call `dispose()` to clean up resources.
27+
*/
28+
connect() {
29+
this.refCount++;
30+
this.update();
31+
return {
32+
socket: this.socket,
33+
dispose: () => {
34+
this.refCount--;
35+
this.update();
36+
},
37+
};
2638
}
2739

28-
reconnect(): void {
29-
for (const socket of this.sockets) {
30-
socket.disconnect();
31-
}
32-
33-
for (const socket of this.sockets) {
34-
socket.connect();
40+
update(): void {
41+
if (this.authService.session.account$.value && this.refCount > 0) {
42+
this.socket.connect();
43+
} else {
44+
this.socket.disconnect();
3545
}
3646
}
3747
}

packages/frontend/core/src/modules/userspace/entities/user-db-engine.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,9 @@ export class UserDBEngine extends Entity<{
88
userId: string;
99
}> {
1010
private readonly userId = this.props.userId;
11-
private readonly socket = this.websocketService.newSocket();
1211
readonly docEngine = new DocEngine(
1312
this.userspaceStorageProvider.getDocStorage('affine-cloud:' + this.userId),
14-
new UserDBDocServer(this.userId, this.socket)
13+
new UserDBDocServer(this.userId, this.websocketService)
1514
);
1615

1716
canGracefulStop() {
@@ -29,6 +28,5 @@ export class UserDBEngine extends Entity<{
2928

3029
override dispose() {
3130
this.docEngine.stop();
32-
this.socket.close();
3331
}
3432
}

packages/frontend/core/src/modules/userspace/impls/user-db-doc-server.ts

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
import { type DocServer, throwIfAborted } from '@toeverything/infra';
88
import type { Socket } from 'socket.io-client';
99

10+
import type { WebSocketService } from '../../cloud';
1011
import {
1112
base64ToUint8Array,
1213
uint8ArrayToBase64,
@@ -19,10 +20,17 @@ export class UserDBDocServer implements DocServer {
1920
interruptCb: ((reason: string) => void) | null = null;
2021
SEND_TIMEOUT = 30000;
2122

23+
socket: Socket;
24+
disposeSocket: () => void;
25+
2226
constructor(
2327
private readonly userId: string,
24-
private readonly socket: Socket
25-
) {}
28+
webSocketService: WebSocketService
29+
) {
30+
const { socket, dispose } = webSocketService.connect();
31+
this.socket = socket;
32+
this.disposeSocket = dispose;
33+
}
2634

2735
private async clientHandShake() {
2836
await this.socket.emitWithAck('space:join', {
@@ -154,7 +162,6 @@ export class UserDBDocServer implements DocServer {
154162
if (this.socket.connected) {
155163
await this.clientHandShake();
156164
} else {
157-
this.socket.connect();
158165
await new Promise<void>((resolve, reject) => {
159166
this.socket.on('connect', () => {
160167
resolve();
@@ -168,17 +175,12 @@ export class UserDBDocServer implements DocServer {
168175
}
169176
}
170177
disconnectServer(): void {
171-
if (!this.socket) {
172-
return;
173-
}
174-
175178
this.socket.emit('space:leave', {
176179
spaceType: 'userspace',
177180
spaceId: this.userId,
178181
});
179182
this.socket.off('server-version-rejected', this.handleVersionRejected);
180183
this.socket.off('disconnect', this.handleDisconnect);
181-
this.socket.disconnect();
182184
}
183185
onInterrupted = (cb: (reason: string) => void) => {
184186
this.interruptCb = cb;
@@ -192,4 +194,9 @@ export class UserDBDocServer implements DocServer {
192194
handleVersionRejected = () => {
193195
this.interruptCb?.('Client version rejected');
194196
};
197+
198+
dispose(): void {
199+
this.disconnectServer();
200+
this.disposeSocket();
201+
}
195202
}

packages/frontend/core/src/modules/workspace-engine/impls/cloud.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -243,17 +243,11 @@ export class CloudWorkspaceFlavourProviderService
243243
getAwarenessConnections: () => {
244244
return [
245245
new BroadcastChannelAwarenessConnection(workspaceId),
246-
new CloudAwarenessConnection(
247-
workspaceId,
248-
this.webSocketService.newSocket()
249-
),
246+
new CloudAwarenessConnection(workspaceId, this.webSocketService),
250247
];
251248
},
252249
getDocServer: () => {
253-
return new CloudDocEngineServer(
254-
workspaceId,
255-
this.webSocketService.newSocket()
256-
);
250+
return new CloudDocEngineServer(workspaceId, this.webSocketService);
257251
},
258252
getDocStorage: () => {
259253
return this.storageProvider.getDocStorage(workspaceId);

0 commit comments

Comments
 (0)